@@ -26,6 +26,7 @@ class EventManager:
26
26
_send_system_info_interval_task : Optional [asyncio .Task ] = None
27
27
_listener_tasks : Set [asyncio .Task ]
28
28
_listeners_to_wrappers : Dict [ActorEventTypes , Dict [Callable , List [Callable ]]]
29
+ _connected_to_platform_websocket : Optional [asyncio .Future ] = None
29
30
30
31
def __init__ (self , config : Configuration ) -> None :
31
32
"""Crate an instance of EventManager.
@@ -51,20 +52,24 @@ async def init(self) -> None:
51
52
52
53
# Run tasks but don't await them
53
54
if self ._config .actor_events_ws_url :
55
+ self ._connected_to_platform_websocket = asyncio .Future ()
54
56
self ._process_platform_messages_task = asyncio .create_task (self ._process_platform_messages ())
57
+ is_connected = await self ._connected_to_platform_websocket
58
+ if not is_connected :
59
+ raise RuntimeError ('Error connecting to platform events websocket!' )
55
60
else :
56
61
logger .debug ('APIFY_ACTOR_EVENTS_WS_URL env var not set, no events from Apify platform will be emitted.' )
57
62
58
63
self ._initialized = True
59
64
60
- async def close (self , event_listeners_timeout_secs : Optional [int ] = None ) -> None :
65
+ async def close (self , event_listeners_timeout_secs : Optional [float ] = None ) -> None :
61
66
"""Initialize the event manager.
62
67
63
68
This will stop listening for the platform events,
64
69
and it will wait for all the event listeners to finish.
65
70
66
71
Args:
67
- event_listeners_timeout_secs (int , optional): Optional timeout after which the pending event listeners are canceled.
72
+ event_listeners_timeout_secs (float , optional): Optional timeout after which the pending event listeners are canceled.
68
73
"""
69
74
if not self ._initialized :
70
75
raise RuntimeError ('EventManager was not initialized!' )
@@ -147,11 +152,11 @@ def emit(self, event_name: ActorEventTypes, data: Any) -> None:
147
152
148
153
self ._event_emitter .emit (event_name , data )
149
154
150
- async def wait_for_all_listeners_to_complete (self , * , timeout_secs : Optional [int ] = None ) -> None :
155
+ async def wait_for_all_listeners_to_complete (self , * , timeout_secs : Optional [float ] = None ) -> None :
151
156
"""Wait for all event listeners which are currently being executed to complete.
152
157
153
158
Args:
154
- timeout_secs (int , optional): Timeout for the wait. If the event listeners don't finish until the timeout, they will be canceled.
159
+ timeout_secs (float , optional): Timeout for the wait. If the event listeners don't finish until the timeout, they will be canceled.
155
160
"""
156
161
async def _wait_for_listeners () -> None :
157
162
results = await asyncio .gather (* self ._listener_tasks , return_exceptions = True )
@@ -175,10 +180,12 @@ async def _wait_for_listeners() -> None:
175
180
async def _process_platform_messages (self ) -> None :
176
181
# This should be called only on the platform, where we have the ACTOR_EVENTS_WS_URL configured
177
182
assert self ._config .actor_events_ws_url is not None
183
+ assert self ._connected_to_platform_websocket is not None
178
184
179
185
try :
180
186
async with websockets .client .connect (self ._config .actor_events_ws_url ) as websocket :
181
187
self ._platform_events_websocket = websocket
188
+ self ._connected_to_platform_websocket .set_result (True )
182
189
async for message in websocket :
183
190
try :
184
191
parsed_message = json .loads (message )
@@ -192,3 +199,4 @@ async def _process_platform_messages(self) -> None:
192
199
logger .exception ('Cannot parse actor event' , extra = {'message' : message })
193
200
except Exception :
194
201
logger .exception ('Error in websocket connection' )
202
+ self ._connected_to_platform_websocket .set_result (False )
0 commit comments