UN-3659 [FIX] PG result backend — reconnect-retry on stale store_result connection#2129
UN-3659 [FIX] PG result backend — reconnect-retry on stale store_result connection#2129muhammad-ali-e merged 2 commits intofeat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom UN-3659-pg-result-backend-reconnect-retryZipstack/unstract:UN-3659-pg-result-backend-reconnect-retryCopy head branch name to clipboard
Conversation
…lt connection A PG execution hung permanently (exec b11ba2f3) after the worker sat idle ~56 min (flag off, then enabled + run — past PgBouncer server_idle_timeout). All files extracted + answered prompts fine, then the executor's PgResultBackend.store_result() INSERT hit a connection PgBouncer had silently reaped -> 'server closed the connection unexpectedly' -> result dropped (consumer acked anyway) -> the blocking file-processing callers waited forever and were SIGKILLed -> execution orphaned in EXECUTING. The executor consumer caches one PgResultBackend for its lifetime, so its write connection is long-lived and idle-reapable. _cursor discards a dead connection so the NEXT call reconnects, but store_result()'s CURRENT call failed with no retry. This is the sibling of UN-3654, which added the reconnect-retry to dispatch send() and the consumer claim connection but NOT to this result-write path. Fix: store_result() now retries once on a connection-level error. Safe to retry unconditionally (no reused-vs-fresh guard, unlike send()) because the write is INSERT ... ON CONFLICT (task_id) DO NOTHING — re-running can't duplicate or clobber. _cursor already drops the dead conn, so the retry reconnects and writes the result, unblocking the caller. get_result is left as-is (its only caller makes a fresh PgResultBackend per wait and polls frequently — no idle window). PG transport only. Dev-tested with a real pg_terminate_backend A/B (bare INSERT fails on the reaped conn; store_result self-heals + the row lands). 6 unit tests (retry+succeed, both Operational/Interface error types, backoff fires, retry-also- fails raises once, non-connection error not retried). Related follow-up: the PgBarrier decrement/claim_batch has the same idle-reap risk (left non-retried by UN-3651) — separate ticket. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
The fix is correct and well-reasoned — the idempotency argument for the unconditional retry holds, the one-shot bound is right, and propagation/logging are sound. No functional bugs found. The inline notes below are consistency, error-specificity, and test-coverage improvements; none are merge-blockers except optionally the first (drift parity with the sibling PgQueueClient that this PR explicitly models itself on). Two findings overlap existing greptile comments at lines 142 and 175 and were intentionally omitted.
| with self._cursor() as cur: | ||
| operation(cur) | ||
| return | ||
| except (psycopg2.OperationalError, psycopg2.InterfaceError) as exc: |
There was a problem hiding this comment.
P2 — drift hazard + over-broad catch. The connection-dead tuple (psycopg2.OperationalError, psycopg2.InterfaceError) is now inlined in two coupled sites: here and in _cursor (isinstance(exc, (...)), line 122-124). They must stay in lockstep — narrowing one without the other silently breaks the retry's "was this a connection death?" test. The sibling client.py (UN-3654) deliberately hoisted this into a named _CONN_DEAD_ERRORS: Final[tuple[...]] = (...) constant precisely to prevent this, with a comment to that effect. This PR re-introduces the duplication the sibling removed. Suggest mirroring it:
_CONN_DEAD_ERRORS: Final[tuple[type[Exception], ...]] = (
psycopg2.OperationalError, psycopg2.InterfaceError,
)then except _CONN_DEAD_ERRORS as exc: here and isinstance(exc, _CONN_DEAD_ERRORS) in _cursor.
Secondary (P3): OperationalError is also the base class for deadlock/serialization (class 40), statement-timeout/cancel, and disk-full/too-many-connections (class 53). Those will trip this retry + the 0.5s sleep and tear down a healthy cached connection, while being logged as "stale idle reap or DB unavailable" — a misleading label for a deadlock. Idempotency keeps it correctness-safe, but consider either softening the warning wording to describe what was observed rather than an inferred cause, or gating the retry on exc.pgcode SQLSTATE class 08 (plus InterfaceError, which has no pgcode).
| ``INSERT … ON CONFLICT (task_id) DO NOTHING``, so re-running after an | ||
| ambiguous failure can neither duplicate nor clobber a recorded result. | ||
| """ | ||
| for attempt in range(1, _STORE_RETRY_ATTEMPTS + 1): |
There was a problem hiding this comment.
P3 — retry is a silent no-op for injected (non-owned) connections. _cursor only discards the dead handle when self._owns_conn (line 136). For an injected connection (_owns_conn=False), self._conn is not cleared on a connection-level error, so attempt 2 re-acquires the same dead handle via self.conn, burns the 0.5s backoff, fails identically, and re-raises — the retry buys nothing. Production is safe (the executor consumer always constructs PgResultBackend() with _owns_conn=True), but the asymmetry is undocumented. Either short-circuit if not self._owns_conn to raise immediately without the spurious sleep, or add a one-line note that the retry only heals owned connections. (Relatedly, the docstring's "_cursor has already dropped the handle" at line 152-153 is only true for owned connections — worth a half-clause clarifying that.)
| ``result`` is the decoded JSONB dict (psycopg2 parses ``jsonb`` to a | ||
| Python ``dict``); ``None`` means the task has not finished yet. | ||
|
|
||
| No reconnect-retry here (unlike :meth:`store_result`): the only caller, |
There was a problem hiding this comment.
P3 — comment-rot trap. This says the retry is omitted because "the only caller, executor_rpc.wait_for_result, makes a FRESH PgResultBackend per wait." Two precisions: (1) get_result's direct caller is PgResultBackend.wait_for_result (same class); executor_rpc.wait_for_result is the reachable entry point two hops up. (2) This is a load-bearing "only caller" safety assertion — if someone later adds a one-shot get_result lookup on a long-lived backend, the rationale silently goes stale. Suggest rewording to the invariant it actually depends on, e.g. "the sole entry point creates a fresh backend per wait and polls it every ~0.2–2s, so the connection is new and kept warm." The 0.2–2s figures and "fresh per wait" themselves check out.
| return False | ||
|
|
||
|
|
||
| class TestStoreResultReconnectRetry: |
There was a problem hiding this comment.
P2 — no CI test proves the reconnect actually heals against real PG. Every test here drives failure through cur.execute.side_effect against a fully mocked conn/cursor/create_pg_connection, asserting orchestration (close/factory/execute/commit were called). If _cursor's discard stopped nulling self._conn, or the real reconnect produced an unusable handle, these would still pass. The genuine heal is only covered by the manual pg_terminate_backend dev-test, which is not in CI. Consider one DB-gated test (no pg_terminate_backend needed — a client-side close() makes psycopg2 raise InterfaceError on next use):
rb = PgResultBackend() # owned, real factory
rb.conn # materialize
rb._conn.close() # next use -> InterfaceError
rb.store_result(k, result={"healed": True}) # must self-heal
# then assert the row landed via a fresh backend's get_resultThis is the test that would actually fail if the reconnect were broken.
| monkeypatch.setattr("queue_backend.pg_queue.result_backend.time.sleep", sleep) | ||
| return sleep | ||
|
|
||
| def test_stale_conn_retries_and_writes_result(self, monkeypatch): |
There was a problem hiding this comment.
P3 — two retry-path gaps worth closing. (1) Failed-status branch is never retried: all six tests call store_result(..., result={...}) (completed branch); the result is None -> STATUS_FAILED path is only covered by the no-retry real-PG round-trip. Since the PR's whole motivation is "a failed task's result silently dropped -> caller stranded," delivering the failed outcome through a stale connection deserves coverage — easily added by parametrizing this test over {"result": {...}} and {"error": "boom"}. (2) Commit-time reap untested: _cursor commits after the yield, and an idle reap typically surfaces when the buffered INSERT flushes at conn.commit(), not at execute. A variant with conn.commit.side_effect = OperationalError(...) on attempt 1 (fresh conn succeeds) would cover the most realistic failure point.
|
Thanks — all addressed in the latest commit. P2 — shared P2 — typing. P2 — over-broad catch / misleading label. Kept the catch (idempotency makes it correctness-safe) but reworded the WARNING to describe the observation ("a connection-level error") rather than asserting "stale idle reap" — since the same classes cover deadlock/timeout/admin-shutdown. Dropped the duplicate inline P3 — injected (non-owned) connection. Good catch on the silent no-op. Now short-circuits: the retry only runs for owned connections (since P3 — get_result comment-rot. Reworded to the actual invariant (the sole wait entry point builds a fresh backend per wait and polls it ~0.2–2s → connection stays warm), and fixed the caller precision. P2 — no real-PG heal test. Added P3 — test gaps. Parametrized the retry over completed/failed (the failed result was the whole point — silently dropped → caller stranded), and added a commit-time reap test ( 17 tests green (incl. the DB-gated heal); pre-commit clean. |
…ing, owned-only retry, real-PG test - extract _CONN_DEAD_ERRORS shared by _cursor + _store_with_reconnect (parity with the sibling PgQueueClient UN-3654; stops the two sites drifting) - type operation as Callable[[Any], None] (was Any) - retry only OWNED connections: _cursor clears a dead handle only when _owns_conn, so an injected conn can't be reconnected — short-circuit re-raise (no spurious 0.5s sleep). Production is always owned. - WARNING describes the observation, not an inferred cause; exc_info carries type+message, no duplicate inline log - get_result docstring: anchor 'no retry needed' to the real invariant - tests: parametrize retry over completed/failed, commit-time-reap retry, injected-not-retried, and a DB-gated REAL reconnect test (close -> heal -> row) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
c214bf2
into
feat/UN-3445-pg-queue-integration
What
PgResultBackend.store_result()now survives a connection that PgBouncer reaped while the executor sat idle — a one-shot reconnect-retry. Sibling of UN-3654.Why
A PG execution hung permanently (exec
b11ba2f3, ali dev) after the worker was idle ~56 min (flag off → enabled + run, past PgBouncerserver_idle_timeout). The logs showed every file extracted text and answered 4/4 prompts successfully, then:The executor consumer caches one
PgResultBackendfor its lifetime, so its write connection is long-lived and idle-reapable._cursordiscards a dead connection so the next call reconnects, butstore_result()'s current call failed with no retry → the result was dropped → the blocking file-processing callers waited forever, got SIGKILLed, and the execution was orphaned inEXECUTING. UN-3654 added the retry to dispatchsend()and the consumer's claim connection but not to this result-write path.How
store_result()retries once onOperationalError/InterfaceError. Unconditional (no reused-vs-fresh guard, unlikesend()) because the write isINSERT … ON CONFLICT (task_id) DO NOTHING— re-running can't duplicate or clobber a recorded result._cursoralready drops the dead conn, so the retry reconnects and writes the result, unblocking the caller.Scope (audited the other PG paths)
store_result— fixed (long-lived cached conn).get_result/wait_for_result— safe, left as-is: its only caller (executor_rpc.wait_for_result) makes a freshPgResultBackendper wait and polls every ~0.2–2s, so no idle window. (Documented inline.)read/delete+ periodic leader/reaper/scheduler loops — self-heal next cycle.PgBarrierdecrement /claim_batch(cached thread-local conn) has the same idle-reap risk and was deliberately left non-retried by UN-3651 (non-idempotent); UN-3654's reused-guard would resolve it. Not the trigger here.Tests
pg_terminate_backendA/B — bare INSERT fails on the reaped conn with the exactserver closed the connectionthat strandedb11ba2f3;store_result()self-heals and the row is verified in the DB.Operational/Interfaceerror types, backoff fires once, retry-also-fails raises after exactly one reconnect, non-connection error not retried. Full file: 13 passed.Op note
b11ba2f3won't self-recover (no execution-level reaper for stuckEXECUTING) — mark it FAILED on the primary or re-run.