UN-3561 [FEAT] PG Queue 9e PR 2a — live-PG-pipeline inert foundation (dispatch backend override + barrier decrement core)#2067
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3561-pg-pipeline-2a-foundation
…(dispatch backend override + barrier decrement core)

First slice of 9e PR 2 (the live PG execution pipeline), split 2a/2b/2c. 2a is the inert foundation: the two seams the live switch (2c) consumes, each with zero behaviour change on the default path.

- dispatch(backend=...): per-call transport override. None (default, every call site today) keeps the env allow-list decision via select_backend, byte-identical. When set it wins over the allow-list, so 2c can route a whole execution's header/callback onto PG without opting their task names into WORKER_PG_QUEUE_ENABLED_TASKS (allow-list is for leaf tasks; the pipeline's migration unit is the execution).
- Extract _barrier_pg_decrement(...) plain core out of the @worker_task barrier_pg_decr_and_check (now a thin delegator). 2c calls the core in-body on the PG-consumed path (a PG-consumed task fires no Celery .link, so the decrement runs in-body: fire-and-forget self-chaining).

Inert by construction: default barrier backend is chord (Celery executions never import the PgBarrier module), and no call site passes backend=. Net behaviour change: NONE. Transport threading + live switch land in 2c; per-batch idempotency in 2b.

Tests: dispatch backend-override (3) + decrement-core extraction (3, incl. real-PG in-body decrement + verbatim delegation). pg_barrier/dispatch/barrier/routing suites green; ruff clean; worker-app bootstrap clean under WORKER_BARRIER_BACKEND=pg (both barrier tasks registered, get_barrier→PgBarrier).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e left a comment
PR Review Toolkit — automated review (6 agents)
Reviewed by Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, and Code Simplifier against base feat/UN-3445-pg-queue-integration.
Verdict: no blocking issues. The _barrier_pg_decrement extraction is byte-for-byte faithful (verified via git show), the Celery wrapper delegates verbatim with max_retries=0 preserved, and the backend override is correctly inert with no live caller. Code Reviewer and Code Simplifier found nothing to change.
8 inline findings below — all MEDIUM/LOW (nice-to-have before the 2c follow-up lands a real caller). Priority order:
- MEDIUM: `_barrier_pg_decrement`'s "own committed transaction / no retry" contract is doc-only; a future in-body caller can silently corrupt the count. (pg_barrier.py:268)
- MEDIUM: override + fairness/priority interaction is untested, yet that's the override's whole reason to exist. (test_dispatch_pg.py:124)
- MEDIUM: delegation test is a mock-tautology; strengthen to a DB-backed equivalence check. (test_pg_barrier.py:353)
- MEDIUM: docstring cross-ref mismatch + term overload: "carried in the payload" vs `routing.py`'s `ExecutionContext`. (dispatch.py:95)
- LOW: comment inaccuracy: "one call = one `_cursor()` txn" (actually up to two). (pg_barrier.py:268)
- LOW: type: `backend: QueueBackend | None` overloads `None` as a third AUTO state. (dispatch.py:80)
- LOW: type: untyped `dict[str, Any]` return vs the typed `CallbackDescriptor` input. (pg_barrier.py:249)
- LOW: rot-prone comment describing a not-yet-existent 2c mechanism. (pg_barrier.py:266)
…esolve_backend helper, comment/test fixes

- pg_barrier: enforce the "own committed transaction" decrement contract loudly: _barrier_pg_decrement raises at entry if the shared connection is mid-transaction (was prose-only; the 2c in-body caller is the real risk). Safe for existing paths: Celery .link enters idle, tests use autocommit conns → always idle. Fix the inaccurate "one call = one _cursor() txn" parenthetical (the delete paths open a 2nd txn) and drop the rot-prone "(fire-and-forget self-chaining)" jargon.
- routing: extract resolve_backend(task_name, override); the override-wins-else-allow-list precedence now lives in one self-documenting place (2c reuses it); dispatch() calls it instead of inlining the None-means-auto rule.
- dispatch: reword the backend= docstring to avoid the "payload" term collision (local to_payload var) and the stale routing.py cross-ref; point at the live carrier WorkflowContextData.transport + 9e-design.md.
- tests: +fairness-reaches-row on the backend= override path; +real-row wrapper test that catches core-param drift (the mocked delegation test couldn't); +open-transaction guard test. Kept the mocked test as the keyword-forwarding pin.

Deferred (LOW, reviewer-aligned): BarrierDecrementResult TypedDict union; lands in 2c when the in-body caller actually branches on the status.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
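The override-wins-else-allow-list precedence can be sketched in a few lines. The names `resolve_backend`, `select_backend`, `QueueBackend`, and `WORKER_PG_QUEUE_ENABLED_TASKS` appear in this PR, but everything else here is assumed for illustration: the enum members, the comma-separated format of the environment variable, and the task name `pipeline_header` are hypothetical.

```python
import os
from enum import Enum
from typing import Optional


class QueueBackend(str, Enum):
    # Member names and values are assumptions for this sketch.
    CELERY = "celery"
    PG_QUEUE = "pg_queue"


def select_backend(task_name: str) -> QueueBackend:
    """Allow-list decision: PG only for task names listed in the env variable."""
    raw = os.environ.get("WORKER_PG_QUEUE_ENABLED_TASKS", "")
    enabled = {name.strip() for name in raw.split(",") if name.strip()}
    return QueueBackend.PG_QUEUE if task_name in enabled else QueueBackend.CELERY


def resolve_backend(
    task_name: str, override: Optional[QueueBackend] = None
) -> QueueBackend:
    """An explicit override wins; None falls back to the allow-list decision."""
    return override if override is not None else select_backend(task_name)


os.environ["WORKER_PG_QUEUE_ENABLED_TASKS"] = "process_file_batch"
leaf = resolve_backend("process_file_batch")
header_default = resolve_backend("pipeline_header")
header_forced = resolve_backend("pipeline_header", QueueBackend.PG_QUEUE)
```

Keeping the `None`-means-auto rule inside one function means a call site never has to re-derive the precedence, which is the property the commit describes as self-documenting.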
Review round addressed —
…ssion note

All doc/comment-only, no logic change. greptile gave 4/5 "safe to merge"; these are the staleness its findings flagged, introduced by the resolve_backend extraction in the prior review round:

- dispatch module docstring + routing "Scaffold posture" now name resolve_backend (wrapping select_backend) as the routing seam, not select_backend alone.
- Note at the _pg_routing_logged log-once site that it's keyed on task name only, so an override-then-allow-list cutover won't re-announce (benign: override = pipeline headers vs allow-list = leaf tasks, no overlap expected; the allow-list config is still announced by _log_allow_list_once).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps round addressed —
Merged commit 59716f5 into feat/UN-3445-pg-queue-integration
…g_batch_dedup + claim_batch / clear_execution_batches)

Second slice of 9e PR 2, after 2a (#2067). Inert idempotency primitive: the durable per-batch dedup marker 2c wires into the at-least-once PG path.

Why: the PG queue is at-least-once, so process_file_batch can be redelivered after a crash-before-ack → re-run batch + double-decrement the barrier (non-idempotent, max_retries=0). Recon showed existing per-file protection is only partial (Redis lock released after write; WFE COMPLETED skips tool re-exec but not necessarily the destination write; FileHistory is cross-execution only), so a durable per-batch gate is needed; per-file status stays the partial-crash backstop.

- backend: PgBatchDedup model + migration 0006: table pg_batch_dedup with a UniqueConstraint(execution_id, batch_index) (the ON CONFLICT target; its execution_id-leading index also serves the cleanup DELETE). Django-managed, extension-free, same posture as the sibling pg_queue models.
- workers (pg_barrier.py, reusing the barrier's _cursor() → one PG conn per worker child): claim_batch(execution_id, batch_index) -> bool (atomic INSERT ... ON CONFLICT DO NOTHING RETURNING; True=first/decrement, False=redelivery/skip) + clear_execution_batches(execution_id) -> int (barrier-teardown cleanup; reaper sweep is the backstop). No call-site wiring: claim/clear + batch_index threading + transport switch land in 2c.

Inert: new table + two helpers, no callers. Net behaviour: NONE.

Tests: +8 real-PG (first-claim, redelivery-rejected, distinct-batch, distinct-exec, concurrent-exactly-one-winner, clear-only-target, clear-empty-zero, reclaim-after-clear). Migration applied to dev DB, makemigrations --check clean, bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
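The claim/clear pair described in this commit can be sketched against an in-memory database. This is an illustration under stated assumptions, not the project's implementation: it uses the standard-library `sqlite3` module in place of PostgreSQL, detects a first claim through `cursor.rowcount` rather than the `RETURNING` clause the commit mentions, passes the connection explicitly, and assumes text and integer column types.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE pg_batch_dedup (
        execution_id TEXT NOT NULL,
        batch_index INTEGER NOT NULL,
        UNIQUE (execution_id, batch_index)
    )
    """
)


def claim_batch(conn: sqlite3.Connection, execution_id: str, batch_index: int) -> bool:
    """Insert the marker; True on the first delivery, False on a redelivery."""
    with conn:  # one committed transaction per claim
        cur = conn.execute(
            "INSERT INTO pg_batch_dedup (execution_id, batch_index) VALUES (?, ?) "
            "ON CONFLICT (execution_id, batch_index) DO NOTHING",
            (execution_id, batch_index),
        )
    return cur.rowcount == 1  # 0 rows inserted means the marker already existed


def clear_execution_batches(conn: sqlite3.Connection, execution_id: str) -> int:
    """Delete every marker for one execution and return how many were removed."""
    with conn:
        cur = conn.execute(
            "DELETE FROM pg_batch_dedup WHERE execution_id = ?", (execution_id,)
        )
    return cur.rowcount


first = claim_batch(conn, "exec-1", 0)
redelivered = claim_batch(conn, "exec-1", 0)
other_batch = claim_batch(conn, "exec-1", 1)
other_exec = claim_batch(conn, "exec-2", 0)
cleared = clear_execution_batches(conn, "exec-1")
```

The unique constraint does the deduplication, so the caller needs no read-then-write step: whichever delivery inserts the row first is the one allowed to proceed.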
…g_batch_dedup + claim_batch / clear_execution_batches) (#2068)

* UN-3562 [FEAT] PG Queue 9e PR 2b - per-batch idempotency primitive (pg_batch_dedup + claim_batch / clear_execution_batches)

* UN-3562 address review (muhammad-ali-e P1-P8): fix reaper-backstop docstring claim, batch_index constraint, race test, nits

  - P1 (verified bug): the docstrings claimed the reaper's barrier-orphan sweep reclaims orphaned pg_batch_dedup markers; it doesn't. sweep_expired_barriers DELETEs only pg_barrier_state (no cascade), so orphaned markers leak today. Reworded both PgBatchDedup + clear_execution_batches docstrings to state the leak honestly + flag the dedup-orphan sweep as intended future work.
  - P4: add CheckConstraint(batch_index >= 0) (writer-proof, mirrors PgQueueMessage.priority) + a test that the DB rejects a negative index. Regenerated migration 0006 to include it.
  - P6: claim_batch docstring no longer says it decrements; defers the single decrement to the caller (the function only inserts the marker).
  - P7: generalized "partial per-file protection" → "not fully idempotent on redelivery" so the rationale can't rot.
  - P5: documented created_at as observability-only (future age-based sweep).
  - P2: REQUIRE_PG_TESTS env → skip becomes fail, so the idempotency primitive can't ship untested-green in CI where PG is expected.
  - P3: strengthened the race test: pre-build N=8 conns in the main thread, align claims with threading.Barrier, loop 5 trials (forces the contended ON CONFLICT path instead of a serial fast-path).
  - P8: hoisted import os to module top.

  35 pg_barrier + 9 dedup tests green; migration applied to dev DB, makemigrations --check clean; ruff clean.

  Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
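The P2 change (skip becomes fail when `REQUIRE_PG_TESTS` is set) can be sketched with the standard library alone. The helper name `skip_or_fail` is hypothetical, the rule that any non-empty value turns the requirement on is an assumption, and the real suite may well use a different test framework than `unittest`.

```python
import os
import unittest


def skip_or_fail(reason: str) -> None:
    """Call when PostgreSQL is unreachable: skip locally, fail where PG is required."""
    if os.environ.get("REQUIRE_PG_TESTS"):
        # CI opted in: an unreachable database must not produce a green run.
        raise AssertionError(f"PG tests are required but unavailable: {reason}")
    raise unittest.SkipTest(reason)
```

A test fixture would call `skip_or_fail("cannot connect to PostgreSQL")` from its connection error handler, so the same test file skips on a laptop and fails in a pipeline that exports the variable.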
What
First slice of 9e PR 2 (the live PG execution pipeline), split into 2a → 2b → 2c. This is 2a, the inert foundation — the two seams the live switch (2c) consumes, each with zero behaviour change on the default path.
1. `dispatch(backend=...)`: per-call transport override

`dispatch()` gains an optional `backend: QueueBackend | None = None`:

- `None` (the default, and every call site today) → transport is the env allow-list decision via `select_backend()`, byte-identical to today.
- Set → the override wins over the allow-list, so 2c can route a whole execution's header/callback onto PG without opting their task names into `WORKER_PG_QUEUE_ENABLED_TASKS`. The allow-list is for leaf tasks; the coupled pipeline's migration unit is the execution (carried in the payload; see `routing.py`).

2. Extract `_barrier_pg_decrement(...)` core

The decrement logic moves out of the `@worker_task` `barrier_pg_decr_and_check` into a plain `_barrier_pg_decrement(...)` function; the `@worker_task` becomes a thin delegator. The plain core is what 2c calls in-body on the PG-consumed path: a PG-consumed task fires no Celery `.link`, so the decrement must run in-body (fire-and-forget self-chaining). Behaviour for the existing Celery `.link` path is unchanged (the wrapper forwards verbatim).

Why it's inert

- Default barrier backend is `chord` → Celery executions never import the `PgBarrier` module, so the extraction is invisible to them.
- No call site passes `backend=` → the Celery dispatch path is byte-identical.
- Transport threading + the `transport=="pg_queue"` switch land in 2c (threading an unused param earlier just trips S1172); per-batch idempotency lands in 2b.

Tests

- dispatch backend-override (3) + decrement-core extraction (3, incl. real-PG in-body decrement); the `@worker_task` delegates verbatim.
- `test_pg_barrier` (33) + `test_dispatch_pg` (10) + barrier/routing (53) suites green; `ruff` clean.
- Worker-app bootstrap clean under `WORKER_BARRIER_BACKEND=pg`: both barrier tasks registered under canonical names, `get_barrier()` → `PgBarrier`.

Base

Targets the long-lived `feat/UN-3445-pg-queue-integration` integration branch (not `main`).

🤖 Generated with Claude Code