Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 75454ca

Browse filesBrowse files
authored
EventManager improvements and unit tests (#72)
I've made two improvements to `EventManager`: - it now waits for the platform events websocket connection during initialization (before, if it could not connect to the websocket, it would initialize without errors, and then raise an error immediately after) - it now accepts floats as the timeout when waiting for event listeners to complete I've also written unit tests for it, since we didn't have it tested yet and this was the third time I found some bug in it, which the tests would catch if we had them.
1 parent f56ed69 commit 75454ca
Copy full SHA for 75454ca

File tree

Expand file treeCollapse file tree

6 files changed

+306
-8
lines changed
Filter options
Expand file treeCollapse file tree

6 files changed

+306
-8
lines changed

‎CHANGELOG.md

Copy file name to clipboardExpand all lines: CHANGELOG.md
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ Changelog
77
### Fixed
88

99
- fixed parsing messages from the platform events websocket when they have no event data
10+
- fixed `EventManager` not waiting for platform events websocket connection during initialization
1011

1112
### Internal changes
1213

1314
- started running unit tests in CI on Windows runners in addition to Linux
1415
- added unit tests for environment variables handling
1516
- added unit tests for the `Configuration` class
17+
- added unit tests for the `EventManager` class
1618

1719
[0.1.0](../../releases/tag/v0.1.0) - 2023-02-09
1820
-----------------------------------------------

‎docs/docs.md

Copy file name to clipboardExpand all lines: docs/docs.md
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ and stops the event manager.
349349

350350
* **exit_code** (`int`, *optional*) – The exit code with which the actor should fail (defaults to 0).
351351

352-
* **event_listeners_timeout_secs** (`int`, *optional*) – How long should the actor wait for actor event listeners to finish before exiting
352+
* **event_listeners_timeout_secs** (`float`, *optional*) – How long should the actor wait for actor event listeners to finish before exiting
353353

354354
* **Return type**
355355

‎src/apify/actor.py

Copy file name to clipboardExpand all lines: src/apify/actor.py
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ async def exit(
293293
cls,
294294
*,
295295
exit_code: int = 0,
296-
event_listeners_timeout_secs: Optional[int] = EVENT_LISTENERS_TIMEOUT_SECS,
296+
event_listeners_timeout_secs: Optional[float] = EVENT_LISTENERS_TIMEOUT_SECS,
297297
) -> None:
298298
"""Exit the actor instance.
299299
@@ -305,7 +305,7 @@ async def exit(
305305
306306
Args:
307307
exit_code (int, optional): The exit code with which the actor should fail (defaults to `0`).
308-
event_listeners_timeout_secs (int, optional): How long should the actor wait for actor event listeners to finish before exiting
308+
event_listeners_timeout_secs (float, optional): How long should the actor wait for actor event listeners to finish before exiting
309309
"""
310310
return await cls._get_default_instance().exit(
311311
exit_code=exit_code,
@@ -316,7 +316,7 @@ async def _exit_internal(
316316
self,
317317
*,
318318
exit_code: int = 0,
319-
event_listeners_timeout_secs: Optional[int] = EVENT_LISTENERS_TIMEOUT_SECS,
319+
event_listeners_timeout_secs: Optional[float] = EVENT_LISTENERS_TIMEOUT_SECS,
320320
) -> None:
321321
self._raise_if_not_initialized()
322322

‎src/apify/event_manager.py

Copy file name to clipboardExpand all lines: src/apify/event_manager.py
+12-4Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class EventManager:
2626
_send_system_info_interval_task: Optional[asyncio.Task] = None
2727
_listener_tasks: Set[asyncio.Task]
2828
_listeners_to_wrappers: Dict[ActorEventTypes, Dict[Callable, List[Callable]]]
29+
_connected_to_platform_websocket: Optional[asyncio.Future] = None
2930

3031
def __init__(self, config: Configuration) -> None:
3132
"""Crate an instance of EventManager.
@@ -51,20 +52,24 @@ async def init(self) -> None:
5152

5253
# Run tasks but don't await them
5354
if self._config.actor_events_ws_url:
55+
self._connected_to_platform_websocket = asyncio.Future()
5456
self._process_platform_messages_task = asyncio.create_task(self._process_platform_messages())
57+
is_connected = await self._connected_to_platform_websocket
58+
if not is_connected:
59+
raise RuntimeError('Error connecting to platform events websocket!')
5560
else:
5661
logger.debug('APIFY_ACTOR_EVENTS_WS_URL env var not set, no events from Apify platform will be emitted.')
5762

5863
self._initialized = True
5964

60-
async def close(self, event_listeners_timeout_secs: Optional[int] = None) -> None:
65+
async def close(self, event_listeners_timeout_secs: Optional[float] = None) -> None:
6166
"""Initialize the event manager.
6267
6368
This will stop listening for the platform events,
6469
and it will wait for all the event listeners to finish.
6570
6671
Args:
67-
event_listeners_timeout_secs (int, optional): Optional timeout after which the pending event listeners are canceled.
72+
event_listeners_timeout_secs (float, optional): Optional timeout after which the pending event listeners are canceled.
6873
"""
6974
if not self._initialized:
7075
raise RuntimeError('EventManager was not initialized!')
@@ -147,11 +152,11 @@ def emit(self, event_name: ActorEventTypes, data: Any) -> None:
147152

148153
self._event_emitter.emit(event_name, data)
149154

150-
async def wait_for_all_listeners_to_complete(self, *, timeout_secs: Optional[int] = None) -> None:
155+
async def wait_for_all_listeners_to_complete(self, *, timeout_secs: Optional[float] = None) -> None:
151156
"""Wait for all event listeners which are currently being executed to complete.
152157
153158
Args:
154-
timeout_secs (int, optional): Timeout for the wait. If the event listeners don't finish until the timeout, they will be canceled.
159+
timeout_secs (float, optional): Timeout for the wait. If the event listeners don't finish until the timeout, they will be canceled.
155160
"""
156161
async def _wait_for_listeners() -> None:
157162
results = await asyncio.gather(*self._listener_tasks, return_exceptions=True)
@@ -175,10 +180,12 @@ async def _wait_for_listeners() -> None:
175180
async def _process_platform_messages(self) -> None:
176181
# This should be called only on the platform, where we have the ACTOR_EVENTS_WS_URL configured
177182
assert self._config.actor_events_ws_url is not None
183+
assert self._connected_to_platform_websocket is not None
178184

179185
try:
180186
async with websockets.client.connect(self._config.actor_events_ws_url) as websocket:
181187
self._platform_events_websocket = websocket
188+
self._connected_to_platform_websocket.set_result(True)
182189
async for message in websocket:
183190
try:
184191
parsed_message = json.loads(message)
@@ -192,3 +199,4 @@ async def _process_platform_messages(self) -> None:
192199
logger.exception('Cannot parse actor event', extra={'message': message})
193200
except Exception:
194201
logger.exception('Error in websocket connection')
202+
self._connected_to_platform_websocket.set_result(False)

‎tests/unit/actor/test_actor_env_helpers.py

Copy file name to clipboardExpand all lines: tests/unit/actor/test_actor_env_helpers.py
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ async def test_get_env_use_env_vars(self, monkeypatch: pytest.MonkeyPatch) -> No
5151
expected_get_env[string_get_env_var] = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
5252
monkeypatch.setenv(string_env_var, expected_get_env[string_get_env_var])
5353

54+
# We need this override so that the actor doesn't fail when connecting to the platform events websocket
55+
monkeypatch.delenv(ApifyEnvVars.ACTOR_EVENTS_WS_URL)
56+
expected_get_env[ApifyEnvVars.ACTOR_EVENTS_WS_URL.name.lower()] = None
57+
5458
await Actor.init()
5559
assert expected_get_env == Actor.get_env()
5660

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.