UN-3602 [FIX] PG Queue — restore API-deployment timeout sync-wait on the PG path#2086

Merged
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:fix/UN-3602-FIX_pg_timeout_sync_wait
Jun 19, 2026


@muhammad-ali-e (Contributor)

What

  • Add nullable WorkflowExecution.queue_message_id (BigIntegerField) + migration 0020.
  • PG path stores the pg_queue_message.msg_id there; task_id (UUIDField) stays NULL on the PG path.
  • New WorkflowHelper._record_dispatch_handle routes the dispatch handle by transport, wrapped in its own try/except so post-dispatch bookkeeping can't skip the synchronous wait.

Why

  • Every PG-routed API deployment ignored the timeout form field and returned EXECUTING immediately. The PG dispatch handle is a bigint msg_id; writing it into the UUID task_id raised ValueError on save, which the broad post-dispatch handler swallowed (returns EXECUTING once dispatched) — so the synchronous timeout poll loop was skipped entirely. (UN-3602)
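The type mismatch can be reproduced outside Django. The sketch below uses the standard-library `uuid.UUID` as a stand-in for the UUID coercion that happens when `task_id` is saved; the helper name `coerces_to_uuid` and the handle values are illustrative assumptions, not code from this PR.

```python
import uuid


def coerces_to_uuid(handle: str) -> bool:
    """Return True if `handle` parses as a UUID, False if parsing raises ValueError."""
    try:
        uuid.UUID(handle)
    except ValueError:
        return False
    return True


# A Celery-style handle is a UUID string and parses cleanly.
celery_handle = str(uuid.uuid4())
# A PG-style handle is a stringified bigint msg_id (illustrative value).
pg_handle = "4217"

print(coerces_to_uuid(celery_handle))
print(coerces_to_uuid(pg_handle))
```

A short numeric string is not a well-formed UUID, which is why the PG handle needs its own integer column rather than sharing `task_id`.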

How

  • Each id lives in its own correctly-typed column: Celery uuid → task_id; PG msg_id → queue_message_id. The structural part is that handle bookkeeping now runs in its own try/except, so a bookkeeping failure can never abort the timeout wait loop again.
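The structural change can be sketched as a before/after control flow. This is a simplified model under stated assumptions, not the actual `execute_workflow_async` body: `record_handle`, `poll_until_done`, and both `execute_*` functions are hypothetical stand-ins, and the real code logs the bookkeeping failure where this sketch passes silently.

```python
def record_handle(handle: str) -> None:
    """Stand-in bookkeeping step that fails, as the bigint-into-UUID write did."""
    raise ValueError(f"cannot store {handle!r} as a UUID")


def poll_until_done() -> str:
    """Stand-in for the synchronous timeout wait."""
    return "COMPLETED"


def execute_before_fix(handle: str) -> str:
    dispatched = False
    try:
        dispatched = True          # dispatch succeeded
        record_handle(handle)      # raises, so the wait below never runs
        return poll_until_done()
    except Exception:
        if dispatched:
            return "EXECUTING"     # broad handler hides the bookkeeping failure
        raise


def execute_after_fix(handle: str) -> str:
    dispatched = False
    try:
        dispatched = True
        try:
            record_handle(handle)  # best-effort bookkeeping, isolated
        except Exception:
            pass                   # assumption: the real code logs here
        return poll_until_done()   # the wait always runs once dispatched
    except Exception:
        if dispatched:
            return "EXECUTING"
        raise


print(execute_before_fix("4217"))
print(execute_after_fix("4217"))
```

With the same failing bookkeeping step, the first variant returns EXECUTING without waiting, while the second still reaches the wait.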

Can this PR break any existing features. If yes, please list possible items. If no, please explain why.

  • No. The Celery path is unchanged (still writes task_id). WorkflowExecution.task_id is informational-only — a single log reader, no AsyncResult/revoke usage, excluded from the main execution serializer, and treated as an opaque string by workers/frontend (verified across both the OSS and cloud repos). The new column is additive and nullable; nothing reads it yet.

Database Migrations

  • 0020_workflowexecution_queue_message_id: AddField of a BigIntegerField(null=True) with no default. On PostgreSQL 11+ this is a metadata-only change (no table rewrite, no backfill, brief catalog lock), so it is safe on large tables.

Env Config

  • None.

Notes on Testing

  • 3 new unit tests pin the handle routing by transport (PG msg_id -> queue_message_id as int, not task_id; Celery uuid -> task_id; empty handle -> neither). Existing _dispatch_orchestrator_task tests are unchanged and green (7/7 total).
  • Live-validated on a running stack (PG path, gate on): an API deployment with timeout=300 now blocks until completion and returns COMPLETED with the result inline (queue_message_id populated, task_id NULL); without timeout it returns immediately (async) as before.
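For readers without the repo at hand, the routing tests described above might look roughly like this. It is a DB-free sketch with assumptions: `record_dispatch_handle` is a simplified stand-in for `WorkflowHelper._record_dispatch_handle`, the `"pg_queue"` and `"celery"` transport strings are hypothetical wire values, and the `execution_id` keyword on `update_execution_task` is assumed.

```python
from __future__ import annotations

from unittest.mock import Mock


def is_pg_transport(transport: str) -> bool:
    # Hypothetical wire value; the real predicate lives in the Unstract codebase.
    return transport == "pg_queue"


def record_dispatch_handle(
    store, execution_id: str, transport: str, handle: str | None
) -> None:
    """Route the dispatch handle to the column that matches its transport."""
    if not handle:
        return  # nothing to record
    if is_pg_transport(transport):
        store.update_execution_queue_message_id(
            execution_id=execution_id, queue_message_id=int(handle)
        )
    else:
        store.update_execution_task(execution_id=execution_id, task_id=handle)


def test_pg_handle_goes_to_queue_message_id() -> None:
    store = Mock()
    record_dispatch_handle(store, "exec-1", "pg_queue", "4217")
    store.update_execution_queue_message_id.assert_called_once_with(
        execution_id="exec-1", queue_message_id=4217
    )
    store.update_execution_task.assert_not_called()


def test_celery_handle_goes_to_task_id() -> None:
    store = Mock()
    record_dispatch_handle(store, "exec-1", "celery", "abc-uuid")
    store.update_execution_task.assert_called_once_with(
        execution_id="exec-1", task_id="abc-uuid"
    )
    store.update_execution_queue_message_id.assert_not_called()


def test_empty_handle_records_nothing() -> None:
    store = Mock()
    record_dispatch_handle(store, "exec-1", "pg_queue", None)
    store.update_execution_queue_message_id.assert_not_called()
    store.update_execution_task.assert_not_called()
```

Mocking the persistence helper keeps the tests fast, at the cost of not exercising the actual column types.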

Related Issues or PRs

  • UN-3602 (sub-task of UN-3536). Builds on the PG-queue transport work on the feat/UN-3445-pg-queue-integration branch.

Checklist

  • I have added an appropriate PR title and description
  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented on my code, particularly in hard-to-understand areas
  • I have added tests that prove my fix is effective
  • New and existing unit tests pass locally with my changes
  • I have checked my code and corrected any misspellings

🤖 Generated with Claude Code

…the PG path

On the PG transport, async_execute_bin dispatch returns the bigint
pg_queue_message msg_id as the handle. Writing it into WorkflowExecution.task_id
(a UUIDField) raised ValueError on save, which the broad post-dispatch handler
swallowed and returned EXECUTING — silently skipping the synchronous timeout
poll loop. Every PG-routed API deployment ignored `timeout` and returned
immediately.

- add nullable BigIntegerField queue_message_id to WorkflowExecution (migration
  0020; metadata-only AddField, no table rewrite, safe on large tables)
- store the PG msg_id in queue_message_id; task_id stays NULL on the PG path
- new WorkflowHelper._record_dispatch_handle routes the handle by transport,
  called inside its own try/except so post-dispatch bookkeeping can never skip
  the timeout wait again (the structural root cause)
- Celery path unchanged

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 19, 2026 05:53
@greptile-apps

greptile-apps Bot commented Jun 19, 2026

Contributor

Greptile Summary

This PR fixes a silent bug on the PG-queue path where every API deployment with a timeout field ignored it and returned EXECUTING immediately. The root cause was that the PG dispatch handle (a bigint msg_id) was being written into the UUID task_id field, raising ValueError on save, which the broad post-dispatch except handler absorbed — silently skipping the entire synchronous poll loop.

  • New queue_message_id column (BigIntegerField, nullable) on WorkflowExecution so the PG handle has a correctly-typed home; migration 0020 is a metadata-only AddField safe on large PostgreSQL tables.
  • _record_dispatch_handle static method routes the dispatch handle by transport (PG bigint → queue_message_id, Celery UUID → task_id), and is now wrapped in its own inner try/except inside execute_workflow_async, guaranteeing a bookkeeping failure can never abort the timeout wait loop.
  • Five new unit tests (four routing tests plus one regression test) cover PG routing, Celery routing, empty handle, malformed handle, and the regression guarantee that a bookkeeping raise still enters the poll loop.

Confidence Score: 5/5

Safe to merge — the fix is additive (nullable column, new code paths only), the Celery path is untouched, and the structural isolation of bookkeeping from the wait loop is proven by a purpose-built regression test.

The change correctly separates handle routing into typed columns, isolates bookkeeping in its own try/except, and is covered by four targeted unit tests plus a regression test that directly exercises the previously broken code path. The migration is metadata-only and the new field is not referenced by any serializer or existing reader.

No files require special attention.

Important Files Changed

Filename | Overview
backend/workflow_manager/workflow_v2/workflow_helper.py | Core fix: adds _record_dispatch_handle to route PG/Celery handles to the right typed column, isolated in its own try/except so a bookkeeping failure never blocks the timeout poll loop.
backend/workflow_manager/workflow_v2/execution.py | Adds update_execution_queue_message_id, a clean, well-guarded static method that persists the PG bigint handle to the new column.
backend/workflow_manager/workflow_v2/models/execution.py | Adds nullable BigIntegerField queue_message_id to WorkflowExecution; correctly typed and non-editable.
backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_queue_message_id.py | Correct AddField migration for the new nullable BigIntegerField; dependency chain is accurate and operation is a metadata-only change on PostgreSQL 11+.
backend/workflow_manager/workflow_v2/tests/test_record_dispatch_handle.py | Four DB-free tests cover PG/Celery routing, empty handle, and malformed handle; mocking strategy is clean and targeted.
backend/workflow_manager/workflow_v2/tests/test_execute_workflow_async_wait.py | Regression test pinning the defense-in-depth guarantee: a bookkeeping raise must not prevent the timeout poll loop from running; correctly exercises the inner try/except isolation.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Client
    participant execute_workflow_async
    participant _dispatch_orchestrator_task
    participant _record_dispatch_handle
    participant WorkflowExecutionServiceHelper
    participant poll_loop as Timeout Poll Loop

    Client->>execute_workflow_async: "timeout=300, workflow_id, execution_id"

    execute_workflow_async->>_dispatch_orchestrator_task: transport, queue, args, kwargs
    _dispatch_orchestrator_task-->>execute_workflow_async: dispatch_handle (UUID or bigint str)
    Note over execute_workflow_async: dispatched = True

    execute_workflow_async->>_record_dispatch_handle: execution_id, transport, dispatch_handle
    alt PG transport
        _record_dispatch_handle->>WorkflowExecutionServiceHelper: update_execution_queue_message_id(int(dispatch_handle))
    else Celery transport
        _record_dispatch_handle->>WorkflowExecutionServiceHelper: "update_execution_task(task_id=dispatch_handle)"
    end

    Note over execute_workflow_async: Inner try/except: failure here is swallowed

    execute_workflow_async->>poll_loop: "timeout > -1, poll every 2s"
    loop "Until completed or timeout=0"
        poll_loop->>execute_workflow_async: _get_execution_status()
    end

    execute_workflow_async-->>Client: ExecutionResponse(status, result)
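
The poll-loop leg of the diagram can be sketched as a small function. This is an illustration under assumptions, not the project's implementation: the terminal status names, the `wait_for_completion` name, and the injectable `sleep` parameter are hypothetical, and the caller is assumed to apply the `timeout > -1` guard before calling it.

```python
import time
from typing import Callable

# Assumed terminal statuses; the real set is defined in the Unstract codebase.
TERMINAL_STATUSES = ("COMPLETED", "ERROR")


def wait_for_completion(
    get_status: Callable[[], str],
    timeout: int,
    poll_interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll `get_status` until a terminal status or until `timeout` seconds elapse."""
    status = get_status()
    remaining = float(timeout)
    while status not in TERMINAL_STATUSES and remaining > 0:
        sleep(poll_interval)
        remaining -= poll_interval
        status = get_status()
    return status


# Usage with a scripted status source and a no-op sleep, so it runs instantly.
scripted = iter(["EXECUTING", "EXECUTING", "COMPLETED"])
print(wait_for_completion(lambda: next(scripted), timeout=300, sleep=lambda _: None))
```

Passing `sleep` as a parameter keeps the loop testable without real delays.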

Reviews (2): Last reviewed commit: "UN-3602 [FIX] address review — defensive..." | Re-trigger Greptile


@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author

Automated PR Review (PR Review Toolkit)

Reviewed with six specialized agents: Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, and Code Simplifier.

Verdict: Solid, well-scoped fix. The core change — routing the PG msg_id into its own correctly-typed queue_message_id column and isolating the post-dispatch bookkeeping in its own try/except so it can never abort the timeout sync-wait — is correct and well-commented. No blocking correctness bugs in the shipped code paths.

The findings below are improvements, not blockers, prioritized:

High / should-address

  1. The actual UN-3602 regression (wait must proceed when handle-recording raises) is not tested — only the routing refactor is.
  2. The task_id XOR queue_message_id invariant is unenforced at the model/DB level (the exact "illegal state representable" shape that caused this bug).

Medium

  3. int(dispatch_handle) can raise ValueError, now swallowed by a generic handler with no handle-specific diagnostic.
  4. transport: str is stringly-typed where a WorkflowTransport enum already exists; a typo silently misroutes to task_id.
  5. update_execution_queue_message_id(queue_message_id: int) is annotated int but its body guards None.

Low

  6. Add a non-numeric-PG-handle test; scope the test module docstring to "routing only (mocked)."
  7. Comment precision: the historical ValueError was raised inside update_execution_task on save(), not at the call site.

Inline comments follow.

    null=True,
    db_comment="task id of asynchronous execution",
)
queue_message_id = models.BigIntegerField(

Contributor Author

[Type Design · High] XOR invariant is unenforced. task_id (UUID, Celery) and queue_message_id (bigint, PG) are two nullable columns where, post-dispatch, exactly one should be set. That invariant lives only in prose (db_comment + the _record_dispatch_handle docstring) and in the happy-path helper — nothing stops another writer from setting both or neither. This is the same "illegal state is representable" shape that produced UN-3602.

Consider expressing it at the DB level so all writers are covered:

class Meta:
    constraints = [
        models.CheckConstraint(
            name="exec_task_xor_queue_msg",
            check=~(Q(task_id__isnull=False) & Q(queue_message_id__isnull=False)),
        ),
    ]

(Negated AND rather than strict XOR, since both are legitimately NULL pre-dispatch.) Cheapest change with the strongest guarantee.
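
To make the difference between the negated AND and a strict XOR concrete, here is a small truth-table sketch. The function names are illustrative; each boolean stands for "this column IS NOT NULL".

```python
from itertools import product


def allowed_by_constraint(task_id_set: bool, queue_message_id_set: bool) -> bool:
    """Mirror of ~(task_id IS NOT NULL AND queue_message_id IS NOT NULL)."""
    return not (task_id_set and queue_message_id_set)


def strict_xor(task_id_set: bool, queue_message_id_set: bool) -> bool:
    """Exactly one of the two columns is set."""
    return task_id_set != queue_message_id_set


# Print all four combinations: the two predicates differ only when both are NULL.
for task_set, msg_set in product([False, True], repeat=2):
    print(
        f"task_id set={task_set!s:5} queue_message_id set={msg_set!s:5} "
        f"negated_and={allowed_by_constraint(task_set, msg_set)!s:5} "
        f"xor={strict_xor(task_set, msg_set)}"
    )
```

The pre-dispatch state (both NULL) passes the negated AND but would fail a strict XOR, which is why the suggestion uses the former.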

Contributor Author

Deferring this one, with reasoning. A CheckConstraint is added via an AddConstraint migration that performs a validating full-table scan of workflow_execution under an ACCESS EXCLUSIVE lock (on PostgreSQL, only ADD FOREIGN KEY gets by with SHARE ROW EXCLUSIVE). That is exactly the large-table migration cost we chose the additive-nullable-column path to avoid (the UUIDField → CharField type-change alternative was rejected for the same reason). The invariant is already guaranteed by construction here: _record_dispatch_handle branches on transport and sets exactly one column, and it's the sole writer of queue_message_id. I'd rather not add a scanning migration on this hot table in this PR; happy to file it as a standalone constraint migration (or a NOT VALID + later VALIDATE) if you want the DB-level belt-and-suspenders.

        return
    if is_pg_transport(transport):
        WorkflowExecutionServiceHelper.update_execution_queue_message_id(
            execution_id=execution_id, queue_message_id=int(dispatch_handle)

Contributor Author

[Silent Failure · Medium] int(dispatch_handle) can raise ValueError, now swallowed generically. This is the same failure class the PR fixes (a type mismatch on the PG handle), just relocated. If pg_enqueue_task ever returns a non-numeric string (version skew, future handle format, error sentinel), int() raises ValueError, which the outer except Exception (lines 667-672) catches and logs as the generic "Failed to record dispatch handle" — giving operators no hint the handle format was the problem, and the PG row silently loses its queue_message_id (cancellation/reaper later can't find the row).

Parse defensively and log the offending value:

if is_pg_transport(transport):
    try:
        msg_id = int(dispatch_handle)
    except (TypeError, ValueError):
        logger.error(
            f"[{org_schema}] PG dispatch handle {dispatch_handle!r} is not a "
            f"valid bigint msg_id for execution_id '{execution_id}'; "
            "queue_message_id not recorded"
        )
        return
    WorkflowExecutionServiceHelper.update_execution_queue_message_id(
        execution_id=execution_id, queue_message_id=msg_id
    )
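
A self-contained version of the same idea, for trying the behavior in isolation. It is a sketch with assumptions: the persistence call is replaced by a return value, and `parse_pg_msg_id` and the logger name are hypothetical, not names from the PR.

```python
from __future__ import annotations

import logging

logger = logging.getLogger("dispatch_handle_sketch")


def parse_pg_msg_id(dispatch_handle: object, execution_id: str) -> int | None:
    """Parse a PG dispatch handle as an int; log and return None if malformed."""
    try:
        return int(dispatch_handle)
    except (TypeError, ValueError):
        # Log the offending value so operators can see the handle format was the problem.
        logger.error(
            "PG dispatch handle %r is not a valid bigint msg_id for "
            "execution_id %r; queue_message_id not recorded",
            dispatch_handle,
            execution_id,
        )
        return None


print(parse_pg_msg_id("4217", "exec-1"))
print(parse_pg_msg_id("not-a-number", "exec-1"))
print(parse_pg_msg_id(None, "exec-1"))
```

Catching `TypeError` alongside `ValueError` covers a `None` handle as well as a non-numeric string.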

Contributor Author

Fixed in 52704e2 — defensive parse with a handle-specific logger.error (offending value included), so the generic outer guard no longer hides a malformed-handle cause. Same change addresses the greptile P2 on this line.

cls,
*,
execution_id: str,
transport: str,

Contributor Author

[Type Design · Medium] transport: str is stringly-typed at the internal helper boundary. A WorkflowTransport (str-Enum) already exists in unstract/core/.../data_models.py and is_pg_transport is the single-source predicate. With a bare str, a typo or garbage value reaches here and is_pg_transport("pg-queue") silently returns False, routing a PG handle to task_id — re-introducing exactly the bug class this PR closes. Wire-strings can stay at the edge, but normalize once and type this internal param as WorkflowTransport so the discriminant is a closed set.

Contributor Author

Deferring, with reasoning. resolve_transport returns str (the WorkflowTransport.value), not the enum member, so annotating this internal param as WorkflowTransport would be inaccurate unless resolve_transport's return contract changes — which ripples to every caller and the task-payload wire shape (transport is carried as a plain string), out of scope for this fix. In practice the discriminant is already closed: the only caller passes resolve_transport's output, and is_pg_transport is the single normalization predicate. Happy to do the enum-normalization as a separate refactor of resolve_transport if you'd like that hardening.
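
For reference, the deferred enum normalization could look something like the sketch below. The member values and the `normalize_transport` and `is_pg` helpers are assumptions for illustration; the real `WorkflowTransport` values and `resolve_transport` contract are not shown in this PR.

```python
from enum import Enum


class WorkflowTransport(str, Enum):
    # Hypothetical member values; the real enum lives in unstract/core.
    CELERY = "celery"
    PG_QUEUE = "pg_queue"


def normalize_transport(raw: str) -> WorkflowTransport:
    """Convert a wire string to the enum once, at the edge; unknown values raise ValueError."""
    return WorkflowTransport(raw)


def is_pg(transport: WorkflowTransport) -> bool:
    """Predicate over the closed set, so a typo cannot silently evaluate to False."""
    return transport is WorkflowTransport.PG_QUEUE


print(is_pg(normalize_transport("pg_queue")))

try:
    normalize_transport("pg-queue")  # typo: hyphen instead of underscore
except ValueError as exc:
    print(f"rejected: {exc}")
```

The trade-off described in the reply still applies: callers that carry the transport as a plain string would need to normalize before reaching the typed helper.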


@staticmethod
def update_execution_queue_message_id(
    execution_id: str, queue_message_id: int

Contributor Author

[Type Design · Medium] Annotation/contract mismatch. The signature says queue_message_id: int, but the body (lines 437-442) guards if queue_message_id is None. The annotation lies — either it's int | None (matching the runtime guard) or the guard is dead code. Note the only caller, _record_dispatch_handle, already returns early on a falsy handle and always passes int(dispatch_handle), so the None branch is unreachable from the real path. Either make it int | None for honesty (mirrors the defensive update_execution_task), or drop the guard. Minor: the sibling guards if not task_id (catches 0/empty) while this guards is None only — queue_message_id=0 would slip through, though 0 is not a valid msg_id.

Contributor Author

Fixed in 52704e2 — annotation is now int | None, matching the is None guard in the body and mirroring update_execution_task. (msg_id is a BigAutoField starting at 1, so the queue_message_id=0 slip-through is moot in practice.)

# UUID task_id used to raise ValueError here, which bubbled to the
# post-dispatch handler and silently skipped the wait — so every
# PG-routed API deployment ignored `timeout`.)
try:

Contributor Author

[Tests · High] The actual regression is untested. This inner try/except is the heart of the fix, but no test exercises it. The new tests verify the routing of _record_dispatch_handle in isolation (helper mocked); they do not verify the defense-in-depth guarantee that a raise here still lets execute_workflow_async fall through to the if timeout > -1: poll loop instead of short-circuiting to the post-dispatch handler that returns EXECUTING immediately.

Without this test, someone could move the recording back outside the try/except (or delete it) and every existing test would still pass while UN-3602 silently returns. Suggested: patch cls._record_dispatch_handle to raise, patch _dispatch_orchestrator_task / WorkflowExecution.objects.get / _get_execution_status, and assert the poll loop runs (i.e. status is polled) rather than an immediate bare EXECUTING. This is the one test that would actually catch a re-regression.

Contributor Author

Fixed in 52704e2 — added tests/test_execute_workflow_async_wait.py. It patches _record_dispatch_handle to raise and asserts execute_workflow_async still enters the poll loop (_get_execution_status is called) rather than short-circuiting to an immediate EXECUTING. Moving the recording back outside the inner try/except (or deleting it) fails this test — exactly the re-regression guard you described.
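
The shape of such a regression test can be sketched without Django. Everything here is a simplified stand-in: `WorkflowHelperSketch` only mimics the method names mentioned in this thread, and its `execute_workflow_async` is a minimal model of the control flow, not the real method.

```python
from unittest.mock import patch


class WorkflowHelperSketch:
    """Minimal stand-in for the helper; not the real WorkflowHelper."""

    @staticmethod
    def _record_dispatch_handle(handle: str) -> None:
        """Bookkeeping; a no-op in this sketch."""

    @staticmethod
    def _get_execution_status() -> str:
        return "COMPLETED"

    @classmethod
    def execute_workflow_async(cls, handle: str, timeout: int) -> str:
        try:
            cls._record_dispatch_handle(handle)
        except Exception:
            pass  # assumption: the real code logs the failure here
        if timeout > -1:
            return cls._get_execution_status()  # stands in for the poll loop
        return "EXECUTING"


def test_wait_runs_when_recording_raises() -> None:
    with patch.object(
        WorkflowHelperSketch,
        "_record_dispatch_handle",
        side_effect=RuntimeError("boom"),
    ), patch.object(
        WorkflowHelperSketch, "_get_execution_status", return_value="COMPLETED"
    ) as status:
        result = WorkflowHelperSketch.execute_workflow_async("4217", timeout=300)
    # The status poll must have happened even though bookkeeping raised.
    status.assert_called_once()
    assert result == "COMPLETED"


test_wait_runs_when_recording_raises()
```

If the inner try/except in the sketch is removed, the patched `RuntimeError` propagates and the test fails, which is the property the real test is meant to pin.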

fix routes the PG msg_id into its own ``queue_message_id`` (BigIntegerField) and
leaves ``task_id`` NULL on the PG path, so no bigint is ever forced into a UUID.

DB-free: ``WorkflowExecutionServiceHelper`` is mocked.

Contributor Author

[Tests + Comment · Low] Two small follow-ups for this file:

  1. Scope the docstring. It describes the production crash (UUID coercion → ValueError → skipped wait) in detail, but these tests mock WorkflowExecutionServiceHelper and assert routing only — they never reproduce the ValueError or the sync-wait skip. A future maintainer could read this and assume the crash itself is covered, then delete the real guard. Add: "These tests assert routing only (PG→queue_message_id as int, Celery→task_id); the UUID-coercion crash and the timeout sync-wait are not exercised here (helper is mocked)."

  2. Add a non-numeric-handle case. int(dispatch_handle) on the PG path raises ValueError for a malformed handle. A test_non_numeric_pg_handle asserting that behavior (or the graceful no-record after the fix in workflow_helper.py:547) pins the contract — today it's only protected by _dispatch_orchestrator_task always returning a numeric str(msg_id).

Contributor Author

Both fixed in 52704e2 — added test_non_numeric_pg_handle_records_nothing (asserts the defensive parse records nothing and doesn't raise), and scoped the module docstring to "routing only (mocked); the UUID-coercion crash and the timeout sync-wait are covered in test_execute_workflow_async_wait.py".

# Record the dispatch handle (Celery task_id or PG msg_id) on the row.
# Best-effort, in its OWN try/except: it must NEVER abort the
# synchronous timeout wait below. (Writing a bigint msg_id into the
# UUID task_id used to raise ValueError here, which bubbled to the

Contributor Author

[Comment · Low] Minor imprecision. "used to raise ValueError here" reads as if the throw originated at this call site, but the ValueError was actually raised inside update_execution_task → execution.save() (UUID coercion). Suggest: "...used to raise ValueError inside update_execution_task (UUID coercion on save), which bubbled to the post-dispatch handler..." Helps anyone grepping for the throw origin. (The comment is otherwise an excellent "why" comment; keep it.)

Contributor Author

Fixed in 52704e2 — comment now reads "...used to raise ValueError inside update_execution_task (UUID coercion on save), which bubbled to the post-dispatch handler and silently skipped the wait...".

… test

Addresses PR #2086 review (toolkit + greptile):
- _record_dispatch_handle: @classmethod -> @staticmethod (never uses cls)
- parse the PG msg_id defensively (try/except TypeError/ValueError) so a
  malformed/future handle format logs a specific cause instead of being
  absorbed by the caller's generic post-dispatch guard
- update_execution_queue_message_id annotation int -> int | None (matches the
  None guard in the body; mirrors update_execution_task)
- add the missing regression test (test_execute_workflow_async_wait.py): a raise
  during handle recording must NOT skip the timeout poll loop
- add a non-numeric-PG-handle test; scope the routing-test docstring
- comment precision: the ValueError was raised inside update_execution_task on
  save (UUID coercion), not at the call site

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Contributor Author

Review addressed — 52704e280

Thanks for the thorough pass. Summary of what changed:

Fixed (7)

  • _record_dispatch_handle: @classmethod → @staticmethod (never used cls). (greptile P2)
  • PG msg_id parsed defensively: try/except (TypeError, ValueError) with a handle-specific logger.error, so a malformed/future handle format surfaces its cause instead of being swallowed by the outer guard. (greptile P2 + Silent Failure · Med)
  • update_execution_queue_message_id annotation int → int | None (matches the body guard; mirrors update_execution_task). (Type Design · Med)
  • Added the missing regression test tests/test_execute_workflow_async_wait.py: patches _record_dispatch_handle to raise and asserts the timeout poll loop still runs (_get_execution_status called) instead of short-circuiting to EXECUTING. This is the test that catches a re-regression if the recording is moved back outside the inner try/except. (Tests · High)
  • Added test_non_numeric_pg_handle_records_nothing and scoped the routing-test docstring to "routing only (mocked)". (Tests + Comment · Low)
  • Comment precision: the ValueError was raised inside update_execution_task on save(), not at the call site. (Comment · Low)

Deferred, with reasoning (2) — replied inline:

  • XOR CheckConstraint (Type Design · High): an AddConstraint migration validates via a full-table scan on this large hot table — the exact migration cost the additive-nullable-column design avoids (the UUIDField → CharField alternative was rejected for the same reason). The invariant is already guaranteed by construction (_record_dispatch_handle sets exactly one column and is the sole writer). Can land as a standalone (NOT VALID + later VALIDATE) migration if you want the DB-level guard.
  • transport: WorkflowTransport (Type Design · Med): resolve_transport returns str (the enum value), not the member, and the value is carried as a plain string in the task payload — typing this internal param as the enum would require changing resolve_transport's contract across all callers, out of scope here. The discriminant is already closed in practice (single caller + is_pg_transport predicate).

All tests green (9/9), pre-commit clean. Verdict noted — appreciated.

@muhammad-ali-e muhammad-ali-e merged commit ca73756 into feat/UN-3445-pg-queue-integration Jun 19, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the fix/UN-3602-FIX_pg_timeout_sync_wait branch June 19, 2026 06:17