UN-3652 [FIX] PG orchestration failure — reconcile file counters + surface error to UI#2122

Merged
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3652-pg-orchestration-failure-ux
Jun 29, 2026
Conversation

@muhammad-ali-e

Contributor

What

When a PG-routed orchestration fails (e.g. the barrier-enqueue blip from UN-3651, or any failure after total_files is set), two UX gaps remained:

  • (B) The execution went ERROR but its files showed as perpetually "in progress" — the UI derives in-progress as total − successful − failed, and successful/failed were left NULL.
  • (C) The failure reason never reached the UI (only workflow_execution.error_message + worker logs).
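The in-progress derivation in (B) can be illustrated with a minimal sketch. The function name `in_progress_files` and the treatment of NULL as zero are assumptions made for illustration; the UI's actual code is not shown in this PR.

```python
from typing import Optional


def in_progress_files(
    total: int, successful: Optional[int], failed: Optional[int]
) -> int:
    """Hypothetical mirror of the UI rule: total - successful - failed.

    NULL (None) counters are coalesced to 0 here, which is an assumption
    about how the UI handles missing values.
    """
    return total - (successful or 0) - (failed or 0)


# Before the fix: counters left NULL, so every file still looks in progress.
stuck = in_progress_files(total=3, successful=None, failed=None)

# After the fix: successful=0 and failed=total, so nothing is in progress.
reconciled = in_progress_files(total=3, successful=0, failed=3)
```

Under these assumptions, a failed run with untouched counters reports all of its files as in progress, which matches the symptom described in (B).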

Fix — gated behind is_pg_transport (Celery branch byte-identical)

  • New pure helper WorkflowOrchestrationUtils.pg_failure_file_counts(transport, total_files):
    • PG → {total_files, successful_files: 0, failed_files: total} → a failed run reads "N failed" (in-progress = 0).
    • non-PG → {} (the gate — Celery path unchanged).
    • total_files is included deliberately: the backend update_status serializer rejects file aggregates without it ("total_files is required when file aggregates are provided"; also successful + failed <= total).
  • general/tasks.py _execute_general_workflow + api-deployment/tasks.py _run_workflow_api failure handlers: on PG, pass the counts to update_workflow_execution_status (B) and emit the error to the UI/WS via the workflow logger (C). The api path resolves transport defensively (it's assigned inside the try).
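A minimal sketch of the helper's contract as described in the list above. The `is_pg_transport` body and the `"pg"` / `"celery"` strings are stand-ins chosen for illustration; only the returned keys and the empty-dict gate come from the description.

```python
PG_TRANSPORT = "pg"  # assumed canonical string; the real constant is not shown


def is_pg_transport(transport: str) -> bool:
    """Stand-in for the real gate, whose implementation is not shown here."""
    return transport == PG_TRANSPORT


def pg_failure_file_counts(transport: str, total_files: int) -> dict[str, int]:
    """Return file aggregates for a failed PG run, or {} for any other transport."""
    if not is_pg_transport(transport):
        # The gate: spreading {} into the status update adds no kwargs,
        # so the non-PG call stays exactly as it was.
        return {}
    return {
        "total_files": total_files,  # required alongside the other aggregates
        "successful_files": 0,
        "failed_files": total_files,
    }


pg_counts = pg_failure_file_counts("pg", 3)
celery_counts = pg_failure_file_counts("celery", 3)
```

Because the non-PG result is an empty dict, a caller can always write `**counts` without branching on the transport.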

Non-regression

Every new line is inside if is_pg_transport(transport) / the helper returns {} for non-PG, so the Celery failure path is byte-identical. PG is still in dev/integration.

Verification

  • Unit: +14 helper/gating tests (incl. serializer-constraint guards: total_files present, successful + failed <= total). Related suites green; ruff clean.
  • Live E2E on a rebuilt image (flag on, real API deployment, pg_barrier_state renamed to force a non-retryable orchestration failure):
    • B: failed PG run → total=1, successful=0, failed=1 → UI in-progress = 0.
    • C: ❌ Workflow orchestration failed: … published to the execution's WebSocket log channel.
    • The E2E caught a real bug a unit test missed: the first cut omitted total_files and the backend rejected the update with 400 — fixed and re-verified on a clean image build.
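The two serializer rules quoted earlier explain why the first cut was rejected. The sketch below is a hypothetical re-statement of those rules as a plain function, not the backend's serializer; `validate_file_aggregates` and `is_accepted` are illustrative names, and `ValueError` stands in for the 400 response.

```python
from typing import Optional


def validate_file_aggregates(
    total_files: Optional[int] = None,
    successful_files: Optional[int] = None,
    failed_files: Optional[int] = None,
) -> None:
    """Hypothetical re-statement of the two rules quoted in the PR description."""
    aggregates_given = successful_files is not None or failed_files is not None
    if aggregates_given and total_files is None:
        raise ValueError("total_files is required when file aggregates are provided")
    if total_files is not None:
        if (successful_files or 0) + (failed_files or 0) > total_files:
            raise ValueError("successful + failed must not exceed total")


def is_accepted(**aggregates: int) -> bool:
    """Return True when the aggregates pass validation, False when rejected."""
    try:
        validate_file_aggregates(**aggregates)
    except ValueError:
        return False
    return True


# First cut: aggregates without total_files are rejected.
first_cut = is_accepted(successful_files=0, failed_files=1)

# Fixed payload: total_files travels with the aggregates.
fixed = is_accepted(total_files=1, successful_files=0, failed_files=1)
```

A unit test that only asserts the helper's literal dict cannot see this rejection, which is consistent with the bug having surfaced only in the live E2E run.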

Ref: UN-3652

…rface error to UI

When a PG-routed orchestration fails (e.g. the barrier-enqueue blip, or any
failure after total_files is set), two UX gaps remained: the execution went ERROR
but its files showed as perpetually "in progress" (UI = total - successful -
failed, with successful/failed left NULL), and the failure reason never reached
the UI (only workflow_execution.error_message + worker logs).

Fix, gated behind is_pg_transport (Celery branch byte-identical):
- New pure helper WorkflowOrchestrationUtils.pg_failure_file_counts(transport,
  total_files): PG -> {total_files, successful_files: 0, failed_files: total}
  so a failed run reads "N failed" (in-progress = 0); non-PG -> {} (the gate).
  total_files is included because the backend update_status serializer rejects
  file aggregates without it ("total_files is required when file aggregates are
  provided"; also successful + failed <= total).
- general/tasks.py _execute_general_workflow + api-deployment/tasks.py
  _run_workflow_api failure handlers: on PG, pass the counts to
  update_workflow_execution_status (B) and emit the error to the UI/WS via the
  workflow logger (C). api resolves transport defensively (assigned inside try).

Tests: +14 helper/gating unit tests (incl. serializer-constraint guards);
related suites green; ruff clean.

Dev-tested live E2E on a rebuilt image (flag on, real benchmark API deployment,
barrier table renamed to force a non-retryable orchestration failure):
- B: failed PG run -> total=1, successful=0, failed=1 (UI in-progress = 0).
- C: "❌ Workflow orchestration failed: ..." published to the execution's WS log.
The E2E caught a real bug a unit test missed: the first cut omitted total_files
and the backend rejected the update with 400 — fixed and re-verified.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@greptile-apps

greptile-apps Bot commented Jun 29, 2026

Contributor

Greptile Summary

This PR fixes two UX gaps for PG-routed orchestration failures: file counters no longer stay "in progress" after an error, and the failure reason is now surfaced to the UI via WebSocket logs. Both fixes are gated behind is_pg_transport, leaving the Celery path byte-identical.

  • A new record_pg_orchestration_failure helper centralises the PG failure path; it is fully best-effort — both the UI-logging and the update_workflow_execution_status call are wrapped so neither can mask the original exception or skip the caller's re-raise.
  • In api-deployment/tasks.py, transport resolution is moved before the try block so it is always in scope when the except handler fires.
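The best-effort behaviour described in the summary can be sketched as two independent guards. The signature, the `"ERROR"` status string, and the log wording below are assumptions; the real `record_pg_orchestration_failure` is not reproduced on this page.

```python
import logging
from unittest.mock import MagicMock

logger = logging.getLogger(__name__)


def record_pg_orchestration_failure(
    api_client, workflow_logger, execution_id, total_files, error
):
    """Best-effort recorder that never raises (hypothetical signature)."""
    # Guard 1: publishing to the UI log must not block the status update.
    try:
        if workflow_logger:
            workflow_logger.log_error(f"Workflow orchestration failed: {error}")
    except Exception as log_exc:
        logger.warning("Failed to publish error to UI logs: %s", log_exc)

    # Guard 2: a rejected update is logged, never re-raised, so the caller's
    # own re-raise of the original error still happens.
    try:
        api_client.update_workflow_execution_status(
            execution_id=execution_id,
            status="ERROR",
            error_message=str(error),
            total_files=total_files,
            successful_files=0,
            failed_files=total_files,
        )
    except Exception as status_exc:
        logger.error("Failed to record PG orchestration failure: %s", status_exc)


# Both collaborators misbehave, yet the recorder returns normally.
api_client = MagicMock()
api_client.update_workflow_execution_status.side_effect = RuntimeError("400")
workflow_logger = MagicMock()
workflow_logger.log_error.side_effect = ConnectionError("WS down")

record_pg_orchestration_failure(
    api_client, workflow_logger, "exec-1", 1, ValueError("barrier enqueue failed")
)
```

Keeping the guards separate means a UI-logging failure and a status-update failure are each reported on their own, and neither hides the other.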

Confidence Score: 5/5

Safe to merge — the new PG failure path is well-guarded, every new line is behind is_pg_transport, and the Celery branch is byte-identical to before.

All new code is gated on is_pg_transport, so existing Celery behavior is unchanged. The shared helper is fully defensive (two independent try/except guards), the transport-resolution fix in api-deployment ensures the except handler always has a valid value, and the test suite explicitly pins the no-raise invariant. No functional defects were found in the changed paths.

No files require special attention. The test file’s kwarg-compatibility guard is worth a quick check, but it is not blocking.

Important Files Changed

  • workers/shared/workflow/execution/orchestration_utils.py: Adds record_pg_orchestration_failure, which is guarded against both WS-logging and status-update failures, never raises, and always allows the caller's re-raise to proceed. Logic is clean and the docstring accurately describes the invariants.
  • workers/api-deployment/tasks.py: Transport resolution moved before the try block so it is always in scope for the except handler; PG path correctly delegates to the guarded helper; Celery else-branch is byte-identical to the pre-PR code.
  • workers/general/tasks.py: PG failure path delegates to the new helper; Celery path (with its own try/except wrapper) is unchanged; total_files is always assigned before the orchestration try-block so the except handler has a valid value.
  • workers/tests/test_pg_orchestration_failure.py: 14 tests cover counter reconciliation, UI-error surfacing, best-effort guarantees, and kwarg compatibility with the real InternalAPIClient signature. test_count_keys_are_valid_update_status_kwargs relies on bind_partial raising TypeError on unknown kwargs; this is a valid pattern, but the test passes silently if the method accepts **kwargs.

Reviews (2): Last reviewed commit: "UN-3652 [FIX] address PR #2122 review — ..." | Re-trigger Greptile

Comment thread workers/api-deployment/tasks.py Outdated

@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author

Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). Findings below are inline. The greptile P1 on the api handler's unwrapped status update is intentionally not duplicated.

Comment thread workers/api-deployment/tasks.py Outdated
# the try; successful=0/failed=total zeroes the UI's in-progress).
# `transport` is assigned inside the try (~L695); resolve it defensively so
# a failure before that point still fails closed to the Celery (no-op) path.
transport = locals().get("transport", DEFAULT_WORKFLOW_TRANSPORT)

Contributor Author

[High] locals().get("transport") blinds the PG handler to early orchestration failures — the UN-3652 symptom can still occur.

transport is only bound as a local at L696 (transport = normalize_transport(...)), inside the try (starts L675). If the exception fires between L675 and L696 — e.g. in _get_callback_queue_name_api(), FairnessKey(...), or normalize_transport itself — then "transport" is not in locals(), this falls back to DEFAULT_WORKFLOW_TRANSPORT (celery), and is_pg_transport() returns False. For a genuine PG run that fails early the UI error publish is skipped and failure_counts is {}, leaving files perpetually "in progress" — exactly what this PR is meant to fix. The real transport is knowable from kwargs.get("transport") from the function's first line, so the "fail-closed to Celery" rationale doesn't hold here.

Secondary risk: reading via locals().get(...) defeats static analysis — a future rename of transport silently disables the whole PG path instead of raising NameError.

Fix: hoist transport above the try from the authoritative source and reference it directly (matching general/tasks.py, where transport is a normalized parameter), then drop this line and the duplicate assignment at L696.
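The failure mode described above can be reproduced in isolation. In this sketch `_early_step`, `handler_with_locals`, and `handler_hoisted` are hypothetical stand-ins for the real handler, and the `normalize_transport` body and the `"celery"` default are assumptions.

```python
DEFAULT_WORKFLOW_TRANSPORT = "celery"  # assumed default value


def normalize_transport(value):
    """Stand-in normalizer; the real implementation is not shown."""
    return (value or DEFAULT_WORKFLOW_TRANSPORT).lower()


def _early_step():
    """Simulates a failure that fires before `transport` is bound."""
    raise RuntimeError("fails before transport is bound")


def handler_with_locals(**kwargs):
    try:
        _early_step()
        transport = normalize_transport(kwargs.get("transport"))
    except RuntimeError:
        # `transport` was never bound, so locals() has no such key and the
        # lookup silently falls back to the default.
        return locals().get("transport", DEFAULT_WORKFLOW_TRANSPORT)
    return transport


def handler_hoisted(**kwargs):
    # Resolve once, above the try, from the authoritative source.
    transport = normalize_transport(kwargs.get("transport"))
    try:
        _early_step()
    except RuntimeError:
        return transport
    return transport


seen_by_locals = handler_with_locals(transport="pg")
seen_when_hoisted = handler_hoisted(transport="pg")
```

With the hoisted form, renaming `transport` without updating the handler would raise `NameError` instead of quietly disabling the PG path.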

Comment thread workers/general/tasks.py Outdated
# persisted at orchestration start; successful=0/failed=total).
if is_pg_transport(transport):
    try:
        workflow_logger.log_error(

Contributor Author

[Medium] workflow_logger.log_error(...) is not None-guarded here — asymmetric with the api handler.

The api handler guards its logger (if error_logger:, api-deployment/tasks.py:828), but here workflow_logger.log_error(...) is called directly. workflow_logger is nullable and other sites in this file guard it (if workflow_logger:). If it is None, the resulting AttributeError is swallowed by the surrounding except and downgraded to a generic "Failed to publish error to UI logs" warning — the user gets no UI error and the cause looks like a transient publish hiccup. Guard it explicitly: if workflow_logger: workflow_logger.log_error(...).
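A small sketch of how the missing guard disguises the cause. `publish_unguarded` and `publish_guarded` are hypothetical helpers that return the message a reader of the worker logs would see; they are not functions from the codebase.

```python
def publish_unguarded(workflow_logger, message: str) -> str:
    """Calls the logger directly inside a broad except, as in the reviewed hunk."""
    try:
        workflow_logger.log_error(message)
        return "published"
    except Exception as exc:
        # A None logger lands here as an AttributeError and reads like a
        # transient publish problem.
        return f"Failed to publish error to UI logs: {exc}"


def publish_guarded(workflow_logger, message: str) -> str:
    """Checks for a missing logger explicitly before trying to publish."""
    if not workflow_logger:
        return "skipped: no workflow logger available"
    try:
        workflow_logger.log_error(message)
        return "published"
    except Exception as exc:
        return f"Failed to publish error to UI logs: {exc}"


unguarded_result = publish_unguarded(None, "Workflow orchestration failed")
guarded_result = publish_guarded(None, "Workflow orchestration failed")
```

The guarded variant keeps the "no logger" case distinct from a genuine publish failure, which is the distinction the comment asks for.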

Comment thread workers/general/tasks.py Outdated
        error_message=str(e),
        **failure_counts,
    )
except Exception as status_error:

Contributor Author

[Medium] A failed status update silently drops the counter reconciliation, and this handler never re-raises.

The PR now routes the entire reconciliation (**failure_counts) through this update_workflow_execution_status call. If it raises, the except swallows it to a warning, so both the ERROR status and the successful=0/failed=total reconciliation are lost. Unlike the api handler, this path does not re-raise (it builds error_response and continues at L785), so there is no second chance for the failure to surface — the execution is left with total_files set and files perpetually "in progress", the exact UN-3652 state. Consider logging this at error (with an error id) rather than warning, since a failed terminal-status write is a real defect. (Distinct from the greptile P1 on the api handler, which is about the missing wrapper.)

Comment thread workers/api-deployment/tasks.py Outdated
except Exception as log_error:
    logger.warning(f"Failed to publish error to UI logs: {log_error}")
failure_counts = WorkflowOrchestrationUtils.pg_failure_file_counts(
    transport, total_files

Contributor Author

[Low] Cache-resolved files are reported as failed.

Here total_files = len(files_to_send) includes file-history cache hits (cached_count = len(cached_results); cf. the "Send all files, not just non-cached ones" comment). pg_failure_file_counts returns successful=0, failed=total_files, so files already resolved from cache are counted as failed, inflating the "N failed" figure on the run. The in-progress=0 goal is still met, but the number is misleading. Consider computing successful_files = cached_count, failed_files = total_files - cached_count at this call site (the general path has no cache-on-send concept).
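The suggested split could look roughly like this. `failure_counts_with_cache` is a hypothetical name, and the clamping of `cached_count` is an added safety assumption so that the serializer rule successful + failed <= total still holds.

```python
def failure_counts_with_cache(total_files: int, cached_count: int) -> dict[str, int]:
    """Count cache hits as successful and the remainder as failed (illustrative)."""
    # Clamp so a stale or oversized cached_count cannot break the invariant.
    cached = max(0, min(cached_count, total_files))
    return {
        "total_files": total_files,
        "successful_files": cached,
        "failed_files": total_files - cached,
    }


# Five files sent, two of them already resolved from file history.
split_counts = failure_counts_with_cache(total_files=5, cached_count=2)
```

In-progress still evaluates to zero under this split, since successful plus failed always equals the total.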

"""Centralized workflow orchestration patterns and utilities."""

@staticmethod
def pg_failure_file_counts(transport: str, total_files: int) -> dict[str, int]:

Contributor Author

[Low] Return contract is stringly-coupled to a remote kwarg signature, with no test at the seam.

The three keys (total_files, successful_files, failed_files) must exactly match the parameter names of InternalAPIClient.update_workflow_execution_status because they are spread via **failure_counts. A rename/typo on either side surfaces only as a runtime TypeError, and only on the rare PG-failure path. The unit tests assert the literal dict but never that the keys are valid kwargs of the consumer, so they would stay green through a real break; dict[str, int] also can't express the successful + failed <= total invariant. Cheapest insurance: a regression test binding the result against the real signature, e.g. inspect.signature(InternalAPIClient.update_workflow_execution_status).bind_partial(execution_id=..., status=..., **counts). Optionally document the shape with a TypedDict (it still can't encode the arithmetic invariant, so prioritize the test). transport: str is correct as-is — the whole transport seam traffics in canonical strings.
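A sketch of the proposed seam test using `inspect.signature(...).bind_partial`. The `InternalAPIClient` class here is a stand-in with an assumed parameter list; a real test would import the actual client. The extra check for a `**kwargs` parameter is an addition of this sketch, since a signature that accepts arbitrary keywords would let any key bind.

```python
import inspect


class InternalAPIClient:
    """Stand-in for the real client; the parameter list is an assumption."""

    def update_workflow_execution_status(
        self,
        execution_id,
        status,
        error_message=None,
        total_files=None,
        successful_files=None,
        failed_files=None,
    ):
        raise NotImplementedError


def keys_are_valid_kwargs(counts: dict) -> bool:
    """Return True if every key in counts binds to a named parameter."""
    sig = inspect.signature(InternalAPIClient.update_workflow_execution_status)
    # If the method took **kwargs, bind_partial would accept any key and the
    # check would prove nothing, so treat that case as a failure.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()):
        return False
    try:
        sig.bind_partial(execution_id="exec-1", status="ERROR", **counts)
    except TypeError:
        return False
    return True


good = keys_are_valid_kwargs(
    {"total_files": 1, "successful_files": 0, "failed_files": 1}
)
typo = keys_are_valid_kwargs({"total_file": 1})
```

Binding against the signature catches a renamed or misspelled key at test time instead of on the rare PG-failure path.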

Comment thread workers/tests/test_pg_failure_counts.py Outdated
_CELERY = WorkflowTransport.CELERY.value


class TestPgFailureFileCounts:

Contributor Author

[High — test coverage] The behavioral change of this PR is untested; these tests cover only the pure helper.

The new tests exercise pg_failure_file_counts (a dict literal behind an is_pg_transport gate), but the actual change, the two rewritten except handlers (error surfacing to UI, the locals().get fallback, **failure_counts → update_workflow_execution_status, the nested try/except), has zero coverage. A regression there silently reintroduces the "files stuck in progress" bug.

Highest-value addition: a parametrized (PG vs Celery) unit test of each handler with a mocked api_client that forces the orchestration call to raise, asserting (PG) update_workflow_execution_status is called with status=ERROR, total_files=N, successful_files=0, failed_files=N, and (Celery) with no file-aggregate kwargs (proving "byte-identical" at the call site, not just in the helper). Also worth covering: the UI-logger raising must not prevent the status update / re-raise.

Minor: test_pg_counts_satisfy_serializer_constraints re-implements the serializer rule inline instead of invoking the real validator, and test_pg_marks_attempted_files_failed / test_result_zeroes_ui_in_progress largely restate the literal (they only fail if someone edits the dict, not if behavior breaks). The genuinely useful cases are the Celery/non-PG no-op tests.
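The handler-level test suggested above might be sketched as follows. `handle_orchestration_failure` is a hypothetical, simplified stand-in for the except-handler bodies, and a plain helper is used in place of pytest parametrization so the example stays self-contained.

```python
from unittest.mock import MagicMock


def is_pg_transport(transport: str) -> bool:
    return transport == "pg"  # assumed canonical string


def handle_orchestration_failure(api_client, transport, execution_id, total_files, error):
    """Hypothetical handler body: PG sends file aggregates, Celery does not."""
    if is_pg_transport(transport):
        api_client.update_workflow_execution_status(
            execution_id=execution_id,
            status="ERROR",
            error_message=str(error),
            total_files=total_files,
            successful_files=0,
            failed_files=total_files,
        )
    else:
        api_client.update_workflow_execution_status(
            execution_id=execution_id,
            status="ERROR",
            error_message=str(error),
        )


def status_kwargs_for(transport: str) -> dict:
    """Run the handler against a mocked client and return the kwargs it sent."""
    api_client = MagicMock()
    handle_orchestration_failure(
        api_client, transport, "exec-1", 2, RuntimeError("orchestration failed")
    )
    api_client.update_workflow_execution_status.assert_called_once()
    return api_client.update_workflow_execution_status.call_args.kwargs


pg_kwargs = status_kwargs_for("pg")
celery_kwargs = status_kwargs_for("celery")
```

Asserting on the kwargs at the call site checks the gating where it matters: the Celery call carries no file aggregates at all.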

Comment thread workers/api-deployment/tasks.py Outdated
Comment on lines +814 to +818
# (C) surface the error to the UI/WS execution logs, and
# (B) reconcile counters — mark the attempted files failed so the run
# reads "N failed", not "N in progress" (total_files is bound above
# the try; successful=0/failed=total zeroes the UI's in-progress).
# `transport` is assigned inside the try (~L695); resolve it defensively so

Contributor Author

[Low] Comment rot: orphan (B)/(C) labels and a hard-coded line reference.

  • (C)/(B) (also in general/tasks.py and the test module docstring) reference subtasks of the external Jira plan — there is no (A), they are out of order, and nothing in the code defines them, so they become meaningless once the ticket closes. Drop the letters or renumber as self-contained (1)/(2) in logical order.
  • ~L695 (L818) is already off by one (the assignment is at L696) and will drift further with any edit. Reference it symbolically: "assigned by the normalize_transport(...) call inside the try".

Also: this ~10-line rationale is near-duplicated in general/tasks.py:757-764 and already diverging ("bound above the try" vs "persisted at orchestration start"). Consider trimming both to a one-line pointer to the pg_failure_file_counts docstring, which already carries the full rationale. (The factual claims themselves — serializer constraint, total_files binding — were verified accurate.)

…ansport, test handler

Review (greptile P1 + PR-review-toolkit). Changes:

- Extract the PG failure recording into one tested seam,
  WorkflowOrchestrationUtils.record_pg_orchestration_failure (replaces
  pg_failure_file_counts). It (C) surfaces the error to the UI logger (guarded)
  and (B) reconciles counters via update_status, with the update WRAPPED in
  try/except so a status-update failure (e.g. a 400 serializer error) is logged
  but never re-raised — it can no longer mask the original orchestration error or
  skip the caller's re-raise (greptile P1). Never raises.
- api-deployment: resolve `transport` ONCE up front from kwargs (above the try)
  and reference it directly, instead of `locals().get("transport", ...)` which
  fell back to celery for a genuine PG run that failed BEFORE the in-try
  assignment — re-introducing the exact "stuck in progress" symptom. Dropped the
  duplicate normalize_transport.
- Both handlers now branch: PG -> record_pg_orchestration_failure; else -> the
  original bare status update, so the Celery path stays byte-identical.
- general: the UI-logger call is now None-guarded inside the helper (was an
  unguarded workflow_logger.log_error).
- Tests: replace the helper-only dict tests with behaviour tests of the recorder
  (mocked api_client/logger): counter reconciliation + total_files, error
  surfacing, logger-hiccup-doesn't-block-update, update-failure-swallowed-not-
  raised, and a kwarg-seam guard binding the counts against
  InternalAPIClient.update_workflow_execution_status's signature.

Re-verified live E2E on the refactored handler (renamed pg_barrier_state to force
a non-retryable failure): PG run -> ERROR total=1/ok=0/fail=1 (in-progress 0) +
"❌ Workflow orchestration failed" published to the WS log channel.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Thanks for the deep review 🙏 — addressed in ebca67604. The core fix: extracted the PG failure-recording into one tested seam, WorkflowOrchestrationUtils.record_pg_orchestration_failure, so the handlers shrink to if is_pg_transport: record(...) / else: <original bare update>.

greptile P1 + [High]: unguarded api status update masks the original error → Fixed. The update_status call now lives inside record_*, wrapped in try/except: a rejected update (e.g. the 400 my own E2E hit) is logged at error and not re-raised, so it can never mask the orchestration error or skip the handler's re-raise.

[High]: locals().get("transport") blinds early PG failures → Fixed. transport is now resolved once up front from kwargs (above the try) and referenced directly; the in-try duplicate normalize_transport is removed. A PG run that fails before fan-out now gets PG handling, not a celery fallback.

[Medium]: general UI-logger not None-guarded → Fixed (guarded inside the helper: if workflow_logger).

[Medium]: failed status update silently drops reconciliation → the update failure is now logged at error (was warning) in the helper. The general handler's "no re-raise / build error_response" control flow is pre-existing and unchanged, but the failure is now visible.

[High, test coverage]: behavior untested → Fixed. Replaced the helper-only dict tests with behavior tests of the recorder (mocked api_client/logger): counter reconciliation incl. total_files, error surfacing, UI-logger-hiccup-doesn't-block-update, update-failure-swallowed-not-raised (the P1 guarantee), and a kwarg-seam guard binding the counts against InternalAPIClient.update_workflow_execution_status's signature (your [Low] type-coupling point). Celery byte-identical is now structural: record_* is only called on the PG branch, so the Celery path never sends file aggregates.

[Low]: comment rot / (B)(C) / hard-coded line refs → trimmed the ~10-line duplicated rationale in both handlers to one-line pointers to the helper docstring; dropped the line number.

[Low]: cache-resolved files counted as failed → deferred with rationale. On an orchestration failure nothing was delivered (the callback that pushes cached results never ran), so counting all attempted files (incl. cache hits) as failed is defensible, and the in-progress=0 goal holds. A successful_files = cached_count split needs safe scoping of cached_count in the except (same unbound-var class as the transport issue); happy to do it as a small follow-up if you'd prefer the more precise number.

Re-verified live E2E on the refactored handler (renamed pg_barrier_state to force a non-retryable failure): PG run → ERROR total=1/ok=0/fail=1 (in-progress 0) + ❌ Workflow orchestration failed published to the WS log channel. 82 related tests green, ruff clean.

@muhammad-ali-e muhammad-ali-e merged commit 5d07155 into feat/UN-3445-pg-queue-integration Jun 29, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3652-pg-orchestration-failure-ux branch June 29, 2026 13:19