UN-3655 [FIX] HITL queue — populate execution_id on enqueue (QueueResult)#2125

Merged
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3655-hitl-execution-id
Jun 30, 2026


Conversation

@muhammad-ali-e

What

Carries the workflow execution_id into the HITL queue message so the existing hitl_queue.execution_id column stops being NULL. No schema change — the column already exists; it was simply never populated.

Why

hitl_queue.execution_id is NULL on 100% of rows — 2301/2301 in production (since 2026-02-03) and on both Celery and PG transports. A review item therefore can't be traced to its workflow_execution without joining through workflow_file_execution.

Root cause: the ETL manual-review push serialises a QueueResult into the queue message, but QueueResult carried workflow_id + file_execution_id and no execution_id — so _push_data_to_queue omitted it even though self.execution_id is in scope. The backend manual-review enqueue already reads message["execution_id"], so the column persisted NULL only because the message lacked the key.

How

  • Add execution_id to the QueueResult dataclass + to_dict().
  • Pass self.execution_id at the destination push site.

The API/packet path already includes execution_id in its message, so this brings the ETL path to parity. OSS-only — no backend/cloud change (the backend mapping already exists). Additive: the field defaults to None, so callers that don't set it are unaffected; the message gains one key (consumers read via dict.get, message is JSONB).

Transport-agnostic — also fixes the existing Celery path.
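A minimal sketch of the additive change described above, assuming a trimmed-down QueueResult. The real dataclass has more fields; only workflow_id, file_execution_id, execution_id, and to_dict() come from this PR, and the sample id values are made up for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class QueueResult:
    """Trimmed-down stand-in for the real QueueResult (assumption)."""

    workflow_id: str
    file_execution_id: str | None = None
    # New field: optional with a None default, so existing callers keep working.
    execution_id: str | None = None

    def to_dict(self) -> dict[str, Any]:
        # The key is always emitted, even when the value is None.
        return {
            "workflow_id": self.workflow_id,
            "file_execution_id": self.file_execution_id,
            "execution_id": self.execution_id,
        }


# Caller that sets the new field (the fixed ETL push site).
message = QueueResult(
    workflow_id="wf-1", file_execution_id="fe-1", execution_id="ex-1"
).to_dict()

# Caller that predates the field: still constructs, key present with None.
legacy = QueueResult(workflow_id="wf-1").to_dict()
```

Because the key is always present, a consumer that reads it by subscript or via dict.get sees the same shape either way.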

Tests

test_queue_result_execution_id.py (2): to_dict() carries execution_id (set), and emits the key as None by default (no KeyError, purely additive).

No-regression verification

Full worker suite: 1070 passed. The 28 full-suite failures are pre-existing, environmental (testdb connection failures + MagicMock-not-JSON usage mocks + test-ordering), proven by A/B: running the same 28 nodes as a subset on the base vs with this change yields a byte-identical failure set (27 fail, 1 passes) — this change has zero effect on any test. (One real-DB consumer test, test_pg_queue_consumer.py, hangs locally and was excluded; it's unrelated to QueueResult.)

E2E validates on deploy: new worker image + the pre-existing backend mapping fill the column — new hitl_queue rows will have execution_id set instead of NULL.

…ult)

hitl_queue.execution_id was NULL on 100% of rows (2301/2301 in production
since 2026-02-03, both Celery and PG transports) — a HITL item couldn't be
traced to its workflow_execution without joining through
workflow_file_execution.

The ETL manual-review push serialises a QueueResult into the queue message,
but QueueResult carried workflow_id + file_execution_id and NO execution_id,
so _push_data_to_queue omitted it even though self.execution_id is in scope.
The backend manual-review enqueue ALREADY reads message["execution_id"], so
it persisted NULL only because the message lacked the key.

Adds execution_id to the QueueResult dataclass + to_dict(), and passes
self.execution_id at the destination push site. The API/packet path already
includes execution_id in its message. OSS-only; no cloud/backend change.
Transport-agnostic (also fixes the existing Celery path). Additive — the
field defaults to None, so callers that don't set it are unaffected.

Tests: QueueResult.to_dict() includes execution_id (set + default-None).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@greptile-apps

greptile-apps Bot commented Jun 30, 2026

Greptile Summary

This PR fixes a long-standing NULL-column issue on hitl_queue.execution_id by carrying self.execution_id from both the backend and worker DestinationConnector into the serialised QueueResult message. No schema change is required — the column already existed but was never populated because the QueueResult dataclass lacked the field.

  • Both QueueResult dataclasses (backend queue_utils.py and workers result_models.py) gain an optional execution_id: str | None field with a None default, preserving backward compatibility with callers that don't set it.
  • Both enqueue call sites (destination.py and destination_connector.py) now pass execution_id=self.execution_id, bringing the ETL path to parity with the API/packet path that already included the key.
  • The workers version adds a __post_init__ logger.warning when execution_id is None, surfacing unexpected NULLs without breaking the enqueue flow.

Confidence Score: 5/5

Safe to merge — the change is purely additive, adds a single nullable field with a None default to a dataclass, and propagates a value that is already in scope at both call sites.

All changed code is backward compatible: the new execution_id field defaults to None, to_dict() always emits the key, and the consumer already reads via dict.get. Both enqueue call sites pass self.execution_id which is already set by their respective constructors. Tests cover the data model contract, the default-None behaviour, warning emission, and the integration wiring with a distinct value check.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/shared/models/result_models.py | Adds execution_id field and to_dict() entry to the workers QueueResult dataclass; adds a __post_init__ warning when the field is absent so silent NULLs are visible in logs. |
| backend/workflow_manager/endpoint_v2/queue_utils.py | Mirrors the workers change: adds execution_id to the backend QueueResult dataclass and its to_dict(). No __post_init__ warning here (intentional; backend constructor always receives execution_id). |
| workers/shared/workflow/destination_connector.py | Single-line change: passes execution_id=self.execution_id at the QueueResult construction site inside _push_data_to_queue. |
| backend/workflow_manager/endpoint_v2/destination.py | Single-line change: passes execution_id=self.execution_id at the QueueResult construction site inside _create_queue_result_for_etl. |
| workers/tests/test_queue_result_execution_id.py | New tests: pins to_dict() key presence, default-None behaviour, warning emission, and the integration wiring (execution_id=self.execution_id reaches the queue message with correct value distinct from file_execution_id). |
| backend/workflow_manager/endpoint_v2/tests/test_queue_result_execution_id.py | New tests for the backend QueueResult: verifies that execution_id is present in to_dict() both when set and when defaulted to None. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant WDC as WorkerDestinationConnector
    participant BDC as DestinationConnector
    participant QR as QueueResult
    participant Q as HITL Queue
    participant DB as hitl_queue table

    Note over WDC,BDC: Both paths now carry execution_id

    WDC->>QR: "QueueResult(execution_id=self.execution_id, ...)"
    BDC->>QR: "QueueResult(execution_id=self.execution_id, ...)"

    QR->>QR: __post_init__: warn if execution_id is None (workers only)
    QR->>Q: to_dict() — message includes execution_id key

    Q->>DB: consumer reads message[execution_id]
    DB-->>DB: hitl_queue.execution_id populated (no longer NULL)

Reviews (2): Last reviewed commit: "UN-3655 [FIX] address PR #2125 review — ..." | Re-trigger Greptile

@muhammad-ali-e muhammad-ali-e left a comment

Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).

Verdict: The change is correct, additive, and safe at the wire level — to_dict() always emits the execution_id key, the call site wiring matches the sibling file_execution_id, and self.execution_id is structurally non-None on the live HITL enqueue path. Code-reviewer and code-simplifier found nothing to change.

The inline comments below are improvement opportunities, not blockers. Highest value: (1) the integration point at destination_connector.py:1836 has no test, and (2) the field is modeled as optional with a None default, which re-permits the exact NULL state this PR exists to eliminate.

Out-of-diff note (cannot inline): backend/workflow_manager/endpoint_v2/queue_utils.py:231-267 has a parallel QueueResult dataclass that still lacks execution_id. If any HITL enqueue still flows through that backend path (rather than the workers path), those rows keep getting NULL execution_id — worth confirming that path is dead, else mirror the fix there.

# can persist hitl_queue.execution_id (it reads message["execution_id"]).
# Without it the column is left NULL and a review item can't be traced back
# to its execution without joining through workflow_file_execution.
execution_id: str | None = None

Type design (High value): execution_id: str | None = None models the field as optional with a None default — which re-permits the exact NULL state this PR exists to eliminate. Its sibling execution-scoped id workflow_id (line 222) is required (str, no default) and validated in __post_init__; execution_id is always available at the only call site (destination_connector.py:1836 reads self.execution_id unconditionally), so it has the same lifecycle.

Suggested: make it required and move it next to workflow_id, then add to the existing __post_init__ validation block:

if not self.execution_id:
    raise ValueError("QueueResult requires a valid execution_id")

This closes the NULL hole at construction instead of leaving it to chance. (The test_execution_id_defaults_to_none_but_key_present test currently encodes the optional behavior as acceptable and would need updating — which is correct.)

If backward-compat with unknown external callers is mandatory, at minimum add a logger.warning in __post_init__ when execution_id is falsy, so a NULL write is logged rather than silent.
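A rough sketch of that fallback option, assuming a cut-down QueueResult with only two fields. The logger name and the warning wording are illustrative, not taken from the repository.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger("queue_result_sketch")


@dataclass
class QueueResult:
    """Illustrative stand-in; the real class carries more fields."""

    workflow_id: str
    execution_id: str | None = None

    def __post_init__(self) -> None:
        if not self.execution_id:
            # Backward-compatible option: make the NULL write visible in logs
            # instead of raising and failing the enqueue.
            logger.warning(
                "QueueResult built without execution_id for workflow %s; "
                "hitl_queue.execution_id will be NULL",
                self.workflow_id,
            )
```

The trade-off is that construction never fails, so the NULL is only detectable by watching logs rather than being ruled out by the type.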

"workflow_id": self.workflow_id,
"file_content": self.file_content,
"file_execution_id": self.file_execution_id,
"execution_id": self.execution_id,

Maintainability (Medium): to_dict() is a hand-maintained serializer, so every field that the consumer reads must be manually mirrored here. A field added to the dataclass but forgotten here fails silently as a downstream NULL, which is precisely this ticket's failure mode. Consider replacing the hand-written body with dataclasses.asdict(self) plus the one transform it needs (status → status.value), so adding a field automatically propagates to the wire format. Not required for this PR, but it removes the recurring two-place-edit hazard.
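One possible shape for that refactor, as a sketch only. The enum name QueueResultStatus and its members are assumptions, and the field list is shortened; the real class may need other transforms.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any


class QueueResultStatus(Enum):
    """Hypothetical status enum; name and members are assumptions."""

    SUCCESS = "success"
    FAILURE = "failure"


@dataclass
class QueueResult:
    status: QueueResultStatus
    workflow_id: str
    file_execution_id: str | None = None
    execution_id: str | None = None

    def to_dict(self) -> dict[str, Any]:
        # asdict() walks every declared field, so a newly added field
        # reaches the wire format without a second edit here.
        data = asdict(self)
        # The one transform: serialise the enum as its plain value.
        data["status"] = self.status.value
        return data
```

Note that asdict() emits fields in declaration order and recurses into nested dataclasses, so the wire format can shift if the hand-written serializer ordered or shaped things differently.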

workflow_id=str(self.workflow_id),
whisper_hash=whisper_hash,
file_execution_id=file_execution_id,
execution_id=self.execution_id,

Test gap (criticality 7, highest-value missing test): This line is the entire point of the PR, and it has no test. The two new tests only exercise QueueResult(...).to_dict() with execution_id passed in by hand, so they still pass if this kwarg is dropped (→ defaults to None → hitl_queue.execution_id back to NULL) or fat-fingered to self.file_execution_id (the adjacent line). Suggest a test that exercises _push_data_to_queue (line 1759) with the I/O helpers mocked and asserts that the dict handed to the enqueue boundary carries the connector's execution_id, using a value distinct from file_execution_id so an adjacent-field swap is detectable.
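Roughly what such a test could look like. FakeDestinationConnector and its injected enqueue callable are hypothetical stand-ins so the sketch runs on its own; the real test would drive the actual connector's _push_data_to_queue with its I/O helpers patched.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from unittest.mock import MagicMock


@dataclass
class QueueResult:
    """Cut-down stand-in for the real dataclass (assumption)."""

    workflow_id: str
    file_execution_id: str | None = None
    execution_id: str | None = None

    def to_dict(self) -> dict[str, Any]:
        return {
            "workflow_id": self.workflow_id,
            "file_execution_id": self.file_execution_id,
            "execution_id": self.execution_id,
        }


class FakeDestinationConnector:
    """Hypothetical connector; only the wiring under test is modelled."""

    def __init__(self, workflow_id: str, execution_id: str | None, enqueue: Any) -> None:
        self.workflow_id = workflow_id
        self.execution_id = execution_id
        self._enqueue = enqueue  # mocked enqueue boundary

    def _push_data_to_queue(self, file_execution_id: str) -> None:
        result = QueueResult(
            workflow_id=str(self.workflow_id),
            file_execution_id=file_execution_id,
            execution_id=self.execution_id,  # the line under test
        )
        self._enqueue(result.to_dict())


def test_push_wires_execution_id() -> None:
    enqueue = MagicMock()
    connector = FakeDestinationConnector("wf-1", "exec-123", enqueue)

    connector._push_data_to_queue(file_execution_id="file-exec-456")

    enqueue.assert_called_once()
    message = enqueue.call_args.args[0]
    # Distinct values make a dropped kwarg or an adjacent-field swap fail.
    assert message["execution_id"] == "exec-123"
    assert message["execution_id"] != message["file_execution_id"]
```

Using two different id values is the important part: with identical values, passing self.file_execution_id by mistake would still pass.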

Comment thread workers/shared/models/result_models.py Outdated
file_content: str | None = None
whisper_hash: str | None = None
file_execution_id: str | None = None
# Workflow execution id — carried into the HITL queue message so the backend

Comment accuracy (comment-rot risk): This comment asserts backend behavior — that it reads message["execution_id"] and persists hitl_queue.execution_id. That consumer is pluggable_apps.manual_review_v2, which is not in this repo (only referenced via guarded imports), so the claim can't be verified or kept honest here. It also contradicts the test file's comment, which describes the access as .get("execution_id") (no-raise) rather than subscript. Suggest scoping the comment to what this repo controls and naming where the other half lives, e.g.: # Carried into the HITL queue message so the manual-review consumer (pluggable_apps.manual_review_v2, separate codebase) can populate hitl_queue.execution_id. The key name must stay in sync with that consumer. Drop the assertion about which accessor the backend uses; the durable local fact is that to_dict() always emits the key (None when unset).


The ETL manual-review push serialises a ``QueueResult`` to the queue message; the
backend manual-review enqueue persists ``hitl_queue.execution_id`` from
``message["execution_id"]``. These tests pin that contract — the key must be

Docstring scope: "These tests pin that contract" overstates what's covered — the tests only exercise the producer's QueueResult.to_dict() serialization, not the backend persistence half (consumer + hitl_queue.execution_id column live out-of-tree). Suggest scoping to the verifiable half: "pin the producer half of that contract — that to_dict() emits a key named exactly execution_id." Leave the consumer/column behavior as context rather than something the tests guarantee.

…rning, comment scope, backend QueueResult parity

- add the highest-value missing test: exercise _push_data_to_queue and assert
  the dict at the enqueue boundary carries the connector's execution_id,
  distinct from file_execution_id (catches a dropped kwarg / adjacent-field
  swap on the integration line — the to_dict() tests can't see it)
- QueueResult.__post_init__ logs a WARNING when execution_id is missing instead
  of silently writing a NULL (kept optional, not a hard raise: the connector's
  execution_id is nullable on some paths, so a raise would break HITL enqueue)
- rescope the field comment + test docstring to what this repo controls (the
  producer half); name the out-of-tree consumer (manual_review_v2), drop the
  backend-accessor assertion
- mirror the additive fix into the backend QueueResult
  (endpoint_v2/queue_utils.py + destination._create_queue_result) so the column
  is populated whichever destination path enqueues — addresses the out-of-diff
  note; backend parity test added

Deferred (noted on PR): to_dict() -> dataclasses.asdict() refactor (out of
scope; changing the serialization of every field carries its own regression
surface).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Thanks for the review — addressed in the latest commit:

Highest-value gap (test for the integration line) — added test_push_data_to_queue_wires_connector_execution_id: it drives _push_data_to_queue with the I/O helpers mocked and asserts the dict at the enqueue boundary carries the connector's execution_id, with execution_id != file_execution_id so a dropped kwarg or an adjacent-field swap is caught. The to_dict() tests couldn't see that line.

Type design (None default re-permits NULL) — kept the field optional but __post_init__ now logs a WARNING when execution_id is missing, so a latent NULL write is visible rather than silent. I went with the warning (your "at minimum" option) rather than a required-field raise on purpose: the connector's execution_id is genuinely nullable on some paths (destination_connector.py:228 inits None; :1003-1007 explicitly handles the falsy case), so a hard raise would convert a cosmetic NULL column into a broken HITL enqueue — strictly worse. Added a positive + negative caplog test for the warning.
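To make the trade-off concrete, here is a small sketch contrasting the two options when the connector's execution_id happens to be None. Both classes and the try_enqueue helper are illustrative stand-ins, not code from the repository.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger("hitl_enqueue_sketch")


@dataclass
class StrictQueueResult:
    """Required-field variant: raises when execution_id is missing."""

    workflow_id: str
    execution_id: str | None = None

    def __post_init__(self) -> None:
        if not self.execution_id:
            raise ValueError("QueueResult requires a valid execution_id")


@dataclass
class LenientQueueResult:
    """Warning variant: logs and lets the enqueue proceed."""

    workflow_id: str
    execution_id: str | None = None

    def __post_init__(self) -> None:
        if not self.execution_id:
            logger.warning("QueueResult has no execution_id; column will be NULL")


def try_enqueue(result_cls: type, execution_id: str | None) -> str:
    """Report whether building the message succeeds for a given id."""
    try:
        result_cls(workflow_id="wf-1", execution_id=execution_id)
    except ValueError:
        return "failed"
    return "enqueued"
```

With a None id, the strict variant loses the review item entirely, while the lenient one still enqueues it and leaves a log line behind.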

Comment accuracy / docstring scope: rescoped both to what this repo controls, the producer half. They now name the out-of-tree consumer (pluggable_apps.manual_review_v2), drop the backend-accessor (message[...] vs .get) assertion, and keep the durable local fact (to_dict() always emits the key, None when unset).

Out-of-diff: parallel backend QueueResult — good catch. Rather than trace liveness, I mirrored the additive fix into endpoint_v2/queue_utils.py + destination._create_queue_result (the backend connector has self.execution_id), so the column is populated whichever destination path enqueues. Backend parity test added.

Deferred: to_dict() → dataclasses.asdict(). Left as-is for this PR; switching the serialization of every field has its own regression surface (field set/ordering on the wire) and is orthogonal to the NULL fix. Happy to do it as a follow-up if you'd like.

All workers (4) + backend (2) tests green; pre-commit clean.

@muhammad-ali-e muhammad-ali-e merged commit cd965d7 into feat/UN-3445-pg-queue-integration Jun 30, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3655-hitl-execution-id branch June 30, 2026 05:48