UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (fire-and-forget)#2069
UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (fire-and-forget)#2069muhammad-ali-e merged 7 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3563-pg-pipeline-2c-live-switchZipstack/unstract:UN-3563-pg-pipeline-2c-live-switchCopy head branch name to clipboard
Conversation
…fire-and-forget)
Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for
a transport=="pg_queue" execution. Gated: resolve_transport() still returns
celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable —
default path byte-identical. Orchestrator task (async_execute_bin) stays on
Celery (hybrid); routing it onto PG is a 2d follow-up.
- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and
ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier()
(bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode —
_dispatch_header_pg sends each header via dispatch(backend=PG) with an injected
_barrier_context {execution_id, batch_index, callback_descriptor}, no .link;
descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup
(greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback
onto PG when backend==pg_queue. clear_execution_batches at finalise + abort.
run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement;
redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None →
_run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.
Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch
routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test
pending before PR.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…ch_stages The extracted _run_batch_stages never uses task_instance — its only purpose (deriving celery_task_id) happens in _process_file_batch_core before the call. Removed the param + updated both call sites. _process_file_batch_core keeps task_instance (it reads .request.id). Routing test mocks with *a, unaffected. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud S1172 fixed —
|
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR Review — PG Queue 9e PR 2c (live fan-out/barrier/callback)
Multi-agent review (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). The implementation is well-structured and heavily documented; the new tests pass. The dominant theme below: the new in-body fire-and-forget path lacks the .link_error safety net of the chord path, so several failure modes (decrement raise, suppressed abort, last-batch callback-dispatch failure) degrade to a silent ~6h hang-to-expiry that redelivery can't heal because the claim is already consumed. Findings are posted inline below, prioritised by severity.
…ing + typing/dedup/docs/tests Decision (with reviewer): reaper-as-safety-net for the un-catchable strand windows + fix what's catchable + document + gate on PR3. Failure handling: - [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort: a decrement-side failure (guard / DB / last-batch callback dispatch) tears the barrier down in-body instead of stranding to expiry. - [#79] extracted _abort_barrier_in_body — logs when the teardown itself fails (was silently suppressed under a misleading "torn down" message). - [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work, post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3. - [#86] finalise cleanup split into independent try/excepts with distinct logs. Typing / clarity: - [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier, process_file_batch). - [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value; avoids the QueueBackend "pg" collision). - [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier. - [#20] extracted _dispatch_pg() — single home for cycle-avoiding local import + backend=PG. - [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler). - [#94] log when a header has no queue option. - [#9/#13] fixed born-stale comment + kwargs-not-args docstring. Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup; decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round addressed —
|
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Re-review of the post-hardening commit (f5c6b98). The fire-and-forget PG coordination is sound and the prior pass closed the major strand windows. After verifying against the at-least-once consumer semantics, I'm raising only one new item — a self-contradiction in the strand-window contract docstring that gates the PR-3 reaper spec. (I checked a candidate claim_batch-strand concern and confirmed it's a non-issue: the consumer redelivers a raised task on vt-expiry and the dedup marker isn't committed on a claim failure, so keeping claim_batch outside the in-body teardown is correct — aborting there would wrongly fail the whole execution on a transient blip.)
…eview)
The second "NOT catchable" bullet conflated two different things: it described
the in-body catchable abort ("the abort here removes the row") and a *software*
callback-dispatch failure — but that failure is already caught + torn down by
step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable
heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed
but before the enqueue" couldn't both hold.
Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN
the decrement committing (remaining→0) and the callback enqueue completing —
decrement committed (redelivery blocked by the marker), process gone before the
callback enqueues or any abort runs, row survives to expiry, reaper-only
recovery. Explicitly notes a software dispatch failure is the catchable case.
Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_barrier.py | Core of the PR: adds _dispatch_headers, _dispatch_header_pg, _fairness_from_headers, _fire_barrier_callback, _abort_barrier_in_body, and run_batch_with_barrier — the full PG fire-and-forget fan-out/barrier/callback chain. Previous review findings are correctly addressed. Minor: inconsistent key access in _fairness_from_headers and a misleading abort-reason string when decrement-side failures are caught. |
| workers/file_processing/tasks.py | Refactors _process_file_batch_core to delegate to run_batch_with_barrier when _barrier_context is set (PG path), otherwise runs stages directly (Celery path). Clean routing split with no Celery-path regressions. |
| workers/shared/workflow/execution/orchestration_utils.py | Adds _barrier_for_transport to select a fresh PgBarrier on pg_queue transport vs. the env-selected singleton on Celery, and threads transport through create_chord_execution. Logic is correct and tested. |
| workers/queue_backend/barrier.py | Adds BarrierContext TypedDict and a transport NotRequired field to CallbackDescriptor; extends the Barrier Protocol with a transport param. All Celery/Redis implementations accept and ignore it. Clean Protocol extension. |
| workers/tests/test_pg_barrier.py | Adds 12 new tests covering PG fire-and-forget dispatch, reuse dedup cleanup, first-delivery/redelivery/exception paths, fairness reconstruction, callback routing, and the corrected mid-loop dispatch failure test. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant O as Orchestrator
participant PB as PgBarrier.enqueue()
participant PQ as PG Queue
participant W as Worker (process_file_batch)
participant D as _barrier_pg_decrement
participant CB as Callback Task
O->>PB: "create_chord_execution(transport=pg_queue)"
PB->>PB: "UPSERT pg_barrier_state(remaining=N) + DELETE pg_batch_dedup"
loop for each header task
PB->>PQ: "dispatch(task + _barrier_context, backend=PG)"
end
PQ->>W: deliver process_file_batch(_barrier_context)
W->>W: claim_batch: True or False(skip redelivery)
W->>W: "_run_batch_stages() -> result"
W->>D: _barrier_pg_decrement(result)
D->>D: "UPDATE remaining-=1, append result"
alt "remaining > 0"
D-->>W: pending
else "remaining == 0"
D->>CB: "_fire_barrier_callback -> dispatch(backend=PG)"
D->>D: DELETE pg_barrier_state + pg_batch_dedup
CB-->>O: callback fires
end
alt exception
W->>W: "_abort_barrier_in_body -> barrier_pg_abort + clear_execution_batches"
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant O as Orchestrator
participant PB as PgBarrier.enqueue()
participant PQ as PG Queue
participant W as Worker (process_file_batch)
participant D as _barrier_pg_decrement
participant CB as Callback Task
O->>PB: "create_chord_execution(transport=pg_queue)"
PB->>PB: "UPSERT pg_barrier_state(remaining=N) + DELETE pg_batch_dedup"
loop for each header task
PB->>PQ: "dispatch(task + _barrier_context, backend=PG)"
end
PQ->>W: deliver process_file_batch(_barrier_context)
W->>W: claim_batch: True or False(skip redelivery)
W->>W: "_run_batch_stages() -> result"
W->>D: _barrier_pg_decrement(result)
D->>D: "UPDATE remaining-=1, append result"
alt "remaining > 0"
D-->>W: pending
else "remaining == 0"
D->>CB: "_fire_barrier_callback -> dispatch(backend=PG)"
D->>D: DELETE pg_barrier_state + pg_batch_dedup
CB-->>O: callback fires
end
alt exception
W->>W: "_abort_barrier_in_body -> barrier_pg_abort + clear_execution_batches"
end
Reviews (3): Last reviewed commit: "UN-3563 fix greptile #2069: mid-loop ded..." | Re-trigger Greptile
…re + carry fairness on PG callback Both in the gated PG path (greptile 4/5, safe to merge). - Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls clear_execution_batches on the PG path. Earlier headers may have committed a claim_batch marker; with the barrier row deleted, their in-flight barrier_pg_abort is a no-op (already_aborted) and never reaches the clear inside it, so reclaim the markers directly here. - Issue 2: the PG callback now carries the producer's fairness. Added _fairness_from_headers() to reconstruct the FairnessKey from the stored x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the same org/priority as the Celery path (was always default priority). Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback; extended the PG mid-loop test to assert an already-claimed marker is reclaimed. 75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps — both findings fixed (
|
|
Both fixes look solid. The implementation is clean and the tests are thorough. Issue 1 (mid-loop dedup cleanup): The Issue 2 (PG callback fairness): 75 tests, both fixes targeted. LGTM — good to merge when ready. |
…lexity (17→under 15) Extracted the per-header dispatch loop into PgBarrier._dispatch_headers — the deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure teardown + PG dedup-clear) was the complexity driver. enqueue now calls the helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes. 75 barrier/dedup tests green; ruff + ruff-format clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud S3776 fixed —
|
…wrong reason The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the reuse-reset DELETE) before the dispatch loop, so the mid-loop clear_execution_batches deleted 0 rows — the count==0 assertion passed on the UPSERT, not the guard under test. Now the first dispatch side-effect claims the marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear is what removes it. Verified: with the clear disabled the marker orphans (count=1). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps — "test passes for the wrong reason" fixed (
|
|
The fix is exactly right and the reasoning is sound. The ordering invariant now correctly models what happens at runtime:
The old test was asserting The "verify it's load-bearing" check (no-op the mid-loop clear → count=1 orphan) is exactly the right validation criterion — it confirms causality, not just coincidence. Clean. |
|
b3f25d9
into
feat/UN-3445-pg-queue-integration
What
Third slice of 9e PR 2, after 2a (#2067) + 2b (#2068). This is the live switch: it wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for a
transport=="pg_queue"execution (fire-and-forget self-chaining — no Celery chord /.link).Non-regressive by construction
resolve_transport()still returns"celery"(PR 3 wires Flipt), so the entire PG branch is present-but-unreachable — the default path is byte-identical. The flagpg_queue_execution_enabledstays off and is not consumed by any code until PR 3.Scope = hybrid
2c migrates the fan-out → barrier → callback (the at-least-once core). The orchestrator task (
async_execute_bin) stays on Celery; routing it onto PG (wholly-off-Celery) is a follow-up (2d).How it works (on
transport=="pg_queue")BarrierProtocol + Celery/Redis impls accept (and ignore) atransportparam;CallbackDescriptorgains an optionalbackendmarker.create_chord_executionroutespg_queueto a freshPgBarrier()— bypassing theWORKER_BARRIER_BACKENDsingleton (the celery substrate), so the transport selects the substrate.PgBarrier.enqueuefire-and-forget mode: each header dispatched viadispatch(backend=PG)with an injected_barrier_context {execution_id, batch_index, callback_descriptor}(no.link); the UPSERT block also clearspg_batch_dedup(the reuse-reset flagged in 2b review —@greptile-appsUN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches) #2068).process_file_batch(_barrier_context=None): the PG path runsrun_batch_with_barrier— claim the batch (idempotent on redelivery), run the work, decrement the barrier in-body; the task that drivesremaining → 0self-chains the callback onto PG viadispatch(backend=PG). Exceptions tear the barrier down in-body (mirrors the chord.link_error).clear_execution_batchesruns at finalise and abort.Mixed-version / rolling-deploy safety
Three layers:
transport,_barrier_context) have safe defaults, so an old payload on a new worker falls back to celery/legacy.transportabsorbed by old workers — orchestrator tasks + the callback have**kwargs, so a pre-version worker swallows an unknowntransport(noTypeError)._barrier_contextnever reaches a Celery/old worker — injected only on thepg_queuepath, which is gated off until PR 3 and routes only to the PG consumer (this code). That's whyprocess_file_batchkeeps an explicit param, not a**kwargsswallow.Operational rule (naturally enforced by flag-default-off + separate PR): deploy 2c everywhere before PR 3 flips
pg_queue_execution_enabled.Tests
_barrier_contextinjection, reuse-clear,run_batch_with_barrierfirst/redelivery/exception, callback PG-vs-celery, abort-clears-dedup.pg_queue→ freshPgBarrier(not the singleton); celery → singleton withtransportthreaded.process_file_batchrouting:_barrier_contextNone → chord stages, set →run_batch_with_barrier.ruffclean.Dev-test (end-to-end, forced
pg_queue)The coupled pipeline ran end-to-end on Postgres, fire-and-forget, twice, clean teardown:
process_file_batchclaimed + ran on a PG consumer → in-body barrier decrement →remaining=0→ callback self-chained onto the PG callback queue → ran on a second PG consumer → execution COMPLETED.pg_barrier_stateandpg_batch_dedupboth empty afterward (finalise cleanup verified). No chord, no.link.Base
Targets the long-lived
feat/UN-3445-pg-queue-integrationintegration branch (notmain). Stacked on 2b.🤖 Generated with Claude Code