Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3581 [FEAT] PG Queue 9h-a — pg_periodic_schedule mirror table + dual-write (inert)#2080

Merged
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3581-FEAT_pg_periodic_schedule_mirrorZipstack/unstract:feat/UN-3581-FEAT_pg_periodic_schedule_mirrorCopy head branch name to clipboard
Jun 18, 2026
Merged

UN-3581 [FEAT] PG Queue 9h-a — pg_periodic_schedule mirror table + dual-write (inert)#2080
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
feat/UN-3581-FEAT_pg_periodic_schedule_mirrorZipstack/unstract:feat/UN-3581-FEAT_pg_periodic_schedule_mirrorCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What & why

First inert slice toward replacing Celery Beat with a Postgres-backed periodic scheduler. The downstream execution is already PG-capable via the transport seam — this track migrates only the trigger (Beat → a leader-elected scheduler loop, to be folded into the reaper/orchestrator).

This slice only mirrors each scheduled pipeline's cron definition into a new table. Nothing reads it yet, so behaviour is unchanged — it's the inert primitive the next slice builds on.

Changes

  • backend/pg_queue/models.py — new PgPeriodicSchedule (one row per scheduled pipeline): pipeline_id PK, organization_id, workflow_id, pipeline_name, cron_string, enabled. last_run_at / next_run_at are left NULL here — the scheduler tick owns all cron computation in the next slice. Index on (enabled, next_run_at) for the future "due schedules" query.
  • migration 0008 — generated; makemigrations --check clean.
  • backend/scheduler/tasks.py — dual-write the mirror from the four schedule choke-points that already manage the django_celery_beat PeriodicTask: create/update (upsert, fields from the documented task_args layout), pause/resume (toggle enabled), delete. Toggles are placed right after task.save() so a downstream pipeline-status failure can't desync the mirror. Every mirror write is best-effort (try/except + log) — a mirror failure can never break the existing Beat scheduling path.
  • tests — 6 DB-free unit tests: field extraction, enable/disable toggle, delete, delete-when-PeriodicTask-missing, and the best-effort-swallow guarantee.

Non-regression

Nothing reads the new table; the PeriodicTask and the celery-beat container are untouched. Best-effort writes isolate the mirror from the Beat path.

Dev-test

  • Migration applied to a dev DB; makemigrations --check clean.
  • Full lifecycle exercised against the real DB: CREATE → DISABLE → ENABLE → DELETE all tracked correctly on the mirror row.
  • 6 unit tests green; ruff / ruff-format / pre-commit clean.

Targets feat/UN-3445-pg-queue-integration (not main). Sub-task UN-3581.

🤖 Generated with Claude Code

…al-write (inert)

First inert slice toward replacing Celery Beat with a PG-backed periodic
scheduler (to be folded into the leader-elected reaper/orchestrator loop). This
slice only mirrors each scheduled pipeline's cron definition into a new Postgres
table; nothing reads it yet, so behaviour is unchanged.

- backend/pg_queue/models.py: new PgPeriodicSchedule (pipeline_id PK, org_id,
  workflow_id, pipeline_name, cron_string, enabled; last_run_at/next_run_at left
  NULL — the scheduler tick owns all cron computation in the next slice). Index
  on (enabled, next_run_at) for the future "due schedules" query.
- migration 0008 (generated; makemigrations --check clean).
- backend/scheduler/tasks.py: dual-write the mirror from the four schedule
  choke-points (create/update, pause, resume, delete) that already manage the
  django_celery_beat PeriodicTask. Toggles are placed right after task.save() so
  a downstream pipeline-status failure can't desync the mirror. Every mirror
  write is best-effort (try/except + log) — a mirror failure can NEVER break the
  existing Beat scheduling path.
- tests: 6 DB-free unit tests (field extraction, enable/disable, delete,
  delete-when-PeriodicTask-missing, best-effort swallow).

Inert / non-regressive: nothing reads the table; the PeriodicTask + celery-beat
container are untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3c992978-063c-421e-a99f-18e6162e3a25

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3581-FEAT_pg_periodic_schedule_mirror

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review — pg_periodic_schedule inert mirror

Ran the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) against this single-commit PR. The inert/best-effort guarantee is correctly implemented and well-tested; migration matches the model. Findings below are scoped accordingly — most are Low/Medium because nothing reads the table yet, but two should be addressed before the table goes live (next slice).

Prioritized summary

# Severity Finding Location
1 High pipeline_name mirrors the synthetic label "Pipeline job-<uuid>", not a real pipeline name tasks.py:118
2 Medium enable/disable/delete use filter().update()/.delete() → silent no-op + asymmetric divergence vs the update_or_create upsert; mirror can never self-heal a missing row tasks.py:56
3 Medium updated_at (auto_now) is NOT advanced by the .update() enable/disable path models.py:264
4 Medium Positional task_args coupling silently degrades to placeholder data; the swallowed IndexError hides arg drift tasks.py:116
5 Medium cron_string is unvalidated free text, unlike every sibling model's domain CheckConstraint models.py:257
6 Medium Test gaps: set_enabled/delete failure-swallowing, short task_args, enabled=False, and no DB-backed/makemigrations --check to catch schema drift behind the mock tests file
7 Low Helper params typed Any; upstream already guarantees str / `str None`
8 Low Test fixture _NAME="Nightly ETL" pins a contract production never produces tests:18
9 Nit Index name pg_sched_due_idx drops the full-table prefix used by siblings models.py:270

Green (confirmed correct): the swallow-and-log best-effort pattern, ordering so a mirror failure can't roll back the Beat write, delete cleaning the mirror outside the DoesNotExist try/except, the migration↔model match, and the no-NULL-text convention. The code-simplifier found no churn worth making.

Inline comments follow.

Comment thread backend/scheduler/tasks.py Outdated
pipeline_id=task_name,
workflow_id=task_args[0] if len(task_args) > 0 else None,
organization_id=task_args[1] if len(task_args) > 1 else "",
pipeline_name=task_args[6] if len(task_args) > 6 else "",

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High — pipeline_name stores the synthetic job label, not the pipeline name.

Flagged independently by the code-review, silent-failure, comment, and test agents. task_args[6] on this path is the name built in SchedulerHelper.add_or_update_job as f"Pipeline job-{pipeline.id}" (helper.py:68 → helper.py:60). So this column is always persisted as "Pipeline job-<uuid>" — effectively a prefixed duplicate of pipeline_id, never the user-facing name the field name implies. (The other producer, piepline_api_execution_views.py, does pass the real pipeline.pipeline_name at index 6, but it dispatches via celery_app.send_task and never mirrors — so the mirror only ever sees the synthetic label.)

The broad except can't catch this (it's a valid string write) and the unit test feeds _NAME="Nightly ETL", hiding it.

Fix: source the real name from the Pipeline object and pass it explicitly, OR drop pipeline_name from this inert slice rather than persisting a misleading value. At minimum, correct the layout comment and the test fixture so the false contract isn't pinned.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b856507 — the mirror upsert moved to SchedulerHelper._schedule_task_job, which holds the Pipeline object and now passes pipeline_name=pipeline.pipeline_name (the real user-facing name). create_or_update_periodic_task no longer mirrors / parses args. Added a helper-wiring test asserting the real name flows (not the synthetic Pipeline job-<id> label).

Comment thread backend/scheduler/tasks.py Outdated

def _mirror_periodic_schedule_set_enabled(pipeline_id: Any, enabled: bool) -> None:
try:
PgPeriodicSchedule.objects.filter(pipeline_id=pipeline_id).update(enabled=enabled)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — silent no-op desync; asymmetric with the upsert path.

filter(pipeline_id=...).update(...) (and the .delete() sibling at line 66) silently match zero rows if the mirror row doesn't exist — e.g. a pipeline scheduled before this code shipped, or one whose update_or_create upsert previously failed and was swallowed. The Beat PeriodicTask stays correct, but the mirror diverges with no signal, and because enable/disable only updates-if-present it can never self-heal: a pipeline that is only ever paused/resumed is never backfilled. The future scheduler that reads enabled+next_run_at would then silently never fire (or fire a paused) pipeline.

This asymmetry (update_or_create on create vs filter().update() here) is the mechanism. Acceptable for the inert slice, but worth hardening before the table is read.

Fix: standardize the mutating paths on update_or_create semantics so every mirror mutation converges regardless of prior row existence, OR check the .update()/.delete() rowcount and logger.info when it's 0 so the gap is observable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in b856507_mirror_periodic_schedule_set_enabled now checks the .update() rowcount and logger.infos when it matches 0 (a pre-existing/unmirrored schedule), so the gap is observable. Full self-heal/backfill of pre-existing schedules lands with the scheduler that reads the table (②b) — agreed it's not needed while the table is inert. +test asserting the 0-row log.

last_run_at = models.DateTimeField(null=True, blank=True)
next_run_at = models.DateTimeField(null=True, blank=True)
created_at = models.DateTimeField(default=timezone.now)
updated_at = models.DateTimeField(auto_now=True)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — updated_at (auto_now=True) is not bumped by the enable/disable path.

Django's auto_now fires only on Model.save()/update_or_create, not on queryset .update(). _mirror_periodic_schedule_set_enabled (tasks.py:56) uses .filter().update(enabled=...), so pause/resume changes enabled without advancing updated_at. The upsert path bumps it correctly, making updated_at an unreliable "last changed" signal that silently misses enable/disable mutations.

Fix: add "updated_at": timezone.now() to the .update(...) call in _mirror_periodic_schedule_set_enabled, or add a one-line comment that updated_at intentionally tracks only definition upserts.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b856507.update(enabled=..., updated_at=timezone.now()); queryset .update() doesn't fire auto_now, so this keeps updated_at accurate on pause/resume. Dev-tested: updated_at advances on disable.

Comment thread backend/scheduler/tasks.py Outdated
# [workflow_id, organization_id, execution_action, "", pipeline_id, False, name].
_mirror_periodic_schedule_upsert(
pipeline_id=task_name,
workflow_id=task_args[0] if len(task_args) > 0 else None,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — positional task_args coupling silently degrades to placeholder data.

The task_args[N] if len(task_args) > N else <default> guards prevent an IndexError crash (good), but they convert a contract drift into silent placeholder data: if a future change reorders or shortens task_args (note the TODO: Remove unused args with a migration on this very contract), the mirror quietly stores organization_id="", workflow_id=None, etc., with no log. Worse, since this runs inside _mirror_periodic_schedule_upsert's broad except, even an off-by-one IndexError would be swallowed and silently drop the mirror row — and the short-list branches have zero test coverage.

Fix: create_or_update_periodic_task is called from _schedule_task_job, which already has workflow_id/organization_id/name as named locals (helper.py:41-44). Pass them as named params instead of re-parsing the serialized positional list. If positional parsing must stay, logger.warning when len(task_args) is shorter than expected.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b856507 — dropped the positional task_args[N] parsing entirely. The upsert is now driven from _schedule_task_job with named locals (workflow_id, organization_id, pipeline.pipeline_name, pipeline.pk), so a future task_args reorder/shorten can't silently degrade the mirror to placeholders.

organization_id = models.TextField(blank=True, default="")
workflow_id = models.UUIDField(null=True, blank=True)
pipeline_name = models.TextField(blank=True, default="")
cron_string = models.TextField()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — cron_string is unvalidated free text, against this file's own convention.

Every sibling model encodes its domain bounds as a DB constraint (PgQueueMessage.priority range, PgBatchDedup.batch_index >= 0, PgBarrierState.expires_at > created_at), yet cron_string is a bare TextField() with no validator and no length cap — despite the Beat path requiring exactly 5 whitespace-separated fields (cron_string.split() at tasks.py:85). A malformed value mirrors silently and only fails later in the future scheduler tick, far from the write site.

Fix: a cron CheckConstraint is impractical, so validate-at-write is the pragmatic fit: check the 5-field split at the helper boundary (where the cron is already in hand) before mirroring, and/or add a MaxLengthValidator/length cap. Don't over-engineer a regex into the schema.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The structural 5-field validation already exists upstream and runs before the mirror: create_or_update_periodic_task does minute, hour, dom, moy, dow = cron_string.split() (raises ValueError on a non-5-field value), and the mirror upsert now runs after that in the same helper call — so a malformed cron never reaches the table. Full cron semantics are validated by croniter in ②b (the reader/next-run computation), where a parse failure is actionable. Kept TextField per your 'don't over-engineer a regex into the schema' note. Added a comment to that effect.

# Must not raise, and the mirror is still cleaned.
tasks.delete_periodic_task(_PIPELINE_ID)

sched.objects.filter.return_value.delete.assert_called_once()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium — test gaps (pr-test-analyzer).

The happy-path contract is well covered and follows the DB-free pg_queue/tests/test_producer.py convention, but several gaps could hide real bugs once the table goes live:

  1. Failure-swallowing is tested for the upsert only. Add tests that make sched.objects.filter(...).update / .delete raise and assert disable_task/enable_task/delete_periodic_task don't propagate and still call PipelineProcessor.update_pipeline — this is the central "never break Beat" guarantee for 2 of 3 helpers, currently unverified (tasks.py:54-71).
  2. Short task_args — every test uses the full 7-element list, so the else fallbacks (tasks.py:116-118) and the swallowed-IndexError path are never exercised. Call with []/a 2-element list and assert the defaults land.
  3. enabled=False propagation through create_or_update_periodic_task (real callers pass enabled=pipeline.active).
  4. Schema drift is invisible — the model is fully mocked, so a field-name mismatch between the upsert kwargs and the new model/migration passes green and fails only at runtime (then gets swallowed). Add one @pytest.mark.django_db round-trip, or a CI makemigrations --check gate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in b856507: added (1) failure-swallow tests for disable/enable/delete asserting they don't propagate AND still call update_pipeline; (3) enabled=False upsert test; the 0-row-match log test. (2) short-task_args is now moot — the positional parsing was removed (named params from the helper). (4) schema-drift: makemigrations --check is clean and the real-DB lifecycle dev-test (CREATE→DISABLE→ENABLE→DELETE) exercises the actual model round-trip; a django_db unit round-trip needs the gated unit-backend rig group (postgres) which doesn't list scheduler/tests yet — noted as the same gap prior pg_queue PRs have.

Comment thread backend/scheduler/tasks.py Outdated
# ---------------------------------------------------------------------------


def _mirror_periodic_schedule_upsert(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low — helper params typed Any hide a known contract.

The upstream caller already guarantees these are strings: str(pipeline.pk), str(workflow_id), str(name), and organization_id from UserContext.get_organization_identifier() (helper.py:48-61). Tightening to pipeline_id: str, organization_id: str, workflow_id: str | None, pipeline_name: str (keep cron_string: str, enabled: bool) costs nothing and documents that the mirror receives stringified UUIDs the UUIDField then coerces.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b856507mirror_periodic_schedule_upsert params tightened to pipeline_id: str, organization_id: str, workflow_id: str | None, pipeline_name: str (+ cron_string: str, enabled: bool).

_PIPELINE_ID = "11111111-1111-1111-1111-111111111111"
_WORKFLOW_ID = "22222222-2222-2222-2222-222222222222"
_ORG = "org_abc"
_NAME = "Nightly ETL"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low — fixture pins a contract production never produces.

_NAME = "Nightly ETL" at index 6 (asserted as pipeline_name) masks the High finding on tasks.py:118 — the real value flowing through SchedulerHelper is "Pipeline job-<uuid>". Use a representative fixture, e.g. _NAME = f"Pipeline job-{_PIPELINE_ID}", so the test reflects reality rather than enshrining a false expectation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved by the High fix (b856507): the mirror now stores the real pipeline.pipeline_name, so a representative name in the fixture is now the correct expectation, not a false one. The new helper-wiring test explicitly passes the synthetic Pipeline job-<id> label as the PeriodicTask name and asserts the mirror still records the real pipeline name — pinning the right contract.

Comment thread backend/pg_queue/models.py Outdated
db_table = "pg_periodic_schedule"
indexes = [
# Drives the (future) "due schedules" dequeue in the scheduler tick.
models.Index(fields=["enabled", "next_run_at"], name="pg_sched_due_idx"),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit — index name drops the table prefix used by siblings.

Sibling indexes use the full table name as prefix (pg_queue_message_dequeue_idx, pg_barrier_expires_idx); this one abbreviates to pg_sched_due_idx. Cosmetic only — rename to pg_periodic_schedule_due_idx for consistency if you're already churning the migration, otherwise leave it (a rename costs a migration).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b856507 — renamed to pg_periodic_schedule_due_idx (matches the sibling pg_queue_message_dequeue_idx / pg_barrier_expires_idx convention). Migration 0008 regenerated in place (not yet merged, so no extra migration).

… toggles, tighten types

- [High] pipeline_name now sourced from the Pipeline object in
  SchedulerHelper._schedule_task_job (pipeline.pipeline_name), not task_args[6]
  — which carries the synthetic "Pipeline job-<id>" label, never the user name.
  The mirror upsert moves to the helper (clean named ids, no positional arg
  parsing); create_or_update_periodic_task is Beat-only again.
- [Med] _mirror_periodic_schedule_set_enabled now bumps updated_at explicitly
  (queryset .update() does not fire auto_now) and logs when it matches 0 rows
  (a pre-existing/unmirrored schedule — backfill lands in ②b).
- [Med] dropped the positional task_args[N] parsing entirely (the helper passes
  named fields), removing the silent-placeholder-on-contract-drift risk.
- [Low] tightened mirror helper param types (str / str | None) — the callers
  already pass stringified UUIDs.
- [Nit] index renamed pg_sched_due_idx -> pg_periodic_schedule_due_idx (sibling
  convention); migration 0008 regenerated (not yet merged).
- tests: cover the standalone upsert (+enabled=False, +failure-swallow), the
  helper wiring proving the real pipeline_name flows, disable/enable/delete
  failure-swallow + still-calls-update_pipeline, and the 0-row log. 11 tests.

cron_string validation: the existing 5-field split unpack in
create_or_update_periodic_task already gates the Beat path before the mirror
runs; full cron semantics are validated by croniter in ②b (the reader).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round 1 addressed — b85650794

Thanks for the thorough pass (6 agents). All 9 addressed:

High

  • pipeline_name was the synthetic label → mirror upsert moved into SchedulerHelper._schedule_task_job, which sources the real pipeline.pipeline_name + clean ids from the Pipeline object. create_or_update_periodic_task is Beat-only again (no arg parsing). +helper-wiring test.

Medium

  • enable/disable can't self-heal / silent 0-row.update() rowcount checked + logger.info on 0 match; backfill of pre-existing rows is a ②b concern (table is inert). +test.
  • updated_at not bumped by .update() → now update(enabled=…, updated_at=timezone.now()); dev-tested it advances.
  • positional task_args coupling → parsing removed entirely (named params from the helper).
  • cron unvalidated → the existing 5-field split() unpack gates the Beat path before the mirror runs (mirror is now after it in the same call); full cron semantics validated by croniter in ②b. Kept TextField per your "no regex in schema" note.
  • test gaps → added failure-swallow for disable/enable/delete (+ still-calls-update_pipeline), enabled=False, 0-row log. Short-task_args moot (parsing removed). Schema-drift: makemigrations --check clean + real-DB lifecycle dev-test.

Low / Nit

  • Mirror helper param types tightened to str / str | None.
  • Test fixture: the real-name expectation is now correct (was masking the High); the wiring test passes the synthetic label and asserts the real name is stored.
  • Index renamed pg_periodic_schedule_due_idx (sibling convention); 0008 regenerated in place.

Verification: 11 unit tests green; ruff/ruff-format/pre-commit clean; real-DB lifecycle (CREATE→DISABLE→ENABLE→DELETE) re-verified; migration applies + --check clean.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 18, 2026 16:19
@muhammad-ali-e muhammad-ali-e merged commit 87f9ede into feat/UN-3445-pg-queue-integration Jun 18, 2026
5 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3581-FEAT_pg_periodic_schedule_mirror branch June 18, 2026 16:20
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces the inert first slice of a Postgres-backed periodic scheduler: a new PgPeriodicSchedule model and migration that mirrors each pipeline's cron definition, plus best-effort dual-write hooks at all four schedule mutation points (create/update, pause, resume, delete). Nothing reads the new table; the existing Celery Beat path is fully preserved.

  • New PgPeriodicSchedule model (pg_queue/models.py + migration 0008) stores pipeline_id, organization_id, workflow_id, cron_string, and enabled; last_run_at/next_run_at are intentionally NULL until the scheduler tick lands in the next slice.
  • Dual-write in scheduler/tasks.py wraps every mirror mutation in try/except so a mirror failure can never affect the Beat scheduling path.
  • Six DB-free unit tests pin the field-extraction contract, enable/disable toggling, delete behaviour, and the best-effort-swallow guarantee.

Confidence Score: 4/5

Safe to merge into the feature branch; the Beat scheduling path is completely untouched and all mirror writes are isolated behind best-effort try/except blocks.

The dual-write logic and best-effort isolation are sound. One callsite in helper.py converts workflow_id with str() before passing it to a function that declares str | None — if workflow_id is ever None, str(None) becomes the string 'None', the or None guard inside the mirror function is bypassed, and Django's UUIDField rejects it, silently dropping the mirror row. A second smaller gap: delete_periodic_task only catches PeriodicTask.DoesNotExist, so any other exception from task.delete() would skip the mirror cleanup step despite the code comment saying cleanup runs regardless. Neither issue affects current runtime behaviour because the table is inert, but both will matter once the scheduler tick is wired up.

backend/scheduler/helper.py — the workflow_id=str(workflow_id) argument; backend/scheduler/tasks.py — the exception-handling scope around _mirror_periodic_schedule_delete.

Important Files Changed

Filename Overview
backend/pg_queue/migrations/0008_pgperiodicschedule.py Auto-generated migration creating pg_periodic_schedule; fields and composite index match the model exactly.
backend/pg_queue/models.py New PgPeriodicSchedule model is well-structured; nullable workflow_id and null=True, blank=True on timing fields are correct for the inert phase.
backend/scheduler/helper.py Calls mirror_periodic_schedule_upsert with workflow_id=str(workflow_id), converting a None to the literal string 'None' and breaking the or None guard in tasks.py, causing silent mirror failure for any pipeline where workflow_id resolves to None.
backend/scheduler/tasks.py Mirror functions are correctly best-effort wrapped; minor gap in delete_periodic_task — mirror cleanup is skipped if an unexpected exception escapes the inner try/except, contrary to the stated 'regardless' intent.
backend/scheduler/tests/test_pg_periodic_schedule_mirror.py Six DB-free unit tests pin the major dual-write contracts; does not cover the workflow_id=None edge case that would expose the str(None) to 'None' bug.

Comments Outside Diff (1)

  1. backend/scheduler/tasks.py, line 222-231 (link)

    P2 Mirror cleanup skipped on unexpected DB exceptions

    The comment says the mirror is cleaned "regardless of whether the PeriodicTask existed", but the try/except only catches PeriodicTask.DoesNotExist. If task.delete() raises anything else (e.g., a transient DB error), the exception propagates out of the function and _mirror_periodic_schedule_delete is never called, leaving a stale mirror row. Moving the mirror call into a finally block — or widening the catch — would honour the stated guarantee.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: backend/scheduler/tasks.py
    Line: 222-231
    
    Comment:
    **Mirror cleanup skipped on unexpected DB exceptions**
    
    The comment says the mirror is cleaned "regardless of whether the `PeriodicTask` existed", but the `try/except` only catches `PeriodicTask.DoesNotExist`. If `task.delete()` raises anything else (e.g., a transient DB error), the exception propagates out of the function and `_mirror_periodic_schedule_delete` is never called, leaving a stale mirror row. Moving the mirror call into a `finally` block — or widening the catch — would honour the stated guarantee.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Claude Code

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
backend/scheduler/helper.py:69-72
`str(workflow_id)` converts a Python `None` into the string literal `"None"`, which is truthy. The guard inside `mirror_periodic_schedule_upsert``workflow_id or None` — therefore never triggers for a `None` input, so Django's `UUIDField` receives `"None"`, rejects it as an invalid UUID, and the whole upsert is silently swallowed. The function's own type annotation declares `workflow_id: str | None`, and `PgPeriodicSchedule.workflow_id` is `null=True`, so passing `None` is the intended and safe path.

```suggestion
        mirror_periodic_schedule_upsert(
            pipeline_id=str(pipeline.pk),
            organization_id=organization_id or "",
            workflow_id=str(workflow_id) if workflow_id is not None else None,
```

### Issue 2 of 2
backend/scheduler/tasks.py:222-231
**Mirror cleanup skipped on unexpected DB exceptions**

The comment says the mirror is cleaned "regardless of whether the `PeriodicTask` existed", but the `try/except` only catches `PeriodicTask.DoesNotExist`. If `task.delete()` raises anything else (e.g., a transient DB error), the exception propagates out of the function and `_mirror_periodic_schedule_delete` is never called, leaving a stale mirror row. Moving the mirror call into a `finally` block — or widening the catch — would honour the stated guarantee.

Reviews (1): Last reviewed commit: "UN-3581 address review: mirror real pipe..." | Re-trigger Greptile

Comment on lines +69 to +72
mirror_periodic_schedule_upsert(
pipeline_id=str(pipeline.pk),
organization_id=organization_id or "",
workflow_id=str(workflow_id),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 str(workflow_id) converts a Python None into the string literal "None", which is truthy. The guard inside mirror_periodic_schedule_upsertworkflow_id or None — therefore never triggers for a None input, so Django's UUIDField receives "None", rejects it as an invalid UUID, and the whole upsert is silently swallowed. The function's own type annotation declares workflow_id: str | None, and PgPeriodicSchedule.workflow_id is null=True, so passing None is the intended and safe path.

Suggested change
mirror_periodic_schedule_upsert(
pipeline_id=str(pipeline.pk),
organization_id=organization_id or "",
workflow_id=str(workflow_id),
mirror_periodic_schedule_upsert(
pipeline_id=str(pipeline.pk),
organization_id=organization_id or "",
workflow_id=str(workflow_id) if workflow_id is not None else None,
Prompt To Fix With AI
This is a comment left during a code review.
Path: backend/scheduler/helper.py
Line: 69-72

Comment:
`str(workflow_id)` converts a Python `None` into the string literal `"None"`, which is truthy. The guard inside `mirror_periodic_schedule_upsert``workflow_id or None` — therefore never triggers for a `None` input, so Django's `UUIDField` receives `"None"`, rejects it as an invalid UUID, and the whole upsert is silently swallowed. The function's own type annotation declares `workflow_id: str | None`, and `PgPeriodicSchedule.workflow_id` is `null=True`, so passing `None` is the intended and safe path.

```suggestion
        mirror_periodic_schedule_upsert(
            pipeline_id=str(pipeline.pk),
            organization_id=organization_id or "",
            workflow_id=str(workflow_id) if workflow_id is not None else None,
```

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.