UN-3544 [FEAT] PG-queue consumer liveness endpoint (poll-loop heartbeat)#2051
UN-3544 [FEAT] PG-queue consumer liveness endpoint (poll-loop heartbeat)#2051muhammad-ali-e merged 4 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3544-pg-queue-consumer-healthZipstack/unstract:UN-3544-pg-queue-consumer-healthCopy head branch name to clipboard
Conversation
Give the consumer a /health HTTP endpoint for K8s liveness probing, like every other worker — but keyed only on a poll-loop heartbeat. - consumer.py: track _last_poll_monotonic (refreshed at the top of poll_once, so a loop wedged on a long task goes stale and is detectable — which pgrep-based --status and the launch-liveness check cannot see). Expose seconds_since_last_poll() / is_poll_stale(). main() starts a LivenessServer when WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT is set (opt-in), stops it on exit. - LivenessServer: tiny stdlib HTTP server (/health, /healthz, /livez) → 200 while fresh, 503 once stale. Deliberately lean: a liveness probe must report only "is this process making progress?", NOT broker/API reachability or resource pressure (those would crash-loop a healthy consumer on a blip). So it does NOT reuse the shared HealthChecker (which also bundles an api_connectivity check that is both wrong for liveness and currently broken — its `from .api_client_singleton` import points at a non-existent module; tracked separately). - run-worker.sh: default port 8090 (outside the 8080-8089 core range, no collision), exported opt-in; documented in --help. - tests: heartbeat fresh/stale + a real bind-and-GET 200->503 endpoint test. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — Summary (UN-3544 PG-queue consumer liveness)
Ran 6 specialised agents (Code Reviewer, Silent Failure Hunter, Type Design, Test Analyzer, Comment Analyzer, Code Simplifier) over the 3-file diff. 14 inline findings posted. No correctness bug that risks data loss; the heartbeat/clock design is sound (monotonic, GIL-atomic, signals install fine on the main thread). Highlights:
Important (4) — fix before merge:
- Liveness bind failure (
OSError/port-in-use) aborts the whole consumer before it polls — an auxiliary probe shouldn't kill the worker. (consumer.py:337) - Static port
8090collides with the first auto-discovered pluggable worker (8090 + discovered_count); the new comment is inaccurate. (run-worker.sh:90) stop()in thefinallycan mask the realrun()exception. (consumer.py:422)- Empty
WORKER_PG_QUEUE_CONSUMER_HEALTH_PORTcrashes at launch instead of disabling the probe. (consumer.py:311)
Suggestions (10) — error logging in the HTTP handler/thread, Any-typed attrs defeating type-checking, double start() leak, double clock read, status-line shows map default not -p, a design check on the heartbeat staying green during a fast-failing loop, plus 2 test gaps (404 + /healthz//livez aliases, and a test that actually pins the top-of-poll stamp).
Test additions are solid (real-socket 200→503, construction-time seeding, threshold boundaries). Comments are unusually accurate — every load-bearing claim verified true.
Important:
- Liveness bind failure (OSError/port-in-use) now degrades gracefully: log
and continue probe-less instead of aborting the consumer before it polls.
Verified live (2nd consumer on a taken :8090 keeps draining).
- run-worker.sh: 8090 collided with the first auto-discovered pluggable worker
(8090 + count); pluggable discovery now starts at 8091, 8090 reserved.
- LivenessServer.stop() is defensive (can't raise into main()'s finally and
mask the real run() exception) and warns if the thread outlives the join.
- Empty WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT now hits the clean opt-out
(_env treats "" as unset) instead of int("") crashing at launch.
Suggestions:
- LivenessServer: guard double start(); reset state in stop(); wrap
serve_forever so a thread crash is logged; route handler errors to the
logger (log_message=pass was hiding log_error too); guard wfile.write
against client disconnects; single clock read per request.
- Type _httpd/_thread as HTTPServer|None / Thread|None via TYPE_CHECKING
(drops the Any import); restores static checking.
- run-worker.sh: status line shows the effective -p override, not the map default.
- Docstrings: phrase the helper trigger in terms of `port`; note 0.0.0.0 bind;
document that a fast-failing loop stays healthy by design (liveness must not
couple to backend reachability).
- tests: heartbeat-stamped-before-read (pins top-of-poll), /healthz + /livez
aliases + unknown-path 404, double-start rejection.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Review feedback addressed —
|
- consumer.py: use logger.exception() in the liveness-bind except block (preserves the traceback; S6679). - test: lift the walrus assignment out of the PgQueueConsumer() argument list — plain `client = MagicMock()` first (clearer; S6328). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
SonarCloud issues addressed —
|
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/consumer.py | Adds heartbeat stamp, LivenessServer class, and _maybe_start_health_server helper; query-string stripping, graceful bind-failure, and stop() ordering all look correct. |
| workers/run-worker.sh | Reserves port 8090 for the pg-queue consumer and bumps pluggable-worker auto-allocation to 8091; help text updated accordingly. |
| workers/tests/test_pg_queue_consumer.py | 8 new tests covering heartbeat freshness, staleness thresholds, real HTTP 200/503 responses, all path aliases, query-string handling, double-start guard, and restart-after-stop. |
Sequence Diagram
sequenceDiagram
participant main
participant PgQueueConsumer
participant LivenessServer
participant DaemonThread
participant K8sProbe
main->>PgQueueConsumer: construct (seeds _last_poll_monotonic)
main->>LivenessServer: _maybe_start_health_server(port, stale_after)
LivenessServer->>DaemonThread: start (HTTPServer on :8090)
main->>PgQueueConsumer: run() → poll loop
loop poll_once()
PgQueueConsumer->>PgQueueConsumer: stamp _last_poll_monotonic
PgQueueConsumer->>PgQueueConsumer: read + handle tasks
end
K8sProbe->>DaemonThread: GET /health
DaemonThread->>PgQueueConsumer: seconds_since_last_poll()
alt age ≤ stale_after
DaemonThread-->>K8sProbe: "200 {status: healthy}"
else "age > stale_after"
DaemonThread-->>K8sProbe: "503 {status: unhealthy}"
end
main->>LivenessServer: stop() [finally]
LivenessServer->>DaemonThread: httpd.shutdown() → join(5s)
Reviews (2): Last reviewed commit: "UN-3544 [FIX] Address Greptile review fe..." | Re-trigger Greptile
- LivenessServer handler strips the query string before matching paths (self.path includes it), so /health?probe=k8s returns 200 not 404. Added a query-string case to the alias test. - Document the stale-threshold trade-off prominently: the heartbeat is frozen during task execution, so WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS is also an upper bound on single-task wall-clock (a longer task trips the probe → restart → redelivery). 60s suits the sub-second leaf; raise it above max(batch_size x worst_case_task_seconds, backoff_max) for longer tasks. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Greptile feedback addressed —
|
|
Both fixes look correct. The A couple of small observations on the final state:
Nothing blocking. The implementation is clean, the test matrix is thorough (fresh/stale threshold, real bind-and-GET, alias+query-string, 404 guard, double-start, restart-after-stop), and the run-worker.sh port reservation/collision notes are clear. LGTM. |
|
9d7124f
into
feat/UN-3445-pg-queue-integration
Gives the PG-queue consumer a
/healthHTTP endpoint for K8s liveness probing — the same pattern every other worker exposes (8080–8089) — keyed on a poll-loop heartbeat. Targetsfeat/UN-3445-pg-queue-integration.Why
The consumer had no health server. This closes the recurring review thread from #2047 (toolkit "opt-in-hides-crash" steady-state gap + Greptile "no health port") properly: the heartbeat detects a stuck/wedged poll loop (process alive but not draining) — which neither
run-worker.sh --status(pgrep) nor the launch-liveness check can see. It also pre-positions the K8s rollout, where a liveness probe needs a known endpoint.What
consumer.py—_last_poll_monotonicheartbeat, refreshed at the top ofpoll_once(so a loop wedged on a long-running task goes stale).seconds_since_last_poll()/is_poll_stale().main()starts aLivenessServerwhenWORKER_PG_QUEUE_CONSUMER_HEALTH_PORTis set (opt-in), stops it on exit.LivenessServer— tiny stdlib HTTP server (/health,/healthz,/livez) → 200 while fresh, 503 once stale. Deliberately lean: a liveness probe must report only "is this process making progress?", not broker/API reachability or resource pressure — coupling those would crash-loop a healthy consumer on a transient blip.run-worker.sh— default port 8090 (outside the 8080–8089 core range → no collision), exported opt-in, documented in--help. Barepython -m pg_queue_consumerbinds nothing.Design notes
-p.HealthChecker/HealthServer: besides the liveness-semantics point above, its built-inapi_connectivitycheck has a real import bug —from .api_client_singletonresolves to a non-existentshared.infrastructure.monitoring.api_client_singleton(the module is atshared.utils.api_client_singleton), so it returns 503 for any checker without a pre-setapi_client. Reusing it would ship a broken endpoint. Filed separately (fleet-wide).Testing
is_poll_stalethreshold + a real bind-and-GET test asserting200(healthy) then503(forced stale). 23/23 non-integration tests green.curl :8090/health→200 {"status":"healthy","check":"pg_queue_poll",...}; unknown path → 404; no port set → no server.Out of scope
health.pyapi_connectivityimport bug (separate ticket).🤖 Generated with Claude Code