UN-3564 [FEAT] PG Queue 9e — reaper-recovery: mark stranded executions ERROR (gates PR 3)#2070
UN-3564 [FEAT] PG Queue 9e — reaper-recovery: mark stranded executions ERROR (gates PR 3)#2070muhammad-ali-e merged 7 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3564-pg-reaper-recoveryZipstack/unstract:UN-3564-pg-reaper-recoveryCopy head branch name to clipboard
Conversation
…s ERROR The reaper only DELETEd expired pg_barrier_state rows — a stranded execution vanished silently (stuck in EXECUTING). This makes it RECOVER them — the hard dependency from PR 2c review for enabling the PG transport (un-catchable strand windows otherwise bottom out at the ~6h barrier expiry). Per expired barrier the leader (recover_expired_barriers), best-effort + per-exec: - Marks the execution ERROR via the internal API (the path the normal callback uses — business state never goes direct-DB; the API is functional: execution_time, error truncation, attempts, events/notifications, multi-tenant boundary). Message distinguishes remaining>0 (work incomplete) vs remaining==0 (callback never fired). Reads status first and SKIPS if already terminal (a remaining==0 row can be a COMPLETED exec whose row-delete failed; update_execution has no terminal guard) or if the row has no org. - Reclaims pg_batch_dedup + pg_barrier_state directly in PG (same boundary as the rest of queue_backend). Recover-THEN-delete: a failed mark leaves the row for the next sweep to retry (single-leader → no double-claim). - backend: organization_id column on PgBarrierState + migration 0007 (reaper needs it for the org-scoped status API). - workers: PgBarrier.enqueue stamps organization_id into the UPSERT; PgReaper holds a lazily-built InternalAPIClient; sweep_expired_barriers → recover_expired_barriers. Tests: reaper suite reworked — real-PG recovery w/ fake API client (mark-ERROR remaining>0/==0, terminal-skip no-overwrite, org-missing skip, API-failure leaves row, dedup reclaim, tick-via-real-conn). 109 reaper/barrier/dedup tests green. Dev-tested vs real InternalAPIClient+backend: PENDING stranded → ERROR + cleaned; COMPLETED → NOT overwritten + cleaned (terminal-guard verified end-to-end). Deferred: callback re-fire for remaining==0 (needs callback_descriptor on the row). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
for more information, see https://pre-commit.ci
SonarCloud S6741 fixed —
|
…in reaper tests Replaced `api.update_calls[0]` (which Sonar can't prove non-empty → IndexError risk) with single-element unpacking `(call,) = api.update_calls` — removes the index and additionally asserts exactly one status mark was made. Behaviour identical; 52 reaper tests green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR Review Toolkit pass (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). Findings below are grouped by severity. The headline item is the status-read-failure path on _execution_status (flagged independently by two agents): it converts the reaper's central safety guarantee into one that only holds while the internal API is perfectly healthy.
…RROR (Critical) + hardening - [Critical] _execution_status now RAISES when the status read fails (get_workflow_execution returns success=False, never raises) — a transient blip no longer reads as "non-terminal" and flips a COMPLETED execution to ERROR; the caller's except retains the row for retry. + test. - [Medium] use ExecutionStatus.is_completed (single source of truth) instead of a local _TERMINAL_STATUSES frozenset that could drift; dropped the fake in (_TERMINAL_STATUSES) parens. - [Medium] remaining is NOT-NULL int → typed int, three-way branch (>0 work-incomplete / ==0 callback-never-fired / <0 already-torn-down) with accurate messages. - [Medium] re-guard the barrier DELETE on `expires_at < now()` + only clear dedup when the barrier row was actually reclaimed — a same-id re-enqueue (UPSERT resets expires_at) is no longer torn down mid-recovery. - [Medium] org-missing now LEAVES the row (doesn't erase the only recovery handle) + logs ERROR; PgBarrier.enqueue logs ERROR at write time if a barrier is enqueued without an org (should never happen). - [Low] recover_expired_barriers emits an aggregate summary; escalates to logger.error when a non-empty sweep recovers nothing (systemic: API down). - [Low] refreshed the stale "(future) periodic sweep" comment in pg_barrier. Tests: _FakeApiClient models the real success/failure contract + records file_execution; +read-failure-retains-row, +file_execution=False, +multi-row isolation (one fails, others recover), +org-stamping x3 (stamp/default/UPSERT refresh). 115 reaper/barrier/dedup tests green; ruff + ruff-format clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round addressed —
|
A new occurrence introduced by the prior review-round test (test_status_read_passes_file_execution_false used api.get_calls[0][2]). Replaced with single-element unpacking — [(_exec_id, _org, file_execution)] = api.get_calls — index-free and additionally pins exactly one status read. 55 reaper tests green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud S6741 (new instance) fixed —
|
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/reaper.py | Core change: sweeps replaced by per-execution recovery with status-read, terminal-skip, re-arm guard, and API ERROR-mark. The update_workflow_execution_status return value is unchecked, creating a gap where a silent server-side failure causes the barrier row to be deleted while the execution remains non-terminal. |
| workers/queue_backend/pg_barrier.py | Adds organization_id stamping to the barrier UPSERT, with an error log when the org is absent. The ON CONFLICT clause correctly refreshes the org on re-enqueue. |
| backend/pg_queue/migrations/0007_pgbarrierstate_organization_id.py | Safe additive migration: adds nullable-via-default organization_id TextField with blank=True, default="" — no destructive change, existing rows get empty string. |
| backend/pg_queue/models.py | Adds organization_id field matching the migration; no-NULL convention is consistent with PgQueueMessage.org_id. |
| workers/queue_backend/pg_queue/init.py | Renames the public export from sweep_expired_barriers to recover_expired_barriers in both the import and __all__. |
| workers/tests/test_pg_reaper.py | Comprehensive rework: adds _FakeApiClient, 13 new real-PG integration tests covering mark-ERROR, terminal-skip, re-arm guard, api-failure-leaves-row, dedup reclaim, and systemic-alert suppression. |
| workers/tests/test_pg_barrier.py | Updates existing INSERT/seed helpers to include the new organization_id column; adds three new tests for org stamping, default-empty, and UPSERT refresh. |
Reviews (2): Last reviewed commit: "UN-3564 fix SonarCloud S125: drop code-l..." | Re-trigger Greptile
…ate skip-vs-fail - [main] re-arm race: the ERROR mark now fires only after a re-check that the row is STILL expired (_still_expired) immediately before marking — so a same-id re-enqueue (expires_at reset to future) between the sweep SELECT and the mark can't get its freshly-running execution flagged ERROR. The DELETE stays guarded on expires_at < now() as the second line of defence. + test (re-arm via the status-read side-effect) asserting no mark + row left intact. - status=None on a success=True read no longer falls through to mark ERROR — treated as indeterminate: skip + leave the row for the next sweep. + test. - aggregate logging distinguishes genuine failures (exceptions) from benign skips (terminal / re-armed / no-status / no-org): the systemic "recovered NONE / API down" ERROR escalates only on real failures, not on all-skipped sweeps. + caplog test that an all-org-missing sweep doesn't escalate. 58 reaper + 51 barrier tests green; ruff + ruff-format clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps — all three addressed (
|
|
All three look correct in Re-arm race —
Org-missing / systemic error — LGTM — the recovery path is tight now. Ready to merge when the integration branch is ready. |
The comment ended with a parenthesised tuple "(exec_id, org, file_execution)" — a complete Python expression S125 reads as commented-out code. Reworded to prose (and the stale `sweep_expired_barriers`→recover rename + a couple of inline `foo()` prose mentions neutralised while here). Comment-only; tests unaffected. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud S125 fixed —
|
c872b34
into
feat/UN-3445-pg-queue-integration
|
What
The reaper today only
DELETEs expiredpg_barrier_staterows — a stranded execution then vanishes silently (stuck inEXECUTING). This makes the reaper recover stranded executions. It's the hard dependency surfaced in PR 2c review (#2069) for enabling the PG transport: the un-catchable strand windows (a worker SIGKILL mid-batch; a crash after the final decrement but before the callback enqueues) otherwise bottom out at the ~6h barrier expiry.How it works
For each
pg_barrier_staterow pastexpires_at, the leader (recover_expired_barriers), best-effort + per-execution:remaining>0(work incomplete) fromremaining==0(all batches done, callback never fired). It reads status first and skips the mark if the execution is already terminal — aremaining==0row can belong to a COMPLETED execution whose best-effort row-delete merely failed, and the backend status update has no terminal guard, so a blind mark would corrupt it — or if the row carries no org.pg_batch_dedup+pg_barrier_state) directly in PG — same boundary as the rest ofqueue_backend.Recover-then-delete: a failure (e.g. the API unreachable) leaves that barrier row for the next sweep to retry, and never blocks the others (single-leader → no double-claim).
Why the internal API for the ERROR mark (not direct DB)
queue_backendtouches only queue-infra tables directly; execution status is business state and is always transitioned via the internal API (the normal callback does exactly this). The API path is functional — it computesexecution_time, truncates the error, incrementsattempts, and is the events/notifications/cache seam — and enforces the multi-tenant boundary. A directUPDATEwould bypass all of that.Changes
organization_idcolumn onPgBarrierState+ migration 0007 (the reaper needs it for the org-scoped status API; it has onlyexecution_idoff the row otherwise).PgBarrier.enqueuestampsorganization_idinto the barrier UPSERT;PgReaperholds a lazily-builtInternalAPIClient;sweep_expired_barriers→recover_expired_barriers.Tests / dev-test
remaining>0and==0messages), terminal-skip (no overwrite), org-missing skip, API-failure-leaves-row-for-retry, dedup reclaim, tick-via-real-conn — plus the existing leadership / lease / heartbeat tests. 109 reaper/barrier/dedup tests green;ruffclean.InternalAPIClient+ backend: a strandedPENDINGexecution was marked ERROR + cleaned up; an already-COMPLETEDexecution was NOT overwritten ("no status overwrite") + cleaned up — the terminal-guard verified end-to-end.Deferred (follow-up)
Callback re-fire for the
remaining==0strand (heal → COMPLETED instead of ERROR) needscallback_descriptorstored on the barrier row; until then those strands are marked ERROR. Per-stage re-enqueue of stuck file executions is a larger pipeline-recovery effort beyond this net.Base
Targets the long-lived
feat/UN-3445-pg-queue-integrationintegration branch (notmain).🤖 Generated with Claude Code