UN-3559 [FEAT] PG Queue 9e PR 1 — execution transport seam (inert)#2062

Merged
muhammad-ali-e merged 4 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3559-transport-seam
Jun 17, 2026


@muhammad-ali-e

Contributor

What & why

First of 3 PRs for 9e (the coupled-pipeline migration). This one establishes the per-execution transport seam and nothing else — the transport a workflow execution rides ("celery" | "pg_queue") is resolved once at the creation chokepoint and carried in the task payload. It is inert: transport always resolves to "celery", so behaviour is byte-identical to today.

Why a seam first: 9e migrates the coupled pipeline (async_execute_bin → file-batch fan-out → callback) onto the Postgres queue. Because the worker code is shared across both transports and later stages are enqueued from inside the workers, every stage needs to know — per execution — which transport to stay on. This PR lays that plumbing dead (always celery) so the next PR only adds the live PG branch.

Design: workers/queue_backend/pg_queue/9e-design.md (included). Chosen approach = payload-carry, deliberately not a WorkflowExecution column (no migration on the large shared table; all PG-specific state stays in droppable PG tables).

Changes

  • core: WorkflowTransport enum (celery | pg_queue) + DEFAULT_WORKFLOW_TRANSPORT (shared vocabulary).
  • backend: resolve_transport() hardwired to celery (signature shaped for the Flipt wiring in PR 3); create-execution internal API returns "transport"; execute_workflow_async adds it to the async_execute_bin kwargs.
  • workers — scheduler threads transport from the create response into the dispatch kwargs; async_execute_bin_general / _execute_general_workflow carry it onto the live WorkflowContextData.
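
The seam described in the list above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the enum values and the names `WorkflowTransport`, `DEFAULT_WORKFLOW_TRANSPORT` and `resolve_transport()` come from the description, the parameterless signature follows the final state reported later in this thread, and `build_dispatch_kwargs` is a hypothetical helper standing in for the real dispatch sites.

```python
from enum import Enum


class WorkflowTransport(str, Enum):
    """Transports a workflow execution can ride (values taken from the PR text)."""

    CELERY = "celery"
    PG_QUEUE = "pg_queue"


DEFAULT_WORKFLOW_TRANSPORT = WorkflowTransport.CELERY.value


def resolve_transport() -> str:
    """Inert resolver: always returns the default, so behaviour is unchanged."""
    return DEFAULT_WORKFLOW_TRANSPORT


def build_dispatch_kwargs(execution_id: str) -> dict:
    """Hypothetical helper: resolve once at creation, carry it in the payload."""
    return {"execution_id": execution_id, "transport": resolve_transport()}
```

Because the resolver returns a constant, every payload built this way carries "celery", which is what makes the plumbing safe to merge before any PG routing exists.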

Tests

  • backend test_transport.py: resolver + enum (5)
  • worker test_workflow_context_transport.py: WorkflowContextData carry/default (2)
  • dispatch characterisation updated — kwargs now include transport, + a new test that the backend-resolved transport is threaded verbatim
  • 22 green locally.

Dev-test (against a running stack)

  • Live create-execution internal API now returns "transport":"celery".
  • Workers killed + restarted on the new code (9 types RUNNING); a full API-deployment execution completed end-to-end on the new code (1 batch orchestrated, async_execute_bin succeeded, execution COMPLETED).

Out of scope (later)

  • PR 2: live PG routing (self-chaining + PgBarrier decrement) + the per-batch idempotency key (ships together).
  • PR 3: Flipt canary wiring. The Boolean flag pg_queue_execution_enabled (default false, no rollouts) already exists in dev — inert until PR 3 reads it.
  • Rollout: ops (canary %, dashboards, teardown).

Notes

  • Targets feat/UN-3445-pg-queue-integration (not main).
  • Ticket: UN-3559 (sub-task of the Phase 9 story).

🤖 Generated with Claude Code

Establish the per-execution transport seam for the 9e coupled-pipeline
migration: the transport a workflow execution rides ("celery" | "pg_queue")
is resolved once at the creation chokepoint and carried in the task payload.
Inert — transport always resolves to "celery", so behaviour is unchanged.

Design (chosen = payload-carry, not a WorkflowExecution column):
workers/queue_backend/pg_queue/9e-design.md.

- core: WorkflowTransport enum + DEFAULT_WORKFLOW_TRANSPORT (shared vocabulary).
- backend: resolve_transport() hardwired to celery (signature shaped for PR 3's
  Flipt wiring); create-execution internal API returns "transport";
  execute_workflow_async adds it to the async_execute_bin kwargs.
- workers: scheduler threads transport from the create response into the
  dispatch kwargs; async_execute_bin_general / _execute_general_workflow carry
  it onto the live WorkflowContextData.
- tests: backend resolver + enum (5), worker WorkflowContextData carry/default
  (2), dispatch characterisation updated (+ backend-resolved-transport thread).

Out of scope: live PG routing + per-batch idempotency key (PR 2); Flipt canary
wiring (PR 3); rollout (ops).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author


Automated PR review — PG Queue 9e PR 1 (transport seam)

Reviewed with the PR Review Toolkit (code review, silent-failure, type-design, test, comment, simplifier agents) against base feat/UN-3445-pg-queue-integration. Findings verified by hand before posting.

Overall: clean, well-scoped, genuinely inert plumbing. No production-breaking issues. The notable verification result: an initially-flagged "CRITICAL crash on the legacy execute_bin path" is a false positive — the celery/celery_api_deployments queues are consumed by the workers app (-A worker worker), whose async_execute_bin router forwards transport via **kwargs to async_execute_bin_general (which accepts it). The backend celery worker (-A backend worker) only consumes dashboard_metric_events, so the legacy execute_bin is not a live consumer and does not crash. It's noted below only as dead-code hygiene.

The substantive themes are: (1) the enum exists but is never enforced at any boundary, (2) coverage is asymmetric (scheduler path tested, the dominant workflow_helper.py/worker paths are not), and (3) the design doc has several stale/incorrect anchors. Individual findings are inline. All are LOW/MEDIUM and safe to address in follow-ups; none block the inert merge.

Comment thread workers/shared/models/execution_models.py
Comment thread workers/shared/models/execution_models.py Outdated
Comment thread workers/scheduler/tasks.py Outdated
Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py
Comment thread workers/general/tasks.py
Comment thread workers/queue_backend/pg_queue/9e-design.md Outdated
Comment thread workers/queue_backend/pg_queue/9e-design.md Outdated
Comment thread workers/queue_backend/pg_queue/9e-design.md Outdated
Comment thread workers/queue_backend/pg_queue/9e-design.md Outdated
Comment thread workers/queue_backend/pg_queue/9e-design.md Outdated
…est fixes

Address PR #2062 review (PR Review Toolkit findings):

- Validation/silent-failure: add normalize_transport() (core) — fail-closed
  coercion of any inbound transport to a known value (unknown/None -> celery +
  warn). Applied at the scheduler read boundary and in WorkflowContextData
  __post_init__, so a garbage payload value can't reach the PR 2 fan-out read.
- Comment accuracy: WorkflowContextData.transport comment now present-tense
  (carried in PR 1; fan-out read lands in PR 2).
- Dead-code hygiene: add 'transport' to EXECUTION_EXCLUDED_PARAMS so the legacy
  execute_bin -> create_workflow_execution path can't TypeError.
- Two-resolution-sites: documented the deliberate two-site design + the PR 3
  single-chokepoint requirement at execute_workflow_async.
- transport.py: drop the unused logger; leave a PR-3 fail-closed marker.
- Design doc: correct anchors (transport.py / internal_api_views / workflow_
  helper, not execution.py:126), class name (WorkflowContextData not
  WorkflowExecutionContext), scope claims (stage-1 only in PR 1), and remove the
  hard-coded Flipt version/date/live-state.
- Tests: normalize_transport (passthrough / invalid->celery / None / logging),
  WorkflowContextData invalid-transport coercion.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review addressed — commit 2bc40a207

Thanks for the thorough toolkit pass. All 14 findings handled; per-thread replies above. Summary:

Code (all fixed):

  • Validation + silent-failure (×2): added normalize_transport() in core: fail-closed coercion of any inbound transport (unknown / None / "" → celery + warning). Applied at the scheduler read boundary and in WorkflowContextData.__post_init__, so a garbage payload value can never reach the PR 2 fan-out read. Coerce-not-raise is deliberate (transport has a safe default by design, unlike workflow_type).
  • Dead-code hygiene: "transport" added to EXECUTION_EXCLUDED_PARAMS (legacy execute_bin path can't TypeError).
  • Two-resolution-sites: documented the deliberate two-site design + the PR-3 single-chokepoint requirement (code comment + design doc).
  • transport.py: dropped the unused logger; left a PR-3 fail-closed marker.
  • Comment accuracy: WorkflowContextData.transport comment is now present-tense.
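
The coerce-not-raise behaviour described above might look like the sketch below. It is an approximation under stated assumptions: the warning text is invented, and the `WorkflowContextData` shown here is trimmed to two fields (the real dataclass carries many more); only the names and the fail-closed contract come from the thread.

```python
import logging
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)


class WorkflowTransport(str, Enum):
    CELERY = "celery"
    PG_QUEUE = "pg_queue"


DEFAULT_WORKFLOW_TRANSPORT = WorkflowTransport.CELERY.value


def normalize_transport(value: object) -> str:
    """Coerce any inbound value to a known transport; warn instead of raising."""
    try:
        return WorkflowTransport(value).value
    except ValueError:
        # Enum lookup raises ValueError for None, "" and unknown strings alike.
        logger.warning(
            "Unknown transport %r; falling back to %s",
            value,
            DEFAULT_WORKFLOW_TRANSPORT,
        )
        return DEFAULT_WORKFLOW_TRANSPORT


@dataclass
class WorkflowContextData:
    """Trimmed stand-in for the worker context (assumed field set)."""

    execution_id: str
    transport: str = DEFAULT_WORKFLOW_TRANSPORT

    def __post_init__(self) -> None:
        # Guard the carrier itself so no later read sees a garbage value.
        self.transport = normalize_transport(self.transport)
```

Normalising in `__post_init__` as well as at the scheduler boundary means a payload built by an older or misbehaving producer still yields a context that reads as celery.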

Design doc (all fixed): correct anchors (transport.py / internal_api_views view / execute_workflow_async, not execution.py:126); WorkflowContextData not WorkflowExecutionContext; scoped claims to stage-1 (stage-2/3 = PR 2); §8 callers by name not line numbers; removed the hard-coded Flipt version/date/live-state.

Tests added: normalize_transport (passthrough / invalid→celery / None / logging-on-invalid / no-warn-on-valid) and WorkflowContextData invalid-transport coercion. 28 green (10 backend + 18 worker).

Deferred as follow-up test coverage (3 test-gap findings — agreed non-blocking; heavier integration-style tests):

  • execute_workflow_async send_task-carries-transport — hits the DB after send_task; the kwarg is a literal, and the contract is exercised by the live dev-test + scheduler characterisation test.
  • _execute_general_workflow threading — straight param pass-through (verified by import/signature check); a clean unit test needs heavy collaborator mocking. Dataclass default + coercion are now unit-tested.
  • create-execution view returns transport — verified live (the real endpoint returned {"transport":"celery"}); a Django-DB unit test needs settings/DB infra the unit suite intentionally avoids.

Still inert — transport always resolves to celery.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 17, 2026 04:55
@greptile-apps

greptile-apps Bot commented Jun 17, 2026

Contributor

Greptile Summary

This PR establishes an inert per-execution transport seam for the 9e coupled-pipeline migration. The transport string ("celery" | "pg_queue") is resolved once at execution creation and carried in the task payload end-to-end; PR 1 hardwires the result to "celery" so runtime behaviour is byte-identical to today.

  • Core vocab (data_models.py): WorkflowTransport enum, DEFAULT_WORKFLOW_TRANSPORT, and normalize_transport() fail-closed coercion for untrusted payload/JSONB boundaries.
  • Backend emit sites: create_workflow_execution view (scheduler path) and execute_workflow_async (API/async path) each call resolve_transport() and thread the result into the dispatched task's kwargs; "transport" is also added to EXECUTION_EXCLUDED_PARAMS to prevent it leaking into the WorkflowExecution row-builder.
  • Worker carry: WorkflowContextData gains a transport field (default "celery") coerced fail-closed in __post_init__; async_execute_bin_general and _execute_general_workflow accept and forward it; scheduler threads it from the create-execution HTTP response into the dispatch kwargs.
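
To illustrate why "transport" needs to be in EXECUTION_EXCLUDED_PARAMS, here is a hedged sketch of the filtering pattern. Only the constant name and the TypeError risk come from the thread; the set contents, the stand-in `create_workflow_execution` signature, and the `build_execution` wrapper are assumptions made for the example.

```python
from typing import Optional

# Assumed shape: the real constant holds more entries than this.
EXECUTION_EXCLUDED_PARAMS = {"transport"}


def create_workflow_execution(
    workflow_id: str, pipeline_id: Optional[str] = None
) -> dict:
    """Stand-in row builder that does not accept a transport argument."""
    return {"workflow_id": workflow_id, "pipeline_id": pipeline_id}


def build_execution(**task_kwargs) -> dict:
    """Hypothetical wrapper: drop payload-only keys before building the row."""
    filtered = {
        key: value
        for key, value in task_kwargs.items()
        if key not in EXECUTION_EXCLUDED_PARAMS
    }
    return create_workflow_execution(**filtered)
```

Without the filter, forwarding the task kwargs straight into the row builder would fail with a TypeError for the unexpected `transport` keyword.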

Confidence Score: 5/5

Safe to merge — all changed paths always return "celery", making this byte-identical to the pre-PR state.

Every new code path is gated behind a hardwired "celery" constant; no conditional branching or new routing is live. The normalize_transport coercion is fail-closed and tested against None, empty, and unrecognised values. The __post_init__ guard on WorkflowContextData prevents garbage payload values from reaching any downstream read site. Test coverage pins all key contracts (resolver, enum values, scheduler threading, context carry/coercion), and the previous review comment about normalize_transport ordering is no longer applicable: the extraction correctly happens after the execution_id null-guard early return.

No files require special attention. The two-resolution-site note in workflow_helper.py is correctly flagged as a PR 3 concern and does not affect correctness here.

Important Files Changed

| Filename | Overview |
|---|---|
| unstract/core/src/unstract/core/data_models.py | Adds WorkflowTransport enum, DEFAULT_WORKFLOW_TRANSPORT constant, and normalize_transport() fail-closed coercion; except ValueError correctly covers all WorkflowTransport(value) failure modes including None and garbage strings. |
| backend/workflow_manager/workflow_v2/transport.py | New file: resolve_transport() hardwired to "celery" for PR 1; module docstring accurately documents the PR 3 Flipt wiring that will replace the body. |
| backend/workflow_manager/internal_api_views.py | Adds resolve_transport() call and returns transport in the create-execution response; placement is correct (after the WorkflowExecution row is written, before the response is assembled). |
| backend/workflow_manager/workflow_v2/workflow_helper.py | Adds "transport" to EXECUTION_EXCLUDED_PARAMS (prevents the kwarg leaking into the create_workflow_execution classmethod) and resolves/threads transport in execute_workflow_async; the two-resolution-site note is accurate and tracked for PR 3. |
| workers/scheduler/tasks.py | normalize_transport is called correctly after the execution_id null-guard early return (lines 138–143 guard, line 150 extraction), so the log context string is always valid and no work is done on the error path. |
| workers/shared/models/execution_models.py | Adds transport: str = DEFAULT_WORKFLOW_TRANSPORT to WorkflowContextData with __post_init__ coercion via normalize_transport; dataclass field ordering is valid; the live carrier is the correct class (not WorkflowExecutionContext). |
| workers/general/tasks.py | Threads transport parameter through async_execute_bin_general → _execute_general_workflow → WorkflowContextData; default matches DEFAULT_WORKFLOW_TRANSPORT ensuring backward compat with old payloads. |
| workers/queue_backend/pg_queue/9e-design.md | Design document covering payload-carry decision, rejected column alternative, deployment topology, idempotency gate, and 3-PR slice breakdown. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant BE as Backend
    participant RT as resolve_transport()
    participant S as Scheduler Worker
    participant AH as workflow_helper
    participant C as Celery (async_execute_bin)
    participant WC as WorkflowContextData

    note over BE,WC: Scheduler path
    S->>BE: create_workflow_execution(request)
    BE->>RT: resolve_transport()
    RT-->>BE: "celery"
    BE-->>S: "{execution_id, transport: "celery"}"
    S->>S: normalize_transport("celery") → "celery"
    S->>C: "dispatch(async_execute_bin, kwargs={transport: "celery"})"
    C->>WC: "WorkflowContextData(transport="celery")"

    note over BE,WC: API / async path
    AH->>RT: resolve_transport()
    RT-->>AH: "celery"
    AH->>C: "send_task(async_execute_bin, kwargs={transport: "celery"})"
    C->>WC: "WorkflowContextData(transport="celery")"

Reviews (3): Last reviewed commit: "UN-3559 [FIX] 9e PR 1 — SonarCloud: drop..." | Re-trigger Greptile

…reptile P1)

Move the normalize_transport(...) extraction below the `if not execution_id`
guard in _execute_scheduled_workflow. Previously it ran before the guard, so an
error response with no execution_id logged a misleading `[exec:None]` context
and discarded the computed transport. Now transport is only resolved once
execution_id is known non-empty.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Greptile P1 (scheduler normalize_transport ordering) addressed in aa655c508: moved the transport resolution below the execution_id null-guard in _execute_scheduled_workflow, so the no-execution_id error path no longer logs [exec:None] and no longer computes-then-discards the transport. Pre-commit test-selection green. (This was the only open finding — confidence should now clear to 5/5.)
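
The ordering fix can be pictured with the sketch below. It assumes a simplified shape for the create-execution response and uses a hypothetical function name (`dispatch_from_create_response`) in place of the real `_execute_scheduled_workflow`; the inline `normalize_transport` is a compact stand-in for the core helper, and the log messages are invented.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)

KNOWN_TRANSPORTS = {"celery", "pg_queue"}


def normalize_transport(value: object) -> str:
    """Compact stand-in for the core helper: unknown values become celery."""
    if isinstance(value, str) and value in KNOWN_TRANSPORTS:
        return value
    return "celery"


def dispatch_from_create_response(response: dict) -> Optional[dict]:
    """Hypothetical scheduler step: guard first, then read the transport."""
    execution_id = response.get("execution_id")
    if not execution_id:
        # Error path: return early without touching transport, so nothing
        # is computed and discarded and no "[exec:None]" context is logged.
        logger.error("create-execution returned no execution_id")
        return None

    transport = normalize_transport(response.get("transport"))
    logger.info("[exec:%s] dispatching on transport=%s", execution_id, transport)
    return {"execution_id": execution_id, "transport": transport}
```

Reading the transport only after the guard keeps the error path free of side effects and guarantees the log prefix always names a real execution.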

…ams + dict literal

Address SonarCloud code smells on PR #2062:

- python:S1172 (x3): resolve_transport() no longer declares the unused
  workflow_id/pipeline_id/organization_id params — the PR-1 seam is inert and
  needs no inputs. PR 3 reintroduces them (keyed for Flipt) when it wires the
  evaluation; the two call sites (internal_api_views view, execute_workflow_async)
  now call resolve_transport() with no args. Tests updated.
- Replace dict(...) constructor with a {...} literal in
  test_workflow_context_transport._make_context.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

SonarCloud code smells addressed in 1d298de46:

  • python:S1172 (×3): resolve_transport() no longer declares the unused workflow_id/pipeline_id/organization_id params; the inert PR-1 seam needs no inputs. PR 3 reintroduces them (keyed for Flipt) when it wires the evaluation, updating the two call sites then. Tests updated.
  • dict-vs-literal — replaced the dict(...) constructor with a {...} literal in the worker context test.

Tests green (9 backend + 18 worker).

@sonarqubecloud


@muhammad-ali-e muhammad-ali-e merged commit d6cead2 into feat/UN-3445-pg-queue-integration Jun 17, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3559-transport-seam branch June 17, 2026 05:50