UN-3652 [FIX] PG orchestration failure — reconcile file counters + surface error to UI#2122
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3652-pg-orchestration-failure-ux
…rface error to UI
When a PG-routed orchestration fails (e.g. the barrier-enqueue blip, or any
failure after total_files is set), two UX gaps remained: the execution went ERROR
but its files showed as perpetually "in progress" (UI = total - successful -
failed, with successful/failed left NULL), and the failure reason never reached
the UI (only workflow_execution.error_message + worker logs).
Fix, gated behind is_pg_transport (Celery branch byte-identical):
- New pure helper WorkflowOrchestrationUtils.pg_failure_file_counts(transport,
total_files): PG -> {total_files, successful_files: 0, failed_files: total}
so a failed run reads "N failed" (in-progress = 0); non-PG -> {} (the gate).
total_files is included because the backend update_status serializer rejects
file aggregates without it ("total_files is required when file aggregates are
provided"; also successful + failed <= total).
- general/tasks.py _execute_general_workflow + api-deployment/tasks.py
_run_workflow_api failure handlers: on PG, pass the counts to
update_workflow_execution_status (B) and emit the error to the UI/WS via the
workflow logger (C). api resolves transport defensively (assigned inside try).
Tests: +14 helper/gating unit tests (incl. serializer-constraint guards);
related suites green; ruff clean.
Dev-tested live E2E on a rebuilt image (flag on, real benchmark API deployment,
barrier table renamed to force a non-retryable orchestration failure):
- B: failed PG run -> total=1, successful=0, failed=1 (UI in-progress = 0).
- C: "❌ Workflow orchestration failed: ..." published to the execution's WS log.
The E2E caught a real bug a unit test missed: the first cut omitted total_files
and the backend rejected the update with 400 — fixed and re-verified.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
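The gate described in this commit message can be sketched as a pure function. This is a minimal illustration, not the merged code: the `WorkflowTransport.PG` member, its string values, and the body of `is_pg_transport` are assumptions (the thread only shows `WorkflowTransport.CELERY.value` and the helper's signature).

```python
from enum import Enum


class WorkflowTransport(str, Enum):
    # Assumed members and values; only CELERY.value appears in the thread.
    CELERY = "celery"
    PG = "pg"


def is_pg_transport(transport: str) -> bool:
    """Hypothetical gate: True only for the PG transport string."""
    return transport == WorkflowTransport.PG.value


def pg_failure_file_counts(transport: str, total_files: int) -> dict[str, int]:
    """Return file aggregates for a failed PG run, or {} for any other transport."""
    if not is_pg_transport(transport):
        # The gate: spreading {} into the status update adds no kwargs,
        # so the non-PG call is unchanged.
        return {}
    return {
        "total_files": total_files,   # required whenever aggregates are sent
        "successful_files": 0,
        "failed_files": total_files,  # successful + failed == total
    }
```

Because the non-PG result is an empty dict, `update_workflow_execution_status(..., **counts)` receives exactly the arguments it received before the change.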
| Filename | Overview |
|---|---|
| workers/shared/workflow/execution/orchestration_utils.py | Adds record_pg_orchestration_failure: guarded against both WS-logging and status-update failures, never raises, always allows caller's re-raise to proceed. Logic is clean and the docstring accurately describes the invariants. |
| workers/api-deployment/tasks.py | Transport resolution moved before the try block so it is always in scope for the except handler; PG path correctly delegates to the guarded helper; Celery else-branch is byte-identical to the pre-PR code. |
| workers/general/tasks.py | PG failure path delegates to the new helper; Celery path (with its own try/except wrapper) is unchanged; total_files is always assigned before the orchestration try-block so the except handler has a valid value. |
| workers/tests/test_pg_orchestration_failure.py | 14 tests cover counter reconciliation, UI-error surfacing, best-effort guarantees, and kwarg compatibility with the real InternalAPIClient signature. test_count_keys_are_valid_update_status_kwargs relies on bind_partial raising TypeError on unknown kwargs — valid pattern but silently passes if the method accepts **kwargs. |
Reviews (2): Last reviewed commit: "UN-3652 [FIX] address PR #2122 review — ..." | Re-trigger Greptile
muhammad-ali-e left a comment
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). Findings below are inline. The greptile P1 on the api handler's unwrapped status update is intentionally not duplicated.
# the try; successful=0/failed=total zeroes the UI's in-progress).
# `transport` is assigned inside the try (~L695); resolve it defensively so
# a failure before that point still fails closed to the Celery (no-op) path.
transport = locals().get("transport", DEFAULT_WORKFLOW_TRANSPORT)
[High] locals().get("transport") blinds the PG handler to early orchestration failures — the UN-3652 symptom can still occur.
transport is only bound as a local at L696 (transport = normalize_transport(...)), inside the try (starts L675). If the exception fires between L675 and L696 — e.g. in _get_callback_queue_name_api(), FairnessKey(...), or normalize_transport itself — then "transport" is not in locals(), this falls back to DEFAULT_WORKFLOW_TRANSPORT (celery), and is_pg_transport() returns False. For a genuine PG run that fails early the UI error publish is skipped and failure_counts is {}, leaving files perpetually "in progress" — exactly what this PR is meant to fix. The real transport is knowable from kwargs.get("transport") from the function's first line, so the "fail-closed to Celery" rationale doesn't hold here.
Secondary risk: reading via locals().get(...) defeats static analysis — a future rename of transport silently disables the whole PG path instead of raising NameError.
Fix: hoist transport above the try from the authoritative source and reference it directly (matching general/tasks.py, where transport is a normalized parameter), then drop this line and the duplicate assignment at L696.
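The failure mode in this finding can be reproduced in isolation. The sketch below is illustrative only: `normalize_transport`, `_early_step`, and both handlers are hypothetical stand-ins, and `"celery"` as the default transport is taken from the review comment rather than from the code.

```python
DEFAULT_WORKFLOW_TRANSPORT = "celery"  # assumed default, per the review comment


def normalize_transport(value):
    """Hypothetical stand-in: lower-case the value, defaulting to celery."""
    return (value or DEFAULT_WORKFLOW_TRANSPORT).strip().lower()


def _early_step():
    """Stands in for any call that can fail before `transport` is bound."""
    raise RuntimeError("failure before transport is bound")


def fragile_handler(**kwargs):
    """`transport` is bound inside the try, after a step that may raise."""
    try:
        _early_step()
        transport = normalize_transport(kwargs.get("transport"))
    except RuntimeError:
        # `transport` was never bound, so it is absent from locals()
        # and the lookup silently falls back to the default.
        return locals().get("transport", DEFAULT_WORKFLOW_TRANSPORT)
    return transport


def hoisted_handler(**kwargs):
    """`transport` is resolved once, above the try, from the caller's kwargs."""
    transport = normalize_transport(kwargs.get("transport"))
    try:
        _early_step()
    except RuntimeError:
        return transport  # always the real transport
    return transport
```

With `transport="pg"`, the fragile variant reports `"celery"` on an early failure while the hoisted variant reports `"pg"`; a direct name reference also lets static analysis catch a later rename.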
# persisted at orchestration start; successful=0/failed=total).
if is_pg_transport(transport):
    try:
        workflow_logger.log_error(
[Medium] workflow_logger.log_error(...) is not None-guarded here — asymmetric with the api handler.
The api handler guards its logger (if error_logger:, api-deployment/tasks.py:828), but here workflow_logger.log_error(...) is called directly. workflow_logger is nullable and other sites in this file guard it (if workflow_logger:). If it is None, the resulting AttributeError is swallowed by the surrounding except and downgraded to a generic "Failed to publish error to UI logs" warning — the user gets no UI error and the cause looks like a transient publish hiccup. Guard it explicitly: if workflow_logger: workflow_logger.log_error(...).
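A small sketch of why the explicit guard matters. The function names and the returned status strings are hypothetical, chosen only to make the two outcomes distinguishable; the real handlers return nothing of the sort.

```python
import logging

logger = logging.getLogger("pg_failure_sketch")


class RecordingLogger:
    """Hypothetical stand-in for the workflow logger."""

    def __init__(self):
        self.messages = []

    def log_error(self, message):
        self.messages.append(message)


def publish_unguarded(workflow_logger, message):
    """A None logger raises AttributeError, which the broad except hides."""
    try:
        workflow_logger.log_error(message)
        return "published"
    except Exception as log_error:
        logger.warning("Failed to publish error to UI logs: %s", log_error)
        return "publish-failed"


def publish_guarded(workflow_logger, message):
    """A missing logger is reported as such, not as a publish hiccup."""
    if not workflow_logger:
        logger.warning("No workflow logger available; UI error not published")
        return "no-logger"
    try:
        workflow_logger.log_error(message)
        return "published"
    except Exception as log_error:
        logger.warning("Failed to publish error to UI logs: %s", log_error)
        return "publish-failed"
```

Both variants stay best-effort; the guarded one only changes what the warning says, so a missing logger is not mistaken for a transient publish error.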
        error_message=str(e),
        **failure_counts,
    )
except Exception as status_error:
[Medium] A failed status update silently drops the counter reconciliation, and this handler never re-raises.
The PR now routes the entire reconciliation (**failure_counts) through this update_workflow_execution_status call. If it raises, the except swallows it to a warning, so both the ERROR status and the successful=0/failed=total reconciliation are lost. Unlike the api handler, this path does not re-raise (it builds error_response and continues at L785), so there is no second chance for the failure to surface — the execution is left with total_files set and files perpetually "in progress", the exact UN-3652 state. Consider logging this at error (with an error id) rather than warning, since a failed terminal-status write is a real defect. (Distinct from the greptile P1 on the api handler, which is about the missing wrapper.)
except Exception as log_error:
    logger.warning(f"Failed to publish error to UI logs: {log_error}")
failure_counts = WorkflowOrchestrationUtils.pg_failure_file_counts(
    transport, total_files
[Low] Cache-resolved files are reported as failed.
Here total_files = len(files_to_send) includes file-history cache hits (cached_count = len(cached_results); cf. the "Send all files, not just non-cached ones" comment). pg_failure_file_counts returns successful=0, failed=total_files, so files already resolved from cache are counted as failed, inflating the "N failed" figure on the run. The in-progress=0 goal is still met, but the number is misleading. Consider computing successful_files = cached_count, failed_files = total_files - cached_count at this call site (the general path has no cache-on-send concept).
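The suggested cache-aware split is simple arithmetic. The function below is a hypothetical helper (not part of the PR, which deferred this finding) showing that the in-progress figure still reaches zero while cache hits are no longer counted as failures.

```python
def failure_counts_with_cache(total_files: int, cached_count: int) -> dict[str, int]:
    """Hypothetical variant: count cache hits as successful, the rest as failed."""
    if not 0 <= cached_count <= total_files:
        raise ValueError("cached_count must be between 0 and total_files")
    return {
        "total_files": total_files,
        "successful_files": cached_count,
        "failed_files": total_files - cached_count,
    }
```

For 5 files with 2 cache hits this yields 2 successful and 3 failed, so `total - successful - failed` is still 0.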
"""Centralized workflow orchestration patterns and utilities."""

@staticmethod
def pg_failure_file_counts(transport: str, total_files: int) -> dict[str, int]:
[Low] Return contract is stringly-coupled to a remote kwarg signature, with no test at the seam.
The three keys (total_files, successful_files, failed_files) must exactly match the parameter names of InternalAPIClient.update_workflow_execution_status because they are spread via **failure_counts. A rename/typo on either side surfaces only as a runtime TypeError, and only on the rare PG-failure path. The unit tests assert the literal dict but never that the keys are valid kwargs of the consumer, so they would stay green through a real break; dict[str, int] also can't express the successful + failed <= total invariant. Cheapest insurance: a regression test binding the result against the real signature, e.g. inspect.signature(InternalAPIClient.update_workflow_execution_status).bind_partial(execution_id=..., status=..., **counts). Optionally document the shape with a TypedDict (it still can't encode the arithmetic invariant, so prioritize the test). transport: str is correct as-is — the whole transport seam traffics in canonical strings.
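A sketch of the signature-binding guard suggested here. `InternalAPIClientSketch` and `loose_update` are hypothetical stand-ins, since the real `InternalAPIClient` signature is not shown in this thread. The check also covers the case where the consumer accepts `**kwargs`, in which `bind_partial` alone would accept any key.

```python
import inspect


class InternalAPIClientSketch:
    """Hypothetical stand-in for the real client."""

    def update_workflow_execution_status(
        self,
        execution_id,
        status,
        error_message=None,
        total_files=None,
        successful_files=None,
        failed_files=None,
    ):
        return None


def loose_update(execution_id, status, **kwargs):
    """A consumer with **kwargs, which defeats a plain bind_partial check."""
    return None


def keys_are_valid_kwargs(func, counts):
    """True only if every key in `counts` is an explicitly named parameter."""
    signature = inspect.signature(func)
    params = signature.parameters.values()
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        # bind_partial would swallow unknown keys into **kwargs,
        # so compare against the explicitly named parameters instead.
        named = {
            p.name
            for p in params
            if p.kind
            in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
        }
        return set(counts) <= named
    try:
        signature.bind_partial(execution_id="exec-1", status="ERROR", **counts)
    except TypeError:
        return False
    return True
```

A test built on this fails as soon as either side renames a key, instead of waiting for a `TypeError` on the rare PG failure path.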
_CELERY = WorkflowTransport.CELERY.value


class TestPgFailureFileCounts:
[High — test coverage] The behavioral change of this PR is untested; these tests cover only the pure helper.
The new tests exercise pg_failure_file_counts (a dict literal behind an is_pg_transport gate), but the actual change — the two rewritten except handlers (error surfacing to UI, the locals().get fallback, **failure_counts → update_workflow_execution_status, the nested try/except) — has zero coverage. A regression there silently reintroduces the "files stuck in progress" bug.
Highest-value addition: a parametrized (PG vs Celery) unit test of each handler with a mocked api_client that forces the orchestration call to raise, asserting (PG) update_workflow_execution_status is called with status=ERROR, total_files=N, successful_files=0, failed_files=N, and (Celery) with no file-aggregate kwargs (proving "byte-identical" at the call site, not just in the helper). Also worth covering: the UI-logger raising must not prevent the status update / re-raise.
Minor: test_pg_counts_satisfy_serializer_constraints re-implements the serializer rule inline instead of invoking the real validator, and test_pg_marks_attempted_files_failed / test_result_zeroes_ui_in_progress largely restate the literal (they only fail if someone edits the dict, not if behavior breaks). The genuinely useful cases are the Celery/non-PG no-op tests.
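The handler-level test proposed above might look roughly like the following. Everything here is a simplified assumption: `handle_orchestration_failure` is a hypothetical reduction of the real except handlers, and the transport strings and the `"ERROR"` status value are placeholders. Only `unittest.mock` is used, so the check runs without a test framework.

```python
from unittest.mock import MagicMock

PG = "pg"          # assumed canonical transport strings
CELERY = "celery"


def handle_orchestration_failure(api_client, transport, execution_id, total_files, error):
    """Hypothetical reduction of the failure handler's status update."""
    counts = {}
    if transport == PG:
        counts = {
            "total_files": total_files,
            "successful_files": 0,
            "failed_files": total_files,
        }
    api_client.update_workflow_execution_status(
        execution_id=execution_id,
        status="ERROR",
        error_message=str(error),
        **counts,
    )


def recorded_kwargs(transport, total_files=2):
    """Run the handler against a mocked client and return the kwargs it sent."""
    api_client = MagicMock()
    handle_orchestration_failure(
        api_client, transport, "exec-1", total_files, RuntimeError("boom")
    )
    api_client.update_workflow_execution_status.assert_called_once()
    return api_client.update_workflow_execution_status.call_args.kwargs
```

Asserting on the recorded kwargs for both transports checks the "byte-identical" claim at the call site: the Celery call must carry no file-aggregate keys at all.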
# (C) surface the error to the UI/WS execution logs, and
# (B) reconcile counters: mark the attempted files failed so the run
# reads "N failed", not "N in progress" (total_files is bound above
# the try; successful=0/failed=total zeroes the UI's in-progress).
# `transport` is assigned inside the try (~L695); resolve it defensively so
[Low] Comment rot: orphan (B)/(C) labels and a hard-coded line reference.
- (C)/(B) (also in general/tasks.py and the test module docstring) reference subtasks of the external Jira plan: there is no (A), they are out of order, and nothing in the code defines them, so they become meaningless once the ticket closes. Drop the letters or renumber as self-contained (1)/(2) in logical order.
- ~L695 (L818) is already off by one (the assignment is at L696) and will drift further with any edit. Reference it symbolically: "assigned by the normalize_transport(...) call inside the try".
Also: this ~10-line rationale is near-duplicated in general/tasks.py:757-764 and already diverging ("bound above the try" vs "persisted at orchestration start"). Consider trimming both to a one-line pointer to the pg_failure_file_counts docstring, which already carries the full rationale. (The factual claims themselves — serializer constraint, total_files binding — were verified accurate.)
…ansport, test handler
Review (greptile P1 + PR-review-toolkit). Changes:
- Extract the PG failure recording into one tested seam,
WorkflowOrchestrationUtils.record_pg_orchestration_failure (replaces
pg_failure_file_counts). It (C) surfaces the error to the UI logger (guarded)
and (B) reconciles counters via update_status, with the update WRAPPED in
try/except so a status-update failure (e.g. a 400 serializer error) is logged
but never re-raised — it can no longer mask the original orchestration error or
skip the caller's re-raise (greptile P1). Never raises.
- api-deployment: resolve `transport` ONCE up front from kwargs (above the try)
and reference it directly, instead of `locals().get("transport", ...)` which
fell back to celery for a genuine PG run that failed BEFORE the in-try
assignment — re-introducing the exact "stuck in progress" symptom. Dropped the
duplicate normalize_transport.
- Both handlers now branch: PG -> record_pg_orchestration_failure; else -> the
original bare status update, so the Celery path stays byte-identical.
- general: the UI-logger call is now None-guarded inside the helper (was an
unguarded workflow_logger.log_error).
- Tests: replace the helper-only dict tests with behaviour tests of the recorder
(mocked api_client/logger): counter reconciliation + total_files, error
surfacing, logger-hiccup-doesn't-block-update, update-failure-swallowed-not-
raised, and a kwarg-seam guard binding the counts against
InternalAPIClient.update_workflow_execution_status's signature.
Re-verified live E2E on the refactored handler (renamed pg_barrier_state to force
a non-retryable failure): PG run -> ERROR total=1/ok=0/fail=1 (in-progress 0) +
"❌ Workflow orchestration failed" published to the WS log channel.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
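The "never raises" contract described in this commit can be sketched as follows. The function name carries a `_sketch` suffix because its parameters, the message format, the `"ERROR"` status value, and the fake client and logger are all assumptions; the thread does not show the real signature of `record_pg_orchestration_failure`.

```python
import logging

logger = logging.getLogger("pg_failure_sketch")


class FakeClient:
    """Hypothetical API client that records calls and can be told to fail."""

    def __init__(self, fail=False):
        self.calls = []
        self.fail = fail

    def update_workflow_execution_status(self, **kwargs):
        self.calls.append(kwargs)
        if self.fail:
            raise ValueError("400 from the status serializer")


class BrokenLogger:
    """Hypothetical workflow logger whose publish always fails."""

    def log_error(self, message):
        raise RuntimeError("websocket unavailable")


def record_pg_orchestration_failure_sketch(
    api_client, workflow_logger, execution_id, total_files, error
):
    """Best-effort: publish the error, reconcile counters, never raise."""
    if workflow_logger:
        try:
            workflow_logger.log_error(f"Workflow orchestration failed: {error}")
        except Exception:
            logger.warning("Failed to publish error to UI logs", exc_info=True)
    try:
        api_client.update_workflow_execution_status(
            execution_id=execution_id,
            status="ERROR",
            error_message=str(error),
            total_files=total_files,
            successful_files=0,
            failed_files=total_files,
        )
    except Exception:
        # Logged, not re-raised, so the caller's own re-raise of the
        # original orchestration error still happens.
        logger.error("Failed to record PG orchestration failure", exc_info=True)
```

Keeping the two steps in separate try blocks means a logger failure cannot skip the status update, and a rejected update cannot replace the original exception seen by the caller.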
Thanks for the deep review 🙏. Addressed in …
- greptile P1 + [High]: unguarded api status update masks the original error → Fixed. The …
- [High]: …
- [Medium]: general UI-logger not None-guarded → Fixed (guarded inside the helper: …
- [Medium]: failed status update silently drops reconciliation → the update failure is now logged at …
- [High - test coverage]: behavior untested → Fixed. Replaced the helper-only dict tests with behavior tests of the recorder (mocked …
- [Low]: comment rot / …
- [Low]: cache-resolved files counted as failed → deferred with rationale. On an orchestration failure nothing was delivered (the callback that pushes cached results never ran), so counting all attempted files (incl. cache hits) as failed is defensible, and the in-progress=0 goal holds. A …
Re-verified live E2E on the refactored handler (renamed …
5d07155 into feat/UN-3445-pg-queue-integration
What
When a PG-routed orchestration fails (e.g. the barrier-enqueue blip from UN-3651, or any failure after `total_files` is set), two UX gaps remained:
- The execution went `ERROR` but its files showed as perpetually "in progress": the UI derives in-progress as `total − successful − failed`, and `successful`/`failed` were left NULL.
- The failure reason never reached the UI (only `workflow_execution.error_message` + worker logs).

Fix (gated behind `is_pg_transport`; Celery branch byte-identical)
- New pure helper `WorkflowOrchestrationUtils.pg_failure_file_counts(transport, total_files)`:
  - PG → `{total_files, successful_files: 0, failed_files: total}`, so a failed run reads "N failed" (in-progress = 0).
  - non-PG → `{}` (the gate; Celery path unchanged).
  - `total_files` is included deliberately: the backend `update_status` serializer rejects file aggregates without it ("total_files is required when file aggregates are provided"; also `successful + failed <= total`).
- `general/tasks.py` `_execute_general_workflow` + `api-deployment/tasks.py` `_run_workflow_api` failure handlers: on PG, pass the counts to `update_workflow_execution_status` (B) and emit the error to the UI/WS via the workflow logger (C). The api path resolves `transport` defensively (it's assigned inside the `try`).

Non-regression
Every new line is inside `if is_pg_transport(transport)` / the helper returns `{}` for non-PG, so the Celery failure path is byte-identical. PG is still in dev/integration.

Verification
- +14 helper/gating unit tests (incl. serializer-constraint guards: `total_files` present, `successful + failed <= total`). Related suites green; ruff clean.
- Dev-tested live E2E on a rebuilt image (flag on, real benchmark API deployment, `pg_barrier_state` renamed to force a non-retryable orchestration failure):
  - B: failed PG run → `total=1, successful=0, failed=1` → UI in-progress = 0.
  - C: `❌ Workflow orchestration failed: …` published to the execution's WebSocket log channel.
- The E2E caught a real bug a unit test missed: the first cut omitted `total_files` and the backend rejected the update with `400`; fixed and re-verified on a clean image build.

Ref: UN-3652
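The two rules this description relies on, the UI's in-progress derivation and the serializer constraints, can be written out as a short sketch. Both functions are hypothetical restatements, not the backend's code; in particular, treating a NULL counter as 0 is an assumption inferred from the "perpetually in progress" symptom.

```python
from typing import Optional


def ui_in_progress(
    total: int, successful: Optional[int], failed: Optional[int]
) -> int:
    """In-progress = total - successful - failed, with NULL read as 0 (assumed)."""
    return total - (successful or 0) - (failed or 0)


def validate_file_aggregates(
    total: Optional[int], successful: Optional[int], failed: Optional[int]
) -> None:
    """Hypothetical restatement of the two serializer rules quoted above."""
    if total is None and (successful is not None or failed is not None):
        raise ValueError("total_files is required when file aggregates are provided")
    if total is not None and (successful or 0) + (failed or 0) > total:
        raise ValueError("successful + failed must not exceed total")
```

With one file and both counters NULL the derivation gives 1 in progress; after reconciliation (`successful=0`, `failed=1`) it gives 0. Omitting `total_files` while sending the other two counters is the case the first cut hit.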