UN-3608 [FEAT] PG Queue 9i — executor async/callback path on PG (self-chained continuations)#2097

Merged
muhammad-ali-e merged 4 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3608-pg-queue-executor-callback
Jun 21, 2026

@muhammad-ali-e

Contributor

What

Migrates the executor RPC's async/callback path off Celery onto the PG queue, completing the executor-transport migration (the blocking path landed in 9h-c). Targets the feat/UN-3445-pg-queue-integration branch (not main).

Today dispatch_with_callback (Prompt Studio run/index/extract, lookups) always rides Celery link/link_error. This routes it through Postgres instead, via §5 fire-and-forget self-chaining: the executor consumer, after running execute_extraction, enqueues the continuation itself.

How

  • Routing: dispatch_async / dispatch_with_callback now pick PG-vs-Celery per call via the single pg_queue_enabled flag (same gate as the rest of the feature), in both backend and workers executor_rpc.py.
  • Self-chaining: the on_success/on_error Celery Signatures are translated to a serialisable ContinuationSpec (task_name/kwargs/queue) carried in the task payload. After the executor runs, the consumer self-chains the matching continuation onto the callback queue, prepending the result dict on success and the dispatch task_id on error (Celery link_error parity). It acks regardless of chain outcome, because a vt-redelivery would re-run the executor (LLM double-spend); a chain failure is logged loudly and otherwise treated as best-effort.
  • Consumer: a new gated worker-pg-ide-callback service drains the ide_callback queue (compose profile pg-queue).
  • Wire contract: ContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload in unstract.core (all optional, set only when present).
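
For readers who want the wire contract in concrete form, here is a minimal sketch of the shapes described above. The field names task_name/kwargs/queue and on_success/on_error/task_id come from this PR; the required base fields of TaskPayload and the build_payload helper are assumptions made for illustration, not the actual unstract.core definitions.

```python
from typing import Any, Optional, TypedDict


class ContinuationSpec(TypedDict):
    """Serialisable stand-in for a Celery Signature (kwargs-only callback)."""

    task_name: str
    kwargs: dict[str, Any]
    queue: str


class _BasePayload(TypedDict):
    # Assumed required fields; the real TaskPayload may differ.
    task_name: str
    args: list[Any]
    kwargs: dict[str, Any]


class TaskPayload(_BasePayload, total=False):
    # All optional: written only when the dispatch actually carries them.
    reply_key: str
    on_success: ContinuationSpec
    on_error: ContinuationSpec
    task_id: str


def build_payload(
    task_name: str,
    args: list[Any],
    kwargs: dict[str, Any],
    *,
    on_success: Optional[ContinuationSpec] = None,
    on_error: Optional[ContinuationSpec] = None,
    task_id: Optional[str] = None,
) -> TaskPayload:
    """Hypothetical helper: emit optional keys only when they are set."""
    payload: TaskPayload = {"task_name": task_name, "args": list(args), "kwargs": dict(kwargs)}
    if on_success is not None:
        payload["on_success"] = on_success
    if on_error is not None:
        payload["on_error"] = on_error
    if task_id is not None:
        payload["task_id"] = task_id
    return payload
```

Leaving unset keys out entirely (rather than writing None) keeps a gate-OFF or plain fire-and-forget payload identical to what it was before the new fields existed.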

Zero-regression

Every new path is gated and fails closed to Celery. Gate OFF is byte-identical to the prior behaviour: dispatch_with_callback delegates to the unchanged SDK ExecutionDispatcher (send_task(..., link=, link_error=)), no continuation payload, no self-chain. Call sites are unchanged — they keep passing Celery Signatures; the dispatcher translates them only on the PG branch.
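
A rough sketch of the per-call gate, under stated assumptions: RoutingDispatcher, RecordingDispatcher and the flag_enabled callable are hypothetical names for illustration, and "fails closed" is read here as "any error while evaluating the flag routes to Celery". The real RoutingExecutionDispatcher may be structured differently.

```python
from typing import Any, Callable


class RoutingDispatcher:
    """Pick PG or Celery per call; any doubt about the flag means Celery."""

    def __init__(self, flag_enabled: Callable[[], bool], pg: Any, celery: Any) -> None:
        self._flag_enabled = flag_enabled
        self._pg = pg
        self._celery = celery

    def _target(self) -> Any:
        try:
            use_pg = bool(self._flag_enabled())
        except Exception:
            # Assumed fail-closed behaviour: a broken flag lookup must not
            # strand the dispatch, so fall back to the unchanged Celery path.
            use_pg = False
        return self._pg if use_pg else self._celery

    def dispatch_with_callback(self, context: Any, *, on_success: Any = None, on_error: Any = None) -> Any:
        # Call sites pass the same arguments on either branch.
        return self._target().dispatch_with_callback(
            context, on_success=on_success, on_error=on_error
        )


class RecordingDispatcher:
    """Test double standing in for the PG and Celery dispatchers."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: list[Any] = []

    def dispatch_with_callback(self, context: Any, *, on_success: Any = None, on_error: Any = None) -> str:
        self.calls.append(context)
        return self.name
```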

Tests

  • payload set/unset (optional keys), signature → spec translation (incl. missing-queue fail-fast)
  • consumer self-chain success/error branches + chain-enqueue-failure-still-acks guard
  • routing gate ON/OFF for all three dispatch modes (backend + workers)

Dev-tested end-to-end (flag ON)

backend dispatch_with_callback → PG → worker-pg-executor execute_extraction → self-chain → worker-pg-ide-callback ide_prompt_complete → emit-websocket 200, output persisted — zero RabbitMQ. Non-regression confirmed with a flag ON/OFF A/B (identical behaviour).

After this slice the executor transport is fully on PG; RabbitMQ can be decommissioned next.

🤖 Generated with Claude Code

…-chained continuations)

Migrate the executor RPC's async/callback path off Celery onto the PG queue,
completing the executor-transport migration (the blocking path landed in 9h-c).

- dispatch_async / dispatch_with_callback now route PG-vs-Celery per call via the
  single pg_queue_enabled flag (was always Celery). Backend + workers.
- §5 fire-and-forget self-chaining: on_success/on_error Celery Signatures are
  translated to serialisable ContinuationSpecs carried in the payload; after the
  executor runs execute_extraction the consumer self-chains the matching
  continuation onto the callback queue (result prepended on success, dispatch
  task_id on error), acking regardless to avoid an LLM double-spend.
- New gated worker-pg-ide-callback consumer drains the ide_callback queue
  (Prompt Studio run/index/extract, lookups). Compose profile pg-queue.
- ContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload
  wire contract (unstract.core).

Zero-regression: every new path is gated and fails closed to Celery; gate OFF is
byte-identical to the prior Celery behaviour. Call sites unchanged — they keep
passing Celery Signatures; the dispatcher translates only on the PG branch.

Tests: payload set/unset, signature->spec translation, consumer self-chain
success/error + enqueue-failure guard, routing gate (backend + workers).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 20, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6f88ed26-efb2-45c9-8048-267c0a1d052d


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author

Automated PR review — PG Queue 9i (executor async/callback path)

Ran the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) over ab80a494..7afc6d19. Overall the implementation is well-structured and unusually well-commented, and the happy-path test coverage is solid. The inline comments below are the deduplicated findings, prioritised.

Critical (1): the early-drop branches (malformed / poison / unknown-task) never chain on_error, so a dispatch_with_callback caller hangs forever with no user-facing error.

Important (5): the reply_key XOR callback invariant is documented but unenforced; the PG on_error callback can't recover the real executor error via AsyncResult; success-result self-chaining and the backend producer don't JSON-coerce callback payloads (silent drop / insert error on UUID/datetime); _signature_to_spec accepts an empty task_name and silently drops positional callback args.

Suggestions: _DispatchHandle + _signature_to_spec are duplicated byte-for-byte across backend/workers; returned ExecutionResult(success=False) routes to on_success; over-broad swallow in _continuation_org; consumer helpers re-widen typed payloads to bare dict; minor comment/jargon polish.

Test gaps (no inline anchor — the branches aren't in this diff): add coverage for on_error firing on the drop branches, the both-keys-present precedence, and dispatch_async/dispatch_with_callback enqueue-failure propagation.

reply_key,
)
return
if on_error:

Contributor Author

Critical — on_error callback dropped on the early-drop branches. This new block correctly self-chains on_error when the run raises, but the three early-drop branches above — malformed payload, poison/max_attempts exceeded, and unknown/unregistered task — only call _fail_reply(reply_key, ...) and then delete(). For a dispatch_with_callback message (no reply_key, has on_error) those branches delete the row and never chain on_error. A poison executor task is exactly the realistic failure mode, and it's the case where the user most needs the error surfaced; instead the HTTP-202 caller waits on a WebSocket error event that never arrives (run appears to hang, no terminal state). This is asymmetric with the reply_key path, which does fail those branches.

Fix: in each early-drop branch, in addition to _fail_reply, chain the error continuation when present, e.g. a helper _fail_dispatch(payload, error=...) that does _fail_reply(reply_key, ...) and if on_error: self._chain_continuation(on_error, prepend=payload.get('task_id') or '', payload=payload) before the delete(). Add tests for the unknown-task / poison / malformed cases with on_error set.

Contributor Author

Fixed (42862651b). Added a shared _fail_dispatch(payload, error=...) used by the run-raised path and all three early-drop branches (malformed / poison / unknown task), so a dispatch_with_callback failure always self-chains on_error. Tests cover all three drop branches with on_error set.
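
The shape of that shared helper might look roughly like the sketch below. It is written as a free function with the reply and chain operations injected, which is an assumption made so the example runs standalone; in the PR it is a consumer method, and the exact signature of _chain_continuation is not shown in this thread.

```python
from typing import Any, Callable, Mapping


def fail_dispatch(
    payload: Mapping[str, Any],
    error: str,
    *,
    fail_reply: Callable[[str, str], None],
    chain_continuation: Callable[..., None],
) -> None:
    """Surface a failure on whichever channel the dispatch carries.

    Intended to be called before the row is deleted, from the run-raised
    path and from each early-drop branch (malformed / poison / unknown task).
    """
    reply_key = payload.get("reply_key")
    if reply_key:
        # Request-reply dispatch: unblock the waiting caller.
        fail_reply(reply_key, error)

    on_error = payload.get("on_error")
    if on_error:
        # Callback dispatch: prepend the dispatch task_id (Celery
        # link_error parity) and pass the error text along.
        chain_continuation(on_error, prepend=payload.get("task_id") or "", error=error)
```

A dispatch that carries neither a reply_key nor an on_error gets no notification from this helper, which is the case the later Greptile comment about the exception-handler log refers to.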

# regardless — same anti-double-spend reasoning as reply_key (a
# vt-redelivery would re-run the executor / re-spend LLM tokens).
# _chain_continuation is best-effort, so the ack never wedges.
self._chain_continuation(

Contributor Author

Important — PG on_error loses the real executor error text. prepend=payload.get('task_id') hands the callback (e.g. ide_index_error) the dispatch task_id as failed_task_id, but that callback resolves the actual message via AsyncResult(failed_task_id, app=app).result. On the PG path the executor ran eagerly via task.apply and never wrote to a Celery result backend under that id, so AsyncResult(...).result is empty and the callback degrades to its generic default error string. Parity is structural (right positional shape) but not behavioural — the real error is dropped.

Fix: the consumer has the exception in hand at the failure branch — carry the real error text into the on_error continuation (e.g. prepend f"{type(exc).__name__}: {exc}", or pass it via the continuation kwargs), or store the error under task_id in a backend the callback can read.

Contributor Author

Fixed (42862651b). The consumer now carries the real error text into callback_kwargs['error'], and _get_task_error prefers it over the (empty-on-PG) AsyncResult lookup. Test asserts RuntimeError: boom reaches the on_error continuation.
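
The preference order can be illustrated with a small sketch. The function below is a simplified stand-in for _get_task_error: the lookup callable replaces the AsyncResult(...).result call so the example needs no Celery, and the default message is a placeholder rather than the real one.

```python
from typing import Any, Callable, Optional


def get_task_error(
    explicit: Optional[str],
    failed_task_id: str,
    lookup: Callable[[str], Any],
    default: str = "Task failed",
) -> str:
    """Prefer the error text carried in the continuation over a backend lookup."""
    if explicit is not None:
        # PG path: the consumer put the real error text in the callback kwargs.
        return str(explicit)
    # Celery path: fall back to whatever the result backend holds for the id.
    result = lookup(failed_task_id)
    return str(result) if result else default
```

Checking `explicit is not None` instead of truthiness means an explicitly passed empty string is returned as-is and does not silently trigger the backend lookup.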

# Async/callback: self-chain the success continuation onto the callback
# queue before the ack — the §5 hand-off (PG analogue of Celery's link).
# Best-effort (never raises): a chain failure logs + still acks, so the
# executor is not re-run (LLM double-spend); the callback is lost.

Contributor Author

Important — success callback silently dropped on non-JSON-safe results. prepend=eager.result is enqueued as args=[eager.result]; client.send serialises with plain json.dumps(message) (no default=str). An executor result dict containing a UUID/datetime — exactly the kind of payload these results carry — makes json.dumps raise TypeError, which _chain_continuation's broad except swallows ("FAILED to self-chain"), so the success callback and its user-facing WebSocket event are lost. The Celery path serialises via its configured serializer and has no such gap.

Fix: JSON-coerce the prepended result before enqueuing (mirror backend/pg_queue/producer.py:_json_safe), or give client.send's json.dumps a default=str.

Contributor Author

Fixed (42862651b). Added _json_safe (json round-trip with default=str) applied to the prepended value in _chain_continuation. Test uses a task returning a datetime and asserts the chained arg is coerced + plain-json serialisable.
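
The round-trip described in the reply is small enough to show in full. This is a sketch of the technique (json.dumps with default=str, then json.loads), not a copy of the PR's _json_safe; the function name here is illustrative.

```python
import json
from datetime import datetime
from typing import Any
from uuid import UUID


def json_safe(value: Any) -> Any:
    """Coerce a value to plain JSON types.

    Anything json.dumps cannot encode natively (UUID, datetime, ...) is
    stringified via default=str; loading it back yields only dicts, lists,
    strings, numbers, booleans and None.
    """
    return json.loads(json.dumps(value, default=str))


result = {
    "run_id": UUID("12345678-1234-5678-1234-567812345678"),
    "finished_at": datetime(2026, 6, 21, 6, 54),
}
coerced = json_safe(result)
# A plain json.dumps(result) would raise TypeError; the coerced copy does not.
encoded = json.dumps(coerced)
```

The trade-off is that the callback receives strings where the executor produced richer objects, so a callback that needs the original type has to parse it back.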

message.msg_id,
reply_key,
)
elif on_success:

Contributor Author

Suggestion — a returned ExecutionResult(success=False) chains on_success. The success/error split keys purely on whether task.apply(...) raised, but execute_extraction can return a failed ExecutionResult (success=False, via ExecutionResult.failure(...)) without raising. That takes this elif on_success branch and chains the success continuation with a failure payload; on_error never fires. This matches Celery semantics (link_error also only fires on raised exceptions), so it's parity rather than a regression — but given the user-facing impact, either inspect eager.result.success is False and route to on_error, or add a comment documenting that returned-failure deliberately follows the success path, so a future reader doesn't mistake it for the drop-branch bug above.

Contributor Author

Documented (42862651b). Added a comment on the elif on_success branch: "success" == did-not-raise (Celery link parity); a returned failed ExecutionResult deliberately follows on_success — distinct from the drop-branch bug above.

self._result_backend = PgResultBackend()
self._result_backend.store_result(reply_key, result=result, error=error)

def _chain_continuation(self, spec: dict, *, prepend: object, payload: dict) -> None:

Contributor Author

Suggestion: restore the typed contract at the consumer boundary. _chain_continuation(self, spec: dict, *, prepend, payload: dict) re-widens to bare dict, discarding the new ContinuationSpec / TaskPayload types the PR added precisely for these shapes. A key rename across the producer↔consumer boundary (e.g. task_name → name) wouldn't be caught by the type checker. Annotate spec: ContinuationSpec, payload: TaskPayload (imports already available in this tree).

Contributor Author

Fixed (42862651b). _chain_continuation(spec: ContinuationSpec, *, payload: TaskPayload, ...) and _continuation_org(payload: TaskPayload) now carry the real types.

Comment thread backend/pg_queue/producer.py Outdated
if reply_key is not None:
message["reply_key"] = reply_key
if on_success is not None:
message["on_success"] = on_success

Contributor Author

Important — on_success/on_error bypass _json_safe. args/kwargs go through _json_safe(...) above (lines 96-97), but the continuation specs (each with a nested kwargs dict) are written into the JSONField verbatim. If a callback's kwargs carry a UUID/datetime, PgQueueMessage.objects.create raises at insert time — and unlike the worker path this is caller-visible at dispatch. Fix: message["on_success"] = _json_safe(on_success) (and likewise on_error).

Contributor Author

Fixed (42862651b). message['on_success'] = _json_safe(on_success) (and on_error). Test passes a UUID in callback_kwargs and asserts it's coerced.

"PG self-chaining routes by the row's queue and cannot default it"
)
return ContinuationSpec(
task_name=sig.task,

Contributor Author

Important — _signature_to_spec validates queue but not task, and silently drops positional args. Two gaps with the same fail-fast philosophy already applied to queue:

  1. task_name=sig.task is read with no guard. If sig.task is empty/None (a malformed signature), the continuation enqueues with a falsy task_name and is dropped far downstream as "malformed payload: missing task_name" — a lost callback with a misleading log. Add if not getattr(sig, 'task', None): raise ValueError(...).
  2. ContinuationSpec has no args, and only sig.kwargs is read. A Celery Signature may carry positional args; on the PG branch they're silently dropped, so the callback runs with a different arg list than on Celery — violating the "call sites unchanged" contract. Today's call sites are kwargs-only (latent), but make it loud: if getattr(sig, 'args', None): raise ValueError("callback signature has positional args; PG self-chaining supports kwargs only").

Contributor Author

Fixed (42862651b, in the now-shared unstract.core.execution_dispatch). Guards both: raises on a falsy task, and raises on positional args (kwargs-only callbacks). Tests for each.
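
A sketch of the three fail-fast guards discussed in this thread. To stay runnable without Celery it uses a hypothetical FakeSignature stand-in exposing the task, args, kwargs and options attributes; reading the queue from options["queue"] is an assumption about where the call sites set it, and the error messages are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class FakeSignature:
    """Duck-typed stand-in for celery.canvas.Signature (illustration only)."""

    task: Optional[str]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    options: dict = field(default_factory=dict)


def signature_to_continuation(sig: Any) -> dict:
    """Translate a kwargs-only callback signature, failing fast on bad input."""
    task = getattr(sig, "task", None)
    if not task:
        raise ValueError("callback signature has no task name")
    if getattr(sig, "args", None):
        raise ValueError(
            "callback signature has positional args; PG self-chaining supports kwargs only"
        )
    queue = (getattr(sig, "options", None) or {}).get("queue")
    if not queue:
        # The consumer routes by the row's queue, so there is no safe default.
        raise ValueError("callback signature has no queue")
    return {
        "task_name": task,
        "kwargs": dict(getattr(sig, "kwargs", None) or {}),
        "queue": queue,
    }
```

Raising at dispatch time keeps the failure next to the call site that built the signature, instead of surfacing later as a dropped callback in the consumer log.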

_DEFAULT_TIMEOUT = 3600


class _DispatchHandle:

Contributor Author

Suggestion — _DispatchHandle and _signature_to_spec are duplicated byte-for-byte with backend/pg_queue/executor_rpc.py. Both files now carry identical copies of this class and the _signature_to_spec helper (and near-identical dispatch_async/dispatch_with_callback bodies). The new translation logic — especially the correctness-bearing "fail fast on missing queue" rule — can now drift silently between the two copies. ContinuationSpec/PgTaskStatus were hoisted into unstract.core for exactly this reason; _DispatchHandle and _signature_to_spec (no Django/worker deps) belong there too. Import both copies from one shared definition.

Contributor Author

Done — extracted DispatchHandle + signature_to_continuation into unstract.core.execution_dispatch (commit 0f8b57151); both mirrors import them and the duplicated TestSignatureToSpec is gone in favour of one shared test. (Full mirror retirement remains UN-3607.)

the prompt-studio call sites keep passing Celery ``Signature``s unchanged and
the dispatcher translates them only on the PG branch.

The consumer prepends the chained argument the callback expects (the executor

Contributor Author

Suggestion — "before kwargs" is slightly misleading. _chain_continuation passes the chained value as a separate positional args=[prepend] list and kwargs as a distinct mapping — they aren't concatenated into one positional sequence, so "before kwargs" reads as if kwargs were positional. Reword to e.g.: "… passes the chained argument as the callback's first positional arg, alongside the spec's kwargs — mirroring how Celery's link prepends the parent task's return value."

Contributor Author

Fixed (42862651b). Reworded to '… passes the chained value as the callback's first positional arg, alongside the spec's kwargs as a distinct mapping …'.
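
To make the "first positional arg, alongside kwargs" wording concrete, here is a small sketch of how a continuation message could be built and later invoked. build_continuation, invoke and the callback's signature are hypothetical; only the args=[prepend] plus separate kwargs shape is taken from the review comment.

```python
from typing import Any, Callable


def build_continuation(spec: dict, prepend: Any) -> tuple[str, dict]:
    """Return (queue, payload): the chained value rides in args, kwargs stay separate."""
    payload = {
        "task_name": spec["task_name"],
        "args": [prepend],
        "kwargs": dict(spec["kwargs"]),
    }
    return spec["queue"], payload


def invoke(registry: dict[str, Callable[..., Any]], payload: dict) -> Any:
    """What a callback consumer would do with the payload (simplified)."""
    task = registry[payload["task_name"]]
    return task(*payload["args"], **payload["kwargs"])


def ide_prompt_complete(result: dict, *, run_id: str) -> str:
    # Hypothetical callback signature, for illustration only.
    return f"run {run_id}: {result['output']}"
```

This mirrors how a Celery link callback sees the parent's return value as its first positional argument while its own kwargs pass through unchanged.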

Comment thread docker/docker-compose.yaml Outdated
# IDE callback consumer (③c) — drains the ``ide_callback`` queue the executor
# self-chains onto for async/callback dispatch (Prompt Studio run/index/extract,
# lookups). Registers the ide_callback worker's tasks (ide_prompt_complete,
# ide_index_complete, extraction_complete, …). Dark until dispatch_with_callback

Contributor Author

Suggestion — "Dark" is jargon for a compose file. Clear to the authors (idle / no traffic) but opaque to an operator scanning services. Suggest: "Idle (receives no work) until dispatch_with_callback routes to PG, i.e. the gating flag is on". Also worth confirming: the sibling worker-pg-callback/worker-pg-scheduler declare depends_on: rabbitmq; this service doesn't. Likely fine since ide_callback tasks only do API writes + ws emits, but verify none dispatch onward to a Celery broker.

Contributor Author

Fixed (42862651b). Reworded 'Dark' → 'Idle (receives no work) until …'. Confirmed no broker dep needed: grepped ide_callback/tasks.py — no .delay/.apply_async/send_task/signature/chord; these callbacks only do API writes + ws emits. Added that note to the compose comment.

…ct.core

Cuts the new-code duplication SonarCloud flagged on the backend/workers
executor_rpc.py mirror by moving the two genuinely transport-agnostic pieces
of the async/callback path into unstract.core (alongside ContinuationSpec):

- DispatchHandle (the .id-only AsyncResult duck-type)
- signature_to_continuation (Celery Signature -> ContinuationSpec)

Both mirrors now import them instead of redefining; the duplicated
TestSignatureToSpec is removed from both test suites in favour of one shared
test. The dispatch_async/dispatch_with_callback method bodies stay mirrored
(they genuinely differ by transport — Django ORM vs psycopg2); retiring that
residue is the separate shared-package follow-up (UN-3607).

No behaviour change: pure code movement, gate and contract unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

SonarCloud duplication on new code — addressed (0f8b57151).

Most of the flagged duplication is the intentional backend ↔ workers executor_rpc.py mirror (same finding as UN-3605; the two differ only by transport — Django ORM vs psycopg2 — and the full de-dup is the shared-package follow-up UN-3607, which can't land yet without a circular sdk1 → core import).

But I've cut the new-code slice now by lifting the two genuinely transport-agnostic helpers I added into unstract.core (alongside ContinuationSpec):

  • DispatchHandle (the .id-only AsyncResult duck-type)
  • signature_to_continuation (Celery Signature → ContinuationSpec)

Both mirrors import them instead of redefining, and the duplicated TestSignatureToSpec is removed from both suites in favour of one shared test. Net −34 lines. The dispatch_async/dispatch_with_callback method bodies stay mirrored (genuinely transport-specific) — that residue is UN-3607.

No behaviour change: pure code movement, gate + contract unchanged. Backend (20) + workers (28) executor-RPC suites green; ruff/pre-commit clean.

… json-safety, guards

Toolkit review on #2097:
- Critical: chain on_error on the early-drop branches too (malformed / poison /
  unknown task), via a shared _fail_dispatch — a dispatch_with_callback failure
  always reaches its on_error callback, not only when the task body raised.
- Carry the real error text to on_error via callback_kwargs['error'] (PG has no
  Celery AsyncResult for the callback to recover it from); error callbacks prefer it.
- JSON-coerce the self-chained prepend (UUID/datetime in an executor result no
  longer makes client.send's plain json.dumps raise and silently drop the callback).
- Enforce the reply_key XOR on_success/on_error invariant in to_payload + enqueue_task.
- _json_safe the continuation specs in the backend producer.
- signature_to_continuation: guard task name + reject positional-arg callbacks.
- Restore ContinuationSpec/TaskPayload types at the consumer boundary; drop the
  over-broad except in _continuation_org; document the returned-failure->on_success
  parity; reword the ContinuationSpec doc and the compose "Dark" comment.

Tests for every fix. Backend 35, workers 76 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review round addressed: 42862651b (fixes) + 0f8b57151 (the duplication extraction). Per-thread replies on each.

Critical

  • on_error is now self-chained on the early-drop branches too (malformed / poison / unknown task), via a shared _fail_dispatch — a dispatch_with_callback failure always reaches its on_error callback, not only when the task body raised.

Important

  • on_error now carries the real error text (callback_kwargs['error']; _get_task_error prefers it over the empty-on-PG AsyncResult lookup).
  • self-chained prepend is JSON-coerced (UUID/datetime no longer silently drops the callback).
  • reply_key XOR on_success/on_error enforced in to_payload + enqueue_task.
  • backend producer _json_safes the continuation specs.
  • signature_to_continuation guards the task name + rejects positional-arg callbacks.

Suggestions

  • typed _chain_continuation/_continuation_org boundaries; dropped the over-broad except; documented the returned-failure→on_success parity; reworded the ContinuationSpec doc and the compose "Dark" comment (+ confirmed no broker dep — ide_callback tasks do no onward Celery dispatch).
  • duplication: DispatchHandle + signature_to_continuation lifted to unstract.core (one shared test); the dispatcher method-body residue is the UN-3607 shared-package follow-up.

Tests added for every fix. Backend 35, workers 76 green; ruff/pre-commit clean. SonarCloud new-code duplication cut by the extraction (net −34 there).
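
The reply_key XOR on_success/on_error invariant listed above could be enforced with a guard along these lines. The function name and return labels are hypothetical. Since dispatch_async with neither key also exists, the sketch treats the rule as mutual exclusivity (reply_key may not be combined with a continuation) rather than a strict exactly-one XOR, which is an interpretation of the PR text.

```python
from typing import Any, Optional


def check_dispatch_shape(
    *,
    reply_key: Optional[str] = None,
    on_success: Optional[Any] = None,
    on_error: Optional[Any] = None,
) -> str:
    """Reject payloads that mix request-reply and callback semantics."""
    has_callback = on_success is not None or on_error is not None
    if reply_key is not None and has_callback:
        raise ValueError("reply_key and on_success/on_error are mutually exclusive")
    if reply_key is not None:
        return "request-reply"
    if has_callback:
        return "callback"
    return "fire-and-forget"
```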

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 21, 2026 06:54
@greptile-apps

greptile-apps Bot commented Jun 21, 2026

Contributor

Greptile Summary

Migrates the executor RPC's async/callback path (Prompt Studio run/index/extract, lookups) from Celery link/link_error to the PG queue via §5 fire-and-forget self-chaining — completing the executor transport migration. Every new path is gated behind pg_queue_enabled and fails closed to Celery when the flag is off.

  • Wire contract: ContinuationSpec and on_success/on_error/task_id fields added (all NotRequired) to TaskPayload in unstract.core; backend producer and worker to_payload updated in lockstep with a mutual-exclusivity guard against reply_key.
  • Self-chaining: After the executor consumer runs execute_extraction, it enqueues the success/error continuation via _chain_continuation; early-drop branches (malformed, unknown, poison) route through _fail_dispatch so on_error always fires even if the task never ran.
  • New service: worker-pg-ide-callback drains the ide_callback queue (profile pg-queue), with VT/health-stale thresholds set above a slow internal-API write; port 8100:8090 does not conflict with worker-pg-executor (8099:8090).

Confidence Score: 4/5

Safe to merge with one fix: the exception-handler log in consumer.py claims errors are always surfaced to the caller when they are silently dropped for on_success-only dispatches.

The self-chaining machinery is solid and well-tested. One log message in the exception handler reads "surfaced via reply/on_error + acked" for ALL guarded dispatch shapes, including on_success-only failures where _fail_dispatch is a no-op and the caller gets no terminal event. An operator debugging a hung prompt-studio session would see that message and wrongly assume the frontend was notified, masking the real gap. Every other aspect of the PR — mutual exclusivity guards, JSON-coercion, fallback from on_success to on_error on enqueue failure, early-drop branches, VT settings, and gating — is correct and thoroughly tested.

workers/queue_backend/pg_queue/consumer.py — the exception handler log message at line 264

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/queue_backend/pg_queue/consumer.py | Self-chaining (§5) added: success/error continuations are enqueued after task execution. The _fail_dispatch helper correctly surfaces errors on both the request-reply and on_error paths, but the exception-handler log message incorrectly states the error was "surfaced via reply/on_error" for on_success-only dispatches where the failure is silently dropped. |
| unstract/core/src/unstract/core/execution_dispatch.py | New shared module: DispatchHandle (minimal AsyncResult duck-type) and signature_to_continuation (Celery Signature → ContinuationSpec). Fail-fast on missing task, missing queue, or positional args. Clean, no issues. |
| unstract/core/src/unstract/core/data_models.py | Adds ContinuationSpec TypedDict and on_success/on_error/task_id NotRequired fields to TaskPayload. Clean, backward-compatible wire format change. |
| backend/pg_queue/executor_rpc.py | dispatch_async and dispatch_with_callback added to PgExecutionDispatcher; RoutingExecutionDispatcher updated to gate both on pg_queue_enabled flag. No issues found. |
| workers/queue_backend/pg_queue/executor_rpc.py | Mirror of the backend PgExecutionDispatcher for the worker context. _enqueue refactored to keyword-only optional params; dispatch_async and dispatch_with_callback added. No issues. |
| workers/ide_callback/tasks.py | _get_task_error now takes an explicit parameter; call sites pass cb.get("error") so PG-path errors (no AsyncResult backend) are surfaced correctly. explicit is not None check applied per prior review. |
| docker/docker-compose.yaml | New worker-pg-ide-callback service on port 8100:8090 (no conflict with worker-pg-executor's 8099:8090). VT/health-stale vars set to 120/180s per prior review feedback. |
| backend/pg_queue/producer.py | enqueue_task extended with on_success/on_error/task_id; mutual exclusivity with reply_key validated; continuation specs JSON-coerced via _json_safe. No issues. |
| workers/queue_backend/pg_queue/task_payload.py | to_payload extended with on_success/on_error/task_id; mutual exclusivity check mirrors the backend producer. Clean. |
| workers/tests/test_pg_queue_consumer.py | TestSelfChain covers success/error self-chaining, early-drop on_error delivery, non-JSON-safe result coercion, on_success-only failure ack, fallback-to-on_error on enqueue failure, and no-continuation fire-and-forget. Thorough. |
| workers/tests/test_executor_rpc.py | Comprehensive new tests for PG async/callback wiring and shared dispatch helpers. No issues. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant FE as Frontend
    participant BE as Backend (dispatch_with_callback)
    participant PGQ as pg_queue_message
    participant EX as worker-pg-executor
    participant CBQ as ide_callback queue
    participant CB as worker-pg-ide-callback
    participant WS as WebSocket

    FE->>BE: Prompt Studio run/index/extract
    BE->>PGQ: enqueue execute_extraction (on_success, on_error, task_id)
    BE-->>FE: HTTP 202 (task_id via DispatchHandle.id)

    EX->>PGQ: SKIP LOCKED claim
    PGQ-->>EX: message (payload with on_success/on_error)

    alt Task succeeds
        EX->>EX: task.apply() → eager.result
        EX->>CBQ: "_chain_continuation(on_success, prepend=result)"
        EX->>PGQ: DELETE (ack)
        alt on_success enqueue fails
            EX->>CBQ: _fail_dispatch → _chain_continuation(on_error)
        end
    else Task raises
        EX->>EX: task.apply() → Exception
        EX->>CBQ: "_fail_dispatch → _chain_continuation(on_error, error=exc_str)"
        EX->>PGQ: DELETE (ack — avoids LLM re-run)
    end

    CB->>CBQ: SKIP LOCKED claim
    CBQ-->>CB: continuation payload (task_name, args, kwargs)
    CB->>CB: ide_prompt_complete / ide_prompt_error
    CB->>WS: emit event → Frontend
    CB->>CBQ: DELETE (ack)

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
workers/queue_backend/pg_queue/consumer.py:264-269
**Misleading "surfaced" log for on_success-only failures**

The log message fires for every `reply_key or on_success or on_error` branch, claiming the error was "surfaced via reply/on_error + acked". But when only `on_success` is set (`on_error` absent), `_fail_dispatch` is a no-op — no reply channel exists and nothing is chained. The executor message is correctly acked (to avoid an LLM re-run), but the failure is silently dropped from the caller's perspective. An operator debugging a hung prompt-studio session would see "surfaced via reply/on_error" and wrongly conclude the frontend was notified, derailing the investigation.

This case is explicitly tested and accepted as correct behavior, but the log should distinguish between the cases. Replace the single `logger.exception` with two messages: one for when `reply_key or on_error` is set (error was surfaced) and one for the on_success-only failure (acked silently, caller has no terminal event).
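
A minimal sketch of the split log message suggested above, assuming the payload is a plain dict carrying `reply_key`, `on_success`, `on_error`, and `task_id`. The function name `log_dispatch_failure`, the logger name, and the message wording are hypothetical and are not taken from `consumer.py`.

```python
import logging

logger = logging.getLogger("pg_queue.consumer.sketch")


def log_dispatch_failure(payload: dict, exc: BaseException) -> str:
    """Log a failed executor dispatch and return which case was logged."""
    task_id = payload.get("task_id")
    if payload.get("reply_key") or payload.get("on_error"):
        # A reply channel or an on_error continuation exists, so the caller
        # does receive a terminal event.
        logger.error(
            "task %s failed; error surfaced via reply/on_error and acked",
            task_id,
            exc_info=exc,
        )
        return "surfaced"
    if payload.get("on_success"):
        # on_success-only: acked to avoid an executor re-run, but nothing
        # notifies the caller, so the log must not claim otherwise.
        logger.error(
            "task %s failed; acked without on_error, caller gets no terminal event",
            task_id,
            exc_info=exc,
        )
        return "acked_silently"
    return "fire_and_forget"
```

Returning the case label is only a convenience for testing the sketch; the point is that the two acked cases produce different log lines.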

Reviews (2): Last reviewed commit: "UN-3608 [FIX] review round 2 — on_succes..."


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author


Second-pass review (PG executor async/callback path). The items from the first pass are genuinely addressed in 42862651. Two new correctness/silent-failure findings on the dispatch path below; the rest (latent type-design hardening, a docker-compose comment-vs-depends_on mismatch, and a critical test gap on _get_task_error's explicit-error precedence) are in the summary rather than posted inline.

if reply_key:
# Record the failure so the waiting caller gets a definitive
# result instead of blocking to its timeout, then ACK. We do NOT
if reply_key or on_error:

Correctness — an on_success-only callback dispatch that raises falls through to vt-redelivery and re-runs the executor. The guard here is if reply_key or on_error:, but dispatch_with_callback(on_success=..., on_error=None) is a permitted shape: both kwargs default to None and signature_to_continuation(None) returns None, so an on_success-only callback payload is fully reachable. When such a dispatch's executor raises, this condition is false and control drops into the fire-and-forget branch below, leaving the row for vt-expiry redelivery — exactly the costly re-run (LLM double-spend) the comment on lines 252-258 says this path must avoid — and the caller never receives a terminal event.

Broaden the guard to if reply_key or on_success or on_error: so any reply/callback dispatch acks-on-failure rather than redelivering, or enforce in dispatch_with_callback that on_error is required whenever on_success is set.

Fixed (ec16fefda). Broadened the failure guard to if reply_key or on_success or on_error: — an on_success-only callback dispatch that raises now ACKs instead of vt-redelivering (no executor re-run). Test: on_success-only + raise → delete called, not redelivered.
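
The difference between the original and the broadened guard can be sketched as two pure predicates. Both function names are hypothetical; the conditions themselves are the ones quoted in this thread.

```python
from typing import Optional


def acks_on_failure_before(
    reply_key: Optional[str], on_success: Optional[dict], on_error: Optional[dict]
) -> bool:
    """Original guard: `if reply_key or on_error:` (on_success is ignored)."""
    return bool(reply_key or on_error)


def acks_on_failure_after(
    reply_key: Optional[str], on_success: Optional[dict], on_error: Optional[dict]
) -> bool:
    """Broadened guard: any reply/callback dispatch acks instead of redelivering."""
    return bool(reply_key or on_success or on_error)


# An on_success-only dispatch is the one shape where the two guards disagree.
on_success_only = (None, {"task_name": "ide_prompt_complete"}, None)
before = acks_on_failure_before(*on_success_only)  # False: row left for redelivery
after = acks_on_failure_after(*on_success_only)    # True: row acked, no re-run
```

A pure fire-and-forget dispatch (all three values `None`) is unaffected: both guards return `False`, so it still relies on vt-redelivery.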

# path — the on_success callback receives the failed result and renders
# it — exactly as on Celery. This is NOT the missing-on_error drop bug
# the early-drop branches handle.
self._chain_continuation(on_success, prepend=eager.result, payload=payload)

Silent failure — a failed on_success enqueue leaves the caller with no terminal event. On the success path _chain_continuation is best-effort and never raises: if the enqueue fails (transport/DB error, connection drop, a bad spec["queue"]), it logs and the executor message is acked with no fallback. Since the on_success callback (ide_index_complete / ide_prompt_complete / extraction_complete) is what emits the WebSocket event the prompt-studio UI blocks on after its 202, a dropped success continuation leaves the user with neither a success nor an error event — the session hangs indefinitely, after the LLM spend already occurred.

This is distinct from the now-fixed non-JSON-safe-result drop: _json_safe removes one cause of the enqueue raising, but a transport/DB failure still triggers the same silent drop with no recovery channel. Consider having _chain_continuation return whether it enqueued, and on a success-path failure fall back to _fail_dispatch(payload, error="result delivery failed; see worker logs") so the session still terminates. At minimum, carry run_id/task_id/org_id into the failure log so the stranded session is correlatable later.

Fixed (ec16fefda). _chain_continuation now returns whether it enqueued; on a success-path enqueue failure we fall back to _fail_dispatch(payload, error='result delivery failed; see worker logs') so the caller gets a terminal on_error event instead of hanging. The failure log now carries run_id/task_id/org for correlation. Test asserts the on_error fallback fires when the on_success send fails.
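
The fallback described in this thread can be sketched as follows. This is an illustration under stated assumptions, not the consumer's code: `chain_continuation` and `deliver_success` are hypothetical free functions, the `enqueue` callable stands in for the real transport, and passing the message as an `error` kwarg is an assumption about how `_fail_dispatch` delivers it.

```python
from typing import Any, Callable, Optional

# Hypothetical transport signature: (queue, task_name, args, kwargs) -> None
Enqueue = Callable[[str, str, list, dict], None]


def chain_continuation(
    enqueue: Enqueue, spec: Optional[dict], prepend: Any, **extra: Any
) -> bool:
    """Best-effort enqueue of a continuation; True only if it was enqueued."""
    if spec is None:
        return False
    try:
        kwargs = {**spec.get("kwargs", {}), **extra}
        enqueue(spec["queue"], spec["task_name"], [prepend], kwargs)
        return True
    except Exception:
        # Never raise: the executor message is acked regardless of the outcome.
        return False


def deliver_success(enqueue: Enqueue, payload: dict, result: Any) -> str:
    """Chain on_success; if that enqueue fails, fall back to on_error."""
    if chain_continuation(enqueue, payload.get("on_success"), result):
        return "on_success"
    if payload.get("on_success") is not None and chain_continuation(
        enqueue,
        payload.get("on_error"),
        payload.get("task_id"),  # on_error receives the dispatch task_id
        error="result delivery failed; see worker logs",
    ):
        return "on_error_fallback"
    return "no_terminal_event"
```

Returning a boolean rather than raising keeps the ack decision independent of the chain outcome while still letting the caller choose a recovery path. With no `on_error` spec the sketch still ends in `no_terminal_event`, which matches the on_success-only case accepted elsewhere in this review.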

@muhammad-ali-e muhammad-ali-e left a comment

(duplicate of the review below — submitted twice by mistake; see the grouped findings in the other second-pass review)

…queue fallback, ide-callback vt

- Correctness: broaden the failure guard to `reply_key or on_success or on_error`
  so an on_success-only callback dispatch that raises ACKs instead of falling
  through to vt-redelivery (which would re-run the executor / re-spend LLM tokens).
- Silent failure: _chain_continuation now returns whether it enqueued; on a
  success-path enqueue failure, fall back to on_error so the HTTP-202 caller gets
  a terminal event instead of hanging. Failure log carries run_id/task_id/org.
- ide_callback _get_task_error: `if explicit is not None` (don't discard an
  explicit "" error) — greptile.
- worker-pg-ide-callback: set VT_SECONDS=120 / HEALTH_STALE=180 (overridable) so a
  slow internal-API write can't be re-claimed mid-run and double-emit — greptile.

Tests: on_success-only-raise acks (no redelivery); success-enqueue-failure falls
back to on_error. Workers self-chain + ide_callback green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Review round 2 addressed in ec16fefda. Per-thread replies on each.

  • Correctness (on_success-only redelivery): broadened the failure guard to reply_key or on_success or on_error, so an on_success-only callback dispatch that raises ACKs instead of vt-redelivering (no executor re-run / LLM double-spend).
  • Silent failure (dropped success enqueue): _chain_continuation now reports whether it enqueued; on a success-path failure we fall back to on_error so the HTTP-202 caller still gets a terminal event instead of hanging, and the failure log carries run_id/task_id/org.
  • greptile: _get_task_error now checks if explicit is not None; worker-pg-ide-callback now sets VT=120 / health-stale=180 (overridable) so a slow internal-API write can't be re-claimed mid-run and double-emit.

Tests added for the two correctness fixes. Workers self-chain + ide_callback green.
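
The `explicit is not None` precedence mentioned above can be sketched as below. The function name, its parameters, and the `backend_lookup` callable are hypothetical stand-ins; the real `_get_task_error` signature is not shown on this page.

```python
from typing import Callable, Optional


def get_task_error(explicit: Optional[str], backend_lookup: Callable[[], str]) -> str:
    """Prefer an explicitly passed error over a result-backend lookup."""
    # `is not None` rather than truthiness: an explicit "" is still an
    # explicit error and must not fall through to the backend lookup,
    # which has nothing to return on the PG path.
    if explicit is not None:
        return explicit
    return backend_lookup()


def lookup() -> str:
    # Stand-in for a result-backend query in this sketch.
    return "error from result backend"


explicit_empty = get_task_error("", lookup)   # explicit value wins, even when empty
no_explicit = get_task_error(None, lookup)    # falls back to the backend lookup
```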

On the SonarCloud gate: new-code duplication is now 10.4% (was 30.5%) after lifting the shared helpers — the remainder is the backend↔workers executor_rpc.py dispatcher-body mirror, the same intentional duplication accepted on UN-3605, whose full removal is the shared-package follow-up UN-3607 (can't land here without a circular sdk1 → core import).

@muhammad-ali-e muhammad-ali-e merged commit 46c53d1 into feat/UN-3445-pg-queue-integration Jun 21, 2026
4 of 6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3608-pg-queue-executor-callback branch June 21, 2026 07:31
@sonarqubecloud

Quality Gate failed

Failed conditions
9.8% Duplication on New Code (required ≤ 3%)

See analysis details on SonarQube Cloud
