Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3497 [FEAT] Introduce queue_backend seam and migrate Celery call sites#2001

Merged
chandrasekharan-zipstack merged 5 commits into
mainZipstack/unstract:mainfrom
UN-3497-queue-backend-seamZipstack/unstract:UN-3497-queue-backend-seamCopy head branch name to clipboard
Jun 1, 2026
Merged

UN-3497 [FEAT] Introduce queue_backend seam and migrate Celery call sites#2001
chandrasekharan-zipstack merged 5 commits into
mainZipstack/unstract:mainfrom
UN-3497-queue-backend-seamZipstack/unstract:UN-3497-queue-backend-seamCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

  • Add new module workers/queue_backend/ (decorator.py, dispatch.py, __init__.py) — a single seam wrapping Celery task registration and dispatch.
  • Migrate all 9 worker tasks.py files from @shared_task / @app.task to @worker_task (api-deployment, callback, executor, file_processing, general, ide_callback, log_consumer, notification, scheduler).
  • Migrate the 2 raw current_app.send_task(...) call sites to dispatch(...):
    • shared/patterns/notification/helper.py:76
    • scheduler/tasks.py:157
  • New tests/test_queue_backend_seam.py (15 equivalence tests) + updated tests/test_dispatch_sites_characterisation.py (13 tests).
  • Out of scope but bundled for convenience (see UN-3497 Jira comment for rationale): new workers/ide_callback/worker.py (parity with other worker dirs); workers/run-worker.sh DX refactor (-L/-C/-r flags, anchored pgrep, ide_callback wiring).

Why

The PG Queue migration (UN-3445) is rolling out as a per-task, feature-flagged Strangler-Fig swap, not a big-bang. To make per-task routing possible without a codebase-wide rewrite later, every task registration and every dispatch needs to go through one chokepoint. This PR creates that chokepoint and migrates every existing site to it, while keeping Celery 100% in charge of runtime behaviour today.

How

  • @worker_task is a transparent passthrough to @shared_task. Both call forms (bare and parameterised) are forwarded as-is via *args, **kwargs.
  • dispatch(task_name, *, args, kwargs, queue) is a transparent passthrough to current_app.send_task(...). The signature exposes only what existing call sites actually use — more Celery options can be added when a real caller needs them.
  • The future PG Queue branch (later Phase) lands inside queue_backend/ only; call sites stay untouched.
  • Worker worker.py files swap from celery import shared_task for from queue_backend import worker_task so the import is uniform.

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

No.

  • Behaviour over Celery is unchanged. @worker_task resolves to the same PromiseProxy / decorator factory as @shared_task, and dispatch() produces an identical current_app.send_task(...) call.
  • tests/test_queue_backend_seam.py proves the equivalence at the unit level (15 tests).
  • tests/test_dispatch_sites_characterisation.py proves the two migrated call sites produce identical task name / args / kwargs / queue shape pre- vs post-migration (13 tests), and includes an inventory canary that fails if a raw send_task is ever added outside the seam.
  • No queue names, task names, or routing change.
  • No database, schema, or env-var changes.

Database Migrations

  • None.

Env Config

  • None in this PR. A later Phase (PG Queue Gate) will introduce WORKER_PG_QUEUE_ENABLED_TASKS / WORKER_PG_QUEUE_ENABLED_ORGS inside queue_backend/; default empty preserves Celery routing for every task.

Relevant Docs

  • N/A. Module-level docstrings inside queue_backend/ document the current passthrough behaviour and the future routing branch.

Related Issues or PRs

Dependencies Versions

  • None. Pure-Python passthrough over existing Celery primitives.

Notes on Testing

```
cd workers
.venv/bin/python -m pytest tests/test_queue_backend_seam.py \
tests/test_dispatch_sites_characterisation.py

28 passed in ~2s

```

Verification greps (should be empty / scoped):

```
grep -rn "current_app.send_task" workers/ # only inside queue_backend/
grep -rEn "@shared_task|@app.task" workers/*/tasks.py # empty
```

Screenshots

N/A (no UI surface).

Checklist

I have read and understood the Contribution Guidelines.

…ites

Wrap @shared_task and current_app.send_task behind a single
workers/queue_backend/ module so a later phase can route specific tasks
through PG Queue without touching call sites. Pure passthrough today —
behavioural no-op over Celery, locked in by 28 equivalence and
characterisation tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4735f862-c2a3-445d-9fee-3f799ee7a67c

📥 Commits

Reviewing files that changed from the base of the PR and between 25b67a3 and c6af203.

📒 Files selected for processing (27)
  • workers/api-deployment/tasks.py
  • workers/api-deployment/worker.py
  • workers/callback/tasks.py
  • workers/callback/worker.py
  • workers/executor/tasks.py
  • workers/executor/worker.py
  • workers/file_processing/structure_tool_task.py
  • workers/file_processing/tasks.py
  • workers/file_processing/worker.py
  • workers/general/tasks.py
  • workers/general/worker.py
  • workers/ide_callback/tasks.py
  • workers/ide_callback/worker.py
  • workers/log_consumer/tasks.py
  • workers/log_consumer/worker.py
  • workers/notification/tasks.py
  • workers/notification/worker.py
  • workers/pyproject.toml
  • workers/queue_backend/__init__.py
  • workers/queue_backend/decorator.py
  • workers/queue_backend/dispatch.py
  • workers/run-worker.sh
  • workers/scheduler/tasks.py
  • workers/scheduler/worker.py
  • workers/shared/patterns/notification/helper.py
  • workers/tests/test_dispatch_sites_characterisation.py
  • workers/tests/test_queue_backend_seam.py

Summary by CodeRabbit

  • Chores
    • Internal improvements to task execution and job queuing systems for enhanced platform reliability and maintainability, modernizing how the system manages background processing and worker coordination.
    • Expanded test suite coverage for task dispatching and queue management to ensure system stability and correct operation of background jobs.

Walkthrough

This PR introduces a task-registration abstraction seam (queue_backend) that wraps Celery task decorators and dispatch calls, enabling future transitions to alternative job substrates (e.g., PG Queue) without rewriting call sites. All worker tasks migrate from direct Celery decorators to @worker_task, dispatch sites route through dispatch(), an IDE callback worker is added, and run-worker.sh gains log/restart management features.

Changes

Queue Backend Seam & Worker Task Unification

Layer / File(s) Summary
Queue Backend Seam Foundation
workers/queue_backend/__init__.py, workers/queue_backend/decorator.py, workers/queue_backend/dispatch.py
New queue_backend package acts as stable abstraction: worker_task wraps celery.shared_task, dispatch() wraps celery.current_app.send_task. Both forward all arguments unchanged, preparing codebase for future non-Celery substrate routing.
Task Decorator Migration - Core Workers
workers/api-deployment/, workers/callback/, workers/executor/, workers/file_processing/, workers/general/, workers/log_consumer/, workers/scheduler/ tasks.py & worker.py
All worker task registration across 10 modules switched from @app.task/@shared_task to @worker_task. Decorator parameters, signatures, retry/monitoring logic, and health-check implementations remain unchanged.
IDE Callback Worker & Tasks
workers/ide_callback/tasks.py, workers/ide_callback/worker.py
New IDE callback worker module with 6 tasks registered via @worker_task. Health check logic determines HEALTHY/DEGRADED by testing singleton API client availability; registered with WorkerRegistry on import.
Notification System Task & Dispatch Migration
workers/notification/tasks.py, workers/notification/worker.py, workers/shared/patterns/notification/helper.py
Notification tasks (process_notification, send_webhook_notification, etc.) switched to @worker_task. Notification helper dispatch call updated from current_app.send_task() to dispatch() while preserving task name, args/kwargs structure, and routing to notifications queue.
Dispatch Site Tests & Seam Characterization
workers/tests/test_dispatch_sites_characterisation.py, workers/tests/test_queue_backend_seam.py
Existing dispatch tests refactored to patch through queue_backend.dispatch() seam instead of raw Celery. New test module verifies worker_task and dispatch() forward to Celery identically. AST-based canary forbids raw send_task/apply_async outside queue_backend/.
Worker Script IDE Callback Support & Enhanced Management
workers/run-worker.sh
IDE callback worker registered with queue/health-port mappings and included in all startup set. Anchored hostname-based process detection replaces simple grepping. New -L/--logs (tail), -C/--clear-logs (delete), and -r/--restart modes added. Kill logic refactored with SIGTERM→SIGKILL escalation and process verification. Health-port and concurrency wiring extended for IDE callback.
Configuration & Tooling Updates
workers/pyproject.toml
queue_backend added to isort known_first_party, pytest --cov options, and coverage.run source list for consistent import organization and instrumentation.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 79.21% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title 'UN-3497 [FEAT] Introduce queue_backend seam and migrate Celery call sites' clearly and specifically describes the main changes: introducing a new queue_backend module and migrating Celery call sites to use it.
Description check ✅ Passed The PR description comprehensively addresses all required template sections: What (new queue_backend module and migrations), Why (enabling per-task PG Queue routing), How (transparent passthroughs), breaking changes assessment (no), database/env changes (none), testing notes, and related issues.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3497-queue-backend-seam

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comprehensive PR Review — UN-3497 queue_backend seam

Review run via PR Review Toolkit: Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier — scoped to commit 13be391a (not the merge-from-main noise in the full main...HEAD diff). Also a regression sweep on the @shared_task → @worker_task swap and the run-worker.sh DX refactor.

Verdict

The Python seam itself is cleanworker_task and dispatch are honest passthroughs, exception propagation is preserved, and the 28-test lockdown is the right shape for proving equivalence when the PG Queue routing PR lands. The two genuine risks live outside the seam: (a) run-worker.sh:300 anchored pgrep regresses default local runs (status / restart / kill_one silently report STOPPED for workers that are running), and (b) the @worker_task equivalence tests are too weak — they would pass if the decorator dropped **kwargs entirely, silently stripping every autoretry_for / max_retries policy on every migrated task.

Priority

  • Critical (fix before merge): 3 — pgrep regression, weak @worker_task equivalence, missing decorator-kwarg passthrough test.
  • Important: 5 — kill_workers broad pgrep, silent kill EPERM, missing ide_callback healthcheck task, inventory canary blind spots, characterisation tests becoming tautological after seam patch.
  • Suggestion: 5 — dispatch() redundant is not None coercion, dispatch -> Protocol[DispatchHandle] instead of Any, three comment-rot fixes (stale line ref, "PR #15" placeholder, WORKER_PG_QUEUE_ENABLED_TASKS name pin).

Individual findings as inline comments below. Each is independent — feel free to defer Suggestions and ship after Critical+Important are addressed.

Comment thread workers/run-worker.sh Outdated
Comment thread workers/run-worker.sh Outdated
Comment thread workers/run-worker.sh Outdated
Comment thread workers/ide_callback/worker.py
Comment thread workers/queue_backend/dispatch.py Outdated
Comment thread workers/tests/test_queue_backend_seam.py Outdated
Comment thread workers/tests/test_queue_backend_seam.py Outdated
Comment thread workers/tests/test_queue_backend_seam.py Outdated
Comment thread workers/tests/test_dispatch_sites_characterisation.py Outdated
Comment thread workers/tests/test_dispatch_sites_characterisation.py Outdated
Critical:
* run-worker.sh:300 pgrep regression — anchored matcher required a
  trailing `-` after `-worker`, but default hostname is
  `${type}-worker@%h` (no dash unless WORKER_INSTANCE_ID set). Status,
  restart, and kill_one_worker silently reported STOPPED for any worker
  not setting that env var. Pattern now matches `(@|-)`.
* test_queue_backend_seam.py — replaced `hasattr(t, "apply")` tautology
  with real Celery registration assertions (force PromiseProxy
  resolution, verify `current_app.tasks` membership, round-trip via
  `.apply().get()`).
* test_queue_backend_seam.py — added explicit decorator-kwarg
  passthrough test pinning `name`, `bind`, `autoretry_for`,
  `max_retries`, `default_retry_delay`. Guards against a refactor like
  `return shared_task(*args)` silently stripping every retry policy.

Important:
* run-worker.sh:423 kill_workers — bulk path used loose
  `pgrep -f "uv run celery.*worker"` which would kill unrelated celery
  procs. Iterate canonical dir list and delegate to kill_one_worker so
  the anchored matcher is the single source of truth.
* run-worker.sh:414 silent EPERM — dropped `2>/dev/null || true` from
  kill invocations; re-check PIDs after sleep; warn on survivors.
* test_dispatch_sites_characterisation.py — replaced regex-based
  inventory canary with AST walker. Now catches aliased imports
  (`from celery import current_app as app`), locally-constructed apps
  (`Celery(...).send_task(...)`), and `apply_async`. Excludes
  `plugins/manual_review` (pre-existing `.apply_async` baggage, out of
  this PR's scope).
* test_dispatch_sites_characterisation.py — added
  `test_substrate_failure_surfaces_as_false` patching at the substrate
  boundary (`queue_backend.dispatch.current_app.send_task`) with
  `ConnectionError`. Complements the helper-side test so the full
  helper → seam → broker failure path is exercised without either side
  being trivially mocked.

Suggestions:
* dispatch.py — dropped redundant `args is not None else []` coercion;
  Celery normalises `None` internally. Avoids subtle list-vs-tuple
  default behaviour change for third-party routers checking
  `isinstance(args, tuple)`.
* dispatch.py — return type Any → `DispatchHandle` Protocol exposing
  `.id: str` (already in use at scheduler/tasks.py). Documents the
  real invariant the PG Queue handle will also need. Relaxed
  `list[Any]` → `Sequence[Any]`, `dict[str, Any]` → `Mapping[str, Any]`.
* Removed `PR #15` placeholder + env var name pinning
  (`WORKER_PG_QUEUE_ENABLED_TASKS`) from docstrings — describe
  behaviour, not unstable identifiers.
* test_queue_backend_seam.py:11 — replaced stale `scheduler/tasks.py:157`
  line ref with symbol-based citation
  (`_execute_scheduled_workflow` / `send_notification_to_worker`).

SonarCloud (S1192):
* run-worker.sh — defined `readonly IDE_CALLBACK_WORKER_TYPE` constant
  and replaced the 8 literal occurrences, mirroring the existing
  `EXECUTOR_WORKER_TYPE` pattern.

Test count: 28 → 31 (seam 15→17, characterisation 13→14).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review feedback addressed — c84a3e8c (now 658ea89e post-rebase onto main)

Below is a one-to-one accounting against each inline finding. 12 of 13 implemented; 1 pushed back on with rationale.

Critical (3/3 fixed)

# File Status Notes
1 run-worker.sh:300 pgrep regression ✅ Fixed Pattern now `[^[:alnum:]_]${worker_type}-worker(@
2 test_queue_backend_seam.py:178 tautological hasattr ✅ Fixed Replaced with PromiseProxy resolution (.name), current_app.tasks membership check, and round-trip via .apply().get(). MagicMock will no longer satisfy these.
3 test_queue_backend_seam.py:201 decorator kwargs never asserted ✅ Fixed New test_forwards_decorator_kwargs pins name, bind, autoretry_for=(ValueError,), max_retries=7, default_retry_delay=42. A regression like return shared_task(*args) would now fail loudly.

Important (4/5 fixed, 1 pushed back)

# File Status Notes
4 run-worker.sh:423 kill_workers loose pgrep ✅ Fixed Now iterates list_core_worker_dirs + list_pluggable_worker_dirs and delegates to kill_one_worker. Anchored matcher is the single source of truth.
5 run-worker.sh:414 silent EPERM ✅ Fixed Dropped 2>/dev/null ‖ true; re-get_worker_pids after sleep; SIGKILL survivors; warn on persistent processes; bubble exit code.
6 ide_callback/worker.py missing healthcheck Celery task Pushed back — see below.
7 Inventory canary blind spots ✅ Fixed Replaced regex with AST walker (DispatchCallFinder). Catches current_app.send_task, aliased imports, Celery(...).send_task, AND .apply_async. Pre-existing plugins/manual_review/.apply_async baggage is on a documented exception list — that's an unrelated migration, will file a follow-up Jira task.
8 test_dispatch_returns_false_on_send_task_failure tautology ✅ Fixed Renamed to test_helper_swallows_dispatch_exception (it's still a valid Python error-handling assertion). Added complement test_substrate_failure_surfaces_as_false that leaves dispatch unpatched and patches queue_backend.dispatch.current_app.send_task with ConnectionError. The full helper → seam → broker failure path is now exercised without either side being trivially mocked.

Suggestions (5/5 fixed)

# File Status Notes
9 dispatch.py:43 redundant coercion ✅ Fixed Removed if args is not None else [] (and the kwargs counterpart). Celery normalises None internally. Avoids the subtle list-vs-tuple default behaviour change. Seam test renamed to test_omitted_args_forwarded_as_none to pin the new behaviour.
10 dispatch.py:26 return type Any ✅ Fixed Added DispatchHandle Protocol exposing .id: str. Documents the real invariant (already in use at scheduler/tasks.py). Also relaxed list[Any]Sequence[Any], dict[str, Any]Mapping[str, Any].
11 decorator.py:30 "PR #15" placeholder ✅ Fixed Removed the identifier in both decorator.py docstring and the TestWorkerTaskEquivalence class docstring. Now just "a later phase".
12 __init__.py:13 + dispatch.py:5 env var name pinning ✅ Fixed Replaced with behaviour-only description: "A later phase will route specific tasks through a non-Celery substrate (PG Queue) based on configuration; until then everything goes to Celery."
13 test_queue_backend_seam.py:11 stale line ref ✅ Fixed Replaced scheduler/tasks.py:157 with symbol-based citation (_execute_scheduled_workflow / send_notification_to_worker). Same idiom as test_dispatch_sites_characterisation.py.

SonarCloud (1/1 fixed)

# File Status Notes
14 run-worker.sh:609 S1192 literal duplication ✅ Fixed Defined readonly IDE_CALLBACK_WORKER_TYPE="ide_callback" and replaced all 8 literal usages, mirroring the existing EXECUTOR_WORKER_TYPE pattern. Only remaining occurrences are the constant definition itself and one comment.

Push-back on #6 (missing @worker_task healthcheck in ide_callback/worker.py)

This was a deliberate omission, not an oversight. Earlier in the session we walked through whether the healthcheck Celery task in the other 8 worker.py modules is actually load-bearing. Findings:

  • No producer enqueues it. Grep across workers/ and backend/ for send_task("…healthcheck…"), dispatch("…healthcheck…"), apply_async, and any .healthcheck invocation returns nothing. The only consumer is tests/test_callback_sanity.py, which is scoped to the callback worker's app (not ide_callback).
  • K8s liveness path uses the Python function, not the Celery task. WorkerBuilder.setup_health_monitoring reads WorkerRegistry.get_health_checks(worker_type) and binds the registered check_*_health callables to HealthServer on port 8089. The @worker_task healthcheck Celery task is on a different path entirely.
  • run-worker.sh -s does pgrep on the worker process — never invokes the task.

The reviewer's "Effect: anything that does app.send_task('healthcheck', queue='ide_callback') will silently fail to route" is a hypothetical — there's no such caller in-tree, and producing one would require the seam-bypass that the canary now actively forbids.

I called this out explicitly in the original PR-scope Jira comment on UN-3497: "the Celery @worker_task healthcheck boilerplate that exists in the other 8 worker.py files was deliberately dropped here". The reviewer-flagged "inconsistency" is real, but the consistency is with vestigial code in the other 8 modules — the cleanup direction is to remove them, not to add a 9th. I'll file a separate ticket for that cleanup if there's appetite; this PR isn't the right home for it.


Test count

  • test_queue_backend_seam.py: 15 → 17 tests
  • test_dispatch_sites_characterisation.py: 13 → 14 tests
  • Total: 28 → 31 — all green in 1.82s

muhammad-ali-e and others added 2 commits June 1, 2026 13:19
Address SonarCloud S1192: ``get_worker_pids ... | tr '\n' ' ' | sed 's/ $//'``
was repeated at 4 call sites in run-worker.sh. Wrap the formatting pipeline
in a single helper so the literal lives in one place.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sonarqubecloud

sonarqubecloud Bot commented Jun 1, 2026

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 1, 2026 07:53
@github-actions

github-actions Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Unstract test results

Per-group results

Status Group Tier Passed Failed Errors Skipped Duration (s)
unit-connectors unit 64 12 0 3 17.4
unit-core unit 0 0 2 0 1.2
unit-platform-service unit 9 0 1 0 1.3
unit-prompt-service unit 15 0 0 0 19.0
unit-rig unit 53 0 0 0 3.3
unit-runner unit 11 0 0 0 3.4
unit-sdk1 unit 354 0 0 0 20.5
unit-tool-registry unit 0 0 1 0 1.3
unit-workers unit 0 0 0 0 18.1
TOTAL 506 12 4 3 85.6

Critical paths

⚠️ Critical paths not yet covered

  • auth-login — User can log in and obtain a session cookie. (entry: POST /api/v1/auth/login; declared coverage: no groups declared)
  • adapter-register-llm — Register and validate an LLM adapter. (entry: POST /api/v1/adapter/; declared coverage: no groups declared)
  • workflow-create-execute — Create a workflow, configure source+destination, execute, poll, fetch result. (entry: POST /api/v1/workflow/{id}/execute/; declared coverage: e2e-workflow)
  • api-deployment-run — Deploy a workflow as an API, POST a document, receive structured JSON. (entry: POST /deployment/api/{org}/{name}/; declared coverage: e2e-api-deployment)
  • prompt-studio-fetch-response — Prompt Studio: create project, add prompt, run single-pass, get response. (entry: POST /api/v1/prompt-studio/prompt-studio-tool/{id}/fetch_response/; declared coverage: e2e-prompt-studio)
  • pipeline-etl-execute — Run an ETL pipeline from source connector to destination. (entry: POST /api/v1/pipeline/{id}/execute/; declared coverage: no groups declared)
  • usage-token-tracking — Per-execution token usage is recorded and retrievable. (entry: GET /api/v1/usage/get_token_usage/; declared coverage: no groups declared)
  • workflow-execution-fan-out — Multi-file workflow execution fans out to file-processing workers and rejoins. (entry: internal: backend → rabbitmq → workers/file_processing; declared coverage: no groups declared)
  • callback-result-delivery — Async results are posted back via the callback worker. (entry: internal: workers/callback → backend /internal endpoints; declared coverage: no groups declared)
✅ Covered critical paths
  • tool-sandbox-exec — covered by unit-runner

@greptile-apps

greptile-apps Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces a queue_backend/ seam module that wraps Celery task registration (@worker_task) and dispatch (dispatch()) behind a single chokepoint, then migrates all 9 worker tasks.py files and the 2 raw current_app.send_task(...) call sites to use it. Both primitives are transparent pass-throughs today, keeping runtime behaviour identical while enabling a future per-task PG Queue routing phase to land entirely inside queue_backend/ without touching call sites.

  • queue_backend/decorator.py and queue_backend/dispatch.py are minimal, well-documented pass-throughs to celery.shared_task and current_app.send_task respectively; DispatchHandle Protocol cleanly documents the .id contract.
  • All 9 worker tasks.py files are migrated from @shared_task / @app.task to @worker_task; the two raw send_task dispatch sites are migrated to dispatch(); and a new ide_callback/worker.py entry-point is added for parity.
  • Test coverage is thorough: 15 equivalence tests lock the seam's pass-through contract and an AST-walking canary in test_dispatch_sites_characterisation.py will fail loudly if any raw send_task/apply_async call reappears outside queue_backend/.

Confidence Score: 5/5

Safe to merge — the seam is a transparent pass-through with no behavioural change; every existing task name, queue name, and argument shape is preserved.

Both primitives delegate directly to the underlying Celery calls with no transformation. The 15-test equivalence suite verifies the seam contract end-to-end, and the AST canary guards against future regressions. The only finding is a leftover unused import in ide_callback/tasks.py that has no runtime impact.

workers/ide_callback/tasks.py — stale from celery import current_app as app import.

Important Files Changed

Filename Overview
workers/queue_backend/decorator.py New seam decorator — transparent passthrough to shared_task supporting both bare and parameterised call forms.
workers/queue_backend/dispatch.py New dispatch seam — keyword-only wrapper over current_app.send_task with a minimal DispatchHandle Protocol for the return type.
workers/queue_backend/init.py Public surface re-exports dispatch and worker_task; __all__ is pinned and tested.
workers/ide_callback/tasks.py All six @app.task(...) decorators migrated to @worker_task(...), but the now-unused from celery import current_app as app import was not removed.
workers/scheduler/tasks.py current_app.send_task replaced with dispatch(); task decorators migrated to @worker_task; call shape (args, kwargs, queue) is unchanged.
workers/shared/patterns/notification/helper.py current_app.send_task replaced with dispatch() in send_notification_to_worker; call shape is identical to the previous raw call.
workers/tests/test_queue_backend_seam.py 15 equivalence tests covering dispatch forwarding, worker_task registration, and public surface — comprehensive and well-structured.
workers/tests/test_dispatch_sites_characterisation.py 13 characterisation tests pinning both dispatch sites plus an AST-walking canary that will fail if a raw send_task/apply_async reappears outside queue_backend/.
workers/ide_callback/worker.py New worker entry-point added for parity with other worker dirs; health-check pattern mirrors the existing workers.
workers/run-worker.sh DX refactor: adds -L/-C/-r flags, anchored pgrep to prevent callback/ide_callback collisions, and wires ide_callback into all lifecycle paths.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    subgraph CallSites["Call Sites"]
        NT["notification/helper.py\nsend_notification_to_worker"]
        SC["scheduler/tasks.py\n_execute_scheduled_workflow"]
        WT["worker tasks.py x9\n@worker_task decorator"]
    end

    subgraph Seam["queue_backend seam"]
        D["dispatch\ntask_name args kwargs queue"]
        DEC["worker_task\n*args **kwargs"]
    end

    subgraph Celery["Celery substrate today"]
        ST["current_app.send_task"]
        SHT["celery.shared_task"]
    end

    NT --> D
    SC --> D
    WT --> DEC
    D --> ST
    DEC --> SHT
Loading

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
workers/ide_callback/tasks.py:17-18
Unused import left over from the migration. All `@app.task(...)` decorators in this file have been replaced with `@worker_task(...)`, so `current_app as app` is no longer referenced and can be removed.

```suggestion
from queue_backend import worker_task
```

Reviews (1): Last reviewed commit: "UN-3497 [FIX] Extract get_worker_pids_on..." | Re-trigger Greptile

Comment thread workers/ide_callback/tasks.py
@chandrasekharan-zipstack chandrasekharan-zipstack merged commit 9a24689 into main Jun 1, 2026
17 checks passed
@chandrasekharan-zipstack chandrasekharan-zipstack deleted the UN-3497-queue-backend-seam branch June 1, 2026 09:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Morty Proxy This is a proxified and sanitized view of the page, visit original site.