UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (fire-and-forget)#2069

Merged
muhammad-ali-e merged 7 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3563-pg-pipeline-2c-live-switch
Jun 17, 2026

Conversation

@muhammad-ali-e

Contributor

What

Third slice of 9e PR 2, after 2a (#2067) + 2b (#2068). This is the live switch: it wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for a transport=="pg_queue" execution (fire-and-forget self-chaining — no Celery chord / .link).

Non-regressive by construction

resolve_transport() still returns "celery" (PR 3 wires Flipt), so the entire PG branch is present-but-unreachable — the default path is byte-identical. The flag pg_queue_execution_enabled stays off and is not consumed by any code until PR 3.

Scope = hybrid

2c migrates the fan-out → barrier → callback (the at-least-once core). The orchestrator task (async_execute_bin) stays on Celery; routing it onto PG (wholly-off-Celery) is a follow-up (2d).

How it works (on transport=="pg_queue")

  • Barrier Protocol + Celery/Redis impls accept (and ignore) a transport param; CallbackDescriptor gains an optional backend marker.
  • create_chord_execution routes pg_queue to a fresh PgBarrier() — bypassing the WORKER_BARRIER_BACKEND singleton (the celery substrate), so the transport selects the substrate.
  • PgBarrier.enqueue fire-and-forget mode: each header is dispatched via dispatch(backend=PG) with an injected _barrier_context {execution_id, batch_index, callback_descriptor} (no .link); the UPSERT block also clears pg_batch_dedup (the reuse-reset flagged by @greptile-apps in the 2b review, #2068).
  • process_file_batch(_barrier_context=None): the PG path runs run_batch_with_barrier → claim the batch (idempotent on redelivery), run the work, decrement the barrier in-body; the task that drives remaining → 0 self-chains the callback onto PG via dispatch(backend=PG). Exceptions tear the barrier down in-body (mirrors the chord .link_error). clear_execution_batches runs at finalise and abort.
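
The claim → work → decrement → self-chain sequence described in the last bullet can be sketched in memory. This is only an illustration under stated assumptions: `InMemoryBarrier` and `run_batch_sketch` are hypothetical stand-ins for the `pg_barrier_state` / `pg_batch_dedup` tables and for `run_batch_with_barrier`, and a counter stands in for `dispatch(backend=PG)`.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryBarrier:
    """Hypothetical stand-in for the barrier row plus the dedup markers."""

    remaining: int
    results: list = field(default_factory=list)
    claimed: set = field(default_factory=set)
    callbacks_fired: int = 0
    aborted: bool = False

    def claim_batch(self, batch_index: int) -> bool:
        # Only the first delivery of a batch wins the claim.
        if batch_index in self.claimed:
            return False
        self.claimed.add(batch_index)
        return True

    def decrement(self, result) -> int:
        # Record the batch result and return the new remaining count.
        self.results.append(result)
        self.remaining -= 1
        return self.remaining


def run_batch_sketch(barrier: InMemoryBarrier, batch_index: int, work) -> str:
    """claim -> work -> decrement; the batch that reaches 0 fires the callback."""
    if not barrier.claim_batch(batch_index):
        return "skipped"  # redelivery: this batch was already claimed
    try:
        result = work(batch_index)
        if barrier.decrement(result) == 0:
            barrier.callbacks_fired += 1  # stands in for dispatch(backend=PG)
    except Exception:
        barrier.aborted = True  # stands in for the in-body barrier teardown
        raise
    return "done"


barrier = InMemoryBarrier(remaining=2)
outcomes = [
    run_batch_sketch(barrier, 0, lambda i: f"batch-{i}"),
    run_batch_sketch(barrier, 0, lambda i: f"batch-{i}"),  # redelivered copy
    run_batch_sketch(barrier, 1, lambda i: f"batch-{i}"),
]
print(outcomes, barrier.remaining, barrier.callbacks_fired)
```

In this sketch the redelivered copy of batch 0 is skipped, so the counter reaches zero exactly once and the callback stand-in fires once.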

Mixed-version / rolling-deploy safety

Three layers:

  1. Defaults — new params (transport, _barrier_context) have safe defaults, so an old payload on a new worker falls back to celery/legacy.
  2. transport absorbed by old workers — orchestrator tasks + the callback have **kwargs, so a pre-version worker swallows an unknown transport (no TypeError).
  3. _barrier_context never reaches a Celery/old worker — injected only on the pg_queue path, which is gated off until PR 3 and routes only to the PG consumer (this code). That's why process_file_batch keeps an explicit param, not a **kwargs swallow.

Operational rule (naturally enforced by flag-default-off + separate PR): deploy 2c everywhere before PR 3 flips pg_queue_execution_enabled.
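
The three layers can be illustrated with plain Python call semantics. The function names below are hypothetical and simplified; they assume only what the list states: orchestrator tasks accept `**kwargs`, new params default safely, and `process_file_batch` takes `_barrier_context` as an explicit param.

```python
def old_orchestrator_task(execution_id, **kwargs):
    """Pre-2c shape (hypothetical): unknown keyword args land in **kwargs."""
    return f"{execution_id}:celery"


def new_orchestrator_task(execution_id, transport="celery", **kwargs):
    """Post-2c shape (hypothetical): transport defaults to the legacy path."""
    return f"{execution_id}:{transport}"


def old_process_file_batch(batch):
    """Pre-2c batch task (hypothetical): no _barrier_context, no **kwargs."""
    return "chord_stages"


def new_process_file_batch(batch, _barrier_context=None):
    """Post-2c batch task (hypothetical): None selects the chord path."""
    return "chord_stages" if _barrier_context is None else "in_body_barrier"


# Layer 1: an old payload on a new worker falls back to celery.
layer1 = new_orchestrator_task(execution_id="exec-1")

# Layer 2: a new payload on an old worker is absorbed, with no TypeError.
layer2 = old_orchestrator_task(execution_id="exec-1", transport="pg_queue")

# Layer 3: _barrier_context would break an old batch task, which is why it
# must only ever be sent on the gated PG path.
try:
    old_process_file_batch("batch-0", _barrier_context={"batch_index": 0})
    layer3_rejected = False
except TypeError:
    layer3_rejected = True

print(layer1, layer2, layer3_rejected)
```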

Tests

  • +8 PgBarrier fire-and-forget: header PG-dispatch + _barrier_context injection, reuse-clear, run_batch_with_barrier first/redelivery/exception, callback PG-vs-celery, abort-clears-dedup.
  • +2 orchestration routing: pg_queue → fresh PgBarrier (not the singleton); celery → singleton with transport threaded.
  • +2 process_file_batch routing: _barrier_context None → chord stages, set → run_batch_with_barrier.
  • Each test file green alone; ruff clean.

Dev-test (end-to-end, forced pg_queue)

The coupled pipeline ran end-to-end on Postgres, fire-and-forget, twice, clean teardown:
process_file_batch claimed + ran on a PG consumer → in-body barrier decrement → remaining=0 → callback self-chained onto the PG callback queue → ran on a second PG consumer → execution COMPLETED. pg_barrier_state and pg_batch_dedup both empty afterward (finalise cleanup verified). No chord, no .link.

Base

Targets the long-lived feat/UN-3445-pg-queue-integration integration branch (not main). Stacked on 2b.

🤖 Generated with Claude Code

…fire-and-forget)

Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for
a transport=="pg_queue" execution. Gated: resolve_transport() still returns
celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable —
default path byte-identical. Orchestrator task (async_execute_bin) stays on
Celery (hybrid); routing it onto PG is a 2d follow-up.

- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and
  ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier()
  (bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode —
  _dispatch_header_pg sends each header via dispatch(backend=PG) with an injected
  _barrier_context {execution_id, batch_index, callback_descriptor}, no .link;
  descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup
  (greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback
  onto PG when backend==pg_queue. clear_execution_batches at finalise + abort.
  run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement;
  redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None →
  _run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.

Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch
routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test
pending before PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d11cb45b-4847-4221-8eb4-006744aa8727

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


…ch_stages

The extracted _run_batch_stages never uses task_instance — its only purpose
(deriving celery_task_id) happens in _process_file_batch_core before the call.
Removed the param + updated both call sites. _process_file_batch_core keeps
task_instance (it reads .request.id). Routing test mocks with *a, unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

SonarCloud S1172 fixed — d986c3823

_run_batch_stages no longer takes the unused task_instance param. When I extracted it from _process_file_batch_core, task_instance's only use (deriving celery_task_id) stayed in the caller, so the extracted helper received it but never used it. Removed the param + updated both call sites; _process_file_batch_core keeps it (it reads .request.id). Routing test mocks with *a, unaffected; ruff clean.

@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author


Automated PR Review — PG Queue 9e PR 2c (live fan-out/barrier/callback)

Multi-agent review (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). The implementation is well-structured and heavily documented; the new tests pass. The dominant theme below: the new in-body fire-and-forget path lacks the .link_error safety net of the chord path, so several failure modes (decrement raise, suppressed abort, last-batch callback-dispatch failure) degrade to a silent ~6h hang-to-expiry that redelivery can't heal because the claim is already consumed. Findings are posted inline below, prioritised by severity.

Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/shared/workflow/execution/orchestration_utils.py Outdated
Comment thread workers/general/tasks.py
Comment thread workers/tests/test_pg_barrier.py
Comment thread workers/tests/test_pg_barrier.py
…ing + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand
windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort:
  a decrement-side failure (guard / DB / last-batch callback dispatch) tears the
  barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body — logs when the teardown itself fails
  (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work,
  post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.
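
The shape of the first two fixes can be sketched as follows. This is an assumption-laden illustration, not the actual `pg_barrier.py` code: `run_with_abort` and `abort_barrier_in_body_sketch` are hypothetical names, and the `work`, `decrement` and `abort` callables are injected so the example runs standalone.

```python
import logging

logger = logging.getLogger("barrier_sketch")


def abort_barrier_in_body_sketch(abort, execution_id: str, reason: str) -> bool:
    """Run the teardown and report honestly whether it succeeded."""
    try:
        abort(execution_id, reason)
    except Exception:
        # The teardown itself failed: say so instead of claiming "torn down".
        logger.exception("barrier teardown FAILED for %s", execution_id)
        return False
    logger.warning("barrier torn down for %s (%s)", execution_id, reason)
    return True


def run_with_abort(execution_id: str, work, decrement, abort):
    """Wrap BOTH the work and the decrement so either failure aborts in-body."""
    try:
        result = work()
        return decrement(result)
    except Exception as exc:
        abort_barrier_in_body_sketch(abort, execution_id, repr(exc))
        raise


aborted = []


def failing_decrement(result):
    raise RuntimeError("decrement failed")


try:
    run_with_abort(
        "exec-1",
        work=lambda: "ok",
        decrement=failing_decrement,
        abort=lambda execution_id, reason: aborted.append(execution_id),
    )
except RuntimeError:
    pass  # the original error still propagates after the teardown


def broken_abort(execution_id, reason):
    raise ConnectionError("db down")


teardown_ok = abort_barrier_in_body_sketch(broken_abort, "exec-2", "demo")
print(aborted, teardown_ok)
```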

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier,
  process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value;
  avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() — single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.
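
For orientation, the typed context and the transport predicate could look roughly like this. The three `BarrierContext` keys come from the PR description; the `task_name` field, the `Sketch` suffixes and the helper `inject_barrier_context` are assumptions made for illustration.

```python
from typing import Any, Dict, TypedDict


class _CallbackDescriptorRequired(TypedDict):
    task_name: str  # hypothetical required field, for illustration only


class CallbackDescriptorSketch(_CallbackDescriptorRequired, total=False):
    transport: str  # optional marker, e.g. "pg_queue"


class BarrierContextSketch(TypedDict):
    execution_id: str
    batch_index: int
    callback_descriptor: CallbackDescriptorSketch


PG_QUEUE = "pg_queue"


def is_pg_transport_sketch(transport) -> bool:
    """Single predicate so call sites do not compare raw strings."""
    return transport == PG_QUEUE


def inject_barrier_context(
    header_kwargs: Dict[str, Any], context: BarrierContextSketch
) -> Dict[str, Any]:
    """Return new kwargs with _barrier_context added; existing keys survive."""
    return {**header_kwargs, "_barrier_context": context}


context: BarrierContextSketch = {
    "execution_id": "exec-1",
    "batch_index": 0,
    "callback_descriptor": {"task_name": "callback", "transport": PG_QUEUE},
}
kwargs = inject_barrier_context({"file_batch": ["a.pdf"]}, context)
print(sorted(kwargs), is_pg_transport_sketch(context["callback_descriptor"].get("transport")))
```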

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup;
decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header
args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green;
bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review round addressed — f5c6b98d5

Thanks @muhammad-ali-e — excellent review, especially the strand-on-failure analysis. All 15 handled.

Decision on the strand windows (Critical #69 + High #74 + High #81): reaper-as-safety-net (confirmed with the author). Fixed everything catchable in-process; the un-catchable windows are now documented as a hard reaper dependency for PR 3 rather than "future work". 2c is gated off (resolve_transport→celery), so none of these can bite until the reaper recovery net exists.

Failure handling

Typing / clarity

Tests (#37/#41) — last-batch self-chains callback to PG + cleans up barrier/dedup; decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header args/queue/pre-existing-kwargs preservation. (Claim race already covered at the claim_batch level — run_batch_with_barrier decrements iff the claim won.)

Verification: 137 barrier/dedup/routing tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg; ruff clean. Still inert — net behaviour on the default path: NONE.

The reaper-recovery dependency (sweep stranded barriers → mark ERROR / re-drive, reclaim pg_batch_dedup) is now an explicit gate on enabling pg_queue_execution_enabled in PR 3.

@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author


Re-review of the post-hardening commit (f5c6b98). The fire-and-forget PG coordination is sound and the prior pass closed the major strand windows. After verifying against the at-least-once consumer semantics, I'm raising only one new item — a self-contradiction in the strand-window contract docstring that gates the PR-3 reaper spec. (I checked a candidate claim_batch-strand concern and confirmed it's a non-issue: the consumer redelivers a raised task on vt-expiry and the dedup marker isn't committed on a claim failure, so keeping claim_batch outside the in-body teardown is correct — aborting there would wrongly fail the whole execution on a transient blip.)

Comment thread workers/queue_backend/pg_barrier.py Outdated
…eview)

The second "NOT catchable" bullet conflated two different things: it described
the in-body catchable abort ("the abort here removes the row") and a *software*
callback-dispatch failure — but that failure is already caught + torn down by
step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable
heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed
but before the enqueue" couldn't both hold.

Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN
the decrement committing (remaining→0) and the callback enqueue completing —
decrement committed (redelivery blocked by the marker), process gone before the
callback enqueues or any abort runs, row survives to expiry, reaper-only
recovery. Explicitly notes a software dispatch failure is the catchable case.
Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 17, 2026 11:51
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Contributor

Greptile Summary

This PR wires the coupled pipeline's fan-out → barrier → callback onto the PG queue (transport=="pg_queue") via fire-and-forget self-chaining, with no Celery chord or .link. The path is present-but-unreachable until PR 3 flips pg_queue_execution_enabled, so the default Celery path is byte-identical to pre-change.

  • PgBarrier gains a transport parameter: on pg_queue, header tasks are dispatched onto PG with an injected _barrier_context (no .link); the UPSERT block atomically resets pg_batch_dedup markers for execution reuse; mid-loop failures now call clear_execution_batches (addressing the previous review finding).
  • run_batch_with_barrier (new) implements the in-body claim → work → decrement protocol: claim_batch provides idempotent exactly-once decrement on redelivery; the last batch to reach remaining=0 self-chains the aggregating callback onto PG.
  • _process_file_batch_core routes by barrier_context: None → Celery chord path unchanged; non-None → run_batch_with_barrier in-body path.
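
One common way to get an idempotent claim like the one described for `claim_batch` is a unique-key insert that reports whether it won. The sketch below uses SQLite purely as a runnable stand-in for Postgres; the column names and both function bodies are assumptions, not the repository's schema or code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pg_batch_dedup ("
    " execution_id TEXT NOT NULL,"
    " batch_index INTEGER NOT NULL,"
    " PRIMARY KEY (execution_id, batch_index))"
)


def claim_batch_sketch(conn, execution_id: str, batch_index: int) -> bool:
    """True only for the first claim of (execution_id, batch_index)."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO pg_batch_dedup (execution_id, batch_index)"
        " VALUES (?, ?)",
        (execution_id, batch_index),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows inserted means a redelivery lost the claim


def clear_execution_batches_sketch(conn, execution_id: str) -> int:
    """Reclaim every marker for one execution; returns how many were removed."""
    cur = conn.execute(
        "DELETE FROM pg_batch_dedup WHERE execution_id = ?", (execution_id,)
    )
    conn.commit()
    return cur.rowcount


first = claim_batch_sketch(conn, "exec-1", 0)
redelivered = claim_batch_sketch(conn, "exec-1", 0)
other_batch = claim_batch_sketch(conn, "exec-1", 1)
cleared = clear_execution_batches_sketch(conn, "exec-1")
print(first, redelivered, other_batch, cleared)
```

Because only the winning claim goes on to decrement, a redelivered batch cannot decrement the barrier a second time in this sketch.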

Confidence Score: 5/5

Safe to merge — the entire PG branch is unreachable until a separate feature flag is flipped in PR 3, so no production path is affected by this change.

The Celery default path is byte-identical to pre-change. Both previous inline review findings (mid-loop dedup cleanup, fairness-header parity on PG callback) are correctly addressed with matching tests. The new in-body barrier protocol is well-guarded: idempotent claim, transaction-status guard on the decrement, and in-body abort mirroring .link_error. The two minor observations do not affect correctness or reachability.

workers/queue_backend/pg_barrier.py — the sole file with minor notes; all other changed files are clean.

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/queue_backend/pg_barrier.py | Core of the PR: adds _dispatch_headers, _dispatch_header_pg, _fairness_from_headers, _fire_barrier_callback, _abort_barrier_in_body, and run_batch_with_barrier, which together form the full PG fire-and-forget fan-out/barrier/callback chain. Previous review findings are correctly addressed. Minor: inconsistent key access in _fairness_from_headers and a misleading abort-reason string when decrement-side failures are caught. |
| workers/file_processing/tasks.py | Refactors _process_file_batch_core to delegate to run_batch_with_barrier when _barrier_context is set (PG path), otherwise runs stages directly (Celery path). Clean routing split with no Celery-path regressions. |
| workers/shared/workflow/execution/orchestration_utils.py | Adds _barrier_for_transport to select a fresh PgBarrier on pg_queue transport vs. the env-selected singleton on Celery, and threads transport through create_chord_execution. Logic is correct and tested. |
| workers/queue_backend/barrier.py | Adds BarrierContext TypedDict and a transport NotRequired field to CallbackDescriptor; extends the Barrier Protocol with a transport param. All Celery/Redis implementations accept and ignore it. Clean Protocol extension. |
| workers/tests/test_pg_barrier.py | Adds 12 new tests covering PG fire-and-forget dispatch, reuse dedup cleanup, first-delivery/redelivery/exception paths, fairness reconstruction, callback routing, and the corrected mid-loop dispatch failure test. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant O as Orchestrator
    participant PB as PgBarrier.enqueue()
    participant PQ as PG Queue
    participant W as Worker (process_file_batch)
    participant D as _barrier_pg_decrement
    participant CB as Callback Task

    O->>PB: "create_chord_execution(transport=pg_queue)"
    PB->>PB: "UPSERT pg_barrier_state(remaining=N) + DELETE pg_batch_dedup"
    loop for each header task
        PB->>PQ: "dispatch(task + _barrier_context, backend=PG)"
    end
    PQ->>W: deliver process_file_batch(_barrier_context)
    W->>W: claim_batch: True or False(skip redelivery)
    W->>W: "_run_batch_stages() -> result"
    W->>D: _barrier_pg_decrement(result)
    D->>D: "UPDATE remaining-=1, append result"
    alt "remaining > 0"
        D-->>W: pending
    else "remaining == 0"
        D->>CB: "_fire_barrier_callback -> dispatch(backend=PG)"
        D->>D: DELETE pg_barrier_state + pg_batch_dedup
        CB-->>O: callback fires
    end
    alt exception
        W->>W: "_abort_barrier_in_body -> barrier_pg_abort + clear_execution_batches"
    end

Reviews (3): Last reviewed commit: "UN-3563 fix greptile #2069: mid-loop ded..."

Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/queue_backend/pg_barrier.py
…re + carry fairness on PG callback

Both in the gated PG path (greptile 4/5, safe to merge).

- Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls
  clear_execution_batches on the PG path. Earlier headers may have committed a
  claim_batch marker; with the barrier row deleted, their in-flight
  barrier_pg_abort is a no-op (already_aborted) and never reaches the clear
  inside it, so reclaim the markers directly here.
- Issue 2: the PG callback now carries the producer's fairness. Added
  _fairness_from_headers() to reconstruct the FairnessKey from the stored
  x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the
  same org/priority as the Celery path (was always default priority).

Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback;
extended the PG mid-loop test to assert an already-claimed marker is reclaimed.
75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

@greptile-apps — both findings fixed (a2d7ba8c9)

Thanks for the 4/5. Both were real (in the gated PG path):

Issue 1 — dedup markers orphaned on mid-loop PG dispatch failure. Correct: with the barrier row deleted, an earlier header's in-flight barrier_pg_abort returns already_aborted and never reaches the clear_execution_batches inside it. Fixed — the mid-loop handler now calls clear_execution_batches(execution_id) directly on the PG path after _delete_barrier. + test asserting an already-claimed marker is reclaimed (not orphaned).

Issue 2 — PG callback dropped fairness headers. Correct — the callback was always dispatched at default priority, diverging from the Celery path. Went with your "extend the descriptor to carry the FairnessKey value" suggestion via a small _fairness_from_headers() that reconstructs the FairnessKey from the stored x-fairness-key headers and passes it to _dispatch_pg, so the PG callback rides the producer's org/priority. + tests for the fairness-carried and no-fairness (None-safe) paths.

75 barrier/dedup tests green; ruff + ruff-format clean; bootstrap clean under WORKER_BARRIER_BACKEND=pg. Still gated/inert by default.

@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Contributor

Both fixes look solid. The implementation is clean and the tests are thorough.

Issue 1 (mid-loop dedup cleanup): The clear_execution_batches call in the mid-loop handler is correctly scoped to the is_pg branch, and test_pg_mid_loop_dispatch_failure_deletes_row_and_clears_dedup makes the right assertion: a pre-committed marker at index 0 gets reclaimed along with the barrier row. The invariant is closed.

Issue 2 (PG callback fairness): _fairness_from_headers is clean. The DEFAULT_PRIORITY fallback on pipeline_priority=payload.get("pipeline_priority", DEFAULT_PRIORITY) correctly handles headers produced before pipeline_priority was a field. Note: WorkloadType(payload["workload_type"]) is a hard lookup with no fallback — that's the right call here (a malformed stored header is a programming error, not a runtime condition), so the absence of a try/except is intentional and correct. Both test paths (fairness_carried / without_fairness_passes_none) pin the contract precisely.
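
The fallback behaviour discussed here can be sketched as below. Only the two expressions quoted in the comment (`payload.get("pipeline_priority", DEFAULT_PRIORITY)` and `WorkloadType(payload["workload_type"])`) and the `x-fairness-key` header name come from this thread; the `FairnessKey` fields, the enum members, the `DEFAULT_PRIORITY` value and the dict-shaped header payload are all assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

DEFAULT_PRIORITY = 5  # hypothetical value, for illustration only


class WorkloadType(Enum):
    # Hypothetical members; the real enum may differ.
    API = "api"
    ETL = "etl"


@dataclass(frozen=True)
class FairnessKey:
    # Hypothetical shape of the key carried by the producer.
    org_id: str
    workload_type: WorkloadType
    pipeline_priority: int


def fairness_from_headers_sketch(headers: Optional[dict]) -> Optional[FairnessKey]:
    """Rebuild the producer's FairnessKey, or None when none was stored."""
    payload = (headers or {}).get("x-fairness-key")
    if payload is None:
        return None
    return FairnessKey(
        org_id=payload["org_id"],
        # Hard lookup: a malformed stored header raises instead of defaulting.
        workload_type=WorkloadType(payload["workload_type"]),
        # Soft lookup: tolerate headers written before the field existed.
        pipeline_priority=payload.get("pipeline_priority", DEFAULT_PRIORITY),
    )


legacy = fairness_from_headers_sketch(
    {"x-fairness-key": {"org_id": "org-1", "workload_type": "api"}}
)
missing = fairness_from_headers_sketch({})
print(legacy, missing)
```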

75 tests, both fixes targeted. LGTM — good to merge when ready.

…lexity (17→under 15)

Extracted the per-header dispatch loop into PgBarrier._dispatch_headers — the
deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure
teardown + PG dedup-clear) was the complexity driver. enqueue now calls the
helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes.
75 barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

SonarCloud S3776 fixed — 90c61ce02

PgBarrier.enqueue cognitive complexity 17 → under 15. Extracted the per-header dispatch loop into PgBarrier._dispatch_headers — the deeply-nested for → try/except → if/else → if (PG-vs-celery branch + mid-loop teardown + PG dedup-clear) was the driver; that nesting was added across the last two review rounds. Behaviour identical, just relocated. radon: enqueue C(11)→B(6); ruff C901 passes at the 15 threshold; 75 barrier/dedup tests green.

…wrong reason

The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the
reuse-reset DELETE) before the dispatch loop, so the mid-loop
clear_execution_batches deleted 0 rows — the count==0 assertion passed on the
UPSERT, not the guard under test. Now the first dispatch side-effect claims the
marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear
is what removes it. Verified: with the clear disabled the marker orphans (count=1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

@greptile-apps — "test passes for the wrong reason" fixed (9e078fb94)

Spot on. The pre-seeded claim_batch("exec-midfail", 0) marker was wiped by enqueue's UPSERT block (the reuse-reset DELETE FROM pg_batch_dedup) before the dispatch loop ran, so the mid-loop clear_execution_batches deleted 0 rows — the count == 0 assertion passed on the UPSERT, not the guard under test.

Fixed per your suggestion: the first dispatch side-effect now calls claim_batch("exec-midfail", 0) before returning its handle — inserting the marker after the UPSERT cleared stale ones (a fast PG consumer claiming its batch). The second dispatch fails, and the mid-loop clear_execution_batches is now what removes the marker.

Verified it's load-bearing: with the mid-loop clear no-op'd the marker orphans (count=1); with it enabled, count=0. 57 barrier/dedup tests green, ruff clean.

@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Contributor

The fix is exactly right and the reasoning is sound. The ordering invariant now correctly models what happens at runtime:

  1. UPSERT clears stale dedup rows (reuse-reset)
  2. PG consumer races in and claims its batch (the side-effect on the first dispatch)
  3. Loop aborts mid-flight → clear_execution_batches removes the marker the consumer inserted

The old test was asserting count == 0 after a code path that never actually exercised clear_execution_batches — the UPSERT block had already wiped the row before the guard had a chance to run. The fix makes the marker insertion temporally live (post-UPSERT), so the only way count == 0 can hold is if clear_execution_batches did its job.

The "verify it's load-bearing" check (no-op the mid-loop clear → count=1 orphan) is exactly the right validation criterion — it confirms causality, not just coincidence. Clean.
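
The ordering problem and its fix can be reproduced with a small in-memory model. Everything here is a simplified assumption: `enqueue_sketch` stands in for `PgBarrier.enqueue`, a Python set stands in for `pg_batch_dedup`, and the `clear_on_failure` switch stands in for enabling or no-op'ing the mid-loop `clear_execution_batches` call.

```python
def enqueue_sketch(markers: set, dispatches, clear_on_failure: bool = True) -> int:
    """Reset markers, dispatch each header, clear again if a dispatch fails."""
    markers.clear()  # reuse-reset: the UPSERT block wipes stale markers
    try:
        for dispatch in dispatches:
            dispatch()
    except RuntimeError:
        if clear_on_failure:
            markers.clear()  # the mid-loop guard under test
    return len(markers)  # markers left behind (orphans)


def fail():
    raise RuntimeError("dispatch failed")


# Old test shape: the marker is seeded BEFORE enqueue, so the reset removes
# it and the count is 0 even with the guard disabled.
pre_seeded = {("exec-midfail", 0)}
old_shape_guard_off = enqueue_sketch(pre_seeded, [lambda: None, fail], False)


def make_dispatches(markers: set):
    def first():
        # A fast consumer claims its batch AFTER the reset has run.
        markers.add(("exec-midfail", 0))

    return [first, fail]


# New test shape: only the mid-loop guard can remove the marker.
live = set()
new_shape_guard_on = enqueue_sketch(live, make_dispatches(live), True)
live_off = set()
new_shape_guard_off = enqueue_sketch(live_off, make_dispatches(live_off), False)

print(old_shape_guard_off, new_shape_guard_on, new_shape_guard_off)
```

In this model the old shape reports zero orphans whether or not the guard runs, while the new shape reports zero only when the guard is enabled.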

@sonarqubecloud


@muhammad-ali-e muhammad-ali-e merged commit b3f25d9 into feat/UN-3445-pg-queue-integration Jun 17, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3563-pg-pipeline-2c-live-switch branch June 17, 2026 12:26