Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches)#2068

Merged
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3562-pg-pipeline-2b-idempotencyZipstack/unstract:UN-3562-pg-pipeline-2b-idempotencyCopy head branch name to clipboard
Jun 17, 2026
Merged

UN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches)#2068
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3562-pg-pipeline-2b-idempotencyZipstack/unstract:UN-3562-pg-pipeline-2b-idempotencyCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

Second slice of 9e PR 2 (the live PG execution pipeline), after 2a (#2067). This is 2b, the inert idempotency primitive — the durable per-batch dedup marker that PR 2c will wire into the at-least-once PG path.

Why

The PG queue is at-least-once: a process_file_batch task can be redelivered after a crash-before-ack, which would re-run the batch and double-decrement the barrier (process_file_batch is non-idempotent — max_retries=0). A recon of the existing per-file protection found it only partial:

  • the Redis per-file destination lock is released after the write (not a durable token, free again on redelivery);
  • WorkflowFileExecution COMPLETED skips tool re-execution, but the destination write may still be reached with the prior result;
  • FileHistory (use_file_history) is cross-execution only — doesn't cover same-execution crash-redelivery.

So a durable per-batch gate is needed. The existing per-file status remains the backstop for the partial-crash re-processing case (a re-run batch whose marker wasn't written yet skips already-done files).

Scope (2b)

  • BackendPgBatchDedup model + migration 0006: table pg_batch_dedup with a UniqueConstraint(execution_id, batch_index) (the ON CONFLICT target; its execution_id-leading index also serves the cleanup DELETE). Django-managed, extension-free, same posture as the sibling pg_queue models.
  • Workers (queue_backend/pg_barrier.py, reusing the barrier's thread-local _cursor()one PG connection per worker child):
    • claim_batch(execution_id, batch_index) -> bool — atomic INSERT … ON CONFLICT DO NOTHING RETURNING; True = first delivery (proceed + decrement), False = redelivery (skip, so the barrier decrements exactly once per batch).
    • clear_execution_batches(execution_id) -> int — barrier-teardown cleanup; the reaper's barrier-orphan sweep is the backstop for executions that never finalise.

Not in 2b

No call-site wiring. claim_batch (at batch start) + clear_execution_batches (at barrier finalise) + batch_index threading + the transport switch all land in 2c. (Same "don't thread unused params early" lesson that kept 2a clean.) Net behaviour change: NONE.

Tests / dev-test

  • +8 real-PG tests: first-claim · redelivery-rejected · distinct-batch · distinct-exec · concurrent-exactly-one-winner · clear-only-target · clear-empty-returns-0 · reclaim-after-clear.
  • Migration 0006 generated + applied to the dev DB, makemigrations --check → no drift.
  • Worker-app bootstrap clean under WORKER_BARRIER_BACKEND=pg; ruff clean.

Base

Targets the long-lived feat/UN-3445-pg-queue-integration integration branch (not main).

🤖 Generated with Claude Code

…g_batch_dedup + claim_batch / clear_execution_batches)

Second slice of 9e PR 2, after 2a (#2067). Inert idempotency primitive: the
durable per-batch dedup marker 2c wires into the at-least-once PG path.

Why: the PG queue is at-least-once, so process_file_batch can be redelivered
after a crash-before-ack → re-run batch + double-decrement the barrier
(non-idempotent, max_retries=0). Recon showed existing per-file protection is
only partial (Redis lock released after write; WFE COMPLETED skips tool re-exec
but not necessarily the destination write; FileHistory is cross-execution only),
so a durable per-batch gate is needed; per-file status stays the partial-crash
backstop.

- backend: PgBatchDedup model + migration 0006 — table pg_batch_dedup with a
  UniqueConstraint(execution_id, batch_index) (the ON CONFLICT target; its
  execution_id-leading index also serves the cleanup DELETE). Django-managed,
  extension-free, same posture as the sibling pg_queue models.
- workers (pg_barrier.py, reusing the barrier's _cursor() → one PG conn per
  worker child): claim_batch(execution_id, batch_index) -> bool (atomic
  INSERT ... ON CONFLICT DO NOTHING RETURNING; True=first/decrement,
  False=redelivery/skip) + clear_execution_batches(execution_id) -> int
  (barrier-teardown cleanup; reaper sweep is the backstop).

No call-site wiring — claim/clear + batch_index threading + transport switch
land in 2c. Inert: new table + two helpers, no callers. Net behaviour: NONE.

Tests: +8 real-PG (first-claim, redelivery-rejected, distinct-batch,
distinct-exec, concurrent-exactly-one-winner, clear-only-target,
clear-empty-zero, reclaim-after-clear). Migration applied to dev DB,
makemigrations --check clean, bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR review (PR Review Toolkit)

Reviewed the per-batch idempotency primitive across 4 files using six specialized agents (code review, silent-failure, type design, test coverage, comments, simplifier). The code is well-scoped, SQL-safe (parameterized), and closely mirrors its PgBarrierState / _delete_barrier / test_pg_barrier.py siblings. No correctness defects in the production helpers. Findings below are prioritized; the one verified-inaccurate docstring (P1) is the only thing I'd treat as a should-fix before merge.

Note: a black/line-length finding was raised and dropped after verification — pre-commit uses ruff-format (line-length 90), the line is exactly 90 chars and passes; the [tool.black] entry in workers/pyproject.toml is vestigial.

Comment thread backend/pg_queue/models.py Outdated
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread workers/tests/test_pg_batch_dedup.py Outdated
Comment thread workers/tests/test_pg_batch_dedup.py Outdated
Comment thread backend/pg_queue/models.py
Comment thread backend/pg_queue/models.py
Comment thread workers/queue_backend/pg_barrier.py Outdated
Comment thread backend/pg_queue/models.py Outdated
Comment thread workers/tests/test_pg_batch_dedup.py Outdated
…cstring claim, batch_index constraint, race test, nits

- P1 (verified bug): the docstrings claimed the reaper's barrier-orphan sweep
  reclaims orphaned pg_batch_dedup markers — it doesn't. sweep_expired_barriers
  DELETEs only pg_barrier_state (no cascade), so orphaned markers leak today.
  Reworded both PgBatchDedup + clear_execution_batches docstrings to state the
  leak honestly + flag the dedup-orphan sweep as intended future work.
- P4: add CheckConstraint(batch_index >= 0) (writer-proof, mirrors
  PgQueueMessage.priority) + a test that the DB rejects a negative index.
  Regenerated migration 0006 to include it.
- P6: claim_batch docstring no longer says it decrements; defers the single
  decrement to the caller (the function only inserts the marker).
- P7: generalized "partial per-file protection" → "not fully idempotent on
  redelivery" so the rationale can't rot.
- P5: documented created_at as observability-only (future age-based sweep).
- P2: REQUIRE_PG_TESTS env → skip becomes fail, so the idempotency primitive
  can't ship untested-green in CI where PG is expected.
- P3: strengthened the race test — pre-build N=8 conns in the main thread,
  align claims with threading.Barrier, loop 5 trials (forces the contended
  ON CONFLICT path instead of a serial fast-path).
- P8: hoisted import os to module top.

35 pg_barrier + 9 dedup tests green; migration applied to dev DB,
makemigrations --check clean; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round addressed — 273ccfc75

Thanks @muhammad-ali-e. All 9 threads handled — the standout is P1, a verified bug (not just a doc nit).

P1 (verified bug) — reaper backstop claim was false. I'd written that the reaper's barrier-orphan sweep reclaims orphaned pg_batch_dedup markers. It doesn't: sweep_expired_barriers (reaper.py:131) DELETEs only pg_barrier_state, there's no cascade, so orphaned markers leak today. Both docstrings (PgBatchDedup + clear_execution_batches) now state the leak honestly and flag a dedup-orphan sweep as intended future work.

Fixed

  • P4 — added CheckConstraint(batch_index >= 0) (writer-proof, mirrors PgQueueMessage.priority) + regenerated migration 0006 + a test that the DB rejects a negative index.
  • P3 — strengthened the race test: pre-build N=8 connections in the main thread, align claims with a threading.Barrier, loop 5 trials → actually forces the contended ON CONFLICT path (and catches a non-atomic check-then-insert).
  • P2REQUIRE_PG_TESTS env flips the skip to a pytest.fail, so the idempotency primitive can't ship untested-green where PG is expected.
  • P6claim_batch docstring no longer claims it decrements; defers the single decrement to the caller.
  • P7 — generalized "partial per-file protection" → "not fully idempotent on redelivery" (rot-proof).
  • P5 — documented created_at as observability-only (future age-based sweep would index it).
  • P8 — hoisted import os to module top.

Verification: 35 pg_barrier + 9 pg_batch_dedup tests green; migration 0006 regenerated + applied to the dev DB, makemigrations --check clean; ruff clean. Still inert — net behaviour change on the default path: NONE.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 17, 2026 09:23
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces the PgBatchDedup Django model + migration (pg_batch_dedup table) and the two worker-side primitives claim_batch / clear_execution_batches that will gate per-batch idempotency in the at-least-once PG pipeline. The primitives themselves are inert in 2b — no call-site wiring — and are backed by 8 real-PG tests covering concurrent exactly-one-winner semantics.

  • claim_batch atomically inserts a (execution_id, batch_index) row with INSERT … ON CONFLICT DO NOTHING RETURNING; True = first delivery (proceed), False = redelivery (skip).
  • clear_execution_batches deletes all markers for an execution at barrier teardown; orphaned markers (from executions that never finalise) are acknowledged as a known gap and deferred.
  • The UniqueConstraint(execution_id, batch_index) doubles as both the ON CONFLICT target and the leading-column index for per-execution cleanup DELETEs; a CheckConstraint(batch_index >= 0) enforces the domain invariant at the DB layer.

Confidence Score: 3/5

Safe to merge as an inert primitive, but the enqueue path needs a clear_execution_batches call before 2c is integrated.

The new table, model, migration, and the two worker functions are all individually correct, and the concurrent-winner test exercises the core ON CONFLICT DO NOTHING guarantee well. The concern is in PgBarrier.enqueue(), which already lives in this file: it resets pg_barrier_state via UPSERT when an execution_id is reused (the comment at line 237 explicitly names this scenario), but it doesn't clear pg_batch_dedup. When 2c wires in claim_batch, any re-enqueue of a failed or expired execution with the same ID would leave all prior dedup markers in place — every batch silently gets False, nothing is processed, and the barrier hangs until expiry. The fix is straightforward (add clear_execution_batches to the same _cursor() block as the UPSERT), but it needs to land before 2c ships.

workers/queue_backend/pg_barrier.py — the enqueue() method's UPSERT block and the barrier_pg_abort teardown path both need a clear_execution_batches call alongside their existing pg_barrier_state cleanup.

Important Files Changed

Filename Overview
workers/queue_backend/pg_barrier.py Adds claim_batch and clear_execution_batches using the existing thread-local _cursor() pattern. The new primitives are individually correct, but PgBarrier.enqueue() — which already lives in this file — resets pg_barrier_state on execution_id reuse without clearing the new pg_batch_dedup rows; when 2c wires in claim_batch, retried executions will silently skip all batches.
backend/pg_queue/models.py Adds PgBatchDedup model with a UniqueConstraint(execution_id, batch_index) as the ON CONFLICT target and a CheckConstraint(batch_index >= 0). Schema is clean, extension-free, and consistent with sibling models.
backend/pg_queue/migrations/0006_pgbatchdedup_and_more.py Auto-generated migration creating pg_batch_dedup with the correct constraints; depends on 0005 and mirrors the model exactly.
workers/tests/test_pg_batch_dedup.py Good test coverage: first-claim, redelivery rejection, cross-batch/cross-execution isolation, negative index DB rejection, concurrent winner test (with threading.Barrier), clear-only-target, clear-empty, and reclaim-after-clear. The concurrency test is well-structured; the skip/fail logic for REQUIRE_PG_TESTS is a nice CI safeguard.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Worker as Worker (process_file_batch)
    participant PG as Postgres (pg_batch_dedup)
    participant Barrier as pg_barrier_state

    Note over Worker,Barrier: First delivery (2c wired path)
    Worker->>PG: claim_batch(exec_id, batch_idx) INSERT ON CONFLICT DO NOTHING RETURNING
    PG-->>Worker: row returned → True (first delivery)
    Worker->>Worker: Process batch
    Worker->>Barrier: Decrement remaining

    Note over Worker,Barrier: Redelivery (crash-before-ack)
    Worker->>PG: claim_batch(exec_id, batch_idx) INSERT ON CONFLICT DO NOTHING RETURNING
    PG-->>Worker: no row returned → False (redelivery)
    Worker->>Worker: Skip (barrier already decremented once)

    Note over Worker,PG: Barrier finalise (remaining → 0)
    Barrier->>PG: "clear_execution_batches(exec_id) DELETE WHERE execution_id = ?"
    PG-->>Barrier: rowcount (markers removed)

    Note over Worker,PG: Execution retry with same execution_id (gap)
    Worker->>Barrier: PgBarrier.enqueue() UPSERT pg_barrier_state
    Note right of Barrier: pg_batch_dedup NOT cleared — stale markers remain
    Worker->>PG: claim_batch(exec_id, batch_idx)
    PG-->>Worker: False (stale marker) — batch silently skipped
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Worker as Worker (process_file_batch)
    participant PG as Postgres (pg_batch_dedup)
    participant Barrier as pg_barrier_state

    Note over Worker,Barrier: First delivery (2c wired path)
    Worker->>PG: claim_batch(exec_id, batch_idx) INSERT ON CONFLICT DO NOTHING RETURNING
    PG-->>Worker: row returned → True (first delivery)
    Worker->>Worker: Process batch
    Worker->>Barrier: Decrement remaining

    Note over Worker,Barrier: Redelivery (crash-before-ack)
    Worker->>PG: claim_batch(exec_id, batch_idx) INSERT ON CONFLICT DO NOTHING RETURNING
    PG-->>Worker: no row returned → False (redelivery)
    Worker->>Worker: Skip (barrier already decremented once)

    Note over Worker,PG: Barrier finalise (remaining → 0)
    Barrier->>PG: "clear_execution_batches(exec_id) DELETE WHERE execution_id = ?"
    PG-->>Barrier: rowcount (markers removed)

    Note over Worker,PG: Execution retry with same execution_id (gap)
    Worker->>Barrier: PgBarrier.enqueue() UPSERT pg_barrier_state
    Note right of Barrier: pg_batch_dedup NOT cleared — stale markers remain
    Worker->>PG: claim_batch(exec_id, batch_idx)
    PG-->>Worker: False (stale marker) — batch silently skipped
Loading

Comments Outside Diff (2)

  1. workers/queue_backend/pg_barrier.py, line 236-251 (link)

    P1 Stale dedup markers silently block retried executions

    enqueue() resets pg_barrier_state for a reused execution_id via UPSERT (the comment at line 237 explicitly acknowledges this reuse scenario), but it does NOT call clear_execution_batches(execution_id). When 2c wires in claim_batch, any retry of a failed or expired execution that reuses the same execution_id will find the old markers still present — every claim_batch call returns False, all batches are silently skipped, the barrier counter never reaches 0, and the retry hangs until expires_at. The fix is to call clear_execution_batches(execution_id) inside the same _cursor() block as the UPSERT so both resets are atomic.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: workers/queue_backend/pg_barrier.py
    Line: 236-251
    
    Comment:
    **Stale dedup markers silently block retried executions**
    
    `enqueue()` resets `pg_barrier_state` for a reused `execution_id` via UPSERT (the comment at line 237 explicitly acknowledges this reuse scenario), but it does NOT call `clear_execution_batches(execution_id)`. When 2c wires in `claim_batch`, any retry of a failed or expired execution that reuses the same `execution_id` will find the old markers still present — every `claim_batch` call returns `False`, all batches are silently skipped, the barrier counter never reaches 0, and the retry hangs until `expires_at`. The fix is to call `clear_execution_batches(execution_id)` inside the same `_cursor()` block as the UPSERT so both resets are atomic.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Claude Code

  2. workers/queue_backend/pg_barrier.py, line 499-507 (link)

    P2 barrier_pg_abort orphans dedup markers, compounding the enqueue-reuse gap

    barrier_pg_abort deletes pg_barrier_state (the claim-and-teardown CTE) but has no corresponding clear_execution_batches call. This is documented as future work in the model docstring, but it directly feeds the P1 gap on the enqueue path: an aborted execution leaves its pg_batch_dedup rows in place, and if enqueue() doesn't clear them on the next attempt, claim_batch will block the retry. Clearing them here (alongside the pg_barrier_state delete) would be the belt-and-suspenders fix and close the gap independently of the enqueue-side fix.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: workers/queue_backend/pg_barrier.py
    Line: 499-507
    
    Comment:
    **`barrier_pg_abort` orphans dedup markers, compounding the enqueue-reuse gap**
    
    `barrier_pg_abort` deletes `pg_barrier_state` (the claim-and-teardown CTE) but has no corresponding `clear_execution_batches` call. This is documented as future work in the model docstring, but it directly feeds the P1 gap on the enqueue path: an aborted execution leaves its `pg_batch_dedup` rows in place, and if `enqueue()` doesn't clear them on the next attempt, `claim_batch` will block the retry. Clearing them here (alongside the `pg_barrier_state` delete) would be the belt-and-suspenders fix and close the gap independently of the enqueue-side fix.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Claude Code

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
workers/queue_backend/pg_barrier.py:236-251
**Stale dedup markers silently block retried executions**

`enqueue()` resets `pg_barrier_state` for a reused `execution_id` via UPSERT (the comment at line 237 explicitly acknowledges this reuse scenario), but it does NOT call `clear_execution_batches(execution_id)`. When 2c wires in `claim_batch`, any retry of a failed or expired execution that reuses the same `execution_id` will find the old markers still present — every `claim_batch` call returns `False`, all batches are silently skipped, the barrier counter never reaches 0, and the retry hangs until `expires_at`. The fix is to call `clear_execution_batches(execution_id)` inside the same `_cursor()` block as the UPSERT so both resets are atomic.

### Issue 2 of 2
workers/queue_backend/pg_barrier.py:499-507
**`barrier_pg_abort` orphans dedup markers, compounding the enqueue-reuse gap**

`barrier_pg_abort` deletes `pg_barrier_state` (the claim-and-teardown CTE) but has no corresponding `clear_execution_batches` call. This is documented as future work in the model docstring, but it directly feeds the P1 gap on the enqueue path: an aborted execution leaves its `pg_batch_dedup` rows in place, and if `enqueue()` doesn't clear them on the next attempt, `claim_batch` will block the retry. Clearing them here (alongside the `pg_barrier_state` delete) would be the belt-and-suspenders fix and close the gap independently of the enqueue-side fix.

Reviews (1): Last reviewed commit: "UN-3562 address review (muhammad-ali-e P..." | Re-trigger Greptile

@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

@greptile-apps — thanks, sharp catch (tracked for 2c)

The 3/5 finding is correct and verified: PgBarrier.enqueue's UPSERT resets pg_barrier_state on execution_id reuse (the line-237 "clears leftover state from a prior run with this id" case) but does not clear pg_batch_dedup. Once claim_batch is wired, a re-enqueued/retried execution with the same id would hit stale markers → every batch gets False → nothing processed → barrier hangs to expiry. The barrier_pg_abort teardown has the same shape.

Where it lands: 2c, not 2b. The gap can only manifest once claim_batch is wired — until then pg_batch_dedup is always empty, so there's nothing to clear and no behaviour to fix. Adding the clear_execution_batches call to enqueue now would be wiring ahead of need (the same reason 2b deliberately ships the primitive with no call sites, and 2a deferred its threading to 2c). So it lands in 2c, atomically with the claim_batch wiring:

  • clear_execution_batches(execution_id) in the same _cursor() block as the enqueue UPSERT (atomic reset), and
  • the same call in the barrier_pg_abort teardown path (alongside the pg_barrier_state DELETE).

Captured as a must-do in the 2c scope with a same-execution_id-re-run dev-test to verify. 2b stays inert (net behaviour change on the default path: NONE). Your "before 2c ships" is satisfied by "first thing in 2c."

@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Acknowledged — the sequencing logic holds. The gap is latent-only in 2b (empty table = no stale markers to hit), so pulling the fix forward would break the deliberate "inert primitive, no call sites" invariant that makes 2b reviewable in isolation.

The 2c landing plan is exactly right:

  • clear_execution_batches in the same _cursor() block as the enqueue UPSERT makes the reset atomic with the pg_barrier_state overwrite — no window where new claims land against old markers.
  • Same call in barrier_pg_abort keeps the teardown paths symmetric.

The same-execution_id-re-run dev-test will be the key verification — make sure it covers the abort path too, not just the clean-enqueue path. Happy for 2b to merge on this basis.

@muhammad-ali-e muhammad-ali-e merged commit 5f2d18d into feat/UN-3445-pg-queue-integration Jun 17, 2026
5 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3562-pg-pipeline-2b-idempotency branch June 17, 2026 09:34
muhammad-ali-e added a commit that referenced this pull request Jun 17, 2026
…fire-and-forget) (#2069)

* UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (fire-and-forget)

Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for
a transport=="pg_queue" execution. Gated: resolve_transport() still returns
celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable —
default path byte-identical. Orchestrator task (async_execute_bin) stays on
Celery (hybrid); routing it onto PG is a 2d follow-up.

- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and
  ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier()
  (bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode —
  _dispatch_header_pg sends each header via dispatch(backend=PG) with an injected
  _barrier_context {execution_id, batch_index, callback_descriptor}, no .link;
  descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup
  (greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback
  onto PG when backend==pg_queue. clear_execution_batches at finalise + abort.
  run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement;
  redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None →
  _run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.

Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch
routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test
pending before PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S1172: drop unused task_instance from _run_batch_stages

The extracted _run_batch_stages never uses task_instance — its only purpose
(deriving celery_task_id) happens in _process_file_batch_core before the call.
Removed the param + updated both call sites. _process_file_batch_core keeps
task_instance (it reads .request.id). Routing test mocks with *a, unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address review (muhammad-ali-e, 15): strand-on-failure hardening + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand
windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort:
  a decrement-side failure (guard / DB / last-batch callback dispatch) tears the
  barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body — logs when the teardown itself fails
  (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work,
  post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier,
  process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value;
  avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() — single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup;
decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header
args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green;
bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix run_batch_with_barrier strand-window doc inconsistency (review)

The second "NOT catchable" bullet conflated two different things: it described
the in-body catchable abort ("the abort here removes the row") and a *software*
callback-dispatch failure — but that failure is already caught + torn down by
step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable
heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed
but before the enqueue" couldn't both hold.

Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN
the decrement committing (remaining→0) and the callback enqueue completing —
decrement committed (redelivery blocked by the marker), process gone before the
callback enqueues or any abort runs, row survives to expiry, reaper-only
recovery. Explicitly notes a software dispatch failure is the catchable case.
Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address greptile (#2069, 2): clear dedup on mid-loop PG failure + carry fairness on PG callback

Both in the gated PG path (greptile 4/5, safe to merge).

- Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls
  clear_execution_batches on the PG path. Earlier headers may have committed a
  claim_batch marker; with the barrier row deleted, their in-flight
  barrier_pg_abort is a no-op (already_aborted) and never reaches the clear
  inside it, so reclaim the markers directly here.
- Issue 2: the PG callback now carries the producer's fairness. Added
  _fairness_from_headers() to reconstruct the FairnessKey from the stored
  x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the
  same org/priority as the Celery path (was always default priority).

Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback;
extended the PG mid-loop test to assert an already-claimed marker is reclaimed.
75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S3776: reduce PgBarrier.enqueue cognitive complexity (17→under 15)

Extracted the per-header dispatch loop into PgBarrier._dispatch_headers — the
deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure
teardown + PG dedup-clear) was the complexity driver. enqueue now calls the
helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes.
75 barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix greptile #2069: mid-loop dedup-clear test passed for the wrong reason

The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the
reuse-reset DELETE) before the dispatch loop, so the mid-loop
clear_execution_batches deleted 0 rows — the count==0 assertion passed on the
UPSERT, not the guard under test. Now the first dispatch side-effect claims the
marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear
is what removes it. Verified: with the clear disabled the marker orphans (count=1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.