UN-3555 [FEAT] PG Queue 9d slice-2 followup — run-worker.sh reaper type + pg-queue set#2059
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3555-reaper-run-worker-wiring
…pe + pg-queue set
Wires the reaper (UN-3554) into run-worker.sh and adds a pg-queue set so the
whole PG-queue group launches in one shot. Mirrors the 9c -> 9c-followup split
(launcher wiring as its own slice). Liveness probe is a separate follow-on.
- workers/pg_queue_reaper/: thin entrypoint package (python -m pg_queue_reaper
-> queue_backend.pg_queue.reaper.main). No worker-app bootstrap (the reaper
runs no Celery tasks), unlike pg_queue_consumer; exists so the process has a
stable name run-worker.sh can launch + pgrep-match.
- run-worker.sh:
  - reaper / pg-queue-reaper type: opt-in (NOT in `all`), launches
    `python -m pg_queue_reaper`, runs from workers root, --status/-k/-r match
    via the `-m` pgrep branch (now covers consumer + reaper). Lease/interval env
    documented in --help.
  - pg / pg-queue SET: run_pg_queue_set launches consumer + reaper together
    (always detached, like `all`). Restart (-r pg-queue) kills both members then
    relaunches. list_core_worker_dirs skips the set alias (no phantom status
    entry). Help documents the Celery `all` set and the PG `pg-queue` set as
    independent, runnable in parallel for a dual-transport (strangler-fig) setup.
Dev-tested live: `run-worker.sh reaper -d` acquires leadership + ticks and shows
RUNNING in --status; `run-worker.sh pg-queue` brings up consumer + reaper, both
RUNNING, reaper leader, no phantom set entry; bash -n clean; --help renders.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e
left a comment
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
Overall this is a clean, well-commented wiring slice that faithfully mirrors the existing pg_queue_consumer pattern. No bugs that break the happy path. The themes worth addressing before merge are observability of failures in the new pg-queue set launcher/restart path, and one misleading docstring. The code-simplifier found nothing actionable (the repeated blocks are intentional and clearer left explicit). Inline findings below, prioritised.
…-kill guard

Toolkit review on #2059:
- [High] run_pg_queue_set swallowed member start-failures (backgrounded subshells' status was lost, banner+return 0 unconditional). Now each member runs in a FOREGROUND subshell so run_worker's own `return 1` on a crash-on-start is captured; the set returns non-zero if any member fails. (The reviewer's kill -0 on `$!` would false-fail: that PID is the launcher subshell, which exits the instant it backgrounds the nohup'd worker; the foreground-subshell return value is the correct signal and isolates `cd`.) Documented that the set always runs detached (ignores -d).
- [Medium] Dispatch now `|| exit 1` so a member start-failure reaches the script exit code, the only programmatic startup signal (reaper has no health port yet).
- [Medium] Set-restart aggregates kill_one_worker failures and aborts the relaunch if a member survives SIGKILL (avoids a duplicate consumer double-polling Postgres). Mirrors kill_workers' discipline.
- [Medium/minor] Startup banner prints `Queues: n/a` when empty (reaper).
- [Low] Reworded pg_queue_reaper/__main__ docstring: the launcher DOES export WORKER_TYPE for every worker; the accurate claim is the reaper neither reads nor mutates it (vs the consumer overwriting it before `import worker`).
- [Low] Added a smoke test that pg_queue_reaper.__main__ re-exports the real reaper main (guards the `python -m pg_queue_reaper` launch path against an ImportError regression).

Dev-tested: `run-worker.sh pg-queue` returns exit 0 with both members up; 24 reaper tests pass; bash -n + ruff clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
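The set-restart rule in the commit above (aggregate kill failures, abort the relaunch if any member survives) can be illustrated with a small Python sketch. The real logic is bash inside run-worker.sh; `restart_set`, `kill_member`, and `launch_member` are hypothetical names introduced only for this illustration.

```python
from typing import Callable, Iterable


def restart_set(
    members: Iterable[str],
    kill_member: Callable[[str], bool],
    launch_member: Callable[[str], bool],
) -> bool:
    """Kill every member, then relaunch only if all of them are gone.

    kill_member returns True when the process is confirmed dead;
    launch_member returns True when the process started successfully.
    Both are hypothetical stand-ins for the shell helpers.
    """
    members = list(members)

    # Attempt every kill (no short-circuit) so one stubborn member does not
    # leave the others running, and remember which ones survived.
    survivors = [name for name in members if not kill_member(name)]
    if survivors:
        # Relaunching now would put a second instance next to the survivor.
        return False

    # Launch every member and report failure if any of them failed.
    results = [launch_member(name) for name in members]
    return all(results)
```

With a `kill_member` that reports the consumer as still alive, `restart_set` returns `False` without calling `launch_member` at all, which is the property that prevents a second consumer from polling Postgres alongside the survivor.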
Review addressed — commit
| Filename | Overview |
|---|---|
| workers/pg_queue_reaper/__init__.py | Thin package docstring only; mirrors pg_queue_consumer/__init__.py structure exactly. |
| workers/pg_queue_reaper/__main__.py | Top-level import of reaper main at module scope (outside the `__name__` guard), unlike consumer which guards inside _bootstrap_and_run(). Intentional: reaper needs no heavy bootstrap; test suite already imports the same module at top level so import-time safety is verified. |
| workers/run-worker.sh | Comprehensive wiring for reaper type and pg-queue set; pgrep, kill, status, logs, and restart paths all updated consistently. Minor: run_pg_queue_set always detaches (no foreground/wait form) while run_all_workers supports both modes; --help doesn't document this. |
| workers/tests/test_pg_reaper.py | Adds TestEntryPoint that pins module.main identity against the real reaper main; correctly uses importlib to avoid triggering the `if __name__ == '__main__'` guard. |
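The identity check described for the entrypoint test can be sketched as follows. This is not the PR's test: `demo_backend` and `demo_entry` are hypothetical throwaway modules written to a temp directory so the example runs on its own. It shows that importing `<package>.__main__` through importlib gives the module the name `demo_entry.__main__`, so the `if __name__ == "__main__"` guard stays inactive while the re-exported `main` can still be compared by identity.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path


def build_demo_packages(root: Path) -> None:
    """Write a fake 'real' module and a thin entrypoint package under root."""
    (root / "demo_backend.py").write_text(
        textwrap.dedent(
            """
            def main():
                return "real main ran"
            """
        )
    )
    pkg = root / "demo_entry"
    pkg.mkdir()
    (pkg / "__init__.py").write_text('"""Thin entrypoint package."""\n')
    (pkg / "__main__.py").write_text(
        textwrap.dedent(
            """
            from demo_backend import main  # top-level re-export

            if __name__ == "__main__":
                raise SystemExit(main())
            """
        )
    )


def entrypoint_reexports_real_main() -> bool:
    """Import the entrypoint as a normal module and compare identities."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        build_demo_packages(root)
        sys.path.insert(0, str(root))
        try:
            importlib.invalidate_caches()
            entry = importlib.import_module("demo_entry.__main__")
            real = importlib.import_module("demo_backend")
            # __name__ is "demo_entry.__main__" here, so the guard is skipped.
            return entry.main is real.main
        finally:
            # Leave the interpreter state as we found it.
            sys.path.remove(str(root))
            for name in ("demo_entry.__main__", "demo_entry", "demo_backend"):
                sys.modules.pop(name, None)
```

A failing import inside `__main__.py` would surface here as an `ImportError`, which is the regression such a smoke test is meant to catch before `python -m` does.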
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["run-worker.sh $WORKER_TYPE"] --> B{WORKERS lookup}
B -- "pg / pg-queue" --> C[run_pg_queue_set]
B -- "reaper / pg-queue-reaper" --> D[run_worker pg_queue_reaper]
B -- "all" --> E[run_all_workers]
B -- other --> F[run_worker type]
C --> G["subshell: run_worker pg_queue_consumer detach=true"]
C --> H["subshell: run_worker pg_queue_reaper detach=true"]
G --> I{crash within 1s?}
H --> J{crash within 1s?}
I -- yes --> K[failed=1]
J -- yes --> K
K --> L[kill_one_worker consumer + reaper]
L --> M[return 1 / exit 1]
I -- no --> N[consumer RUNNING]
J -- no --> O[reaper RUNNING leader-elected]
D --> P["uv run python -m pg_queue_reaper"]
P --> Q[queue_backend.pg_queue.reaper.main]
Q --> R[SQL recovery loop]
Reviews (2): Last reviewed commit: "UN-3555 [FEAT] Address Greptile: set par..."
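The dispatch shown in the flowchart, together with the rule that status output skips the set alias, can be modelled in a few lines of Python. This is a sketch under assumptions: the alias table below is hypothetical (in particular the `pg-queue-consumer` key), and the real `WORKERS` mapping in run-worker.sh may use different keys and values.

```python
from typing import Dict, List, Tuple

# Hypothetical alias table modelled on the flowchart; not copied from the script.
WORKERS: Dict[str, str] = {
    "pg": "pg-queue",
    "pg-queue": "pg-queue",
    "reaper": "pg_queue_reaper",
    "pg-queue-reaper": "pg_queue_reaper",
    "pg-queue-consumer": "pg_queue_consumer",
}
PG_QUEUE_SET = "pg-queue"  # a set value, not a worker directory
PG_QUEUE_MEMBERS = ["pg_queue_consumer", "pg_queue_reaper"]


def dispatch(worker_type: str) -> Tuple[str, List[str]]:
    """Return (launcher name, worker dirs to launch) for a CLI worker type."""
    if worker_type == "all":
        # Membership of `all` is not modelled in this sketch.
        return ("run_all_workers", [])
    target = WORKERS.get(worker_type, worker_type)
    if target == PG_QUEUE_SET:
        return ("run_pg_queue_set", list(PG_QUEUE_MEMBERS))
    return ("run_worker", [target])


def status_dirs() -> List[str]:
    """Unique worker dirs for status output, skipping the set alias."""
    dirs: List[str] = []
    for target in WORKERS.values():
        if target != PG_QUEUE_SET and target not in dirs:
            dirs.append(target)
    return dirs
```

Because the set value is filtered out in `status_dirs`, a status listing built from it never shows a phantom `pg-queue` worker, while `dispatch("pg")` still expands to both members.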
…set alias

- run_pg_queue_set: on a partial start-failure (one member up, the other crashed) tear the whole set down before returning 1: kill both members so a restart-on-failure relaunch can't spawn a second instance over the survivor (the consumer would double-poll Postgres). All-or-nothing, mirroring the restart path's discipline.
- tail_logs: handle the pg/pg-queue set alias. `--logs pg-queue` now tails both member logs (pg_queue_consumer + pg_queue_reaper) instead of looking for a non-existent workers/pg-queue/pg-queue.log and printing a misleading "no log file" error. Mirrors how list_core_worker_dirs skips the set value.

Dev-tested: `--logs pg-queue` tails 2 files (consumer + reaper); bash -n clean; 24 reaper tests pass.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
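The all-or-nothing start described in this commit can be sketched in Python with `subprocess`. This is an illustration under assumptions, not the shell implementation: `start_member` and `launch_set` are hypothetical names, and the one-second grace period is borrowed from the "crash within 1s?" nodes in the flowchart.

```python
import subprocess
import sys
from typing import Dict, Optional, Sequence


def start_member(
    cmd: Sequence[str], grace_seconds: float
) -> Optional[subprocess.Popen]:
    """Start cmd; return the process if it outlives the grace period."""
    proc = subprocess.Popen(
        list(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    try:
        proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        return proc  # still running after the grace period: treat as started
    return None  # exited inside the grace period: crash-on-start


def launch_set(
    commands: Dict[str, Sequence[str]], grace_seconds: float = 1.0
) -> Dict[str, subprocess.Popen]:
    """Start every member; on any failure kill the survivors and return {}."""
    running: Dict[str, subprocess.Popen] = {}
    failed = False
    for name, cmd in commands.items():
        proc = start_member(cmd, grace_seconds)
        if proc is None:
            failed = True
        else:
            running[name] = proc

    if failed:
        # Partial start: tear the whole set down so a retry cannot spawn a
        # second instance next to a member that did come up.
        for proc in running.values():
            proc.kill()
            proc.wait()
        return {}
    return running
```

An empty result maps naturally onto a non-zero exit code in a caller, which matches the commit's point that the exit status is the startup signal.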
Greptile review addressed — commit
Merged commit 35c735f into feat/UN-3445-pg-queue-integration
…ader) (#2061)

* UN-3556 [FEAT] PG Queue 9d - reaper liveness probe (heartbeat + is_leader)

Closes the gap flagged in #2059's review: a reaper that crashes after startup was invisible (opt-in skip in --status + no health port). Mirrors the consumer's liveness (UN-3544).
- PgReaper heartbeat: _last_tick_monotonic stamped at the START of every tick (a standby tick counts as progress; liveness tracks the loop, not leadership) + seconds_since_last_tick() / is_tick_stale().
- ReaperLivenessServer: lean HTTP probe (mirrors the consumer's LivenessServer). /health (also /healthz, /livez) returns 200 while the tick loop is fresh, 503 when stale. Payload also surfaces is_leader (which pod holds the lease, useful for 9e debugging). The 200/503 verdict is PURELY the heartbeat, never leadership (a standby is healthy) or DB reachability (a blip must not crash-loop a fine process).
- main() wires it from WORKER_PG_REAPER_HEALTH_PORT (unset → no server, no stray port); staleness window from WORKER_PG_REAPER_HEALTH_STALE_SECONDS (default 30s, comfortably above the 5s tick interval). Bind failure degrades gracefully (logs, runs probe-less).
- run-worker.sh: reserve port 8086 for the reaper, export the health-port env in the reaper special-case, document the two new env vars in --help.
- Tests (+13, 37 total): heartbeat fresh/stale + tick refresh; liveness server 200-fresh / 503-stale / is_leader reflected / 404 / double-start; health staleness env default+override+invalid; server-disabled-when-no-port.

Dev-tested live: `run-worker.sh reaper -d` → GET :8086/health → {"status":"healthy","check":"pg_reaper_tick","is_leader":true,...}.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [REFACTOR] Extract shared LivenessServer (SonarCloud duplication)

SonarCloud flagged the new ReaperLivenessServer as duplicating the consumer's LivenessServer (~19 lines of HTTP-probe boilerplate, over the 3% new-code gate). Extracted one generic LivenessServer into queue_backend/pg_queue/liveness.py, parameterised by a freshness callable + the payload's check/age labels + an optional extra-status callable (the reaper's is_leader). Both sides are now thin subclasses that preserve their exact constructor signatures and wire payloads:
- consumer LivenessServer(consumer, port=, stale_after=) → check="pg_queue_poll", seconds_since_last_poll (unchanged on the wire; its tests pass untouched).
- ReaperLivenessServer(reaper, port=, stale_after=) → check="pg_reaper_tick", seconds_since_last_tick, is_leader.

The boilerplate now lives once → duplication cleared, consumer behaviour preserved. reaper 37 tests + consumer liveness/health tests green; ruff clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [FEAT] Address review: validate health port, type/guard liveness, tests

Toolkit review (10 findings; several already resolved by the dedup refactor 0d2b3f7: the threading-alias, the query-strip comment, and the extract-shared-server follow-up itself):
- [Medium] Port parse: extracted _reaper_health_port_from_env(), which names the var on a bad value (no more context-free int('abc') crash) and range-checks 0-65535 at parse time, so an out-of-range value can't escape the bind catch as OverflowError inside start(). main() uses it.
- [Medium] liveness.py: typed _httpd/_thread as HTTPServer|None / Thread|None via TYPE_CHECKING (was Any in the shared server), which restores the lifecycle invariant + type-checking on .shutdown()/.join()/etc.
- [Low] LivenessServer.__init__ re-validates stale_after > 0 (a direct caller could otherwise build an always-503 probe).
- [Low] bound_port docstring: clarified the port=0 / not-started case.
- [Low] _DEFAULT_HEALTH_STALE_SECONDS comment references the interval constant, not a hard-coded "5s".
- [Medium/Low test gaps] +9 tests: port-env helper (unset/empty/valid/non-int named/out-of-range); main() wiring (parsed port reaches the wiring + health stopped in finally; port=None when unset); _maybe_start_health_server OSError graceful-degrade → None + logger.exception; stale_after<=0 constructor guard.

reaper 46 + consumer liveness 10 green; ruff clean. The shared-server extraction (flagged as a follow-up) was already done in 0d2b3f7.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [FEAT] Address Greptile: protect core payload + per-process log label

- extra_status_fn could silently clobber core payload fields (status / check / age_key / stale_after_seconds) that a monitor reads. Now the handler builds extra fields first and overlays the core fields, so core ALWAYS wins: a future caller's extra dict can't corrupt the status a monitor parses. Test: an extra_status_fn returning {"status":"HACKED",...} leaves status "healthy" and check intact, while a non-reserved extra key is preserved.
- The dedup refactor moved the consumer's liveness warnings to the shared liveness logger with generic text, so log-based filtering keyed on the old "PG-queue consumer: ..." would miss them. Added a log_label param (default "pg-queue"); consumer passes "pg-queue consumer", reaper "pg-queue reaper", so the messages stay attributable to the source process.

57 tests green (reaper 47 + consumer liveness 10); ruff clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
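The heartbeat-based verdict and the "core fields always win" payload rule from these commits can be sketched as below. This is a minimal illustration, not the code in queue_backend/pg_queue/liveness.py: the `Heartbeat` class and `health_response` function are hypothetical names, the `"unhealthy"` status string for the stale case is an assumption (the source only shows `"healthy"`), and treating an age strictly greater than the window as stale is also an assumption.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class Heartbeat:
    """Tracks when the tick loop last made progress (monotonic clock)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._last_tick = clock()

    def tick(self) -> None:
        # Stamped at the start of every tick, leader or standby alike.
        self._last_tick = self._clock()

    def seconds_since_last_tick(self) -> float:
        return self._clock() - self._last_tick


def health_response(
    heartbeat: Heartbeat,
    stale_after: float,
    extra_status_fn: Optional[Callable[[], Dict[str, object]]] = None,
) -> Tuple[int, Dict[str, object]]:
    """Return (HTTP status, JSON-ready payload) for a liveness probe."""
    if stale_after <= 0:
        # A non-positive window would make every probe report stale.
        raise ValueError("stale_after must be > 0")

    age = heartbeat.seconds_since_last_tick()
    stale = age > stale_after
    core: Dict[str, object] = {
        "status": "unhealthy" if stale else "healthy",  # stale label assumed
        "check": "pg_reaper_tick",
        "seconds_since_last_tick": round(age, 3),
        "stale_after_seconds": stale_after,
    }
    extra = dict(extra_status_fn()) if extra_status_fn else {}
    # Extra fields first, core fields last: core wins on any key clash.
    return (503 if stale else 200), {**extra, **core}
```

Injecting the clock keeps the staleness logic testable without sleeping. Leadership only ever appears as an extra field, so a standby with a fresh heartbeat still gets a 200.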
What
Followup to the reaper slice (UN-3554): wires the reaper into `run-worker.sh` and adds a `pg-queue` set so the whole PG-queue group launches in one shot. Mirrors the 9c → 9c-followup split (launcher wiring as its own slice). The reaper's liveness probe is a separate follow-on.

What's in it
- `workers/pg_queue_reaper/`: a thin entrypoint package (`python -m pg_queue_reaper` → `queue_backend.pg_queue.reaper.main`). Unlike `pg_queue_consumer` it needs no worker-app bootstrap (the reaper runs no Celery tasks, only SQL recovery); it exists so the process has a stable name `run-worker.sh` can launch and pgrep-match.
- `run-worker.sh`:
  - `reaper` / `pg-queue-reaper` worker type: opt-in (not in `all`), launches `python -m pg_queue_reaper`, runs from the workers root, `--status`/`-k`/`-r` matching via the `-m` pgrep branch (now covers consumer + reaper). Lease/interval env documented in `--help`.
  - `pg` / `pg-queue` set: `run_pg_queue_set` launches consumer + reaper together (always detached, like `all`). `-r pg-queue` kills both members then relaunches. `list_core_worker_dirs` skips the set alias so status shows no phantom `pg-queue` worker.
  - Help documents the Celery `all` set and the PG `pg-queue` set as independent: run both in parallel for a dual-transport (strangler-fig) setup.

Why a set (and what stays env/K8s)
`run-worker.sh` is the dev / host-launch path; the `pg-queue` set is its ergonomics win: bring up the whole PG group in one command, alongside `all`, for dual-transport dev. Production still runs each component as its own K8s Deployment, with `WORKER_PG_QUEUE_ENABLED_TASKS` / `WORKER_BARRIER_BACKEND` controlling what's active, so no dynamic "auto-detect available queues" logic is baked into the shell.

Dev-test (live, against the running stack)
- `run-worker.sh reaper -d` → launched, acquired leadership, ticking; `--status` shows `pg_queue_reaper: RUNNING (2 proc)`.
- `run-worker.sh pg-queue` → consumer + reaper both up + RUNNING, reaper leader, no phantom set entry in status.
- `bash -n` clean; `--help` renders the new entries; entrypoint imports cleanly.

Out of scope (follow-ons)
- Reaper liveness probe (`is_leader` + last-tick freshness): separate slice, mirroring how consumer liveness (UN-3544) followed the consumer wiring.

Base: `feat/UN-3445-pg-queue-integration`. Sub-task UN-3555 under UN-3536, followup to UN-3554.