Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3544 [FEAT] PG-queue consumer liveness endpoint (poll-loop heartbeat)#2051

Merged
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3544-pg-queue-consumer-healthZipstack/unstract:UN-3544-pg-queue-consumer-healthCopy head branch name to clipboard
Jun 12, 2026
Merged

UN-3544 [FEAT] PG-queue consumer liveness endpoint (poll-loop heartbeat)#2051
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3544-pg-queue-consumer-healthZipstack/unstract:UN-3544-pg-queue-consumer-healthCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

Gives the PG-queue consumer a /health HTTP endpoint for K8s liveness probing — the same pattern every other worker exposes (8080–8089) — keyed on a poll-loop heartbeat. Targets feat/UN-3445-pg-queue-integration.

Why

The consumer had no health server. This closes the recurring review thread from #2047 (toolkit "opt-in-hides-crash" steady-state gap + Greptile "no health port") properly: the heartbeat detects a stuck/wedged poll loop (process alive but not draining) — which neither run-worker.sh --status (pgrep) nor the launch-liveness check can see. It also pre-positions the K8s rollout, where a liveness probe needs a known endpoint.

What

  • consumer.py_last_poll_monotonic heartbeat, refreshed at the top of poll_once (so a loop wedged on a long-running task goes stale). seconds_since_last_poll() / is_poll_stale(). main() starts a LivenessServer when WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT is set (opt-in), stops it on exit.
  • LivenessServer — tiny stdlib HTTP server (/health, /healthz, /livez) → 200 while fresh, 503 once stale. Deliberately lean: a liveness probe must report only "is this process making progress?", not broker/API reachability or resource pressure — coupling those would crash-loop a healthy consumer on a transient blip.
  • run-worker.sh — default port 8090 (outside the 8080–8089 core range → no collision), exported opt-in, documented in --help. Bare python -m pg_queue_consumer binds nothing.

Design notes

  • Fixed-but-configurable port (not dynamic): a probe must hit a known port; a dynamically-chosen one can't be probed. In K8s each container has its own network namespace, so 8090 is per-container (no host collision); the Deployment sets the probe port. Overridable locally via the env / -p.
  • Why not reuse the shared HealthChecker/HealthServer: besides the liveness-semantics point above, its built-in api_connectivity check has a real import bug — from .api_client_singleton resolves to a non-existent shared.infrastructure.monitoring.api_client_singleton (the module is at shared.utils.api_client_singleton), so it returns 503 for any checker without a pre-set api_client. Reusing it would ship a broken endpoint. Filed separately (fleet-wide).

Testing

  • Unit: heartbeat fresh/stale + is_poll_stale threshold + a real bind-and-GET test asserting 200 (healthy) then 503 (forced stale). 23/23 non-integration tests green.
  • Live: curl :8090/health200 {"status":"healthy","check":"pg_queue_poll",...}; unknown path → 404; no port set → no server.

Out of scope

  • The K8s Deployment manifest + probe wiring (rollout) — this provides the endpoint they'll point at.
  • The shared health.py api_connectivity import bug (separate ticket).

🤖 Generated with Claude Code

Give the consumer a /health HTTP endpoint for K8s liveness probing, like
every other worker — but keyed only on a poll-loop heartbeat.

- consumer.py: track _last_poll_monotonic (refreshed at the top of poll_once,
  so a loop wedged on a long task goes stale and is detectable — which
  pgrep-based --status and the launch-liveness check cannot see). Expose
  seconds_since_last_poll() / is_poll_stale(). main() starts a LivenessServer
  when WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT is set (opt-in), stops it on exit.
- LivenessServer: tiny stdlib HTTP server (/health, /healthz, /livez) → 200
  while fresh, 503 once stale. Deliberately lean: a liveness probe must report
  only "is this process making progress?", NOT broker/API reachability or
  resource pressure (those would crash-loop a healthy consumer on a blip).
  So it does NOT reuse the shared HealthChecker (which also bundles an
  api_connectivity check that is both wrong for liveness and currently broken
  — its `from .api_client_singleton` import points at a non-existent module;
  tracked separately).
- run-worker.sh: default port 8090 (outside the 8080-8089 core range, no
  collision), exported opt-in; documented in --help.
- tests: heartbeat fresh/stale + a real bind-and-GET 200->503 endpoint test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7e0be654-b42d-41cd-a683-b603bc630645

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3544-pg-queue-consumer-health

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Review Toolkit — Summary (UN-3544 PG-queue consumer liveness)

Ran 6 specialised agents (Code Reviewer, Silent Failure Hunter, Type Design, Test Analyzer, Comment Analyzer, Code Simplifier) over the 3-file diff. 14 inline findings posted. No correctness bug that risks data loss; the heartbeat/clock design is sound (monotonic, GIL-atomic, signals install fine on the main thread). Highlights:

Important (4) — fix before merge:

  • Liveness bind failure (OSError/port-in-use) aborts the whole consumer before it polls — an auxiliary probe shouldn't kill the worker. (consumer.py:337)
  • Static port 8090 collides with the first auto-discovered pluggable worker (8090 + discovered_count); the new comment is inaccurate. (run-worker.sh:90)
  • stop() in the finally can mask the real run() exception. (consumer.py:422)
  • Empty WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT crashes at launch instead of disabling the probe. (consumer.py:311)

Suggestions (10) — error logging in the HTTP handler/thread, Any-typed attrs defeating type-checking, double start() leak, double clock read, status-line shows map default not -p, a design check on the heartbeat staying green during a fast-failing loop, plus 2 test gaps (404 + /healthz//livez aliases, and a test that actually pins the top-of-poll stamp).

Test additions are solid (real-socket 200→503, construction-time seeding, threshold boundaries). Comments are unusually accurate — every load-bearing claim verified true.

Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
Comment thread workers/run-worker.sh
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
Comment thread workers/run-worker.sh
Comment thread workers/tests/test_pg_queue_consumer.py
Comment thread workers/tests/test_pg_queue_consumer.py
Important:
- Liveness bind failure (OSError/port-in-use) now degrades gracefully: log
  and continue probe-less instead of aborting the consumer before it polls.
  Verified live (2nd consumer on a taken :8090 keeps draining).
- run-worker.sh: 8090 collided with the first auto-discovered pluggable worker
  (8090 + count); pluggable discovery now starts at 8091, 8090 reserved.
- LivenessServer.stop() is defensive (can't raise into main()'s finally and
  mask the real run() exception) and warns if the thread outlives the join.
- Empty WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT now hits the clean opt-out
  (_env treats "" as unset) instead of int("") crashing at launch.

Suggestions:
- LivenessServer: guard double start(); reset state in stop(); wrap
  serve_forever so a thread crash is logged; route handler errors to the
  logger (log_message=pass was hiding log_error too); guard wfile.write
  against client disconnects; single clock read per request.
- Type _httpd/_thread as HTTPServer|None / Thread|None via TYPE_CHECKING
  (drops the Any import); restores static checking.
- run-worker.sh: status line shows the effective -p override, not the map default.
- Docstrings: phrase the helper trigger in terms of `port`; note 0.0.0.0 bind;
  document that a fast-failing loop stays healthy by design (liveness must not
  couple to backend reachability).
- tests: heartbeat-stamped-before-read (pins top-of-poll), /healthz + /livez
  aliases + unknown-path 404, double-start rejection.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review feedback addressed — ea0474cc2

Thanks for the thorough pass — all 14 findings handled, threads replied inline.

Important (4) — all fixed

  • 🔴 Bind failure no longer aborts the consumer — degrades to probe-less and keeps draining (verified live: 2nd consumer on a taken :8090 logs the failure and still polls).
  • 🔴 Port collision — pluggable discovery starts at 8091 now; 8090 reserved for the consumer.
  • 🔴 stop() is defensive (can't mask run()'s exception in the finally) + warns on a lingering thread.
  • 🔴 Empty HEALTH_PORT hits the clean opt-out (_env treats "" as unset) instead of crashing.

Suggestions (10) — all addressed

  • LivenessServer hardening: double-start guard + reset-on-stop, serve_forever crash logged, log_error routed to the logger (was hidden by log_message=pass), wfile.write guarded against client disconnects, single clock read.
  • Types: _httpd/_threadHTTPServer|None / Thread|None (dropped Any).
  • run-worker.sh: status line shows the effective -p override.
  • Docstrings: helper trigger phrased on port; 0.0.0.0 documented; design check confirmed — a fast-failing loop stays green by design (liveness must not couple to backend reachability; rationale now in is_poll_stale).
  • Tests: top-of-poll stamp test, /healthz+/livez+404 coverage, double-start rejection.

Verified: 8/8 heartbeat/liveness unit tests green; live /health → 200; bind-conflict → graceful degrade. Pre-commit clean.

- consumer.py: use logger.exception() in the liveness-bind except block
  (preserves the traceback; S6679).
- test: lift the walrus assignment out of the PgQueueConsumer() argument
  list — plain `client = MagicMock()` first (clearer; S6328).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud issues addressed — c2a62fcc2

  • consumer.py (Major) — switched the liveness-bind except block from logger.error to logger.exception (preserves the traceback).
  • test_pg_queue_consumer.py (Minor) — lifted the walrus := out of the PgQueueConsumer(...) argument list into a plain client = MagicMock() first.

Heartbeat/run-loop tests still green; pre-commit clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 12, 2026 11:21
@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

Adds a lightweight HTTP liveness endpoint (/health, /healthz, /livez) to the PG-queue consumer, backed by a poll-loop heartbeat. The server is opt-in via WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT, starts in a daemon thread, and reports 200/503 based on how long ago poll_once() last ran.

  • consumer.py — adds _last_poll_monotonic heartbeat stamped at the top of poll_once, a LivenessServer class using http.server, and a _maybe_start_health_server helper with graceful bind-failure degradation.
  • run-worker.sh — reserves port 8090 for the consumer and bumps pluggable-worker auto-allocation to start at 8091 to avoid collision.
  • tests/test_pg_queue_consumer.py — adds 8 new unit tests covering heartbeat freshness/staleness, all probe aliases, query-string stripping, double-start guard, and a real bind-and-GET integration test.

Confidence Score: 5/5

Safe to merge — the change is purely additive, opt-in by default, and cannot break the consumer's drain loop even on a bind failure.

The liveness server runs in a daemon thread independent of the poll loop; a bind failure degrades gracefully. The heartbeat is a single float write protected by the GIL, query-string stripping is in place, stop() correctly calls shutdown() before server_close(), and the port reservation in run-worker.sh avoids collision with pluggable workers.

No files require special attention.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/consumer.py Adds heartbeat stamp, LivenessServer class, and _maybe_start_health_server helper; query-string stripping, graceful bind-failure, and stop() ordering all look correct.
workers/run-worker.sh Reserves port 8090 for the pg-queue consumer and bumps pluggable-worker auto-allocation to 8091; help text updated accordingly.
workers/tests/test_pg_queue_consumer.py 8 new tests covering heartbeat freshness, staleness thresholds, real HTTP 200/503 responses, all path aliases, query-string handling, double-start guard, and restart-after-stop.

Sequence Diagram

sequenceDiagram
    participant main
    participant PgQueueConsumer
    participant LivenessServer
    participant DaemonThread
    participant K8sProbe

    main->>PgQueueConsumer: construct (seeds _last_poll_monotonic)
    main->>LivenessServer: _maybe_start_health_server(port, stale_after)
    LivenessServer->>DaemonThread: start (HTTPServer on :8090)
    main->>PgQueueConsumer: run() → poll loop

    loop poll_once()
        PgQueueConsumer->>PgQueueConsumer: stamp _last_poll_monotonic
        PgQueueConsumer->>PgQueueConsumer: read + handle tasks
    end

    K8sProbe->>DaemonThread: GET /health
    DaemonThread->>PgQueueConsumer: seconds_since_last_poll()
    alt age ≤ stale_after
        DaemonThread-->>K8sProbe: "200 {status: healthy}"
    else "age > stale_after"
        DaemonThread-->>K8sProbe: "503 {status: unhealthy}"
    end

    main->>LivenessServer: stop() [finally]
    LivenessServer->>DaemonThread: httpd.shutdown() → join(5s)
Loading

Reviews (2): Last reviewed commit: "UN-3544 [FIX] Address Greptile review fe..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
- LivenessServer handler strips the query string before matching paths
  (self.path includes it), so /health?probe=k8s returns 200 not 404. Added
  a query-string case to the alias test.
- Document the stale-threshold trade-off prominently: the heartbeat is frozen
  during task execution, so WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS is
  also an upper bound on single-task wall-clock (a longer task trips the probe
  → restart → redelivery). 60s suits the sub-second leaf; raise it above
  max(batch_size x worst_case_task_seconds, backoff_max) for longer tasks.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Greptile feedback addressed — b54efabcc

Thanks @greptile-apps (4/5). Both findings handled:

  • Query string → 404 (consumer.py:413) — fixed. Handler now matches urlsplit(self.path).path, so /health?probe=k8s → 200. Covered by a new query-string case in the alias test.
  • Long-running tasks trip the probe (outside-diff) — working-as-designed, now documented prominently. Stamping at the top of poll_once and freezing during task execution is deliberate (it's what lets a genuinely wedged loop be detected). The consequence — WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS doubles as an upper bound on single-task wall-clock — is now spelled out on _DEFAULT_HEALTH_STALE_SECONDS with your formula: raise it above max(batch_size × worst_case_task_seconds, backoff_max) for longer tasks. The 60s default is safe for the only currently-routed leaf (send_webhook_notification, sub-second); longer tasks belong to the pipeline phase, which will set its own threshold.

8/8 heartbeat/liveness tests green; pre-commit clean.

@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Both fixes look correct. The urlsplit(self.path).path strip handles query strings cleanly, and the stale-threshold documentation on _DEFAULT_HEALTH_STALE_SECONDS is exactly the right place for it — it's the first thing an operator will read when tuning.

A couple of small observations on the final state:

test_double_start_is_rejected — the server.start() / server.stop() after the RuntimeError block tests restart-after-stop, which is a nice extra. Worth confirming stop() actually resets _httpd and _thread to None — it does (finally: self._httpd = None; self._thread = None), so that path is safe.

_maybe_start_health_server swallowing OSError — already deliberate and documented ("the probe is auxiliary"), just confirming the graceful-degrade path is the right call here. It is.

Nothing blocking. The implementation is clean, the test matrix is thorough (fresh/stale threshold, real bind-and-GET, alias+query-string, 404 guard, double-start, restart-after-stop), and the run-worker.sh port reservation/collision notes are clear. LGTM.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e merged commit 9d7124f into feat/UN-3445-pg-queue-integration Jun 12, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3544-pg-queue-consumer-health branch June 12, 2026 11:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.