UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIER_BACKEND)#2053
UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIER_BACKEND)#2053muhammad-ali-e merged 3 commits into
Conversation
…R_BACKEND)
Add a Postgres Barrier substrate selected by WORKER_BARRIER_BACKEND=pg (default
stays chord). Moves the fan-in aggregation ("wait for N header tasks, then fire
the callback with their results") onto a pg_barrier_state row — the same DB that
holds the PG queue, so an execution can coordinate without Redis/RabbitMQ. The
9e pipeline on-ramp primitive.
Mirrors RedisDecrBarrier 1:1 — same Barrier protocol, fairness plumbing,
Celery-dispatched header tasks with .link/.link_error, empty->None,
missing-execution_id->raise, mid-loop dispatch cleanup. Defaults-off, zero
behaviour change until the flag flips.
- Schema (backend/pg_queue): pg_barrier_state (execution_id PK, remaining,
results jsonb, aborted, expires_at) + migration 0004.
- pg_barrier.py: PgBarrier + barrier_pg_decr_and_check / barrier_pg_abort.
Atomic decrement is ONE statement (UPDATE ... SET remaining = remaining-1,
results = results || jsonb_build_array(%s) ... RETURNING remaining, results,
aborted) — row lock serialises concurrent decrements so exactly one sees 0;
no Lua. Guards: reads aborted in the same statement (never fires partial),
row-missing / negative-remaining clean up without firing, callback dispatched
BEFORE row delete. Orphan bound via expires_at + opportunistic sweep in
enqueue (periodic sweep is the backstop).
- __init__.py: BarrierBackend.PG -> PgBarrier() in get_barrier().
Tests: protocol shape, TTL env validation, enqueue (upsert/links/fairness/
stale-reset/expiry-sweep/mid-loop-cleanup), decr paths (pending/complete-fires/
aborted/negative/missing/unserialisable), abort (claim+delete/dedup), and a real
two-connection decrement-atomicity check (exactly one sees 0). Selector PG case.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
The implementation is high quality and faithfully mirrors RedisDecrBarrier — the atomic single-statement decrement, the aborted-read-in-decrement guard, and dispatch-before-delete are all sound. Findings below are inline. Highest priority: abort claim/delete atomicity, the per-enqueue orphan sweep on the hot path, and jsonb's rejection of \u0000 (which json.dumps accepts). Nothing here is blocking the safety-critical "never fire with partial results" property, which holds.
High: - Abort is now ONE atomic statement: `WITH claimed AS (DELETE ... RETURNING) ...` — claim+teardown in a single transaction (no claimed-but-not-deleted window; a crash rolls back so a sibling retries). This makes the `aborted` column redundant — dropped it; the decrement's "row missing -> abandoned" branch now covers the failed-task case. The callback can only fire when remaining hits 0 (all tasks succeeded), so a failed task (which deletes the row) can never let a partial-results fire. - Dropped the per-enqueue global orphan sweep (unbounded DELETE on the hot path, deadlock-prone, shared the UPSERT txn). Reclaim is a future periodic sweep. - A NUL byte survives json.dumps but jsonb rejects it -> catch the DataError and tear the barrier down (fail fast) instead of hanging to expiry. Medium: - Post-dispatch row delete is best-effort (logged, not raised) so a delete error can't mask the already-fired callback; documented the no-double-fire invariant (last decrement + max_retries=0). - Added a DB CheckConstraint (expires_at > created_at) — the one writer-proof invariant; Meta comment warns off a `remaining >= 0` check (teardown needs negative). Softened the "periodic sweep" comments to future/not-yet-shipped. Low: - Extracted shared `barrier_ttl_seconds()` + `CallbackDescriptor` into barrier.py; both backends import them (redis keeps back-compat aliases). signature_kwargs dict instead of inline ** spread. Atomicity comment notes the per-transaction premise. Tests: callback-dispatch-failure-preserves-row; decrement-after-abort-no-fire; atomicity through barrier_pg_decr_and_check (two threads, exactly one fires); list-result-as-single-element; NUL-byte teardown; DB-constraint; max_retries=0. 92 barrier tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Review feedback addressed —
|
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_barrier.py | Core PgBarrier implementation: thread-local psycopg2 connection, atomic UPDATE…RETURNING decrement, DELETE…RETURNING abort, and post-dispatch best-effort row cleanup. Minor double-logging on mid-loop dispatch failure; otherwise the atomicity model and failure-masking guards are correct. |
| workers/queue_backend/barrier.py | Extracts shared barrier_ttl_seconds() and CallbackDescriptor TypedDict from redis_barrier.py to a shared location; clean refactoring with no logic changes. |
| backend/pg_queue/models.py | Adds PgBarrierState model with execution_id PK, remaining, results (jsonb), created_at, expires_at; includes expires_after_created check constraint and expires_at index for the future sweep job. |
| backend/pg_queue/migrations/0004_pgbarrierstate_and_more.py | Django migration creating pg_barrier_state table with correct PK, jsonb column, expires_at index, and check constraint. Dependency chain is correct. |
| workers/queue_backend/init.py | Adds PgBarrier, barrier_pg_abort, barrier_pg_decr_and_check to exports and BarrierBackend.PG to the enum; get_barrier() dispatch path is exhaustive. |
| workers/queue_backend/redis_barrier.py | Deduplicates TTL logic and CallbackDescriptor by aliasing to barrier.py; no behavioural change, internal names preserved for backward compatibility. |
| workers/tests/test_pg_barrier.py | 26 tests in three layers: protocol shape (no DB), real injected psycopg2 connection for enqueue/decrement/abort paths, and a two-connection concurrency test verifying exactly-one-fire. Coverage is thorough. |
| workers/tests/test_barrier_backend_selection.py | Adds PG enum value and get_barrier() dispatch test for BarrierBackend.PG; clean extension of existing test class. |
Sequence Diagram
sequenceDiagram
participant P as Producer
participant DB as pg_barrier_state
participant W as Celery Worker
participant CB as Callback Task
P->>DB: "UPSERT row (remaining=N, results=[], expires_at=now+TTL)"
loop for each header task
P->>W: apply_async() with .link(decr_and_check) .link_error(abort)
end
Note over P,W: enqueue returns _PgBarrierHandle
par Header tasks run concurrently
W->>DB: "UPDATE remaining-1, results||result RETURNING remaining,results"
DB-->>W: "remaining=N-1 > 0 → pending"
and
W->>DB: "UPDATE remaining-1, results||result RETURNING remaining,results"
DB-->>W: "remaining=0 → complete"
W->>CB: "callback.apply_async(args=[all_results])"
W->>DB: DELETE row (best-effort)
and On any header task failure
W->>DB: DELETE WHERE execution_id RETURNING (atomic claim)
DB-->>W: claimed → aborted (row gone)
Note over W: Late decrements find no row → abandoned
end
Reviews (2): Last reviewed commit: "UN-3548 [DOCS] Drop stale aborted-column..." | Re-trigger Greptile
…ring The wire-model docstring's enqueue step still listed `aborted = false` as an UPSERT column after the column was removed (abort now dedups via DELETE … RETURNING / row existence). Remove it so a reader doesn't hunt for — or re-add — a column that no longer exists. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Greptile feedback addressedBoth findings were stale references my own refactor left behind (the code is correct) — Greptile 4/5, safe-to-merge.
No code-behaviour change beyond the docstring; 92 barrier tests still green. |
7c7617b
into
feat/UN-3445-pg-queue-integration
Adds a third
Barriersubstrate,PgBarrier, selected byWORKER_BARRIER_BACKEND=pg(default stayschord). It moves the fan-in aggregation — "wait for N header tasks, then fire the callback with their results" — onto a Postgres row, so an execution can coordinate in the same DB that holds the PG queue, with no Redis or RabbitMQ chord backend. The 9e pipeline on-ramp primitive. Targetsfeat/UN-3445-pg-queue-integration.Mirrors
RedisDecrBarrier(6b) 1:1 — sameBarrierprotocol, fairness plumbing, Celery-dispatched header tasks with.link/.link_error,empty → None, missing-execution_id→ raise, mid-loop dispatch cleanup. Defaults-off, zero behaviour change until the flag flips. (Header-task transport stays Celery; only the coordination moves — routing header tasks through PG is 9e proper.)What
backend/pg_queue/):pg_barrier_state(execution_id PK, remaining, results jsonb, aborted, expires_at) + migration0004.pg_barrier.py:PgBarrier+barrier_pg_decr_and_check/barrier_pg_abort.__init__.py:BarrierBackend.PG→PgBarrier()inget_barrier().The atomic decrement is one statement (no Lua)
The row lock serialises concurrent decrements, so exactly one task observes
remaining = 0— verified by a real two-connection atomicity test.Failure-masking guards
abortedin the same statement → never fires the callback with partial results, even if an abort set the flag but its DELETE hadn't run.remaining < 0(expiry/replay) → clean up, no fire.apply_asyncfailure leaves the row + expiry in place rather than stranding the execution.UPDATE … SET aborted WHERE NOT aborted RETURNING.expires_at; a periodic sweep job (not yet shipped) is the reclaim backstop. (Abort dedups viaDELETE … RETURNING/ row existence — noabortedcolumn.)Testing
pgcase — protocol shape, TTL env validation, enqueue (upsert / links / fairness / stale-reset / mid-loop-cleanup), decrement paths (pending / complete-fires / decrement-after-abort / negative / missing / unserialisable / NUL-teardown / dispatch-failure-preserves-row / list-result), abort (claim+delete / dedup), a real two-connection atomicity check driven throughbarrier_pg_decr_and_check(exactly one fires), plus DB-constraint andmax_retries=0.get_barrier(pg)→PgBarrier.pg_barrier_staterow via the liveenqueuepath (remaining counts down, results grows) to confirm the on-the-wire shape.Out of scope
🤖 Generated with Claude Code