UN-3566 [FEAT] PG Queue 9f — multi-queue consumer + named run-worker.sh PG roles#2073
UN-3566 [FEAT] PG Queue 9f — multi-queue consumer + named run-worker.sh PG roles#2073muhammad-ali-e merged 5 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom feat/UN-3566-FEAT_multiqueue_pg_consumerZipstack/unstract:feat/UN-3566-FEAT_multiqueue_pg_consumerCopy head branch name to clipboard
Conversation
…sh PG roles Make PG-queue workers first-class in run-worker.sh, runnable individually or as a set, like the Celery workers — removing the per-process-env footgun. - Multi-queue consumer: PgQueueConsumer takes queue_names: list[str] and polls them round-robin (per-queue read preserves the dequeue index's top-N; no queue starves another). WORKER_PG_QUEUE_CONSUMER_QUEUE is comma-parsed; a single value stays a one-element list (back-compat with the existing leaf consumer). - run-worker.sh named roles (PG_CONSUMER_ROLES), each a registry-bound consumer with its source worker-type + queue list baked in (no manual env): pg-orchestrator-api / pg-orchestrator-general (async_execute_bin has distinct impls per registry → split), pg-fileproc / pg-callback (multi-queue, one process drains ETL+API). The role name rides in argv (python -m pg_queue_consumer <role>) so pgrep (-s/-k/-r) tells co-running roles apart. - `./run-worker.sh -d pg` launches the 4 pipeline roles + reaper (mirrors `all`); each role also runs/kills/restarts/tails individually. resolve_log_file + the -L pg set + status/kill enumerate the roles. Tests: round-robin aggregation, empty-queue-no-starve, empty-list reject, comma-parse (incl single-value back-compat); existing consumer tests updated to the list arg. Dev-tested: `./run-worker.sh -d pg` → status shows 4 roles + reaper RUNNING; an API execution drains orchestrator-api → fileproc (draining both file_processing and api_file_processing) → callback → COMPLETED; `-r pg-fileproc` restarts only that role, leaving pg-callback untouched. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
… consumer pgrep - Multiple positional worker types: `./run-worker.sh -d all pg` starts the Celery set and the PG set in one shot (loops WORKER_TYPES). Warns if >1 type is given without -d, since a non-detached set `wait`s and would block the rest. - Tighten the generic pg_queue_consumer pgrep to end-anchored so --status/-k/-r for the generic consumer no longer also match (and aggregate/kill) the named role consumers, which run as `python -m pg_queue_consumer <role>` and have their own role-anchored pattern. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Define PG_ROLE_ORCH_API / _ORCH_GENERAL / _FILEPROC / _CALLBACK once and reference them in PG_CONSUMER_ROLES, PG_QUEUE_MEMBERS and WORKERS, instead of repeating each role-name string literal 4×. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…metric with 'pg') `./run-worker.sh -d celery` now runs the Celery set (== `all`, excludes the PG workers), so celery-only / pg-only / both read symmetrically: -d celery (Celery only) -d pg (PG only) -d celery pg | -d all pg (both) 'celery' maps to "all" in WORKERS → dispatches to run_all_workers, is skipped by list_core_worker_dirs (no phantom dir), and falls through to the restart-all branch under -r. The pre-existing `-L celery` log-tail alias is unaffected. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). Findings below are inline. The single blocker is the -r pg restart bug (duplicate role consumers double-polling Postgres); everything else is High/Medium/Low. Line anchors point at the nearest diffed line — exact file:line is called out in each comment body where it differs.
…, stale comments Critical/High + cleanup from the #2073 review round: - [Critical] `-r pg` (restart set) now kills the SAME members run_pg_queue_set launches (the 4 named roles + reaper), not just the generic consumer — was leaving the roles running, so a relaunch double-polled Postgres. - [High] poll_once isolates each queue: a read/handle failure on one queue is logged and skipped so the others still run AND the work already done this cycle still counts (no false empty-queue backoff after a partial failure). - [High] fixed the stale CELERY_SET comment that claimed "no 'celery' run alias". - [Medium-High] run_pg_queue_set start-failure teardown aggregates kill_one_worker returns and surfaces a survivor (mirrors the restart path). - [Medium] de-dup + copy queue_names (list(dict.fromkeys(...))) — a duplicate would double-read a queue; the copy stops a caller mutation bypassing validation. - [Medium] _parse_queue_list warns when it drops empty entries (config typo). - [Med/Low] comment fixes: only fileproc/callback are multi-queue; "4 roles + reaper" not "consumer + reaper"; "read once per cycle in list order" not round-robin. Cleanups: drop redundant export; cwd predicate uses PG_QUEUE_MEMBERS; spell out api_file_processing_callback in help. Tests: +one-queue-failing-isolation, +batch_size qty per queue, +duplicate dedup. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round addressed —
|
|
5f4cdf0
into
feat/UN-3445-pg-queue-integration
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/consumer.py | Signature changed from single queue_name: str to queue_names: list[str]; poll_once() now iterates per-queue with per-queue exception isolation; _parse_queue_list added for comma-separated env parsing; all tests updated. No correctness issues found. |
| workers/run-worker.sh | Adds 4 named PG consumer roles (PG_CONSUMER_ROLES map), multi-type launch support, and matching pgrep patterns. One omission: the new roles are absent from OPTIN_WORKERS, causing misleading STOPPED status after all. |
| workers/tests/test_pg_queue_consumer.py | All existing call-sites updated to list API; new TestMultiQueue class covers round-robin, starvation, failure isolation, dedup, and batch_size propagation; _parse_queue_list tested for back-compat and edge cases. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant S as run-worker.sh
participant PGS as run_pg_queue_set()
participant RW as run_worker()
participant ENV as Environment
participant L as pg_queue_consumer/__main__.py
participant C as PgQueueConsumer
S->>PGS: -d pg (or pg-queue)
loop for each role in PG_CONSUMER_ROLES + reaper
PGS->>RW: "worker_type = e.g. pg-fileproc"
RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing"
RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing"
RW->>L: uv run python -m pg_queue_consumer pg-fileproc
L->>ENV: "os.environ[WORKER_TYPE] = WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE"
L->>L: import worker (registers file_processing tasks)
L->>C: "PgQueueConsumer(queue_names=[file_processing, api_file_processing])"
loop poll cycle
C->>C: poll_once()
C->>C: read(file_processing) → handle messages
C->>C: read(api_file_processing) → handle messages
end
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant S as run-worker.sh
participant PGS as run_pg_queue_set()
participant RW as run_worker()
participant ENV as Environment
participant L as pg_queue_consumer/__main__.py
participant C as PgQueueConsumer
S->>PGS: -d pg (or pg-queue)
loop for each role in PG_CONSUMER_ROLES + reaper
PGS->>RW: "worker_type = e.g. pg-fileproc"
RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing"
RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing"
RW->>L: uv run python -m pg_queue_consumer pg-fileproc
L->>ENV: "os.environ[WORKER_TYPE] = WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE"
L->>L: import worker (registers file_processing tasks)
L->>C: "PgQueueConsumer(queue_names=[file_processing, api_file_processing])"
loop poll cycle
C->>C: poll_once()
C->>C: read(file_processing) → handle messages
C->>C: read(api_file_processing) → handle messages
end
end
Comments Outside Diff (1)
-
workers/run-worker.sh, line 169-172 (link)Named PG roles missing from
OPTIN_WORKERSThe four new pipeline consumer roles (
pg-orchestrator-api,pg-orchestrator-general,pg-fileproc,pg-callback) are inWORKERSandPG_QUEUE_MEMBERSbut not inOPTIN_WORKERS. The opt-in guard inshow_status()(line 654) suppresses STOPPED status only for workers listed there — so after./run-worker.sh all, all four roles will appear as STOPPED even though they're intentionally not part of the Celery set. The existingpg-queue-consumerandpg-queue-reaperare already inOPTIN_WORKERSfor this exact reason; the named roles follow the same opt-in pattern and should be treated identically.Prompt To Fix With AI
This is a comment left during a code review. Path: workers/run-worker.sh Line: 169-172 Comment: **Named PG roles missing from `OPTIN_WORKERS`** The four new pipeline consumer roles (`pg-orchestrator-api`, `pg-orchestrator-general`, `pg-fileproc`, `pg-callback`) are in `WORKERS` and `PG_QUEUE_MEMBERS` but not in `OPTIN_WORKERS`. The opt-in guard in `show_status()` (line 654) suppresses STOPPED status only for workers listed there — so after `./run-worker.sh all`, all four roles will appear as STOPPED even though they're intentionally not part of the Celery set. The existing `pg-queue-consumer` and `pg-queue-reaper` are already in `OPTIN_WORKERS` for this exact reason; the named roles follow the same opt-in pattern and should be treated identically. How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
workers/run-worker.sh:169-172
**Named PG roles missing from `OPTIN_WORKERS`**
The four new pipeline consumer roles (`pg-orchestrator-api`, `pg-orchestrator-general`, `pg-fileproc`, `pg-callback`) are in `WORKERS` and `PG_QUEUE_MEMBERS` but not in `OPTIN_WORKERS`. The opt-in guard in `show_status()` (line 654) suppresses STOPPED status only for workers listed there — so after `./run-worker.sh all`, all four roles will appear as STOPPED even though they're intentionally not part of the Celery set. The existing `pg-queue-consumer` and `pg-queue-reaper` are already in `OPTIN_WORKERS` for this exact reason; the named roles follow the same opt-in pattern and should be treated identically.
```suggestion
declare -A OPTIN_WORKERS=(
["$PG_QUEUE_CONSUMER_TYPE"]=1
["$PG_QUEUE_REAPER_TYPE"]=1
["$PG_ROLE_ORCH_API"]=1
["$PG_ROLE_ORCH_GENERAL"]=1
["$PG_ROLE_FILEPROC"]=1
["$PG_ROLE_CALLBACK"]=1
)
```
Reviews (1): Last reviewed commit: "UN-3566 address review: -r pg kills role..." | Re-trigger Greptile
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…trator loop (per-schedule ownership) (#2081) * UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership) Adds the periodic-trigger half of the orchestrator: the leader-elected reaper now also fires due, PG-owned schedules onto the PG queue — the Celery Beat replacement — without Beat/RabbitMQ in the trigger path. Dark by default; never double-fires. - pg_scheduler.py (new): dispatch_due_schedules() scans pg_periodic_schedule for pg_owned + enabled + due rows, enqueues scheduler.tasks.execute_pipeline_task on the PG `scheduler` queue AND advances next_run_at in ONE transaction (a crash between can't re-fire). A NULL next_run_at records a baseline and does NOT fire (no burst when a schedule is handed over; matches Beat). A bad cron on one row is logged and skipped without blocking the others. croniter computes next-run; all time comparisons use the DB clock. - pg_queue/models.py + migration 0009: pg_owned flag (default False = Beat owns it; the PG scheduler fires only owned rows) + due index (pg_owned, enabled, next_run_at). Default-false keeps the table inert until a schedule is handed over, and a schedule fires from exactly one side — never both. - reaper.py: the leader tick runs the scheduler AFTER recovery (a scheduler error can't starve the recovery net). - workers deps: add croniter (already a backend dep). - run-worker.sh + docker-compose: pg-scheduler consumer role + service (profile-gated) that runs the fired execute_pipeline_task. Out of scope (next slice ②c): the ramp control that flips pg_owned by percentage + disables the matching Beat PeriodicTask atomically (reusing the existing Flipt mechanism), the one-time backfill, and retiring Beat. Non-regression: pg_owned defaults False, so the reaper fires nothing until rows are explicitly owned; recovery-only behaviour is unchanged. Tests: 10 scheduler (real-PG) + 3 reaper-wiring; full reaper suite kept green via a scheduler stub. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * UN-3596 address review: per-row DB isolation, self-quiescing bad cron, typed rows, shared INSERT SQL, stronger tests - [Critical] per-row fire (INSERT+UPDATE+commit) now wrapped in try/except → rollback + log + continue, so one bad row can't poison the connection or drop the rest of the batch (mirrors recover_expired_barriers). Baseline UPDATE too. - [High] invalid cron now disables the row (enabled=FALSE) + logs once, instead of re-selecting it and emitting a traceback every ~5s tick forever. - [Med] read step (SELECT now() + due scan) wraps rollback + re-raise so the conn isn't handed back in an aborted-txn state. - [High] softened the "never double-fires" docstring + models.py comment: the guarantee is CONDITIONAL on the ②c ramp control disabling Beat; pre-ramp, safety rests on pg_owned defaulting to False. - [Med] _build_trigger_payload -> TaskPayload; workflow_id/pipeline_id typed str | uuid.UUID (| None); _DueSchedule NamedTuple binds SELECT columns to names at one site (no silent misassign on a reorder). - [Med] extracted INSERT_MESSAGE_SQL constant in client.py; send() and the scheduler share it (no verbatim SQL duplication). - [Low] comment fixes: reaper tick (ordering not isolation), execute_pipeline_task blanks vs Beat populating execution_action, models.py drop "next slice". - tests: fired == 1 (not >= 1, catches double-fire); next_run asserted at the cron's 09:00 match; baseline asserts == 0; +tz-aware next-run; +multi-row (fired == 2); +atomicity (advance UPDATE fails post-INSERT → enqueue rolls back, next_run unchanged); +reaper scheduler-error-discards-owned-conn. 75 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * UN-3596 chore: drop accidentally-committed 9f-design.md (untracked scoping doc) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * UN-3596 address greptile P1: roll back if the bad-cron disable UPDATE fails _quiesce_invalid_cron used contextlib.suppress around the cursor block, so if the enabled=FALSE UPDATE raised, commit() was skipped and the connection was left in an aborted-transaction state — poisoning the NEXT row's INSERT (caught by the outer handler and mislogged as "failed to fire"). Wrap in try/except with conn.rollback() on failure so the connection is always clean for the next row. +test: a forced disable-UPDATE failure on a bad-cron row doesn't stop a following healthy row from firing. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * UN-3596 chore: remove superseded 9f-design.md (impl merged in #2073) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
What
PgQueueConsumernow takesqueue_names: list[str]and polls them round-robin — one process can drain several queues (e.g. ETL + API fan-out) instead of one-process-per-queue.run-worker.sh: the coupled pipeline's PG consumers are first-class, runnable individually or as a set, like the Celery workers — no per-process env.Why
WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE+-q+ distinct-p(a repeated footgun). This makes./run-worker.sh -d pg(all) and./run-worker.sh -d pg-fileproc(individual) work just like-d all/-d file. Part of PG Queue Phase 9 (UN-3536), sub-task UN-3566.How
queue_namesround-robin (read()per queue preserves the(queue_name, priority DESC, msg_id)index's top-N; no queue starves another).WORKER_PG_QUEUE_CONSUMER_QUEUEis comma-parsed; a single value stays a one-element list → back-compat with the existing leaf consumer.PG_CONSUMER_ROLES,"<source_worker_type>;<queues>"):pg-orchestrator-api(api_deployment →celery_api_deployments) andpg-orchestrator-general(general →celery) — split becauseasync_execute_binhas distinct impls per registry (api-deployment handles it as an API deployment; general routes by workflow type), so one consumer can't serve both.pg-fileproc(file_processing →file_processing,api_file_processing) andpg-callback(callback → both callback queues) — multi-queue, one process each covering ETL + API.python -m pg_queue_consumer <role>) sopgrep(-s/-k/-r) distinguishes the otherwise-identical processes. Roles share thepg_queue_consumer/launcher dir;resolve_log_file+-L pgknow the role log path../run-worker.sh -d pglaunches the 4 pipeline roles + reaper (mirrorsall);-s/-k/-r/-Loperate per role.Can this PR break any existing features?
pg-queue-consumeralias are unaffected. No schema/API change.Database Migrations
Env Config
WORKER_PG_QUEUE_CONSUMER_QUEUEnow accepts a comma-separated list (single value unchanged).Notes on Testing
./run-worker.sh -d pg→-sshows the 4 roles + reaper RUNNING; an API execution drained orchestrator-api → pg-fileproc (one process draining bothfile_processingandapi_file_processing) → pg-callback → COMPLETED;-r pg-fileprocrestarted only that role (pg-callback PID unchanged).Related Issues
Checklist
🤖 Generated with Claude Code