Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

UN-3660 [FIX] PG barrier decrement — phase-split reconnect-retry on stale connection#2130

Merged
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3660-pg-barrier-decrement-retryZipstack/unstract:UN-3660-pg-barrier-decrement-retryCopy head branch name to clipboard
Jun 30, 2026
Merged

UN-3660 [FIX] PG barrier decrement — phase-split reconnect-retry on stale connection#2130
muhammad-ali-e merged 2 commits into
feat/UN-3445-pg-queue-integrationZipstack/unstract:feat/UN-3445-pg-queue-integrationfrom
UN-3660-pg-barrier-decrement-retryZipstack/unstract:UN-3660-pg-barrier-decrement-retryCopy head branch name to clipboard

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

Closes the last uncovered stale-connection-after-idle hang path in the PG-queue execution engine, after UN-3654 (dispatch send), UN-3651 (barrier enqueue) and UN-3659 (store_result).

The PgBarrier decrement —

UPDATE pg_barrier_state SET remaining = remaining - 1,
       results = results || jsonb_build_array(...) WHERE execution_id = ... RETURNING ...

runs on the worker's cached thread-local connection. After a worker sits idle > server_idle_timeout (PgBouncer) the connection is reaped server-side. The decrement's try/except caught only psycopg2.DataError, so an OperationalError/InterfaceError propagated, the batch's decrement was lost, remaining never reached 0, and the execution hung at the fan-in (all files COMPLETED but execution stuck EXECUTING) until the ~6h barrier expiry. Same class as the earlier hangs, one step later in the pipeline.

Why not a plain retry (or send's reused-guard)

The decrement is non-idempotent, and a double-apply is harmful: it can fire the callback early with incomplete results, or drive remaining past 0 and strand the barrier. Even send()'s reused-guard is not enough on its own here — a reused-connection failure at commit time is ambiguous (the server may have applied it).

Fix — phase-split retry

Split the statement into its two phases and retry only the one where a re-apply is provably exactly-once:

  • EXECUTE phase fails on a reused (cached) connection → idle-reap; the statement never committed → reconnect and re-apply once.
  • COMMIT phase fails → ambiguous → never retry; propagate so the caller tears the barrier down (fail-fast — unchanged behaviour).
  • Fresh-connection failure / DataError / non-connection error → propagate unchanged.

Safety rests on the connection being non-autocommit (verified in create_pg_connection): an execute-phase failure is never durable even if the server ran the UPDATE, because the open transaction is rolled back on disconnect. Durability happens only at commit(), the phase we never retry — so re-applying can never double-count.

Also adds the shared _CONN_DEAD_ERRORS constant (parity with client.py / result_backend.py, so the "is this a connection death?" test can't drift) and a _recover_after_error helper now reused by _cursor.

Tests

  • 6 unit (TestDecrementPhaseSplitRetry): execute-retry-heals (both error types), commit-fail-does-NOT-retry, fresh-conn-no-retry, DataError-no-retry, one-shot bound.
  • Real-DB: idle-reaped connection self-heals through the production entry and decrements exactly once (2→1, one result appended) — the no-double-decrement pin.
  • Real pg_terminate_backend: a genuinely killed backend mid-decrement self-heals and fires the callback exactly once.
  • Full test_pg_barrier.py (63) + sibling barrier suites (35) green; pre-commit clean.

Dev-tested end-to-end inside the live PG worker fleet against the real database (real backend termination mid-decrement → self-heal → callback once → barrier consumed).

After this, no stale-connection-after-idle path can hang an execution.

…tale connection

The barrier decrement was the last uncovered idle-reap hang path (after
UN-3654 dispatch send, UN-3651 enqueue, UN-3659 store_result). Its try/except
caught only psycopg2.DataError, so a PgBouncer-reaped cached connection failed
the UPDATE, propagated to the in-body abort, and tore the whole execution down
on a transient blip (all files COMPLETED, execution ERROR/stuck).

The decrement is non-idempotent and a double-apply is harmful (fires the
callback early with partial results, or skips past 0 and strands the barrier),
so a blind retry — or even send()'s reused-guard alone — is unsafe: a
reused-conn failure at commit time is ambiguous. Instead split the statement
into its two phases and retry only the safe one:

- EXECUTE phase fails on a reused (cached) conn → idle-reap, never committed →
  reconnect and re-apply exactly once.
- COMMIT phase fails → ambiguous (server may have applied it) → never retry;
  propagate so the caller tears the barrier down (fail-fast, unchanged).

Safety rests on the connection being non-autocommit (verified): an
execute-phase failure is never durable even if the server ran the UPDATE.

Adds the shared _CONN_DEAD_ERRORS constant (parity with client.py /
result_backend.py) and a _recover_after_error helper now reused by _cursor.

Tests: 6 unit (execute-retry, commit-no-retry, fresh-conn-no-retry,
DataError-no-retry, both error types, one-shot bound), a real-DB exactly-once
decrement, and a real pg_terminate_backend run asserting the callback fires
exactly once.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 83c3db1e-37df-4a6d-9f2a-7be27b474b5d

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3660-pg-barrier-decrement-retry

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@greptile-apps

greptile-apps Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR closes the last stale-connection hang path in the PG-queue barrier by introducing a phase-split retry for the non-idempotent remaining - 1 decrement: execute-phase failures on a cached connection (PgBouncer idle-reap) retry exactly once after reconnect, while commit-phase failures (ambiguous — server may have already applied) are never retried and propagate immediately.

  • _apply_decrement (new): executes the UPDATE … RETURNING and commit() in two explicit phases, retrying only when conn_dead and reused and attempt < limit; a _DecrementRow NamedTuple replaces positional tuple access.
  • _recover_after_error (new): factors the rollback-and-discard logic out of _cursor so both _cursor and _apply_decrement share one definition; now also logs on rollback failure instead of silently swallowing it.
  • CONN_DEAD_ERRORS centralized into connection.py and imported by all three PG-queue sites (client, result_backend, pg_barrier), removing three divergent local copies.

Confidence Score: 5/5

Safe to merge — the phase-split is logically sound and the exactly-once guarantee is well-tested against both unit stubs and real DB termination.

The core invariant (retry only on execute-phase failure on a cached connection, never on commit) is correctly implemented and thoroughly pinned: 6 unit tests cover every branch of the phase-split, a real-DB idle-reap test confirms the decrement lands exactly once, and a pg_terminate_backend test proves the callback fires exactly once against a genuine socket kill. The _recover_after_error refactor is a faithful extraction of the existing inline rollback logic with an added log on rollback failure. No double-decrement path exists.

connection.py — CONN_DEAD_ERRORS is missing Final; cosmetic but worth fixing to match the previous per-module definitions.

Important Files Changed

Filename Overview
workers/queue_backend/pg_barrier.py Adds _recover_after_error helper (refactors _cursor), _DecrementRow NamedTuple, and the key _apply_decrement phase-split retry function; wires them into _barrier_pg_decrement with a pre-guard conn_was_cached sample. Logic is correct — execute-phase retries only when reused=True and conn_dead and attempt < limit, commit failures are never retried.
workers/queue_backend/pg_queue/connection.py Promotes CONN_DEAD_ERRORS to a shared module-level constant so all three PG-queue sites import from one place; drops Final annotation present in the prior per-module definitions (the only finding in this file).
workers/queue_backend/pg_queue/client.py Removes local _CONN_DEAD_ERRORS definition and imports it from connection.py; no behavioural change.
workers/queue_backend/pg_queue/result_backend.py Same deduplication as client.py — local _CONN_DEAD_ERRORS removed, imported from connection.py; no behavioural change.
workers/tests/test_pg_barrier.py Adds TestDecrementPhaseSplitRetry (6 unit tests covering execute-retry heals, commit-fail no-retry, fresh-conn no-retry, DataError no-retry, one-shot bound, and end-to-end wrapper wiring) plus two real-DB tests (idle-reap self-heal and pg_terminate_backend exactly-once). _FakeConn extended with commit_error and get_transaction_status to support new paths.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller as _barrier_pg_decrement
    participant AD as _apply_decrement
    participant TL as Thread-local conn
    participant DB as Postgres / PgBouncer

    Caller->>TL: sample conn_was_cached (before entry guard)
    Caller->>TL: _get_conn() [entry-guard txn check]
    Caller->>AD: "_apply_decrement(reused=conn_was_cached)"

    loop attempt 1..2
        AD->>TL: "conn = _get_conn()"
        AD->>DB: cur.execute(UPDATE RETURNING)
        alt "Execute fails — conn-dead AND reused AND attempt < limit"
            AD->>AD: _recover_after_error → discard conn
            AD->>AD: sleep + continue (retry)
        else Execute fails — not retried (fresh-conn or budget spent)
            AD->>AD: _recover_after_error → discard conn
            AD-->>Caller: raise OperationalError/InterfaceError
        else Execute fails — non-conn error (DataError)
            AD->>AD: _recover_after_error → rollback only, conn kept
            AD-->>Caller: raise DataError
        else Execute succeeds
            AD->>DB: conn.commit()
            alt Commit fails — AMBIGUOUS, never retried
                AD->>AD: _recover_after_error → discard conn
                AD-->>Caller: raise (fail-fast)
            else Commit succeeds
                AD-->>Caller: _DecrementRow(remaining, results)
            end
        end
    end
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Caller as _barrier_pg_decrement
    participant AD as _apply_decrement
    participant TL as Thread-local conn
    participant DB as Postgres / PgBouncer

    Caller->>TL: sample conn_was_cached (before entry guard)
    Caller->>TL: _get_conn() [entry-guard txn check]
    Caller->>AD: "_apply_decrement(reused=conn_was_cached)"

    loop attempt 1..2
        AD->>TL: "conn = _get_conn()"
        AD->>DB: cur.execute(UPDATE RETURNING)
        alt "Execute fails — conn-dead AND reused AND attempt < limit"
            AD->>AD: _recover_after_error → discard conn
            AD->>AD: sleep + continue (retry)
        else Execute fails — not retried (fresh-conn or budget spent)
            AD->>AD: _recover_after_error → discard conn
            AD-->>Caller: raise OperationalError/InterfaceError
        else Execute fails — non-conn error (DataError)
            AD->>AD: _recover_after_error → rollback only, conn kept
            AD-->>Caller: raise DataError
        else Execute succeeds
            AD->>DB: conn.commit()
            alt Commit fails — AMBIGUOUS, never retried
                AD->>AD: _recover_after_error → discard conn
                AD-->>Caller: raise (fail-fast)
            else Commit succeeds
                AD-->>Caller: _DecrementRow(remaining, results)
            end
        end
    end
Loading

Reviews (2): Last reviewed commit: "UN-3660 [FIX] Address PR review — fix re..." | Re-trigger Greptile

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR review (PR Review Toolkit)

Reviewed via six specialised agents (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier) scoped to the two changed files. Overall: a well-reasoned, safety-critical change. The execute-vs-commit phase split is structurally sound, the no-double-count argument holds, and the real-DB pg_terminate_backend test is a genuine exactly-once proof.

The findings below are mostly Low/Nit. The one that matters most is #1: the reused fresh-vs-cached distinction is inoperative in the production path (the entry guard's _get_conn() pre-populates _local.conn, so reused is always True on attempt 1). Not a correctness/double-count bug — the execute-phase retry is unconditionally safe — but the documented invariant isn't realized in prod and the unit test asserting it bypasses the real entry point.

See inline comments for specifics.

Priority summary

# Sev Location Finding
1 Med pg_barrier.py:675 reused always True in prod path; fresh-conn-not-retried invariant inoperative
2 Med pg_barrier.py:137 _recover_after_error swallows rollback failure with no log; can reclassify a non-conn error as conn-death
3 Low pg_barrier.py:647 Load-bearing non-autocommit invariant is documented but unenforced — add assert not conn.autocommit
4 Low pg_barrier.py:656 Docstring names wrong caller for commit-phase teardown (Celery .link path has no teardown)
5 Low pg_barrier.py:104 _CONN_DEAD_ERRORS triplicated across 3 modules — hoist to shared module to make the no-drift invariant real
6 Low pg_barrier.py:631 Return tuple[int, Any] | NoneNamedTuple for clarity / typed results
7 Low pg_barrier.py:701 Execute-phase give-up raise emits no decrement-level log (asymmetric trail)
8 Nit pg_barrier.py:719 AssertionError comment's "keeps the type checker honest" rationale is wrong (None is a valid return)
9 Test test_pg_barrier.py:275 time.sleep backoff never asserted — deleting it wouldn't fail any test
10 Test test_pg_barrier.py:329 Fresh-conn test bypasses the real entry point (ties to #1)
11 Test test_pg_barrier.py:368 Test rationale ("one-shot bound") doesn't match the actual gating branch

Comment thread workers/queue_backend/pg_barrier.py Outdated
for attempt in range(1, _BARRIER_DECREMENT_ATTEMPTS + 1):
# Only a connection cached BEFORE this attempt can be a stale idle-reaped
# handle; a freshly-reconnected one failing is a genuine error, not a reap.
reused = getattr(_local, "conn", None) is not None and not _local.conn.closed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium] The reused fresh-vs-cached distinction is inoperative in the production path.

The only production caller, _barrier_pg_decrement, unconditionally runs the entry-guard _get_conn() (~line 765) before _apply_decrement. That call populates _local.conn with a live, open connection. So by the time reused is sampled here, _local.conn is always non-None and open → reused is always True on attempt 1 — including the case the design explicitly wants to exclude: a brand-new connection whose first statement dies against a genuinely-down DB.

This is not a double-count bug (the execute-phase retry is unconditionally safe since nothing committed). The practical impact is bounded: one wasted reconnect + a 0.5s time.sleep before surfacing a real DB-down error. But:

  • the documented invariant ("a freshly-created one failing is a genuine DB error… so it is not retried", lines 645-647) is not realized through the production entry point — here reused's only real effect is the one-shot bound, which attempt < _BARRIER_DECREMENT_ATTEMPTS already provides;
  • test_fresh_conn_execute_failure_is_not_retried passes only because it calls _apply_decrement directly with _local.conn = None, bypassing the guard — false confidence;
  • the "Mirrors PgQueueClient.send's reused-guard" claim is operationally inaccurate: in send, reused is sampled before any _get_conn.

Fix: capture reused in _barrier_pg_decrement before the entry-guard _get_conn() and thread it into _apply_decrement. Then add an end-to-end test (through _barrier_pg_decrement) for the fresh-conn case. Alternatively, if the team accepts the execute retry is always safe regardless, soften the docstrings/comments to stop claiming a fresh conn isn't retried.

Minor (same line): capture _local.conn once to avoid the double lookup —

cached = getattr(_local, "conn", None)
reused = cached is not None and not cached.closed

conn_dead = isinstance(exc, _CONN_DEAD_ERRORS)
try:
conn.rollback()
except Exception:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium] Rollback failure is swallowed with no log, and can silently reclassify a non-connection error as a connection death.

Two compounding issues:

  1. The rollback exception is discarded with zero logging. The original op error does surface later via the caller's raise, but the rollback failure itself leaves no trace — if rollback fails for an unexpected reason (e.g. an InternalError), that diagnostic is gone.
  2. conn_dead = True on a failed rollback is the exact gate that decides whether _apply_decrement retries the execute phase (line 687). The docstring at lines 659-660 promises a DataError "propagates unchanged", but a non-_CONN_DEAD_ERRORS error whose rollback also fails gets relabeled conn_dead=True and routed into the retry path — contradicting the stated invariant. The counter stays safe (execute phase committed nothing), but the real error is masked behind a misleading "cached connection died, reconnecting" warning and re-run.

Fix: log the rollback failure with exc_info before flipping conn_dead:

except Exception:
    logger.warning(
        "PgBarrier: rollback during error recovery failed (original error %s) "
        "— treating the connection as dead and discarding it.",
        type(exc).__name__, exc_info=True,
    )
    conn_dead = True

Comment thread workers/queue_backend/pg_barrier.py Outdated
rolled back on disconnect. So the decrement provably never landed →
reconnect and re-apply it exactly once. (Only a *cached* connection can be a
stale idle-reaped handle; a freshly-created one failing is a genuine DB
error, not a reap, so it is not retried.) **This safety relies on the

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] The load-bearing "non-autocommit" invariant is documented but unenforced.

The entire exactly-once argument for the execute-phase retry rests on the connection being non-autocommit (so the uncommitted UPDATE rolls back on disconnect). This is currently true only because create_pg_connection (pg_queue/connection.py) never sets autocommit and thus inherits psycopg2's False default — it does not actively open non-autocommit, so the phrase "create_pg_connection opens it that way" slightly overstates. There is no runtime guard: a future change adding conn.autocommit = True there (e.g. to match the queue's commit-immediately pattern) would silently reintroduce the double-count this function exists to prevent — and the entry guard's TRANSACTION_STATUS_IDLE check does not imply non-autocommit.

Fix: (a) soften the wording to "non-autocommit (psycopg2's default; create_pg_connection does not override it)"; and (b) as a code change, assert the invariant loudly, e.g. assert not conn.autocommit at the top of _apply_decrement, so it fails loudly rather than corrupting the counter.

Comment thread workers/queue_backend/pg_barrier.py Outdated
- **COMMIT phase** — ``conn.commit()``. A failure here is AMBIGUOUS: the
server may have applied the commit before the socket dropped. Re-applying
could double-count, so it is NEVER retried — it propagates, and the caller
(:func:`run_batch_with_barrier`) tears the barrier down so the execution

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low / comment accuracy] This docstring names the wrong caller for commit-phase teardown.

The direct caller, _barrier_pg_decrement, does not catch a commit-phase error — its only except is psycopg2.DataError (~line 791). So a commit-phase failure propagates to whichever ultimate caller drove it:

  • run_batch_with_barrier (in-body PG path) — caught and _abort_barrier_in_body runs → teardown. Fail-fast holds. ✅
  • barrier_pg_decr_and_check (the Celery .link path, today's live path) — no teardown; the barrier strands to expires_at (max_retries=0, no replay).

The no-double-count guarantee still holds (commit is never retried), but the "tears the barrier down so the execution fails fast" claim is false for the path that's actually live today. Suggest rewording to cover both paths and drop the single-caller framing.

Comment thread workers/queue_backend/pg_barrier.py Outdated
# decrement phase-split retry, so the "was this a connection death?" test can't
# drift across them. Mirrors ``PgQueueClient._CONN_DEAD_ERRORS`` (UN-3654) and
# ``PgResultBackend._CONN_DEAD_ERRORS`` (UN-3659), the siblings this models on.
_CONN_DEAD_ERRORS: Final[tuple[type[Exception], ...]] = (

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] _CONN_DEAD_ERRORS is now copy #3 of an identical constant (pg_queue/client.py, pg_queue/result_backend.py). The comment here explicitly worries about the "was this a connection death?" test drifting — but the three module-level copies can drift independently, which is the exact risk the constant was meant to kill, just moved up to the module boundary.

Fix: hoist a single _CONN_DEAD_ERRORS into a shared location (e.g. pg_queue/connection.py, which all three already import for create_pg_connection) and import it in all three. If that's deemed too invasive, at minimum add a test asserting the three tuples are equal.

Comment nit: PgQueueClient._CONN_DEAD_ERRORS / PgResultBackend._CONN_DEAD_ERRORS are module-level constants, not class attributes — PgQueueClient._CONN_DEAD_ERRORS would AttributeError. Refer to them by module (pg_queue.client / pg_queue.result_backend).

)
time.sleep(_BARRIER_RETRY_BACKOFF_SECONDS)
continue
raise

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] The execute-phase give-up raise emits no decrement-level log.

The retry branch logs a warning (lines 688-698) and the commit branch logs (709-716), but this propagate path is silent. For a fresh-conn death (reused False) or a non-conn error, _barrier_pg_decrement only catches DataError, so an OperationalError here propagates with no barrier-specific breadcrumb — and on the Celery .link path the barrier isn't torn down, so it hangs to expires_at. The result is an asymmetric trail: retryable blips are logged, but the terminal "gave up / wasn't retryable" decision isn't. Consider one warning before this raise naming the execution, error type, and that the decrement is propagating without retry (mirroring the commit-phase log).

Comment thread workers/queue_backend/pg_barrier.py Outdated
)
raise
return row
# Unreachable: the loop either returns the row or raises. The bare raise keeps

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit / comment accuracy] The "keeps the type checker honest" rationale is wrong.

The return annotation is tuple[int, Any] | None, so an implicit fall-through off the loop would return None — a valid return (the function legitimately returns None at line 718 when fetchone() finds no row). A type checker would therefore not flag a missing return whether or not this line exists. The AssertionError is a pure defensive runtime guard, not a type-checker aid. Reword to e.g. "Defensive: the loop always returns or raises before here." (Keeping the guard is fine — cheap insurance in concurrency code; just fix the justification.)

Comment thread workers/tests/test_pg_barrier.py Outdated
"""

@pytest.fixture(autouse=True)
def _no_sleep(self, monkeypatch):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Test gap] The time.sleep(_BARRIER_RETRY_BACKOFF_SECONDS) backoff is never asserted.

This _no_sleep fixture (and the inline monkeypatches at ~456/601/631) replace pg_barrier.time.sleep with a bare no-op purely to skip the wall-clock wait — none record or assert the call. If someone deleted the time.sleep in _apply_decrement (line 699), every test would still pass, and a "tighten the retry loop" refactor could silently start hammering a struggling DB.

Fix: have the fixture append to a list, then assert sleeps == [pg_barrier._BARRIER_RETRY_BACKOFF_SECONDS] in the retry test and sleeps == [] in the no-retry tests (commit-phase / fresh-conn).

assert reconnects == [] # NEVER reconnected/retried after a commit failure
assert conn.closed is True # the dead conn was discarded

def test_fresh_conn_execute_failure_is_not_retried(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Test gap — ties to the reused finding] This test bypasses the real entry point.

It calls _apply_decrement(...) directly with _local.conn = None, so it exercises a fresh-conn state that the production wrapper (_barrier_pg_decrementbarrier_pg_decr_and_check) never produces: that wrapper's entry-guard _get_conn() always pre-populates _local.conn, making reused always True on attempt 1. So this test asserts a property the production path does not actually provide. Add an end-to-end test through _barrier_pg_decrement for the fresh-conn-not-retried case (or, if reused is fixed to sample before the entry guard, this becomes a true assertion).

Comment thread workers/tests/test_pg_barrier.py Outdated
assert reconnects == [] # no retry on a logical/data error
assert conn.closed is False # a live conn after a data error is kept

def test_reraises_after_second_execute_failure(self, _clean_local, monkeypatch):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Test quality / informational] The stated rationale doesn't match the actual gating branch.

This test is framed around "the one-shot bound" (attempt < _BARRIER_DECREMENT_ATTEMPTS), but trace the path: on attempt 1, _recover_after_error sets _local.conn = None before the continue; on attempt 2, reused recomputes to False, so the retry is refused by reused, not by the attempt bound — both independently block it. The attempt < term can never be the sole reason a retry is refused, because every retry discards the connection. Net: the decrement is hard-capped at one self-heal by the reused logic regardless of the attempt constant, and bumping _BARRIER_DECREMENT_ATTEMPTS would NOT increase self-heals — an untested assumption worth a comment. The propagation outcome asserted here is correct; only the rationale needs fixing.

…ed constant

Addresses the PR Review Toolkit findings on #2130:

- reused-guard was inoperative in production: `_barrier_pg_decrement` runs the
  entry-guard `_get_conn()` before `_apply_decrement`, so `reused` (sampled
  inside the loop) was always True — the fresh-conn-not-retried case never fired.
  Now sample it in `_barrier_pg_decrement` BEFORE the guard and thread it in.
  Adds an end-to-end test through the wrapper for the fresh-conn case (the prior
  direct-call test bypassed the guard → false confidence).

- Hoist `_CONN_DEAD_ERRORS` to `pg_queue.connection` (the module all three sites
  import) and import it in client.py / result_backend.py / pg_barrier.py, so the
  three copies can't drift. (psycopg2 import drops out of the two that only used
  it for the constant.)

- `_recover_after_error`: log the rollback failure with exc_info before flipping
  `conn_dead` — it was silently swallowed and can reclassify a non-conn error.

- Non-autocommit invariant: soften the docstring ("psycopg2's default; not
  overridden") and pin it at the source with
  `test_create_pg_connection_is_non_autocommit` (a runtime assert in
  `_apply_decrement` would break the autocommit test fixture).

- Return a `_DecrementRow` NamedTuple instead of `tuple[int, Any]` so the caller
  reads `.remaining` / `.results`.

- Log the execute-phase give-up (fresh-conn death / retry exhausted) — the
  retryable and commit paths logged but this terminal path was silent.

- Commit-phase docstring: name both caller paths (in-body teardown vs Celery
  .link reclaim-at-expiry) instead of only run_batch_with_barrier.

- Reword the unreachable-raise comment (it's a defensive runtime guard, not a
  type-checker aid) and assert the backoff sleep fires exactly on retry.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review — addressed in 126a3da2e. Summary:

[Medium] reused inoperative in the production path — fixed. _barrier_pg_decrement now samples conn_was_cached before the entry-guard _get_conn() and threads it into _apply_decrement(..., reused=...), so a genuinely fresh connection is no longer misclassified as cached. Added test_wrapper_fresh_conn_not_retried_end_to_end that drives the real wrapper (not _apply_decrement directly) and asserts no retry reconnect — the false-confidence gap you flagged.

[Medium] Rollback failure swallowed — fixed. _recover_after_error now logs the rollback failure with exc_info before flipping conn_dead, naming that it was the rollback (not the original error) that condemned the connection.

[Low] _CONN_DEAD_ERRORS copy #3 — fixed. Hoisted to pg_queue.connection (the module all three import) as CONN_DEAD_ERRORS; client.py, result_backend.py and pg_barrier.py now import it, so they can't drift. (psycopg2 drops out of the two that only used it for the constant.) Also fixed the comment nit — referred to siblings by module, not as class attributes.

[Low] Non-autocommit invariant unenforced — softened the wording ("psycopg2's default; create_pg_connection does not override it") and pinned it at the source with test_create_pg_connection_is_non_autocommit. I did not add assert not conn.autocommit in _apply_decrement: the barrier_db test fixture deliberately uses an autocommit connection, so the assert would break the real-DB tests — the source-level test is the stronger guard.

[Low] Commit-phase docstring names wrong caller — fixed. Now covers both paths: in-body (run_batch_with_barrier → teardown, fail-fast) and the live Celery .link path (barrier_pg_decr_and_check, max_retries=0 → reclaimed at expires_at by the reaper).

[Low] NamedTuple return — done. _apply_decrement returns _DecrementRow(remaining, results); the caller reads row.remaining / row.results.

[Low] Silent execute-phase give-up — fixed. A conn-dead give-up (fresh-conn death / retry exhausted) now logs a warning before propagating, matching the retryable and commit paths. (A non-conn DataError stays logged by the caller's teardown to avoid a double-log.)

[Nit] Unreachable-raise rationale — reworded to "defensive runtime guard" (the | None return means a type checker wouldn't flag a fall-through anyway).

[Test gap] Backoff never asserted — fixed. The sleeps fixture records time.sleep calls; the retry test asserts sleeps == [_BARRIER_RETRY_BACKOFF_SECONDS] and the no-retry tests assert sleeps == [].

[Test gap / quality] Fresh-conn bypass + one-shot rationale — the new end-to-end test covers the real fresh-conn path; test_reraises_after_one_retry now asserts exactly one reconnect + one backoff and its comment explains the attempt-bound vs reused interaction.

Re-verified: full test_pg_barrier.py + test_pg_queue_client.py + test_pg_result_backend.py + sibling barrier suites = 158 passed; pre-commit clean. Re-ran the live in-container pg_terminate_backend E2E on the final code (all 4 changed files patched into the PG worker fleet, clean boot, cross-module import resolves) → self-heal → decrement exactly once → callback once → barrier consumed.

@muhammad-ali-e muhammad-ali-e merged commit 4ffc41b into feat/UN-3445-pg-queue-integration Jun 30, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3660-pg-barrier-decrement-retry branch June 30, 2026 12:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.