UN-3553 [FEAT] PG Queue 9d slice 1 — leader-election lease (orchestrator_lock)#2056

Merged
muhammad-ali-e merged 3 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3553-leader-election-lease on Jun 15, 2026

@muhammad-ali-e

What

First slice of 9d (the orchestrator/reaper) on the PG-queue integration branch: a leader-election lease so the reaper (and a later fair-admission gate) can run as exactly one active instance. This PR is the primitive + tests only — nothing acquires leadership yet, so it ships dark and changes no runtime behaviour.

Why now

9d was skipped in the merged spine (9c → liveness → priority → PgBarrier). It's the safety net the pipeline migration (9e) needs: without a reaper, every at-least-once hang / orphaned barrier bottoms out at the 6h TTL with no recovery. The reaper must be a singleton (multiple would contend on SKIP LOCKED and double-act on recovery), so leader election is the foundation slice.

Key decision: lease, not advisory lock

Leadership is a TTL'd row UPDATE (take it if the leader is free or its lease is stale), not pg_advisory_lock. Session-scoped advisory locks don't survive the transaction-pooled PgBouncer the queue connects through (extension-free decision) — the pooler hands out a different backend per transaction, silently dropping a session-held lock. A plain UPDATE is one transaction → pooling-safe. All time comparisons are server-side (now()), so candidate clock skew can't split leadership.
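The conditional UPDATE described above can be sketched as follows. This is a minimal illustration, not the PR's code: SQLite stands in for PostgreSQL so the snippet runs without a server, `acquired_at` is stored as integer epoch seconds, and the names `make_lock_table` and `try_acquire` are assumptions. What carries over is the shape of the statement: a single UPDATE whose WHERE clause admits a candidate only if the leader is empty or the lease is older than the TTL, with the current time taken from the database rather than from the caller.

```python
import sqlite3

# Database-side "now" (epoch seconds). In PostgreSQL this would be now().
NOW = "CAST(strftime('%s', 'now') AS INTEGER)"


def make_lock_table(conn):
    """Create the single-row lock table and seed the one free row."""
    conn.execute(
        "CREATE TABLE pg_orchestrator_lock ("
        " id INTEGER PRIMARY KEY CHECK (id = 1),"
        " leader TEXT NOT NULL DEFAULT '',"
        " acquired_at INTEGER NOT NULL DEFAULT 0)"
    )
    conn.execute("INSERT INTO pg_orchestrator_lock (id) VALUES (1)")
    conn.commit()


def try_acquire(conn, worker_id, lease_seconds):
    """Take the lease if it is free or stale; one statement, one transaction."""
    cur = conn.execute(
        f"UPDATE pg_orchestrator_lock "
        f"SET leader = ?, acquired_at = {NOW} "
        f"WHERE id = 1 AND (leader = '' OR acquired_at < {NOW} - ?)",
        (worker_id, lease_seconds),
    )
    conn.commit()
    # Exactly one row changes for the winner; zero rows for everyone else.
    return cur.rowcount == 1
```

Because the whole decision lives in one statement, there is no session state for a transaction pooler to lose between calls.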

What's in it

Backend (backend/pg_queue/)

  • PgOrchestratorLock — single-row lock model (id PK default 1, leader text default "", acquired_at), CheckConstraint(id = 1), table pg_orchestrator_lock.
  • Migration 0005 — generated via makemigrations + a reversible RunPython that seeds the one free row. Free = empty leader (follows the PgQueueMessage.org_id no-nullable-text convention).

Workers (workers/queue_backend/pg_queue/leader_election.py)

  • LeaderLease: try_acquire() / renew() / release() over the lease row; instance-owned, self-recovering connection (mirrors PgBarrier / PgQueueClient).
  • lease_seconds_from_env(): WORKER_PG_ORCHESTRATOR_LEASE_SECONDS (default 10s, loud-on-misconfig).
  • default_worker_id(): host:pid:rand, stable per process.
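As an illustration of what "default 10s, loud-on-misconfig" could look like, here is a hedged sketch of the env helper. Only the function name, the variable name, and the 10-second default come from the PR; the float type, the optional `environ` parameter, and the exact error messages are assumptions made for the example.

```python
import os

ENV_VAR = "WORKER_PG_ORCHESTRATOR_LEASE_SECONDS"
DEFAULT_LEASE_SECONDS = 10.0


def lease_seconds_from_env(environ=None):
    """Return the lease TTL in seconds, failing loudly on a bad value."""
    env = os.environ if environ is None else environ
    raw = env.get(ENV_VAR)
    if raw is None:
        return DEFAULT_LEASE_SECONDS
    try:
        value = float(raw)
    except ValueError:
        raise ValueError(f"{ENV_VAR} must be a number, got {raw!r}") from None
    # Rejects zero, negatives, NaN and infinity in one comparison.
    if not (0 < value < float("inf")):
        raise ValueError(f"{ENV_VAR} must be a positive number, got {raw!r}")
    return value
```

Raising instead of silently falling back to the default keeps a typo in deployment config from quietly changing how long a stalled leader blocks takeover.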

Tests (workers/tests/test_leader_election.py) — 20, real-PG. The two load-bearing properties:

  • concurrent try_acquire on a free lock → exactly one winner (5-thread race, separate connections);
  • renew() returns False after a standby took over a stale lease (the signal that stops a stalled leader).
    Plus lease-expiry takeover, release-frees-immediately, non-holder renew/release no-ops, env validation, and the single-row constraint.
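The shape of the two load-bearing tests can be sketched without a database. The model below is a toy, not the PR's test code: `InMemoryLease` and `race` are hypothetical names, a mutex plays the role of PostgreSQL's row lock, and an injected `clock` callable plays the role of server-side now().

```python
import threading


class InMemoryLease:
    """Toy model of the lease row; the mutex stands in for the row lock."""

    def __init__(self, lease_seconds, clock):
        self._lease_seconds = lease_seconds
        self._clock = clock  # stand-in for server-side now()
        self._mutex = threading.Lock()
        self.leader = ""  # empty string means "free"
        self.acquired_at = 0.0

    def try_acquire(self, worker_id):
        with self._mutex:
            now = self._clock()
            stale = self.acquired_at < now - self._lease_seconds
            if self.leader == "" or stale:
                self.leader, self.acquired_at = worker_id, now
                return True
            return False

    def renew(self, worker_id):
        with self._mutex:
            if self.leader != worker_id:
                return False  # deposed or never held: stop acting
            self.acquired_at = self._clock()
            return True


def race(lease, worker_ids):
    """Release all candidates at once and return the ids that won."""
    start = threading.Barrier(len(worker_ids))
    winners = []

    def candidate(worker_id):
        start.wait()
        if lease.try_acquire(worker_id):
            winners.append(worker_id)

    threads = [threading.Thread(target=candidate, args=(w,)) for w in worker_ids]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return winners
```

A test would assert that `race` returns exactly one id, then advance the clock past the TTL, let a standby acquire, and assert that the original leader's `renew` returns False.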

Out of scope (later 9d slices)

  • The reaper process itself (acquire → renew loop → release; barrier-orphan sweep; counter reconstruction; re-enqueue/fail stuck rows) + run-worker.sh wiring + liveness.
  • worker_registry.
  • Fair-admission gate (tier / workload / burst_max via org_config) — deferred, no cross-org load yet. (Our single-table design has no staging→task admit step — a deliberate divergence from the labs two-table model.)

Testing

  • 20/20 new tests pass, stable across repeated runs; 91/91 across leader-election + pg_barrier + pg_queue_client + barrier-selection (no regression).
  • makemigrations --check --dry-run reports no drift (migration faithfully matches the model).
  • Migration applied to a dev DB cleanly.

Base: feat/UN-3445-pg-queue-integration (not main). Sub-task UN-3553 under UN-3536 (Phase 9).

…tor_lock)

First slice of 9d (orchestrator/reaper): the singleton-guarantee primitive
the reaper loop and a future fair-admission gate hang off. Ships dark —
nothing acquires leadership yet, so merging changes no runtime behaviour.

Why now: 9d was skipped in the merged spine (9c -> liveness -> priority ->
PgBarrier) and is the safety net 9e needs — without a reaper, every
at-least-once hang / orphaned barrier bottoms out at the 6h TTL with no
recovery. The reaper must run as exactly one instance, so leader election is
the foundation.

Lease, not advisory lock: leadership is a TTL'd row UPDATE (take it if the
leader is free or its lease is stale), not pg_advisory_lock. Session-scoped
advisory locks don't survive the transaction-pooled PgBouncer the queue
connects through (UN-3533) — a plain UPDATE is one transaction, pooling-safe.
All time comparisons are server-side (now()), so candidate clock skew can't
split leadership.

- backend/pg_queue: PgOrchestratorLock single-row model (id PK, leader,
  acquired_at) + CheckConstraint(id=1); generated migration 0005 + a
  reversible RunPython seeding the one free row. Free = empty leader
  (follows the PgQueueMessage.org_id no-nullable-text convention).
- workers/queue_backend/pg_queue/leader_election.py: LeaderLease
  (try_acquire/renew/release), lease_seconds_from_env() (default 10s,
  loud-on-misconfig), default_worker_id(). Instance-owned self-recovering
  connection.
- tests: 20 real-PG tests. Load-bearing properties — concurrent try_acquire
  yields exactly one winner; renew returns False after a stale-lease
  takeover (the signal that stops a stalled leader). Plus lease-expiry
  takeover, release-frees-immediately, non-holder no-ops, env validation,
  single-row constraint.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 15, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@muhammad-ali-e muhammad-ali-e left a comment

Automated PR review — UN-3553 leader-election lease

Reviewed via the PR Review Toolkit (Code Reviewer, Silent Failure Hunter, Type Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). Overall this is a clean, well-documented, correctness-focused PR — the lease SQL is split-brain-safe (single atomic conditional UPDATE, server-side now(), holder-scoped renew/release) and the semantics are well tested.

The one finding worth acting on before merge — flagged independently by four of the six agents — is the missing _owns_conn ownership guard (see inline comment on __init__): LeaderLease drops the connection-ownership distinction its sibling PgQueueClient treats as essential, so a transient DB error silently closes a caller-injected connection and reconnects against the default DB_* database. Everything else is Low severity. Inline comments below.

Comment thread workers/queue_backend/pg_queue/leader_election.py
Comment thread workers/queue_backend/pg_queue/leader_election.py Outdated
Comment thread workers/queue_backend/pg_queue/leader_election.py Outdated
Comment thread workers/queue_backend/pg_queue/leader_election.py
Comment thread workers/queue_backend/pg_queue/leader_election.py Outdated
Comment thread workers/queue_backend/pg_queue/leader_election.py Outdated
Comment thread workers/tests/test_leader_election.py
Comment thread backend/pg_queue/migrations/0005_pgorchestratorlock.py
…y tests

Toolkit + SonarCloud review on #2056:

- [High] _owns_conn ownership guard: LeaderLease now mirrors PgQueueClient —
  an injected connection is never closed/swapped on a transient error (it
  would otherwise silently re-point an injected TEST_DB_/caller connection at
  a fresh DB_-env one). _get_conn only recreates an OWNED missing/closed conn.
- [Medium] Log the owned-connection discard in _cursor (worker_id + exc type)
  — a silent rebuild on the reaper singleton correlates with missed renews.
- [Medium] Test the recovery machinery: owned-conn recovered on
  OperationalError, owned-conn recovered when rollback fails, injected-conn
  never swapped. Plus two documented-invariant gaps — same-holder re-acquire
  on a fresh lease returns False, and release after a takeover is a no-op.
- [Low] release() branches on rowcount — only logs "released" when it really
  freed the lease; a no-op release logs debug (truthful post-mortems).
- [Low] Scope the lease_seconds<=0 guard to the explicit-arg branch (dead on
  the env path, which already rejects <=0).
- [Low] Document the exception-propagation contract (raise == "leadership
  unknown, stop acting"), relabel the durable Usage example.
- [Low] Migration: note the seed row is load-bearing (future reaper-bootstrap
  should self-heal with INSERT ... ON CONFLICT DO NOTHING).
- SonarCloud S117: rename the migration's get_model local to lock_model.

25 leader-election tests pass; makemigrations --check clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Review addressed — commit cca2741bf

All 8 toolkit findings + the 2 SonarCloud issues are in. Validated each against the code first; all were valid.

High / Medium (behaviour + coverage)

  • _owns_conn ownership guard — mirrors PgQueueClient. An injected connection is never closed/swapped on a transient error; only an owned missing/closed conn reconnects. This closes the real test-integrity hole (an injected TEST_DB_ conn silently re-pointing at DB_). New test pins it.
  • Silent reconnect — the owned-conn discard now logs warning (worker_id + exc type).
  • Recovery path untested — new TestConnectionRecovery (mocked): recovered-on-OperationalError, recovered-when-rollback-fails, injected-never-swapped. Plus two invariant gaps (same-holder re-acquire → False; release-after-takeover no-op). 20 → 25 tests.
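The ownership guard from the first item can be illustrated with a small, driver-agnostic sketch. The class name, the `connect` factory parameter, and the log wording are assumptions for the example; only the `_owns_conn` idea (recreate an owned connection, never close or swap an injected one) is taken from the review.

```python
import contextlib
import logging

logger = logging.getLogger(__name__)


class OwnedOrInjectedConnection:
    """Hypothetical helper showing the owned-vs-injected distinction."""

    def __init__(self, connect, conn=None):
        self._connect = connect  # factory that opens a fresh connection
        self._conn = conn
        self._owns_conn = conn is None  # an injected connection is not ours

    def _get_conn(self):
        # Only an owned connection is ever created or recreated here.
        if self._owns_conn and (self._conn is None or self._conn.closed):
            self._conn = self._connect()
        return self._conn

    @contextlib.contextmanager
    def cursor(self):
        conn = self._get_conn()
        try:
            yield conn.cursor()
            conn.commit()
        except Exception as exc:
            with contextlib.suppress(Exception):
                conn.rollback()  # the connection may already be unusable
            if self._owns_conn:
                logger.warning(
                    "discarding owned connection after %s", type(exc).__name__
                )
                with contextlib.suppress(Exception):
                    conn.close()
                self._conn = None  # the next call reconnects
            raise
```

The error is always re-raised, so the caller still learns that the operation failed; the guard only decides whether the connection object survives.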

Low (clarity / truthfulness)

  • release() logs "released" only when rowcount == 1; a no-op logs debug.
  • Scoped the lease_seconds <= 0 guard to the explicit-arg branch.
  • Documented the exception-propagation contract (raise == "leadership unknown, stop acting") + durable Usage: framing.
  • Migration: noted the seed row is load-bearing → future reaper-bootstrap should self-heal with INSERT ... ON CONFLICT DO NOTHING.
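The self-healing seed suggested in the last item could look roughly like this. It is a sketch under stated assumptions: SQLite (3.24 or newer, for the upsert clause) stands in for PostgreSQL so the snippet runs locally, `acquired_at` is simplified to an integer, and `create_lock_table` and `ensure_seed_row` are hypothetical names, not part of the PR.

```python
import sqlite3


def create_lock_table(conn):
    """Create the single-row lock table without seeding it."""
    conn.execute(
        "CREATE TABLE pg_orchestrator_lock ("
        " id INTEGER PRIMARY KEY CHECK (id = 1),"
        " leader TEXT NOT NULL DEFAULT '',"
        " acquired_at INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()


def ensure_seed_row(conn):
    """Insert the free row if it is missing; leave an existing row untouched."""
    conn.execute(
        "INSERT INTO pg_orchestrator_lock (id, leader) VALUES (1, '') "
        "ON CONFLICT (id) DO NOTHING"
    )
    conn.commit()
```

Calling `ensure_seed_row` at bootstrap is safe to repeat: a second call changes nothing, and a row that already names a leader keeps its leader.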

SonarCloud S117 — renamed the migration's get_model local to lock_model (both hits).

makemigrations --check clean; 25/25 leader-election tests pass; pre-commit green.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 15, 2026 11:11
@greptile-apps

greptile-apps Bot commented Jun 15, 2026

Greptile Summary

Introduces a PostgreSQL-backed leader-election lease (LeaderLease) for the future orchestrator/reaper singleton. The mechanism uses a single conditional UPDATE … RETURNING on the pg_orchestrator_lock row rather than an advisory lock, making it safe through PgBouncer's transaction-pooled connections while keeping all time comparisons server-side.

  • LeaderLease exposes try_acquire / renew / release; each is a self-contained transaction with explicit commit/rollback and owned-connection recovery that mirrors the existing PgBarrier/PgQueueClient posture.
  • Migration 0005 creates the table, enforces the single-row invariant via CheckConstraint(id=1), and seeds the free row idempotently.
  • 20 new tests cover the two load-bearing correctness properties (exactly-one concurrent winner; renew returns False after a standby takes over a stale lease) plus env validation, connection recovery, and constraint enforcement; no runtime behaviour changes ship in this slice.

Confidence Score: 5/5

The change ships dark — nothing acquires a lease yet — so there is zero runtime impact on the existing pipeline. The SQL is atomic and pooling-safe, the error handling is consistent with the sibling implementations, and the migration is reversible.

The UPDATE-based leader election is correct: PostgreSQL serialises concurrent row-level updates, so exactly one try_acquire returns a row. The renew/release guard on leader = %s prevents a deposed leader from accidentally clearing a new holder's lease. Connection recovery (owned vs. injected) is well-tested. The one observation (fork-safety docstring) is a documentation nit for future reaper wiring, not a defect in the code as merged.
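The holder-scoped guard mentioned above can be shown in a few lines. As before this is an illustrative sketch, not the merged code: SQLite replaces PostgreSQL, timestamps are integer epoch seconds, and `make_lock_table` is a hypothetical helper that lets the example start from an arbitrary holder.

```python
import sqlite3

# Database-side "now" (epoch seconds). In PostgreSQL this would be now().
NOW = "CAST(strftime('%s', 'now') AS INTEGER)"


def make_lock_table(conn, leader=""):
    """Create the single-row lock table with the given current holder."""
    conn.execute(
        "CREATE TABLE pg_orchestrator_lock ("
        " id INTEGER PRIMARY KEY CHECK (id = 1),"
        " leader TEXT NOT NULL DEFAULT '',"
        " acquired_at INTEGER NOT NULL DEFAULT 0)"
    )
    conn.execute(
        f"INSERT INTO pg_orchestrator_lock (id, leader, acquired_at) "
        f"VALUES (1, ?, {NOW})",
        (leader,),
    )
    conn.commit()


def renew(conn, worker_id):
    """Refresh the lease only if worker_id still holds it."""
    cur = conn.execute(
        f"UPDATE pg_orchestrator_lock SET acquired_at = {NOW} "
        f"WHERE id = 1 AND leader = ?",
        (worker_id,),
    )
    conn.commit()
    return cur.rowcount == 1


def release(conn, worker_id):
    """Free the lease only if worker_id still holds it; otherwise a no-op."""
    cur = conn.execute(
        "UPDATE pg_orchestrator_lock SET leader = '' "
        "WHERE id = 1 AND leader = ?",
        (worker_id,),
    )
    conn.commit()
    return cur.rowcount == 1
```

With `leader = ?` in both WHERE clauses, a deposed leader's late `renew` or `release` matches zero rows and cannot disturb the new holder.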

No files require special attention before merging. The docstring clarification on default_worker_id is worth addressing before the reaper process wiring lands in the next slice.

Important Files Changed

| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/leader_election.py | Core LeaderLease implementation; SQL is atomic and pooling-safe; connection recovery logic is correct; one minor doc discrepancy around fork-safety of default_worker_id. |
| workers/tests/test_leader_election.py | Comprehensive test suite covering the two load-bearing properties (exactly-one winner on concurrent acquire, renew-returns-False after takeover), connection-recovery branches, and constraint enforcement. |
| backend/pg_queue/migrations/0005_pgorchestratorlock.py | Adds reversible migration creating pg_orchestrator_lock table with single-row CheckConstraint and seeds the free row; idempotent via get_or_create. |
| backend/pg_queue/models.py | Adds PgOrchestratorLock model with clear docstring; free-equals-empty-string convention and CheckConstraint(id=1) correctly enforce the single-row invariant. |
| workers/queue_backend/pg_queue/__init__.py | Exports LeaderLease, default_worker_id, and lease_seconds_from_env; all kept in alphabetical order. |

Sequence Diagram

sequenceDiagram
    participant A as Candidate A (Leader)
    participant B as Candidate B (Standby)
    participant PG as pg_orchestrator_lock (row)

    Note over A,PG: Acquire (free lock)
    A->>PG: "UPDATE SET leader='A' WHERE leader='' OR acquired_at < now()-TTL"
    PG-->>A: "RETURNING id (won=True)"

    Note over A,PG: Renew cycle
    A->>PG: "UPDATE SET acquired_at=now() WHERE leader='A'"
    PG-->>A: "RETURNING id (still_leader=True)"

    Note over B,PG: Standby blocked while lease is fresh
    B->>PG: "UPDATE SET leader='B' WHERE leader='' OR acquired_at < now()-TTL"
    PG-->>B: "no row (won=False)"

    Note over A,PG: Leader stalls past TTL
    B->>PG: "UPDATE SET leader='B' WHERE leader='' OR acquired_at < now()-TTL"
    PG-->>B: RETURNING id (takeover)

    Note over A,PG: Deposed leader detects loss
    A->>PG: "UPDATE SET acquired_at=now() WHERE leader='A'"
    PG-->>A: "no row (still_leader=False, stop acting)"

    Note over A,PG: Graceful release
    A->>PG: "UPDATE SET leader='' WHERE leader='A'"
    PG-->>A: "rowcount=1 (freed)"
    B->>PG: "UPDATE SET leader='B' WHERE leader=''"
    PG-->>B: RETURNING id (immediate takeover)

Reviews (2): Last reviewed commit: "UN-3553 [FEAT] Address Greptile: idempot..."

Comment thread workers/queue_backend/pg_queue/leader_election.py
Comment thread workers/queue_backend/pg_queue/leader_election.py
… log

- default_worker_id() is now cached (functools.cache) → idempotent per
  process, so a caller passing it inline in a retry/restart loop can't drift
  the worker id out from under renew()/release(). Lazy (first-call), so it's
  fixed after a fork rather than shared across children. Test asserts
  idempotency.
- renew()'s failure warning now reads "not the current leader (taken over by
  another candidate, or the lease was never held)" — accurate for the
  non-holder-renew case too, and fixes the "took over"->"taken over" grammar.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Greptile review addressed — commit a90c12618

Both P2 findings validated and fixed:

  • default_worker_id() idempotency footgun — now @functools.cache'd, so repeated/inline calls within a process return the same id (a retry loop can't drift the worker id out from under renew()/release()). Kept lazy/first-call so it's fixed after a fork, not shared across children. Test updated to assert idempotency.
  • renew() warning misleading + grammar — reworded to "not the current leader (taken over by another candidate, or the lease was never held)", which reads correctly for the non-holder-renew path too.
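The caching fix in the first item can be sketched as below. The `host:pid:rand` layout and the use of `functools.cache` come from the PR text; the choice of `socket.gethostname()` and `secrets.token_hex(4)` for the host and random parts is an assumption made for the example.

```python
import functools
import os
import secrets
import socket


@functools.cache
def default_worker_id():
    """Return a host:pid:rand id, computed once per process on first call."""
    return f"{socket.gethostname()}:{os.getpid()}:{secrets.token_hex(4)}"
```

Without the cache, each call would draw a new random suffix, so a caller that passes `default_worker_id()` inline to both acquire and renew would look like two different workers. Because the value is computed lazily, a child that first calls it after a fork gets its own id.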

25/25 tests pass; pre-commit green.

@sonarqubecloud

@muhammad-ali-e muhammad-ali-e merged commit 52a28fd into feat/UN-3445-pg-queue-integration Jun 15, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3553-leader-election-lease branch June 15, 2026 15:44