Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3574 [FEAT] PG Queue 9e 2d — orchestrator (async_execute_bin) on PG + shared TaskPayload contract#2072

Merged
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3574-FEAT_orchestrator_on_pgZipstack/unstract:feat/UN-3574-FEAT_orchestrator_on_pgCopy head branch name to clipboard
Jun 18, 2026
Merged

UN-3574 [FEAT] PG Queue 9e 2d — orchestrator (async_execute_bin) on PG + shared TaskPayload contract#2072
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3574-FEAT_orchestrator_on_pgZipstack/unstract:feat/UN-3574-FEAT_orchestrator_on_pgCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

  • Route the orchestrator task (async_execute_bin) onto the PG queue when transport == pg_queue, so a pg_queue execution's orchestrate → fan-out → barrier → callback all run on PG (previously hybrid — orchestrator on Celery, only fan-out/callback on PG).
  • Promote the PG-message wire contract (TaskPayload, FairnessPayload) into unstract.core; the workers re-export them.
  • New backend PG producer (backend/pg_queue/producer.py) — the backend had the pg_queue tables but no way to enqueue to them.

Why

  • Completes the execution-pipeline migration onto PG except the executor/tool run (PR B). Part of PG Queue Phase 9 (UN-3536), sub-task UN-3574.

How

  • Shared contract: TaskPayload / FairnessPayload move to unstract.core.data_models so the backend producer and the worker consumer agree on one definition (separate codebases can't import each other). Workers re-export from their old modules → existing imports unchanged.
  • Backend producer: enqueues a TaskPayload row to pg_queue_message via the PgQueueMessage ORM, mirroring the workers' PgQueueClient.send. UUIDs in args/kwargs are JSON-coerced (the message JSONField has no Django encoder; the worker consumer already receives string ids on the existing PG path).
  • Backend dispatch (execute_workflow_async): on pg_queue, enqueue async_execute_bin to PG (general → "celery", api → "celery_api_deployments") instead of celery_app.send_task; task_id becomes "pg:<msg_id>". The TimeoutError branch is guarded for the no-AsyncResult PG path.
  • Scheduler dispatch: pass backend=QueueBackend.PG when is_pg_transport(transport)dispatch() already had the per-call override (from 2a).

Can this PR break any existing features?

  • No. Gated off by default — while the env master-gate (PG_QUEUE_TRANSPORT_ENABLED) or the Flipt flag is off, transport resolves to celery and both dispatch sites take the unchanged Celery path. The shared-contract move is a pure relocation with re-exports (82 workers tests green). The executor (tool run) and log workers remain on Celery.

Database Migrations

  • None.

Env Config

  • None new (uses the existing PG_QUEUE_TRANSPORT_ENABLED gate + Flipt flag from PR 3).

Notes on Testing

  • Unit: 5 producer tests (wire-shape row, UUID→str coercion, default queue, empty args, priority bounds). Workers regression suite (fairness / dispatch_pg / pg_queue_client / routing) green after the core move.
  • Dev-tested end-to-end (gate+flag on): a real API deployment had the backend enqueue async_execute_bin to PG (queue=celery_api_deployments), the orchestrator PG consumer ran it (negative check: 0 async_execute_bin lines in the Celery general/api-deployment logs), then fan-out → PgBarrier decrement → remaining=0 → callback → COMPLETED, with pg_barrier_state / pg_batch_dedup / queue all 0 after.

Related Issues

  • UN-3536 (PG Queue Phase 9 — transport engine), sub-task UN-3574. Builds on PR 3 (UN-3570, Flipt canary).
  • Next: PR B — executor (tool run) on PG. Also tracked: UN-3566 (multi-queue consumer ergonomics).

Checklist

  • Appropriate PR title and description
  • Self-review performed
  • Tests added that prove the feature works
  • New and existing unit tests pass locally
  • Pre-commit passes on all changed files

🤖 Generated with Claude Code

…G + shared TaskPayload

Route the orchestrator task onto the PG queue when transport==pg_queue, so a
pg_queue execution's orchestrate → fan-out → barrier → callback all run on PG
(was hybrid: orchestrator on Celery, only fan-out/callback on PG).

- Promote TaskPayload + FairnessPayload (the PG-message wire contract) to
  unstract.core; workers re-export them so existing imports keep working and the
  backend producer + worker consumer share one definition.
- Backend PG producer (pg_queue/producer.py): enqueue a TaskPayload row to
  pg_queue_message via the ORM, mirroring the workers' PgQueueClient.send. UUIDs
  in args/kwargs are JSON-coerced (the message JSONField has no Django encoder).
- Backend dispatch (execute_workflow_async): when transport==pg_queue, enqueue
  async_execute_bin to PG (general → "celery", api → "celery_api_deployments")
  instead of celery_app.send_task; task_id becomes "pg:<msg_id>".
- Scheduler dispatch: pass backend=QueueBackend.PG when is_pg_transport(transport)
  (dispatch() already had the per-call override from 2a).

Gated off by default — Celery path unchanged. Executor (tool run) + log workers
stay Celery (PR B). Tests: 5 producer unit tests; workers regression suite green
after the core move. Dev-tested end-to-end: a real API deployment ran
async_execute_bin on the orchestrator PG consumer (not Celery), then fan-out →
barrier → callback on PG to COMPLETED, clean teardown.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3126d2b1-f7e9-4330-a37e-e1a792a3bbdf

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3574-FEAT_orchestrator_on_pg

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review Toolkit pass (Code Reviewer, Silent Failure Hunter, Type Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier).

Scope reviewed: the 8 changed files vs the PR base feat/UN-3445-pg-queue-integration. The producer + contract relocation are clean and well-documented; the substantive concerns are concentrated in execute_workflow_async's error/return paths and in test coverage of the new transport fork. Inline comments below, grouped by line. None are blockers individually, but #1 (broad except can mark a live PG job ERROR) and #2 (task_id format divergence) are the two I'd resolve before merge.

Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py
Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py Outdated
Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py Outdated
Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py Outdated
Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py Outdated
Comment thread backend/pg_queue/producer.py Outdated
Comment thread backend/pg_queue/producer.py
Comment thread unstract/core/src/unstract/core/data_models.py Outdated
Comment thread unstract/core/src/unstract/core/data_models.py
Comment thread workers/scheduler/tasks.py
Comment thread backend/pg_queue/producer.py Outdated
…act, fix dispatch fork

SonarCloud (cognitive complexity 18→<15) + review round on #2072:
- Extract WorkflowHelper._dispatch_orchestrator_task — the PG-vs-Celery fork
  moves out of execute_workflow_async (complexity down) and becomes unit-testable.
- HIGH: a `dispatched` flag — a post-dispatch bookkeeping failure no longer flips
  a running (already-enqueued) execution to ERROR; only pre-dispatch failures do.
- HIGH: task_id is now bare `str(msg_id)` (was `pg:{msg_id}`) — one format across
  entry paths, matching the worker PgDispatchHandle.id.
- Drop the dead Celery-only TimeoutError handler (manual poll never raised it;
  the generic handler covers any stray case).
- Consolidate the fairness contract in unstract.core: WorkloadType (StrEnum) +
  FAIRNESS_MIN/MAX/DEFAULT_PRIORITY + FairnessPayload.workload_type Literal;
  workers re-export, backend producer + workflow_helper reference them (no more
  hand-built "api"/"non_api" literals or triplicated [1,10] bounds).
- Producer: log+re-raise on enqueue failure (parity with worker _enqueue_pg);
  document the TaskPayload.queue field as diagnostic-only.

Tests: +dispatch-fork suite (PG vs Celery, bare id, two org sentinels),
+producer boundary/datetime/failure cases, +scheduler backend=PG override
assertions. 34 backend + 99 workers green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round addressed — 2a224e275

Thanks for the deep pass — all 13 threads + the SonarCloud complexity issue are handled.

Structural (the HIGHs + SonarCloud):

  • Extracted _dispatch_orchestrator_taskexecute_workflow_async cognitive complexity back under 15, and the PG-vs-Celery fork is now unit-tested.
  • dispatched flag → a post-dispatch bookkeeping failure no longer flips a running execution to ERROR (only pre-dispatch does).
  • task_id = bare str(msg_id) → one format across entry paths.
  • Dropped the dead Celery-only TimeoutError handler.

Contract consolidation (into unstract.core): WorkloadType (StrEnum), FAIRNESS_MIN/MAX/DEFAULT_PRIORITY, and FairnessPayload.workload_type: Literal[...] — no more hand-built "api"/"non_api" literals or triplicated [1,10] bounds; workers re-export, backend references.

Producer: log+re-raise on enqueue failure (parity with the worker path); TaskPayload.queue documented as diagnostic-only.

Tests: +dispatch-fork suite, +producer boundary/datetime/failure, +scheduler backend=PG override assertions. 34 backend + 99 workers green; pre-commit clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 18, 2026 08:14
…handlers

Both handlers in execute_workflow_async were `logger.error(..., exc_info=True)`
inside `except Exception` → switch to `logger.exception(...)` (idiomatic, and
exc_info is implicit). The post-dispatch branch drops the redundant `{error}`
from the message since the traceback is now attached.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR routes the orchestrator task (async_execute_bin) through the PG queue when transport == pg_queue, completing the PG execution pipeline (excluding the executor/tool run). It also promotes TaskPayload / FairnessPayload into unstract.core as the shared producer↔consumer wire contract, with backward-compatible re-exports in the workers.

  • New backend PG producer (backend/pg_queue/producer.py): enqueues TaskPayload rows to pg_queue_message via the PgQueueMessage ORM; UUID/datetime args are JSON-coerced since the field has no Django encoder.
  • Backend dispatch refactor (workflow_helper.py): execute_workflow_async extracts the PG-vs-Celery fork into _dispatch_orchestrator_task, adds a dispatched flag so post-dispatch bookkeeping failures don't erroneously flip the execution to ERROR, and removes the now-obsolete celery_exceptions.TimeoutError branch.
  • Shared contract move: TaskPayload, FairnessPayload, WorkloadType, and priority bounds migrate from workers/queue_backend/fairness.py and task_payload.py to unstract/core/src/unstract/core/data_models.py, with re-exports preserved for zero-import-churn in existing worker code.

Confidence Score: 5/5

Safe to merge — the PG dispatch path is gated behind an env flag and Flipt toggle, so the Celery path is completely unchanged for any deployment where neither gate is enabled.

All dispatch paths are gated (env + Flipt), the shared-contract move is a pure relocation with re-exports leaving existing imports intact, the producer's exception always propagates to prevent silent double-dispatch, and the dispatched flag correctly separates pre-dispatch failures (mark ERROR) from post-dispatch bookkeeping failures (return EXECUTING). No data model migrations, no removed APIs, and the tests cover the routing decision, wire shape, UUID coercion, and both org sentinel values.

No files require special attention.

Important Files Changed

Filename Overview
backend/pg_queue/producer.py New backend PG producer: enqueues TaskPayload to pg_queue_message via ORM, with UUID/datetime coercion, priority bounds validation, and exception logging before re-raise. Clean mirror of the workers' PgQueueClient.send path.
backend/workflow_manager/workflow_v2/workflow_helper.py Adds _dispatch_orchestrator_task() helper splitting PG/Celery dispatch, a dispatched flag to guard post-dispatch bookkeeping failures from erroneously marking ERROR, and removes the vestigial celery TimeoutError branch. AsyncResult import retained for the still-present make_async_result() method.
unstract/core/src/unstract/core/data_models.py Promotes WorkloadType (StrEnum), FAIRNESS_* priority bounds, FairnessPayload (TypedDict), and TaskPayload (TypedDict) from the workers into this shared core module — the single source of truth for the backend producer ↔ worker consumer wire contract.
workers/queue_backend/fairness.py FairnessPayload, WorkloadType, and priority constants moved to unstract.core; re-exported here for zero import churn. all added. FAIRNESS_HEADER_NAME and FairnessKey dataclass remain worker-side.
workers/queue_backend/pg_queue/task_payload.py TaskPayload definition removed (moved to unstract.core), re-exported via import. to_payload() builder stays worker-side; FairnessKey referenced only under TYPE_CHECKING so the runtime import is clean.
workers/scheduler/tasks.py One-line addition: passes backend=QueueBackend.PG to dispatch() when transport is pg_queue, routing the scheduled orchestrator onto PG. None for Celery transport preserves legacy behavior.
backend/pg_queue/tests/test_producer.py DB-free unit tests for enqueue_task: wire shape, UUID/datetime coercion, default queue, empty args, priority bounds, default priority, and failure propagation. Good coverage of the producer contract.
backend/workflow_manager/workflow_v2/tests/test_dispatch_orchestrator.py New tests for _dispatch_orchestrator_task: PG vs Celery routing, api vs non_api workload type, empty-org sentinel distinction, and Celery path not calling pg_enqueue.
workers/tests/test_dispatch_sites_characterisation.py Two new characterisation tests for the scheduler dispatch site: pg_queue transport passes backend=QueueBackend.PG, celery transport leaves backend=None.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant API as Backend API/Scheduler
    participant WH as WorkflowHelper<br/>execute_workflow_async
    participant DOT as _dispatch_orchestrator_task
    participant PGP as pg_queue/producer<br/>enqueue_task
    participant ORM as PgQueueMessage<br/>ORM
    participant CEL as celery_app<br/>send_task
    participant PGW as PG Queue Worker<br/>async_execute_bin consumer

    API->>WH: execute_workflow_async(queue, transport, ...)
    WH->>WH: "resolve_transport() → pg_queue | celery"
    WH->>DOT: _dispatch_orchestrator_task(transport, queue, args, kwargs)

    alt "transport == pg_queue"
        DOT->>PGP: enqueue_task(task_name, queue, args, kwargs, org_id, fairness)
        PGP->>PGP: _json_safe() coerce UUIDs/datetimes
        PGP->>ORM: PgQueueMessage.objects.create(queue_name, message, org_id, priority)
        ORM-->>PGP: row.msg_id
        PGP-->>DOT: msg_id (int)
        DOT-->>WH: str(msg_id)
        WH->>WH: "dispatched = True"
        PGW->>ORM: poll pg_queue_message WHERE queue_name
        ORM-->>PGW: TaskPayload row
        PGW->>PGW: run async_execute_bin(fan-out → barrier → callback)
    else "transport == celery"
        DOT->>CEL: send_task(async_execute_bin, args, kwargs, queue)
        CEL-->>DOT: AsyncResult.id
        DOT-->>WH: task_id (str)
        WH->>WH: "dispatched = True"
    end
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant API as Backend API/Scheduler
    participant WH as WorkflowHelper<br/>execute_workflow_async
    participant DOT as _dispatch_orchestrator_task
    participant PGP as pg_queue/producer<br/>enqueue_task
    participant ORM as PgQueueMessage<br/>ORM
    participant CEL as celery_app<br/>send_task
    participant PGW as PG Queue Worker<br/>async_execute_bin consumer

    API->>WH: execute_workflow_async(queue, transport, ...)
    WH->>WH: "resolve_transport() → pg_queue | celery"
    WH->>DOT: _dispatch_orchestrator_task(transport, queue, args, kwargs)

    alt "transport == pg_queue"
        DOT->>PGP: enqueue_task(task_name, queue, args, kwargs, org_id, fairness)
        PGP->>PGP: _json_safe() coerce UUIDs/datetimes
        PGP->>ORM: PgQueueMessage.objects.create(queue_name, message, org_id, priority)
        ORM-->>PGP: row.msg_id
        PGP-->>DOT: msg_id (int)
        DOT-->>WH: str(msg_id)
        WH->>WH: "dispatched = True"
        PGW->>ORM: poll pg_queue_message WHERE queue_name
        ORM-->>PGW: TaskPayload row
        PGW->>PGW: run async_execute_bin(fan-out → barrier → callback)
    else "transport == celery"
        DOT->>CEL: send_task(async_execute_bin, args, kwargs, queue)
        CEL-->>DOT: AsyncResult.id
        DOT-->>WH: task_id (str)
        WH->>WH: "dispatched = True"
    end
Loading

Reviews (2): Last reviewed commit: "UN-3574 address greptile: drop redundant..." | Re-trigger Greptile

Comment thread backend/workflow_manager/workflow_v2/workflow_helper.py
…exception

logger.exception() already attaches the active exception's traceback; stack_info
additionally dumps the current call stack, producing a second overlapping stack
trace per error in a direct except handler. Drop it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

greptile round addressed — d48d4986f

Single P2 fixed: dropped the redundant stack_info=True on the pre-dispatch logger.exception() (the exception traceback is already attached; stack_info added a second overlapping stack dump). Thanks for the 4/5 / safe-to-merge.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e merged commit 76f39d2 into feat/UN-3445-pg-queue-integration Jun 18, 2026
5 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3574-FEAT_orchestrator_on_pg branch June 18, 2026 08:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.