UN-3605 [GATED-FEAT] PG Queue — structure_tool executor RPC over Postgres (+ register executors in PG consumer) #2095
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:feat/UN-3605-pg-structure-tool-executor-rpc
…gres (+ register executors in PG consumer)
Routes the in-workflow structure_tool executor dispatch onto the Postgres
executor RPC when the single pg_queue_enabled flag is on; Celery otherwise
(zero-regression by construction). Next slice after UN-3603 (prompt-studio
blocking path). Gated off by default.
- workers queue_backend/pg_queue/executor_rpc.py (new): the workers twin of
the backend module on worker primitives — resolve_executor_transport
(master PG_QUEUE_TRANSPORT_ENABLED env, then the single pg_queue_enabled
Flipt flag, fail-closed to Celery); PgExecutionDispatcher (enqueue
execute_extraction with a unique reply_key via PgQueueClient, poll
PgResultBackend; never raises — timeout/failure -> ExecutionResult.failure);
RoutingExecutionDispatcher (per-call gate routing; async/callback stay on
Celery); get_executor_dispatcher factory.
- to_payload gains an optional reply_key (request-reply marker; fire-and-forget
rows stay byte-identical).
- structure_tool_task swaps its dispatcher factory to get_executor_dispatcher;
the 3 blocking dispatch call sites are unchanged.
- Fix (latent gap exposed by this slice): the PG executor consumer registered
no executors ("No executor registered with name 'legacy'"). The
@ExecutorRegistry.register side-effect import lived only in the Celery
executor/worker.py, but the PG consumer bootstraps via executor/tasks.py.
Import executor.executors from executor/tasks.py so any worker registering
execute_extraction (Celery or PG) has the executors — also fixes the
prompt-studio PG path from UN-3603.
- sample.env: document the worker-side PG_QUEUE_TRANSPORT_ENABLED gate; fix a
stale pg_queue_execution_enabled -> pg_queue_enabled doc reference.
- Tests: workers executor_rpc unit suite (gate fail-closed matrix,
zero-regression routing, never-raises dispatch); subprocess regression that
importing executor.tasks alone registers the executors.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
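For readers skimming the description, the gate order above can be sketched as follows. This is a minimal illustration, not the code in executor_rpc.py: `resolve_transport_sketch`, the `flag_enabled` callable (a stand-in for the Flipt client), and the set of truthy env strings are all assumptions made for the example.

```python
from __future__ import annotations

import os
from typing import Callable, Mapping

# Assumption: these strings count as "on"; the real env parser may differ.
_TRUTHY = {"1", "true", "yes", "on"}


def resolve_transport_sketch(
    flag_enabled: Callable[[str], bool],
    env: Mapping[str, str] | None = None,
) -> bool:
    """Return True to use the PG transport, False to stay on Celery."""
    env = os.environ if env is None else env
    # Master gate first: when it is off, the flag service is never called.
    if env.get("PG_QUEUE_TRANSPORT_ENABLED", "").strip().lower() not in _TRUTHY:
        return False
    try:
        return bool(flag_enabled("pg_queue_enabled"))
    except Exception:
        # Fail closed: any flag-service error keeps the Celery path.
        return False
```

Because the function is evaluated per dispatch, flipping the flag takes effect on the next call without a redeploy, which is the rollout property the description relies on.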
muhammad-ali-e left a comment
Automated PR Review Toolkit pass (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier).
Overall: a careful, well-documented, faithful workers-side port of the backend executor RPC. The gating, the never-raises contract, and the zero-regression-when-off property are correct. No blocking bugs. Findings below are hardening + test-coverage items; two HIGH/MED items (corrupt METADATA.json swallow at structure_tool_task.py:~858; stale ~508 line-ref at :~453) fall outside this PR's diff and are in the summary rather than inline.
| error=f"TimeoutError: executor reply not received within {timeout}s" | ||
| ) | ||
| if ( | ||
| row["status"] == PgTaskStatus.COMPLETED.value |
[Medium — correctness / never-raises] row["status"] uses bracket access (KeyError-raising), while result/error below use .get(). This access is not inside the malformed-result try (that only guards from_dict), so a result row missing status would raise straight out of dispatch() and violate the documented never-raises contract.
    if (
        row.get("status") == PgTaskStatus.COMPLETED.value
        and row.get("result") is not None
    ):
Producer (result_backend.get_result) always sets status today, so this is latent, but the contract should not depend on the producer.
Fixed in 6e3e5c6 — row.get("status") now (result/error already used .get), so a status-less row can't raise out of dispatch(). Never-raises no longer depends on the producer writing every key.
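A minimal sketch of the defensive row handling discussed in this thread, assuming a plain dict row. `row_to_outcome` and the `COMPLETED` constant are hypothetical stand-ins (the real code compares against PgTaskStatus.COMPLETED.value and builds an ExecutionResult).

```python
from __future__ import annotations

from typing import Any

# Assumption: stand-in for PgTaskStatus.COMPLETED.value.
COMPLETED = "completed"


def row_to_outcome(row: dict[str, Any] | None) -> tuple[bool, Any]:
    """Map a result row to (ok, result_or_error) without ever raising."""
    if row is None:
        return False, "no result row"
    # .get() on every key: a row missing "status" is a failure, not a KeyError.
    if row.get("status") == COMPLETED and row.get("result") is not None:
        return True, row["result"]
    return False, row.get("error") or "executor task failed"
```

With this shape, a status-less or otherwise partial row degrades to a failure value, so the never-raises contract no longer depends on what the producer writes.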
| try: | ||
| timeout = int(os.environ.get(_DEFAULT_TIMEOUT_ENV, _DEFAULT_TIMEOUT)) | ||
| except (TypeError, ValueError): | ||
| timeout = _DEFAULT_TIMEOUT |
[Medium — silent config masking] Guarding the parse so a bad EXECUTOR_RESULT_TIMEOUT can't break the never-raises contract is right, but the swallow is silent: an operator who fat-fingers EXECUTOR_RESULT_TIMEOUT=3O00 gets the 3600s default with no signal, and every RPC may wait up to an hour instead of the intended value. Log the fallback:
    except (TypeError, ValueError):
        logger.warning(
            "Invalid %s=%r; falling back to %ss",
            _DEFAULT_TIMEOUT_ENV, os.environ.get(_DEFAULT_TIMEOUT_ENV), _DEFAULT_TIMEOUT,
        )
        timeout = _DEFAULT_TIMEOUT
Fixed in 6e3e5c6 — the EXECUTOR_RESULT_TIMEOUT parse fallback now logs a WARNING (with the bad value) before defaulting to 3600s.
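As a self-contained variant of the suggestion in this thread, the sketch below reads the env var once so the value that is logged is exactly the value that failed to parse. `resolve_timeout` is a hypothetical helper name; the constant names and the 3600s default are taken from the thread.

```python
from __future__ import annotations

import logging
import os

logger = logging.getLogger(__name__)

_DEFAULT_TIMEOUT_ENV = "EXECUTOR_RESULT_TIMEOUT"
_DEFAULT_TIMEOUT = 3600


def resolve_timeout(timeout: int | None = None) -> int:
    """Explicit timeout wins; otherwise parse the env var, warning on bad input."""
    if timeout is not None:
        return timeout
    raw = os.environ.get(_DEFAULT_TIMEOUT_ENV)
    if raw is None:
        return _DEFAULT_TIMEOUT
    try:
        return int(raw)
    except (TypeError, ValueError):
        # Never raise on bad config, but leave a signal for the operator.
        logger.warning(
            "Invalid %s=%r; falling back to %ss",
            _DEFAULT_TIMEOUT_ENV, raw, _DEFAULT_TIMEOUT,
        )
        return _DEFAULT_TIMEOUT
```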
| self, | ||
| context: ExecutionContext, | ||
| timeout: int | None = None, | ||
| ) -> ExecutionResult: |
[Medium — substitutability / type design] PgExecutionDispatcher.dispatch(context, timeout) omits the headers param that both the SDK ExecutionDispatcher.dispatch and RoutingExecutionDispatcher.dispatch accept. Today only RoutingExecutionDispatcher is handed out (and it drops headers on the PG path intentionally), so this is latent — but anyone calling a PgExecutionDispatcher directly (the tests do) gets a non-substitutable object, and nothing static catches it. Minimum fix: add headers: dict[str, Any] | None = None (accept-and-ignore) so the dispatch shapes match. Better: define an ExecutorDispatcher Protocol in unstract.core and annotate the SDK base + both PG classes against it — structural, so it preserves the "no SDK edit" constraint while letting mypy verify the duck-type.
Fixed in 6e3e5c6 — added headers: dict | None = None (accept-and-ignore) to PgExecutionDispatcher.dispatch so the shape matches the SDK/Routing dispatch. The shared ExecutorDispatcher Protocol in unstract.core is the broader TODO(shared) (same place the de-dup would land).
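The structural-typing option mentioned in this thread could look roughly like the sketch below. It is only an illustration of the idea: the Protocol body, `PgDispatcherSketch`, and `run` are hypothetical, and `Any` stands in for ExecutionContext and ExecutionResult.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ExecutorDispatcher(Protocol):
    """Shape shared by the Celery, PG and routing dispatchers (sketch)."""

    def dispatch(
        self,
        context: Any,
        timeout: int | None = None,
        headers: dict[str, Any] | None = None,
    ) -> Any: ...


class PgDispatcherSketch:
    """No inheritance needed: matching the method shape is enough."""

    def dispatch(self, context, timeout=None, headers=None):
        # headers is accepted and ignored on the PG path.
        return {"transport": "pg", "timeout": timeout}


def run(dispatcher: ExecutorDispatcher, context: Any) -> Any:
    # A static checker verifies the argument against the Protocol here.
    return dispatcher.dispatch(context, timeout=5, headers={"k": "v"})
```

Note that the `isinstance` check enabled by `runtime_checkable` only confirms that a `dispatch` attribute exists; it is the static checker that compares signatures.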
| context.run_id, | ||
| ) | ||
| return ExecutionResult.failure( | ||
| error=f"Malformed executor result for reply_key {reply_key}" |
[Low] The logger.exception captures the traceback (good), but the caller-facing ExecutionResult.failure only says Malformed executor result for reply_key {reply_key} — the actual parse cause isn't surfaced to a UI reading result.error. For parity with the enqueue/wait paths (which include type(exc).__name__), consider error=f"Malformed executor result ({type(exc).__name__}) for reply_key {reply_key}".
Fixed in 6e3e5c6 — the malformed-result failure now includes type(exc).__name__, matching the enqueue/wait paths.
| in-flight RPC; structure_tool dispatches are sequential) and closes it on | ||
| exit, so a long RPC never leaks a connection. | ||
| """ | ||
| with PgResultBackend() as rb: |
[Low — doc/concurrency note] The docstring ("structure_tool dispatches are sequential", "one per in-flight RPC") is true within one task, but the file_processing worker runs --pool=prefork with WORKER_FILE_PROCESSING_CONCURRENCY (default 4), so up to 4 processes can each hold a PgResultBackend connection open for the full RPC (up to EXECUTOR_RESULT_TIMEOUT, default 3600s). Bounded by concurrency and not a leak (context manager closes on exit), but it differs from the backend twin, which calls close_old_connections() between polls. Worth a one-line note that the pin is bounded by worker concurrency, or release the connection between polls if file_processing concurrency is ever raised.
Fixed in 6e3e5c6 — added a docstring note that the pin is bounded by file_processing prefork concurrency (default 4), and to release between polls like the backend twin if that concurrency is raised materially. Kept connection-per-wait for now (simpler; PgResultBackend owns + closes it).
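If the connection is ever released between polls, the wait loop might be shaped like this sketch. It assumes a hypothetical `fetch(reply_key)` callable that opens and closes its own connection per call and returns a row dict or None; the default poll interval and the injectable `clock`/`sleep` parameters are also assumptions for illustration, not the PgResultBackend API.

```python
import time


def wait_for_result_sketch(
    fetch,
    reply_key,
    timeout,
    interval=0.5,
    clock=time.monotonic,
    sleep=time.sleep,
):
    """Poll fetch(reply_key) until it returns a row or the deadline passes."""
    deadline = clock() + timeout
    while True:
        # Assumption: each fetch acquires and releases its own connection,
        # so nothing is pinned while the loop sleeps.
        row = fetch(reply_key)
        if row is not None:
            return row
        remaining = deadline - clock()
        if remaining <= 0:
            return None  # caller maps this to a timeout failure
        sleep(min(interval, remaining))
```

The trade-off is one connection acquire per poll instead of one per wait, which only pays off if file_processing concurrency grows enough for pinned connections to matter.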
| reply_key=reply_key, | ||
| ) | ||
| with PgQueueClient() as client: | ||
| client.send(queue, payload, org_id=org_id) |
[Medium — test gap] _enqueue is mocked in all 11 dispatch tests, so nothing verifies the actual PG-transport wiring: that it builds to_payload(_EXECUTE_TASK, args=[context.to_dict()], queue=queue, reply_key=reply_key) and passes org_id=org to client.send. Since the queue name, the args shape, and org_id are the only routing/identity info on the PG path (Celery headers are dropped), a bug here misroutes or breaks org-fairness with no test catching it. Suggest a test that patches PgQueueClient and asserts client.send is called with the queue, org_id, and a payload carrying task_name/args/reply_key.
Fixed in 6e3e5c6 — added TestPgExecutionDispatcherEnqueueWiring that patches PgQueueClient and asserts client.send gets the queue (celery_executor_legacy), org_id, and a payload carrying task_name/args/reply_key (to_payload runs for real).
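The wiring test described in this thread follows a common mock pattern, sketched below against a stand-in function. `enqueue_sketch` and its payload dict are hypothetical (the real code calls to_payload and patches PgQueueClient); the sample `run_id`, `org_id`, and `reply_key` values are invented for the example.

```python
from unittest.mock import MagicMock


def enqueue_sketch(client_factory, context_dict, queue, org_id, reply_key):
    """Stand-in for _enqueue: build a payload and send it via a client."""
    payload = {
        "task_name": "execute_extraction",
        "args": [context_dict],
        "queue": queue,
        "reply_key": reply_key,
    }
    with client_factory() as client:
        client.send(queue, payload, org_id=org_id)


def run_wiring_check():
    """Drive enqueue_sketch with mocks and return what client.send received."""
    client = MagicMock()
    factory = MagicMock()
    # The factory's instance is used as a context manager yielding the client.
    factory.return_value.__enter__.return_value = client
    enqueue_sketch(
        factory, {"run_id": "r1"}, "celery_executor_legacy", "org-1", "rk-1"
    )
    client.send.assert_called_once()
    call = client.send.call_args
    return call.args, call.kwargs
```

Asserting on the positional queue, the payload keys, and the `org_id` keyword covers the three pieces of routing and identity information the PG path carries.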
| # so the fairness headers are intentionally not forwarded here (parity | ||
| # with the backend executor RPC). | ||
| return self._pg.dispatch(context, timeout=timeout) | ||
| return self._celery.dispatch(context, timeout=timeout, headers=headers) |
[Medium — test gap] The routing tests assert only which sub-dispatcher is called, never the arguments. Two PR claims are unverified: (a) gate-OFF forwards headers=/timeout= through to celery.dispatch unchanged (the zero-regression claim is incomplete without this), and (b) gate-ON calls pg.dispatch without headers (the intentional drop documented just above). Suggest asserting celery.dispatch.assert_called_once_with(..., timeout=9, headers={...}) in the gate-off test and "headers" not in pg.dispatch.call_args.kwargs in the gate-on test.
Fixed in 6e3e5c6 — routing tests now assert arguments: gate-off forwards timeout= and headers= to celery.dispatch unchanged; gate-on calls pg.dispatch with timeout= and "headers" not in kwargs (the intentional drop).
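The argument-level assertions from this thread can be illustrated with a small stand-in router. `RoutingSketch` mirrors the two return statements quoted in the diff but is not the real RoutingExecutionDispatcher; the `use_pg` callable and the `x-org` header key are assumptions.

```python
from unittest.mock import MagicMock


class RoutingSketch:
    """Stand-in router: PG path drops headers, Celery path forwards them."""

    def __init__(self, celery, pg, use_pg):
        self._celery = celery
        self._pg = pg
        self._use_pg = use_pg  # callable(context) -> bool, read per dispatch

    def dispatch(self, context, timeout=None, headers=None):
        if self._use_pg(context):
            return self._pg.dispatch(context, timeout=timeout)
        return self._celery.dispatch(context, timeout=timeout, headers=headers)


def check_gate_off():
    celery, pg = MagicMock(), MagicMock()
    ctx, headers = object(), {"x-org": "org-1"}
    RoutingSketch(celery, pg, lambda _ctx: False).dispatch(
        ctx, timeout=9, headers=headers
    )
    # Zero-regression: Celery sees exactly what the caller passed.
    celery.dispatch.assert_called_once_with(ctx, timeout=9, headers=headers)
    pg.dispatch.assert_not_called()
    return True


def check_gate_on():
    celery, pg = MagicMock(), MagicMock()
    ctx = object()
    RoutingSketch(celery, pg, lambda _ctx: True).dispatch(
        ctx, timeout=9, headers={"x-org": "org-1"}
    )
    pg.dispatch.assert_called_once_with(ctx, timeout=9)
    assert "headers" not in pg.dispatch.call_args.kwargs  # intentional drop
    celery.dispatch.assert_not_called()
    return True
```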
| # Only set for request-reply dispatches — keeps fire-and-forget rows | ||
| # byte-identical to before this field existed (mirrors the backend producer). | ||
| if reply_key is not None: | ||
| payload["reply_key"] = reply_key |
[Medium — test gap] This new reply_key branch is the producer side of the entire RPC contract, but TestToPayload (in test_dispatch_pg.py) predates it and never passes reply_key. A regression that drops or mis-keys the field makes the consumer (payload.get("reply_key")) silently treat the message as fire-and-forget, and the caller blocks until timeout. Add two cases: to_payload("t", reply_key="rk")["reply_key"] == "rk", and "reply_key" not in to_payload("t") (the byte-identical fire-and-forget guarantee the docstring claims).
Fixed in 6e3e5c6 — added two TestToPayload cases: reply_key set when passed, and absent when not (the byte-identical fire-and-forget guarantee).
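The optional-field rule under discussion is small enough to show in full. The sketch below is not the real to_payload: the base fields (`task_name`, `args`, `queue`) are a guess at a minimal payload, and only the `reply_key` branch reflects the diff quoted above.

```python
def to_payload_sketch(task_name, args=None, queue=None, reply_key=None):
    """Build a queue payload; reply_key appears only for request-reply calls."""
    payload = {
        "task_name": task_name,
        "args": list(args or []),
        "queue": queue,
    }
    # Only set for request-reply dispatches, so fire-and-forget payloads are
    # identical to what they were before the field existed.
    if reply_key is not None:
        payload["reply_key"] = reply_key
    return payload
```

The two cases added to TestToPayload correspond to checking `to_payload_sketch("t", reply_key="rk")["reply_key"] == "rk"` and `"reply_key" not in to_payload_sketch("t")`.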
| dispatcher = ExecutionDispatcher(celery_app=app) | ||
| # Gate-routed: PG executor RPC when pg_queue_enabled is on, else the unchanged | ||
| # Celery ExecutionDispatcher (zero-regression by construction — see executor_rpc). | ||
| dispatcher = get_executor_dispatcher(celery_app=app) |
[Low — test gap] This is the actual call-site swap the PR makes (direct ExecutionDispatcher → get_executor_dispatcher(celery_app=app) returning a RoutingExecutionDispatcher), but no test verifies it. test_structure_tool_task.py only covers _fairness_headers in isolation. If get_executor_dispatcher were mis-imported or returned the wrong type, nothing fails. Suggest patching structure_tool_task.get_executor_dispatcher and asserting _execute_structure_tool_impl calls it with celery_app=app and uses the result as the dispatcher.
Fixed in 6e3e5c6 — added TestDispatcherFactory asserting _execute_structure_tool_impl builds its dispatcher via get_executor_dispatcher(celery_app=app).
| # executor consumer (``pg_queue_consumer`` loads ``tasks.py`` but not | ||
| # ``worker.py``), which would otherwise hit "No executor registered". Import is | ||
| # idempotent (module cached), so the Celery path importing it twice is harmless. | ||
| import executor.executors # noqa: E402, F401 |
[Low — comment clarity] Verified: this import is necessary and not redundant — the PG consumer bootstraps via root worker.py → WorkerBuilder → importlib.import_module("executor.tasks") (WorkerType.to_import_path() returns executor.tasks), which does not import executor/worker.py where import executor.executors historically lived. So the fix is correct. Only the wording can mislead: pg_queue_consumer/__main__.py literally does import worker (the root worker), so "loads tasks.py but not worker.py" reads as contradictory. Suggest clarifying it means executor/worker.py (the Celery entrypoint), e.g. "...loads executor/tasks.py (via the root-worker bootstrap) but not executor/worker.py...".
Fixed in 6e3e5c6 — reworded the comment to name executor/worker.py (the Celery entrypoint the PG-consumer bootstrap skips) instead of the ambiguous worker.py. Thanks for confirming the fix is correct.
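Why a missing import produces "No executor registered" can be seen in a toy decorator registry. Everything here is hypothetical: `ExecutorRegistrySketch` is not the real ExecutorRegistry (whose `register` signature is not shown in this PR), and `import_executors_module` merely plays the role of `import executor.executors`.

```python
from typing import Callable, Dict


class ExecutorRegistrySketch:
    """Toy registry: classes are added only when their decorator runs."""

    _executors: Dict[str, type] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[type], type]:
        def decorator(executor_cls: type) -> type:
            cls._executors[name] = executor_cls
            return executor_cls

        return decorator

    @classmethod
    def get(cls, name: str) -> type:
        if name not in cls._executors:
            raise KeyError(f"No executor registered with name {name!r}")
        return cls._executors[name]


def import_executors_module() -> None:
    """Stands in for the module body that executes on first import."""

    @ExecutorRegistrySketch.register("legacy")
    class LegacyExecutor:
        pass
```

Until the defining code runs, lookups fail; any entrypoint that skips the import therefore sees an empty registry, which is the gap the PG consumer hit.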
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/executor_rpc.py | New file implementing PG-transport dispatcher routing; gate logic, never-raises contract, and the three dispatch paths are all well-handled. |
| workers/executor/tasks.py | Adds side-effect import of executor.executors to ensure the registry is populated for the PG consumer path; well-justified and idempotent. |
| workers/file_processing/structure_tool_task.py | Swaps raw ExecutionDispatcher for the gate-routed get_executor_dispatcher factory; also narrows the _run_agentic_extraction type hint from ExecutionDispatcher to RoutingExecutionDispatcher. |
| workers/tests/test_executor_rpc.py | Comprehensive DB-free tests for gate matrix, never-raises dispatch, enqueue wiring, and zero-regression routing; good coverage of timeout/failure/malformed-result paths. |
| workers/tests/test_executor_registration.py | Subprocess regression test for executor registry population; no timeout on subprocess.run, which can stall CI if any import blocks on a missing service. |
| workers/tests/test_structure_tool_task.py | New TestDispatcherFactory class pins the call-site swap to get_executor_dispatcher; cleanly stops before heavy tool-metadata work. |
| workers/tests/test_dispatch_pg.py | Two new unit tests verify reply_key round-trips correctly and that fire-and-forget payloads remain unchanged. |
| backend/sample.env | Fixes stale flag name pg_queue_execution_enabled → pg_queue_enabled in the comment block. |
| workers/sample.env | Documents the worker-side PG_QUEUE_TRANSPORT_ENABLED gate with clear three-condition description; safe addition. |
| workers/queue_backend/pg_queue/task_payload.py | Adds optional reply_key parameter to to_payload; fire-and-forget payloads remain byte-identical, and the new field is gated behind a None check. |
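On the test_executor_registration.py row above: a bounded subprocess call is one way to keep a blocked import from stalling CI. The helper below is a sketch; `run_snippet` and the 60s default are assumptions, not what the test currently does.

```python
import subprocess
import sys


def run_snippet(code: str, timeout_s: float = 60.0) -> subprocess.CompletedProcess:
    """Run a Python snippet in a fresh interpreter with a hard time limit."""
    try:
        return subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout_s,  # the child is killed if it exceeds this
        )
    except subprocess.TimeoutExpired as exc:
        # Turn a hang into a readable test failure instead of a stalled job.
        raise AssertionError(f"subprocess exceeded {timeout_s}s") from exc
```

A regression test could pass the import-and-assert snippet as `code` and then check `returncode` and `stderr` on the returned object.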
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant ST as structure_tool_task
participant RD as RoutingExecutionDispatcher
participant Gate as resolve_executor_transport
participant Flipt as Flipt
participant PG as PgExecutionDispatcher
participant Queue as pg_queue_message
participant Worker as worker-pg-executor
participant Result as pg_task_result
participant Celery as ExecutionDispatcher (Celery)
ST->>RD: dispatch(context)
RD->>Gate: resolve_executor_transport(context)
Gate->>Gate: check PG_QUEUE_TRANSPORT_ENABLED env
alt gate OFF
Gate-->>RD: False
RD->>Celery: dispatch(context, timeout, headers)
Celery-->>ST: ExecutionResult
else gate ON
Gate->>Flipt: check_feature_flag_status(pg_queue_enabled)
alt flag error / Flipt unavailable
Gate-->>RD: False (fail-closed)
RD->>Celery: dispatch(context, timeout, headers)
Celery-->>ST: ExecutionResult
else flag ON
Gate-->>RD: True
RD->>PG: dispatch(context, timeout)
PG->>Queue: INSERT (execute_extraction + reply_key)
Queue->>Worker: consume message
Worker->>Result: INSERT (status, result, reply_key)
PG->>Result: poll wait_for_result(reply_key, timeout)
alt result received
Result-->>PG: "row {status, result}"
PG-->>ST: ExecutionResult (success or failure)
else timeout
PG-->>ST: ExecutionResult.failure(TimeoutError)
end
end
end
Hardening + test-coverage from the review:
- dispatch: row.get("status") (never-raises must not depend on producer keys);
accept-and-ignore headers param for substitutability with the SDK/Routing
dispatch shapes; log the EXECUTOR_RESULT_TIMEOUT parse fallback instead of
swallowing it; surface the parse cause in the malformed-result error.
- timeout branch: document the orphaned-task / retry-double-run risk (greptile);
de-dup belongs at the file-execution layer (at-least-once + caller-timeout).
- _wait_for_result: note the connection pin is bounded by file_processing
prefork concurrency (vs the backend twin's close_old_connections per poll).
- executor/tasks.py: reword the registration-import comment (it's executor/
worker.py, the Celery entrypoint, that the PG consumer bootstrap skips).
- tests: org-less resolve bucketing (entity_id=run_id, no org in context);
real _enqueue wiring (queue/org_id/payload via PgQueueClient.send); routing
arg passthrough (gate-off forwards timeout+headers; gate-on drops headers);
to_payload reply_key set/omitted; structure_tool factory-swap call site.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review addressed —
SonarCloud — duplication on new code (
5a7d3d1 into feat/UN-3445-pg-queue-integration
What
Routes the in-workflow structure_tool executor dispatch onto the Postgres executor RPC when the single pg_queue_enabled flag is on; Celery otherwise (zero-regression by construction). Next slice after UN-3603 (prompt-studio blocking path). Gated off by default; lands on the PG-queue integration branch (not main).

Changes
- workers/queue_backend/pg_queue/executor_rpc.py (new): workers twin of the backend module on worker primitives:
  - resolve_executor_transport: master PG_QUEUE_TRANSPORT_ENABLED env, then the single pg_queue_enabled Flipt flag; fail-closed to Celery.
  - PgExecutionDispatcher: enqueue execute_extraction with a unique reply_key via PgQueueClient, poll PgResultBackend.wait_for_result; never raises (timeout/failure → ExecutionResult.failure).
  - RoutingExecutionDispatcher: per-call gate routing; dispatch_async/dispatch_with_callback stay on Celery (later slice).
  - get_executor_dispatcher factory.
- to_payload gains an optional reply_key (request-reply marker; fire-and-forget rows stay byte-identical).
- structure_tool_task swaps its dispatcher factory to get_executor_dispatcher; the 3 blocking dispatch call sites are unchanged.
- Fix (latent gap exposed by this slice): the PG executor consumer registered no executors (No executor registered with name 'legacy'). The @ExecutorRegistry.register side-effect import lived only in the Celery executor/worker.py, but the PG consumer bootstraps via executor/tasks.py. Importing executor.executors from executor/tasks.py means any worker registering execute_extraction (Celery or PG) has the executors; this also fixes the prompt-studio PG path from UN-3603.
- sample.env: document the worker-side PG_QUEUE_TRANSPORT_ENABLED gate; fix a stale pg_queue_execution_enabled → pg_queue_enabled doc reference.

Zero-regression
Gate off → resolve_executor_transport returns before any Flipt call → RoutingExecutionDispatcher delegates to the unchanged SDK ExecutionDispatcher (Celery). No pg_task_result row. Flipping the flag is an instant, no-redeploy rollout/rollback (read per dispatch).

Tests
- workers/tests/test_executor_rpc.py: gate fail-closed matrix, zero-regression routing (gate off → Celery delegate, no enqueue), never-raises dispatch cases.
- workers/tests/test_executor_registration.py: subprocess regression asserting that importing executor.tasks alone registers the executors (mirrors the PG consumer bootstrap).

Dev-tested end-to-end (gated, running stack)
- Executor RPC → PG transport logged → execute_extraction runs on worker-pg-executor → pg_task_result completed rows (reply_keys match) → file executions + workflow COMPLETED.
- … pg_task_result row.

Out of scope (separate follow-ups)
- worker-pg-fileproc single-consumer serializes multi-file fan-out on the PG path (own ticket).

🤖 Generated with Claude Code