UN-3556 [FEAT] PG Queue 9d — reaper liveness probe (heartbeat + is_leader)#2061
muhammad-ali-e merged 4 commits into feat/UN-3445-pg-queue-integration
…ader)

Closes the gap flagged in #2059's review: a reaper that crashes after startup was invisible (opt-in skip in --status + no health port). Mirrors the consumer's liveness (UN-3544).

- PgReaper heartbeat: _last_tick_monotonic stamped at the START of every tick (a standby tick counts as progress: liveness tracks the loop, not leadership) + seconds_since_last_tick() / is_tick_stale().
- ReaperLivenessServer: lean HTTP probe (mirrors the consumer's LivenessServer). /health (also /healthz, /livez) returns 200 while the tick loop is fresh, 503 when stale. Payload also surfaces is_leader (which pod holds the lease; useful for 9e debugging). The 200/503 verdict is PURELY the heartbeat, never leadership (a standby is healthy) or DB reachability (a blip must not crash-loop a fine process).
- main() wires it from WORKER_PG_REAPER_HEALTH_PORT (unset → no server, no stray port); staleness window from WORKER_PG_REAPER_HEALTH_STALE_SECONDS (default 30s, comfortably above the 5s tick interval). Bind failure degrades gracefully (logs, runs probe-less).
- run-worker.sh: reserve port 8086 for the reaper, export the health-port env in the reaper special-case, document the two new env vars in --help.
- Tests (+13, 37 total): heartbeat fresh/stale + tick refresh; liveness server 200-fresh / 503-stale / is_leader reflected / 404 / double-start; health staleness env default+override+invalid; server-disabled-when-no-port.

Dev-tested live: `run-worker.sh reaper -d` → GET :8086/health → {"status":"healthy","check":"pg_reaper_tick","is_leader":true,...}.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
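For readers who have not opened the diff, the heartbeat described in this commit can be pictured roughly as follows. This is a minimal sketch, not the code in reaper.py: the class name `HeartbeatSketch`, the injectable `clock` argument, the `stale_after` parameter of `is_tick_stale()` and the strict `>` comparison are all assumptions made for illustration.

```python
import time


class HeartbeatSketch:
    """Hypothetical stand-in for the heartbeat part of PgReaper."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock  # injectable so tests can fake time (assumption)
        # Stamped at construction so a probe hit before the first tick is fresh.
        self._last_tick_monotonic = clock()

    def tick(self, is_leader: bool) -> None:
        # Stamp FIRST: a standby tick still counts as loop progress.
        self._last_tick_monotonic = self._clock()
        if is_leader:
            pass  # the real reaper would renew the lease and sweep here

    def seconds_since_last_tick(self) -> float:
        return self._clock() - self._last_tick_monotonic

    def is_tick_stale(self, stale_after: float) -> bool:
        # Strictly greater than the window counts as stale (assumption).
        return self.seconds_since_last_tick() > stale_after
```

A monotonic clock is used because it never jumps backwards on NTP or wall-clock adjustments, so the measured age reflects only elapsed time.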
muhammad-ali-e left a comment
Automated PR review (PR Review Toolkit)
Ran six specialised review agents — Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier — over the 4 in-scope files (base feat/UN-3445-pg-queue-integration). Overall this is a clean, well-documented change that faithfully mirrors the consumer's LivenessServer; no Critical/High issues. Inline comments below, deduped across agents and prioritised.
Top items: (1) the bare int(raw_port) port parse — context-free crash + an out-of-range port escapes the OSError graceful-degrade path; (2) _httpd/_thread typed Any where the consumer uses precise types; (3) main() port-parse + teardown are untested.
…ion)

SonarCloud flagged the new ReaperLivenessServer as duplicating the consumer's LivenessServer (~19 lines of HTTP-probe boilerplate, over the 3% new-code gate). Extracted one generic LivenessServer into queue_backend/pg_queue/liveness.py, parameterised by a freshness callable + the payload's check/age labels + an optional extra-status callable (the reaper's is_leader). Both sides are now thin subclasses that preserve their exact constructor signatures and wire payloads:

- consumer LivenessServer(consumer, port=, stale_after=) → check="pg_queue_poll", seconds_since_last_poll (unchanged on the wire; its tests pass untouched).
- ReaperLivenessServer(reaper, port=, stale_after=) → check="pg_reaper_tick", seconds_since_last_tick, is_leader.

The boilerplate now lives once → duplication cleared, consumer behaviour preserved. reaper 37 tests + consumer liveness/health tests green; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
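The shape of that extraction might look something like the sketch below: one generic probe whose verdict depends only on a freshness callable, plus a thin reaper subclass that fixes the labels. It is an illustration built on the standard library, not the contents of liveness.py. The names `LivenessSketch`, `ReaperLivenessSketch` and `evaluate()`, the `"unhealthy"` status string, the `127.0.0.1` bind address, the 404 body and the `reaper.is_leader` attribute are assumptions.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

HEALTH_PATHS = ("/health", "/healthz", "/livez")


class LivenessSketch:
    """Generic probe: the 200/503 verdict comes only from freshness_fn."""

    def __init__(self, freshness_fn, *, port, stale_after, check, age_key,
                 extra_status_fn=None):
        if stale_after <= 0:
            raise ValueError("stale_after must be > 0")
        self._freshness_fn = freshness_fn
        self._port = port
        self._stale_after = stale_after
        self._check = check
        self._age_key = age_key
        self._extra_status_fn = extra_status_fn
        self._httpd = None
        self._thread = None

    def evaluate(self, path):
        """Return (http_status, payload) for a request path."""
        if path.split("?", 1)[0] not in HEALTH_PATHS:  # ignore any query string
            return 404, {"error": "not found"}
        age = self._freshness_fn()
        fresh = age <= self._stale_after
        extra = self._extra_status_fn() if self._extra_status_fn else {}
        core = {
            "status": "healthy" if fresh else "unhealthy",  # stale label assumed
            "check": self._check,
            self._age_key: round(age, 2),
            "stale_after_seconds": self._stale_after,
        }
        return (200 if fresh else 503), {**extra, **core}

    def start(self):
        if self._httpd is not None:
            raise RuntimeError("already started")
        probe = self

        class Handler(BaseHTTPRequestHandler):
            def do_GET(self):
                code, payload = probe.evaluate(self.path)
                body = json.dumps(payload).encode()
                self.send_response(code)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def log_message(self, *args):
                pass  # keep probe traffic out of stderr

        # Binding may raise OSError; the caller decides how to degrade.
        self._httpd = HTTPServer(("127.0.0.1", self._port), Handler)
        self._thread = threading.Thread(target=self._httpd.serve_forever, daemon=True)
        self._thread.start()

    def stop(self):
        if self._httpd is not None:
            self._httpd.shutdown()
            self._httpd.server_close()
            self._thread.join(timeout=5)
            self._httpd = self._thread = None


class ReaperLivenessSketch(LivenessSketch):
    """Thin subclass: fixes the labels and adds is_leader to the payload."""

    def __init__(self, reaper, *, port, stale_after):
        super().__init__(
            reaper.seconds_since_last_tick,
            port=port,
            stale_after=stale_after,
            check="pg_reaper_tick",
            age_key="seconds_since_last_tick",
            extra_status_fn=lambda: {"is_leader": reaper.is_leader},
        )
```

Keeping the verdict logic in a plain method such as `evaluate()` means it can be unit-tested without opening a socket; the HTTP handler only serialises its result.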
SonarCloud duplication addressed — commit The new reaper 37 tests + consumer liveness/health tests green; ruff + pre-commit clean. |
…ess, tests

Toolkit review (10 findings; several already resolved by the dedup refactor 0d2b3f7: the threading-alias, the query-strip comment, and the extract-shared-server follow-up itself):

- [Medium] Port parse: extracted _reaper_health_port_from_env(). It names the var on a bad value (no more context-free int('abc') crash) and range-checks 0-65535 at parse time, so an out-of-range value can't escape the bind catch as OverflowError inside start(). main() uses it.
- [Medium] liveness.py: typed _httpd/_thread as HTTPServer|None / Thread|None via TYPE_CHECKING (was Any in the shared server). This restores the lifecycle invariant + type-checking on .shutdown()/.join()/etc.
- [Low] LivenessServer.__init__ re-validates stale_after > 0 (a direct caller could otherwise build an always-503 probe).
- [Low] bound_port docstring: clarified the port=0 / not-started case.
- [Low] _DEFAULT_HEALTH_STALE_SECONDS comment references the interval constant, not a hard-coded "5s".
- [Medium/Low test gaps] +9 tests: port-env helper (unset/empty/valid/non-int named/out-of-range); main() wiring (parsed port reaches the wiring + health stopped in finally; port=None when unset); _maybe_start_health_server OSError graceful-degrade → None + logger.exception; stale_after<=0 constructor guard.

reaper 46 + consumer liveness 10 green; ruff clean. The shared-server extraction (flagged as a follow-up) was already done in 0d2b3f7.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
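The port-parse fix in the first bullet can be illustrated with a small helper along these lines. It is a sketch of the idea only: the function name without the leading underscore, the optional `environ` argument, the `ValueError` exception type and the message wording are assumptions, not the helper as merged.

```python
import os

PORT_ENV = "WORKER_PG_REAPER_HEALTH_PORT"


def reaper_health_port_from_env(environ=None):
    """Return the configured port, or None when the variable is unset or empty."""
    environ = os.environ if environ is None else environ
    raw = (environ.get(PORT_ENV) or "").strip()
    if not raw:
        return None  # unset → no server, no stray port
    try:
        port = int(raw)
    except ValueError:
        # Name the variable so the operator knows which setting is wrong.
        raise ValueError(f"{PORT_ENV} must be an integer, got {raw!r}") from None
    if not 0 <= port <= 65535:
        # Reject here, at parse time, rather than inside the bind call.
        raise ValueError(f"{PORT_ENV} must be in 0-65535, got {port}")
    return port
```

Validating the range up front keeps every failure on one exception path with a message that points at the environment variable, instead of relying on whatever the socket layer raises for an impossible port.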
Review addressed — commit
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/liveness.py | New shared liveness probe module extracted from consumer.py; parameterised by freshness callable, check/age labels, and optional extra-status fn; core fields overlay extra to prevent clobbering; clean implementation. |
| workers/queue_backend/pg_queue/reaper.py | Adds _last_tick_monotonic heartbeat to PgReaper, ReaperLivenessServer thin wrapper, env parsers, graceful-degrade _maybe_start_health_server, and main() wiring with finally-stop; all correct. |
| workers/queue_backend/pg_queue/consumer.py | Consumer LivenessServer reduced to a thin subclass of the shared base; constructor shape unchanged; behaviour identical — only logger namespace changes from consumer to liveness (noted in previous review). |
| workers/run-worker.sh | Reserves port 8086 in WORKER_HEALTH_PORTS, exports WORKER_PG_REAPER_HEALTH_PORT for the reaper special-case, and documents the two new env vars in --help. |
| workers/tests/test_pg_reaper.py | Adds 13 new tests covering heartbeat fresh/stale/refresh, liveness 200/503/is_leader/clobber-guard/404/double-start, env defaults/overrides/invalid, bind-failure degrade, and main() wiring; solid coverage. |
| workers/queue_backend/pg_queue/__init__.py | Adds LivenessServer and ReaperLivenessServer to `__all__`; straightforward. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant main as main()
participant reaper as PgReaper
participant msh as _maybe_start_health_server
participant rls as ReaperLivenessServer
participant base as LivenessServer (daemon thread)
participant probe as K8s/probe
main->>reaper: PgReaper(lease) — sets _last_tick_monotonic
main->>msh: _reaper_health_port_from_env() / _reaper_health_stale_from_env()
main->>msh: _maybe_start_health_server(reaper, port, stale_after)
msh->>rls: ReaperLivenessServer(reaper, port, stale_after)
rls->>base: "super().__init__(freshness_fn, extra_status_fn=is_leader)"
msh->>base: server.start() — spawns daemon thread
Note over base: OSError → log + return None (probe-less)
main->>reaper: reaper.run()
loop every interval_seconds
reaper->>reaper: tick() — stamps _last_tick_monotonic at START
reaper->>reaper: renew/acquire lease, sweep if leader
end
probe->>base: GET /health
base->>reaper: "freshness_fn() = seconds_since_last_tick()"
base->>reaper: "extra_status_fn() = {is_leader: ...}"
base-->>probe: "200 healthy / 503 stale (verdict = heartbeat only)"
main->>base: health.stop() [finally]
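The start and stop edges of the diagram (optional start, OSError degrade, stop in a finally) could be wired roughly as below. This is a sketch under stated assumptions: `make_server` is a hypothetical factory standing in for the ReaperLivenessServer constructor, `run_with_probe` is a hypothetical slice of main(), and the logger name and message text are invented for the example.

```python
import logging

logger = logging.getLogger("pg_reaper_sketch")


def maybe_start_health_server(make_server, port):
    """Start the probe if a port is configured; a bind failure never aborts."""
    if port is None:
        return None  # probe disabled
    try:
        server = make_server(port)
        server.start()
    except OSError:
        # Log with traceback, then carry on without a probe.
        logger.exception("health probe could not bind port %s; running probe-less", port)
        return None
    return server


def run_with_probe(run_loop, make_server, port):
    """Run the main loop and always stop the probe afterwards."""
    health = maybe_start_health_server(make_server, port)
    try:
        run_loop()
    finally:
        if health is not None:
            health.stop()
```

Only OSError is caught, so a programming error in the server still surfaces instead of being silently logged away.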
Reviews (2): Last reviewed commit: "UN-3556 [FEAT] Address Greptile: protect..."
…og label
- extra_status_fn could silently clobber core payload fields (status / check /
age_key / stale_after_seconds) that a monitor reads. Now the handler builds
extra fields first and overlays the core fields, so core ALWAYS wins — a
future caller's extra dict can't corrupt the status a monitor parses.
Test: an extra_status_fn returning {"status":"HACKED",...} leaves status
"healthy" and check intact, while a non-reserved extra key is preserved.
- The dedup refactor moved the consumer's liveness warnings to the shared
liveness logger with generic text, so log-based filtering keyed on the old
"PG-queue consumer: ..." would miss them. Added a log_label param (default
"pg-queue"); consumer passes "pg-queue consumer", reaper "pg-queue reaper", so
the messages stay attributable to the source process.
57 tests green (reaper 47 + consumer liveness 10); ruff clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
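The clobber guard from the first bullet comes down to merge order. A minimal sketch, assuming a hypothetical helper `build_payload` (the real handler builds the payload inline and may differ):

```python
def build_payload(core, extra_status_fn=None):
    """Merge optional extra fields UNDER the core fields so core always wins."""
    extra = dict(extra_status_fn()) if extra_status_fn else {}
    # Later keys win in a dict merge, so listing core last protects it.
    return {**extra, **core}


core_fields = {
    "status": "healthy",
    "check": "pg_reaper_tick",
    "seconds_since_last_tick": 4.79,
    "stale_after_seconds": 30.0,
}
payload = build_payload(
    core_fields,
    extra_status_fn=lambda: {"status": "HACKED", "is_leader": True},
)
```

With this ordering the rogue `status` is discarded while the non-reserved `is_leader` key survives, which matches the behaviour the commit's test describes.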
Greptile review addressed — commit
Merged commit f312388 into feat/UN-3445-pg-queue-integration
What
The reaper liveness probe, closing the gap flagged in #2059's review: a reaper that crashes after startup was invisible (opt-in skip in `--status` + no health port). This finishes 9d. Mirrors the consumer's liveness (UN-3544).

What's in it
- `PgReaper` heartbeat: `_last_tick_monotonic` stamped at the start of every tick (a standby tick counts as progress: liveness tracks the loop, not leadership) + `seconds_since_last_tick()` / `is_tick_stale()`.
- `ReaperLivenessServer`: lean HTTP probe (mirrors the consumer's `LivenessServer`). `/health` (also `/healthz`, `/livez`) → 200 while the tick loop is fresh, 503 when stale. Payload also surfaces `is_leader` (which pod holds the lease; handy for 9e debugging). The 200/503 verdict is purely the heartbeat, never leadership (a standby is healthy) or DB reachability (a blip must not crash-loop a healthy process).
- `main()` wiring: `WORKER_PG_REAPER_HEALTH_PORT` (unset → no server, no stray port); staleness window `WORKER_PG_REAPER_HEALTH_STALE_SECONDS` (default 30s, comfortably above the 5s tick interval). A bind failure degrades gracefully (logs, runs probe-less).
- `run-worker.sh`: reserves port 8086 for the reaper, exports the health-port env in the reaper special-case, documents the two new env vars in `--help`.

Tests (+13 → 37 total)
Heartbeat fresh/stale + tick-refresh; liveness server 200-fresh / 503-stale / `is_leader` reflected / 404 / double-start-raises; health-staleness env default+override+invalid; server-disabled-when-no-port.

Dev-test (live)
`./run-worker.sh reaper -d` → `GET :8086/health` → `{"status":"healthy","check":"pg_reaper_tick","seconds_since_last_tick":4.79,"stale_after_seconds":30.0,"is_leader":true}`

Design note
Kept in `reaper.py` (not a shared module), consistent with the consumer's `LivenessServer` living in `consumer.py`. A future cleanup could extract a shared generic server; out of scope here to avoid refactoring the merged-and-tested consumer probe.

Base: `feat/UN-3445-pg-queue-integration`. This completes 9d (orchestrator/reaper). Sub-task UN-3556 under UN-3536.