UN-3539 [FEAT] PG Queue Phase 9c — consumer poll loop (claim → run → ack)#2045
muhammad-ali-e merged 4 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3539-pg-queue-consumer
…ack)

PgQueueConsumer drains pg_queue_message and runs each claimed task in-process: poll_once() claims a batch (SKIP LOCKED + vt via PgQueueClient.read), runs it via current_app.tasks[name].apply(throw=True), and acks by deleting on success. Task failure -> leave the row (vt expiry redelivers, at-least-once); unknown task -> drop + error (no poison loop). run() adds an empty-queue backoff loop + SIGTERM/SIGINT graceful stop; main() is a `python -m` entrypoint (env-configured).

Completes the leaf-first end-to-end path: 8a route -> 9b enqueue -> 9a store -> 9c consume+run.

Validated live on the dev stack: a real send_webhook_notification routed to PG was claimed by the consumer, POSTed to Slack (HTTP 200), and acked (row removed).

Deployment note: the consumer PROCESS must bootstrap the worker app (import the task modules) so current_app.tasks resolves the task. This is an entrypoint/rollout concern, not consumer logic. Documented in the module docstring; the rollout phase wires a consumer container that boots the app.

Tests: 5 unit (run+ack, fail->no-ack, unknown->drop, empty, graceful stop) + 1 real-PG integration (enqueue -> poll -> execute -> ack).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
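The claim → run → ack cycle described in this commit can be sketched without Postgres or Celery. Everything below is a hypothetical stand-in, not the PR's code: `InMemoryQueue` plays the role of `PgQueueClient` (no SKIP LOCKED, no visibility timeout), a plain dict plays the role of `current_app.tasks`, and the `args` / `kwargs` payload keys are assumed names.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Message:
    msg_id: int
    payload: dict[str, Any]


class InMemoryQueue:
    """Hypothetical stand-in for PgQueueClient: read() claims, delete() acks."""

    def __init__(self) -> None:
        self.rows: dict[int, dict[str, Any]] = {}
        self._next_id = 1

    def send(self, payload: dict[str, Any]) -> int:
        msg_id = self._next_id
        self._next_id += 1
        self.rows[msg_id] = payload
        return msg_id

    def read(self, qty: int) -> list[Message]:
        # The real client claims rows with SKIP LOCKED and a visibility
        # timeout; this fake just returns the first `qty` rows.
        return [Message(i, p) for i, p in list(self.rows.items())[:qty]]

    def delete(self, msg_id: int) -> bool:
        # True if a row was actually removed (the ack found its row).
        return self.rows.pop(msg_id, None) is not None


def poll_once(
    queue: InMemoryQueue,
    tasks: dict[str, Callable[..., Any]],
    qty: int = 1,
) -> int:
    """Claim a batch, run each task in-process, ack on success."""
    messages = queue.read(qty)
    for msg in messages:
        task = tasks.get(msg.payload.get("task_name"))
        if task is None:
            # Unknown task: drop the row so it cannot be redelivered forever.
            queue.delete(msg.msg_id)
            continue
        try:
            task(*msg.payload.get("args", []), **msg.payload.get("kwargs", {}))
        except Exception:
            # Failure: leave the row in place; in the real queue the
            # visibility timeout expires and the message is redelivered.
            continue
        queue.delete(msg.msg_id)  # ack
    return len(messages)
```

Because a failed message stays in the queue and is run again later, any task routed this way has to tolerate being executed more than once.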
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch. Configuration used: Organization UI. Review profile: CHILL. Plan: Pro.
…Cloud) The connect-to-dev-DB + skip-if-unreachable/unmigrated block was copy-pasted across test_pg_queue_client / test_dispatch_pg / test_pg_queue_consumer (6.3% duplication on new code, over the 3% gate). Extracted into shared pg_conn / pg_client fixtures + an integration_pg_conn() helper in tests/conftest.py; the three files now use them. Behaviour unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
muhammad-ali-e left a comment
Automated review — PG Queue Phase 9c consumer
Ran six specialised review passes (code review, silent-failure hunt, type design, test coverage, comment accuracy, simplification) over the 5-file diff. The consumer is clean, well-documented, and the conftest dedup is correct. Findings below are posted inline.
Headline: five of six passes independently flagged the same issue — the consumer silently drops the fairness (and queue) payload field that the producer serialises and that task_payload.py documents the consumer as rebuilding into the x-fairness-key header. That is the one I'd resolve (implement it, or correct the contract docstring) before merge. Everything else is Important/Suggestion-grade.
- fairness header rebuilt from the payload on the PG run path (was dropped); a PG-routed run now mirrors the Celery dispatch contract.
- poison-message guard: surface read_ct (QueueMessage + dequeue RETURNING), drop a task that keeps failing past max_attempts (default 5, env WORKER_PG_QUEUE_CONSUMER_MAX_ATTEMPTS) with a loud ERROR carrying the payload, instead of redelivering forever.
- malformed payload (missing task_name) -> distinct "missing task_name" drop log, not a misleading "unknown task None".
- run() wraps poll_once() so a transient read/DB blip backs off and continues instead of tearing down the loop (the client self-recovers).
- ack: WARN when delete() finds no row (task exceeded vt -> possible double-run).
- __init__ validates positive tuning params; main() wires backoff_max + max_attempts via env (prefix helper); non-main-thread signal-install failure -> WARNING.
- tests: poison drop, missing task_name, fairness-header propagation, multi-message batch, ack-no-row warn, construction validation, backoff growth/reset, poll-error resilience; fixed the read mock for read_ct.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
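A minimal sketch of the per-message decision these fixes describe, assuming the check order shown in the review's sequence diagram (malformed, then poison, then unknown, then run). The `Action` enum, `decide`, and `build_headers` are hypothetical names. The `task_name` and `fairness` payload fields and the `x-fairness-key` header come from the discussion above, but the exact header shape is an assumption.

```python
from enum import Enum
from typing import Any, Collection


class Action(Enum):
    DROP_MALFORMED = "drop: missing task_name"
    DROP_POISON = "drop: exceeded max_attempts"
    DROP_UNKNOWN = "drop: unknown task"
    RUN = "run"


def decide(
    payload: dict[str, Any],
    read_ct: int,
    registered: Collection[str],
    max_attempts: int = 5,
) -> Action:
    """Classify one claimed message before any task code runs."""
    name = payload.get("task_name")
    if not name:
        # Logged as its own case rather than as "unknown task None".
        return Action.DROP_MALFORMED
    if read_ct > max_attempts:
        # read_ct counts claims; past the cap the message is treated as poison.
        return Action.DROP_POISON
    if name not in registered:
        return Action.DROP_UNKNOWN
    return Action.RUN


def build_headers(payload: dict[str, Any]) -> dict[str, Any]:
    """Rebuild the fairness header from the payload (assumed shape)."""
    fairness = payload.get("fairness")
    return {"x-fairness-key": fairness} if fairness is not None else {}
```

With the default cap of 5, a message on its fifth claim still runs; the sixth claim is the first one dropped.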
Review feedback addressed —
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/consumer.py | New consumer: poll_once → _handle → ack/drop/requeue. Default batch_size=1 documented to avoid shared-VT double-run. Poison cap, backoff validation, graceful-stop signal handler all look correct. |
| workers/queue_backend/pg_queue/client.py | Added read_ct to RETURNING clause and QueueMessage (required field, no default). delete() demoted to DEBUG to avoid duplicate warning with consumer-level log. All changes are correct. |
| workers/tests/conftest.py | Shared pg_conn / pg_client fixtures extracted here from per-file duplicates. Skip logic (unreachable DB, missing migration) is preserved correctly. |
| workers/tests/test_pg_queue_consumer.py | New test file: unit tests cover run+ack, fail→no-ack, unknown drop, poison drop, malformed drop, fairness header, backoff, graceful stop, and multi-message batch. Integration test exercises full enqueue→poll→execute→ack with explicit cleanup in finally block. |
| workers/tests/test_dispatch_pg.py | Local pg_client fixture removed; now delegates to shared conftest fixture. Functionality unchanged. |
| workers/tests/test_pg_queue_client.py | Mock fetchall tuples updated to include read_ct column; local pg_conn fixture removed in favour of conftest. Second-connection test updated with a clear comment that TEST_DB_HOST is already set. |
Sequence Diagram
sequenceDiagram
participant Consumer as PgQueueConsumer
participant Client as PgQueueClient
participant DB as pg_queue_message
participant Task as celery task
Consumer->>Client: read(queue, vt_seconds, qty)
Client->>DB: "UPDATE SET vt+=vt read_ct+=1 WHERE vt<=now() SKIP LOCKED LIMIT qty RETURNING"
DB-->>Client: claimed rows committed
Client-->>Consumer: list of QueueMessage
alt missing task_name
Consumer->>Client: delete(msg_id)
Note right of Consumer: drop malformed
else read_ct exceeds max_attempts
Consumer->>Client: delete(msg_id)
Note right of Consumer: drop poison
else task not in registry
Consumer->>Client: delete(msg_id)
Note right of Consumer: drop unknown
else task found
Consumer->>Task: "task.apply args kwargs headers throw=True"
alt task succeeds
Task-->>Consumer: result
Consumer->>Client: delete(msg_id)
Client->>DB: DELETE WHERE msg_id
Note right of Consumer: ack
else task raises
Task-->>Consumer: exception
Note right of Consumer: leave row for VT redelivery
end
end
alt queue was non-empty
Consumer->>Consumer: reset backoff poll immediately
else queue was empty
Consumer->>Consumer: sleep backoff then grow to backoff_max
end
Reviews (2): Last reviewed commit: "UN-3539 [FIX] Address Greptile review on..."
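The last branch of the sequence diagram (reset the backoff after a non-empty poll, otherwise sleep and grow toward `backoff_max`) can be sketched as a pure function. This is an illustration under assumptions: `step_backoff` and `simulate` are hypothetical names, and the growth rule is taken to be doubling capped at `backoff_max`.

```python
def step_backoff(
    backoff: float,
    claimed: int,
    poll_interval: float,
    backoff_max: float,
) -> tuple[float, float]:
    """Return (seconds to sleep now, backoff to use on the next empty poll)."""
    if claimed > 0:
        # Work was found: poll again immediately and reset the backoff.
        return 0.0, poll_interval
    # Empty queue: sleep the current backoff, then double it up to the cap.
    return backoff, min(backoff * 2, backoff_max)


def simulate(
    claimed_per_poll: list[int],
    poll_interval: float,
    backoff_max: float,
) -> list[float]:
    """Replay a sequence of poll results and collect the sleeps taken."""
    backoff = poll_interval
    sleeps = []
    for claimed in claimed_per_poll:
        sleep_for, backoff = step_backoff(
            backoff, claimed, poll_interval, backoff_max
        )
        sleeps.append(sleep_for)
    return sleeps


# Four empty polls, one poll that claims two messages, then one empty poll.
print(simulate([0, 0, 0, 0, 2, 0], poll_interval=1.0, backoff_max=5.0))
# prints [1.0, 2.0, 4.0, 5.0, 0.0, 1.0]
```

Keeping the step function free of `time.sleep` makes the growth and reset behaviour testable without waiting.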
- [P1] batch shared-vt window: default batch_size 10 -> 1 so each message gets its own visibility window. The batch's vt is set atomically at claim but messages run sequentially, so with batch_size>1 the tail could exceed vt and be re-claimed mid-run (double-run). Batching stays opt-in; doc the vt > batch x worst-case-duration constraint.
- [P2] QueueMessage.read_ct: drop the misleading =0 default (a "never claimed" state the dequeue can't produce, since read_ct is always >=1). 0 would silently bypass the poison guard; now required, all callers supply it.
- [P2] __init__: reject backoff_max < poll_interval (else min(poll*2, max) shrinks the backoff below poll_interval instead of growing).
- [P2] dedup the ack-miss warning: client.delete() now logs at DEBUG; the consumer keeps the contextual WARNING (it names the task).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
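The two tuning constraints in this commit are simple arithmetic, sketched below. The function names and error messages are hypothetical; only the inequalities (`vt > batch x worst-case-duration`, `backoff_max >= poll_interval`, positive parameters) come from the commit messages in this PR.

```python
def batch_fits_vt(
    vt_seconds: float, batch_size: int, worst_case_seconds: float
) -> bool:
    """True if the last message of a sequential batch finishes inside vt.

    All messages in a batch share one visibility window set at claim time,
    so the tail waits for everything ahead of it.
    """
    return vt_seconds > batch_size * worst_case_seconds


def validate_tuning(
    poll_interval: float, backoff_max: float, batch_size: int = 1
) -> None:
    """Reject tuning values that would make the poll loop misbehave."""
    if poll_interval <= 0 or backoff_max <= 0 or batch_size <= 0:
        raise ValueError("poll_interval, backoff_max and batch_size must be positive")
    if backoff_max < poll_interval:
        # min(poll_interval * 2, backoff_max) would fall below poll_interval,
        # so the "backoff" would shrink instead of grow.
        raise ValueError("backoff_max must be >= poll_interval")


# With a 30 s window and tasks that can take 5 s each:
print(batch_fits_vt(30, 10, 5))  # prints False: 10 x 5 = 50 s exceeds 30 s
print(batch_fits_vt(30, 1, 5))   # prints True: a single 5 s task fits
```

A batch size of 1 satisfies the first constraint whenever a single task fits inside the visibility timeout, which is one reading of why it is the safe default.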
Merged commit 12631c1 into feat/UN-3445-pg-queue-integration
…guard (#2047)

* UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap guard

Make the 9c PG-queue consumer (UN-3539) runnable via the normal worker flow, safely. Split from #2045 to keep that PR focused on consumer logic.

- Launcher (pg_queue_consumer/__main__.py): set WORKER_TYPE to the source worker (default notification) BEFORE `import worker`, so the right tasks register. worker.py loads exactly one worker type's tasks; a bare import would load the general worker's and drop every notification as unknown.
- run-worker.sh: `pg-queue-consumer` type runs `python -m pg_queue_consumer` (not a celery command) from the workers root; queue via env.
- Startup guard: PgQueueConsumer.run() refuses to start on an empty task registry: fail loud instead of silently dropping every message.
- Drop the hard-coded 8086 health port (consumer runs no health server).

Integration fixes found during live dev-test (real send_webhook_notification end-to-end → Slack HTTP 200):

- Opt-in status: consumer is not part of `all`; shown only when running.
- Log path: detach writes to an absolute $worker_dir/$type.log so -L/-C find it (also fixes the same latent bug for pluggable workers).
- PID discovery: get_worker_pids matches the `python -m` invocation, so --status / -k / -r work for the consumer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3541 [FIX] Address PR #2047 review feedback

- run-worker.sh: verify liveness after a detached launch (kill -0 + tail); `set -e` doesn't apply to `&`, so a fork that died on startup was reported as "started"; acute for the health-port-less consumer (High).
- consumer.py: log the registered application task names at startup so a *wrong* (non-empty but mismatched) registry is diagnosable; the guard only catches an *empty* one (Medium observability).
- consumer.py: type _env() with a TypeVar (was `cast: type -> object`, erasing types at the typed __init__) and name the offending var on a bad cast instead of a context-free ValueError (Medium).
- tests: add the guard's positive / require_tasks=False bypass / built-in filter arms (only the failure arm was covered).
- get_worker_pids: warn on pgrep rc>1 (operational/regex error) instead of collapsing it to "not running" (Low).
- run-worker.sh: extract the repeated "pg_queue_consumer" literal into a readonly constant (SonarCloud S1192).
- Comment accuracy: build_celery_app configures but does not import tasks; `notification` is the worker that owns the leaf task, not the task itself; generalise the "every notification dropped" example.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* UN-3541 [FIX] Address Greptile review feedback

- __main__.py: move the WORKER_TYPE mutation + `import worker` bootstrap into a guarded _bootstrap_and_run() called only under `__name__ == "__main__"`, so an accidental import (test/IDE/type-checker walking __main__) no longer overwrites WORKER_TYPE or triggers a full worker-app bootstrap.
- run-worker.sh: list `pg-queue-consumer` in the usage/--help worker types, plus a note for its WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE overrides.
- run-worker.sh: clarify the post-launch liveness check is a best-effort fast-fail for *immediate* (sub-second) crash-on-import/bad-config faults, not a connectivity check; kept general (an immediate crash can hit any worker) and noted the `all` subshells overlap the 1s with inter-launch sleep.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
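One way the TypeVar-typed env helper mentioned in the review fix could look is sketched below. This is not the PR's `_env()`: `read_env` is a hypothetical name and signature. The `WORKER_PG_QUEUE_CONSUMER_` prefix is taken from the variable names quoted in this thread.

```python
import os
from typing import Callable, TypeVar

T = TypeVar("T")

# Prefix as it appears in WORKER_PG_QUEUE_CONSUMER_MAX_ATTEMPTS.
PREFIX = "WORKER_PG_QUEUE_CONSUMER_"


def read_env(name: str, default: T, cast: Callable[[str], T]) -> T:
    """Read PREFIX+name, cast it, and keep the result type visible to callers.

    Tying `default` and `cast` to the same TypeVar means a type checker sees
    read_env("MAX_ATTEMPTS", 5, int) as int rather than object.
    """
    var = PREFIX + name
    raw = os.environ.get(var)
    if raw is None:
        return default
    try:
        return cast(raw)
    except ValueError as exc:
        # Name the offending variable instead of surfacing a bare ValueError.
        kind = getattr(cast, "__name__", repr(cast))
        raise ValueError(f"{var}={raw!r} is not a valid {kind}") from exc


# Typical use at startup (values here are illustrative defaults).
max_attempts = read_env("MAX_ATTEMPTS", 5, int)
backoff_max = read_env("BACKOFF_MAX", 30.0, float)
```

Naming the variable in the error matters most for a process started by a shell launcher, where the traceback is often the only clue to a bad setting.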
What
The consumer half — makes a PG-routed task actually execute.
`PgQueueConsumer` drains `pg_queue_message` and runs each claimed task in-process. Completes the leaf-first end-to-end path: 8a route → 9b enqueue → 9a store → 9c consume+run.
Targets `feat/UN-3445-pg-queue-integration`.
Validated live (dev stack)
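A real API execution's Slack webhook was opted into PG and the full loop ran end-to-end: the consumer claimed the `send_webhook_notification` message, POSTed to Slack (HTTP 200), and acked it (row removed).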
A production task executed entirely off Postgres, no Celery broker.
Changes
- `queue_backend/pg_queue/consumer.py` (new), `PgQueueConsumer`:
  - `poll_once()`: claims a batch via `PgQueueClient.read` (SKIP LOCKED + visibility timeout), runs each task via `current_app.tasks[name].apply(args, kwargs, throw=True)` (executes the body in-process), and acks by `delete()` on success. On task failure the row is left in place, so its `vt` expires and it's redelivered (at-least-once → tasks must be idempotent). Unknown task → drop + `logger.error` (no poison loop).
  - `run()`: poll loop with empty-queue backoff + SIGTERM/SIGINT graceful stop.
  - `main()`: `python -m queue_backend.pg_queue.consumer`, env-configured.
Deployment note (not consumer logic)
The consumer process must bootstrap the worker app (import the task modules) so `current_app.tasks` resolves the task; otherwise it treats every task as unknown. This is an entrypoint/rollout concern: the rollout phase wires a `worker-pg-queue-consumer` container that boots the worker app before polling. Documented in the module docstring.
Out of scope (later)
`read_ct` cap; multi-queue fan-out; the single-orchestrator + fair admission (9d); container/K8s wiring (rollout).
Ticket: UN-3539 (parent UN-3536, epic UN-3445)
🤖 Generated with Claude Code