UN-3651 [FIX] PG barrier enqueue — reconnect-retry on stale cached connection #2121

Merged
muhammad-ali-e merged 2 commits into feat/UN-3445-pg-queue-integration from UN-3651-pg-barrier-reconnect-retry
Jun 29, 2026

@muhammad-ali-e

What

A transient psycopg2.OperationalError: server closed the connection unexpectedly during the PgBarrier enqueue (the barrier UPSERT) aborted the whole ETL execution → status ERROR with 0 files created.

Root cause

PgBarrier keeps a thread-local cached Postgres connection reused across barrier ops. While idle between ops it can be reaped server-side (PgBouncer server_idle_timeout / DB failover / TCP teardown), and _get_conn can't tell — conn.closed is a client-side flag only — so the first statement after the idle gap fails. The existing _cursor recovers the dead conn (so the next call reconnects) but does not retry the failed op → the execution aborts.

Fix

A one-shot reconnect-retry (_run_idempotent_write, attempts=2) scoped to the idempotent, pre-dispatch barrier write only — the UPSERT (ON CONFLICT … DO UPDATE → same row/state) + the per-execution dedup reset. Re-running them after an ambiguous commit is a no-op, and no header has been dispatched yet, so a retry can neither duplicate a row nor double-dispatch work.

Idempotency boundary preserved: the non-idempotent decrement (remaining - 1) and claim_batch stay on the plain _cursor (recover-but-don't-retry) — a re-applied decrement could fire the callback early, so it must never auto-retry.
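
The retry boundary described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in pg_barrier.py: `StaleConnectionError`, `FakeConn`, `get_conn`, `cursor`, and `run_idempotent_write` are hypothetical stand-ins so the sketch runs without a database, whereas the real helper catches the psycopg2 errors named in this PR.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, Optional

ATTEMPTS = 2  # total attempts: 1 initial + 1 retry


class StaleConnectionError(Exception):
    """Hypothetical stand-in for psycopg2.OperationalError / InterfaceError."""


class FakeConn:
    """Hypothetical connection; dead=True simulates a server-side reap."""

    def __init__(self, dead: bool = False) -> None:
        self.dead = dead
        self.commits = 0

    def execute(self, sql: str) -> None:
        if self.dead:
            raise StaleConnectionError("server closed the connection unexpectedly")

    def commit(self) -> None:
        self.commits += 1


_cached: Optional[FakeConn] = None


def get_conn() -> FakeConn:
    """Return the cached connection, creating a fresh one if none is cached."""
    global _cached
    if _cached is None:
        _cached = FakeConn()
    return _cached


@contextmanager
def cursor() -> Iterator[FakeConn]:
    """Recover-but-don't-retry: drop the cached conn on failure, then re-raise."""
    global _cached
    conn = get_conn()
    try:
        yield conn
        conn.commit()
    except StaleConnectionError:
        _cached = None  # the next call reconnects
        raise


def run_idempotent_write(operation: Callable[[FakeConn], None]) -> None:
    """Run an idempotent write, retrying once after a stale-connection error."""
    for attempt in range(1, ATTEMPTS + 1):
        try:
            with cursor() as cur:
                operation(cur)
            return
        except StaleConnectionError:
            if attempt >= ATTEMPTS:
                raise  # retries exhausted: surface the error


_cached = FakeConn(dead=True)  # simulate a connection reaped while idle
run_idempotent_write(lambda cur: cur.execute("INSERT ... ON CONFLICT ... DO UPDATE"))
healed = get_conn()  # the fresh connection the retry ran against
```

In this sketch the first attempt fails on the dead connection, `cursor` discards it, and the second attempt commits once on a fresh connection. A non-idempotent operation such as the decrement would call `cursor` directly and never pass through `run_idempotent_write`.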

Scope / non-regression

Workers only (pg_barrier.py), under the pg_queue_enabled flag — PgBarrier runs only on the PG transport. Flag OFF → CeleryChordBarrier, this code never runs → zero Celery-path regression. The change is additive on the PG happy path (first attempt succeeds, identical to before); the only behavior change is that a previously-fatal transient drop now self-heals.

Verification

  • Unit: +3 tests (retry-then-succeed, no-retry-on-non-connection-error, reraise-after-exhaust). Full PgBarrier suite 54 passed; ruff clean.
  • Live A/B against real Postgres (rebuilt worker image), reproducing the exact incident error via pg_terminate_backend:
    • WITHOUT fix (old with _cursor() path): raises → barrier row not written (0) → execution aborts.
    • WITH fix: logs reconnecting and retrying → barrier row written (1) → execution proceeds.
    • Control: plain _cursor (decrement path) raises with no retry → boundary intact.

Ref: UN-3651

…nnection

A transient `psycopg2.OperationalError: server closed the connection
unexpectedly` during the PgBarrier enqueue (the barrier UPSERT) aborted the
whole ETL execution -> status ERROR with 0 files. PgBarrier keeps a thread-local
cached connection reused across barrier ops; while idle between ops it can be
reaped server-side (PgBouncer server_idle_timeout / DB failover) and `_get_conn`
can't tell (conn.closed is client-side only), so the first statement after the
idle gap fails. `_cursor` recovered the dead conn but did not retry the op.

Fix: a one-shot reconnect-retry (`_run_idempotent_write`, attempts=2) scoped to
the IDEMPOTENT, pre-dispatch barrier write only — the UPSERT (ON CONFLICT DO
UPDATE -> same row/state) + the per-execution dedup reset. Re-running them after
an ambiguous commit is a no-op and no header has been dispatched yet, so a retry
can neither duplicate a row nor double-dispatch work.

Idempotency boundary preserved: the non-idempotent decrement (remaining - 1) and
claim_batch stay on the plain `_cursor` (recover-but-don't-retry) — a re-applied
decrement could fire the callback early.

Scope: workers only, under the pg_queue_enabled flag (PgBarrier runs only on the
PG transport; flag off -> CeleryChordBarrier, this code never runs -> zero
Celery-path regression).

Tests: +3 unit tests (retry-then-succeed / no-retry-on-non-connection-error /
reraise-after-exhaust); full PgBarrier suite 54 passed; ruff clean. Dev-tested
live (A/B against real Postgres via pg_terminate_backend): old path raises +
loses the write; new path reconnects, retries, and the write lands.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

@greptile-apps

greptile-apps Bot commented Jun 29, 2026

Greptile Summary

This PR fixes a transient psycopg2.OperationalError: server closed the connection unexpectedly that would abort entire ETL executions when the thread-local cached Postgres connection was reaped server-side between barrier ops. It introduces a single-retry helper (_run_idempotent_pre_dispatch_write) scoped narrowly to the pre-dispatch barrier UPSERT and dedup-reset — operations that are idempotent by construction.

  • Adds _run_idempotent_pre_dispatch_write: one-shot reconnect-retry for idempotent barrier writes, with explicit exclusion of the non-idempotent decrement and claim_batch paths (which stay on the plain recover-but-don't-retry _cursor).
  • The _reset_barrier closure (UPSERT + DELETE) is extracted from enqueue() and routed through the new helper, leaving dispatch logic unchanged.
  • Three new unit tests cover retry-then-succeed, no-retry on non-connection error, and exhaustion re-raise; one integration test pins the end-to-end wiring through a real injected connection.

Confidence Score: 5/5

Safe to merge — the retry is narrowly scoped to idempotent pre-dispatch writes, the non-idempotent decrement path is explicitly left unchanged, and the fix is additive on the happy path.

The change is surgical: a single new helper with a one-shot retry wraps only the barrier UPSERT and dedup-reset, both of which are ON CONFLICT DO UPDATE / DELETE WHERE operations that produce the same final state whether run once or twice. The decrement and claim_batch paths are untouched. Test coverage exercises the retry boundary from unit and integration levels.

No files require special attention.

Important Files Changed

Filename | Overview
workers/queue_backend/pg_barrier.py | Adds _run_idempotent_pre_dispatch_write with a one-shot retry for stale-connection OperationalError/InterfaceError; extracts _reset_barrier closure from enqueue(); decrement and claim_batch paths are correctly left unchanged.
workers/tests/test_pg_barrier.py | Adds TestIdempotentWriteRetry (3 unit tests covering success, non-connection error passthrough, and exhaustion re-raise) plus one end-to-end integration test pinning the enqueue wiring through a real DB connection stub.

Reviews (2): Last reviewed commit: "UN-3651 [FIX] address PR #2121 review — ..."

Comment thread workers/queue_backend/pg_barrier.py

@muhammad-ali-e muhammad-ali-e left a comment

Automated review via PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).

Verdict: the fix is correct and the idempotency boundary holds. The retry control flow is sound (bounded, re-raises on exhaustion, no silent-loss path), the decrement/claim_batch correctly stay off the retry path, and all the docstring's structural claims (pre-dispatch ordering, plain-_cursor exclusions) check out. Findings below are about observability and test/comment precision, not correctness. None are blocking.

Prioritised:

  • HIGH — retry warning discards the triggering exception and hard-asserts "lost its DB connection" (pg_barrier.py:170)
  • HIGH — the enqueue() self-heal wiring has no test; reverting to the old inline _cursor() would still pass everything (test:173)
  • MEDIUM — catch is broader than stale-connection; retries server-side errors with no backoff (pg_barrier.py:163)
  • MEDIUM — "fire the callback twice" rationale is inaccurate (pg_barrier.py:154); InterfaceError branch untested (test:180); idempotency invariant is docs-only (pg_barrier.py:142)
  • LOW — Final/naming, "same state" overstatement, redundant call-site comment, dead test counters

Comment thread workers/queue_backend/pg_barrier.py Outdated
if attempt >= _BARRIER_WRITE_ATTEMPTS:
    raise
logger.warning(
    "PgBarrier: %s lost its DB connection (attempt %d/%d); "

[HIGH — silent failure / observability] When the retry succeeds, the original OperationalError/InterfaceError is gone forever — no exc_info, and the exception type/message isn't interpolated. You can't tell a benign PgBouncer idle reap from a recurring statement timeout or deadlock that the retry is silently papering over. The message also hard-asserts "lost its DB connection", which (see line 163) will be a lie for many of the conditions this except actually catches.

Suggested:

except (psycopg2.OperationalError, psycopg2.InterfaceError) as exc:
    if attempt >= _BARRIER_WRITE_ATTEMPTS:
        raise
    logger.warning(
        "PgBarrier: %s — DB write failed (%s: %s); cached connection "
        "was likely stale, reconnecting and retrying (attempt %d/%d)",
        what, type(exc).__name__, exc, attempt, _BARRIER_WRITE_ATTEMPTS,
    )

Comment thread workers/queue_backend/pg_barrier.py Outdated
with _cursor() as cur:
operation(cur)
return
except (psycopg2.OperationalError, psycopg2.InterfaceError):

[MEDIUM — broad catch + no backoff] psycopg2.OperationalError is the base class for many server-side, non-connection conditions: QueryCanceled (statement_timeout), DeadlockDetected/SerializationFailure (via TransactionRollbackError), TooManyConnections, AdminShutdown, etc. All are now caught, mislabeled (line 170), and immediately re-run with zero backoff — which just adds load against an already-struggling DB. Re-running the idempotent UPSERT once is correctness-safe, so this isn't a bug, but consider either (a) narrowing the retry to genuine connection-drop signals (e.g. only when _cursor actually discarded the cached conn), or at minimum (b) excluding TransactionRollbackError so deadlocks aren't blindly hammered. A small fixed sleep before the retry would also widen the self-heal window during a DB failover (the idle-reap case succeeds instantly either way).
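
Option (b) above could look something like the following sketch. The exception classes here are local stand-ins that mirror the hierarchy this comment describes (a rollback-class error and a query-cancel error both deriving from the operational base class); `should_retry` is a hypothetical name, and nothing here is taken from pg_barrier.py.

```python
class OperationalError(Exception):
    """Stand-in for psycopg2.OperationalError."""


class InterfaceError(Exception):
    """Stand-in for psycopg2.InterfaceError."""


class TransactionRollbackError(OperationalError):
    """Stand-in for the deadlock / serialization-failure family."""


class QueryCanceledError(OperationalError):
    """Stand-in for a statement_timeout cancellation."""


def should_retry(exc: BaseException) -> bool:
    """Retry operational/interface errors, except rollback-class errors."""
    if isinstance(exc, TransactionRollbackError):
        return False  # do not blindly re-run a deadlocked statement
    return isinstance(exc, (OperationalError, InterfaceError))


decisions = {
    "stale socket": should_retry(InterfaceError("connection already closed")),
    "server closed": should_retry(OperationalError("server closed the connection")),
    "deadlock": should_retry(TransactionRollbackError("deadlock detected")),
    "timeout": should_retry(QueryCanceledError("canceling statement")),
    "other": should_retry(ValueError("not a DB error")),
}
```

Note that under this rule a statement timeout is still retried, because it shares the operational base class; narrowing further would need option (a) or an explicit exclusion list.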

Comment thread workers/queue_backend/pg_barrier.py Outdated

Deliberately NOT used for the barrier **decrement** (``remaining =
remaining - 1``): that is not idempotent — a re-applied decrement could drive
``remaining`` to 0 early and fire the callback twice — so it stays on the

[MEDIUM — comment inaccuracy] "fire the callback twice" isn't the actual failure mode. Decrements each run in their own committed txn, serialized by the row lock, and the callback fires only when a decrement's RETURNING observes remaining == 0. A double-applied decrement skips over 0 (1→0 committed-but-unacked, retry 0→-1 → returns -1 → abandoned branch), so two observations of exactly 0 can't occur. The real hazards — still justifying the exclusion — are firing the callback prematurely with incomplete results, or skipping past 0 and stranding the barrier to expiry. Suggest: "...double-counts: it can drive remaining to 0 early and fire the callback prematurely with incomplete results, or skip past 0 and strand the barrier."
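
The two hazards can be traced with a toy in-memory counter. This is only an illustration of the arithmetic described in this comment: the dict-based state and both function names are hypothetical, and a "lost reply" is modelled simply by ignoring the first return value.

```python
from typing import Dict


def decrement(state: Dict[str, int]) -> int:
    """Committed decrement; returns the new remaining (like RETURNING)."""
    state["remaining"] -= 1
    return state["remaining"]


def decrement_with_blind_retry(state: Dict[str, int]) -> int:
    """A decrement whose commit landed but whose reply was lost, then retried.

    The caller only ever observes the result of the second run.
    """
    decrement(state)         # committed, reply lost with the connection
    return decrement(state)  # blind retry: the only value the caller sees


# Hazard 1: an early task double-counts, so the callback fires prematurely.
early = {"remaining": 3}
seen_by_task_1 = decrement_with_blind_retry(early)  # 3 -> 2 -> 1
seen_by_task_2 = decrement(early)                   # 1 -> 0
premature = seen_by_task_2 == 0  # fires with only 2 of 3 tasks done

# Hazard 2: the last task double-counts and skips past 0.
late = {"remaining": 1}
seen_by_last_task = decrement_with_blind_retry(late)  # 1 -> 0 -> -1
stranded = seen_by_last_task != 0  # no caller ever observes exactly 0
```

In neither trace does a caller observe 0 twice, which matches the point that "fire the callback twice" is not the real failure mode.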

Comment thread workers/queue_backend/pg_barrier.py Outdated
connection was dead.

Restricted to **idempotent** statements run BEFORE any task dispatch — the
barrier UPSERT (``ON CONFLICT … DO UPDATE`` → same row, same state) + the

[LOW — comment precision] "same row, same state" slightly overstates idempotency: the UPSERT's DO UPDATE re-runs created_at = now() and expires_at = ... (lines 375-376), so timestamps advance on a retry. Harmless here (retries are milliseconds apart, execution_id is the PK so no row duplication), but a future reader relying on created_at for staleness/reaper logic could be surprised. Suggest softening to "→ same row, remaining/results reset identically (timestamps refresh, harmlessly)."
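
A small runnable sketch of this point, using Python's built-in sqlite3 as a stand-in for Postgres (its UPSERT syntax needs SQLite 3.24 or newer). The three-column schema is an assumption made for the example, and the timestamp is passed as a parameter instead of calling now() so the result is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pg_barrier_state ("
    " execution_id TEXT PRIMARY KEY,"
    " remaining INTEGER NOT NULL,"
    " created_at REAL NOT NULL)"
)

# Assumed shape of the barrier UPSERT: reset the counter and refresh the timestamp.
UPSERT = (
    "INSERT INTO pg_barrier_state (execution_id, remaining, created_at)"
    " VALUES (?, ?, ?)"
    " ON CONFLICT (execution_id) DO UPDATE SET"
    " remaining = excluded.remaining,"
    " created_at = excluded.created_at"
)

conn.execute(UPSERT, ("exec-1", 5, 100.0))  # first attempt
conn.execute(UPSERT, ("exec-1", 5, 100.2))  # retry a moment later

rows = conn.execute(
    "SELECT execution_id, remaining, created_at FROM pg_barrier_state"
).fetchall()
```

After both statements there is still exactly one row with `remaining` equal to 5, but `created_at` holds the retry's value, which is the "timestamps refresh" caveat.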

Comment thread workers/queue_backend/pg_barrier.py Outdated
_BARRIER_WRITE_ATTEMPTS = 2


def _run_idempotent_write(operation: Callable[[Any], None], *, what: str) -> None:

[MEDIUM — type design] The defining invariant — that operation must be idempotent — lives entirely in the docstring; Callable[[Any], None] would happily accept the non-idempotent decrement. The -> None return is a nice structural guard (it blocks passing a RETURNING-reading op like claim_batch), but idempotency itself is unenforceable in Python's type system. Pragmatic mitigations: keep it private (done) and consider a name that makes the contract impossible to miss at call sites, e.g. _run_idempotent_pre_dispatch_write.

Also minor: the cursor is typed Any here, but the module already imports connection as PgConnection under TYPE_CHECKING — adding cursor as PgCursor and typing Callable[[PgCursor], None] would tighten intent at zero runtime cost (worth doing file-wide for consistency).

Comment thread workers/queue_backend/pg_barrier.py Outdated
# dead conn, so a single retry runs against a freshly reconnected one. This
# turns a transient blip (which previously aborted the whole execution at
# barrier enqueue) into a self-heal.
_BARRIER_WRITE_ATTEMPTS = 2

[LOW — polish] _BARRIER_WRITE_ATTEMPTS = 2 means "total attempts", so 2 = "one retry" — a name/value relationship only the docstring disambiguates. Suggest _BARRIER_WRITE_ATTEMPTS: Final = 2 # total attempts: 1 initial + 1 retry. Keeping it a literal (not env-driven) is the right call — it keeps the idempotency bound from being weakened operationally.

Comment thread workers/queue_backend/pg_barrier.py Outdated

# Both statements are idempotent and run BEFORE any header dispatch,
# so a one-shot retry on a stale cached connection can't duplicate
# state or double-dispatch — see _run_idempotent_write.

[LOW — redundancy] This 3-line comment restates the _run_idempotent_write docstring it points to (directly above the call). It's the kind of duplicated rationale that rots independently of the source of truth. Suggest trimming to a one-liner pointer, e.g. # Idempotent + pre-dispatch → safe to retry; see _run_idempotent_write.

pg_barrier._local.conn = None


class TestIdempotentWriteRetry:

[HIGH — test gap] All three tests exercise _run_idempotent_write in isolation with a no-op operation — none go through the actual enqueue() path the fix protects. Concrete regression this misses: reverting enqueue to the old inline with _cursor() as cur: (the exact pre-fix code) leaves _run_idempotent_write as dead code and silently removes the self-heal, yet every test here still passes. Suggest an integration test using the real barrier_db fixture: set pg_barrier._local.conn to a stub whose first .execute raises OperationalError, monkeypatch create_pg_connection to return the real test connection, call PgBarrier().enqueue(...), then assert (a) the pg_barrier_state row exists exactly once with remaining == N, and (b) header dispatch was called exactly N times. This pins both the self-heal and the "headers still dispatched, no double-dispatch" idempotency claim end-to-end.

never silently swallow a real (non-connection) failure.
"""

def test_retries_once_on_dead_connection_then_succeeds(

[MEDIUM — test gap] Both except clauses (this fix at pg_barrier.py:163 and _cursor at :119) catch (OperationalError, InterfaceError), but all three tests raise only OperationalError. InterfaceError ("connection already closed") is arguably the more common psycopg2 symptom of a stale cached socket — exactly this PR's target scenario. Narrowing either clause to drop InterfaceError would break the self-heal with no failing test. Suggest parametrizing this test over [psycopg2.OperationalError("server closed..."), psycopg2.InterfaceError("connection already closed")]. While here, also asserting dead.commits == 0 (no partial commit on the failed attempt) and a caplog warning would lock the "log on retry, not on success" contract.

self._execute_error = execute_error
self.commits = 0
self.rollbacks = 0
self.executes = 0

[LOW — dead test scaffolding] _FakeConn increments self.rollbacks (line 159) and self.executes (line 149) but neither counter is ever asserted anywhere — test 1 counts invocations via its own attempts list. The rollback() method must stay (called by _cursor's recovery path), but the two counters are dead state. Either drop them, or better, assert on them (e.g. dead.executes == 1, healthy.executes == 1) to tightly bound the retry to exactly one extra attempt. Also closed is bool here but real psycopg2 connection.closed is an int (0/1/2) — fine for the truthiness check, minor fidelity drift.

…g, tests

Review feedback (greptile + PR-review-toolkit) — all non-blocking; fix was
confirmed correct. Changes:

- Retry log now names the real error (type + message) and keeps the traceback
  (exc_info=True); reworded so it no longer asserts a connection drop the broad
  psycopg2 catch can't be sure of. (HIGH — observability)
- Add a small fixed backoff (0.5s) before the retry — widens the self-heal
  window on a brief failover and avoids immediately re-hammering a struggling DB
  on the server-side errors the broad OperationalError catch also covers. (MED)
- Rename _run_idempotent_write -> _run_idempotent_pre_dispatch_write and type the
  op as Callable[[PgCursor], None] so the idempotent/pre-dispatch contract is
  loud at the call site (it can't be type-enforced). (MED — type design)
- _BARRIER_WRITE_ATTEMPTS: Final = 2  # total attempts; doc/comment precision:
  decrement hazard reworded (premature/incomplete callback or strand, not
  "fire twice"), "same state" softened (timestamps refresh harmlessly), trimmed
  the redundant call-site comment to a one-line pointer. (LOW)
- Tests: add an end-to-end self-heal test through enqueue() against the real DB
  (asserts the row lands once + every header dispatched exactly once — pins the
  wiring a no-DB test can't); parametrize the retry test over OperationalError
  AND InterfaceError; assert no partial commit (commits==0), exactly one extra
  attempt (executes counters), and the on-retry warning via caplog. (HIGH + MED)

Full PgBarrier suite: 56 passed; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Thanks for the thorough review 🙏 — all findings addressed in b68e7e6fc (none were blocking; the fix itself was confirmed correct). Summary:

Observability (HIGH + greptile P2)

  • Retry warning now captures the exception, names the real error (type(exc).__name__: exc), and keeps the traceback (exc_info=True). Reworded so it no longer asserts "lost its DB connection" — the broad OperationalError catch can't be sure of that.

Broad catch / no backoff (MEDIUM)

  • Added a small fixed backoff (_BARRIER_RETRY_BACKOFF_SECONDS = 0.5) before the retry — widens the self-heal window on a brief failover and avoids immediately re-hammering a struggling DB on the server-side conditions the catch also covers.
  • Kept the broad (OperationalError, InterfaceError) catch deliberately: a single retry of an idempotent, pre-dispatch write is correctness-safe for all of them, and narrowing risks dropping a genuine stale-socket signal (esp. InterfaceError). The accurate log + backoff cover the concern.
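
The log-then-backoff behaviour described in these bullets might be sketched as below. This is an illustration, not the merged code: `TransientDbError` stands in for the psycopg2 errors, and `run_with_backoff` and its injectable `sleep` parameter are hypothetical, added so the example can run without actually pausing.

```python
import logging
import time
from typing import Callable, List

logger = logging.getLogger("pg_barrier_sketch")

ATTEMPTS = 2           # total attempts: 1 initial + 1 retry
BACKOFF_SECONDS = 0.5  # fixed pause before the single retry


class TransientDbError(Exception):
    """Hypothetical stand-in for psycopg2.OperationalError / InterfaceError."""


def run_with_backoff(
    operation: Callable[[], None],
    *,
    what: str,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Run operation; on a transient error, log it, pause, and retry once."""
    for attempt in range(1, ATTEMPTS + 1):
        try:
            operation()
            return
        except TransientDbError as exc:
            if attempt >= ATTEMPTS:
                raise
            # Name the real error and keep the traceback instead of
            # asserting that the connection was lost.
            logger.warning(
                "PgBarrier: %s failed (%s: %s); retrying (attempt %d/%d)",
                what, type(exc).__name__, exc, attempt, ATTEMPTS,
                exc_info=True,
            )
            sleep(BACKOFF_SECONDS)


calls: List[int] = []
pauses: List[float] = []


def flaky() -> None:
    """Fail on the first call only, like a write hitting a reaped connection."""
    calls.append(1)
    if len(calls) == 1:
        raise TransientDbError("server closed the connection unexpectedly")


run_with_backoff(flaky, what="barrier enqueue", sleep=pauses.append)
```

Passing `pauses.append` as the sleep function records the requested delay, so a test can check that exactly one backoff of 0.5 seconds was requested without slowing the suite.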

Type design / contract (MEDIUM)

  • Renamed _run_idempotent_write → _run_idempotent_pre_dispatch_write and typed the op Callable[[PgCursor], None], so the idempotent + pre-dispatch contract is loud at the call site (it can't be type-enforced; the -> None already blocks a RETURNING-reading op).

Comment precision (MEDIUM + LOW)

  • Decrement hazard reworded: premature/incomplete-results callback, or skip-past-0 and strand the barrier (not "fire twice").
  • Softened "same row, same state" → remaining/results reset identically, timestamps refresh harmlessly.
  • _BARRIER_WRITE_ATTEMPTS: Final = 2 # total attempts: 1 initial + 1 retry.
  • Trimmed the redundant call-site comment to a one-line pointer.

Tests (HIGH + MEDIUM + LOW)

  • Added an end-to-end self-heal test through enqueue() against the real DB: first write hits a dead cached conn → reconnects → asserts the pg_barrier_state row lands exactly once with remaining == N and every header dispatched exactly once. This pins the wiring — reverting enqueue() to the inline with _cursor() now fails (the no-DB tests alone wouldn't catch that).
  • Parametrized the retry test over OperationalError and InterfaceError.
  • Now assert no partial commit (commits == 0), exactly one extra attempt (executes counters), and the on-retry warning via caplog.

Full PgBarrier suite: 56 passed; ruff clean. SonarCloud gate green.

@muhammad-ali-e muhammad-ali-e merged commit 2159ffd into feat/UN-3445-pg-queue-integration Jun 29, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3651-pg-barrier-reconnect-retry branch June 29, 2026 10:52