UN-3555 [FEAT] PG Queue 9d slice-2 followup - run-worker.sh reaper type + pg-queue set #2059

Merged on Jun 16, 2026: muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3555-reaper-run-worker-wiring

@muhammad-ali-e

Contributor

What

Followup to the reaper slice (UN-3554): wires the reaper into run-worker.sh and adds a pg-queue set so the whole PG-queue group launches in one shot. Mirrors the 9c → 9c-followup split (launcher wiring as its own slice). The reaper's liveness probe is a separate follow-on.

What's in it

workers/pg_queue_reaper/: a thin entrypoint package (python -m pg_queue_reaper → queue_backend.pg_queue.reaper.main). Unlike pg_queue_consumer, it needs no worker-app bootstrap (the reaper runs no Celery tasks, only SQL recovery); it exists so the process has a stable name that run-worker.sh can launch and pgrep-match.

run-worker.sh

  • reaper / pg-queue-reaper worker type — opt-in (not in all), launches python -m pg_queue_reaper, runs from the workers root, --status/-k/-r matching via the -m pgrep branch (now covers consumer + reaper). Lease/interval env documented in --help.
  • pg / pg-queue set: run_pg_queue_set launches consumer + reaper together (always detached, like all). -r pg-queue kills both members, then relaunches. list_core_worker_dirs skips the set alias so status shows no phantom pg-queue worker.
  • Help documents the Celery all set and the PG pg-queue set as independent — run both in parallel for a dual-transport (strangler-fig) setup.

Why a set (and what stays env/K8s)

run-worker.sh is the dev / host-launch path; the pg-queue set is its ergonomics win — bring up the whole PG group in one command, alongside all, for dual-transport dev. Production still runs each component as its own K8s Deployment, with WORKER_PG_QUEUE_ENABLED_TASKS / WORKER_BARRIER_BACKEND controlling what's active — so no dynamic "auto-detect available queues" logic is baked into the shell.

Dev-test (live, against the running stack)

  • run-worker.sh reaper -d → launched, acquired leadership, ticking; --status shows pg_queue_reaper: RUNNING (2 proc).
  • run-worker.sh pg-queue → consumer + reaper both up + RUNNING, reaper leader, no phantom set entry in status.
  • bash -n clean; --help renders the new entries; entrypoint imports cleanly.

Out of scope (follow-ons)

  • Reaper liveness probe (LivenessServer exposing is_leader + last-tick freshness) — separate slice, mirroring how consumer liveness (UN-3544) followed the consumer wiring.
  • Production K8s manifests.

Base: feat/UN-3445-pg-queue-integration. Sub-task UN-3555 under UN-3536, followup to UN-3554.

…pe + pg-queue set

Wires the reaper (UN-3554) into run-worker.sh and adds a pg-queue set so the
whole PG-queue group launches in one shot. Mirrors the 9c -> 9c-followup split
(launcher wiring as its own slice). Liveness probe is a separate follow-on.

- workers/pg_queue_reaper/: thin entrypoint package (python -m pg_queue_reaper
  -> queue_backend.pg_queue.reaper.main). No worker-app bootstrap (the reaper
  runs no Celery tasks), unlike pg_queue_consumer; exists so the process has a
  stable name run-worker.sh can launch + pgrep-match.
- run-worker.sh:
  - reaper / pg-queue-reaper type — opt-in (NOT in `all`), launches
    `python -m pg_queue_reaper`, runs from workers root, --status/-k/-r match
    via the `-m` pgrep branch (now covers consumer + reaper). Lease/interval env
    documented in --help.
  - pg / pg-queue SET — run_pg_queue_set launches consumer + reaper together
    (always detached, like `all`). Restart (-r pg-queue) kills both members then
    relaunches. list_core_worker_dirs skips the set alias (no phantom status
    entry). Help documents the Celery `all` set and the PG `pg-queue` set as
    independent, runnable in parallel for a dual-transport (strangler-fig) setup.

Dev-tested live: `run-worker.sh reaper -d` acquires leadership + ticks and shows
RUNNING in --status; `run-worker.sh pg-queue` brings up consumer + reaper, both
RUNNING, reaper leader, no phantom set entry; bash -n clean; --help renders.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f86e2da4-6077-49f7-9089-74efeba81cbc

@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author

Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).

Overall this is a clean, well-commented wiring slice that faithfully mirrors the existing pg_queue_consumer pattern. No bugs that break the happy path. The themes worth addressing before merge are observability of failures in the new pg-queue set launcher/restart path, and one misleading docstring. The code-simplifier found nothing actionable (the repeated blocks are intentional and clearer left explicit). Inline findings below, prioritised.

Comment thread workers/run-worker.sh
Comment thread workers/run-worker.sh Outdated
Comment thread workers/run-worker.sh
Comment thread workers/run-worker.sh Outdated
Comment thread workers/pg_queue_reaper/__main__.py Outdated
Comment thread workers/pg_queue_reaper/__main__.py
…-kill guard

Toolkit review on #2059:

- [High] run_pg_queue_set swallowed member start-failures (backgrounded
  subshells' status was lost, banner+return 0 unconditional). Now each member
  runs in a FOREGROUND subshell so run_worker's own `return 1` on a
  crash-on-start is captured; the set returns non-zero if any member fails.
  (The reviewer's kill -0 on `$!` would false-fail — that PID is the launcher
  subshell, which exits the instant it backgrounds the nohup'd worker; the
  foreground-subshell return value is the correct signal and isolates `cd`.)
  Documented that the set always runs detached (ignores -d).
- [Medium] Dispatch now `|| exit 1` so a member start-failure reaches the
  script exit code — the only programmatic startup signal (reaper has no
  health port yet).
- [Medium] Set-restart aggregates kill_one_worker failures and aborts the
  relaunch if a member survives SIGKILL (avoids a duplicate consumer
  double-polling Postgres). Mirrors kill_workers' discipline.
- [Medium/minor] Startup banner prints `Queues: n/a` when empty (reaper).
- [Low] Reworded pg_queue_reaper/__main__ docstring: the launcher DOES export
  WORKER_TYPE for every worker; the accurate claim is the reaper neither reads
  nor mutates it (vs the consumer overwriting it before `import worker`).
- [Low] Added a smoke test that pg_queue_reaper.__main__ re-exports the real
  reaper main (guards the `python -m pg_queue_reaper` launch path against an
  ImportError regression).

Dev-tested: `run-worker.sh pg-queue` returns exit 0 with both members up;
24 reaper tests pass; bash -n + ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review addressed — commit 2092a5cf6

All 6 toolkit findings are addressed (SonarCloud was already green). I validated each one first; all were valid.

High / Medium (start-failure visibility)

  • run_pg_queue_set swallowed member failures. Fixed — each member now runs in a foreground subshell so run_worker's return 1 on a crash-on-start is captured, and the set returns non-zero if any member fails. Worth noting: the suggested kill -0 $! would false-fail (that PID is the launcher subshell, which exits the moment it backgrounds the nohup'd worker — I hit exactly that: exit=1 on a healthy launch). The foreground-subshell return value is the correct signal and isolates cd.
  • Exit code now propagates — dispatch does run_pg_queue_set … || exit 1, so systemd/CI see a half-dead set as failed (the only programmatic startup signal until the reaper has a health port).
  • Set-restart aggregates kill_one_worker failures and aborts the relaunch if a member survives SIGKILL — no duplicate consumer double-polling Postgres.
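
The restart discipline described in the last bullet can be modelled outside the shell. The following is a minimal Python sketch, not the script's actual code: `restart_set`, `kill_member`, and `launch_set` are hypothetical stand-ins for the set-restart path, `kill_one_worker`, and `run_pg_queue_set`. It assumes a kill helper that reports whether the member is confirmed gone.

```python
from typing import Callable, Sequence


def restart_set(
    members: Sequence[str],
    kill_member: Callable[[str], bool],
    launch_set: Callable[[], bool],
) -> bool:
    """Kill every member, then relaunch only if none survived.

    kill_member (hypothetical) returns True when the member is confirmed gone.
    """
    # Attempt every kill (no short-circuit) so all failures are collected.
    survivors = [name for name in members if not kill_member(name)]
    if survivors:
        # Relaunching now would start a duplicate next to the survivor.
        print(f"aborting relaunch, still running: {', '.join(survivors)}")
        return False
    return launch_set()


# Demo with stand-in callables: the consumer "survives" its kill.
MEMBERS = ("pg_queue_consumer", "pg_queue_reaper")
kill_attempts: list[str] = []
launches: list[str] = []


def stubborn_kill(name: str) -> bool:
    kill_attempts.append(name)
    return name != "pg_queue_consumer"


def record_launch() -> bool:
    launches.append("set")
    return True


blocked = restart_set(MEMBERS, stubborn_kill, record_launch)
clean = restart_set(MEMBERS, lambda name: True, record_launch)
```

In the blocked case both kills are still attempted, but nothing is launched. That is the property that prevents a second consumer from polling Postgres alongside the survivor.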

Low / minor

  • Queues: n/a when empty (reaper); reworded the __main__ docstring (the launcher does export WORKER_TYPE; the reaper just never reads/mutates it); added a smoke test pinning that pg_queue_reaper.__main__ re-exports the real reaper main.

Deferred (acknowledged): a reaper that crashes after startup is still omitted from --status (opt-in skip + no health port). That's precisely the reaper liveness probe slice (next follow-on) — is_leader + last-tick freshness on a health port, mirroring the consumer's 8090.

Dev-tested: run-worker.sh pg-queue → exit=0 with both members up; 24 reaper tests pass; bash -n + ruff clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 16, 2026 07:01
@greptile-apps

greptile-apps Bot commented Jun 16, 2026

Contributor

Greptile Summary

This PR wires the PG-queue reaper process into run-worker.sh and adds a pg-queue set alias so the full PG-queue group (consumer + reaper) can be launched in one command, mirroring the existing all Celery set pattern.

  • workers/pg_queue_reaper/ — thin entrypoint package (python -m pg_queue_reaper) with no Celery bootstrap, mirroring pg_queue_consumer; get_worker_pids now matches both on the -m <pkg> pgrep pattern.
  • run-worker.sh — adds reaper/pg-queue-reaper worker type (opt-in, not in all), a pg/pg-queue set alias that launches both members detached, restart/kill paths for the set, and a tail_logs fix that fans out to both member logs when the set alias is requested.
  • test_pg_reaper.py: TestEntryPoint pins that the launcher module correctly re-exports queue_backend.pg_queue.reaper.main, catching future import-path regressions.

Confidence Score: 5/5

Safe to merge — all new code paths (reaper type, pg-queue set, pgrep matching, kill/restart/logs routing) are consistent with the existing consumer pattern and tested.

The reaper wiring is a straightforward mechanical extension of the consumer pattern: same pgrep branch, same opt-in flag, same detach discipline, same worker-dir structure. The set's failure-teardown path correctly kills survivors before returning. No correctness issues found across any of the four changed files.

No files require special attention; the one note on run-worker.sh is a documentation gap in --help, not a correctness issue.

Important Files Changed

| Filename | Overview |
|---|---|
| `workers/pg_queue_reaper/__init__.py` | Thin package docstring only; mirrors `pg_queue_consumer/__init__.py` structure exactly. |
| `workers/pg_queue_reaper/__main__.py` | Top-level import of reaper main at module scope (outside the `__name__` guard), unlike the consumer, which guards inside `_bootstrap_and_run()`. Intentional: the reaper needs no heavy bootstrap; the test suite already imports the same module at top level, so import-time safety is verified. |
| `workers/run-worker.sh` | Comprehensive wiring for reaper type and pg-queue set; pgrep, kill, status, logs, and restart paths all updated consistently. Minor: `run_pg_queue_set` always detaches (no foreground/wait form) while `run_all_workers` supports both modes; `--help` doesn't document this. |
| `workers/tests/test_pg_reaper.py` | Adds `TestEntryPoint` that pins `module.main` identity against the real reaper main; correctly uses importlib to avoid triggering the `if __name__ == '__main__'` guard. |
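
The entrypoint check summarised in the last row can be illustrated with throwaway packages. This is a sketch under stated assumptions, not the repository's test: `demo_reaper` and `demo_launcher` are hypothetical packages written to a temp directory, standing in for `queue_backend.pg_queue.reaper` and `pg_queue_reaper`. It shows that importing a `__main__` module through importlib leaves the script guard untriggered while still allowing an identity check on the re-exported `main`.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path


def build_demo_packages(root: Path) -> None:
    """Write two throwaway packages: the 'real' code and a thin launcher."""
    real = root / "demo_reaper"
    real.mkdir()
    (real / "__init__.py").write_text("")
    (real / "core.py").write_text(
        textwrap.dedent(
            """
            def main():
                return "reaper main ran"
            """
        )
    )

    launcher = root / "demo_launcher"
    launcher.mkdir()
    (launcher / "__init__.py").write_text('"""Thin entrypoint package."""\n')
    (launcher / "__main__.py").write_text(
        textwrap.dedent(
            """
            from demo_reaper.core import main  # re-export at module scope

            if __name__ == "__main__":
                main()
            """
        )
    )


def load_entrypoint(root: Path):
    """Import the launcher's __main__ module without running it as a script."""
    sys.path.insert(0, str(root))
    try:
        importlib.invalidate_caches()
        # Imported this way, __name__ is "demo_launcher.__main__", so the
        # `if __name__ == "__main__"` guard does not fire.
        entry = importlib.import_module("demo_launcher.__main__")
        real = importlib.import_module("demo_reaper.core")
    finally:
        sys.path.remove(str(root))
    return entry, real


with tempfile.TemporaryDirectory() as tmp:
    build_demo_packages(Path(tmp))
    entry_module, real_module = load_entrypoint(Path(tmp))

# The launcher must expose the very same function object, not a copy.
same_function = entry_module.main is real_module.main
```

An identity check (`is`) rather than an equality check catches the case where the launcher defines its own `main` wrapper instead of re-exporting the real one.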

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["run-worker.sh $WORKER_TYPE"] --> B{WORKERS lookup}
    B -- "pg / pg-queue" --> C[run_pg_queue_set]
    B -- "reaper / pg-queue-reaper" --> D[run_worker pg_queue_reaper]
    B -- "all" --> E[run_all_workers]
    B -- other --> F[run_worker type]

    C --> G["subshell: run_worker pg_queue_consumer detach=true"]
    C --> H["subshell: run_worker pg_queue_reaper detach=true"]
    G --> I{crash within 1s?}
    H --> J{crash within 1s?}
    I -- yes --> K[failed=1]
    J -- yes --> K
    K --> L[kill_one_worker consumer + reaper]
    L --> M[return 1 / exit 1]
    I -- no --> N[consumer RUNNING]
    J -- no --> O[reaper RUNNING leader-elected]

    D --> P["uv run python -m pg_queue_reaper"]
    P --> Q[queue_backend.pg_queue.reaper.main]
    Q --> R[SQL recovery loop]

Reviews (2): Last reviewed commit: "UN-3555 [FEAT] Address Greptile: set par..."

Comment thread workers/run-worker.sh
…set alias

- run_pg_queue_set: on a partial start-failure (one member up, the other
  crashed) tear the whole set down before returning 1 — kill both members so a
  restart-on-failure relaunch can't spawn a second instance over the survivor
  (the consumer would double-poll Postgres). All-or-nothing, mirroring the
  restart path's discipline.
- tail_logs: handle the pg/pg-queue set alias — `--logs pg-queue` now tails
  both member logs (pg_queue_consumer + pg_queue_reaper) instead of looking for
  a non-existent workers/pg-queue/pg-queue.log and printing a misleading
  "no log file" error. Mirrors how list_core_worker_dirs skips the set value.

Dev-tested: `--logs pg-queue` tails 2 files (consumer + reaper); bash -n clean;
24 reaper tests pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Greptile review addressed — commit 0b381a3a4

Both findings valid and fixed:

  • Partial startup leaves a survivor (run_pg_queue_set). On a partial start-failure (one member up, the other crashed), the set now tears both members down before return 1 — so a Restart=on-failure/CI relaunch can't spawn a second instance over the survivor (the consumer would double-poll Postgres). All-or-nothing, mirroring the restart path.
  • --logs misfired on the pg/pg-queue set alias (out-of-diff comment). tail_logs now special-cases the set alias and tails both member logs (pg_queue_consumer + pg_queue_reaper) instead of looking for a non-existent workers/pg-queue/pg-queue.log. Mirrors how list_core_worker_dirs skips the set value. Dev-tested: --logs pg-queue → "Tailing 2 log file(s)".
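
The all-or-nothing startup in the first bullet can be sketched as plain control flow. This is a hedged Python model rather than the shell implementation: `start_set`, `start_member`, and `kill_member` are hypothetical names standing in for `run_pg_queue_set`, `run_worker`, and `kill_one_worker`. It assumes every member is attempted before the verdict is taken.

```python
from typing import Callable, Sequence


def start_set(
    members: Sequence[str],
    start_member: Callable[[str], bool],
    kill_member: Callable[[str], None],
) -> bool:
    """Start every member; if any fails, tear the whole set down."""
    # Try all members so each failure is observed, not just the first.
    failed = [name for name in members if not start_member(name)]
    if not failed:
        return True
    # Partial start: kill everything so a retry cannot stack a second
    # instance on top of a surviving member.
    for name in members:
        kill_member(name)
    return False


# Demo with an in-memory "process table" (an assumption for illustration).
MEMBERS = ("pg_queue_consumer", "pg_queue_reaper")
running: set[str] = set()


def start_with_broken_reaper(name: str) -> bool:
    if name == "pg_queue_reaper":
        return False  # simulated crash-on-start
    running.add(name)
    return True


def start_ok(name: str) -> bool:
    running.add(name)
    return True


def kill(name: str) -> None:
    running.discard(name)


partial_result = start_set(MEMBERS, start_with_broken_reaper, kill)
running_after_partial = set(running)
healthy_result = start_set(MEMBERS, start_ok, kill)
running_after_healthy = set(running)
```

After the partial failure the process table is empty, so a supervisor that relaunches on a non-zero exit starts from a clean slate.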

bash -n clean; 24 reaper tests pass. Confidence was 4/5 with these two — both resolved.

@sonarqubecloud

@muhammad-ali-e muhammad-ali-e merged commit 35c735f into feat/UN-3445-pg-queue-integration Jun 16, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3555-reaper-run-worker-wiring branch June 16, 2026 07:21
muhammad-ali-e added a commit that referenced this pull request Jun 16, 2026
…ader) (#2061)

* UN-3556 [FEAT] PG Queue 9d — reaper liveness probe (heartbeat + is_leader)

Closes the gap flagged in #2059's review: a reaper that crashes after startup
was invisible (opt-in skip in --status + no health port). Mirrors the
consumer's liveness (UN-3544).

- PgReaper heartbeat: _last_tick_monotonic stamped at the START of every tick
  (a standby tick counts as progress — liveness tracks the loop, not
  leadership) + seconds_since_last_tick() / is_tick_stale().
- ReaperLivenessServer: lean HTTP probe (mirrors the consumer's LivenessServer)
  — /health (also /healthz, /livez) returns 200 while the tick loop is fresh,
  503 when stale. Payload also surfaces is_leader (which pod holds the lease —
  useful for 9e debugging). The 200/503 verdict is PURELY the heartbeat, never
  leadership (a standby is healthy) or DB reachability (a blip must not
  crash-loop a fine process).
- main() wires it from WORKER_PG_REAPER_HEALTH_PORT (unset → no server, no
  stray port); staleness window from WORKER_PG_REAPER_HEALTH_STALE_SECONDS
  (default 30s, comfortably above the 5s tick interval). Bind failure degrades
  gracefully (logs, runs probe-less).
- run-worker.sh: reserve port 8086 for the reaper, export the health-port env
  in the reaper special-case, document the two new env vars in --help.
- Tests (+13, 37 total): heartbeat fresh/stale + tick refresh; liveness server
  200-fresh / 503-stale / is_leader reflected / 404 / double-start; health
  staleness env default+override+invalid; server-disabled-when-no-port.
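
A minimal sketch of the heartbeat idea from the first bullet, assuming a monotonic clock and a fixed staleness window. `TickHeartbeat`, `mark_tick`, `run_tick`, and the injectable `clock` are hypothetical names for illustration. Only `_last_tick_monotonic`, `seconds_since_last_tick`, and `is_tick_stale` are taken from the commit message, and their real signatures may differ.

```python
import time
from typing import Callable


class TickHeartbeat:
    """Tracks how long ago the loop last started a tick."""

    def __init__(
        self, stale_after: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        if stale_after <= 0:
            raise ValueError("stale_after must be > 0")
        self._stale_after = stale_after
        self._clock = clock
        self._last_tick_monotonic = clock()

    def mark_tick(self) -> None:
        self._last_tick_monotonic = self._clock()

    def seconds_since_last_tick(self) -> float:
        return self._clock() - self._last_tick_monotonic

    def is_tick_stale(self) -> bool:
        return self.seconds_since_last_tick() > self._stale_after


def run_tick(heartbeat: TickHeartbeat, is_leader: bool, recover: Callable[[], None]) -> None:
    # Stamp first: a standby tick still proves the loop is alive.
    heartbeat.mark_tick()
    if is_leader:
        recover()


class FakeClock:
    """Deterministic clock so the demo does not need to sleep."""

    def __init__(self) -> None:
        self.now = 100.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
heartbeat = TickHeartbeat(stale_after=30.0, clock=clock)
clock.now += 10.0
age_fresh = heartbeat.seconds_since_last_tick()
stale_at_10 = heartbeat.is_tick_stale()
clock.now += 25.0
stale_at_35 = heartbeat.is_tick_stale()
run_tick(heartbeat, is_leader=False, recover=lambda: None)  # standby tick
stale_after_standby_tick = heartbeat.is_tick_stale()
```

Using a monotonic clock keeps the age calculation immune to wall-clock adjustments, which matters when the verdict decides whether a pod gets restarted.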

Dev-tested live: `run-worker.sh reaper -d` → GET :8086/health →
{"status":"healthy","check":"pg_reaper_tick","is_leader":true,...}.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [REFACTOR] Extract shared LivenessServer (SonarCloud duplication)

SonarCloud flagged the new ReaperLivenessServer as duplicating the consumer's
LivenessServer (~19 lines of HTTP-probe boilerplate, over the 3% new-code gate).

Extracted one generic LivenessServer into queue_backend/pg_queue/liveness.py —
parameterised by a freshness callable + the payload's check/age labels + an
optional extra-status callable (the reaper's is_leader). Both sides are now thin
subclasses that preserve their exact constructor signatures and wire payloads:
- consumer LivenessServer(consumer, port=, stale_after=) → check="pg_queue_poll",
  seconds_since_last_poll (unchanged on the wire; its tests pass untouched).
- ReaperLivenessServer(reaper, port=, stale_after=) → check="pg_reaper_tick",
  seconds_since_last_tick, is_leader.

The boilerplate now lives once → duplication cleared, consumer behaviour
preserved. reaper 37 tests + consumer liveness/health tests green; ruff clean.
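
To make the parameterisation concrete, here is a hedged sketch of a generic probe built on the standard library. It is not the code in queue_backend/pg_queue/liveness.py: `make_liveness_server` and `probe` are hypothetical helpers, and the "unhealthy" status string for the stale case is an assumption (the source only shows "healthy"). The freshness callable, the check label, and the age key are passed in, as the commit message describes.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Callable


def make_liveness_server(
    age_fn: Callable[[], float],
    stale_after: float,
    check: str,
    age_key: str,
    port: int = 0,
) -> HTTPServer:
    """Build an HTTP probe whose verdict depends only on age_fn()."""

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            path = self.path.split("?", 1)[0]  # ignore any query string
            if path not in ("/health", "/healthz", "/livez"):
                self.send_error(404)
                return
            age = age_fn()
            fresh = age <= stale_after
            body = json.dumps(
                {
                    "status": "healthy" if fresh else "unhealthy",
                    "check": check,
                    age_key: round(age, 3),
                    "stale_after_seconds": stale_after,
                }
            ).encode()
            self.send_response(200 if fresh else 503)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args) -> None:
            pass  # keep the demo quiet

    return HTTPServer(("127.0.0.1", port), Handler)


def probe(server: HTTPServer, path: str = "/health"):
    """GET a path from the server; return (status, parsed JSON or None)."""
    conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1], timeout=5)
    try:
        conn.request("GET", path)
        resp = conn.getresponse()
        status, raw = resp.status, resp.read()
    finally:
        conn.close()
    try:
        return status, json.loads(raw)
    except ValueError:
        return status, None


ages = {"value": 1.0}
server = make_liveness_server(
    lambda: ages["value"], 30.0, "pg_reaper_tick", "seconds_since_last_tick"
)
threading.Thread(target=server.serve_forever, daemon=True).start()
fresh_status, fresh_payload = probe(server)
ages["value"] = 99.0  # simulate a stalled loop
stale_status, stale_payload = probe(server)
missing_status, _ = probe(server, "/nope")
server.shutdown()
server.server_close()
```

Binding to port 0 lets the operating system pick a free port, which keeps the demo from colliding with a real probe on the same host.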

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [FEAT] Address review: validate health port, type/guard liveness, tests

Toolkit review (10 findings; several already resolved by the dedup refactor
0d2b3f7 — the threading-alias, the query-strip comment, and the
extract-shared-server follow-up itself):

- [Medium] Port parse: extracted _reaper_health_port_from_env() — names the var
  on a bad value (no more context-free int('abc') crash) and range-checks
  0-65535 at parse time, so an out-of-range value can't escape the bind catch as
  OverflowError inside start(). main() uses it.
- [Medium] liveness.py: typed _httpd/_thread as HTTPServer|None / Thread|None via
  TYPE_CHECKING (was Any in the shared server) — restores the lifecycle invariant
  + type-checking on .shutdown()/.join()/etc.
- [Low] LivenessServer.__init__ re-validates stale_after > 0 (a direct caller
  could otherwise build an always-503 probe).
- [Low] bound_port docstring: clarified the port=0 / not-started case.
- [Low] _DEFAULT_HEALTH_STALE_SECONDS comment references the interval constant,
  not a hard-coded "5s".
- [Medium/Low test gaps] +9 tests: port-env helper (unset/empty/valid/non-int
  named/out-of-range); main() wiring (parsed port reaches the wiring + health
  stopped in finally; port=None when unset); _maybe_start_health_server OSError
  graceful-degrade → None + logger.exception; stale_after<=0 constructor guard.
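
The port-parsing finding can be sketched as follows. This is an illustration under assumptions, not the body of `_reaper_health_port_from_env()`: the helper name `health_port_from_env`, the error wording, and the `describe` demo wrapper are hypothetical. Only the environment variable name and the 0-65535 range come from the commit message.

```python
import os
from typing import Mapping, Optional, Union

HEALTH_PORT_ENV = "WORKER_PG_REAPER_HEALTH_PORT"


def health_port_from_env(env: Mapping[str, str] = os.environ) -> Optional[int]:
    """Return the configured port, or None when the variable is unset or empty."""
    raw = env.get(HEALTH_PORT_ENV, "").strip()
    if not raw:
        return None  # no port configured: run without a probe
    try:
        port = int(raw)
    except ValueError:
        # Name the variable so the operator knows what to fix.
        raise ValueError(
            f"{HEALTH_PORT_ENV} must be an integer, got {raw!r}"
        ) from None
    if not 0 <= port <= 65535:
        # Reject here, at parse time, rather than failing later at bind.
        raise ValueError(f"{HEALTH_PORT_ENV} must be in 0-65535, got {port}")
    return port


def describe(env: Mapping[str, str]) -> Union[int, str, None]:
    """Demo helper: return the parsed port or the error text."""
    try:
        return health_port_from_env(env)
    except ValueError as err:
        return str(err)


unset_result = describe({})
empty_result = describe({HEALTH_PORT_ENV: ""})
valid_result = describe({HEALTH_PORT_ENV: "8086"})
non_int_result = describe({HEALTH_PORT_ENV: "abc"})
out_of_range_result = describe({HEALTH_PORT_ENV: "70000"})
```

Passing the environment as a mapping keeps the helper testable without mutating `os.environ`.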

reaper 46 + consumer liveness 10 green; ruff clean. The shared-server extraction
(flagged as a follow-up) was already done in 0d2b3f7.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3556 [FEAT] Address Greptile: protect core payload + per-process log label

- extra_status_fn could silently clobber core payload fields (status / check /
  age_key / stale_after_seconds) that a monitor reads. Now the handler builds
  extra fields first and overlays the core fields, so core ALWAYS wins — a
  future caller's extra dict can't corrupt the status a monitor parses.
  Test: an extra_status_fn returning {"status":"HACKED",...} leaves status
  "healthy" and check intact, while a non-reserved extra key is preserved.
- The dedup refactor moved the consumer's liveness warnings to the shared
  liveness logger with generic text, so log-based filtering keyed on the old
  "PG-queue consumer: ..." would miss them. Added a log_label param (default
  "pg-queue"); consumer passes "pg-queue consumer", reaper "pg-queue reaper", so
  the messages stay attributable to the source process.
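
The overlay order from the first bullet fits in a few lines. In this sketch `build_payload` is a hypothetical helper, and the key `seconds_since_last_tick` plays the role of the age key. The point is only the ordering: extras are laid down first and the core fields are written over them.

```python
from typing import Callable, Optional


def build_payload(
    core: dict, extra_status_fn: Optional[Callable[[], dict]] = None
) -> dict:
    """Merge optional extra fields under the core fields, so core always wins."""
    payload = dict(extra_status_fn()) if extra_status_fn else {}
    payload.update(core)  # core keys overwrite any colliding extras
    return payload


core_fields = {
    "status": "healthy",
    "check": "pg_reaper_tick",
    "seconds_since_last_tick": 1.2,
    "stale_after_seconds": 30.0,
}

# A misbehaving extra callable tries to overwrite a reserved key.
merged = build_payload(core_fields, lambda: {"status": "HACKED", "is_leader": True})
core_only = build_payload(core_fields)
```

Here `merged["status"]` stays "healthy" while the non-reserved `is_leader` key is preserved, matching the behaviour the test in the commit message describes.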

57 tests green (reaper 47 + consumer liveness 10); ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>