UN-3534 [FEAT] PG Queue Phase 8a — queue-transport routing gate + scaffold #2033
muhammad-ali-e merged 4 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3534-pg-queue-routing-scaffold
…ffold
Add the Strangler-Fig routing seam that lets PG Queue (PGMQ) coexist with
Celery so task types can be migrated one at a time. Scaffold only: the PG
branch is a Celery-routing stub (no PG consumer exists yet), so this is
zero-behaviour-change by construction.
- queue_backend/routing.py: QueueBackend{CELERY,PG} + select_backend(task_name)
reading the WORKER_PG_QUEUE_ENABLED_TASKS allow-list (default empty -> all
Celery). Tolerant CSV parsing; never raises.
- dispatch(): consults select_backend(); PG-selected tasks are logged but
still dispatched via Celery. The send_task call sits outside the PG branch
so the wire is byte-identical regardless of the routing decision.
- queue_backend/pg_queue/: scaffold subpackage. PGMQ is a core transport
substrate, so it lives in the seam beside dispatch/routing/barrier, not
under the git-ignored plugins/ cloud overlay.
- sample.env: documents the flag (default-safe, OSS-friendly, no Flipt server).
- tests: 12 routing tests incl. the byte-identical-dispatch characterisation
pinning the inert-scaffold invariant.
Barrier axis untouched (WORKER_BARRIER_BACKEND stays chord). Per-org routing
intentionally deferred to the rollout phase.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
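The routing gate described in this commit can be sketched roughly as below. This is a minimal illustration, not the code in queue_backend/routing.py: the enum values "celery" and "pg", the constant name ENABLED_TASKS_ENV, and the choice to re-read the environment on every call are assumptions, and a str-mixin Enum stands in for the enum type so the sketch also runs on older Python versions.

```python
import os
from enum import Enum

# The env var name comes from the commit message; the constant name is assumed.
ENABLED_TASKS_ENV = "WORKER_PG_QUEUE_ENABLED_TASKS"


class QueueBackend(str, Enum):
    # Member values are assumptions made for this sketch.
    CELERY = "celery"
    PG = "pg"


def _parse_allow_list():
    """Tolerant CSV parse: trim whitespace, drop blanks, never raise."""
    raw = os.environ.get(ENABLED_TASKS_ENV) or ""
    return frozenset(name.strip() for name in raw.split(",") if name.strip())


def select_backend(task_name):
    """Return PG only for explicitly allow-listed task names, else CELERY."""
    if task_name in _parse_allow_list():
        return QueueBackend.PG
    return QueueBackend.CELERY
```

With the variable unset or empty the parsed set is empty, so every task resolves to Celery, which matches the default-safe posture the commit describes.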
muhammad-ali-e
left a comment
Automated PR Review — PG Queue Routing Scaffold (Phase 8a)
Reviewed with a 6-agent toolkit (code review, silent-failure, type design, tests, comments, simplification). This is a small, well-scoped, correct scaffold — tests pass and the byte-identical-wire invariant is real. No blocking bugs.
Themes across findings: (1) one stale _ORGS reference to an unbuilt flag; (2) the scaffold advertises itself as "observable in logs" but the only signal is a DEBUG log that is off by default, and a typo'd task name routes to Celery silently; (3) the headline dispatch test is tautological — the one new behaviour (the log line) is never asserted. Inline comments below, prioritised.
- test seam: drop stale WORKER_PG_QUEUE_ENABLED_ORGS reference; the org axis was removed and that flag never existed [must-fix]
- observability: routing log DEBUG -> INFO so a cutover survives a default log config; log-once per task name bounds volume. Log the configured allow-list once per process so a typo'd task name is eyeballable at boot even when it never matches a real dispatch [important]
- tests: pin the routing branch with caplog assertions (PG -> log fires, Celery -> no log, bounded to once) so the inert gate can't be silently deleted; assert allow-list logging too [important, closes test gap]
- QueueBackend: document the is-not-== discipline (StrEnum makes a typo'd "== cellery" a silent False) [suggestion]
- routing: drop dead _parse_allow_list(env_var) param, read the constant directly; one-pass strip; test imports the constant (single source of truth) [nits]
- pg_queue docstring: clarify plugins/ subdirs are git-ignored while the dir itself is tracked; soften volatile labs branch/section/filename references [nits]
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Review feedback addressed —
Document that the per-task allow-list may only split independent/leaf tasks across substrates. The coupled execution pipeline (async_execute_bin -> file processing -> callback, with the barrier fan-in) must run a single execution entirely on one transport: its migration unit is the execution, not the task.
The next phase resolves transport once at kickoff and carries it in ExecutionContext; select_backend then honours that carried marker over the per-task env. Until then, only leaf tasks should be enabled here.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
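One way the planned precedence could look is sketched below. It is speculative, since the commit only describes the next phase: the ExecutionContext fields, the optional context parameter, and the task name process_file are all hypothetical names chosen for illustration.

```python
import os
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class QueueBackend(str, Enum):
    CELERY = "celery"
    PG = "pg"


@dataclass(frozen=True)
class ExecutionContext:
    """Hypothetical carrier for the transport resolved once at kickoff."""

    execution_id: str
    transport: Optional[QueueBackend] = None


def _allow_list():
    raw = os.environ.get("WORKER_PG_QUEUE_ENABLED_TASKS", "")
    return {name.strip() for name in raw.split(",") if name.strip()}


def select_backend(task_name, context=None):
    # A carried marker wins, so every task of one execution shares a transport.
    if context is not None and context.transport is not None:
        return context.transport
    # Leaf tasks without a carried marker fall back to the per-task allow-list.
    return QueueBackend.PG if task_name in _allow_list() else QueueBackend.CELERY
```

Under this shape, listing a pipeline task in the env var cannot split a running execution, because the marker set at kickoff overrides the per-task lookup.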
muhammad-ali-e
left a comment
LGTM ✅ (non-blocking — author can't self-approve)
All 9 findings from the prior review pass are resolved, verified against 3739a293 + 510a0c4f:
- 🔴 Stale `_ORGS` reference removed (grep → zero occurrences anywhere).
- 🟠 Routing log promoted to INFO, bounded log-once per task, plus a startup allow-list dump (`_log_allow_list_once`) so a typo'd task name is eyeballable at boot; the canary is now genuinely observable, not just "observable in theory."
- 🟠 The tautological dispatch test is fixed: `TestObservability` pins the routing branch with positive, negative, and log-once cases, so deleting the gate now fails a test (I reproduced the original "delete gate → all green" hole; it's closed).
- 🟡 StrEnum identity-comparison discipline documented and honoured at every call site; double-strip, dead `env_var` param, and both pg_queue docstring nits all addressed.
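The identity-comparison discipline from the last item can be illustrated with a small sketch. It assumes member values "celery" and "pg" and uses a str-mixin Enum, which compares to plain strings the same way StrEnum does; the misspelled names are deliberate.

```python
from enum import Enum


class QueueBackend(str, Enum):
    CELERY = "celery"
    PG = "pg"


backend = QueueBackend.CELERY

# Equality against a misspelled string literal is simply False; nothing flags it.
typo_matches = backend == "cellery"

# Identity against a misspelled member fails loudly at the comparison site.
try:
    _ = backend is QueueBackend.CELLERY
    typo_raised = False
except AttributeError:
    typo_raised = True
```

Comparing with `is` against enum members turns a spelling mistake into an AttributeError on first execution instead of a branch that quietly never runs.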
The self-added migration-coherence note (510a0c4f) — leaf tasks are safe to split, but the coupled async_execute_bin → file-processing → callback pipeline must migrate per-execution via ExecutionContext rather than per-task — is a valuable forward-looking constraint that pre-empts a real footgun in the next phase.
34/34 worker routing + seam tests pass locally. Scaffold is correct, inert (byte-identical wire), and well-tested. No blocking concerns.
One non-blocking follow-up, already parked for the rollout phase: a registry-based warning for allow-list entries that never match a real dispatched task. The current startup allow-list dump is a solid interim.
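A possible shape for that parked follow-up is sketched here, purely as an assumption about how it might work. The function name find_unmatched_entries and the logger name are hypothetical, and the set of registered task names would have to be supplied by the caller (with Celery, the task registry on the app object is one candidate source).

```python
import logging

logger = logging.getLogger("queue_backend.routing")


def find_unmatched_entries(allow_list, registered_task_names):
    """Return allow-list entries that name no registered task, sorted."""
    known = set(registered_task_names)
    unmatched = sorted(name for name in allow_list if name not in known)
    for name in unmatched:
        # One warning per entry keeps a typo visible without failing startup.
        logger.warning("Allow-list entry %r matches no registered task", name)
    return unmatched
```

For example, calling it with the allow-list {"send_webhok_notification"} and a registry containing only "send_webhook_notification" returns ["send_webhok_notification"] and logs a single warning.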
Needs a teammate's formal approval to merge (GitHub blocks self-approval).
| Filename | Overview |
|---|---|
| workers/queue_backend/routing.py | New routing gate: QueueBackend StrEnum + select_backend() + tolerant allow-list parser; log-once-per-process guard with module-level flag; well-documented scaffold posture. No functional issues. |
| workers/queue_backend/dispatch.py | Wires select_backend() as a logging gate before the unconditional send_task call; _pg_routing_logged set bounds log volume to once per task name per prefork child. Wire behaviour is byte-identical to pre-PR. |
| workers/queue_backend/__init__.py | Exports QueueBackend and select_backend in __all__; docstring updated to document the transport-routing axis as orthogonal to the barrier axis. |
| workers/queue_backend/pg_queue/__init__.py | Scaffold subpackage placeholder; intentionally empty with a design docstring explaining placement rationale (core transport substrate, not a plugin overlay). |
| workers/sample.env | Documents WORKER_PG_QUEUE_ENABLED_TASKS with clear leaf-task vs pipeline-task guidance; uses send_webhook_notification as the safe example and calls out async_execute_bin as a DO NOT use case. |
| workers/tests/test_routing.py | 12 new routing tests covering backend resolution, tolerant parsing, observability log contracts, and the byte-identical dispatch invariant; autouse fixture correctly resets module-level guards between tests. |
| workers/tests/test_queue_backend_seam.py | Updates the __all__ surface assertion to include QueueBackend and select_backend; minor docstring whitespace cleanup with no logic impact. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A([dispatch called]) --> B[select_backend task_name]
B --> C{env WORKER_PG_QUEUE_ENABLED_TASKS}
C -- unset / empty --> D[QueueBackend.CELERY]
C -- task_name in allow-list --> E[QueueBackend.PG]
C -- task_name NOT in allow-list --> D
D --> G[send_task via Celery]
E --> F{task_name already logged?}
F -- No --> H[INFO log + add to _pg_routing_logged]
H --> G
F -- Yes --> G
G --> I([DispatchHandle returned])
style E fill:#f9f,stroke:#a0a
style H fill:#ffe,stroke:#aa0
style G fill:#cfc,stroke:#080
Reviews (2): Last reviewed commit: "UN-3534 [DOCS] Fix sample.env example to..."
async_execute_bin is the pipeline kickoff, exactly the task the coherence note says must NOT be split per-task. Switch the example to a leaf task (send_webhook_notification) and warn against listing coupled pipeline tasks until ExecutionContext carries the transport choice. Addresses Greptile review feedback.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Merged bfb2c4d into feat/UN-3445-pg-queue-integration
What
Adds the Strangler-Fig queue-transport routing gate that lets PG Queue (PGMQ) coexist with Celery, so task types can be migrated one at a time. Scaffold only — the PG branch is a Celery-routing stub (no PG consumer exists yet), so this is zero-behaviour-change by construction.
Targets the `feat/UN-3445-pg-queue-integration` integration branch (not `main`); PG-queue work accumulates there until the full path is vetted.
Changes
- `queue_backend/routing.py` (new): `QueueBackend{CELERY,PG}` enum + `select_backend(task_name)` reading the `WORKER_PG_QUEUE_ENABLED_TASKS` allow-list. Tolerant comma-separated parsing (trims whitespace, drops blanks); empty/unset → all Celery; never raises.
- `queue_backend/dispatch.py`: consults `select_backend()` and logs the decision at DEBUG. The `send_task` call sits outside the `QueueBackend.PG` branch, so the wire is byte-identical regardless of the routing decision. PG-selected tasks still dispatch via Celery.
- `queue_backend/pg_queue/__init__.py` (new): scaffold subpackage. PGMQ is a core transport substrate, so it lives in the seam beside `dispatch`/`routing`/`barrier`, not under the git-ignored `plugins/` cloud overlay.
- `queue_backend/__init__.py`: exports `QueueBackend` + `select_backend`; docstring documents the routing axis as orthogonal to the barrier axis.
- `sample.env`: documents the flag (default-safe, OSS-friendly, no Flipt server required).
- … `__all__` surface updated.
Why a per-task allow-list (not a global boolean)
A single global switch would move all traffic at once, which is the big-bang migration we explicitly avoid. The allow-list migrates one task type at a time and rolls back instantly by removing an entry. Per-org routing is intentionally deferred to the rollout phase; it slots in behind `select_backend()` as an additive change when tenant-level canarying is needed.
Scope guards
… `WORKER_BARRIER_BACKEND` stays `chord`. Separate concern.
Testing
… `send_webhook_notification` into the allow-list produced the routing-selected DEBUG line while the task still dispatched via Celery and the notification delivered normally. Unset → no line, normal behaviour. New code loaded into all workers without breaking startup.
Ticket: UN-3534 (parent UN-3533, epic UN-3445)
🤖 Generated with Claude Code