UN-3603 [GATED-FEAT] PG Queue — blocking executor RPC on Postgres + unify gating to a single flag#2094
UN-3603 [GATED-FEAT] PG Queue — blocking executor RPC on Postgres + unify gating to a single flag#2094muhammad-ali-e merged 5 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom feat/UN-3603-GATED-FEAT_pg_executor_rpcZipstack/unstract:feat/UN-3603-GATED-FEAT_pg_executor_rpcCopy head branch name to clipboard
Conversation
…nify gating to a single flag A parallel PG-native executor RPC for prompt-studio's blocking dispatch, gated by the single pg_queue_enabled flag. The SDK ExecutionDispatcher and the Celery executor worker are left completely untouched. - pg_task_result store (migration 0010) + PgResultBackend (store / poll-wait; idempotent ON CONFLICT; poll-based, PgBouncer-safe — no LISTEN/NOTIFY) - reply_key on the shared TaskPayload contract + backend producer - consumer result-write hook: request-reply messages store result/error + ack after one attempt; fire-and-forget path unchanged (guarded by reply_key) - backend executor_rpc: resolve_executor_transport gate + PgExecutionDispatcher (enqueue + poll, mirrors the SDK dispatch contract) + RoutingExecutionDispatcher; _get_dispatcher returns the routing dispatcher so all call sites stay unchanged - worker-pg-executor role (run-worker.sh + docker-compose; broker-free) - unify PG-queue gating to a SINGLE flag pg_queue_enabled: execution, scheduler, and executor all read one key (renamed from pg_queue_execution_enabled / pg_scheduler_enabled / pg_executor_enabled). One flip gates the whole feature; the PG_QUEUE_TRANSPORT_ENABLED env stays the master kill-switch. Gated off by default. Dev-tested live: on->PG / off->Celery cycle (COMPLETED both ways) for API + ETL; executor RPC round-trip through worker-pg-executor. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…tion, no-NULL error - PgExecutionDispatcher.dispatch: drop the unused `headers` param (PG carries fairness in the enqueue payload, not Celery headers; the routing dispatcher no longer forwards headers to the PG path) - enqueue failure handler: logger.error(exc_info=True) -> logger.exception - PgTaskResult.error: TextField(null=True) -> TextField(blank=True, default="") (no-NULL-text convention); store_result writes "" on completed; migration 0010 regenerated; result-backend test updated Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud addressed —
|
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR Review — PG Executor RPC (UN-3603)
Ran the PR Review Toolkit (code review, silent-failure hunt, type-design, test coverage, comment accuracy, simplification) over the 14-file diff. The design is sound and the zero-regression / fail-closed posture holds. Inline comments below are prioritised; the load-bearing ones are:
- Critical: unguarded success-path
_store_reply(consumer.py:249) — a transient result-store error silently re-runs the executor (paid LLM spend) and blocks the caller to its full timeout, logged only as a generic poll failure. - High:
pg_task_resultis never reaped — docstrings claim a retention sweep that does not exist anywhere; the table grows unbounded once the gate flips. Untested caller side (PgExecutionDispatcher.dispatch) and consumer reply_key branch. Blocking poll pins a DB connection for up to 3600s. - Medium: two stale
pg_executor_enabledflag references contradict the unifiedpg_queue_enabledflag; status magic-strings + untyped dict shapes across the process boundary.
Full prioritised summary is in the PR thread.
|
|
||
| if reply_key: | ||
| # Persist the task's return value for the waiting caller before ack. | ||
| self._store_reply(reply_key, result=eager.result) |
There was a problem hiding this comment.
Critical — unguarded success-path store re-runs the executor and blocks the caller.
_store_reply(...) here runs before the ack (delete) at line 251 and is not wrapped in try/except. If store_result raises (DB write error, serialization failure, dropped connection), the exception propagates out of _handle and is caught only by the generic handler in poll_once, logged as a generic "poll failed" with no reply_key/task_name. Consequences:
- the message is never deleted → vt expires → the executor re-runs the full extraction (real LLM spend) up to
max_attempts, then is dropped as poison; - the waiting caller never gets a row and blocks to its full timeout (default 3600s);
- the failure is mislabeled, contradicting the line 219-225 promise that the caller "gets a definitive result instead of blocking to its timeout" (that only holds for the task-raised branch, not a store failure).
Also latent: store_result classifies any non-dict / None result as STATUS_FAILED, so a request-reply task that legitimately returns None/empty would be mis-stored as failed (not reachable today since execute_extraction always returns a dict, but fragile for future tasks).
Fix: wrap the store with try/except logging reply_key/task_name/msg_id at error severity, and decide explicitly whether a store failure should redeliver. This branch is also entirely untested — test_pg_queue_consumer.py was not updated; add ok+reply_key (store result then ack) and boom+reply_key (store error then ack) cases.
There was a problem hiding this comment.
Fixed in dce4e1c — the success-path store is now wrapped: on failure it logs at ERROR (task/msg_id/reply_key) and acks anyway to avoid re-running the executor (LLM spend); the caller degrades to a timeout rather than a poison-loop. The task-raised branch and the drop branches go through a new guarded _fail_reply. Added consumer tests: success→store+ack, raise→store-error+ack, store-failure→still-acks.
| # is blocking on. Present → store the outcome to pg_task_result and ack | ||
| # after a single attempt (no vt-redeliver — see the except branch). | ||
| # Absent → fire-and-forget (the existing leaf/pipeline path). | ||
| reply_key = payload.get("reply_key") |
There was a problem hiding this comment.
Medium — pre-execution drop branches lose the reply, blocking the caller to full timeout.
reply_key is read here, but the earlier drop branches above (missing task_name, poison read_ct > max_attempts, unknown task) delete the row and return without storing any reply. For a request-reply message that hits one of those paths, the blocking caller waits the full timeout (default 3600s) for a row that will never appear. For the unknown-task and poison cases especially, a definitive failure reply could be stored so the caller fails fast.
Fix: in those drop branches, if the payload carries a reply_key, call _store_reply(reply_key, error=...) before deleting so the caller gets an immediate descriptive failure.
There was a problem hiding this comment.
Fixed in dce4e1c — reply_key is read up front and the drop branches (missing task_name, poison read_ct, unknown task) now call _fail_reply(reply_key, …) before deleting, so a request-reply caller gets an immediate descriptive failure instead of blocking to its timeout. Tested (unknown-task → store error + ack).
| return ExecutionResult.failure( | ||
| error=f"TimeoutError: executor reply not received within {timeout}s" | ||
| ) | ||
| if row.status == "completed" and row.result is not None: |
There was a problem hiding this comment.
Medium — magic status string + unguarded from_dict can break the never-raises contract.
Two issues on this branch:
row.status == "completed"is a bare literal here, while the writer side names itSTATUS_COMPLETEDinresult_backend.py. The two live in different deployables (backendDjango ORM reader vsworkerspsycopg2 writer) with no shared import, so the contract is matched by eye across a process boundary — rename the writer constant and this silently treats every result as a failure. Consider promoting the status vocabulary to aStrEnuminunstract.core(which both trees already depend on).ExecutionResult.from_dict(row.result)hard-indexesdata["success"]. A malformed/partialcompletedrow raisesKeyError, which is not caught — violating this class's documented "never raises" contract and surfacing as a 500 to prompt-studio.
Fix: branch on the shared enum, and wrap from_dict in try/except converting a decode error to ExecutionResult.failure(...).
There was a problem hiding this comment.
Fixed in dce4e1c — both: (1) added a shared PgTaskStatus(str, Enum) in unstract.core.data_models; the reader branches on PgTaskStatus.COMPLETED.value and the writer re-exports STATUS_COMPLETED/FAILED from it → one source of truth across the process boundary. (2) from_dict is wrapped in try/except so a malformed completed row becomes ExecutionResult.failure(...), preserving the never-raises contract.
| timeout: int | None = None, | ||
| ) -> ExecutionResult: | ||
| if timeout is None: | ||
| timeout = int(os.environ.get(_DEFAULT_TIMEOUT_ENV, _DEFAULT_TIMEOUT)) |
There was a problem hiding this comment.
High (test gap) — PgExecutionDispatcher.dispatch is entirely untested.
test_executor_rpc.py mocks PgExecutionDispatcher out, so none of this method's four outcome branches ever run, yet its contract ("never raises; failure/timeout → ExecutionResult.failure") is the load-bearing promise prompt-studio depends on.
Add DB-free tests (mock enqueue_task and _wait_for_result): enqueue raises → returns failure and does not re-raise; _wait_for_result returns None → timeout failure with the within {timeout}s message; completed row → ExecutionResult.from_dict(row.result); failed row → failure with row.error (and the empty-error → "executor task failed" fallback); completed-but-result is None falls through to failure; timeout=None reads EXECUTOR_RESULT_TIMEOUT else 3600. Also cover _wait_for_result backoff + the min(delay, remaining) deadline clamp.
There was a problem hiding this comment.
Fixed in dce4e1c — added TestPgExecutionDispatcherDispatch (8 DB-free cases, mocking enqueue_task + _wait_for_result): enqueue-raises→failure (no re-raise); timeout→failure with the within {timeout}s message; completed→from_dict; failed→row.error; empty-error→fallback; completed-but-result-None→failure; malformed-completed→failure-not-raise; timeout=None→EXECUTOR_RESULT_TIMEOUT. The _wait_for_result backoff/clamp is the same logic as PgResultBackend.wait_for_result, which has real-PG timeout + late-write tests.
| timeout, | ||
| ) | ||
| row = self._wait_for_result(reply_key, timeout) | ||
| if row is None: |
There was a problem hiding this comment.
Medium — timeout is silent and indistinguishable from a real failure.
On timeout the caller gets ExecutionResult.failure("TimeoutError: ..."), but the executor task may still be running (and spending) and the row may land just after the deadline. Neither the timeout branch nor the task-failed branch (line 162) logs anything, so a 3600s blocking RPC can give up with no operator-visible event.
Fix: log the timeout and the task-failed branches at WARNING/ERROR with reply_key/run_id/elapsed so a caller-side timeout can be correlated with the still-running/failed pg_task_result row.
There was a problem hiding this comment.
Fixed in dce4e1c — the timeout branch and the executor-failure branch now log at WARNING with reply_key/run_id, so a caller-side timeout/failure is operator-visible and correlatable with the pg_task_result row.
| class Meta: | ||
| db_table = "pg_task_result" | ||
| indexes = [ | ||
| # Drives the reaper's retention sweep (DELETE ... WHERE expires_at <= now()). |
There was a problem hiding this comment.
Medium (comment accuracy) — describes a DELETE that doesn't exist.
This index comment says it "Drives the reaper's retention sweep (DELETE ... WHERE expires_at <= now())", but no such sweep consumes pg_task_result.expires_at — the index currently backs no query. Reword to e.g. "Index for a future retention sweep; no sweeper reads it yet." (Same issue at line 318: "NULL = no expiry recorded yet" describes a state no code produces — the only writer always sets expires_at.)
There was a problem hiding this comment.
Fixed in dce4e1c — reworded the index + expires_at comments to "index for a future retention sweep; no sweeper reads it yet".
| def _get_dispatcher(): | ||
| """Executor dispatcher for the executor worker. | ||
|
|
||
| Gate-routed: when ``pg_executor_enabled`` is on the blocking |
There was a problem hiding this comment.
Medium — stale flag name contradicts the code and the PR's own goal.
This docstring says "when pg_executor_enabled is on", but no such flag exists — the dispatcher reads PG_QUEUE_FLAG_KEY = "pg_queue_enabled" (executor_rpc.py:62), which is exactly the unification this PR makes. An operator following this comment would look for a flag that isn't there.
Fix: change pg_executor_enabled → pg_queue_enabled. Also: _get_dispatcher (line 290) dropped its return annotation; the prior signature was -> ExecutionDispatcher. Consider annotating -> "RoutingExecutionDispatcher" (under TYPE_CHECKING) for consistency with the typed surroundings.
There was a problem hiding this comment.
Fixed in dce4e1c — pg_executor_enabled → pg_queue_enabled in the docstring. (Left _get_dispatcher un-annotated to avoid a forward-ref import dance; the routing dispatcher is duck-typed against the SDK surface.)
| # blocking caller. Same heavy executor runtime as worker-executor-v2 (tool | ||
| # execution → platform-service / adapters / file storage), but broker-free: it | ||
| # is the terminal of the RPC and dispatches nothing onward to Celery. Dark | ||
| # until the backend pg_executor_enabled gate is flipped. |
There was a problem hiding this comment.
Medium — stale flag name. "Dark until the backend pg_executor_enabled gate is flipped" references a non-existent flag. The worker-pg-executor service is dark until the unified pg_queue_enabled flag (read by resolve_executor_transport) is flipped. Change pg_executor_enabled → pg_queue_enabled.
There was a problem hiding this comment.
Fixed in dce4e1c — pg_executor_enabled → pg_queue_enabled in the compose comment.
| kwargs: dict[str, Any] | ||
| queue: str | None | ||
| fairness: FairnessPayload | None | ||
| reply_key: NotRequired[str | None] |
There was a problem hiding this comment.
Low — reply_key: NotRequired[str | None] has three representations for two meanings.
The key can be absent, present-and-None, or present-and-string, but only two are meaningful (fire-and-forget vs request-reply). The producer only omits it or sets a string, and the consumer reads .get("reply_key") which collapses absent and None. The | None arm is dead-but-legal and reintroduces the "two empty states" problem the PgTaskResult.error column docstring explicitly set out to avoid.
Fix: tighten to reply_key: NotRequired[str] — "request-reply" is then exactly "key present."
There was a problem hiding this comment.
Fixed in dce4e1c — tightened to reply_key: NotRequired[str] so "request-reply" is exactly "key present" (no dead | None arm).
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # How long a stored result lives before the reaper's retention sweep may delete |
There was a problem hiding this comment.
Low — retention is decoupled from the caller's actual timeout.
DEFAULT_RETENTION_SECONDS = 3600 equals the executor caller-timeout default, but a caller can raise EXECUTOR_RESULT_TIMEOUT above 3600 (PgExecutionDispatcher.dispatch honours it). The docstring's promise that a result "always outlives any caller still waiting on it" then breaks: if the retention sweep is added (see models.py:304), a result could be deleted while a caller is still polling → spurious timeout. These two constants live in separate packages with no shared source.
Fix: when the sweep lands, derive retention from max(EXECUTOR_RESULT_TIMEOUT, DEFAULT_RETENTION_SECONDS) plus a margin (or pass the caller timeout through the payload), and document the retention >= caller_timeout invariant.
There was a problem hiding this comment.
Acknowledged — deferred with the sweep (models.py:304). With no sweep yet, the decoupling is currently moot (nothing deletes a row out from under a caller). When the sweep lands I'll derive retention from max(EXECUTOR_RESULT_TIMEOUT, default) + margin and document the retention >= caller_timeout invariant. Tracked in the same follow-up.
…e-flag constant, tests, doc accuracy Toolkit review (UN-3603): - Critical: guard the success-path result store in the consumer — a store failure now logs + acks (avoids re-running the executor = LLM spend) instead of vt-redelivering; pre-execution drop branches (malformed/poison/unknown) and the task-raised branch store a definitive failure reply via a new guarded _fail_reply so the caller fails fast instead of blocking to its timeout - Med: shared PgTaskStatus StrEnum in unstract.core (writer + reader agree across the process boundary); guard ExecutionResult.from_dict so a malformed completed row can't break the never-raises contract; log timeout/failure branches - High: release the DB connection between dispatch polls (close_old_connections) + document dispatch must not run inside a transaction - Med: single source of truth for the flag key (pg_queue/flags.py PG_QUEUE_FLAG_KEY, imported by transport/ownership/executor_rpc; SCHEDULER_FLAG_KEY folded in) - Med: result-backend logs the swallowed rollback failure - doc/accuracy: soften the not-yet-wired retention-sweep claims; fix stale pg_executor_enabled -> pg_queue_enabled comments; tighten reply_key to NotRequired[str] - tests: PgExecutionDispatcher.dispatch (8 branches, DB-free) + consumer reply_key store/ack + drop-branch + store-failure-still-acks cases Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review addressed —
|
…mment-as-code - consumer.py: the two new request-reply error handlers use logger.exception() instead of logger.error(..., exc_info=True) - test_executor_rpc.py: reword the `# timeout=None` inline comment (Sonar read it as commented-out code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
SonarCloud (round 2) —
|
|
| Filename | Overview |
|---|---|
| docker/docker-compose.yaml | Adds worker-pg-executor service but omits WORKER_PG_QUEUE_CONSUMER_VT_SECONDS and WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS; with the 30 s / 60 s defaults, any LLM task exceeding 30 s risks double-execution and token double-spend. |
| backend/pg_queue/executor_rpc.py | New PG executor RPC dispatcher: gate logic, PgExecutionDispatcher (enqueue + poll-wait), and RoutingExecutionDispatcher; one minor 'never raises' gap with env-var int conversion outside try/except. |
| workers/queue_backend/pg_queue/result_backend.py | New PgResultBackend: clean idempotent INSERT ON CONFLICT DO NOTHING store, poll-based wait, connection recycling on error, context-manager cleanup; well-structured and tested. |
| workers/queue_backend/pg_queue/consumer.py | Adds reply_key handling: definitive failure writes on drop/poison/unknown/raise, success path stores result then acks; intentional ack-on-store-failure to avoid double LLM spend is well-commented. |
| backend/pg_queue/migrations/0010_pgtaskresult.py | Additive migration creating pg_task_result table with expires_at index; safe to apply and drop. |
| unstract/core/src/unstract/core/data_models.py | Adds NotRequired[str] reply_key to TaskPayload and the shared PgTaskStatus enum; clean shared contract between writer and reader processes. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant PS as PromptStudioHelper
participant RD as RoutingExecutionDispatcher
participant Gate as resolve_executor_transport
participant PD as PgExecutionDispatcher
participant PQ as pg_queue_message
participant WK as worker-pg-executor
participant TR as pg_task_result
PS->>RD: dispatch(context)
RD->>Gate: pg_queue_enabled?
alt PG enabled
Gate-->>RD: True
RD->>PD: dispatch(context)
PD->>PQ: INSERT execute_extraction + reply_key
PD->>PD: _wait_for_result poll with backoff
WK->>PQ: SKIP LOCKED claim
WK->>WK: task.apply() execute_extraction
WK->>TR: store_result ON CONFLICT DO NOTHING
WK->>PQ: DELETE ack
PD->>TR: "SELECT WHERE task_id = reply_key"
TR-->>PD: status result error
PD-->>RD: ExecutionResult
else PG disabled
Gate-->>RD: False
RD->>RD: Celery ExecutionDispatcher.dispatch()
end
RD-->>PS: ExecutionResult
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant PS as PromptStudioHelper
participant RD as RoutingExecutionDispatcher
participant Gate as resolve_executor_transport
participant PD as PgExecutionDispatcher
participant PQ as pg_queue_message
participant WK as worker-pg-executor
participant TR as pg_task_result
PS->>RD: dispatch(context)
RD->>Gate: pg_queue_enabled?
alt PG enabled
Gate-->>RD: True
RD->>PD: dispatch(context)
PD->>PQ: INSERT execute_extraction + reply_key
PD->>PD: _wait_for_result poll with backoff
WK->>PQ: SKIP LOCKED claim
WK->>WK: task.apply() execute_extraction
WK->>TR: store_result ON CONFLICT DO NOTHING
WK->>PQ: DELETE ack
PD->>TR: "SELECT WHERE task_id = reply_key"
TR-->>PD: status result error
PD-->>RD: ExecutionResult
else PG disabled
Gate-->>RD: False
RD->>RD: Celery ExecutionDispatcher.dispatch()
end
RD-->>PS: ExecutionResult
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
docker/docker-compose.yaml:732-739
**Missing VT and health-stale configuration for executor worker**
`worker-pg-executor` is missing `WORKER_PG_QUEUE_CONSUMER_VT_SECONDS` and `WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS`. The bootstrap reads these via `_env("VT_SECONDS", _DEFAULT_VT_SECONDS, int)` (line 451 of `consumer.py`) and `_env("HEALTH_STALE_SECONDS", _DEFAULT_HEALTH_STALE_SECONDS, float)`. Both defaults are 30 s and 60 s respectively.
LLM inference tasks routinely take well over 30 s. With VT=30 s, the message becomes visible again while the executor is still running — a second consumer instance will claim it and start a parallel execution, burning tokens twice. With health_stale=60 s, tasks exceeding 60 s also trigger a pod restart and re-delivery. Both values need to be set to at least the caller-side timeout (`EXECUTOR_RESULT_TIMEOUT`, default 3600 s).
Add to the `environment` block:
```
- WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=3600
- WORKER_PG_QUEUE_CONSUMER_HEALTH_STALE_SECONDS=3700
```
### Issue 2 of 3
backend/pg_queue/executor_rpc.py:115-121
`int(os.environ.get(...))` is called outside the surrounding try/except block. If `EXECUTOR_RESULT_TIMEOUT` is set to a non-integer string (e.g. a typo), `int()` raises `ValueError` which propagates to the caller, violating the documented "never raises" contract for `dispatch()`.
```suggestion
def dispatch(
self,
context: ExecutionContext,
timeout: int | None = None,
) -> ExecutionResult:
if timeout is None:
try:
timeout = int(os.environ.get(_DEFAULT_TIMEOUT_ENV, _DEFAULT_TIMEOUT))
except (TypeError, ValueError):
logger.warning(
"PG executor dispatch: %s env is not a valid integer; "
"falling back to default %ss",
_DEFAULT_TIMEOUT_ENV,
_DEFAULT_TIMEOUT,
)
timeout = _DEFAULT_TIMEOUT
```
### Issue 3 of 3
backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py:289-291
The return type annotation was dropped when the dispatcher was changed from `ExecutionDispatcher` to `RoutingExecutionDispatcher`. Adding it restores static-analysis coverage and makes the duck-typed interface explicit to readers.
```suggestion
@staticmethod
def _get_dispatcher() -> "RoutingExecutionDispatcher":
"""Executor dispatcher for the executor worker.
```
Reviews (1): Last reviewed commit: "UN-3603 [GATED-FEAT] address SonarCloud ..." | Re-trigger Greptile
| @staticmethod | ||
| def _get_dispatcher() -> ExecutionDispatcher: | ||
| """Get an ExecutionDispatcher for the executor worker.""" | ||
| return ExecutionDispatcher(celery_app=celery_app) | ||
| def _get_dispatcher(): | ||
| """Executor dispatcher for the executor worker. |
There was a problem hiding this comment.
The return type annotation was dropped when the dispatcher was changed from
ExecutionDispatcher to RoutingExecutionDispatcher. Adding it restores static-analysis coverage and makes the duck-typed interface explicit to readers.
| @staticmethod | |
| def _get_dispatcher() -> ExecutionDispatcher: | |
| """Get an ExecutionDispatcher for the executor worker.""" | |
| return ExecutionDispatcher(celery_app=celery_app) | |
| def _get_dispatcher(): | |
| """Executor dispatcher for the executor worker. | |
| @staticmethod | |
| def _get_dispatcher() -> "RoutingExecutionDispatcher": | |
| """Executor dispatcher for the executor worker. |
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 289-291
Comment:
The return type annotation was dropped when the dispatcher was changed from `ExecutionDispatcher` to `RoutingExecutionDispatcher`. Adding it restores static-analysis coverage and makes the duck-typed interface explicit to readers.
```suggestion
@staticmethod
def _get_dispatcher() -> "RoutingExecutionDispatcher":
"""Executor dispatcher for the executor worker.
```
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
…uning + guard env timeout parse greptile's load-bearing finding: worker-pg-executor inherited the consumer's 30s/60s vt / health-stale defaults, so an LLM task exceeding 30s would be re-claimed mid-run once the gate ramps — double execution + token double-spend, two runs racing the same reply_key. - docker-compose: set WORKER_PG_QUEUE_CONSUMER_VT_SECONDS=3660 and HEALTH_STALE_SECONDS=3720 on worker-pg-executor (above the executor's hard EXECUTOR_TASK_TIME_LIMIT=3600), both overridable via env - executor_rpc.dispatch: guard the EXECUTOR_RESULT_TIMEOUT int parse so a misconfigured value can't raise out of dispatch() (never-raises) + test Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
greptile addressed —
|
fd38637
into
feat/UN-3445-pg-queue-integration
|
|
Greptile encountered an error while reviewing this PR. Please reach out to support@greptile.com for assistance. |
What
dispatch()can ride Postgres instead of Celery, gated by the singlepg_queue_enabledflag. The SDKExecutionDispatcherand the Celery executor worker are untouched.pg_task_resultstore (migration0010) +PgResultBackend(store + poll-wait);reply_keyon the sharedTaskPayload; consumer result-write hook; backend gate +PgExecutionDispatcher+RoutingExecutionDispatcher;worker-pg-executorrole.pg_queue_enabled— execution, scheduler, and executor now read one key (waspg_queue_execution_enabled/pg_scheduler_enabled/pg_executor_enabled).Why
Migrate the executor request-reply off Celery — the last big Celery dependency (UN-3603). Cut A = prompt-studio blocking path;
structure_tool(workflows) andide_*callbacks are later slices. Consolidating the gates to one flag keeps the rollout simple to operate (one flip = whole feature), per the rollout plan (validate across envs, then a coordinated cutover).How
pg_task_resulttable + poll-wait (PgBouncer-safe; no LISTEN/NOTIFY).worker-pg-executorruns the existingexecute_extraction(no logic fork) and the consumer writes its result/error topg_task_resultfor the blocking caller.pg_queue_enabled;PG_QUEUE_TRANSPORT_ENABLED(env) stays the master kill-switch.Can this PR break any existing features. If yes, please list possible items. If no, please explain why.
No. Gate off (default) →
_get_dispatcherreturns the unchanged CeleryExecutionDispatcherfor every mode and nopg_task_resultrow is created. New table is additive (metadata-only migration);reply_keyis optional (absent = byte-identical fire-and-forget); the consumer hook is guarded byreply_key. The flag rename is internal (the flag is read only in the backend). Pinned by zero-regression tests + 34 passing consumer tests, and validated live: a full on→PG / off→Celery cycle for API + ETL, COMPLETED both ways.Database Migrations
0010_pgtaskresult— new droppablepg_task_resulttable (additive).Env Config
None new. Gate = existing
PG_QUEUE_TRANSPORT_ENABLED+ the single Flipt flagpg_queue_enabled(off until ramp).Notes on Testing
pg_queue/tests/test_executor_rpc.py), result-backend real-PG (workers/tests/test_pg_result_backend.py). 34 existing consumer tests + the flag-rename suites (transport/ownership) green.worker-pg-executor; single-flag on→PG / off→Celery cycle for API + ETL (both COMPLETED), withqueue_message_idset on PG andtask_idset on Celery.Related Issues or PRs
UN-3603 (sub-task of UN-3536). Follow-ups: the workflow/
structure_toolexecutor path, and theide_*async callbacks.Checklist
🤖 Generated with Claude Code