Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership)#2081

Merged
muhammad-ali-e merged 5 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3596-FEAT_pg_scheduler_tickZipstack/unstract:feat/UN-3596-FEAT_pg_scheduler_tickCopy head branch name to clipboard
Jun 18, 2026
Merged

UN-3596 [FEAT] PG Queue 9h-b — PG scheduler tick in the reaper/orchestrator loop (per-schedule ownership)#2081
muhammad-ali-e merged 5 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3596-FEAT_pg_scheduler_tickZipstack/unstract:feat/UN-3596-FEAT_pg_scheduler_tickCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What & why

The periodic-trigger half of the Celery Beat replacement. The leader-elected reaper becomes "the orchestrator" — besides barrier-orphan recovery it now fires due, PG-owned schedules onto the PG queue (the labs single-orchestrator model: recover + schedule in one loop). No Beat/RabbitMQ in the trigger path. Dark by default; never double-fires.

(The downstream execution is already PG-capable via the transport seam — this migrates only the trigger.)

How it works

  • pg_scheduler.py (new) — dispatch_due_schedules() scans pg_periodic_schedule for pg_owned AND enabled AND (next_run_at IS NULL OR <= now()), and per row:
    • next_run_at IS NULL → record a baseline next time and don't fire (a freshly handed-over schedule fires at its next cron match, not immediately — matches Beat, avoids a burst on hand-over).
    • due → enqueue scheduler.tasks.execute_pipeline_task on the PG scheduler queue and advance next_run_at in one transaction (a crash between them can't re-fire).
    • a bad cron on one row is logged and skipped, never blocking the others. croniter computes next-run; all time comparisons use the DB clock.
  • pg_owned flag (migration 0009, default False) is the per-schedule switch. The scheduler fires only owned rows; default-false keeps the table inert until a schedule is handed over, and a schedule fires from exactly one side (when pg_owned, its Beat PeriodicTask is disabled by the ramp control — next slice) → never double-fires, fully reversible.
  • reaper runs the scheduler after recovery (a scheduler error can't starve the recovery net).
  • pg-scheduler consumer role (run-worker.sh + docker-compose, profile-gated) runs the fired task → existing execute_pipeline_task_v2 → creates the execution + dispatches the workflow exactly as today.
  • croniter added to workers deps (already a backend dep).

Non-regression

pg_owned defaults False, so the reaper fires nothing until rows are explicitly owned — recovery-only behaviour is unchanged, Beat keeps firing everything. No new env/flag (the data flag is the switch).

Dev-test

  • Live against the dev DB: drove the reaper's leader tick() → it fired a seeded pg_owned due schedule (1 scheduler message enqueued, last_run stamped, next_run advanced).
  • 10 scheduler tests (real-PG: due / baseline-no-fire / not-owned / disabled / not-yet-due / bad-cron) + 3 reaper-wiring tests (leader runs it, standby doesn't, runs after recovery). Full reaper suite (71 tests) green.
  • Migration applied + --check clean; compose validates (0 PG services by default, 6 under --profile pg-queue); ruff/pre-commit clean.

Out of scope (→ ②c)

The ramp control that flips pg_owned by percentage + disables the matching Beat PeriodicTask atomically (reusing the existing Flipt mechanism), the one-time backfill of pre-existing schedules, and retiring Beat.

Targets feat/UN-3445-pg-queue-integration (not main). Sub-task UN-3596.

🤖 Generated with Claude Code

…trator loop (per-schedule ownership)

Adds the periodic-trigger half of the orchestrator: the leader-elected reaper
now also fires due, PG-owned schedules onto the PG queue — the Celery Beat
replacement — without Beat/RabbitMQ in the trigger path. Dark by default;
never double-fires.

- pg_scheduler.py (new): dispatch_due_schedules() scans pg_periodic_schedule for
  pg_owned + enabled + due rows, enqueues scheduler.tasks.execute_pipeline_task
  on the PG `scheduler` queue AND advances next_run_at in ONE transaction (a
  crash between can't re-fire). A NULL next_run_at records a baseline and does
  NOT fire (no burst when a schedule is handed over; matches Beat). A bad cron on
  one row is logged and skipped without blocking the others. croniter computes
  next-run; all time comparisons use the DB clock.
- pg_queue/models.py + migration 0009: pg_owned flag (default False = Beat owns
  it; the PG scheduler fires only owned rows) + due index (pg_owned, enabled,
  next_run_at). Default-false keeps the table inert until a schedule is handed
  over, and a schedule fires from exactly one side — never both.
- reaper.py: the leader tick runs the scheduler AFTER recovery (a scheduler error
  can't starve the recovery net).
- workers deps: add croniter (already a backend dep).
- run-worker.sh + docker-compose: pg-scheduler consumer role + service
  (profile-gated) that runs the fired execute_pipeline_task.

Out of scope (next slice ②c): the ramp control that flips pg_owned by percentage
+ disables the matching Beat PeriodicTask atomically (reusing the existing Flipt
mechanism), the one-time backfill, and retiring Beat.

Non-regression: pg_owned defaults False, so the reaper fires nothing until rows
are explicitly owned; recovery-only behaviour is unchanged. Tests: 10 scheduler
(real-PG) + 3 reaper-wiring; full reaper suite kept green via a scheduler stub.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d09fb828-2f5a-4209-bdb0-38d9debdaccc

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3596-FEAT_pg_scheduler_tick

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review — PG Scheduler Tick (UN-3596)

Reviewed with the PR Review Toolkit (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). Overall this is a careful, well-documented change — the inlined INSERT matches the canonical PgQueueClient.send shape, the atomic enqueue+advance is correct, leader-gating and post-recovery ordering are right, and the dark-by-default pg_owned rollout is sound.

The findings below are concentrated in error isolation (the per-row loop), one correctness invariant that is not yet enforced (no-double-fire depends on an unbuilt ramp control), and some type/test hardening. Severity is tagged per inline comment.

Priority: 🔴 the per-row try/except (else one bad schedule drops every other due fire and poisons the connection) → 🟠 bad-cron wedge + the unenforced no-double-fire invariant → 🟡 type contracts, SELECT rollback, test gaps, SQL dedup → 🔵 comment-rot nits.

Inline comments follow.

pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
)
with conn.cursor() as cur:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Critical — per-row fire has no try/except; one bad row aborts the whole batch and poisons the connection.

The fire path (INSERT + UPDATE + conn.commit(), lines 147–167) runs with no exception handling. If any statement or the commit fails (serialization failure, check-constraint violation, transient socket drop, oversized payload), the exception propagates out of dispatch_due_schedules immediately:

  • All remaining due rows in due are dropped this cycle — rows after the failing one never fire (they re-fire next tick, but the batch is non-isolated).
  • The connection is left in an aborted-transaction state (InFailedSqlTransaction on every subsequent statement) — there is no conn.rollback() anywhere in this function. The owned-conn path is rescued by _discard_owned_sweep_conn(), but an injected conn (the documented caller-supplied path, and how the real-PG tests drive it) is handed back poisoned.

The docstring (lines 85–87) already promises per-row isolation, but that only covers the cron-parse step, not the DB step. The sibling recover_expired_barriers does exactly the right thing: per-row try/exceptconn.rollback()logger.exception(...)continue.

Fix: wrap the per-row DB work (both the fire branch here and the baseline UPDATE at lines 124–139) in try/except that rolls back and continues:

try:
    with conn.cursor() as cur:
        cur.execute("INSERT INTO pg_queue_message ...", (...))
        cur.execute("UPDATE pg_periodic_schedule SET last_run_at=%s, next_run_at=%s WHERE pipeline_id=%s", (...))
    conn.commit()
except Exception:
    with contextlib.suppress(Exception):
        conn.rollback()
    logger.exception("PG scheduler: failed to fire pipeline %s — leaving for next tick", pipeline_id)
    continue
fired += 1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5 — per-row fire (INSERT+UPDATE+commit) and the baseline UPDATE are now wrapped in try/except → conn.rollback() + logger.exception + continue, exactly like recover_expired_barriers. A row-level failure no longer poisons the (injected or owned) connection or drops the rest of the batch. +atomicity test forcing the advance UPDATE to fail post-INSERT and asserting the enqueue rolled back.

) in due:
try:
nxt = compute_next_run(cron_string, base)
except Exception:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High — a permanently-invalid cron wedges the row and emits a full traceback every tick, forever.

When compute_next_run raises (croniter raises at construction for empty/whitespace/malformed crons, and the model allows cron_string = ""), this branch logs and continues before touching next_run_at. Because the row is selected on next_run_at IS NULL OR next_run_at <= now() and its next_run_at is never advanced, the same row is re-selected on every tick. At the default ~5s reaper interval that is a logger.exception (full traceback to Sentry) every ~5s per bad row, indefinitely — burying real errors and re-parsing the bad cron each cycle. The schedule also silently never fires, with no durable surfacing to an operator.

Fix: make the bad-cron state self-quiescing and actionable — disable the row (or push next_run_at far out) so it stops being re-selected, and log once at the transition:

except Exception:
    logger.exception("PG scheduler: invalid cron %r for pipeline %s — disabling row", cron_string, pipeline_id)
    with contextlib.suppress(Exception):
        with conn.cursor() as cur:
            cur.execute("UPDATE pg_periodic_schedule SET enabled = FALSE WHERE pipeline_id = %s", (pipeline_id,))
        conn.commit()
    continue

If auto-disable is undesirable by product policy, at minimum dedupe the logging (log at error only on first observation) — re-erroring every 5s is not acceptable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5_quiesce_invalid_cron now sets enabled = FALSE on the bad row (+ logs once at the transition), so it stops being re-selected and stops re-emitting a traceback every tick. +test asserting the bad-cron row ends up disabled while the good row still fires.

its ``next_run_at``, a due row enqueues + advances atomically. A bad cron on
one row is logged and skipped without blocking the others.
"""
with conn.cursor() as cur:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium — the read step (SELECT now() + due scan) has no rollback on error, deviating from the sibling pattern.

The directly-analogous recover_expired_barriers wraps its read in try/except: conn.rollback(); raise precisely so the connection isn't left in an aborted-txn state. Here, if the SELECT fails mid-read the exception propagates with the transaction still open. As with the fire path, the owned-conn case is rescued by _discard_owned_sweep_conn() but the injected-conn case is left poisoned.

Fix — mirror recover_expired_barriers:

try:
    with conn.cursor() as cur:
        cur.execute("SELECT now()")
        base = cur.fetchone()[0]
        cur.execute("SELECT ... WHERE ...", (base,))
        due = cur.fetchall()
    conn.commit()
except Exception:
    with contextlib.suppress(Exception):
        conn.rollback()
    raise

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5 — the read step (SELECT now() + due scan) now wraps try/except: conn.rollback(); raise, mirroring recover_expired_barriers, so the connection is never handed back in an aborted-txn state.


Two correctness properties:

- **Never double-fires.** A row fires from exactly one side: ``pg_owned`` rows

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High — the "Never double-fires" property is stated as already-true but currently has no enforcement.

The guarantee depends on the Beat PeriodicTask being disabled for pg_owned rows — done "by the ramp control, next slice", a component that does not exist in the repo yet (confirmed: nothing sets pg_owned=True and no ramp control exists). Until it lands, a row manually flipped to pg_owned=True while its Beat PeriodicTask is still enabled will double-fire. The only thing actually preventing double-fires today is pg_owned defaulting to False (nothing is owned).

This is a cross-table invariant, so a CheckConstraint can't express it. Suggestions: (1) soften this docstring + the models.py comment to state the guarantee is conditional on the ramp control disabling Beat and that, pre-ramp, safety rests on pg_owned=False; (2) before/with the ramp control, add a reconciliation check or integration test asserting no pg_owned=True row has a correspondingly-enabled PeriodicTask.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5 — softened both the docstring and the models.py comment: the no-double-fire guarantee is now stated as CONDITIONAL on the ②c ramp control disabling the matching Beat PeriodicTask, and that until it lands safety rests on pg_owned defaulting to False (a manual flip while the PeriodicTask is still enabled would double-fire). An integration check that no pg_owned row has an enabled PeriodicTask will land with the ramp control.

organization_id: str,
pipeline_id: object,
pipeline_name: str,
) -> dict:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium — type contract is weaker than what the code already has in hand.

  • -> dict discards a type that exists: this returns to_payload(...)'s TaskPayload (a TypedDict). The rest of the package returns typed rows everywhere (PgQueueClient.read -> list[QueueMessage], PgReaper.tick -> TickOutcome); this is the one place that drops it. Annotate -> TaskPayload.
  • workflow_id: object / pipeline_id: object (lines 53, 55) say "literally anything," the opposite of the real contract. At runtime these are uuid.UUID (psycopg2) or str (tests). Use str | uuid.UUID (and str | uuid.UUID | None for workflow_id).
  • Minor asymmetry: workflow_id is truthiness-guarded (line 66) but pipeline_id is unconditionally str(pipeline_id) (line 70) → would render "None" if ever NULL. Safe today (PK), but consider asserting non-None to make the asymmetry intentional.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5_build_trigger_payload -> TaskPayload; workflow_id: str | uuid.UUID | None, pipeline_id: str | uuid.UUID. (pipeline_id is the PK so never None; left it unconditional str() since the NamedTuple types it non-optional.)

Comment thread backend/pg_queue/models.py Outdated
# Owned by the scheduler tick (next slice); NULL until then.
# Per-schedule rollout switch. The PG scheduler fires a row ONLY when this is
# True; when a schedule is pg_owned its Celery Beat PeriodicTask is disabled
# (done by the ramp control — next slice) so it fires from exactly one side,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 Low (comment-rot) — "done by the ramp control — next slice".

Two rot risks: (1) it states the Beat-disable as already-arranged, but that component doesn't exist yet (see the no-double-fire comment on pg_scheduler.py); (2) "next slice" is a transient delivery-plan artifact with no anchor once merged to main. Replace "next slice" with a stable reference (the ramp-control ticket, e.g. UN-xxxx, or "a later change") and make the dependency explicit so the absence of enforcement reads as intentional.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5 — dropped 'next slice' and the already-arranged phrasing; the comment now states the dependency explicitly (the ②c ramp control must disable Beat atomically) and that pre-ramp safety rests on the False default.

Comment thread workers/tests/test_pg_scheduler.py Outdated

fired = dispatch_due_schedules(conn)

assert fired >= 1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium (test rigor) — assertions weak enough to pass under the exact regressions this module guards against.

  • assert fired >= 1 (line 116): with a single seeded row this also passes if the row fired twice — the very double-fire bug the module claims to prevent. Tighten to == 1.
  • assert next_run > past (line 124): only checks monotonicity, not correctness — passes even if the next run were computed wrong (e.g. +1 minute instead of next 09:00). Assert equality against the cron-computed value.
  • test_null_next_run_baselines_without_firing should also assert dispatch_due_schedules(conn) == 0 to pin "baseline counts as not-fired."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5fired == 1 (catches a double-fire on a single seeded row); next_run asserted at the cron's 09:00 match (hour/minute/second), not just monotonic; the baseline test now asserts dispatch_due_schedules(conn) == 0.



class TestDispatchDueSchedules:
def test_due_owned_row_fires_and_advances(self, clean):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium (test gaps) — the two headline correctness properties and the most-likely prod bug are untested.

  1. Atomicity / no-re-fire (highest value): every test hits the happy commit path. Add a real-PG test that forces the advance UPDATE to fail after the INSERT, expects the exception, then asserts _queued_messages(conn) == [] and next_run_at unchanged — proving the INSERT rolled back with the UPDATE (defends "a crash between them can't re-fire").
  2. Timezone: compute_next_run's unit tests use naive datetimes, but production base is tz-aware (SELECT now()). Add a pure test with an aware base asserting the result is aware+correct, and in test_due_owned_row_fires_and_advances assert the persisted next_run_at equals the cron-computed value. This is the single most likely place for a real prod bug.
  3. Multi-row: no test that two due, valid, owned rows both fire (fired == 2, two distinct messages). This is the normal production case and the per-row-transaction design specifically supports it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5 — added: (1) atomicity test — a conn wrapper fails the advance UPDATE after the INSERT; asserts no message persisted AND next_run_at unchanged (INSERT rolled back with the UPDATE); (2) tz-aware compute_next_run test (aware base → aware+correct result); (3) multi-row test (two due owned rows → fired == 2, two distinct messages).

# dummy or barrier-only connections. Tests that assert the wiring opt in via the
# returned mock.
@pytest.fixture(autouse=True)
def stub_scheduler_tick(monkeypatch):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium (test gap) — the autouse stub is reasonable, but it hides the scheduler error-isolation wiring.

Stubbing dispatch_due_schedules module-wide is the right call for the leadership/recovery/connection tests. But nothing covers the try/except at reaper.py:495-499 for the scheduler call: test_failed_sweep_discards_owned_conn only exercises the recover_expired_barriers path. If someone deleted the try/except around the scheduler call (or dropped the _discard_owned_sweep_conn()), no test would fail.

Fix (in TestSchedulerTick): set stub_scheduler_tick.side_effect = psycopg2.OperationalError(...) with recover_expired_barriers patched to return [], assert reaper.tick() raises and the owned conn was discarded.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4c25ae5 — added test_scheduler_error_discards_owned_conn in TestSchedulerTick: stub_scheduler_tick.side_effect = psycopg2.OperationalError(...) with recovery patched → asserts tick() raises AND the owned sweep conn was discarded (set to None).

Comment thread workers/pyproject.toml
# PG Queue — direct Postgres access for the bespoke SKIP LOCKED queue
# (reuses the backend's DB_* connection: PgBouncer in cloud, direct in OSS)
"psycopg2-binary==2.9.9", # matches backend; first direct-DB worker capability
"croniter>=3.0.3", # cron next-run computation for the PG scheduler tick (matches backend)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 Info — croniter lock drift. Both backend and workers pin croniter>=3.0.3, but the workers uv.lock resolved to 6.2.2 while the backend uv.lock is at 6.0.0. Same constraint and major version, so behavior is consistent — flagging only so a future lock refresh aligns them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — lock drift noted (workers resolved croniter 6.2.2 vs backend 6.0.0; same >=3.0.3 constraint + major, so behaviour is consistent). Left as-is for this PR; flagging for a future aligned lock refresh.

muhammad-ali-e and others added 2 commits June 18, 2026 23:12
…, typed rows, shared INSERT SQL, stronger tests

- [Critical] per-row fire (INSERT+UPDATE+commit) now wrapped in try/except →
  rollback + log + continue, so one bad row can't poison the connection or drop
  the rest of the batch (mirrors recover_expired_barriers). Baseline UPDATE too.
- [High] invalid cron now disables the row (enabled=FALSE) + logs once, instead
  of re-selecting it and emitting a traceback every ~5s tick forever.
- [Med] read step (SELECT now() + due scan) wraps rollback + re-raise so the
  conn isn't handed back in an aborted-txn state.
- [High] softened the "never double-fires" docstring + models.py comment: the
  guarantee is CONDITIONAL on the ②c ramp control disabling Beat; pre-ramp,
  safety rests on pg_owned defaulting to False.
- [Med] _build_trigger_payload -> TaskPayload; workflow_id/pipeline_id typed
  str | uuid.UUID (| None); _DueSchedule NamedTuple binds SELECT columns to
  names at one site (no silent misassign on a reorder).
- [Med] extracted INSERT_MESSAGE_SQL constant in client.py; send() and the
  scheduler share it (no verbatim SQL duplication).
- [Low] comment fixes: reaper tick (ordering not isolation), execute_pipeline_task
  blanks vs Beat populating execution_action, models.py drop "next slice".
- tests: fired == 1 (not >= 1, catches double-fire); next_run asserted at the
  cron's 09:00 match; baseline asserts == 0; +tz-aware next-run; +multi-row
  (fired == 2); +atomicity (advance UPDATE fails post-INSERT → enqueue rolls
  back, next_run unchanged); +reaper scheduler-error-discards-owned-conn. 75 green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…oping doc)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round 1 addressed — 4c25ae514

Thanks for the 6-agent pass. All 14 addressed (13 fixed, 1 info-ack):

Critical / High — error isolation + double-fire honesty

  • Per-row fire had no try/except → now rollback + log + continue per row (both fire + baseline UPDATE), mirroring recover_expired_barriers; no more connection-poisoning / batch-drop. +atomicity test.
  • Invalid cron wedged the row + tracebacked every tick → bad cron now disables the row (enabled=FALSE) + logs once. +test.
  • Read step had no rollbacktry/except: rollback; raise.
  • "Never double-fires" overstated → softened to CONDITIONAL on the ②c ramp control; pre-ramp safety rests on pg_owned=False default (docstring + models.py).

Medium — types / structure / tests

  • _build_trigger_payload -> TaskPayload; workflow_id/pipeline_id typed str | uuid.UUID.
  • _DueSchedule NamedTuple binds SELECT columns at one site (no silent misassign on reorder).
  • INSERT_MESSAGE_SQL extracted in client.py; send() + scheduler share it.
  • Tests: fired == 1 (catches double-fire), next_run asserted at the cron match, baseline == 0, +tz-aware, +multi-row (fired == 2), +atomicity (advance fails → enqueue rolls back), +reaper scheduler-error-discards-conn.

Low / Info

  • Comment fixes: reaper ordering-not-isolation, blanks-vs-Beat-execution_action, models.py drop "next slice".
  • croniter lock drift (6.2.2 vs backend 6.0.0): acknowledged, left for a future aligned refresh.

75 worker tests green; ruff/ruff-format/pre-commit clean. (Also dropped an accidentally-committed 9f-design.md in 83be4493c.)

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 18, 2026 17:49
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces the periodic-trigger half of the PG-queue Beat replacement: a pg_scheduler.py module that fires due, pg_owned schedules onto the PG queue, wired into the leader-elected reaper loop as its second job (after orphan-barrier recovery). The pg_owned column defaults to False, so the feature is fully dark until a schedule is explicitly handed over, keeping all existing Celery Beat behaviour unchanged.

  • pg_scheduler.dispatch_due_schedules scans pg_periodic_schedule for owned, enabled, due rows; handles each in its own transaction (enqueue + next_run_at advance atomically); baselines NULL next_run_at rows without firing; auto-disables bad-cron rows and rolls back cleanly to avoid connection poisoning.
  • Migration 0009 adds the pg_owned BooleanField (default False) and rebuilds the index to cover (pg_owned, enabled, next_run_at).
  • INSERT_MESSAGE_SQL is extracted from PgQueueClient.send() into a shared constant so the scheduler can reuse the exact enqueue contract inside its own transaction without going through send() (which auto-commits).

Confidence Score: 5/5

Safe to merge into the feature branch — the change is dark by default and cannot affect any production schedule until rows are explicitly flipped to pg_owned=True.

The scheduler fires nothing until a row is manually opted in (pg_owned defaults to False), so the change is behaviorally inert against any existing data. The core correctness properties — atomic enqueue+advance, per-row isolation, connection-clean rollback on bad-cron quiesce, no burst on hand-over — are all verified by the new test suite (10 scheduler tests + 3 reaper-wiring tests). The previously flagged connection-poisoning issue in _quiesce_invalid_cron was addressed in 11258f1 and is re-proved by test_quiesce_failure_does_not_poison_next_row. Migration 0009 is additive and non-breaking. No correctness or safety issues were found.

No files require special attention.

Important Files Changed

Filename Overview
workers/queue_backend/pg_queue/pg_scheduler.py New scheduler tick module; per-row isolation, atomic enqueue+advance, bad-cron quiesce with rollback guard, and correct timezone-aware time handling throughout. No issues found.
workers/queue_backend/pg_queue/reaper.py Adds dispatch_due_schedules call after recover_expired_barriers, ordered correctly; connection discard on failure mirrors the existing recovery posture.
workers/queue_backend/pg_queue/client.py Extracts INSERT_MESSAGE_SQL as a shared constant; send() continues to append RETURNING msg_id; the scheduler uses the constant directly inside its own transaction. Clean refactor with no behaviour change.
backend/pg_queue/migrations/0009_remove_pgperiodicschedule_pg_periodic_schedule_due_idx_and_more.py Drops the old two-column index, adds pg_owned (default False, non-breaking), and creates the new three-column index in the correct order for the WHERE clause used by the scheduler.
workers/tests/test_pg_scheduler.py Comprehensive test coverage: pure helper tests (cron arithmetic, tz-awareness, payload shape) plus real-PG integration tests for all dispatch paths. _FailingConn proxy cleanly proves atomicity and connection-safety properties.
workers/tests/test_pg_reaper.py Adds stub_scheduler_tick autouse fixture to isolate existing reaper tests, and three new wiring tests (leader runs it, standby skips it, ordering, connection discard on error).
docker/docker-compose.yaml Adds worker-pg-scheduler service under pg-queue profile; correctly depends on rabbitmq (the fired task may still dispatch via Celery for non-PG-routed workflows).
workers/run-worker.sh Adds pg-scheduler consumer role mapping (scheduler;scheduler) and registers it in PG_QUEUE_MEMBERS and WORKERS; consistent with the docker-compose environment variables.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Reaper as PgReaper (leader tick)
    participant Recovery as recover_expired_barriers
    participant Sched as dispatch_due_schedules
    participant DB as PostgreSQL
    participant Queue as pg_queue_message

    Reaper->>Recovery: recover orphaned barriers
    Recovery-->>Reaper: reclaimed count

    Reaper->>Sched: dispatch_due_schedules(conn)
    Sched->>DB: SELECT now() → base
    Sched->>DB: "SELECT … FROM pg_periodic_schedule WHERE pg_owned AND enabled AND (next_run_at IS NULL OR <= base)"
    DB-->>Sched: due rows list
    Sched->>DB: COMMIT (read txn)

    loop per due row
        alt next_run_at IS NULL (freshly handed over)
            Sched->>DB: "UPDATE next_run_at = nxt (baseline)"
            Sched->>DB: COMMIT
            Note over Sched: Not fired — no burst
        else row is due
            Sched->>Queue: INSERT pg_queue_message (scheduler queue)
            Sched->>DB: "UPDATE last_run_at, next_run_at = nxt"
            Sched->>DB: COMMIT (atomic: enqueue + advance)
            Note over Sched: fired++
        end
    end

    Sched-->>Reaper: fired count
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Reaper as PgReaper (leader tick)
    participant Recovery as recover_expired_barriers
    participant Sched as dispatch_due_schedules
    participant DB as PostgreSQL
    participant Queue as pg_queue_message

    Reaper->>Recovery: recover orphaned barriers
    Recovery-->>Reaper: reclaimed count

    Reaper->>Sched: dispatch_due_schedules(conn)
    Sched->>DB: SELECT now() → base
    Sched->>DB: "SELECT … FROM pg_periodic_schedule WHERE pg_owned AND enabled AND (next_run_at IS NULL OR <= base)"
    DB-->>Sched: due rows list
    Sched->>DB: COMMIT (read txn)

    loop per due row
        alt next_run_at IS NULL (freshly handed over)
            Sched->>DB: "UPDATE next_run_at = nxt (baseline)"
            Sched->>DB: COMMIT
            Note over Sched: Not fired — no burst
        else row is due
            Sched->>Queue: INSERT pg_queue_message (scheduler queue)
            Sched->>DB: "UPDATE last_run_at, next_run_at = nxt"
            Sched->>DB: COMMIT (atomic: enqueue + advance)
            Note over Sched: fired++
        end
    end

    Sched-->>Reaper: fired count
Loading

Reviews (2): Last reviewed commit: "UN-3596 chore: remove superseded 9f-desi..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/pg_scheduler.py Outdated
muhammad-ali-e and others added 2 commits June 18, 2026 23:30
… fails

_quiesce_invalid_cron used contextlib.suppress around the cursor block, so if
the enabled=FALSE UPDATE raised, commit() was skipped and the connection was
left in an aborted-transaction state — poisoning the NEXT row's INSERT (caught
by the outer handler and mislogged as "failed to fire"). Wrap in try/except with
conn.rollback() on failure so the connection is always clean for the next row.
+test: a forced disable-UPDATE failure on a bad-cron row doesn't stop a
following healthy row from firing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Greptile round addressed — 11258f1a9

Greptile passed (no Critical/High). The one P1dirty connection after a suppressed disable-UPDATE failure in _quiesce_invalid_cronfixed: replaced contextlib.suppress around the cursor block with try/exceptconn.rollback(), so a failed bad-cron disable can't poison the next row's INSERT (it was the same poison-the-next-row class the round-1 per-row isolation fixed for the fire path; this was the one remaining suppressed path). +regression test proving a following healthy row still fires.

14 scheduler tests green; ruff/pre-commit clean.

@muhammad-ali-e muhammad-ali-e merged commit e202ae0 into feat/UN-3445-pg-queue-integration Jun 18, 2026
5 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3596-FEAT_pg_scheduler_tick branch June 18, 2026 18:04
@sonarqubecloud

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.