Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3564 [FEAT] PG Queue 9e — reaper-recovery: mark stranded executions ERROR (gates PR 3)#2070

Merged
muhammad-ali-e merged 7 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3564-pg-reaper-recoveryZipstack/unstract:UN-3564-pg-reaper-recoveryCopy head branch name to clipboard
Jun 17, 2026
Merged

UN-3564 [FEAT] PG Queue 9e — reaper-recovery: mark stranded executions ERROR (gates PR 3)#2070
muhammad-ali-e merged 7 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3564-pg-reaper-recoveryZipstack/unstract:UN-3564-pg-reaper-recoveryCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

The reaper today only DELETEs expired pg_barrier_state rows — a stranded execution then vanishes silently (stuck in EXECUTING). This makes the reaper recover stranded executions. It's the hard dependency surfaced in PR 2c review (#2069) for enabling the PG transport: the un-catchable strand windows (a worker SIGKILL mid-batch; a crash after the final decrement but before the callback enqueues) otherwise bottom out at the ~6h barrier expiry.

How it works

For each pg_barrier_state row past expires_at, the leader (recover_expired_barriers), best-effort + per-execution:

  1. Marks the execution ERROR via the internal API — the same path the normal callback uses for terminal status (business state never goes direct-DB). The message distinguishes remaining>0 (work incomplete) from remaining==0 (all batches done, callback never fired). It reads status first and skips the mark if the execution is already terminal — a remaining==0 row can belong to a COMPLETED execution whose best-effort row-delete merely failed, and the backend status update has no terminal guard, so a blind mark would corrupt it — or if the row carries no org.
  2. Reclaims the queue-infra rows (pg_batch_dedup + pg_barrier_state) directly in PG — same boundary as the rest of queue_backend.

Recover-then-delete: a failure (e.g. the API unreachable) leaves that barrier row for the next sweep to retry, and never blocks the others (single-leader → no double-claim).

Why the internal API for the ERROR mark (not direct DB)

queue_backend touches only queue-infra tables directly; execution status is business state and is always transitioned via the internal API (the normal callback does exactly this). The API path is functional — it computes execution_time, truncates the error, increments attempts, and is the events/notifications/cache seam — and enforces the multi-tenant boundary. A direct UPDATE would bypass all of that.

Changes

  • Backendorganization_id column on PgBarrierState + migration 0007 (the reaper needs it for the org-scoped status API; it has only execution_id off the row otherwise).
  • WorkersPgBarrier.enqueue stamps organization_id into the barrier UPSERT; PgReaper holds a lazily-built InternalAPIClient; sweep_expired_barriersrecover_expired_barriers.

Tests / dev-test

  • Reaper suite reworked: real-PG recovery with a fake API client covering mark-ERROR (remaining>0 and ==0 messages), terminal-skip (no overwrite), org-missing skip, API-failure-leaves-row-for-retry, dedup reclaim, tick-via-real-conn — plus the existing leadership / lease / heartbeat tests. 109 reaper/barrier/dedup tests green; ruff clean.
  • Dev-test vs the real InternalAPIClient + backend: a stranded PENDING execution was marked ERROR + cleaned up; an already-COMPLETED execution was NOT overwritten ("no status overwrite") + cleaned up — the terminal-guard verified end-to-end.

Deferred (follow-up)

Callback re-fire for the remaining==0 strand (heal → COMPLETED instead of ERROR) needs callback_descriptor stored on the barrier row; until then those strands are marked ERROR. Per-stage re-enqueue of stuck file executions is a larger pipeline-recovery effort beyond this net.

Base

Targets the long-lived feat/UN-3445-pg-queue-integration integration branch (not main).

🤖 Generated with Claude Code

…s ERROR

The reaper only DELETEd expired pg_barrier_state rows — a stranded execution
vanished silently (stuck in EXECUTING). This makes it RECOVER them — the hard
dependency from PR 2c review for enabling the PG transport (un-catchable strand
windows otherwise bottom out at the ~6h barrier expiry).

Per expired barrier the leader (recover_expired_barriers), best-effort + per-exec:
- Marks the execution ERROR via the internal API (the path the normal callback
  uses — business state never goes direct-DB; the API is functional: execution_time,
  error truncation, attempts, events/notifications, multi-tenant boundary). Message
  distinguishes remaining>0 (work incomplete) vs remaining==0 (callback never fired).
  Reads status first and SKIPS if already terminal (a remaining==0 row can be a
  COMPLETED exec whose row-delete failed; update_execution has no terminal guard) or
  if the row has no org.
- Reclaims pg_batch_dedup + pg_barrier_state directly in PG (same boundary as the
  rest of queue_backend). Recover-THEN-delete: a failed mark leaves the row for the
  next sweep to retry (single-leader → no double-claim).

- backend: organization_id column on PgBarrierState + migration 0007 (reaper needs
  it for the org-scoped status API).
- workers: PgBarrier.enqueue stamps organization_id into the UPSERT; PgReaper holds
  a lazily-built InternalAPIClient; sweep_expired_barriers → recover_expired_barriers.

Tests: reaper suite reworked — real-PG recovery w/ fake API client (mark-ERROR
remaining>0/==0, terminal-skip no-overwrite, org-missing skip, API-failure leaves
row, dedup reclaim, tick-via-real-conn). 109 reaper/barrier/dedup tests green.
Dev-tested vs real InternalAPIClient+backend: PENDING stranded → ERROR + cleaned;
COMPLETED → NOT overwritten + cleaned (terminal-guard verified end-to-end).

Deferred: callback re-fire for remaining==0 (needs callback_descriptor on the row).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 535db58e-2721-4e90-8d8d-7a952b65018d

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3564-pg-reaper-recovery

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud S6741 fixed — 2ffd732e7

api.update_calls[0] (Sonar couldn't prove the list non-empty → potential IndexError) replaced with single-element unpacking (call,) = api.update_calls in the two recovery tests — removes the index and additionally pins exactly one status mark. 52 reaper tests green, ruff clean.

…in reaper tests

Replaced `api.update_calls[0]` (which Sonar can't prove non-empty → IndexError
risk) with single-element unpacking `(call,) = api.update_calls` — removes the
index and additionally asserts exactly one status mark was made. Behaviour
identical; 52 reaper tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review Toolkit pass (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). Findings below are grouped by severity. The headline item is the status-read-failure path on _execution_status (flagged independently by two agents): it converts the reaper's central safety guarantee into one that only holds while the internal API is perfectly healthy.

Comment thread workers/queue_backend/pg_queue/reaper.py
Comment thread workers/queue_backend/pg_queue/reaper.py
Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_barrier.py
Comment thread workers/tests/test_pg_barrier.py
Comment thread workers/tests/test_pg_reaper.py
Comment thread workers/tests/test_pg_reaper.py
Comment thread workers/tests/test_pg_reaper.py Outdated
…RROR (Critical) + hardening

- [Critical] _execution_status now RAISES when the status read fails
  (get_workflow_execution returns success=False, never raises) — a transient
  blip no longer reads as "non-terminal" and flips a COMPLETED execution to
  ERROR; the caller's except retains the row for retry. + test.
- [Medium] use ExecutionStatus.is_completed (single source of truth) instead of
  a local _TERMINAL_STATUSES frozenset that could drift; dropped the fake
  in (_TERMINAL_STATUSES) parens.
- [Medium] remaining is NOT-NULL int → typed int, three-way branch
  (>0 work-incomplete / ==0 callback-never-fired / <0 already-torn-down) with
  accurate messages.
- [Medium] re-guard the barrier DELETE on `expires_at < now()` + only clear
  dedup when the barrier row was actually reclaimed — a same-id re-enqueue
  (UPSERT resets expires_at) is no longer torn down mid-recovery.
- [Medium] org-missing now LEAVES the row (doesn't erase the only recovery
  handle) + logs ERROR; PgBarrier.enqueue logs ERROR at write time if a barrier
  is enqueued without an org (should never happen).
- [Low] recover_expired_barriers emits an aggregate summary; escalates to
  logger.error when a non-empty sweep recovers nothing (systemic: API down).
- [Low] refreshed the stale "(future) periodic sweep" comment in pg_barrier.

Tests: _FakeApiClient models the real success/failure contract + records
file_execution; +read-failure-retains-row, +file_execution=False, +multi-row
isolation (one fails, others recover), +org-stamping x3 (stamp/default/UPSERT
refresh). 115 reaper/barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round addressed — 3800ef1f4

Thanks @muhammad-ali-e — the Critical one was a genuine corruption hole. All 11 handled.

[Critical] read-failure → false-ERROR. get_workflow_execution returns success=False on any blip (never raises), so a failed read read as "non-terminal" and would flip a COMPLETED execution to ERROR. _execution_status now raises on not response.success → the row is retained for the next sweep, never marked. (+ test, + _FakeApiClient now models the real contract.)

Mediums

  • ExecutionStatus.is_completed (single source of truth) replaces the local _TERMINAL_STATUSES frozenset — no drift; fake parens dropped.
  • remaining typed int + three-way message (>0 / ==0 / <0 already-torn-down).
  • barrier DELETE re-guarded on expires_at < now() + dedup-delete only when the row was reclaimed — a same-id re-enqueue isn't torn down mid-recovery.
  • org-missing now leaves the row (preserves the recovery handle) + ERROR log, plus a loud write-time ERROR in enqueue.

Lows

  • aggregate sweep summary + logger.error when a non-empty sweep recovers nothing (systemic signal).
  • refreshed the stale "(future) periodic sweep" comment.

Tests (+org-stamping ×3 incl. the ON CONFLICT refresh; +read-failure-retains-row; +file_execution=False; +multi-row isolation). 115 reaper/barrier/dedup tests green; ruff + ruff-format clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 17, 2026 16:29
A new occurrence introduced by the prior review-round test
(test_status_read_passes_file_execution_false used api.get_calls[0][2]). Replaced
with single-element unpacking — [(_exec_id, _org, file_execution)] = api.get_calls
— index-free and additionally pins exactly one status read. 55 reaper tests green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud S6741 (new instance) fixed — 93a276e58

Different occurrence from the earlier update_calls[0] fix — this one (api.get_calls[0][2]) was introduced by the prior review round's new test_status_read_passes_file_execution_false. Replaced with index-free unpacking [(_exec_id, _org, file_execution)] = api.get_calls (also pins exactly one read). 55 reaper tests green, ruff clean.

@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR upgrades the PG-queue reaper from a silent DELETE-only orphan sweep to a full recovery loop: for each expired pg_barrier_state row the leader reads execution status, skips already-terminal executions, guards against same-id re-arms, marks the stranded execution ERROR via the internal API, and then reclaims the pg_batch_dedup and pg_barrier_state rows. It also adds an organization_id column (migration 0007) and stamps it at enqueue time so the reaper can call the org-scoped status API.

  • Reaper (reaper.py): sweep_expired_barriers replaced by recover_expired_barriers + four helper functions; handles terminal-skip, re-arm guard, org-missing skip, and per-row failure isolation with a lazy InternalAPIClient.
  • Barrier (pg_barrier.py): UPSERT extended to stamp organization_id from callback_kwargs; logs loudly when absent.
  • Tests: 13 new real-PG integration tests in test_pg_reaper.py covering all documented recovery paths; barrier tests updated for the new column.

Confidence Score: 4/5

The recovery logic is well-structured and the terminal-skip/re-arm guards are correct, but the unchecked return value of update_workflow_execution_status means a silent server-side failure could cause the barrier row to be deleted while the execution remains non-terminal — permanently stranding it.

The _mark_stranded_error function calls api_client.update_workflow_execution_status and does not inspect the return value. If the HTTP call completes but the server returns an error body (success=False without raising), the code falls through to the DELETE, removes the barrier row, and the execution is left stuck with no row for the reaper to retry on subsequent sweeps. The rest of the recovery logic — terminal guard, re-arm check, per-row failure isolation — is sound.

workers/queue_backend/pg_queue/reaper.py — specifically _mark_stranded_error and the update_workflow_execution_status return-value contract.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/reaper.py Core change: sweeps replaced by per-execution recovery with status-read, terminal-skip, re-arm guard, and API ERROR-mark. The update_workflow_execution_status return value is unchecked, creating a gap where a silent server-side failure causes the barrier row to be deleted while the execution remains non-terminal.
workers/queue_backend/pg_barrier.py Adds organization_id stamping to the barrier UPSERT, with an error log when the org is absent. The ON CONFLICT clause correctly refreshes the org on re-enqueue.
backend/pg_queue/migrations/0007_pgbarrierstate_organization_id.py Safe additive migration: adds nullable-via-default organization_id TextField with blank=True, default="" — no destructive change, existing rows get empty string.
backend/pg_queue/models.py Adds organization_id field matching the migration; no-NULL convention is consistent with PgQueueMessage.org_id.
workers/queue_backend/pg_queue/init.py Renames the public export from sweep_expired_barriers to recover_expired_barriers in both the import and __all__.
workers/tests/test_pg_reaper.py Comprehensive rework: adds _FakeApiClient, 13 new real-PG integration tests covering mark-ERROR, terminal-skip, re-arm guard, api-failure-leaves-row, dedup reclaim, and systemic-alert suppression.
workers/tests/test_pg_barrier.py Updates existing INSERT/seed helpers to include the new organization_id column; adds three new tests for org stamping, default-empty, and UPSERT refresh.

Reviews (2): Last reviewed commit: "UN-3564 fix SonarCloud S125: drop code-l..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/reaper.py
Comment thread workers/queue_backend/pg_queue/reaper.py
Comment thread workers/queue_backend/pg_queue/reaper.py
…ate skip-vs-fail

- [main] re-arm race: the ERROR mark now fires only after a re-check that the
  row is STILL expired (_still_expired) immediately before marking — so a same-id
  re-enqueue (expires_at reset to future) between the sweep SELECT and the mark
  can't get its freshly-running execution flagged ERROR. The DELETE stays guarded
  on expires_at < now() as the second line of defence. + test (re-arm via the
  status-read side-effect) asserting no mark + row left intact.
- status=None on a success=True read no longer falls through to mark ERROR —
  treated as indeterminate: skip + leave the row for the next sweep. + test.
- aggregate logging distinguishes genuine failures (exceptions) from benign skips
  (terminal / re-armed / no-status / no-org): the systemic "recovered NONE / API
  down" ERROR escalates only on real failures, not on all-skipped sweeps. + caplog
  test that an all-org-missing sweep doesn't escalate.

58 reaper + 51 barrier tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

@greptile-apps — all three addressed (f6c84ba45)

Thanks for the 3/5. All in the recovery path:

Re-arm race (main). Correct — the ERROR mark fired before the expires_at guard, so a re-enqueued live run could be marked ERROR. Fixed: the mark now fires only after a re-check that the row is still expired (_still_expired) immediately before it — a same-id re-enqueue between the sweep SELECT and the mark is detected and skipped (its new run owns the barrier). The DELETE stays guarded on expires_at < now() as the second line of defence. Added test_rearmed_execution_is_not_marked_error (re-arm simulated via the status-read side-effect): asserts no mark + the re-armed row left intact. (Residual: a microsecond TOCTOU between the re-check and the mark remains, but it requires the same execution_id re-enqueued in that window for a barrier that's already 6h-expired — and even then the guarded DELETE leaves the new run's barrier, so its callback re-marks COMPLETED. Self-correcting.)

status=None on success=True. No longer falls through to mark ERROR — treated as indeterminate: skip + leave the row for the next sweep. + test.

org-missing inflating the systemic error. The aggregate now distinguishes genuine failures (exceptions) from benign skips (terminal / re-armed / no-status / no-org). The "recovered NONE / API down" ERROR escalates only on real failures; an all-skipped sweep doesn't. + caplog test.

58 reaper + 51 barrier tests green; ruff + ruff-format clean.

@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

All three look correct in f6c84ba45.

Re-arm race_still_expired is correctly interleaved between the status read and the _mark_stranded_error call, not after it. The re-arm path returns False before the mark fires, so the DELETE guard never even runs for a re-armed row. The residual TOCTOU window (between _still_expired and _mark_stranded_error) is acknowledged and self-correcting as described.

status=None on success=True — handled as a skip (return False) rather than a raise, which is the right call: it's anomalous but not an API failure, so leaving for retry (not escalating) is correct.

Org-missing / systemic errorfailed now counts only exceptions; benign skips (terminal, re-armed, no-status, no-org) are in skipped. The logger.error "systemic" only fires when failed and not recovered. The caplog test pins this correctly.

LGTM — the recovery path is tight now. Ready to merge when the integration branch is ready.

The comment ended with a parenthesised tuple "(exec_id, org, file_execution)" —
a complete Python expression S125 reads as commented-out code. Reworded to prose
(and the stale `sweep_expired_barriers`→recover rename + a couple of inline
`foo()` prose mentions neutralised while here). Comment-only; tests unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud S125 fixed — 926df20a0

The flagged comment ended with a parenthesised tuple (exec_id, org, file_execution) — a valid Python expression S125 parses as commented-out code. Reworded to prose; also dropped a stale sweep_expired_barriers reference and a couple of inline foo() prose mentions to remove other code-like patterns. Comment-only change; tests unaffected.

@muhammad-ali-e muhammad-ali-e merged commit c872b34 into feat/UN-3445-pg-queue-integration Jun 17, 2026
4 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3564-pg-reaper-recovery branch June 17, 2026 16:52
@sonarqubecloud

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.