UN-3559 [FEAT] PG Queue 9e PR 1 — execution transport seam (inert)#2062
muhammad-ali-e merged 4 commits into feat/UN-3445-pg-queue-integration
Establish the per-execution transport seam for the 9e coupled-pipeline
migration: the transport a workflow execution rides ("celery" | "pg_queue")
is resolved once at the creation chokepoint and carried in the task payload.
Inert — transport always resolves to "celery", so behaviour is unchanged.
Design (chosen = payload-carry, not a WorkflowExecution column):
workers/queue_backend/pg_queue/9e-design.md.
- core: WorkflowTransport enum + DEFAULT_WORKFLOW_TRANSPORT (shared vocabulary).
- backend: resolve_transport() hardwired to celery (signature shaped for PR 3's
Flipt wiring); create-execution internal API returns "transport";
execute_workflow_async adds it to the async_execute_bin kwargs.
- workers: scheduler threads transport from the create response into the
dispatch kwargs; async_execute_bin_general / _execute_general_workflow carry
it onto the live WorkflowContextData.
- tests: backend resolver + enum (5), worker WorkflowContextData carry/default
(2), dispatch characterisation updated (+ backend-resolved-transport thread).
Out of scope: live PG routing + per-batch idempotency key (PR 2); Flipt canary
wiring (PR 3); rollout (ops).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
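The payload-carry idea described in this commit can be sketched roughly as follows. This is a minimal illustration, not the merged code: the enum member names and `build_dispatch_kwargs` are assumptions made for the example, while `WorkflowTransport`, `DEFAULT_WORKFLOW_TRANSPORT` and `resolve_transport()` are the names used in the PR.

```python
from enum import Enum


class WorkflowTransport(str, Enum):
    """Transports a workflow execution can ride (member names are assumed)."""

    CELERY = "celery"
    PG_QUEUE = "pg_queue"


# Assumed to be a plain string so it can sit directly in a task payload.
DEFAULT_WORKFLOW_TRANSPORT = WorkflowTransport.CELERY.value


def resolve_transport() -> str:
    """Inert PR 1 resolver: always celery. PR 3 is expected to replace the body."""
    return DEFAULT_WORKFLOW_TRANSPORT


def build_dispatch_kwargs(execution_id: str) -> dict:
    """Hypothetical helper: resolve once at creation, carry it in the payload."""
    return {"execution_id": execution_id, "transport": resolve_transport()}


kwargs = build_dispatch_kwargs("exec-123")
```

Because the value travels in the kwargs rather than in a WorkflowExecution column, nothing has to be migrated on the shared table for later stages to see it.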
muhammad-ali-e left a comment
Automated PR review — PG Queue 9e PR 1 (transport seam)
Reviewed with the PR Review Toolkit (code review, silent-failure, type-design, test, comment, simplifier agents) against base feat/UN-3445-pg-queue-integration. Findings verified by hand before posting.
Overall: clean, well-scoped, genuinely inert plumbing. No production-breaking issues. The notable verification result: an initially-flagged "CRITICAL crash on the legacy execute_bin path" is a false positive — the celery/celery_api_deployments queues are consumed by the workers app (-A worker worker), whose async_execute_bin router forwards transport via **kwargs to async_execute_bin_general (which accepts it). The backend celery worker (-A backend worker) only consumes dashboard_metric_events, so the legacy execute_bin is not a live consumer and does not crash. It's noted below only as dead-code hygiene.
The substantive themes are: (1) the enum exists but is never enforced at any boundary, (2) coverage is asymmetric (scheduler path tested, the dominant workflow_helper.py/worker paths are not), and (3) the design doc has several stale/incorrect anchors. Individual findings are inline. All are LOW/MEDIUM and safe to address in follow-ups; none block the inert merge.
…est fixes
Address PR #2062 review (PR Review Toolkit findings):
- Validation/silent-failure: add normalize_transport() (core), a fail-closed coercion of any inbound transport to a known value (unknown/None -> celery + warn). Applied at the scheduler read boundary and in WorkflowContextData __post_init__, so a garbage payload value can't reach the PR 2 fan-out read.
- Comment accuracy: WorkflowContextData.transport comment now present-tense (carried in PR 1; fan-out read lands in PR 2).
- Dead-code hygiene: add 'transport' to EXECUTION_EXCLUDED_PARAMS so the legacy execute_bin -> create_workflow_execution path can't TypeError.
- Two-resolution-sites: documented the deliberate two-site design + the PR 3 single-chokepoint requirement at execute_workflow_async.
- transport.py: drop the unused logger; leave a PR-3 fail-closed marker.
- Design doc: correct anchors (transport.py / internal_api_views / workflow_helper, not execution.py:126), class name (WorkflowContextData not WorkflowExecutionContext), scope claims (stage-1 only in PR 1), and remove the hard-coded Flipt version/date/live-state.
- Tests: normalize_transport (passthrough / invalid->celery / None / logging), WorkflowContextData invalid-transport coercion.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
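A rough sketch of the fail-closed coercion this commit describes, assuming the enum shape below. The `context` parameter, the log wording and the `execution_id` field are illustrative guesses; only `normalize_transport`, `WorkflowTransport`, `DEFAULT_WORKFLOW_TRANSPORT` and `WorkflowContextData` are names from the PR.

```python
import logging
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)


class WorkflowTransport(str, Enum):
    CELERY = "celery"  # member names are assumed
    PG_QUEUE = "pg_queue"


DEFAULT_WORKFLOW_TRANSPORT = WorkflowTransport.CELERY.value


def normalize_transport(value: object, context: str = "") -> str:
    """Coerce any inbound value to a known transport, falling back to celery."""
    try:
        # Enum lookup by value raises ValueError for None and unknown strings.
        return WorkflowTransport(value).value
    except ValueError:
        logger.warning(
            "%s unknown transport %r, falling back to %s",
            context, value, DEFAULT_WORKFLOW_TRANSPORT,
        )
        return DEFAULT_WORKFLOW_TRANSPORT


@dataclass
class WorkflowContextData:
    execution_id: str  # stand-in for the real fields
    transport: str = DEFAULT_WORKFLOW_TRANSPORT

    def __post_init__(self) -> None:
        # Second boundary: a garbage payload value never survives construction.
        self.transport = normalize_transport(self.transport)
```

Coercing at both the read boundary and in `__post_init__` means a later reader of `transport` can rely on it being one of the two known values.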
Review addressed — commit
| Filename | Overview |
|---|---|
| unstract/core/src/unstract/core/data_models.py | Adds WorkflowTransport enum, DEFAULT_WORKFLOW_TRANSPORT constant, and normalize_transport() fail-closed coercion; except ValueError correctly covers all WorkflowTransport(value) failure modes including None and garbage strings. |
| backend/workflow_manager/workflow_v2/transport.py | New file — resolve_transport() hardwired to "celery" for PR 1; module docstring accurately documents the PR 3 Flipt wiring that will replace the body. |
| backend/workflow_manager/internal_api_views.py | Adds resolve_transport() call and returns transport in the create-execution response; placement is correct (after the WorkflowExecution row is written, before the response is assembled). |
| backend/workflow_manager/workflow_v2/workflow_helper.py | Adds "transport" to EXECUTION_EXCLUDED_PARAMS (prevents the kwarg leaking into the create_workflow_execution classmethod) and resolves/threads transport in execute_workflow_async; the two-resolution-site note is accurate and tracked for PR 3. |
| workers/scheduler/tasks.py | normalize_transport is called correctly after the execution_id null-guard early return (lines 138–143 guard, line 150 extraction), so the log context string is always valid and no work is done on the error path. |
| workers/shared/models/execution_models.py | Adds transport: str = DEFAULT_WORKFLOW_TRANSPORT to WorkflowContextData with __post_init__ coercion via normalize_transport; dataclass field ordering is valid; the live carrier is the correct class (not WorkflowExecutionContext). |
| workers/general/tasks.py | Threads transport parameter through async_execute_bin_general → _execute_general_workflow → WorkflowContextData; default matches DEFAULT_WORKFLOW_TRANSPORT ensuring backward compat with old payloads. |
| workers/queue_backend/pg_queue/9e-design.md | Design document covering payload-carry decision, rejected column alternative, deployment topology, idempotency gate, and 3-PR slice breakdown. |
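The EXECUTION_EXCLUDED_PARAMS row can be illustrated with a small sketch of why a payload-only key needs filtering before it reaches a function that does not accept it. Everything here is a stand-in: the real set presumably holds more names, and `strip_excluded` plus this `create_workflow_execution` signature are hypothetical.

```python
EXECUTION_EXCLUDED_PARAMS = {"transport"}  # the real set may hold more names


def create_workflow_execution(workflow_id: str, execution_id: str) -> dict:
    """Stand-in for the classmethod; it takes no transport kwarg."""
    return {"workflow_id": workflow_id, "execution_id": execution_id}


def strip_excluded(kwargs: dict) -> dict:
    """Hypothetical filter: drop payload-only keys before the call."""
    return {k: v for k, v in kwargs.items() if k not in EXECUTION_EXCLUDED_PARAMS}


payload = {"workflow_id": "wf-1", "execution_id": "exec-1", "transport": "celery"}

# Without the filter the extra key raises TypeError at call time.
try:
    create_workflow_execution(**payload)
    leaked = False
except TypeError:
    leaked = True

# With the filter the same payload is accepted.
record = create_workflow_execution(**strip_excluded(payload))
```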
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant BE as Backend
participant RT as resolve_transport()
participant S as Scheduler Worker
participant AH as workflow_helper
participant C as Celery (async_execute_bin)
participant WC as WorkflowContextData
note over BE,WC: Scheduler path
S->>BE: create_workflow_execution(request)
BE->>RT: resolve_transport()
RT-->>BE: "celery"
BE-->>S: "{execution_id, transport: "celery"}"
S->>S: normalize_transport("celery") → "celery"
S->>C: "dispatch(async_execute_bin, kwargs={transport: "celery"})"
C->>WC: "WorkflowContextData(transport="celery")"
note over BE,WC: API / async path
AH->>RT: resolve_transport()
RT-->>AH: "celery"
AH->>C: "send_task(async_execute_bin, kwargs={transport: "celery"})"
C->>WC: "WorkflowContextData(transport="celery")"
Reviews (3): Last reviewed commit: "UN-3559 [FIX] 9e PR 1 — SonarCloud: drop..." | Re-trigger Greptile
…reptile P1)
Move the normalize_transport(...) extraction below the `if not execution_id` guard in _execute_scheduled_workflow. Previously it ran before the guard, so an error response with no execution_id logged a misleading `[exec:None]` context and discarded the computed transport. Now transport is only resolved once execution_id is known non-empty.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
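The ordering fix can be sketched as below. This is an assumption-laden illustration: `handle_create_response`, the response shape and the simplified `normalize_transport` stand-in are invented for the example and do not reproduce _execute_scheduled_workflow.

```python
def normalize_transport(value, context=""):
    """Simplified stand-in for the core helper: unknown values become celery."""
    return value if value in ("celery", "pg_queue") else "celery"


def handle_create_response(response: dict) -> dict:
    """Hypothetical scheduler step: guard first, then read the transport."""
    execution_id = response.get("execution_id")
    if not execution_id:
        # Error path: return early, no transport work and no [exec:None] log.
        return {"status": "error", "reason": "no execution_id"}

    # Only reached with a real execution_id, so the log context is valid.
    transport = normalize_transport(
        response.get("transport"), context=f"[exec:{execution_id}]"
    )
    return {
        "status": "dispatched",
        "execution_id": execution_id,
        "transport": transport,
    }
```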
Greptile P1 (scheduler |
…ams + dict literal
Address SonarCloud code smells on PR #2062:
- python:S1172 (x3): resolve_transport() no longer declares the unused workflow_id/pipeline_id/organization_id params, because the PR-1 seam is inert and needs no inputs. PR 3 reintroduces them (keyed for Flipt) when it wires the evaluation; the two call sites (internal_api_views view, execute_workflow_async) now call resolve_transport() with no args. Tests updated.
- Replace dict(...) constructor with a {...} literal in test_workflow_context_transport._make_context.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud code smells addressed in
Tests green (9 backend + 18 worker).
Merged commit d6cead2 into feat/UN-3445-pg-queue-integration
What & why
First of 3 PRs for 9e (the coupled-pipeline migration). This one establishes the per-execution transport seam and nothing else: the transport a workflow execution rides ("celery" | "pg_queue") is resolved once at the creation chokepoint and carried in the task payload. It is inert: transport always resolves to "celery", so behaviour is byte-identical to today.
Why a seam first: 9e migrates the coupled pipeline (async_execute_bin → file-batch fan-out → callback) onto the Postgres queue. Because the worker code is shared across both transports and later stages are enqueued from inside the workers, every stage needs to know, per execution, which transport to stay on. This PR lays that plumbing dead (always celery) so the next PR only adds the live PG branch.
Design: workers/queue_backend/pg_queue/9e-design.md (included). Chosen approach = payload-carry, deliberately not a WorkflowExecution column (no migration on the large shared table; all PG-specific state stays in droppable PG tables).
Changes
- core: WorkflowTransport enum (celery | pg_queue) + DEFAULT_WORKFLOW_TRANSPORT (shared vocabulary).
- backend: resolve_transport() hardwired to celery (signature shaped for the Flipt wiring in PR 3); create-execution internal API returns "transport"; execute_workflow_async adds it to the async_execute_bin kwargs.
- workers: scheduler threads transport from the create response into the dispatch kwargs; async_execute_bin_general / _execute_general_workflow carry it onto the live WorkflowContextData.
Tests
- test_transport.py: resolver + enum (5)
- test_workflow_context_transport.py: WorkflowContextData carry/default (2)
- dispatch characterisation: updated for transport, + a new test that the backend-resolved transport is threaded verbatim
Dev-test (against a running stack)
- create-execution internal API now returns "transport":"celery".
- … (async_execute_bin succeeded, execution COMPLETED).
Out of scope (later)
- PR 2: live PG routing (… PgBarrier decrement) + the per-batch idempotency key (ships together).
- PR 3: Flipt canary wiring. pg_queue_execution_enabled (default false, no rollouts) already exists in dev; inert until PR 3 reads it.
Notes
- Base: feat/UN-3445-pg-queue-integration (not main).
🤖 Generated with Claude Code