Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3605 [GATED-FEAT] PG Queue — structure_tool executor RPC over Postgres (+ register executors in PG consumer)#2095

Merged
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3605-pg-structure-tool-executor-rpcZipstack/unstract:feat/UN-3605-pg-structure-tool-executor-rpcCopy head branch name to clipboard
Jun 20, 2026
Merged

UN-3605 [GATED-FEAT] PG Queue — structure_tool executor RPC over Postgres (+ register executors in PG consumer)#2095
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3605-pg-structure-tool-executor-rpcZipstack/unstract:feat/UN-3605-pg-structure-tool-executor-rpcCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

Routes the in-workflow structure_tool executor dispatch onto the Postgres executor RPC when the single pg_queue_enabled flag is on; Celery otherwise (zero-regression by construction). Next slice after UN-3603 (prompt-studio blocking path). Gated off by default; lands on the PG-queue integration branch (not main).

Changes

  • workers/queue_backend/pg_queue/executor_rpc.py (new) — workers twin of the backend module on worker primitives:
    • resolve_executor_transport — master PG_QUEUE_TRANSPORT_ENABLED env, then the single pg_queue_enabled Flipt flag; fail-closed to Celery.
    • PgExecutionDispatcher — enqueue execute_extraction with a unique reply_key via PgQueueClient, poll PgResultBackend.wait_for_result; never raises (timeout/failure → ExecutionResult.failure).
    • RoutingExecutionDispatcher — per-call gate routing; dispatch_async/dispatch_with_callback stay on Celery (later slice). get_executor_dispatcher factory.
  • to_payload gains an optional reply_key (request-reply marker; fire-and-forget rows stay byte-identical).
  • structure_tool_task swaps its dispatcher factory to get_executor_dispatcher; the 3 blocking dispatch call sites are unchanged.
  • Fix (latent gap exposed by this slice): the PG executor consumer registered no executorsNo executor registered with name 'legacy'. The @ExecutorRegistry.register side-effect import lived only in the Celery executor/worker.py, but the PG consumer bootstraps via executor/tasks.py. Importing executor.executors from executor/tasks.py means any worker registering execute_extraction (Celery or PG) has the executors — also fixes the prompt-studio PG path from UN-3603.
  • sample.env — document the worker-side PG_QUEUE_TRANSPORT_ENABLED gate; fix a stale pg_queue_execution_enabledpg_queue_enabled doc reference.

Zero-regression

Gate off → resolve_executor_transport returns before any Flipt call → RoutingExecutionDispatcher delegates to the unchanged SDK ExecutionDispatcher (Celery). No pg_task_result row. Flipping the flag is an instant, no-redeploy rollout/rollback (read per dispatch).

Tests

  • workers/tests/test_executor_rpc.py — gate fail-closed matrix, zero-regression routing (gate off → Celery delegate, no enqueue), never-raises dispatch cases.
  • workers/tests/test_executor_registration.py — subprocess regression asserting that importing executor.tasks alone registers the executors (mirrors the PG consumer bootstrap).

Dev-tested end-to-end (gated, running stack)

  • Flag ONExecutor RPC → PG transport logged → execute_extraction runs on worker-pg-executorpg_task_result completed rows (reply_keys match) → file executions + workflow COMPLETED.
  • Flag OFF → Celery executor path, no pg_task_result row.

Out of scope (separate follow-ups)

  • worker-pg-fileproc single-consumer serializes multi-file fan-out on the PG path (own ticket).
  • Continuation/callback path and Celery decommission are later slices.

🤖 Generated with Claude Code

…gres (+ register executors in PG consumer)

Routes the in-workflow structure_tool executor dispatch onto the Postgres
executor RPC when the single pg_queue_enabled flag is on; Celery otherwise
(zero-regression by construction). Next slice after UN-3603 (prompt-studio
blocking path). Gated off by default.

- workers queue_backend/pg_queue/executor_rpc.py (new): the workers twin of
  the backend module on worker primitives — resolve_executor_transport
  (master PG_QUEUE_TRANSPORT_ENABLED env, then the single pg_queue_enabled
  Flipt flag, fail-closed to Celery); PgExecutionDispatcher (enqueue
  execute_extraction with a unique reply_key via PgQueueClient, poll
  PgResultBackend; never raises — timeout/failure -> ExecutionResult.failure);
  RoutingExecutionDispatcher (per-call gate routing; async/callback stay on
  Celery); get_executor_dispatcher factory.
- to_payload gains an optional reply_key (request-reply marker; fire-and-forget
  rows stay byte-identical).
- structure_tool_task swaps its dispatcher factory to get_executor_dispatcher;
  the 3 blocking dispatch call sites are unchanged.
- Fix (latent gap exposed by this slice): the PG executor consumer registered
  no executors ("No executor registered with name 'legacy'"). The
  @ExecutorRegistry.register side-effect import lived only in the Celery
  executor/worker.py, but the PG consumer bootstraps via executor/tasks.py.
  Import executor.executors from executor/tasks.py so any worker registering
  execute_extraction (Celery or PG) has the executors — also fixes the
  prompt-studio PG path from UN-3603.
- sample.env: document the worker-side PG_QUEUE_TRANSPORT_ENABLED gate; fix a
  stale pg_queue_execution_enabled -> pg_queue_enabled doc reference.
- Tests: workers executor_rpc unit suite (gate fail-closed matrix,
  zero-regression routing, never-raises dispatch); subprocess regression that
  importing executor.tasks alone registers the executors.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3def3a22-3bb9-4e0f-b174-9f86f71ad524

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3605-pg-structure-tool-executor-rpc

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review Toolkit pass (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier).

Overall: a careful, well-documented, faithful workers-side port of the backend executor RPC. The gating, the never-raises contract, and the zero-regression-when-off property are correct. No blocking bugs. Findings below are hardening + test-coverage items; two HIGH/MED items (corrupt METADATA.json swallow at structure_tool_task.py:~858; stale ~508 line-ref at :~453) fall outside this PR's diff and are in the summary rather than inline.

error=f"TimeoutError: executor reply not received within {timeout}s"
)
if (
row["status"] == PgTaskStatus.COMPLETED.value

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium — correctness / never-raises] row["status"] uses bracket access (KeyError-raising), while result/error below use .get(). This access is not inside the malformed-result try (that only guards from_dict), so a result row missing status would raise straight out of dispatch() and violate the documented never-raises contract.

if (
    row.get("status") == PgTaskStatus.COMPLETED.value
    and row.get("result") is not None
):

Producer (result_backend.get_result) always sets status today, so this is latent — but the contract should not depend on the producer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6row.get("status") now (result/error already used .get), so a status-less row can't raise out of dispatch(). Never-raises no longer depends on the producer writing every key.

try:
timeout = int(os.environ.get(_DEFAULT_TIMEOUT_ENV, _DEFAULT_TIMEOUT))
except (TypeError, ValueError):
timeout = _DEFAULT_TIMEOUT

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium — silent config masking] Guarding the parse so a bad EXECUTOR_RESULT_TIMEOUT can't break the never-raises contract is right, but the swallow is silent: an operator who fat-fingers EXECUTOR_RESULT_TIMEOUT=3O00 gets the 3600s default with no signal, and every RPC may wait up to an hour instead of the intended value. Log the fallback:

except (TypeError, ValueError):
    logger.warning(
        "Invalid %s=%r; falling back to %ss",
        _DEFAULT_TIMEOUT_ENV, os.environ.get(_DEFAULT_TIMEOUT_ENV), _DEFAULT_TIMEOUT,
    )
    timeout = _DEFAULT_TIMEOUT

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — the EXECUTOR_RESULT_TIMEOUT parse fallback now logs a WARNING (with the bad value) before defaulting to 3600s.

self,
context: ExecutionContext,
timeout: int | None = None,
) -> ExecutionResult:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium — substitutability / type design] PgExecutionDispatcher.dispatch(context, timeout) omits the headers param that both the SDK ExecutionDispatcher.dispatch and RoutingExecutionDispatcher.dispatch accept. Today only RoutingExecutionDispatcher is handed out (and it drops headers on the PG path intentionally), so this is latent — but anyone calling a PgExecutionDispatcher directly (the tests do) gets a non-substitutable object, and nothing static catches it. Minimum fix: add headers: dict[str, Any] | None = None (accept-and-ignore) so the dispatch shapes match. Better: define an ExecutorDispatcher Protocol in unstract.core and annotate the SDK base + both PG classes against it — structural, so it preserves the "no SDK edit" constraint while letting mypy verify the duck-type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — added headers: dict | None = None (accept-and-ignore) to PgExecutionDispatcher.dispatch so the shape matches the SDK/Routing dispatch. The shared ExecutorDispatcher Protocol in unstract.core is the broader TODO(shared) (same place the de-dup would land).

context.run_id,
)
return ExecutionResult.failure(
error=f"Malformed executor result for reply_key {reply_key}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] The logger.exception captures the traceback (good), but the caller-facing ExecutionResult.failure only says Malformed executor result for reply_key {reply_key} — the actual parse cause isn't surfaced to a UI reading result.error. For parity with the enqueue/wait paths (which include type(exc).__name__), consider error=f"Malformed executor result ({type(exc).__name__}) for reply_key {reply_key}".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — the malformed-result failure now includes type(exc).__name__, matching the enqueue/wait paths.

in-flight RPC; structure_tool dispatches are sequential) and closes it on
exit, so a long RPC never leaks a connection.
"""
with PgResultBackend() as rb:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low — doc/concurrency note] The docstring ("structure_tool dispatches are sequential", "one per in-flight RPC") is true within one task, but the file_processing worker runs --pool=prefork with WORKER_FILE_PROCESSING_CONCURRENCY (default 4), so up to 4 processes can each hold a PgResultBackend connection open for the full RPC (up to EXECUTOR_RESULT_TIMEOUT, default 3600s). Bounded by concurrency and not a leak (context manager closes on exit), but it differs from the backend twin, which calls close_old_connections() between polls. Worth a one-line note that the pin is bounded by worker concurrency, or release the connection between polls if file_processing concurrency is ever raised.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — added a docstring note that the pin is bounded by file_processing prefork concurrency (default 4), and to release between polls like the backend twin if that concurrency is raised materially. Kept connection-per-wait for now (simpler; PgResultBackend owns + closes it).

reply_key=reply_key,
)
with PgQueueClient() as client:
client.send(queue, payload, org_id=org_id)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium — test gap] _enqueue is mocked in all 11 dispatch tests, so nothing verifies the actual PG-transport wiring: that it builds to_payload(_EXECUTE_TASK, args=[context.to_dict()], queue=queue, reply_key=reply_key) and passes org_id=org to client.send. Since the queue name, the args shape, and org_id are the only routing/identity info on the PG path (Celery headers are dropped), a bug here misroutes or breaks org-fairness with no test catching it. Suggest a test that patches PgQueueClient and asserts client.send is called with the queue, org_id, and a payload carrying task_name/args/reply_key.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — added TestPgExecutionDispatcherEnqueueWiring that patches PgQueueClient and asserts client.send gets the queue (celery_executor_legacy), org_id, and a payload carrying task_name/args/reply_key (to_payload runs for real).

# so the fairness headers are intentionally not forwarded here (parity
# with the backend executor RPC).
return self._pg.dispatch(context, timeout=timeout)
return self._celery.dispatch(context, timeout=timeout, headers=headers)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium — test gap] The routing tests assert only which sub-dispatcher is called, never the arguments. Two PR claims are unverified: (a) gate-OFF forwards headers=/timeout= through to celery.dispatch unchanged (the zero-regression claim is incomplete without this), and (b) gate-ON calls pg.dispatch without headers (the intentional drop documented just above). Suggest asserting celery.dispatch.assert_called_once_with(..., timeout=9, headers={...}) in the gate-off test and "headers" not in pg.dispatch.call_args.kwargs in the gate-on test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — routing tests now assert arguments: gate-off forwards timeout= and headers= to celery.dispatch unchanged; gate-on calls pg.dispatch with timeout= and "headers" not in kwargs (the intentional drop).

# Only set for request-reply dispatches — keeps fire-and-forget rows
# byte-identical to before this field existed (mirrors the backend producer).
if reply_key is not None:
payload["reply_key"] = reply_key

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium — test gap] This new reply_key branch is the producer side of the entire RPC contract, but TestToPayload (in test_dispatch_pg.py) predates it and never passes reply_key. A regression that drops or mis-keys the field makes the consumer (payload.get("reply_key")) silently treat the message as fire-and-forget, and the caller blocks until timeout. Add two cases: to_payload("t", reply_key="rk")["reply_key"] == "rk", and "reply_key" not in to_payload("t") (the byte-identical fire-and-forget guarantee the docstring claims).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — added two TestToPayload cases: reply_key set when passed, and absent when not (the byte-identical fire-and-forget guarantee).

dispatcher = ExecutionDispatcher(celery_app=app)
# Gate-routed: PG executor RPC when pg_queue_enabled is on, else the unchanged
# Celery ExecutionDispatcher (zero-regression by construction — see executor_rpc).
dispatcher = get_executor_dispatcher(celery_app=app)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low — test gap] This is the actual call-site swap the PR makes (direct ExecutionDispatcherget_executor_dispatcher(celery_app=app) returning a RoutingExecutionDispatcher), but no test verifies it. test_structure_tool_task.py only covers _fairness_headers in isolation. If get_executor_dispatcher were mis-imported or returned the wrong type, nothing fails. Suggest patching structure_tool_task.get_executor_dispatcher and asserting _execute_structure_tool_impl calls it with celery_app=app and uses the result as the dispatcher.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — added TestDispatcherFactory asserting _execute_structure_tool_impl builds its dispatcher via get_executor_dispatcher(celery_app=app).

Comment thread workers/executor/tasks.py
# executor consumer (``pg_queue_consumer`` loads ``tasks.py`` but not
# ``worker.py``), which would otherwise hit "No executor registered". Import is
# idempotent (module cached), so the Celery path importing it twice is harmless.
import executor.executors # noqa: E402, F401

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low — comment clarity] Verified: this import is necessary and not redundant — the PG consumer bootstraps via root worker.pyWorkerBuilderimportlib.import_module("executor.tasks") (WorkerType.to_import_path() returns executor.tasks), which does not import executor/worker.py where import executor.executors historically lived. So the fix is correct. Only the wording can mislead: pg_queue_consumer/__main__.py literally does import worker (the root worker), so "loads tasks.py but not worker.py" reads as contradictory. Suggest clarifying it means executor/worker.py (the Celery entrypoint), e.g. "...loads executor/tasks.py (via the root-worker bootstrap) but not executor/worker.py...".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6e3e5c6 — reworded the comment to name executor/worker.py (the Celery entrypoint the PG-consumer bootstrap skips) instead of the ambiguous worker.py. Thanks for confirming the fix is correct.

@greptile-apps

greptile-apps Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR routes the in-workflow structure_tool executor dispatch onto a Postgres request-reply transport when the pg_queue_enabled Flipt flag is on, and also fixes a latent executor-registry gap where the PG consumer would fail with "No executor registered" because the side-effect import previously lived only in the Celery entrypoint.

  • executor_rpc.py (new): Adds PgExecutionDispatcher (enqueue + poll pattern with a unique reply_key), RoutingExecutionDispatcher (per-call PG-vs-Celery gate), and resolve_executor_transport (master env gate → Flipt flag, fails closed to Celery); the structure_tool call site is unchanged beyond swapping to the get_executor_dispatcher factory.
  • executor/tasks.py: Moves import executor.executors from the Celery-only worker.py entrypoint into tasks.py so the @ExecutorRegistry.register side effects run for any consumer (Celery or PG) that imports the task module.
  • Tests: DB-free gate-matrix and never-raises contract tests in test_executor_rpc.py, a subprocess regression in test_executor_registration.py that verifies the registry fix, and a call-site factory-swap test in test_structure_tool_task.py.

Confidence Score: 5/5

Safe to merge onto the integration branch; the gate defaults to off so no production path is affected until PG_QUEUE_TRANSPORT_ENABLED=True and the Flipt flag are both enabled.

All three dispatch modes (gate off, flag off, flag on) are covered by DB-free unit tests; the never-raises contract is pinned for enqueue failure, wait failure, timeout, and malformed rows. The executor-registry fix is regression-tested in a fresh subprocess. The only non-test observation is that PgExecutionDispatcher is eagerly constructed even when the gate is off, which is harmless today but is a forward compatibility note.

No files require special attention; test_executor_registration.py lacks a subprocess timeout but this is a pre-existing pattern note rather than a correctness concern.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/executor_rpc.py New file implementing PG-transport dispatcher routing; gate logic, never-raises contract, and the three dispatch paths are all well-handled.
workers/executor/tasks.py Adds side-effect import of executor.executors to ensure the registry is populated for the PG consumer path; well-justified and idempotent.
workers/file_processing/structure_tool_task.py Swaps raw ExecutionDispatcher for the gate-routed get_executor_dispatcher factory; also narrows the _run_agentic_extraction type hint from ExecutionDispatcher to RoutingExecutionDispatcher.
workers/tests/test_executor_rpc.py Comprehensive DB-free tests for gate matrix, never-raises dispatch, enqueue wiring, and zero-regression routing; good coverage of timeout/failure/malformed-result paths.
workers/tests/test_executor_registration.py Subprocess regression test for executor registry population; no timeout on subprocess.run, which can stall CI if any import blocks on a missing service.
workers/tests/test_structure_tool_task.py New TestDispatcherFactory class pins the call-site swap to get_executor_dispatcher; cleanly stops before heavy tool-metadata work.
workers/tests/test_dispatch_pg.py Two new unit tests verify reply_key round-trips correctly and that fire-and-forget payloads remain unchanged.
backend/sample.env Fixes stale flag name pg_queue_execution_enabled → pg_queue_enabled in the comment block.
workers/sample.env Documents the worker-side PG_QUEUE_TRANSPORT_ENABLED gate with clear three-condition description; safe addition.
workers/queue_backend/pg_queue/task_payload.py Adds optional reply_key parameter to to_payload; fire-and-forget payloads remain byte-identical, and the new field is gated behind a None check.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant ST as structure_tool_task
    participant RD as RoutingExecutionDispatcher
    participant Gate as resolve_executor_transport
    participant Flipt as Flipt
    participant PG as PgExecutionDispatcher
    participant Queue as pg_queue_message
    participant Worker as worker-pg-executor
    participant Result as pg_task_result
    participant Celery as ExecutionDispatcher (Celery)

    ST->>RD: dispatch(context)
    RD->>Gate: resolve_executor_transport(context)
    Gate->>Gate: check PG_QUEUE_TRANSPORT_ENABLED env
    alt gate OFF
        Gate-->>RD: False
        RD->>Celery: dispatch(context, timeout, headers)
        Celery-->>ST: ExecutionResult
    else gate ON
        Gate->>Flipt: check_feature_flag_status(pg_queue_enabled)
        alt flag error / Flipt unavailable
            Gate-->>RD: False (fail-closed)
            RD->>Celery: dispatch(context, timeout, headers)
            Celery-->>ST: ExecutionResult
        else flag ON
            Gate-->>RD: True
            RD->>PG: dispatch(context, timeout)
            PG->>Queue: INSERT (execute_extraction + reply_key)
            Queue->>Worker: consume message
            Worker->>Result: INSERT (status, result, reply_key)
            PG->>Result: poll wait_for_result(reply_key, timeout)
            alt result received
                Result-->>PG: "row {status, result}"
                PG-->>ST: ExecutionResult (success or failure)
            else timeout
                PG-->>ST: ExecutionResult.failure(TimeoutError)
            end
        end
    end
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant ST as structure_tool_task
    participant RD as RoutingExecutionDispatcher
    participant Gate as resolve_executor_transport
    participant Flipt as Flipt
    participant PG as PgExecutionDispatcher
    participant Queue as pg_queue_message
    participant Worker as worker-pg-executor
    participant Result as pg_task_result
    participant Celery as ExecutionDispatcher (Celery)

    ST->>RD: dispatch(context)
    RD->>Gate: resolve_executor_transport(context)
    Gate->>Gate: check PG_QUEUE_TRANSPORT_ENABLED env
    alt gate OFF
        Gate-->>RD: False
        RD->>Celery: dispatch(context, timeout, headers)
        Celery-->>ST: ExecutionResult
    else gate ON
        Gate->>Flipt: check_feature_flag_status(pg_queue_enabled)
        alt flag error / Flipt unavailable
            Gate-->>RD: False (fail-closed)
            RD->>Celery: dispatch(context, timeout, headers)
            Celery-->>ST: ExecutionResult
        else flag ON
            Gate-->>RD: True
            RD->>PG: dispatch(context, timeout)
            PG->>Queue: INSERT (execute_extraction + reply_key)
            Queue->>Worker: consume message
            Worker->>Result: INSERT (status, result, reply_key)
            PG->>Result: poll wait_for_result(reply_key, timeout)
            alt result received
                Result-->>PG: "row {status, result}"
                PG-->>ST: ExecutionResult (success or failure)
            else timeout
                PG-->>ST: ExecutionResult.failure(TimeoutError)
            end
        end
    end
Loading

Reviews (2): Last reviewed commit: "UN-3605 [GATED-FEAT] Address PR review (..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/executor_rpc.py
Hardening + test-coverage from the review:
- dispatch: row.get("status") (never-raises must not depend on producer keys);
  accept-and-ignore headers param for substitutability with the SDK/Routing
  dispatch shapes; log the EXECUTOR_RESULT_TIMEOUT parse fallback instead of
  swallowing it; surface the parse cause in the malformed-result error.
- timeout branch: document the orphaned-task / retry-double-run risk (greptile);
  de-dup belongs at the file-execution layer (at-least-once + caller-timeout).
- _wait_for_result: note the connection pin is bounded by file_processing
  prefork concurrency (vs the backend twin's close_old_connections per poll).
- executor/tasks.py: reword the registration-import comment (it's executor/
  worker.py, the Celery entrypoint, that the PG consumer bootstrap skips).
- tests: org-less resolve bucketing (entity_id=run_id, no org in context);
  real _enqueue wiring (queue/org_id/payload via PgQueueClient.send); routing
  arg passthrough (gate-off forwards timeout+headers; gate-on drops headers);
  to_payload reply_key set/omitted; structure_tool factory-swap call site.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review addressed — 6e3e5c6c2

Thanks for the thorough pass. All toolkit + greptile items addressed; 30 workers tests green, pre-commit clean.

Code (executor_rpc.py):

  • row.get("status") so a status-less row can't break the never-raises contract.
  • dispatch accepts (and ignores) headers — substitutable with the SDK/Routing shapes.
  • EXECUTOR_RESULT_TIMEOUT parse fallback now logs a WARNING instead of swallowing.
  • Malformed-result error includes type(exc).__name__.
  • Docstring note: the poll connection pin is bounded by file_processing prefork concurrency (vs the backend twin's close_old_connections per poll).
  • Timeout branch: explicit orphaned-task / retry-double-run note (greptile) — de-dup belongs at the file-execution layer; transport stays at-least-once + caller-timeout.
  • executor/tasks.py: reworded the registration-import comment (executor/worker.py, not the ambiguous worker.py).

Tests (the coverage gaps):

  • org-less resolve bucketing (entity_id=run_id, no org in context)
  • real _enqueue wiring (queue / org_id / payload via PgQueueClient.send)
  • routing arg passthrough (gate-off forwards timeout+headers; gate-on drops headers)
  • to_payload reply_key set/omitted
  • structure_tool factory-swap call site

SonarCloud — 20.4% duplication on new code: this is the intentional workers-twin of backend/pg_queue/executor_rpc.py (two transports on different primitives — psycopg2 + env here vs Django ORM + settings there). A true de-dup is the TODO(shared) already noted in the module: lift resolve_executor_transport + the reply_key/poll contract into unstract.core and annotate an ExecutorDispatcher Protocol — a deliberately separate slice (it also touches the already-merged backend module). Happy to do that extraction as a follow-up if we'd rather see the gate green now; flagging rather than expanding this slice's scope.

Out-of-diff items from the toolkit summary (corrupt METADATA.json swallow ~structure_tool_task.py:858, stale line-ref) are outside this PR's changes — leaving them for their own change.

@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud — duplication on new code (executor_rpc.py, 24.3%): intentional, accepting for this slice

workers/queue_backend/pg_queue/executor_rpc.py is the deliberate workers-twin of backend/pg_queue/executor_rpc.py — two transport adapters (psycopg2 + env here vs Django ORM + Django settings there) sharing the same contract. I looked at extracting the shared logic into one place; the dependency graph makes a clean single-home not available without a new package:

  • The shared logic needs both unstract.flags (check_feature_flag_status) and unstract.sdk1 (ExecutionResult / ExecutionDispatcher).
  • The only package both backend and workers share beneath them is unstract.core — but unstract.sdk1 already depends on unstract.core, so core can't import sdk1 (circular). The dispatcher/result logic can't live there.
  • Hosting it in sdk1 would couple the SDK to Flipt + the PG transport, which violates the UN-3603 design constraint (the SDK stays transport/flag-agnostic; gating/routing is an app concern).
  • A true single copy therefore needs a new shared package (unstract-executor-rpc) depended on by both — disproportionate for this slice, and it would also rework the already-merged backend module.

Partial extraction wouldn't clear the 3% gate (it'd need ~all 77 duplicated lines removed), so it's effectively binary: accept the intentional mirror, or build the shared package.

Decision: accept the duplication for this PR (it's a reviewed, intentional mirror behind a fail-closed gate, off by default) and track the shared-package consolidation as its own follow-up — same place the TODO(shared) in the module points. The SDK-purity constraint is worth more than the metric here.

All other review items (toolkit + greptile) are addressed in 6e3e5c6c2; 30 workers tests green, pre-commit clean. Good to merge from my side.

@muhammad-ali-e muhammad-ali-e merged commit 5a7d3d1 into feat/UN-3445-pg-queue-integration Jun 20, 2026
5 of 6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3605-pg-structure-tool-executor-rpc branch June 20, 2026 10:35
@sonarqubecloud

Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
10.1% Duplication on New Code (required ≤ 3%)

See analysis details on SonarQube Cloud

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.