UN-3541 [FEAT] Wire PG-queue consumer into run-worker.sh + bootstrap guard#2047

Merged
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3541-pg-queue-consumer-launcher
Jun 12, 2026
Conversation

@muhammad-ali-e

Contributor

Makes the 9c PG-queue consumer (UN-3539, #2045) runnable through the normal worker flow — and safe to launch. Split out so #2045 stays focused on consumer logic.

Targets the long-lived feat/UN-3445-pg-queue-integration branch (not main).

Why

The consumer is task-name routed: it reads a task_name from each PG message and runs it from the worker app's task registry. So the launch path must (a) bootstrap the right task set and (b) fail loudly if it didn't — otherwise every message is silently dropped as "unknown task".

What

Bootstrapping launcher: workers/pg_queue_consumer/__main__.py

  • Sets WORKER_TYPE to the source worker (default notification) before import worker. The root worker.py loads exactly one worker type's tasks.py; a bare import would load the general worker's tasks and drop every notification. Overridable via WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE.
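A minimal sketch of the ordering constraint described above, under stated assumptions. The helper `bootstrap` and its `load_worker_app` parameter are hypothetical stand-ins (the real launcher runs `import worker` at that point), and the environment is passed in as a mapping so the sketch runs without the worker app. Only the variable names `WORKER_TYPE` and `WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE` and the `notification` default come from this description.

```python
from typing import Callable, MutableMapping, TypeVar

T = TypeVar("T")

OVERRIDE_VAR = "WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE"
DEFAULT_SOURCE_WORKER = "notification"


def bootstrap(env: MutableMapping[str, str], load_worker_app: Callable[[], T]) -> T:
    """Point WORKER_TYPE at the source worker, then load the worker app.

    Hypothetical helper: `load_worker_app` stands in for `import worker`.
    """
    # Overwrite instead of setdefault: WORKER_TYPE may already hold a value
    # that does not name the task set the consumer needs.
    env["WORKER_TYPE"] = env.get(OVERRIDE_VAR, DEFAULT_SOURCE_WORKER)
    # Load the app only after the variable is set, so the loader sees it.
    return load_worker_app()
```

Called as `bootstrap(os.environ, load)`, the loader observes the source worker type even if the calling shell exported a different `WORKER_TYPE` beforehand.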

run-worker.sh wiring

  • New pg-queue-consumer worker type → uv run python -m pg_queue_consumer (a plain Postgres poll loop, not a Celery/RabbitMQ worker), run from the workers root; queue routed via WORKER_PG_QUEUE_CONSUMER_QUEUE.
  • Opt-in: not part of ./run-worker.sh all (default behaviour unchanged — the new arch is experimental/flag-gated). Status shows it only when running, so a deliberate non-start isn't reported as STOPPED.

Startup guard: PgQueueConsumer.run(require_tasks=True)

  • Refuses to start on an empty task registry (a strong signal the worker app wasn't bootstrapped). Fails loud instead of silently destroying data.
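The guard can be pictured roughly as follows. This is a sketch, not the code in consumer.py: the function names are hypothetical, the registry is modelled as a plain iterable of task names, and the filtering of Celery's built-in `celery.*` tasks follows the review summary further down this thread.

```python
from typing import Iterable, List


def application_task_names(registry: Iterable[str]) -> List[str]:
    """Return registered task names, ignoring Celery's built-in celery.* tasks."""
    return sorted(name for name in registry if not name.startswith("celery."))


def ensure_tasks_registered(registry: Iterable[str], require_tasks: bool = True) -> List[str]:
    """Raise before polling starts if no application task is registered."""
    names = application_task_names(registry)
    if require_tasks and not names:
        # An empty registry suggests the worker app was never bootstrapped;
        # polling now would treat every message as an unknown task.
        raise RuntimeError("no application tasks registered; refusing to start")
    return names
```

Passing `require_tasks=False` skips the check, which is the bypass the tests exercise.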

Health-port fix

  • Dropped the hard-coded 8086 — the consumer runs no health server, so it binds no port (avoids a collision). A liveness endpoint, if added later, reads an env port.

Integration fixes found during live dev-test

  • Log path — detached workers now write to an absolute $worker_dir/$type.log, so -L/-C find it (also fixes the same latent bug for pluggable workers, which likewise run from the workers root).
  • PID discovery: get_worker_pids matches the python -m invocation, so --status / -k / -r work for the consumer.

Testing

  • Live end-to-end: ./run-worker.sh pg-queue-consumer drained real send_webhook_notification messages from the PG queue → Slack HTTP 200, tasks succeeded, rows deleted.
  • Unit: new test asserts the startup guard refuses an empty-registry app; full consumer suite green.
  • Verified --status (RUNNING/hidden-when-stopped), single-worker -r, and log resolution.

Out of scope (rollout)

  • Real liveness/health endpoint + K8s Deployment for the consumer; per-org canary; runbook.

🤖 Generated with Claude Code

…guard

Make the 9c PG-queue consumer (UN-3539) runnable via the normal worker
flow, safely. Split from #2045 to keep that PR focused on consumer logic.

- Launcher (pg_queue_consumer/__main__.py): set WORKER_TYPE to the source
  worker (default notification) BEFORE `import worker`, so the right tasks
  register. worker.py loads exactly one worker type's tasks; a bare import
  would load the general worker's and drop every notification as unknown.
- run-worker.sh: `pg-queue-consumer` type runs `python -m pg_queue_consumer`
  (not a celery command) from the workers root; queue via env.
- Startup guard: PgQueueConsumer.run() refuses to start on an empty task
  registry — fail loud instead of silently dropping every message.
- Drop the hard-coded 8086 health port (consumer runs no health server).

Integration fixes found during live dev-test (real send_webhook_notification
end-to-end → Slack HTTP 200):
- Opt-in status: consumer is not part of `all`; shown only when running.
- Log path: detach writes to an absolute $worker_dir/$type.log so -L/-C
  find it (also fixes the same latent bug for pluggable workers).
- PID discovery: get_worker_pids matches the `python -m` invocation, so
  --status / -k / -r work for the consumer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bbf9d843-231f-459e-ad7e-9eb7505ce974

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author


PR Review Toolkit — summary (PR #2047)

Ran six specialized agents (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier). The PR is small, well-factored, and unusually well-commented; tests pass and bash -n is clean. No merge-blocking correctness bug found. Findings below are posted inline; three target pre-existing (unchanged) lines outside this diff and so are listed here only:

Pre-existing lines (cannot anchor inline):

  • 🟠 consumer.py:248 _env() casting: cast(os.getenv(...)) raises a bare ValueError (e.g. int("abc")) with no var name / error id. Wrap the cast and name the offending WORKER_PG_QUEUE_CONSUMER_* var. (Loud is correct, but combined with the backgrounded-launch issue it gets swallowed into the log.)
  • 🟠 consumer.py:248-249 _env typing: def _env(suffix, default, cast: type) -> object erases types at the typed PgQueueConsumer.__init__ boundary and fights the repo's strict mypy. Use a TypeVar: def _env(suffix: str, default: _T, cast: Callable[[str], _T]) -> _T.
  • 🟡 consumer.py:215-221 poll-loop except Exception — correctly survives transient blips, but equally swallows permanent faults (schema drift, revoked grants, a _handle bug) into an infinite 0.5 Hz log-spam loop. Count consecutive failures and escalate past a threshold.

Priority order: 🔴 nohup-no-liveness (run-worker.sh:718) → 🟠 wrong-registry observability (consumer.py:185), _env casting/typing → 🟡 opt-in-hides-crash, queue-skew visibility, test gaps, call-chain comment → 🟢 pgrep || true, comment wording.

The Code Simplifier found nothing to change (the two duplication points are intentional, behavior-preserving).

Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/run-worker.sh Outdated
Comment thread workers/run-worker.sh
Comment thread workers/run-worker.sh Outdated
Comment thread workers/run-worker.sh Outdated
Comment thread workers/pg_queue_consumer/__main__.py Outdated
Comment thread workers/pg_queue_consumer/__main__.py
Comment thread workers/pg_queue_consumer/__main__.py
- run-worker.sh: verify liveness after a detached launch (kill -0 + tail);
  `set -e` doesn't apply to `&`, so a fork that died on startup was reported
  as "started" — acute for the health-port-less consumer (High).
- consumer.py: log the registered application task names at startup so a
  *wrong* (non-empty but mismatched) registry is diagnosable — the guard only
  catches an *empty* one (Medium observability).
- consumer.py: type _env() with a TypeVar (was `cast: type -> object`,
  erasing types at the typed __init__) and name the offending var on a bad
  cast instead of a context-free ValueError (Medium).
- tests: add the guard's positive / require_tasks=False bypass / built-in
  filter arms (only the failure arm was covered).
- get_worker_pids: warn on pgrep rc>1 (operational/regex error) instead of
  collapsing it to "not running" (Low).
- run-worker.sh: extract the repeated "pg_queue_consumer" literal into a
  readonly constant (SonarCloud S1192).
- Comment accuracy: build_celery_app configures but does not import tasks;
  `notification` is the worker that owns the leaf task, not the task itself;
  generalise the "every notification dropped" example.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review feedback addressed — 2f717c6d1

Thanks for the thorough pass. Validity-checked each finding; here's the disposition.

Fixed

  • 🔴 nohup no-liveness (run-worker.sh) — detached launch now verifies kill -0 after sleep 1, prints the log tail + returns non-zero on a dead fork. Applied to the shared detach path (benefits every worker).
  • 🟠 wrong-registry observability (consumer.py) — run() logs the registered application task names at startup, so a non-empty-but-mismatched registry is diagnosable from one line (… 7 application task(s) registered: …, send_webhook_notification).
  • 🟠 _env casting + typing: TypeVar-typed (was cast: type -> object, erasing types at the typed __init__); a bad value now raises Invalid WORKER_PG_QUEUE_CONSUMER_<X>=… instead of a context-free ValueError.
  • 🟡 test gaps — added the guard's positive / require_tasks=False bypass / built-in-filter arms.
  • 🟢 pgrep || true — warns on rc > 1 (operational/regex error) instead of collapsing it to "not running".
  • 🟢 comment accuracy ×3 — build_celery_app configures but doesn't import tasks; notification is the worker that owns the leaf task send_webhook_notification; generalised the "every notification dropped" example.
  • SonarCloud S1192 — extracted the repeated "pg_queue_consumer" literal into a readonly constant.
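For readers who want the launch-then-verify idea from the first item in runnable form, here is a Python analogue. It is not the shell code in run-worker.sh: the script uses kill -0 after sleep 1, whereas this sketch uses `Popen.poll()`, and the function name, the `grace_seconds` parameter, and the 20-line tail are assumptions.

```python
import subprocess
import sys
import time
from typing import List


def launch_and_verify(cmd: List[str], log_path: str, grace_seconds: float = 1.0) -> subprocess.Popen:
    """Start cmd detached from our output, then fail fast if it died at once."""
    with open(log_path, "ab") as log:
        # The child inherits its own copy of the log handle.
        proc = subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)
    time.sleep(grace_seconds)
    exit_code = proc.poll()  # None means the process is still running
    if exit_code is not None:
        with open(log_path, "r", errors="replace") as fh:
            tail = "".join(fh.readlines()[-20:])
        raise RuntimeError(f"process exited with code {exit_code} during startup:\n{tail}")
    return proc
```

As with the shell version, this only catches immediate crashes such as import or configuration faults; a process that dies after the grace period goes unnoticed.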

Deferred (with rationale, replied inline)

  • 🟡 opt-in-hides-crash — start-time crashes now caught by the liveness check; steady-state (died-later) detection needs the future health endpoint (already a rollout item).
  • 🟡 queue-skew — producer/consumer queues are verified-aligned for the one migrated task; cross-queue detection belongs with the second leaf migration (don't want untested cross-queue logic now).
  • 🟡 poll-loop escalation past a failure threshold — pre-existing 9c (UN-3539 [FEAT] PG Queue Phase 9c — consumer poll loop (claim → run → ack) #2045) behaviour, and it changes crash semantics (exit-on-sustained-fault vs log-and-continue) — worth its own discussion/PR rather than bundling here.
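To make the deferred poll-loop discussion concrete, one possible shape for "count consecutive failures and escalate" is sketched below. Everything here is hypothetical: the function name, the threshold of 5, and the decision to re-raise are illustrative choices, and the 2-second backoff is only inferred from the 0.5 Hz figure in the review. The merged consumer keeps its log-and-continue behaviour.

```python
import time
from typing import Callable


def poll_with_escalation(
    poll_once: Callable[[], None],
    max_consecutive_failures: int = 5,
    backoff_seconds: float = 2.0,
    should_stop: Callable[[], bool] = lambda: False,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Run poll_once in a loop; re-raise once failures persist past the threshold."""
    failures = 0
    while not should_stop():
        try:
            poll_once()
            failures = 0  # any success resets the streak
        except Exception:
            failures += 1
            if failures >= max_consecutive_failures:
                # Sustained fault: surface it instead of logging forever.
                raise
            sleep(backoff_seconds)
```

Transient blips still recover, because a single successful poll resets the counter; only an unbroken run of failures ends the loop, which is the crash-semantics change the author wants discussed separately.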

Re-verified live after the changes: ./run-worker.sh -r -d pg-queue-consumer → liveness passes, --status shows RUNNING, startup log lists the 7 notification tasks. Consumer test suite green.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 12, 2026 09:53
@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Contributor

Greptile Summary

This PR wires the PG-queue consumer into the normal worker launch flow (run-worker.sh) and adds a startup guard that refuses to run against an empty task registry, preventing silent message loss from a mis-bootstrapped worker app.

  • __main__.py overrides WORKER_TYPE before import worker so the right source worker's tasks are loaded; the env mutation is safely gated behind if __name__ == "__main__", addressing the previous review comment about module-level side-effects.
  • consumer.py gains a require_tasks guard, a startup log of registered task names (to catch a wrong-type bootstrap that slips past the empty-registry check), and a properly typed _env helper.
  • run-worker.sh adds pg-queue-consumer as an opt-in worker type with a dedicated pgrep pattern for python -m invocations, an absolute log path so -L/-C work regardless of cwd, and an OPTIN_WORKERS map that suppresses spurious STOPPED status after an all launch.

Confidence Score: 5/5

Safe to merge — the consumer is opt-in, default all behaviour is unchanged, and the startup guard makes misconfiguration loud rather than silent.

All three changed subsystems are coherent: the __main__.py env override is correctly guarded, the require_tasks guard fires before any polling begins and is bypassed by the test suite via current_app with the module-level @shared_task tasks, and the shell wiring (pgrep pattern, absolute log path, OPTIN_WORKERS) is consistent with how pluggable workers are handled. The previous review comments about module-level side-effects and the missing usage-help entry have both been addressed in this revision.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/pg_queue_consumer/__main__.py | New entry-point that overrides WORKER_TYPE before importing worker, guarded behind if __name__ == "__main__" to prevent side-effects on import; addresses the previous review comment about module-level env mutation. |
| workers/pg_queue_consumer/__init__.py | New package marker with docstring explaining the consumer's bootstrap requirements and launch path; no logic. |
| workers/queue_backend/pg_queue/consumer.py | Adds require_tasks startup guard, startup-log of registered task names, typed _env helper, and improved TypeVar-based type inference; all changes are well-scoped and the guard correctly excludes celery.* builtins. |
| workers/run-worker.sh | Wires pg-queue-consumer as an opt-in worker type: correct pgrep pattern for python -m invocation, absolute log path fix, OPTIN_WORKERS map to suppress spurious STOPPED status, and best-effort crash detection on detach. |
| workers/tests/test_pg_queue_consumer.py | Adds four tests covering: empty-registry refusal, positive start with registered tasks via current_app, require_tasks=False bypass, and builtin-exclusion in _registered_task_count; correctly relies on module-level @shared_task tasks for the positive arm. |

Sequence Diagram

sequenceDiagram
    participant Shell as run-worker.sh
    participant UV as uv run python
    participant Main as pg_queue_consumer.__main__
    participant Worker as worker.py (bootstrap)
    participant Consumer as PgQueueConsumer.run()
    participant PG as pg_queue_message

    Shell->>Shell: "export WORKER_TYPE=pg_queue_consumer"
    Shell->>Shell: "export WORKER_PG_QUEUE_CONSUMER_QUEUE=notifications"
    Shell->>Shell: "export WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=notification"
    Shell->>UV: uv run python -m pg_queue_consumer
    UV->>Main: "__name__ == "__main__" → _bootstrap_and_run()"
    Main->>Main: "os.environ["WORKER_TYPE"] = "notification""
    Main->>Worker: import worker (registers notification tasks)
    Worker-->>Main: tasks registered in current_app
    Main->>Consumer: main() → PgQueueConsumer(...).run()
    Consumer->>Consumer: "require_tasks guard: _registered_task_count() > 0?"
    alt registry empty (wrong bootstrap)
        Consumer-->>Shell: RuntimeError (fail loud)
    else tasks present
        Consumer->>Consumer: install signal handlers
        loop poll loop
            Consumer->>PG: SKIP LOCKED claim batch
            PG-->>Consumer: messages (or empty)
            Consumer->>Consumer: task.apply() in-process
            Consumer->>PG: DELETE msg_id (ack)
        end
    end

Reviews (2): Last reviewed commit: "UN-3541 [FIX] Address Greptile review fe..."

Comment thread workers/run-worker.sh
Comment thread workers/pg_queue_consumer/__main__.py Outdated
- __main__.py: move the WORKER_TYPE mutation + `import worker` bootstrap into
  a guarded _bootstrap_and_run() called only under `__name__ == "__main__"`,
  so an accidental import (test/IDE/type-checker walking __main__) no longer
  overwrites WORKER_TYPE or triggers a full worker-app bootstrap.
- run-worker.sh: list `pg-queue-consumer` in the usage/--help worker types,
  plus a note for its WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE overrides.
- run-worker.sh: clarify the post-launch liveness check is a best-effort
  fast-fail for *immediate* (sub-second) crash-on-import/bad-config faults,
  not a connectivity check; kept general (an immediate crash can hit any
  worker) and noted the `all` subshells overlap the 1s with inter-launch sleep.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Greptile feedback addressed — 5bec0f8b4

Thanks @greptile-apps (confidence 4/5). All three P2 findings handled:

  • Module-level side-effects in __main__.py — moved the WORKER_TYPE mutation + import worker into a guarded _bootstrap_and_run() called only under if __name__ == "__main__":. Accidental imports are now inert (verified: import leaves WORKER_TYPE untouched and doesn't bootstrap worker; python -m path unchanged).
  • usage() help text (outside-diff comment) — added pg-queue-consumer to the worker-type list plus a note documenting the WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE overrides.
  • Liveness check scope — kept general but clarified in the comment that it's a best-effort fast-fail for immediate (sub-second) crashes (import/config faults, which affect any worker), explicitly not a connectivity check. Left the sleep at 1s (bumping it trades latency for only partial slow-failure coverage, which needs the real health endpoint); all is unaffected since those subshells overlap the 1s with the inter-launch sleep. Rationale on the thread.

Re-verified live after the changes: python -m pg_queue_consumer boots the 7 notification tasks, --status → RUNNING, --help lists the worker.

@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Contributor

All three issues are well-addressed. The __main__.py fix is clean — _bootstrap_and_run() correctly encapsulates both the env mutation and the import side-effect, and the guard ensures tests/IDEs importing the module stay inert. The docstring and inline comment both do a good job explaining the "why" for the os.environ overwrite (vs setdefault).

No further concerns on these changes. The PR looks good from a review standpoint on these items.

@sonarqubecloud


@muhammad-ali-e muhammad-ali-e merged commit b58ee1e into feat/UN-3445-pg-queue-integration Jun 12, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3541-pg-queue-consumer-launcher branch June 12, 2026 10:15