Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[WIP] [FEAT] V1 - Time based trials #13

Closed
harini-venkataraman wants to merge 0 commit into
mainZipstack/unstract:mainfrom
feat/trialsZipstack/unstract:feat/trialsCopy head branch name to clipboard
Closed

[WIP] [FEAT] V1 - Time based trials #13
harini-venkataraman wants to merge 0 commit into
mainZipstack/unstract:mainfrom
feat/trialsZipstack/unstract:feat/trialsCopy head branch name to clipboard

Conversation

@harini-venkataraman

Copy link
Copy Markdown
Contributor

What

...

Why

...

How

...

Relevant Docs

Related Issues or PRs

Dependencies Versions / Env Variables

Notes on Testing

...

Screenshots

...

Checklist

I have read and understood the Contribution Guidelines.

chandrasekharan-zipstack added a commit that referenced this pull request Apr 17, 2026
… wrappers

Address review comments on PR #1886:

- #9 (typing): call_with_retry / acall_with_retry / iter_with_retry
  previously returned `object`, erasing caller type info. Add PEP 695
  generics so the return type flows from the wrapped callable:
  acall_with_retry now takes Callable[[], Awaitable[T]] and
  iter_with_retry takes Callable[[], Iterable[T]] -> Generator[T, ...].
- #11 / #13 (DRY): `_pop_retry_params` in embedding.py and
  `_disable_litellm_retry` in llm.py were identical logic. Lift to
  shared `pop_litellm_retry_kwargs` helper in retry_utils.py and delete
  both methods.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request Apr 17, 2026
* [FIX] Unified retry for LLM and embedding providers

litellm's retry only works for SDK-based providers (OpenAI/Azure).
httpx-based providers (Anthropic, Vertex, Bedrock, Mistral) and ALL
embedding calls silently ignore max_retries. This adds self-managed
retry with exponential backoff at the SDK layer, disabling litellm's
own retry entirely for consistency.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [REFACTOR] DRY retry logic into reusable call_with_retry utilities

Move retry loops out of LLM/Embedding classes into generic
call_with_retry, acall_with_retry, and iter_with_retry functions
in retry_utils.py. Both classes now call these directly instead
of maintaining their own retry helper methods.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Consolidate retry logic, expose max_retries for all adapters

- Extract _get_retry_delay() shared helper to eliminate duplicated
  retry decision logic across call_with_retry, acall_with_retry,
  iter_with_retry, and retry_with_exponential_backoff
- Add num_retries=0 to embedding._pop_retry_params() to fully
  disable litellm's internal retry for embedding calls
- Expose max_retries in UI JSON schemas for embedding adapters
  (OpenAI, Azure, VertexAI, Ollama) and Ollama LLM — previously
  the field existed in Pydantic models but wasn't shown to users,
  silently defaulting to 0 retries
- Add debug logging to LLM and Embedding retry parameter extraction
- Clarify docstrings distinguishing is_retryable_litellm_error()
  from is_retryable_error() (different exception hierarchies)
- Remove stale noqa: C901 from simplified retry_with_exponential_backoff

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Set max_retries default to 3 for all embedding and Ollama LLM adapters

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Address greptile review: fix shadowed ConnectionError, use MRO check

- Fix `requests.ConnectionError` shadowing Python's builtin `ConnectionError`
  in `is_retryable_litellm_error()` — rename import to `RequestsConnectionError`
  and use `builtins.ConnectionError` / `builtins.TimeoutError` explicitly
- Use `__mro__`-based class name check instead of `type(error).__name__`
  to also catch subclasses of retryable error types
- P1 (num_retries not zeroed) was already fixed in prior commit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Address CodeRabbit review: add APITimeoutError, validate max_retries

- Add APITimeoutError to _RETRYABLE_ERROR_NAMES for explicit OpenAI
  SDK timeout coverage
- Add _validate_max_retries() guard to call_with_retry, acall_with_retry,
  iter_with_retry to fail fast on negative values instead of silently
  returning None

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* UN-3344 [FIX] Reduce cognitive complexity and remove useless except clause

Address SonarCloud findings on PR #1886:
- S3776: Flatten retry_with_exponential_backoff.wrapper by moving the
  success logging + return out of the try block and using `continue` in
  the retry path, so the except branch only handles the give-up case.
- S2737: Drop the `except Exception: raise` clause — it was a no-op that
  added complexity without changing behavior (non-matching exceptions
  propagate naturally).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3344 [FIX] Extract retry loop to top-level helper to drop cognitive complexity

Sonar still flagged retry_with_exponential_backoff at complexity 16 after
the previous flatten. Nested def decorator / def wrapper counted against
the outer function's score. Move the retry body to a module-level
_invoke_with_retries helper so the decorator factory just delegates,
bringing the outer function well under the 15 threshold.

Behavior is unchanged — all paths (success, retry, give-up, non-retryable
propagate) are preserved and covered by the existing SDK1 tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3344 [FIX] Honor Retry-After, close stream gen on retry, share give-up log

Address review comments on PR #1886:

- #10 (resource leak): close the generator returned by fn() before
  retrying in iter_with_retry — otherwise streaming providers leak an
  in-flight HTTP socket until GC.
- #12 (behavioral regression): when we zero out SDK/wrapper retries we
  also lose the OpenAI SDK's native Retry-After handling on 429/503.
  _get_retry_delay now checks error.response.headers["retry-after"] and
  uses that value ahead of exponential backoff. HTTP-date form is not
  parsed; those fall back to backoff.
- #8 (observability gap): move the "Giving up ... after N attempt(s)"
  log into _get_retry_delay so all four retry helpers (call_with_retry,
  acall_with_retry, iter_with_retry, decorator) share the same
  exhaustion signal. Previously only the decorator path logged it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3344 [REFACTOR] Share retry-kwargs helper and add TypeVar to retry wrappers

Address review comments on PR #1886:

- #9 (typing): call_with_retry / acall_with_retry / iter_with_retry
  previously returned `object`, erasing caller type info. Add PEP 695
  generics so the return type flows from the wrapped callable:
  acall_with_retry now takes Callable[[], Awaitable[T]] and
  iter_with_retry takes Callable[[], Iterable[T]] -> Generator[T, ...].
- #11 / #13 (DRY): `_pop_retry_params` in embedding.py and
  `_disable_litellm_retry` in llm.py were identical logic. Lift to
  shared `pop_litellm_retry_kwargs` helper in retry_utils.py and delete
  both methods.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
ritwik-g pushed a commit that referenced this pull request May 8, 2026
* UN-3450 [FEAT] Add golden-path smoke test for callback worker

Adds workers/tests/test_callback_sanity.py mirroring the existing
test_executor_sanity.py pattern. Covers:

- Worker enums and registry (WorkerType.CALLBACK, QueueName.CALLBACK +
  CALLBACK_API, TaskName.PROCESS_BATCH_CALLBACK, health port 8083,
  WorkerRegistry queue config + task routing).
- Celery task wiring (process_batch_callback, process_batch_callback_api,
  healthcheck — registration, retry config, max_retries, autoretry_for).
- Full dispatch -> task -> return round-trip via Celery eager mode,
  using the simple healthcheck task (process_batch_callback itself
  needs a configured InternalAPIClient + downstream HTTP, which is
  heavy mocking territory unsuitable for a smoke test — covered later
  in #1.2 characterisation suite).

Coverage gains on callback module:
- callback/__init__.py:  0% -> 100%
- callback/worker.py:    0% -> 52%
- callback/tasks.py:     0% -> 10%
- Module total:          0% -> 12%

14 tests, run in ~17s. Regression net for spine PRs #6 (CallbackStatus
enum migration) and #13 (chord -> Barrier lift in callback chain).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3450 [FEAT] Address Greptile review on callback smoke test

Two P2 findings from Greptile, both fixed:

1. test_healthcheck_result_is_json_serializable: drop the `default=str`
   fallback in json.dumps. With it, any non-JSON-serializable value
   (UUID, datetime, custom object) gets silently coerced to a string —
   exactly the failure mode the test claims to catch. Without it, the
   assertion faithfully tests "round-trips cleanly via JSON" the way
   Celery's serializer would.

2. Add registration check for `process_batch_callback_django_compat`
   (the backward-compat task name `workflow_manager.workflow_v2.
   file_execution_tasks.process_batch_callback`). Both this task and
   `process_batch_callback` delegate to `_process_batch_callback_core`,
   so refactors that touch the core (e.g. the upcoming CallbackStatus
   enum migration) affect this path too. Three-line addition.

15 tests now (was 14), runtime 11s.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request May 27, 2026
…1950)

* UN-3452 [FEAT] Characterise the seams: dispatch + chord call sites

Sub-task A under #1.2 — characterisation suite for the seams that
upcoming spine PRs will refactor. Two new test files, zero production
changes.

Dispatch seam (unblocks PR #8 — @shared_task -> @worker_task migration):
- workers/tests/test_dispatch_sites_characterisation.py (276 lines, 11 tests)
- Locks contract on the two raw current_app.send_task call sites:
    - shared/patterns/notification/helper.py:76 (webhook dispatch)
    - scheduler/tasks.py:157 (scheduled workflow async dispatch)
- Tests pin: task name, positional args layout, kwargs layout, target
  queue, return-value semantics, error-path behaviour
- Inventory canary: fails if a third raw current_app.send_task site
  appears anywhere in workers/ source

Chord seam (unblocks PR #13 — chord -> Barrier lift):
- workers/tests/test_chord_sites_characterisation.py (316 lines, 9 tests)
- Locks contract on the chord pattern via:
    - WorkflowOrchestrationUtils.create_chord_execution (centralised helper)
    - WorkflowOrchestrationMixin.create_chord (mixin wrapper)
- Tests pin: empty-batch short-circuit (existing defense against silent
  task drops at scale — Pain Point #2 in the PG Queue decision doc),
  callback-signature construction, return-value semantics, error
  propagation, mixin's app extraction + RuntimeError on missing app
- Inventory canaries: fail if a third chord(...) call site OR a third
  `from celery import chord` import appears anywhere in workers/ source
- api-deployment/tasks.py:673 inline chord covered only by inventory
  (direct unit-testing requires heavy mocking of the 273-line
  _run_workflow_api enclosing function — out of scope here, the
  canary still catches it for PR #13)

Total: 20 tests, ~2s runtime, 0 production changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3452 [FEAT] Address Greptile review on seam characterisation tests

Three P2 findings from Greptile, all fixed:

1. test_chord_import_only_in_two_files: add file-identity assertions
   matching the sibling call-site canary. Without these, the canary
   would silently pass if the two imports moved to entirely different
   files while count remained 2 — exactly the silent-miss scenario
   the Barrier migration could trigger.

2. TestSchedulerDispatchSite: add test_dispatch_returns_error_result_
   when_send_task_raises. The scheduler site has a real error branch
   in scheduler/tasks.py:185-192 that catches send_task exceptions
   and returns SchedulerExecutionResult.error(...) — without a
   characterisation test the upcoming dispatch() migration could
   silently change error semantics (re-raise instead of returning
   an error result, or swallow silently). Mirrors the equivalent
   notification-site test_dispatch_returns_false_on_send_task_failure.

3. skip_dirs check anchored to top-level dir relative to workers_root
   in all three inventory tests. The previous `any(part in skip_dirs
   for part in py.parts)` check would have erroneously excluded any
   path with a component named `tests` (e.g. workers/shared/
   tests_helpers/foo.py).

21 tests now (was 20), runtime ~3s.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request Jun 9, 2026
…ing nit)

Seven of Vishnu's PR review findings addressed, all backward-compat with
main-branch consumers. The three [Important] design-redesign findings
(#1 status __post_init__, #2 alias-pair invariant, #3 to_api_dict/to_json
dead code) are deferred to a follow-up shared-infra dataclass ticket
because they would either fire warning noise on existing call sites
(``worker_base.py:211/222``, ``worker_patterns.py:241`` pass wrong-enum
status) or change the wire/cache contract — neither acceptable
mid-flight while keeping zero regression on main.

Changes in this commit are either:
  * Pure additive (test methods, docstrings, observability)
  * Or provably equivalent wire output (the typed-count refactor)

So a rolling deploy where old workers and new workers run concurrently
sees identical wire shapes and identical behaviour for all current
valid data; the only observable differences are log content (better
context on the existing warning) and the presence of a new opt-in
classmethod that nothing currently calls.

* **Vishnu #8 [Suggestion]** — ``SkipReason`` docstring claimed
  "StrEnum semantics" but the class is ``(str, Enum)``, not
  ``enum.StrEnum``. The two differ on ``__str__``. Rewrote the
  docstring to describe the actual behaviour.

* **Vishnu #4a [Important — log context]** — ``_parse_skipped``
  now accepts an optional ``file_execution_id`` kwarg that
  ``from_dict`` threads through. The warning emitted for unknown
  wire values now carries the file identifier, so a real
  rolling-deploy incident is debuggable rather than a context-free
  warning. Optional kwarg with default — any existing caller passing
  one positional arg still works.

* **Vishnu #9 [Suggestion]** — added
  ``BatchExecutionResult.from_file_results(...)`` classmethod that
  derives counters from typed file results. Purely additive: no
  existing caller uses it; the constructor signature is unchanged
  so producers that need their own counter semantics keep working.

* **Vishnu #11 [Suggestion]** — ``process_file_batch_api`` was
  computing ``skipped_already_completed`` by string-matching the
  wire dicts AFTER already calling ``from_dict`` on them. Refactored
  to count from the typed list (single ``from_dict`` pass, enum
  compare). Provably equivalent for all current wire data.

* **Vishnu #4 [Important — test gap]** — added
  ``test_from_dict_unknown_skipped_is_lenient`` covering the one
  documented crash-prevention path. A regression to bare
  ``SkipReason(raw)`` would have re-introduced the rolling-deploy
  crash and kept every other test green.

* **Vishnu #5 [Important — failure-aggregation gap]** — added
  ``test_process_file_batch_api_batch_wrapper_failure_aggregation``
  that drives one success + one failure through the batch wrapper.
  The existing success-only test never exercised
  ``failed_files += 1``.

* **Vishnu #6 [Important — populated round-trip gap]** — added
  ``test_round_trip_with_populated_file_results`` and
  ``test_from_file_results_derives_counters``. The existing
  ``BatchExecutionResult`` round-trip test used
  ``file_results=[]``, so the list-comprehension in ``from_dict``
  that rebuilds nested ``FileExecutionResult`` objects was never
  executed with a populated list.

* **Vishnu #13 [Suggestion]** — replaced hardcoded line reference
  in test docstring with a symbol reference.

Deferred to follow-up shared-infra dataclass-redesign ticket:
  * #1 ``__post_init__`` status clobber — would emit warning noise
    on every existing wrong-enum call site
  * #2 alias-pair invariant — back-fill via __post_init__ would
    change the wire shape (file_name no longer None → no longer
    stripped at the top level)
  * #3 ``to_api_dict``/``to_json`` dead code — looks like a public
    SDK surface; changing the body could surprise external consumers
  * #7 recursive ``None``-strip in ``serialize_value`` — touches
    every dataclass in the codebase
  * #10 ``Any`` typing tightening — low value, mypy tightening could
    trip downstream
  * #12 producer redundant kwargs — depends on #2's reconciliation

Tests: workers chord-callback boundary suite 21 -> 25; full workers
suite 622 -> 627 (no new failures; 6 pre-existing baseline
unchanged). Five deterministic-order runs of the full suite returned
exactly 627 passed / 6 pre-existing failed — zero flakiness from
this change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request Jun 9, 2026
… / FileExecutionResult (#2020)

* UN-3513 [FEAT] Type chord-callback boundary with BatchExecutionResult / FileExecutionResult

Producers in workers/file_processing/tasks.py now build typed
dataclasses (from unstract.core.worker_models) and emit their
``.to_dict()`` instead of hand-rolled dicts. Locks the wire shape to
the dataclass schema so downstream refactors fail loud.

Scope

Producer-side typing only. Consumer (workers/callback/tasks.py +
aggregate_file_batch_results) already reads via ``.get(..., default)``
— tolerant by construction — so no consumer-side change needed.

Dataclass extensions (unstract.core.worker_models, additive only)

* BatchExecutionResult gains 3 optional fields: skipped_already_completed,
  skipped_active_duplicate, organization_id.
* FileExecutionResult gains 3 optional fields for the API path's legacy
  dict vocabulary: file_name (alias for file), result_data (alias for
  result), skipped (marker like "already_completed").
* Both from_dict updated to populate the new fields.

Producer migrations (workers/file_processing/tasks.py)

* L901 (general path, process_file_batch return):
  BatchExecutionResult(...).to_dict(). Wire dict gains file_results: []
  and errors: [] defaults — strictly additive.
* L1706, L1798, L1823 (API path returns from _process_file_batch_api_core
  helpers): FileExecutionResult(...).to_dict(). L1798 preserves the
  legacy storage_result field via dict-spread merge.

Domain-vocabulary correction on the API path

API-path producers previously returned status="completed" / "failed" —
lowercase strings matching neither ExecutionStatus (workflow-level,
uppercase) nor ApiDeploymentResultStatus (per-file, Success/Failed,
the canonical per-file vocab). Producers now emit "Success" / "Failed"
via FileExecutionResult.

Audit: no Python equality consumer was found reading the lowercase
variants (grep clean). Observability tooling pattern-matching the
old strings would need updating; this is a domain-correctness fix.

Tests

New tests/test_chord_callback_boundary.py — 14 tests, 3 classes:
* Wire-shape characterisation for BatchExecutionResult.
* Wire-shape characterisation for FileExecutionResult with alias
  fields and canonical Success/Failed vocab.
* Consumer tolerance: aggregate_file_batch_results-style .get() reads
  return expected values from the new wire shape.

sdk1's 80 worker_models tests still pass — the dataclass extensions
are strictly additive.

Regression risk: zero on consumer side, zero on backend
(doesn't import these classes; has its own FileExecutionResult in
dto.py — untouched). Status-vocab shift on API path is a deliberate
domain correction.

Test count: workers boundary suite +14 (new); sdk1 dispatcher 80/80.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Address PR review (toolkit + SkipReason enum + producer-binding tests)

A+B from the triage on PR #2020:

* tasks.py:1659 (API-path BATCH return) — migrated to
  BatchExecutionResult.to_dict(). Fixes the half-typed boundary the
  reviewer flagged. file_results, total_files, skipped_already_completed
  and organization_id are now on the wire. Successful/skipped counter
  semantic preserved (separating them is deferred to a follow-up).

* New SkipReason StrEnum (worker_models.py) with ALREADY_COMPLETED +
  ACTIVE_DUPLICATE — mirrors the batch-level skip counters on
  BatchExecutionResult. FileExecutionResult.skipped is now
  SkipReason | None. from_dict coerces. Producer uses the enum;
  the ACTIVE_DUPLICATE value has no current per-file producer but
  is exercised end-to-end via a round-trip test.

* TODO(UN-3516) marker on the three alias fields (file_name,
  result_data, skipped) — sunset ticket filed.

* Tests strengthened:
  - TestProducerBinding drives real _compile_batch_result with a
    minimal SimpleNamespace context, and drives _process_single_file_api
    via mocked api_client for the already-completed branch.
  - TestRealConsumerTolerance imports the real
    aggregate_file_batch_results — producer-consumer contract driven
    end-to-end.
  - test_none_valued_optional_fields_stripped_from_wire documents
    serialize_dataclass_to_dict's None-strip behaviour.
  - test_active_duplicate_skip_reason_round_trips proves the second
    enum value isn't dead.
  - SonarCloud python:S1244 fixed — pytest.approx.
  - skipped_files==0 NIT assertion removed.

Test count: workers boundary suite 14 -> 18; sdk1 worker_models 80/80
still green.

Deferred (separate tickets to follow): __post_init__ silent status
clobber, from_dict status discard, BatchExecutionResult invariant,
storage soft-failure, dead aggregator branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Address second-pass review (storage_result + lenient skipped + missing producer tests)

Three findings from the second review round on PR #2020:

* HIGH — storage_result silent data loss at batch boundary. The
  per-file dict-spread at tasks.py:1816 preserved storage_result on
  the immediate return, but the value was dropped when wrapped into
  BatchExecutionResult.file_results (from_dict didn't know the key).
  Promoted to a typed FileExecutionResult.storage_result: Any | None
  field; producer now emits via the constructor; from_dict reads it
  back. The round-trip preserves it end-to-end.

* HIGH — strict SkipReason parsing would crash entire batches during
  rolling deploys if a newer producer ever emitted an unknown value.
  Added FileExecutionResult._parse_skipped, which catches ValueError
  + logs a warning + falls back to None. Standard "strict on emit,
  lenient on receive" posture for wire compat.

* MEDIUM — TestProducerBinding only covered 2 of 5 producer branches.
  Added three more tests:
  - _process_single_file_api success branch (asserts storage_result
    survives the typed wire — would catch the dict-spread revert).
  - _process_single_file_api failure branch (asserts canonical
    "Failed" vocab — catches reverts to the legacy lowercase
    "failed").
  - process_file_batch_api batch wrapper via task.apply() with an
    in-memory result_backend (asserts BatchExecutionResult shape +
    skipped_already_completed counter derived from
    SkipReason.ALREADY_COMPLETED.value).
  Strengthened the existing already-completed branch test to assert
  result_data + metadata propagation.

Bug caught by the new batch-wrapper test: process_file_batch_api was
missing execution_time on its BatchExecutionResult(...) call —
BatchExecutionResult.execution_time is a required positional, so the
API-path batch task would have crashed with TypeError on every run.
Introduced batch_start_time = time.time() at task entry and pass
execution_time = time.time() - batch_start_time. The new test would
have caught this immediately at PR time; logging it here as the
exact value of producer-binding coverage.

Test count: 18 -> 21; all green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Symmetric None-stripping for nested file_results + deterministic callback healthcheck picker

Greptile P2 #2 — None-stripping was asymmetric for nested
FileExecutionResult objects. ``serialize_dataclass_to_dict`` only
filters None at the outermost level, so a standalone
``FileExecutionResult.to_dict()`` would omit unset optional fields
while ``batch.to_dict()["file_results"][i]`` would carry explicit
``"file_name": None`` etc. for the same input. A consumer doing
``"x" in result`` membership checks would behave differently
depending on whether it read the standalone wire or the nested-in-
batch wire — a real contract divergence.

Fixed locally on ``BatchExecutionResult.to_dict()`` (not by touching
the shared ``serialize_dataclass_to_dict`` infra): post-process
``wire["file_results"]`` to drop None-valued keys, mirroring the
top-level strip. ``BatchExecutionResult.from_dict`` was already
tolerant via ``.get(...)`` so the round-trip stays clean.

Greptile P2 #1 (``status`` constructor parameter clobbered by
``__post_init__``) is the same pathology I flagged as BLOCKER #1 in
the first review round — deferred to a separate ticket with the
shared-infra dataclass redesign.

Test coverage: extended the existing
``test_none_valued_optional_fields_stripped_from_wire`` to also
assert nested symmetry — same test method, no new method added.
This keeps the pytest collection profile stable (a separate test
method would perturb celery's shared task-registry insertion
order during pytest collection and amplify a pre-existing flake
in ``test_callback_sanity.py``).

Test infra fix (bundled because it would have flaked CI on this
PR's HEAD): ``test_callback_sanity.TestEagerHealthcheckRoundTrip``
selected the healthcheck task via
``endswith(".healthcheck")`` against ``eager_app.tasks``. That
registry is a shared celery global with at least 5 worker modules
registering ``healthcheck`` (callback, executor, file_processing,
log_consumer, scheduler). ``next(...)`` returned whichever was
inserted first, which depends on pytest module-collection order
across the whole suite. The test would assert
``worker_type == "callback"`` and intermittently get ``"executor"``
or ``"file_processing"`` instead — empirically a ~10% flake rate
on this branch's HEAD, climbing to ~90% with any test-collection
perturbation. Replaced with an exact-name lookup
(``name == "callback.worker.healthcheck"``); 30/30 green across
deterministic + randomised probes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3513 [FIX] Address vishnuszipstack review (7 real fixes + 1 docstring nit)

Seven of Vishnu's PR review findings addressed, all backward-compat with
main-branch consumers. The three [Important] design-redesign findings
(#1 status __post_init__, #2 alias-pair invariant, #3 to_api_dict/to_json
dead code) are deferred to a follow-up shared-infra dataclass ticket
because they would either fire warning noise on existing call sites
(``worker_base.py:211/222``, ``worker_patterns.py:241`` pass wrong-enum
status) or change the wire/cache contract — neither acceptable
mid-flight while keeping zero regression on main.

Changes in this commit are either:
  * Pure additive (test methods, docstrings, observability)
  * Or provably equivalent wire output (the typed-count refactor)

So a rolling deploy where old workers and new workers run concurrently
sees identical wire shapes and identical behaviour for all current
valid data; the only observable differences are log content (better
context on the existing warning) and the presence of a new opt-in
classmethod that nothing currently calls.

* **Vishnu #8 [Suggestion]** — ``SkipReason`` docstring claimed
  "StrEnum semantics" but the class is ``(str, Enum)``, not
  ``enum.StrEnum``. The two differ on ``__str__``. Rewrote the
  docstring to describe the actual behaviour.

* **Vishnu #4a [Important — log context]** — ``_parse_skipped``
  now accepts an optional ``file_execution_id`` kwarg that
  ``from_dict`` threads through. The warning emitted for unknown
  wire values now carries the file identifier, so a real
  rolling-deploy incident is debuggable rather than a context-free
  warning. Optional kwarg with default — any existing caller passing
  one positional arg still works.

* **Vishnu #9 [Suggestion]** — added
  ``BatchExecutionResult.from_file_results(...)`` classmethod that
  derives counters from typed file results. Purely additive: no
  existing caller uses it; the constructor signature is unchanged
  so producers that need their own counter semantics keep working.

* **Vishnu #11 [Suggestion]** — ``process_file_batch_api`` was
  computing ``skipped_already_completed`` by string-matching the
  wire dicts AFTER already calling ``from_dict`` on them. Refactored
  to count from the typed list (single ``from_dict`` pass, enum
  compare). Provably equivalent for all current wire data.

* **Vishnu #4 [Important — test gap]** — added
  ``test_from_dict_unknown_skipped_is_lenient`` covering the one
  documented crash-prevention path. A regression to bare
  ``SkipReason(raw)`` would have re-introduced the rolling-deploy
  crash and kept every other test green.

* **Vishnu #5 [Important — failure-aggregation gap]** — added
  ``test_process_file_batch_api_batch_wrapper_failure_aggregation``
  that drives one success + one failure through the batch wrapper.
  The existing success-only test never exercised
  ``failed_files += 1``.

* **Vishnu #6 [Important — populated round-trip gap]** — added
  ``test_round_trip_with_populated_file_results`` and
  ``test_from_file_results_derives_counters``. The existing
  ``BatchExecutionResult`` round-trip test used
  ``file_results=[]``, so the list-comprehension in ``from_dict``
  that rebuilds nested ``FileExecutionResult`` objects was never
  executed with a populated list.

* **Vishnu #13 [Suggestion]** — replaced hardcoded line reference
  in test docstring with a symbol reference.

Deferred to follow-up shared-infra dataclass-redesign ticket:
  * #1 ``__post_init__`` status clobber — would emit warning noise
    on every existing wrong-enum call site
  * #2 alias-pair invariant — back-fill via __post_init__ would
    change the wire shape (file_name no longer None → no longer
    stripped at the top level)
  * #3 ``to_api_dict``/``to_json`` dead code — looks like a public
    SDK surface; changing the body could surprise external consumers
  * #7 recursive ``None``-strip in ``serialize_value`` — touches
    every dataclass in the codebase
  * #10 ``Any`` typing tightening — low value, mypy tightening could
    trip downstream
  * #12 producer redundant kwargs — depends on #2's reconciliation

Tests: workers chord-callback boundary suite 21 -> 25; full workers
suite 622 -> 627 (no new failures; 6 pre-existing baseline
unchanged). Five deterministic-order runs of the full suite returned
exactly 627 passed / 6 pre-existing failed — zero flakiness from
this change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request Jun 17, 2026
…ing + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand
windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort:
  a decrement-side failure (guard / DB / last-batch callback dispatch) tears the
  barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body — logs when the teardown itself fails
  (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work,
  post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier,
  process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value;
  avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() — single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup;
decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header
args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green;
bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request Jun 17, 2026
…fire-and-forget) (#2069)

* UN-3563 [FEAT] PG Queue 9e PR 2c — live PG fan-out/barrier/callback (fire-and-forget)

Wires the coupled pipeline's fan-out → barrier → callback onto the PG queue for
a transport=="pg_queue" execution. Gated: resolve_transport() still returns
celery (PR3 Flipt flips it), so the whole PG branch is present-but-unreachable —
default path byte-identical. Orchestrator task (async_execute_bin) stays on
Celery (hybrid); routing it onto PG is a 2d follow-up.

- barrier.py: Barrier Protocol + CeleryChordBarrier/RedisDecrBarrier accept (and
  ignore) a `transport` param; CallbackDescriptor gains an optional `backend`.
- orchestration_utils._barrier_for_transport: pg_queue → fresh PgBarrier()
  (bypasses the WORKER_BARRIER_BACKEND singleton), else the singleton.
- pg_barrier.PgBarrier.enqueue(transport): pg_queue → fire-and-forget mode —
  _dispatch_header_pg sends each header via dispatch(backend=PG) with an injected
  _barrier_context {execution_id, batch_index, callback_descriptor}, no .link;
  descriptor marked backend=pg_queue; UPSERT block also clears pg_batch_dedup
  (greptile #2068 reuse-reset). _fire_barrier_callback self-chains the callback
  onto PG when backend==pg_queue. clear_execution_batches at finalise + abort.
  run_batch_with_barrier(): claim → work → in-body _barrier_pg_decrement;
  redelivery skips; exception → barrier_pg_abort.
- file_processing.process_file_batch(_barrier_context=None): core routes None →
  _run_batch_stages (celery chord path), else → run_batch_with_barrier.
- general/api fan-outs thread transport into create_chord_execution.

Tests: +8 PgBarrier fire-and-forget + 2 orchestration routing + 2 process_file_batch
routing. Each test file green alone; ruff clean. End-to-end forced-pg dev-test
pending before PR.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S1172: drop unused task_instance from _run_batch_stages

The extracted _run_batch_stages never uses task_instance — its only purpose
(deriving celery_task_id) happens in _process_file_batch_core before the call.
Removed the param + updated both call sites. _process_file_batch_core keeps
task_instance (it reads .request.id). Routing test mocks with *a, unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address review (muhammad-ali-e, 15): strand-on-failure hardening + typing/dedup/docs/tests

Decision (with reviewer): reaper-as-safety-net for the un-catchable strand
windows + fix what's catchable + document + gate on PR3.

Failure handling:
- [#69 Critical] run_batch_with_barrier wraps BOTH work + decrement in the abort:
  a decrement-side failure (guard / DB / last-batch callback dispatch) tears the
  barrier down in-body instead of stranding to expiry.
- [#79] extracted _abort_barrier_in_body — logs when the teardown itself fails
  (was silently suppressed under a misleading "torn down" message).
- [#74/#81] documented the two un-catchable strand windows (hard-crash-during-work,
  post-commit callback-dispatch-fail) as a HARD reaper dependency for PR3.
- [#86] finalise cleanup split into independent try/excepts with distinct logs.

Typing / clarity:
- [#1] BarrierContext(TypedDict) for _barrier_context (header fan-out, run_batch_with_barrier,
  process_file_batch).
- [#3] renamed CallbackDescriptor "backend" -> "transport" (WorkflowTransport value;
  avoids the QueueBackend "pg" collision).
- [#27] is_pg_transport() predicate in core; used in orchestration_utils + pg_barrier.
- [#20] extracted _dispatch_pg() — single home for cycle-avoiding local import + backend=PG.
- [#35] normalize_transport() at the general worker entry (parity w/ api/scheduler).
- [#94] log when a header has no queue option.
- [#9/#13] fixed born-stale comment + kwargs-not-args docstring.

Tests (+#37/#41): last-batch self-chains callback to PG + cleans up barrier/dedup;
decrement-failure aborts; PG-branch mid-loop dispatch-failure deletes row; header
args/queue/pre-existing-kwargs preservation. 137 barrier/dedup/routing tests green;
bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix run_batch_with_barrier strand-window doc inconsistency (review)

The second "NOT catchable" bullet conflated two different things: it described
the in-body catchable abort ("the abort here removes the row") and a *software*
callback-dispatch failure — but that failure is already caught + torn down by
step 3's wrap (paragraph 1), so it doesn't belong under the un-catchable
heading, and on the PG path _fire_barrier_callback IS the enqueue so "committed
but before the enqueue" couldn't both hold.

Rewrote the bullet to the genuinely un-catchable window: a hard crash BETWEEN
the decrement committing (remaining→0) and the callback enqueue completing —
decrement committed (redelivery blocked by the marker), process gone before the
callback enqueues or any abort runs, row survives to expiry, reaper-only
recovery. Explicitly notes a software dispatch failure is the catchable case.
Keeps this list an accurate spec for the PR-3 reaper-recovery dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 address greptile (#2069, 2): clear dedup on mid-loop PG failure + carry fairness on PG callback

Both in the gated PG path (greptile 4/5, safe to merge).

- Issue 1: PgBarrier.enqueue mid-loop dispatch-failure handler now also calls
  clear_execution_batches on the PG path. Earlier headers may have committed a
  claim_batch marker; with the barrier row deleted, their in-flight
  barrier_pg_abort is a no-op (already_aborted) and never reaches the clear
  inside it, so reclaim the markers directly here.
- Issue 2: the PG callback now carries the producer's fairness. Added
  _fairness_from_headers() to reconstruct the FairnessKey from the stored
  x-fairness-key headers and pass it to _dispatch_pg, so the callback rides the
  same org/priority as the Celery path (was always default priority).

Tests: +fairness-carried / +fairness-none-safe on _fire_barrier_callback;
extended the PG mid-loop test to assert an already-claimed marker is reclaimed.
75 barrier/dedup tests green; bootstrap clean under WORKER_BARRIER_BACKEND=pg.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix SonarCloud S3776: reduce PgBarrier.enqueue cognitive complexity (17→under 15)

Extracted the per-header dispatch loop into PgBarrier._dispatch_headers — the
deeply-nested for→try/except→if/else→if (PG-vs-celery branch + mid-loop failure
teardown + PG dedup-clear) was the complexity driver. enqueue now calls the
helper; behaviour identical. radon: enqueue C(11)→B(6); ruff C901 passes.
75 barrier/dedup tests green; ruff + ruff-format clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3563 fix greptile #2069: mid-loop dedup-clear test passed for the wrong reason

The pre-seeded claim_batch marker was wiped by enqueue's UPSERT block (the
reuse-reset DELETE) before the dispatch loop, so the mid-loop
clear_execution_batches deleted 0 rows — the count==0 assertion passed on the
UPSERT, not the guard under test. Now the first dispatch side-effect claims the
marker AFTER the UPSERT (simulating a fast PG consumer), so the mid-loop clear
is what removes it. Verified: with the clear disabled the marker orphans (count=1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.