UN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches)#2068
muhammad-ali-e merged 2 commits into feat/UN-3445-pg-queue-integration from UN-3562-pg-pipeline-2b-idempotency
…g_batch_dedup + claim_batch / clear_execution_batches)

Second slice of 9e PR 2, after 2a (#2067). Inert idempotency primitive: the durable per-batch dedup marker 2c wires into the at-least-once PG path.

Why: the PG queue is at-least-once, so process_file_batch can be redelivered after a crash-before-ack → re-run batch + double-decrement the barrier (non-idempotent, max_retries=0). Recon showed existing per-file protection is only partial (Redis lock released after write; WFE COMPLETED skips tool re-exec but not necessarily the destination write; FileHistory is cross-execution only), so a durable per-batch gate is needed; per-file status stays the partial-crash backstop.

- backend: PgBatchDedup model + migration 0006: table pg_batch_dedup with a UniqueConstraint(execution_id, batch_index) (the ON CONFLICT target; its execution_id-leading index also serves the cleanup DELETE). Django-managed, extension-free, same posture as the sibling pg_queue models.
- workers (pg_barrier.py, reusing the barrier's _cursor() → one PG conn per worker child): claim_batch(execution_id, batch_index) -> bool (atomic INSERT ... ON CONFLICT DO NOTHING RETURNING; True=first/decrement, False=redelivery/skip) + clear_execution_batches(execution_id) -> int (barrier-teardown cleanup; reaper sweep is the backstop).

No call-site wiring: claim/clear + batch_index threading + transport switch land in 2c. Inert: new table + two helpers, no callers. Net behaviour: NONE.

Tests: +8 real-PG (first-claim, redelivery-rejected, distinct-batch, distinct-exec, concurrent-exactly-one-winner, clear-only-target, clear-empty-zero, reclaim-after-clear). Migration applied to dev DB, makemigrations --check clean, bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e
left a comment
Automated PR review (PR Review Toolkit)
Reviewed the per-batch idempotency primitive across 4 files using six specialized agents (code review, silent-failure, type design, test coverage, comments, simplifier). The code is well-scoped, SQL-safe (parameterized), and closely mirrors its PgBarrierState / _delete_barrier / test_pg_barrier.py siblings. No correctness defects in the production helpers. Findings below are prioritized; the one verified-inaccurate docstring (P1) is the only thing I'd treat as a should-fix before merge.
Note: a black/line-length finding was raised and dropped after verification — pre-commit uses ruff-format (line-length 90), the line is exactly 90 chars and passes; the [tool.black] entry in workers/pyproject.toml is vestigial.
…cstring claim, batch_index constraint, race test, nits

- P1 (verified bug): the docstrings claimed the reaper's barrier-orphan sweep reclaims orphaned pg_batch_dedup markers; it doesn't. sweep_expired_barriers DELETEs only pg_barrier_state (no cascade), so orphaned markers leak today. Reworded both PgBatchDedup + clear_execution_batches docstrings to state the leak honestly + flag the dedup-orphan sweep as intended future work.
- P4: add CheckConstraint(batch_index >= 0) (writer-proof, mirrors PgQueueMessage.priority) + a test that the DB rejects a negative index. Regenerated migration 0006 to include it.
- P6: claim_batch docstring no longer says it decrements; defers the single decrement to the caller (the function only inserts the marker).
- P7: generalized "partial per-file protection" → "not fully idempotent on redelivery" so the rationale can't rot.
- P5: documented created_at as observability-only (future age-based sweep).
- P2: REQUIRE_PG_TESTS env → skip becomes fail, so the idempotency primitive can't ship untested-green in CI where PG is expected.
- P3: strengthened the race test: pre-build N=8 conns in the main thread, align claims with threading.Barrier, loop 5 trials (forces the contended ON CONFLICT path instead of a serial fast-path).
- P8: hoisted import os to module top.

35 pg_barrier + 9 dedup tests green; migration applied to dev DB, makemigrations --check clean; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
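The P2 change (skip becomes fail when `REQUIRE_PG_TESTS` is set) could look roughly like the sketch below. The helper name `handle_pg_unavailable` and the rule for which values count as "set" are assumptions made for illustration; the real test module may implement this differently, for example inside a fixture.

```python
import os
import unittest


def handle_pg_unavailable(reason, env=None):
    """Skip when Postgres is missing, unless the environment demands real-PG tests.

    Hypothetical helper: the name and the truthiness rule are assumptions.
    """
    env = os.environ if env is None else env
    # Assumption: any value other than empty/"0"/"false" enables strict mode.
    strict = env.get("REQUIRE_PG_TESTS", "").strip().lower() not in ("", "0", "false")
    if strict:
        # In CI where PG is expected, a missing database is a failure, not a skip.
        raise AssertionError(
            f"REQUIRE_PG_TESTS is set but Postgres is unavailable: {reason}"
        )
    raise unittest.SkipTest(f"Postgres unavailable: {reason}")
```

The point of the design is that a green run in CI then implies the dedup tests actually executed, rather than being skipped silently.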
Review round addressed —
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_barrier.py | Adds claim_batch and clear_execution_batches using the existing thread-local _cursor() pattern. The new primitives are individually correct, but PgBarrier.enqueue() — which already lives in this file — resets pg_barrier_state on execution_id reuse without clearing the new pg_batch_dedup rows; when 2c wires in claim_batch, retried executions will silently skip all batches. |
| backend/pg_queue/models.py | Adds PgBatchDedup model with a UniqueConstraint(execution_id, batch_index) as the ON CONFLICT target and a CheckConstraint(batch_index >= 0). Schema is clean, extension-free, and consistent with sibling models. |
| backend/pg_queue/migrations/0006_pgbatchdedup_and_more.py | Auto-generated migration creating pg_batch_dedup with the correct constraints; depends on 0005 and mirrors the model exactly. |
| workers/tests/test_pg_batch_dedup.py | Good test coverage: first-claim, redelivery rejection, cross-batch/cross-execution isolation, negative index DB rejection, concurrent winner test (with threading.Barrier), clear-only-target, clear-empty, and reclaim-after-clear. The concurrency test is well-structured; the skip/fail logic for REQUIRE_PG_TESTS is a nice CI safeguard. |
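The concurrent-winner test described in the last row can be sketched as follows. This is an illustration only: SQLite (a temporary file with one connection per thread) stands in for Postgres, `rowcount` stands in for the `RETURNING` check, and the names `race_once` and `run_trials` are hypothetical. The structure follows the review notes: connections are pre-built in the main thread, claims are aligned with `threading.Barrier`, and the race is repeated over several trials.

```python
import os
import sqlite3
import tempfile
import threading

DDL = """
CREATE TABLE pg_batch_dedup (
    execution_id TEXT NOT NULL,
    batch_index  INTEGER NOT NULL CHECK (batch_index >= 0),
    UNIQUE (execution_id, batch_index)
)
"""


def claim_batch(conn, execution_id, batch_index):
    """Return True only for the caller whose INSERT actually created the marker."""
    cur = conn.execute(
        "INSERT INTO pg_batch_dedup (execution_id, batch_index) VALUES (?, ?) "
        "ON CONFLICT (execution_id, batch_index) DO NOTHING",
        (execution_id, batch_index),
    )
    return cur.rowcount == 1


def race_once(db_path, execution_id, n=8):
    """Let n threads claim the same batch at once; return the number of winners."""
    # Pre-build the connections in the main thread (autocommit, generous busy timeout).
    conns = [
        sqlite3.connect(db_path, timeout=30, isolation_level=None,
                        check_same_thread=False)
        for _ in range(n)
    ]
    gate = threading.Barrier(n)
    results = [False] * n

    def worker(i):
        gate.wait()  # release all claimants together to force contention
        results[i] = claim_batch(conns[i], execution_id, 0)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for c in conns:
        c.close()
    return sum(results)


def run_trials(trials=5):
    """Run the race several times, each with a fresh execution_id."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "dedup.sqlite3")
        setup = sqlite3.connect(db_path)
        setup.execute(DDL)
        setup.commit()
        setup.close()
        return [race_once(db_path, f"exec-{t}") for t in range(trials)]
```

Each trial should report exactly one winner. Without the barrier the threads tend to run serially, which exercises only the uncontended path.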
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Worker as Worker (process_file_batch)
participant PG as Postgres (pg_batch_dedup)
participant Barrier as pg_barrier_state
Note over Worker,Barrier: First delivery (2c wired path)
Worker->>PG: claim_batch(exec_id, batch_idx) INSERT ON CONFLICT DO NOTHING RETURNING
PG-->>Worker: row returned → True (first delivery)
Worker->>Worker: Process batch
Worker->>Barrier: Decrement remaining
Note over Worker,Barrier: Redelivery (crash-before-ack)
Worker->>PG: claim_batch(exec_id, batch_idx) INSERT ON CONFLICT DO NOTHING RETURNING
PG-->>Worker: no row returned → False (redelivery)
Worker->>Worker: Skip (barrier already decremented once)
Note over Worker,PG: Barrier finalise (remaining → 0)
Barrier->>PG: "clear_execution_batches(exec_id) DELETE WHERE execution_id = ?"
PG-->>Barrier: rowcount (markers removed)
Note over Worker,PG: Execution retry with same execution_id (gap)
Worker->>Barrier: PgBarrier.enqueue() UPSERT pg_barrier_state
Note right of Barrier: pg_batch_dedup NOT cleared — stale markers remain
Worker->>PG: claim_batch(exec_id, batch_idx)
PG-->>Worker: False (stale marker) — batch silently skipped
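The first three phases of the diagram (first delivery, redelivery, finalise) can be reproduced with a small stand-in. This is a sketch under stated assumptions, not the PR's code: an in-memory SQLite table replaces `pg_batch_dedup`, the helpers take an explicit `conn` instead of using the thread-local `_cursor()`, and `rowcount` replaces the `RETURNING` check.

```python
import sqlite3


def open_db():
    """Create an in-memory stand-in for the pg_batch_dedup table (assumed columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE pg_batch_dedup ("
        " execution_id TEXT NOT NULL,"
        " batch_index INTEGER NOT NULL CHECK (batch_index >= 0),"
        " UNIQUE (execution_id, batch_index))"
    )
    return conn


def claim_batch(conn, execution_id, batch_index):
    """True on first delivery (marker inserted), False on redelivery (marker exists)."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "INSERT INTO pg_batch_dedup (execution_id, batch_index) VALUES (?, ?) "
            "ON CONFLICT (execution_id, batch_index) DO NOTHING",
            (execution_id, batch_index),
        )
    return cur.rowcount == 1


def clear_execution_batches(conn, execution_id):
    """Delete every marker for one execution and return how many were removed."""
    with conn:
        cur = conn.execute(
            "DELETE FROM pg_batch_dedup WHERE execution_id = ?", (execution_id,)
        )
    return cur.rowcount


def demo():
    conn = open_db()
    return [
        claim_batch(conn, "exec-1", 0),            # first delivery
        claim_batch(conn, "exec-1", 0),            # redelivery of the same batch
        claim_batch(conn, "exec-1", 1),            # a different batch
        clear_execution_batches(conn, "exec-1"),   # barrier finalise
        claim_batch(conn, "exec-1", 0),            # reclaim after clear
    ]


print(demo())  # [True, False, True, 2, True]
```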
Comments Outside Diff (2)
- workers/queue_backend/pg_barrier.py, line 236-251: **Stale dedup markers silently block retried executions**
  `enqueue()` resets `pg_barrier_state` for a reused `execution_id` via UPSERT (the comment at line 237 explicitly acknowledges this reuse scenario), but it does NOT call `clear_execution_batches(execution_id)`. When 2c wires in `claim_batch`, any retry of a failed or expired execution that reuses the same `execution_id` will find the old markers still present: every `claim_batch` call returns `False`, all batches are silently skipped, the barrier counter never reaches 0, and the retry hangs until `expires_at`. The fix is to call `clear_execution_batches(execution_id)` inside the same `_cursor()` block as the UPSERT so both resets are atomic.
- workers/queue_backend/pg_barrier.py, line 499-507: **`barrier_pg_abort` orphans dedup markers, compounding the enqueue-reuse gap**
  `barrier_pg_abort` deletes `pg_barrier_state` (the claim-and-teardown CTE) but has no corresponding `clear_execution_batches` call. This is documented as future work in the model docstring, but it directly feeds the P1 gap on the enqueue path: an aborted execution leaves its `pg_batch_dedup` rows in place, and if `enqueue()` doesn't clear them on the next attempt, `claim_batch` will block the retry. Clearing them here (alongside the `pg_barrier_state` delete) would be the belt-and-suspenders fix and close the gap independently of the enqueue-side fix.
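The reuse gap and the proposed enqueue-side fix can be illustrated with a small stand-in. Everything here is assumed for illustration: SQLite replaces Postgres, the `pg_barrier_state` columns are reduced to `execution_id` and `remaining`, and `enqueue_reset` is a hypothetical name for the UPSERT block inside `enqueue()`.

```python
import sqlite3

SCHEMA = """
CREATE TABLE pg_barrier_state (
    execution_id TEXT PRIMARY KEY,
    remaining    INTEGER NOT NULL
);
CREATE TABLE pg_batch_dedup (
    execution_id TEXT NOT NULL,
    batch_index  INTEGER NOT NULL,
    UNIQUE (execution_id, batch_index)
);
"""


def open_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def claim_batch(conn, execution_id, batch_index):
    """True if this call inserted the marker, False if it already existed."""
    with conn:
        cur = conn.execute(
            "INSERT INTO pg_batch_dedup (execution_id, batch_index) VALUES (?, ?) "
            "ON CONFLICT (execution_id, batch_index) DO NOTHING",
            (execution_id, batch_index),
        )
    return cur.rowcount == 1


def enqueue_reset(conn, execution_id, total, clear_markers):
    """Reset the barrier row; optionally clear dedup markers in the same transaction."""
    with conn:  # both statements commit together or not at all
        conn.execute(
            "INSERT INTO pg_barrier_state (execution_id, remaining) VALUES (?, ?) "
            "ON CONFLICT (execution_id) DO UPDATE SET remaining = excluded.remaining",
            (execution_id, total),
        )
        if clear_markers:
            conn.execute(
                "DELETE FROM pg_batch_dedup WHERE execution_id = ?", (execution_id,)
            )


def retry_can_claim(clear_markers):
    """Run one attempt, then a retry with the same execution_id; can batch 0 be claimed?"""
    conn = open_db()
    enqueue_reset(conn, "exec-1", 2, clear_markers)
    claim_batch(conn, "exec-1", 0)                   # first attempt claims batch 0
    enqueue_reset(conn, "exec-1", 2, clear_markers)  # retry reuses the execution_id
    return claim_batch(conn, "exec-1", 0)


print(retry_can_claim(clear_markers=False))  # False: stale marker blocks the retry
print(retry_can_claim(clear_markers=True))   # True: markers cleared with the reset
```

Doing the delete inside the same transaction as the UPSERT means no consumer can observe a reset barrier alongside stale markers.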
Reviews (1): Last reviewed commit: "UN-3562 address review (muhammad-ali-e P..."
@greptile-apps: thanks, sharp catch (tracked for 2c)
The 3/5 finding is correct and verified:
Where it lands: 2c, not 2b. The gap can only manifest once
Captured as a must-do in the 2c scope with a same-
Acknowledged — the sequencing logic holds. The gap is latent-only in 2b (empty table = no stale markers to hit), so pulling the fix forward would break the deliberate "inert primitive, no call sites" invariant that makes 2b reviewable in isolation. The 2c landing plan is exactly right:
The same-
5f2d18d into feat/UN-3445-pg-queue-integration
…fire-and-forget) (#2069)

* UN-3563 [FEAT] PG Queue 9e PR 2c: live PG fan-out/barrier/callback (fire-and-forget)

Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for a transport=="pg_queue" execution. Gated: resolve_transport() still returns celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable and the default path is byte-identical. Orchestrator task (async_execute_bin) stays on Celery (hybrid); routing it onto PG is a 2d follow-up.

- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier() (bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode: _dispatch_header_pg sends each header via dispatch(backend=PG) with an injected _barrier_context {execution_id, batch_index, callback_descriptor}, no .link; descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup (greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback onto PG when backend==pg_queue. clear_execution_batches at finalise + abort. run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement; redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None → _run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.

Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test pending before PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S1172: drop unused task_instance from _run_batch_stages

The extracted _run_batch_stages never uses task_instance; its only purpose (deriving celery_task_id) happens in _process_file_batch_core before the call. Removed the param + updated both call sites. _process_file_batch_core keeps task_instance (it reads .request.id). Routing test mocks with *a, unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address review (muhammad-ali-e, 15): strand-on-failure hardening + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort: a decrement-side failure (guard / DB / last-batch callback dispatch) tears the barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body: logs when the teardown itself fails (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work, post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier, process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value; avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg(): single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup; decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix run_batch_with_barrier strand-window doc inconsistency (review)

The second "NOT catchable" bullet conflated two different things: it described the in-body catchable abort ("the abort here removes the row") and a *software* callback-dispatch failure, but that failure is already caught + torn down by step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed but before the enqueue" couldn't both hold. Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN the decrement committing (remaining→0) and the callback enqueue completing. In that window the decrement is committed (redelivery blocked by the marker), the process is gone before the callback enqueues or any abort runs, the row survives to expiry, and recovery is reaper-only. Explicitly notes a software dispatch failure is the catchable case. Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address greptile (#2069, 2): clear dedup on mid-loop PG failure + carry fairness on PG callback

Both in the gated PG path (greptile 4/5, safe to merge).

- Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls clear_execution_batches on the PG path. Earlier headers may have committed a claim_batch marker; with the barrier row deleted, their in-flight barrier_pg_abort is a no-op (already_aborted) and never reaches the clear inside it, so reclaim the markers directly here.
- Issue 2: the PG callback now carries the producer's fairness. Added _fairness_from_headers() to reconstruct the FairnessKey from the stored x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the same org/priority as the Celery path (was always default priority).

Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback; extended the PG mid-loop test to assert an already-claimed marker is reclaimed. 75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S3776: reduce PgBarrier.enqueue cognitive complexity (17→under 15)

Extracted the per-header dispatch loop into PgBarrier._dispatch_headers: the deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure teardown + PG dedup-clear) was the complexity driver. enqueue now calls the helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes. 75 barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix greptile #2069: mid-loop dedup-clear test passed for the wrong reason

The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the reuse-reset DELETE) before the dispatch loop, so the mid-loop clear_execution_batches deleted 0 rows; the count==0 assertion passed on the UPSERT, not the guard under test. Now the first dispatch side-effect claims the marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear is what removes it. Verified: with the clear disabled the marker orphans (count=1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
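The claim → work → decrement flow that the 2c commit describes for `run_batch_with_barrier()` can be sketched with an in-memory fake. This illustrates the control flow only: `FakeBarrierStore` and its methods are hypothetical stand-ins for the Postgres-backed helpers, and the real function's signature and error handling may differ.

```python
from typing import Callable, TypedDict


class BarrierContext(TypedDict):
    execution_id: str
    batch_index: int


class FakeBarrierStore:
    """In-memory stand-in for pg_barrier_state + pg_batch_dedup (hypothetical)."""

    def __init__(self, execution_id: str, total: int) -> None:
        self.remaining = {execution_id: total}
        self.markers = set()
        self.callbacks_fired = []

    def claim_batch(self, execution_id: str, batch_index: int) -> bool:
        key = (execution_id, batch_index)
        if key in self.markers:
            return False  # redelivery: marker already present
        self.markers.add(key)
        return True

    def clear_execution_batches(self, execution_id: str) -> int:
        stale = {m for m in self.markers if m[0] == execution_id}
        self.markers -= stale
        return len(stale)

    def decrement(self, execution_id: str) -> None:
        if execution_id not in self.remaining:
            raise RuntimeError("barrier row missing")
        self.remaining[execution_id] -= 1
        if self.remaining[execution_id] == 0:
            # Last batch: fire the callback, then tear down barrier and markers.
            self.callbacks_fired.append(execution_id)
            del self.remaining[execution_id]
            self.clear_execution_batches(execution_id)

    def abort(self, execution_id: str) -> None:
        self.remaining.pop(execution_id, None)
        self.clear_execution_batches(execution_id)


def run_batch_with_barrier(
    store: FakeBarrierStore, ctx: BarrierContext, work: Callable[[], None]
) -> str:
    """Claim the batch, do the work, decrement once; skip on redelivery."""
    if not store.claim_batch(ctx["execution_id"], ctx["batch_index"]):
        return "skipped"
    try:
        work()
        store.decrement(ctx["execution_id"])  # decrement is inside the guarded block
    except Exception:
        store.abort(ctx["execution_id"])  # tear down instead of stranding to expiry
        raise
    return "processed"
```

A redelivered batch returns "skipped" before any work runs, so the counter moves exactly once per batch, and a failure in either the work or the decrement triggers the abort.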
What
Second slice of 9e PR 2 (the live PG execution pipeline), after 2a (#2067). This is 2b, the inert idempotency primitive — the durable per-batch dedup marker that PR 2c will wire into the at-least-once PG path.
Why
The PG queue is at-least-once: a `process_file_batch` task can be redelivered after a crash-before-ack, which would re-run the batch and double-decrement the barrier (`process_file_batch` is non-idempotent, `max_retries=0`). A recon of the existing per-file protection found it only partial:
- the Redis lock is released after the write;
- `WorkflowFileExecution` COMPLETED skips tool re-execution, but the destination write may still be reached with the prior result;
- FileHistory (`use_file_history`) is cross-execution only and doesn't cover same-execution crash-redelivery.

So a durable per-batch gate is needed. The existing per-file status remains the backstop for the partial-crash re-processing case (a re-run batch whose marker wasn't written yet skips already-done files).
Scope (2b)
- backend: `PgBatchDedup` model + migration 0006: table `pg_batch_dedup` with a `UniqueConstraint(execution_id, batch_index)` (the `ON CONFLICT` target; its `execution_id`-leading index also serves the cleanup `DELETE`). Django-managed, extension-free, same posture as the sibling `pg_queue` models.
- workers (`queue_backend/pg_barrier.py`, reusing the barrier's thread-local `_cursor()` → one PG connection per worker child):
  - `claim_batch(execution_id, batch_index) -> bool`: atomic `INSERT … ON CONFLICT DO NOTHING RETURNING`; `True` = first delivery (the caller proceeds and decrements), `False` = redelivery (skip, so the barrier decrements exactly once per batch).
  - `clear_execution_batches(execution_id) -> int`: barrier-teardown cleanup. Markers for executions that never finalise currently leak: the reaper's `sweep_expired_barriers` deletes only `pg_barrier_state`, and a dedup-orphan sweep is intended future work.

Not in 2b
No call-site wiring. `claim_batch` (at batch start) + `clear_execution_batches` (at barrier finalise) + `batch_index` threading + the transport switch all land in 2c. (Same "don't thread unused params early" lesson that kept 2a clean.) Net behaviour change: NONE.

Tests / dev-test
- Migration applied to dev DB; `makemigrations --check` → no drift.
- Bootstrap clean under `WORKER_BARRIER_BACKEND=pg`; `ruff` clean.

Base
Targets the long-lived `feat/UN-3445-pg-queue-integration` integration branch (not `main`).

🤖 Generated with Claude Code