Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ba7f757

Browse filesBrowse files
Pijukatelvdusek
andauthored
feat: Add RemainingTime option for timeout argument of Actor.call and Actor.start (#473)
### Description Added convenient way to start another Actor with reasonable timeout to prevent lingering actor runs after main actor shuts down. Added test. `RemainingTime` value of the timeout argument is not a default one and has to be explicitly passed by the user. Using this value will calculate remaining time of this actor run and pass it as a timeout to the actor that is being started. ### Issues - Closes: #472 --------- Co-authored-by: Vlada Dusek <v.dusek96@gmail.com>
1 parent 2d6d303 commit ba7f757
Copy full SHA for ba7f757

File tree

Expand file treeCollapse file tree

2 files changed

+144
-7
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+144
-7
lines changed

‎src/apify/_actor.py

Copy file name to clipboardExpand all lines: src/apify/_actor.py
+39-7Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
import sys
66
from contextlib import suppress
7-
from datetime import timedelta
7+
from datetime import datetime, timedelta, timezone
88
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar, cast, overload
99

1010
from lazy_object_proxy import Proxy
@@ -693,7 +693,7 @@ async def start(
693693
content_type: str | None = None,
694694
build: str | None = None,
695695
memory_mbytes: int | None = None,
696-
timeout: timedelta | None = None,
696+
timeout: timedelta | None | Literal['RemainingTime'] = None,
697697
wait_for_finish: int | None = None,
698698
webhooks: list[Webhook] | None = None,
699699
) -> ActorRun:
@@ -711,7 +711,8 @@ async def start(
711711
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified
712712
in the default run configuration for the Actor.
713713
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in
714-
the default run configuration for the Actor.
714+
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor
715+
to the time remaining from this Actor timeout.
715716
wait_for_finish: The maximum number of seconds the server waits for the run to finish. By default,
716717
it is 0, the maximum value is 300.
717718
webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with
@@ -732,18 +733,39 @@ async def start(
732733
else:
733734
serialized_webhooks = None
734735

736+
if timeout == 'RemainingTime':
737+
actor_start_timeout = self._get_remaining_time()
738+
elif timeout is None:
739+
actor_start_timeout = None
740+
elif isinstance(timeout, timedelta):
741+
actor_start_timeout = timeout
742+
else:
743+
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.')
744+
735745
api_result = await client.actor(actor_id).start(
736746
run_input=run_input,
737747
content_type=content_type,
738748
build=build,
739749
memory_mbytes=memory_mbytes,
740-
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
750+
timeout_secs=int(actor_start_timeout.total_seconds()) if actor_start_timeout is not None else None,
741751
wait_for_finish=wait_for_finish,
742752
webhooks=serialized_webhooks,
743753
)
744754

745755
return ActorRun.model_validate(api_result)
746756

757+
def _get_remaining_time(self) -> timedelta | None:
758+
"""Get time remaining from the actor timeout. Returns `None` if not on an Apify platform."""
759+
if self.is_at_home() and self.configuration.timeout_at:
760+
return self.configuration.timeout_at - datetime.now(tz=timezone.utc)
761+
762+
self.log.warning(
763+
'Returning `None` instead of remaining time. Using `RemainingTime` argument is only possible when the Actor'
764+
' is running on the Apify platform and when the timeout for the Actor run is set. '
765+
f'{self.is_at_home()=}, {self.configuration.timeout_at=}'
766+
)
767+
return None
768+
747769
async def abort(
748770
self,
749771
run_id: str,
@@ -787,7 +809,7 @@ async def call(
787809
content_type: str | None = None,
788810
build: str | None = None,
789811
memory_mbytes: int | None = None,
790-
timeout: timedelta | None = None,
812+
timeout: timedelta | None | Literal['RemainingTime'] = None,
791813
webhooks: list[Webhook] | None = None,
792814
wait: timedelta | None = None,
793815
) -> ActorRun | None:
@@ -805,7 +827,8 @@ async def call(
805827
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified
806828
in the default run configuration for the Actor.
807829
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in
808-
the default run configuration for the Actor.
830+
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor
831+
to the time remaining from this Actor timeout.
809832
webhooks: Optional webhooks (https://docs.apify.com/webhooks) associated with the Actor run, which can
810833
be used to receive a notification, e.g. when the Actor finished or failed. If you already have
811834
a webhook set up for the Actor, you do not have to add it again here.
@@ -826,12 +849,21 @@ async def call(
826849
else:
827850
serialized_webhooks = None
828851

852+
if timeout == 'RemainingTime':
853+
actor_call_timeout = self._get_remaining_time()
854+
elif timeout is None:
855+
actor_call_timeout = None
856+
elif isinstance(timeout, timedelta):
857+
actor_call_timeout = timeout
858+
else:
859+
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.')
860+
829861
api_result = await client.actor(actor_id).call(
830862
run_input=run_input,
831863
content_type=content_type,
832864
build=build,
833865
memory_mbytes=memory_mbytes,
834-
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
866+
timeout_secs=int(actor_call_timeout.total_seconds()) if actor_call_timeout is not None else None,
835867
webhooks=serialized_webhooks,
836868
wait_secs=int(wait.total_seconds()) if wait is not None else None,
837869
)
+105Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import TYPE_CHECKING
5+
6+
from apify import Actor
7+
8+
if TYPE_CHECKING:
9+
from .conftest import MakeActorFunction, RunActorFunction
10+
11+
12+
async def test_actor_start_remaining_timeout(
13+
make_actor: MakeActorFunction,
14+
run_actor: RunActorFunction,
15+
) -> None:
16+
"""Test that correct timeout is set when using `RemainingTime` value for the `timeout` argument.
17+
18+
In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run.
19+
Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor start."""
20+
21+
async def main() -> None:
22+
from datetime import datetime, timezone
23+
24+
async with Actor:
25+
actor_input = (await Actor.get_input()) or {}
26+
if actor_input.get('called_from_another_actor', False) is True:
27+
# If this Actor run was started with a specific argument (the second Actor run), return immediately.
28+
# Asserts checking the timeout are in the first Actor run.
29+
return
30+
31+
# Start another run of this actor with timeout set to the time remaining in this actor run
32+
other_run_data = await Actor.call(
33+
actor_id=Actor.configuration.actor_id or '',
34+
run_input={'called_from_another_actor': True},
35+
timeout='RemainingTime',
36+
)
37+
assert other_run_data is not None
38+
try:
39+
# To make sure that the actor is started
40+
await asyncio.sleep(5)
41+
assert other_run_data.options is not None
42+
assert Actor.configuration.timeout_at is not None
43+
assert Actor.configuration.started_at is not None
44+
45+
remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc)
46+
47+
assert other_run_data.options.timeout > remaining_time_after_actor_start
48+
assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at
49+
finally:
50+
# Make sure the other actor run is aborted
51+
await Actor.apify_client.run(other_run_data.id).abort()
52+
53+
actor = await make_actor(label='remaining-timeout', main_func=main)
54+
run_result = await run_actor(actor)
55+
56+
assert run_result.status == 'SUCCEEDED'
57+
58+
59+
async def test_actor_call_remaining_timeout(
60+
make_actor: MakeActorFunction,
61+
run_actor: RunActorFunction,
62+
) -> None:
63+
"""Test that correct timeout is set when using `RemainingTime` value for the `timeout` argument.
64+
65+
In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run.
66+
Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor call."""
67+
68+
async def main() -> None:
69+
from datetime import datetime, timezone
70+
71+
async with Actor:
72+
actor_input = (await Actor.get_input()) or {}
73+
if actor_input.get('called_from_another_actor', False) is True:
74+
# If this Actor run was started with a specific argument (the second Actor run), return immediately.
75+
# Asserts checking the timeout are in the first Actor run.
76+
return
77+
78+
# Start another run of this actor with timeout set to the time remaining in this actor run
79+
other_run_data = await Actor.call(
80+
actor_id=Actor.configuration.actor_id or '',
81+
run_input={'called_from_another_actor': True},
82+
timeout='RemainingTime',
83+
)
84+
85+
assert other_run_data is not None
86+
try:
87+
# To make sure that the actor is started
88+
await asyncio.sleep(5)
89+
90+
assert other_run_data.options is not None
91+
assert Actor.configuration.timeout_at is not None
92+
assert Actor.configuration.started_at is not None
93+
94+
remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc)
95+
96+
assert other_run_data.options.timeout > remaining_time_after_actor_start
97+
assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at
98+
finally:
99+
# Make sure the other actor run is aborted
100+
await Actor.apify_client.run(other_run_data.id).abort()
101+
102+
actor = await make_actor(label='remaining-timeout', main_func=main)
103+
run_result = await run_actor(actor)
104+
105+
assert run_result.status == 'SUCCEEDED'

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.