UN-3570 [FEAT] PG Queue 9e PR 3 — Flipt canary wiring for transport resolution#2071
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:feat/UN-3570-FEAT_flipt_canary_transport
…esolution

Replace the hardwired Celery result in resolve_transport() with a real decision: an env master-gate (PG_QUEUE_TRANSPORT_ENABLED, default off) that, when on, consults the Flipt boolean flag pg_queue_execution_enabled. Fails closed to Celery on any error.

- entity_id = execution_id (per-execution sticky %-rollout; resolved once and carried in the task payload, so an in-flight execution never re-buckets)
- context carries org/workflow/pipeline ids for segment rollouts; the values are str-coerced, because UUIDs in the gRPC map<string,string> context would be swallowed as False and silently force Celery
- both creation chokepoints wired: internal_api_views.create_workflow_execution (scheduler path) and workflow_helper.execute_workflow_async (API/manual/async)

Gated off by default: no behaviour change until the flag is enabled, which requires PG consumers deployed first (deploy-ordering safety).

Tests: 15 cases in test_transport.py (gate off/on, flag true/false, fail-closed, entity/context shape, UUID-coercion regression).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
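The str-coercion point in the commit message above can be illustrated with a minimal sketch. The helper name `build_flipt_context` and the context key names are assumptions made for illustration; the PR text only states that org/workflow/pipeline ids are carried as strings and that optional ids are omitted.

```python
from __future__ import annotations

from uuid import UUID, uuid4


def build_flipt_context(
    organization_id: str | UUID,
    workflow_id: str | UUID | None = None,
    pipeline_id: str | UUID | None = None,
) -> dict[str, str]:
    """Build a string-only context map (hypothetical helper, not the PR's code)."""
    # A gRPC map<string,string> only accepts string values, so coerce up front.
    context: dict[str, str] = {"organization_id": str(organization_id)}
    # Optional ids are left out instead of being sent as the string "None".
    if workflow_id is not None:
        context["workflow_id"] = str(workflow_id)
    if pipeline_id is not None:
        context["pipeline_id"] = str(pipeline_id)
    return context


# The execution id doubles as the entity id, so bucketing is per execution.
execution_id = uuid4()
entity_id = str(execution_id)
context = build_flipt_context("org_abc", workflow_id=uuid4())
```

Coercing in one place keeps both call sites from having to remember the string-only constraint, which is the failure mode the commit describes.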
muhammad-ali-e left a comment
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
The change is functionally correct and fails closed safely. The central worry — that the two call sites pass different organization_id values — was verified not a bug: org_id (X-Organization-ID header) and org_schema (UserContext.get_organization_identifier()) both resolve to Organization.organization_id, so per-org Flipt segments match consistently. Findings below are observability, type-honesty, doc-accuracy, and test-gap items — no blockers.
…bservability

- Widen resolve_transport id params to `str | UUID` (callers pass UUIDs; the body already str-coerces), which resolves the SonarCloud type-mismatch and makes the contract honest.
- Fail closed to celery when organization_id is missing (str(None) must never reach the Flipt org segment); the helper path reads it from StateStore, which can be empty.
- Fail closed + loud warning when the gate is ON but FLIPT_SERVICE_AVAILABLE is not true, so a blind Flipt can't masquerade as a healthy 100%-celery canary.
- Log the resolved transport on the gate-ON path (deliberate celery vs pg_queue vs blind are now distinguishable); drop the inaccurate "import-time fault" clause from the fail-closed comment.
- Doc: organization_id is Organization.organization_id (X-Organization-ID), not the DB pk.
- sample.env: document that pg_queue needs all three (gate + FLIPT_SERVICE_AVAILABLE + flag).
- +2 tests (missing-org → celery, gate-on + Flipt-unavailable → celery).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round addressed —
| Filename | Overview |
|---|---|
| backend/workflow_manager/workflow_v2/transport.py | Core transport resolution: replaces hardwired Celery with a multi-gate Flipt evaluation (env master-gate → Flipt boolean → fail-closed to Celery). Layered fail-safe design; string coercion for gRPC context is correct; logging at every decision branch. |
| backend/workflow_manager/workflow_v2/tests/test_transport.py | 15 new test cases covering gate-off, gate-on, Flipt true/false, exception fail-closed, entity_id/context shape, UUID coercion regression, and optional-id omission. Missing an explicit test for organization_id=None (only "" is covered), leaving the documented None path untested. |
| backend/workflow_manager/internal_api_views.py | Scheduler-path chokepoint: wires resolve_transport() with execution_id, org_id (from validated X-Organization-ID header), workflow.id, and pipeline_id. Clean — org_id is validated non-empty before this call. |
| backend/workflow_manager/workflow_v2/workflow_helper.py | API/manual/async-path chokepoint: wires resolve_transport() with org_schema from StateStore (same Organization.organization_id value as the header). Null StateStore is handled fail-closed in transport.py. |
| backend/backend/settings/base.py | Adds PG_QUEUE_TRANSPORT_ENABLED Django setting via CommonUtils.str_to_bool, defaulting to False. Straightforward and consistent with existing patterns. |
| backend/sample.env | Documents PG_QUEUE_TRANSPORT_ENABLED with clear description of the three-gate requirement and rollback semantics. No issues. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["execution created\n(API path or scheduler path)"] --> B{"PG_QUEUE_TRANSPORT_ENABLED?"}
B -- "false (default)" --> C["return 'celery'"]
B -- "true" --> D{"organization_id\npresent?"}
D -- "no / empty" --> E["log WARNING\nreturn 'celery'"]
D -- "yes" --> F{"FLIPT_SERVICE_AVAILABLE\n== 'true'?"}
F -- "no" --> G["log WARNING\nreturn 'celery'"]
F -- "yes" --> H["check_feature_flag_status\nflag=pg_queue_execution_enabled\nentity_id=execution_id\ncontext={org,workflow,pipeline}"]
H -- "exception" --> I["log WARNING + exc_info\nreturn 'celery'"]
H -- "false" --> J["log INFO → 'celery'\nreturn 'celery'"]
H -- "true" --> K["log INFO → 'pg_queue'\nreturn 'pg_queue'"]
C --> L["transport carried in\nCelery task payload"]
E --> L
G --> L
I --> L
J --> L
K --> L
Reviews (2): Last reviewed commit: "UN-3570 address greptile: organization_i..."
…VAILABLE parse parity

- organization_id annotation widened to `str | UUID | None`: the helper path passes UserContext.get_organization_identifier(), which can be None at runtime. The existing `if not organization_id` guard handles it, so the type should admit None rather than mislead callers/static analysis.
- FLIPT_SERVICE_AVAILABLE check now parses exactly like FliptClient (`.lower()`, no `.strip()`) so the two can't disagree on a whitespaced value like " true" (which would otherwise skip the "Flipt blind" warning).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
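A small sketch of the parse-parity point above, assuming the check reduces to lowercasing the raw value and comparing it with `"true"`. The function name `flipt_available` and the `"false"` default are hypothetical; the commit message only confirms `.lower()` without `.strip()`.

```python
from __future__ import annotations


def flipt_available(raw: str | None) -> bool:
    """Lowercase-only parse: surrounding whitespace is deliberately not stripped."""
    return (raw or "false").lower() == "true"


# Case differences are tolerated, whitespace is not.
upper_case = flipt_available("TRUE")
whitespaced = flipt_available(" true")
unset = flipt_available(None)
```

If the transport check stripped whitespace while the client did not, a value such as `" true"` would pass the transport check yet leave the client disabled, and the "Flipt blind" warning would never fire.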
greptile round addressed —
muhammad-ali-e merged commit c2e6a6f into feat/UN-3445-pg-queue-integration
What
- `resolve_transport()` now makes a real decision instead of always returning Celery: an env master-gate (`PG_QUEUE_TRANSPORT_ENABLED`, default off) → when on, a Flipt boolean flag (`pg_queue_execution_enabled`) → fail-closed to Celery on any error.
- Both creation chokepoints are wired: `internal_api_views.create_workflow_execution` (scheduler path) and `workflow_helper.execute_workflow_async` (API / manual / async path).
- New setting `PG_QUEUE_TRANSPORT_ENABLED` + a `sample.env` entry.

Why
- `resolve_transport` was hardwired to Celery; this PR lets a new execution be canary-routed onto Postgres while everything else stays on Celery. Part of PG Queue Phase 9 (UN-3536), sub-task UN-3570.

How
- Context values are str-coerced: callers pass UUID objects, and a non-string value in Flipt's gRPC `map<string,string>` context is swallowed by the client as `False`, silently forcing Celery (found and fixed during dev-test).
- `normalize_transport` on the read side: fail-closed.

Can this PR break any existing features?
- `PG_QUEUE_TRANSPORT_ENABLED` is off by default, and in that state the gate returns Celery before Flipt is even consulted. Even with the gate on, the Flipt flag defaults to off, and any Flipt error falls back to Celery. The flag must not be enabled in production until PG consumers are deployed (deploy-ordering safety, enforced by the default-off env gate).

Database Migrations

Env Config
- `PG_QUEUE_TRANSPORT_ENABLED` (default `False`). When not `true`, transport resolution never consults Flipt and every execution rides Celery: a kill-switch plus deploy-ordering safety.
- `FLIPT_SERVICE_AVAILABLE` / `EVALUATION_SERVER_IP` / `EVALUATION_SERVER_PORT` already gate Flipt availability (client fails closed to `False`).

Notes on Testing
- `test_transport.py`: gate off (Flipt not consulted), gate on + flag true → pg_queue, flag false → celery, Flipt exception → fail-closed, entity_id/context shape, optional-id omission, and a UUID-coercion regression test.
- `pg_barrier_state` / `pg_batch_dedup` untouched; COMPLETED.

Related Issues
Checklist
🤖 Generated with Claude Code