UN-3607 [REFACTOR] retire the executor_rpc mirror — shared dispatcher + injected transport#2102

Merged
muhammad-ali-e merged 5 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3607-shared-executor-rpc
Jun 22, 2026
@muhammad-ali-e

What

Retires the byte-for-byte backend ↔ workers executor_rpc.py mirror that SonarCloud has flagged on every executor PR. The two carried identical dispatch logic (gate + reply_key/timeout orchestration + routing + never-raises); the only real difference is the transport primitive — backend enqueues via the Django ORM, workers via psycopg2. Targets feat/UN-3445-pg-queue-integration (not main).

Net −417 lines (823 added / 1240 removed).

How — composition (injected transport), not inheritance

The shared logic now lives once in unstract.workflow_execution.executor_rpc (both backend and workers already depend on it; it can't live in unstract.core because sdk1 imports core → circular). The differing primitive is injected via a QueueTransport Protocol:

  • shared: PgExecutionDispatcher (concrete — dispatch / dispatch_async / dispatch_with_callback + the never-raises orchestration) calling transport.enqueue / wait_for_result; ExecResultRow (normalised result row so the Django-model and dict rows fold to one path); resolve_pg_transport (master-gate value supplied by the caller, then the Flipt flag); RoutingExecutionDispatcher (per-call PG-vs-Celery, with celery/pg/resolve injected).
  • backend adapter: DjangoQueueTransport (enqueue_task + PgTaskResult poll) + settings.PG_QUEUE_TRANSPORT_ENABLED master-gate + factory.
  • workers adapter: PgClientQueueTransport (PgQueueClient + PgResultBackend) + env master-gate + factory.
  • DispatchHandle / signature_to_continuation stay in unstract.core (already shared).

Both call sites (prompt-studio, structure_tool) use the unchanged get_executor_dispatcher factory.
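The composition described above can be pictured roughly as follows. This is a minimal sketch, not the code in this PR: the class names QueueTransport, PgExecutionDispatcher and ExecResultRow come from the description, while the method signatures, the dict-shaped result, the "completed" status string and InMemoryTransport are assumptions made for illustration.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass
from typing import Any, Protocol


@dataclass
class ExecResultRow:
    """Normalised result row (field names are illustrative)."""

    status: str | None = None
    result: dict[str, Any] | None = None
    error: str | None = None


class QueueTransport(Protocol):
    """The injected seam: how a task is enqueued and how its reply is awaited."""

    def enqueue(self, *, payload: dict[str, Any], reply_key: str | None) -> None: ...

    def wait_for_result(self, reply_key: str, timeout: float) -> ExecResultRow | None: ...


class PgExecutionDispatcher:
    """Shared orchestration; it knows nothing about Django or psycopg2."""

    def __init__(self, transport: QueueTransport) -> None:
        self._transport = transport

    def dispatch(self, payload: dict[str, Any], timeout: float = 5.0) -> dict[str, Any]:
        reply_key = str(uuid.uuid4())
        try:
            self._transport.enqueue(payload=payload, reply_key=reply_key)
            row = self._transport.wait_for_result(reply_key, timeout)
        except Exception as exc:  # never-raises: failures become a result value
            return {"success": False, "error": f"{type(exc).__name__}: {exc}"}
        if row is None:
            return {"success": False, "error": "timeout"}
        if row.status == "completed" and row.result is not None:
            return {"success": True, "data": row.result}
        return {"success": False, "error": row.error or "executor task failed"}


class InMemoryTransport:
    """Hypothetical stand-in for the Django and psycopg2 adapters."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.rows: dict[str, ExecResultRow] = {}

    def enqueue(self, *, payload: dict[str, Any], reply_key: str | None) -> None:
        if self.fail:
            raise RuntimeError("db down")
        self.rows[str(reply_key)] = ExecResultRow(status="completed", result={"echo": payload})

    def wait_for_result(self, reply_key: str, timeout: float) -> ExecResultRow | None:
        return self.rows.get(reply_key)


ok = PgExecutionDispatcher(InMemoryTransport()).dispatch({"run_id": "r"})
bad = PgExecutionDispatcher(InMemoryTransport(fail=True)).dispatch({"run_id": "r"})
```

Swapping the transport changes only how rows are written and read; the orchestration and the never-raises handling stay in one place.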

Non-regression

The dispatch logic is the same code, relocated and transport-injected — behaviour (gate, routing, never-raises) is byte-identical.

Tests

The dispatch contract + routing + the Flipt gate matrix are tested once against a fake transport (workers suite); each side's tests shrink to its adapter (enqueue/wait shape) + factory wiring. Backend 7 + workers 36 green; the 53 other executor-touching tests pass.

Dev-tested live (gate ON)

Rebuilt the worker image, recreated the PG workers, restarted the backend — all boot clean (the new shared module imports in the built image; no ImportError). Then ran a Prompt Studio prompt and an ETL:

  • Prompt Studio (backend adapter): PG executor dispatch_with_callback: enqueued … on_success=ide_prompt_complete → 202.
  • ETL (workers adapter, blocking RPC): Executor RPC → PG transport → PG executor dispatch: enqueued reply_key=… waiting for result → executor Received execute_extraction … operation=structure_pipeline → execute_extraction complete … success=True.

No tracebacks, no "No executor registered". Both transports drive the shared dispatcher and route over PG end-to-end.

🤖 Generated with Claude Code

… + injected transport

The backend and workers carried byte-for-byte mirrors of the executor-RPC dispatch
(gate + reply_key/timeout orchestration + routing); the only real difference is the
transport primitive (Django ORM vs psycopg2). Lift the shared logic into
unstract.workflow_execution.executor_rpc (which both already depend on; it can't live
in unstract.core because sdk1 imports core → circular) and inject the differing
primitive via a QueueTransport Protocol — composition, not inheritance.

- shared: PgExecutionDispatcher (concrete dispatch/async/with_callback + never-raises)
  calling transport.enqueue / wait_for_result; ExecResultRow (normalised result row);
  resolve_pg_transport (master-gate value supplied by caller, then Flipt);
  RoutingExecutionDispatcher (per-call PG-vs-Celery, injected celery/pg/resolve).
- backend adapter: DjangoQueueTransport (enqueue_task + PgTaskResult poll) + settings
  master-gate + factory.
- workers adapter: PgClientQueueTransport (PgQueueClient + PgResultBackend) + env
  master-gate + factory.

~600 duplicated lines collapse to one base + two ~40-line adapters → clears the
SonarCloud duplication gate permanently. Behaviour is byte-identical (gate, routing,
never-raises): zero regression. The dispatch contract + routing + gate matrix are
tested ONCE against a fake transport (workers suite); each side's tests shrink to its
adapter + factory wiring. Both call sites use the unchanged get_executor_dispatcher.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 22, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9d1e8510-1983-47f5-871e-803b71429e2a

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

…al dev only)

The Socket.IO log/result channel connects to /api/v1/socket with a websocket-only
transport, but the Vite dev proxy only forwarded /api HTTP (no `ws: true`), so the
upgrade never reached the backend and Prompt Studio results never streamed to the
UI in local dev. ws-proxying was dropped in the CRA→Vite migration (the stale
setupProxy.js comment is the leftover).

Dev-server only: `server.proxy` runs solely under `vite dev`; staging/prod serve a
built bundle behind nginx/Traefik (which already routes /api/v1/socket), so this has
no effect on any deployed environment. Rides the UN-3607 PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Heads-up: this PR also carries one small unrelated commit (c8219ef0c) — frontend/vite.config.js adds ws: true to the local Vite dev proxy so the Socket.IO log/result channel (/api/v1/socket, websocket-only transport) actually reaches the backend in local dev. ws-proxying was dropped in the CRA→Vite migration; this restores it.

Deployment-safe: server.proxy runs only under vite dev — staging/prod serve a built bundle behind nginx/Traefik (which already routes /api/v1/socket), so it has zero effect on any deployed env. Happy to split it into its own PR if preferred.

…onarCloud S1172)

The PG path carries org/routing in the enqueue payload, not Celery headers, and the
RoutingExecutionDispatcher strips fairness headers before delegating to the PG
dispatcher — so headers was accepted-but-ignored on dispatch/dispatch_async/
dispatch_with_callback. SonarCloud flagged it as unused (L269/L297). Removed from all
three for consistency; the RoutingExecutionDispatcher (the SDK-substitutable boundary)
keeps headers on its public methods and forwards them only to Celery. Backend 7 +
workers 36 green.
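To illustrate the header handling in the commit message above, here is a rough sketch of a router that keeps headers on its public method but forwards them only to the Celery side. The keyword names celery/pg/resolve follow the PR description; the method signature, the dict-shaped context and the _Recorder helper are assumptions for the example only.

```python
from typing import Any, Callable, Optional


class RoutingExecutionDispatcher:
    """Per-call PG-vs-Celery routing with all collaborators injected."""

    def __init__(self, *, celery: Any, pg: Any, resolve: Callable[[Any], bool]) -> None:
        self._celery = celery
        self._pg = pg
        self._resolve = resolve

    def dispatch(
        self,
        context: Any,
        timeout: Optional[int] = None,
        headers: Optional[dict] = None,
    ) -> Any:
        if self._resolve(context):
            # PG path: org/routing travels in the enqueue payload,
            # so the headers are simply not forwarded.
            return self._pg.dispatch(context, timeout=timeout)
        # Celery path: headers are passed through unchanged.
        return self._celery.dispatch(context, timeout=timeout, headers=headers)


class _Recorder:
    """Hypothetical test double that records the kwargs it was called with."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: list = []

    def dispatch(self, context: Any, **kwargs: Any) -> str:
        self.calls.append(kwargs)
        return self.name


celery, pg = _Recorder("celery"), _Recorder("pg")
router = RoutingExecutionDispatcher(
    celery=celery, pg=pg, resolve=lambda ctx: ctx["use_pg"]
)
via_pg = router.dispatch({"use_pg": True}, timeout=30, headers={"x-fairness": "1"})
via_celery = router.dispatch({"use_pg": False}, timeout=30, headers={"x-fairness": "1"})
```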

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@muhammad-ali-e muhammad-ali-e left a comment

Automated PR review — UN-3607 shared executor-RPC dispatcher

Reviewed with the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). Scope: the 7 non-lockfile files in this PR, diffed against feat/UN-3445-pg-queue-integration.

Overall: a clean, behaviour-preserving refactor. The two byte-for-byte mirrors are now thin transport adapters over one shared dispatcher; the never-raises contract and fail-closed gating are faithfully preserved, and the backend path actually gains a wait_for_result try/except it previously lacked. No correctness regressions found.

Findings below are inline. Priority summary:

  • High: lost test coverage for dispatch_with_callback on_error/default-task_id; transport classes don't declare QueueTransport conformance.
  • Medium: enqueue-failure drops the carried on_error continuation; status: str COMPLETED-but-null gap; callback params typed Any; an inaccurate "strips" comment; missing dispatch docstring; residual poll-loop duplication; backend poll-loop untested.
  • Low: two confirmed ruff failures (I001, D209); silent master-gate env parse; celery: Any; dispatch_async propagation untested.

Nothing here is blocking.

Comment thread workers/tests/test_executor_rpc.py Outdated
assert kw["task_id"] == task_id and kw.get("reply_key") is None
assert kw.get("on_success") is None and kw.get("on_error") is None

def test_dispatch_with_callback_translates_signatures(self):

High — lost coverage: on_error translation + default task_id are no longer tested.

This is the only dispatch_with_callback test and it passes only on_success plus an explicit task_id="tid-7". The old mirror suites covered two behaviours that are now untested:

  1. on_error → signature_to_continuation → forwarded as on_error= (shared module executor_rpc.py:306,312). A regression dropping on_error would pass CI, and on-error continuations are how PG-path failures notify the IDE/callback.
  2. auto-generated task_id when omitted (task_id or str(uuid.uuid4()), shared module :302).

Fix: also pass on_error=<second MagicMock signature> and assert kw["on_error"]["task_name"]; add a test_dispatch_with_callback_defaults_task_id that calls with no task_id and asserts handle.id is truthy and equals t.enqueue_calls[0]["task_id"].

Fixed (ae5f99749). test_dispatch_with_callback_translates_both_signatures now passes on_error and asserts kw['on_error']['task_name']; added test_dispatch_with_callback_defaults_task_id (no task_id → uuid on handle == payload task_id).

error: str | None = None


class QueueTransport(Protocol):

High (highest-leverage type fix) — declare conformance to this Protocol on both transports.

QueueTransport is a structural Protocol, but neither DjangoQueueTransport (backend/pg_queue/executor_rpc.py:62) nor PgClientQueueTransport (workers/queue_backend/pg_queue/executor_rpc.py:62) inherits from it. Conformance is therefore only checked at the PgExecutionDispatcher(...) construction site — a signature drift on one side (e.g. renaming org_id) wouldn't surface until runtime, on that side only.

Fix: have both transport classes explicitly subclass QueueTransport. Protocols support explicit inheritance at zero runtime cost, and mypy/pyright will then check each implementation against the seam independently. Cheapest way to keep the seam well-typed across both trees.

Fixed (ae5f99749). Both DjangoQueueTransport and PgClientQueueTransport now explicitly inherit QueueTransport, so mypy/pyright check each implementation against the seam independently (zero runtime cost — not @runtime_checkable, just static conformance).
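As a rough illustration of the explicit-inheritance fix discussed in this thread, the sketch below shows an adapter subclassing a structural Protocol. The parameter lists are assumptions, not the signatures in the shared module; only the class names and the org_id example come from the review.

```python
from typing import Optional, Protocol


class QueueTransport(Protocol):
    """Structural seam; deliberately not @runtime_checkable."""

    def enqueue(self, *, queue: str, org_id: str, reply_key: Optional[str]) -> None: ...

    def wait_for_result(self, reply_key: str, timeout: float) -> Optional[dict]: ...


class PgClientQueueTransport(QueueTransport):
    """Explicit subclassing lets mypy/pyright check this class on its own.

    If org_id were renamed here, a type checker would flag the override as
    incompatible with the Protocol, even if no dispatcher is ever constructed
    with this adapter in the checked code.
    """

    def __init__(self) -> None:
        self.sent: list = []

    def enqueue(self, *, queue: str, org_id: str, reply_key: Optional[str]) -> None:
        self.sent.append((queue, org_id, reply_key))

    def wait_for_result(self, reply_key: str, timeout: float) -> Optional[dict]:
        return None


transport = PgClientQueueTransport()
transport.enqueue(queue="executor", org_id="org-1", reply_key=None)
```

The inheritance adds the Protocol to the class's MRO but performs no runtime signature check; the benefit is purely static.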

org = str(getattr(context, "organization_id", "") or "")
success_spec = signature_to_continuation(on_success)
error_spec = signature_to_continuation(on_error)
self._transport.enqueue(

Medium — on an enqueue failure here, the carried on_error continuation never fires.

Unlike dispatch (guarded, never raises), dispatch_with_callback calls self._transport.enqueue(...) unguarded, so a transport error (DB down, psycopg2.OperationalError, JSON-serialization error in to_payload) propagates raw. This matches the SDK parity contract, so it's not a regression — but note the hazard: when continuations are carried in the payload and the enqueue itself fails, the executor never runs and the on_error continuation is never fired. Cleanup only happens because call sites wrap dispatch in their own try/except (e.g. prompt_studio/.../views.py:461).

Fix (no behaviour change): log-before-raise so the failure is observable in Sentry even when a caller swallows it, and add a one-line note to the docstring stating callers MUST treat a raised enqueue as the on_error path because the carried on_error will not fire:

try:
    self._transport.enqueue(...)
except Exception:
    logger.exception("PG executor dispatch_with_callback: enqueue failed (executor=%s run_id=%s)", context.executor_name, context.run_id)
    raise

Fixed (ae5f99749, no behaviour change). dispatch_async + dispatch_with_callback now log-before-raise on enqueue failure (observable in Sentry even if a caller swallows it), and the docstring states a raised enqueue IS the failure signal since on_error can't fire (the prompt-studio views already handle this in their try/except).

return ExecutionResult.failure(
error=f"TimeoutError: executor reply not received within {timeout}s"
)
if row.status == PgTaskStatus.COMPLETED.value and row.result is not None:

Medium — a COMPLETED row with result is None is silently reported as a generic task failure.

The success branch requires status == COMPLETED.value AND result is not None; anything else falls through to return ExecutionResult.failure(error=row.error or "executor task failed") at line 265. So a producer bug (writes COMPLETED but no result, or a typo status) is indistinguishable from a genuine executor failure — there's no diagnostic that the row was malformed. ExecResultRow.status is also str | None (line 78), so the enum invariant is never enforced at the read boundary.

Fix: add an explicit branch mirroring the malformed-parse handling at lines 242-257 — if status == COMPLETED.value and row.result is None, log + return a "malformed completed row (missing result)" failure. Optionally validate status against PgTaskStatus and warn on an unknown value. Keeps the fail-safe direction while making the producer-contract violation observable.

Fixed (ae5f99749). A COMPLETED row with result is None now returns a distinct "Executor reported completion with no result" failure (+ a warning log), not the generic "executor task failed".
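The branch structure this thread converges on might look something like the sketch below. It is an approximation under stated assumptions: interpret_row is a hypothetical helper name, the enum values and the dict-shaped return are placeholders, and only the error strings are taken from the thread.

```python
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class PgTaskStatus(Enum):
    COMPLETED = "completed"  # values are assumed for the example
    FAILED = "failed"


@dataclass
class ExecResultRow:
    status: Optional[str] = None
    result: Optional[dict] = None
    error: Optional[str] = None


def interpret_row(row: Optional[ExecResultRow], timeout: float) -> dict:
    """Map a polled row (or a timeout) onto a success/failure result."""
    if row is None:
        return {
            "success": False,
            "error": f"TimeoutError: executor reply not received within {timeout}s",
        }
    if row.status == PgTaskStatus.COMPLETED.value:
        if row.result is not None:
            return {"success": True, "data": row.result}
        # Producer-contract violation: surface it instead of reporting
        # a generic task failure.
        logger.warning("PG executor: COMPLETED row carries no result")
        return {"success": False, "error": "Executor reported completion with no result"}
    return {"success": False, "error": row.error or "executor task failed"}


malformed = interpret_row(ExecResultRow(status="completed"), timeout=30)
failed = interpret_row(ExecResultRow(status="failed"), timeout=30)
```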

def dispatch_with_callback(
    self,
    context: ExecutionContext,
    on_success: Any | None = None,

Medium — on_success/on_error typed Any | None discards the real Signature contract.

signature_to_continuation reads .task, .options["queue"], .args, .kwargs and raises ValueError if .task/queue are missing — so the real precondition is "a Celery-Signature-like object." Typing these Any is actually weaker than the SDK method this mirrors (sdk1/.../dispatcher.py types them Signature | None), and lets any value through to a runtime ValueError.

Fix: define a small structural Protocol (e.g. class CallbackSignature(Protocol): task: str; options: dict; args: tuple; kwargs: dict) next to signature_to_continuation in unstract.core.execution_dispatch, and type both that helper and these params as CallbackSignature | None. Documents and checks the precondition at the seam.

Fixed (ae5f99749). Added a CallbackSignature(Protocol) (task/options/args/kwargs) in unstract.core.execution_dispatch and typed both signature_to_continuation and the dispatch_with_callback params with it — celery-free, a real Signature conforms structurally.
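A sketch of what such a structural type could look like is given below. The attribute names (task, options, args, kwargs), the ValueError precondition and the "task_name" key come from this thread and the tests quoted earlier; the remaining keys of the returned dict, the queue name and FakeSignature are assumptions.

```python
from dataclasses import dataclass, field
from typing import Optional, Protocol


class CallbackSignature(Protocol):
    """Anything shaped like a Celery Signature, without importing celery."""

    task: str
    options: dict
    args: tuple
    kwargs: dict


def signature_to_continuation(sig: Optional[CallbackSignature]) -> Optional[dict]:
    """Translate a signature-like object into a plain, serialisable spec."""
    if sig is None:
        return None
    queue = (sig.options or {}).get("queue")
    if not sig.task or not queue:
        raise ValueError("callback signature needs both a task name and a queue")
    return {
        "task_name": sig.task,
        "queue": queue,
        "args": list(sig.args),
        "kwargs": dict(sig.kwargs),
    }


@dataclass
class FakeSignature:
    """Hypothetical stand-in; conforms to CallbackSignature structurally."""

    task: str
    options: dict = field(default_factory=dict)
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


spec = signature_to_continuation(
    FakeSignature("ide_prompt_complete", {"queue": "callbacks"}, ("a",), {"k": 1})
)
```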

from django.db import close_old_connections

from pg_queue.flags import PG_QUEUE_FLAG_KEY
from pg_queue.models import PgTaskResult

Low — confirmed ruff I001 failure (CI enforces I). The first-party pg_queue.* imports (lines 22-23) precede the third-party unstract.* imports (lines 24-31); isort wants the local group last. Verified failing via ruff check.

Fix: run ruff check --fix (or move the two from pg_queue... lines below the unstract.workflow_execution import block).

Addressed (ae5f99749). Note: uvx ruff (latest) and the repo's pinned pre-commit ruff disagree on first-party classification for this file — pre-commit (the CI authority) reformatted the import order on commit, so it now matches what CI enforces.

Comment thread workers/tests/test_executor_rpc.py Outdated
c.to_dict.return_value = {"run_id": "r"}
return c
class _FakeTransport:
"""Records ``enqueue`` calls and returns a configured result for the poll —

Low — confirmed ruff D209 failure (CI enforces D). A multi-line docstring must have its closing """ on its own line. Verified failing via ruff check.

Fix: move the closing quotes to a new line (or ruff check --fix):

    """Records ``enqueue`` calls and returns a configured result for the poll —
    so the shared dispatcher's logic is tested without any DB.
    """

Fixed (ae5f99749). Moved the _FakeTransport docstring closing quotes to their own line (also fixed the same on _CeleryDispatcher).

return False
return bool(enabled)

master = os.environ.get(_MASTER_GATE_ENV, "false").lower() == "true"

Low — the master-gate env parse fails silently on a fat-fingered value.

os.environ.get(_MASTER_GATE_ENV, "false").lower() == "true" resolves anything other than exactly "true" ("1", "yes", "on", "True " with whitespace) to False, leaving the whole PG transport off with no log line. An operator rolling out with PG_QUEUE_TRANSPORT_ENABLED=1 would see it silently do nothing. This is parity with the original (not a regression), but it's the one gate input that fails quietly — contrast the carefully-logged _resolve_timeout and the FLIPT_SERVICE_AVAILABLE warning.

Fix: warn when the var is set but doesn't parse to a recognized token, e.g. if os.environ.get(_MASTER_GATE_ENV) is non-empty and not in {"true","false"}.

Fixed (ae5f99749). The workers master-gate now warns when PG_QUEUE_TRANSPORT_ENABLED is set to anything other than 'true'/'false' (e.g. '1'/'yes'/'on') — no longer a silent no-op.
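One way to get that behaviour is sketched below: the resolved value is the same as the original parse (only a case-insensitive "true" enables the gate), but an unrecognised token is logged. The function name master_gate_enabled and the injectable environ argument are assumptions added so the example can run without touching the real environment.

```python
import logging
import os
from typing import Mapping, Optional

logger = logging.getLogger(__name__)

_MASTER_GATE_ENV = "PG_QUEUE_TRANSPORT_ENABLED"


def master_gate_enabled(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True only for 'true' (any case); warn on unrecognised values."""
    env = os.environ if environ is None else environ
    raw = env.get(_MASTER_GATE_ENV)
    if not raw:
        # Unset or empty: the gate stays off, nothing to report.
        return False
    value = raw.lower()
    if value not in {"true", "false"}:
        logger.warning(
            "%s=%r is not 'true' or 'false'; PG transport stays disabled",
            _MASTER_GATE_ENV,
            raw,
        )
    return value == "true"


enabled = master_gate_enabled({_MASTER_GATE_ENV: "TRUE"})
fat_fingered = master_gate_enabled({_MASTER_GATE_ENV: "1"})
```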

def __init__(
    self,
    *,
    celery: Any,

Low — celery: Any leaves the three duck-typed delegations unchecked.

The class is "duck-typed against the SDK ExecutionDispatcher" but holds it as Any, so the assumption that celery exposes dispatch/dispatch_async/dispatch_with_callback is unverified. Both factories already pass a real ExecutionDispatcher (backend/pg_queue/executor_rpc.py:119, workers/queue_backend/pg_queue/executor_rpc.py:113), and the class is already available to this module via unstract.sdk1.execution.dispatcher.

Fix: type celery: ExecutionDispatcher (import it here) — turns three duck-typed delegations into checked ones. Optionally type the routing dispatch_with_callback return (line 379) as a HasId/DispatchHandle-shaped protocol instead of Any.

Fixed (ae5f99749). Added a _CeleryDispatcher(Protocol) with the three delegated methods; the router now holds celery: _CeleryDispatcher instead of Any.

):
PgExecutionDispatcher().dispatch(self._ctx()) # must not raise
assert seen["timeout"] == 3600 # _DEFAULT_TIMEOUT
def test_dispatch_async_is_fire_and_forget(self):

Low — the documented dispatch_async "enqueue failures propagate" contract is asserted nowhere.

The shared dispatch_async docstring (executor_rpc.py:272-273) deliberately lets enqueue errors propagate — an intentional asymmetry with the never-raises dispatch. This test covers only the happy path, so a future "consistency" refactor could wrap the enqueue in try/except and silently break the contract (callers rely on the raise to know the task was never enqueued).

Fix: add test_dispatch_async_propagates_enqueue_failure using _FakeTransport(enqueue_raises=RuntimeError(...)) and pytest.raises(RuntimeError) around PgExecutionDispatcher(t).dispatch_async(_ctx()).

Fixed (ae5f99749). Added test_dispatch_async_propagates_enqueue_failure (pytest.raises) — pins the intentional asymmetry with the never-raises dispatch.

…e-dup the poll loop

Toolkit review on #2102 (all 14):
- Both transports now explicitly inherit QueueTransport so a type-checker verifies
  each implementation against the seam (not just at the construction site).
- CallbackSignature Protocol in unstract.core types signature_to_continuation +
  dispatch_with_callback params (was Any); _CeleryDispatcher Protocol types the
  router's celery dependency (was Any).
- Extracted the backend poll loop into a shared poll_for_row(fetch, timeout,
  between_polls=...) — the one duplicated logic that survived.
- dispatch gets a docstring; the fire-and-forget enqueues log-before-raise (+ a note
  that a raised enqueue IS the failure signal since on_error can't fire); a COMPLETED
  row with no result is a distinct error; reworded the "strips headers" comment.
- Workers master-gate warns on a non-true/false value (was a silent no-op).
- Lint (I001/D209); tests: on_error translation + default task_id, dispatch_async
  propagates, backend multi-iteration poll. Backend 8 + workers 38 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Review round addressed — ae5f99749. All 14, per-thread replies on each. (SonarCloud already green.)

High

  • Both transports explicitly inherit QueueTransport → static conformance checked per-implementation, not just at the construction site.
  • Restored the lost dispatch_with_callback coverage: on_error translation + default-task_id.

Medium

  • CallbackSignature Protocol types signature_to_continuation + the callback params (was Any); _CeleryDispatcher Protocol types the router's celery (was Any).
  • Extracted the surviving duplicated poll loop into a shared poll_for_row(...); the backend adapter is now a 3-line fetch + delegate.
  • dispatch docstring; enqueue log-before-raise + a docstring note that a raised enqueue is the failure signal (on_error can't fire); a COMPLETED-but-no-result row is now a distinct error; reworded "strips headers" → "does not forward".

Low

  • Workers master-gate warns on a fat-fingered value (no longer a silent no-op); celery: Any → _CeleryDispatcher; I001 (pinned-ruff order, the CI authority) + D209 fixed; added dispatch_async-propagates and backend multi-iteration-poll tests.

Backend 8 + workers 38 green; ruff/pre-commit clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 22, 2026 05:46
@greptile-apps

greptile-apps Bot commented Jun 22, 2026

Greptile Summary

This PR retires the byte-for-byte executor_rpc.py mirror that existed independently in both the backend and workers by extracting all transport-agnostic dispatch logic into a single shared unstract.workflow_execution.executor_rpc module and injecting the differing transport primitive via a QueueTransport Protocol. The new poll_for_row helper in unstract.core.polling unifies the backoff constants that were previously duplicated across both result pollers.

  • Shared dispatcher (PgExecutionDispatcher, RoutingExecutionDispatcher, resolve_pg_transport): owns the never-raises contract, gate logic, routing, and reply_key orchestration exactly once; both sides inject their concrete QueueTransport (DjangoQueueTransport for backend, PgClientQueueTransport for workers) at the factory call site.
  • Tests consolidated: the dispatch contract and gate matrix are now verified once against a _FakeTransport; each adapter suite is scoped to its own enqueue/poll shape and factory wiring.
  • Ancillary fix (frontend/vite.config.js): adds ws: true to the dev proxy so Socket.IO WebSocket upgrades are forwarded in dev; production Traefik routing is unchanged.

Confidence Score: 5/5

Safe to merge — the dispatch logic is identical code relocated rather than rewritten, both adapters delegate to the same never-raises contract, and the gate/routing/backoff behaviour is byte-equivalent to the removed mirrors.

The refactoring eliminates duplication via composition without changing any observable runtime behaviour: the gate logic, reply_key orchestration, backoff constants, and never-raises contract are all preserved. The new try/except around wait_for_result in PgExecutionDispatcher.dispatch is a strictly safer variant of the old uncovered path. Both transport adapters are covered by their own test suites, the shared contract is verified once against a fake transport, and the PR description documents successful live testing of both transports end-to-end. No logic was changed — only the module boundary moved.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| unstract/workflow-execution/src/unstract/workflow_execution/executor_rpc.py | New shared module: owns the dispatch contract, gate logic, routing, and the QueueTransport Protocol seam exactly once; composition-injected, no Django/psycopg2 dependency. |
| backend/pg_queue/executor_rpc.py | Thinned to the DjangoQueueTransport adapter + settings master gate + factory; dispatch logic fully delegated to the shared module; all re-exports preserve existing import paths. |
| workers/queue_backend/pg_queue/executor_rpc.py | Thinned to the PgClientQueueTransport adapter + env master gate + factory; includes a warning log for non-true/false env values to catch operator fat-finger errors. |
| unstract/core/src/unstract/core/polling.py | New poll_for_row helper: a capped-exponential-backoff loop with a between_polls hook; used by both DjangoQueueTransport and PgResultBackend, so the backoff constants live in exactly one place. |
| workers/queue_backend/pg_queue/result_backend.py | Drops its own polling loop in favour of the shared poll_for_row; the poll_interval parameter is still accepted for test/tuning; connection lifecycle unchanged. |
| workers/tests/test_executor_rpc.py | Expanded to cover the shared dispatch contract (fake transport), gate matrix, routing zero-regression, workers adapter, and shared helpers: 38 tests, the single canonical home for contract verification. |
| backend/pg_queue/tests/test_executor_rpc.py | Properly scoped to backend-specific concerns: DjangoQueueTransport enqueue/poll shapes, settings master gate, and factory wiring; shared contract tests removed to avoid duplication. |
| frontend/vite.config.js | Adds ws: true to the dev proxy so Socket.IO WebSocket upgrades are forwarded to the backend in dev mode; production path (Traefik) unaffected. |
| unstract/workflow-execution/pyproject.toml | Adds unstract-flags and unstract-sdk1 as explicit dependencies to match the new shared executor_rpc imports; uv.lock files updated accordingly. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller as Caller
    participant RD as RoutingExecutionDispatcher
    participant Gate as resolve_pg_transport
    participant PGD as PgExecutionDispatcher
    participant T as QueueTransport
    participant Poll as poll_for_row
    participant CD as ExecutionDispatcher (Celery)

    Caller->>RD: dispatch(context, timeout, headers)
    RD->>Gate: resolve(context) → bool
    alt PG enabled
        RD->>PGD: dispatch(context, timeout)
        PGD->>T: enqueue(queue, context, org_id, reply_key)
        T-->>PGD: row enqueued
        PGD->>T: wait_for_result(reply_key, timeout)
        T->>Poll: poll_for_row(fetch, timeout, between_polls?)
        loop Until row or timeout
            Poll->>Poll: fetch() / sleep
        end
        Poll-->>T: ExecResultRow or None
        T-->>PGD: ExecResultRow or None
        PGD-->>Caller: ExecutionResult (never raises)
    else PG disabled
        RD->>CD: dispatch(context, timeout, headers)
        CD-->>Caller: ExecutionResult
    end

Reviews (2): Last reviewed commit: "UN-3607 [FIX] greptile — unify the PG po..." | Re-trigger Greptile

Comment thread unstract/workflow-execution/src/unstract/workflow_execution/executor_rpc.py Outdated
… unstract.core)

greptile P2: poll_for_row was backend-only — the workers PgResultBackend.wait_for_result
still had its own identical backoff loop, so the constants lived in two places. Since
poll_for_row is pure (no Django/psycopg/SDK deps), moved it to unstract.core.polling
where BOTH PG result pollers import it: the backend DjangoQueueTransport and the workers
PgResultBackend. The backoff now lives in exactly one place to tune.

- new unstract.core.polling.poll_for_row (the shared backoff skeleton)
- workflow_execution.executor_rpc: dropped poll_for_row (+ its time/TypeVar imports)
- backend adapter + PgResultBackend.wait_for_result both delegate to it
- behaviour-identical: backend 8 + workers 38 + result_backend 7 (real-PG) green
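For readers unfamiliar with the pattern, a capped-exponential-backoff poll loop of the kind described above can be sketched as follows. Only the name poll_for_row and the fetch/timeout/between_polls parameters come from this PR; the interval constants and the injectable sleep/clock arguments are assumptions added so the example can run without real waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_for_row(
    fetch: Callable[[], Optional[T]],
    timeout: float,
    *,
    between_polls: Optional[Callable[[], None]] = None,
    initial_interval: float = 0.05,  # assumed constant
    max_interval: float = 1.0,  # assumed cap
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[T]:
    """Call fetch() until it returns a row or the timeout elapses."""
    deadline = clock() + timeout
    interval = initial_interval
    while True:
        row = fetch()
        if row is not None:
            return row
        remaining = deadline - clock()
        if remaining <= 0:
            return None
        if between_polls is not None:
            between_polls()  # adapter-specific hook, e.g. connection upkeep
        sleep(min(interval, remaining))
        interval = min(interval * 2, max_interval)  # double, then cap


attempts = {"n": 0}


def fetch_on_third_try() -> Optional[str]:
    attempts["n"] += 1
    return "row" if attempts["n"] >= 3 else None


slept: list = []
found = poll_for_row(fetch_on_third_try, timeout=5.0, sleep=slept.append)
```

Because fetch and between_polls are plain callables, the helper needs no Django, psycopg or SDK imports, which matches the reason given for placing it in unstract.core.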

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e muhammad-ali-e merged commit 20c6e0b into feat/UN-3445-pg-queue-integration Jun 22, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3607-shared-executor-rpc branch June 22, 2026 06:15