UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIER_BACKEND)#2053

Merged
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3548-pg-barrier on Jun 15, 2026

Conversation

@muhammad-ali-e

@muhammad-ali-e muhammad-ali-e commented Jun 12, 2026

Contributor

Adds a third Barrier substrate, PgBarrier, selected by WORKER_BARRIER_BACKEND=pg (default stays chord). It moves the fan-in aggregation — "wait for N header tasks, then fire the callback with their results" — onto a Postgres row, so an execution can coordinate in the same DB that holds the PG queue, with no Redis or RabbitMQ chord backend. The 9e pipeline on-ramp primitive. Targets feat/UN-3445-pg-queue-integration.

Mirrors RedisDecrBarrier (6b) 1:1 — same Barrier protocol, fairness plumbing, Celery-dispatched header tasks with .link/.link_error, empty → None, missing-execution_id → raise, mid-loop dispatch cleanup. Defaults-off, zero behaviour change until the flag flips. (Header-task transport stays Celery; only the coordination moves — routing header tasks through PG is 9e proper.)

What

  • Schema (backend/pg_queue/): pg_barrier_state (execution_id PK, remaining, results jsonb, created_at, expires_at) + migration 0004.
  • pg_barrier.py: PgBarrier + barrier_pg_decr_and_check / barrier_pg_abort.
  • __init__.py: BarrierBackend.PG → PgBarrier() in get_barrier().
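
For readers unfamiliar with the selector, here is a minimal sketch of how an env-driven get_barrier() could dispatch. It is an illustration, not the code in this PR: the "chord" and "pg" values and the names BarrierBackend, get_barrier, PgBarrier and RedisDecrBarrier come from the description, while ChordBarrier, the "redis_decr" value, the env parameter and the ValueError on unknown values are assumptions.

```python
import os
from enum import Enum


class BarrierBackend(str, Enum):
    CHORD = "chord"            # default, per the PR description
    REDIS_DECR = "redis_decr"  # assumed string; the real value is not shown here
    PG = "pg"


# Placeholder classes so the sketch runs on its own.
class ChordBarrier: ...        # hypothetical name for the default backend
class RedisDecrBarrier: ...
class PgBarrier: ...


def get_barrier(env=None):
    """Pick a barrier implementation from WORKER_BARRIER_BACKEND."""
    env = os.environ if env is None else env
    raw = env.get("WORKER_BARRIER_BACKEND", BarrierBackend.CHORD.value)
    # Enum lookup raises ValueError for an unknown backend name (assumed policy).
    backend = BarrierBackend(raw.strip().lower())
    if backend is BarrierBackend.PG:
        return PgBarrier()
    if backend is BarrierBackend.REDIS_DECR:
        return RedisDecrBarrier()
    return ChordBarrier()
```

With the variable unset the sketch returns the chord backend, which matches the "defaults-off" claim above.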

The atomic decrement is one statement (no Lua)

UPDATE pg_barrier_state
   SET remaining = remaining - 1,
       results   = results || jsonb_build_array(%s::jsonb)
 WHERE execution_id = %s
RETURNING remaining, results

The row lock serialises concurrent decrements, so exactly one task observes remaining = 0 — verified by a real two-connection atomicity test.
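
The "exactly one task observes 0" property can be illustrated without a database. The sketch below is a simulation under stated assumptions: a threading.Lock stands in for the Postgres row lock, and FakeBarrierRow and run_header_tasks are hypothetical names that do not exist in the PR. It only shows why decrement, append and read-back must happen as one serialised step.

```python
import threading


class FakeBarrierRow:
    """In-memory stand-in for a single pg_barrier_state row."""

    def __init__(self, remaining):
        self.remaining = remaining
        self.results = []
        self._row_lock = threading.Lock()  # plays the role of the row lock

    def decr_and_append(self, result):
        # Decrement, append and read back under one lock, mirroring the
        # single UPDATE ... RETURNING statement: no two callers can
        # observe the same value of `remaining`.
        with self._row_lock:
            self.remaining -= 1
            self.results.append(result)
            return self.remaining, list(self.results)


def run_header_tasks(n):
    """Run n concurrent 'header tasks' and return what each one observed."""
    row = FakeBarrierRow(remaining=n)
    observed = [None] * n

    def task(i):
        observed[i] = row.decr_and_append(f"result-{i}")

    threads = [threading.Thread(target=task, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return observed
```

Each thread sees a distinct remaining value, so only one of them sees 0, and that one holds the full result list.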

Failure-masking guards

  • Fires only at remaining = 0 → every header task must have succeeded; a failed task deletes the row, so a later decrement finds no row and never fires the callback with partial results.
  • Row-missing (already torn down) / remaining < 0 (expiry/replay) → clean up, no fire.
  • Callback dispatched before the row is deleted → a callback apply_async failure leaves the row + expiry in place rather than stranding the execution.
  • Concurrent aborts collapse to one cleanup via a single DELETE … RETURNING claim (only the caller that gets the row back tears down).
  • Orphan bound via expires_at; a periodic sweep job (not yet shipped) is the reclaim backstop. (Abort dedups via DELETE … RETURNING / row existence — no aborted column.)
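
The guards above can be read as a small decision procedure over the row returned by the decrement. The following is a sketch, assuming the decrement yields either no row or a (remaining, results) pair. handle_decrement, the outcome strings and the two injected callables are hypothetical; the ordering (dispatch first, delete second, delete failures logged rather than raised) follows the description in this PR.

```python
import logging
from typing import Callable, Optional, Sequence, Tuple

logger = logging.getLogger(__name__)


def handle_decrement(
    row: Optional[Tuple[int, Sequence]],
    dispatch_callback: Callable[[Sequence], None],
    delete_row: Callable[[], None],
) -> str:
    """Decide what to do with the row returned by the atomic decrement."""
    if row is None:
        # Row already torn down (abort or earlier cleanup): never fire.
        return "abandoned"
    remaining, results = row
    if remaining > 0:
        return "pending"
    if remaining < 0:
        # Expiry or replay pushed the counter below zero: clean up, no fire.
        delete_row()
        return "stale"
    # remaining == 0: dispatch BEFORE deleting. If dispatch raises, the
    # exception propagates and the row (with its expiry) stays in place.
    dispatch_callback(results)
    try:
        delete_row()
    except Exception:
        # Best-effort: a delete error must not mask the fired callback.
        logger.exception("post-dispatch barrier row delete failed")
    return "complete"
```

Passing the dispatch and delete steps in as callables keeps the ordering testable without a broker or a database.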

Testing

  • 26 PgBarrier tests + the selector pg case — protocol shape, TTL env validation, enqueue (upsert / links / fairness / stale-reset / mid-loop-cleanup), decrement paths (pending / complete-fires / decrement-after-abort / negative / missing / unserialisable / NUL-teardown / dispatch-failure-preserves-row / list-result), abort (claim+delete / dedup), a real two-connection atomicity check driven through barrier_pg_decr_and_check (exactly one fires), plus DB-constraint and max_retries=0.
  • 53 barrier-related tests pass (3× stable); migration applies clean; get_barrier(pg) → PgBarrier.
  • Inspected a real pg_barrier_state row via the live enqueue path (remaining counts down, results grows) to confirm the on-the-wire shape.

Out of scope

  • Routing header tasks themselves through PG (9e). A periodic orphan-sweep job (future orchestrator REAP).

🤖 Generated with Claude Code

…R_BACKEND)

Add a Postgres Barrier substrate selected by WORKER_BARRIER_BACKEND=pg (default
stays chord). Moves the fan-in aggregation ("wait for N header tasks, then fire
the callback with their results") onto a pg_barrier_state row — the same DB that
holds the PG queue, so an execution can coordinate without Redis/RabbitMQ. The
9e pipeline on-ramp primitive.

Mirrors RedisDecrBarrier 1:1 — same Barrier protocol, fairness plumbing,
Celery-dispatched header tasks with .link/.link_error, empty->None,
missing-execution_id->raise, mid-loop dispatch cleanup. Defaults-off, zero
behaviour change until the flag flips.

- Schema (backend/pg_queue): pg_barrier_state (execution_id PK, remaining,
  results jsonb, aborted, expires_at) + migration 0004.
- pg_barrier.py: PgBarrier + barrier_pg_decr_and_check / barrier_pg_abort.
  Atomic decrement is ONE statement (UPDATE ... SET remaining = remaining-1,
  results = results || jsonb_build_array(%s) ... RETURNING remaining, results,
  aborted) — row lock serialises concurrent decrements so exactly one sees 0;
  no Lua. Guards: reads aborted in the same statement (never fires partial),
  row-missing / negative-remaining clean up without firing, callback dispatched
  BEFORE row delete. Orphan bound via expires_at + opportunistic sweep in
  enqueue (periodic sweep is the backstop).
- __init__.py: BarrierBackend.PG -> PgBarrier() in get_barrier().

Tests: protocol shape, TTL env validation, enqueue (upsert/links/fairness/
stale-reset/expiry-sweep/mid-loop-cleanup), decr paths (pending/complete-fires/
aborted/negative/missing/unserialisable), abort (claim+delete/dedup), and a real
two-connection decrement-atomicity check (exactly one sees 0). Selector PG case.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 914d8e5c-c04e-49a4-8120-98f3e05ed526

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author


Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).

The implementation is high quality and faithfully mirrors RedisDecrBarrier — the atomic single-statement decrement, the aborted-read-in-decrement guard, and dispatch-before-delete are all sound. Findings below are inline. Highest priority: abort claim/delete atomicity, the per-enqueue orphan sweep on the hot path, and jsonb's rejection of \u0000 (which json.dumps accepts). Nothing here is blocking the safety-critical "never fire with partial results" property, which holds.

Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/tests/test_pg_barrier.py
Comment thread workers/tests/test_pg_barrier.py Outdated
Comment thread workers/tests/test_pg_barrier.py Outdated
High:
- Abort is now ONE atomic statement: `WITH claimed AS (DELETE ... RETURNING) ...`
  — claim+teardown in a single transaction (no claimed-but-not-deleted window;
  a crash rolls back so a sibling retries). This makes the `aborted` column
  redundant — dropped it; the decrement's "row missing -> abandoned" branch now
  covers the failed-task case. The callback can only fire when remaining hits 0
  (all tasks succeeded), so a failed task (which deletes the row) can never let a
  partial-results fire.
- Dropped the per-enqueue global orphan sweep (unbounded DELETE on the hot path,
  deadlock-prone, shared the UPSERT txn). Reclaim is a future periodic sweep.
- A NUL byte survives json.dumps but jsonb rejects it -> catch the DataError and
  tear the barrier down (fail fast) instead of hanging to expiry.

Medium:
- Post-dispatch row delete is best-effort (logged, not raised) so a delete error
  can't mask the already-fired callback; documented the no-double-fire invariant
  (last decrement + max_retries=0).
- Added a DB CheckConstraint (expires_at > created_at) — the one writer-proof
  invariant; Meta comment warns off a `remaining >= 0` check (teardown needs
  negative). Softened the "periodic sweep" comments to future/not-yet-shipped.

Low:
- Extracted shared `barrier_ttl_seconds()` + `CallbackDescriptor` into barrier.py;
  both backends import them (redis keeps back-compat aliases). signature_kwargs
  dict instead of inline ** spread. Atomicity comment notes the per-transaction
  premise.

Tests: callback-dispatch-failure-preserves-row; decrement-after-abort-no-fire;
atomicity through barrier_pg_decr_and_check (two threads, exactly one fires);
list-result-as-single-element; NUL-byte teardown; DB-constraint; max_retries=0.
92 barrier tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review feedback addressed — 6ec036882

Thanks — all 13 findings handled (threads replied inline). A couple led to a nice simplification.

High

  • Abort atomicity → collapsed claim+delete into one WITH claimed AS (DELETE … RETURNING) statement. That made the aborted column redundant, so I dropped it — the "row missing → abandoned" decrement branch covers the failed-task case, and the callback can only fire when remaining hits 0 (all succeeded), so a failed task can never let a partial fire.
  • Hot-path orphan sweep → removed; reclaim is a future periodic sweep.
  • NUL byte / jsonb → catch DataError, tear the barrier down (fail fast) instead of hanging to expiry.
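
To make the NUL-byte finding concrete: json.dumps serialises a NUL character as the escape \u0000 without complaint, and that escape is what Postgres jsonb refuses to store. The sketch below shows the Python half only. contains_nul is a hypothetical pre-check added for illustration; the PR itself catches the database DataError and tears the barrier down rather than pre-validating.

```python
import json


def contains_nul(value) -> bool:
    """Recursively look for NUL characters in the strings of a JSON-like value."""
    if isinstance(value, str):
        return "\x00" in value
    if isinstance(value, dict):
        return any(contains_nul(k) or contains_nul(v) for k, v in value.items())
    if isinstance(value, (list, tuple)):
        return any(contains_nul(v) for v in value)
    return False


# Serialisation succeeds: the NUL becomes the six-character escape \u0000.
encoded = json.dumps({"text": "a\x00b"})

# The round trip through Python's json module is lossless, which is why the
# problem only surfaces once the value reaches a jsonb column.
decoded = json.loads(encoded)
```

Either approach avoids a barrier that silently waits until expiry; catching the error at write time has the advantage of covering any other value jsonb rejects.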

Medium

  • Post-dispatch delete is best-effort (logged, not raised) so it can't mask the already-fired callback.
  • Added a DB CheckConstraint(expires_at > created_at) (+ a "do not add remaining >= 0" Meta note); softened the periodic-sweep comments to future/not-yet-shipped.

Low — extracted shared barrier_ttl_seconds() + CallbackDescriptor into barrier.py (redis keeps back-compat aliases); signature_kwargs dict; atomicity-premise comment.

Tests — callback-dispatch-failure-preserves-row, decrement-after-abort-no-fire (the new structural guard), atomicity through the task (two threads, exactly one fires), list-result-as-single-element, NUL-byte teardown, DB constraint, max_retries=0.

Verified: 92 barrier tests pass (2× stable); migration regenerated + applied clean. The aborted column removal regenerated migration 0004 (renamed; the PR isn't merged so the migration is safe to replace).

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 15, 2026 07:19
@greptile-apps

greptile-apps Bot commented Jun 15, 2026

Contributor

Greptile Summary

This PR adds PgBarrier, a third Barrier substrate selected via WORKER_BARRIER_BACKEND=pg, which moves fan-in coordination from Redis/RabbitMQ onto a pg_barrier_state Postgres row — enabling executions to coordinate in the same DB that holds the PG queue with no additional infrastructure.

  • Atomicity model: a single UPDATE … RETURNING statement serialises concurrent decrements through the row lock, guaranteeing exactly one task observes remaining = 0 and fires the callback; abort uses DELETE … RETURNING as its dedup token, making concurrent failures collapse to one cleanup.
  • Failure guards: callback dispatch happens before row deletion so a broker failure preserves state; the remaining < 0 branch handles expiry/replay without a double-fire; max_retries=0 on both link tasks prevents counter corruption from Celery retries.
  • Test coverage: 26 tests across three layers including a real two-connection concurrency check; migration, model, enum, and factory selector all tested.
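
The abort dedup described above relies on a delete that also reports whether it removed anything. As a rough in-memory analogy (not the PR's SQL, whose full text is not shown here), dict.pop under a lock behaves the same way: only one caller gets the row back. FakeBarrierTable, abort and race_aborts are hypothetical names for this sketch.

```python
import threading


class FakeBarrierTable:
    """In-memory stand-in for the pg_barrier_state table."""

    def __init__(self):
        self._rows = {}
        self._lock = threading.Lock()

    def upsert(self, execution_id, remaining):
        with self._lock:
            self._rows[execution_id] = {"remaining": remaining, "results": []}

    def delete_returning(self, execution_id):
        # Analogue of DELETE ... RETURNING: returns the row if this caller
        # removed it, otherwise None.
        with self._lock:
            return self._rows.pop(execution_id, None)

    def decrement(self, execution_id, result):
        # Analogue of the UPDATE ... RETURNING decrement; None means no row.
        with self._lock:
            row = self._rows.get(execution_id)
            if row is None:
                return None
            row["remaining"] -= 1
            row["results"].append(result)
            return row["remaining"], list(row["results"])


def abort(table, execution_id):
    """Claim and tear down in one step; only the claimer reports 'aborted'."""
    claimed = table.delete_returning(execution_id)
    return "aborted" if claimed is not None else "already_gone"


def race_aborts(table, execution_id, n):
    """Fire n concurrent aborts and collect each caller's outcome."""
    outcomes = [None] * n

    def run(i):
        outcomes[i] = abort(table, execution_id)

    threads = [threading.Thread(target=run, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcomes
```

After the claim, a late decrement finds no row, which corresponds to the "abandoned" branch in the sequence diagram.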

Confidence Score: 5/5

Safe to merge; gated by a flag that defaults off, so no behaviour change until explicitly enabled.

The atomicity model is sound — PostgreSQL row locks correctly serialise concurrent decrements and the DELETE-based abort dedup works. Connection recovery, the remaining < 0 guard, dispatch-before-delete ordering, and max_retries=0 are all correctly reasoned. The one finding is a double log line on mid-loop dispatch failure (inner and outer handlers both log), which is noisy but not a correctness defect. All changed paths are defaults-off.

workers/queue_backend/pg_barrier.py — the double-logging path in enqueue is worth cleaning up before the flag is turned on in production.

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/queue_backend/pg_barrier.py | Core PgBarrier implementation: thread-local psycopg2 connection, atomic UPDATE…RETURNING decrement, DELETE…RETURNING abort, and post-dispatch best-effort row cleanup. Minor double-logging on mid-loop dispatch failure; otherwise the atomicity model and failure-masking guards are correct. |
| workers/queue_backend/barrier.py | Extracts shared barrier_ttl_seconds() and CallbackDescriptor TypedDict from redis_barrier.py to a shared location; clean refactoring with no logic changes. |
| backend/pg_queue/models.py | Adds PgBarrierState model with execution_id PK, remaining, results (jsonb), created_at, expires_at; includes expires_after_created check constraint and expires_at index for the future sweep job. |
| backend/pg_queue/migrations/0004_pgbarrierstate_and_more.py | Django migration creating pg_barrier_state table with correct PK, jsonb column, expires_at index, and check constraint. Dependency chain is correct. |
| workers/queue_backend/__init__.py | Adds PgBarrier, barrier_pg_abort, barrier_pg_decr_and_check to exports and BarrierBackend.PG to the enum; get_barrier() dispatch path is exhaustive. |
| workers/queue_backend/redis_barrier.py | Deduplicates TTL logic and CallbackDescriptor by aliasing to barrier.py; no behavioural change, internal names preserved for backward compatibility. |
| workers/tests/test_pg_barrier.py | 26 tests in three layers: protocol shape (no DB), real injected psycopg2 connection for enqueue/decrement/abort paths, and a two-connection concurrency test verifying exactly-one-fire. Coverage is thorough. |
| workers/tests/test_barrier_backend_selection.py | Adds PG enum value and get_barrier() dispatch test for BarrierBackend.PG; clean extension of existing test class. |

Sequence Diagram

sequenceDiagram
    participant P as Producer
    participant DB as pg_barrier_state
    participant W as Celery Worker
    participant CB as Callback Task

    P->>DB: "UPSERT row (remaining=N, results=[], expires_at=now+TTL)"
    loop for each header task
        P->>W: apply_async() with .link(decr_and_check) .link_error(abort)
    end
    Note over P,W: enqueue returns _PgBarrierHandle

    par Header tasks run concurrently
        W->>DB: "UPDATE remaining-1, results||result RETURNING remaining,results"
        DB-->>W: "remaining=N-1 > 0 → pending"
    and
        W->>DB: "UPDATE remaining-1, results||result RETURNING remaining,results"
        DB-->>W: "remaining=0 → complete"
        W->>CB: "callback.apply_async(args=[all_results])"
        W->>DB: DELETE row (best-effort)
    and On any header task failure
        W->>DB: DELETE WHERE execution_id RETURNING (atomic claim)
        DB-->>W: claimed → aborted (row gone)
        Note over W: Late decrements find no row → abandoned
    end

Reviews (2): Last reviewed commit: "UN-3548 [DOCS] Drop stale aborted-column..."

Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/tests/test_pg_barrier.py
…ring

The wire-model docstring's enqueue step still listed `aborted = false` as an
UPSERT column after the column was removed (abort now dedups via DELETE …
RETURNING / row existence). Remove it so a reader doesn't hunt for — or
re-add — a column that no longer exists.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sonarqubecloud


@muhammad-ali-e

Contributor Author

Greptile feedback addressed

Both findings were stale references my own refactor left behind (the code is correct) — Greptile 4/5, safe-to-merge.

  • aborted-column docstring (pg_barrier.py) → removed from the enqueue UPSERT line (3966aed97). Abort dedups via DELETE … RETURNING; no such column.
  • PR description claimed an "expiry-sweep" test → corrected the PR body. The inline sweep (and its test) were removed in the toolkit pass; reclaim is a future periodic sweep, untested this phase by design.

No code-behaviour change beyond the docstring; 92 barrier tests still green.

@muhammad-ali-e muhammad-ali-e merged commit 7c7617b into feat/UN-3445-pg-queue-integration Jun 15, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3548-pg-barrier branch June 15, 2026 08:07