Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3603 [GATED-FEAT] PG Queue — blocking executor RPC on Postgres + unify gating to a single flag#2094

Merged
muhammad-ali-e merged 5 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3603-GATED-FEAT_pg_executor_rpcZipstack/unstract:feat/UN-3603-GATED-FEAT_pg_executor_rpcCopy head branch name to clipboard
Jun 19, 2026
Merged

UN-3603 [GATED-FEAT] PG Queue — blocking executor RPC on Postgres + unify gating to a single flag#2094
muhammad-ali-e merged 5 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3603-GATED-FEAT_pg_executor_rpcZipstack/unstract:feat/UN-3603-GATED-FEAT_pg_executor_rpcCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

  • A parallel PG-native executor RPC: prompt-studio's blocking dispatch() can ride Postgres instead of Celery, gated by the single pg_queue_enabled flag. The SDK ExecutionDispatcher and the Celery executor worker are untouched.
  • pg_task_result store (migration 0010) + PgResultBackend (store + poll-wait); reply_key on the shared TaskPayload; consumer result-write hook; backend gate + PgExecutionDispatcher + RoutingExecutionDispatcher; worker-pg-executor role.
  • Unify PG-queue gating to a single flag pg_queue_enabled — execution, scheduler, and executor now read one key (was pg_queue_execution_enabled / pg_scheduler_enabled / pg_executor_enabled).

Why

Migrate the executor request-reply off Celery — the last big Celery dependency (UN-3603). Cut A = prompt-studio blocking path; structure_tool (workflows) and ide_* callbacks are later slices. Consolidating the gates to one flag keeps the rollout simple to operate (one flip = whole feature), per the rollout plan (validate across envs, then a coordinated cutover).

How

  • Separate PG path selected per-dispatch by a routing dispatcher (gate read at call time → instant rollout/rollback, fail-closed). Reply via a keyed pg_task_result table + poll-wait (PgBouncer-safe; no LISTEN/NOTIFY).
  • worker-pg-executor runs the existing execute_extraction (no logic fork) and the consumer writes its result/error to pg_task_result for the blocking caller.
  • All three resolvers read pg_queue_enabled; PG_QUEUE_TRANSPORT_ENABLED (env) stays the master kill-switch.

Can this PR break any existing features. If yes, please list possible items. If no, please explain why.

No. Gate off (default) → _get_dispatcher returns the unchanged Celery ExecutionDispatcher for every mode and no pg_task_result row is created. New table is additive (metadata-only migration); reply_key is optional (absent = byte-identical fire-and-forget); the consumer hook is guarded by reply_key. The flag rename is internal (the flag is read only in the backend). Pinned by zero-regression tests + 34 passing consumer tests, and validated live: a full on→PG / off→Celery cycle for API + ETL, COMPLETED both ways.

Database Migrations

0010_pgtaskresult — new droppable pg_task_result table (additive).

Env Config

None new. Gate = existing PG_QUEUE_TRANSPORT_ENABLED + the single Flipt flag pg_queue_enabled (off until ramp).

Notes on Testing

  • New unit tests: routing/gate fail-closed + zero-regression (pg_queue/tests/test_executor_rpc.py), result-backend real-PG (workers/tests/test_pg_result_backend.py). 34 existing consumer tests + the flag-rename suites (transport/ownership) green.
  • Live-validated on a running stack: executor RPC round-trip through worker-pg-executor; single-flag on→PG / off→Celery cycle for API + ETL (both COMPLETED), with queue_message_id set on PG and task_id set on Celery.

Related Issues or PRs

UN-3603 (sub-task of UN-3536). Follow-ups: the workflow/structure_tool executor path, and the ide_* async callbacks.

Checklist

  • Appropriate PR title and description
  • Self-review done
  • Commented hard-to-understand areas
  • Added tests proving the change
  • New and existing unit tests pass locally
  • Checked for misspellings

🤖 Generated with Claude Code

…nify gating to a single flag

A parallel PG-native executor RPC for prompt-studio's blocking dispatch, gated by
the single pg_queue_enabled flag. The SDK ExecutionDispatcher and the Celery
executor worker are left completely untouched.

- pg_task_result store (migration 0010) + PgResultBackend (store / poll-wait;
  idempotent ON CONFLICT; poll-based, PgBouncer-safe — no LISTEN/NOTIFY)
- reply_key on the shared TaskPayload contract + backend producer
- consumer result-write hook: request-reply messages store result/error + ack
  after one attempt; fire-and-forget path unchanged (guarded by reply_key)
- backend executor_rpc: resolve_executor_transport gate + PgExecutionDispatcher
  (enqueue + poll, mirrors the SDK dispatch contract) + RoutingExecutionDispatcher;
  _get_dispatcher returns the routing dispatcher so all call sites stay unchanged
- worker-pg-executor role (run-worker.sh + docker-compose; broker-free)
- unify PG-queue gating to a SINGLE flag pg_queue_enabled: execution, scheduler,
  and executor all read one key (renamed from pg_queue_execution_enabled /
  pg_scheduler_enabled / pg_executor_enabled). One flip gates the whole feature;
  the PG_QUEUE_TRANSPORT_ENABLED env stays the master kill-switch.

Gated off by default. Dev-tested live: on->PG / off->Celery cycle (COMPLETED both
ways) for API + ETL; executor RPC round-trip through worker-pg-executor.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7d0c0972-ffed-4cfe-9480-6c94c05a9d30

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3603-GATED-FEAT_pg_executor_rpc

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

…tion, no-NULL error

- PgExecutionDispatcher.dispatch: drop the unused `headers` param (PG carries
  fairness in the enqueue payload, not Celery headers; the routing dispatcher no
  longer forwards headers to the PG path)
- enqueue failure handler: logger.error(exc_info=True) -> logger.exception
- PgTaskResult.error: TextField(null=True) -> TextField(blank=True, default="")
  (no-NULL-text convention); store_result writes "" on completed; migration 0010
  regenerated; result-backend test updated

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud addressed — 57fe4307b

All 3 fixed:

  1. executor_rpc.py:120 — unused headers param (Major): removed headers from PgExecutionDispatcher.dispatch. The PG path carries fairness in the enqueue payload (not Celery message headers), so RoutingExecutionDispatcher no longer forwards headers to the PG branch (the Celery branch still does).
  2. executor_rpc.py:136 — use logging.exception() (Major): the enqueue-failure handler now uses logger.exception(...) instead of logger.error(..., exc_info=True).
  3. models.py — remove null=True (Medium, Consistency): PgTaskResult.error is now TextField(blank=True, default="") (the no-NULL-text convention used elsewhere in pg_queue). store_result writes "" on a completed row; migration 0010 was regenerated and the result-backend test updated.

Tests green (8 routing + 7 result-backend), pre-commit clean, makemigrations --check clean.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review — PG Executor RPC (UN-3603)

Ran the PR Review Toolkit (code review, silent-failure hunt, type-design, test coverage, comment accuracy, simplification) over the 14-file diff. The design is sound and the zero-regression / fail-closed posture holds. Inline comments below are prioritised; the load-bearing ones are:

  • Critical: unguarded success-path _store_reply (consumer.py:249) — a transient result-store error silently re-runs the executor (paid LLM spend) and blocks the caller to its full timeout, logged only as a generic poll failure.
  • High: pg_task_result is never reaped — docstrings claim a retention sweep that does not exist anywhere; the table grows unbounded once the gate flips. Untested caller side (PgExecutionDispatcher.dispatch) and consumer reply_key branch. Blocking poll pins a DB connection for up to 3600s.
  • Medium: two stale pg_executor_enabled flag references contradict the unified pg_queue_enabled flag; status magic-strings + untyped dict shapes across the process boundary.

Full prioritised summary is in the PR thread.


if reply_key:
# Persist the task's return value for the waiting caller before ack.
self._store_reply(reply_key, result=eager.result)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical — unguarded success-path store re-runs the executor and blocks the caller.

_store_reply(...) here runs before the ack (delete) at line 251 and is not wrapped in try/except. If store_result raises (DB write error, serialization failure, dropped connection), the exception propagates out of _handle and is caught only by the generic handler in poll_once, logged as a generic "poll failed" with no reply_key/task_name. Consequences:

  • the message is never deleted → vt expires → the executor re-runs the full extraction (real LLM spend) up to max_attempts, then is dropped as poison;
  • the waiting caller never gets a row and blocks to its full timeout (default 3600s);
  • the failure is mislabeled, contradicting the line 219-225 promise that the caller "gets a definitive result instead of blocking to its timeout" (that only holds for the task-raised branch, not a store failure).

Also latent: store_result classifies any non-dict / None result as STATUS_FAILED, so a request-reply task that legitimately returns None/empty would be mis-stored as failed (not reachable today since execute_extraction always returns a dict, but fragile for future tasks).

Fix: wrap the store with try/except logging reply_key/task_name/msg_id at error severity, and decide explicitly whether a store failure should redeliver. This branch is also entirely untestedtest_pg_queue_consumer.py was not updated; add ok+reply_key (store result then ack) and boom+reply_key (store error then ack) cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1c — the success-path store is now wrapped: on failure it logs at ERROR (task/msg_id/reply_key) and acks anyway to avoid re-running the executor (LLM spend); the caller degrades to a timeout rather than a poison-loop. The task-raised branch and the drop branches go through a new guarded _fail_reply. Added consumer tests: success→store+ack, raise→store-error+ack, store-failure→still-acks.

# is blocking on. Present → store the outcome to pg_task_result and ack
# after a single attempt (no vt-redeliver — see the except branch).
# Absent → fire-and-forget (the existing leaf/pipeline path).
reply_key = payload.get("reply_key")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — pre-execution drop branches lose the reply, blocking the caller to full timeout.

reply_key is read here, but the earlier drop branches above (missing task_name, poison read_ct > max_attempts, unknown task) delete the row and return without storing any reply. For a request-reply message that hits one of those paths, the blocking caller waits the full timeout (default 3600s) for a row that will never appear. For the unknown-task and poison cases especially, a definitive failure reply could be stored so the caller fails fast.

Fix: in those drop branches, if the payload carries a reply_key, call _store_reply(reply_key, error=...) before deleting so the caller gets an immediate descriptive failure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1creply_key is read up front and the drop branches (missing task_name, poison read_ct, unknown task) now call _fail_reply(reply_key, …) before deleting, so a request-reply caller gets an immediate descriptive failure instead of blocking to its timeout. Tested (unknown-task → store error + ack).

Comment thread backend/pg_queue/executor_rpc.py Outdated
return ExecutionResult.failure(
error=f"TimeoutError: executor reply not received within {timeout}s"
)
if row.status == "completed" and row.result is not None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — magic status string + unguarded from_dict can break the never-raises contract.

Two issues on this branch:

  1. row.status == "completed" is a bare literal here, while the writer side names it STATUS_COMPLETED in result_backend.py. The two live in different deployables (backend Django ORM reader vs workers psycopg2 writer) with no shared import, so the contract is matched by eye across a process boundary — rename the writer constant and this silently treats every result as a failure. Consider promoting the status vocabulary to a StrEnum in unstract.core (which both trees already depend on).
  2. ExecutionResult.from_dict(row.result) hard-indexes data["success"]. A malformed/partial completed row raises KeyError, which is not caught — violating this class's documented "never raises" contract and surfacing as a 500 to prompt-studio.

Fix: branch on the shared enum, and wrap from_dict in try/except converting a decode error to ExecutionResult.failure(...).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1c — both: (1) added a shared PgTaskStatus(str, Enum) in unstract.core.data_models; the reader branches on PgTaskStatus.COMPLETED.value and the writer re-exports STATUS_COMPLETED/FAILED from it → one source of truth across the process boundary. (2) from_dict is wrapped in try/except so a malformed completed row becomes ExecutionResult.failure(...), preserving the never-raises contract.

Comment thread backend/pg_queue/executor_rpc.py Outdated
timeout: int | None = None,
) -> ExecutionResult:
if timeout is None:
timeout = int(os.environ.get(_DEFAULT_TIMEOUT_ENV, _DEFAULT_TIMEOUT))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High (test gap) — PgExecutionDispatcher.dispatch is entirely untested.

test_executor_rpc.py mocks PgExecutionDispatcher out, so none of this method's four outcome branches ever run, yet its contract ("never raises; failure/timeout → ExecutionResult.failure") is the load-bearing promise prompt-studio depends on.

Add DB-free tests (mock enqueue_task and _wait_for_result): enqueue raises → returns failure and does not re-raise; _wait_for_result returns None → timeout failure with the within {timeout}s message; completed row → ExecutionResult.from_dict(row.result); failed row → failure with row.error (and the empty-error → "executor task failed" fallback); completed-but-result is None falls through to failure; timeout=None reads EXECUTOR_RESULT_TIMEOUT else 3600. Also cover _wait_for_result backoff + the min(delay, remaining) deadline clamp.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1c — added TestPgExecutionDispatcherDispatch (8 DB-free cases, mocking enqueue_task + _wait_for_result): enqueue-raises→failure (no re-raise); timeout→failure with the within {timeout}s message; completed→from_dict; failed→row.error; empty-error→fallback; completed-but-result-None→failure; malformed-completed→failure-not-raise; timeout=NoneEXECUTOR_RESULT_TIMEOUT. The _wait_for_result backoff/clamp is the same logic as PgResultBackend.wait_for_result, which has real-PG timeout + late-write tests.

timeout,
)
row = self._wait_for_result(reply_key, timeout)
if row is None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — timeout is silent and indistinguishable from a real failure.

On timeout the caller gets ExecutionResult.failure("TimeoutError: ..."), but the executor task may still be running (and spending) and the row may land just after the deadline. Neither the timeout branch nor the task-failed branch (line 162) logs anything, so a 3600s blocking RPC can give up with no operator-visible event.

Fix: log the timeout and the task-failed branches at WARNING/ERROR with reply_key/run_id/elapsed so a caller-side timeout can be correlated with the still-running/failed pg_task_result row.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1c — the timeout branch and the executor-failure branch now log at WARNING with reply_key/run_id, so a caller-side timeout/failure is operator-visible and correlatable with the pg_task_result row.

Comment thread backend/pg_queue/models.py Outdated
class Meta:
db_table = "pg_task_result"
indexes = [
# Drives the reaper's retention sweep (DELETE ... WHERE expires_at <= now()).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium (comment accuracy) — describes a DELETE that doesn't exist.

This index comment says it "Drives the reaper's retention sweep (DELETE ... WHERE expires_at <= now())", but no such sweep consumes pg_task_result.expires_at — the index currently backs no query. Reword to e.g. "Index for a future retention sweep; no sweeper reads it yet." (Same issue at line 318: "NULL = no expiry recorded yet" describes a state no code produces — the only writer always sets expires_at.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1c — reworded the index + expires_at comments to "index for a future retention sweep; no sweeper reads it yet".

def _get_dispatcher():
"""Executor dispatcher for the executor worker.

Gate-routed: when ``pg_executor_enabled`` is on the blocking

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — stale flag name contradicts the code and the PR's own goal.

This docstring says "when pg_executor_enabled is on", but no such flag exists — the dispatcher reads PG_QUEUE_FLAG_KEY = "pg_queue_enabled" (executor_rpc.py:62), which is exactly the unification this PR makes. An operator following this comment would look for a flag that isn't there.

Fix: change pg_executor_enabledpg_queue_enabled. Also: _get_dispatcher (line 290) dropped its return annotation; the prior signature was -> ExecutionDispatcher. Consider annotating -> "RoutingExecutionDispatcher" (under TYPE_CHECKING) for consistency with the typed surroundings.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1cpg_executor_enabledpg_queue_enabled in the docstring. (Left _get_dispatcher un-annotated to avoid a forward-ref import dance; the routing dispatcher is duck-typed against the SDK surface.)

Comment thread docker/docker-compose.yaml Outdated
# blocking caller. Same heavy executor runtime as worker-executor-v2 (tool
# execution → platform-service / adapters / file storage), but broker-free: it
# is the terminal of the RPC and dispatches nothing onward to Celery. Dark
# until the backend pg_executor_enabled gate is flipped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — stale flag name. "Dark until the backend pg_executor_enabled gate is flipped" references a non-existent flag. The worker-pg-executor service is dark until the unified pg_queue_enabled flag (read by resolve_executor_transport) is flipped. Change pg_executor_enabledpg_queue_enabled.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1cpg_executor_enabledpg_queue_enabled in the compose comment.

kwargs: dict[str, Any]
queue: str | None
fairness: FairnessPayload | None
reply_key: NotRequired[str | None]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low — reply_key: NotRequired[str | None] has three representations for two meanings.

The key can be absent, present-and-None, or present-and-string, but only two are meaningful (fire-and-forget vs request-reply). The producer only omits it or sets a string, and the consumer reads .get("reply_key") which collapses absent and None. The | None arm is dead-but-legal and reintroduces the "two empty states" problem the PgTaskResult.error column docstring explicitly set out to avoid.

Fix: tighten to reply_key: NotRequired[str] — "request-reply" is then exactly "key present."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in dce4e1c — tightened to reply_key: NotRequired[str] so "request-reply" is exactly "key present" (no dead | None arm).


logger = logging.getLogger(__name__)

# How long a stored result lives before the reaper's retention sweep may delete

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low — retention is decoupled from the caller's actual timeout.

DEFAULT_RETENTION_SECONDS = 3600 equals the executor caller-timeout default, but a caller can raise EXECUTOR_RESULT_TIMEOUT above 3600 (PgExecutionDispatcher.dispatch honours it). The docstring's promise that a result "always outlives any caller still waiting on it" then breaks: if the retention sweep is added (see models.py:304), a result could be deleted while a caller is still polling → spurious timeout. These two constants live in separate packages with no shared source.

Fix: when the sweep lands, derive retention from max(EXECUTOR_RESULT_TIMEOUT, DEFAULT_RETENTION_SECONDS) plus a margin (or pass the caller timeout through the payload), and document the retention >= caller_timeout invariant.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — deferred with the sweep (models.py:304). With no sweep yet, the decoupling is currently moot (nothing deletes a row out from under a caller). When the sweep lands I'll derive retention from max(EXECUTOR_RESULT_TIMEOUT, default) + margin and document the retention >= caller_timeout invariant. Tracked in the same follow-up.

…e-flag constant, tests, doc accuracy

Toolkit review (UN-3603):
- Critical: guard the success-path result store in the consumer — a store failure
  now logs + acks (avoids re-running the executor = LLM spend) instead of
  vt-redelivering; pre-execution drop branches (malformed/poison/unknown) and the
  task-raised branch store a definitive failure reply via a new guarded
  _fail_reply so the caller fails fast instead of blocking to its timeout
- Med: shared PgTaskStatus StrEnum in unstract.core (writer + reader agree across
  the process boundary); guard ExecutionResult.from_dict so a malformed completed
  row can't break the never-raises contract; log timeout/failure branches
- High: release the DB connection between dispatch polls (close_old_connections)
  + document dispatch must not run inside a transaction
- Med: single source of truth for the flag key (pg_queue/flags.py PG_QUEUE_FLAG_KEY,
  imported by transport/ownership/executor_rpc; SCHEDULER_FLAG_KEY folded in)
- Med: result-backend logs the swallowed rollback failure
- doc/accuracy: soften the not-yet-wired retention-sweep claims; fix stale
  pg_executor_enabled -> pg_queue_enabled comments; tighten reply_key to NotRequired[str]
- tests: PgExecutionDispatcher.dispatch (8 branches, DB-free) + consumer reply_key
  store/ack + drop-branch + store-failure-still-acks cases

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review addressed — dce4e1ca2

Thanks for the thorough pass. Disposition of the 16 findings:

Fixed (13)

  • Critical (consumer success-path store): now guarded — a store failure logs at ERROR and acks anyway (no expensive executor re-run); caller degrades to a timeout, not a poison-loop.
  • High (drop branches lose the reply): reply_key read up front; malformed/poison/unknown + task-raised branches store a definitive failure via a new guarded _fail_reply.
  • High (connection pinning): close_old_connections() between polls + documented "no open transaction".
  • High (dispatch untested): TestPgExecutionDispatcherDispatch — 8 DB-free outcome branches.
  • Med (magic status + from_dict): shared PgTaskStatus enum in unstract.core (writer+reader agree); from_dict wrapped to honour never-raises.
  • Med (timeout silent): timeout + failure branches now log with reply_key/run_id.
  • Med (flag string ×3): single pg_queue/flags.py PG_QUEUE_FLAG_KEY, imported by all three resolvers; SCHEDULER_FLAG_KEY folded in.
  • Med (rollback failure unlogged): now logged.
  • Med/Low (doc accuracy): softened the not-yet-wired retention-sweep claims; fixed stale pg_executor_enabledpg_queue_enabled; reply_keyNotRequired[str].
  • Plus consumer reply_key tests (success/raise/unknown/store-failure).

Partially fixed / deferred, with reasoning (3) — replied inline:

  • High (pg_task_result never reaped): docstrings now state the sweep is not yet wired (acceptable while gated off; tracked before the gate ramps to 100%). The actual sweep belongs with the reaper + the decommission slice — filed as follow-up rather than shipped half-done.
  • High (Flipt-outage observability / broad except): kept consistent with the sibling resolvers (resolve_transport / resolve_schedule_owner); a metric/error-id should land uniformly across all three — follow-up.
  • Med (dead-conn breadth): added the missing log; kept the PgQueueClient-mirrored recycle logic (a successful rollback() already recovers non-fatal errors; broadening would over-recycle) — would fix both modules together as a follow-up.

All tests green (45 backend incl. 8 new dispatch cases; result-backend + consumer reply_key suites), pre-commit clean, no migration drift. Verdict noted — appreciated.

…mment-as-code

- consumer.py: the two new request-reply error handlers use logger.exception()
  instead of logger.error(..., exc_info=True)
- test_executor_rpc.py: reword the `# timeout=None` inline comment (Sonar read it
  as commented-out code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

SonarCloud (round 2) — 0cf46ea9d

The 3 new findings (all introduced by the previous review-fix commit) are fixed:

  • consumer.py — the two new request-reply error handlers now use logger.exception() instead of logger.error(..., exc_info=True).
  • test_executor_rpc.py:124 — reworded the # timeout=None inline comment (Sonar read it as commented-out code).

Tests green (16 executor-rpc + consumer reply_key suite), pre-commit clean.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 19, 2026 17:55
@greptile-apps

greptile-apps Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds a PG-native request-reply executor RPC path for prompt-studio's blocking dispatch() call, gated by a unified pg_queue_enabled Flipt flag that replaces the three separate per-subsystem flags. The core mechanism: PgExecutionDispatcher enqueues execute_extraction with a unique reply_key, then polls pg_task_result until the new worker-pg-executor consumer writes the result or the timeout elapses.

  • New PgTaskResult store (0010 migration + Django model + raw-SQL PgResultBackend): idempotent INSERT … ON CONFLICT DO NOTHING makes at-least-once redelivery safe; expires_at column is indexed for a future retention sweep (acknowledged as not yet wired).
  • Consumer extended with reply_key handling: definitive failure written on every drop branch so the caller gets a fast result instead of blocking to the full timeout; ack-on-store-failure avoids double LLM execution.
  • Flag unification: pg_queue.flags.PG_QUEUE_FLAG_KEY is the single constant imported by all three resolvers; transport.py re-exports it for backward-compatible imports.

Confidence Score: 3/5

Safe to merge with the VT fix applied to docker-compose; the gate defaults to off so production is unaffected until the flag is flipped, but the docker-compose as written will double-execute any LLM task running longer than 30 s once the feature is enabled.

The logic and idempotency guarantees are solid, but worker-pg-executor in docker-compose is missing WORKER_PG_QUEUE_CONSUMER_VT_SECONDS and WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS. LLM inference routinely exceeds the 30 s default VT, causing a second consumer to re-claim and re-run the same executor task — consuming tokens twice and leaving two in-flight runs racing toward the same reply_key. The gate is off by default so no existing behavior is broken today, but the issue manifests immediately once the flag is ramped.

docker/docker-compose.yaml — the new worker-pg-executor service needs VT and health-stale tuning before the gate is ramped.

Important Files Changed

Filename Overview
docker/docker-compose.yaml Adds worker-pg-executor service but omits WORKER_PG_QUEUE_CONSUMER_VT_SECONDS and WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS; with the 30 s / 60 s defaults, any LLM task exceeding 30 s risks double-execution and token double-spend.
backend/pg_queue/executor_rpc.py New PG executor RPC dispatcher: gate logic, PgExecutionDispatcher (enqueue + poll-wait), and RoutingExecutionDispatcher; one minor 'never raises' gap with env-var int conversion outside try/except.
workers/queue_backend/pg_queue/result_backend.py New PgResultBackend: clean idempotent INSERT ON CONFLICT DO NOTHING store, poll-based wait, connection recycling on error, context-manager cleanup; well-structured and tested.
workers/queue_backend/pg_queue/consumer.py Adds reply_key handling: definitive failure writes on drop/poison/unknown/raise, success path stores result then acks; intentional ack-on-store-failure to avoid double LLM spend is well-commented.
backend/pg_queue/migrations/0010_pgtaskresult.py Additive migration creating pg_task_result table with expires_at index; safe to apply and drop.
unstract/core/src/unstract/core/data_models.py Adds NotRequired[str] reply_key to TaskPayload and the shared PgTaskStatus enum; clean shared contract between writer and reader processes.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant PS as PromptStudioHelper
    participant RD as RoutingExecutionDispatcher
    participant Gate as resolve_executor_transport
    participant PD as PgExecutionDispatcher
    participant PQ as pg_queue_message
    participant WK as worker-pg-executor
    participant TR as pg_task_result

    PS->>RD: dispatch(context)
    RD->>Gate: pg_queue_enabled?
    alt PG enabled
        Gate-->>RD: True
        RD->>PD: dispatch(context)
        PD->>PQ: INSERT execute_extraction + reply_key
        PD->>PD: _wait_for_result poll with backoff
        WK->>PQ: SKIP LOCKED claim
        WK->>WK: task.apply() execute_extraction
        WK->>TR: store_result ON CONFLICT DO NOTHING
        WK->>PQ: DELETE ack
        PD->>TR: "SELECT WHERE task_id = reply_key"
        TR-->>PD: status result error
        PD-->>RD: ExecutionResult
    else PG disabled
        Gate-->>RD: False
        RD->>RD: Celery ExecutionDispatcher.dispatch()
    end
    RD-->>PS: ExecutionResult
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant PS as PromptStudioHelper
    participant RD as RoutingExecutionDispatcher
    participant Gate as resolve_executor_transport
    participant PD as PgExecutionDispatcher
    participant PQ as pg_queue_message
    participant WK as worker-pg-executor
    participant TR as pg_task_result

    PS->>RD: dispatch(context)
    RD->>Gate: pg_queue_enabled?
    alt PG enabled
        Gate-->>RD: True
        RD->>PD: dispatch(context)
        PD->>PQ: INSERT execute_extraction + reply_key
        PD->>PD: _wait_for_result poll with backoff
        WK->>PQ: SKIP LOCKED claim
        WK->>WK: task.apply() execute_extraction
        WK->>TR: store_result ON CONFLICT DO NOTHING
        WK->>PQ: DELETE ack
        PD->>TR: "SELECT WHERE task_id = reply_key"
        TR-->>PD: status result error
        PD-->>RD: ExecutionResult
    else PG disabled
        Gate-->>RD: False
        RD->>RD: Celery ExecutionDispatcher.dispatch()
    end
    RD-->>PS: ExecutionResult
Loading

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
docker/docker-compose.yaml:732-739
**Missing VT and health-stale configuration for executor worker**

`worker-pg-executor` is missing `WORKER_PG_QUEUE_CONSUMER_VT_SECONDS` and `WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS`. The bootstrap reads these via `_env("VT_SECONDS", _DEFAULT_VT_SECONDS, int)` (line 451 of `consumer.py`) and `_env("HEALTH_STALE_SECONDS", _DEFAULT_HEALTH_STALE_SECONDS, float)`. Both defaults are 30 s and 60 s respectively.

LLM inference tasks routinely take well over 30 s. With VT=30 s, the message becomes visible again while the executor is still running — a second consumer instance will claim it and start a parallel execution, burning tokens twice. With health_stale=60 s, tasks exceeding 60 s also trigger a pod restart and re-delivery. Both values need to be set to at least the caller-side timeout (`EXECUTOR_RESULT_TIMEOUT`, default 3600 s).

Add to the `environment` block:
```
- WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=3600
- WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS=3700
```

### Issue 2 of 3
backend/pg_queue/executor_rpc.py:115-121
`int(os.environ.get(...))` is called outside the surrounding try/except block. If `EXECUTOR_RESULT_TIMEOUT` is set to a non-integer string (e.g. a typo), `int()` raises `ValueError` which propagates to the caller, violating the documented "never raises" contract for `dispatch()`.

```suggestion
    def dispatch(
        self,
        context: ExecutionContext,
        timeout: int | None = None,
    ) -> ExecutionResult:
        if timeout is None:
            try:
                timeout = int(os.environ.get(_DEFAULT_TIMEOUT_ENV, _DEFAULT_TIMEOUT))
            except (TypeError, ValueError):
                logger.warning(
                    "PG executor dispatch: %s env is not a valid integer; "
                    "falling back to default %ss",
                    _DEFAULT_TIMEOUT_ENV,
                    _DEFAULT_TIMEOUT,
                )
                timeout = _DEFAULT_TIMEOUT
```

### Issue 3 of 3
backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py:289-291
The return type annotation was dropped when the dispatcher was changed from `ExecutionDispatcher` to `RoutingExecutionDispatcher`. Adding it restores static-analysis coverage and makes the duck-typed interface explicit to readers.

```suggestion
    @staticmethod
    def _get_dispatcher() -> "RoutingExecutionDispatcher":
        """Executor dispatcher for the executor worker.
```

Reviews (1): Last reviewed commit: "UN-3603 [GATED-FEAT] address SonarCloud ..." | Re-trigger Greptile

Comment thread docker/docker-compose.yaml
Comment thread backend/pg_queue/executor_rpc.py Outdated
Comment on lines 289 to +291
@staticmethod
def _get_dispatcher() -> ExecutionDispatcher:
"""Get an ExecutionDispatcher for the executor worker."""
return ExecutionDispatcher(celery_app=celery_app)
def _get_dispatcher():
"""Executor dispatcher for the executor worker.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The return type annotation was dropped when the dispatcher was changed from ExecutionDispatcher to RoutingExecutionDispatcher. Adding it restores static-analysis coverage and makes the duck-typed interface explicit to readers.

Suggested change
@staticmethod
def _get_dispatcher() -> ExecutionDispatcher:
"""Get an ExecutionDispatcher for the executor worker."""
return ExecutionDispatcher(celery_app=celery_app)
def _get_dispatcher():
"""Executor dispatcher for the executor worker.
@staticmethod
def _get_dispatcher() -> "RoutingExecutionDispatcher":
"""Executor dispatcher for the executor worker.
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 289-291

Comment:
The return type annotation was dropped when the dispatcher was changed from `ExecutionDispatcher` to `RoutingExecutionDispatcher`. Adding it restores static-analysis coverage and makes the duck-typed interface explicit to readers.

```suggestion
    @staticmethod
    def _get_dispatcher() -> "RoutingExecutionDispatcher":
        """Executor dispatcher for the executor worker.
```

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Fix in Claude Code

…uning + guard env timeout parse

greptile's load-bearing finding: worker-pg-executor inherited the consumer's
30s/60s vt / health-stale defaults, so an LLM task exceeding 30s would be
re-claimed mid-run once the gate ramps — double execution + token double-spend,
two runs racing the same reply_key.
- docker-compose: set WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=3660 and
  HEALTH_STALE_SECONDS=3720 on worker-pg-executor (above the executor's hard
  EXECUTOR_TASK_TIME_LIMIT=3600), both overridable via env
- executor_rpc.dispatch: guard the EXECUTOR_RESULT_TIMEOUT int parse so a
  misconfigured value can't raise out of dispatch() (never-raises) + test

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

greptile addressed — 022e22c21

Thanks — the VT finding is a real one and would have bitten at ramp time.

The load-bearing finding (worker-pg-executor VT / health-stale): Fixed. The executor runs LLM inference up to EXECUTOR_TASK_TIME_LIMIT=3600s, but the consumer defaults are vt=30s / health-stale=60s — so a long task would be re-claimed mid-run (double execution + token double-spend) or killed by the liveness probe. The worker-pg-executor service now sets, above the 3600s task limit, both overridable:

  • WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=3660
  • WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS=3720

(docker compose config confirms they resolve.)

Minor never-raises gap (env int-parse outside try/except): Fixed — EXECUTOR_RESULT_TIMEOUT is parsed inside a try/except (TypeError, ValueError) that falls back to the default, so a misconfigured value can't raise out of dispatch(). Added a test.

The earlier idempotency / drop-branch / ack-on-store-failure items you noted as already solid were from the prior review round. Tests green (17 executor-rpc), pre-commit + docker compose config clean.

@muhammad-ali-e muhammad-ali-e merged commit fd38637 into feat/UN-3445-pg-queue-integration Jun 19, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3603-GATED-FEAT_pg_executor_rpc branch June 19, 2026 18:27
@sonarqubecloud

Copy link
Copy Markdown

@greptile-apps

greptile-apps Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Greptile encountered an error while reviewing this PR. Please reach out to support@greptile.com for assistance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.