UN-3654 [FIX] PG queue — reconnect-retry on stale dispatch connection (send())#2124

Merged
muhammad-ali-e merged 2 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:UN-3654-pg-dispatch-reconnect-retry
Jun 30, 2026

Conversation

@muhammad-ali-e

Contributor

What

Adds a one-shot reconnect-retry to PgQueueClient.send() so a PG-routed dispatch survives a cached connection that was reaped server-side while idle. Follow-up to UN-3651.

Why

A PG-transport ETL execution failed with "server closed the connection unexpectedly" at the barrier's header dispatch, ~30 min after the previous execution on the same worker thread: PgBouncer server_idle_timeout had reaped the cached connection while it sat idle between executions.

UN-3651 added a reconnect-retry to the barrier's idempotent pre-dispatch write (_reset_barrier), and that retry fired and succeeded. But the very next call, the header dispatch (dispatch._enqueue_pg → PgQueueClient.send() → INSERT into pg_queue_message), uses a separate cached connection with no reconnect logic, so it failed and aborted the whole execution. UN-3651's guard did not cover this site.

Root cause (worker logs): the dispatch singleton's cached psycopg2 connection was last used by the prior execution, then idle ~30 min; the server-side connection was dropped; the next send() INSERT hit a dead socket → OperationalError → execution ERROR.

How

send()'s INSERT is not idempotent (auto msg_id), so a blind retry could double-enqueue. The retry therefore fires only when the failing connection was reused (cached + owned by the client):

  • A reused connection that dies on its first statement was reaped while idle, so the INSERT never reached the server — re-inserting cannot duplicate.
  • A fresh connection failing is a genuine error (or an ambiguous mid-statement death) → re-raise, never retry.
  • In the rare reused-conn mid-statement-death case, a duplicate batch enqueue is absorbed by the existing claim_batch / pg_batch_dedup gate downstream.
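
For illustration, the reused-only gate described above can be sketched in a few lines. This is a minimal stand-alone sketch, not the code in client.py: `ConnDead`, `FakeConn` and `SketchClient` are hypothetical names, and `ConnDead` stands in for the psycopg2 connection-level errors so the example runs without a database.

```python
import time


class ConnDead(Exception):
    """Hypothetical stand-in for a psycopg2 connection-level error."""


class FakeConn:
    """Hypothetical connection: raises on insert when marked dead."""

    def __init__(self, dead=False):
        self.dead = dead
        self.rows = []

    def insert(self, payload):
        if self.dead:
            raise ConnDead("server closed the connection unexpectedly")
        self.rows.append(payload)
        return len(self.rows)  # plays the role of the auto msg_id


class SketchClient:
    def __init__(self, connect, injected=None, backoff=0.0):
        self._connect = connect            # factory returning a new connection
        self._conn = injected              # caller-injected connection, if any
        self._owns_conn = injected is None
        self._backoff = backoff

    def _insert(self, payload):
        if self._conn is None:
            self._conn = self._connect()   # lazy (re)connect
        try:
            return self._conn.insert(payload)
        except ConnDead:
            if self._owns_conn:
                self._conn = None          # drop the dead owned connection
            raise

    def send(self, payload):
        # Capture BEFORE the attempt: a fresh client has no cached connection.
        reused = self._conn is not None and self._owns_conn
        try:
            return self._insert(payload)
        except ConnDead:
            if not reused:
                raise                      # fresh or injected: never retry
        time.sleep(self._backoff)
        return self._insert(payload)       # outside the try: at most one retry
```

Because the second `_insert` call sits outside the `try`, a failure on the retry propagates to the caller, which is what keeps the retry one-shot in this sketch.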

Scope: PG transport only — the Celery dispatch path is untouched. Zero Celery regression.

Tests

5 new unit tests in test_pg_queue_client.py::TestSendReconnectRetry:

  • reused-stale conn → retry reconnects + succeeds
  • fresh-conn failure → no retry (re-raises)
  • injected conn → no retry (caller's connection untouched)
  • retry-also-fails → raises after exactly one reconnect
  • non-connection error on reused conn → no retry

Full file green (35 passed), pre-commit clean.

Dev-test

Reproduced the exact condition against a running Postgres (terminated the cached backend with pg_terminate_backend, the same client-side symptom as the cloud idle reap):

  • Before-fix path (un-wrapped INSERT) → OperationalError: server closed the connection unexpectedly (the failure that aborted the execution).
  • send() → self-heals: reconnects, returns a msg_id, and the row is verified present in the DB.

… (send())

A PG-transport execution aborted with "server closed the connection
unexpectedly" at barrier header dispatch ~30 min after the prior run on
the same worker thread: PgBouncer server_idle_timeout reaped the cached
dispatch connection while idle. UN-3651's reconnect-retry covers only the
barrier's idempotent pre-dispatch write; the next call —
PgQueueClient.send() INSERT into pg_queue_message — used a separate cached
connection with no reconnect logic, so it failed and aborted the execution.

send()'s INSERT is not idempotent (auto msg_id), so a blind retry could
double-enqueue. The one-shot reconnect-retry therefore fires ONLY when the
failing connection was reused (cached + owned): a reused conn that dies on
its first statement was reaped while idle, so the INSERT never reached the
server and re-inserting can't duplicate. A fresh connection failing
re-raises without retry. PG transport only; Celery dispatch untouched.

Tests: 5 new (reused-stale→retry; fresh→no retry; injected→no retry;
retry-also-fails→raise once; non-connection error→no retry). All green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1d5a57bd-18e1-4023-a679-1ae812953321

@greptile-apps

greptile-apps Bot commented Jun 30, 2026

Contributor

Greptile Summary

This PR adds a one-shot reconnect-retry to PgQueueClient.send() to handle server-side connection reaping (PgBouncer server_idle_timeout) that was silently killing PG-transport executions at the barrier's header dispatch. It follows the reconnect logic already present in _reset_barrier (UN-3651) and extends it to the send() INSERT site, which was the missing piece.

  • Introduces _CONN_DEAD_ERRORS as a shared constant between _cursor() and send() so the "dead connection" test can't drift between sites; adds _SEND_RETRY_BACKOFF_SECONDS = 0.5 as a hard-coded literal (env-driven config is deliberately excluded to prevent tuning the hot-path into a long stall).
  • Extracts _insert_message() from send() to enable the one-shot retry: the reused flag (captured before the first attempt) gates the retry so it only fires for an owned, previously cached connection — a fresh or caller-injected connection re-raises immediately without retry.
  • The at-least-once semantics introduced by the retry (rare post-commit-death duplicate) are clearly documented in the docstring, with an explicit note that claim_batch/pg_batch_dedup only absorbs batch-header duplicates, not leaf tasks or callbacks.

Confidence Score: 5/5

Safe to merge — the retry logic is structurally one-shot, the reused-connection gate is captured correctly before any I/O, and the at-least-once duplicate risk is already absorbed by the existing consumer idempotency contract.

The reused flag is snapped before the first attempt, so it correctly distinguishes a cached connection from one lazily created mid-call. The retry call sits outside the try block, making the one-shot bound structural rather than conditional. _CONN_DEAD_ERRORS is shared between _cursor() and send(), preventing the two dead-connection tests from drifting. The five new unit tests cover all branches: both error subtypes on reused conn, fresh-conn suppression, injected-conn suppression, retry-also-fails, and non-connection error. No regressions to the Celery path or existing lifecycle tests are introduced.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/queue_backend/pg_queue/client.py | Adds one-shot reconnect-retry to send() for stale cached connections; extracts _insert_message(), adds _CONN_DEAD_ERRORS constant and 0.5s backoff. Logic is sound: reused flag captured before attempt, retry only fires for owned+cached conns, second attempt is structurally outside the try-block so retry is truly one-shot. |
| workers/tests/test_pg_queue_client.py | Adds 5 focused unit tests in TestSendReconnectRetry covering: reused-stale retry success (parameterized over both error types), fresh-conn no-retry, injected-conn no-retry, retry-also-fails one-shot bound, and non-connection error on reused conn. All critical branches exercised. |

Reviews (2): Last reviewed commit: "UN-3654 [FIX] address PR #2124 review — ..." | Re-trigger Greptile

@muhammad-ali-e muhammad-ali-e left a comment

Contributor Author

Automated PR Review — UN-3654 reconnect-retry

Reviewed via the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier). The change is well-scoped and the safety-critical reused-vs-fresh gate is correct and well-tested; all 35 tests pass. Findings below are prioritised; the headline items are about the justification in the comments overstating a downstream guarantee, not the control flow.

P1 — overstated dedup guarantee + misattributed backoff rationale. P2 — error-class coverage gap, triage-misleading WARNING, no success breadcrumb, untested InterfaceError/backoff. P3 — blocking sleep, keyword-only discipline, test-helper duplication.

# Small fixed pause before send()'s single reconnect-retry (see send()). The
# idle-reap case reconnects instantly regardless; this only widens the self-heal
# window for a brief DB failover and avoids re-hammering a struggling server.
# A literal, not env-driven, so the one-shot bound can't be weakened operationally.

P1 — misattributed rationale. This constant is the backoff sleep duration (0.5), but the comment justifies it as keeping "the one-shot bound" from being "weakened operationally." The one-shot guarantee (retry exactly once) is enforced structurally by send()'s single except/single retry call — it has nothing to do with this literal; making the backoff env-driven would not change the retry count. This reads as copied from pg_barrier.py:140-141, where the literal annotated is the actual attempt-count bound (_BARRIER_WRITE_ATTEMPTS). Suggest retying the rationale to what this value governs, e.g. "a literal so the pause can't be tuned into a long blocking stall on the enqueue hot path."

connection failing is a genuine error (or an ambiguous mid-statement
death) → re-raise, never retry. Unlike the barrier's idempotent
pre-dispatch write, an ``INSERT`` is not idempotent, so this
reused-only guard is what keeps the retry from double-enqueuing; in the

P1 — overstated downstream guarantee (flagged independently by 3 reviewers). The claim that a duplicate "is absorbed by the claim_batch / pg_batch_dedup gate downstream" only holds for batch header tasks: claim_batch is invoked from exactly one site, run_batch_with_barrier (pg_barrier.py:896). But send() is the generic enqueue primitive — dispatch.py:154 routes leaf tasks, the barrier header fan-out, and the aggregating callback through it, none of which carry a batch_index, so none are gated. A double-fired callback would aggregate chord results twice. Suggest leading with the real general backstop (the module's at-least-once / idempotent-consumer contract, lines 13-17) and presenting claim_batch as the batch-specific instance, so the comment stays accurate as new non-batch send() call sites appear.
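
To make the "idempotent consumer" backstop concrete, here is a minimal sketch of a consumer that tolerates a duplicate delivery. It is illustrative only: `IdempotentAggregator` and `task_key` are hypothetical and do not appear in the module; the sketch assumes each message carries some logical identity that is stable across a re-insert (unlike the auto-generated msg_id, which differs per row).

```python
class IdempotentAggregator:
    """Hypothetical consumer that tolerates at-least-once delivery."""

    def __init__(self):
        self._seen = set()
        self.total = 0

    def handle(self, task_key, value):
        # task_key is an assumed logical identity carried in the payload.
        if task_key in self._seen:
            return False       # duplicate delivery: ignore, do not re-aggregate
        self._seen.add(task_key)
        self.total += value
        return True


agg = IdempotentAggregator()
first = agg.handle("callback-1", 10)
second = agg.handle("callback-1", 10)  # same logical task delivered twice
```

With a guard like this a double-fired callback would aggregate only once; without one, the duplicate described in this comment would be counted twice.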

f"priority out of range [{MIN_PRIORITY}, {MAX_PRIORITY}]: {priority!r}"
)
# Capture BEFORE the attempt: a fresh conn has self._conn is None here.
reused = self._conn is not None and self._owns_conn

P2 — reused cannot actually establish "the INSERT never reached the server." The flag only distinguishes "cached before this send" from "created during it." The caught OperationalError/InterfaceError can't separate (a) idle reap — INSERT never sent, retry safe — from (b) mid-statement death where the server committed the row but the socket dropped before psycopg2 read RETURNING msg_id → retry double-inserts. Case (b) is precisely the PgBouncer-recycle/failover scenario this feature targets, so it isn't rare on that path. This is acceptable as an at-least-once enqueue, but the docstring currently frames it as near-exactly-once. Either soften the wording ("duplicate possible only on commit-loss; relies on idempotent consumers") or, for the real fix, add a producer dedup key + INSERT ... ON CONFLICT DO NOTHING RETURNING msg_id so the retry collapses to the original. (Capturing reused before the attempt is itself correct — keep that.)
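
A rough sketch of the producer dedup-key option, using the standard-library sqlite3 module as a stand-in for Postgres so it runs without a server. The `dedup_key` column and the `enqueue` helper are assumptions for illustration; the real pg_queue_message schema is not shown in this PR. Note that `ON CONFLICT DO NOTHING` inserts no row on a conflict, so a `RETURNING` clause would yield nothing for the retry; the sketch therefore looks the id up with a follow-up SELECT.

```python
import sqlite3


def enqueue(conn, dedup_key, payload):
    """Insert once per dedup_key and return the msg_id of the surviving row."""
    with conn:  # commit on success, roll back on error
        conn.execute(
            "INSERT INTO pg_queue_message (dedup_key, payload) VALUES (?, ?) "
            "ON CONFLICT (dedup_key) DO NOTHING",
            (dedup_key, payload),
        )
        row = conn.execute(
            "SELECT msg_id FROM pg_queue_message WHERE dedup_key = ?",
            (dedup_key,),
        ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pg_queue_message ("
    "msg_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "dedup_key TEXT NOT NULL UNIQUE, "
    "payload TEXT NOT NULL)"
)

original = enqueue(conn, "exec-42/header", "payload")
retried = enqueue(conn, "exec-42/header", "payload")  # simulated blind retry
row_count = conn.execute("SELECT COUNT(*) FROM pg_queue_message").fetchone()[0]
```

Here the retry collapses onto the original row, which is the property the comment asks for; whether a natural dedup key exists for every send() call site is a separate question.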

reused = self._conn is not None and self._owns_conn
try:
    return self._insert_message(queue_name, message, org_id, priority)
except (psycopg2.OperationalError, psycopg2.InterfaceError) as exc:

P2 — catch list is narrower than _cursor's own model, and duplicates it. _cursor (line 192-198) explicitly anticipates a server-side termination surfacing as a bare psycopg2.DatabaseError and treats a failed rollback() as proof of death — but this retry catches only (OperationalError, InterfaceError). So a stale death that manifests as a bare DatabaseError gets the conn correctly dropped by _cursor yet is not retried here, silently defeating the feature for that manifestation. Narrow is the right call (must not retry IntegrityError/CheckConstraint), but the gap is undocumented. Also: this tuple is duplicated against _cursor's at line 197 — extract a single _CONN_DEAD_ERRORS: Final = (psycopg2.OperationalError, psycopg2.InterfaceError) referenced by both, so the send/_cursor coupling is enforced by construction, and add a one-line docstring note that a bare DatabaseError is intentionally left to the next call's reconnect.
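
The coverage gap can be shown with stub classes that mirror the DB-API (PEP 249) exception hierarchy, which psycopg2 follows. The classes below are local stand-ins, not imports from psycopg2, and `is_retryable` is a hypothetical helper; only the shape of `_CONN_DEAD_ERRORS` comes from this comment.

```python
# Stand-ins mirroring the PEP 249 hierarchy (not the real psycopg2 classes).
class Error(Exception): ...
class InterfaceError(Error): ...
class DatabaseError(Error): ...
class OperationalError(DatabaseError): ...
class IntegrityError(DatabaseError): ...


# One tuple, referenced by every site that needs the "dead connection" test.
_CONN_DEAD_ERRORS = (OperationalError, InterfaceError)


def is_retryable(exc):
    """True only for the connection-level errors named in the shared tuple."""
    return isinstance(exc, _CONN_DEAD_ERRORS)


checks = {
    "operational": is_retryable(OperationalError()),
    "interface": is_retryable(InterfaceError()),
    "bare_database": is_retryable(DatabaseError()),
    "integrity": is_retryable(IntegrityError()),
}
```

A bare `DatabaseError` is a parent of `OperationalError`, not a subclass, so the narrow tuple does not match it; that is the undocumented gap, and also the reason `IntegrityError` stays safely out of the retry.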

if not reused:
    raise
logger.warning(
    "PG-queue: send to queue=%r failed (%s: %s); cached connection "

P2 — WARNING asserts a cause it can't know, mis-steering triage. During a genuine DB outage (not a transient reap) this fires claiming "cached connection likely stale, reconnecting and retrying once" — the wrong diagnosis for an operator scanning WARNINGs mid-incident. Suggest describing the observation, not the conclusion: e.g. "send failed with a connection-level error on a reused cached connection; dropping it and retrying once (stale-reap or DB unavailable)."

)
time.sleep(_SEND_RETRY_BACKOFF_SECONDS)
# _cursor already dropped the dead owned conn, so this reconnects.
return self._insert_message(queue_name, message, org_id, priority)

P2 — no positive record on a successful retry. For a feature whose primary hazard is a duplicate enqueue, the only log artifact is the pre-retry WARNING; there's no "retry succeeded, msg_id=N" breadcrumb to correlate against or to detect a double-insert after the fact. Capture the returned id and emit an INFO with the resulting msg_id before returning.
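
One possible shape for the breadcrumb, sketched with the standard logging module. The logger name, the message wording and the `retry_with_breadcrumb` helper are assumptions for illustration; `insert` is a placeholder callable standing in for the second `_insert_message` call.

```python
import logging

logger = logging.getLogger("pg_queue_sketch")  # hypothetical logger name


def retry_with_breadcrumb(insert, queue_name):
    """Run the retry and leave a WARNING before and an INFO after it."""
    logger.warning(
        "send to queue=%r failed with a connection-level error on a reused "
        "cached connection; dropping it and retrying once "
        "(stale reap or DB unavailable)",
        queue_name,
    )
    msg_id = insert()
    # Positive record: lets a later duplicate be correlated by msg_id.
    logger.info(
        "send to queue=%r succeeded after reconnect, msg_id=%s",
        queue_name,
        msg_id,
    )
    return msg_id
```

Pairing the WARNING with an INFO that carries the resulting msg_id gives an operator both ends of the retry to correlate against the queue table.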

exc,
exc_info=True,
)
time.sleep(_SEND_RETRY_BACKOFF_SECONDS)

P3 — blocking time.sleep(0.5) penalises the common path. The docstring concedes the idle-reap case (the common reason this retry exists) reconnects instantly, so the 0.5s is pure dead time there; it only buys a failover self-heal window. Bounded and one-shot, so low severity, and fine while all callers are synchronous — but a simultaneous failover stales many cached clients at once, each dispatch then blocking 0.5s (a small thundering-herd stall). Worth a note that the sleep penalises the common path to buy failover headroom, and a caveat that this becomes a loop-stall if send() is ever pulled into async context.

# _cursor already dropped the dead owned conn, so this reconnects.
return self._insert_message(queue_name, message, org_id, priority)

def _insert_message(

P3 — _insert_message drops send()'s keyword-only discipline. send() forces org_id/priority keyword-only via *, and validates priority range; _insert_message takes all four positionally and assumes the range check already ran. Private + only two internal callers, so low risk, but a future direct caller bypasses both guards. Mirror the * boundary (org_id/priority keyword-only) so the helper reads like the public method it serves and can't be miscalled positionally.
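
A small sketch of the `*` boundary, written as a free function with a placeholder body rather than the real method, so only the signature behaviour is shown:

```python
def _insert_message(queue_name, message, *, org_id, priority):
    # Placeholder body: the real helper runs the INSERT.
    return (queue_name, message, org_id, priority)


ok = _insert_message("q", {"k": "v"}, org_id="org-1", priority=5)

try:
    _insert_message("q", {"k": "v"}, "org-1", 5)  # positional misuse
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

With the boundary in place, a caller that passes `org_id` or `priority` positionally fails at call time instead of silently swapping arguments.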

Comment thread workers/tests/test_pg_queue_client.py Outdated
monkeypatch.setattr("queue_backend.pg_queue.client.time.sleep", sleep)
return sleep

def test_reused_stale_conn_retries_and_succeeds(self, monkeypatch):

P2 — three coverage gaps in the happy-path test. (1) InterfaceError — the more likely stale symptom given conn.closed is a client-side-only flag — is never exercised; all 5 tests use OperationalError/RuntimeError, so narrowing the except to OperationalError alone would re-break the PgBouncer-reap fix yet pass the suite. Parametrize this test over both exception types. (2) The 0.5s backoff fires here but is never asserted (_no_sleep's mock is discarded); deleting the time.sleep line or changing _SEND_RETRY_BACKOFF_SECONDS passes the suite. Add sleep.assert_called_once_with(_SEND_RETRY_BACKOFF_SECONDS). (3) The retry's INSERT params (org_id/priority/message passthrough) are unverified — assert on the fresh cursor's execute.call_args for a send with explicit org_id/priority. A caplog check on the WARNING (precedent at lines 339-351) would round it out.
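
The first two gaps can be sketched with unittest.mock alone. Everything here is a stand-in: the two stub exception classes replace the psycopg2 errors, `send_once_retry` is a hypothetical reduction of send(), and a plain loop replaces pytest parametrization so the sketch has no third-party dependency.

```python
from unittest import mock

_BACKOFF_SECONDS = 0.5


class OperationalErrorStub(Exception): ...
class InterfaceErrorStub(Exception): ...


_DEAD = (OperationalErrorStub, InterfaceErrorStub)


def send_once_retry(insert, sleep):
    """Hypothetical reduction of send(): one attempt, one pause, one retry."""
    try:
        return insert()
    except _DEAD:
        pass
    sleep(_BACKOFF_SECONDS)
    return insert()


def check_retry(exc_type):
    # First call raises the stale-connection error, second returns a msg_id.
    insert = mock.Mock(side_effect=[exc_type("stale"), 7])
    sleep = mock.Mock()
    assert send_once_retry(insert, sleep) == 7
    sleep.assert_called_once_with(_BACKOFF_SECONDS)  # backoff is pinned
    assert insert.call_count == 2                    # exactly one retry
    return True


# Exercise both error types, as a parametrized test would.
results = [check_retry(exc_type) for exc_type in _DEAD]
```

Deleting the `sleep(...)` call, changing the constant passed to it, or dropping either class from `_DEAD` makes one of these assertions fail, which is the protection the current suite lacks.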

"""

@staticmethod
def _conn(*, execute_side_effect=None, fetchone=(1,)):

P3 — third copy of the mock-conn builder. _conn here cleanly de-duplicates within this class (backs all 5 tests — good), but it's now the third file-level copy of the same cur/conn/_CursorCtx/conn.closed=0 shape alongside _mock_conn (line 44) and _owned_client (line 141). Consider folding them into module-level _mock_conn with closed=0 + an execute_side_effect param and delegating. Minor: every call site does dead, _ = self._conn(...) and discards cur, so returning just conn would simplify them — optional if you keep the (conn, cur) convention for consistency. _no_sleep is good as-is.

… shared error tuple, keyword-only, test coverage

- reframe send() docstring: reused gate makes idle-reap retry safe but enqueue is at-least-once (commit-loss case); lean on module idempotent-consumer contract; claim_batch/pg_batch_dedup is the batch-header-specific instance only
- extract _CONN_DEAD_ERRORS shared by _cursor + send(); note bare DatabaseError left to next-call reconnect
- retie backoff-constant rationale to pause duration (+ async caveat); WARNING describes observation not conclusion; add success breadcrumb on reconnect
- _insert_message org_id/priority keyword-only
- tests: parametrize Operational/InterfaceError, assert sleep backoff + retry param passthrough + caplog WARNING

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Thanks for the review — addressed in 1b6cf9d. Grouped by priority:

P1 — overstated guarantee (3 reviewers)

  • Rewrote the send() reconnect-retry docstring to stop claiming exactly-once. It now states plainly: the reused gate only distinguishes "cached before this call" vs "created during it", which makes the retry safe for the idle-reap case (the INSERT never ran) — but a connection that dies after the server committed the row but before psycopg2 read back RETURNING msg_id (the commit-loss / PgBouncer-recycle case this feature targets) would be re-inserted. So the enqueue is at-least-once, leaning on the module-level idempotent-consumer contract as the general backstop. claim_batch/pg_batch_dedup is now framed as only the batch-header-specific instance of that idempotency — it does not gate leaf tasks or the aggregating callback (which also go through send() via dispatch.py), so it does not absorb every duplicate.
  • On the at-least-once point: I went with "soften the wording + lean on the idempotent-consumer contract" rather than adding a producer dedup key (ON CONFLICT). send() has no natural idempotency key, and the consumer contract already covers redelivery. Happy to switch to a producer dedup-key approach in a follow-up if you would prefer that over the contract route.

P2

  • Extracted _CONN_DEAD_ERRORS = (OperationalError, InterfaceError) shared by both _cursor() and send() so the two sites cannot drift. Added a note in _cursor that a server death surfacing as a bare DatabaseError is still treated as dead there but is deliberately NOT retried by send() — left to the next call’s reconnect.
  • Retied the _SEND_RETRY_BACKOFF_SECONDS comment to the pause duration (it is not the retry count — the one-shot is structural), with a caveat that the blocking time.sleep becomes a loop-stall if send() is ever pulled into an async context.
  • WARNING log now describes the observation ("connection-level error on a reused cached connection … stale reap or DB unavailable") instead of asserting "cached connection likely stale" as fact (which mis-triages a real outage).
  • Added a positive INFO breadcrumb capturing the retry’s msg_id on reconnect success, so a silent duplicate enqueue is correlatable.
  • Restored keyword-only discipline on _insert_message(*, org_id, priority) and updated both call sites.

Tests

  • Parametrized test_reused_stale_conn_retries_and_succeeds over both OperationalError and InterfaceError (InterfaceError was never exercised).
  • Asserted the backoff fired: sleep.assert_called_once_with(_SEND_RETRY_BACKOFF_SECONDS).
  • Asserted the retry’s INSERT carries the passthrough params (queue_name/org_id/priority) on the fresh cursor, so a params-drop on the retry is caught.
  • Added a caplog WARNING assertion on the retry.

P3 — deliberately deferred

  • Did not fold the three mock-conn builders into one helper. Kept churn minimal per the review’s "optional" note; can do it in a separate cleanup if desired.

36 passed; ruff check + ruff format --check clean on both files.

@muhammad-ali-e muhammad-ali-e merged commit 89b4b11 into feat/UN-3445-pg-queue-integration Jun 30, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3654-pg-dispatch-reconnect-retry branch June 30, 2026 04:39