Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3570 [FEAT] PG Queue 9e PR 3 — Flipt canary wiring for transport resolution#2071

Merged
muhammad-ali-e merged 3 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3570-FEAT_flipt_canary_transportZipstack/unstract:feat/UN-3570-FEAT_flipt_canary_transportCopy head branch name to clipboard
Jun 18, 2026
Merged

UN-3570 [FEAT] PG Queue 9e PR 3 — Flipt canary wiring for transport resolution#2071
muhammad-ali-e merged 3 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3570-FEAT_flipt_canary_transportZipstack/unstract:feat/UN-3570-FEAT_flipt_canary_transportCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

  • resolve_transport() now makes a real decision instead of always returning Celery: an env master-gate (PG_QUEUE_TRANSPORT_ENABLED, default off) → when on, a Flipt boolean flag (pg_queue_execution_enabled) → fail-closed to Celery on any error.
  • Wired at both execution-creation chokepoints: internal_api_views.create_workflow_execution (scheduler path) and workflow_helper.execute_workflow_async (API / manual / async path).
  • New setting PG_QUEUE_TRANSPORT_ENABLED + a sample.env entry.

Why

  • This is the on/off switch for the 9e PG-queue pipeline. PR 1 left resolve_transport hardwired to Celery; this PR lets a new execution be canary-routed onto Postgres while everything else stays on Celery. Part of PG Queue Phase 9 (UN-3536), sub-task UN-3570.

How

  • Master-gate (env, default off): until ops turns it on, Flipt is never consulted and every execution rides Celery. Doubles as the instant kill-switch and the deploy-ordering safety (inert until PG consumers are deployed).
  • entity_id = execution_id → per-execution percentage-rollout stickiness. The decision is resolved once at creation and carried in the task payload, so an in-flight execution never re-buckets.
  • context carries org / workflow / pipeline ids for Flipt segment rules. All values are str-coerced: callers pass UUID objects, and a non-string value in Flipt's gRPC map<string,string> context is swallowed by the client as False, silently forcing Celery (found and fixed during dev-test).
  • Decision mirrors normalize_transport on the read side: fail-closed.

Can this PR break any existing features?

  • No. Behaviour is byte-identical to today while PG_QUEUE_TRANSPORT_ENABLED is off (the default) — the gate returns Celery before Flipt is even consulted. Even with the gate on, the Flipt flag defaults to off, and any Flipt error falls back to Celery. The flag must not be enabled in production until PG consumers are deployed (deploy-ordering safety, enforced by the default-off env gate).

Database Migrations

  • None.

Env Config

  • New: PG_QUEUE_TRANSPORT_ENABLED (default False). When not true, transport resolution never consults Flipt and every execution rides Celery — kill-switch + deploy-ordering safety.
  • Existing FLIPT_SERVICE_AVAILABLE / EVALUATION_SERVER_IP / EVALUATION_SERVER_PORT already gate Flipt availability (client fails closed to False).

Notes on Testing

  • Unit: 15 cases in test_transport.py — gate off (Flipt not consulted), gate on + flag true → pg_queue, flag false → celery, Flipt exception → fail-closed, entity_id/context shape, optional-id omission, and a UUID-coercion regression test.
  • Dev-tested end-to-end against a running stack:
    • Gate OFF: API deployment runs fully on Celery; pg_barrier_state / pg_batch_dedup untouched; COMPLETED.
    • Gate ON + flag at 100%: real API deployment routes through the PG consumers (PgBarrier → decrement remaining=0 → callback) to COMPLETED with clean teardown.
    • Live-Flipt decision matrix: gate off / flag on / Flipt unavailable → celery / pg_queue / celery.

Related Issues

  • UN-3536 (PG Queue Phase 9 — transport engine), sub-task UN-3570.
  • Follow-ups (separate sub-tasks): 2d orchestrator-on-PG; UN-3566 multi-queue consumer ergonomics.

Checklist

  • Appropriate PR title and description
  • Code follows project style guidelines
  • Self-review performed
  • Tests added that prove the feature works
  • New and existing unit tests pass locally
  • Pre-commit passes on all changed files

🤖 Generated with Claude Code

…esolution

Replace the hardwired Celery result in resolve_transport() with a real
decision: an env master-gate (PG_QUEUE_TRANSPORT_ENABLED, default off) that,
when on, consults the Flipt boolean flag pg_queue_execution_enabled. Fails
closed to Celery on any error.

- entity_id = execution_id (per-execution sticky %-rollout; resolved once,
  carried in the task payload so an in-flight execution never re-buckets)
- context carries org/workflow/pipeline ids (str-coerced — UUIDs in the gRPC
  map<string,string> context would be swallowed as False and silently force
  Celery) for segment rollouts
- both creation chokepoints wired: internal_api_views.create_workflow_execution
  (scheduler path) and workflow_helper.execute_workflow_async (API/manual/async)

Gated off by default — no behaviour change until the flag is enabled, which
requires PG consumers deployed first (deploy-ordering safety).

Tests: 15 cases in test_transport.py (gate off/on, flag true/false, fail-closed,
entity/context shape, UUID-coercion regression).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ad611df2-caad-4627-bac9-a46b5b1b9fd1

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3570-FEAT_flipt_canary_transport

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).

The change is functionally correct and fails closed safely. The central worry — that the two call sites pass different organization_id values — was verified not a bug: org_id (X-Organization-ID header) and org_schema (UserContext.get_organization_identifier()) both resolve to Organization.organization_id, so per-org Flipt segments match consistently. Findings below are observability, type-honesty, doc-accuracy, and test-gap items — no blockers.

Comment thread backend/workflow_manager/workflow_v2/transport.py Outdated
Comment thread backend/workflow_manager/workflow_v2/transport.py Outdated
Comment thread backend/workflow_manager/workflow_v2/transport.py
Comment thread backend/workflow_manager/workflow_v2/transport.py Outdated
Comment thread backend/workflow_manager/workflow_v2/transport.py
Comment thread backend/workflow_manager/internal_api_views.py
Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py
Comment thread backend/sample.env
…bservability

- Widen resolve_transport id params to `str | UUID` (callers pass UUIDs; the
  body already str-coerces) — resolves the SonarCloud type-mismatch and makes
  the contract honest.
- Fail closed to celery when organization_id is missing (str(None) must never
  reach the Flipt org segment) — the helper path reads it from StateStore which
  can be empty.
- Fail closed + loud warning when the gate is ON but FLIPT_SERVICE_AVAILABLE is
  not true, so a blind Flipt can't masquerade as a healthy 100%-celery canary.
- Log the resolved transport on the gate-ON path (deliberate celery vs pg_queue
  vs blind are now distinguishable); drop the inaccurate "import-time fault"
  clause from the fail-closed comment.
- Doc: organization_id is Organization.organization_id (X-Organization-ID), not
  the DB pk.
- sample.env: document that pg_queue needs all three (gate + FLIPT_SERVICE_AVAILABLE
  + flag).
- +2 tests (missing-org → celery, gate-on + Flipt-unavailable → celery).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round addressed — b922ea453

Thanks for the thorough pass. Summary of what changed:

Fixed (5)

  • Type honesty / SonarCloudresolve_transport id params widened to str | UUID (UUID under TYPE_CHECKING); coercion stays internal. Resolves the SonarCloud type-mismatch (1 new issue → 0).
  • Missing-org guard — fail closed to celery (with a warning) when organization_id is empty, so str(None) never reaches the Flipt org segment. + regression test.
  • Observability — gate-ON path now logs the resolved transport (deliberate-celery vs deliberate-pg_queue distinguishable), and warns + short-circuits to celery when the gate is ON but FLIPT_SERVICE_AVAILABLE != true (blind Flipt no longer hides as a healthy 100%-celery canary). Dropped the inaccurate "import-time fault" comment.
  • Docstringorganization_id is Organization.organization_id (the X-Organization-ID header value), not the DB pk.
  • sample.env — documents that PG routing needs all three: gate + FLIPT_SERVICE_AVAILABLE + flag.

Tests: 15 → 17 (added missing-org and gate-on-Flipt-unavailable cases). Pre-commit green.

Deferred (3, with rationale on each thread)

  • Return WorkflowTransport enum — transport is a string wire-value throughout 9e (payload JSON / WorkflowContextData.transport: str / normalize_transport); we only ever return .value, so the typo class of bug is already impossible. Can do as a separate type-design pass.
  • View call-site test — DB + Bearer-auth path, no pytest-django; the contract was exercised end-to-end in dev-test (real create_workflow_executiontransport=pg_queue → ran on PG consumers). Filing a focused follow-up.
  • org_schema rename — appears in a second method here too; renaming one is inconsistent, renaming both is out of this PR's scope. Offered as a separate MISC cleanup.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 18, 2026 06:50
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR wires real transport routing into resolve_transport() (replacing the PR 1 hardwired Celery stub): an env master-gate (PG_QUEUE_TRANSPORT_ENABLED, default off) guards a Flipt boolean evaluation, which is itself wrapped in an explicit fail-closed handler, so a Flipt outage can never break execution creation. The decision is resolved once at creation (keyed on execution_id for sticky bucketing) and carried in the task payload — never persisted on the row.

  • transport.py implements the three-gate chain (env → FLIPT_SERVICE_AVAILABLE check → check_feature_flag_status) with full str-coercion for gRPC context keys and log coverage at every decision branch.
  • Two call sites updated: internal_api_views.create_workflow_execution (scheduler path) and workflow_helper.execute_workflow_async (API/manual/async path), both passing execution_id, organization_id, workflow_id, and pipeline_id.
  • 15 unit tests pin the gate-off invariant, gate-on paths, Flipt exception fail-close, entity/context shape, and a UUID-coercion regression case; the only gap is a missing organization_id=None parametrize branch.

Confidence Score: 5/5

Safe to merge: with PG_QUEUE_TRANSPORT_ENABLED off (the default), this change is byte-identical to the previous PR 1 stub and cannot affect any existing execution path.

The master-gate default is off, so the entire new code path is inert in production until ops explicitly enables it. The Flipt evaluation path is layered with two independent fail-closed guards, both exercised by unit tests. No database migration, no persisted state change.

No files require special attention; the single gap (missing None parametrize in test_transport.py) is a test-coverage nit and does not affect runtime behavior.

Important Files Changed

Filename Overview
backend/workflow_manager/workflow_v2/transport.py Core transport resolution: replaces hardwired Celery with a multi-gate Flipt evaluation (env master-gate → Flipt boolean → fail-closed to Celery). Layered fail-safe design; string coercion for gRPC context is correct; logging at every decision branch.
backend/workflow_manager/workflow_v2/tests/test_transport.py 15 new test cases covering gate-off, gate-on, Flipt true/false, exception fail-closed, entity_id/context shape, UUID coercion regression, and optional-id omission. Missing an explicit test for organization_id=None (only "" is covered), leaving the documented None path untested.
backend/workflow_manager/internal_api_views.py Scheduler-path chokepoint: wires resolve_transport() with execution_id, org_id (from validated X-Organization-ID header), workflow.id, and pipeline_id. Clean — org_id is validated non-empty before this call.
backend/workflow_manager/workflow_v2/workflow_helper.py API/manual/async-path chokepoint: wires resolve_transport() with org_schema from StateStore (same Organization.organization_id value as the header). Null StateStore is handled fail-closed in transport.py.
backend/backend/settings/base.py Adds PG_QUEUE_TRANSPORT_ENABLED Django setting via CommonUtils.str_to_bool, defaulting to False. Straightforward and consistent with existing patterns.
backend/sample.env Documents PG_QUEUE_TRANSPORT_ENABLED with clear description of the three-gate requirement and rollback semantics. No issues.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["execution created\n(API path or scheduler path)"] --> B{"PG_QUEUE_TRANSPORT_ENABLED?"}
    B -- "false (default)" --> C["return 'celery'"]
    B -- "true" --> D{"organization_id\npresent?"}
    D -- "no / empty" --> E["log WARNING\nreturn 'celery'"]
    D -- "yes" --> F{"FLIPT_SERVICE_AVAILABLE\n== 'true'?"}
    F -- "no" --> G["log WARNING\nreturn 'celery'"]
    F -- "yes" --> H["check_feature_flag_status\nflag=pg_queue_execution_enabled\nentity_id=execution_id\ncontext={org,workflow,pipeline}"]
    H -- "exception" --> I["log WARNING + exc_info\nreturn 'celery'"]
    H -- "false" --> J["log INFO → 'celery'\nreturn 'celery'"]
    H -- "true" --> K["log INFO → 'pg_queue'\nreturn 'pg_queue'"]
    C --> L["transport carried in\nCelery task payload"]
    E --> L
    G --> L
    I --> L
    J --> L
    K --> L
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    A["execution created\n(API path or scheduler path)"] --> B{"PG_QUEUE_TRANSPORT_ENABLED?"}
    B -- "false (default)" --> C["return 'celery'"]
    B -- "true" --> D{"organization_id\npresent?"}
    D -- "no / empty" --> E["log WARNING\nreturn 'celery'"]
    D -- "yes" --> F{"FLIPT_SERVICE_AVAILABLE\n== 'true'?"}
    F -- "no" --> G["log WARNING\nreturn 'celery'"]
    F -- "yes" --> H["check_feature_flag_status\nflag=pg_queue_execution_enabled\nentity_id=execution_id\ncontext={org,workflow,pipeline}"]
    H -- "exception" --> I["log WARNING + exc_info\nreturn 'celery'"]
    H -- "false" --> J["log INFO → 'celery'\nreturn 'celery'"]
    H -- "true" --> K["log INFO → 'pg_queue'\nreturn 'pg_queue'"]
    C --> L["transport carried in\nCelery task payload"]
    E --> L
    G --> L
    I --> L
    J --> L
    K --> L
Loading

Reviews (2): Last reviewed commit: "UN-3570 address greptile: organization_i..." | Re-trigger Greptile

Comment thread backend/workflow_manager/workflow_v2/transport.py
Comment thread backend/workflow_manager/workflow_v2/transport.py Outdated
…VAILABLE parse parity

- organization_id annotation widened to `str | UUID | None` — the helper path
  passes UserContext.get_organization_identifier() which can be None at runtime;
  the existing `if not organization_id` guard handles it, so the type should
  admit None rather than mislead callers/static analysis.
- FLIPT_SERVICE_AVAILABLE check now parses exactly like FliptClient (`.lower()`,
  no `.strip()`) so the two can't disagree on a whitespaced value like " true"
  (which would otherwise skip the "Flipt blind" warning).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

greptile round addressed — f9aee0e6f

Both P2s fixed:

  • organization_id None type — annotation now str | UUID | None (the helper path can pass None; the if not organization_id guard already handles it).
  • FLIPT_SERVICE_AVAILABLE parse parity — dropped .strip() to match FliptClient (.lower() only), so the two can never disagree on a value like " true" and the blind-Flipt warning stays in lockstep with the client.

17 tests green, pre-commit clean.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e merged commit c2e6a6f into feat/UN-3445-pg-queue-integration Jun 18, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3570-FEAT_flipt_canary_transport branch June 18, 2026 07:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.