UN-3546 [FEAT] Priority-ordered PG-queue dequeue + concurrency-safe claim#2052
UN-3546 [FEAT] Priority-ordered PG-queue dequeue + concurrency-safe claim#2052muhammad-ali-e merged 4 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3546-pg-queue-priority-dequeueZipstack/unstract:UN-3546-pg-queue-priority-dequeueCopy head branch name to clipboard
Conversation
…laim Start enforcing the load-independent part of fairness — pipeline_priority (L3) — directly in the single-table dequeue: higher priority is claimed first, FIFO (msg_id) within a priority. The org-tier (L1) / workload (L2) axes + burst_max admission stay deferred to the fair-admission orchestrator. - Schema: add `priority` (smallint, default 5 = FairnessKey.DEFAULT_PRIORITY) to pg_queue_message; swap the dequeue index to (queue_name, priority DESC, msg_id) so the priority-ordered claim stays an indexed top-N. - Enqueue: dispatch() writes priority from fairness.pipeline_priority; a bare dispatch (fairness=None) writes the neutral default. - Dequeue: ORDER BY priority DESC, msg_id. Also fixes a latent concurrency bug in the original dequeue (9a): `UPDATE ... WHERE msg_id IN (SELECT ... FOR UPDATE SKIP LOCKED LIMIT n)` can OVER-CLAIM under concurrent writers — EvalPlanQual re-evaluates the LIMIT subquery when a row it tried to lock was concurrently touched, so one claim can return more than n rows. Switched to the canonical PGMQ-safe shape: lock candidates in a CTE, then `UPDATE ... FROM locked WHERE q.msg_id = locked.msg_id`, which locks exactly n rows once. The trailing SELECT re-orders RETURNING (which is otherwise unspecified) so batched claims come back in priority order too. Tests: priority selection (one-at-a-time) + batch ordering against real Postgres; send writes priority; dispatch wiring (fairness + neutral default); read param order. Verified live end-to-end via dispatch()->claim-order (9>7>3a>3b>1). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — automated multi-agent review
Ran six specialized reviewers (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier) over the diff vs feat/UN-3445-pg-queue-integration. Inline comments are attached below; this is the prioritized summary.
The PR is solid — the CTE FROM-join claim (vs the EvalPlanQual-unsafe IN (SELECT … LIMIT n) form) is the correct PGMQ-safe shape, param ordering is right, the migration sequence is valid, and the _enqueue_pg re-raise-with-breadcrumb (no silent Celery fallback) is exactly right. Findings are about hardening, not bugs in the happy path.
🔴 Critical / Important
priorityhas no range enforcement (flagged independently by 4 of 6 agents).client.send()writes anyintstraight to the DB, and theprioritycolumn has no CHECK — onlyFairnessKey.__post_init__validates[1,10], and that sits on just one of the write paths. An out-of-range value is silently durable and corrupts dequeue ordering with no log/error. Today's only caller passes a validated value, but this PR is the plumbing that lets an arbitrary priority through. Fix at both layers: aValueErrorguard insend()(mirroring its existingvt_seconds/qtyguards) and a DBCheckConstraint(the only backstop for the raw-SQL/ORM writer). See inline onclient.pysend() andmodels.py.- Index dropped
vt; the "indexed top-N" comment over-claims. The new(queue_name, priority DESC, msg_id)index removesvt, so visibility is a per-row filter. Under at-least-once delivery, in-flight (future-vt) high-priority rows pile up at the front of the index and get scanned-and-skipped on every claim — per-claim cost grows with the in-flight backlog, notLIMIT. Soften the comment and consider a partial index / benchmark with a realistic in-flight backlog. See inline onclient.py:57andmodels.py:43.
🟡 Test gaps (inline on test_pg_queue_client.py:283)
- The rewrite's whole reason for existing — CTE prevents over-claim under concurrent writers — has zero coverage (
test_no_double_deliveryonly proves disjointness, not the ≤qtybound). vt× priority interaction is untested (a future-vthigh-priority row should be skipped for a visible lower-priority one).- FIFO-within-band for batch claims only exercises single-member bands.
🟢 Comment / cleanup nits (inline)
- Module docstring (
client.py:8-10, outside the diff so not inline-commentable) still describes the oldUPDATE … WHERE msg_id IN (SELECT … FOR UPDATE SKIP LOCKED …)shape this PR replaced and argues against — please update it to the CTE form. models.py:34:FairnessKey.DEFAULT_PRIORITYis not a real symbol (it's module-levelfairness.DEFAULT_PRIORITY).client.py:76: param-order comment is duplicated at the call site (224-225) — drop one.
Generated with Claude Code via the PR Review Toolkit.
- Validate priority at the write boundary: client.send() raises ValueError on out-of-range (mirrors its vt_seconds/qty guards) — an out-of-range value would silently jump/sink the row in the priority DESC claim order. - Add a DB CheckConstraint (priority 1..10) as the backstop no ORM/raw writer can bypass (migration 0003). check= (not condition=) — repo is on Django 4.2. - Soften the "indexed top-N" comments (client.py + models.py): the dequeue is an index walk with vt<=now() as a per-row filter, NOT a guaranteed top-N — vt is not in the index, so in-flight (future-vt) high-priority rows are scanned past on each claim; the orchestrator's admission is the high-backlog answer. Update the module docstring to the CTE FROM-join shape; fix the fairness.DEFAULT_PRIORITY symbol reference; drop the duplicated param-order comment. - Tests: send() range-guard (parametrized) + DB CheckConstraint backstop; concurrent-writer over-claim guard (two readers, no batch exceeds qty — the regression test for the EvalPlanQual fix); vt × priority (visible low beats invisible high); FIFO-within-band for multi-member batch bands. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Review feedback addressed —
|
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/client.py | Core change: rewrites _DEQUEUE_SQL to the EvalPlanQual-safe CTE form (lock in locked, UPDATE via FROM-join in claimed), adds ORDER BY priority DESC, msg_id, and updates send() to write and validate priority. Parameter order correctly updated to match new SQL %s positions. |
| backend/pg_queue/models.py | Adds priority SmallIntegerField(default=5), the CheckConstraint in Meta.constraints, and updates the dequeue index to (queue_name, priority DESC, msg_id). In sync with both migrations. |
| backend/pg_queue/migrations/0002_remove_pgqueuemessage_pg_queue_message_dequeue_idx_and_more.py | Removes the old (queue_name, vt, msg_id) index, adds priority smallint default 5, and recreates the dequeue index as (queue_name, priority DESC, msg_id). Correct Django 4.2 migration; existing rows receive the neutral default 5. |
| backend/pg_queue/migrations/0003_pgqueuemessage_pg_queue_message_priority_range.py | Adds CheckConstraint priority BETWEEN 1 AND 10 in a separate migration. The comment correctly flags the check= to condition= rename needed when Django is upgraded to >= 6.0. |
| workers/queue_backend/dispatch.py | Threads fairness.pipeline_priority (or DEFAULT_PRIORITY for bare dispatches) through to PgQueueClient.send(). Clean and minimal change. |
| workers/tests/test_pg_queue_client.py | Adds comprehensive unit (priority write/reject) and integration tests (priority ordering, batch FIFO, vt×priority interaction, concurrent claim safety, and constraint drift guard). The worker termination assertion and uuid-based queue name fix are both present. |
| workers/tests/test_dispatch_pg.py | Adds TestDispatchPriorityWiring with two mocked tests verifying priority flows from fairness key and uses DEFAULT_PRIORITY for bare dispatches. |
Sequence Diagram
sequenceDiagram
participant D as dispatch()
participant C as PgQueueClient.send()
participant DB as pg_queue_message (DB)
participant R as PgQueueClient.read()
participant W as Consumer Worker
D->>C: send(queue, payload, org_id, priority)
Note over C: validate priority in [MIN,MAX]
C->>DB: "INSERT (queue_name, message, org_id, priority, vt=now())"
DB-->>C: msg_id
W->>R: read(queue, vt_seconds, qty)
R->>DB: "WITH locked AS (SELECT msg_id WHERE queue_name=? AND vt<=now() ORDER BY priority DESC, msg_id FOR UPDATE SKIP LOCKED LIMIT ?)"
Note over DB: Locks exactly qty rows, skips contended rows
DB->>DB: "UPDATE FROM locked SET vt=now()+vt_seconds, read_ct+=1 RETURNING"
DB-->>R: (msg_id, message, read_ct) ORDER BY priority DESC, msg_id
R-->>W: [QueueMessage, ...]
W->>DB: "DELETE WHERE msg_id=? (ack)"
Reviews (3): Last reviewed commit: "UN-3546 [DOCS] Note check=/condition= ha..." | Re-trigger Greptile
- Concurrency test: assert the drain worker terminated after join (a hung worker now fails the test instead of passing silently while conn_b.close() races its in-flight queries). - Priority-bounds drift guard: backend models.py and workers fairness.py are separate codebases that can't import each other, so the DB constraint bounds (1/10) duplicate fairness.MIN/MAX_PRIORITY. Replaced the hardcoded "42" reject test with test_db_check_constraint_matches_fairness_bounds — raw-inserts at MIN/MAX (accepted) and MIN-1/MAX+1 (CheckViolation), pinning the DB constraint to the fairness range so a future widening that misses one side fails loudly. Documented the canonical source in the model comment. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Greptile feedback addressed —
|
…ation Breadcrumb for a future Django upgrade: `check=` is correct on the pinned Django 4.2 (deprecated 5.1, removed 6.0); fresh installs always replay under the shipped Django, so leave it. When the pin reaches >= 6.0, squash (or do the behaviour-preserving check= -> condition= edit) so a from-scratch migrate runs. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
0955df9
into
feat/UN-3445-pg-queue-integration
|
Starts enforcing the load-independent part of fairness —
pipeline_priority(L3) — directly in the single-table dequeue. The fairness key already flowed on every PG message but was pure pass-through; now a higher-priority task is claimed first, FIFO within a priority. Targetsfeat/UN-3445-pg-queue-integration.While testing the new ordering, the claim over-claimed under concurrency —
read(qty=1)returned 2 rows when other transactions touched the table. Root cause is in the original dequeue, not the new ordering:UPDATE … WHERE msg_id IN (SELECT … FOR UPDATE SKIP LOCKED LIMIT n)— EvalPlanQual re-evaluates theLIMITsubquery when a row it tried to lock was concurrently modified, so a single claim can return more thannrows. With concurrent consumers that means aread(qty=1)could pull (and bump the vt on) extra messages.Fix — the canonical PGMQ-safe shape: lock candidates in a CTE, then
UPDATE … FROM locked WHERE q.msg_id = locked.msg_id(locks exactlynonce). The trailingSELECTre-ordersRETURNING(otherwise unspecified) so batched claims also come back in priority order.This benefits the whole queue (every caller), not just priority ordering.
What
models.py+ migration0002):priority smallint default 5(=FairnessKey.DEFAULT_PRIORITY); dequeue index swapped to(queue_name, priority DESC, msg_id)so the priority claim stays an indexed top-N (no full-backlog sort).dispatch()writespriorityfromfairness.pipeline_priority; a bare dispatch (fairness=None) writes the neutral default.ORDER BY priority DESC, msg_id(higher first, FIFO within a priority).Deliberately deferred (need load / belong with the orchestrator)
L1 org-tier + L2 workload ordering,
org_config/burst_maxadmission,staging_queue/task_queuesplit,orchestrator_lockleader election — the multi-org fair-admission orchestrator, justified once the pipeline routes cross-org load through PG.Testing
send()writes priority; dispatch wiring (fairness + neutral default); read param order.dispatch()path → drained in claim order9 > 7 > 3a > 3b(FIFO) > 1.pg_queue_messagewithpriority: 5(neutral,fairness=null) — column populated end-to-end through the backend producer.🤖 Generated with Claude Code