UN-3659 [FIX] PG result backend — reconnect-retry on stale store_result connection#2129

Merged: muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3659-pg-result-backend-reconnect-retry on Jun 30, 2026.


@muhammad-ali-e


What

PgResultBackend.store_result() now survives a connection that PgBouncer reaped while the executor sat idle — a one-shot reconnect-retry. Sibling of UN-3654.

Why

A PG execution hung permanently (exec b11ba2f3, ali dev) after the worker was idle ~56 min (flag off → enabled + run, past PgBouncer server_idle_timeout). The logs showed every file extracted text and answered 4/4 prompts successfully, then:

PgResultBackend: rollback failed; treating connection as dead
psycopg2.OperationalError: server closed the connection unexpectedly
FAILED to store request-reply result (...) — acking anyway; caller will time out

The executor consumer caches one PgResultBackend for its lifetime, so its write connection is long-lived and idle-reapable. _cursor discards a dead connection so the next call reconnects, but store_result()'s current call failed with no retry → the result was dropped → the blocking file-processing callers waited forever, got SIGKILLed, and the execution was orphaned in EXECUTING. UN-3654 added the retry to dispatch send() and the consumer's claim connection but not to this result-write path.

How

store_result() retries once on OperationalError/InterfaceError. Unconditional (no reused-vs-fresh guard, unlike send()) because the write is INSERT … ON CONFLICT (task_id) DO NOTHING — re-running can't duplicate or clobber a recorded result. _cursor already drops the dead conn, so the retry reconnects and writes the result, unblocking the caller.
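The idempotency argument above can be checked with a small sketch. It uses SQLite as a stand-in for Postgres (an assumption made so the example runs anywhere), and the table name and columns are hypothetical, not the real schema. It only shows that re-running an INSERT with ON CONFLICT (task_id) DO NOTHING neither duplicates nor overwrites the recorded row.

```python
import sqlite3

# SQLite stands in for Postgres; the table layout is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_result (task_id TEXT PRIMARY KEY, result TEXT)")

INSERT_SQL = (
    "INSERT INTO task_result (task_id, result) VALUES (?, ?) "
    "ON CONFLICT (task_id) DO NOTHING"
)


def store(task_id: str, result: str) -> None:
    """Write a result; a repeat for the same task_id is a no-op."""
    with conn:  # commits on success, rolls back on error
        conn.execute(INSERT_SQL, (task_id, result))


store("t1", "first")
store("t1", "first")   # simulated retry of the same write
store("t1", "second")  # even a different payload cannot clobber the first

rows = conn.execute("SELECT task_id, result FROM task_result").fetchall()
```

Because the second and third calls are no-ops, a retry after an ambiguous failure (where the first write may or may not have landed) is safe without any reused-vs-fresh guard.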

Scope (audited the other PG paths)

  • store_result: fixed (long-lived cached conn).
  • get_result/wait_for_result: safe, left as-is. Its only caller (executor_rpc.wait_for_result) makes a fresh PgResultBackend per wait and polls every ~0.2–2s, so there is no idle window. (Documented inline.)
  • Consumer read/delete + periodic leader/reaper/scheduler loops — self-heal next cycle.
  • Related follow-up (separate ticket): the PgBarrier decrement / claim_batch (cached thread-local conn) has the same idle-reap risk and was deliberately left non-retried by UN-3651 (non-idempotent); UN-3654's reused-guard would resolve it. Not the trigger here.

Tests

  • Dev-test: real pg_terminate_backend A/B — bare INSERT fails on the reaped conn with the exact server closed the connection that stranded b11ba2f3; store_result() self-heals and the row is verified in the DB.
  • 6 unit tests: retry+succeed, both Operational/Interface error types, backoff fires once, retry-also-fails raises after exactly one reconnect, non-connection error not retried. Full file: 13 passed.

Op note

b11ba2f3 won't self-recover (no execution-level reaper for stuck EXECUTING) — mark it FAILED on the primary or re-run.

…lt connection

A PG execution hung permanently (exec b11ba2f3) after the worker sat idle ~56 min
(flag off, then enabled + run — past PgBouncer server_idle_timeout). All files
extracted + answered prompts fine, then the executor's PgResultBackend.store_result()
INSERT hit a connection PgBouncer had silently reaped -> 'server closed the
connection unexpectedly' -> result dropped (consumer acked anyway) -> the blocking
file-processing callers waited forever and were SIGKILLed -> execution orphaned in
EXECUTING.

The executor consumer caches one PgResultBackend for its lifetime, so its write
connection is long-lived and idle-reapable. _cursor discards a dead connection so
the NEXT call reconnects, but store_result()'s CURRENT call failed with no retry.
This is the sibling of UN-3654, which added the reconnect-retry to dispatch send()
and the consumer claim connection but NOT to this result-write path.

Fix: store_result() now retries once on a connection-level error. Safe to retry
unconditionally (no reused-vs-fresh guard, unlike send()) because the write is
INSERT ... ON CONFLICT (task_id) DO NOTHING — re-running can't duplicate or clobber.
_cursor already drops the dead conn, so the retry reconnects and writes the result,
unblocking the caller. get_result is left as-is (its only caller makes a fresh
PgResultBackend per wait and polls frequently — no idle window).

PG transport only. Dev-tested with a real pg_terminate_backend A/B (bare INSERT
fails on the reaped conn; store_result self-heals + the row lands). 6 unit tests
(retry+succeed, both Operational/Interface error types, backoff fires, retry-also-
fails raises once, non-connection error not retried).

Related follow-up: the PgBarrier decrement/claim_batch has the same idle-reap risk
(left non-retried by UN-3651) — separate ticket.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 30, 2026


Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@greptile-apps

greptile-apps Bot commented Jun 30, 2026


Greptile Summary

This PR adds a one-shot reconnect-retry to PgResultBackend.store_result() so that a PgBouncer-reaped idle connection no longer silently drops a result and strands the blocking executor caller forever.

  • A new _store_with_reconnect helper wraps the idempotent INSERT … ON CONFLICT DO NOTHING write, catching psycopg2.OperationalError/InterfaceError on attempt 1 and re-issuing the write on a fresh connection only when the backend owns its connection (injected/test connections are re-raised immediately).
  • Six focused unit tests cover the retry-succeeds, both error types, backoff fires once, retry-also-fails, injected-conn-not-retried, and non-connection-error-not-retried cases; a DB-gated integration test validates the real reconnect path against a live Postgres connection.

Confidence Score: 5/5

The change is safe to merge — it adds a tightly-scoped one-shot retry to a path whose idempotency guarantee makes re-execution harmless, and the injected-connection short-circuit preserves existing test isolation.

The retry logic is correct: _cursor nulls self._conn before re-raising, so the retry always gets a fresh connection; the ON CONFLICT DO NOTHING SQL makes the unconditional retry safe; the not self._owns_conn guard prevents spurious retries in tests. The 0.5s backoff adds negligible latency on the slow path. No regressions are introduced to existing paths (get_result, wait_for_result, injected-connection callers).
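The control flow described above can be sketched without a database. Everything in this block is a hypothetical stand-in: ConnDeadError replaces the psycopg2 OperationalError/InterfaceError pair, FakeConn replaces a real connection, and Backend is a simplified model rather than the actual PgResultBackend. It illustrates the two properties the summary relies on: the cursor context manager drops an owned dead handle before re-raising, and the retry is skipped for injected connections.

```python
import time
from contextlib import contextmanager


class ConnDeadError(Exception):
    """Stand-in for psycopg2.OperationalError / InterfaceError."""


class FakeConn:
    """Hypothetical connection; dead=True simulates an idle-reaped handle."""

    def __init__(self, dead=False):
        self.dead = dead
        self.rows = []

    def execute(self, row):
        if self.dead:
            raise ConnDeadError("server closed the connection unexpectedly")
        self.rows.append(row)


class Backend:
    RETRY_ATTEMPTS = 2  # one initial try plus one retry

    def __init__(self, factory, conn=None, backoff=0.0):
        self._factory = factory
        self._conn = conn
        self._owns_conn = conn is None  # injected connections are not owned
        self._backoff = backoff

    @contextmanager
    def _cursor(self):
        if self._conn is None:
            self._conn = self._factory()
        try:
            yield self._conn
        except ConnDeadError:
            if self._owns_conn:
                self._conn = None  # drop the dead handle so the next call reconnects
            raise

    def store_result(self, row):
        for attempt in range(1, self.RETRY_ATTEMPTS + 1):
            try:
                with self._cursor() as cur:
                    cur.execute(row)
                return
            except ConnDeadError:
                # Last attempt, or a handle we cannot replace: give up.
                if attempt == self.RETRY_ATTEMPTS or not self._owns_conn:
                    raise
                time.sleep(self._backoff)


# The first connection handed out is already dead; the second is healthy.
conns = [FakeConn(dead=True), FakeConn()]
made = iter(conns)
backend = Backend(factory=lambda: next(made))
backend.store_result({"task_id": "t1"})
```

After the call, the row sits on the second (fresh) connection only. An injected dead connection, by contrast, raises on the first attempt without sleeping, since the backend has no way to replace it.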

No files require special attention.

Important Files Changed

  • workers/queue_backend/pg_queue/result_backend.py: Adds _store_with_reconnect with a one-shot retry on dead-connection errors (owned connections only). Logic is correct: _cursor nulls self._conn before re-raising, so the retry creates a fresh connection via self.conn; idempotent SQL makes unconditional retry safe; injected connections short-circuit immediately.
  • workers/tests/test_pg_result_backend.py: Six new unit tests cover every retry branch (both error types, backoff, exhaust, injected-conn, non-connection error) plus a DB-gated integration test that validates a real conn.close() → reconnect → INSERT cycle. Mock setup correctly simulates _CursorCtx and conn.cursor() context manager contract.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant C as Consumer (cached backend)
    participant SWR as _store_with_reconnect
    participant CUR as _cursor (attempt 1)
    participant PgB as PgBouncer / Postgres
    participant CUR2 as _cursor (retry)

    C->>SWR: store_result(task_id, result)
    SWR->>CUR: with self._cursor() as cur
    CUR->>PgB: conn.cursor() / execute / commit
    PgB-->>CUR: OperationalError (idle reap)
    CUR->>CUR: rollback (may also fail)
    CUR->>CUR: "self._conn = None (owned conn)"
    CUR-->>SWR: re-raise OperationalError

    alt "attempt < _STORE_RETRY_ATTEMPTS AND owns_conn"
        SWR->>SWR: log warning + time.sleep(0.5s)
        SWR->>CUR2: with self._cursor() as cur (self._conn is None)
        CUR2->>PgB: create_pg_connection() - fresh conn
        CUR2->>PgB: execute INSERT ON CONFLICT DO NOTHING
        PgB-->>CUR2: ok
        CUR2->>PgB: commit
        CUR2-->>SWR: success
        SWR-->>C: return (result written)
    else last attempt OR injected conn
        SWR-->>C: re-raise OperationalError
    end

Reviews (2): Last reviewed commit: "UN-3659 [FIX] address PR #2129 review — ..." | Re-trigger Greptile

Comment thread workers/queue_backend/pg_queue/result_backend.py Outdated
Comment thread workers/queue_backend/pg_queue/result_backend.py

@muhammad-ali-e muhammad-ali-e left a comment


Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).

The fix is correct and well-reasoned — the idempotency argument for the unconditional retry holds, the one-shot bound is right, and propagation/logging are sound. No functional bugs found. The inline notes below are consistency, error-specificity, and test-coverage improvements; none are merge-blockers except optionally the first (drift parity with the sibling PgQueueClient that this PR explicitly models itself on). Two findings overlap existing greptile comments at lines 142 and 175 and were intentionally omitted.

    with self._cursor() as cur:
        operation(cur)
    return
except (psycopg2.OperationalError, psycopg2.InterfaceError) as exc:


P2 — drift hazard + over-broad catch. The connection-dead tuple (psycopg2.OperationalError, psycopg2.InterfaceError) is now inlined in two coupled sites: here and in _cursor (isinstance(exc, (...)), line 122-124). They must stay in lockstep — narrowing one without the other silently breaks the retry's "was this a connection death?" test. The sibling client.py (UN-3654) deliberately hoisted this into a named _CONN_DEAD_ERRORS: Final[tuple[...]] = (...) constant precisely to prevent this, with a comment to that effect. This PR re-introduces the duplication the sibling removed. Suggest mirroring it:

_CONN_DEAD_ERRORS: Final[tuple[type[Exception], ...]] = (
    psycopg2.OperationalError, psycopg2.InterfaceError,
)

then except _CONN_DEAD_ERRORS as exc: here and isinstance(exc, _CONN_DEAD_ERRORS) in _cursor.

Secondary (P3): OperationalError is also the base class for deadlock/serialization (class 40), statement-timeout/cancel, and disk-full/too-many-connections (class 53). Those will trip this retry + the 0.5s sleep and tear down a healthy cached connection, while being logged as "stale idle reap or DB unavailable" — a misleading label for a deadlock. Idempotency keeps it correctness-safe, but consider either softening the warning wording to describe what was observed rather than an inferred cause, or gating the retry on exc.pgcode SQLSTATE class 08 (plus InterfaceError, which has no pgcode).
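The SQLSTATE gating suggested above could look roughly like the sketch below. The two exception classes are local stand-ins for psycopg2.OperationalError and psycopg2.InterfaceError so the block runs without the driver, and looks_like_connection_failure is a hypothetical helper name. One assumption is flagged in the code: an error carrying no pgcode is treated as connection-level, on the reasoning that a failure detected on the client side may not come with a SQLSTATE from the server.

```python
class OperationalError(Exception):
    """Stand-in for psycopg2.OperationalError; real instances expose .pgcode."""

    def __init__(self, msg, pgcode=None):
        super().__init__(msg)
        self.pgcode = pgcode


class InterfaceError(Exception):
    """Stand-in for psycopg2.InterfaceError (raised client-side, no SQLSTATE)."""

    pgcode = None


def looks_like_connection_failure(exc: Exception) -> bool:
    """Return True only for errors that plausibly mean the connection is gone."""
    if isinstance(exc, InterfaceError):
        return True
    if isinstance(exc, OperationalError):
        code = getattr(exc, "pgcode", None)
        # Assumption: no SQLSTATE means the failure was detected client-side
        # (for example, a closed socket), so treat it as connection-level.
        # SQLSTATE class 08 is "connection exception".
        return code is None or code.startswith("08")
    return False


reaped = OperationalError("server closed the connection unexpectedly")
conn_failure = OperationalError("connection failure", pgcode="08006")
deadlock = OperationalError("deadlock detected", pgcode="40P01")
cancelled = OperationalError("canceling statement", pgcode="57014")
```

With this gate a deadlock or a cancelled statement would propagate immediately instead of tearing down a healthy cached connection. Whether the stricter gate is worth the extra branch is a judgment call, since the idempotent write keeps the broader catch correctness-safe either way.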

``INSERT … ON CONFLICT (task_id) DO NOTHING``, so re-running after an
ambiguous failure can neither duplicate nor clobber a recorded result.
"""
for attempt in range(1, _STORE_RETRY_ATTEMPTS + 1):


P3 — retry is a silent no-op for injected (non-owned) connections. _cursor only discards the dead handle when self._owns_conn (line 136). For an injected connection (_owns_conn=False), self._conn is not cleared on a connection-level error, so attempt 2 re-acquires the same dead handle via self.conn, burns the 0.5s backoff, fails identically, and re-raises — the retry buys nothing. Production is safe (the executor consumer always constructs PgResultBackend() with _owns_conn=True), but the asymmetry is undocumented. Either short-circuit if not self._owns_conn to raise immediately without the spurious sleep, or add a one-line note that the retry only heals owned connections. (Relatedly, the docstring's "_cursor has already dropped the handle" at line 152-153 is only true for owned connections — worth a half-clause clarifying that.)

``result`` is the decoded JSONB dict (psycopg2 parses ``jsonb`` to a
Python ``dict``); ``None`` means the task has not finished yet.

No reconnect-retry here (unlike :meth:`store_result`): the only caller,


P3 — comment-rot trap. This says the retry is omitted because "the only caller, executor_rpc.wait_for_result, makes a FRESH PgResultBackend per wait." Two precisions: (1) get_result's direct caller is PgResultBackend.wait_for_result (same class); executor_rpc.wait_for_result is the reachable entry point two hops up. (2) This is a load-bearing "only caller" safety assertion — if someone later adds a one-shot get_result lookup on a long-lived backend, the rationale silently goes stale. Suggest rewording to the invariant it actually depends on, e.g. "the sole entry point creates a fresh backend per wait and polls it every ~0.2–2s, so the connection is new and kept warm." The 0.2–2s figures and "fresh per wait" themselves check out.

return False


class TestStoreResultReconnectRetry:


P2 — no CI test proves the reconnect actually heals against real PG. Every test here drives failure through cur.execute.side_effect against a fully mocked conn/cursor/create_pg_connection, asserting orchestration (close/factory/execute/commit were called). If _cursor's discard stopped nulling self._conn, or the real reconnect produced an unusable handle, these would still pass. The genuine heal is only covered by the manual pg_terminate_backend dev-test, which is not in CI. Consider one DB-gated test (no pg_terminate_backend needed — a client-side close() makes psycopg2 raise InterfaceError on next use):

rb = PgResultBackend()          # owned, real factory
rb.conn                          # materialize
rb._conn.close()                 # next use -> InterfaceError
rb.store_result(k, result={"healed": True})   # must self-heal
# then assert the row landed via a fresh backend's get_result

This is the test that would actually fail if the reconnect were broken.

Comment thread workers/tests/test_pg_result_backend.py Outdated
monkeypatch.setattr("queue_backend.pg_queue.result_backend.time.sleep", sleep)
return sleep

def test_stale_conn_retries_and_writes_result(self, monkeypatch):


P3 — two retry-path gaps worth closing. (1) Failed-status branch is never retried: all six tests call store_result(..., result={...}) (completed branch); the result is None -> STATUS_FAILED path is only covered by the no-retry real-PG round-trip. Since the PR's whole motivation is "a failed task's result silently dropped -> caller stranded," delivering the failed outcome through a stale connection deserves coverage — easily added by parametrizing this test over {"result": {...}} and {"error": "boom"}. (2) Commit-time reap untested: _cursor commits after the yield, and an idle reap typically surfaces when the buffered INSERT flushes at conn.commit(), not at execute. A variant with conn.commit.side_effect = OperationalError(...) on attempt 1 (fresh conn succeeds) would cover the most realistic failure point.
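The commit-time reap variant in (2) can be sketched with unittest.mock alone. The helper store_with_retry and the ConnDeadError class are hypothetical simplifications for illustration, not the code under review. The point is the mock wiring: the first connection accepts execute but fails at commit, and the second connection succeeds.

```python
from unittest.mock import MagicMock


class ConnDeadError(Exception):
    """Stand-in for psycopg2.OperationalError."""


def store_with_retry(connect, sql, params, attempts=2):
    """Hypothetical helper: execute + commit, reconnecting once on ConnDeadError."""
    conn = connect()
    for attempt in range(1, attempts + 1):
        try:
            conn.cursor().execute(sql, params)
            conn.commit()
            return
        except ConnDeadError:
            if attempt == attempts:
                raise
            conn = connect()  # discard the dead handle and reconnect


# Attempt 1: execute is accepted, but the reap surfaces when commit flushes.
stale = MagicMock(name="stale_conn")
stale.commit.side_effect = ConnDeadError("server closed the connection unexpectedly")

# Attempt 2: a fresh connection on which everything succeeds.
fresh = MagicMock(name="fresh_conn")
connect = MagicMock(side_effect=[stale, fresh])

store_with_retry(connect, "INSERT ...", ("t1",))
```

Asserting on both the stale and the fresh mock separates "the failure happened at commit" from "the retry re-issued the whole write", which a test that only fails at execute cannot show.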

@muhammad-ali-e


Thanks — all addressed in the latest commit.

P2 — shared _CONN_DEAD_ERRORS (parity with the sibling). Extracted _CONN_DEAD_ERRORS: Final[tuple[...]] = (OperationalError, InterfaceError) and used it in both _cursor and _store_with_reconnect — exactly the named constant PgQueueClient (UN-3654) introduced. The duplication this PR re-introduced is gone.

P2 — typing. operation: Callable[[Any], None].

P2 — over-broad catch / misleading label. Kept the catch (idempotency makes it correctness-safe) but reworded the WARNING to describe the observation ("a connection-level error") rather than asserting "stale idle reap" — since the same classes cover deadlock/timeout/admin-shutdown. Dropped the duplicate inline %s: %s; exc_info=True alone carries type+message (also resolves greptile's double-log note at line 175).

P3 — injected (non-owned) connection. Good catch on the silent no-op. Now short-circuits: the retry only runs for owned connections (since _cursor only clears the handle when _owns_conn); an injected conn re-raises immediately with no spurious backoff. Documented + a test_injected_connection_not_retried test.

P3 — get_result comment-rot. Reworded to the actual invariant (the sole wait entry point builds a fresh backend per wait and polls it ~0.2–2s → connection stays warm), and fixed the caller precision.

P2 - no real-PG heal test. Added test_real_reconnect_heals_closed_connection (DB-gated): owned backend, conn.close() → InterfaceError on next use → store_result must self-heal → the row is read back on a separate connection. This is the test that would fail if the reconnect were actually broken.

P3 — test gaps. Parametrized the retry over completed/failed (the failed result was the whole point — silently dropped → caller stranded), and added a commit-time reap test (conn.commit.side_effect on attempt 1, since an idle reap usually surfaces at flush, not execute).

17 tests green (incl. the DB-gated heal); pre-commit clean.

…ing, owned-only retry, real-PG test

- extract _CONN_DEAD_ERRORS shared by _cursor + _store_with_reconnect (parity
  with the sibling PgQueueClient UN-3654; stops the two sites drifting)
- type operation as Callable[[Any], None] (was Any)
- retry only OWNED connections: _cursor clears a dead handle only when
  _owns_conn, so an injected conn can't be reconnected — short-circuit re-raise
  (no spurious 0.5s sleep). Production is always owned.
- WARNING describes the observation, not an inferred cause; exc_info carries
  type+message, no duplicate inline log
- get_result docstring: anchor 'no retry needed' to the real invariant
- tests: parametrize retry over completed/failed, commit-time-reap retry,
  injected-not-retried, and a DB-gated REAL reconnect test (close -> heal -> row)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@muhammad-ali-e muhammad-ali-e merged commit c214bf2 into feat/UN-3445-pg-queue-integration Jun 30, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3659-pg-result-backend-reconnect-retry branch June 30, 2026 10:14