UN-3607 [REFACTOR] retire the executor_rpc mirror — shared dispatcher + injected transport#2102
UN-3607 [REFACTOR] retire the executor_rpc mirror — shared dispatcher + injected transport#2102muhammad-ali-e merged 5 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3607-shared-executor-rpcZipstack/unstract:UN-3607-shared-executor-rpcCopy head branch name to clipboard
Conversation
… + injected transport The backend and workers carried byte-for-byte mirrors of the executor-RPC dispatch (gate + reply_key/timeout orchestration + routing); the only real difference is the transport primitive (Django ORM vs psycopg2). Lift the shared logic into unstract.workflow_execution.executor_rpc (which both already depend on; it can't live in unstract.core because sdk1 imports core → circular) and inject the differing primitive via a QueueTransport Protocol — composition, not inheritance. - shared: PgExecutionDispatcher (concrete dispatch/async/with_callback + never-raises) calling transport.enqueue / wait_for_result; ExecResultRow (normalised result row); resolve_pg_transport (master-gate value supplied by caller, then Flipt); RoutingExecutionDispatcher (per-call PG-vs-Celery, injected celery/pg/resolve). - backend adapter: DjangoQueueTransport (enqueue_task + PgTaskResult poll) + settings master-gate + factory. - workers adapter: PgClientQueueTransport (PgQueueClient + PgResultBackend) + env master-gate + factory. ~600 duplicated lines collapse to one base + two ~40-line adapters → clears the SonarCloud duplication gate permanently. Behaviour is byte-identical (gate, routing, never-raises): zero regression. The dispatch contract + routing + gate matrix are tested ONCE against a fake transport (workers suite); each side's tests shrink to its adapter + factory wiring. Both call sites use the unchanged get_executor_dispatcher. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…al dev only) The Socket.IO log/result channel connects to /api/v1/socket with a websocket-only transport, but the Vite dev proxy only forwarded /api HTTP (no `ws: true`), so the upgrade never reached the backend and Prompt Studio results never streamed to the UI in local dev. ws-proxying was dropped in the CRA→Vite migration (the stale setupProxy.js comment is the leftover). Dev-server only: `server.proxy` runs solely under `vite dev`; staging/prod serve a built bundle behind nginx/Traefik (which already routes /api/v1/socket), so this has no effect on any deployed environment. Rides the UN-3607 PR. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Heads-up: this PR also carries one small unrelated commit ( Deployment-safe: |
…onarCloud S1172) The PG path carries org/routing in the enqueue payload, not Celery headers, and the RoutingExecutionDispatcher strips fairness headers before delegating to the PG dispatcher — so headers was accepted-but-ignored on dispatch/dispatch_async/ dispatch_with_callback. SonarCloud flagged it as unused (L269/L297). Removed from all three for consistency; the RoutingExecutionDispatcher (the SDK-substitutable boundary) keeps headers on its public methods and forwards them only to Celery. Backend 7 + workers 36 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR review — UN-3607 shared executor-RPC dispatcher
Reviewed with the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). Scope: the 7 non-lockfile files in this PR, diffed against feat/UN-3445-pg-queue-integration.
Overall: a clean, behaviour-preserving refactor. The two byte-for-byte mirrors are now thin transport adapters over one shared dispatcher; the never-raises contract and fail-closed gating are faithfully preserved, and the backend path actually gains a wait_for_result try/except it previously lacked. No correctness regressions found.
Findings below are inline. Priority summary:
- High: lost test coverage for
dispatch_with_callbackon_error/default-task_id; transport classes don't declareQueueTransportconformance. - Medium: enqueue-failure drops the carried
on_errorcontinuation;status: strCOMPLETED-but-null gap; callback params typedAny; an inaccurate "strips" comment; missingdispatchdocstring; residual poll-loop duplication; backend poll-loop untested. - Low: two confirmed ruff failures (I001, D209); silent master-gate env parse;
celery: Any;dispatch_asyncpropagation untested.
Nothing here is blocking.
| assert kw["task_id"] == task_id and kw.get("reply_key") is None | ||
| assert kw.get("on_success") is None and kw.get("on_error") is None | ||
|
|
||
| def test_dispatch_with_callback_translates_signatures(self): |
There was a problem hiding this comment.
High — lost coverage: on_error translation + default task_id are no longer tested.
This is the only dispatch_with_callback test and it passes only on_success plus an explicit task_id="tid-7". The old mirror suites covered two behaviours that are now untested:
on_error→signature_to_continuation→ forwarded ason_error=(shared moduleexecutor_rpc.py:306,312). A regression droppingon_errorwould pass CI — and on-error continuations are how PG-path failures notify the IDE/callback.- auto-generated
task_idwhen omitted (task_id or str(uuid.uuid4()), shared module:302).
Fix: also pass on_error=<second MagicMock signature> and assert kw["on_error"]["task_name"]; add a test_dispatch_with_callback_defaults_task_id that calls with no task_id and asserts handle.id is truthy and equals t.enqueue_calls[0]["task_id"].
There was a problem hiding this comment.
Fixed (ae5f99749). test_dispatch_with_callback_translates_both_signatures now passes on_error and asserts kw['on_error']['task_name']; added test_dispatch_with_callback_defaults_task_id (no task_id → uuid on handle == payload task_id).
| error: str | None = None | ||
|
|
||
|
|
||
| class QueueTransport(Protocol): |
There was a problem hiding this comment.
High (highest-leverage type fix) — declare conformance to this Protocol on both transports.
QueueTransport is a structural Protocol, but neither DjangoQueueTransport (backend/pg_queue/executor_rpc.py:62) nor PgClientQueueTransport (workers/queue_backend/pg_queue/executor_rpc.py:62) inherits from it. Conformance is therefore only checked at the PgExecutionDispatcher(...) construction site — a signature drift on one side (e.g. renaming org_id) wouldn't surface until runtime, on that side only.
Fix: have both transport classes explicitly subclass QueueTransport. Protocols support explicit inheritance at zero runtime cost, and mypy/pyright will then check each implementation against the seam independently. Cheapest way to keep the seam well-typed across both trees.
There was a problem hiding this comment.
Fixed (ae5f99749). Both DjangoQueueTransport and PgClientQueueTransport now explicitly inherit QueueTransport, so mypy/pyright check each implementation against the seam independently (zero runtime cost — not @runtime_checkable, just static conformance).
| org = str(getattr(context, "organization_id", "") or "") | ||
| success_spec = signature_to_continuation(on_success) | ||
| error_spec = signature_to_continuation(on_error) | ||
| self._transport.enqueue( |
There was a problem hiding this comment.
Medium — on an enqueue failure here, the carried on_error continuation never fires.
Unlike dispatch (guarded, never raises), dispatch_with_callback calls self._transport.enqueue(...) unguarded, so a transport error (DB down, psycopg2.OperationalError, JSON-serialization error in to_payload) propagates raw. This matches the SDK parity contract, so it's not a regression — but note the hazard: when continuations are carried in the payload and the enqueue itself fails, the executor never runs and the on_error continuation is never fired. Cleanup only happens because call sites wrap dispatch in their own try/except (e.g. prompt_studio/.../views.py:461).
Fix (no behaviour change): log-before-raise so the failure is observable in Sentry even when a caller swallows it, and add a one-line note to the docstring stating callers MUST treat a raised enqueue as the on_error path because the carried on_error will not fire:
try:
self._transport.enqueue(...)
except Exception:
logger.exception("PG executor dispatch_with_callback: enqueue failed (executor=%s run_id=%s)", context.executor_name, context.run_id)
raiseThere was a problem hiding this comment.
Fixed (ae5f99749, no behaviour change). dispatch_async + dispatch_with_callback now log-before-raise on enqueue failure (observable in Sentry even if a caller swallows it), and the docstring states a raised enqueue IS the failure signal since on_error can't fire (the prompt-studio views already handle this in their try/except).
| return ExecutionResult.failure( | ||
| error=f"TimeoutError: executor reply not received within {timeout}s" | ||
| ) | ||
| if row.status == PgTaskStatus.COMPLETED.value and row.result is not None: |
There was a problem hiding this comment.
Medium — a COMPLETED row with result is None is silently reported as a generic task failure.
The success branch requires status == COMPLETED.value AND result is not None; anything else falls through to return ExecutionResult.failure(error=row.error or "executor task failed") at line 265. So a producer bug (writes COMPLETED but no result, or a typo status) is indistinguishable from a genuine executor failure — there's no diagnostic that the row was malformed. ExecResultRow.status is also str | None (line 78), so the enum invariant is never enforced at the read boundary.
Fix: add an explicit branch mirroring the malformed-parse handling at lines 242-257 — if status == COMPLETED.value and row.result is None, log + return a "malformed completed row (missing result)" failure. Optionally validate status against PgTaskStatus and warn on an unknown value. Keeps the fail-safe direction while making the producer-contract violation observable.
There was a problem hiding this comment.
Fixed (ae5f99749). A COMPLETED row with result is None now returns a distinct Executor reported completion with no result failure (+ a warning log), not the generic executor task failed.
| def dispatch_with_callback( | ||
| self, | ||
| context: ExecutionContext, | ||
| on_success: Any | None = None, |
There was a problem hiding this comment.
Medium — on_success/on_error typed Any | None discards the real Signature contract.
signature_to_continuation reads .task, .options["queue"], .args, .kwargs and raises ValueError if .task/queue are missing — so the real precondition is "a Celery-Signature-like object." Typing these Any is actually weaker than the SDK method this mirrors (sdk1/.../dispatcher.py types them Signature | None), and lets any value through to a runtime ValueError.
Fix: define a small structural Protocol (e.g. class CallbackSignature(Protocol): task: str; options: dict; args: tuple; kwargs: dict) next to signature_to_continuation in unstract.core.execution_dispatch, and type both that helper and these params as CallbackSignature | None. Documents and checks the precondition at the seam.
There was a problem hiding this comment.
Fixed (ae5f99749). Added a CallbackSignature(Protocol) (task/options/args/kwargs) in unstract.core.execution_dispatch and typed both signature_to_continuation and the dispatch_with_callback params with it — celery-free, a real Signature conforms structurally.
| from django.db import close_old_connections | ||
|
|
||
| from pg_queue.flags import PG_QUEUE_FLAG_KEY | ||
| from pg_queue.models import PgTaskResult |
There was a problem hiding this comment.
Low — confirmed ruff I001 failure (CI enforces I). The first-party pg_queue.* imports (lines 22-23) precede the third-party unstract.* imports (lines 24-31); isort wants the local group last. Verified failing via ruff check.
Fix: run ruff check --fix (or move the two from pg_queue... lines below the unstract.workflow_execution import block).
There was a problem hiding this comment.
Addressed (ae5f99749). Note: uvx ruff (latest) and the repo's pinned pre-commit ruff disagree on first-party classification for this file — pre-commit (the CI authority) reformatted the import order on commit, so it now matches what CI enforces.
| c.to_dict.return_value = {"run_id": "r"} | ||
| return c | ||
| class _FakeTransport: | ||
| """Records ``enqueue`` calls and returns a configured result for the poll — |
There was a problem hiding this comment.
Low — confirmed ruff D209 failure (CI enforces D). A multi-line docstring must have its closing """ on its own line. Verified failing via ruff check.
Fix: move the closing quotes to a new line (or ruff check --fix):
"""Records ``enqueue`` calls and returns a configured result for the poll —
so the shared dispatcher's logic is tested without any DB.
"""There was a problem hiding this comment.
Fixed (ae5f99749). Moved the _FakeTransport docstring closing quotes to their own line (also fixed the same on _CeleryDispatcher).
| return False | ||
| return bool(enabled) | ||
|
|
||
| master = os.environ.get(_MASTER_GATE_ENV, "false").lower() == "true" |
There was a problem hiding this comment.
Low — the master-gate env parse fails silently on a fat-fingered value.
os.environ.get(_MASTER_GATE_ENV, "false").lower() == "true" resolves anything other than exactly "true" ("1", "yes", "on", "True " with whitespace) to False, leaving the whole PG transport off with no log line. An operator rolling out with PG_QUEUE_TRANSPORT_ENABLED=1 would see it silently do nothing. This is parity with the original (not a regression), but it's the one gate input that fails quietly — contrast the carefully-logged _resolve_timeout and the FLIPT_SERVICE_AVAILABLE warning.
Fix: warn when the var is set but doesn't parse to a recognized token, e.g. if os.environ.get(_MASTER_GATE_ENV) is non-empty and not in {"true","false"}.
There was a problem hiding this comment.
Fixed (ae5f99749). The workers master-gate now warns when PG_QUEUE_TRANSPORT_ENABLED is set to anything other than 'true'/'false' (e.g. '1'/'yes'/'on') — no longer a silent no-op.
| def __init__( | ||
| self, | ||
| *, | ||
| celery: Any, |
There was a problem hiding this comment.
Low — celery: Any leaves the three duck-typed delegations unchecked.
The class is "duck-typed against the SDK ExecutionDispatcher" but holds it as Any, so the assumption that celery exposes dispatch/dispatch_async/dispatch_with_callback is unverified. Both factories already pass a real ExecutionDispatcher (backend/pg_queue/executor_rpc.py:119, workers/queue_backend/pg_queue/executor_rpc.py:113), and this module already imports it is available via unstract.sdk1.execution.dispatcher.
Fix: type celery: ExecutionDispatcher (import it here) — turns three duck-typed delegations into checked ones. Optionally type the routing dispatch_with_callback return (line 379) as a HasId/DispatchHandle-shaped protocol instead of Any.
There was a problem hiding this comment.
Fixed (ae5f99749). Added a _CeleryDispatcher(Protocol) with the three delegated methods; the router now holds celery: _CeleryDispatcher instead of Any.
| ): | ||
| PgExecutionDispatcher().dispatch(self._ctx()) # must not raise | ||
| assert seen["timeout"] == 3600 # _DEFAULT_TIMEOUT | ||
| def test_dispatch_async_is_fire_and_forget(self): |
There was a problem hiding this comment.
Low — the documented dispatch_async "enqueue failures propagate" contract is asserted nowhere.
The shared dispatch_async docstring (executor_rpc.py:272-273) deliberately lets enqueue errors propagate — an intentional asymmetry with the never-raises dispatch. This test covers only the happy path, so a future "consistency" refactor could wrap the enqueue in try/except and silently break the contract (callers rely on the raise to know the task was never enqueued).
Fix: add test_dispatch_async_propagates_enqueue_failure using _FakeTransport(enqueue_raises=RuntimeError(...)) and pytest.raises(RuntimeError) around PgExecutionDispatcher(t).dispatch_async(_ctx()).
There was a problem hiding this comment.
Fixed (ae5f99749). Added test_dispatch_async_propagates_enqueue_failure (pytest.raises) — pins the intentional asymmetry with the never-raises dispatch.
…e-dup the poll loop Toolkit review on #2102 (all 14): - Both transports now explicitly inherit QueueTransport so a type-checker verifies each implementation against the seam (not just at the construction site). - CallbackSignature Protocol in unstract.core types signature_to_continuation + dispatch_with_callback params (was Any); _CeleryDispatcher Protocol types the router's celery dependency (was Any). - Extracted the backend poll loop into a shared poll_for_row(fetch, timeout, between_polls=...) — the one duplicated logic that survived. - dispatch gets a docstring; the fire-and-forget enqueues log-before-raise (+ a note that a raised enqueue IS the failure signal since on_error can't fire); a COMPLETED row with no result is a distinct error; reworded the "strips headers" comment. - Workers master-gate warns on a non-true/false value (was a silent no-op). - Lint (I001/D209); tests: on_error translation + default task_id, dispatch_async propagates, backend multi-iteration poll. Backend 8 + workers 38 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Review round addressed — High
Medium
Low
Backend 8 + workers 38 green; ruff/pre-commit clean. |
|
| Filename | Overview |
|---|---|
| unstract/workflow-execution/src/unstract/workflow_execution/executor_rpc.py | New shared module — owns dispatch contract, gate logic, routing, and the QueueTransport Protocol seam exactly once; composition-injected, no Django/psycopg2 dependency. |
| backend/pg_queue/executor_rpc.py | Thinned to DjangoQueueTransport adapter + settings master gate + factory; dispatch logic fully delegated to shared module; all re-exports preserve existing import paths. |
| workers/queue_backend/pg_queue/executor_rpc.py | Thinned to PgClientQueueTransport adapter + env master gate + factory; includes warning log for non-true/false env values to catch operator fat-finger errors. |
| unstract/core/src/unstract/core/polling.py | New poll_for_row helper — capped-exponential-backoff loop with between_polls hook; used by both DjangoQueueTransport and PgResultBackend, so backoff constants live in exactly one place. |
| workers/queue_backend/pg_queue/result_backend.py | Drops its own polling loop in favour of shared poll_for_row; poll_interval parameter still accepted for test/tuning; connection lifecycle unchanged. |
| workers/tests/test_executor_rpc.py | Expanded to cover the shared dispatch contract (fake transport), gate matrix, routing zero-regression, workers adapter, and shared helpers — 38 tests, single canonical home for contract verification. |
| backend/pg_queue/tests/test_executor_rpc.py | Properly scoped to backend-specific concerns: DjangoQueueTransport enqueue/poll shapes, settings master gate, and factory wiring; shared contract tests removed to avoid duplication. |
| frontend/vite.config.js | Adds ws: true to the dev proxy so Socket.IO WebSocket upgrades are forwarded to the backend in dev mode; production path (Traefik) unaffected. |
| unstract/workflow-execution/pyproject.toml | Adds unstract-flags and unstract-sdk1 as explicit dependencies to match the new shared executor_rpc imports; uv.lock files updated accordingly. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Caller as Caller
participant RD as RoutingExecutionDispatcher
participant Gate as resolve_pg_transport
participant PGD as PgExecutionDispatcher
participant T as QueueTransport
participant Poll as poll_for_row
participant CD as ExecutionDispatcher (Celery)
Caller->>RD: dispatch(context, timeout, headers)
RD->>Gate: resolve(context) → bool
alt PG enabled
RD->>PGD: dispatch(context, timeout)
PGD->>T: enqueue(queue, context, org_id, reply_key)
T-->>PGD: row enqueued
PGD->>T: wait_for_result(reply_key, timeout)
T->>Poll: poll_for_row(fetch, timeout, between_polls?)
loop Until row or timeout
Poll->>Poll: fetch() / sleep
end
Poll-->>T: ExecResultRow or None
T-->>PGD: ExecResultRow or None
PGD-->>Caller: ExecutionResult (never raises)
else PG disabled
RD->>CD: dispatch(context, timeout, headers)
CD-->>Caller: ExecutionResult
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Caller as Caller
participant RD as RoutingExecutionDispatcher
participant Gate as resolve_pg_transport
participant PGD as PgExecutionDispatcher
participant T as QueueTransport
participant Poll as poll_for_row
participant CD as ExecutionDispatcher (Celery)
Caller->>RD: dispatch(context, timeout, headers)
RD->>Gate: resolve(context) → bool
alt PG enabled
RD->>PGD: dispatch(context, timeout)
PGD->>T: enqueue(queue, context, org_id, reply_key)
T-->>PGD: row enqueued
PGD->>T: wait_for_result(reply_key, timeout)
T->>Poll: poll_for_row(fetch, timeout, between_polls?)
loop Until row or timeout
Poll->>Poll: fetch() / sleep
end
Poll-->>T: ExecResultRow or None
T-->>PGD: ExecResultRow or None
PGD-->>Caller: ExecutionResult (never raises)
else PG disabled
RD->>CD: dispatch(context, timeout, headers)
CD-->>Caller: ExecutionResult
end
Reviews (2): Last reviewed commit: "UN-3607 [FIX] greptile — unify the PG po..." | Re-trigger Greptile
… unstract.core) greptile P2: poll_for_row was backend-only — the workers PgResultBackend.wait_for_result still had its own identical backoff loop, so the constants lived in two places. Since poll_for_row is pure (no Django/psycopg/SDK deps), moved it to unstract.core.polling where BOTH PG result pollers import it: the backend DjangoQueueTransport and the workers PgResultBackend. The backoff now lives in exactly one place to tune. - new unstract.core.polling.poll_for_row (the shared backoff skeleton) - workflow_execution.executor_rpc: dropped poll_for_row (+ its time/TypeVar imports) - backend adapter + PgResultBackend.wait_for_result both delegate to it - behaviour-identical: backend 8 + workers 38 + result_backend 7 (real-PG) green Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
20c6e0b
into
feat/UN-3445-pg-queue-integration
|
What
Retires the byte-for-byte backend ↔ workers
executor_rpc.pymirror that SonarCloud has flagged on every executor PR. The two carried identical dispatch logic (gate + reply_key/timeout orchestration + routing + never-raises); the only real difference is the transport primitive — backend enqueues via the Django ORM, workers via psycopg2. Targetsfeat/UN-3445-pg-queue-integration(notmain).Net −417 lines (823 added / 1240 removed).
How — composition (injected transport), not inheritance
The shared logic now lives once in
unstract.workflow_execution.executor_rpc(both backend and workers already depend on it; it can't live inunstract.corebecausesdk1importscore→ circular). The differing primitive is injected via aQueueTransportProtocol:PgExecutionDispatcher(concrete —dispatch/dispatch_async/dispatch_with_callback+ the never-raises orchestration) callingtransport.enqueue/wait_for_result;ExecResultRow(normalised result row so the Django-model and dict rows fold to one path);resolve_pg_transport(master-gate value supplied by the caller, then the Flipt flag);RoutingExecutionDispatcher(per-call PG-vs-Celery, withcelery/pg/resolveinjected).DjangoQueueTransport(enqueue_task+PgTaskResultpoll) +settings.PG_QUEUE_TRANSPORT_ENABLEDmaster-gate + factory.PgClientQueueTransport(PgQueueClient+PgResultBackend) + env master-gate + factory.DispatchHandle/signature_to_continuationstay inunstract.core(already shared).Both call sites (prompt-studio,
structure_tool) use the unchangedget_executor_dispatcherfactory.Non-regression
The dispatch logic is the same code, relocated and transport-injected — behaviour (gate, routing, never-raises) is byte-identical.
Tests
The dispatch contract + routing + the Flipt gate matrix are tested once against a fake transport (workers suite); each side's tests shrink to its adapter (enqueue/wait shape) + factory wiring. Backend 7 + workers 36 green; the 53 other executor-touching tests pass.
Dev-tested live (gate ON)
Rebuilt the worker image, recreated the PG workers, restarted the backend — all boot clean (the new shared module imports in the built image; no
ImportError). Then ran a Prompt Studio prompt and an ETL:PG executor dispatch_with_callback: enqueued … on_success=ide_prompt_complete→ 202.Executor RPC → PG transport→PG executor dispatch: enqueued reply_key=… waiting for result→ executorReceived execute_extraction … operation=structure_pipeline→execute_extraction complete … success=True.No tracebacks, no "No executor registered". Both transports drive the shared dispatcher and route over PG end-to-end.
🤖 Generated with Claude Code