UN-3602 [FIX] PG Queue — restore API-deployment timeout sync-wait on the PG path#2086
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:fix/UN-3602-FIX_pg_timeout_sync_wait
…the PG path

On the PG transport, async_execute_bin dispatch returns the bigint pg_queue_message msg_id as the handle. Writing it into WorkflowExecution.task_id (a UUIDField) raised ValueError on save, which the broad post-dispatch handler swallowed and returned EXECUTING, silently skipping the synchronous timeout poll loop. Every PG-routed API deployment ignored `timeout` and returned immediately.

- add nullable BigIntegerField queue_message_id to WorkflowExecution (migration 0020; metadata-only AddField, no table rewrite, safe on large tables)
- store the PG msg_id in queue_message_id; task_id stays NULL on the PG path
- new WorkflowHelper._record_dispatch_handle routes the handle by transport, called inside its own try/except so post-dispatch bookkeeping can never skip the timeout wait again (the structural root cause)
- Celery path unchanged

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
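The failure mode in this commit message can be reproduced in miniature with the standard library. This is a sketch under assumptions, not the project's code: `uuid.UUID` stands in for the UUID coercion that happens on save, and `record_task_id`, `execute_async_buggy`, and `poll_until_done` are hypothetical names.

```python
import uuid


def record_task_id(handle: str) -> uuid.UUID:
    # Stand-in for writing the handle into a UUID column: coercion happens here.
    return uuid.UUID(handle)


def execute_async_buggy(handle: str, timeout: int, poll) -> str:
    """Pre-fix shape: bookkeeping and the wait share one broad handler."""
    try:
        record_task_id(handle)  # raises ValueError for a bigint msg_id
        if timeout > -1:
            return poll()  # never reached when recording raises
        return "EXECUTING"
    except Exception:
        # Broad post-dispatch handler: the task is already queued, so it
        # reports EXECUTING. This is what hid the skipped wait.
        return "EXECUTING"


polled = []


def poll_until_done() -> str:
    polled.append(True)
    return "COMPLETED"


# A Celery-style UUID handle is recorded and the wait runs.
celery_result = execute_async_buggy(str(uuid.uuid4()), 300, poll_until_done)
# A PG-style bigint handle fails coercion and the wait is skipped.
pg_result = execute_async_buggy("123456789", 300, poll_until_done)
```

Only the UUID handle reaches the poll function; the bigint handle returns `EXECUTING` without ever waiting, which matches the symptom reported for PG-routed deployments.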
muhammad-ali-e left a comment
Automated PR Review (PR Review Toolkit)
Reviewed with six specialized agents: Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, and Code Simplifier.
Verdict: Solid, well-scoped fix. The core change — routing the PG msg_id into its own correctly-typed queue_message_id column and isolating the post-dispatch bookkeeping in its own try/except so it can never abort the timeout sync-wait — is correct and well-commented. No blocking correctness bugs in the shipped code paths.
The findings below are improvements, not blockers, prioritized:
High / should-address
1. The actual UN-3602 regression (wait must proceed when handle-recording raises) is not tested; only the routing refactor is.
2. The task_id XOR queue_message_id invariant is unenforced at the model/DB level (the exact "illegal state representable" shape that caused this bug).
Medium
3. int(dispatch_handle) can raise ValueError, now swallowed by a generic handler with no handle-specific diagnostic.
4. transport: str is stringly-typed where a WorkflowTransport enum already exists — a typo silently misroutes to task_id.
5. update_execution_queue_message_id(queue_message_id: int) is annotated int but its body guards None.
Low
6. Add a non-numeric-PG-handle test; scope the test module docstring to "routing only (mocked)."
7. Comment precision: the historical ValueError was raised inside update_execution_task on save(), not at the call site.
Inline comments follow.
    null=True,
    db_comment="task id of asynchronous execution",
)
queue_message_id = models.BigIntegerField(
[Type Design · High] XOR invariant is unenforced. task_id (UUID, Celery) and queue_message_id (bigint, PG) are two nullable columns where, post-dispatch, exactly one should be set. That invariant lives only in prose (db_comment + the _record_dispatch_handle docstring) and in the happy-path helper — nothing stops another writer from setting both or neither. This is the same "illegal state is representable" shape that produced UN-3602.
Consider expressing it at the DB level so all writers are covered:
class Meta:
    constraints = [
        models.CheckConstraint(
            name="exec_task_xor_queue_msg",
            check=~(Q(task_id__isnull=False) & Q(queue_message_id__isnull=False)),
        ),
    ]
(Negated AND rather than strict XOR, since both are legitimately NULL pre-dispatch.) Cheapest change with the strongest guarantee.
Deferring this one, with reasoning. A CheckConstraint is added via an AddConstraint migration that performs a validating full-table scan of workflow_execution under an ACCESS EXCLUSIVE lock (PostgreSQL takes the weaker SHARE ROW EXCLUSIVE lock only for ADD FOREIGN KEY), which is exactly the large-table migration cost we chose the additive-nullable-column path to avoid (the UUIDField → CharField type-change alternative was rejected for the same reason). The invariant is already guaranteed by construction here: _record_dispatch_handle branches on transport and sets exactly one column, and it's the sole writer of queue_message_id. I'd rather not add a scanning migration on this hot table in this PR. Happy to file it as a standalone constraint migration (or a NOT VALID + later VALIDATE) if you want the DB-level belt-and-suspenders.
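For readers weighing the two positions in this thread, the negated-AND invariant can also be stated at the application level. The sketch below is illustrative only: `DispatchHandle` is a hypothetical type that does not exist in the PR, and unlike a DB constraint it covers only writers that go through it.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DispatchHandle:
    """Hypothetical value type: at most one of the two handles may be set."""

    task_id: Optional[uuid.UUID] = None
    queue_message_id: Optional[int] = None

    def __post_init__(self) -> None:
        # Negated AND: both None is legal (pre-dispatch); both set is not.
        if self.task_id is not None and self.queue_message_id is not None:
            raise ValueError("task_id and queue_message_id are mutually exclusive")


pre_dispatch = DispatchHandle()
pg_handle = DispatchHandle(queue_message_id=42)
celery_handle = DispatchHandle(task_id=uuid.uuid4())

try:
    DispatchHandle(task_id=uuid.uuid4(), queue_message_id=42)
    both_rejected = False
except ValueError:
    both_rejected = True
```

The truth table is the same as the suggested `CheckConstraint`: three of the four combinations are accepted and only "both set" is rejected.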
    return
if is_pg_transport(transport):
    WorkflowExecutionServiceHelper.update_execution_queue_message_id(
        execution_id=execution_id, queue_message_id=int(dispatch_handle)
[Silent Failure · Medium] int(dispatch_handle) can raise ValueError, now swallowed generically. This is the same failure class the PR fixes (a type mismatch on the PG handle), just relocated. If pg_enqueue_task ever returns a non-numeric string (version skew, future handle format, error sentinel), int() raises ValueError, which the outer except Exception (lines 667-672) catches and logs as the generic "Failed to record dispatch handle" — giving operators no hint the handle format was the problem, and the PG row silently loses its queue_message_id (cancellation/reaper later can't find the row).
Parse defensively and log the offending value:
if is_pg_transport(transport):
    try:
        msg_id = int(dispatch_handle)
    except (TypeError, ValueError):
        logger.error(
            f"[{org_schema}] PG dispatch handle {dispatch_handle!r} is not a "
            f"valid bigint msg_id for execution_id '{execution_id}'; "
            "queue_message_id not recorded"
        )
        return
    WorkflowExecutionServiceHelper.update_execution_queue_message_id(
        execution_id=execution_id, queue_message_id=msg_id
    )
Fixed in 52704e2 — defensive parse with a handle-specific logger.error (offending value included), so the generic outer guard no longer hides a malformed-handle cause. Same change addresses the greptile P2 on this line.
cls,
*,
execution_id: str,
transport: str,
[Type Design · Medium] transport: str is stringly-typed at the internal helper boundary. A WorkflowTransport (str-Enum) already exists in unstract/core/.../data_models.py and is_pg_transport is the single-source predicate. With a bare str, a typo or garbage value reaches here and is_pg_transport("pg-queue") silently returns False, routing a PG handle to task_id — re-introducing exactly the bug class this PR closes. Wire-strings can stay at the edge, but normalize once and type this internal param as WorkflowTransport so the discriminant is a closed set.
Deferring, with reasoning. resolve_transport returns str (the WorkflowTransport.value), not the enum member, so annotating this internal param as WorkflowTransport would be inaccurate unless resolve_transport's return contract changes — which ripples to every caller and the task-payload wire shape (transport is carried as a plain string), out of scope for this fix. In practice the discriminant is already closed: the only caller passes resolve_transport's output, and is_pg_transport is the single normalization predicate. Happy to do the enum-normalization as a separate refactor of resolve_transport if you'd like that hardening.
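The enum normalization discussed in this thread might look roughly like the following. This is a sketch under assumptions: the thread confirms that a `WorkflowTransport` str-Enum exists, but the member names and values here are invented for illustration, and `normalize_transport` and `handle_column` are hypothetical helpers.

```python
from enum import Enum


class WorkflowTransport(str, Enum):
    # Member names and values are assumed for illustration only.
    CELERY = "celery"
    PG_QUEUE = "pg_queue"


def normalize_transport(raw: str) -> WorkflowTransport:
    """Convert a wire string to the enum once, at the edge; reject unknowns."""
    try:
        return WorkflowTransport(raw)
    except ValueError:
        raise ValueError(f"unknown transport {raw!r}") from None


def handle_column(transport: WorkflowTransport) -> str:
    # With a closed set, a typo can no longer fall through to task_id.
    if transport is WorkflowTransport.PG_QUEUE:
        return "queue_message_id"
    return "task_id"


pg_column = handle_column(normalize_transport("pg_queue"))

try:
    normalize_transport("pg-queue")  # typo: hyphen instead of underscore
    typo_rejected = False
except ValueError:
    typo_rejected = True
```

The trade-off raised in the reply still applies: the wire format stays a plain string, so the conversion has to happen at one agreed boundary for the closed set to mean anything.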
@staticmethod
def update_execution_queue_message_id(
    execution_id: str, queue_message_id: int
[Type Design · Medium] Annotation/contract mismatch. The signature says queue_message_id: int, but the body (lines 437-442) guards if queue_message_id is None. The annotation lies — either it's int | None (matching the runtime guard) or the guard is dead code. Note the only caller, _record_dispatch_handle, already returns early on a falsy handle and always passes int(dispatch_handle), so the None branch is unreachable from the real path. Either make it int | None for honesty (mirrors the defensive update_execution_task), or drop the guard. Minor: the sibling guards if not task_id (catches 0/empty) while this guards is None only — queue_message_id=0 would slip through, though 0 is not a valid msg_id.
Fixed in 52704e2 — annotation is now int | None, matching the is None guard in the body and mirroring update_execution_task. (msg_id is a BigAutoField starting at 1, so the queue_message_id=0 slip-through is moot in practice.)
# UUID task_id used to raise ValueError here, which bubbled to the
# post-dispatch handler and silently skipped the wait, so every
# PG-routed API deployment ignored `timeout`.)
try:
[Tests · High] The actual regression is untested. This inner try/except is the heart of the fix, but no test exercises it. The new tests verify the routing of _record_dispatch_handle in isolation (helper mocked); they do not verify the defense-in-depth guarantee that a raise here still lets execute_workflow_async fall through to the if timeout > -1: poll loop instead of short-circuiting to the post-dispatch handler that returns EXECUTING immediately.
Without this test, someone could move the recording back outside the try/except (or delete it) and every existing test would still pass while UN-3602 silently returns. Suggested: patch cls._record_dispatch_handle to raise, patch _dispatch_orchestrator_task / WorkflowExecution.objects.get / _get_execution_status, and assert the poll loop runs (i.e. status is polled) rather than an immediate bare EXECUTING. This is the one test that would actually catch a re-regression.
Fixed in 52704e2 — added tests/test_execute_workflow_async_wait.py. It patches _record_dispatch_handle to raise and asserts execute_workflow_async still enters the poll loop (_get_execution_status is called) rather than short-circuiting to an immediate EXECUTING. Moving the recording back outside the inner try/except (or deleting it) fails this test — exactly the re-regression guard you described.
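The shape of such a regression guard can be sketched with `unittest.mock` against a stand-in class. Everything here is hypothetical: `Helper` only mimics the control flow described in this thread and is not the contents of tests/test_execute_workflow_async_wait.py or the real `WorkflowHelper`.

```python
from unittest import mock


class Helper:
    """Hypothetical stand-in for the fixed control flow, not the real class."""

    @staticmethod
    def record_dispatch_handle(handle: str) -> None:
        return None

    @staticmethod
    def get_execution_status() -> str:
        return "COMPLETED"

    @classmethod
    def execute_workflow_async(cls, handle: str, timeout: int) -> str:
        try:
            cls.record_dispatch_handle(handle)
        except Exception:
            # Best-effort bookkeeping (real code would log here); it must
            # not abort the wait below.
            pass
        if timeout > -1:
            return cls.get_execution_status()
        return "EXECUTING"


def test_wait_survives_recording_failure() -> None:
    with mock.patch.object(
        Helper, "record_dispatch_handle", side_effect=ValueError("boom")
    ), mock.patch.object(
        Helper, "get_execution_status", return_value="COMPLETED"
    ) as status:
        result = Helper.execute_workflow_async("123", timeout=300)
    # The status poll ran even though handle recording raised.
    status.assert_called_once()
    assert result == "COMPLETED"


test_wait_survives_recording_failure()
```

If the `record_dispatch_handle` call were moved outside the inner `try/except`, the patched `ValueError` would propagate and this test would fail, which is the property the review asked for.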
fix routes the PG msg_id into its own ``queue_message_id`` (BigIntegerField) and
leaves ``task_id`` NULL on the PG path, so no bigint is ever forced into a UUID.

DB-free: ``WorkflowExecutionServiceHelper`` is mocked.
[Tests + Comment · Low] Two small follow-ups for this file:
- Scope the docstring. It describes the production crash (UUID coercion → ValueError → skipped wait) in detail, but these tests mock WorkflowExecutionServiceHelper and assert routing only; they never reproduce the ValueError or the sync-wait skip. A future maintainer could read this and assume the crash itself is covered, then delete the real guard. Add: "These tests assert routing only (PG → queue_message_id as int, Celery → task_id); the UUID-coercion crash and the timeout sync-wait are not exercised here (helper is mocked)."
- Add a non-numeric-handle case. int(dispatch_handle) on the PG path raises ValueError for a malformed handle. A test_non_numeric_pg_handle asserting that behavior (or the graceful no-record after the fix in workflow_helper.py:547) pins the contract; today it's only protected by _dispatch_orchestrator_task always returning a numeric str(msg_id).
Both fixed in 52704e2 — added test_non_numeric_pg_handle_records_nothing (asserts the defensive parse records nothing and doesn't raise), and scoped the module docstring to "routing only (mocked); the UUID-coercion crash and the timeout sync-wait are covered in test_execute_workflow_async_wait.py".
# Record the dispatch handle (Celery task_id or PG msg_id) on the row.
# Best-effort, in its OWN try/except: it must NEVER abort the
# synchronous timeout wait below. (Writing a bigint msg_id into the
# UUID task_id used to raise ValueError here, which bubbled to the
[Comment · Low] Minor imprecision. "used to raise ValueError here" reads as if the throw originated at this call site, but the ValueError was actually raised inside update_execution_task → execution.save() (UUID coercion). Suggest: "...used to raise ValueError inside update_execution_task (UUID coercion on save), which bubbled to the post-dispatch handler..." Helps anyone grepping for the throw origin. (The comment is otherwise an excellent "why" comment — keep it.)
Fixed in 52704e2 — comment now reads "...used to raise ValueError inside update_execution_task (UUID coercion on save), which bubbled to the post-dispatch handler and silently skipped the wait...".
… test

Addresses PR #2086 review (toolkit + greptile):

- _record_dispatch_handle: @classmethod -> @staticmethod (never uses cls)
- parse the PG msg_id defensively (try/except TypeError/ValueError) so a malformed/future handle format logs a specific cause instead of being absorbed by the caller's generic post-dispatch guard
- update_execution_queue_message_id annotation int -> int | None (matches the None guard in the body; mirrors update_execution_task)
- add the missing regression test (test_execute_workflow_async_wait.py): a raise during handle recording must NOT skip the timeout poll loop
- add a non-numeric-PG-handle test; scope the routing-test docstring
- comment precision: the ValueError was raised inside update_execution_task on save (UUID coercion), not at the call site

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review addressed —
ca73756 into feat/UN-3445-pg-queue-integration
What
- Add WorkflowExecution.queue_message_id (BigIntegerField) + migration 0020.
- Store the PG pg_queue_message.msg_id there; task_id (UUIDField) stays NULL on the PG path.
- WorkflowHelper._record_dispatch_handle routes the dispatch handle by transport, wrapped in its own try/except so post-dispatch bookkeeping can't skip the synchronous wait.
Why
- Every PG-routed API deployment ignored the timeout form field and returned EXECUTING immediately. The PG dispatch handle is a bigint msg_id; writing it into the UUID task_id raised ValueError on save, which the broad post-dispatch handler swallowed (returns EXECUTING once dispatched), so the synchronous timeout poll loop was skipped entirely. (UN-3602)
How
- Celery uuid → task_id; PG msg_id → queue_message_id. The structural part is that handle bookkeeping now runs in its own try/except, so a bookkeeping failure can never abort the timeout wait loop again.
Can this PR break any existing features. If yes, please list possible items. If no, please explain why.
- … task_id). WorkflowExecution.task_id is informational-only: a single log reader, no AsyncResult/revoke usage, excluded from the main execution serializer, and treated as an opaque string by workers/frontend (verified across both the OSS and cloud repos). The new column is additive and nullable; nothing reads it yet.
Database Migrations
- 0020_workflowexecution_queue_message_id: AddField of a BigIntegerField(null=True) with no default. On PostgreSQL 11+ this is a metadata-only change (no table rewrite, no backfill, brief catalog lock), so it is safe on large tables.
Env Config
Notes on Testing
- … queue_message_id as int, not task_id; Celery uuid -> task_id; empty handle -> neither).
- Existing _dispatch_orchestrator_task tests are unchanged and green (7/7 total).
- … timeout=300 now blocks until completion and returns COMPLETED with the result inline (queue_message_id populated, task_id NULL); without timeout it returns immediately (async) as before.
Related Issues or PRs
Checklist
🤖 Generated with Claude Code