diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 73e1be7c..b8fcdc05 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -4,7 +4,7 @@ import os import sys from contextlib import suppress -from datetime import timedelta +from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar, cast, overload from lazy_object_proxy import Proxy @@ -693,7 +693,7 @@ async def start( content_type: str | None = None, build: str | None = None, memory_mbytes: int | None = None, - timeout: timedelta | None = None, + timeout: timedelta | None | Literal['RemainingTime'] = None, wait_for_finish: int | None = None, webhooks: list[Webhook] | None = None, ) -> ActorRun: @@ -711,7 +711,8 @@ async def start( memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified in the default run configuration for the Actor. timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in - the default run configuration for the Actor. + the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor + to the time remaining from this Actor timeout. wait_for_finish: The maximum number of seconds the server waits for the run to finish. By default, it is 0, the maximum value is 300. webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with @@ -732,18 +733,39 @@ async def start( else: serialized_webhooks = None + if timeout == 'RemainingTime': + actor_start_timeout = self._get_remaining_time() + elif timeout is None: + actor_start_timeout = None + elif isinstance(timeout, timedelta): + actor_start_timeout = timeout + else: + raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.') + api_result = await client.actor(actor_id).start( run_input=run_input, content_type=content_type, build=build, memory_mbytes=memory_mbytes, - timeout_secs=int(timeout.total_seconds()) if timeout is not None else None, + timeout_secs=int(actor_start_timeout.total_seconds()) if actor_start_timeout is not None else None, wait_for_finish=wait_for_finish, webhooks=serialized_webhooks, ) return ActorRun.model_validate(api_result) + def _get_remaining_time(self) -> timedelta | None: + """Get time remaining from the actor timeout. Returns `None` if not on an Apify platform.""" + if self.is_at_home() and self.configuration.timeout_at: + return self.configuration.timeout_at - datetime.now(tz=timezone.utc) + + self.log.warning( + 'Returning `None` instead of remaining time. Using `RemainingTime` argument is only possible when the Actor' + ' is running on the Apify platform and when the timeout for the Actor run is set. ' + f'{self.is_at_home()=}, {self.configuration.timeout_at=}' + ) + return None + async def abort( self, run_id: str, @@ -787,7 +809,7 @@ async def call( content_type: str | None = None, build: str | None = None, memory_mbytes: int | None = None, - timeout: timedelta | None = None, + timeout: timedelta | None | Literal['RemainingTime'] = None, webhooks: list[Webhook] | None = None, wait: timedelta | None = None, ) -> ActorRun | None: @@ -805,7 +827,8 @@ async def call( memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified in the default run configuration for the Actor. timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in - the default run configuration for the Actor. + the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor + to the time remaining from this Actor timeout. webhooks: Optional webhooks (https://docs.apify.com/webhooks) associated with the Actor run, which can be used to receive a notification, e.g. when the Actor finished or failed. If you already have a webhook set up for the Actor, you do not have to add it again here. @@ -826,12 +849,21 @@ async def call( else: serialized_webhooks = None + if timeout == 'RemainingTime': + actor_call_timeout = self._get_remaining_time() + elif timeout is None: + actor_call_timeout = None + elif isinstance(timeout, timedelta): + actor_call_timeout = timeout + else: + raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.') + api_result = await client.actor(actor_id).call( run_input=run_input, content_type=content_type, build=build, memory_mbytes=memory_mbytes, - timeout_secs=int(timeout.total_seconds()) if timeout is not None else None, + timeout_secs=int(actor_call_timeout.total_seconds()) if actor_call_timeout is not None else None, webhooks=serialized_webhooks, wait_secs=int(wait.total_seconds()) if wait is not None else None, ) diff --git a/tests/integration/test_actor_call_timeouts.py b/tests/integration/test_actor_call_timeouts.py new file mode 100644 index 00000000..c60ca93a --- /dev/null +++ b/tests/integration/test_actor_call_timeouts.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +from apify import Actor + +if TYPE_CHECKING: + from .conftest import MakeActorFunction, RunActorFunction + + +async def test_actor_start_remaining_timeout( + make_actor: MakeActorFunction, + run_actor: RunActorFunction, +) -> None: + """Test that correct timeout is set when using `RemainingTime` value for the `timeout` argument. + + In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run. + Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor start.""" + + async def main() -> None: + from datetime import datetime, timezone + + async with Actor: + actor_input = (await Actor.get_input()) or {} + if actor_input.get('called_from_another_actor', False) is True: + # If this Actor run was started with a specific argument (the second Actor run), return immediately. + # Asserts checking the timeout are in the first Actor run. + return + + # Start another run of this actor with timeout set to the time remaining in this actor run + other_run_data = await Actor.call( + actor_id=Actor.configuration.actor_id or '', + run_input={'called_from_another_actor': True}, + timeout='RemainingTime', + ) + assert other_run_data is not None + try: + # To make sure that the actor is started + await asyncio.sleep(5) + assert other_run_data.options is not None + assert Actor.configuration.timeout_at is not None + assert Actor.configuration.started_at is not None + + remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc) + + assert other_run_data.options.timeout > remaining_time_after_actor_start + assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at + finally: + # Make sure the other actor run is aborted + await Actor.apify_client.run(other_run_data.id).abort() + + actor = await make_actor(label='remaining-timeout', main_func=main) + run_result = await run_actor(actor) + + assert run_result.status == 'SUCCEEDED' + + +async def test_actor_call_remaining_timeout( + make_actor: MakeActorFunction, + run_actor: RunActorFunction, +) -> None: + """Test that correct timeout is set when using `RemainingTime` value for the `timeout` argument. + + In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run. + Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor call.""" + + async def main() -> None: + from datetime import datetime, timezone + + async with Actor: + actor_input = (await Actor.get_input()) or {} + if actor_input.get('called_from_another_actor', False) is True: + # If this Actor run was started with a specific argument (the second Actor run), return immediately. + # Asserts checking the timeout are in the first Actor run. + return + + # Start another run of this actor with timeout set to the time remaining in this actor run + other_run_data = await Actor.call( + actor_id=Actor.configuration.actor_id or '', + run_input={'called_from_another_actor': True}, + timeout='RemainingTime', + ) + + assert other_run_data is not None + try: + # To make sure that the actor is started + await asyncio.sleep(5) + + assert other_run_data.options is not None + assert Actor.configuration.timeout_at is not None + assert Actor.configuration.started_at is not None + + remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc) + + assert other_run_data.options.timeout > remaining_time_after_actor_start + assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at + finally: + # Make sure the other actor run is aborted + await Actor.apify_client.run(other_run_data.id).abort() + + actor = await make_actor(label='remaining-timeout', main_func=main) + run_result = await run_actor(actor) + + assert run_result.status == 'SUCCEEDED'