UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap guard#2047
UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap guard#2047muhammad-ali-e merged 3 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3541-pg-queue-consumer-launcherZipstack/unstract:UN-3541-pg-queue-consumer-launcherCopy head branch name to clipboard
Conversation
…guard Make the 9c PG-queue consumer (UN-3539) runnable via the normal worker flow, safely. Split from #2045 to keep that PR focused on consumer logic. - Launcher (pg_queue_consumer/__main__.py): set WORKER_TYPE to the source worker (default notification) BEFORE `import worker`, so the right tasks register. worker.py loads exactly one worker type's tasks; a bare import would load the general worker's and drop every notification as unknown. - run-worker.sh: `pg-queue-consumer` type runs `python -m pg_queue_consumer` (not a celery command) from the workers root; queue via env. - Startup guard: PgQueueConsumer.run() refuses to start on an empty task registry — fail loud instead of silently dropping every message. - Drop the hard-coded 8086 health port (consumer runs no health server). Integration fixes found during live dev-test (real send_webhook_notification end-to-end → Slack HTTP 200): - Opt-in status: consumer is not part of `all`; shown only when running. - Log path: detach writes to an absolute $worker_dir/$type.log so -L/-C find it (also fixes the same latent bug for pluggable workers). - PID discovery: get_worker_pids matches the `python -m` invocation, so --status / -k / -r work for the consumer. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — summary (PR #2047)
Ran six specialized agents (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier). The PR is small, well-factored, and unusually well-commented; tests pass and bash -n is clean. No merge-blocking correctness bug found. Findings below are posted inline; three target pre-existing (unchanged) lines outside this diff and so are listed here only:
Pre-existing lines (cannot anchor inline):
- 🟠
consumer.py:248_env()casting —cast(os.getenv(...))raises a bareValueError(e.g.int("abc")) with no var name / error id. Wrap the cast and name the offendingWORKER_PG_QUEUE_CONSUMER_*var. (Loud is correct, but combined with the backgrounded-launch issue it gets swallowed into the log.) - 🟠
consumer.py:248-249_envtyping —def _env(suffix, default, cast: type) -> objecterases types at the typedPgQueueConsumer.__init__boundary and fights the repo's strict mypy. Use aTypeVar:def _env(suffix: str, default: _T, cast: Callable[[str], _T]) -> _T. - 🟡
consumer.py:215-221poll-loopexcept Exception— correctly survives transient blips, but equally swallows permanent faults (schema drift, revoked grants, a_handlebug) into an infinite 0.5 Hz log-spam loop. Count consecutive failures and escalate past a threshold.
Priority order: 🔴 nohup-no-liveness (run-worker.sh:718) → 🟠 wrong-registry observability (consumer.py:185), _env casting/typing → 🟡 opt-in-hides-crash, queue-skew visibility, test gaps, call-chain comment → 🟢 pgrep || true, comment wording.
The Code Simplifier found nothing to change (the two duplication points are intentional, behavior-preserving).
- run-worker.sh: verify liveness after a detached launch (kill -0 + tail); `set -e` doesn't apply to `&`, so a fork that died on startup was reported as "started" — acute for the health-port-less consumer (High). - consumer.py: log the registered application task names at startup so a *wrong* (non-empty but mismatched) registry is diagnosable — the guard only catches an *empty* one (Medium observability). - consumer.py: type _env() with a TypeVar (was `cast: type -> object`, erasing types at the typed __init__) and name the offending var on a bad cast instead of a context-free ValueError (Medium). - tests: add the guard's positive / require_tasks=False bypass / built-in filter arms (only the failure arm was covered). - get_worker_pids: warn on pgrep rc>1 (operational/regex error) instead of collapsing it to "not running" (Low). - run-worker.sh: extract the repeated "pg_queue_consumer" literal into a readonly constant (SonarCloud S1192). - Comment accuracy: build_celery_app configures but does not import tasks; `notification` is the worker that owns the leaf task, not the task itself; generalise the "every notification dropped" example. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Review feedback addressed —
|
|
| Filename | Overview |
|---|---|
| workers/pg_queue_consumer/main.py | New entry-point that overrides WORKER_TYPE before importing worker, guarded behind if __name__ == "__main__" to prevent side-effects on import; addresses the previous review comment about module-level env mutation. |
| workers/pg_queue_consumer/init.py | New package marker with docstring explaining the consumer's bootstrap requirements and launch path; no logic. |
| workers/queue_backend/pg_queue/consumer.py | Adds require_tasks startup guard, startup-log of registered task names, typed _env helper, and improved TypeVar-based type inference; all changes are well-scoped and the guard correctly excludes celery.* builtins. |
| workers/run-worker.sh | Wires pg-queue-consumer as an opt-in worker type: correct pgrep pattern for python -m invocation, absolute log path fix, OPTIN_WORKERS map to suppress spurious STOPPED status, and best-effort crash detection on detach. |
| workers/tests/test_pg_queue_consumer.py | Adds four tests covering: empty-registry refusal, positive start with registered tasks via current_app, require_tasks=False bypass, and builtin-exclusion in _registered_task_count; correctly relies on module-level @shared_task tasks for the positive arm. |
Sequence Diagram
sequenceDiagram
participant Shell as run-worker.sh
participant UV as uv run python
participant Main as pg_queue_consumer.__main__
participant Worker as worker.py (bootstrap)
participant Consumer as PgQueueConsumer.run()
participant PG as pg_queue_message
Shell->>Shell: "export WORKER_TYPE=pg_queue_consumer"
Shell->>Shell: "export WORKER_PG_QUEUE_CONSUMER_QUEUE=notifications"
Shell->>Shell: "export WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=notification"
Shell->>UV: uv run python -m pg_queue_consumer
UV->>Main: "__name__ == "__main__" → _bootstrap_and_run()"
Main->>Main: "os.environ["WORKER_TYPE"] = "notification""
Main->>Worker: import worker (registers notification tasks)
Worker-->>Main: tasks registered in current_app
Main->>Consumer: main() → PgQueueConsumer(...).run()
Consumer->>Consumer: "require_tasks guard: _registered_task_count() > 0?"
alt registry empty (wrong bootstrap)
Consumer-->>Shell: RuntimeError (fail loud)
else tasks present
Consumer->>Consumer: install signal handlers
loop poll loop
Consumer->>PG: SKIP LOCKED claim batch
PG-->>Consumer: messages (or empty)
Consumer->>Consumer: task.apply() in-process
Consumer->>PG: DELETE msg_id (ack)
end
end
Reviews (2): Last reviewed commit: "UN-3541 [FIX] Address Greptile review fe..." | Re-trigger Greptile
- __main__.py: move the WORKER_TYPE mutation + `import worker` bootstrap into a guarded _bootstrap_and_run() called only under `__name__ == "__main__"`, so an accidental import (test/IDE/type-checker walking __main__) no longer overwrites WORKER_TYPE or triggers a full worker-app bootstrap. - run-worker.sh: list `pg-queue-consumer` in the usage/--help worker types, plus a note for its WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE overrides. - run-worker.sh: clarify the post-launch liveness check is a best-effort fast-fail for *immediate* (sub-second) crash-on-import/bad-config faults, not a connectivity check; kept general (an immediate crash can hit any worker) and noted the `all` subshells overlap the 1s with inter-launch sleep. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Greptile feedback addressed —
|
|
All three issues are well-addressed. The No further concerns on these changes. The PR looks good from a review standpoint on these items. |
|
b58ee1e
into
feat/UN-3445-pg-queue-integration
Makes the 9c PG-queue consumer (UN-3539, #2045) runnable through the normal worker flow — and safe to launch. Split out so #2045 stays focused on consumer logic.
Targets the long-lived
feat/UN-3445-pg-queue-integrationbranch (notmain).Why
The consumer is task-name routed: it reads a
task_namefrom each PG message and runs it from the worker app's task registry. So the launch path must (a) bootstrap the right task set and (b) fail loudly if it didn't — otherwise every message is silently dropped as "unknown task".What
Bootstrapping launcher —
workers/pg_queue_consumer/__main__.pyWORKER_TYPEto the source worker (defaultnotification) beforeimport worker. The rootworker.pyloads exactly one worker type'stasks.py; a bare import would load thegeneralworker's tasks and drop every notification. Overridable viaWORKER_PG_QUEUE_CONSUMER_WORKER_TYPE.run-worker.sh wiring
pg-queue-consumerworker type →uv run python -m pg_queue_consumer(a plain Postgres poll loop, not a Celery/RabbitMQ worker), run from the workers root; queue routed viaWORKER_PG_QUEUE_CONSUMER_QUEUE../run-worker.sh all(default behaviour unchanged — the new arch is experimental/flag-gated). Status shows it only when running, so a deliberate non-start isn't reported asSTOPPED.Startup guard —
PgQueueConsumer.run(require_tasks=True)Health-port fix
8086— the consumer runs no health server, so it binds no port (avoids a collision). A liveness endpoint, if added later, reads an env port.Integration fixes found during live dev-test
$worker_dir/$type.log, so-L/-Cfind it (also fixes the same latent bug for pluggable workers, which likewise run from the workers root).get_worker_pidsmatches thepython -minvocation, so--status/-k/-rwork for the consumer.Testing
./run-worker.sh pg-queue-consumerdrained realsend_webhook_notificationmessages from the PG queue → Slack HTTP 200, tasks succeeded, rows deleted.--status(RUNNING/hidden-when-stopped), single-worker-r, and log resolution.Out of scope (rollout)
🤖 Generated with Claude Code