UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership)#2081
UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership)#2081muhammad-ali-e merged 5 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom feat/UN-3596-FEAT_pg_scheduler_tickZipstack/unstract:feat/UN-3596-FEAT_pg_scheduler_tickCopy head branch name to clipboard
Conversation
…trator loop (per-schedule ownership) Adds the periodic-trigger half of the orchestrator: the leader-elected reaper now also fires due, PG-owned schedules onto the PG queue — the Celery Beat replacement — without Beat/RabbitMQ in the trigger path. Dark by default; never double-fires. - pg_scheduler.py (new): dispatch_due_schedules() scans pg_periodic_schedule for pg_owned + enabled + due rows, enqueues scheduler.tasks.execute_pipeline_task on the PG `scheduler` queue AND advances next_run_at in ONE transaction (a crash between can't re-fire). A NULL next_run_at records a baseline and does NOT fire (no burst when a schedule is handed over; matches Beat). A bad cron on one row is logged and skipped without blocking the others. croniter computes next-run; all time comparisons use the DB clock. - pg_queue/models.py + migration 0009: pg_owned flag (default False = Beat owns it; the PG scheduler fires only owned rows) + due index (pg_owned, enabled, next_run_at). Default-false keeps the table inert until a schedule is handed over, and a schedule fires from exactly one side — never both. - reaper.py: the leader tick runs the scheduler AFTER recovery (a scheduler error can't starve the recovery net). - workers deps: add croniter (already a backend dep). - run-worker.sh + docker-compose: pg-scheduler consumer role + service (profile-gated) that runs the fired execute_pipeline_task. Out of scope (next slice ②c): the ramp control that flips pg_owned by percentage + disables the matching Beat PeriodicTask atomically (reusing the existing Flipt mechanism), the one-time backfill, and retiring Beat. Non-regression: pg_owned defaults False, so the reaper fires nothing until rows are explicitly owned; recovery-only behaviour is unchanged. Tests: 10 scheduler (real-PG) + 3 reaper-wiring; full reaper suite kept green via a scheduler stub. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR Review — PG Scheduler Tick (UN-3596)
Reviewed with the PR Review Toolkit (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). Overall this is a careful, well-documented change — the inlined INSERT matches the canonical PgQueueClient.send shape, the atomic enqueue+advance is correct, leader-gating and post-recovery ordering are right, and the dark-by-default pg_owned rollout is sound.
The findings below are concentrated in error isolation (the per-row loop), one correctness invariant that is not yet enforced (no-double-fire depends on an unbuilt ramp control), and some type/test hardening. Severity is tagged per inline comment.
Priority: 🔴 the per-row try/except (else one bad schedule drops every other due fire and poisons the connection) → 🟠 bad-cron wedge + the unenforced no-double-fire invariant → 🟡 type contracts, SELECT rollback, test gaps, SQL dedup → 🔵 comment-rot nits.
Inline comments follow.
| pipeline_id=pipeline_id, | ||
| pipeline_name=pipeline_name, | ||
| ) | ||
| with conn.cursor() as cur: |
There was a problem hiding this comment.
🔴 Critical — per-row fire has no try/except; one bad row aborts the whole batch and poisons the connection.
The fire path (INSERT + UPDATE + conn.commit(), lines 147–167) runs with no exception handling. If any statement or the commit fails (serialization failure, check-constraint violation, transient socket drop, oversized payload), the exception propagates out of dispatch_due_schedules immediately:
- All remaining due rows in
dueare dropped this cycle — rows after the failing one never fire (they re-fire next tick, but the batch is non-isolated). - The connection is left in an aborted-transaction state (
InFailedSqlTransactionon every subsequent statement) — there is noconn.rollback()anywhere in this function. The owned-conn path is rescued by_discard_owned_sweep_conn(), but an injected conn (the documented caller-supplied path, and how the real-PG tests drive it) is handed back poisoned.
The docstring (lines 85–87) already promises per-row isolation, but that only covers the cron-parse step, not the DB step. The sibling recover_expired_barriers does exactly the right thing: per-row try/except → conn.rollback() → logger.exception(...) → continue.
Fix: wrap the per-row DB work (both the fire branch here and the baseline UPDATE at lines 124–139) in try/except that rolls back and continues:
try:
with conn.cursor() as cur:
cur.execute("INSERT INTO pg_queue_message ...", (...))
cur.execute("UPDATE pg_periodic_schedule SET last_run_at=%s, next_run_at=%s WHERE pipeline_id=%s", (...))
conn.commit()
except Exception:
with contextlib.suppress(Exception):
conn.rollback()
logger.exception("PG scheduler: failed to fire pipeline %s — leaving for next tick", pipeline_id)
continue
fired += 1There was a problem hiding this comment.
Fixed in 4c25ae5 — per-row fire (INSERT+UPDATE+commit) and the baseline UPDATE are now wrapped in try/except → conn.rollback() + logger.exception + continue, exactly like recover_expired_barriers. A row-level failure no longer poisons the (injected or owned) connection or drops the rest of the batch. +atomicity test forcing the advance UPDATE to fail post-INSERT and asserting the enqueue rolled back.
| ) in due: | ||
| try: | ||
| nxt = compute_next_run(cron_string, base) | ||
| except Exception: |
There was a problem hiding this comment.
🟠 High — a permanently-invalid cron wedges the row and emits a full traceback every tick, forever.
When compute_next_run raises (croniter raises at construction for empty/whitespace/malformed crons, and the model allows cron_string = ""), this branch logs and continues before touching next_run_at. Because the row is selected on next_run_at IS NULL OR next_run_at <= now() and its next_run_at is never advanced, the same row is re-selected on every tick. At the default ~5s reaper interval that is a logger.exception (full traceback to Sentry) every ~5s per bad row, indefinitely — burying real errors and re-parsing the bad cron each cycle. The schedule also silently never fires, with no durable surfacing to an operator.
Fix: make the bad-cron state self-quiescing and actionable — disable the row (or push next_run_at far out) so it stops being re-selected, and log once at the transition:
except Exception:
logger.exception("PG scheduler: invalid cron %r for pipeline %s — disabling row", cron_string, pipeline_id)
with contextlib.suppress(Exception):
with conn.cursor() as cur:
cur.execute("UPDATE pg_periodic_schedule SET enabled = FALSE WHERE pipeline_id = %s", (pipeline_id,))
conn.commit()
continueIf auto-disable is undesirable by product policy, at minimum dedupe the logging (log at error only on first observation) — re-erroring every 5s is not acceptable.
There was a problem hiding this comment.
Fixed in 4c25ae5 — _quiesce_invalid_cron now sets enabled = FALSE on the bad row (+ logs once at the transition), so it stops being re-selected and stops re-emitting a traceback every tick. +test asserting the bad-cron row ends up disabled while the good row still fires.
| its ``next_run_at``, a due row enqueues + advances atomically. A bad cron on | ||
| one row is logged and skipped without blocking the others. | ||
| """ | ||
| with conn.cursor() as cur: |
There was a problem hiding this comment.
🟡 Medium — the read step (SELECT now() + due scan) has no rollback on error, deviating from the sibling pattern.
The directly-analogous recover_expired_barriers wraps its read in try/except: conn.rollback(); raise precisely so the connection isn't left in an aborted-txn state. Here, if the SELECT fails mid-read the exception propagates with the transaction still open. As with the fire path, the owned-conn case is rescued by _discard_owned_sweep_conn() but the injected-conn case is left poisoned.
Fix — mirror recover_expired_barriers:
try:
with conn.cursor() as cur:
cur.execute("SELECT now()")
base = cur.fetchone()[0]
cur.execute("SELECT ... WHERE ...", (base,))
due = cur.fetchall()
conn.commit()
except Exception:
with contextlib.suppress(Exception):
conn.rollback()
raiseThere was a problem hiding this comment.
Fixed in 4c25ae5 — the read step (SELECT now() + due scan) now wraps try/except: conn.rollback(); raise, mirroring recover_expired_barriers, so the connection is never handed back in an aborted-txn state.
|
|
||
| Two correctness properties: | ||
|
|
||
| - **Never double-fires.** A row fires from exactly one side: ``pg_owned`` rows |
There was a problem hiding this comment.
🟠 High — the "Never double-fires" property is stated as already-true but currently has no enforcement.
The guarantee depends on the Beat PeriodicTask being disabled for pg_owned rows — done "by the ramp control, next slice", a component that does not exist in the repo yet (confirmed: nothing sets pg_owned=True and no ramp control exists). Until it lands, a row manually flipped to pg_owned=True while its Beat PeriodicTask is still enabled will double-fire. The only thing actually preventing double-fires today is pg_owned defaulting to False (nothing is owned).
This is a cross-table invariant, so a CheckConstraint can't express it. Suggestions: (1) soften this docstring + the models.py comment to state the guarantee is conditional on the ramp control disabling Beat and that, pre-ramp, safety rests on pg_owned=False; (2) before/with the ramp control, add a reconciliation check or integration test asserting no pg_owned=True row has a correspondingly-enabled PeriodicTask.
There was a problem hiding this comment.
Fixed in 4c25ae5 — softened both the docstring and the models.py comment: the no-double-fire guarantee is now stated as CONDITIONAL on the ②c ramp control disabling the matching Beat PeriodicTask, and that until it lands safety rests on pg_owned defaulting to False (a manual flip while the PeriodicTask is still enabled would double-fire). An integration check that no pg_owned row has an enabled PeriodicTask will land with the ramp control.
| organization_id: str, | ||
| pipeline_id: object, | ||
| pipeline_name: str, | ||
| ) -> dict: |
There was a problem hiding this comment.
🟡 Medium — type contract is weaker than what the code already has in hand.
-> dictdiscards a type that exists: this returnsto_payload(...)'sTaskPayload(a TypedDict). The rest of the package returns typed rows everywhere (PgQueueClient.read -> list[QueueMessage],PgReaper.tick -> TickOutcome); this is the one place that drops it. Annotate-> TaskPayload.workflow_id: object/pipeline_id: object(lines 53, 55) say "literally anything," the opposite of the real contract. At runtime these areuuid.UUID(psycopg2) orstr(tests). Usestr | uuid.UUID(andstr | uuid.UUID | Noneforworkflow_id).- Minor asymmetry:
workflow_idis truthiness-guarded (line 66) butpipeline_idis unconditionallystr(pipeline_id)(line 70) → would render"None"if ever NULL. Safe today (PK), but consider asserting non-None to make the asymmetry intentional.
There was a problem hiding this comment.
Fixed in 4c25ae5 — _build_trigger_payload -> TaskPayload; workflow_id: str | uuid.UUID | None, pipeline_id: str | uuid.UUID. (pipeline_id is the PK so never None; left it unconditional str() since the NamedTuple types it non-optional.)
| # Owned by the scheduler tick (next slice); NULL until then. | ||
| # Per-schedule rollout switch. The PG scheduler fires a row ONLY when this is | ||
| # True; when a schedule is pg_owned its Celery Beat PeriodicTask is disabled | ||
| # (done by the ramp control — next slice) so it fires from exactly one side, |
There was a problem hiding this comment.
🔵 Low (comment-rot) — "done by the ramp control — next slice".
Two rot risks: (1) it states the Beat-disable as already-arranged, but that component doesn't exist yet (see the no-double-fire comment on pg_scheduler.py); (2) "next slice" is a transient delivery-plan artifact with no anchor once merged to main. Replace "next slice" with a stable reference (the ramp-control ticket, e.g. UN-xxxx, or "a later change") and make the dependency explicit so the absence of enforcement reads as intentional.
There was a problem hiding this comment.
Fixed in 4c25ae5 — dropped 'next slice' and the already-arranged phrasing; the comment now states the dependency explicitly (the ②c ramp control must disable Beat atomically) and that pre-ramp safety rests on the False default.
|
|
||
| fired = dispatch_due_schedules(conn) | ||
|
|
||
| assert fired >= 1 |
There was a problem hiding this comment.
🟡 Medium (test rigor) — assertions weak enough to pass under the exact regressions this module guards against.
assert fired >= 1(line 116): with a single seeded row this also passes if the row fired twice — the very double-fire bug the module claims to prevent. Tighten to== 1.assert next_run > past(line 124): only checks monotonicity, not correctness — passes even if the next run were computed wrong (e.g. +1 minute instead of next 09:00). Assert equality against the cron-computed value.test_null_next_run_baselines_without_firingshould alsoassert dispatch_due_schedules(conn) == 0to pin "baseline counts as not-fired."
There was a problem hiding this comment.
Fixed in 4c25ae5 — fired == 1 (catches a double-fire on a single seeded row); next_run asserted at the cron's 09:00 match (hour/minute/second), not just monotonic; the baseline test now asserts dispatch_due_schedules(conn) == 0.
|
|
||
|
|
||
| class TestDispatchDueSchedules: | ||
| def test_due_owned_row_fires_and_advances(self, clean): |
There was a problem hiding this comment.
🟡 Medium (test gaps) — the two headline correctness properties and the most-likely prod bug are untested.
- Atomicity / no-re-fire (highest value): every test hits the happy commit path. Add a real-PG test that forces the advance
UPDATEto fail after the INSERT, expects the exception, then asserts_queued_messages(conn) == []andnext_run_atunchanged — proving the INSERT rolled back with the UPDATE (defends "a crash between them can't re-fire"). - Timezone:
compute_next_run's unit tests use naive datetimes, but productionbaseis tz-aware (SELECT now()). Add a pure test with an aware base asserting the result is aware+correct, and intest_due_owned_row_fires_and_advancesassert the persistednext_run_atequals the cron-computed value. This is the single most likely place for a real prod bug. - Multi-row: no test that two due, valid, owned rows both fire (
fired == 2, two distinct messages). This is the normal production case and the per-row-transaction design specifically supports it.
There was a problem hiding this comment.
Fixed in 4c25ae5 — added: (1) atomicity test — a conn wrapper fails the advance UPDATE after the INSERT; asserts no message persisted AND next_run_at unchanged (INSERT rolled back with the UPDATE); (2) tz-aware compute_next_run test (aware base → aware+correct result); (3) multi-row test (two due owned rows → fired == 2, two distinct messages).
| # dummy or barrier-only connections. Tests that assert the wiring opt in via the | ||
| # returned mock. | ||
| @pytest.fixture(autouse=True) | ||
| def stub_scheduler_tick(monkeypatch): |
There was a problem hiding this comment.
🟡 Medium (test gap) — the autouse stub is reasonable, but it hides the scheduler error-isolation wiring.
Stubbing dispatch_due_schedules module-wide is the right call for the leadership/recovery/connection tests. But nothing covers the try/except at reaper.py:495-499 for the scheduler call: test_failed_sweep_discards_owned_conn only exercises the recover_expired_barriers path. If someone deleted the try/except around the scheduler call (or dropped the _discard_owned_sweep_conn()), no test would fail.
Fix (in TestSchedulerTick): set stub_scheduler_tick.side_effect = psycopg2.OperationalError(...) with recover_expired_barriers patched to return [], assert reaper.tick() raises and the owned conn was discarded.
There was a problem hiding this comment.
Fixed in 4c25ae5 — added test_scheduler_error_discards_owned_conn in TestSchedulerTick: stub_scheduler_tick.side_effect = psycopg2.OperationalError(...) with recovery patched → asserts tick() raises AND the owned sweep conn was discarded (set to None).
| # PG Queue — direct Postgres access for the bespoke SKIP LOCKED queue | ||
| # (reuses the backend's DB_* connection: PgBouncer in cloud, direct in OSS) | ||
| "psycopg2-binary==2.9.9", # matches backend; first direct-DB worker capability | ||
| "croniter>=3.0.3", # cron next-run computation for the PG scheduler tick (matches backend) |
There was a problem hiding this comment.
🔵 Info — croniter lock drift. Both backend and workers pin croniter>=3.0.3, but the workers uv.lock resolved to 6.2.2 while the backend uv.lock is at 6.0.0. Same constraint and major version, so behavior is consistent — flagging only so a future lock refresh aligns them.
There was a problem hiding this comment.
Acknowledged — lock drift noted (workers resolved croniter 6.2.2 vs backend 6.0.0; same >=3.0.3 constraint + major, so behaviour is consistent). Left as-is for this PR; flagging for a future aligned lock refresh.
…, typed rows, shared INSERT SQL, stronger tests - [Critical] per-row fire (INSERT+UPDATE+commit) now wrapped in try/except → rollback + log + continue, so one bad row can't poison the connection or drop the rest of the batch (mirrors recover_expired_barriers). Baseline UPDATE too. - [High] invalid cron now disables the row (enabled=FALSE) + logs once, instead of re-selecting it and emitting a traceback every ~5s tick forever. - [Med] read step (SELECT now() + due scan) wraps rollback + re-raise so the conn isn't handed back in an aborted-txn state. - [High] softened the "never double-fires" docstring + models.py comment: the guarantee is CONDITIONAL on the ②c ramp control disabling Beat; pre-ramp, safety rests on pg_owned defaulting to False. - [Med] _build_trigger_payload -> TaskPayload; workflow_id/pipeline_id typed str | uuid.UUID (| None); _DueSchedule NamedTuple binds SELECT columns to names at one site (no silent misassign on a reorder). - [Med] extracted INSERT_MESSAGE_SQL constant in client.py; send() and the scheduler share it (no verbatim SQL duplication). - [Low] comment fixes: reaper tick (ordering not isolation), execute_pipeline_task blanks vs Beat populating execution_action, models.py drop "next slice". - tests: fired == 1 (not >= 1, catches double-fire); next_run asserted at the cron's 09:00 match; baseline asserts == 0; +tz-aware next-run; +multi-row (fired == 2); +atomicity (advance UPDATE fails post-INSERT → enqueue rolls back, next_run unchanged); +reaper scheduler-error-discards-owned-conn. 75 green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…oping doc) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round 1 addressed —
|
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/pg_scheduler.py | New scheduler tick module; per-row isolation, atomic enqueue+advance, bad-cron quiesce with rollback guard, and correct timezone-aware time handling throughout. No issues found. |
| workers/queue_backend/pg_queue/reaper.py | Adds dispatch_due_schedules call after recover_expired_barriers, ordered correctly; connection discard on failure mirrors the existing recovery posture. |
| workers/queue_backend/pg_queue/client.py | Extracts INSERT_MESSAGE_SQL as a shared constant; send() continues to append RETURNING msg_id; the scheduler uses the constant directly inside its own transaction. Clean refactor with no behaviour change. |
| backend/pg_queue/migrations/0009_remove_pgperiodicschedule_pg_periodic_schedule_due_idx_and_more.py | Drops the old two-column index, adds pg_owned (default False, non-breaking), and creates the new three-column index in the correct order for the WHERE clause used by the scheduler. |
| workers/tests/test_pg_scheduler.py | Comprehensive test coverage: pure helper tests (cron arithmetic, tz-awareness, payload shape) plus real-PG integration tests for all dispatch paths. _FailingConn proxy cleanly proves atomicity and connection-safety properties. |
| workers/tests/test_pg_reaper.py | Adds stub_scheduler_tick autouse fixture to isolate existing reaper tests, and three new wiring tests (leader runs it, standby skips it, ordering, connection discard on error). |
| docker/docker-compose.yaml | Adds worker-pg-scheduler service under pg-queue profile; correctly depends on rabbitmq (the fired task may still dispatch via Celery for non-PG-routed workflows). |
| workers/run-worker.sh | Adds pg-scheduler consumer role mapping (scheduler;scheduler) and registers it in PG_QUEUE_MEMBERS and WORKERS; consistent with the docker-compose environment variables. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Reaper as PgReaper (leader tick)
participant Recovery as recover_expired_barriers
participant Sched as dispatch_due_schedules
participant DB as PostgreSQL
participant Queue as pg_queue_message
Reaper->>Recovery: recover orphaned barriers
Recovery-->>Reaper: reclaimed count
Reaper->>Sched: dispatch_due_schedules(conn)
Sched->>DB: SELECT now() → base
Sched->>DB: "SELECT … FROM pg_periodic_schedule WHERE pg_owned AND enabled AND (next_run_at IS NULL OR <= base)"
DB-->>Sched: due rows list
Sched->>DB: COMMIT (read txn)
loop per due row
alt next_run_at IS NULL (freshly handed over)
Sched->>DB: "UPDATE next_run_at = nxt (baseline)"
Sched->>DB: COMMIT
Note over Sched: Not fired — no burst
else row is due
Sched->>Queue: INSERT pg_queue_message (scheduler queue)
Sched->>DB: "UPDATE last_run_at, next_run_at = nxt"
Sched->>DB: COMMIT (atomic: enqueue + advance)
Note over Sched: fired++
end
end
Sched-->>Reaper: fired count
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Reaper as PgReaper (leader tick)
participant Recovery as recover_expired_barriers
participant Sched as dispatch_due_schedules
participant DB as PostgreSQL
participant Queue as pg_queue_message
Reaper->>Recovery: recover orphaned barriers
Recovery-->>Reaper: reclaimed count
Reaper->>Sched: dispatch_due_schedules(conn)
Sched->>DB: SELECT now() → base
Sched->>DB: "SELECT … FROM pg_periodic_schedule WHERE pg_owned AND enabled AND (next_run_at IS NULL OR <= base)"
DB-->>Sched: due rows list
Sched->>DB: COMMIT (read txn)
loop per due row
alt next_run_at IS NULL (freshly handed over)
Sched->>DB: "UPDATE next_run_at = nxt (baseline)"
Sched->>DB: COMMIT
Note over Sched: Not fired — no burst
else row is due
Sched->>Queue: INSERT pg_queue_message (scheduler queue)
Sched->>DB: "UPDATE last_run_at, next_run_at = nxt"
Sched->>DB: COMMIT (atomic: enqueue + advance)
Note over Sched: fired++
end
end
Sched-->>Reaper: fired count
Reviews (2): Last reviewed commit: "UN-3596 chore: remove superseded 9f-desi..." | Re-trigger Greptile
… fails _quiesce_invalid_cron used contextlib.suppress around the cursor block, so if the enabled=FALSE UPDATE raised, commit() was skipped and the connection was left in an aborted-transaction state — poisoning the NEXT row's INSERT (caught by the outer handler and mislogged as "failed to fire"). Wrap in try/except with conn.rollback() on failure so the connection is always clean for the next row. +test: a forced disable-UPDATE failure on a bad-cron row doesn't stop a following healthy row from firing. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Greptile round addressed —
|
e202ae0
into
feat/UN-3445-pg-queue-integration
|
What & why
The periodic-trigger half of the Celery Beat replacement. The leader-elected reaper becomes "the orchestrator" — besides barrier-orphan recovery it now fires due, PG-owned schedules onto the PG queue (the labs single-orchestrator model: recover + schedule in one loop). No Beat/RabbitMQ in the trigger path. Dark by default; never double-fires.
(The downstream execution is already PG-capable via the transport seam — this migrates only the trigger.)
How it works
pg_scheduler.py(new) —dispatch_due_schedules()scanspg_periodic_scheduleforpg_owned AND enabled AND (next_run_at IS NULL OR <= now()), and per row:next_run_at IS NULL→ record a baseline next time and don't fire (a freshly handed-over schedule fires at its next cron match, not immediately — matches Beat, avoids a burst on hand-over).scheduler.tasks.execute_pipeline_taskon the PGschedulerqueue and advancenext_run_atin one transaction (a crash between them can't re-fire).cronitercomputes next-run; all time comparisons use the DB clock.pg_ownedflag (migration 0009, defaultFalse) is the per-schedule switch. The scheduler fires only owned rows; default-false keeps the table inert until a schedule is handed over, and a schedule fires from exactly one side (whenpg_owned, its BeatPeriodicTaskis disabled by the ramp control — next slice) → never double-fires, fully reversible.pg-schedulerconsumer role (run-worker.sh + docker-compose, profile-gated) runs the fired task → existingexecute_pipeline_task_v2→ creates the execution + dispatches the workflow exactly as today.croniteradded to workers deps (already a backend dep).Non-regression
pg_owneddefaultsFalse, so the reaper fires nothing until rows are explicitly owned — recovery-only behaviour is unchanged, Beat keeps firing everything. No new env/flag (the data flag is the switch).Dev-test
tick()→ it fired a seededpg_owneddue schedule (1schedulermessage enqueued,last_runstamped,next_runadvanced).--checkclean; compose validates (0 PG services by default, 6 under--profile pg-queue); ruff/pre-commit clean.Out of scope (→ ②c)
The ramp control that flips
pg_ownedby percentage + disables the matching BeatPeriodicTaskatomically (reusing the existing Flipt mechanism), the one-time backfill of pre-existing schedules, and retiring Beat.Targets
feat/UN-3445-pg-queue-integration(notmain). Sub-task UN-3596.🤖 Generated with Claude Code