UN-3574 [FEAT] PG Queue 9e 2d — orchestrator (async_execute_bin) on PG + shared TaskPayload contract#2072
UN-3574 [FEAT] PG Queue 9e 2d — orchestrator (async_execute_bin) on PG + shared TaskPayload contract#2072muhammad-ali-e merged 4 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom feat/UN-3574-FEAT_orchestrator_on_pgZipstack/unstract:feat/UN-3574-FEAT_orchestrator_on_pgCopy head branch name to clipboard
Conversation
…G + shared TaskPayload Route the orchestrator task onto the PG queue when transport==pg_queue, so a pg_queue execution's orchestrate → fan-out → barrier → callback all run on PG (was hybrid: orchestrator on Celery, only fan-out/callback on PG). - Promote TaskPayload + FairnessPayload (the PG-message wire contract) to unstract.core; workers re-export them so existing imports keep working and the backend producer + worker consumer share one definition. - Backend PG producer (pg_queue/producer.py): enqueue a TaskPayload row to pg_queue_message via the ORM, mirroring the workers' PgQueueClient.send. UUIDs in args/kwargs are JSON-coerced (the message JSONField has no Django encoder). - Backend dispatch (execute_workflow_async): when transport==pg_queue, enqueue async_execute_bin to PG (general → "celery", api → "celery_api_deployments") instead of celery_app.send_task; task_id becomes "pg:<msg_id>". - Scheduler dispatch: pass backend=QueueBackend.PG when is_pg_transport(transport) (dispatch() already had the per-call override from 2a). Gated off by default — Celery path unchanged. Executor (tool run) + log workers stay Celery (PR B). Tests: 5 producer unit tests; workers regression suite green after the core move. Dev-tested end-to-end: a real API deployment ran async_execute_bin on the orchestrator PG consumer (not Celery), then fan-out → barrier → callback on PG to COMPLETED, clean teardown. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR Review Toolkit pass (Code Reviewer, Silent Failure Hunter, Type Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier).
Scope reviewed: the 8 changed files vs the PR base feat/UN-3445-pg-queue-integration. The producer + contract relocation are clean and well-documented; the substantive concerns are concentrated in execute_workflow_async's error/return paths and in test coverage of the new transport fork. Inline comments below, grouped by line. None are blockers individually, but #1 (broad except can mark a live PG job ERROR) and #2 (task_id format divergence) are the two I'd resolve before merge.
…act, fix dispatch fork SonarCloud (cognitive complexity 18→<15) + review round on #2072: - Extract WorkflowHelper._dispatch_orchestrator_task — the PG-vs-Celery fork moves out of execute_workflow_async (complexity down) and becomes unit-testable. - HIGH: a `dispatched` flag — a post-dispatch bookkeeping failure no longer flips a running (already-enqueued) execution to ERROR; only pre-dispatch failures do. - HIGH: task_id is now bare `str(msg_id)` (was `pg:{msg_id}`) — one format across entry paths, matching the worker PgDispatchHandle.id. - Drop the dead Celery-only TimeoutError handler (manual poll never raised it; the generic handler covers any stray case). - Consolidate the fairness contract in unstract.core: WorkloadType (StrEnum) + FAIRNESS_MIN/MAX/DEFAULT_PRIORITY + FairnessPayload.workload_type Literal; workers re-export, backend producer + workflow_helper reference them (no more hand-built "api"/"non_api" literals or triplicated [1,10] bounds). - Producer: log+re-raise on enqueue failure (parity with worker _enqueue_pg); document the TaskPayload.queue field as diagnostic-only. Tests: +dispatch-fork suite (PG vs Celery, bare id, two org sentinels), +producer boundary/datetime/failure cases, +scheduler backend=PG override assertions. 34 backend + 99 workers green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round addressed —
|
…handlers
Both handlers in execute_workflow_async were `logger.error(..., exc_info=True)`
inside `except Exception` → switch to `logger.exception(...)` (idiomatic, and
exc_info is implicit). The post-dispatch branch drops the redundant `{error}`
from the message since the traceback is now attached.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
| Filename | Overview |
|---|---|
| backend/pg_queue/producer.py | New backend PG producer: enqueues TaskPayload to pg_queue_message via ORM, with UUID/datetime coercion, priority bounds validation, and exception logging before re-raise. Clean mirror of the workers' PgQueueClient.send path. |
| backend/workflow_manager/workflow_v2/workflow_helper.py | Adds _dispatch_orchestrator_task() helper splitting PG/Celery dispatch, a dispatched flag to guard post-dispatch bookkeeping failures from erroneously marking ERROR, and removes the vestigial celery TimeoutError branch. AsyncResult import retained for the still-present make_async_result() method. |
| unstract/core/src/unstract/core/data_models.py | Promotes WorkloadType (StrEnum), FAIRNESS_* priority bounds, FairnessPayload (TypedDict), and TaskPayload (TypedDict) from the workers into this shared core module — the single source of truth for the backend producer ↔ worker consumer wire contract. |
| workers/queue_backend/fairness.py | FairnessPayload, WorkloadType, and priority constants moved to unstract.core; re-exported here for zero import churn. all added. FAIRNESS_HEADER_NAME and FairnessKey dataclass remain worker-side. |
| workers/queue_backend/pg_queue/task_payload.py | TaskPayload definition removed (moved to unstract.core), re-exported via import. to_payload() builder stays worker-side; FairnessKey referenced only under TYPE_CHECKING so the runtime import is clean. |
| workers/scheduler/tasks.py | One-line addition: passes backend=QueueBackend.PG to dispatch() when transport is pg_queue, routing the scheduled orchestrator onto PG. None for Celery transport preserves legacy behavior. |
| backend/pg_queue/tests/test_producer.py | DB-free unit tests for enqueue_task: wire shape, UUID/datetime coercion, default queue, empty args, priority bounds, default priority, and failure propagation. Good coverage of the producer contract. |
| backend/workflow_manager/workflow_v2/tests/test_dispatch_orchestrator.py | New tests for _dispatch_orchestrator_task: PG vs Celery routing, api vs non_api workload type, empty-org sentinel distinction, and Celery path not calling pg_enqueue. |
| workers/tests/test_dispatch_sites_characterisation.py | Two new characterisation tests for the scheduler dispatch site: pg_queue transport passes backend=QueueBackend.PG, celery transport leaves backend=None. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant API as Backend API/Scheduler
participant WH as WorkflowHelper<br/>execute_workflow_async
participant DOT as _dispatch_orchestrator_task
participant PGP as pg_queue/producer<br/>enqueue_task
participant ORM as PgQueueMessage<br/>ORM
participant CEL as celery_app<br/>send_task
participant PGW as PG Queue Worker<br/>async_execute_bin consumer
API->>WH: execute_workflow_async(queue, transport, ...)
WH->>WH: "resolve_transport() → pg_queue | celery"
WH->>DOT: _dispatch_orchestrator_task(transport, queue, args, kwargs)
alt "transport == pg_queue"
DOT->>PGP: enqueue_task(task_name, queue, args, kwargs, org_id, fairness)
PGP->>PGP: _json_safe() coerce UUIDs/datetimes
PGP->>ORM: PgQueueMessage.objects.create(queue_name, message, org_id, priority)
ORM-->>PGP: row.msg_id
PGP-->>DOT: msg_id (int)
DOT-->>WH: str(msg_id)
WH->>WH: "dispatched = True"
PGW->>ORM: poll pg_queue_message WHERE queue_name
ORM-->>PGW: TaskPayload row
PGW->>PGW: run async_execute_bin(fan-out → barrier → callback)
else "transport == celery"
DOT->>CEL: send_task(async_execute_bin, args, kwargs, queue)
CEL-->>DOT: AsyncResult.id
DOT-->>WH: task_id (str)
WH->>WH: "dispatched = True"
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant API as Backend API/Scheduler
participant WH as WorkflowHelper<br/>execute_workflow_async
participant DOT as _dispatch_orchestrator_task
participant PGP as pg_queue/producer<br/>enqueue_task
participant ORM as PgQueueMessage<br/>ORM
participant CEL as celery_app<br/>send_task
participant PGW as PG Queue Worker<br/>async_execute_bin consumer
API->>WH: execute_workflow_async(queue, transport, ...)
WH->>WH: "resolve_transport() → pg_queue | celery"
WH->>DOT: _dispatch_orchestrator_task(transport, queue, args, kwargs)
alt "transport == pg_queue"
DOT->>PGP: enqueue_task(task_name, queue, args, kwargs, org_id, fairness)
PGP->>PGP: _json_safe() coerce UUIDs/datetimes
PGP->>ORM: PgQueueMessage.objects.create(queue_name, message, org_id, priority)
ORM-->>PGP: row.msg_id
PGP-->>DOT: msg_id (int)
DOT-->>WH: str(msg_id)
WH->>WH: "dispatched = True"
PGW->>ORM: poll pg_queue_message WHERE queue_name
ORM-->>PGW: TaskPayload row
PGW->>PGW: run async_execute_bin(fan-out → barrier → callback)
else "transport == celery"
DOT->>CEL: send_task(async_execute_bin, args, kwargs, queue)
CEL-->>DOT: AsyncResult.id
DOT-->>WH: task_id (str)
WH->>WH: "dispatched = True"
end
Reviews (2): Last reviewed commit: "UN-3574 address greptile: drop redundant..." | Re-trigger Greptile
…exception logger.exception() already attaches the active exception's traceback; stack_info additionally dumps the current call stack, producing a second overlapping stack trace per error in a direct except handler. Drop it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
greptile round addressed —
|
|
76f39d2
into
feat/UN-3445-pg-queue-integration
What
async_execute_bin) onto the PG queue whentransport == pg_queue, so a pg_queue execution's orchestrate → fan-out → barrier → callback all run on PG (previously hybrid — orchestrator on Celery, only fan-out/callback on PG).TaskPayload,FairnessPayload) intounstract.core; the workers re-export them.backend/pg_queue/producer.py) — the backend had thepg_queuetables but no way to enqueue to them.Why
How
TaskPayload/FairnessPayloadmove tounstract.core.data_modelsso the backend producer and the worker consumer agree on one definition (separate codebases can't import each other). Workers re-export from their old modules → existing imports unchanged.TaskPayloadrow topg_queue_messagevia thePgQueueMessageORM, mirroring the workers'PgQueueClient.send. UUIDs in args/kwargs are JSON-coerced (themessageJSONFieldhas no Django encoder; the worker consumer already receives string ids on the existing PG path).execute_workflow_async): onpg_queue, enqueueasync_execute_binto PG (general → "celery",api → "celery_api_deployments") instead ofcelery_app.send_task;task_idbecomes"pg:<msg_id>". TheTimeoutErrorbranch is guarded for the no-AsyncResult PG path.backend=QueueBackend.PGwhenis_pg_transport(transport)—dispatch()already had the per-call override (from 2a).Can this PR break any existing features?
PG_QUEUE_TRANSPORT_ENABLED) or the Flipt flag is off,transportresolves toceleryand both dispatch sites take the unchanged Celery path. The shared-contract move is a pure relocation with re-exports (82 workers tests green). The executor (tool run) and log workers remain on Celery.Database Migrations
Env Config
PG_QUEUE_TRANSPORT_ENABLEDgate + Flipt flag from PR 3).Notes on Testing
async_execute_binto PG (queue=celery_api_deployments), the orchestrator PG consumer ran it (negative check: 0async_execute_binlines in the Celery general/api-deployment logs), then fan-out →PgBarrier decrement → remaining=0→ callback →COMPLETED, withpg_barrier_state/pg_batch_dedup/ queue all 0 after.Related Issues
Checklist
🤖 Generated with Claude Code