UN-3651 [FIX] PG barrier enqueue — reconnect-retry on stale cached connection#2121
UN-3651 [FIX] PG barrier enqueue — reconnect-retry on stale cached connection#2121muhammad-ali-e merged 2 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3651-pg-barrier-reconnect-retryZipstack/unstract:UN-3651-pg-barrier-reconnect-retryCopy head branch name to clipboard
Conversation
…nnection A transient `psycopg2.OperationalError: server closed the connection unexpectedly` during the PgBarrier enqueue (the barrier UPSERT) aborted the whole ETL execution -> status ERROR with 0 files. PgBarrier keeps a thread-local cached connection reused across barrier ops; while idle between ops it can be reaped server-side (PgBouncer server_idle_timeout / DB failover) and `_get_conn` can't tell (conn.closed is client-side only), so the first statement after the idle gap fails. `_cursor` recovered the dead conn but did not retry the op. Fix: a one-shot reconnect-retry (`_run_idempotent_write`, attempts=2) scoped to the IDEMPOTENT, pre-dispatch barrier write only — the UPSERT (ON CONFLICT DO UPDATE -> same row/state) + the per-execution dedup reset. Re-running them after an ambiguous commit is a no-op and no header has been dispatched yet, so a retry can neither duplicate a row nor double-dispatch work. Idempotency boundary preserved: the non-idempotent decrement (remaining - 1) and claim_batch stay on the plain `_cursor` (recover-but-don't-retry) — a re-applied decrement could fire the callback early. Scope: workers only, under the pg_queue_enabled flag (PgBarrier runs only on the PG transport; flag off -> CeleryChordBarrier, this code never runs -> zero Celery-path regression). Tests: +3 unit tests (retry-then-succeed / no-retry-on-non-connection-error / reraise-after-exhaust); full PgBarrier suite 54 passed; ruff clean. Dev-tested live (A/B against real Postgres via pg_terminate_backend): old path raises + loses the write; new path reconnects, retries, and the write lands. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_barrier.py | Adds _run_idempotent_pre_dispatch_write with a one-shot retry for stale-connection OperationalError/InterfaceError; extracts _reset_barrier closure from enqueue(); decrement and claim_batch paths are correctly left unchanged. |
| workers/tests/test_pg_barrier.py | Adds TestIdempotentWriteRetry (3 unit tests covering success, non-connection error passthrough, and exhaustion re-raise) plus one end-to-end integration test pinning the enqueue wiring through a real DB connection stub. |
Reviews (2): Last reviewed commit: "UN-3651 [FIX] address PR #2121 review — ..." | Re-trigger Greptile
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated review via PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
Verdict: the fix is correct and the idempotency boundary holds. The retry control flow is sound (bounded, re-raises on exhaustion, no silent-loss path), the decrement/claim_batch correctly stay off the retry path, and all the docstring's structural claims (pre-dispatch ordering, plain-_cursor exclusions) check out. Findings below are about observability and test/comment precision, not correctness. None are blocking.
Prioritised:
- HIGH — retry warning discards the triggering exception and hard-asserts "lost its DB connection" (pg_barrier.py:170)
- HIGH — the
enqueue()self-heal wiring has no test; reverting to the old inline_cursor()would still pass everything (test:173) - MEDIUM — catch is broader than stale-connection; retries server-side errors with no backoff (pg_barrier.py:163)
- MEDIUM — "fire the callback twice" rationale is inaccurate (pg_barrier.py:154);
InterfaceErrorbranch untested (test:180); idempotency invariant is docs-only (pg_barrier.py:142) - LOW —
Final/naming, "same state" overstatement, redundant call-site comment, dead test counters
| if attempt >= _BARRIER_WRITE_ATTEMPTS: | ||
| raise | ||
| logger.warning( | ||
| "PgBarrier: %s lost its DB connection (attempt %d/%d); " |
There was a problem hiding this comment.
[HIGH — silent failure / observability] When the retry succeeds, the original OperationalError/InterfaceError is gone forever — no exc_info, and the exception type/message isn't interpolated. You can't tell a benign PgBouncer idle reap from a recurring statement timeout or deadlock that the retry is silently papering over. The message also hard-asserts "lost its DB connection", which (see line 163) will be a lie for many of the conditions this except actually catches.
Suggested:
except (psycopg2.OperationalError, psycopg2.InterfaceError) as exc:
if attempt >= _BARRIER_WRITE_ATTEMPTS:
raise
logger.warning(
"PgBarrier: %s — DB write failed (%s: %s); cached connection "
"was likely stale, reconnecting and retrying (attempt %d/%d)",
what, type(exc).__name__, exc, attempt, _BARRIER_WRITE_ATTEMPTS,
)| with _cursor() as cur: | ||
| operation(cur) | ||
| return | ||
| except (psycopg2.OperationalError, psycopg2.InterfaceError): |
There was a problem hiding this comment.
[MEDIUM — broad catch + no backoff] psycopg2.OperationalError is the base class for many server-side, non-connection conditions: QueryCanceled (statement_timeout), DeadlockDetected/SerializationFailure (via TransactionRollbackError), TooManyConnections, AdminShutdown, etc. All are now caught, mislabeled (line 170), and immediately re-run with zero backoff — which just adds load against an already-struggling DB. Re-running the idempotent UPSERT once is correctness-safe, so this isn't a bug, but consider either (a) narrowing the retry to genuine connection-drop signals (e.g. only when _cursor actually discarded the cached conn), or at minimum (b) excluding TransactionRollbackError so deadlocks aren't blindly hammered. A small fixed sleep before the retry would also widen the self-heal window during a DB failover (the idle-reap case succeeds instantly either way).
|
|
||
| Deliberately NOT used for the barrier **decrement** (``remaining = | ||
| remaining - 1``): that is not idempotent — a re-applied decrement could drive | ||
| ``remaining`` to 0 early and fire the callback twice — so it stays on the |
There was a problem hiding this comment.
[MEDIUM — comment inaccuracy] "fire the callback twice" isn't the actual failure mode. Decrements each run in their own committed txn, serialized by the row lock, and the callback fires only when a decrement's RETURNING observes remaining == 0. A double-applied decrement skips over 0 (1→0 committed-but-unacked, retry 0→-1 → returns -1 → abandoned branch), so two observations of exactly 0 can't occur. The real hazards — still justifying the exclusion — are firing the callback prematurely with incomplete results, or skipping past 0 and stranding the barrier to expiry. Suggest: "...double-counts: it can drive remaining to 0 early and fire the callback prematurely with incomplete results, or skip past 0 and strand the barrier."
| connection was dead. | ||
|
|
||
| Restricted to **idempotent** statements run BEFORE any task dispatch — the | ||
| barrier UPSERT (``ON CONFLICT … DO UPDATE`` → same row, same state) + the |
There was a problem hiding this comment.
[LOW — comment precision] "same row, same state" slightly overstates idempotency: the UPSERT's DO UPDATE re-runs created_at = now() and expires_at = ... (lines 375-376), so timestamps advance on a retry. Harmless here (retries are milliseconds apart, execution_id is the PK so no row duplication), but a future reader relying on created_at for staleness/reaper logic could be surprised. Suggest softening to "→ same row, remaining/results reset identically (timestamps refresh, harmlessly)."
| _BARRIER_WRITE_ATTEMPTS = 2 | ||
|
|
||
|
|
||
| def _run_idempotent_write(operation: Callable[[Any], None], *, what: str) -> None: |
There was a problem hiding this comment.
[MEDIUM — type design] The defining invariant — that operation must be idempotent — lives entirely in the docstring; Callable[[Any], None] would happily accept the non-idempotent decrement. The -> None return is a nice structural guard (it blocks passing a RETURNING-reading op like claim_batch), but idempotency itself is unenforceable in Python's type system. Pragmatic mitigations: keep it private (done) and consider a name that makes the contract impossible to miss at call sites, e.g. _run_idempotent_pre_dispatch_write.
Also minor: the cursor is typed Any here, but the module already imports connection as PgConnection under TYPE_CHECKING — adding cursor as PgCursor and typing Callable[[PgCursor], None] would tighten intent at zero runtime cost (worth doing file-wide for consistency).
| # dead conn, so a single retry runs against a freshly reconnected one. This | ||
| # turns a transient blip (which previously aborted the whole execution at | ||
| # barrier enqueue) into a self-heal. | ||
| _BARRIER_WRITE_ATTEMPTS = 2 |
There was a problem hiding this comment.
[LOW — polish] _BARRIER_WRITE_ATTEMPTS = 2 means "total attempts", so 2 = "one retry" — a name/value relationship only the docstring disambiguates. Suggest _BARRIER_WRITE_ATTEMPTS: Final = 2 # total attempts: 1 initial + 1 retry. Keeping it a literal (not env-driven) is the right call — it keeps the idempotency bound from being weakened operationally.
|
|
||
| # Both statements are idempotent and run BEFORE any header dispatch, | ||
| # so a one-shot retry on a stale cached connection can't duplicate | ||
| # state or double-dispatch — see _run_idempotent_write. |
There was a problem hiding this comment.
[LOW — redundancy] This 3-line comment restates the _run_idempotent_write docstring it points to (directly above the call). It's the kind of duplicated rationale that rots independently of the source of truth. Suggest trimming to a one-liner pointer, e.g. # Idempotent + pre-dispatch → safe to retry; see _run_idempotent_write.
| pg_barrier._local.conn = None | ||
|
|
||
|
|
||
| class TestIdempotentWriteRetry: |
There was a problem hiding this comment.
[HIGH — test gap] All three tests exercise _run_idempotent_write in isolation with a no-op operation — none go through the actual enqueue() path the fix protects. Concrete regression this misses: reverting enqueue to the old inline with _cursor() as cur: (the exact pre-fix code) leaves _run_idempotent_write as dead code and silently removes the self-heal, yet every test here still passes. Suggest an integration test using the real barrier_db fixture: set pg_barrier._local.conn to a stub whose first .execute raises OperationalError, monkeypatch create_pg_connection to return the real test connection, call PgBarrier().enqueue(...), then assert (a) the pg_barrier_state row exists exactly once with remaining == N, and (b) header dispatch was called exactly N times. This pins both the self-heal and the "headers still dispatched, no double-dispatch" idempotency claim end-to-end.
| never silently swallow a real (non-connection) failure. | ||
| """ | ||
|
|
||
| def test_retries_once_on_dead_connection_then_succeeds( |
There was a problem hiding this comment.
[MEDIUM — test gap] Both except clauses (this fix at pg_barrier.py:163 and _cursor at :119) catch (OperationalError, InterfaceError), but all three tests raise only OperationalError. InterfaceError ("connection already closed") is arguably the more common psycopg2 symptom of a stale cached socket — exactly this PR's target scenario. Narrowing either clause to drop InterfaceError would break the self-heal with no failing test. Suggest parametrizing this test over [psycopg2.OperationalError("server closed..."), psycopg2.InterfaceError("connection already closed")]. While here, also asserting dead.commits == 0 (no partial commit on the failed attempt) and a caplog warning would lock the "log on retry, not on success" contract.
| self._execute_error = execute_error | ||
| self.commits = 0 | ||
| self.rollbacks = 0 | ||
| self.executes = 0 |
There was a problem hiding this comment.
[LOW — dead test scaffolding] _FakeConn increments self.rollbacks (line 159) and self.executes (line 149) but neither counter is ever asserted anywhere — test 1 counts invocations via its own attempts list. The rollback() method must stay (called by _cursor's recovery path), but the two counters are dead state. Either drop them, or better, assert on them (e.g. dead.executes == 1, healthy.executes == 1) to tightly bound the retry to exactly one extra attempt. Also closed is bool here but real psycopg2 connection.closed is an int (0/1/2) — fine for the truthiness check, minor fidelity drift.
…g, tests Review feedback (greptile + PR-review-toolkit) — all non-blocking; fix was confirmed correct. Changes: - Retry log now names the real error (type + message) and keeps the traceback (exc_info=True); reworded so it no longer asserts a connection drop the broad psycopg2 catch can't be sure of. (HIGH — observability) - Add a small fixed backoff (0.5s) before the retry — widens the self-heal window on a brief failover and avoids immediately re-hammering a struggling DB on the server-side errors the broad OperationalError catch also covers. (MED) - Rename _run_idempotent_write -> _run_idempotent_pre_dispatch_write and type the op as Callable[[PgCursor], None] so the idempotent/pre-dispatch contract is loud at the call site (it can't be type-enforced). (MED — type design) - _BARRIER_WRITE_ATTEMPTS: Final = 2 # total attempts; doc/comment precision: decrement hazard reworded (premature/incomplete callback or strand, not "fire twice"), "same state" softened (timestamps refresh harmlessly), trimmed the redundant call-site comment to a one-line pointer. (LOW) - Tests: add an end-to-end self-heal test through enqueue() against the real DB (asserts the row lands once + every header dispatched exactly once — pins the wiring a no-DB test can't); parametrize the retry test over OperationalError AND InterfaceError; assert no partial commit (commits==0), exactly one extra attempt (executes counters), and the on-retry warning via caplog. (HIGH + MED) Full PgBarrier suite: 56 passed; ruff clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Thanks for the thorough review 🙏 — all findings addressed in Observability (HIGH + greptile P2)
Broad catch / no backoff (MEDIUM)
Type design / contract (MEDIUM)
Comment precision (MEDIUM + LOW)
Tests (HIGH + MEDIUM + LOW)
Full PgBarrier suite: 56 passed; ruff clean. SonarCloud gate green. |
2159ffd
into
feat/UN-3445-pg-queue-integration
|
What
A transient
psycopg2.OperationalError: server closed the connection unexpectedlyduring the PgBarrier enqueue (the barrier UPSERT) aborted the whole ETL execution → statusERRORwith 0 files created.Root cause
PgBarrier keeps a thread-local cached Postgres connection reused across barrier ops. While idle between ops it can be reaped server-side (PgBouncer
server_idle_timeout/ DB failover / TCP teardown), and_get_conncan't tell —conn.closedis a client-side flag only — so the first statement after the idle gap fails. The existing_cursorrecovers the dead conn (so the next call reconnects) but does not retry the failed op → the execution aborts.Fix
A one-shot reconnect-retry (
_run_idempotent_write, attempts=2) scoped to the idempotent, pre-dispatch barrier write only — the UPSERT (ON CONFLICT … DO UPDATE→ same row/state) + the per-execution dedup reset. Re-running them after an ambiguous commit is a no-op, and no header has been dispatched yet, so a retry can neither duplicate a row nor double-dispatch work.Idempotency boundary preserved: the non-idempotent decrement (
remaining - 1) andclaim_batchstay on the plain_cursor(recover-but-don't-retry) — a re-applied decrement could fire the callback early, so it must never auto-retry.Scope / non-regression
Workers only (
pg_barrier.py), under thepg_queue_enabledflag — PgBarrier runs only on the PG transport. Flag OFF →CeleryChordBarrier, this code never runs → zero Celery-path regression. The change is additive on the PG happy path (first attempt succeeds, identical to before); the only behavior change is that a previously-fatal transient drop now self-heals.Verification
pg_terminate_backend:with _cursor()path): raises → barrier row not written (0) → execution aborts.reconnecting and retrying→ barrier row written (1) → execution proceeds._cursor(decrement path) raises with no retry → boundary intact.Ref: UN-3651