Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3546 [FEAT] Priority-ordered PG-queue dequeue + concurrency-safe claim#2052

Merged
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3546-pg-queue-priority-dequeueZipstack/unstract:UN-3546-pg-queue-priority-dequeueCopy head branch name to clipboard
Jun 12, 2026
Merged

UN-3546 [FEAT] Priority-ordered PG-queue dequeue + concurrency-safe claim#2052
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3546-pg-queue-priority-dequeueZipstack/unstract:UN-3546-pg-queue-priority-dequeueCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

Starts enforcing the load-independent part of fairness — pipeline_priority (L3) — directly in the single-table dequeue. The fairness key already flowed on every PG message but was pure pass-through; now a higher-priority task is claimed first, FIFO within a priority. Targets feat/UN-3445-pg-queue-integration.

⚠️ Also fixes a latent concurrency bug in the core dequeue (9a)

While testing the new ordering, the claim over-claimed under concurrencyread(qty=1) returned 2 rows when other transactions touched the table. Root cause is in the original dequeue, not the new ordering:

UPDATE … WHERE msg_id IN (SELECT … FOR UPDATE SKIP LOCKED LIMIT n)EvalPlanQual re-evaluates the LIMIT subquery when a row it tried to lock was concurrently modified, so a single claim can return more than n rows. With concurrent consumers that means a read(qty=1) could pull (and bump the vt on) extra messages.

Fix — the canonical PGMQ-safe shape: lock candidates in a CTE, then UPDATE … FROM locked WHERE q.msg_id = locked.msg_id (locks exactly n once). The trailing SELECT re-orders RETURNING (otherwise unspecified) so batched claims also come back in priority order.

WITH locked AS (
    SELECT msg_id FROM pg_queue_message
     WHERE queue_name = %s AND vt <= now()
     ORDER BY priority DESC, msg_id
     FOR UPDATE SKIP LOCKED LIMIT %s
), claimed AS (
    UPDATE pg_queue_message q SET vt = now() + …, read_ct = read_ct + 1
      FROM locked WHERE q.msg_id = locked.msg_id
    RETURNING q.msg_id, q.message, q.read_ct, q.priority
)
SELECT msg_id, message, read_ct FROM claimed ORDER BY priority DESC, msg_id

This benefits the whole queue (every caller), not just priority ordering.

What

  • Schema (models.py + migration 0002): priority smallint default 5 (= FairnessKey.DEFAULT_PRIORITY); dequeue index swapped to (queue_name, priority DESC, msg_id) so the priority claim stays an indexed top-N (no full-backlog sort).
  • Enqueue: dispatch() writes priority from fairness.pipeline_priority; a bare dispatch (fairness=None) writes the neutral default.
  • Dequeue: ORDER BY priority DESC, msg_id (higher first, FIFO within a priority).

Deliberately deferred (need load / belong with the orchestrator)

L1 org-tier + L2 workload ordering, org_config/burst_max admission, staging_queue/task_queue split, orchestrator_lock leader election — the multi-org fair-admission orchestrator, justified once the pipeline routes cross-org load through PG.

Testing

  • Integration (real Postgres): priority selection one-at-a-time + batch ordering; 48 tests pass 5× consecutively across client+dispatch+routing (the combo that exposed the over-claim).
  • Unit: send() writes priority; dispatch wiring (fairness + neutral default); read param order.
  • Live end-to-end: dispatched 5 mixed-priority tasks via the real dispatch() path → drained in claim order 9 > 7 > 3a > 3b(FIFO) > 1.
  • Real flow: live API/ETL workflow notifications land in pg_queue_message with priority: 5 (neutral, fairness=null) — column populated end-to-end through the backend producer.
  • Migration applies clean; descending index confirmed in the DB.

🤖 Generated with Claude Code

…laim

Start enforcing the load-independent part of fairness — pipeline_priority
(L3) — directly in the single-table dequeue: higher priority is claimed
first, FIFO (msg_id) within a priority. The org-tier (L1) / workload (L2)
axes + burst_max admission stay deferred to the fair-admission orchestrator.

- Schema: add `priority` (smallint, default 5 = FairnessKey.DEFAULT_PRIORITY)
  to pg_queue_message; swap the dequeue index to (queue_name, priority DESC,
  msg_id) so the priority-ordered claim stays an indexed top-N.
- Enqueue: dispatch() writes priority from fairness.pipeline_priority; a bare
  dispatch (fairness=None) writes the neutral default.
- Dequeue: ORDER BY priority DESC, msg_id.

Also fixes a latent concurrency bug in the original dequeue (9a):
`UPDATE ... WHERE msg_id IN (SELECT ... FOR UPDATE SKIP LOCKED LIMIT n)` can
OVER-CLAIM under concurrent writers — EvalPlanQual re-evaluates the LIMIT
subquery when a row it tried to lock was concurrently touched, so one claim
can return more than n rows. Switched to the canonical PGMQ-safe shape: lock
candidates in a CTE, then `UPDATE ... FROM locked WHERE q.msg_id = locked.msg_id`,
which locks exactly n rows once. The trailing SELECT re-orders RETURNING (which
is otherwise unspecified) so batched claims come back in priority order too.

Tests: priority selection (one-at-a-time) + batch ordering against real
Postgres; send writes priority; dispatch wiring (fairness + neutral default);
read param order. Verified live end-to-end via dispatch()->claim-order (9>7>3a>3b>1).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 18086c96-5357-48c1-86f7-5fe15523004f

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3546-pg-queue-priority-dequeue

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Review Toolkit — automated multi-agent review

Ran six specialized reviewers (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier) over the diff vs feat/UN-3445-pg-queue-integration. Inline comments are attached below; this is the prioritized summary.

The PR is solid — the CTE FROM-join claim (vs the EvalPlanQual-unsafe IN (SELECT … LIMIT n) form) is the correct PGMQ-safe shape, param ordering is right, the migration sequence is valid, and the _enqueue_pg re-raise-with-breadcrumb (no silent Celery fallback) is exactly right. Findings are about hardening, not bugs in the happy path.

🔴 Critical / Important

  1. priority has no range enforcement (flagged independently by 4 of 6 agents). client.send() writes any int straight to the DB, and the priority column has no CHECK — only FairnessKey.__post_init__ validates [1,10], and that sits on just one of the write paths. An out-of-range value is silently durable and corrupts dequeue ordering with no log/error. Today's only caller passes a validated value, but this PR is the plumbing that lets an arbitrary priority through. Fix at both layers: a ValueError guard in send() (mirroring its existing vt_seconds/qty guards) and a DB CheckConstraint (the only backstop for the raw-SQL/ORM writer). See inline on client.py send() and models.py.
  2. Index dropped vt; the "indexed top-N" comment over-claims. The new (queue_name, priority DESC, msg_id) index removes vt, so visibility is a per-row filter. Under at-least-once delivery, in-flight (future-vt) high-priority rows pile up at the front of the index and get scanned-and-skipped on every claim — per-claim cost grows with the in-flight backlog, not LIMIT. Soften the comment and consider a partial index / benchmark with a realistic in-flight backlog. See inline on client.py:57 and models.py:43.

🟡 Test gaps (inline on test_pg_queue_client.py:283)

  • The rewrite's whole reason for existing — CTE prevents over-claim under concurrent writers — has zero coverage (test_no_double_delivery only proves disjointness, not the ≤qty bound).
  • vt × priority interaction is untested (a future-vt high-priority row should be skipped for a visible lower-priority one).
  • FIFO-within-band for batch claims only exercises single-member bands.

🟢 Comment / cleanup nits (inline)

  • Module docstring (client.py:8-10, outside the diff so not inline-commentable) still describes the old UPDATE … WHERE msg_id IN (SELECT … FOR UPDATE SKIP LOCKED …) shape this PR replaced and argues against — please update it to the CTE form.
  • models.py:34: FairnessKey.DEFAULT_PRIORITY is not a real symbol (it's module-level fairness.DEFAULT_PRIORITY).
  • client.py:76: param-order comment is duplicated at the call site (224-225) — drop one.

Generated with Claude Code via the PR Review Toolkit.

Comment thread workers/queue_backend/pg_queue/client.py
Comment thread backend/pg_queue/models.py
Comment thread workers/queue_backend/pg_queue/client.py
Comment thread backend/pg_queue/models.py Outdated
Comment thread backend/pg_queue/models.py Outdated
Comment thread workers/queue_backend/pg_queue/client.py Outdated
Comment thread workers/tests/test_pg_queue_client.py
- Validate priority at the write boundary: client.send() raises ValueError on
  out-of-range (mirrors its vt_seconds/qty guards) — an out-of-range value
  would silently jump/sink the row in the priority DESC claim order.
- Add a DB CheckConstraint (priority 1..10) as the backstop no ORM/raw writer
  can bypass (migration 0003). check= (not condition=) — repo is on Django 4.2.
- Soften the "indexed top-N" comments (client.py + models.py): the dequeue is
  an index walk with vt<=now() as a per-row filter, NOT a guaranteed top-N —
  vt is not in the index, so in-flight (future-vt) high-priority rows are
  scanned past on each claim; the orchestrator's admission is the high-backlog
  answer. Update the module docstring to the CTE FROM-join shape; fix the
  fairness.DEFAULT_PRIORITY symbol reference; drop the duplicated param-order
  comment.
- Tests: send() range-guard (parametrized) + DB CheckConstraint backstop;
  concurrent-writer over-claim guard (two readers, no batch exceeds qty —
  the regression test for the EvalPlanQual fix); vt × priority (visible low
  beats invisible high); FIFO-within-band for multi-member batch bands.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review feedback addressed — 15d1d45cf

Thanks — all findings handled (threads replied inline).

🔴 Important

  • priority validationsend() now raises on out-of-range (mirrors its vt_seconds/qty guards), plus a DB CheckConstraint(1..10) (migration 0003) as the backstop no ORM/raw writer can bypass. (check=, since the repo is Django 4.2; condition= is 5.1+.)
  • index "top-N" overclaim — softened both comments: it's an index walk with vt <= now() as a per-row filter, not a guaranteed top-N; in-flight high-priority rows are scanned past (cheap at low depth; orchestrator admission is the high-backlog answer). A vt <= now() partial index isn't possible (now() is STABLE), so I documented the limit; benchmarking under a realistic backlog noted as follow-up.

🟡 Tests — added all three: concurrent-writer over-claim guard (the regression test for the EvalPlanQual fix), vt × priority (visible low beats invisible high), and FIFO-within-band for multi-member batch bands. Plus the send() range-guard + DB-constraint backstop tests.

🟢 Nits — module docstring updated to the CTE FROM-join shape; fairness.DEFAULT_PRIORITY symbol fixed; duplicate param-order comment dropped.

Verified: 56 tests pass 3× consecutively (client + dispatch + routing); migration 0003 applies clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 12, 2026 14:14
@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR enforces pipeline_priority (fairness L3) in the single-table PG-queue dequeue and simultaneously fixes a latent concurrency bug in the original claim logic. The dequeue index is replaced with (queue_name, priority DESC, msg_id) and _DEQUEUE_SQL is rewritten to the EvalPlanQual-safe CTE form that locks exactly qty rows once, preventing the over-claim that the old UPDATE … WHERE msg_id IN (SELECT … FOR UPDATE SKIP LOCKED LIMIT n) could produce under concurrent writers.

  • Schema + migrations: priority smallint default 5 added in 0002, CheckConstraint [1,10] in 0003, dequeue index updated. Both migrations replay cleanly from scratch.
  • Enqueue: dispatch() now passes fairness.pipeline_priority (or DEFAULT_PRIORITY) to PgQueueClient.send(), which validates the range before the DB insert; the CheckConstraint is the backstop for any raw writer.
  • Dequeue / concurrency fix: The CTE form (lockedclaimed → outer ORDER BY) correctly selects the top-priority visible rows, updates exactly those rows, and returns results in a stable priority+FIFO order for both qty=1 and batch claims.

Confidence Score: 5/5

Safe to merge. The CTE rewrite directly addresses a real over-claim defect that was reproducible under concurrent load, and the priority ordering is a straightforward additive column with a DB-level check constraint as a backstop.

All changes are behaviorally correct and thoroughly tested — 48 integration tests including priority ordering, batch FIFO, vt×priority interaction, and a concurrent two-reader drain test that specifically exercises the over-claim fix. Schema, application-level guards, and DB constraints are mutually reinforcing. The SQL CTE shape matches the canonical PGMQ-safe dequeue pattern. No functional regressions were identified.

No files require special attention. workers/queue_backend/pg_queue/client.py carries the most weight (the dequeue SQL rewrite and parameter order change) but is validated by both unit assertions on the exact param tuple and live integration tests.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/client.py Core change: rewrites _DEQUEUE_SQL to the EvalPlanQual-safe CTE form (lock in locked, UPDATE via FROM-join in claimed), adds ORDER BY priority DESC, msg_id, and updates send() to write and validate priority. Parameter order correctly updated to match new SQL %s positions.
backend/pg_queue/models.py Adds priority SmallIntegerField(default=5), the CheckConstraint in Meta.constraints, and updates the dequeue index to (queue_name, priority DESC, msg_id). In sync with both migrations.
backend/pg_queue/migrations/0002_remove_pgqueuemessage_pg_queue_message_dequeue_idx_and_more.py Removes the old (queue_name, vt, msg_id) index, adds priority smallint default 5, and recreates the dequeue index as (queue_name, priority DESC, msg_id). Correct Django 4.2 migration; existing rows receive the neutral default 5.
backend/pg_queue/migrations/0003_pgqueuemessage_pg_queue_message_priority_range.py Adds CheckConstraint priority BETWEEN 1 AND 10 in a separate migration. The comment correctly flags the check= to condition= rename needed when Django is upgraded to >= 6.0.
workers/queue_backend/dispatch.py Threads fairness.pipeline_priority (or DEFAULT_PRIORITY for bare dispatches) through to PgQueueClient.send(). Clean and minimal change.
workers/tests/test_pg_queue_client.py Adds comprehensive unit (priority write/reject) and integration tests (priority ordering, batch FIFO, vt×priority interaction, concurrent claim safety, and constraint drift guard). The worker termination assertion and uuid-based queue name fix are both present.
workers/tests/test_dispatch_pg.py Adds TestDispatchPriorityWiring with two mocked tests verifying priority flows from fairness key and uses DEFAULT_PRIORITY for bare dispatches.

Sequence Diagram

sequenceDiagram
    participant D as dispatch()
    participant C as PgQueueClient.send()
    participant DB as pg_queue_message (DB)
    participant R as PgQueueClient.read()
    participant W as Consumer Worker

    D->>C: send(queue, payload, org_id, priority)
    Note over C: validate priority in [MIN,MAX]
    C->>DB: "INSERT (queue_name, message, org_id, priority, vt=now())"
    DB-->>C: msg_id

    W->>R: read(queue, vt_seconds, qty)
    R->>DB: "WITH locked AS (SELECT msg_id WHERE queue_name=? AND vt<=now() ORDER BY priority DESC, msg_id FOR UPDATE SKIP LOCKED LIMIT ?)"
    Note over DB: Locks exactly qty rows, skips contended rows
    DB->>DB: "UPDATE FROM locked SET vt=now()+vt_seconds, read_ct+=1 RETURNING"
    DB-->>R: (msg_id, message, read_ct) ORDER BY priority DESC, msg_id
    R-->>W: [QueueMessage, ...]

    W->>DB: "DELETE WHERE msg_id=? (ack)"
Loading

Reviews (3): Last reviewed commit: "UN-3546 [DOCS] Note check=/condition= ha..." | Re-trigger Greptile

Comment thread workers/tests/test_pg_queue_client.py
Comment thread backend/pg_queue/models.py
- Concurrency test: assert the drain worker terminated after join (a hung
  worker now fails the test instead of passing silently while conn_b.close()
  races its in-flight queries).
- Priority-bounds drift guard: backend models.py and workers fairness.py are
  separate codebases that can't import each other, so the DB constraint bounds
  (1/10) duplicate fairness.MIN/MAX_PRIORITY. Replaced the hardcoded "42" reject
  test with test_db_check_constraint_matches_fairness_bounds — raw-inserts at
  MIN/MAX (accepted) and MIN-1/MAX+1 (CheckViolation), pinning the DB constraint
  to the fairness range so a future widening that misses one side fails loudly.
  Documented the canonical source in the model comment.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Greptile feedback addressed — b68ec9fb4

  • Concurrency test livenessassert not worker.is_alive() after the join, so a hung drain fails loudly instead of passing silently (and racing conn_b.close()).
  • Priority-bounds drift — replaced the hardcoded 42 reject test with test_db_check_constraint_matches_fairness_bounds: raw-inserts at MIN_PRIORITY/MAX_PRIORITY (accepted) and just outside (CheckViolation), all from fairness.py — so the DB constraint and the app's fairness range can't silently diverge. Model comment documents fairness.py as canonical. (A shared @unstract/ constant is the proper long-term fix; offered as a follow-up since backend/workers can't import each other today.)

10 integration tests green; both findings were non-blocking (Greptile 4/5, safe-to-merge).

…ation

Breadcrumb for a future Django upgrade: `check=` is correct on the pinned
Django 4.2 (deprecated 5.1, removed 6.0); fresh installs always replay under
the shipped Django, so leave it. When the pin reaches >= 6.0, squash (or do the
behaviour-preserving check= -> condition= edit) so a from-scratch migrate runs.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e muhammad-ali-e merged commit 0955df9 into feat/UN-3445-pg-queue-integration Jun 12, 2026
4 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3546-pg-queue-priority-dequeue branch June 12, 2026 14:38
@sonarqubecloud

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.