Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

EventManager improvements and unit tests #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions 2 CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ Changelog
### Fixed

- fixed parsing messages from the platform events websocket when they have no event data
- fixed `EventManager` not waiting for platform events websocket connection during initialization

### Internal changes

- started running unit tests in CI on Windows runners in addition to Linux
- added unit tests for environment variables handling
- added unit tests for the `Configuration` class
- added unit tests for the `EventManager` class

[0.1.0](../../releases/tag/v0.1.0) - 2023-02-09
-----------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion 2 docs/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ and stops the event manager.

* **exit_code** (`int`, *optional*) – The exit code with which the actor should fail (defaults to 0).

* **event_listeners_timeout_secs** (`int`, *optional*) – How long should the actor wait for actor event listeners to finish before exiting
* **event_listeners_timeout_secs** (`float`, *optional*) – How long should the actor wait for actor event listeners to finish before exiting

* **Return type**

Expand Down
6 changes: 3 additions & 3 deletions 6 src/apify/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async def exit(
cls,
*,
exit_code: int = 0,
event_listeners_timeout_secs: Optional[int] = EVENT_LISTENERS_TIMEOUT_SECS,
event_listeners_timeout_secs: Optional[float] = EVENT_LISTENERS_TIMEOUT_SECS,
) -> None:
"""Exit the actor instance.

Expand All @@ -305,7 +305,7 @@ async def exit(

Args:
exit_code (int, optional): The exit code with which the actor should fail (defaults to `0`).
event_listeners_timeout_secs (int, optional): How long should the actor wait for actor event listeners to finish before exiting
event_listeners_timeout_secs (float, optional): How long should the actor wait for actor event listeners to finish before exiting
"""
return await cls._get_default_instance().exit(
exit_code=exit_code,
Expand All @@ -316,7 +316,7 @@ async def _exit_internal(
self,
*,
exit_code: int = 0,
event_listeners_timeout_secs: Optional[int] = EVENT_LISTENERS_TIMEOUT_SECS,
event_listeners_timeout_secs: Optional[float] = EVENT_LISTENERS_TIMEOUT_SECS,
) -> None:
self._raise_if_not_initialized()

Expand Down
16 changes: 12 additions & 4 deletions 16 src/apify/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class EventManager:
_send_system_info_interval_task: Optional[asyncio.Task] = None
_listener_tasks: Set[asyncio.Task]
_listeners_to_wrappers: Dict[ActorEventTypes, Dict[Callable, List[Callable]]]
_connected_to_platform_websocket: Optional[asyncio.Future] = None

def __init__(self, config: Configuration) -> None:
"""Crate an instance of EventManager.
Expand All @@ -51,20 +52,24 @@ async def init(self) -> None:

# Run tasks but don't await them
if self._config.actor_events_ws_url:
self._connected_to_platform_websocket = asyncio.Future()
self._process_platform_messages_task = asyncio.create_task(self._process_platform_messages())
is_connected = await self._connected_to_platform_websocket
if not is_connected:
raise RuntimeError('Error connecting to platform events websocket!')
else:
logger.debug('APIFY_ACTOR_EVENTS_WS_URL env var not set, no events from Apify platform will be emitted.')

self._initialized = True

async def close(self, event_listeners_timeout_secs: Optional[int] = None) -> None:
async def close(self, event_listeners_timeout_secs: Optional[float] = None) -> None:
"""Initialize the event manager.

This will stop listening for the platform events,
and it will wait for all the event listeners to finish.

Args:
event_listeners_timeout_secs (int, optional): Optional timeout after which the pending event listeners are canceled.
event_listeners_timeout_secs (float, optional): Optional timeout after which the pending event listeners are canceled.
"""
if not self._initialized:
raise RuntimeError('EventManager was not initialized!')
Expand Down Expand Up @@ -147,11 +152,11 @@ def emit(self, event_name: ActorEventTypes, data: Any) -> None:

self._event_emitter.emit(event_name, data)

async def wait_for_all_listeners_to_complete(self, *, timeout_secs: Optional[int] = None) -> None:
async def wait_for_all_listeners_to_complete(self, *, timeout_secs: Optional[float] = None) -> None:
"""Wait for all event listeners which are currently being executed to complete.

Args:
timeout_secs (int, optional): Timeout for the wait. If the event listeners don't finish until the timeout, they will be canceled.
timeout_secs (float, optional): Timeout for the wait. If the event listeners don't finish until the timeout, they will be canceled.
"""
async def _wait_for_listeners() -> None:
results = await asyncio.gather(*self._listener_tasks, return_exceptions=True)
Expand All @@ -175,10 +180,12 @@ async def _wait_for_listeners() -> None:
async def _process_platform_messages(self) -> None:
# This should be called only on the platform, where we have the ACTOR_EVENTS_WS_URL configured
assert self._config.actor_events_ws_url is not None
assert self._connected_to_platform_websocket is not None

try:
async with websockets.client.connect(self._config.actor_events_ws_url) as websocket:
self._platform_events_websocket = websocket
self._connected_to_platform_websocket.set_result(True)
async for message in websocket:
try:
parsed_message = json.loads(message)
Expand All @@ -192,3 +199,4 @@ async def _process_platform_messages(self) -> None:
logger.exception('Cannot parse actor event', extra={'message': message})
except Exception:
logger.exception('Error in websocket connection')
self._connected_to_platform_websocket.set_result(False)
4 changes: 4 additions & 0 deletions 4 tests/unit/actor/test_actor_env_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ async def test_get_env_use_env_vars(self, monkeypatch: pytest.MonkeyPatch) -> No
expected_get_env[string_get_env_var] = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
monkeypatch.setenv(string_env_var, expected_get_env[string_get_env_var])

# We need this override so that the actor doesn't fail when connecting to the platform events websocket
monkeypatch.delenv(ApifyEnvVars.ACTOR_EVENTS_WS_URL)
expected_get_env[ApifyEnvVars.ACTOR_EVENTS_WS_URL.name.lower()] = None

await Actor.init()
assert expected_get_env == Actor.get_env()

Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.