UN-3655 [FIX] HITL queue — populate execution_id on enqueue (QueueResult)#2125
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3655-hitl-execution-id
…ult)

hitl_queue.execution_id was NULL on 100% of rows (2301/2301 in production since 2026-02-03, both Celery and PG transports), so a HITL item couldn't be traced to its workflow_execution without joining through workflow_file_execution.

The ETL manual-review push serialises a QueueResult into the queue message, but QueueResult carried workflow_id + file_execution_id and NO execution_id, so _push_data_to_queue omitted it even though self.execution_id is in scope. The backend manual-review enqueue ALREADY reads message["execution_id"], so it persisted NULL only because the message lacked the key.

Adds execution_id to the QueueResult dataclass + to_dict(), and passes self.execution_id at the destination push site. The API/packet path already includes execution_id in its message.

OSS-only; no cloud/backend change. Transport-agnostic (also fixes the existing Celery path). Additive: the field defaults to None, so callers that don't set it are unaffected.

Tests: QueueResult.to_dict() includes execution_id (set + default-None).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
| Filename | Overview |
|---|---|
| workers/shared/models/result_models.py | Adds execution_id field and to_dict() entry to the workers QueueResult dataclass; adds a __post_init__ warning when the field is absent so silent NULLs are visible in logs. |
| backend/workflow_manager/endpoint_v2/queue_utils.py | Mirrors the workers change: adds execution_id to the backend QueueResult dataclass and its to_dict(). No __post_init__ warning here (intentional; backend constructor always receives execution_id). |
| workers/shared/workflow/destination_connector.py | Single-line change: passes execution_id=self.execution_id at the QueueResult construction site inside _push_data_to_queue. |
| backend/workflow_manager/endpoint_v2/destination.py | Single-line change: passes execution_id=self.execution_id at the QueueResult construction site inside _create_queue_result_for_etl. |
| workers/tests/test_queue_result_execution_id.py | New tests: pins to_dict() key presence, default-None behaviour, warning emission, and the integration wiring (execution_id=self.execution_id reaches the queue message with correct value distinct from file_execution_id). |
| backend/workflow_manager/endpoint_v2/tests/test_queue_result_execution_id.py | New tests for the backend QueueResult: verifies that execution_id is present in to_dict() both when set and when defaulted to None. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant WDC as WorkerDestinationConnector
participant BDC as DestinationConnector
participant QR as QueueResult
participant Q as HITL Queue
participant DB as hitl_queue table
Note over WDC,BDC: Both paths now carry execution_id
WDC->>QR: "QueueResult(execution_id=self.execution_id, ...)"
BDC->>QR: "QueueResult(execution_id=self.execution_id, ...)"
QR->>QR: __post_init__: warn if execution_id is None (workers only)
QR->>Q: to_dict() — message includes execution_id key
Q->>DB: consumer reads message[execution_id]
DB-->>DB: hitl_queue.execution_id populated (no longer NULL)
Reviews (2): Last reviewed commit: "UN-3655 [FIX] address PR #2125 review — ..." | Re-trigger Greptile
muhammad-ali-e left a comment
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
Verdict: The change is correct, additive, and safe at the wire level — to_dict() always emits the execution_id key, the call site wiring matches the sibling file_execution_id, and self.execution_id is structurally non-None on the live HITL enqueue path. Code-reviewer and code-simplifier found nothing to change.
The inline comments below are improvement opportunities, not blockers. Highest value: (1) the integration point at destination_connector.py:1836 has no test, and (2) the field is modeled as optional with a None default, which re-permits the exact NULL state this PR exists to eliminate.
Out-of-diff note (cannot inline): backend/workflow_manager/endpoint_v2/queue_utils.py:231-267 has a parallel QueueResult dataclass that still lacks execution_id. If any HITL enqueue still flows through that backend path (rather than the workers path), those rows keep getting NULL execution_id — worth confirming that path is dead, else mirror the fix there.
    # can persist hitl_queue.execution_id (it reads message["execution_id"]).
    # Without it the column is left NULL and a review item can't be traced back
    # to its execution without joining through workflow_file_execution.
    execution_id: str | None = None
Type design (High value): execution_id: str | None = None models the field as optional with a None default — which re-permits the exact NULL state this PR exists to eliminate. Its sibling execution-scoped id workflow_id (line 222) is required (str, no default) and validated in __post_init__; execution_id is always available at the only call site (destination_connector.py:1836 reads self.execution_id unconditionally), so it has the same lifecycle.
Suggested: make it required and move it next to workflow_id, then add to the existing __post_init__ validation block:
    if not self.execution_id:
        raise ValueError("QueueResult requires a valid execution_id")
This closes the NULL hole at construction instead of leaving it to chance. (The test_execution_id_defaults_to_none_but_key_present test currently encodes the optional behavior as acceptable and would need updating, which is correct.)
If backward-compat with unknown external callers is mandatory, at minimum add a logger.warning in __post_init__ when execution_id is falsy, so a NULL write is logged rather than silent.
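The warn-instead-of-raise fallback could look roughly like the sketch below. It is illustrative only: `QueueResultSketch` is a hypothetical stand-in that models just three fields, and the log message wording is an assumption, not the text used in the repository.

```python
import logging
from dataclasses import dataclass
from typing import Any, Optional

logger = logging.getLogger(__name__)


@dataclass
class QueueResultSketch:
    """Hypothetical, trimmed-down stand-in for QueueResult."""

    workflow_id: str
    file_execution_id: Optional[str] = None
    execution_id: Optional[str] = None

    def __post_init__(self) -> None:
        # workflow_id stays a hard requirement, as described in the review.
        if not self.workflow_id:
            raise ValueError("QueueResult requires a valid workflow_id")
        # execution_id stays optional, but a missing value is logged
        # instead of silently becoming a NULL column downstream.
        if not self.execution_id:
            logger.warning(
                "QueueResult built without execution_id (workflow_id=%s)",
                self.workflow_id,
            )

    def to_dict(self) -> dict[str, Any]:
        # The key is always emitted, with None when the field is unset.
        return {
            "workflow_id": self.workflow_id,
            "file_execution_id": self.file_execution_id,
            "execution_id": self.execution_id,
        }
```

With this shape, existing callers that omit execution_id keep working, and the gap shows up in worker logs rather than only as NULL rows.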
    "workflow_id": self.workflow_id,
    "file_content": self.file_content,
    "file_execution_id": self.file_execution_id,
    "execution_id": self.execution_id,
Maintainability (Medium): to_dict() is a hand-maintained serializer — every field that the consumer reads must be manually mirrored here. A field added to the dataclass but forgotten here fails silently as a downstream NULL, which is precisely this ticket's failure mode. Consider replacing the hand-written body with dataclasses.asdict(self) plus the one transform it needs (status → status.value), so adding a field automatically propagates to the wire format. Not required for this PR, but it removes the recurring two-place-edit hazard.
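As a rough illustration of that suggestion, the sketch below derives the wire dict from `dataclasses.asdict` and applies the single status transform. The class name, the `QueueResultStatus` enum and its members, and the field list are all hypothetical; the real dataclass has more fields.

```python
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Optional


class QueueResultStatus(Enum):
    """Hypothetical status enum; member names are assumptions."""

    SUCCESS = "success"
    FAILURE = "failure"


@dataclass
class QueueResultSketch:
    """Hypothetical, trimmed-down stand-in for QueueResult."""

    file: str
    status: QueueResultStatus
    workflow_id: str
    file_execution_id: Optional[str] = None
    execution_id: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        # Every declared field is included automatically, so a newly
        # added field cannot be forgotten in the serializer.
        data = asdict(self)
        # The one transform the hand-written version needs.
        data["status"] = self.status.value
        return data
```

Note that `asdict` recurses into nested dataclasses, lists, and dicts and copies their values, so swapping it in can change how existing fields serialize. That is a plausible reason to treat the refactor as a separate change.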
    workflow_id=str(self.workflow_id),
    whisper_hash=whisper_hash,
    file_execution_id=file_execution_id,
    execution_id=self.execution_id,
Test gap (criticality 7 — highest-value missing test): This line is the entire point of the PR, and it has no test. The two new tests only exercise QueueResult(...).to_dict() with execution_id passed in by hand, so they still pass if this kwarg is dropped (→ defaults to None → hitl_queue.execution_id back to NULL) or fat-fingered to self.file_execution_id (the adjacent line). Suggest a test that exercises _push_data_to_queue (line 1759) with the I/O helpers mocked and asserts the dict handed to the enqueue boundary carries the connector's execution_id — using a value distinct from file_execution_id so an adjacent-field swap is detectable.
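A test of that shape might be sketched as follows. Everything here is a stand-in: `ConnectorSketch`, its injected `enqueue` callable, and `QueueResultSketch` are hypothetical and only mimic the wiring described above; the real `_push_data_to_queue` has a different signature and more I/O to mock.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional
from unittest.mock import MagicMock


@dataclass
class QueueResultSketch:
    """Hypothetical, trimmed-down stand-in for QueueResult."""

    workflow_id: str
    file_execution_id: Optional[str] = None
    execution_id: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        return {
            "workflow_id": self.workflow_id,
            "file_execution_id": self.file_execution_id,
            "execution_id": self.execution_id,
        }


class ConnectorSketch:
    """Hypothetical stand-in for the worker destination connector."""

    def __init__(
        self,
        workflow_id: str,
        execution_id: str,
        enqueue: Callable[[dict[str, Any]], None],
    ) -> None:
        self.workflow_id = workflow_id
        self.execution_id = execution_id
        self._enqueue = enqueue  # I/O boundary, mocked in the test

    def _push_data_to_queue(self, file_execution_id: str) -> None:
        result = QueueResultSketch(
            workflow_id=str(self.workflow_id),
            file_execution_id=file_execution_id,
            execution_id=self.execution_id,  # the line under test
        )
        self._enqueue(result.to_dict())


def test_push_carries_connector_execution_id() -> None:
    enqueue = MagicMock()
    connector = ConnectorSketch("wf-1", "exec-111", enqueue)

    connector._push_data_to_queue(file_execution_id="file-exec-222")

    enqueue.assert_called_once()
    message = enqueue.call_args.args[0]
    # Fails if the kwarg is dropped (value would be None) ...
    assert message["execution_id"] == "exec-111"
    # ... or swapped with the adjacent file_execution_id.
    assert message["execution_id"] != message["file_execution_id"]
```

Using distinct values for the two ids is what makes the adjacent-field swap detectable; identical fixtures would let that bug pass.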
    file_content: str | None = None
    whisper_hash: str | None = None
    file_execution_id: str | None = None
    # Workflow execution id - carried into the HITL queue message so the backend
Comment accuracy (comment-rot risk): This comment asserts backend behavior — that it reads message["execution_id"] and persists hitl_queue.execution_id. That consumer is pluggable_apps.manual_review_v2, which is not in this repo (only referenced via guarded imports), so the claim can't be verified or kept honest here. It also contradicts the test file's comment, which describes the access as .get("execution_id") (no-raise) rather than subscript. Suggest scoping the comment to what this repo controls and naming where the other half lives, e.g.: # Carried into the HITL queue message so the manual-review consumer (pluggable_apps.manual_review_v2, separate codebase) can populate hitl_queue.execution_id. The key name must stay in sync with that consumer. Drop the assertion about which accessor the backend uses; the durable local fact is that to_dict() always emits the key (None when unset).
    The ETL manual-review push serialises a ``QueueResult`` to the queue message; the
    backend manual-review enqueue persists ``hitl_queue.execution_id`` from
    ``message["execution_id"]``. These tests pin that contract - the key must be
Docstring scope: "These tests pin that contract" overstates what's covered — the tests only exercise the producer's QueueResult.to_dict() serialization, not the backend persistence half (consumer + hitl_queue.execution_id column live out-of-tree). Suggest scoping to the verifiable half: "pin the producer half of that contract — that to_dict() emits a key named exactly execution_id." Leave the consumer/column behavior as context rather than something the tests guarantee.
…rning, comment scope, backend QueueResult parity

- add the highest-value missing test: exercise _push_data_to_queue and assert the dict at the enqueue boundary carries the connector's execution_id, distinct from file_execution_id (catches a dropped kwarg / adjacent-field swap on the integration line; the to_dict() tests can't see it)
- QueueResult.__post_init__ logs a WARNING when execution_id is missing instead of silently writing a NULL (kept optional, not a hard raise: the connector's execution_id is nullable on some paths, so a raise would break HITL enqueue)
- rescope the field comment + test docstring to what this repo controls (the producer half); name the out-of-tree consumer (manual_review_v2), drop the backend-accessor assertion
- mirror the additive fix into the backend QueueResult (endpoint_v2/queue_utils.py + destination._create_queue_result) so the column is populated whichever destination path enqueues; addresses the out-of-diff note; backend parity test added

Deferred (noted on PR): to_dict() -> dataclasses.asdict() refactor (out of scope; changing the serialization of every field carries its own regression surface).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Thanks for the review. Addressed in the latest commit:
- Highest-value gap (test for the integration line): added …
- Type design (None default re-permits NULL): kept the field optional but …
- Comment accuracy / docstring scope: rescoped both to what this repo controls (the producer half): named the out-of-tree consumer ( …
- Out-of-diff: parallel backend …
- Deferred: …

All workers (4) + backend (2) tests green; pre-commit clean.
Merged commit cd965d7 into feat/UN-3445-pg-queue-integration
What
Carries the workflow execution_id into the HITL queue message so the existing hitl_queue.execution_id column stops being NULL. No schema change: the column already exists; it was simply never populated.

Why
hitl_queue.execution_id is NULL on 100% of rows: 2301/2301 in production (since 2026-02-03), on both Celery and PG transports. A review item therefore can't be traced to its workflow_execution without joining through workflow_file_execution.

Root cause: the ETL manual-review push serialises a QueueResult into the queue message, but QueueResult carried workflow_id + file_execution_id and no execution_id, so _push_data_to_queue omitted it even though self.execution_id is in scope. The backend manual-review enqueue already reads message["execution_id"], so the column persisted NULL only because the message lacked the key.

How
- Adds execution_id to the QueueResult dataclass + to_dict().
- Passes self.execution_id at the destination push site.

The API/packet path already includes execution_id in its message, so this brings the ETL path to parity. OSS-only: no backend/cloud change (the backend mapping already exists). Additive: the field defaults to None, so callers that don't set it are unaffected; the message gains one key (consumers read via dict.get, message is JSONB).

Transport-agnostic: also fixes the existing Celery path.

Tests
test_queue_result_execution_id.py (2): to_dict() carries execution_id (set), and emits the key as None by default (no KeyError, purely additive).

No-regression verification
Full worker suite: 1070 passed. The 28 full-suite failures are pre-existing and environmental (testdb connection failures + MagicMock-not-JSON usage mocks + test-ordering), proven by A/B: running the same 28 nodes as a subset on the base vs with this change yields a byte-identical failure set (27 fail, 1 passes), so this change has zero effect on any test. (One real-DB consumer test, test_pg_queue_consumer.py, hangs locally and was excluded; it's unrelated to QueueResult.)

E2E validates on deploy: new worker image + the pre-existing backend mapping fill the column, so new hitl_queue rows will have execution_id set instead of NULL.
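
The root cause described under Why can be reproduced in miniature. The sketch below assumes the consumer reads the key with dict.get, as the description states; the function name and the sample ids are hypothetical, and the real consumer lives outside this repository.

```python
from typing import Any, Optional


def consumer_execution_id(message: dict[str, Any]) -> Optional[str]:
    """Hypothetical consumer-side read of the queue message."""
    # dict.get returns None for a missing key, which the database
    # layer would then store as NULL.
    return message.get("execution_id")


# Message shape before the fix: the key is simply absent.
before = {"workflow_id": "wf-1", "file_execution_id": "file-exec-1"}

# Message shape after the fix: one additional key.
after = {**before, "execution_id": "exec-1"}

print(consumer_execution_id(before))
print(consumer_execution_id(after))
```

Because the read never raises, nothing failed loudly while the key was missing, which fits the column staying NULL on every row until someone looked.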