Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: Add RemainingTime option for timeout argument of Actor.call and Actor.start #473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions 46 src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import sys
from contextlib import suppress
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar, cast, overload

from lazy_object_proxy import Proxy
Expand Down Expand Up @@ -693,7 +693,7 @@ async def start(
content_type: str | None = None,
build: str | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
timeout: timedelta | None | Literal['RemainingTime'] = None,
wait_for_finish: int | None = None,
webhooks: list[Webhook] | None = None,
) -> ActorRun:
Expand All @@ -711,7 +711,8 @@ async def start(
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified
in the default run configuration for the Actor.
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in
the default run configuration for the Actor.
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor
to the time remaining from this Actor timeout.
wait_for_finish: The maximum number of seconds the server waits for the run to finish. By default,
it is 0, the maximum value is 300.
webhooks: Optional ad-hoc webhooks (https://docs.apify.com/webhooks/ad-hoc-webhooks) associated with
Expand All @@ -732,18 +733,39 @@ async def start(
else:
serialized_webhooks = None

if timeout == 'RemainingTime':
actor_start_timeout = self._get_remaining_time()
elif timeout is None:
actor_start_timeout = None
elif isinstance(timeout, timedelta):
actor_start_timeout = timeout
else:
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.')

api_result = await client.actor(actor_id).start(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
timeout_secs=int(actor_start_timeout.total_seconds()) if actor_start_timeout is not None else None,
wait_for_finish=wait_for_finish,
webhooks=serialized_webhooks,
)

return ActorRun.model_validate(api_result)

def _get_remaining_time(self) -> timedelta | None:
"""Get time remaining from the actor timeout. Returns `None` if not on an Apify platform."""
if self.is_at_home() and self.configuration.timeout_at:
return self.configuration.timeout_at - datetime.now(tz=timezone.utc)

self.log.warning(
'Returning `None` instead of remaining time. Using `RemainingTime` argument is only possible when the Actor'
' is running on the Apify platform and when the timeout for the Actor run is set. '
f'{self.is_at_home()=}, {self.configuration.timeout_at=}'
)
return None

async def abort(
self,
run_id: str,
Expand Down Expand Up @@ -787,7 +809,7 @@ async def call(
content_type: str | None = None,
build: str | None = None,
memory_mbytes: int | None = None,
timeout: timedelta | None = None,
timeout: timedelta | None | Literal['RemainingTime'] = None,
webhooks: list[Webhook] | None = None,
wait: timedelta | None = None,
) -> ActorRun | None:
Expand All @@ -805,7 +827,8 @@ async def call(
memory_mbytes: Memory limit for the run, in megabytes. By default, the run uses a memory limit specified
in the default run configuration for the Actor.
timeout: Optional timeout for the run, in seconds. By default, the run uses timeout specified in
the default run configuration for the Actor.
the default run configuration for the Actor. Using `RemainingTime` will set timeout of the other Actor
to the time remaining from this Actor timeout.
webhooks: Optional webhooks (https://docs.apify.com/webhooks) associated with the Actor run, which can
be used to receive a notification, e.g. when the Actor finished or failed. If you already have
a webhook set up for the Actor, you do not have to add it again here.
Expand All @@ -826,12 +849,21 @@ async def call(
else:
serialized_webhooks = None

if timeout == 'RemainingTime':
actor_call_timeout = self._get_remaining_time()
elif timeout is None:
actor_call_timeout = None
elif isinstance(timeout, timedelta):
actor_call_timeout = timeout
else:
raise ValueError(f'Invalid timeout {timeout!r}: expected `None`, `"RemainingTime"`, or a `timedelta`.')

api_result = await client.actor(actor_id).call(
run_input=run_input,
content_type=content_type,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=int(timeout.total_seconds()) if timeout is not None else None,
timeout_secs=int(actor_call_timeout.total_seconds()) if actor_call_timeout is not None else None,
webhooks=serialized_webhooks,
wait_secs=int(wait.total_seconds()) if wait is not None else None,
)
Expand Down
105 changes: 105 additions & 0 deletions 105 tests/integration/test_actor_call_timeouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

from apify import Actor

if TYPE_CHECKING:
from .conftest import MakeActorFunction, RunActorFunction


async def test_actor_start_remaining_timeout(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
"""Test that correct timeout is set when using `RemainingTime` value for the `timeout` argument.

In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run.
Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor start."""

async def main() -> None:
from datetime import datetime, timezone

async with Actor:
actor_input = (await Actor.get_input()) or {}
if actor_input.get('called_from_another_actor', False) is True:
# If this Actor run was started with a specific argument (the second Actor run), return immediately.
# Asserts checking the timeout are in the first Actor run.
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the Actor finishes immediately and you just check that the timeout was configured correctly? I agree that it's fine to trust the platform. Could you add a comment here that explains that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more comments


# Start another run of this actor with timeout set to the time remaining in this actor run
other_run_data = await Actor.call(
actor_id=Actor.configuration.actor_id or '',
run_input={'called_from_another_actor': True},
timeout='RemainingTime',
)
assert other_run_data is not None
try:
# To make sure that the actor is started
await asyncio.sleep(5)
assert other_run_data.options is not None
assert Actor.configuration.timeout_at is not None
assert Actor.configuration.started_at is not None

remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc)

assert other_run_data.options.timeout > remaining_time_after_actor_start
assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at
finally:
# Make sure the other actor run is aborted
await Actor.apify_client.run(other_run_data.id).abort()

actor = await make_actor(label='remaining-timeout', main_func=main)
run_result = await run_actor(actor)

assert run_result.status == 'SUCCEEDED'


async def test_actor_call_remaining_timeout(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
"""Test that correct timeout is set when using `RemainingTime` value for the `timeout` argument.

In this test, one Actor starts itself again and checks that the timeout is correctly set on the second Actor run.
Timeout should be the remaining time of the first Actor run calculated at the moment of the other Actor call."""

async def main() -> None:
from datetime import datetime, timezone

async with Actor:
actor_input = (await Actor.get_input()) or {}
if actor_input.get('called_from_another_actor', False) is True:
# If this Actor run was started with a specific argument (the second Actor run), return immediately.
# Asserts checking the timeout are in the first Actor run.
return

# Start another run of this actor with timeout set to the time remaining in this actor run
other_run_data = await Actor.call(
actor_id=Actor.configuration.actor_id or '',
run_input={'called_from_another_actor': True},
timeout='RemainingTime',
)

assert other_run_data is not None
try:
# To make sure that the actor is started
await asyncio.sleep(5)

assert other_run_data.options is not None
assert Actor.configuration.timeout_at is not None
assert Actor.configuration.started_at is not None

remaining_time_after_actor_start = Actor.configuration.timeout_at - datetime.now(tz=timezone.utc)

assert other_run_data.options.timeout > remaining_time_after_actor_start
assert other_run_data.options.timeout < Actor.configuration.timeout_at - Actor.configuration.started_at
finally:
# Make sure the other actor run is aborted
await Actor.apify_client.run(other_run_data.id).abort()

actor = await make_actor(label='remaining-timeout', main_func=main)
run_result = await run_actor(actor)

assert run_result.status == 'SUCCEEDED'
Morty Proxy This is a proxified and sanitized view of the page, visit original site.