[WIP] [FEAT] V1 - Time based trials #13
Closed
harini-venkataraman wants to merge 0 commit into
a9b9db3 to 57e454c
chandrasekharan-zipstack added a commit that referenced this pull request on Apr 17, 2026
… wrappers

Address review comments on PR #1886:

- #9 (typing): call_with_retry / acall_with_retry / iter_with_retry previously returned `object`, erasing caller type info. Add PEP 695 generics so the return type flows from the wrapped callable: acall_with_retry now takes Callable[[], Awaitable[T]] and iter_with_retry takes Callable[[], Iterable[T]] -> Generator[T, ...].
- #11 / #13 (DRY): `_pop_retry_params` in embedding.py and `_disable_litellm_retry` in llm.py were identical logic. Lift to shared `pop_litellm_retry_kwargs` helper in retry_utils.py and delete both methods.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
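A minimal sketch of the typing point in #9, assuming nothing about the real bodies in retry_utils.py: a type variable lets the wrapper's return type follow the wrapped callable instead of collapsing to `object`. The `_sketch` function names and the choice to retry only on `ConnectionError` are illustrative assumptions. The commit uses PEP 695 syntax (`def f[T](...)`, Python 3.12+); the equivalent `TypeVar` spelling is used here so the example also runs on older interpreters.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")  # PEP 695 equivalent: def call_with_retry_sketch[T](...)


def call_with_retry_sketch(fn: Callable[[], T], max_retries: int = 3) -> T:
    """Call fn, retrying on ConnectionError; the return type follows fn."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
    raise AssertionError("unreachable")


async def acall_with_retry_sketch(
    fn: Callable[[], Awaitable[T]], max_retries: int = 3
) -> T:
    """Async variant: fn returns an awaitable of T, the wrapper returns T."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except ConnectionError:
            if attempt == max_retries:
                raise
    raise AssertionError("unreachable")
```

With these signatures a type checker infers `int` for `call_with_retry_sketch(lambda: 1)` rather than `object`, which is the caller-side benefit the review comment asked for.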
muhammad-ali-e added a commit that referenced this pull request on Apr 17, 2026
* [FIX] Unified retry for LLM and embedding providers

litellm's retry only works for SDK-based providers (OpenAI/Azure). httpx-based providers (Anthropic, Vertex, Bedrock, Mistral) and ALL embedding calls silently ignore max_retries. This adds self-managed retry with exponential backoff at the SDK layer, disabling litellm's own retry entirely for consistency.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [REFACTOR] DRY retry logic into reusable call_with_retry utilities

Move retry loops out of LLM/Embedding classes into generic call_with_retry, acall_with_retry, and iter_with_retry functions in retry_utils.py. Both classes now call these directly instead of maintaining their own retry helper methods.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Consolidate retry logic, expose max_retries for all adapters

- Extract _get_retry_delay() shared helper to eliminate duplicated retry decision logic across call_with_retry, acall_with_retry, iter_with_retry, and retry_with_exponential_backoff
- Add num_retries=0 to embedding._pop_retry_params() to fully disable litellm's internal retry for embedding calls
- Expose max_retries in UI JSON schemas for embedding adapters (OpenAI, Azure, VertexAI, Ollama) and Ollama LLM - previously the field existed in Pydantic models but wasn't shown to users, silently defaulting to 0 retries
- Add debug logging to LLM and Embedding retry parameter extraction
- Clarify docstrings distinguishing is_retryable_litellm_error() from is_retryable_error() (different exception hierarchies)
- Remove stale noqa: C901 from simplified retry_with_exponential_backoff

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Set max_retries default to 3 for all embedding and Ollama LLM adapters

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Address greptile review: fix shadowed ConnectionError, use MRO check

- Fix `requests.ConnectionError` shadowing Python's builtin `ConnectionError` in `is_retryable_litellm_error()` - rename import to `RequestsConnectionError` and use `builtins.ConnectionError` / `builtins.TimeoutError` explicitly
- Use `__mro__`-based class name check instead of `type(error).__name__` to also catch subclasses of retryable error types
- P1 (num_retries not zeroed) was already fixed in prior commit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Address CodeRabbit review: add APITimeoutError, validate max_retries

- Add APITimeoutError to _RETRYABLE_ERROR_NAMES for explicit OpenAI SDK timeout coverage
- Add _validate_max_retries() guard to call_with_retry, acall_with_retry, iter_with_retry to fail fast on negative values instead of silently returning None

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* UN-3344 [FIX] Reduce cognitive complexity and remove useless except clause

Address SonarCloud findings on PR #1886:

- S3776: Flatten retry_with_exponential_backoff.wrapper by moving the success logging + return out of the try block and using `continue` in the retry path, so the except branch only handles the give-up case.
- S2737: Drop the `except Exception: raise` clause - it was a no-op that added complexity without changing behavior (non-matching exceptions propagate naturally).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3344 [FIX] Extract retry loop to top-level helper to drop cognitive complexity

Sonar still flagged retry_with_exponential_backoff at complexity 16 after the previous flatten. Nested def decorator / def wrapper counted against the outer function's score. Move the retry body to a module-level _invoke_with_retries helper so the decorator factory just delegates, bringing the outer function well under the 15 threshold.

Behavior is unchanged - all paths (success, retry, give-up, non-retryable propagate) are preserved and covered by the existing SDK1 tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3344 [FIX] Honor Retry-After, close stream gen on retry, share give-up log

Address review comments on PR #1886:

- #10 (resource leak): close the generator returned by fn() before retrying in iter_with_retry - otherwise streaming providers leak an in-flight HTTP socket until GC.
- #12 (behavioral regression): when we zero out SDK/wrapper retries we also lose the OpenAI SDK's native Retry-After handling on 429/503. _get_retry_delay now checks error.response.headers["retry-after"] and uses that value ahead of exponential backoff. HTTP-date form is not parsed; those fall back to backoff.
- #8 (observability gap): move the "Giving up ... after N attempt(s)" log into _get_retry_delay so all four retry helpers (call_with_retry, acall_with_retry, iter_with_retry, decorator) share the same exhaustion signal. Previously only the decorator path logged it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3344 [REFACTOR] Share retry-kwargs helper and add TypeVar to retry wrappers

Address review comments on PR #1886:

- #9 (typing): call_with_retry / acall_with_retry / iter_with_retry previously returned `object`, erasing caller type info. Add PEP 695 generics so the return type flows from the wrapped callable: acall_with_retry now takes Callable[[], Awaitable[T]] and iter_with_retry takes Callable[[], Iterable[T]] -> Generator[T, ...].
- #11 / #13 (DRY): `_pop_retry_params` in embedding.py and `_disable_litellm_retry` in llm.py were identical logic. Lift to shared `pop_litellm_retry_kwargs` helper in retry_utils.py and delete both methods.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
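The Retry-After behaviour described in #12 can be sketched roughly as follows. This is not the actual `_get_retry_delay`; the name `retry_delay_sketch`, the `base_delay` / `max_delay` parameters, and the decision to cap a server-provided delay at `max_delay` are assumptions made for illustration. It only shows the ordering the commit message states: a numeric `retry-after` header wins, and an HTTP-date value is left unparsed and falls back to exponential backoff.

```python
from types import SimpleNamespace


def retry_delay_sketch(
    error: Exception,
    attempt: int,
    base_delay: float = 1.0,   # assumed default, not from the PR
    max_delay: float = 60.0,   # assumed cap, not from the PR
) -> float:
    """Seconds to wait before retrying after the 0-based `attempt`."""
    response = getattr(error, "response", None)
    headers = getattr(response, "headers", None) or {}
    raw = headers.get("retry-after")  # plain dict here, so case-sensitive
    if raw is not None:
        try:
            seconds = float(raw)
        except (TypeError, ValueError):
            seconds = None  # HTTP-date form: not parsed, fall through
        if seconds is not None and seconds >= 0:
            return min(seconds, max_delay)
    # Exponential backoff: base, 2*base, 4*base, ... capped at max_delay
    return min(base_delay * (2 ** attempt), max_delay)


# Usage: an error carrying a 429-style response with a numeric header.
rate_limited = Exception("429")
rate_limited.response = SimpleNamespace(headers={"retry-after": "7"})
delay = retry_delay_sketch(rate_limited, attempt=0)
```

Real HTTP client header objects are usually case-insensitive mappings, so the lowercase lookup in the sketch would need adjusting if a plain dict with different casing were passed in.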
ritwik-g pushed a commit that referenced this pull request on May 8, 2026
* UN-3450 [FEAT] Add golden-path smoke test for callback worker

Adds workers/tests/test_callback_sanity.py mirroring the existing test_executor_sanity.py pattern. Covers:

- Worker enums and registry (WorkerType.CALLBACK, QueueName.CALLBACK + CALLBACK_API, TaskName.PROCESS_BATCH_CALLBACK, health port 8083, WorkerRegistry queue config + task routing).
- Celery task wiring (process_batch_callback, process_batch_callback_api, healthcheck - registration, retry config, max_retries, autoretry_for).
- Full dispatch -> task -> return round-trip via Celery eager mode, using the simple healthcheck task (process_batch_callback itself needs a configured InternalAPIClient + downstream HTTP, which is heavy mocking territory unsuitable for a smoke test - covered later in #1.2 characterisation suite).

Coverage gains on callback module:

- callback/__init__.py: 0% -> 100%
- callback/worker.py: 0% -> 52%
- callback/tasks.py: 0% -> 10%
- Module total: 0% -> 12%

14 tests, run in ~17s. Regression net for spine PRs #6 (CallbackStatus enum migration) and #13 (chord -> Barrier lift in callback chain).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3450 [FEAT] Address Greptile review on callback smoke test

Two P2 findings from Greptile, both fixed:

1. test_healthcheck_result_is_json_serializable: drop the `default=str` fallback in json.dumps. With it, any non-JSON-serializable value (UUID, datetime, custom object) gets silently coerced to a string - exactly the failure mode the test claims to catch. Without it, the assertion faithfully tests "round-trips cleanly via JSON" the way Celery's serializer would.
2. Add registration check for `process_batch_callback_django_compat` (the backward-compat task name `workflow_manager.workflow_v2.file_execution_tasks.process_batch_callback`). Both this task and `process_batch_callback` delegate to `_process_batch_callback_core`, so refactors that touch the core (e.g. the upcoming CallbackStatus enum migration) affect this path too. Three-line addition.

15 tests now (was 14), runtime 11s.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
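To make the first Greptile finding concrete, here is a small sketch of why `default=str` defeats a serializability check. The helper name `round_trips_via_json` and the sample payloads are made up for illustration; only `json.dumps`, `json.loads`, and `uuid.UUID` are real standard-library calls.

```python
import json
import uuid


def round_trips_via_json(value: object) -> bool:
    """True only if value survives a strict dumps/loads cycle unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except TypeError:
        # json.dumps raises TypeError for types it cannot encode natively
        return False


healthy = {"status": "healthy", "port": 8083}
leaky = {"task_id": uuid.UUID(int=1)}  # a UUID is not JSON-serializable

strict_ok = round_trips_via_json(healthy)
strict_leaky = round_trips_via_json(leaky)

# With default=str the same payload "passes", but the UUID has silently
# become a plain string, so the check no longer detects the problem.
coerced = json.loads(json.dumps(leaky, default=str))
```

Here `strict_leaky` is `False` while the `default=str` variant succeeds and returns a string in place of the UUID, which is the silent coercion the review comment describes.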
muhammad-ali-e added a commit that referenced this pull request on May 27, 2026
…1950)

* UN-3452 [FEAT] Characterise the seams: dispatch + chord call sites

Sub-task A under #1.2 - characterisation suite for the seams that upcoming spine PRs will refactor. Two new test files, zero production changes.

Dispatch seam (unblocks PR #8 - @shared_task -> @worker_task migration):

- workers/tests/test_dispatch_sites_characterisation.py (276 lines, 11 tests)
- Locks contract on the two raw current_app.send_task call sites:
  - shared/patterns/notification/helper.py:76 (webhook dispatch)
  - scheduler/tasks.py:157 (scheduled workflow async dispatch)
- Tests pin: task name, positional args layout, kwargs layout, target queue, return-value semantics, error-path behaviour
- Inventory canary: fails if a third raw current_app.send_task site appears anywhere in workers/ source

Chord seam (unblocks PR #13 - chord -> Barrier lift):

- workers/tests/test_chord_sites_characterisation.py (316 lines, 9 tests)
- Locks contract on the chord pattern via:
  - WorkflowOrchestrationUtils.create_chord_execution (centralised helper)
  - WorkflowOrchestrationMixin.create_chord (mixin wrapper)
- Tests pin: empty-batch short-circuit (existing defense against silent task drops at scale - Pain Point #2 in the PG Queue decision doc), callback-signature construction, return-value semantics, error propagation, mixin's app extraction + RuntimeError on missing app
- Inventory canaries: fail if a third chord(...) call site OR a third `from celery import chord` import appears anywhere in workers/ source
- api-deployment/tasks.py:673 inline chord covered only by inventory (direct unit-testing requires heavy mocking of the 273-line _run_workflow_api enclosing function - out of scope here, the canary still catches it for PR #13)

Total: 20 tests, ~2s runtime, 0 production changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3452 [FEAT] Address Greptile review on seam characterisation tests

Three P2 findings from Greptile, all fixed:

1. test_chord_import_only_in_two_files: add file-identity assertions matching the sibling call-site canary. Without these, the canary would silently pass if the two imports moved to entirely different files while count remained 2 - exactly the silent-miss scenario the Barrier migration could trigger.
2. TestSchedulerDispatchSite: add test_dispatch_returns_error_result_when_send_task_raises. The scheduler site has a real error branch in scheduler/tasks.py:185-192 that catches send_task exceptions and returns SchedulerExecutionResult.error(...) - without a characterisation test the upcoming dispatch() migration could silently change error semantics (re-raise instead of returning an error result, or swallow silently). Mirrors the equivalent notification-site test_dispatch_returns_false_on_send_task_failure.
3. skip_dirs check anchored to top-level dir relative to workers_root in all three inventory tests. The previous `any(part in skip_dirs for part in py.parts)` check would have erroneously excluded any path with a component named `tests` (e.g. workers/shared/tests_helpers/foo.py).

21 tests now (was 20), runtime ~3s.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
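The third finding (anchoring the skip check to the top-level directory) can be illustrated with a short sketch. The function names, the contents of `SKIP_DIRS`, and the sample paths are hypothetical; only the `any(part in skip_dirs for part in py.parts)` expression comes from the commit message. `PurePosixPath` is used so the example behaves the same on any OS.

```python
from pathlib import PurePosixPath

SKIP_DIRS = {"tests", ".venv"}  # assumed contents, for illustration only


def skipped_loose(py: PurePosixPath) -> bool:
    """Previous style: skip if ANY path component matches a skip dir."""
    return any(part in SKIP_DIRS for part in py.parts)


def skipped_anchored(py: PurePosixPath, workers_root: PurePosixPath) -> bool:
    """Anchored style: skip only if the FIRST component under the root matches."""
    rel = py.relative_to(workers_root)
    return bool(rel.parts) and rel.parts[0] in SKIP_DIRS


root = PurePosixPath("workers")
top_level_test = PurePosixPath("workers/tests/test_callback_sanity.py")
nested_source = PurePosixPath("workers/shared/tests/helper.py")  # hypothetical

loose_result = skipped_loose(nested_source)
anchored_result = skipped_anchored(nested_source, root)
```

The loose check drops the nested source file from the inventory scan, while the anchored check still scans it and only excludes files under the top-level `workers/tests/` directory.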
muhammad-ali-e added a commit that referenced this pull request on Jun 9, 2026
… / FileExecutionResult (#2020)

* UN-3513 [FEAT] Type chord-callback boundary with BatchExecutionResult / FileExecutionResult

Producers in workers/file_processing/tasks.py now build typed dataclasses (from unstract.core.worker_models) and emit their ``.to_dict()`` instead of hand-rolled dicts. Locks the wire shape to the dataclass schema so downstream refactors fail loud.

Scope

Producer-side typing only. Consumer (workers/callback/tasks.py + aggregate_file_batch_results) already reads via ``.get(..., default)`` - tolerant by construction - so no consumer-side change needed.

Dataclass extensions (unstract.core.worker_models, additive only)

  * BatchExecutionResult gains 3 optional fields: skipped_already_completed, skipped_active_duplicate, organization_id.
  * FileExecutionResult gains 3 optional fields for the API path's legacy dict vocabulary: file_name (alias for file), result_data (alias for result), skipped (marker like "already_completed").
  * Both from_dict updated to populate the new fields.

Producer migrations (workers/file_processing/tasks.py)

  * L901 (general path, process_file_batch return): BatchExecutionResult(...).to_dict(). Wire dict gains file_results: [] and errors: [] defaults - strictly additive.
  * L1706, L1798, L1823 (API path returns from _process_file_batch_api_core helpers): FileExecutionResult(...).to_dict(). L1798 preserves the legacy storage_result field via dict-spread merge.

Domain-vocabulary correction on the API path

API-path producers previously returned status="completed" / "failed" - lowercase strings matching neither ExecutionStatus (workflow-level, uppercase) nor ApiDeploymentResultStatus (per-file, Success/Failed, the canonical per-file vocab). Producers now emit "Success" / "Failed" via FileExecutionResult. Audit: no Python equality consumer was found reading the lowercase variants (grep clean). Observability tooling pattern-matching the old strings would need updating; this is a domain-correctness fix.
Tests

New tests/test_chord_callback_boundary.py - 14 tests, 3 classes:

  * Wire-shape characterisation for BatchExecutionResult.
  * Wire-shape characterisation for FileExecutionResult with alias fields and canonical Success/Failed vocab.
  * Consumer tolerance: aggregate_file_batch_results-style .get() reads return expected values from the new wire shape.

sdk1's 80 worker_models tests still pass - the dataclass extensions are strictly additive.

Regression risk: zero on consumer side, zero on backend (doesn't import these classes; has its own FileExecutionResult in dto.py - untouched). Status-vocab shift on API path is a deliberate domain correction.

Test count: workers boundary suite +14 (new); sdk1 dispatcher 80/80.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Address PR review (toolkit + SkipReason enum + producer-binding tests)

A+B from the triage on PR #2020:

  * tasks.py:1659 (API-path BATCH return) - migrated to BatchExecutionResult.to_dict(). Fixes the half-typed boundary the reviewer flagged. file_results, total_files, skipped_already_completed and organization_id are now on the wire. Successful/skipped counter semantic preserved (separating them is deferred to a follow-up).
  * New SkipReason StrEnum (worker_models.py) with ALREADY_COMPLETED + ACTIVE_DUPLICATE - mirrors the batch-level skip counters on BatchExecutionResult. FileExecutionResult.skipped is now SkipReason | None. from_dict coerces. Producer uses the enum; the ACTIVE_DUPLICATE value has no current per-file producer but is exercised end-to-end via a round-trip test.
  * TODO(UN-3516) marker on the three alias fields (file_name, result_data, skipped) - sunset ticket filed.
  * Tests strengthened:
    - TestProducerBinding drives real _compile_batch_result with a minimal SimpleNamespace context, and drives _process_single_file_api via mocked api_client for the already-completed branch.
    - TestRealConsumerTolerance imports the real aggregate_file_batch_results - producer-consumer contract driven end-to-end.
    - test_none_valued_optional_fields_stripped_from_wire documents serialize_dataclass_to_dict's None-strip behaviour.
    - test_active_duplicate_skip_reason_round_trips proves the second enum value isn't dead.
    - SonarCloud python:S1244 fixed - pytest.approx.
    - skipped_files==0 NIT assertion removed.

Test count: workers boundary suite 14 -> 18; sdk1 worker_models 80/80 still green.

Deferred (separate tickets to follow): __post_init__ silent status clobber, from_dict status discard, BatchExecutionResult invariant, storage soft-failure, dead aggregator branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Address second-pass review (storage_result + lenient skipped + missing producer tests)

Three findings from the second review round on PR #2020:

  * HIGH - storage_result silent data loss at batch boundary. The per-file dict-spread at tasks.py:1816 preserved storage_result on the immediate return, but the value was dropped when wrapped into BatchExecutionResult.file_results (from_dict didn't know the key). Promoted to a typed FileExecutionResult.storage_result: Any | None field; producer now emits via the constructor; from_dict reads it back. The round-trip preserves it end-to-end.
  * HIGH - strict SkipReason parsing would crash entire batches during rolling deploys if a newer producer ever emitted an unknown value. Added FileExecutionResult._parse_skipped, which catches ValueError + logs a warning + falls back to None. Standard "strict on emit, lenient on receive" posture for wire compat.
  * MEDIUM - TestProducerBinding only covered 2 of 5 producer branches. Added three more tests:
    - _process_single_file_api success branch (asserts storage_result survives the typed wire - would catch the dict-spread revert).
    - _process_single_file_api failure branch (asserts canonical "Failed" vocab - catches reverts to the legacy lowercase "failed").
    - process_file_batch_api batch wrapper via task.apply() with an in-memory result_backend (asserts BatchExecutionResult shape + skipped_already_completed counter derived from SkipReason.ALREADY_COMPLETED.value).

Strengthened the existing already-completed branch test to assert result_data + metadata propagation.

Bug caught by the new batch-wrapper test: process_file_batch_api was missing execution_time on its BatchExecutionResult(...) call - BatchExecutionResult.execution_time is a required positional, so the API-path batch task would have crashed with TypeError on every run. Introduced batch_start_time = time.time() at task entry and pass execution_time = time.time() - batch_start_time. The new test would have caught this immediately at PR time; logging it here as the exact value of producer-binding coverage.

Test count: 18 -> 21; all green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Symmetric None-stripping for nested file_results + deterministic callback healthcheck picker

Greptile P2 #2 - None-stripping was asymmetric for nested FileExecutionResult objects. ``serialize_dataclass_to_dict`` only filters None at the outermost level, so a standalone ``FileExecutionResult.to_dict()`` would omit unset optional fields while ``batch.to_dict()["file_results"][i]`` would carry explicit ``"file_name": None`` etc. for the same input. A consumer doing ``"x" in result`` membership checks would behave differently depending on whether it read the standalone wire or the nested-in-batch wire - a real contract divergence.

Fixed locally on ``BatchExecutionResult.to_dict()`` (not by touching the shared ``serialize_dataclass_to_dict`` infra): post-process ``wire["file_results"]`` to drop None-valued keys, mirroring the top-level strip. ``BatchExecutionResult.from_dict`` was already tolerant via ``.get(...)`` so the round-trip stays clean.

Greptile P2 #1 (``status`` constructor parameter clobbered by ``__post_init__``) is the same pathology I flagged as BLOCKER #1 in the first review round - deferred to a separate ticket with the shared-infra dataclass redesign.

Test coverage: extended the existing ``test_none_valued_optional_fields_stripped_from_wire`` to also assert nested symmetry - same test method, no new method added. This keeps the pytest collection profile stable (a separate test method would perturb celery's shared task-registry insertion order during pytest collection and amplify a pre-existing flake in ``test_callback_sanity.py``).

Test infra fix (bundled because it would have flaked CI on this PR's HEAD): ``test_callback_sanity.TestEagerHealthcheckRoundTrip`` selected the healthcheck task via ``endswith(".healthcheck")`` against ``eager_app.tasks``. That registry is a shared celery global with at least 5 worker modules registering ``healthcheck`` (callback, executor, file_processing, log_consumer, scheduler). ``next(...)`` returned whichever was inserted first, which depends on pytest module-collection order across the whole suite. The test would assert ``worker_type == "callback"`` and intermittently get ``"executor"`` or ``"file_processing"`` instead - empirically a ~10% flake rate on this branch's HEAD, climbing to ~90% with any test-collection perturbation. Replaced with an exact-name lookup (``name == "callback.worker.healthcheck"``); 30/30 green across deterministic + randomised probes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Address vishnuszipstack review (7 real fixes + 1 docstring nit)

Seven of Vishnu's PR review findings addressed, all backward-compat with main-branch consumers.
The three [Important] design-redesign findings (#1 status __post_init__, #2 alias-pair invariant, #3 to_api_dict/to_json dead code) are deferred to a follow-up shared-infra dataclass ticket because they would either fire warning noise on existing call sites (``worker_base.py:211/222``, ``worker_patterns.py:241`` pass wrong-enum status) or change the wire/cache contract - neither acceptable mid-flight while keeping zero regression on main.

Changes in this commit are either:

  * Pure additive (test methods, docstrings, observability)
  * Or provably equivalent wire output (the typed-count refactor)

So a rolling deploy where old workers and new workers run concurrently sees identical wire shapes and identical behaviour for all current valid data; the only observable differences are log content (better context on the existing warning) and the presence of a new opt-in classmethod that nothing currently calls.

  * **Vishnu #8 [Suggestion]** - ``SkipReason`` docstring claimed "StrEnum semantics" but the class is ``(str, Enum)``, not ``enum.StrEnum``. The two differ on ``__str__``. Rewrote the docstring to describe the actual behaviour.
  * **Vishnu #4a [Important - log context]** - ``_parse_skipped`` now accepts an optional ``file_execution_id`` kwarg that ``from_dict`` threads through. The warning emitted for unknown wire values now carries the file identifier, so a real rolling-deploy incident is debuggable rather than a context-free warning. Optional kwarg with default - any existing caller passing one positional arg still works.
  * **Vishnu #9 [Suggestion]** - added ``BatchExecutionResult.from_file_results(...)`` classmethod that derives counters from typed file results. Purely additive: no existing caller uses it; the constructor signature is unchanged so producers that need their own counter semantics keep working.
  * **Vishnu #11 [Suggestion]** - ``process_file_batch_api`` was computing ``skipped_already_completed`` by string-matching the wire dicts AFTER already calling ``from_dict`` on them. Refactored to count from the typed list (single ``from_dict`` pass, enum compare). Provably equivalent for all current wire data.
  * **Vishnu #4 [Important - test gap]** - added ``test_from_dict_unknown_skipped_is_lenient`` covering the one documented crash-prevention path. A regression to bare ``SkipReason(raw)`` would have re-introduced the rolling-deploy crash and kept every other test green.
  * **Vishnu #5 [Important - failure-aggregation gap]** - added ``test_process_file_batch_api_batch_wrapper_failure_aggregation`` that drives one success + one failure through the batch wrapper. The existing success-only test never exercised ``failed_files += 1``.
  * **Vishnu #6 [Important - populated round-trip gap]** - added ``test_round_trip_with_populated_file_results`` and ``test_from_file_results_derives_counters``. The existing ``BatchExecutionResult`` round-trip test used ``file_results=[]``, so the list-comprehension in ``from_dict`` that rebuilds nested ``FileExecutionResult`` objects was never executed with a populated list.
  * **Vishnu #13 [Suggestion]** - replaced hardcoded line reference in test docstring with a symbol reference.

Deferred to follow-up shared-infra dataclass-redesign ticket:

  * #1 ``__post_init__`` status clobber - would emit warning noise on every existing wrong-enum call site
  * #2 alias-pair invariant - back-fill via __post_init__ would change the wire shape (file_name no longer None → no longer stripped at the top level)
  * #3 ``to_api_dict``/``to_json`` dead code - looks like a public SDK surface; changing the body could surprise external consumers
  * #7 recursive ``None``-strip in ``serialize_value`` - touches every dataclass in the codebase
  * #10 ``Any`` typing tightening - low value, mypy tightening could trip downstream
  * #12 producer redundant kwargs - depends on #2's reconciliation

Tests: workers chord-callback boundary suite 21 -> 25; full workers suite 622 -> 627 (no new failures; 6 pre-existing baseline unchanged). Five deterministic-order runs of the full suite returned exactly 627 passed / 6 pre-existing failed - zero flakiness from this change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request on Jun 17, 2026
…ing + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand windows + fix what's catchable + document + gate on PR3.

Failure handling:

- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort: a decrement-side failure (guard / DB / last-batch callback dispatch) tears the barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body - logs when the teardown itself fails (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work, post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:

- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier, process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value; avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() - single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup; decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
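The failure-handling shape described in #69 and #79 might look roughly like the sketch below. It is a simplified illustration under stated assumptions: the function name and the three injected callables (`work`, `decrement`, `abort`) are hypothetical, and the real `run_batch_with_barrier` also handles claiming and redelivery, which are omitted here. The point shown is that the work and the decrement sit inside the same guarded region, and that a failing teardown is logged rather than silently suppressed.

```python
import logging
from collections.abc import Callable
from typing import TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def run_batch_with_barrier_sketch(
    work: Callable[[], T],
    decrement: Callable[[], None],
    abort: Callable[[], None],
) -> T:
    """Run work, then decrement the barrier; abort the barrier if either fails."""
    try:
        result = work()
        decrement()  # inside the try: a decrement-side failure also aborts
    except Exception:
        try:
            abort()
        except Exception:
            # The teardown itself failed: say so instead of claiming success.
            logger.exception("barrier teardown failed; relying on reaper/expiry")
        raise  # always surface the original failure to the caller
    return result


# Usage: the work succeeds but the decrement fails, so abort() still runs.
events: list[str] = []


def ok_work() -> str:
    events.append("work")
    return "done"


def failing_decrement() -> None:
    raise RuntimeError("db unavailable")


def record_abort() -> None:
    events.append("abort")
```

Calling `run_batch_with_barrier_sketch(ok_work, failing_decrement, record_abort)` re-raises the `RuntimeError` after recording the abort, so the barrier is torn down in-body rather than left to expire.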
muhammad-ali-e
added a commit
that referenced
this pull request
Jun 17, 2026
…fire-and-forget) (#2069)

* UN-3563 [FEAT] PG Queue 9e PR 2c - live PG fan-out/barrier/callback (fire-and-forget)

Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for a transport=="pg_queue" execution. Gated: resolve_transport() still returns celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable - default path byte-identical. Orchestrator task (async_execute_bin) stays on Celery (hybrid); routing it onto PG is a 2d follow-up.

- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier() (bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode - _dispatch_header_pg sends each header via dispatch(backend=PG) with an injected _barrier_context {execution_id, batch_index, callback_descriptor}, no .link; descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup (greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback onto PG when backend==pg_queue. clear_execution_batches at finalise + abort. run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement; redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None → _run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.

Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test pending before PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S1172: drop unused task_instance from _run_batch_stages

The extracted _run_batch_stages never uses task_instance - its only purpose (deriving celery_task_id) happens in _process_file_batch_core before the call. Removed the param + updated both call sites. _process_file_batch_core keeps task_instance (it reads .request.id). Routing test mocks with *a, unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address review (muhammad-ali-e, 15): strand-on-failure hardening + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort: a decrement-side failure (guard / DB / last-batch callback dispatch) tears the barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body - logs when the teardown itself fails (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work, post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier, process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value; avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() - single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup; decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix run_batch_with_barrier strand-window doc inconsistency (review)

The second "NOT catchable" bullet conflated two different things: it described the in-body catchable abort ("the abort here removes the row") and a *software* callback-dispatch failure - but that failure is already caught + torn down by step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed but before the enqueue" couldn't both hold. Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN the decrement committing (remaining→0) and the callback enqueue completing - decrement committed (redelivery blocked by the marker), process gone before the callback enqueues or any abort runs, row survives to expiry, reaper-only recovery. Explicitly notes a software dispatch failure is the catchable case. Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address greptile (#2069, 2): clear dedup on mid-loop PG failure + carry fairness on PG callback

Both in the gated PG path (greptile 4/5, safe to merge).

- Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls clear_execution_batches on the PG path. Earlier headers may have committed a claim_batch marker; with the barrier row deleted, their in-flight barrier_pg_abort is a no-op (already_aborted) and never reaches the clear inside it, so reclaim the markers directly here.
- Issue 2: the PG callback now carries the producer's fairness. Added _fairness_from_headers() to reconstruct the FairnessKey from the stored x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the same org/priority as the Celery path (was always default priority).

Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback; extended the PG mid-loop test to assert an already-claimed marker is reclaimed. 75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S3776: reduce PgBarrier.enqueue cognitive complexity (17→under 15)

Extracted the per-header dispatch loop into PgBarrier._dispatch_headers - the deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure teardown + PG dedup-clear) was the complexity driver. enqueue now calls the helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes. 75 barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix greptile #2069: mid-loop dedup-clear test passed for the wrong reason

The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the reuse-reset DELETE) before the dispatch loop, so the mid-loop clear_execution_batches deleted 0 rows - the count==0 assertion passed on the UPSERT, not the guard under test. Now the first dispatch side-effect claims the marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear is what removes it. Verified: with the clear disabled the marker orphans (count=1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
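The `_barrier_context` injection and the `None`-based routing described in this commit can be sketched as follows. Only the three key names (`execution_id`, `batch_index`, `callback_descriptor`) and the `BarrierContext(TypedDict)` name come from the commit message; the field types, `inject_barrier_context`, and `route_batch` are assumptions made for illustration and are not the PR's actual helpers.

```python
from typing import Any, Optional, TypedDict


class BarrierContext(TypedDict):
    """Keys as listed in the commit message; the value types are assumed."""

    execution_id: str
    batch_index: int
    callback_descriptor: dict[str, Any]


def inject_barrier_context(
    header_kwargs: dict[str, Any],
    execution_id: str,
    batch_index: int,
    callback_descriptor: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: return a copy of the header kwargs with
    _barrier_context added, preserving any pre-existing kwargs."""
    context: BarrierContext = {
        "execution_id": execution_id,
        "batch_index": batch_index,
        "callback_descriptor": callback_descriptor,
    }
    return {**header_kwargs, "_barrier_context": context}


def route_batch(barrier_context: Optional[BarrierContext] = None) -> str:
    """Hypothetical router: None selects the Celery chord path,
    anything else selects the barrier-wrapped path."""
    if barrier_context is None:
        return "celery_chord"
    return "pg_barrier"
```

Keeping the context as an optional keyword argument that defaults to `None` is what lets the default Celery path stay unchanged while the PG branch remains gated.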
What
...
Why
...
How
...
Relevant Docs
Related Issues or PRs
Dependencies Versions / Env Variables
Notes on Testing
...
Screenshots
...
Checklist
I have read and understood the Contribution Guidelines.