From e8b163e8a254ba088f0bfdaeb99e96549e9ee812 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 8 Jan 2026 12:55:15 -0700 Subject: [PATCH 1/5] Allow TaskFailedError to be constructed from Exception (#99) * Allow TaskFailedError to be constructed from Exception * Propogate inner exceptions in FailureDetails * Allow passing exception to fail() --- durabletask/internal/helpers.py | 11 +++++++++-- durabletask/task.py | 6 ++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index 0b1f655..0cd2d40 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -116,11 +116,18 @@ def new_sub_orchestration_failed_event(event_id: int, ex: Exception) -> pb.Histo ) -def new_failure_details(ex: Exception) -> pb.TaskFailureDetails: +def new_failure_details(ex: Exception, _visited: Optional[set[int]] = None) -> pb.TaskFailureDetails: + if _visited is None: + _visited = set() + _visited.add(id(ex)) + inner: Optional[BaseException] = ex.__cause__ or ex.__context__ + if len(_visited) > 10 or (inner and id(inner) in _visited) or not isinstance(inner, Exception): + inner = None return pb.TaskFailureDetails( errorType=type(ex).__name__, errorMessage=str(ex), - stackTrace=wrappers_pb2.StringValue(value=''.join(traceback.format_tb(ex.__traceback__))) + stackTrace=wrappers_pb2.StringValue(value=''.join(traceback.format_tb(ex.__traceback__))), + innerFailure=new_failure_details(inner, _visited) if inner else None ) diff --git a/durabletask/task.py b/durabletask/task.py index 1ae9f49..50d0d05 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -302,8 +302,10 @@ def stack_trace(self) -> Optional[str]: class TaskFailedError(Exception): """Exception type for all orchestration task failures.""" - def __init__(self, message: str, details: pb.TaskFailureDetails): + def __init__(self, message: str, details: Union[pb.TaskFailureDetails, Exception]): super().__init__(message) + if isinstance(details, Exception): + details = pbh.new_failure_details(details) self._details = FailureDetails( details.errorMessage, details.errorType, @@ -424,7 +426,7 @@ def complete(self, result: T): if self._parent is not None: self._parent.on_child_completed(self) - def fail(self, message: str, details: pb.TaskFailureDetails): + def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]): if self._is_complete: raise ValueError('The task has already completed.') self._exception = TaskFailedError(message, details) From 1939eea4fe5c226e7ade7b6e7f40c3a98c45cc5b Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 8 Jan 2026 13:06:12 -0700 Subject: [PATCH 2/5] Increase specificity of Task return types (#100) * Increase specificity of Task return types --- durabletask/task.py | 10 +++++----- durabletask/worker.py | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/durabletask/task.py b/durabletask/task.py index 50d0d05..fa15a97 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -117,7 +117,7 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task: def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, input: Optional[TInput] = None, retry_policy: Optional[RetryPolicy] = None, - tags: Optional[dict[str, str]] = None) -> Task[TOutput]: + tags: Optional[dict[str, str]] = None) -> CompletableTask[TOutput]: """Schedule an activity for execution. Parameters @@ -142,7 +142,7 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, def call_entity(self, entity: EntityInstanceId, operation: str, - input: Optional[TInput] = None) -> Task: + input: Optional[TInput] = None) -> CompletableTask: """Schedule entity function for execution. Parameters @@ -182,7 +182,7 @@ def signal_entity( pass @abstractmethod - def lock_entities(self, entities: list[EntityInstanceId]) -> Task[EntityLock]: + def lock_entities(self, entities: list[EntityInstanceId]) -> CompletableTask[EntityLock]: """Creates a Task object that locks the specified entity instances. The locks will be acquired the next time the orchestrator yields. @@ -206,7 +206,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput input: Optional[TInput] = None, instance_id: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, - version: Optional[str] = None) -> Task[TOutput]: + version: Optional[str] = None) -> CompletableTask[TOutput]: """Schedule sub-orchestrator function for execution. Parameters @@ -231,7 +231,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput # TOOD: Add a timeout parameter, which allows the task to be canceled if the event is # not received within the specified timeout. This requires support for task cancellation. @abstractmethod - def wait_for_external_event(self, name: str) -> Task: + def wait_for_external_event(self, name: str) -> CompletableTask: """Wait asynchronously for an event to be raised with the name `name`. Parameters diff --git a/durabletask/worker.py b/durabletask/worker.py index 48c2e44..af29129 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1014,7 +1014,7 @@ def create_timer_internal( self, fire_at: Union[datetime, timedelta], retryable_task: Optional[task.RetryableTask] = None, - ) -> task.Task: + ) -> task.TimerTask: id = self.next_sequence_number() if isinstance(fire_at, timedelta): fire_at = self.current_utc_datetime + fire_at @@ -1034,7 +1034,7 @@ def call_activity( input: Optional[TInput] = None, retry_policy: Optional[task.RetryPolicy] = None, tags: Optional[dict[str, str]] = None, - ) -> task.Task[TOutput]: + ) -> task.CompletableTask[TOutput]: id = self.next_sequence_number() self.call_activity_function_helper( @@ -1047,7 +1047,7 @@ def call_entity( entity: EntityInstanceId, operation: str, input: Optional[TInput] = None, - ) -> task.Task: + ) -> task.CompletableTask: id = self.next_sequence_number() self.call_entity_function_helper( @@ -1068,7 +1068,7 @@ def signal_entity( id, entity_id, operation_name, input ) - def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLock]: + def lock_entities(self, entities: list[EntityInstanceId]) -> task.CompletableTask[EntityLock]: id = self.next_sequence_number() self.lock_entities_function_helper( @@ -1084,7 +1084,7 @@ def call_sub_orchestrator( instance_id: Optional[str] = None, retry_policy: Optional[task.RetryPolicy] = None, version: Optional[str] = None, - ) -> task.Task[TOutput]: + ) -> task.CompletableTask[TOutput]: id = self.next_sequence_number() if isinstance(orchestrator, str): orchestrator_name = orchestrator @@ -1229,7 +1229,7 @@ def _exit_critical_section(self) -> None: action = pb.OrchestratorAction(id=task_id, sendEntityMessage=entity_unlock_message) self._pending_actions[task_id] = action - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event(self, name: str) -> task.CompletableTask: # Check to see if this event has already been received, in which case we # can return it immediately. Otherwise, record out intent to receive an # event with the given name so that we can resume the generator when it From a491469cc12813af1c5618591645fbb0ad7962db Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:34:19 -0700 Subject: [PATCH 3/5] Type-hinting improvements (#104) * Type-hinting improvements --- CHANGELOG.md | 6 ++++++ durabletask/client.py | 5 ++++- durabletask/task.py | 6 +++--- durabletask/worker.py | 16 ++++++++-------- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2c3e59..5ce3417 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +CHANGED + +- Add/update type-hinting for various worker methods + ## v1.2.0 ADDED: diff --git a/durabletask/client.py b/durabletask/client.py index 7d03758..3b4d4e3 100644 --- a/durabletask/client.py +++ b/durabletask/client.py @@ -230,7 +230,10 @@ def purge_orchestration(self, instance_id: str, recursive: bool = True): self._logger.info(f"Purging instance '{instance_id}'.") self._stub.PurgeInstances(req) - def signal_entity(self, entity_instance_id: EntityInstanceId, operation_name: str, input: Optional[Any] = None): + def signal_entity(self, + entity_instance_id: EntityInstanceId, + operation_name: str, + input: Optional[Any] = None) -> None: req = pb.SignalEntityRequest( instanceId=str(entity_instance_id), name=operation_name, diff --git a/durabletask/task.py b/durabletask/task.py index fa15a97..4f0f55b 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -142,7 +142,7 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, def call_entity(self, entity: EntityInstanceId, operation: str, - input: Optional[TInput] = None) -> CompletableTask: + input: Optional[TInput] = None) -> CompletableTask[Any]: """Schedule entity function for execution. Parameters @@ -538,8 +538,8 @@ def task_id(self) -> int: return self._task_id -# Orchestrators are generators that yield tasks and receive/return any type -Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]] +# Orchestrators are generators that yield tasks, receive any type, and return TOutput +Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task[Any], Any, TOutput], TOutput]] # Activities are simple functions that can be scheduled by orchestrators Activity = Callable[[ActivityContext, TInput], TOutput] diff --git a/durabletask/worker.py b/durabletask/worker.py index af29129..06ae4f2 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -150,7 +150,7 @@ def __init__(self): self.entities = {} self.entity_instances = {} - def add_orchestrator(self, fn: task.Orchestrator) -> str: + def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str: if fn is None: raise ValueError("An orchestrator function argument is required.") @@ -158,7 +158,7 @@ def add_orchestrator(self, fn: task.Orchestrator) -> str: self.add_named_orchestrator(name, fn) return name - def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None: + def add_named_orchestrator(self, name: str, fn: task.Orchestrator[TInput, TOutput]) -> None: if not name: raise ValueError("A non-empty orchestrator name is required.") if name in self.orchestrators: @@ -166,10 +166,10 @@ def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None: self.orchestrators[name] = fn - def get_orchestrator(self, name: str) -> Optional[task.Orchestrator]: + def get_orchestrator(self, name: str) -> Optional[task.Orchestrator[Any, Any]]: return self.orchestrators.get(name) - def add_activity(self, fn: task.Activity) -> str: + def add_activity(self, fn: task.Activity[TInput, TOutput]) -> str: if fn is None: raise ValueError("An activity function argument is required.") @@ -177,7 +177,7 @@ def add_activity(self, fn: task.Activity) -> str: self.add_named_activity(name, fn) return name - def add_named_activity(self, name: str, fn: task.Activity) -> None: + def add_named_activity(self, name: str, fn: task.Activity[TInput, TOutput]) -> None: if not name: raise ValueError("A non-empty activity name is required.") if name in self.activities: @@ -185,7 +185,7 @@ def add_named_activity(self, name: str, fn: task.Activity) -> None: self.activities[name] = fn - def get_activity(self, name: str) -> Optional[task.Activity]: + def get_activity(self, name: str) -> Optional[task.Activity[Any, Any]]: return self.activities.get(name) def add_entity(self, fn: task.Entity) -> str: @@ -362,7 +362,7 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.stop() - def add_orchestrator(self, fn: task.Orchestrator) -> str: + def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str: """Registers an orchestrator function with the worker.""" if self._is_running: raise RuntimeError( @@ -1047,7 +1047,7 @@ def call_entity( entity: EntityInstanceId, operation: str, input: Optional[TInput] = None, - ) -> task.CompletableTask: + ) -> task.CompletableTask[Any]: id = self.next_sequence_number() self.call_entity_function_helper( From 75801acbee72a281541d370f82cd89aaac28f5d2 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:37:44 -0700 Subject: [PATCH 4/5] Allow custom entity names (#105) * Allow custom entity names --- CHANGELOG.md | 1 + durabletask/task.py | 8 ++++++++ durabletask/worker.py | 16 +++++++--------- .../test_dts_class_based_entities_e2e.py | 12 ++++++------ .../test_dts_function_based_entities_e2e.py | 12 ++++++------ 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ce3417..956c090 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 CHANGED +- Allow entities with custom names - Add/update type-hinting for various worker methods ## v1.2.0 diff --git a/durabletask/task.py b/durabletask/task.py index 4f0f55b..d02fe0f 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -615,6 +615,14 @@ def retry_timeout(self) -> Optional[timedelta]: return self._retry_timeout +def get_entity_name(fn: Entity) -> str: + if hasattr(fn, "__durable_entity_name__"): + return getattr(fn, "__durable_entity_name__") + if isinstance(fn, type) and issubclass(fn, DurableEntity): + return fn.__name__ + return get_name(fn) + + def get_name(fn: Callable) -> str: """Returns the name of the provided function""" name = fn.__name__ diff --git a/durabletask/worker.py b/durabletask/worker.py index 06ae4f2..9950677 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -188,16 +188,14 @@ def add_named_activity(self, name: str, fn: task.Activity[TInput, TOutput]) -> N def get_activity(self, name: str) -> Optional[task.Activity[Any, Any]]: return self.activities.get(name) - def add_entity(self, fn: task.Entity) -> str: + def add_entity(self, fn: task.Entity, name: Optional[str] = None) -> str: if fn is None: raise ValueError("An entity function argument is required.") - if isinstance(fn, type) and issubclass(fn, DurableEntity): - name = fn.__name__ - self.add_named_entity(name, fn) - else: - name = task.get_name(fn) - self.add_named_entity(name, fn) + if name is None: + name = task.get_entity_name(fn) + + self.add_named_entity(name, fn) return name def add_named_entity(self, name: str, fn: task.Entity) -> None: @@ -378,13 +376,13 @@ def add_activity(self, fn: task.Activity) -> str: ) return self._registry.add_activity(fn) - def add_entity(self, fn: task.Entity) -> str: + def add_entity(self, fn: task.Entity, name: Optional[str] = None) -> str: """Registers an entity function with the worker.""" if self._is_running: raise RuntimeError( "Entities cannot be added while the worker is running." ) - return self._registry.add_entity(fn) + return self._registry.add_entity(fn, name) def use_versioning(self, version: VersioningOptions) -> None: """Initializes versioning options for sub-orchestrators and activities.""" diff --git a/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py b/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py index 6075029..0910f88 100644 --- a/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py @@ -17,7 +17,7 @@ endpoint = os.getenv("ENDPOINT", "http://localhost:8080") -def test_client_signal_class_entity(): +def test_client_signal_class_entity_and_custom_name(): invoked = False class EmptyEntity(entities.DurableEntity): @@ -28,12 +28,12 @@ def do_nothing(self, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: - w.add_entity(EmptyEntity) + w.add_entity(EmptyEntity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) - entity_id = entities.EntityInstanceId("EmptyEntity", "testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity") c.signal_entity(entity_id, "do_nothing") time.sleep(2) # wait for the signal to be processed @@ -70,7 +70,7 @@ def do_nothing(self, _): assert invoked -def test_orchestration_signal_class_entity(): +def test_orchestration_signal_class_entity_and_custom_name(): invoked = False class EmptyEntity(entities.DurableEntity): @@ -79,14 +79,14 @@ def do_nothing(self, _): invoked = True def empty_orchestrator(ctx: task.OrchestrationContext, _): - entity_id = entities.EntityInstanceId("EmptyEntity", "testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity") ctx.signal_entity(entity_id, "do_nothing") # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(empty_orchestrator) - w.add_entity(EmptyEntity) + w.add_entity(EmptyEntity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, diff --git a/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py b/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py index 6b857be..b3adebe 100644 --- a/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py @@ -17,7 +17,7 @@ endpoint = os.getenv("ENDPOINT", "http://localhost:8080") -def test_client_signal_entity(): +def test_client_signal_entity_and_custom_name(): invoked = False def empty_entity(ctx: entities.EntityContext, _): @@ -28,12 +28,12 @@ def empty_entity(ctx: entities.EntityContext, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: - w.add_entity(empty_entity) + w.add_entity(empty_entity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) - entity_id = entities.EntityInstanceId("empty_entity", "testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity") c.signal_entity(entity_id, "do_nothing") time.sleep(2) # wait for the signal to be processed @@ -70,7 +70,7 @@ def empty_entity(ctx: entities.EntityContext, _): assert invoked -def test_orchestration_signal_entity(): +def test_orchestration_signal_entity_and_custom_name(): invoked = False def empty_entity(ctx: entities.EntityContext, _): @@ -79,14 +79,14 @@ def empty_entity(ctx: entities.EntityContext, _): invoked = True def empty_orchestrator(ctx: task.OrchestrationContext, _): - entity_id = entities.EntityInstanceId("empty_entity", f"{ctx.instance_id}_testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", f"{ctx.instance_id}_testEntity") ctx.signal_entity(entity_id, "do_nothing") # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(empty_orchestrator) - w.add_entity(empty_entity) + w.add_entity(empty_entity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, From 424ffa983030c16b233246115ff626a8f3258283 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:50:15 -0700 Subject: [PATCH 5/5] Prep for v1.3.0 (#106) --- CHANGELOG.md | 9 +++++++-- durabletask-azuremanaged/CHANGELOG.md | 5 +++++ durabletask-azuremanaged/pyproject.toml | 4 ++-- pyproject.toml | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 956c090..f5f1f8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## v1.3.0 -CHANGED +ADDED - Allow entities with custom names + +CHANGED + +- Allow task.fail() to be called with Exceptions +- Update type-hinting for Task return sub-types - Add/update type-hinting for various worker methods ## v1.2.0 diff --git a/durabletask-azuremanaged/CHANGELOG.md b/durabletask-azuremanaged/CHANGELOG.md index 8d88678..6fb231b 100644 --- a/durabletask-azuremanaged/CHANGELOG.md +++ b/durabletask-azuremanaged/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.3.0 + +- Updates base dependency to durabletask v1.3.0 + - See durabletask changelog for more details + ## v1.2.0 - Updates base dependency to durabletask v1.2.0 diff --git a/durabletask-azuremanaged/pyproject.toml b/durabletask-azuremanaged/pyproject.toml index f013a56..7a084eb 100644 --- a/durabletask-azuremanaged/pyproject.toml +++ b/durabletask-azuremanaged/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask.azuremanaged" -version = "1.2.0" +version = "1.3.0" description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler" keywords = [ "durable", @@ -26,7 +26,7 @@ requires-python = ">=3.10" license = {file = "LICENSE"} readme = "README.md" dependencies = [ - "durabletask>=1.2.0", + "durabletask>=1.3.0", "azure-identity>=1.19.0" ] diff --git a/pyproject.toml b/pyproject.toml index d970089..ec8a511 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask" -version = "1.2.0" +version = "1.3.0" description = "A Durable Task Client SDK for Python" keywords = [ "durable",