UN-3618 [REFACTOR] PG Queue — gate solely on the Flipt flag (+ UN-3619 transient-connect retry)#2109
muhammad-ali-e merged 5 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:feat/UN-3618-pg-single-flag-gating
…PORT_ENABLED) The pg_queue_enabled Flipt flag becomes the sole gate across all four resolvers (execution, scheduler, executor backend+workers). Removes the redundant env master-switch and the master_gate_enabled parameter from the shared resolve_pg_transport. Fully fail-closed (flag off / Flipt blind / Flipt error / no org -> Celery); verified by unit + real-stack dev-test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
create_pg_connection retries a transient psycopg2.OperationalError with bounded exponential backoff (WORKER_PG_QUEUE_CONNECT_RETRIES / _BACKOFF, defaults 3 / 0.5s). Connecting is side-effect-free so retry is safe; the enqueue INSERT is intentionally NOT retried (double-dispatch risk). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e left a comment
Automated PR Review (PR Review Toolkit)
Review of the single-flag gating refactor (UN-3618) + transient-connect retry (UN-3619), produced by six specialised agents (Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier). Net: a clean, mostly-subtractive PR with no Critical issues. One High silent-failure risk (Flipt is now the sole gate, but a Flipt outage is indistinguishable from "flag off" and logs nothing at the decision layer), plus several Medium/Low items on the new retry logic, test coverage, docs, and dead code. Inline comments below.
| execution; ``"celery"`` otherwise (including any error — fail-closed). | ||
| A :class:`WorkflowTransport` value string — ``"pg_queue"`` only when Flipt | ||
| is reachable and says yes for this execution; ``"celery"`` otherwise | ||
| (including any error — fail-closed). |
[HIGH — Silent failure] The fail-closed try/except here is dead code, so a real Flipt outage is silent.
The docstring promises fail-closed "including any error", and the wrap at lines 123–136 is labelled "defense in depth". But check_feature_flag_status (unstract/flags/src/unstract/flags/feature_flag.py:45-46) already does a bare except Exception: return False with no logging. So:
- The except Exception branch at lines 129–136 can never execute, so the logger.warning("...Flipt evaluation failed...") you wrote here will never fire.
- A genuinely broken Flipt (deleted flag, wrong namespace, gRPC connect failure while FLIPT_SERVICE_AVAILABLE=true) collapses into the same False as a healthy "rollout says no", with no decision-layer log.
This PR removes the PG_QUEUE_TRANSPORT_ENABLED master gate, so Flipt is now the sole interlock — making this blind spot load-bearing.
Fix: make check_feature_flag_status log + distinguish "evaluation errored" from "flag off" (e.g. log with exc_info and/or re-raise on a sentinel) so these resolver warnings can actually fire; or, at minimum, drop the misleading "defense in depth" comment so a future reader doesn't trust a guard that can't trip. Same issue in scheduler/ownership.py and unstract/.../executor_rpc.py.
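A minimal sketch of the "distinguish errored from off" direction, under stated assumptions: FlagResult, evaluate_flag, and resolve_transport_sketch are hypothetical names, and the evaluate callable stands in for the real Flipt client call. It is not the existing check_feature_flag_status signature.

```python
import logging
from enum import Enum
from typing import Callable

logger = logging.getLogger(__name__)


class FlagResult(Enum):
    """Hypothetical tri-state result; not part of the existing flags API."""

    ENABLED = "enabled"
    DISABLED = "disabled"
    ERRORED = "errored"


def evaluate_flag(evaluate: Callable[[], bool], flag_key: str) -> FlagResult:
    """Run a flag evaluation and keep "errored" distinct from "off".

    `evaluate` is an assumed stand-in for the real Flipt client call.
    """
    try:
        return FlagResult.ENABLED if evaluate() else FlagResult.DISABLED
    except Exception:
        # Log with the traceback so an outage is visible at the decision layer.
        logger.warning("Flag evaluation failed for %s", flag_key, exc_info=True)
        return FlagResult.ERRORED


def resolve_transport_sketch(result: FlagResult) -> str:
    """Fail closed: only an explicit ENABLED selects the PG queue."""
    return "pg_queue" if result is FlagResult.ENABLED else "celery"
```

Routing stays fail-closed in every case; the only change is that a caller (or a log reader) can tell ERRORED apart from DISABLED.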
| loses its firer. | ||
| Mirrors ``resolve_transport``: gated by the single ``pg_queue_enabled`` Flipt | ||
| flag, keyed on ``pipeline_id`` for a stable percentage bucket. **Fails closed to | ||
| Beat** on a blind Flipt or any error — so a schedule never silently loses its |
[HIGH — Silent failure] "Fails closed … on any error" is unreachable; a Flipt outage silently leaves schedules on Beat with no log.
The except Exception → warn → return False at lines 66–76 is dead code: check_feature_flag_status (unstract/flags/.../feature_flag.py:45-46) swallows every exception and returns False itself, so the resolve_schedule_owner: Flipt check failed warning never fires. A broken/unreachable Flipt is indistinguishable from "rollout says no", and with the master gate now removed there is no other signal that ownership silently reverted to Beat.
Fix: same as in transport.py — have check_feature_flag_status log/propagate evaluation failures so this fail-closed path is observable, or drop the dead try/except + comment.
| closed gate, a blind Flipt, or any error — so the executor never silently loses | ||
| its transport. | ||
| Gated by the single ``pg_queue_enabled`` Flipt flag, bucketed per org. | ||
| **Fails closed to Celery** on a blind Flipt or any error — so the executor |
[HIGH — Silent failure] Fail-closed try/except (lines 159–167) is dead code; a Flipt outage silently routes to Celery with no log.
check_feature_flag_status already returns False on any exception with no logging, so the resolve_pg_transport: Flipt check failed warning here is unreachable. Same root cause and fix as transport.py/ownership.py.
Separately, the module docstring (lines ~19–22) is now stale — it still describes "a master kill-switch … supplied by the caller … then the single pg_queue_enabled Flipt flag", but this PR removed the master_gate_enabled param. Update it to describe Flipt as the sole gate.
| # purely additive. Note we deliberately do NOT auto-retry the enqueue INSERT | ||
| # (queue_backend.pg_queue.client.send): an ambiguous commit-time failure could | ||
| # double-enqueue → double-dispatch, so that path stays fail-and-surface. | ||
| _DEFAULT_CONNECT_RETRIES = 3 # total attempts (1 = no retry) |
[MEDIUM — Type design] Group the retry knobs into one validated policy.
_DEFAULT_CONNECT_BACKOFF and _CONNECT_BACKOFF_CAP only make sense together, and RETRIES/BACKOFF are env-tunable while the cap is not. An operator who sets WORKER_PG_QUEUE_CONNECT_BACKOFF=10 gets every sleep silently clamped to 5.0 — the knob appears to do nothing past the cap, with no warning.
Fix: fold the three values into a frozen ConnectRetryPolicy dataclass built once from env, with a sleep_for(attempt) method and a __post_init__ that asserts backoff_cap >= base_backoff (and either make the cap env-tunable or document it). This puts the backoff math + invariants in one enforced place and trivialises the loop body.
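One possible shape for such a policy object, as a sketch only. The class and field names are illustrative, the WORKER_PG_QUEUE_CONNECT_BACKOFF_CAP variable is an assumed new knob (the PR does not define it), and from_env deliberately leaves out the warn-and-default handling for unparseable values.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectRetryPolicy:
    """Hypothetical policy object grouping the three retry knobs."""

    attempts: int = 3          # total attempts (1 = no retry)
    base_backoff: float = 0.5  # seconds slept before the first retry
    backoff_cap: float = 5.0   # upper bound on any single sleep

    def __post_init__(self) -> None:
        # Enforce the invariants in one place instead of at each call site.
        if self.attempts < 1:
            raise ValueError("attempts must be >= 1")
        if self.base_backoff < 0:
            raise ValueError("base_backoff must be >= 0")
        if self.backoff_cap < self.base_backoff:
            raise ValueError("backoff_cap must be >= base_backoff")

    def sleep_for(self, attempt: int) -> float:
        """Seconds to sleep after failed attempt number `attempt` (1-based)."""
        return min(self.base_backoff * (2 ** (attempt - 1)), self.backoff_cap)

    @classmethod
    def from_env(cls) -> "ConnectRetryPolicy":
        """Build once from env. The *_BACKOFF_CAP variable is an assumption."""
        prefix = "WORKER_PG_QUEUE_CONNECT_"
        return cls(
            attempts=int(os.getenv(prefix + "RETRIES", "3")),
            base_backoff=float(os.getenv(prefix + "BACKOFF", "0.5")),
            backoff_cap=float(os.getenv(prefix + "BACKOFF_CAP", "5.0")),
        )
```

With this shape, a base backoff of 10 against a 5.0 cap fails loudly at construction instead of being clamped silently on every sleep. Whether to raise or to warn and clamp is a design choice left open here.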
| Empty/unset → default; an unparseable value warns and uses the default | ||
| (a bad knob must not wedge the only direct-DB worker path). | ||
| """ | ||
| raw = os.getenv(f"WORKER_PG_QUEUE_CONNECT_{suffix}") |
[LOW — Consistency / reuse] _connect_env validation is inconsistent, and it duplicates an existing helper.
A malformed value (RETRIES=abc) warns, but an out-of-range value (RETRIES=0, negative BACKOFF) is silently coerced by the caller's max(1, …)/max(0.0, …) (lines 85–86) with no log. For a "bad knob must not wedge it" goal, out-of-range deserves the same visibility as unparseable — move the clamping in here and warn when a parsed value is coerced.
Also note unstract.core.utilities._safe_get_env_int/_safe_get_env_float already do "read env, parse, warn-and-default" (core is a workers dependency). Not a drop-in (they take a fixed key and bake in non-negative validation), but worth converging on rather than carrying a near-duplicate.
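To illustrate "clamp inside the reader and warn when a value is coerced", here is a small sketch. read_bounded_int is a hypothetical helper, not the existing _connect_env or the unstract.core utilities, and the bounds passed by a caller are the caller's choice.

```python
import logging
import os

logger = logging.getLogger(__name__)


def read_bounded_int(name: str, default: int, minimum: int, maximum: int) -> int:
    """Read an int env var; warn on unparseable AND out-of-range values.

    Hypothetical helper for illustration only.
    """
    raw = os.getenv(name)
    if raw is None or raw.strip() == "":
        return default  # unset/empty is the normal case, so no warning
    try:
        value = int(raw)
    except ValueError:
        logger.warning("%s=%r is not an integer; using default %d", name, raw, default)
        return default
    clamped = max(minimum, min(value, maximum))
    if clamped != value:
        # Same visibility as the unparseable case: the operator sees the coercion.
        logger.warning(
            "%s=%d is outside [%d, %d]; using %d", name, value, minimum, maximum, clamped
        )
    return clamped
```

A float variant for the backoff knob would follow the same pattern with float(raw) and a lower bound of 0.0.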
| password=os.getenv(f"{env_prefix}PASSWORD", "unstract_pass"), | ||
| options=f"-c search_path={schema}", | ||
| ) | ||
| except psycopg2.OperationalError: |
[LOW] psycopg2.OperationalError is broad — permanent misconfig is retried before surfacing. Auth failures (password authentication failed), database "x" does not exist, and bad-host all raise OperationalError, not a subclass. A permanently-misconfigured DB_* var burns all attempts + ~1.5s of backoff on every create_pg_connection call (invoked by several long-running loops) before raising. The final logger.error + raise does surface it, so this is latency/log-noise, not a silent failure. Optional: skip retry for clearly-permanent SQLSTATEs (28xxx auth, 3D000 invalid db) via exc.pgcode, or tighten the docstring to say auth/db-name errors are retried-then-raised.
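A sketch of the optional SQLSTATE check, written against a generic exception so it runs without psycopg2. is_permanent_connect_error is a hypothetical helper. One caveat to verify before relying on it: pgcode may be None for errors raised while connecting, in which case this check never short-circuits and every failure is still retried.

```python
from typing import Optional

# SQLSTATE class 28 = invalid authorization specification (e.g. 28P01 bad password);
# 3D000 = invalid catalog name (database does not exist).
_PERMANENT_CLASSES = ("28",)
_PERMANENT_CODES = ("3D000",)


def is_permanent_connect_error(exc: BaseException) -> bool:
    """Return True when the SQLSTATE marks a misconfiguration, not a blip.

    A missing or None pgcode is treated as transient, so the retry still runs.
    """
    code: Optional[str] = getattr(exc, "pgcode", None)
    if not code:
        return False
    return code in _PERMANENT_CODES or code.startswith(_PERMANENT_CLASSES)
```

In the retry loop, the except branch would re-raise immediately when this returns True and fall through to the backoff otherwise.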
| env_prefix, | ||
| ) | ||
| raise | ||
| raise AssertionError("unreachable: loop returns or raises") # pragma: no cover |
[LOW — Simplification] Drop the unreachable AssertionError sentinel by restructuring the loop. The loop already returns on success or raises on the final attempt, so this # pragma: no cover sentinel only exists because the terminal raise lives inside the loop. Invert the guard so the loop body handles only the retryable case and the terminal logger.error(...); raise is reached structurally on the last iteration — functionally identical, no sentinel, no pragma:
    except psycopg2.OperationalError:
        if attempt >= attempts:
            logger.error(...)
            raise
        sleep_for = min(backoff * (2 ** (attempt - 1)), _CONNECT_BACKOFF_CAP)
        logger.warning(...)
        time.sleep(sleep_for)
| @@ -48,31 +47,13 @@ | |
|
|
||
| logger = logging.getLogger(__name__) |
[LOW — Dead code] logger (and import logging at line ~17) is now unused. Its only consumer was the fat-fingered-env warning this PR deletes; the backend counterpart (backend/pg_queue/executor_rpc.py) correctly carries no logger. Ruff won't flag a module-level unused binding, so this is cosmetic — remove both lines to keep the module clean (or keep if you anticipate logging here soon).
|
|
||
| # PG-queue consumer prefork concurrency (UN-3606). >1 makes the consumer launcher | ||
| # fork N isolated consumer children — the PG analogue of Celery's prefork | ||
| # --concurrency, so file batches run in parallel. SKIP LOCKED distributes work |
[LOW — Docs] The two new operator knobs are undocumented. This PR adds WORKER_PG_QUEUE_CONNECT_RETRIES and WORKER_PG_QUEUE_CONNECT_BACKOFF (read in connection.py:_connect_env) but documents neither here, while sibling PG-queue knobs (e.g. WORKER_PG_REAPER_INTERVAL_SECONDS) are documented by convention. Add commented entries near the other WORKER_PG_* settings noting the defaults (3 attempts / 0.5s base), the doubling behaviour, and the 5.0s cap.
| assert len(calls) == 1 | ||
| assert sleeps == [] | ||
|
|
||
| def test_retries_transient_then_succeeds(self, monkeypatch): |
[MEDIUM — Test coverage gaps] The new suite covers the core paths well but misses four behavioural branches of the retry logic:
- Backoff cap (connection.py:100): no test drives a sleep past 5.0s, so the min(…, _CONNECT_BACKOFF_CAP) clamp (a real production guarantee) has zero coverage. Add e.g. retries=5, backoff=4 → assert sleeps == [4.0, 5.0, 5.0, 5.0].
- Unset-env default path (connection.py:47-49): _patch always setenvs both knobs, so the production default (env absent → 3 attempts / 0.5s) is never exercised. Add a delenv test.
- Invalid / negative BACKOFF (connection.py:52-59,86): only the RETRIES cast-failure is tested; add backoff=xyz (→ default) and backoff=-1 (→ [0.0, 0.0], retry-without-sleep).
- Connection params forwarded (connection.py:90-97): every test discards connect kwargs; nothing asserts options=-c search_path=… (or host/db) is passed and stable across retries.
Also: test_retries_transient_then_succeeds's sleeps == [0.5, 1.0] can't distinguish exponential from linear growth — add one ≥3-sleep case (e.g. [1.0, 2.0, 4.0]) to pin the geometric contract.
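The cap and geometric-growth cases above can be pinned with an injectable sleep. The following is a self-contained sketch, not the code in connection.py: connect_with_retry and TransientConnectError are hypothetical stand-ins (the latter for psycopg2.OperationalError), and the loop uses a while-True form so no unreachable sentinel is needed.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

BACKOFF_CAP = 5.0  # seconds; mirrors the 5.0s cap discussed in this review


class TransientConnectError(Exception):
    """Stand-in for psycopg2.OperationalError so the sketch has no dependency."""


def connect_with_retry(
    connect: Callable[[], T],
    attempts: int = 3,
    backoff: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `connect` up to `attempts` times, sleeping with capped doubling."""
    attempt = 1
    while True:
        try:
            return connect()
        except TransientConnectError:
            if attempt >= attempts:
                raise  # final attempt: surface the error to the caller
            # Doubling backoff: base, 2*base, 4*base, ... clamped to the cap.
            sleep(min(backoff * (2 ** (attempt - 1)), BACKOFF_CAP))
            attempt += 1
```

Passing a list's append method as sleep records the schedule without waiting, which is what makes assertions such as [4.0, 5.0, 5.0, 5.0] (cap) and [1.0, 2.0, 4.0] (geometric growth) cheap to write.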
- Flipt observability (UN-3618): check_feature_flag_status / _variant now log a genuine evaluation failure (warning + exc_info) instead of silently returning the disabled default, so the now-sole gate's outages are visible at the decision layer. Resolver defense-in-depth guards kept.
- Update stale shared executor_rpc docstring (Flipt is the sole gate).
- Remove now-dead module logger in workers executor_rpc.
- Connect retry (UN-3619): clamp+warn out-of-range RETRIES (cap 10) / negative BACKOFF; drop the unreachable AssertionError sentinel via an inverted loop; document the backoff cap + that permanent misconfigs are retried-then-raised; document both knobs in sample.env.
- Tests: add backoff-cap, unset-default, invalid/negative-backoff, attempts-clamp, geometric-growth, and params-forwarded/stable coverage (13 connection tests).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Thanks for the thorough review — addressed in Fixed
Declined (with rationale)
ruff check + format clean; pre-commit clean. |
…-single-flag-gating
…bda (SonarCloud) Replace the (_ for _ in ()).throw(...) generator trick with a plain local def connect that raises — matches the other tests' style and clears the SonarCloud comprehension finding. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
d5b4a2f
into
feat/UN-3445-pg-queue-integration
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/connection.py | Adds bounded exponential-backoff retry on transient OperationalError; retry math, cap, clamping, and env-read defaults are all correct and unit-tested. |
| unstract/workflow-execution/src/unstract/workflow_execution/executor_rpc.py | Removes master_gate_enabled parameter from resolve_pg_transport; Flipt flag is now the sole gate. Fail-closed paths unchanged. |
| backend/workflow_manager/workflow_v2/transport.py | Drops PG_QUEUE_TRANSPORT_ENABLED master-gate check; documents the remaining outer try/except as explicit defense-in-depth since check_feature_flag_status already catches all exceptions internally. |
| backend/scheduler/ownership.py | Removes settings.PG_QUEUE_TRANSPORT_ENABLED gate check; Flipt flag is now the only gate. Fail-closed to Beat on all error paths preserved. |
| unstract/flags/src/unstract/flags/feature_flag.py | Adds WARNING + exc_info logging inside the exception handler for both check_feature_flag_status and check_feature_flag_variant, making Flipt outages visible rather than silently returning False. |
| workers/tests/test_pg_connection.py | New test file covering 13 retry-contract scenarios including first-try success, geometric backoff, sleep cap, clamping, invalid env fallback, and stable params across retries. |
| workers/queue_backend/pg_queue/executor_rpc.py | Removes the worker-side PG_QUEUE_TRANSPORT_ENABLED env gate and its fat-fingered-value warning; now delegates directly to resolve_pg_transport with no extra gate. |
| backend/pg_queue/executor_rpc.py | Drops settings.PG_QUEUE_TRANSPORT_ENABLED master-switch forwarding; resolve_executor_transport now calls resolve_pg_transport with no master_gate_enabled kwarg. |
| backend/backend/settings/base.py | Removes PG_QUEUE_TRANSPORT_ENABLED Django setting; clean deletion with no other references. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A([Execution / Schedule / Executor RPC]) --> B{organization_id present?}
B -- No --> C[Return: Celery\nlog WARNING]
B -- Yes --> D{FLIPT_SERVICE_AVAILABLE\n== 'true'?}
D -- No --> E[Return: Celery\nlog WARNING]
D -- Yes --> F[call check_feature_flag_status\npg_queue_enabled]
F --> G{Evaluation\nsucceeded?}
G -- No --> H[log WARNING\nexc_info=True\nreturn False]
H --> I[Return: Celery]
G -- Yes --> J{flag enabled\nfor this entity?}
J -- No --> K[Return: Celery]
J -- Yes --> L[Return: PG Queue]
style C fill:#f9c,stroke:#c00
style E fill:#f9c,stroke:#c00
style I fill:#f9c,stroke:#c00
style K fill:#f9c,stroke:#c00
style L fill:#9fc,stroke:#090
style H fill:#fc9,stroke:#a60
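The decision flow in the flowchart can be read as a single fail-closed function. This is a sketch for orientation only: resolve_gate and the check_flag callable (flag key, entity id) are hypothetical and do not mirror the real resolver signatures.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def resolve_gate(
    organization_id: Optional[str],
    flipt_available: bool,
    check_flag: Callable[[str, str], bool],
) -> str:
    """Return "pg_queue" only when every step of the flowchart says yes."""
    if not organization_id:
        logger.warning("No organization_id; falling back to Celery")
        return "celery"
    if not flipt_available:
        logger.warning("Flipt not available; falling back to Celery")
        return "celery"
    try:
        enabled = check_flag("pg_queue_enabled", organization_id)
    except Exception:
        # An evaluation failure is logged with its traceback, then fails closed.
        logger.warning("Flipt evaluation failed", exc_info=True)
        return "celery"
    return "pg_queue" if enabled else "celery"
```

Every branch except the last returns Celery, which is the fail-closed property the review comments rely on.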
Reviews (1): Last reviewed commit: "UN-3619 [FIX] pg_benchmark connection te..."
What & why
Two small, closely-related PG-queue changes, bundled into one draft PR (sub-tasks UN-3618 + UN-3619 under the Phase-9 story).
UN-3618: single-flag gating (drop PG_QUEUE_TRANSPORT_ENABLED)
The pg_queue_enabled Flipt flag is now the sole gate for the whole PG-queue feature. The PG_QUEUE_TRANSPORT_ENABLED env master-switch was a per-process AND-condition checked before Flipt. With Flipt per-env and fail-closed, it only ever forced OFF (it never enabled PG on its own), so it added no safety the flag didn't already provide while risking config drift across the backend/worker processes. The deploy-ordering argument isn't PG-specific either (Celery has the same "no consumer → stuck queue" exposure). Removing it now, before any cloud helm/configmap wiring exists, means we never add it there.
- transport.py, scheduler/ownership.py, and both pg_queue/executor_rpc.py halves.
- master_gate_enabled parameter from the shared unstract.workflow_execution.executor_rpc.resolve_pg_transport.
- sample.env entries, a docker-compose comment, the flags.py mention.
UN-3619: bounded retry on transient Postgres connect
create_pg_connection (workers) retries a transient psycopg2.OperationalError (DB restart, PgBouncer pool wait, brief network partition, "too many clients" spike) with bounded exponential backoff (WORKER_PG_QUEUE_CONNECT_RETRIES default 3, WORKER_PG_QUEUE_CONNECT_BACKOFF default 0.5s). Connecting is side-effect-free so retry is safe; the enqueue INSERT is intentionally not retried (an ambiguous commit-time failure could double-enqueue → double-dispatch).
Testing
- resolve_transport routes to PG by the Flipt flag alone (5/5); fail-closed (Flipt blind → Celery) intact.
- ruff check + ruff format clean; pre-commit clean.
Risk
Zero-regression: behaviour is identical to before for every gate path, and rollback remains the single Flipt flag. The workers' executor-RPC half is best confirmed in the k8s validation (needs rebuilt worker images); unit tests cover its logic here.
🤖 Generated with Claude Code