UN-3654 [FIX] PG queue — reconnect-retry on stale dispatch connection (send())#2124
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3654-pg-dispatch-reconnect-retry
… (send())
A PG-transport execution aborted with "server closed the connection unexpectedly" at barrier header dispatch ~30 min after the prior run on the same worker thread: PgBouncer server_idle_timeout reaped the cached dispatch connection while idle. UN-3651's reconnect-retry covers only the barrier's idempotent pre-dispatch write; the next call (the PgQueueClient.send() INSERT into pg_queue_message) used a separate cached connection with no reconnect logic, so it failed and aborted the execution.
send()'s INSERT is not idempotent (auto msg_id), so a blind retry could double-enqueue. The one-shot reconnect-retry therefore fires ONLY when the failing connection was reused (cached + owned): a reused conn that dies on its first statement was reaped while idle, so the INSERT never reached the server and re-inserting can't duplicate. A fresh connection failing re-raises without retry. PG transport only; Celery dispatch untouched.
Tests: 5 new (reused-stale→retry; fresh→no retry; injected→no retry; retry-also-fails→raise once; non-connection error→no retry). All green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/client.py | Adds one-shot reconnect-retry to send() for stale cached connections; extracts _insert_message(), adds _CONN_DEAD_ERRORS constant and 0.5s backoff. Logic is sound: reused flag captured before attempt, retry only fires for owned+cached conns, second attempt is structurally outside the try-block so retry is truly one-shot. |
| workers/tests/test_pg_queue_client.py | Adds 5 focused unit tests in TestSendReconnectRetry covering: reused-stale retry success (parameterized over both error types), fresh-conn no-retry, injected-conn no-retry, retry-also-fails one-shot bound, and non-connection error on reused conn. All critical branches exercised. |
Reviews (2): Last reviewed commit: "UN-3654 [FIX] address PR #2124 review — ..."
muhammad-ali-e left a comment:
Automated PR Review — UN-3654 reconnect-retry
Reviewed via the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). The change is well-scoped and the safety-critical reused-vs-fresh gate is correct and well-tested; all 35 tests pass. Findings below are prioritised; the headline items are about the justification in the comments overstating a downstream guarantee, not the control flow.
P1 — overstated dedup guarantee + misattributed backoff rationale. P2 — error-class coverage gap, triage-misleading WARNING, no success breadcrumb, untested InterfaceError/backoff. P3 — blocking sleep, keyword-only discipline, test-helper duplication.
| # Small fixed pause before send()'s single reconnect-retry (see send()). The | ||
| # idle-reap case reconnects instantly regardless; this only widens the self-heal | ||
| # window for a brief DB failover and avoids re-hammering a struggling server. | ||
| # A literal, not env-driven, so the one-shot bound can't be weakened operationally. |
P1 — misattributed rationale. This constant is the backoff sleep duration (0.5), but the comment justifies it as keeping "the one-shot bound" from being "weakened operationally." The one-shot guarantee (retry exactly once) is enforced structurally by send()'s single except/single retry call — it has nothing to do with this literal; making the backoff env-driven would not change the retry count. This reads as copied from pg_barrier.py:140-141, where the literal annotated is the actual attempt-count bound (_BARRIER_WRITE_ATTEMPTS). Suggest retying the rationale to what this value governs, e.g. "a literal so the pause can't be tuned into a long blocking stall on the enqueue hot path."
| connection failing is a genuine error (or an ambiguous mid-statement | ||
| death) → re-raise, never retry. Unlike the barrier's idempotent | ||
| pre-dispatch write, an ``INSERT`` is not idempotent, so this | ||
| reused-only guard is what keeps the retry from double-enqueuing; in the |
P1 — overstated downstream guarantee (flagged independently by 3 reviewers). The claim that a duplicate "is absorbed by the claim_batch / pg_batch_dedup gate downstream" only holds for batch header tasks: claim_batch is invoked from exactly one site, run_batch_with_barrier (pg_barrier.py:896). But send() is the generic enqueue primitive — dispatch.py:154 routes leaf tasks, the barrier header fan-out, and the aggregating callback through it, none of which carry a batch_index, so none are gated. A double-fired callback would aggregate chord results twice. Suggest leading with the real general backstop (the module's at-least-once / idempotent-consumer contract, lines 13-17) and presenting claim_batch as the batch-specific instance, so the comment stays accurate as new non-batch send() call sites appear.
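To make the idempotent-consumer backstop concrete, here is a minimal sketch of a consumer that tolerates a double-fired callback. It is not the project's code: `ChordAggregator`, `on_callback`, and the `callback_id` key are hypothetical names, and the in-memory set stands in for whatever durable dedup store a real consumer would need.

```python
class ChordAggregator:
    """Hypothetical consumer that aggregates chord results at most once per callback."""

    def __init__(self):
        self._seen = set()  # assumption: a real system would persist this
        self.total = 0

    def on_callback(self, callback_id, results):
        """Apply results once; return False if this delivery was a duplicate."""
        if callback_id in self._seen:
            return False  # duplicate delivery from an at-least-once enqueue
        self._seen.add(callback_id)
        self.total += sum(results)
        return True


agg = ChordAggregator()
applied_first = agg.on_callback("chord-1", [1, 2, 3])
applied_again = agg.on_callback("chord-1", [1, 2, 3])  # replayed enqueue
```

The point of the sketch is only that the guarantee lives in the consumer, so it holds for any send() call site, batch or not.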
| f"priority out of range [{MIN_PRIORITY}, {MAX_PRIORITY}]: {priority!r}" | ||
| ) | ||
| # Capture BEFORE the attempt: a fresh conn has self._conn is None here. | ||
| reused = self._conn is not None and self._owns_conn |
P2 — reused cannot actually establish "the INSERT never reached the server." The flag only distinguishes "cached before this send" from "created during it." The caught OperationalError/InterfaceError can't separate (a) idle reap — INSERT never sent, retry safe — from (b) mid-statement death where the server committed the row but the socket dropped before psycopg2 read RETURNING msg_id → retry double-inserts. Case (b) is precisely the PgBouncer-recycle/failover scenario this feature targets, so it isn't rare on that path. This is acceptable as an at-least-once enqueue, but the docstring currently frames it as near-exactly-once. Either soften the wording ("duplicate possible only on commit-loss; relies on idempotent consumers") or, for the real fix, add a producer dedup key + INSERT ... ON CONFLICT DO NOTHING RETURNING msg_id so the retry collapses to the original. (Capturing reused before the attempt is itself correct — keep that.)
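The producer-dedup-key idea can be sketched as follows, using SQLite as a stand-in so the example runs anywhere. The `queue_message` table, the `dedup_key` column, and the `enqueue` helper are assumptions for illustration, not the real pg_queue_message schema. One detail worth keeping in mind for the PostgreSQL version: `INSERT ... ON CONFLICT DO NOTHING RETURNING msg_id` returns no row when the conflict path is taken, so the retry would still need a follow-up lookup by key, which is what the sketch does.

```python
import sqlite3
import uuid


def make_db():
    """Create an in-memory table with a unique producer-side dedup key."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE queue_message ("
        " msg_id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " dedup_key TEXT NOT NULL UNIQUE,"
        " body TEXT NOT NULL)"
    )
    return conn


def enqueue(conn, dedup_key, body):
    """Insert once per dedup_key; a replay collapses to the original row."""
    conn.execute(
        "INSERT INTO queue_message (dedup_key, body) VALUES (?, ?)"
        " ON CONFLICT(dedup_key) DO NOTHING",
        (dedup_key, body),
    )
    conn.commit()
    # Look the id up by key: on the conflict path nothing was inserted.
    row = conn.execute(
        "SELECT msg_id FROM queue_message WHERE dedup_key = ?", (dedup_key,)
    ).fetchone()
    return row[0]


db = make_db()
key = str(uuid.uuid4())            # generated once, before the first attempt
first = enqueue(db, key, "payload")
second = enqueue(db, key, "payload")  # simulated retry after commit-loss
```

The key has to be generated before the first attempt and reused on the retry; generating it inside the retried function would defeat the dedup.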
| reused = self._conn is not None and self._owns_conn | ||
| try: | ||
| return self._insert_message(queue_name, message, org_id, priority) | ||
| except (psycopg2.OperationalError, psycopg2.InterfaceError) as exc: |
P2 — catch list is narrower than _cursor's own model, and duplicates it. _cursor (line 192-198) explicitly anticipates a server-side termination surfacing as a bare psycopg2.DatabaseError and treats a failed rollback() as proof of death — but this retry catches only (OperationalError, InterfaceError). So a stale death that manifests as a bare DatabaseError gets the conn correctly dropped by _cursor yet is not retried here, silently defeating the feature for that manifestation. Narrow is the right call (must not retry IntegrityError/CheckConstraint), but the gap is undocumented. Also: this tuple is duplicated against _cursor's at line 197 — extract a single _CONN_DEAD_ERRORS: Final = (psycopg2.OperationalError, psycopg2.InterfaceError) referenced by both, so the send/_cursor coupling is enforced by construction, and add a one-line docstring note that a bare DatabaseError is intentionally left to the next call's reconnect.
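A rough sketch of the shared-tuple suggestion, with stand-in exception classes that mirror the psycopg2 hierarchy so the example runs without the driver installed. The helper name `is_conn_dead` is hypothetical; only the `_CONN_DEAD_ERRORS` name comes from the review.

```python
from typing import Final


# Stand-ins mirroring psycopg2's exception hierarchy (assumption for the sketch).
class Error(Exception): ...
class InterfaceError(Error): ...
class DatabaseError(Error): ...
class OperationalError(DatabaseError): ...
class IntegrityError(DatabaseError): ...


# Single source of truth, referenced by both _cursor and send().
_CONN_DEAD_ERRORS: Final = (OperationalError, InterfaceError)


def is_conn_dead(exc: BaseException) -> bool:
    """True only for connection-level errors.

    A bare DatabaseError is intentionally excluded here and left to the
    next call's reconnect; constraint errors must never be retried.
    """
    return isinstance(exc, _CONN_DEAD_ERRORS)
```

Because `OperationalError` subclasses `DatabaseError` but not the other way round, the tuple stays narrow: `IntegrityError` and a bare `DatabaseError` both fall outside it.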
| if not reused: | ||
| raise | ||
| logger.warning( | ||
| "PG-queue: send to queue=%r failed (%s: %s); cached connection " |
P2 — WARNING asserts a cause it can't know, mis-steering triage. During a genuine DB outage (not a transient reap) this fires claiming "cached connection likely stale, reconnecting and retrying once" — the wrong diagnosis for an operator scanning WARNINGs mid-incident. Suggest describing the observation, not the conclusion: e.g. "send failed with a connection-level error on a reused cached connection; dropping it and retrying once (stale-reap or DB unavailable)."
| ) | ||
| time.sleep(_SEND_RETRY_BACKOFF_SECONDS) | ||
| # _cursor already dropped the dead owned conn, so this reconnects. | ||
| return self._insert_message(queue_name, message, org_id, priority) |
P2 — no positive record on a successful retry. For a feature whose primary hazard is a duplicate enqueue, the only log artifact is the pre-retry WARNING; there's no "retry succeeded, msg_id=N" breadcrumb to correlate against or to detect a double-insert after the fact. Capture the returned id and emit an INFO with the resulting msg_id before returning.
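The two logging suggestions (describe the observation, then leave a success breadcrumb) might look roughly like this. It is a sketch under assumptions: `ConnDead` stands in for the psycopg2 connection errors, `send_with_breadcrumb` and the logger name are hypothetical, and the backoff sleep is left out to keep the focus on the log lines.

```python
import logging

logger = logging.getLogger("pg_queue_sketch")  # hypothetical logger name


class ConnDead(Exception):
    """Stand-in for psycopg2.OperationalError / InterfaceError."""


def send_with_breadcrumb(insert, queue_name, *, reused):
    """Retry insert() once on a reused connection, logging both outcomes."""
    try:
        return insert()
    except ConnDead as exc:
        if not reused:
            raise
        # Describe what was observed, not a diagnosis the code cannot make.
        logger.warning(
            "PG-queue: send to queue=%r failed with a connection-level error "
            "(%s: %s) on a reused cached connection; dropping it and retrying "
            "once (stale-reap or DB unavailable)",
            queue_name, type(exc).__name__, exc,
        )
        msg_id = insert()
        # Positive record to correlate against if a duplicate is suspected.
        logger.info(
            "PG-queue: send to queue=%r succeeded after reconnect, msg_id=%s",
            queue_name, msg_id,
        )
        return msg_id
```

With the INFO line in place, a WARNING without a matching breadcrumb tells an operator the retry itself failed.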
| exc, | ||
| exc_info=True, | ||
| ) | ||
| time.sleep(_SEND_RETRY_BACKOFF_SECONDS) |
P3 — blocking time.sleep(0.5) penalises the common path. The docstring concedes the idle-reap case (the common reason this retry exists) reconnects instantly, so the 0.5s is pure dead time there; it only buys a failover self-heal window. Bounded and one-shot, so low severity, and fine while all callers are synchronous — but a simultaneous failover stales many cached clients at once, each dispatch then blocking 0.5s (a small thundering-herd stall). Worth a note that the sleep penalises the common path to buy failover headroom, and a caveat that this becomes a loop-stall if send() is ever pulled into async context.
| # _cursor already dropped the dead owned conn, so this reconnects. | ||
| return self._insert_message(queue_name, message, org_id, priority) | ||
|
|
||
| def _insert_message( |
P3 — _insert_message drops send()'s keyword-only discipline. send() forces org_id/priority keyword-only via *, and validates priority range; _insert_message takes all four positionally and assumes the range check already ran. Private + only two internal callers, so low risk, but a future direct caller bypasses both guards. Mirror the * boundary (org_id/priority keyword-only) so the helper reads like the public method it serves and can't be miscalled positionally.
| monkeypatch.setattr("queue_backend.pg_queue.client.time.sleep", sleep) | ||
| return sleep | ||
|
|
||
| def test_reused_stale_conn_retries_and_succeeds(self, monkeypatch): |
P2 — three coverage gaps in the happy-path test. (1) InterfaceError — the more likely stale symptom given conn.closed is a client-side-only flag — is never exercised; all 5 tests use OperationalError/RuntimeError, so narrowing the except to OperationalError alone would re-break the PgBouncer-reap fix yet pass the suite. Parametrize this test over both exception types. (2) The 0.5s backoff fires here but is never asserted (_no_sleep's mock is discarded); deleting the time.sleep line or changing _SEND_RETRY_BACKOFF_SECONDS passes the suite. Add sleep.assert_called_once_with(_SEND_RETRY_BACKOFF_SECONDS). (3) The retry's INSERT params (org_id/priority/message passthrough) are unverified — assert on the fresh cursor's execute.call_args for a send with explicit org_id/priority. A caplog check on the WARNING (precedent at lines 339-351) would round it out.
| """ | ||
|
|
||
| @staticmethod | ||
| def _conn(*, execute_side_effect=None, fetchone=(1,)): |
P3 — third copy of the mock-conn builder. _conn here cleanly de-duplicates within this class (backs all 5 tests — good), but it's now the third file-level copy of the same cur/conn/_CursorCtx/conn.closed=0 shape alongside _mock_conn (line 44) and _owned_client (line 141). Consider folding them into module-level _mock_conn with closed=0 + an execute_side_effect param and delegating. Minor: every call site does dead, _ = self._conn(...) and discards cur, so returning just conn would simplify them — optional if you keep the (conn, cur) convention for consistency. _no_sleep is good as-is.
… shared error tuple, keyword-only, test coverage
- reframe send() docstring: reused gate makes idle-reap retry safe but enqueue is at-least-once (commit-loss case); lean on module idempotent-consumer contract; claim_batch/pg_batch_dedup is the batch-header-specific instance only
- extract _CONN_DEAD_ERRORS shared by _cursor + send(); note bare DatabaseError left to next-call reconnect
- retie backoff-constant rationale to pause duration (+ async caveat); WARNING describes observation not conclusion; add success breadcrumb on reconnect
- _insert_message org_id/priority keyword-only
- tests: parametrize Operational/InterfaceError, assert sleep backoff + retry param passthrough + caplog WARNING
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Thanks for the review — addressed in 1b6cf9d. Grouped by priority: P1 — overstated guarantee (3 reviewers)
P2
Tests
P3 — deliberately deferred
36 passed; |
Merged commit 89b4b11 into feat/UN-3445-pg-queue-integration
What
Adds a one-shot reconnect-retry to PgQueueClient.send() so a PG-routed dispatch survives a cached connection that was reaped server-side while idle. Follow-up to UN-3651.
Why
A PG-transport ETL execution failed with "server closed the connection unexpectedly" at the barrier's header dispatch, ~30 min after the previous execution on the same worker thread: PgBouncer server_idle_timeout reaped the cached connection while it sat idle between executions.
UN-3651 added a reconnect-retry to the barrier's idempotent pre-dispatch write (_reset_barrier), and that retry fired and succeeded. But the very next call (the header dispatch via dispatch._enqueue_pg → PgQueueClient.send() → INSERT into pg_queue_message) uses a separate cached connection with no reconnect logic, so it failed and aborted the whole execution. UN-3651's guard did not cover this site.
Root cause (worker logs): the dispatch singleton's cached psycopg2 connection was last used by the prior execution, then idle ~30 min; the server-side connection was dropped; the next send() INSERT hit a dead socket → OperationalError → execution ERROR.
How
send()'s INSERT is not idempotent (auto msg_id), so a blind retry could double-enqueue. The retry therefore fires only when the failing connection was reused (cached + owned by the client):
… claim_batch / pg_batch_dedup gate downstream.
Scope: PG transport only; the Celery dispatch path is untouched. Zero Celery regression.
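The reused-only gate described above can be sketched in a few lines. This is a simplified model, not the code in client.py: `SketchClient`, `FakeConn`, and `ConnDead` are hypothetical stand-ins for PgQueueClient, a psycopg2 connection, and the psycopg2 connection errors, and the connection-dropping that the real `_cursor` performs is folded into `_insert_message` here.

```python
import time


class ConnDead(Exception):
    """Stand-in for psycopg2.OperationalError / InterfaceError."""


_SEND_RETRY_BACKOFF_SECONDS = 0.5


class FakeConn:
    """Toy connection: insert() fails once the connection is marked dead."""

    def __init__(self, dead=False):
        self.dead = dead
        self.rows = []

    def insert(self, message):
        if self.dead:
            raise ConnDead("server closed the connection unexpectedly")
        self.rows.append(message)
        return len(self.rows)


class SketchClient:
    def __init__(self, connect, conn=None):
        self._connect = connect         # factory for new connections
        self._conn = conn               # injected connection, if any
        self._owns_conn = conn is None  # only self-created conns are owned

    def _insert_message(self, message):
        if self._conn is None:
            self._conn = self._connect()
        try:
            return self._conn.insert(message)
        except ConnDead:
            if self._owns_conn:
                self._conn = None  # drop the dead conn so the next call reconnects
            raise

    def send(self, message):
        # Capture BEFORE the attempt: a fresh client has no cached conn yet.
        reused = self._conn is not None and self._owns_conn
        try:
            return self._insert_message(message)
        except ConnDead:
            if not reused:
                raise  # fresh or injected connection: never retry
            time.sleep(_SEND_RETRY_BACKOFF_SECONDS)
            # Outside the try block, so a second failure propagates: one shot.
            return self._insert_message(message)


made = []

def connect():
    conn = FakeConn()
    made.append(conn)
    return conn

client = SketchClient(connect)
first_id = client.send("a")  # opens and caches a connection
made[0].dead = True          # simulate the server-side idle reap
retry_id = client.send("b")  # fails once, reconnects, then succeeds
```

After the second `send`, the message lands on a second connection while the reaped one is discarded; a client whose very first connection is dead re-raises without retrying.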
Tests
5 new unit tests in test_pg_queue_client.py::TestSendReconnectRetry:
Full file green (35 passed), pre-commit clean.
Dev-test
Reproduced the exact condition against a running Postgres (terminated the cached backend with pg_terminate_backend, the same client-side symptom as the cloud idle reap):
… OperationalError: server closed the connection unexpectedly (the failure that aborted the execution).
… msg_id, and the row is verified present in the DB.