UN-3606 [FIX] PG Queue — prefork the consumer so file batches + executor RPCs run in parallel#2096
muhammad-ali-e merged 4 commits into Zipstack/unstract:feat/UN-3445-pg-queue-integration from Zipstack/unstract:feat/UN-3606-pg-consumer-prefork-concurrency
…tor RPCs run in parallel

The PG file-processing path ran files SERIALLY: worker-pg-fileproc was a single batch=1 consumer, so multi-file ETL batches drained one at a time (Celery runs them in parallel via prefork --concurrency). Same single-consumer bottleneck on worker-pg-executor serialized the executor RPCs (the LLM extraction), so even parallel file fan-out gained nothing end-to-end.

Fix: a prefork supervisor for the PG-queue consumer launcher, the PG analogue of Celery --pool=prefork --concurrency=N.

- supervisor.py (new): when WORKER_PG_QUEUE_CONSUMER_CONCURRENCY > 1, fork N isolated consumer children (each does its own worker bootstrap, so no connections are inherited across the fork), monitor + re-fork dead children, and own a single fleet-liveness endpoint (503 when the oldest child stalls past the threshold). SKIP LOCKED distributes batches across children AND replicas; a single execution is still capped by MAX_PARALLEL_FILE_BATCHES; total live parallelism = concurrency x replicas (k8s HPA scales replicas).
- The consumer code is UNCHANGED: each child is the current single-threaded PgQueueConsumer.run(); concurrency is purely a launch concern. CONCURRENCY=1 (default) keeps the byte-identical single-process path, so every other PG consumer is non-regressive.
- consumer.py: extract build_consumer_from_env()/consumer_env() so the children and main() build identical consumers.
- docker-compose: CONCURRENCY=4 on worker-pg-fileproc AND worker-pg-executor (mirrors WORKER_FILE_PROCESSING_CONCURRENCY). Also raise their VT_SECONDS to 3660: process_file_batch/execute_extraction block on the executor up to 3600s, so the prior vt=30 default would re-claim a long batch mid-run (latent double-run); health-stale sits just above vt.
- Tests: env-knob parse/clamp/validation, fleet-staleness calc, reap/restart matrix (dead -> re-fork, live -> left, shutdown -> not resurrected).

Process model matches Celery prefork (the cloud-trusted choice): full process isolation, a crash is contained + re-forked, its in-flight message redelivers via vt (at-least-once).

Dev-tested end-to-end: 4 fileproc + 4 executor children, a 2-file ETL runs fully parallel (both files + both executor RPCs overlap on separate children; ~25s vs ~42s serial), child crash auto-restarts, health green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e left a comment
Automated PR Review — UN-3606 PG-consumer prefork concurrency
Reviewed with the PR Review Toolkit (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). This is a carefully-written, well-commented PR — the heartbeat clock math (time.time() - seconds_since_last_poll()) is correct, the lockless multiprocessing.Array and os._exit fork-bomb guard are sound, and the docker-compose vt < health-stale ordering is consistent.
The findings below are inline. Priority summary:
🔴 Critical — crash-looping child stays fake-fresh to the probe (supervisor.py:145), so a fully-wedged fleet can report HTTP 200.
🟠 High — unhandled os.fork() OSError can take down the whole fleet (supervisor.py:146); the per-child heartbeat thread can die silently and false-stale a healthy child (supervisor.py:115); the rate-limit backstop branch is entirely untested (test file:77).
🟡 Medium — re-fork after the liveness thread starts is a fork+threads hazard (supervisor.py:189); the rate-limit sleep serializes recovery and skips a stopping recheck (supervisor.py:222); liveness bind failure silently runs probe-less (supervisor.py:293); three parallel slot-keyed structures lack an owning type (supervisor.py:140).
🟢 Low — type annotations, two comment inaccuracies, and two safe simplifications.
None are merge-blockers on their own, but I'd address the Critical + two High items before this ships to the fleet.
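The heartbeat clock math mentioned in the review can be illustrated with a minimal sketch. It assumes a consumer exposing `seconds_since_last_poll()` (named in the review) and a lockless `multiprocessing.Array` with one double per slot. `FakeConsumer`, `publish_heartbeat` and `slot_age` are hypothetical names for illustration, not the PR's code.

```python
import multiprocessing
import time


class FakeConsumer:
    """Stand-in for the real consumer; only the method the review names."""

    def __init__(self, idle_seconds):
        self._idle_seconds = idle_seconds

    def seconds_since_last_poll(self):
        return self._idle_seconds


def publish_heartbeat(heartbeats, slot, consumer, now=None):
    """Store the wall-clock time of the child's last poll in its own slot."""
    now = time.time() if now is None else now
    heartbeats[slot] = now - consumer.seconds_since_last_poll()


def slot_age(heartbeats, slot, now=None):
    """Seconds since the slot's last recorded poll, as a supervisor would see it."""
    now = time.time() if now is None else now
    return now - heartbeats[slot]


# One double per slot, no lock: each child writes only its own cell.
heartbeats = multiprocessing.Array("d", 2, lock=False)
publish_heartbeat(heartbeats, 0, FakeConsumer(idle_seconds=5.0), now=1000.0)
publish_heartbeat(heartbeats, 1, FakeConsumer(idle_seconds=90.0), now=1000.0)
ages = [slot_age(heartbeats, slot, now=1010.0) for slot in range(2)]
```

Storing the absolute time of the last poll, rather than the idle duration, lets the reader compute an age that keeps growing even if the child stops publishing.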
| Filename | Overview |
|---|---|
| workers/pg_queue_consumer/supervisor.py | New prefork supervisor — signal handling, crash-loop detection and heartbeat logic look sound; sequential _join_children can total N×grace_seconds on worst-case shutdown, which exceeds k8s default terminationGracePeriodSeconds |
| workers/queue_backend/pg_queue/consumer.py | Clean refactor: _env() promoted to module-level consumer_env() and build_consumer_from_env() extracted; functional behaviour is unchanged |
| workers/pg_queue_consumer/main.py | Entry-point now branches on concurrency_from_env(); CONCURRENCY=1 path is byte-identical to pre-PR; concurrency>1 delegates to run_supervised() |
| workers/pg_queue_consumer/_bootstrap.py | Simple extraction of select_source_worker_type() so both single-process and supervisor children share the same WORKER_TYPE mutation before import worker |
| workers/tests/test_pg_consumer_supervisor.py | Good fork-free test suite covering env parsing, Fleet bookkeeping, reap/restart matrix, signal reset, and health; uses monkeypatching throughout to stay deterministic |
| docker/docker-compose.yaml | Adds CONCURRENCY=4 and bumps VT_SECONDS/HEALTH_STALE_SECONDS to 3660/3720 on both worker-pg-fileproc and worker-pg-executor; env-var-defaulted so operationally overridable |
| workers/sample.env | Documents new WORKER_PG_QUEUE_CONSUMER_CONCURRENCY knob with a commented-out default |
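The timeout values in the docker-compose row follow an ordering rule: the visibility timeout must exceed the longest time a handler can block, and health-stale must sit above the visibility timeout. A minimal sketch of that check follows; `check_timeouts` is a hypothetical helper, and the 3600 s blocking bound is taken from the commit message.

```python
def check_timeouts(max_block_seconds, vt_seconds, health_stale_seconds):
    """Return a list of ordering violations; an empty list means consistent."""
    problems = []
    if vt_seconds <= max_block_seconds:
        # A message still being processed would become visible again
        # and could be claimed by another consumer (double-run).
        problems.append("vt must exceed the longest handler block")
    if health_stale_seconds <= vt_seconds:
        # Ordering stated in the PR: health-stale sits just above vt.
        problems.append("health-stale must sit above vt")
    return problems


new_config = check_timeouts(max_block_seconds=3600, vt_seconds=3660, health_stale_seconds=3720)
old_default = check_timeouts(max_block_seconds=3600, vt_seconds=30, health_stale_seconds=3720)
```

With the new values the list is empty, while the previous `vt=30` default trips the first rule.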
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant E as Entrypoint
participant S as Supervisor
participant C0 as Child-0
participant C1 as Child-1
participant HB as Heartbeat thread
participant LS as LivenessServer
participant K8S as k8s probe
E->>S: "concurrency_from_env() > 1"
S->>S: install _on_term SIGTERM/SIGINT
S->>C0: "os.fork() slot=0"
S->>C1: "os.fork() slot=1"
C0->>C0: reset SIGTERM/SIGINT to SIG_DFL
C0->>C0: select_source_worker_type() + import worker
C0->>HB: start heartbeat daemon thread
C0->>C0: consumer.run() installs own SIGTERM
HB-->>S: "heartbeats[slot] = time.time() - seconds_since_last_poll"
S->>LS: _maybe_start_supervisor_health(fleet)
K8S->>LS: GET /health
LS-->>K8S: "200 fleet.freshness() < stale_after"
Note over S: monitor loop _reap_dead + _restart_due_children
C0--xS: child exits
S->>S: fleet.reap + schedule_restart
S->>C0: re-fork after backoff
Note over S: SIGTERM received
S->>C0: os.kill SIGTERM
S->>C1: os.kill SIGTERM
S->>S: _join_children per-child 30s grace then SIGKILL
S->>LS: health.stop()
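The fork and exit steps in the diagram can be sketched with the standard library alone. This is a POSIX-only illustration with hypothetical helper names (`fork_child`, `reap_all`); it omits the signal handling, heartbeats and backoff that the diagram also shows.

```python
import os


def fork_child(slot, child_main):
    """Fork one child and return its pid in the parent. The child never returns."""
    pid = os.fork()
    if pid != 0:
        return pid  # parent side
    code = 1
    try:
        code = int(child_main(slot))
    except Exception:
        code = 1
    finally:
        # Hard exit so the child can never fall through into the
        # parent's supervisor loop and start forking on its own.
        os._exit(code)


def reap_all(pids):
    """Block until every child exits; map slot -> exit code (negative = signal)."""
    codes = {}
    for slot, pid in pids.items():
        _, status = os.waitpid(pid, 0)
        if os.WIFEXITED(status):
            codes[slot] = os.WEXITSTATUS(status)
        else:
            codes[slot] = -os.WTERMSIG(status)
    return codes


# Two children that exit with slot-specific codes.
pids = {slot: fork_child(slot, lambda s: 10 + s) for slot in range(2)}
exit_codes = reap_all(pids)
```

A real supervisor would poll with `os.WNOHANG` inside its monitor loop instead of blocking, so that it can keep serving the liveness probe while children run.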
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
workers/pg_queue_consumer/supervisor.py:513-530
**Sequential join means worst-case shutdown = N × grace_seconds**
`_join_children` waits for each child sequentially. With `concurrency=4` and `_SHUTDOWN_GRACE_SECONDS=30`, the worst case is 4 × 30 = 120 s. Kubernetes' default `terminationGracePeriodSeconds` is 30 s, so k8s would SIGKILL the supervisor after 30 s while children are still being polled — effectively reducing later children's grace from 30 s to 0 s.
Since all children receive SIGTERM at the same moment (via `_signal_children` before this loop), they're draining concurrently, so the sequential polling is mostly harmless for well-behaved children. But for containers running `concurrency=4`, `terminationGracePeriodSeconds` in the k8s manifest should be at least `N × _SHUTDOWN_GRACE_SECONDS + a_few_seconds` (≥ 120 s) rather than the default 30 s, or `_SHUTDOWN_GRACE_SECONDS` should be exposed as an env var (`WORKER_PG_QUEUE_CONSUMER_SHUTDOWN_GRACE_SECONDS`) so operators can tune it per-deployment without a code change.
### Issue 2 of 2
workers/pg_queue_consumer/supervisor.py:533-545
**Docstring promises `liveness_probe_bound: false` but the code never emits it**
The docstring says "liveness_probe_bound: false is surfaced in the status payload so the degradation is observable", but when `server.start()` raises `OSError` the function returns `None` — there is no server, so the `_extra_status` closure (which hardcodes `True`) is never reached. The bind failure is logged, but a scraper expecting a JSON body would find no response at all, not a `{"liveness_probe_bound": false}` document. The docstring should either remove that claim or describe the actual observable signal (log line at ERROR/WARNING level).
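The shutdown arithmetic in Issue 1 can be worked through in a few lines. The helper names and the 5 s headroom are assumptions for illustration; only `concurrency=4` and the 30 s grace come from the review.

```python
def worst_case_join_seconds(concurrency, grace_seconds):
    """Sequential join: each child may consume its full grace in turn."""
    return concurrency * grace_seconds


def min_termination_grace(concurrency, grace_seconds, headroom_seconds=5):
    """Smallest terminationGracePeriodSeconds that covers the worst case."""
    return worst_case_join_seconds(concurrency, grace_seconds) + headroom_seconds


worst = worst_case_join_seconds(concurrency=4, grace_seconds=30)
needed = min_termination_grace(concurrency=4, grace_seconds=30)
```

Here `worst` is 120 and `needed` is 125, both well above the Kubernetes default of 30 s that the review cites.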
Reviews (3): Last reviewed commit: "UN-3606 [FIX] Address greptile — snapsho..."
…dening

Critical/High:
- Crash-loop no longer hides a wedged fleet from the probe: re-fork does NOT reseed the heartbeat (a never-polling slot ages), and a slot that dies immediately N times in a row forces freshness=inf → 503 (k8s restarts the pod).
- os.fork() wrapped: initial-fleet failure fails fast with an actionable message; reap-path failure logs + leaves the slot for the next tick (no uncaught crash / no SIGTERM storm against healthy children).
- Heartbeat publish loop guarded (try/except + log) so a transient error can't silently kill the thread and false-stale a healthy child.
- Children reset SIGTERM/SIGINT to SIG_DFL immediately after fork: a signal in the fork→run window no longer fires the parent's _on_term against stale pids.

Medium:
- _Fleet class owns pid/last_fork/heartbeat/crash-count/restart-schedule with a validated slot and consistent reap (replaces three loose slot-keyed dicts).
- Re-fork backoff is a scheduled not-before (non-blocking), re-checking stopping before each fork: no in-loop sleep serialising recovery, no child spawned into shutdown.
- Liveness bind: EADDRINUSE logged at error (vs transient), and liveness_probe_bound surfaced in the status payload.
- _join_children: per-child grace budget (one slow child can't starve the rest); _wait_for_exit helper extracted.

Low:
- real Callable annotations (drop noqas); HEALTH_PORT typed int|None; comment fixes (heartbeat writer reasoning, "JSON body differs" for the fleet probe); WORKER_TYPE selection deduped into pg_queue_consumer/_bootstrap.py.

Tests rewritten + expanded: _Fleet bookkeeping/crash-loop/freshness, reap scheduling, restart-due gating, fork OSError + child hard-exit guard, _wait_for_exit + _join_children SIGKILL escalation, health no-port/bind-error. 32 tests.

Re-validated live: 4 children, fleet health (crash_looping/liveness_probe_bound fields), crash → re-fork.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
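The crash-loop rule described in this commit (a crash-looping slot forces freshness to infinity, which the probe turns into a 503) can be sketched as below. The function names, the dict-based layout and the threshold of 3 are assumptions; the commit only says "N times in a row".

```python
import math


def fleet_freshness(heartbeats, consecutive_crashes, crash_loop_threshold, now):
    """Age in seconds of the stalest slot; inf when any slot is crash-looping."""
    if any(count >= crash_loop_threshold for count in consecutive_crashes.values()):
        return math.inf
    return max(now - beat for beat in heartbeats.values())


def probe_status(freshness, stale_after):
    """HTTP status a liveness endpoint would return for this freshness."""
    return 200 if freshness < stale_after else 503


beats = {0: 990.0, 1: 940.0}  # last-poll timestamps per slot
healthy = fleet_freshness(beats, {0: 0, 1: 0}, crash_loop_threshold=3, now=1000.0)
looping = fleet_freshness(beats, {0: 0, 1: 3}, crash_loop_threshold=3, now=1000.0)
```

Because `math.inf` compares greater than any finite threshold, the crash-loop case needs no special branch in the probe itself.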
- test: avoid float equality (math.isinf for the crash-loop freshness; drop the 0.0 literal), which clears the only New-Code reliability bug (Rating C → A).
- supervisor: clean the `# noqa: ANN201` suppression syntax (explanation moved to the docstring); narrow the child-failure catch BaseException → Exception (the realistic startup-failure surface; SystemExit/KeyboardInterrupt would exit anyway); logger.error(exc_info=True) → logger.exception() on the os.fork() guard.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review addressed —
is_crash_looping() runs in the LivenessServer daemon thread (via freshness()) while the main thread mutates _consecutive_crashes in schedule_restart(), so a bare .values() iteration could raise "dictionary changed size during iteration". Snapshot with tuple(...) (atomic under the GIL) before iterating.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
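The snapshot pattern from this commit looks roughly like the sketch below. `CrashCounter` and `record_crash` are hypothetical names; only `is_crash_looping` and `_consecutive_crashes` appear in the commit message.

```python
class CrashCounter:
    """Per-slot consecutive-crash counts, written by one thread and read by another."""

    def __init__(self, threshold):
        self._threshold = threshold
        self._consecutive_crashes = {}

    def record_crash(self, slot):
        # Writer side (the main thread in the PR): may insert a new key at any time.
        self._consecutive_crashes[slot] = self._consecutive_crashes.get(slot, 0) + 1

    def is_crash_looping(self):
        # Reader side: copy the values first, then iterate over the copy,
        # so the loop never walks a dict that is being resized.
        snapshot = tuple(self._consecutive_crashes.values())
        return any(count >= self._threshold for count in snapshot)


counter = CrashCounter(threshold=2)
counter.record_crash(0)
first = counter.is_crash_looping()
counter.record_crash(0)
second = counter.is_crash_looping()
```

The generator inside `any(...)` can be interleaved with other threads, which is why it runs over the tuple rather than over the live dict view.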
Addressed the 4/5 finding in 5dce34e:
Merged commit ab80a49 into feat/UN-3445-pg-queue-integration
Problem
On the PG transport, a multi-file ETL processed files serially (the ramp blocker). `worker-pg-fileproc` was a single `batch=1` consumer, so the orchestrator's parallel batches drained one at a time, whereas Celery runs them concurrently via `--pool=prefork --concurrency`. The same single-consumer bottleneck on `worker-pg-executor` serialized the `execute_extraction` RPCs (the LLM extraction), so even parallel file fan-out gained nothing end-to-end.
Evidence (2-file ETL): flag OFF (Celery) → both file executions created at the same instant; flag ON (PG, before this fix) → file-2 created only after file-1 completed.
Fix: a prefork supervisor for the PG-queue consumer launcher
The PG analogue of Celery `--pool=prefork --concurrency=N`.
- `supervisor.py` (new): when `WORKER_PG_QUEUE_CONSUMER_CONCURRENCY > 1`, fork N isolated consumer children (each does its own worker bootstrap, so no connections are inherited across the fork), monitor + re-fork dead children, and own a single fleet-liveness endpoint (503 when the oldest child stalls past the threshold). `SELECT … FOR UPDATE SKIP LOCKED` distributes batches across children and replicas; a single execution is still capped by `MAX_PARALLEL_FILE_BATCHES`; total live parallelism = `concurrency × replicas` (k8s HPA scales replicas).
- The consumer code is unchanged: each child is the current single-threaded `PgQueueConsumer.run()`; concurrency is purely a launch concern. `CONCURRENCY=1` (default) keeps the byte-identical single-process path → every other PG consumer is non-regressive.
- `consumer.py`: extract `build_consumer_from_env()` / `consumer_env()` so children and `main()` build identical consumers.
- `docker-compose`: `CONCURRENCY=4` on both `worker-pg-fileproc` and `worker-pg-executor` (mirrors `WORKER_FILE_PROCESSING_CONCURRENCY`). Also raise their `VT_SECONDS` to 3660: `process_file_batch` / `execute_extraction` block on the executor up to 3600s, so the prior `vt=30` default would re-claim a long batch mid-run (latent double-run); health-stale sits just above `vt`.
Why processes (not threads)
Matches Celery prefork, the cloud-trusted model. Full process isolation (own DB connections + thread-local `StateStore`, no shared-state/thread-safety surface), a crash is contained and re-forked, and its in-flight message redelivers via `vt` (at-least-once). The workload is I/O-bound so the GIL is moot, but cloud runs prefork specifically to avoid thread issues, so the PG consumer does too.
Tests
`workers/tests/test_pg_consumer_supervisor.py`: env-knob parse/clamp/validation, fleet-staleness calc, and the reap/restart matrix (dead → re-fork, live → left alone, shutdown → not resurrected, already-reaped → handled). The fork/signal/health-port integration is validated live (below) rather than with flaky in-process fork tests.
Dev-tested end-to-end (gated, running stack)
- 4 `worker-pg-fileproc` + 4 `worker-pg-executor` children fork; fleet health green; killing a child → supervisor re-forks it.
- 2-file ETL: both files run in parallel and both `execute_extraction` RPCs overlap on separate executor children, ~25s vs ~42s when the executor was serial.
Out of scope
Continuation/callback path and Celery decommission are later slices; the shared executor-RPC de-dup is UN-3607.
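The concurrency knob and the parallelism arithmetic from the Fix section can be sketched as follows. The name `concurrency_from_env` and the environment variable come from the PR; the function body, the upper bound of 64, the error behaviour, and the two arithmetic helpers are assumptions for illustration only.

```python
import os

ENV_KEY = "WORKER_PG_QUEUE_CONSUMER_CONCURRENCY"


def concurrency_from_env(environ=None, default=1, maximum=64):
    """Parse the concurrency knob, clamped to [1, maximum] (bounds are assumed)."""
    environ = os.environ if environ is None else environ
    raw = environ.get(ENV_KEY, "").strip()
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{ENV_KEY} must be an integer, got {raw!r}") from None
    return max(1, min(value, maximum))


def live_parallelism(concurrency, replicas):
    """Upper bound on batches in flight across the whole deployment."""
    return concurrency * replicas


def per_execution_parallelism(max_parallel_file_batches, concurrency, replicas):
    """One execution is additionally capped by MAX_PARALLEL_FILE_BATCHES."""
    return min(max_parallel_file_batches, live_parallelism(concurrency, replicas))


compose_value = concurrency_from_env({ENV_KEY: "4"})
fleet_total = live_parallelism(compose_value, replicas=3)
single_run = per_execution_parallelism(5, compose_value, replicas=3)
```

With `CONCURRENCY=4` and three replicas this gives 12 consumers in total, while a single execution capped at five batches can use at most five of them.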
🤖 Generated with Claude Code