Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3539 [FEAT] PG Queue Phase 9c — consumer poll loop (claim → run → ack)#2045

Merged
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3539-pg-queue-consumerZipstack/unstract:UN-3539-pg-queue-consumerCopy head branch name to clipboard
Jun 12, 2026
Merged

UN-3539 [FEAT] PG Queue Phase 9c — consumer poll loop (claim → run → ack)#2045
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3539-pg-queue-consumerZipstack/unstract:UN-3539-pg-queue-consumerCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

The consumer half — makes a PG-routed task actually execute. PgQueueConsumer drains pg_queue_message and runs each claimed task in-process. Completes the leaf-first end-to-end path: 8a route → 9b enqueue → 9a store → 9c consume+run.

Targets feat/UN-3445-pg-queue-integration.

Validated live (dev stack)

A real API execution's Slack webhook was opted into PG and the full loop ran end-to-end:

dispatch(send_webhook_notification) → pg_queue_message row (queue=notifications)
consumer.poll_once() → claim → run → Webhook sent to Slack (HTTP 200) → ack (row removed)

A production task executed entirely off Postgres, no Celery broker.

Changes

  • queue_backend/pg_queue/consumer.py (new)PgQueueConsumer:
    • poll_once() — claims a batch via PgQueueClient.read (SKIP LOCKED + visibility timeout), runs each task via current_app.tasks[name].apply(args, kwargs, throw=True) (executes the body in-process), and acks by delete() on success.
    • Task failure → leave the row; its vt expires and it's redelivered (at-least-once → tasks must be idempotent). Unknown task → drop + logger.error (no poison loop).
    • run() — poll loop with empty-queue backoff + SIGTERM/SIGINT graceful stop. main()python -m queue_backend.pg_queue.consumer, env-configured.
  • tests — 5 unit (run+ack, fail→no-ack, unknown→drop, empty, graceful stop) + 1 real-PG integration (enqueue → poll → execute → ack).

Deployment note (not consumer logic)

The consumer process must bootstrap the worker app (import the task modules) so current_app.tasks resolves the task — otherwise it treats every task as unknown. This is an entrypoint/rollout concern: the rollout phase wires a worker-pg-queue-consumer container that boots the worker app before polling. Documented in the module docstring.

Out of scope (later)

  • Poison-message dead-letter via read_ct cap; multi-queue fan-out; the single-orchestrator + fair admission (9d); container/K8s wiring (rollout).

Ticket: UN-3539 (parent UN-3536, epic UN-3445)

🤖 Generated with Claude Code

…ack)

PgQueueConsumer drains pg_queue_message and runs each claimed task
in-process: poll_once() claims a batch (SKIP LOCKED + vt via
PgQueueClient.read), runs it via current_app.tasks[name].apply(throw=True),
and acks by deleting on success. Task failure -> leave the row (vt expiry
redelivers, at-least-once); unknown task -> drop + error (no poison loop).
run() adds an empty-queue backoff loop + SIGTERM/SIGINT graceful stop;
main() is a `python -m` entrypoint (env-configured).

Completes the leaf-first end-to-end path: 8a route -> 9b enqueue -> 9a
store -> 9c consume+run. Validated live on the dev stack: a real
send_webhook_notification routed to PG was claimed by the consumer, POSTed
to Slack (HTTP 200), and acked (row removed).

Deployment note: the consumer PROCESS must bootstrap the worker app (import
the task modules) so current_app.tasks resolves the task — an
entrypoint/rollout concern, not consumer logic. Documented in the module
docstring; the rollout phase wires a consumer container that boots the app.

Tests: 5 unit (run+ack, fail->no-ack, unknown->drop, empty, graceful stop)
+ 1 real-PG integration (enqueue -> poll -> execute -> ack).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 188d0788-ebda-4678-b081-2d886149321b

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3539-pg-queue-consumer

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

…Cloud)

The connect-to-dev-DB + skip-if-unreachable/unmigrated block was copy-pasted
across test_pg_queue_client / test_dispatch_pg / test_pg_queue_consumer
(6.3% duplication on new code, over the 3% gate). Extracted into shared
pg_conn / pg_client fixtures + an integration_pg_conn() helper in
tests/conftest.py; the three files now use them. Behaviour unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated review — PG Queue Phase 9c consumer

Ran six specialised review passes (code review, silent-failure hunt, type design, test coverage, comment accuracy, simplification) over the 5-file diff. The consumer is clean, well-documented, and the conftest dedup is correct. Findings below are posted inline.

Headline: five of six passes independently flagged the same issue — the consumer silently drops the fairness (and queue) payload field that the producer serialises and that task_payload.py documents the consumer as rebuilding into the x-fairness-key header. That is the one I'd resolve (implement it, or correct the contract docstring) before merge. Everything else is Important/Suggestion-grade.

Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/tests/test_pg_queue_consumer.py
- fairness header rebuilt from the payload on the PG run path (was dropped)
  — a PG-routed run now mirrors the Celery dispatch contract.
- poison-message guard: surface read_ct (QueueMessage + dequeue RETURNING),
  drop a task that keeps failing past max_attempts (default 5, env
  WORKER_PG_QUEUE_CONSUMER_MAX_ATTEMPTS) with a loud ERROR carrying the
  payload, instead of redelivering forever.
- malformed payload (missing task_name) -> distinct "missing task_name"
  drop log, not a misleading "unknown task None".
- run() wraps poll_once() so a transient read/DB blip backs off and
  continues instead of tearing down the loop (the client self-recovers).
- ack: WARN when delete() finds no row (task exceeded vt -> possible
  double-run).
- __init__ validates positive tuning params; main() wires backoff_max +
  max_attempts via env (prefix helper); non-main-thread signal-install
  failure -> WARNING.
- tests: poison drop, missing task_name, fairness-header propagation,
  multi-message batch, ack-no-row warn, construction validation, backoff
  growth/reset, poll-error resilience; fixed the read mock for read_ct.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review feedback addressed — 902bb42dc

All 9 comments (3 Important):

Important — correctness/resilience:

  • Fairness header now rebuilt from the payload on the PG run path (was silently dropped) → mirrors the Celery dispatch contract.
  • Poison guard: surfaced read_ct (dequeue RETURNING + QueueMessage) and drop a task that keeps failing past max_attempts (default 5, env-tunable) with a loud ERROR + payload, instead of redelivering forever.
  • Malformed payload (missing task_name) → distinct "missing task_name" drop log, not a misleading "unknown task None".

Suggestions:

  • run() wraps poll_once() — a transient blip backs off + continues instead of killing the loop.
  • delete() returning False (vt-exceeded → re-claim) now WARNs (possible double-run).
  • __init__ validates positive tuning params (fail at construction); main() wires backoff_max + max_attempts via env (prefix helper); off-main-thread signal failure → WARNING.

Tests: poison drop, missing task_name, fairness propagation, multi-message batch (order + selective ack), ack-no-row warn, construction validation, backoff growth/reset, poll-error resilience. Consumer suite now 14 tests.

57 queue_backend tests green; ruff + pre-commit clean; SonarCloud gate already passing.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 12, 2026 04:59
@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces PgQueueConsumer, the consumer half of the Postgres-based task queue integration that replaces Celery broker for PG-routed tasks. It closes the end-to-end path: enqueue → claim → execute in-process → ack (delete).

  • consumer.py (new): PgQueueConsumer polls one queue via PgQueueClient.read (SKIP LOCKED + VT), runs each claimed task eagerly with task.apply(..., throw=True), acks on success by deleting the row, and handles malformed/unregistered/poison messages as explicit drops. The run() loop adds exponential empty-queue backoff and SIGTERM/SIGINT graceful stop.
  • client.py (updated): _DEQUEUE_SQL now returns read_ct; QueueMessage gains a required read_ct: int field (no default, so a zero "never-claimed" value can't silently bypass poison detection); delete() demoted from WARNING to DEBUG since the consumer-level log carries the task name context.
  • Tests: Shared pg_conn/pg_client fixtures moved to conftest.py; unit suite (test_pg_queue_consumer.py) covers run+ack, fail→no-ack, unknown drop, empty, backoff, and graceful stop; one integration test exercises the full loop against real Postgres.

Confidence Score: 5/5

Safe to merge — the consumer is well-contained, all edge-cases (poison messages, unknown tasks, malformed payloads, VT expiry double-run) are handled and tested, and the previous review feedback has been fully addressed.

The consumer logic is straightforward and correct: default batch_size=1 sidesteps the shared-VT sequential-processing race, the required read_ct field closes the poison-guard bypass, the duplicate-warning issue is resolved by demoting client.delete() to DEBUG, and the backoff_max validation is in place. The test suite covers all identified failure modes including the full enqueue→poll→execute→ack integration path.

No files require special attention.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/consumer.py New consumer: poll_once → _handle → ack/drop/requeue. Default batch_size=1 documented to avoid shared-VT double-run. Poison cap, backoff validation, graceful-stop signal handler all look correct.
workers/queue_backend/pg_queue/client.py Added read_ct to RETURNING clause and QueueMessage (required field, no default). delete() demoted to DEBUG to avoid duplicate warning with consumer-level log. All changes are correct.
workers/tests/conftest.py Shared pg_conn / pg_client fixtures extracted here from per-file duplicates. Skip logic (unreachable DB, missing migration) is preserved correctly.
workers/tests/test_pg_queue_consumer.py New test file: unit tests cover run+ack, fail→no-ack, unknown drop, poison drop, malformed drop, fairness header, backoff, graceful stop, and multi-message batch. Integration test exercises full enqueue→poll→execute→ack with explicit cleanup in finally block.
workers/tests/test_dispatch_pg.py Local pg_client fixture removed; now delegates to shared conftest fixture. Functionality unchanged.
workers/tests/test_pg_queue_client.py Mock fetchall tuples updated to include read_ct column; local pg_conn fixture removed in favour of conftest. Second-connection test updated with a clear comment that TEST_DB_HOST is already set.

Sequence Diagram

sequenceDiagram
    participant Consumer as PgQueueConsumer
    participant Client as PgQueueClient
    participant DB as pg_queue_message
    participant Task as celery task

    Consumer->>Client: read(queue, vt_seconds, qty)
    Client->>DB: "UPDATE SET vt+=vt read_ct+=1 WHERE vt<=now() SKIP LOCKED LIMIT qty RETURNING"
    DB-->>Client: claimed rows committed
    Client-->>Consumer: list of QueueMessage

    alt missing task_name
        Consumer->>Client: delete(msg_id)
        Note right of Consumer: drop malformed
    else read_ct exceeds max_attempts
        Consumer->>Client: delete(msg_id)
        Note right of Consumer: drop poison
    else task not in registry
        Consumer->>Client: delete(msg_id)
        Note right of Consumer: drop unknown
    else task found
        Consumer->>Task: "task.apply args kwargs headers throw=True"
        alt task succeeds
            Task-->>Consumer: result
            Consumer->>Client: delete(msg_id)
            Client->>DB: DELETE WHERE msg_id
            Note right of Consumer: ack
        else task raises
            Task-->>Consumer: exception
            Note right of Consumer: leave row for VT redelivery
        end
    end

    alt queue was non-empty
        Consumer->>Consumer: reset backoff poll immediately
    else queue was empty
        Consumer->>Consumer: sleep backoff then grow to backoff_max
    end
Loading

Reviews (2): Last reviewed commit: "UN-3539 [FIX] Address Greptile review on..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/client.py Outdated
- [P1] batch shared-vt window: default batch_size 10 -> 1 so each message
  gets its own visibility window. The batch's vt is set atomically at claim
  but messages run sequentially, so with batch_size>1 the tail could exceed
  vt and be re-claimed mid-run (double-run). Batching stays opt-in; doc the
  vt > batch x worst-case-duration constraint.
- [P2] QueueMessage.read_ct: drop the misleading =0 default (a "never
  claimed" state the dequeue can't produce — read_ct is always >=1). 0 would
  silently bypass the poison guard; now required, all callers supply it.
- [P2] __init__: reject backoff_max < poll_interval (else min(poll*2, max)
  shrinks the backoff below poll_interval instead of growing).
- [P2] dedup the ack-miss warning: client.delete() now logs at DEBUG; the
  consumer keeps the contextual WARNING (it names the task).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e merged commit 12631c1 into feat/UN-3445-pg-queue-integration Jun 12, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3539-pg-queue-consumer branch June 12, 2026 05:49
muhammad-ali-e added a commit that referenced this pull request Jun 12, 2026
…guard (#2047)

* UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap guard

Make the 9c PG-queue consumer (UN-3539) runnable via the normal worker
flow, safely. Split from #2045 to keep that PR focused on consumer logic.

- Launcher (pg_queue_consumer/__main__.py): set WORKER_TYPE to the source
  worker (default notification) BEFORE `import worker`, so the right tasks
  register. worker.py loads exactly one worker type's tasks; a bare import
  would load the general worker's and drop every notification as unknown.
- run-worker.sh: `pg-queue-consumer` type runs `python -m pg_queue_consumer`
  (not a celery command) from the workers root; queue via env.
- Startup guard: PgQueueConsumer.run() refuses to start on an empty task
  registry — fail loud instead of silently dropping every message.
- Drop the hard-coded 8086 health port (consumer runs no health server).

Integration fixes found during live dev-test (real send_webhook_notification
end-to-end → Slack HTTP 200):
- Opt-in status: consumer is not part of `all`; shown only when running.
- Log path: detach writes to an absolute $worker_dir/$type.log so -L/-C
  find it (also fixes the same latent bug for pluggable workers).
- PID discovery: get_worker_pids matches the `python -m` invocation, so
  --status / -k / -r work for the consumer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3541 [FIX] Address PR #2047 review feedback

- run-worker.sh: verify liveness after a detached launch (kill -0 + tail);
  `set -e` doesn't apply to `&`, so a fork that died on startup was reported
  as "started" — acute for the health-port-less consumer (High).
- consumer.py: log the registered application task names at startup so a
  *wrong* (non-empty but mismatched) registry is diagnosable — the guard only
  catches an *empty* one (Medium observability).
- consumer.py: type _env() with a TypeVar (was `cast: type -> object`,
  erasing types at the typed __init__) and name the offending var on a bad
  cast instead of a context-free ValueError (Medium).
- tests: add the guard's positive / require_tasks=False bypass / built-in
  filter arms (only the failure arm was covered).
- get_worker_pids: warn on pgrep rc>1 (operational/regex error) instead of
  collapsing it to "not running" (Low).
- run-worker.sh: extract the repeated "pg_queue_consumer" literal into a
  readonly constant (SonarCloud S1192).
- Comment accuracy: build_celery_app configures but does not import tasks;
  `notification` is the worker that owns the leaf task, not the task itself;
  generalise the "every notification dropped" example.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3541 [FIX] Address Greptile review feedback

- __main__.py: move the WORKER_TYPE mutation + `import worker` bootstrap into
  a guarded _bootstrap_and_run() called only under `__name__ == "__main__"`,
  so an accidental import (test/IDE/type-checker walking __main__) no longer
  overwrites WORKER_TYPE or triggers a full worker-app bootstrap.
- run-worker.sh: list `pg-queue-consumer` in the usage/--help worker types,
  plus a note for its WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE overrides.
- run-worker.sh: clarify the post-launch liveness check is a best-effort
  fast-fail for *immediate* (sub-second) crash-on-import/bad-config faults,
  not a connectivity check; kept general (an immediate crash can hit any
  worker) and noted the `all` subshells overlap the 1s with inter-launch sleep.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.