add non-retryable errors, and shutdown helpers#33
add non-retryable errors, and shutdown helpers #33 · acroca merged 4 commits into dapr:main (dapr/durabletask-python:main) from filintod:filinto/asyncio-p1 (filintod/durabletask-python:filinto/asyncio-p1)
Conversation
f6f6a40 to
de68786
Compare
| self._stub = stubs.TaskHubSidecarServiceStub(channel) | ||
| self._logger = shared.get_logger("client", log_handler, log_formatter) | ||
|
|
||
| def __enter__(self): |
There was a problem hiding this comment.
add context manager option for clean closing
durabletask/client.py
Outdated
| # gRPC timeout mapping (pytest unit tests may pass None explicitly) | ||
| grpc_timeout = None if (timeout is None or timeout == 0) else timeout | ||
|
|
||
| # If timeout is None or 0, skip pre-checks/polling and call server-side wait directly |
There was a problem hiding this comment.
improves resource consumption on server side that might also lag behind client side
| pass | ||
|
|
||
|
|
||
| class NonRetryableError(Exception): |
There was a problem hiding this comment.
This is a new helper, present in Temporal but not in our SDK, that lets us define errors that are non-retryable, so activities don't attempt to retry when one is raised.
| next_delay_f, self._retry_policy.max_retry_interval.total_seconds() | ||
| ) | ||
| return timedelta(seconds=next_delay_f) | ||
| return timedelta(seconds=next_delay_f) |
There was a problem hiding this comment.
This fixes a bug with retry: the logic in line 400 above, `if datetime.utcnow() < retry_expiration:`, means that we should retry, but because the return was badly indented, this was not working if for some reason max_retry_interval is not None.
There was a problem hiding this comment.
This is also kind of mentioned in one of the gotchas in dapr/python-sdk#836. I found this bug beforehand; the other gotchas are genuine gotchas or not-explained behavior.
There was a problem hiding this comment.
added some info in README to cover the gotchas, but we might need to add to python-sdk
examples/components/statestore.yaml
Outdated
| @@ -0,0 +1,16 @@ | ||
| apiVersion: dapr.io/v1alpha1 |
There was a problem hiding this comment.
needed for e2e tests with dapr that should substitute durabletask-go tests with dapr setup
df310f8 to
d9ed06e
Compare
|
@acroca ptal |
d9ed06e to
7321905
Compare
durabletask/client.py
Outdated
| res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion( | ||
| req, timeout=grpc_timeout | ||
| ) |
There was a problem hiding this comment.
I don't understand this. grpc_timeout is set to None in both 0 and None cases but if I understand correctly, when timeout is None we wait forever, but timeout 0 won't wait at all, right?
There was a problem hiding this comment.
Well, the current behavior has not changed: a timeout of 0 still means wait forever. That kind of makes sense; why would you call this function to not wait?
durabletask/client.py
Outdated
| if current_state and current_state.runtime_status in [ | ||
| OrchestrationStatus.COMPLETED, | ||
| OrchestrationStatus.FAILED, | ||
| OrchestrationStatus.TERMINATED, | ||
| ]: |
There was a problem hiding this comment.
From https://github.com/dapr/durabletask-go/blob/7f28b2408db77ed48b1b03ecc71624fc456ccca3/api/orchestration.go#L196-L201, CANCELLED is also a condition for a workflow to be considered in a terminal state.
But what's the reason for this check? Why not just call the WaitForInstanceCompletion? You are still sending a call to the runtime to get the current state.
There was a problem hiding this comment.
It is maybe a premature optimization, but in Python, for things that complete quickly, it is better to poll than to hold a longer-running streaming call on the server: https://grpc.io/docs/guides/performance/#python
durabletask/task.py
Outdated
| if isinstance(t, str): | ||
| if t: |
There was a problem hiding this comment.
can't we check it all at once?
| if isinstance(t, str): | |
| if t: | |
| if isinstance(t, str) and len(t)>0: |
durabletask/worker.py
Outdated
| self._channel_options = channel_options | ||
| self._stop_timeout = stop_timeout | ||
| # Track in-flight activity executions for graceful draining | ||
| import threading as _threading |
There was a problem hiding this comment.
Move this import to the top of the file 🙏
| current_reader_thread.start() | ||
| loop = asyncio.get_running_loop() | ||
| while not self._shutdown.is_set(): | ||
| try: |
There was a problem hiding this comment.
I don't see why this try was removed. If I understand correctly, the exceptions that were captured here will now be captured outside of the while, right? Why is this preferred now?
There was a problem hiding this comment.
Mainly to reduce extra logging/indentation: `self._logger.warning(f"Error in work item stream: {e}")`. I could put it back; I think it was just bothering me with the extra duplicated messages that were not helping me.
There was a problem hiding this comment.
there's not really much before this point and the other try, maybe GetWorkItems
durabletask/worker.py
Outdated
| """ | ||
| end: Optional[float] = None | ||
| if timeout is not None: | ||
| import time as _t |
There was a problem hiding this comment.
Move all the imports to the top please
|
@acroca ptal |
…noisy logs Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
763ef39 to
c513506
Compare
tests/durabletask/test_client.py
Outdated
| mock_channel.close.side_effect = Exception("close failed") | ||
| mock_get_channel.return_value = mock_channel | ||
|
|
||
| from durabletask import client |
durabletask/worker.py
Outdated
| except grpc.RpcError as rpc_error: # type: ignore | ||
| # Treat common shutdown/termination races as benign to avoid noisy logs | ||
| code = rpc_error.code() # type: ignore | ||
| details = str(rpc_error) | ||
| benign = code in { | ||
| grpc.StatusCode.CANCELLED, | ||
| grpc.StatusCode.UNAVAILABLE, | ||
| grpc.StatusCode.UNKNOWN, | ||
| } and ( | ||
| "unknown instance ID/task ID combo" in details | ||
| or "Channel closed" in details | ||
| or "Locally cancelled by application" in details | ||
| ) | ||
| if self._shutdown.is_set() or benign: | ||
| self._logger.debug( | ||
| f"Ignoring activity completion delivery error during shutdown/benign condition: {rpc_error}" | ||
| ) | ||
| else: | ||
| self._logger.exception( | ||
| f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {rpc_error}" | ||
| ) |
There was a problem hiding this comment.
Can we combine this logic with the other one that looks very similar?
durabletask/worker.py
Outdated
|
|
||
| self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options) | ||
| # Readiness flag set once the worker has an active stream to the sidecar | ||
| self._ready = Event() |
| actions = result.actions | ||
| complete_action = get_and_validate_single_complete_orchestration_action(actions) | ||
| assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED | ||
| assert complete_action.failureDetails.errorMessage.__contains__("Activity task #1 failed: boom") |
There was a problem hiding this comment.
It'd be good to test that the activity has been called exactly once, to make sure it is not retrying.
There was a problem hiding this comment.
Actually, this test only checks the event processing pipeline, making sure that when you send a non-retryable error to be processed in the event loop, it fails the workflow. There is a test after it that checks that when a retryable error is raised, a new timer is created (line 1526, test_activity_generic_exception_is_retryable).
|
@acroca ptal |
Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
This is a split from asyncio PR #13, removing changes not related to asyncio.