UN-3497 [FEAT] Introduce queue_backend seam and migrate Celery call sites#2001
chandrasekharan-zipstack merged 5 commits into Zipstack/unstract:main from Zipstack/unstract:UN-3497-queue-backend-seam
…ites Wrap @shared_task and current_app.send_task behind a single workers/queue_backend/ module so a later phase can route specific tasks through PG Queue without touching call sites. Pure passthrough today — behavioural no-op over Celery, locked in by 28 equivalence and characterisation tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary by CodeRabbit
Walkthrough
This PR introduces a task-registration abstraction seam (…
Changes
Queue Backend Seam & Worker Task Unification
muhammad-ali-e left a comment
Comprehensive PR Review — UN-3497 queue_backend seam
Review run via PR Review Toolkit: Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier — scoped to commit 13be391a (not the merge-from-main noise in the full main...HEAD diff). Also a regression sweep on the @shared_task → @worker_task swap and the run-worker.sh DX refactor.
Verdict
The Python seam itself is clean — worker_task and dispatch are honest passthroughs, exception propagation is preserved, and the 28-test lockdown is the right shape for proving equivalence when the PG Queue routing PR lands. The two genuine risks live outside the seam: (a) run-worker.sh:300 anchored pgrep regresses default local runs (status / restart / kill_one silently report STOPPED for workers that are running), and (b) the @worker_task equivalence tests are too weak — they would pass if the decorator dropped **kwargs entirely, silently stripping every autoretry_for / max_retries policy on every migrated task.
Priority
- Critical (fix before merge): 3: pgrep regression, weak `@worker_task` equivalence, missing decorator-kwarg passthrough test.
- Important: 5: `kill_workers` broad pgrep, silent kill EPERM, missing `ide_callback` healthcheck task, inventory canary blind spots, characterisation tests becoming tautological after seam patch.
- Suggestion: 5: `dispatch()` redundant `is not None` coercion, `dispatch -> Protocol[DispatchHandle]` instead of `Any`, three comment-rot fixes (stale line ref, "PR #15" placeholder, `WORKER_PG_QUEUE_ENABLED_TASKS` name pin).
Individual findings as inline comments below. Each is independent — feel free to defer Suggestions and ship after Critical+Important are addressed.
Critical:
* run-worker.sh:300 pgrep regression — anchored matcher required a
trailing `-` after `-worker`, but default hostname is
`${type}-worker@%h` (no dash unless WORKER_INSTANCE_ID set). Status,
restart, and kill_one_worker silently reported STOPPED for any worker
not setting that env var. Pattern now matches `(@|-)`.
* test_queue_backend_seam.py — replaced `hasattr(t, "apply")` tautology
with real Celery registration assertions (force PromiseProxy
resolution, verify `current_app.tasks` membership, round-trip via
`.apply().get()`).
* test_queue_backend_seam.py — added explicit decorator-kwarg
passthrough test pinning `name`, `bind`, `autoretry_for`,
`max_retries`, `default_retry_delay`. Guards against a refactor like
`return shared_task(*args)` silently stripping every retry policy.
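The kwarg-passthrough guard in the last bullet can be illustrated without Celery installed. This is a minimal sketch, assuming a stand-in `shared_task` that only records the options it receives (the real one is `celery.shared_task`); `broken_worker_task` and the `demo.*` task names are hypothetical and are not code from this PR.

```python
from typing import Any, Callable


def shared_task(*args: Any, **options: Any):
    """Stand-in for celery.shared_task (assumption): records options on the function."""
    def register(fn: Callable) -> Callable:
        fn.task_options = dict(options)
        return fn

    # Bare form: @shared_task applied directly to the function.
    if len(args) == 1 and callable(args[0]) and not options:
        return register(args[0])
    # Parameterised form: @shared_task(name=..., max_retries=...).
    return register


def worker_task(*args: Any, **kwargs: Any):
    """Seam decorator: forwards both call forms unchanged."""
    return shared_task(*args, **kwargs)


def broken_worker_task(*args: Any, **kwargs: Any):
    """Hypothetical regression: kwargs are silently dropped."""
    return shared_task(*args)


@worker_task
def bare() -> str:
    return "ok"


@worker_task(
    name="demo.retrying",
    autoretry_for=(ValueError,),
    max_retries=7,
    default_retry_delay=42,
)
def retrying() -> str:
    return "ok"


@broken_worker_task(name="demo.stripped", max_retries=7)
def stripped() -> str:
    return "ok"
```

With the faithful seam, `retrying.task_options` carries every retry setting; with the broken variant, `stripped.task_options` is empty even though the decorated function still works, which is why an assertion on the forwarded options is needed rather than a check that the task merely exists.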
Important:
* run-worker.sh:423 kill_workers — bulk path used loose
`pgrep -f "uv run celery.*worker"` which would kill unrelated celery
procs. Iterate canonical dir list and delegate to kill_one_worker so
the anchored matcher is the single source of truth.
* run-worker.sh:414 silent EPERM — dropped `2>/dev/null || true` from
kill invocations; re-check PIDs after sleep; warn on survivors.
* test_dispatch_sites_characterisation.py — replaced regex-based
inventory canary with AST walker. Now catches aliased imports
(`from celery import current_app as app`), locally-constructed apps
(`Celery(...).send_task(...)`), and `apply_async`. Excludes
`plugins/manual_review` (pre-existing `.apply_async` baggage, out of
this PR's scope).
* test_dispatch_sites_characterisation.py — added
`test_substrate_failure_surfaces_as_false` patching at the substrate
boundary (`queue_backend.dispatch.current_app.send_task`) with
`ConnectionError`. Complements the helper-side test so the full
helper → seam → broker failure path is exercised without either side
being trivially mocked.
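The substrate-boundary test in the last bullet could look roughly like the following. This is a sketch under stated assumptions: `FakeApp` stands in for Celery's `current_app`, `send_notification` is a hypothetical helper that reports failure as `False`, and the task and queue names are invented for illustration.

```python
from unittest.mock import patch


class FakeApp:
    """Stand-in for Celery's current_app (assumption, not the real object)."""

    def send_task(self, name, args=None, kwargs=None, queue=None):
        return type("Result", (), {"id": "fake-id"})()


current_app = FakeApp()


def dispatch(task_name, *, args=None, kwargs=None, queue=None):
    """Seam: plain passthrough, so substrate exceptions propagate."""
    return current_app.send_task(task_name, args=args, kwargs=kwargs, queue=queue)


def send_notification(payload: dict) -> bool:
    """Hypothetical helper: swallows dispatch errors and returns False."""
    try:
        dispatch("demo.send_notification", kwargs=payload, queue="demo_queue")
        return True
    except Exception:
        return False


def substrate_failure_surfaces_as_false() -> bool:
    # Patch the substrate boundary only; dispatch itself stays unpatched,
    # so the helper -> seam -> substrate path is exercised end to end.
    with patch.object(
        current_app, "send_task", side_effect=ConnectionError("broker down")
    ):
        return send_notification({"event": "demo"})
```

Patching `dispatch` directly would only prove that the helper has a `try`/`except`; patching one layer lower also proves the seam does not swallow or reshape the error on the way up.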
Suggestions:
* dispatch.py — dropped redundant `args is not None else []` coercion;
Celery normalises `None` internally. Avoids subtle list-vs-tuple
default behaviour change for third-party routers checking
`isinstance(args, tuple)`.
* dispatch.py — return type Any → `DispatchHandle` Protocol exposing
`.id: str` (already in use at scheduler/tasks.py). Documents the
real invariant the PG Queue handle will also need. Relaxed
`list[Any]` → `Sequence[Any]`, `dict[str, Any]` → `Mapping[str, Any]`.
* Removed `PR #15` placeholder + env var name pinning
(`WORKER_PG_QUEUE_ENABLED_TASKS`) from docstrings — describe
behaviour, not unstable identifiers.
* test_queue_backend_seam.py:11 — replaced stale `scheduler/tasks.py:157`
line ref with symbol-based citation
(`_execute_scheduled_workflow` / `send_notification_to_worker`).
SonarCloud (S1192):
* run-worker.sh — defined `readonly IDE_CALLBACK_WORKER_TYPE` constant
and replaced the 8 literal occurrences, mirroring the existing
`EXECUTOR_WORKER_TYPE` pattern.
Test count: 28 → 31 (seam 15→17, characterisation 13→14).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Review feedback addressed — c84a3e8c (now
Critical (3/3 fixed)
| # | File | Status | Notes |
|---|---|---|---|
| 1 | run-worker.sh:300 pgrep regression | ✅ Fixed | Pattern now `[^[:alnum:]_]${worker_type}-worker(@…` |
| 2 | test_queue_backend_seam.py:178 tautological hasattr | ✅ Fixed | Replaced with PromiseProxy resolution (`.name`), `current_app.tasks` membership check, and round-trip via `.apply().get()`. MagicMock will no longer satisfy these. |
| 3 | test_queue_backend_seam.py:201 decorator kwargs never asserted | ✅ Fixed | New `test_forwards_decorator_kwargs` pins `name`, `bind`, `autoretry_for=(ValueError,)`, `max_retries=7`, `default_retry_delay=42`. A regression like `return shared_task(*args)` would now fail loudly. |
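The pgrep regression in row 1 can be reproduced with Python's `re` module. This is a sketch, assuming the old matcher required a literal `-` after `-worker` and the new one accepts `@` or `-`, as the review describes; the POSIX class `[^[:alnum:]_]` is approximated by `[^A-Za-z0-9_]`, and the command lines are hypothetical examples, not captured process listings.

```python
import re


def old_matcher(worker_type: str) -> re.Pattern:
    # Assumed shape of the regressed pattern: a dash must follow "-worker".
    return re.compile(rf"[^A-Za-z0-9_]{re.escape(worker_type)}-worker-")


def new_matcher(worker_type: str) -> re.Pattern:
    # Assumed shape of the fix: "@" (default hostname) or "-" (instance id).
    return re.compile(rf"[^A-Za-z0-9_]{re.escape(worker_type)}-worker(@|-)")


# Hypothetical command lines for illustration only.
DEFAULT_CMD = "uv run celery -A worker worker --hostname=callback-worker@%h"
INSTANCE_CMD = "uv run celery -A worker worker --hostname=callback-worker-2@%h"
IDE_CMD = "uv run celery -A worker worker --hostname=ide_callback-worker@%h"


def is_running(matcher: re.Pattern, cmdline: str) -> bool:
    return matcher.search(cmdline) is not None
```

The leading negated class is what keeps `callback` from matching inside `ide_callback`: the character before `callback` there is `_`, which the class excludes.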
Important (4/5 fixed, 1 pushed back)
| # | File | Status | Notes |
|---|---|---|---|
| 4 | run-worker.sh:423 kill_workers loose pgrep | ✅ Fixed | Now iterates `list_core_worker_dirs` + `list_pluggable_worker_dirs` and delegates to `kill_one_worker`. Anchored matcher is the single source of truth. |
| 5 | run-worker.sh:414 silent EPERM | ✅ Fixed | Dropped 2>/dev/null ‖ true; re-`get_worker_pids` after sleep; SIGKILL survivors; warn on persistent processes; bubble exit code. |
| 6 | ide_callback/worker.py missing healthcheck Celery task | ❌ Pushed back (see below) | |
| 7 | Inventory canary blind spots | ✅ Fixed | Replaced regex with AST walker (`DispatchCallFinder`). Catches `current_app.send_task`, aliased imports, `Celery(...).send_task`, AND `.apply_async`. Pre-existing `plugins/manual_review/` `.apply_async` baggage is on a documented exception list; that's an unrelated migration, will file a follow-up Jira task. |
| 8 | test_dispatch_returns_false_on_send_task_failure tautology | ✅ Fixed | Renamed to `test_helper_swallows_dispatch_exception` (it's still a valid Python error-handling assertion). Added complement `test_substrate_failure_surfaces_as_false` that leaves `dispatch` unpatched and patches `queue_backend.dispatch.current_app.send_task` with `ConnectionError`. The full helper → seam → broker failure path is now exercised without either side being trivially mocked. |
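To show why an AST walker closes the blind spots in row 7, here is a minimal sketch. It reuses the class name `DispatchCallFinder` from the table but is not the PR's implementation; the matching rule (any call whose attribute name is `send_task` or `apply_async`) and the sample source are assumptions made for illustration.

```python
from __future__ import annotations

import ast

FORBIDDEN_ATTRS = {"send_task", "apply_async"}


class DispatchCallFinder(ast.NodeVisitor):
    """Collects (line, attribute) for every raw dispatch-style call."""

    def __init__(self) -> None:
        self.hits: list[tuple[int, str]] = []

    def visit_Call(self, node: ast.Call) -> None:
        func = node.func
        # Match on the attribute name only, so the receiver can be an
        # alias (app), a fresh Celery(...) instance, or a task object.
        if isinstance(func, ast.Attribute) and func.attr in FORBIDDEN_ATTRS:
            self.hits.append((node.lineno, func.attr))
        self.generic_visit(node)


def find_raw_dispatches(source: str) -> list[tuple[int, str]]:
    finder = DispatchCallFinder()
    finder.visit(ast.parse(source))
    return sorted(finder.hits)


# Hypothetical module text; it is parsed, never imported or executed.
SAMPLE = '''
from celery import Celery, current_app as app

app.send_task("a")
Celery("x").send_task("b")
some_task.apply_async(args=[1])
dispatch("c", queue="q")
'''
```

A regex keyed on the literal text `current_app.send_task` would miss the aliased and locally constructed cases above; keying on the call's attribute name catches all three while leaving the seam's own `dispatch(...)` call alone.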
Suggestions (5/5 fixed)
| # | File | Status | Notes |
|---|---|---|---|
| 9 | dispatch.py:43 redundant coercion | ✅ Fixed | Removed `if args is not None else []` (and the kwargs counterpart). Celery normalises `None` internally. Avoids the subtle list-vs-tuple default behaviour change. Seam test renamed to `test_omitted_args_forwarded_as_none` to pin the new behaviour. |
| 10 | dispatch.py:26 return type Any | ✅ Fixed | Added `DispatchHandle` Protocol exposing `.id: str`. Documents the real invariant (already in use at scheduler/tasks.py). Also relaxed `list[Any]` → `Sequence[Any]`, `dict[str, Any]` → `Mapping[str, Any]`. |
| 11 | decorator.py:30 "PR #15" placeholder | ✅ Fixed | Removed the identifier in both the decorator.py docstring and the `TestWorkerTaskEquivalence` class docstring. Now just "a later phase". |
| 12 | `__init__.py`:13 + dispatch.py:5 env var name pinning | ✅ Fixed | Replaced with behaviour-only description: "A later phase will route specific tasks through a non-Celery substrate (PG Queue) based on configuration; until then everything goes to Celery." |
| 13 | test_queue_backend_seam.py:11 stale line ref | ✅ Fixed | Replaced scheduler/tasks.py:157 with symbol-based citation (`_execute_scheduled_workflow` / `send_notification_to_worker`). Same idiom as test_dispatch_sites_characterisation.py. |
SonarCloud (1/1 fixed)
| # | File | Status | Notes |
|---|---|---|---|
| 14 | run-worker.sh:609 S1192 literal duplication | ✅ Fixed | Defined `readonly IDE_CALLBACK_WORKER_TYPE="ide_callback"` and replaced all 8 literal usages, mirroring the existing `EXECUTOR_WORKER_TYPE` pattern. Only remaining occurrences are the constant definition itself and one comment. |
Push-back on #6 (missing @worker_task healthcheck in ide_callback/worker.py)
This was a deliberate omission, not an oversight. Earlier in the session we walked through whether the healthcheck Celery task in the other 8 worker.py modules is actually load-bearing. Findings:
- No producer enqueues it. Grep across `workers/` and `backend/` for `send_task("…healthcheck…")`, `dispatch("…healthcheck…")`, `apply_async`, and any `.healthcheck` invocation returns nothing. The only consumer is `tests/test_callback_sanity.py`, which is scoped to the callback worker's app (not `ide_callback`).
- K8s liveness path uses the Python function, not the Celery task. `WorkerBuilder.setup_health_monitoring` reads `WorkerRegistry.get_health_checks(worker_type)` and binds the registered `check_*_health` callables to `HealthServer` on port 8089. The `@worker_task healthcheck` Celery task is on a different path entirely.
- `run-worker.sh -s` does `pgrep` on the worker process; it never invokes the task.
The reviewer's "Effect: anything that does app.send_task('healthcheck', queue='ide_callback') will silently fail to route" is a hypothetical — there's no such caller in-tree, and producing one would require the seam-bypass that the canary now actively forbids.
I called this out explicitly in the original PR-scope Jira comment on UN-3497: "the Celery @worker_task healthcheck boilerplate that exists in the other 8 worker.py files was deliberately dropped here". The reviewer-flagged "inconsistency" is real, but the consistency is with vestigial code in the other 8 modules — the cleanup direction is to remove them, not to add a 9th. I'll file a separate ticket for that cleanup if there's appetite; this PR isn't the right home for it.
Test count
- `test_queue_backend_seam.py`: 15 → 17 tests
- `test_dispatch_sites_characterisation.py`: 13 → 14 tests
- Total: 28 → 31, all green in 1.82s
Address SonarCloud S1192: ``get_worker_pids ... | tr '\n' ' ' | sed 's/ $//'`` was repeated at 4 call sites in run-worker.sh. Wrap the formatting pipeline in a single helper so the literal lives in one place. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| Filename | Overview |
|---|---|
| workers/queue_backend/decorator.py | New seam decorator — transparent passthrough to shared_task supporting both bare and parameterised call forms. |
| workers/queue_backend/dispatch.py | New dispatch seam — keyword-only wrapper over current_app.send_task with a minimal DispatchHandle Protocol for the return type. |
| workers/queue_backend/__init__.py | Public surface re-exports dispatch and worker_task; __all__ is pinned and tested. |
| workers/ide_callback/tasks.py | All six @app.task(...) decorators migrated to @worker_task(...), but the now-unused from celery import current_app as app import was not removed. |
| workers/scheduler/tasks.py | current_app.send_task replaced with dispatch(); task decorators migrated to @worker_task; call shape (args, kwargs, queue) is unchanged. |
| workers/shared/patterns/notification/helper.py | current_app.send_task replaced with dispatch() in send_notification_to_worker; call shape is identical to the previous raw call. |
| workers/tests/test_queue_backend_seam.py | 15 equivalence tests covering dispatch forwarding, worker_task registration, and public surface — comprehensive and well-structured. |
| workers/tests/test_dispatch_sites_characterisation.py | 13 characterisation tests pinning both dispatch sites plus an AST-walking canary that will fail if a raw send_task/apply_async reappears outside queue_backend/. |
| workers/ide_callback/worker.py | New worker entry-point added for parity with other worker dirs; health-check pattern mirrors the existing workers. |
| workers/run-worker.sh | DX refactor: adds -L/-C/-r flags, anchored pgrep to prevent callback/ide_callback collisions, and wires ide_callback into all lifecycle paths. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
subgraph CallSites["Call Sites"]
NT["notification/helper.py\nsend_notification_to_worker"]
SC["scheduler/tasks.py\n_execute_scheduled_workflow"]
WT["worker tasks.py x9\n@worker_task decorator"]
end
subgraph Seam["queue_backend seam"]
D["dispatch\ntask_name args kwargs queue"]
DEC["worker_task\n*args **kwargs"]
end
subgraph Celery["Celery substrate today"]
ST["current_app.send_task"]
SHT["celery.shared_task"]
end
NT --> D
SC --> D
WT --> DEC
D --> ST
DEC --> SHT
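The dispatch half of the flow above can be sketched in Python as follows. This is a hedged illustration, not the contents of `dispatch.py`: `FakeApp` and `FakeResult` are stand-ins for Celery's `current_app` and its result object so the block runs without a broker, and the exact signature is inferred from the PR description.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Mapping, Protocol, Sequence


class DispatchHandle(Protocol):
    """Minimal contract callers rely on: the handle exposes an id."""

    id: str


@dataclass
class FakeResult:
    id: str


@dataclass
class FakeApp:
    """Stand-in for Celery's current_app (assumption); records calls."""

    sent: list = field(default_factory=list)

    def send_task(self, name, args=None, kwargs=None, queue=None):
        self.sent.append((name, args, kwargs, queue))
        return FakeResult(id=f"fake-{len(self.sent)}")


current_app = FakeApp()


def dispatch(
    task_name: str,
    *,
    args: Sequence[Any] | None = None,
    kwargs: Mapping[str, Any] | None = None,
    queue: str | None = None,
) -> DispatchHandle:
    # Pure passthrough: omitted args/kwargs are forwarded as None
    # rather than coerced to [] or {}.
    return current_app.send_task(task_name, args=args, kwargs=kwargs, queue=queue)
```

Because call sites depend only on `DispatchHandle.id`, a later non-Celery substrate would need to return an object with that attribute and nothing more.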
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
workers/ide_callback/tasks.py:17-18
Unused import left over from the migration. All `@app.task(...)` decorators in this file have been replaced with `@worker_task(...)`, so `current_app as app` is no longer referenced and can be removed.
```suggestion
from queue_backend import worker_task
```
Reviews (1): Last reviewed commit: "UN-3497 [FIX] Extract get_worker_pids_on..."
What
- `workers/queue_backend/` (`decorator.py`, `dispatch.py`, `__init__.py`): a single seam wrapping Celery task registration and dispatch.
- Migrated `tasks.py` files from `@shared_task` / `@app.task` to `@worker_task` (api-deployment, callback, executor, file_processing, general, ide_callback, log_consumer, notification, scheduler).
- Migrated `current_app.send_task(...)` call sites to `dispatch(...)`:
  - `shared/patterns/notification/helper.py:76`
  - `scheduler/tasks.py:157`
- `tests/test_queue_backend_seam.py` (15 equivalence tests) + updated `tests/test_dispatch_sites_characterisation.py` (13 tests).
- `workers/ide_callback/worker.py` (parity with other worker dirs); `workers/run-worker.sh` DX refactor (`-L`/`-C`/`-r` flags, anchored pgrep, ide_callback wiring).

Why
The PG Queue migration (UN-3445) is rolling out as a per-task, feature-flagged Strangler-Fig swap, not a big-bang. To make per-task routing possible without a codebase-wide rewrite later, every task registration and every dispatch needs to go through one chokepoint. This PR creates that chokepoint and migrates every existing site to it, while keeping Celery 100% in charge of runtime behaviour today.
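To make the per-task routing idea concrete, the sketch below shows one way a single chokepoint could later choose a substrate. It is speculative: this PR ships no routing logic, the comma-separated format of `WORKER_PG_QUEUE_ENABLED_TASKS` is an assumption, and `choose_substrate` is a hypothetical name.

```python
from __future__ import annotations

import os
from typing import Mapping


def pg_queue_enabled_tasks(environ: Mapping[str, str] = os.environ) -> frozenset[str]:
    # Assumed format: comma-separated task names; empty means nothing is routed.
    raw = environ.get("WORKER_PG_QUEUE_ENABLED_TASKS", "")
    return frozenset(name.strip() for name in raw.split(",") if name.strip())


def choose_substrate(task_name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Hypothetical router: Celery unless the task is explicitly enabled."""
    if task_name in pg_queue_enabled_tasks(environ):
        return "pg_queue"
    return "celery"
```

With the flag unset or empty every task stays on Celery, which matches the default described in the Env Config section; enabling a single task name moves only that task.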
How
- `@worker_task` is a transparent passthrough to `@shared_task`. Both call forms (bare and parameterised) are forwarded as-is via `*args, **kwargs`.
- `dispatch(task_name, *, args, kwargs, queue)` is a transparent passthrough to `current_app.send_task(...)`. The signature exposes only what existing call sites actually use; more Celery options can be added when a real caller needs them.
- … `queue_backend/` only; call sites stay untouched.
- `worker.py` files swap `from celery import shared_task` for `from queue_backend import worker_task` so the import is uniform.

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
No.
- `@worker_task` resolves to the same `PromiseProxy` / decorator factory as `@shared_task`, and `dispatch()` produces an identical `current_app.send_task(...)` call.
- `tests/test_queue_backend_seam.py` proves the equivalence at the unit level (15 tests).
- `tests/test_dispatch_sites_characterisation.py` proves the two migrated call sites produce identical task name / args / kwargs / queue shape pre- vs post-migration (13 tests), and includes an inventory canary that fails if a raw `send_task` is ever added outside the seam.

Database Migrations
Env Config
- … `WORKER_PG_QUEUE_ENABLED_TASKS` / `WORKER_PG_QUEUE_ENABLED_ORGS` inside `queue_backend/`; default empty preserves Celery routing for every task.

Relevant Docs
- … `queue_backend/` document the current passthrough behaviour and the future routing branch.

Related Issues or PRs
Dependencies Versions
Notes on Testing
```
cd workers
.venv/bin/python -m pytest tests/test_queue_backend_seam.py \
tests/test_dispatch_sites_characterisation.py
28 passed in ~2s
```
Verification greps (should be empty / scoped):
```
grep -rn "current_app.send_task" workers/ # only inside queue_backend/
grep -rEn "@shared_task|@app.task" workers/*/tasks.py # empty
```
Screenshots
N/A (no UI surface).
Checklist
I have read and understood the Contribution Guidelines.