UN-3581 [FEAT] PG Queue 9h-a — pg_periodic_schedule mirror table + dual-write (inert) #2080
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:feat/UN-3581-FEAT_pg_periodic_schedule_mirror
…al-write (inert)
First inert slice toward replacing Celery Beat with a PG-backed periodic scheduler (to be folded into the leader-elected reaper/orchestrator loop). This slice only mirrors each scheduled pipeline's cron definition into a new Postgres table; nothing reads it yet, so behaviour is unchanged.
- backend/pg_queue/models.py: new PgPeriodicSchedule (pipeline_id PK, org_id, workflow_id, pipeline_name, cron_string, enabled; last_run_at/next_run_at left NULL, since the scheduler tick owns all cron computation in the next slice). Index on (enabled, next_run_at) for the future "due schedules" query.
- migration 0008 (generated; makemigrations --check clean).
- backend/scheduler/tasks.py: dual-write the mirror from the four schedule choke-points (create/update, pause, resume, delete) that already manage the django_celery_beat PeriodicTask. Toggles are placed right after task.save() so a downstream pipeline-status failure can't desync the mirror. Every mirror write is best-effort (try/except + log): a mirror failure can NEVER break the existing Beat scheduling path.
- tests: 6 DB-free unit tests (field extraction, enable/disable, delete, delete-when-PeriodicTask-missing, best-effort swallow).
Inert / non-regressive: nothing reads the table; the PeriodicTask + celery-beat container are untouched.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e left a comment
Automated PR Review — pg_periodic_schedule inert mirror
Ran the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) against this single-commit PR. The inert/best-effort guarantee is correctly implemented and well-tested; migration matches the model. Findings below are scoped accordingly — most are Low/Medium because nothing reads the table yet, but two should be addressed before the table goes live (next slice).
Prioritized summary
| # | Severity | Finding | Location |
|---|---|---|---|
| 1 | High | pipeline_name mirrors the synthetic label "Pipeline job-<uuid>", not a real pipeline name | tasks.py:118 |
| 2 | Medium | enable/disable/delete use filter().update()/.delete() → silent no-op + asymmetric divergence vs the update_or_create upsert; mirror can never self-heal a missing row | tasks.py:56 |
| 3 | Medium | updated_at (auto_now) is NOT advanced by the .update() enable/disable path | models.py:264 |
| 4 | Medium | Positional task_args coupling silently degrades to placeholder data; the swallowed IndexError hides arg drift | tasks.py:116 |
| 5 | Medium | cron_string is unvalidated free text, unlike every sibling model's domain CheckConstraint | models.py:257 |
| 6 | Medium | Test gaps: set_enabled/delete failure-swallowing, short task_args, enabled=False, and no DB-backed/makemigrations --check to catch schema drift behind the mock | tests file |
| 7 | Low | Helper params typed Any; upstream already guarantees str / `str \| None` | |
| 8 | Low | Test fixture _NAME="Nightly ETL" pins a contract production never produces | tests:18 |
| 9 | Nit | Index name pg_sched_due_idx drops the full-table prefix used by siblings | models.py:270 |
Green (confirmed correct): the swallow-and-log best-effort pattern, ordering so a mirror failure can't roll back the Beat write, delete cleaning the mirror outside the DoesNotExist try/except, the migration↔model match, and the no-NULL-text convention. The code-simplifier found no churn worth making.
Inline comments follow.
| pipeline_id=task_name, | ||
| workflow_id=task_args[0] if len(task_args) > 0 else None, | ||
| organization_id=task_args[1] if len(task_args) > 1 else "", | ||
| pipeline_name=task_args[6] if len(task_args) > 6 else "", |
High — pipeline_name stores the synthetic job label, not the pipeline name.
Flagged independently by the code-review, silent-failure, comment, and test agents. task_args[6] on this path is the name built in SchedulerHelper.add_or_update_job as f"Pipeline job-{pipeline.id}" (helper.py:68 → helper.py:60). So this column is always persisted as "Pipeline job-<uuid>" — effectively a prefixed duplicate of pipeline_id, never the user-facing name the field name implies. (The other producer, piepline_api_execution_views.py, does pass the real pipeline.pipeline_name at index 6, but it dispatches via celery_app.send_task and never mirrors — so the mirror only ever sees the synthetic label.)
The broad except can't catch this (it's a valid string write) and the unit test feeds _NAME="Nightly ETL", hiding it.
Fix: source the real name from the Pipeline object and pass it explicitly, OR drop pipeline_name from this inert slice rather than persisting a misleading value. At minimum, correct the layout comment and the test fixture so the false contract isn't pinned.
Fixed in b856507 — the mirror upsert moved to SchedulerHelper._schedule_task_job, which holds the Pipeline object and now passes pipeline_name=pipeline.pipeline_name (the real user-facing name). create_or_update_periodic_task no longer mirrors / parses args. Added a helper-wiring test asserting the real name flows (not the synthetic Pipeline job-<id> label).
| def _mirror_periodic_schedule_set_enabled(pipeline_id: Any, enabled: bool) -> None: | ||
| try: | ||
| PgPeriodicSchedule.objects.filter(pipeline_id=pipeline_id).update(enabled=enabled) |
Medium — silent no-op desync; asymmetric with the upsert path.
filter(pipeline_id=...).update(...) (and the .delete() sibling at line 66) silently match zero rows if the mirror row doesn't exist, e.g. a pipeline scheduled before this code shipped, or one whose update_or_create upsert previously failed and was swallowed. The Beat PeriodicTask stays correct, but the mirror diverges with no signal, and because enable/disable only updates-if-present it can never self-heal: a pipeline that is only ever paused/resumed is never backfilled. The future scheduler that reads enabled+next_run_at would then silently never fire an active pipeline (or would fire a paused one).
This asymmetry (update_or_create on create vs filter().update() here) is the mechanism. Acceptable for the inert slice, but worth hardening before the table is read.
Fix: standardize the mutating paths on update_or_create semantics so every mirror mutation converges regardless of prior row existence, OR check the .update()/.delete() rowcount and logger.info when it's 0 so the gap is observable.
Addressed in b856507 — _mirror_periodic_schedule_set_enabled now checks the .update() rowcount and logger.infos when it matches 0 (a pre-existing/unmirrored schedule), so the gap is observable. Full self-heal/backfill of pre-existing schedules lands with the scheduler that reads the table (②b) — agreed it's not needed while the table is inert. +test asserting the 0-row log.
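The rowcount check and the explicit updated_at bump can be sketched without Django. The block below is a minimal stand-in that uses the stdlib sqlite3 module in place of the real PgPeriodicSchedule model; the function name set_enabled, the logger name, and the three-column table are assumptions made for illustration, not the PR's code. It relies on the same idea as Django's QuerySet.update(), which returns the number of rows matched.

```python
import logging
import sqlite3
from datetime import datetime, timezone

logger = logging.getLogger("pg_periodic_schedule_mirror")  # hypothetical logger name


def set_enabled(conn: sqlite3.Connection, pipeline_id: str, enabled: bool) -> int:
    """Toggle `enabled` on the mirror row; never raise; return rows matched."""
    try:
        cur = conn.execute(
            "UPDATE pg_periodic_schedule "
            "SET enabled = ?, updated_at = ? WHERE pipeline_id = ?",
            # updated_at is set explicitly: a bulk UPDATE has no auto-timestamp hook.
            (int(enabled), datetime.now(timezone.utc).isoformat(), pipeline_id),
        )
        conn.commit()
    except sqlite3.Error:
        # Best-effort: log and carry on so the primary scheduling path is unaffected.
        logger.exception("mirror toggle failed for %s", pipeline_id)
        return 0
    if cur.rowcount == 0:
        # Zero matches is not an error, but it should be observable.
        logger.info("no mirror row for %s; schedule predates the mirror", pipeline_id)
    return cur.rowcount


# Simplified stand-in table with one pre-existing row.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pg_periodic_schedule ("
    "pipeline_id TEXT PRIMARY KEY, enabled INTEGER NOT NULL, updated_at TEXT NOT NULL)"
)
conn.execute(
    "INSERT INTO pg_periodic_schedule VALUES ('p-1', 1, '2000-01-01T00:00:00+00:00')"
)
matched = set_enabled(conn, "p-1", False)        # row exists
missing = set_enabled(conn, "p-unknown", False)  # no row: logged, not raised
```

Returning the match count keeps the helper non-raising while still giving callers and tests something to assert on.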
| last_run_at = models.DateTimeField(null=True, blank=True) | ||
| next_run_at = models.DateTimeField(null=True, blank=True) | ||
| created_at = models.DateTimeField(default=timezone.now) | ||
| updated_at = models.DateTimeField(auto_now=True) |
Medium — updated_at (auto_now=True) is not bumped by the enable/disable path.
Django's auto_now fires only on Model.save()/update_or_create, not on queryset .update(). _mirror_periodic_schedule_set_enabled (tasks.py:56) uses .filter().update(enabled=...), so pause/resume changes enabled without advancing updated_at. The upsert path bumps it correctly, making updated_at an unreliable "last changed" signal that silently misses enable/disable mutations.
Fix: add "updated_at": timezone.now() to the .update(...) call in _mirror_periodic_schedule_set_enabled, or add a one-line comment that updated_at intentionally tracks only definition upserts.
Fixed in b856507 — .update(enabled=..., updated_at=timezone.now()); queryset .update() doesn't fire auto_now, so this keeps updated_at accurate on pause/resume. Dev-tested: updated_at advances on disable.
| # [workflow_id, organization_id, execution_action, "", pipeline_id, False, name]. | ||
| _mirror_periodic_schedule_upsert( | ||
| pipeline_id=task_name, | ||
| workflow_id=task_args[0] if len(task_args) > 0 else None, |
Medium — positional task_args coupling silently degrades to placeholder data.
The task_args[N] if len(task_args) > N else <default> guards prevent an IndexError crash (good), but they convert a contract drift into silent placeholder data: if a future change reorders or shortens task_args (note the TODO: Remove unused args with a migration on this very contract), the mirror quietly stores organization_id="", workflow_id=None, etc., with no log. Worse, since this runs inside _mirror_periodic_schedule_upsert's broad except, even an off-by-one IndexError would be swallowed and silently drop the mirror row — and the short-list branches have zero test coverage.
Fix: create_or_update_periodic_task is called from _schedule_task_job, which already has workflow_id/organization_id/name as named locals (helper.py:41-44). Pass them as named params instead of re-parsing the serialized positional list. If positional parsing must stay, logger.warning when len(task_args) is shorter than expected.
Fixed in b856507 — dropped the positional task_args[N] parsing entirely. The upsert is now driven from _schedule_task_job with named locals (workflow_id, organization_id, pipeline.pipeline_name, pipeline.pk), so a future task_args reorder/shorten can't silently degrade the mirror to placeholders.
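The difference between guarded positional parsing and named parameters can be shown in a few lines. This is a sketch under assumptions: parse_positional mimics the guards quoted in the finding, while MirrorFields is a hypothetical dataclass standing in for the named arguments the helper now passes; neither is the PR's actual code.

```python
from dataclasses import dataclass
from typing import Optional


def parse_positional(task_args: list) -> dict:
    """Guarded positional parsing: a short list silently becomes placeholders."""
    return {
        "workflow_id": task_args[0] if len(task_args) > 0 else None,
        "organization_id": task_args[1] if len(task_args) > 1 else "",
        "pipeline_name": task_args[6] if len(task_args) > 6 else "",
    }


@dataclass(frozen=True)
class MirrorFields:
    """Hypothetical named payload: a missing field fails loudly at the call site."""

    pipeline_id: str
    organization_id: str
    workflow_id: Optional[str]
    pipeline_name: str


# Contract drift with positional args: no error, just placeholder data.
degraded = parse_positional(["wf-1"])

# The same drift with named fields raises immediately.
try:
    MirrorFields(pipeline_id="p-1", organization_id="org_abc")  # type: ignore[call-arg]
    drift_detected = False
except TypeError:
    drift_detected = True
```

With named fields, a dropped or renamed argument surfaces where the call is made, where it can be fixed, rather than as empty strings in the table.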
| organization_id = models.TextField(blank=True, default="") | ||
| workflow_id = models.UUIDField(null=True, blank=True) | ||
| pipeline_name = models.TextField(blank=True, default="") | ||
| cron_string = models.TextField() |
Medium — cron_string is unvalidated free text, against this file's own convention.
Every sibling model encodes its domain bounds as a DB constraint (PgQueueMessage.priority range, PgBatchDedup.batch_index >= 0, PgBarrierState.expires_at > created_at), yet cron_string is a bare TextField() with no validator and no length cap — despite the Beat path requiring exactly 5 whitespace-separated fields (cron_string.split() at tasks.py:85). A malformed value mirrors silently and only fails later in the future scheduler tick, far from the write site.
Fix: a cron CheckConstraint is impractical, so validate-at-write is the pragmatic fit: check the 5-field split at the helper boundary (where the cron is already in hand) before mirroring, and/or add a MaxLengthValidator/length cap. Don't over-engineer a regex into the schema.
The structural 5-field validation already exists upstream and runs before the mirror: create_or_update_periodic_task does minute, hour, dom, moy, dow = cron_string.split() (raises ValueError on a non-5-field value), and the mirror upsert now runs after that in the same helper call — so a malformed cron never reaches the table. Full cron semantics are validated by croniter in ②b (the reader/next-run computation), where a parse failure is actionable. Kept TextField per your 'don't over-engineer a regex into the schema' note. Added a comment to that effect.
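The structural gate described in the reply amounts to a five-way tuple unpack. The sketch below isolates that check; the function names split_cron and is_structurally_valid are illustrative assumptions, not names from the PR. It also shows the limit of the check: five fields with nonsensical values still pass, which is why semantic validation is deferred to the reader.

```python
def split_cron(cron_string: str) -> tuple[str, str, str, str, str]:
    """Structural check only: exactly five whitespace-separated fields."""
    # Unpacking raises ValueError when there are fewer or more than five fields.
    minute, hour, dom, moy, dow = cron_string.split()
    return minute, hour, dom, moy, dow


def is_structurally_valid(cron_string: str) -> bool:
    """True when the string has the five-field shape; says nothing about ranges."""
    try:
        split_cron(cron_string)
    except ValueError:
        return False
    return True
```

For instance, "0 2 * * *" and "99 99 99 99 99" both pass this gate, while "0 2 * *" does not.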
| # Must not raise, and the mirror is still cleaned. | ||
| tasks.delete_periodic_task(_PIPELINE_ID) | ||
| sched.objects.filter.return_value.delete.assert_called_once() |
Medium — test gaps (pr-test-analyzer).
The happy-path contract is well covered and follows the DB-free pg_queue/tests/test_producer.py convention, but several gaps could hide real bugs once the table goes live:
- Failure-swallowing is tested for the upsert only. Add tests that make sched.objects.filter(...).update/.delete raise and assert disable_task/enable_task/delete_periodic_task don't propagate and still call PipelineProcessor.update_pipeline; this is the central "never break Beat" guarantee for 2 of 3 helpers, currently unverified (tasks.py:54-71).
- Short task_args: every test uses the full 7-element list, so the else fallbacks (tasks.py:116-118) and the swallowed-IndexError path are never exercised. Call with []/a 2-element list and assert the defaults land.
- enabled=False propagation through create_or_update_periodic_task (real callers pass enabled=pipeline.active).
- Schema drift is invisible: the model is fully mocked, so a field-name mismatch between the upsert kwargs and the new model/migration passes green and fails only at runtime (then gets swallowed). Add one @pytest.mark.django_db round-trip, or a CI makemigrations --check gate.
Addressed in b856507: added (1) failure-swallow tests for disable/enable/delete asserting they don't propagate AND still call update_pipeline; (3) enabled=False upsert test; the 0-row-match log test. (2) short-task_args is now moot — the positional parsing was removed (named params from the helper). (4) schema-drift: makemigrations --check is clean and the real-DB lifecycle dev-test (CREATE→DISABLE→ENABLE→DELETE) exercises the actual model round-trip; a django_db unit round-trip needs the gated unit-backend rig group (postgres) which doesn't list scheduler/tests yet — noted as the same gap prior pg_queue PRs have.
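A failure-swallow test of the kind described here can be written DB-free with unittest.mock. The sketch below uses a simplified disable_task stand-in that takes its collaborators as arguments; that signature, the "PAUSED" status string, and the logger name are assumptions for illustration and do not match the real scheduler/tasks.py.

```python
import logging
from unittest import mock

logger = logging.getLogger("scheduler.mirror")  # hypothetical logger name


def disable_task(pipeline_id: str, mirror, update_pipeline) -> None:
    """Simplified stand-in: best-effort mirror write, then the status update."""
    try:
        mirror.set_enabled(pipeline_id, False)
    except Exception:
        # Swallow and log: the mirror must never break the primary path.
        logger.exception("mirror write failed for %s", pipeline_id)
    update_pipeline(pipeline_id, "PAUSED")


def test_mirror_failure_is_swallowed() -> None:
    mirror = mock.Mock()
    mirror.set_enabled.side_effect = RuntimeError("db down")
    update_pipeline = mock.Mock()

    disable_task("p-1", mirror, update_pipeline)  # must not raise

    # The mirror write was attempted, and the downstream call still happened.
    mirror.set_enabled.assert_called_once_with("p-1", False)
    update_pipeline.assert_called_once_with("p-1", "PAUSED")
```

Asserting on the downstream call as well as the absence of an exception is what pins the "never break Beat" guarantee, since a helper that swallowed the error but returned early would otherwise pass.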
| # --------------------------------------------------------------------------- | ||
| def _mirror_periodic_schedule_upsert( |
Low — helper params typed Any hide a known contract.
The upstream caller already guarantees these are strings: str(pipeline.pk), str(workflow_id), str(name), and organization_id from UserContext.get_organization_identifier() (helper.py:48-61). Tightening to pipeline_id: str, organization_id: str, workflow_id: str | None, pipeline_name: str (keep cron_string: str, enabled: bool) costs nothing and documents that the mirror receives stringified UUIDs the UUIDField then coerces.
Fixed in b856507 — mirror_periodic_schedule_upsert params tightened to pipeline_id: str, organization_id: str, workflow_id: str | None, pipeline_name: str (+ cron_string: str, enabled: bool).
| _PIPELINE_ID = "11111111-1111-1111-1111-111111111111" | ||
| _WORKFLOW_ID = "22222222-2222-2222-2222-222222222222" | ||
| _ORG = "org_abc" | ||
| _NAME = "Nightly ETL" |
Low — fixture pins a contract production never produces.
_NAME = "Nightly ETL" at index 6 (asserted as pipeline_name) masks the High finding on tasks.py:118 — the real value flowing through SchedulerHelper is "Pipeline job-<uuid>". Use a representative fixture, e.g. _NAME = f"Pipeline job-{_PIPELINE_ID}", so the test reflects reality rather than enshrining a false expectation.
Resolved by the High fix (b856507): the mirror now stores the real pipeline.pipeline_name, so a representative name in the fixture is now the correct expectation, not a false one. The new helper-wiring test explicitly passes the synthetic Pipeline job-<id> label as the PeriodicTask name and asserts the mirror still records the real pipeline name — pinning the right contract.
| db_table = "pg_periodic_schedule" | ||
| indexes = [ | ||
| # Drives the (future) "due schedules" dequeue in the scheduler tick. | ||
| models.Index(fields=["enabled", "next_run_at"], name="pg_sched_due_idx"), |
Nit — index name drops the table prefix used by siblings.
Sibling indexes use the full table name as prefix (pg_queue_message_dequeue_idx, pg_barrier_expires_idx); this one abbreviates to pg_sched_due_idx. Cosmetic only — rename to pg_periodic_schedule_due_idx for consistency if you're already churning the migration, otherwise leave it (a rename costs a migration).
Fixed in b856507 — renamed to pg_periodic_schedule_due_idx (matches the sibling pg_queue_message_dequeue_idx / pg_barrier_expires_idx convention). Migration 0008 regenerated in place (not yet merged, so no extra migration).
… toggles, tighten types
- [High] pipeline_name now sourced from the Pipeline object in SchedulerHelper._schedule_task_job (pipeline.pipeline_name), not task_args[6], which carries the synthetic "Pipeline job-<id>" label, never the user name. The mirror upsert moves to the helper (clean named ids, no positional arg parsing); create_or_update_periodic_task is Beat-only again.
- [Med] _mirror_periodic_schedule_set_enabled now bumps updated_at explicitly (queryset .update() does not fire auto_now) and logs when it matches 0 rows (a pre-existing/unmirrored schedule; backfill lands in ②b).
- [Med] dropped the positional task_args[N] parsing entirely (the helper passes named fields), removing the silent-placeholder-on-contract-drift risk.
- [Low] tightened mirror helper param types (str / str | None); the callers already pass stringified UUIDs.
- [Nit] index renamed pg_sched_due_idx -> pg_periodic_schedule_due_idx (sibling convention); migration 0008 regenerated (not yet merged).
- tests: cover the standalone upsert (+enabled=False, +failure-swallow), the helper wiring proving the real pipeline_name flows, disable/enable/delete failure-swallow + still-calls-update_pipeline, and the 0-row log. 11 tests.
cron_string validation: the existing 5-field split unpack in create_or_update_periodic_task already gates the Beat path before the mirror runs; full cron semantics are validated by croniter in ②b (the reader).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round 1 addressed —
Merged 87f9ede into feat/UN-3445-pg-queue-integration
| Filename | Overview |
|---|---|
| backend/pg_queue/migrations/0008_pgperiodicschedule.py | Auto-generated migration creating pg_periodic_schedule; fields and composite index match the model exactly. |
| backend/pg_queue/models.py | New PgPeriodicSchedule model is well-structured; nullable workflow_id and null=True, blank=True on timing fields are correct for the inert phase. |
| backend/scheduler/helper.py | Calls mirror_periodic_schedule_upsert with workflow_id=str(workflow_id), converting a None to the literal string 'None' and breaking the or None guard in tasks.py, causing silent mirror failure for any pipeline where workflow_id resolves to None. |
| backend/scheduler/tasks.py | Mirror functions are correctly best-effort wrapped; minor gap in delete_periodic_task — mirror cleanup is skipped if an unexpected exception escapes the inner try/except, contrary to the stated 'regardless' intent. |
| backend/scheduler/tests/test_pg_periodic_schedule_mirror.py | Six DB-free unit tests pin the major dual-write contracts; does not cover the workflow_id=None edge case that would expose the str(None) to 'None' bug. |
Comments Outside Diff (1)
- backend/scheduler/tasks.py, line 222-231: Mirror cleanup skipped on unexpected DB exceptions
The comment says the mirror is cleaned "regardless of whether the PeriodicTask existed", but the try/except only catches PeriodicTask.DoesNotExist. If task.delete() raises anything else (e.g., a transient DB error), the exception propagates out of the function and _mirror_periodic_schedule_delete is never called, leaving a stale mirror row. Moving the mirror call into a finally block, or widening the catch, would honour the stated guarantee.
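The finally-based option can be sketched as follows. This is an illustration only: DoesNotExist is a local stand-in for PeriodicTask.DoesNotExist, delete_schedule and its callable parameters are hypothetical, and the mirror cleanup is assumed to be best-effort (non-raising) as it is elsewhere in the PR.

```python
from typing import Callable


class DoesNotExist(Exception):
    """Stand-in for PeriodicTask.DoesNotExist (hypothetical, for illustration)."""


def delete_schedule(
    pipeline_id: str,
    delete_beat_task: Callable[[str], None],
    delete_mirror_row: Callable[[str], None],
) -> None:
    try:
        delete_beat_task(pipeline_id)
    except DoesNotExist:
        pass  # no Beat task to delete; still fall through to mirror cleanup
    finally:
        # Runs after success, after DoesNotExist, and while any other
        # exception is propagating, so the mirror row is always removed.
        delete_mirror_row(pipeline_id)


cleaned: list[str] = []


def failing_delete(pipeline_id: str) -> None:
    raise RuntimeError("transient DB error")


try:
    delete_schedule("p-1", failing_delete, cleaned.append)
except RuntimeError:
    pass  # the Beat-side error still reaches the caller
```

Compared with widening the catch, finally keeps the unexpected error visible to the caller while still honouring the cleanup guarantee.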
Reviews (1): Last reviewed commit: "UN-3581 address review: mirror real pipe..."
| mirror_periodic_schedule_upsert( | ||
| pipeline_id=str(pipeline.pk), | ||
| organization_id=organization_id or "", | ||
| workflow_id=str(workflow_id), |
str(workflow_id) converts a Python None into the string literal "None", which is truthy. The guard inside mirror_periodic_schedule_upsert — workflow_id or None — therefore never triggers for a None input, so Django's UUIDField receives "None", rejects it as an invalid UUID, and the whole upsert is silently swallowed. The function's own type annotation declares workflow_id: str | None, and PgPeriodicSchedule.workflow_id is null=True, so passing None is the intended and safe path.
Suggested change, before:
| mirror_periodic_schedule_upsert( | |
| pipeline_id=str(pipeline.pk), | |
| organization_id=organization_id or "", | |
| workflow_id=str(workflow_id), | |
After:
| mirror_periodic_schedule_upsert( | |
| pipeline_id=str(pipeline.pk), | |
| organization_id=organization_id or "", | |
| workflow_id=str(workflow_id) if workflow_id is not None else None, |
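The str(None) pitfall is easy to reproduce in isolation. The sketch below uses only the standard library; to_mirror_workflow_id and is_valid_uuid are hypothetical helper names, and uuid.UUID stands in for the validation a UUID column would apply.

```python
import uuid
from typing import Optional


def to_mirror_workflow_id(workflow_id: object) -> Optional[str]:
    """Stringify a workflow id but preserve None for the nullable column."""
    return str(workflow_id) if workflow_id is not None else None


def is_valid_uuid(value: str) -> bool:
    """True when the string parses as a UUID."""
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


# The bug: str(None) is the truthy string "None", so `x or None` cannot undo it.
naive = str(None) or None

# The fix: None is checked before stringifying.
fixed = to_mirror_workflow_id(None)
```

Here naive ends up as the string "None", which fails UUID parsing, while fixed stays None and maps cleanly onto a nullable column.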
What & why
First inert slice toward replacing Celery Beat with a Postgres-backed periodic scheduler. The downstream execution is already PG-capable via the transport seam — this track migrates only the trigger (Beat → a leader-elected scheduler loop, to be folded into the reaper/orchestrator).
This slice only mirrors each scheduled pipeline's cron definition into a new table. Nothing reads it yet, so behaviour is unchanged — it's the inert primitive the next slice builds on.
Changes
- backend/pg_queue/models.py: new PgPeriodicSchedule (one row per scheduled pipeline): pipeline_id PK, organization_id, workflow_id, pipeline_name, cron_string, enabled. last_run_at/next_run_at are left NULL here; the scheduler tick owns all cron computation in the next slice. Index on (enabled, next_run_at) for the future "due schedules" query.
- makemigrations --check clean.
- backend/scheduler/tasks.py: dual-write the mirror from the four schedule choke-points that already manage the django_celery_beat PeriodicTask: create/update (upsert, fields from the documented task_args layout), pause/resume (toggle enabled), delete. Toggles are placed right after task.save() so a downstream pipeline-status failure can't desync the mirror. Every mirror write is best-effort (try/except + log): a mirror failure can never break the existing Beat scheduling path.
Non-regression
Nothing reads the new table; the PeriodicTask and the celery-beat container are untouched. Best-effort writes isolate the mirror from the Beat path.
Dev-test
makemigrations --check clean.
Targets
feat/UN-3445-pg-queue-integration (not main). Sub-task UN-3581.
🤖 Generated with Claude Code