Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3566 [FEAT] PG Queue 9f — multi-queue consumer + named run-worker.sh PG roles#2073

Merged
muhammad-ali-e merged 5 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3566-FEAT_multiqueue_pg_consumerZipstack/unstract:feat/UN-3566-FEAT_multiqueue_pg_consumerCopy head branch name to clipboard
Jun 18, 2026
Merged

UN-3566 [FEAT] PG Queue 9f — multi-queue consumer + named run-worker.sh PG roles#2073
muhammad-ali-e merged 5 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3566-FEAT_multiqueue_pg_consumerZipstack/unstract:feat/UN-3566-FEAT_multiqueue_pg_consumerCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

  • Multi-queue PG consumer: PgQueueConsumer now takes queue_names: list[str] and polls them round-robin — one process can drain several queues (e.g. ETL + API fan-out) instead of one-process-per-queue.
  • Named PG roles in run-worker.sh: the coupled pipeline's PG consumers are first-class, runnable individually or as a set, like the Celery workers — no per-process env.

Why

  • Running the PG pipeline meant launching 3–4 consumers each with hand-set WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE + -q + distinct -p (a repeated footgun). This makes ./run-worker.sh -d pg (all) and ./run-worker.sh -d pg-fileproc (individual) work just like -d all / -d file. Part of PG Queue Phase 9 (UN-3536), sub-task UN-3566.

How

  • Consumer: queue_names round-robin (read() per queue preserves the (queue_name, priority DESC, msg_id) index's top-N; no queue starves another). WORKER_PG_QUEUE_CONSUMER_QUEUE is comma-parsed; a single value stays a one-element list → back-compat with the existing leaf consumer.
  • Roles (PG_CONSUMER_ROLES, "<source_worker_type>;<queues>"):
    • pg-orchestrator-api (api_deployment → celery_api_deployments) and pg-orchestrator-general (general → celery) — split because async_execute_bin has distinct impls per registry (api-deployment handles it as an API deployment; general routes by workflow type), so one consumer can't serve both.
    • pg-fileproc (file_processing → file_processing,api_file_processing) and pg-callback (callback → both callback queues) — multi-queue, one process each covering ETL + API.
    • The role name rides in argv (python -m pg_queue_consumer <role>) so pgrep (-s/-k/-r) distinguishes the otherwise-identical processes. Roles share the pg_queue_consumer/ launcher dir; resolve_log_file + -L pg know the role log path.
  • ./run-worker.sh -d pg launches the 4 pipeline roles + reaper (mirrors all); -s/-k/-r/-L operate per role.

Can this PR break any existing features?

  • No. All changes are consumer-side ergonomics, gated off by the existing transport flag — no runtime change until PG consumers are run. The single-queue config still works (one-element list), so the existing leaf-webhook consumer and the pg-queue-consumer alias are unaffected. No schema/API change.

Database Migrations

  • None.

Env Config

  • None new. WORKER_PG_QUEUE_CONSUMER_QUEUE now accepts a comma-separated list (single value unchanged).

Notes on Testing

  • Unit: round-robin aggregation across queues; empty queue doesn't starve others; empty-list rejected; comma-parse incl. single-value back-compat. Existing consumer tests updated to the list arg.
  • Dev-tested end-to-end: ./run-worker.sh -d pg-s shows the 4 roles + reaper RUNNING; an API execution drained orchestrator-api → pg-fileproc (one process draining both file_processing and api_file_processing) → pg-callback → COMPLETED; -r pg-fileproc restarted only that role (pg-callback PID unchanged).

Related Issues

Checklist

  • Appropriate PR title and description
  • Self-review performed
  • Tests added that prove the feature works
  • New and existing unit tests pass locally
  • Pre-commit passes on all changed files

🤖 Generated with Claude Code

…sh PG roles

Make PG-queue workers first-class in run-worker.sh, runnable individually or as a
set, like the Celery workers — removing the per-process-env footgun.

- Multi-queue consumer: PgQueueConsumer takes queue_names: list[str] and polls
  them round-robin (per-queue read preserves the dequeue index's top-N; no queue
  starves another). WORKER_PG_QUEUE_CONSUMER_QUEUE is comma-parsed; a single
  value stays a one-element list (back-compat with the existing leaf consumer).
- run-worker.sh named roles (PG_CONSUMER_ROLES), each a registry-bound consumer
  with its source worker-type + queue list baked in (no manual env):
  pg-orchestrator-api / pg-orchestrator-general (async_execute_bin has distinct
  impls per registry → split), pg-fileproc / pg-callback (multi-queue, one
  process drains ETL+API). The role name rides in argv (python -m
  pg_queue_consumer <role>) so pgrep (-s/-k/-r) tells co-running roles apart.
- `./run-worker.sh -d pg` launches the 4 pipeline roles + reaper (mirrors `all`);
  each role also runs/kills/restarts/tails individually. resolve_log_file + the
  -L pg set + status/kill enumerate the roles.

Tests: round-robin aggregation, empty-queue-no-starve, empty-list reject,
comma-parse (incl single-value back-compat); existing consumer tests updated to
the list arg.

Dev-tested: `./run-worker.sh -d pg` → status shows 4 roles + reaper RUNNING; an
API execution drains orchestrator-api → fileproc (draining both file_processing
and api_file_processing) → callback → COMPLETED; `-r pg-fileproc` restarts only
that role, leaving pg-callback untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e5e30918-776f-415c-bcbd-c674165ea247

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3566-FEAT_multiqueue_pg_consumer

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

muhammad-ali-e and others added 3 commits June 18, 2026 14:55
… consumer pgrep

- Multiple positional worker types: `./run-worker.sh -d all pg` starts the Celery
  set and the PG set in one shot (loops WORKER_TYPES). Warns if >1 type is given
  without -d, since a non-detached set `wait`s and would block the rest.
- Tighten the generic pg_queue_consumer pgrep to end-anchored so --status/-k/-r
  for the generic consumer no longer also match (and aggregate/kill) the named
  role consumers, which run as `python -m pg_queue_consumer <role>` and have
  their own role-anchored pattern.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Define PG_ROLE_ORCH_API / _ORCH_GENERAL / _FILEPROC / _CALLBACK once and
reference them in PG_CONSUMER_ROLES, PG_QUEUE_MEMBERS and WORKERS, instead of
repeating each role-name string literal 4×.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…metric with 'pg')

`./run-worker.sh -d celery` now runs the Celery set (== `all`, excludes the PG
workers), so celery-only / pg-only / both read symmetrically:
  -d celery   (Celery only)
  -d pg       (PG only)
  -d celery pg | -d all pg   (both)

'celery' maps to "all" in WORKERS → dispatches to run_all_workers, is skipped by
list_core_worker_dirs (no phantom dir), and falls through to the restart-all
branch under -r. The pre-existing `-L celery` log-tail alias is unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). Findings below are inline. The single blocker is the -r pg restart bug (duplicate role consumers double-polling Postgres); everything else is High/Medium/Low. Line anchors point at the nearest diffed line — exact file:line is called out in each comment body where it differs.

Comment thread workers/run-worker.sh
Comment thread workers/queue_backend/pg_queue/consumer.py
Comment thread workers/run-worker.sh
Comment thread workers/run-worker.sh
Comment thread workers/queue_backend/pg_queue/consumer.py Outdated
Comment thread workers/run-worker.sh Outdated
Comment thread workers/run-worker.sh
Comment thread workers/run-worker.sh
Comment thread workers/run-worker.sh Outdated
Comment thread workers/tests/test_pg_queue_consumer.py
…, stale comments

Critical/High + cleanup from the #2073 review round:
- [Critical] `-r pg` (restart set) now kills the SAME members run_pg_queue_set
  launches (the 4 named roles + reaper), not just the generic consumer — was
  leaving the roles running, so a relaunch double-polled Postgres.
- [High] poll_once isolates each queue: a read/handle failure on one queue is
  logged and skipped so the others still run AND the work already done this cycle
  still counts (no false empty-queue backoff after a partial failure).
- [High] fixed the stale CELERY_SET comment that claimed "no 'celery' run alias".
- [Medium-High] run_pg_queue_set start-failure teardown aggregates kill_one_worker
  returns and surfaces a survivor (mirrors the restart path).
- [Medium] de-dup + copy queue_names (list(dict.fromkeys(...))) — a duplicate
  would double-read a queue; the copy stops a caller mutation bypassing validation.
- [Medium] _parse_queue_list warns when it drops empty entries (config typo).
- [Med/Low] comment fixes: only fileproc/callback are multi-queue; "4 roles +
  reaper" not "consumer + reaper"; "read once per cycle in list order" not
  round-robin. Cleanups: drop redundant export; cwd predicate uses PG_QUEUE_MEMBERS;
  spell out api_file_processing_callback in help.

Tests: +one-queue-failing-isolation, +batch_size qty per queue, +duplicate dedup.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round addressed — 9b3905e5e

Thanks for the deep pass. All 13 handled:

Correctness (Critical/High):

  • -r pg now kills the 4 roles + reaper (was leaving the roles → double-poll on relaunch); dev-tested.
  • poll_once isolates each queue (one queue failing no longer aborts the others or triggers a false empty-backoff).
  • de-dup + copy queue_names; run_pg_queue_set teardown surfaces a survivor.

Comments/cleanup: fixed the stale "no celery run alias" comment, the "each role is multi-queue" / "consumer + reaper" / "round-robin" inaccuracies; dropped the redundant export; cwd predicate now uses PG_QUEUE_MEMBERS; _parse_queue_list warns on dropped entries; spelled out the queue name in help.

Tests: +partial-failure isolation, +batch_size qty per queue, +duplicate dedup.

Deferred (1, you flagged follow-up): the all pg startup-signal asymmetry is pre-existing in run_all_workers (subshell wait without status capture) — a dedicated follow-up, not 9f.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 18, 2026 10:03
@muhammad-ali-e muhammad-ali-e merged commit 5f4cdf0 into feat/UN-3445-pg-queue-integration Jun 18, 2026
5 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3566-FEAT_multiqueue_pg_consumer branch June 18, 2026 10:04
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR upgrades PgQueueConsumer to accept a list of queue names and poll them round-robin per cycle, and wires four named pipeline consumer roles into run-worker.sh so each can be started individually or as a set via ./run-worker.sh -d pg.

  • consumer.py: queue_name: strqueue_names: list[str]; poll_once() iterates per-queue with isolated exception handling so one queue's failure doesn't abort the others; _parse_queue_list handles comma-separated env back-compatibly.
  • run-worker.sh: Adds PG_CONSUMER_ROLES (baked-in worker-type + queues per role), per-role pgrep patterns, multi-type launch (-d all pg), and the celery run alias; log, status, kill, and restart paths all follow PG_CONSUMER_ROLES so the new roles are self-consistent.
  • Tests: Updated all existing call-sites to the list API; new TestMultiQueue covers round-robin aggregation, starvation prevention, per-queue failure isolation, deduplication, and _parse_queue_list back-compat.

Confidence Score: 4/5

Safe to merge; the change is additive and gated by the existing transport flag, so no live Celery paths are affected.

The Python multi-queue consumer logic is correct: per-queue exception isolation, order-preserving dedup, back-compat single-value parsing, and the updated tests all look solid. The only gap is that the four new named roles are absent from OPTIN_WORKERS, so show_status will display them as STOPPED after a plain ./run-worker.sh all — the same misleading output the existing pg-queue-consumer entry in OPTIN_WORKERS was added to prevent.

workers/run-worker.sh — the OPTIN_WORKERS map needs the four new named roles added alongside the existing PG consumer and reaper entries.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/consumer.py Signature changed from single queue_name: str to queue_names: list[str]; poll_once() now iterates per-queue with per-queue exception isolation; _parse_queue_list added for comma-separated env parsing; all tests updated. No correctness issues found.
workers/run-worker.sh Adds 4 named PG consumer roles (PG_CONSUMER_ROLES map), multi-type launch support, and matching pgrep patterns. One omission: the new roles are absent from OPTIN_WORKERS, causing misleading STOPPED status after all.
workers/tests/test_pg_queue_consumer.py All existing call-sites updated to list API; new TestMultiQueue class covers round-robin, starvation, failure isolation, dedup, and batch_size propagation; _parse_queue_list tested for back-compat and edge cases.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant S as run-worker.sh
    participant PGS as run_pg_queue_set()
    participant RW as run_worker()
    participant ENV as Environment
    participant L as pg_queue_consumer/__main__.py
    participant C as PgQueueConsumer

    S->>PGS: -d pg (or pg-queue)
    loop for each role in PG_CONSUMER_ROLES + reaper
        PGS->>RW: "worker_type = e.g. pg-fileproc"
        RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing"
        RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing"
        RW->>L: uv run python -m pg_queue_consumer pg-fileproc
        L->>ENV: "os.environ[WORKER_TYPE] = WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE"
        L->>L: import worker (registers file_processing tasks)
        L->>C: "PgQueueConsumer(queue_names=[file_processing, api_file_processing])"
        loop poll cycle
            C->>C: poll_once()
            C->>C: read(file_processing) → handle messages
            C->>C: read(api_file_processing) → handle messages
        end
    end
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant S as run-worker.sh
    participant PGS as run_pg_queue_set()
    participant RW as run_worker()
    participant ENV as Environment
    participant L as pg_queue_consumer/__main__.py
    participant C as PgQueueConsumer

    S->>PGS: -d pg (or pg-queue)
    loop for each role in PG_CONSUMER_ROLES + reaper
        PGS->>RW: "worker_type = e.g. pg-fileproc"
        RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing"
        RW->>ENV: "export WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing"
        RW->>L: uv run python -m pg_queue_consumer pg-fileproc
        L->>ENV: "os.environ[WORKER_TYPE] = WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE"
        L->>L: import worker (registers file_processing tasks)
        L->>C: "PgQueueConsumer(queue_names=[file_processing, api_file_processing])"
        loop poll cycle
            C->>C: poll_once()
            C->>C: read(file_processing) → handle messages
            C->>C: read(api_file_processing) → handle messages
        end
    end
Loading

Comments Outside Diff (1)

  1. workers/run-worker.sh, line 169-172 (link)

    P2 Named PG roles missing from OPTIN_WORKERS

    The four new pipeline consumer roles (pg-orchestrator-api, pg-orchestrator-general, pg-fileproc, pg-callback) are in WORKERS and PG_QUEUE_MEMBERS but not in OPTIN_WORKERS. The opt-in guard in show_status() (line 654) suppresses STOPPED status only for workers listed there — so after ./run-worker.sh all, all four roles will appear as STOPPED even though they're intentionally not part of the Celery set. The existing pg-queue-consumer and pg-queue-reaper are already in OPTIN_WORKERS for this exact reason; the named roles follow the same opt-in pattern and should be treated identically.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: workers/run-worker.sh
    Line: 169-172
    
    Comment:
    **Named PG roles missing from `OPTIN_WORKERS`**
    
    The four new pipeline consumer roles (`pg-orchestrator-api`, `pg-orchestrator-general`, `pg-fileproc`, `pg-callback`) are in `WORKERS` and `PG_QUEUE_MEMBERS` but not in `OPTIN_WORKERS`. The opt-in guard in `show_status()` (line 654) suppresses STOPPED status only for workers listed there — so after `./run-worker.sh all`, all four roles will appear as STOPPED even though they're intentionally not part of the Celery set. The existing `pg-queue-consumer` and `pg-queue-reaper` are already in `OPTIN_WORKERS` for this exact reason; the named roles follow the same opt-in pattern and should be treated identically.
    
    
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Claude Code

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
workers/run-worker.sh:169-172
**Named PG roles missing from `OPTIN_WORKERS`**

The four new pipeline consumer roles (`pg-orchestrator-api`, `pg-orchestrator-general`, `pg-fileproc`, `pg-callback`) are in `WORKERS` and `PG_QUEUE_MEMBERS` but not in `OPTIN_WORKERS`. The opt-in guard in `show_status()` (line 654) suppresses STOPPED status only for workers listed there — so after `./run-worker.sh all`, all four roles will appear as STOPPED even though they're intentionally not part of the Celery set. The existing `pg-queue-consumer` and `pg-queue-reaper` are already in `OPTIN_WORKERS` for this exact reason; the named roles follow the same opt-in pattern and should be treated identically.

```suggestion
declare -A OPTIN_WORKERS=(
    ["$PG_QUEUE_CONSUMER_TYPE"]=1
    ["$PG_QUEUE_REAPER_TYPE"]=1
    ["$PG_ROLE_ORCH_API"]=1
    ["$PG_ROLE_ORCH_GENERAL"]=1
    ["$PG_ROLE_FILEPROC"]=1
    ["$PG_ROLE_CALLBACK"]=1
)
```

Reviews (1): Last reviewed commit: "UN-3566 address review: -r pg kills role..." | Re-trigger Greptile

muhammad-ali-e added a commit that referenced this pull request Jun 18, 2026
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e added a commit that referenced this pull request Jun 18, 2026
…trator loop (per-schedule ownership) (#2081)

* UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership)

Adds the periodic-trigger half of the orchestrator: the leader-elected reaper
now also fires due, PG-owned schedules onto the PG queue — the Celery Beat
replacement — without Beat/RabbitMQ in the trigger path. Dark by default;
never double-fires.

- pg_scheduler.py (new): dispatch_due_schedules() scans pg_periodic_schedule for
  pg_owned + enabled + due rows, enqueues scheduler.tasks.execute_pipeline_task
  on the PG `scheduler` queue AND advances next_run_at in ONE transaction (a
  crash between can't re-fire). A NULL next_run_at records a baseline and does
  NOT fire (no burst when a schedule is handed over; matches Beat). A bad cron on
  one row is logged and skipped without blocking the others. croniter computes
  next-run; all time comparisons use the DB clock.
- pg_queue/models.py + migration 0009: pg_owned flag (default False = Beat owns
  it; the PG scheduler fires only owned rows) + due index (pg_owned, enabled,
  next_run_at). Default-false keeps the table inert until a schedule is handed
  over, and a schedule fires from exactly one side — never both.
- reaper.py: the leader tick runs the scheduler AFTER recovery (a scheduler error
  can't starve the recovery net).
- workers deps: add croniter (already a backend dep).
- run-worker.sh + docker-compose: pg-scheduler consumer role + service
  (profile-gated) that runs the fired execute_pipeline_task.

Out of scope (next slice ②c): the ramp control that flips pg_owned by percentage
+ disables the matching Beat PeriodicTask atomically (reusing the existing Flipt
mechanism), the one-time backfill, and retiring Beat.

Non-regression: pg_owned defaults False, so the reaper fires nothing until rows
are explicitly owned; recovery-only behaviour is unchanged. Tests: 10 scheduler
(real-PG) + 3 reaper-wiring; full reaper suite kept green via a scheduler stub.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 address review: per-row DB isolation, self-quiescing bad cron, typed rows, shared INSERT SQL, stronger tests

- [Critical] per-row fire (INSERT+UPDATE+commit) now wrapped in try/except →
  rollback + log + continue, so one bad row can't poison the connection or drop
  the rest of the batch (mirrors recover_expired_barriers). Baseline UPDATE too.
- [High] invalid cron now disables the row (enabled=FALSE) + logs once, instead
  of re-selecting it and emitting a traceback every ~5s tick forever.
- [Med] read step (SELECT now() + due scan) wraps rollback + re-raise so the
  conn isn't handed back in an aborted-txn state.
- [High] softened the "never double-fires" docstring + models.py comment: the
  guarantee is CONDITIONAL on the ②c ramp control disabling Beat; pre-ramp,
  safety rests on pg_owned defaulting to False.
- [Med] _build_trigger_payload -> TaskPayload; workflow_id/pipeline_id typed
  str | uuid.UUID (| None); _DueSchedule NamedTuple binds SELECT columns to
  names at one site (no silent misassign on a reorder).
- [Med] extracted INSERT_MESSAGE_SQL constant in client.py; send() and the
  scheduler share it (no verbatim SQL duplication).
- [Low] comment fixes: reaper tick (ordering not isolation), execute_pipeline_task
  blanks vs Beat populating execution_action, models.py drop "next slice".
- tests: fired == 1 (not >= 1, catches double-fire); next_run asserted at the
  cron's 09:00 match; baseline asserts == 0; +tz-aware next-run; +multi-row
  (fired == 2); +atomicity (advance UPDATE fails post-INSERT → enqueue rolls
  back, next_run unchanged); +reaper scheduler-error-discards-owned-conn. 75 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 chore: drop accidentally-committed 9f-design.md (untracked scoping doc)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 address greptile P1: roll back if the bad-cron disable UPDATE fails

_quiesce_invalid_cron used contextlib.suppress around the cursor block, so if
the enabled=FALSE UPDATE raised, commit() was skipped and the connection was
left in an aborted-transaction state — poisoning the NEXT row's INSERT (caught
by the outer handler and mislogged as "failed to fire"). Wrap in try/except with
conn.rollback() on failure so the connection is always clean for the next row.
+test: a forced disable-UPDATE failure on a bad-cron row doesn't stop a
following healthy row from firing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* UN-3596 chore: remove superseded 9f-design.md (impl merged in #2073)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.