UN-3561 [FEAT] PG Queue 9e PR 2a — live-PG-pipeline inert foundation (dispatch backend override + barrier decrement core)#2067

Merged
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3561-pg-pipeline-2a-foundation
Jun 17, 2026
@muhammad-ali-e

What

First slice of 9e PR 2 (the live PG execution pipeline), split into 2a → 2b → 2c. This is 2a, the inert foundation — the two seams the live switch (2c) consumes, each with zero behaviour change on the default path.

1. dispatch(backend=...) — per-call transport override

dispatch() gains an optional backend: QueueBackend | None = None:

  • None (the default, and every call site today) → transport is the env allow-list decision via select_backend(), byte-identical to today.
  • Set → it wins over the allow-list. This is the seam the execution-level PG pipeline (2c) uses to route a whole execution's header/callback dispatches onto PG without opting their task names into WORKER_PG_QUEUE_ENABLED_TASKS. The allow-list is for leaf tasks; the coupled pipeline's migration unit is the execution (carried in the payload — see routing.py).
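
The precedence described above can be sketched as follows. This is a minimal illustration, not the code in dispatch.py: choose_transport is a hypothetical helper name, the enum's string values are assumed, and a comma-separated format for WORKER_PG_QUEUE_ENABLED_TASKS is an assumption.

```python
import os
from enum import Enum
from typing import Optional


class QueueBackend(Enum):
    # Member names follow the PR; the string values are assumptions.
    CELERY = "celery"
    PG = "pg_queue"


def select_backend(task_name: str) -> QueueBackend:
    """Env allow-list decision (assumes a comma-separated list of task names)."""
    raw = os.environ.get("WORKER_PG_QUEUE_ENABLED_TASKS", "")
    allowed = {name.strip() for name in raw.split(",") if name.strip()}
    return QueueBackend.PG if task_name in allowed else QueueBackend.CELERY


def choose_transport(
    task_name: str, backend: Optional[QueueBackend] = None
) -> QueueBackend:
    """Hypothetical helper: an explicit backend wins, None defers to the allow-list."""
    return backend if backend is not None else select_backend(task_name)
```

With this shape, a header task that is absent from the allow-list still lands on PG when the caller passes backend=QueueBackend.PG, while every call that omits the argument keeps the allow-list result.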

2. Extract _barrier_pg_decrement(...) core

The decrement logic moves out of the @worker_task barrier_pg_decr_and_check into a plain _barrier_pg_decrement(...) function; the @worker_task becomes a thin delegator. The plain core is what 2c calls in-body on the PG-consumed path — a PG-consumed task fires no Celery .link, so the decrement must run in-body (fire-and-forget self-chaining). Behaviour for the existing Celery .link path is unchanged (the wrapper forwards verbatim).

Why it's inert

  • Default barrier backend is chord, and Celery executions never import the PgBarrier module, so the extraction is invisible to them.
  • No call site passes backend= → the Celery dispatch path is byte-identical.
  • Net behaviour change: NONE. Transport threading into the fan-out fns + the live transport=="pg_queue" switch land in 2c (threading an unused param earlier just trips S1172); per-batch idempotency lands in 2b.

Tests

  • dispatch backend-override (3): PG-forces-PG-without-allow-list, Celery-forces-Celery-despite-allow-list, default-None-preserves-allow-list.
  • decrement-core extraction (3): core is a plain callable (not a Celery task), runs the decrement in-body against real PG, and the @worker_task delegates verbatim.
  • Full test_pg_barrier (33) + test_dispatch_pg (10) + barrier/routing (53) suites green; ruff clean.
  • Worker-app bootstrap verified clean under WORKER_BARRIER_BACKEND=pg: both barrier tasks registered under canonical names, get_barrier() → PgBarrier.

Base

Targets the long-lived feat/UN-3445-pg-queue-integration integration branch (not main).

🤖 Generated with Claude Code

…(dispatch backend override + barrier decrement core)

First slice of 9e PR 2 (the live PG execution pipeline), split 2a/2b/2c.
2a is the inert foundation: the two seams the live switch (2c) consumes,
each with zero behaviour change on the default path.

- dispatch(backend=...): per-call transport override. None (default, every
  call site today) keeps the env allow-list decision via select_backend —
  byte-identical. When set it wins over the allow-list, so 2c can route a
  whole execution's header/callback onto PG without opting their task names
  into WORKER_PG_QUEUE_ENABLED_TASKS (allow-list is for leaf tasks; the
  pipeline's migration unit is the execution).
- Extract _barrier_pg_decrement(...) plain core out of the @worker_task
  barrier_pg_decr_and_check (now a thin delegator). 2c calls the core in-body
  on the PG-consumed path (a PG-consumed task fires no Celery .link, so the
  decrement runs in-body — fire-and-forget self-chaining).

Inert by construction: default barrier backend is chord (Celery executions
never import the PgBarrier module), and no call site passes backend=. Net
behaviour change: NONE. Transport threading + live switch land in 2c;
per-batch idempotency in 2b.

Tests: dispatch backend-override (3) + decrement-core extraction (3, incl.
real-PG in-body decrement + verbatim delegation). pg_barrier/dispatch/
barrier/routing suites green; ruff clean; worker-app bootstrap clean under
WORKER_BARRIER_BACKEND=pg (both barrier tasks registered, get_barrier→PgBarrier).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

@muhammad-ali-e muhammad-ali-e left a comment

PR Review Toolkit — automated review (6 agents)

Reviewed by Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, and Code Simplifier against base feat/UN-3445-pg-queue-integration.

Verdict: no blocking issues. The _barrier_pg_decrement extraction is byte-for-byte faithful (verified via git show), the Celery wrapper delegates verbatim with max_retries=0 preserved, and the backend override is correctly inert with no live caller. Code Reviewer and Code Simplifier found nothing to change.

8 inline findings below — all MEDIUM/LOW (nice-to-have before the 2c follow-up lands a real caller). Priority order:

  1. MEDIUM - _barrier_pg_decrement's "own committed transaction / no retry" contract is doc-only; a future in-body caller can silently corrupt the count. (pg_barrier.py:268)
  2. MEDIUM — override + fairness/priority interaction is untested, yet that's the override's whole reason to exist. (test_dispatch_pg.py:124)
  3. MEDIUM — delegation test is a mock-tautology; strengthen to a DB-backed equivalence check. (test_pg_barrier.py:353)
  4. MEDIUM — docstring cross-ref mismatch + term overload: "carried in the payload" vs routing.py's ExecutionContext. (dispatch.py:95)
  5. LOW — comment inaccuracy: "one call = one _cursor() txn" (actually up to two). (pg_barrier.py:268)
  6. LOW — type: backend: QueueBackend | None overloads None as a third AUTO state. (dispatch.py:80)
  7. LOW — type: untyped dict[str, Any] return vs the typed CallbackDescriptor input. (pg_barrier.py:249)
  8. LOW — rot-prone comment describing a not-yet-existent 2c mechanism. (pg_barrier.py:266)

…esolve_backend helper, comment/test fixes

- pg_barrier: enforce the "own committed transaction" decrement contract
  loudly — _barrier_pg_decrement raises at entry if the shared connection is
  mid-transaction (was prose-only; the 2c in-body caller is the real risk).
  Safe for existing paths: Celery .link enters idle, tests use autocommit
  conns → always idle. Fix the inaccurate "one call = one _cursor() txn"
  parenthetical (the delete paths open a 2nd txn) and drop the rot-prone
  "(fire-and-forget self-chaining)" jargon.
- routing: extract resolve_backend(task_name, override) — the override-wins-
  else-allow-list precedence now lives in one self-documenting place (2c
  reuses it); dispatch() calls it instead of inlining the None-means-auto rule.
- dispatch: reword the backend= docstring — avoid the "payload" term collision
  (local to_payload var) and the stale routing.py cross-ref; point at the live
  carrier WorkflowContextData.transport + 9e-design.md.
- tests: +fairness-reaches-row on the backend= override path; +real-row wrapper
  test that catches core-param drift (the mocked delegation test couldn't);
  +open-transaction guard test. Kept the mocked test as the keyword-forwarding pin.

Deferred (LOW, reviewer-aligned): BarrierDecrementResult TypedDict union — lands
in 2c when the in-body caller actually branches on the status.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Review round addressed — e84df69c4

Thanks @muhammad-ali-e. All 8 threads handled — 6 fixed, 1 deferred (reviewer-aligned), 1 positive/noted.

Fixed

  • [MED] in-transaction enforcement (pg_barrier:268a) — the "own committed transaction" contract is now enforced loudly: _barrier_pg_decrement raises at entry if the shared connection is mid-transaction. Safe for every existing path (Celery .link enters idle; tests use autocommit conns) — trips only on the 2c misuse. + test_open_transaction_on_shared_conn_raises.
  • [LOW] comment accuracy (pg_barrier:268b) — scoped the "one _cursor() txn" parenthetical to the decrement UPDATE (the delete paths open a 2nd).
  • [LOW] comment rot (pg_barrier:266) — dropped "(fire-and-forget self-chaining)"; softened to clearly-future framing.
  • [MED] comment accuracy (dispatch:95) — reworded to avoid the payload term collision + the stale routing.py cross-ref; points at the live carrier WorkflowContextData.transport + 9e-design.md. (Note: the literal "ExecutionContext" suggestion would itself be inaccurate under the payload-carry design — see the thread.)
  • [LOW] type design (dispatch:80) — extracted resolve_backend(task_name, override) into routing.py; the None-means-auto precedence now lives in one reusable place (2c shares it).
  • [MED] test gap (test_dispatch:124) — test_override_path_carries_fairness_to_row: asserts org/priority reach the row on the backend= path.
  • [MED] test quality (test_pg_barrier:353) — added a real-row wrapper test that catches core-param drift (core not mocked); kept the mocked test as the explicit keyword-forwarding pin.
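
For illustration, the entry guard from the first bullet might look roughly like this. It is a sketch under assumptions: ensure_idle is a hypothetical name, and FakeConnection stands in for a psycopg2 connection so the example runs without a database. The two constants mirror psycopg2.extensions.TRANSACTION_STATUS_IDLE and TRANSACTION_STATUS_INTRANS.

```python
# Same values as psycopg2.extensions.TRANSACTION_STATUS_IDLE / _INTRANS.
TRANSACTION_STATUS_IDLE = 0
TRANSACTION_STATUS_INTRANS = 2


class FakeConnection:
    """Stand-in for a psycopg2 connection; only the status probe is modelled."""

    def __init__(self, status: int) -> None:
        self._status = status

    def get_transaction_status(self) -> int:
        return self._status


def ensure_idle(conn) -> None:
    """Hypothetical guard: refuse to run inside a caller's open transaction."""
    status = conn.get_transaction_status()
    if status != TRANSACTION_STATUS_IDLE:
        raise RuntimeError(
            f"decrement requires an idle connection, got transaction status {status}"
        )


ensure_idle(FakeConnection(TRANSACTION_STATUS_IDLE))  # passes silently
```

Checking at entry, before any SQL runs, means a misuse fails loudly instead of leaving the decrement inside an outer transaction that could later roll back.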

Deferred (your call, LOW)

  • BarrierDecrementResult(TypedDict) union (pg_barrier:250) → 2c, where the in-body caller actually branches on status and the shape firms up.

Verification: test_pg_barrier + test_dispatch_pg + test_routing + test_barrier_backend_selection = 76 green; ruff clean; worker-app bootstrap clean under WORKER_BARRIER_BACKEND=pg (both barrier tasks registered, get_barrier() → PgBarrier, resolve_backend precedence verified). Still inert: net behaviour change on the default path is NONE.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 17, 2026 07:37
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Greptile Summary

This PR is the first slice of the live PG execution pipeline (9e PR 2a), introducing two inert seams consumed by the upcoming PR 2c — a per-call backend= override on dispatch() and a plain _barrier_pg_decrement core extracted from the @worker_task wrapper. No existing call site passes backend=, so the default path is byte-identical to the previous code.

  • dispatch() backend override: a new optional backend: QueueBackend | None = None parameter routes a task to a specific transport regardless of the env allow-list; resolve_backend() in routing.py encapsulates the override-wins-else-allow-list precedence in a single, testable function.
  • _barrier_pg_decrement extraction: moves the atomic decrement logic out of the @worker_task into a plain callable so the PG-consumed pipeline path (2c) can call it in-body (PG-consumed tasks fire no .link). A new transaction-status guard raises loudly if entered mid-transaction, making the "own committed transaction" contract enforced at runtime rather than only in prose.

Confidence Score: 5/5

Safe to merge — the default path is byte-identical to before, and the two new seams are thoroughly guarded and tested.

Both changes are structurally inert: every existing dispatch() call passes backend=None, routing through the unchanged select_backend allow-list path; the _barrier_pg_decrement extraction preserves all logic verbatim and adds a runtime guard that only rejects callers already in an open transaction. The transaction-status guard is a net safety improvement over the previous design.

No files require special attention.

Important Files Changed

Filename Overview
workers/queue_backend/dispatch.py Adds optional `backend: QueueBackend
workers/queue_backend/pg_barrier.py Extracts decrement logic into _barrier_pg_decrement plain function; barrier_pg_decr_and_check becomes a thin @worker_task delegator. Adds psycopg2.extensions import and an explicit transaction-status guard that raises RuntimeError if entered mid-transaction, enforcing the "own committed transaction" contract loudly. All existing logic preserved exactly.
workers/queue_backend/routing.py Adds resolve_backend(task_name, override) — one-liner that returns override when set, else defers to select_backend. Scaffold-posture docstring updated to name the correct seam function.
workers/tests/test_dispatch_pg.py Adds TestDispatchBackendOverride with 4 tests: PG override forces PG, Celery override forces Celery despite allow-list, None preserves allow-list, fairness plumbing carried on the override path. Good coverage of new seam.
workers/tests/test_pg_barrier.py Adds TestDecrementCoreExtraction with 5 tests: core is not a Celery task, core decrements against real DB, wrapper produces same decrement as core (no-drift guard), wrapper forwards kwargs verbatim, open-transaction guard raises before touching the DB.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller
    participant dispatch
    participant resolve_backend
    participant select_backend
    participant _enqueue_pg
    participant Celery

    Caller->>dispatch: "dispatch(task_name, backend=None)"
    dispatch->>resolve_backend: resolve_backend(task_name, None)
    resolve_backend->>select_backend: select_backend(task_name)
    select_backend-->>resolve_backend: QueueBackend.CELERY (default)
    resolve_backend-->>dispatch: QueueBackend.CELERY
    dispatch->>Celery: send_task(...) [unchanged path]

    Note over Caller,Celery: 2c pipeline path (future)
    Caller->>dispatch: "dispatch(task_name, backend=QueueBackend.PG)"
    dispatch->>resolve_backend: resolve_backend(task_name, QueueBackend.PG)
    resolve_backend-->>dispatch: QueueBackend.PG (override wins)
    dispatch->>_enqueue_pg: _enqueue_pg(...)
    _enqueue_pg-->>Caller: PgDispatchHandle

    Note over Caller,Celery: barrier decrement paths
    Celery->>barrier_pg_decr_and_check: .link callback
    barrier_pg_decr_and_check->>_barrier_pg_decrement: delegate (guard: idle conn)
    _barrier_pg_decrement-->>barrier_pg_decr_and_check: result

    Note over Caller,Celery: 2c in-body path (future)
    Caller->>_barrier_pg_decrement: direct call (guard: idle conn)
    _barrier_pg_decrement-->>Caller: result

Reviews (3): Last reviewed commit: "UN-3561 address greptile (3× P2): docstr..."

…ssion note

All doc/comment-only, no logic change. greptile gave 4/5 "safe to merge"; these
are the staleness its findings flagged, introduced by the resolve_backend
extraction in the prior review round:
- dispatch module docstring + routing "Scaffold posture" now name resolve_backend
  (wrapping select_backend) as the routing seam, not select_backend alone.
- Note at the _pg_routing_logged log-once site that it's keyed on task name only,
  so an override-then-allow-list cutover won't re-announce (benign: override =
  pipeline headers vs allow-list = leaf tasks, no overlap expected; the allow-list
  config is still announced by _log_allow_list_once).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

@greptile-apps round addressed — 345933ee1

Thanks for the 4/5 + the flowchart. All 3 "Comments Outside Diff" (P2) fixed — each was staleness introduced by the resolve_backend extraction in the previous review round, so good catches:

  1. dispatch.py docstring (L3) — now reads "Routes each task to its transport via resolve_backend (which applies a per-call backend override, else defers to select_backend)", and the PG bullet mentions the backend= override path.
  2. routing.py "Scaffold posture" (L36-40) — the seam sentence now names resolve_backend() (wrapping the select_backend() allow-list with the per-call override) as where real PG dispatch lands.
  3. _pg_routing_logged suppression (L127-138) — added the call-out you suggested: the set is keyed on task name only, so an override-then-allow-list cutover won't re-announce. Benign given the usage split (override = pipeline headers, allow-list = leaf tasks → no overlap expected), and the allow-list config itself is still announced by _log_allow_list_once.
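
The suppression in item 3 can be pictured with a small sketch. Only the set name _pg_routing_logged comes from the PR; the announce_pg_routing helper, its reason argument, and the logger name are assumptions made for illustration.

```python
import logging
from typing import Set

logger = logging.getLogger("dispatch_sketch")

# Task names already announced; keyed on the name only, not on (name, reason).
_pg_routing_logged: Set[str] = set()


def announce_pg_routing(task_name: str, reason: str) -> bool:
    """Log the first PG routing of a task; return True only if a line was emitted."""
    if task_name in _pg_routing_logged:
        return False
    _pg_routing_logged.add(task_name)
    logger.info("routing %s to PG (%s)", task_name, reason)
    return True
```

Because the key is the task name alone, a task first announced via the override stays silent if it is later routed via the allow-list, which is the benign gap the comment now calls out.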

No logic changed (doc/comment only). 30 dispatch+routing tests green, ruff clean.

@muhammad-ali-e muhammad-ali-e merged commit 59716f5 into feat/UN-3445-pg-queue-integration Jun 17, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3561-pg-pipeline-2a-foundation branch June 17, 2026 08:00
muhammad-ali-e added a commit that referenced this pull request Jun 17, 2026
…g_batch_dedup + claim_batch / clear_execution_batches)

Second slice of 9e PR 2, after 2a (#2067). Inert idempotency primitive: the
durable per-batch dedup marker 2c wires into the at-least-once PG path.

Why: the PG queue is at-least-once, so process_file_batch can be redelivered
after a crash-before-ack → re-run batch + double-decrement the barrier
(non-idempotent, max_retries=0). Recon showed existing per-file protection is
only partial (Redis lock released after write; WFE COMPLETED skips tool re-exec
but not necessarily the destination write; FileHistory is cross-execution only),
so a durable per-batch gate is needed; per-file status stays the partial-crash
backstop.

- backend: PgBatchDedup model + migration 0006 — table pg_batch_dedup with a
  UniqueConstraint(execution_id, batch_index) (the ON CONFLICT target; its
  execution_id-leading index also serves the cleanup DELETE). Django-managed,
  extension-free, same posture as the sibling pg_queue models.
- workers (pg_barrier.py, reusing the barrier's _cursor() → one PG conn per
  worker child): claim_batch(execution_id, batch_index) -> bool (atomic
  INSERT ... ON CONFLICT DO NOTHING RETURNING; True=first/decrement,
  False=redelivery/skip) + clear_execution_batches(execution_id) -> int
  (barrier-teardown cleanup; reaper sweep is the backstop).

No call-site wiring — claim/clear + batch_index threading + transport switch
land in 2c. Inert: new table + two helpers, no callers. Net behaviour: NONE.

Tests: +8 real-PG (first-claim, redelivery-rejected, distinct-batch,
distinct-exec, concurrent-exactly-one-winner, clear-only-target,
clear-empty-zero, reclaim-after-clear). Migration applied to dev DB,
makemigrations --check clean, bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
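
The claim/clear semantics in the commit message above can be sketched with an in-memory SQLite database standing in for PG, so the example runs without a server. This is an assumption-laden illustration: the real helpers reuse the barrier's _cursor() and use INSERT ... ON CONFLICT DO NOTHING RETURNING, whereas this sketch reads cursor.rowcount instead and needs SQLite 3.24 or newer for the ON CONFLICT clause. The column types are guesses.

```python
import sqlite3

# SQLite stand-in for the pg_batch_dedup table (column types are assumptions).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE pg_batch_dedup (
        execution_id TEXT NOT NULL,
        batch_index  INTEGER NOT NULL,
        UNIQUE (execution_id, batch_index)
    )
    """
)


def claim_batch(execution_id: str, batch_index: int) -> bool:
    """True for the first claim of a batch, False for a redelivery."""
    cur = conn.execute(
        "INSERT INTO pg_batch_dedup (execution_id, batch_index) VALUES (?, ?) "
        "ON CONFLICT (execution_id, batch_index) DO NOTHING",
        (execution_id, batch_index),
    )
    conn.commit()
    # rowcount is 1 when the marker row was inserted, 0 when the conflict was skipped.
    return cur.rowcount == 1


def clear_execution_batches(execution_id: str) -> int:
    """Delete every marker for one execution and return how many were removed."""
    cur = conn.execute(
        "DELETE FROM pg_batch_dedup WHERE execution_id = ?", (execution_id,)
    )
    conn.commit()
    return cur.rowcount
```

A caller would run the batch and the single barrier decrement only when claim_batch returns True, and skip both on False.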
muhammad-ali-e added a commit that referenced this pull request Jun 17, 2026
…g_batch_dedup + claim_batch / clear_execution_batches) (#2068)

* UN-3562 [FEAT] PG Queue 9e PR 2b — per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches)

* UN-3562 address review (muhammad-ali-e P1-P8): fix reaper-backstop docstring claim, batch_index constraint, race test, nits

- P1 (verified bug): the docstrings claimed the reaper's barrier-orphan sweep
  reclaims orphaned pg_batch_dedup markers — it doesn't. sweep_expired_barriers
  DELETEs only pg_barrier_state (no cascade), so orphaned markers leak today.
  Reworded both PgBatchDedup + clear_execution_batches docstrings to state the
  leak honestly + flag the dedup-orphan sweep as intended future work.
- P4: add CheckConstraint(batch_index >= 0) (writer-proof, mirrors
  PgQueueMessage.priority) + a test that the DB rejects a negative index.
  Regenerated migration 0006 to include it.
- P6: claim_batch docstring no longer says it decrements; defers the single
  decrement to the caller (the function only inserts the marker).
- P7: generalized "partial per-file protection" → "not fully idempotent on
  redelivery" so the rationale can't rot.
- P5: documented created_at as observability-only (future age-based sweep).
- P2: REQUIRE_PG_TESTS env → skip becomes fail, so the idempotency primitive
  can't ship untested-green in CI where PG is expected.
- P3: strengthened the race test — pre-build N=8 conns in the main thread,
  align claims with threading.Barrier, loop 5 trials (forces the contended
  ON CONFLICT path instead of a serial fast-path).
- P8: hoisted import os to module top.

35 pg_barrier + 9 dedup tests green; migration applied to dev DB,
makemigrations --check clean; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>