From 070eef7ffa6ca346d7e90a229d5a29f43de4022c Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 24 Nov 2025 12:03:46 -0700 Subject: [PATCH 01/17] Allow retrieve entity metadata from client (#77) * Allow retrieve entity metadata from client * Lint * Better state formatting * Add tests * Improve locked_by parsing * Parse last_modified as UTC * Fix tests --- durabletask/client.py | 13 +++ durabletask/entities/__init__.py | 3 +- durabletask/entities/entity_instance_id.py | 2 +- durabletask/entities/entity_metadata.py | 97 +++++++++++++++++++ .../test_dts_class_based_entities_e2e.py | 31 ++++++ .../test_dts_function_based_entities_e2e.py | 31 ++++++ 6 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 durabletask/entities/entity_metadata.py diff --git a/durabletask/client.py b/durabletask/client.py index c1508222..7d037585 100644 --- a/durabletask/client.py +++ b/durabletask/client.py @@ -12,6 +12,7 @@ from google.protobuf import wrappers_pb2 from durabletask.entities import EntityInstanceId +from durabletask.entities.entity_metadata import EntityMetadata import durabletask.internal.helpers as helpers import durabletask.internal.orchestrator_service_pb2 as pb import durabletask.internal.orchestrator_service_pb2_grpc as stubs @@ -241,3 +242,15 @@ def signal_entity(self, entity_instance_id: EntityInstanceId, operation_name: st ) self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.") self._stub.SignalEntity(req, None) # TODO: Cancellation timeout? + + def get_entity(self, + entity_instance_id: EntityInstanceId, + include_state: bool = True + ) -> Optional[EntityMetadata]: + req = pb.GetEntityRequest(instanceId=str(entity_instance_id), includeState=include_state) + self._logger.info(f"Getting entity '{entity_instance_id}'.") + res: pb.GetEntityResponse = self._stub.GetEntity(req) + if not res.exists: + return None + + return EntityMetadata.from_entity_response(res, include_state) diff --git a/durabletask/entities/__init__.py b/durabletask/entities/__init__.py index 4ab03c03..46f059b1 100644 --- a/durabletask/entities/__init__.py +++ b/durabletask/entities/__init__.py @@ -7,7 +7,8 @@ from durabletask.entities.durable_entity import DurableEntity from durabletask.entities.entity_lock import EntityLock from durabletask.entities.entity_context import EntityContext +from durabletask.entities.entity_metadata import EntityMetadata -__all__ = ["EntityInstanceId", "DurableEntity", "EntityLock", "EntityContext"] +__all__ = ["EntityInstanceId", "DurableEntity", "EntityLock", "EntityContext", "EntityMetadata"] PACKAGE_NAME = "durabletask.entities" diff --git a/durabletask/entities/entity_instance_id.py b/durabletask/entities/entity_instance_id.py index 53c1171f..c3b76c13 100644 --- a/durabletask/entities/entity_instance_id.py +++ b/durabletask/entities/entity_instance_id.py @@ -37,4 +37,4 @@ def parse(entity_id: str) -> Optional["EntityInstanceId"]: _, entity, key = entity_id.split("@", 2) return EntityInstanceId(entity=entity, key=key) except ValueError as ex: - raise ValueError("Invalid entity ID format", ex) + raise ValueError(f"Invalid entity ID format: {entity_id}", ex) diff --git a/durabletask/entities/entity_metadata.py b/durabletask/entities/entity_metadata.py new file mode 100644 index 00000000..68009392 --- /dev/null +++ b/durabletask/entities/entity_metadata.py @@ -0,0 +1,97 @@ +from datetime import datetime, timezone +from typing import Any, Optional, Type, TypeVar, Union, overload +from durabletask.entities.entity_instance_id import EntityInstanceId + +import durabletask.internal.orchestrator_service_pb2 as pb + +TState = TypeVar("TState") + + +class EntityMetadata: + """Class representing the metadata of a durable entity. + + This class encapsulates the metadata information of a durable entity, allowing for + easy access and manipulation of the entity's metadata within the Durable Task + Framework. + + Attributes: + id (EntityInstanceId): The unique identifier of the entity instance. + last_modified (datetime): The timestamp of the last modification to the entity. + backlog_queue_size (int): The size of the backlog queue for the entity. + locked_by (str): The identifier of the worker that currently holds the lock on the entity. + includes_state (bool): Indicates whether the metadata includes the state of the entity. + state (Optional[Any]): The current state of the entity, if included. + """ + + def __init__(self, + id: EntityInstanceId, + last_modified: datetime, + backlog_queue_size: int, + locked_by: str, + includes_state: bool, + state: Optional[Any]): + """Initializes a new instance of the EntityMetadata class. + + Args: + value: The initial state value of the entity. + """ + self.id = id + self.last_modified = last_modified + self.backlog_queue_size = backlog_queue_size + self._locked_by = locked_by + self.includes_state = includes_state + self._state = state + + @staticmethod + def from_entity_response(entity_response: pb.GetEntityResponse, includes_state: bool): + entity_id = EntityInstanceId.parse(entity_response.entity.instanceId) + if not entity_id: + raise ValueError("Invalid entity instance ID in entity response.") + entity_state = None + if includes_state: + entity_state = entity_response.entity.serializedState.value + return EntityMetadata( + id=entity_id, + last_modified=entity_response.entity.lastModifiedTime.ToDatetime(timezone.utc), + backlog_queue_size=entity_response.entity.backlogQueueSize, + locked_by=entity_response.entity.lockedBy.value, + includes_state=includes_state, + state=entity_state + ) + + @overload + def get_state(self, intended_type: Type[TState]) -> Optional[TState]: + ... + + @overload + def get_state(self, intended_type: None = None) -> Any: + ... + + def get_state(self, intended_type: Optional[Type[TState]] = None) -> Union[None, TState, Any]: + """Get the current state of the entity, optionally converting it to a specified type.""" + if intended_type is None or self._state is None: + return self._state + + if isinstance(self._state, intended_type): + return self._state + + try: + return intended_type(self._state) # type: ignore[call-arg] + except Exception as ex: + raise TypeError( + f"Could not convert state of type '{type(self._state).__name__}' to '{intended_type.__name__}'" + ) from ex + + def get_locked_by(self) -> Optional[EntityInstanceId]: + """Get the identifier of the worker that currently holds the lock on the entity. + + Returns + ------- + str + The identifier of the worker that currently holds the lock on the entity. + """ + if not self._locked_by: + return None + + # Will throw ValueError if the format is invalid + return EntityInstanceId.parse(self._locked_by) diff --git a/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py b/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py index 19e8e5ba..6075029a 100644 --- a/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py @@ -1,3 +1,4 @@ +from datetime import datetime, timezone import os import time @@ -39,6 +40,36 @@ def do_nothing(self, _): assert invoked +def test_client_get_class_entity(): + invoked = False + + class EmptyEntity(entities.DurableEntity): + def do_nothing(self, _): + self.set_state(1) + nonlocal invoked # don't do this in a real app! + invoked = True + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_entity(EmptyEntity) + w.start() + + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + entity_id = entities.EntityInstanceId("EmptyEntity", "testEntity") + c.signal_entity(entity_id, "do_nothing") + time.sleep(2) # wait for the signal to be processed + state = c.get_entity(entity_id) + assert state is not None + assert state.id == entity_id + assert state.get_locked_by() is None + assert state.last_modified < datetime.now(timezone.utc) + assert state.get_state(int) == 1 + + assert invoked + + def test_orchestration_signal_class_entity(): invoked = False diff --git a/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py b/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py index 42206552..6b857bea 100644 --- a/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py @@ -1,3 +1,4 @@ +from datetime import datetime, timezone import os import time @@ -39,6 +40,36 @@ def empty_entity(ctx: entities.EntityContext, _): assert invoked +def test_client_get_entity(): + invoked = False + + def empty_entity(ctx: entities.EntityContext, _): + nonlocal invoked # don't do this in a real app! + if ctx.operation == "do_nothing": + invoked = True + ctx.set_state(1) + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_entity(empty_entity) + w.start() + + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + entity_id = entities.EntityInstanceId("empty_entity", "testEntity") + c.signal_entity(entity_id, "do_nothing") + time.sleep(2) # wait for the signal to be processed + state = c.get_entity(entity_id) + assert state is not None + assert state.id == entity_id + assert state.get_locked_by() is None + assert state.last_modified < datetime.now(timezone.utc) + assert state.get_state(int) == 1 + + assert invoked + + def test_orchestration_signal_entity(): invoked = False From b4086fd62994035f6f5f7e9a104baf45891f21a0 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 24 Nov 2025 12:17:26 -0700 Subject: [PATCH 02/17] Prep for 1.1.0 release (#78) --- CHANGELOG.md | 6 ++++++ durabletask-azuremanaged/CHANGELOG.md | 7 +++++++ durabletask-azuremanaged/pyproject.toml | 4 ++-- pyproject.toml | 2 +- 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f7dc741..daffc504 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.1.0 + +ADDED: + +- Allow retrieving entity metadata from the client, with or without state + ## v1.0.0 ADDED: diff --git a/durabletask-azuremanaged/CHANGELOG.md b/durabletask-azuremanaged/CHANGELOG.md index c7b339ca..efc31e0f 100644 --- a/durabletask-azuremanaged/CHANGELOG.md +++ b/durabletask-azuremanaged/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.1.0 + +CHANGED: + +- Updates base dependency to durabletask v1.1.0 + - See durabletask changelog for more details + ## v1.0.0 CHANGED: diff --git a/durabletask-azuremanaged/pyproject.toml b/durabletask-azuremanaged/pyproject.toml index 8d522a7c..5c502461 100644 --- a/durabletask-azuremanaged/pyproject.toml +++ b/durabletask-azuremanaged/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask.azuremanaged" -version = "1.0.0" +version = "1.1.0" description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler" keywords = [ "durable", @@ -26,7 +26,7 @@ requires-python = ">=3.10" license = {file = "LICENSE"} readme = "README.md" dependencies = [ - "durabletask>=1.0.0", + "durabletask>=1.1.0", "azure-identity>=1.19.0" ] diff --git a/pyproject.toml b/pyproject.toml index 547eb7ad..111693c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask" -version = "1.0.0" +version = "1.1.0" description = "A Durable Task Client SDK for Python" keywords = [ "durable", From a20273115574660f4f78c6847e0f232b8bb7c7bf Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Fri, 5 Dec 2025 11:36:13 -0800 Subject: [PATCH 03/17] Add new development build workflows (#87) * Add new dev build workflows * rename build steps --------- Co-authored-by: Bernd Verst --- .../durabletask-azuremanaged-dev.yml | 52 +++++++++++++++++++ .../durabletask-azuremanaged-experimental.yml | 50 ++++++++++++++++++ .../workflows/durabletask-azuremanaged.yml | 2 +- .github/workflows/durabletask-dev.yml | 49 +++++++++++++++++ .github/workflows/durabletask-experiment.yml | 47 +++++++++++++++++ .github/workflows/durabletask.yml | 6 +-- 6 files changed, 202 insertions(+), 4 deletions(-) create mode 100644 .github/workflows/durabletask-azuremanaged-dev.yml create mode 100644 .github/workflows/durabletask-azuremanaged-experimental.yml create mode 100644 .github/workflows/durabletask-dev.yml create mode 100644 .github/workflows/durabletask-experiment.yml diff --git a/.github/workflows/durabletask-azuremanaged-dev.yml b/.github/workflows/durabletask-azuremanaged-dev.yml new file mode 100644 index 00000000..0ba1ece0 --- /dev/null +++ b/.github/workflows/durabletask-azuremanaged-dev.yml @@ -0,0 +1,52 @@ +name: Durable Task Scheduler SDK (durabletask-azuremanaged) Dev Release + +on: + workflow_run: + workflows: ["Durable Task Scheduler SDK (durabletask-azuremanaged)"] + types: + - completed + branches: + - main + +jobs: + publish-dev: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Extract version from tag + run: echo "VERSION=${GITHUB_REF#refs/tags/azuremanaged-v}" >> $GITHUB_ENV # Extract version from the tag + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.14" # Adjust Python version as needed + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Append dev to version in pyproject.toml + working-directory: durabletask-azuremanaged + run: | + sed -i 's/^version = "\(.*\)"/version = "\1.dev${{ github.run_number }}"/' pyproject.toml + + - name: Build package from directory durabletask-azuremanaged + working-directory: durabletask-azuremanaged + run: | + python -m build + + - name: Check package + working-directory: durabletask-azuremanaged + run: | + twine check dist/* + + - name: Publish package to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN_AZUREMANAGED }} # Store your PyPI API token in GitHub Secrets + working-directory: durabletask-azuremanaged + run: | + twine upload dist/* \ No newline at end of file diff --git a/.github/workflows/durabletask-azuremanaged-experimental.yml b/.github/workflows/durabletask-azuremanaged-experimental.yml new file mode 100644 index 00000000..d391db12 --- /dev/null +++ b/.github/workflows/durabletask-azuremanaged-experimental.yml @@ -0,0 +1,50 @@ +name: Durable Task Scheduler SDK (durabletask-azuremanaged) Experimental Release + +on: + push: + branches-ignore: + - main + - release/* + +jobs: + publish-experimental: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Extract version from tag + run: echo "VERSION=${GITHUB_REF#refs/tags/azuremanaged-v}" >> $GITHUB_ENV # Extract version from the tag + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.14" # Adjust Python version as needed + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Change the version in pyproject.toml to 0.0.0dev{github.run_number} + working-directory: durabletask-azuremanaged + run: | + sed -i 's/^version = ".*"/version = "0.0.0.dev${{ github.run_number }}"/' pyproject.toml + + - name: Build package from directory durabletask-azuremanaged + working-directory: durabletask-azuremanaged + run: | + python -m build + + - name: Check package + working-directory: durabletask-azuremanaged + run: | + twine check dist/* + + - name: Publish package to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN_AZUREMANAGED }} # Store your PyPI API token in GitHub Secrets + working-directory: durabletask-azuremanaged + run: | + twine upload dist/* \ No newline at end of file diff --git a/.github/workflows/durabletask-azuremanaged.yml b/.github/workflows/durabletask-azuremanaged.yml index c2c40aee..852b06d7 100644 --- a/.github/workflows/durabletask-azuremanaged.yml +++ b/.github/workflows/durabletask-azuremanaged.yml @@ -86,7 +86,7 @@ jobs: run: | pytest -m "dts" --verbose - publish: + publish-release: if: startsWith(github.ref, 'refs/tags/azuremanaged-v') # Only run if a matching tag is pushed needs: run-docker-tests runs-on: ubuntu-latest diff --git a/.github/workflows/durabletask-dev.yml b/.github/workflows/durabletask-dev.yml new file mode 100644 index 00000000..09ee4be4 --- /dev/null +++ b/.github/workflows/durabletask-dev.yml @@ -0,0 +1,49 @@ +name: Durable Task SDK (durabletask) Dev Release + +on: + workflow_run: + workflows: ["Durable Task SDK (durabletask)"] + types: + - completed + branches: + - main + +jobs: + publish-dev: + # needs: run-tests + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Extract version from tag + run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV # Extract version from the tag + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.14" # Adjust Python version as needed + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Append dev to version in pyproject.toml + run: | + sed -i 's/^version = "\(.*\)"/version = "\1.dev${{ github.run_number }}"/' pyproject.toml + + - name: Build package from root directory + run: | + python -m build + + - name: Check package + run: | + twine check dist/* + + - name: Publish package to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} # Store your PyPI API token in GitHub Secrets + run: | + twine upload dist/* \ No newline at end of file diff --git a/.github/workflows/durabletask-experiment.yml b/.github/workflows/durabletask-experiment.yml new file mode 100644 index 00000000..a9d440a5 --- /dev/null +++ b/.github/workflows/durabletask-experiment.yml @@ -0,0 +1,47 @@ +name: Durable Task SDK (durabletask) Experimental Release + +on: + push: + branches-ignore: + - main + - release/* + +jobs: + publish-experimental: + # needs: run-tests + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Extract version from tag + run: echo "VERSION=${GITHUB_REF#refs/tags/v}" >> $GITHUB_ENV # Extract version from the tag + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.14" # Adjust Python version as needed + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Change the version in pyproject.toml to 0.0.0dev{github.run_number} + run: | + sed -i 's/^version = ".*"/version = "0.0.0.dev${{ github.run_number }}"/' pyproject.toml + + - name: Build package from root directory + run: | + python -m build + + - name: Check package + run: | + twine check dist/* + + - name: Publish package to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} # Store your PyPI API token in GitHub Secrets + run: | + twine upload dist/* \ No newline at end of file diff --git a/.github/workflows/durabletask.yml b/.github/workflows/durabletask.yml index 2f417d9b..e7465ef8 100644 --- a/.github/workflows/durabletask.yml +++ b/.github/workflows/durabletask.yml @@ -2,7 +2,7 @@ name: Durable Task SDK (durabletask) on: push: - branches: + branches: - "main" tags: - "v*" # Only run for tags starting with "v" @@ -71,7 +71,7 @@ jobs: durabletask-go --port 4001 & pytest -m "e2e and not dts" --verbose - publish: + publish-release: if: startsWith(github.ref, 'refs/tags/v') # Only run if a matching tag is pushed needs: run-tests runs-on: ubuntu-latest @@ -105,4 +105,4 @@ jobs: TWINE_USERNAME: __token__ TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} # Store your PyPI API token in GitHub Secrets run: | - twine upload dist/* \ No newline at end of file + twine upload dist/* \ No newline at end of file From 6c887d1a37b4242e83150d76014bdeaf56a54e29 Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Fri, 5 Dec 2025 13:17:37 -0800 Subject: [PATCH 04/17] Allow any version to be used as the experimental dependency (#88) Co-authored-by: Bernd Verst --- .github/workflows/durabletask-azuremanaged-experimental.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/durabletask-azuremanaged-experimental.yml b/.github/workflows/durabletask-azuremanaged-experimental.yml index d391db12..444b7f96 100644 --- a/.github/workflows/durabletask-azuremanaged-experimental.yml +++ b/.github/workflows/durabletask-azuremanaged-experimental.yml @@ -30,6 +30,7 @@ jobs: working-directory: durabletask-azuremanaged run: | sed -i 's/^version = ".*"/version = "0.0.0.dev${{ github.run_number }}"/' pyproject.toml + sed -i 's/"durabletask>=.*"/"durabletask>=0.0.0dev1"/' pyproject.toml - name: Build package from directory durabletask-azuremanaged working-directory: durabletask-azuremanaged From 1b964da9ec036216fb230b9af8b17778b0e84099 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 11 Dec 2025 10:32:52 -0700 Subject: [PATCH 05/17] Add py.typed marker file to durabletask package (#81) --- durabletask/py.typed | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 durabletask/py.typed diff --git a/durabletask/py.typed b/durabletask/py.typed new file mode 100644 index 00000000..e69de29b From 4d2cafa9bf883c1a2ad29db58ce3790e475516de Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 11 Dec 2025 10:34:18 -0700 Subject: [PATCH 06/17] Update EntityInstanceId.parse signature (#80) * Update EntityInstanceId.parse signature --- durabletask/entities/entity_instance_id.py | 14 ++++++++------ durabletask/entities/entity_metadata.py | 5 +++-- durabletask/worker.py | 12 +++++++----- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/durabletask/entities/entity_instance_id.py b/durabletask/entities/entity_instance_id.py index c3b76c13..02a2595a 100644 --- a/durabletask/entities/entity_instance_id.py +++ b/durabletask/entities/entity_instance_id.py @@ -1,6 +1,3 @@ -from typing import Optional - - class EntityInstanceId: def __init__(self, entity: str, key: str): self.entity = entity @@ -20,7 +17,7 @@ def __lt__(self, other): return str(self) < str(other) @staticmethod - def parse(entity_id: str) -> Optional["EntityInstanceId"]: + def parse(entity_id: str) -> "EntityInstanceId": """Parse a string representation of an entity ID into an EntityInstanceId object. Parameters @@ -30,8 +27,13 @@ def parse(entity_id: str) -> Optional["EntityInstanceId"]: Returns ------- - Optional[EntityInstanceId] - The parsed EntityInstanceId object, or None if the input is None. + EntityInstanceId + The parsed EntityInstanceId object. + + Raises + ------ + ValueError + If the input string is not in the correct format. """ try: _, entity, key = entity_id.split("@", 2) diff --git a/durabletask/entities/entity_metadata.py b/durabletask/entities/entity_metadata.py index 68009392..3e04206d 100644 --- a/durabletask/entities/entity_metadata.py +++ b/durabletask/entities/entity_metadata.py @@ -44,8 +44,9 @@ def __init__(self, @staticmethod def from_entity_response(entity_response: pb.GetEntityResponse, includes_state: bool): - entity_id = EntityInstanceId.parse(entity_response.entity.instanceId) - if not entity_id: + try: + entity_id = EntityInstanceId.parse(entity_response.entity.instanceId) + except ValueError: raise ValueError("Invalid entity instance ID in entity response.") entity_state = None if includes_state: diff --git a/durabletask/worker.py b/durabletask/worker.py index fae345c0..0c05430f 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -750,9 +750,10 @@ def _execute_entity_batch( for operation in req.operations: start_time = datetime.now(timezone.utc) executor = _EntityExecutor(self._registry, self._logger) - entity_instance_id = EntityInstanceId.parse(instance_id) - if not entity_instance_id: - raise RuntimeError(f"Invalid entity instance ID '{operation.requestId}' in entity operation request.") + try: + entity_instance_id = EntityInstanceId.parse(instance_id) + except ValueError: + raise RuntimeError(f"Invalid entity instance ID '{instance_id}' in entity operation request.") operation_result = None @@ -1656,8 +1657,9 @@ def process_event( raise _get_wrong_action_type_error( entity_call_id, expected_method_name, action ) - entity_id = EntityInstanceId.parse(event.entityOperationCalled.targetInstanceId.value) - if not entity_id: + try: + entity_id = EntityInstanceId.parse(event.entityOperationCalled.targetInstanceId.value) + except ValueError: raise RuntimeError(f"Could not parse entity ID from targetInstanceId '{event.entityOperationCalled.targetInstanceId.value}'") ctx._entity_task_id_map[event.entityOperationCalled.requestId] = (entity_id, entity_call_id) elif event.HasField("entityOperationSignaled"): From 7bdfbcf262cfee62c7795960e3eea61624532f27 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 11 Dec 2025 10:35:08 -0700 Subject: [PATCH 07/17] Match entity method parameter names (#82) - Ensure consistency in worker.py to task.py --- durabletask/task.py | 3 ++- durabletask/worker.py | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/durabletask/task.py b/durabletask/task.py index 35708388..5f0f8586 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -139,7 +139,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, pass @abstractmethod - def call_entity(self, entity: EntityInstanceId, + def call_entity(self, + entity: EntityInstanceId, operation: str, input: Optional[TInput] = None) -> Task: """Schedule entity function for execution. diff --git a/durabletask/worker.py b/durabletask/worker.py index 0c05430f..6b2d4898 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1039,14 +1039,14 @@ def call_activity( def call_entity( self, - entity_id: EntityInstanceId, + entity: EntityInstanceId, operation: str, input: Optional[TInput] = None, ) -> task.Task: id = self.next_sequence_number() self.call_entity_function_helper( - id, entity_id, operation, input=input + id, entity, operation, input=input ) return self._pending_tasks.get(id, task.CompletableTask()) @@ -1054,13 +1054,13 @@ def call_entity( def signal_entity( self, entity_id: EntityInstanceId, - operation: str, + operation_name: str, input: Optional[TInput] = None ) -> None: id = self.next_sequence_number() self.signal_entity_function_helper( - id, entity_id, operation, input + id, entity_id, operation_name, input ) def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLock]: From c54c386ab2ed6d5d81b8deb7e0121a9fbe347992 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 11 Dec 2025 10:37:12 -0700 Subject: [PATCH 08/17] Add new_uuid method to client (#83) * Add new_uuid method to client * Update entity messages to use UUIDs as requestIds --- durabletask/internal/helpers.py | 17 +++++++-- durabletask/task.py | 16 ++++++++ durabletask/worker.py | 20 ++++++++-- .../test_dts_orchestration_e2e.py | 37 +++++++++++++++++++ tests/durabletask/test_orchestration_e2e.py | 35 ++++++++++++++++++ 5 files changed, 118 insertions(+), 7 deletions(-) diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index ccd8558b..88481fa3 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -196,9 +196,14 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str], )) -def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]): +def new_call_entity_action(id: int, + parent_instance_id: str, + entity_id: EntityInstanceId, + operation: str, + encoded_input: Optional[str], + request_id: str) -> pb.OrchestratorAction: return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent( - requestId=f"{parent_instance_id}:{id}", + requestId=request_id, operation=operation, scheduledTime=None, input=get_string_value(encoded_input), @@ -208,9 +213,13 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn ))) -def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]): +def new_signal_entity_action(id: int, + entity_id: EntityInstanceId, + operation: str, + encoded_input: Optional[str], + request_id: str) -> pb.OrchestratorAction: return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent( - requestId=f"{entity_id}:{id}", + requestId=request_id, operation=operation, scheduledTime=None, input=get_string_value(encoded_input), diff --git a/durabletask/task.py b/durabletask/task.py index 5f0f8586..1ae9f494 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -259,6 +259,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: """ pass + @abstractmethod + def new_uuid(self) -> str: + """Create a new UUID that is safe for replay within an orchestration or operation. + + The default implementation of this method creates a name-based UUID + using the algorithm from RFC 4122 §4.3. The name input used to generate + this value is a combination of the orchestration instance ID, the current UTC datetime, + and an internally managed counter. + + Returns + ------- + str + New UUID that is safe for replay within an orchestration or operation. + """ + pass + @abstractmethod def _exit_critical_section(self) -> None: pass diff --git a/durabletask/worker.py b/durabletask/worker.py index 6b2d4898..56687bb8 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -13,6 +13,7 @@ from types import GeneratorType from enum import Enum from typing import Any, Generator, Optional, Sequence, TypeVar, Union +import uuid from packaging.version import InvalidVersion, parse import grpc @@ -33,6 +34,7 @@ TInput = TypeVar("TInput") TOutput = TypeVar("TOutput") +DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' class ConcurrencyOptions: @@ -832,6 +834,7 @@ def __init__(self, instance_id: str, registry: _Registry): # Maps criticalSectionId to task ID self._entity_lock_id_map: dict[str, int] = {} self._sequence_number = 0 + self._new_uuid_counter = 0 self._current_utc_datetime = datetime(1000, 1, 1) self._instance_id = instance_id self._registry = registry @@ -1166,7 +1169,7 @@ def call_entity_function_helper( raise RuntimeError(error_message) encoded_input = shared.to_json(input) if input is not None else None - action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input) + action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input, self.new_uuid()) self._pending_actions[id] = action fn_task = task.CompletableTask() @@ -1189,7 +1192,7 @@ def signal_entity_function_helper( encoded_input = shared.to_json(input) if input is not None else None - action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input) + action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input, self.new_uuid()) self._pending_actions[id] = action def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId]) -> None: @@ -1200,7 +1203,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId if not transition_valid: raise RuntimeError(error_message) - critical_section_id = f"{self.instance_id}:{id:04x}" + critical_section_id = self.new_uuid() request, target = self._entity_context.emit_acquire_message(critical_section_id, entities) @@ -1252,6 +1255,17 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None: self.set_continued_as_new(new_input, save_events) + def new_uuid(self) -> str: + NAMESPACE_UUID: str = "9e952958-5e33-4daf-827f-2fa12937b875" + + uuid_name_value = \ + f"{self._instance_id}" \ + f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ + f"_{self._new_uuid_counter}" + self._new_uuid_counter += 1 + namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, NAMESPACE_UUID) + return str(uuid.uuid5(namespace_uuid, uuid_name_value)) + class ExecutionResults: actions: list[pb.OrchestratorAction] diff --git a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py index 4a963fc7..7a7232e9 100644 --- a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py @@ -5,6 +5,7 @@ import os import threading from datetime import timedelta +import uuid import pytest @@ -532,3 +533,39 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): assert state.serialized_input is None assert state.serialized_output is None assert state.serialized_custom_status == "\"foobaz\"" + + +def test_new_uuid(): + def noop(_: task.ActivityContext, _1): + pass + + def empty_orchestrator(ctx: task.OrchestrationContext, _): + # Assert that two new_uuid calls return different values + results = [ctx.new_uuid(), ctx.new_uuid()] + yield ctx.call_activity("noop") + # Assert that new_uuid still returns a unique value after replay + results.append(ctx.new_uuid()) + return results + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(empty_orchestrator) + w.add_activity(noop) + w.start() + + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = c.schedule_new_orchestration(empty_orchestrator) + state = c.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.name == task.get_name(empty_orchestrator) + assert state.instance_id == id + assert state.failure_details is None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + results = json.loads(state.serialized_output or "\"\"") + assert isinstance(results, list) and len(results) == 3 + assert uuid.UUID(results[0]) != uuid.UUID(results[1]) + assert uuid.UUID(results[0]) != uuid.UUID(results[2]) + assert uuid.UUID(results[1]) != uuid.UUID(results[2]) diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index 3db608dc..997bc504 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -5,6 +5,7 @@ import threading import time from datetime import timedelta +import uuid import pytest @@ -499,3 +500,37 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): assert state.serialized_input is None assert state.serialized_output is None assert state.serialized_custom_status == "\"foobaz\"" + + +def test_new_uuid(): + def noop(_: task.ActivityContext, _1): + pass + + def empty_orchestrator(ctx: task.OrchestrationContext, _): + # Assert that two new_uuid calls return different values + results = [ctx.new_uuid(), ctx.new_uuid()] + yield ctx.call_activity("noop") + # Assert that new_uuid still returns a unique value after replay + results.append(ctx.new_uuid()) + return results + + # Start a worker, which will connect to the sidecar in a background thread + with worker.TaskHubGrpcWorker() as w: + w.add_orchestrator(empty_orchestrator) + w.add_activity(noop) + w.start() + + c = client.TaskHubGrpcClient() + id = c.schedule_new_orchestration(empty_orchestrator) + state = c.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.name == task.get_name(empty_orchestrator) + assert state.instance_id == id + assert state.failure_details is None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + results = json.loads(state.serialized_output or "\"\"") + assert isinstance(results, list) and len(results) == 3 + assert uuid.UUID(results[0]) != uuid.UUID(results[1]) + assert uuid.UUID(results[0]) != uuid.UUID(results[2]) + assert uuid.UUID(results[1]) != uuid.UUID(results[2]) From c577b2b6b722b1c1f3b42bacdc4f5622f9409223 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:15:21 -0700 Subject: [PATCH 09/17] Allow passing custom stub to execute operations (#85) * Allow passing custom stub to execute operations --------- Co-authored-by: Bernd Verst --- .../proto_task_hub_sidecar_service_stub.py | 33 +++++++++++++++++++ durabletask/worker.py | 13 ++++---- tests/durabletask/test_proto_task_hub_shim.py | 25 ++++++++++++++ 3 files changed, 65 insertions(+), 6 deletions(-) create mode 100644 durabletask/internal/proto_task_hub_sidecar_service_stub.py create mode 100644 tests/durabletask/test_proto_task_hub_shim.py diff --git a/durabletask/internal/proto_task_hub_sidecar_service_stub.py b/durabletask/internal/proto_task_hub_sidecar_service_stub.py new file mode 100644 index 00000000..8f51123b --- /dev/null +++ b/durabletask/internal/proto_task_hub_sidecar_service_stub.py @@ -0,0 +1,33 @@ +from typing import Any, Callable, Protocol + + +class ProtoTaskHubSidecarServiceStub(Protocol): + """A stub class matching the TaskHubSidecarServiceStub generated from the .proto file. + Allows the use of TaskHubGrpcWorker methods when a real sidecar stub is not available. + """ + Hello: Callable[..., Any] + StartInstance: Callable[..., Any] + GetInstance: Callable[..., Any] + RewindInstance: Callable[..., Any] + WaitForInstanceStart: Callable[..., Any] + WaitForInstanceCompletion: Callable[..., Any] + RaiseEvent: Callable[..., Any] + TerminateInstance: Callable[..., Any] + SuspendInstance: Callable[..., Any] + ResumeInstance: Callable[..., Any] + QueryInstances: Callable[..., Any] + PurgeInstances: Callable[..., Any] + GetWorkItems: Callable[..., Any] + CompleteActivityTask: Callable[..., Any] + CompleteOrchestratorTask: Callable[..., Any] + CompleteEntityTask: Callable[..., Any] + StreamInstanceHistory: Callable[..., Any] + CreateTaskHub: Callable[..., Any] + DeleteTaskHub: Callable[..., Any] + SignalEntity: Callable[..., Any] + GetEntity: Callable[..., Any] + QueryEntities: Callable[..., Any] + CleanEntityStorage: Callable[..., Any] + AbandonTaskActivityWorkItem: Callable[..., Any] + AbandonTaskOrchestratorWorkItem: Callable[..., Any] + AbandonTaskEntityWorkItem: Callable[..., Any] diff --git a/durabletask/worker.py b/durabletask/worker.py index 56687bb8..a4222ddb 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -24,6 +24,7 @@ from durabletask.internal.helpers import new_timestamp from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext from durabletask.internal.orchestration_entity_context import OrchestrationEntityContext +from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub import durabletask.internal.helpers as ph import durabletask.internal.exceptions as pe import durabletask.internal.orchestrator_service_pb2 as pb @@ -631,7 +632,7 @@ def stop(self): def _execute_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): try: @@ -679,7 +680,7 @@ def _execute_orchestrator( def _cancel_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskOrchestratorWorkItem( @@ -692,7 +693,7 @@ def _cancel_orchestrator( def _execute_activity( self, req: pb.ActivityRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): instance_id = req.orchestrationInstance.instanceId @@ -725,7 +726,7 @@ def _execute_activity( def _cancel_activity( self, req: pb.ActivityRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskActivityWorkItem( @@ -738,7 +739,7 @@ def _cancel_activity( def _execute_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): if isinstance(req, pb.EntityRequest): @@ -807,7 +808,7 @@ def _execute_entity_batch( def _cancel_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskEntityWorkItem( diff --git a/tests/durabletask/test_proto_task_hub_shim.py b/tests/durabletask/test_proto_task_hub_shim.py new file mode 100644 index 00000000..8bd3a659 --- /dev/null +++ b/tests/durabletask/test_proto_task_hub_shim.py @@ -0,0 +1,25 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from typing import get_type_hints + +from durabletask.internal.orchestrator_service_pb2_grpc import TaskHubSidecarServiceStub +from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub + + +def test_proto_task_hub_shim_is_compatible(): + """Test that ProtoTaskHubSidecarServiceStub is compatible with TaskHubSidecarServiceStub.""" + protocol_attrs = set(get_type_hints(ProtoTaskHubSidecarServiceStub).keys()) + + # Instantiate TaskHubSidecarServiceStub with a dummy channel to get its attributes + class TestChannel(): + def unary_unary(self, *args, **kwargs): + pass + + def unary_stream(self, *args, **kwargs): + pass + impl_attrs = TaskHubSidecarServiceStub(TestChannel()).__dict__.keys() + + # Check missing + assert protocol_attrs == impl_attrs From 3eaf42c2f6c47a3bc3c62377699170bfef542b00 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:15:52 -0700 Subject: [PATCH 10/17] Add support for new event types (#86) * Add support for new event types * Add tests for new event types --- durabletask/internal/helpers.py | 17 +++ durabletask/worker.py | 113 ++++++++++++++---- .../test_orchestration_executor.py | 73 ++++++++++- 3 files changed, 176 insertions(+), 27 deletions(-) diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index 88481fa3..0b1f655d 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -20,6 +20,11 @@ def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.H return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent()) +def new_orchestrator_completed_event() -> pb.HistoryEvent: + return pb.HistoryEvent(eventId=-1, timestamp=timestamp_pb2.Timestamp(), + orchestratorCompleted=pb.OrchestratorCompletedEvent()) + + def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None, tags: Optional[dict[str, str]] = None) -> pb.HistoryEvent: return pb.HistoryEvent( @@ -119,6 +124,18 @@ def new_failure_details(ex: Exception) -> pb.TaskFailureDetails: ) +def new_event_sent_event(event_id: int, instance_id: str, input: str): + return pb.HistoryEvent( + eventId=event_id, + timestamp=timestamp_pb2.Timestamp(), + eventSent=pb.EventSentEvent( + name="", + input=get_string_value(input), + instanceId=instance_id + ) + ) + + def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent: return pb.HistoryEvent( eventId=-1, diff --git a/durabletask/worker.py b/durabletask/worker.py index a4222ddb..48c2e442 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -12,7 +12,7 @@ from threading import Event, Thread from types import GeneratorType from enum import Enum -from typing import Any, Generator, Optional, Sequence, TypeVar, Union +from typing import Any, Generator, Optional, Sequence, Tuple, TypeVar, Union import uuid from packaging.version import InvalidVersion, parse @@ -832,6 +832,7 @@ def __init__(self, instance_id: str, registry: _Registry): self._pending_tasks: dict[int, task.CompletableTask] = {} # Maps entity ID to task ID self._entity_task_id_map: dict[str, tuple[EntityInstanceId, int]] = {} + self._entity_lock_task_id_map: dict[str, tuple[EntityInstanceId, int]] = {} # Maps criticalSectionId to task ID self._entity_lock_id_map: dict[str, int] = {} self._sequence_number = 0 @@ -1606,33 +1607,40 @@ def process_event( else: raise TypeError("Unexpected sub-orchestration task type") elif event.HasField("eventRaised"): - # event names are case-insensitive - event_name = event.eventRaised.name.casefold() - if not ctx.is_replaying: - self._logger.info(f"{ctx.instance_id} Event raised: {event_name}") - task_list = ctx._pending_events.get(event_name, None) - decoded_result: Optional[Any] = None - if task_list: - event_task = task_list.pop(0) - if not ph.is_empty(event.eventRaised.input): - decoded_result = shared.from_json(event.eventRaised.input.value) - event_task.complete(decoded_result) - if not task_list: - del ctx._pending_events[event_name] - ctx.resume() + if event.eventRaised.name in ctx._entity_task_id_map: + entity_id, task_id = ctx._entity_task_id_map.get(event.eventRaised.name, (None, None)) + self._handle_entity_event_raised(ctx, event, entity_id, task_id, False) + elif event.eventRaised.name in ctx._entity_lock_task_id_map: + entity_id, task_id = ctx._entity_lock_task_id_map.get(event.eventRaised.name, (None, None)) + self._handle_entity_event_raised(ctx, event, entity_id, task_id, True) else: - # buffer the event - event_list = ctx._received_events.get(event_name, None) - if not event_list: - event_list = [] - ctx._received_events[event_name] = event_list - if not ph.is_empty(event.eventRaised.input): - decoded_result = shared.from_json(event.eventRaised.input.value) - event_list.append(decoded_result) + # event names are case-insensitive + event_name = event.eventRaised.name.casefold() if not ctx.is_replaying: - self._logger.info( - f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." - ) + self._logger.info(f"{ctx.instance_id} Event raised: {event_name}") + task_list = ctx._pending_events.get(event_name, None) + decoded_result: Optional[Any] = None + if task_list: + event_task = task_list.pop(0) + if not ph.is_empty(event.eventRaised.input): + decoded_result = shared.from_json(event.eventRaised.input.value) + event_task.complete(decoded_result) + if not task_list: + del ctx._pending_events[event_name] + ctx.resume() + else: + # buffer the event + event_list = ctx._received_events.get(event_name, None) + if not event_list: + event_list = [] + ctx._received_events[event_name] = event_list + if not ph.is_empty(event.eventRaised.input): + decoded_result = shared.from_json(event.eventRaised.input.value) + event_list.append(decoded_result) + if not ctx.is_replaying: + self._logger.info( + f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." + ) elif event.HasField("executionSuspended"): if not self._is_suspended and not ctx.is_replaying: self._logger.info(f"{ctx.instance_id}: Execution suspended.") @@ -1760,6 +1768,21 @@ def process_event( self._logger.info(f"{ctx.instance_id}: Entity operation failed.") self._logger.info(f"Data: {json.dumps(event.entityOperationFailed)}") pass + elif event.HasField("orchestratorCompleted"): + # Added in Functions only (for some reason) and does not affect orchestrator flow + pass + elif event.HasField("eventSent"): + # Check if this eventSent corresponds to an entity operation call after being translated to the old + # entity protocol by the Durable WebJobs extension. If so, treat this message similarly to + # entityOperationCalled and remove the pending action. Also store the entity id and event id for later + action = ctx._pending_actions.pop(event.eventId, None) + if action and action.HasField("sendEntityMessage"): + if action.sendEntityMessage.HasField("entityOperationCalled"): + entity_id, event_id = self._parse_entity_event_sent_input(event) + ctx._entity_task_id_map[event_id] = (entity_id, event.eventId) + elif action.sendEntityMessage.HasField("entityLockRequested"): + entity_id, event_id = self._parse_entity_event_sent_input(event) + ctx._entity_lock_task_id_map[event_id] = (entity_id, event.eventId) else: eventType = event.WhichOneof("eventType") raise task.OrchestrationStateError( @@ -1769,6 +1792,44 @@ def process_event( # The orchestrator generator function completed ctx.set_complete(generatorStopped.value, pb.ORCHESTRATION_STATUS_COMPLETED) + def _parse_entity_event_sent_input(self, event: pb.HistoryEvent) -> Tuple[EntityInstanceId, str]: + try: + entity_id = EntityInstanceId.parse(event.eventSent.instanceId) + except ValueError: + raise RuntimeError(f"Could not parse entity ID from instanceId '{event.eventSent.instanceId}'") + try: + event_id = json.loads(event.eventSent.input.value)["id"] + except (json.JSONDecodeError, KeyError, TypeError) as ex: + raise RuntimeError(f"Could not parse event ID from eventSent input '{event.eventSent.input.value}'") from ex + return entity_id, event_id + + def _handle_entity_event_raised(self, + ctx: _RuntimeOrchestrationContext, + event: pb.HistoryEvent, + entity_id: Optional[EntityInstanceId], + task_id: Optional[int], + is_lock_event: bool): + # This eventRaised represents the result of an entity operation after being translated to the old + # entity protocol by the Durable WebJobs extension + if entity_id is None: + raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'") + if task_id is None: + raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'") + entity_task = ctx._pending_tasks.pop(task_id, None) + if not entity_task: + raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'") + result = None + if not ph.is_empty(event.eventRaised.input): + # TODO: Investigate why the event result is wrapped in a dict with "result" key + result = shared.from_json(event.eventRaised.input.value)["result"] + if is_lock_event: + ctx._entity_context.complete_acquire(event.eventRaised.name) + entity_task.complete(EntityLock(ctx)) + else: + ctx._entity_context.recover_lock_after_call(entity_id) + entity_task.complete(result) + ctx.resume() + def evaluate_orchestration_versioning(self, versioning: Optional[VersioningOptions], orchestration_version: Optional[str]) -> Optional[pb.TaskFailureDetails]: if versioning is None: return None diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index 5646f07b..8c728124 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -9,7 +9,7 @@ import durabletask.internal.helpers as helpers import durabletask.internal.orchestrator_service_pb2 as pb -from durabletask import task, worker +from durabletask import task, worker, entities logging.basicConfig( format='%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s', @@ -1183,6 +1183,77 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert str(ex) in complete_action.failureDetails.errorMessage +def test_orchestrator_completed_no_effect(): + def dummy_activity(ctx, _): + pass + + def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): + yield ctx.call_activity(dummy_activity, input=orchestrator_input) + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + encoded_input = json.dumps(42) + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input), + helpers.new_orchestrator_completed_event()] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions + + assert len(actions) == 1 + assert type(actions[0]) is pb.OrchestratorAction + assert actions[0].id == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].scheduleTask.name == task.get_name(dummy_activity) + assert actions[0].scheduleTask.input.value == encoded_input + + +def test_entity_lock_created_as_event(): + test_entity_id = entities.EntityInstanceId("Counter", "myCounter") + + def orchestrator(ctx: task.OrchestrationContext, _): + entity_id = test_entity_id + with (yield ctx.lock_entities([entity_id])): + return (yield ctx.call_entity(entity_id, "set", 1)) + + registry = worker._Registry() + name = registry.add_orchestrator(orchestrator) + + new_events = [ + helpers.new_orchestrator_started_event(), + helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None), + ] + + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) + result1 = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result1.actions + assert len(actions) == 1 + assert type(actions[0]) is pb.OrchestratorAction + assert actions[0].id == 1 + assert actions[0].HasField("sendEntityMessage") + assert actions[0].sendEntityMessage.HasField("entityLockRequested") + + old_events = new_events + event_sent_input = { + "id": actions[0].sendEntityMessage.entityLockRequested.criticalSectionId, + } + new_events = [ + helpers.new_event_sent_event(1, str(test_entity_id), json.dumps(event_sent_input)), + helpers.new_event_raised_event(event_sent_input["id"], None), + ] + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions + + assert len(actions) == 1 + assert type(actions[0]) is pb.OrchestratorAction + assert actions[0].id == 2 + assert actions[0].HasField("sendEntityMessage") + assert actions[0].sendEntityMessage.HasField("entityOperationCalled") + assert actions[0].sendEntityMessage.entityOperationCalled.targetInstanceId.value == str(test_entity_id) + + def get_and_validate_complete_orchestration_action_list(expected_action_count: int, actions: list[pb.OrchestratorAction]) -> pb.CompleteOrchestrationAction: assert len(actions) == expected_action_count assert type(actions[-1]) is pb.OrchestratorAction From a95501a4ae43d565dad1c4ab73c130fc39c7ab27 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:02:33 -0700 Subject: [PATCH 11/17] Prep for 1.2.0 release (#98) --- CHANGELOG.md | 23 ++++++++++++++++++++++- durabletask-azuremanaged/CHANGELOG.md | 5 +++++ durabletask-azuremanaged/pyproject.toml | 4 ++-- pyproject.toml | 2 +- 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index daffc504..a2c3e598 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,30 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.2.0 + +ADDED: + +- Added new_uuid method to orchestration clients allowing generation of replay-safe UUIDs. +- Added ProtoTaskHubSidecarServiceStub class to allow passing self-generated stubs to worker +- Added support for new event types needed for specific durable backend setups: + - orchestratorCompleted + - eventSent + - eventRaised modified to support entity events + +CHANGED: + +- Added py.typed marker file to durabletask module +- Updated type hinting on EntityInstanceId.parse() to reflect behavior +- Entity operations now use UUIDs generated with new_uuid + +FIXED: + +- Mismatched parameter names in call_entity/signal_entity from interface + ## v1.1.0 -ADDED: +ADDED: - Allow retrieving entity metadata from the client, with or without state diff --git a/durabletask-azuremanaged/CHANGELOG.md b/durabletask-azuremanaged/CHANGELOG.md index efc31e0f..8d88678e 100644 --- a/durabletask-azuremanaged/CHANGELOG.md +++ b/durabletask-azuremanaged/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.2.0 + +- Updates base dependency to durabletask v1.2.0 + - See durabletask changelog for more details + ## v1.1.0 CHANGED: diff --git a/durabletask-azuremanaged/pyproject.toml b/durabletask-azuremanaged/pyproject.toml index 5c502461..f013a565 100644 --- a/durabletask-azuremanaged/pyproject.toml +++ b/durabletask-azuremanaged/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask.azuremanaged" -version = "1.1.0" +version = "1.2.0" description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler" keywords = [ "durable", @@ -26,7 +26,7 @@ requires-python = ">=3.10" license = {file = "LICENSE"} readme = "README.md" dependencies = [ - "durabletask>=1.1.0", + "durabletask>=1.2.0", "azure-identity>=1.19.0" ] diff --git a/pyproject.toml b/pyproject.toml index 111693c7..d9700894 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask" -version = "1.1.0" +version = "1.2.0" description = "A Durable Task Client SDK for Python" keywords = [ "durable", From e8b163e8a254ba088f0bfdaeb99e96549e9ee812 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 8 Jan 2026 12:55:15 -0700 Subject: [PATCH 12/17] Allow TaskFailedError to be constructed from Exception (#99) * Allow TaskFailedError to be constructed from Exception * Propogate inner exceptions in FailureDetails * Allow passing exception to fail() --- durabletask/internal/helpers.py | 11 +++++++++-- durabletask/task.py | 6 ++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index 0b1f655d..0cd2d40e 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -116,11 +116,18 @@ def new_sub_orchestration_failed_event(event_id: int, ex: Exception) -> pb.Histo ) -def new_failure_details(ex: Exception) -> pb.TaskFailureDetails: +def new_failure_details(ex: Exception, _visited: Optional[set[int]] = None) -> pb.TaskFailureDetails: + if _visited is None: + _visited = set() + _visited.add(id(ex)) + inner: Optional[BaseException] = ex.__cause__ or ex.__context__ + if len(_visited) > 10 or (inner and id(inner) in _visited) or not isinstance(inner, Exception): + inner = None return pb.TaskFailureDetails( errorType=type(ex).__name__, errorMessage=str(ex), - stackTrace=wrappers_pb2.StringValue(value=''.join(traceback.format_tb(ex.__traceback__))) + stackTrace=wrappers_pb2.StringValue(value=''.join(traceback.format_tb(ex.__traceback__))), + innerFailure=new_failure_details(inner, _visited) if inner else None ) diff --git a/durabletask/task.py b/durabletask/task.py index 1ae9f494..50d0d05b 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -302,8 +302,10 @@ def stack_trace(self) -> Optional[str]: class TaskFailedError(Exception): """Exception type for all orchestration task failures.""" - def __init__(self, message: str, details: pb.TaskFailureDetails): + def __init__(self, message: str, details: Union[pb.TaskFailureDetails, Exception]): super().__init__(message) + if isinstance(details, Exception): + details = pbh.new_failure_details(details) self._details = FailureDetails( details.errorMessage, details.errorType, @@ -424,7 +426,7 @@ def complete(self, result: T): if self._parent is not None: self._parent.on_child_completed(self) - def fail(self, message: str, details: pb.TaskFailureDetails): + def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]): if self._is_complete: raise ValueError('The task has already completed.') self._exception = TaskFailedError(message, details) From 1939eea4fe5c226e7ade7b6e7f40c3a98c45cc5b Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Thu, 8 Jan 2026 13:06:12 -0700 Subject: [PATCH 13/17] Increase specificity of Task return types (#100) * Increase specificity of Task return types --- durabletask/task.py | 10 +++++----- durabletask/worker.py | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/durabletask/task.py b/durabletask/task.py index 50d0d05b..fa15a97e 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -117,7 +117,7 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task: def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, input: Optional[TInput] = None, retry_policy: Optional[RetryPolicy] = None, - tags: Optional[dict[str, str]] = None) -> Task[TOutput]: + tags: Optional[dict[str, str]] = None) -> CompletableTask[TOutput]: """Schedule an activity for execution. Parameters @@ -142,7 +142,7 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, def call_entity(self, entity: EntityInstanceId, operation: str, - input: Optional[TInput] = None) -> Task: + input: Optional[TInput] = None) -> CompletableTask: """Schedule entity function for execution. Parameters @@ -182,7 +182,7 @@ def signal_entity( pass @abstractmethod - def lock_entities(self, entities: list[EntityInstanceId]) -> Task[EntityLock]: + def lock_entities(self, entities: list[EntityInstanceId]) -> CompletableTask[EntityLock]: """Creates a Task object that locks the specified entity instances. The locks will be acquired the next time the orchestrator yields. @@ -206,7 +206,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput input: Optional[TInput] = None, instance_id: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, - version: Optional[str] = None) -> Task[TOutput]: + version: Optional[str] = None) -> CompletableTask[TOutput]: """Schedule sub-orchestrator function for execution. Parameters @@ -231,7 +231,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput # TOOD: Add a timeout parameter, which allows the task to be canceled if the event is # not received within the specified timeout. This requires support for task cancellation. @abstractmethod - def wait_for_external_event(self, name: str) -> Task: + def wait_for_external_event(self, name: str) -> CompletableTask: """Wait asynchronously for an event to be raised with the name `name`. Parameters diff --git a/durabletask/worker.py b/durabletask/worker.py index 48c2e442..af29129f 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1014,7 +1014,7 @@ def create_timer_internal( self, fire_at: Union[datetime, timedelta], retryable_task: Optional[task.RetryableTask] = None, - ) -> task.Task: + ) -> task.TimerTask: id = self.next_sequence_number() if isinstance(fire_at, timedelta): fire_at = self.current_utc_datetime + fire_at @@ -1034,7 +1034,7 @@ def call_activity( input: Optional[TInput] = None, retry_policy: Optional[task.RetryPolicy] = None, tags: Optional[dict[str, str]] = None, - ) -> task.Task[TOutput]: + ) -> task.CompletableTask[TOutput]: id = self.next_sequence_number() self.call_activity_function_helper( @@ -1047,7 +1047,7 @@ def call_entity( entity: EntityInstanceId, operation: str, input: Optional[TInput] = None, - ) -> task.Task: + ) -> task.CompletableTask: id = self.next_sequence_number() self.call_entity_function_helper( @@ -1068,7 +1068,7 @@ def signal_entity( id, entity_id, operation_name, input ) - def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLock]: + def lock_entities(self, entities: list[EntityInstanceId]) -> task.CompletableTask[EntityLock]: id = self.next_sequence_number() self.lock_entities_function_helper( @@ -1084,7 +1084,7 @@ def call_sub_orchestrator( instance_id: Optional[str] = None, retry_policy: Optional[task.RetryPolicy] = None, version: Optional[str] = None, - ) -> task.Task[TOutput]: + ) -> task.CompletableTask[TOutput]: id = self.next_sequence_number() if isinstance(orchestrator, str): orchestrator_name = orchestrator @@ -1229,7 +1229,7 @@ def _exit_critical_section(self) -> None: action = pb.OrchestratorAction(id=task_id, sendEntityMessage=entity_unlock_message) self._pending_actions[task_id] = action - def wait_for_external_event(self, name: str) -> task.Task: + def wait_for_external_event(self, name: str) -> task.CompletableTask: # Check to see if this event has already been received, in which case we # can return it immediately. Otherwise, record out intent to receive an # event with the given name so that we can resume the generator when it From a491469cc12813af1c5618591645fbb0ad7962db Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:34:19 -0700 Subject: [PATCH 14/17] Type-hinting improvements (#104) * Type-hinting improvements --- CHANGELOG.md | 6 ++++++ durabletask/client.py | 5 ++++- durabletask/task.py | 6 +++--- durabletask/worker.py | 16 ++++++++-------- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2c3e598..5ce34179 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +CHANGED + +- Add/update type-hinting for various worker methods + ## v1.2.0 ADDED: diff --git a/durabletask/client.py b/durabletask/client.py index 7d037585..3b4d4e39 100644 --- a/durabletask/client.py +++ b/durabletask/client.py @@ -230,7 +230,10 @@ def purge_orchestration(self, instance_id: str, recursive: bool = True): self._logger.info(f"Purging instance '{instance_id}'.") self._stub.PurgeInstances(req) - def signal_entity(self, entity_instance_id: EntityInstanceId, operation_name: str, input: Optional[Any] = None): + def signal_entity(self, + entity_instance_id: EntityInstanceId, + operation_name: str, + input: Optional[Any] = None) -> None: req = pb.SignalEntityRequest( instanceId=str(entity_instance_id), name=operation_name, diff --git a/durabletask/task.py b/durabletask/task.py index fa15a97e..4f0f55b9 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -142,7 +142,7 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *, def call_entity(self, entity: EntityInstanceId, operation: str, - input: Optional[TInput] = None) -> CompletableTask: + input: Optional[TInput] = None) -> CompletableTask[Any]: """Schedule entity function for execution. Parameters @@ -538,8 +538,8 @@ def task_id(self) -> int: return self._task_id -# Orchestrators are generators that yield tasks and receive/return any type -Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]] +# Orchestrators are generators that yield tasks, receive any type, and return TOutput +Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task[Any], Any, TOutput], TOutput]] # Activities are simple functions that can be scheduled by orchestrators Activity = Callable[[ActivityContext, TInput], TOutput] diff --git a/durabletask/worker.py b/durabletask/worker.py index af29129f..06ae4f23 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -150,7 +150,7 @@ def __init__(self): self.entities = {} self.entity_instances = {} - def add_orchestrator(self, fn: task.Orchestrator) -> str: + def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str: if fn is None: raise ValueError("An orchestrator function argument is required.") @@ -158,7 +158,7 @@ def add_orchestrator(self, fn: task.Orchestrator) -> str: self.add_named_orchestrator(name, fn) return name - def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None: + def add_named_orchestrator(self, name: str, fn: task.Orchestrator[TInput, TOutput]) -> None: if not name: raise ValueError("A non-empty orchestrator name is required.") if name in self.orchestrators: @@ -166,10 +166,10 @@ def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None: self.orchestrators[name] = fn - def get_orchestrator(self, name: str) -> Optional[task.Orchestrator]: + def get_orchestrator(self, name: str) -> Optional[task.Orchestrator[Any, Any]]: return self.orchestrators.get(name) - def add_activity(self, fn: task.Activity) -> str: + def add_activity(self, fn: task.Activity[TInput, TOutput]) -> str: if fn is None: raise ValueError("An activity function argument is required.") @@ -177,7 +177,7 @@ def add_activity(self, fn: task.Activity) -> str: self.add_named_activity(name, fn) return name - def add_named_activity(self, name: str, fn: task.Activity) -> None: + def add_named_activity(self, name: str, fn: task.Activity[TInput, TOutput]) -> None: if not name: raise ValueError("A non-empty activity name is required.") if name in self.activities: @@ -185,7 +185,7 @@ def add_named_activity(self, name: str, fn: task.Activity) -> None: self.activities[name] = fn - def get_activity(self, name: str) -> Optional[task.Activity]: + def get_activity(self, name: str) -> Optional[task.Activity[Any, Any]]: return self.activities.get(name) def add_entity(self, fn: task.Entity) -> str: @@ -362,7 +362,7 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.stop() - def add_orchestrator(self, fn: task.Orchestrator) -> str: + def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str: """Registers an orchestrator function with the worker.""" if self._is_running: raise RuntimeError( @@ -1047,7 +1047,7 @@ def call_entity( entity: EntityInstanceId, operation: str, input: Optional[TInput] = None, - ) -> task.CompletableTask: + ) -> task.CompletableTask[Any]: id = self.next_sequence_number() self.call_entity_function_helper( From 75801acbee72a281541d370f82cd89aaac28f5d2 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:37:44 -0700 Subject: [PATCH 15/17] Allow custom entity names (#105) * Allow custom entity names --- CHANGELOG.md | 1 + durabletask/task.py | 8 ++++++++ durabletask/worker.py | 16 +++++++--------- .../test_dts_class_based_entities_e2e.py | 12 ++++++------ .../test_dts_function_based_entities_e2e.py | 12 ++++++------ 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ce34179..956c0908 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 CHANGED +- Allow entities with custom names - Add/update type-hinting for various worker methods ## v1.2.0 diff --git a/durabletask/task.py b/durabletask/task.py index 4f0f55b9..d02fe0f8 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -615,6 +615,14 @@ def retry_timeout(self) -> Optional[timedelta]: return self._retry_timeout +def get_entity_name(fn: Entity) -> str: + if hasattr(fn, "__durable_entity_name__"): + return getattr(fn, "__durable_entity_name__") + if isinstance(fn, type) and issubclass(fn, DurableEntity): + return fn.__name__ + return get_name(fn) + + def get_name(fn: Callable) -> str: """Returns the name of the provided function""" name = fn.__name__ diff --git a/durabletask/worker.py b/durabletask/worker.py index 06ae4f23..9950677b 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -188,16 +188,14 @@ def add_named_activity(self, name: str, fn: task.Activity[TInput, TOutput]) -> N def get_activity(self, name: str) -> Optional[task.Activity[Any, Any]]: return self.activities.get(name) - def add_entity(self, fn: task.Entity) -> str: + def add_entity(self, fn: task.Entity, name: Optional[str] = None) -> str: if fn is None: raise ValueError("An entity function argument is required.") - if isinstance(fn, type) and issubclass(fn, DurableEntity): - name = fn.__name__ - self.add_named_entity(name, fn) - else: - name = task.get_name(fn) - self.add_named_entity(name, fn) + if name is None: + name = task.get_entity_name(fn) + + self.add_named_entity(name, fn) return name def add_named_entity(self, name: str, fn: task.Entity) -> None: @@ -378,13 +376,13 @@ def add_activity(self, fn: task.Activity) -> str: ) return self._registry.add_activity(fn) - def add_entity(self, fn: task.Entity) -> str: + def add_entity(self, fn: task.Entity, name: Optional[str] = None) -> str: """Registers an entity function with the worker.""" if self._is_running: raise RuntimeError( "Entities cannot be added while the worker is running." ) - return self._registry.add_entity(fn) + return self._registry.add_entity(fn, name) def use_versioning(self, version: VersioningOptions) -> None: """Initializes versioning options for sub-orchestrators and activities.""" diff --git a/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py b/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py index 6075029a..0910f88e 100644 --- a/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_class_based_entities_e2e.py @@ -17,7 +17,7 @@ endpoint = os.getenv("ENDPOINT", "http://localhost:8080") -def test_client_signal_class_entity(): +def test_client_signal_class_entity_and_custom_name(): invoked = False class EmptyEntity(entities.DurableEntity): @@ -28,12 +28,12 @@ def do_nothing(self, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: - w.add_entity(EmptyEntity) + w.add_entity(EmptyEntity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) - entity_id = entities.EntityInstanceId("EmptyEntity", "testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity") c.signal_entity(entity_id, "do_nothing") time.sleep(2) # wait for the signal to be processed @@ -70,7 +70,7 @@ def do_nothing(self, _): assert invoked -def test_orchestration_signal_class_entity(): +def test_orchestration_signal_class_entity_and_custom_name(): invoked = False class EmptyEntity(entities.DurableEntity): @@ -79,14 +79,14 @@ def do_nothing(self, _): invoked = True def empty_orchestrator(ctx: task.OrchestrationContext, _): - entity_id = entities.EntityInstanceId("EmptyEntity", "testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity") ctx.signal_entity(entity_id, "do_nothing") # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(empty_orchestrator) - w.add_entity(EmptyEntity) + w.add_entity(EmptyEntity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, diff --git a/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py b/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py index 6b857bea..b3adebe1 100644 --- a/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py +++ b/tests/durabletask-azuremanaged/test_dts_function_based_entities_e2e.py @@ -17,7 +17,7 @@ endpoint = os.getenv("ENDPOINT", "http://localhost:8080") -def test_client_signal_entity(): +def test_client_signal_entity_and_custom_name(): invoked = False def empty_entity(ctx: entities.EntityContext, _): @@ -28,12 +28,12 @@ def empty_entity(ctx: entities.EntityContext, _): # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: - w.add_entity(empty_entity) + w.add_entity(empty_entity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) - entity_id = entities.EntityInstanceId("empty_entity", "testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity") c.signal_entity(entity_id, "do_nothing") time.sleep(2) # wait for the signal to be processed @@ -70,7 +70,7 @@ def empty_entity(ctx: entities.EntityContext, _): assert invoked -def test_orchestration_signal_entity(): +def test_orchestration_signal_entity_and_custom_name(): invoked = False def empty_entity(ctx: entities.EntityContext, _): @@ -79,14 +79,14 @@ def empty_entity(ctx: entities.EntityContext, _): invoked = True def empty_orchestrator(ctx: task.OrchestrationContext, _): - entity_id = entities.EntityInstanceId("empty_entity", f"{ctx.instance_id}_testEntity") + entity_id = entities.EntityInstanceId("EntityNameCustom", f"{ctx.instance_id}_testEntity") ctx.signal_entity(entity_id, "do_nothing") # Start a worker, which will connect to the sidecar in a background thread with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) as w: w.add_orchestrator(empty_orchestrator) - w.add_entity(empty_entity) + w.add_entity(empty_entity, name="EntityNameCustom") w.start() c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, From 424ffa983030c16b233246115ff626a8f3258283 Mon Sep 17 00:00:00 2001 From: andystaples <77818326+andystaples@users.noreply.github.com> Date: Mon, 12 Jan 2026 14:50:15 -0700 Subject: [PATCH 16/17] Prep for v1.3.0 (#106) --- CHANGELOG.md | 9 +++++++-- durabletask-azuremanaged/CHANGELOG.md | 5 +++++ durabletask-azuremanaged/pyproject.toml | 4 ++-- pyproject.toml | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 956c0908..f5f1f8d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## v1.3.0 -CHANGED +ADDED - Allow entities with custom names + +CHANGED + +- Allow task.fail() to be called with Exceptions +- Update type-hinting for Task return sub-types - Add/update type-hinting for various worker methods ## v1.2.0 diff --git a/durabletask-azuremanaged/CHANGELOG.md b/durabletask-azuremanaged/CHANGELOG.md index 8d88678e..6fb231be 100644 --- a/durabletask-azuremanaged/CHANGELOG.md +++ b/durabletask-azuremanaged/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## v1.3.0 + +- Updates base dependency to durabletask v1.3.0 + - See durabletask changelog for more details + ## v1.2.0 - Updates base dependency to durabletask v1.2.0 diff --git a/durabletask-azuremanaged/pyproject.toml b/durabletask-azuremanaged/pyproject.toml index f013a565..7a084ebc 100644 --- a/durabletask-azuremanaged/pyproject.toml +++ b/durabletask-azuremanaged/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask.azuremanaged" -version = "1.2.0" +version = "1.3.0" description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler" keywords = [ "durable", @@ -26,7 +26,7 @@ requires-python = ">=3.10" license = {file = "LICENSE"} readme = "README.md" dependencies = [ - "durabletask>=1.2.0", + "durabletask>=1.3.0", "azure-identity>=1.19.0" ] diff --git a/pyproject.toml b/pyproject.toml index d9700894..ec8a511d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask" -version = "1.2.0" +version = "1.3.0" description = "A Durable Task Client SDK for Python" keywords = [ "durable", From 0e09bb8adaaa0654895770c590e773d701b87e80 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Mon, 9 Feb 2026 13:02:59 -0800 Subject: [PATCH 17/17] Fix example bugs, improve docs, and add copilot instructions (#107) - Fix entity method dispatch in worker.py to inspect method signature before passing entity_input, so class-based entity methods that take no input (e.g. get()) work without requiring a dummy parameter - Fix function_based_entity.py: if -> elif for set operation - Fix function_based_entity_actions.py: add missing 'add' operation - Fix sub-orchestration examples: use dynamic secure_channel based on endpoint instead of hardcoded True - Rewrite examples/README.md with proper formatting, virtual env setup instructions, and local dev install steps - Add .github/copilot-instructions.md with project conventions - Add .pymarkdown.json config and pymarkdownlnt to dev-requirements.txt - Add unit tests for entity executor method dispatch --- .github/copilot-instructions.md | 95 +++++++++ .pymarkdown.json | 10 + dev-requirements.txt | 1 + durabletask/worker.py | 17 +- examples/README.md | 192 +++++++++++++----- examples/activity_sequence.py | 6 +- examples/entities/class_based_entity.py | 8 +- .../entities/class_based_entity_actions.py | 8 +- examples/entities/entity_locking.py | 8 +- examples/entities/function_based_entity.py | 10 +- .../entities/function_based_entity_actions.py | 11 +- examples/fanout_fanin.py | 6 +- examples/human_interaction.py | 6 +- .../orchestrator.py | 5 +- .../worker.py | 7 +- examples/version_aware_orchestrator.py | 6 +- tests/durabletask/test_entity_executor.py | 131 ++++++++++++ 17 files changed, 428 insertions(+), 99 deletions(-) create mode 100644 .github/copilot-instructions.md create mode 100644 .pymarkdown.json create mode 100644 tests/durabletask/test_entity_executor.py diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 00000000..fa3e0bb4 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,95 @@ +# Copilot Instructions for durabletask-python + +## Project Overview + +This is the Durable Task Python SDK, providing a client and worker for +building durable orchestrations. The repo contains two packages: + +- `durabletask` — core SDK (in `durabletask/`) +- `durabletask.azuremanaged` — Azure Durable Task Scheduler provider (in `durabletask-azuremanaged/`) + +## Language and Style + +- Python 3.10+ is required. +- Use type hints for all public API signatures. +- Follow PEP 8 conventions. +- Use `autopep8` for Python formatting. + +## Python Linting + +This repository uses [flake8](https://flake8.pycqa.org/) for Python +linting. Run it after making changes to verify there are no issues: + +```bash +flake8 path/to/changed/file.py +``` + +## Markdown Style + +Use GitHub-style callouts for notes, warnings, and tips in Markdown files: + +```markdown +> [!NOTE] +> This is a note. + +> [!WARNING] +> This is a warning. + +> [!TIP] +> This is a tip. +``` + +Do **not** use bold-text callouts like `**NOTE:**` or `> **Note:**`. + +When providing shell commands in Markdown, include both Bash and +PowerShell examples if the syntax differs between them. Common cases +include multiline commands (Bash uses `\` for line continuation while +PowerShell uses a backtick `` ` ``), environment variable syntax, and +path separators. If a command is identical in both shells, a single +example is sufficient. + +## Markdown Linting + +This repository uses [pymarkdownlnt](https://pypi.org/project/pymarkdownlnt/) +for linting Markdown files. Configuration is in `.pymarkdown.json` at the +repository root. + +To lint a single file: + +```bash +pymarkdown -c .pymarkdown.json scan path/to/file.md +``` + +To lint all Markdown files in the repository: + +```bash +pymarkdown -c .pymarkdown.json scan **/*.md +``` + +Install the linter via the dev dependencies: + +```bash +pip install -r dev-requirements.txt +``` + +## Building and Testing + +Install the packages locally in editable mode: + +```bash +pip install -e . -e ./durabletask-azuremanaged +``` + +Run tests with pytest: + +```bash +pytest +``` + +## Project Structure + +- `durabletask/` — core SDK source +- `durabletask-azuremanaged/` — Azure managed provider source +- `examples/` — example orchestrations (see `examples/README.md`) +- `tests/` — test suite +- `dev-requirements.txt` — development dependencies diff --git a/.pymarkdown.json b/.pymarkdown.json new file mode 100644 index 00000000..69ca1e2a --- /dev/null +++ b/.pymarkdown.json @@ -0,0 +1,10 @@ +{ + "plugins": { + "md013": { + "line_length": 100 + }, + "md014": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/dev-requirements.txt b/dev-requirements.txt index b3ff6f7f..98f4c30e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1 +1,2 @@ grpcio-tools +pymarkdownlnt diff --git a/durabletask/worker.py b/durabletask/worker.py index 9950677b..4d9da6de 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -1913,6 +1913,7 @@ class _EntityExecutor: def __init__(self, registry: _Registry, logger: logging.Logger): self._registry = registry self._logger = logger + self._entity_method_cache: dict[tuple[type, str], bool] = {} def execute( self, @@ -1948,7 +1949,21 @@ def execute( raise TypeError(f"Entity operation '{operation}' is not callable") # Execute the entity method entity_instance._initialize_entity_context(ctx) - entity_output = method(entity_input) + cache_key = (type(entity_instance), operation) + has_required_param = self._entity_method_cache.get(cache_key) + if has_required_param is None: + sig = inspect.signature(method) + has_required_param = any( + p.default == inspect.Parameter.empty + for p in sig.parameters.values() + if p.kind not in (inspect.Parameter.VAR_POSITIONAL, + inspect.Parameter.VAR_KEYWORD) + ) + self._entity_method_cache[cache_key] = has_required_param + if has_required_param or entity_input is not None: + entity_output = method(entity_input) + else: + entity_output = method() else: # Execute the entity function entity_output = fn(ctx, entity_input) diff --git a/examples/README.md b/examples/README.md index 0912a603..59fa7fd2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,86 +1,176 @@ # Examples -This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS). +This directory contains examples of how to author durable orchestrations +using the Durable Task Python SDK in conjunction with the +Durable Task Scheduler (DTS). ## Prerequisites + If using a deployed Durable Task Scheduler: - - [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) - - [`az durabletask` CLI extension](https://learn.microsoft.com/en-us/cli/azure/durabletask?view=azure-cli-latest) + +- [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) +- [`az durabletask` CLI extension](https://learn.microsoft.com/cli/azure/durabletask?view=azure-cli-latest) ## Running the Examples + There are two separate ways to run an example: - Using the Emulator (recommended for learning and development) -- Using a deployed Scheduler and Taskhub in Azure +- Using a deployed Scheduler and Taskhub in Azure ### Running with the Emulator -We recommend using the emulator for learning and development as it's faster to set up and doesn't require any Azure resources. The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. + +We recommend using the emulator for learning and development as it's +faster to set up and doesn't require any Azure resources. The emulator +simulates a scheduler and taskhub, packaged into an easy-to-use +Docker container. 1. Install Docker: If it is not already installed. -2. Pull the Docker Image for the Emulator: -```bash -docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 -``` +1. Pull the Docker Image for the Emulator: -3. Run the Emulator: Wait a few seconds for the container to be ready. -```bash -docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6 -``` + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` -4. Install the Required Packages: -```bash -pip install -r requirements.txt -``` +1. Run the Emulator: Wait a few seconds for the container to be ready. + + ```bash + docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Create a Python virtual environment (recommended): + + ```bash + python -m venv .venv + ``` -Note: The example code has been updated to use the default emulator settings automatically (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + Activate the virtual environment: + + Bash: + + ```bash + source .venv/bin/activate + ``` + + PowerShell: + + ```powershell + .\.venv\Scripts\Activate.ps1 + ``` + +1. Install the Required Packages: + + ```bash + pip install -r requirements.txt + ``` + + If you are running from a local clone of the repository, install the + local packages in editable mode instead (run this from the repository + root, not the `examples/` directory): + + ```bash + pip install -e . -e ./durabletask-azuremanaged + ``` + +> [!NOTE] +> The example code uses the default emulator settings +> automatically (endpoint: `http://localhost:8080`, taskhub: `default`). +> You don't need to set any environment variables. ### Running with a Deployed Scheduler and Taskhub Resource in Azure -For production scenarios or when you're ready to deploy to Azure, you can create a taskhub using the Azure CLI: + +For production scenarios or when you're ready to deploy to Azure, you +can create a taskhub using the Azure CLI: 1. Create a Scheduler: -```bash -az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" -``` -2. Create Your Taskhub: -```bash -az durabletask taskhub create --resource-group --scheduler-name --name -``` + Bash: -3. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. + ```bash + az durabletask scheduler create \ + --resource-group \ + --name \ + --location \ + --ip-allowlist "[0.0.0.0/0]" \ + --sku-capacity 1 \ + --sku-name "Dedicated" \ + --tags "{'myattribute':'myvalue'}" + ``` -4. Set the Environment Variables: -Bash: -```bash -export TASKHUB= -export ENDPOINT= -``` -Powershell: -```powershell -$env:TASKHUB = "" -$env:ENDPOINT = "" -``` + PowerShell: -5. Install the Required Packages: -```bash -pip install -r requirements.txt -``` + ```powershell + az durabletask scheduler create ` + --resource-group ` + --name ` + --location ` + --ip-allowlist "[0.0.0.0/0]" ` + --sku-capacity 1 ` + --sku-name "Dedicated" ` + --tags "{'myattribute':'myvalue'}" + ``` + +1. Create Your Taskhub: + + Bash: + + ```bash + az durabletask taskhub create \ + --resource-group \ + --scheduler-name \ + --name + ``` + + PowerShell: + + ```powershell + az durabletask taskhub create ` + --resource-group ` + --scheduler-name ` + --name + ``` + +1. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the + Azure portal to find the endpoint. + +1. Set the Environment Variables: + + Bash: + + ```bash + export TASKHUB= + export ENDPOINT= + ``` + + PowerShell: + + ```powershell + $env:TASKHUB = "" + $env:ENDPOINT = "" + ``` + +1. Install the Required Packages: + + ```bash + pip install -r requirements.txt + ``` + +### Executing the Examples -### Running the Examples You can now execute any of the examples in this directory using Python: ```bash -python3 example_file.py +python activity_sequence.py ``` -### Review Orchestration History and Status in the Durable Task Scheduler Dashboard -To access the Durable Task Scheduler Dashboard, follow these steps: +### Review Orchestration History and Status -- **Using the Emulator**: By default, the dashboard runs on portal 8082. Navigate to http://localhost:8082 and click on the default task hub. +To access the Durable Task Scheduler Dashboard, follow these steps: -- **Using a Deployed Scheduler**: Navigate to the Scheduler resource. Then, go to the Task Hub subresource that you are using and click on the dashboard URL in the top right corner. +- **Using the Emulator**: By default, the dashboard runs on port 8082. + Navigate to and click on the default task hub. -```sh -python3 activity_sequence.py -``` +- **Using a Deployed Scheduler**: Navigate to the Scheduler resource. + Then, go to the Task Hub subresource that you are using and click on + the dashboard URL in the top right corner. diff --git a/examples/activity_sequence.py b/examples/activity_sequence.py index 38c013db..420935d7 100644 --- a/examples/activity_sequence.py +++ b/examples/activity_sequence.py @@ -33,10 +33,8 @@ def sequence(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(sequence) diff --git a/examples/entities/class_based_entity.py b/examples/entities/class_based_entity.py index f211b659..e1b581d5 100644 --- a/examples/entities/class_based_entity.py +++ b/examples/entities/class_based_entity.py @@ -15,7 +15,7 @@ def set(self, input: int): def add(self, input: int): current_state = self.get_state(int, 0) - new_state = current_state + (input or 1) + new_state = current_state + (1 if input is None else input) self.set_state(new_state) return new_state @@ -44,10 +44,8 @@ def counter_orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(counter_orchestrator) diff --git a/examples/entities/class_based_entity_actions.py b/examples/entities/class_based_entity_actions.py index 8a382184..5240bb75 100644 --- a/examples/entities/class_based_entity_actions.py +++ b/examples/entities/class_based_entity_actions.py @@ -15,7 +15,7 @@ def set(self, input: int): def add(self, input: int): current_state = self.get_state(int, 0) - new_state = current_state + (input or 1) + new_state = current_state + (1 if input is None else input) self.set_state(new_state) return new_state @@ -63,10 +63,8 @@ def hello_orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(counter_orchestrator) diff --git a/examples/entities/entity_locking.py b/examples/entities/entity_locking.py index cdc25ab3..c6c1bda9 100644 --- a/examples/entities/entity_locking.py +++ b/examples/entities/entity_locking.py @@ -15,7 +15,7 @@ def set(self, input: int): def add(self, input: int): current_state = self.get_state(int, 0) - new_state = current_state + (input or 1) + new_state = current_state + (1 if input is None else input) self.set_state(new_state) return new_state @@ -46,10 +46,8 @@ def counter_orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(counter_orchestrator) diff --git a/examples/entities/function_based_entity.py b/examples/entities/function_based_entity.py index a43b86d2..85bcdedb 100644 --- a/examples/entities/function_based_entity.py +++ b/examples/entities/function_based_entity.py @@ -13,9 +13,9 @@ def counter(ctx: entities.EntityContext, input: int) -> Optional[int]: if ctx.operation == "set": ctx.set_state(input) - if ctx.operation == "add": + elif ctx.operation == "add": current_state = ctx.get_state(int, 0) - new_state = current_state + (input or 1) + new_state = current_state + (1 if input is None else input) ctx.set_state(new_state) return new_state elif ctx.operation == "get": @@ -45,10 +45,8 @@ def counter_orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(counter_orchestrator) diff --git a/examples/entities/function_based_entity_actions.py b/examples/entities/function_based_entity_actions.py index 129eb6c3..28eb29a7 100644 --- a/examples/entities/function_based_entity_actions.py +++ b/examples/entities/function_based_entity_actions.py @@ -13,6 +13,11 @@ def counter(ctx: entities.EntityContext, input: int) -> Optional[int]: if ctx.operation == "set": ctx.set_state(input) + elif ctx.operation == "add": + current_state = ctx.get_state(int, 0) + new_state = current_state + (1 if input is None else input) + ctx.set_state(new_state) + return new_state elif ctx.operation == "get": return ctx.get_state(int, 0) elif ctx.operation == "update_parent": @@ -57,10 +62,8 @@ def hello_orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(counter_orchestrator) diff --git a/examples/fanout_fanin.py b/examples/fanout_fanin.py index a606731b..0975d924 100644 --- a/examples/fanout_fanin.py +++ b/examples/fanout_fanin.py @@ -58,10 +58,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(orchestrator) diff --git a/examples/human_interaction.py b/examples/human_interaction.py index ae93cd29..9d607581 100644 --- a/examples/human_interaction.py +++ b/examples/human_interaction.py @@ -114,10 +114,8 @@ def prompt_for_approval(): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure - credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - - # Configure and start the worker - use secure_channel=False for emulator - secure_channel = endpoint != "http://localhost:8080" + secure_channel = endpoint.startswith("https://") + credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(purchase_order_workflow) diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py b/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py index a5e013b3..eef2edc6 100644 --- a/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py +++ b/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py @@ -11,10 +11,11 @@ print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None # Create a client, start an orchestration, and wait for it to finish -c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, +c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) instance_id = c.schedule_new_orchestration("orchestrator") diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/worker.py b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py index 8ca447d3..45620cda 100644 --- a/examples/sub-orchestrations-with-fan-out-fan-in/worker.py +++ b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py @@ -119,10 +119,9 @@ def orchestrator(ctx, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# Configure and start the worker -with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: w.add_orchestrator(orchestrator) diff --git a/examples/version_aware_orchestrator.py b/examples/version_aware_orchestrator.py index 15ac9619..b0af11a4 100644 --- a/examples/version_aware_orchestrator.py +++ b/examples/version_aware_orchestrator.py @@ -47,10 +47,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): print(f"Using endpoint: {endpoint}") # Set credential to None for emulator, or DefaultAzureCredential for Azure -credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() - -# configure and start the worker - use secure_channel=False for emulator -secure_channel = endpoint != "http://localhost:8080" +secure_channel = endpoint.startswith("https://") +credential = DefaultAzureCredential() if secure_channel else None with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, taskhub=taskhub_name, token_credential=credential) as w: # This worker is versioned for v2, as the orchestrator code has already been updated diff --git a/tests/durabletask/test_entity_executor.py b/tests/durabletask/test_entity_executor.py new file mode 100644 index 00000000..851edd7d --- /dev/null +++ b/tests/durabletask/test_entity_executor.py @@ -0,0 +1,131 @@ +"""Unit tests for the _EntityExecutor class in durabletask.worker.""" +import logging + +from durabletask import entities +from durabletask.internal.entity_state_shim import StateShim +from durabletask.worker import _EntityExecutor, _Registry + + +def _make_executor(*entity_args) -> _EntityExecutor: + """Helper to create an _EntityExecutor with registered entities.""" + registry = _Registry() + for entity in entity_args: + registry.add_entity(entity) + return _EntityExecutor(registry, logging.getLogger("test")) + + +def _execute(executor, entity_name, operation, encoded_input=None): + """Helper to execute an entity operation.""" + entity_id = entities.EntityInstanceId(entity_name, "test-key") + state = StateShim(None) + return executor.execute("test-orchestration", entity_id, operation, state, encoded_input) + + +class TestClassBasedEntityMethodDispatch: + """Tests for class-based entity method dispatch in _EntityExecutor.""" + + def test_method_with_no_input_parameter(self): + """Methods that don't accept input should work without _=None.""" + class Counter(entities.DurableEntity): + def get(self): + return self.get_state(int, 0) + + executor = _make_executor(Counter) + result = _execute(executor, "Counter", "get") + assert result == "0" + + def test_method_with_input_parameter(self): + """Methods that accept input should receive entity_input.""" + class Counter(entities.DurableEntity): + def set(self, value: int): + self.set_state(value) + + executor = _make_executor(Counter) + result = _execute(executor, "Counter", "set", "42") + assert result is None + + def test_method_with_input_returns_value(self): + """Methods that accept input and return a value.""" + class Counter(entities.DurableEntity): + def add(self, value: int): + current = self.get_state(int, 0) + new_value = current + value + self.set_state(new_value) + return new_value + + executor = _make_executor(Counter) + result = _execute(executor, "Counter", "add", "5") + assert result == "5" + + def test_mix_of_methods_with_and_without_input(self): + """An entity with both input and no-input methods should work.""" + class Counter(entities.DurableEntity): + def set(self, value: int): + self.set_state(value) + + def get(self): + return self.get_state(int, 0) + + executor = _make_executor(Counter) + entity_id = entities.EntityInstanceId("Counter", "test-key") + + # set requires input + state = StateShim(None) + executor.execute("test-orch", entity_id, "set", state, "10") + state.commit() + + # get does not require input — reuse state to simulate persistence + result = executor.execute("test-orch", entity_id, "get", state, None) + assert result == "10" + + def test_method_with_optional_parameter_uses_default(self): + """Methods with default parameters should use defaults when no input is provided.""" + class Counter(entities.DurableEntity): + def add(self, value: int = 1): + current = self.get_state(int, 0) + new_value = current + value + self.set_state(new_value) + return new_value + + executor = _make_executor(Counter) + + # No input provided — should use default value of 1 + result = _execute(executor, "Counter", "add") + assert result == "1" + + def test_method_with_optional_parameter_uses_provided_input(self): + """Methods with default parameters should use provided input when given.""" + class Counter(entities.DurableEntity): + def add(self, value: int = 1): + current = self.get_state(int, 0) + new_value = current + value + self.set_state(new_value) + return new_value + + executor = _make_executor(Counter) + + # Input provided — should use it instead of default + result = _execute(executor, "Counter", "add", "5") + assert result == "5" + + +class TestFunctionBasedEntityDispatch: + """Tests for function-based entity dispatch in _EntityExecutor.""" + + def test_function_entity_receives_context_and_input(self): + """Function-based entities always receive (ctx, input).""" + def counter(ctx: entities.EntityContext, input): + if ctx.operation == "get": + return ctx.get_state(int, 0) + elif ctx.operation == "set": + ctx.set_state(input) + + executor = _make_executor(counter) + entity_id = entities.EntityInstanceId("counter", "test-key") + state = StateShim(None) + + executor.execute("test-orch", entity_id, "set", state, "42") + state.commit() + + result = executor.execute("test-orch", entity_id, "get", state, None) + assert result == "42"