UN-3556 [FEAT] PG Queue 9d — reaper liveness probe (heartbeat + is_leader) #2061

Merged
muhammad-ali-e merged 4 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3556-reaper-liveness
Jun 16, 2026

@muhammad-ali-e

Contributor

What

The reaper liveness probe — closing the gap flagged in #2059's review: a reaper that crashes after startup was invisible (opt-in skip in --status + no health port). This finishes 9d. Mirrors the consumer's liveness (UN-3544).

What's in it

  • PgReaper heartbeat: _last_tick_monotonic stamped at the start of every tick (a standby tick counts as progress; liveness tracks the loop, not leadership) + seconds_since_last_tick() / is_tick_stale().
  • ReaperLivenessServer — lean HTTP probe (mirrors the consumer's LivenessServer): /health (also /healthz, /livez) → 200 while the tick loop is fresh, 503 when stale. Payload also surfaces is_leader (which pod holds the lease — handy for 9e debugging). The 200/503 verdict is purely the heartbeat — never leadership (a standby is healthy) or DB reachability (a blip must not crash-loop a healthy process).
  • main() wiring: WORKER_PG_REAPER_HEALTH_PORT (unset → no server, no stray port); staleness window WORKER_PG_REAPER_HEALTH_STALE_SECONDS (default 30s, comfortably above the 5s tick interval). A bind failure degrades gracefully (logs, runs probe-less).
  • run-worker.sh — reserves port 8086 for the reaper, exports the health-port env in the reaper special-case, documents the two new env vars in --help.
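
The heartbeat described in the bullets above can be sketched roughly as follows. This is a minimal illustration, not the code in reaper.py: the class name HeartbeatSketch, the injectable clock parameter, and the strict `>` staleness comparison are assumptions made for the example; only _last_tick_monotonic, seconds_since_last_tick() and is_tick_stale() are names taken from the description.

```python
import time


class HeartbeatSketch:
    """Hypothetical stand-in for the heartbeat part of PgReaper."""

    def __init__(self, clock=time.monotonic):
        # `clock` is injectable only so the sketch can be tested (assumption).
        self._clock = clock
        # Stamp at construction so a freshly started process reads as fresh.
        self._last_tick_monotonic = clock()

    def tick(self):
        # Stamp first: a standby tick still counts as loop progress.
        self._last_tick_monotonic = self._clock()
        # ... lease renew/acquire and the sweep would follow here ...

    def seconds_since_last_tick(self):
        return self._clock() - self._last_tick_monotonic

    def is_tick_stale(self, stale_after):
        # Assumed comparison: stale once the age exceeds the window.
        return self.seconds_since_last_tick() > stale_after
```

Stamping before any lease or database work means a slow sweep shows up as a growing age rather than hiding behind the previous tick, and a monotonic clock keeps the age immune to wall-clock adjustments.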

Tests (+13 → 37 total)

Heartbeat fresh/stale + tick-refresh; liveness server 200-fresh / 503-stale / is_leader reflected / 404 / double-start-raises; health-staleness env default+override+invalid; server-disabled-when-no-port.

Dev-test (live)

./run-worker.sh reaper -d
GET :8086/health
{"status":"healthy","check":"pg_reaper_tick","seconds_since_last_tick":4.79,"stale_after_seconds":30.0,"is_leader":true}

Design note

Kept in reaper.py (not a shared module) — consistent with the consumer's LivenessServer living in consumer.py. A future cleanup could extract a shared generic server; out of scope here to avoid refactoring the merged-and-tested consumer probe.

Base: feat/UN-3445-pg-queue-integration. This completes 9d (orchestrator/reaper). Sub-task UN-3556 under UN-3536.

…ader)

Closes the gap flagged in #2059's review: a reaper that crashes after startup
was invisible (opt-in skip in --status + no health port). Mirrors the
consumer's liveness (UN-3544).

- PgReaper heartbeat: _last_tick_monotonic stamped at the START of every tick
  (a standby tick counts as progress — liveness tracks the loop, not
  leadership) + seconds_since_last_tick() / is_tick_stale().
- ReaperLivenessServer: lean HTTP probe (mirrors the consumer's LivenessServer)
  — /health (also /healthz, /livez) returns 200 while the tick loop is fresh,
  503 when stale. Payload also surfaces is_leader (which pod holds the lease —
  useful for 9e debugging). The 200/503 verdict is PURELY the heartbeat, never
  leadership (a standby is healthy) or DB reachability (a blip must not
  crash-loop a fine process).
- main() wires it from WORKER_PG_REAPER_HEALTH_PORT (unset → no server, no
  stray port); staleness window from WORKER_PG_REAPER_HEALTH_STALE_SECONDS
  (default 30s, comfortably above the 5s tick interval). Bind failure degrades
  gracefully (logs, runs probe-less).
- run-worker.sh: reserve port 8086 for the reaper, export the health-port env
  in the reaper special-case, document the two new env vars in --help.
- Tests (+13, 37 total): heartbeat fresh/stale + tick refresh; liveness server
  200-fresh / 503-stale / is_leader reflected / 404 / double-start; health
  staleness env default+override+invalid; server-disabled-when-no-port.

Dev-tested live: `run-worker.sh reaper -d` → GET :8086/health →
{"status":"healthy","check":"pg_reaper_tick","is_leader":true,...}.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author

Automated PR review (PR Review Toolkit)

Ran six specialised review agents — Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier — over the 4 in-scope files (base feat/UN-3445-pg-queue-integration). Overall this is a clean, well-documented change that faithfully mirrors the consumer's LivenessServer; no Critical/High issues. Inline comments below, deduped across agents and prioritised.

Top items: (1) the bare int(raw_port) port parse — context-free crash + an out-of-range port escapes the OSError graceful-degrade path; (2) _httpd/_thread typed Any where the consumer uses precise types; (3) main() port-parse + teardown are untested.

10 comment threads on workers/queue_backend/pg_queue/reaper.py (7 marked Outdated)
…ion)

SonarCloud flagged the new ReaperLivenessServer as duplicating the consumer's
LivenessServer (~19 lines of HTTP-probe boilerplate, over the 3% new-code gate).

Extracted one generic LivenessServer into queue_backend/pg_queue/liveness.py —
parameterised by a freshness callable + the payload's check/age labels + an
optional extra-status callable (the reaper's is_leader). Both sides are now thin
subclasses that preserve their exact constructor signatures and wire payloads:
- consumer LivenessServer(consumer, port=, stale_after=) → check="pg_queue_poll",
  seconds_since_last_poll (unchanged on the wire; its tests pass untouched).
- ReaperLivenessServer(reaper, port=, stale_after=) → check="pg_reaper_tick",
  seconds_since_last_tick, is_leader.

The boilerplate now lives once → duplication cleared, consumer behaviour
preserved. reaper 37 tests + consumer liveness/health tests green; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

SonarCloud duplication addressed — commit 0d2b3f721.

The new ReaperLivenessServer was duplicating the consumer's LivenessServer (~19 lines of HTTP-probe boilerplate, over the 3% new-code gate). Extracted one generic LivenessServer into queue_backend/pg_queue/liveness.py (parameterised by a freshness callable + the payload check/age labels + an optional extra-status callable for the reaper's is_leader). Both sides are now thin subclasses that preserve their exact constructor signatures and wire payloads — so the boilerplate lives once and the consumer's behaviour/tests are unchanged.

reaper 37 tests + consumer liveness/health tests green; ruff + pre-commit clean.
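
For readers without the diff open, the shape of such a generic probe might look like the sketch below. It is an approximation built on the standard library, not the contents of liveness.py: the class name LivenessServerSketch, the "stale" status string, the rounding of the age, and the 127.0.0.1 bind address are all assumptions; the parameter names freshness_fn, age_key and extra_status_fn follow the wording used in this PR.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

_HEALTH_PATHS = {"/health", "/healthz", "/livez"}


class LivenessServerSketch:
    """Hypothetical generic probe: 200 while fresh, 503 when stale."""

    def __init__(self, freshness_fn, *, port, stale_after, check, age_key,
                 extra_status_fn=None):
        if stale_after <= 0:
            raise ValueError("stale_after must be > 0")
        self._freshness_fn = freshness_fn
        self._port = port
        self._stale_after = stale_after
        self._check = check
        self._age_key = age_key
        self._extra_status_fn = extra_status_fn
        self._httpd = None
        self._thread = None

    def _status(self):
        # The verdict depends only on the heartbeat age.
        age = self._freshness_fn()
        healthy = age <= self._stale_after
        payload = dict(self._extra_status_fn()) if self._extra_status_fn else {}
        payload.update({
            "status": "healthy" if healthy else "stale",  # "stale" is assumed
            "check": self._check,
            self._age_key: round(age, 2),
            "stale_after_seconds": self._stale_after,
        })
        return (200 if healthy else 503), payload

    def start(self):
        if self._httpd is not None:
            raise RuntimeError("liveness server already started")
        outer = self

        class Handler(BaseHTTPRequestHandler):
            def do_GET(self):
                # Strip any query string before matching the path.
                if self.path.split("?", 1)[0] not in _HEALTH_PATHS:
                    self.send_error(404)
                    return
                code, payload = outer._status()
                body = json.dumps(payload).encode("utf-8")
                self.send_response(code)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def log_message(self, format, *args):
                pass  # keep probe traffic out of the logs

        # Loopback bind is an assumption; a pod probe needs a reachable address.
        self._httpd = HTTPServer(("127.0.0.1", self._port), Handler)
        self._thread = threading.Thread(
            target=self._httpd.serve_forever, daemon=True)
        self._thread.start()

    @property
    def bound_port(self):
        # None until started; with port=0 this is the OS-assigned port.
        return self._httpd.server_address[1] if self._httpd else None

    def stop(self):
        if self._httpd is None:
            return
        self._httpd.shutdown()
        self._httpd.server_close()
        self._thread.join(timeout=5)
        self._httpd = None
        self._thread = None
```

A reaper-side subclass would then pass check="pg_reaper_tick", age_key="seconds_since_last_tick" and an extra_status_fn returning {"is_leader": ...}, while the consumer passes its own labels and no extra callable.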

…ess, tests

Toolkit review (10 findings; several already resolved by the dedup refactor
0d2b3f7 — the threading-alias, the query-strip comment, and the
extract-shared-server follow-up itself):

- [Medium] Port parse: extracted _reaper_health_port_from_env() — names the var
  on a bad value (no more context-free int('abc') crash) and range-checks
  0-65535 at parse time, so an out-of-range value can't escape the bind catch as
  OverflowError inside start(). main() uses it.
- [Medium] liveness.py: typed _httpd/_thread as HTTPServer|None / Thread|None via
  TYPE_CHECKING (was Any in the shared server) — restores the lifecycle invariant
  + type-checking on .shutdown()/.join()/etc.
- [Low] LivenessServer.__init__ re-validates stale_after > 0 (a direct caller
  could otherwise build an always-503 probe).
- [Low] bound_port docstring: clarified the port=0 / not-started case.
- [Low] _DEFAULT_HEALTH_STALE_SECONDS comment references the interval constant,
  not a hard-coded "5s".
- [Medium/Low test gaps] +9 tests: port-env helper (unset/empty/valid/non-int
  named/out-of-range); main() wiring (parsed port reaches the wiring + health
  stopped in finally; port=None when unset); _maybe_start_health_server OSError
  graceful-degrade → None + logger.exception; stale_after<=0 constructor guard.

reaper 46 + consumer liveness 10 green; ruff clean. The shared-server extraction
(flagged as a follow-up) was already done in 0d2b3f7.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review addressed — commit 40a5f587a

All 10 toolkit findings in. Validated each against the current code first — and notably, 3 were already resolved by the dedup refactor (0d2b3f721) that landed in response to SonarCloud: the import threading as _threading alias, the missing query-strip comment, and the "extract a shared liveness server" follow-up itself.

Newly fixed

  • Health-port parse — extracted _reaper_health_port_from_env(): names the var on a bad value (no more context-free int('abc') crash) and range-checks 0–65535 at parse time, so an out-of-range port can't escape the bind catch as OverflowError inside start().
  • Typing: _httpd/_thread are HTTPServer|None / Thread|None (via TYPE_CHECKING) in the shared liveness.py; the Any divergence was copy-drift, gone with the single implementation.
  • stale_after > 0 re-validated in the constructor (an always-503 probe would crash-loop the pod).
  • bound_port docstring clarified for the port=0/not-started case; the stale-window comment references the interval constant instead of a hard-coded "5s".
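
The health-port parse fix listed above could look something like this. It is a sketch under assumptions rather than the merged helper: the function name drops the leading underscore, the optional environ argument exists only to make the example testable, and raising ValueError (and treating an empty value like an unset one) is assumed behaviour.

```python
import os

_PORT_ENV = "WORKER_PG_REAPER_HEALTH_PORT"


def reaper_health_port_from_env(environ=None):
    """Return the configured port, or None when the variable is unset/empty."""
    environ = os.environ if environ is None else environ
    raw = (environ.get(_PORT_ENV) or "").strip()
    if not raw:
        return None  # no port configured: the caller starts no server
    try:
        port = int(raw)
    except ValueError:
        # Name the variable so the failure is not a context-free int() crash.
        raise ValueError(f"{_PORT_ENV} must be an integer, got {raw!r}") from None
    # Range-check here so a bad value never reaches the socket bind.
    if not 0 <= port <= 65535:
        raise ValueError(f"{_PORT_ENV} must be in 0-65535, got {port}")
    return port
```

Validating at parse time keeps the bind path simple: by the time start() runs, the only expected failure left is an OSError such as "address already in use".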

Test gaps closed (+9) — port-env helper (unset/empty/valid/non-int-named/out-of-range); main() wiring (parsed port reaches the wiring + health.stop() in the finally; port=None when unset); _maybe_start_health_server OSError graceful-degrade → None + logger.exception; stale_after<=0 constructor guard.
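
The graceful-degrade path exercised by those tests can be illustrated with a small wrapper. This is a hedged sketch, not the merged _maybe_start_health_server: the server_factory parameter and the log message are hypothetical, chosen so the example runs without the real server class.

```python
import logging

logger = logging.getLogger(__name__)


def maybe_start_health_server(server_factory, port):
    """Start the probe if a port is configured; never fail the caller on bind."""
    if port is None:
        return None  # no port configured: no server, no stray listener
    server = server_factory(port)
    try:
        server.start()
    except OSError:
        # For example, the port is already in use: log with the traceback
        # and let the process keep running without a probe.
        logger.exception("liveness probe failed to bind on port %s", port)
        return None
    return server
```

Returning None in both the "unset" and "bind failed" cases lets main() guard the teardown with a single `if health is not None` check in its finally block.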

reaper 46 + consumer liveness 10 green; ruff + pre-commit clean. SonarCloud gate already passed.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 16, 2026 11:48
@greptile-apps

greptile-apps Bot commented Jun 16, 2026

Contributor

Greptile Summary

This PR completes the reaper liveness probe (9d) by extracting a shared LivenessServer base class into liveness.py and building ReaperLivenessServer on top of it, mirroring the consumer's existing probe. The heartbeat stamps _last_tick_monotonic at the start of every tick (standby ticks count as progress), and the HTTP verdict is purely the heartbeat — never leadership or DB reachability.

  • liveness.py — new shared generic liveness probe; extra_status_fn fields are applied first and then overlaid by core fields, so status/check/age-key can never be clobbered by a future caller.
  • reaper.py: PgReaper gains heartbeat methods; ReaperLivenessServer wraps the base with seconds_since_last_tick + is_leader; _maybe_start_health_server degrades gracefully on OSError; main() wires the probe in a finally-protected start/stop.
  • run-worker.sh — reserves port 8086 for the reaper, always exports WORKER_PG_REAPER_HEALTH_PORT in the reaper special-case, and documents WORKER_PG_REAPER_HEALTH_STALE_SECONDS in --help.

Confidence Score: 5/5

Safe to merge — the probe is purely additive and auxiliary; a bind failure degrades gracefully, the liveness verdict never touches leadership or DB reachability, and the main() finally block ensures the server is always stopped.

The changes are well-scoped: a new shared module, a thin subclass, env parsers with explicit validation, and a graceful-degrade wrapper. The previously flagged extra_status_fn clobber issue has been addressed (core fields now overlay). The heartbeat timestamp is safe to read across threads under CPython's GIL. The 13 new tests cover the critical paths (200/503 verdict, is_leader reflection, env validation, bind-failure degrade, main wiring). No correctness issues found.

No files require special attention.

Important Files Changed

Filename | Overview
workers/queue_backend/pg_queue/liveness.py | New shared liveness probe module extracted from consumer.py; parameterised by freshness callable, check/age labels, and optional extra-status fn; core fields overlay extra to prevent clobbering; clean implementation.
workers/queue_backend/pg_queue/reaper.py | Adds _last_tick_monotonic heartbeat to PgReaper, ReaperLivenessServer thin wrapper, env parsers, graceful-degrade _maybe_start_health_server, and main() wiring with finally-stop; all correct.
workers/queue_backend/pg_queue/consumer.py | Consumer LivenessServer reduced to a thin subclass of the shared base; constructor shape unchanged; behaviour identical. Only the logger namespace changes from consumer to liveness (noted in previous review).
workers/run-worker.sh | Reserves port 8086 in WORKER_HEALTH_PORTS, exports WORKER_PG_REAPER_HEALTH_PORT for the reaper special-case, and documents the two new env vars in --help.
workers/tests/test_pg_reaper.py | Adds 13 new tests covering heartbeat fresh/stale/refresh, liveness 200/503/is_leader/clobber-guard/404/double-start, env defaults/overrides/invalid, bind-failure degrade, and main() wiring; solid coverage.
workers/queue_backend/pg_queue/__init__.py | Exports LivenessServer and ReaperLivenessServer added to __all__; straightforward.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant main as main()
    participant reaper as PgReaper
    participant msh as _maybe_start_health_server
    participant rls as ReaperLivenessServer
    participant base as LivenessServer (daemon thread)
    participant probe as K8s/probe

    main->>reaper: PgReaper(lease) — sets _last_tick_monotonic
    main->>msh: _reaper_health_port_from_env() / _reaper_health_stale_from_env()
    main->>msh: _maybe_start_health_server(reaper, port, stale_after)
    msh->>rls: ReaperLivenessServer(reaper, port, stale_after)
    rls->>base: "super().__init__(freshness_fn, extra_status_fn=is_leader)"
    msh->>base: server.start() — spawns daemon thread
    Note over base: OSError → log + return None (probe-less)
    main->>reaper: reaper.run()
    loop every interval_seconds
        reaper->>reaper: tick() — stamps _last_tick_monotonic at START
        reaper->>reaper: renew/acquire lease, sweep if leader
    end
    probe->>base: GET /health
    base->>reaper: "freshness_fn() = seconds_since_last_tick()"
    base->>reaper: "extra_status_fn() = {is_leader: ...}"
    base-->>probe: "200 healthy / 503 stale (verdict = heartbeat only)"
    main->>base: health.stop() [finally]

Reviews (2): Last reviewed commit: "UN-3556 [FEAT] Address Greptile: protect..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/liveness.py
…og label

- extra_status_fn could silently clobber core payload fields (status / check /
  age_key / stale_after_seconds) that a monitor reads. Now the handler builds
  extra fields first and overlays the core fields, so core ALWAYS wins — a
  future caller's extra dict can't corrupt the status a monitor parses.
  Test: an extra_status_fn returning {"status":"HACKED",...} leaves status
  "healthy" and check intact, while a non-reserved extra key is preserved.
- The dedup refactor moved the consumer's liveness warnings to the shared
  liveness logger with generic text, so log-based filtering keyed on the old
  "PG-queue consumer: ..." would miss them. Added a log_label param (default
  "pg-queue"); consumer passes "pg-queue consumer", reaper "pg-queue reaper", so
  the messages stay attributable to the source process.

57 tests green (reaper 47 + consumer liveness 10); ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Greptile review addressed — commit b7776c0ee

Both findings valid and fixed:

  • extra_status_fn could clobber core payload fields — the handler now builds the extra fields first and overlays the core fields (status/check/age_key/stale_after_seconds), so core always wins; a future caller's extra dict can't corrupt the status a monitor parses. Test added (an extra_status_fn returning {"status":"HACKED",...} leaves the core intact while preserving a non-reserved extra key).
  • Log namespace/text change from the dedup refactor — good catch: the consumer's liveness warnings moved to the shared liveness logger with generic text. Added a log_label param (default "pg-queue"); the consumer passes "pg-queue consumer" and the reaper "pg-queue reaper", so the messages stay attributable to the source process. (The logger namespace is now …pg_queue.liveness since that's where the shared code lives — but the message text identifies the process.)
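
The overlay ordering from the first bullet comes down to which dict is applied last. A minimal sketch, assuming a hypothetical helper named build_payload (the real handler builds the payload inline):

```python
def build_payload(core, extra_status_fn=None):
    """Merge optional extra fields under the core fields (core wins)."""
    payload = dict(extra_status_fn()) if extra_status_fn is not None else {}
    payload.update(core)  # applied last, so reserved keys cannot be clobbered
    return payload


core = {
    "status": "healthy",
    "check": "pg_reaper_tick",
    "seconds_since_last_tick": 4.79,
    "stale_after_seconds": 30.0,
}
result = build_payload(core, lambda: {"status": "HACKED", "is_leader": True})
print(result["status"], result["is_leader"])  # healthy True
```

The non-reserved key (is_leader) survives, while the colliding "status" from the extra dict is silently overridden by the core value.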

57 tests green (reaper 47 + consumer liveness 10); ruff + pre-commit clean. Confidence was 4/5 with these two — both resolved.

@muhammad-ali-e muhammad-ali-e merged commit f312388 into feat/UN-3445-pg-queue-integration Jun 16, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3556-reaper-liveness branch June 16, 2026 12:20