Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3534 [FEAT] PG Queue Phase 8a — queue-transport routing gate + scaffold#2033

Merged
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3534-pg-queue-routing-scaffoldZipstack/unstract:UN-3534-pg-queue-routing-scaffoldCopy head branch name to clipboard
Jun 11, 2026
Merged

UN-3534 [FEAT] PG Queue Phase 8a — queue-transport routing gate + scaffold#2033
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3534-pg-queue-routing-scaffoldZipstack/unstract:UN-3534-pg-queue-routing-scaffoldCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

Adds the Strangler-Fig queue-transport routing gate that lets PG Queue (PGMQ) coexist with Celery, so task types can be migrated one at a time. Scaffold only — the PG branch is a Celery-routing stub (no PG consumer exists yet), so this is zero-behaviour-change by construction.

Targets the feat/UN-3445-pg-queue-integration integration branch (not main) — PG-queue work accumulates there until the full path is vetted.

Changes

  • queue_backend/routing.py (new)QueueBackend{CELERY,PG} enum + select_backend(task_name) reading the WORKER_PG_QUEUE_ENABLED_TASKS allow-list. Tolerant comma-separated parsing (trims whitespace, drops blanks); empty/unset → all Celery; never raises.
  • queue_backend/dispatch.py — consults select_backend() and logs the decision at DEBUG. The send_task call sits outside the QueueBackend.PG branch, so the wire is byte-identical regardless of the routing decision. PG-selected tasks still dispatch via Celery.
  • queue_backend/pg_queue/__init__.py (new) — scaffold subpackage. PGMQ is a core transport substrate, so it lives in the seam beside dispatch/routing/barrier, not under the git-ignored plugins/ cloud overlay.
  • queue_backend/__init__.py — exports QueueBackend + select_backend; docstring documents the routing axis as orthogonal to the barrier axis.
  • sample.env — documents the flag (default-safe, OSS-friendly, no Flipt server required).
  • tests — 12 routing tests including the byte-identical-dispatch characterisation pinning the inert-scaffold invariant; seam __all__ surface updated.

Why a per-task allow-list (not a global boolean)

A single global switch would move all traffic at once — the big-bang migration we explicitly avoid. The allow-list migrates one task type at a time and rolls back instantly by removing an entry. Per-org routing is intentionally deferred to the rollout phase; it slots in behind select_backend() as an additive change when tenant-level canarying is needed.

Scope guards

  • Barrier axis untouchedWORKER_BARRIER_BACKEND stays chord. Separate concern.
  • No call-site changes; no new mandatory worker; system works with and without the flag set.
  • Out of scope (later phases): real PGMQ enqueue/dequeue, consumer poll loop, orchestrator.

Testing

  • 45 queue_backend unit tests green (12 new routing tests); ruff clean.
  • Dev-tested on a running worker stack: opting send_webhook_notification into the allow-list produced the routing-selected DEBUG line while the task still dispatched via Celery and the notification delivered normally. Unset → no line, normal behaviour. New code loaded into all workers without breaking startup.

Ticket: UN-3534 (parent UN-3533, epic UN-3445)

🤖 Generated with Claude Code

…ffold

Add the Strangler-Fig routing seam that lets PG Queue (PGMQ) coexist with
Celery so task types can be migrated one at a time. Scaffold only: the PG
branch is a Celery-routing stub (no PG consumer exists yet), so this is
zero-behaviour-change by construction.

- queue_backend/routing.py: QueueBackend{CELERY,PG} + select_backend(task_name)
  reading the WORKER_PG_QUEUE_ENABLED_TASKS allow-list (default empty -> all
  Celery). Tolerant CSV parsing; never raises.
- dispatch(): consults select_backend(); PG-selected tasks are logged but
  still dispatched via Celery. The send_task call sits outside the PG branch
  so the wire is byte-identical regardless of the routing decision.
- queue_backend/pg_queue/: scaffold subpackage. PGMQ is a core transport
  substrate, so it lives in the seam beside dispatch/routing/barrier, not
  under the git-ignored plugins/ cloud overlay.
- sample.env: documents the flag (default-safe, OSS-friendly, no Flipt server).
- tests: 12 routing tests incl. the byte-identical-dispatch characterisation
  pinning the inert-scaffold invariant.

Barrier axis untouched (WORKER_BARRIER_BACKEND stays chord). Per-org routing
intentionally deferred to the rollout phase.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1e158b72-e506-4f86-83a6-e152a629be9a

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3534-pg-queue-routing-scaffold

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review — PG Queue Routing Scaffold (Phase 8a)

Reviewed with a 6-agent toolkit (code review, silent-failure, type design, tests, comments, simplification). This is a small, well-scoped, correct scaffold — tests pass and the byte-identical-wire invariant is real. No blocking bugs.

Themes across findings: (1) one stale _ORGS reference to an unbuilt flag; (2) the scaffold advertises itself as "observable in logs" but the only signal is a DEBUG log that is off by default, and a typo'd task name routes to Celery silently; (3) the headline dispatch test is tautological — the one new behaviour (the log line) is never asserted. Inline comments below, prioritised.

Comment thread workers/tests/test_queue_backend_seam.py Outdated
Comment thread workers/queue_backend/dispatch.py Outdated
Comment thread workers/queue_backend/routing.py Outdated
Comment thread workers/tests/test_routing.py
Comment thread workers/queue_backend/routing.py
Comment thread workers/queue_backend/routing.py Outdated
Comment thread workers/queue_backend/routing.py Outdated
Comment thread workers/queue_backend/pg_queue/__init__.py Outdated
Comment thread workers/queue_backend/pg_queue/__init__.py Outdated
- test seam: drop stale WORKER_PG_QUEUE_ENABLED_ORGS reference — the org
  axis was removed, that flag never existed [must-fix]
- observability: routing log DEBUG -> INFO so a cutover survives a default
  log config; log-once per task name bounds volume. Log the configured
  allow-list once per process so a typo'd task name is eyeballable at boot
  even when it never matches a real dispatch [important]
- tests: pin the routing branch with caplog assertions (PG -> log fires,
  Celery -> no log, bounded to once) so the inert gate can't be silently
  deleted; assert allow-list logging too [important — closes test gap]
- QueueBackend: document the is-not-== discipline (StrEnum makes a typo'd
  "== cellery" a silent False) [suggestion]
- routing: drop dead _parse_allow_list(env_var) param, read the constant
  directly; one-pass strip; test imports the constant (single source of
  truth) [nits]
- pg_queue docstring: clarify plugins/ subdirs are git-ignored while the
  dir itself is tracked; soften volatile labs branch/section/filename
  references [nits]

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review feedback addressed — 3739a2933

All 9 threads resolved (1 must-fix, 3 important, 1 suggestion, 4 nits):

Must-fix

  • Dropped the stale WORKER_PG_QUEUE_ENABLED_ORGS reference in the seam test (org axis was removed; that flag never existed).

Important

  • Observability — routing log DEBUG → INFO (survives a default log config), log-once per task name to bound volume.
  • Silent misconfig_log_allow_list_once() logs the configured allow-list at INFO once per process, so a typo'd task name is eyeballable even when it never matches a real dispatch (fires only for a non-empty list → default stays inert).
  • Test gap — added caplog tests pinning the routing branch (PG → log fires, Celery → no log, bounded-to-once). Deleting the gate now fails a test; confirmed it was green-on-delete before.

Suggestion

  • QueueBackend keeps StrEnum for symmetry with BarrierBackend + docstring note that callers must use is, not ==.

Nits

  • One-pass strip in _parse_allow_list; dropped its dead env_var param; test imports _ENABLED_TASKS_ENV_VAR (single source of truth).
  • pg_queue docstring: clarified plugins/ subdirs are git-ignored (dir itself tracked); softened volatile labs branch/section/filename refs.

50 queue_backend unit tests green (17 routing, +5 observability); ruff + pre-commit clean.

Document that the per-task allow-list may only split independent/leaf
tasks across substrates. The coupled execution pipeline (async_execute_bin
-> file processing -> callback, with the barrier fan-in) must run a single
execution entirely on one transport — its migration unit is the execution,
not the task. The next phase resolves transport once at kickoff and carries
it in ExecutionContext; select_backend then honours that carried marker
over the per-task env. Until then, only leaf tasks should be enabled here.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 11, 2026 09:13

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM ✅ (non-blocking — author can't self-approve)

All 9 findings from the prior review pass are resolved, verified against 3739a293 + 510a0c4f:

  • 🔴 Stale _ORGS reference removed (grep → zero occurrences anywhere).
  • 🟠 Routing log promoted to INFO, bounded log-once per task, plus a startup allow-list dump (_log_allow_list_once) so a typo'd task name is eyeballable at boot — the canary is now genuinely observable, not just "observable in theory."
  • 🟠 The tautological dispatch test is fixed: TestObservability pins the routing branch with positive, negative, and log-once cases, so deleting the gate now fails a test (I reproduced the original "delete gate → all green" hole; it's closed).
  • 🟡 StrEnum identity-comparison discipline documented and honoured at every call site; double-strip, dead env_var param, and both pg_queue docstring nits all addressed.

The self-added migration-coherence note (510a0c4f) — leaf tasks are safe to split, but the coupled async_execute_bin → file-processing → callback pipeline must migrate per-execution via ExecutionContext rather than per-task — is a valuable forward-looking constraint that pre-empts a real footgun in the next phase.

34/34 worker routing + seam tests pass locally. Scaffold is correct, inert (byte-identical wire), and well-tested. No blocking concerns.

One non-blocking follow-up, already parked for the rollout phase: a registry-based warning for allow-list entries that never match a real dispatched task. The current startup allow-list dump is a solid interim.

Needs a teammate's formal approval to merge (GitHub blocks self-approval).

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces the Strangler-Fig queue-transport routing gate (Phase 8a), adding scaffolding that lets PG Queue (PGMQ) coexist with Celery so task types can be migrated one at a time. All observable behavior is unchanged — PG-selected tasks still dispatch via Celery, with only an INFO log as evidence.

  • routing.py adds QueueBackend enum and select_backend(), reading a comma-separated per-task allow-list from WORKER_PG_QUEUE_ENABLED_TASKS; empty/unset always resolves to Celery and the parser never raises.
  • dispatch.py wires select_backend() in as a logging gate — the send_task call is outside the routing branch, guaranteeing byte-identical wire behaviour regardless of the routing decision.
  • pg_queue/__init__.py reserves the subpackage home for the future PGMQ consumer; 12 new tests pin the inert-scaffold invariant and observability contract.

Confidence Score: 5/5

Safe to merge — the scaffold is inert by construction; PG-selected tasks still dispatch via Celery unchanged, and the routing gate is only observable through INFO logs.

The change introduces no new code path to production task delivery. The send_task call is unconditional and sits outside the routing branch, so even a misconfigured allow-list cannot alter wire behaviour. The 12 new tests explicitly pin this invariant. All previously identified concerns are pre-existing, acknowledged in comments, and scoped to a future phase where the PG consumer actually exists.

No files require special attention. routing.py and dispatch.py are the functional core but are thoroughly covered by the new test suite.

Important Files Changed

Filename Overview
workers/queue_backend/routing.py New routing gate: QueueBackend StrEnum + select_backend() + tolerant allow-list parser; log-once-per-process guard with module-level flag; well-documented scaffold posture. No functional issues.
workers/queue_backend/dispatch.py Wires select_backend() as a logging gate before the unconditional send_task call; _pg_routing_logged set bounds log volume to once per task name per prefork child. Wire behaviour is byte-identical to pre-PR.
workers/queue_backend/init.py Exports QueueBackend and select_backend in all; docstring updated to document the transport-routing axis as orthogonal to the barrier axis.
workers/queue_backend/pg_queue/init.py Scaffold subpackage placeholder; intentionally empty with a design docstring explaining placement rationale (core transport substrate, not a plugin overlay).
workers/sample.env Documents WORKER_PG_QUEUE_ENABLED_TASKS with clear leaf-task vs pipeline-task guidance; uses send_webhook_notification as the safe example and calls out async_execute_bin as a DO NOT use case.
workers/tests/test_routing.py 12 new routing tests covering backend resolution, tolerant parsing, observability log contracts, and the byte-identical dispatch invariant; autouse fixture correctly resets module-level guards between tests.
workers/tests/test_queue_backend_seam.py Updates all surface assertion to include QueueBackend and select_backend; minor docstring whitespace cleanup with no logic impact.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A([dispatch called]) --> B[select_backend task_name]
    B --> C{env WORKER_PG_QUEUE_ENABLED_TASKS}
    C -- unset / empty --> D[QueueBackend.CELERY]
    C -- task_name in allow-list --> E[QueueBackend.PG]
    C -- task_name NOT in allow-list --> D
    D --> G[send_task via Celery]
    E --> F{task_name already logged?}
    F -- No --> H[INFO log + add to _pg_routing_logged]
    H --> G
    F -- Yes --> G
    G --> I([DispatchHandle returned])
    style E fill:#f9f,stroke:#a0a
    style H fill:#ffe,stroke:#aa0
    style G fill:#cfc,stroke:#080
Loading

Reviews (2): Last reviewed commit: "UN-3534 [DOCS] Fix sample.env example to..." | Re-trigger Greptile

Comment thread workers/queue_backend/dispatch.py
Comment thread workers/queue_backend/routing.py
Comment thread workers/sample.env Outdated
async_execute_bin is the pipeline kickoff — exactly the task the coherence
note says must NOT be split per-task. Switch the example to a leaf task
(send_webhook_notification) and warn against listing coupled pipeline tasks
until ExecutionContext carries the transport choice.

Addresses Greptile review feedback.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e merged commit bfb2c4d into feat/UN-3445-pg-queue-integration Jun 11, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3534-pg-queue-routing-scaffold branch June 11, 2026 09:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.