Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 493df65

Browse files
authored
feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support (#16528)
This PR implements **`AsyncMultiRangeDownloader`** with a new **`_StreamMultiplexer`**, enabling multiple concurrent range downloads to share a single bidirectional gRPC stream.

### Before vs. After

| Feature | Before | After (This PR) |
| :--- | :--- | :--- |
| **Concurrency** | Sequential or multiple connections | Concurrent over **one** connection |
| **Overhead** | High (multiple gRPC streams) | Low (multiplexed single stream) |
| **Reliability** | Per-stream retry logic | Unified generation-gated reopening |

### How it works

The system uses a background **`_StreamMultiplexer`** to manage the shared bidirectional stream:

1. **Requests**: Concurrent tasks send range requests (`BidiReadObjectRequest`) directly to the shared stream.
2. **Multiplexing**: A background **Recv Loop** listens for all responses. It uses the `read_id` in each response to route data to the correct task-specific `asyncio.Queue`.
3. **Error Handling**: If the stream breaks, a **generation-gated lock** ensures the stream is reopened only once. All active tasks receive a `_StreamError` and automatically retry using the new stream generation.

---

**Key Changes:**

- **`_StreamMultiplexer`**: Background receiver loop for routing responses.
- **Generation-Gated Reopening**: Coordinates stream recovery across concurrent tasks.
- **`AsyncMultiRangeDownloader` Integration**: Full support for concurrent `download_ranges` calls.
1 parent 33e4a6f commit 493df65
Copy full SHA for 493df65

5 files changed

+1,202 -172 (Lines changed: 1,202 additions & 172 deletions)

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file
+207Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import asyncio
18+
import logging
19+
from typing import Awaitable, Callable, Dict, Optional, Set
20+
21+
import grpc
22+
23+
from google.cloud import _storage_v2
24+
from google.cloud.storage.asyncio.async_read_object_stream import (
25+
_AsyncReadObjectStream,
26+
)
27+
28+
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default ``maxsize`` of each per-task response queue created by the multiplexer.
_DEFAULT_QUEUE_MAX_SIZE = 100
# Seconds the recv loop waits for room in a full queue before dropping the item.
_DEFAULT_PUT_TIMEOUT_SECONDS = 20.0
32+
33+
34+
class _StreamError:
35+
"""Wraps an error with the stream generation that produced it."""
36+
37+
def __init__(self, exception: Exception, generation: int):
38+
self.exception = exception
39+
self.generation = generation
40+
41+
42+
class _StreamEnd:
43+
"""Signals the stream closed normally."""
44+
45+
pass
46+
47+
48+
class _StreamMultiplexer:
    """Multiplexes concurrent download tasks over a single bidi-gRPC stream.

    Routes responses from a background recv loop to per-task asyncio.Queues
    keyed by read_id. Coordinates stream reopening via generation-gated
    locking.

    A slow consumer on one task will slow down the entire shared connection
    due to bounded queue backpressure propagating through gRPC flow control.
    """

    def __init__(
        self,
        stream: _AsyncReadObjectStream,
        queue_max_size: int = _DEFAULT_QUEUE_MAX_SIZE,
    ):
        """Store the shared stream; the recv loop is not started here.

        Args:
            stream: The stream that requests are sent on and responses are
                received from.
            queue_max_size: ``maxsize`` given to every queue created by
                :meth:`register`.
        """
        self._stream = stream
        # Bumped by reopen_stream() every time a replacement stream is installed.
        self._stream_generation: int = 0
        # read_id -> queue; all read_ids registered together share one queue.
        self._queues: Dict[int, asyncio.Queue] = {}
        # Serializes reopen_stream() so concurrent callers reopen only once.
        self._reopen_lock = asyncio.Lock()
        self._recv_task: Optional[asyncio.Task] = None
        self._queue_max_size = queue_max_size

    @property
    def stream_generation(self) -> int:
        """Generation number of the stream currently in use."""
        return self._stream_generation

    def register(self, read_ids: Set[int]) -> asyncio.Queue:
        """Register read_ids for a task and return its response queue.

        A single new queue is created and mapped to every id in ``read_ids``;
        an id that was already registered is re-pointed at the new queue.
        """
        queue = asyncio.Queue(maxsize=self._queue_max_size)
        for read_id in read_ids:
            self._queues[read_id] = queue
        return queue

    def unregister(self, read_ids: Set[int]) -> None:
        """Remove read_ids from routing; unknown ids are ignored."""
        for read_id in read_ids:
            self._queues.pop(read_id, None)

    def _get_unique_queues(self) -> Set[asyncio.Queue]:
        """Return the distinct queues currently referenced by any read_id."""
        return set(self._queues.values())

    async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
        """Put ``item`` on ``queue``, giving up after the module put timeout.

        On timeout the item is dropped (never raised) so one stalled consumer
        cannot block the recv loop forever. The drop is logged at debug level
        when the queue is no longer registered, and as a warning otherwise.
        """
        try:
            await asyncio.wait_for(
                queue.put(item), timeout=_DEFAULT_PUT_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            if queue not in self._get_unique_queues():
                logger.debug("Dropped item for unregistered queue.")
            else:
                logger.warning(
                    "Queue full for too long. Dropping item to prevent multiplexer hang."
                )

    async def _fan_out(self, queues: Set[asyncio.Queue], item) -> None:
        """Concurrently deliver the same ``item`` to every queue in ``queues``."""
        await asyncio.gather(
            *(self._put_with_timeout(queue, item) for queue in queues)
        )

    def _ensure_recv_loop(self) -> None:
        """Start the recv loop task unless one is already running."""
        if self._recv_task is None or self._recv_task.done():
            self._recv_task = asyncio.create_task(self._recv_loop())

    def _stop_recv_loop(self) -> None:
        """Request cancellation of the recv loop task if it is still running."""
        if self._recv_task is not None and not self._recv_task.done():
            self._recv_task.cancel()

    async def _cancel_recv_loop(self) -> None:
        """Cancel the recv loop and wait until its task has finished."""
        self._stop_recv_loop()
        if self._recv_task is not None:
            try:
                await self._recv_task
            except (asyncio.CancelledError, Exception):
                # The task's outcome is irrelevant here; we only need it gone.
                # NOTE(review): this also absorbs a cancellation aimed at the
                # caller while it is suspended on the await above - confirm
                # that is intended.
                pass

    def _put_error_nowait(self, queue: asyncio.Queue, error: _StreamError) -> None:
        """Enqueue ``error`` without waiting.

        If the queue is full, already-queued items are discarded from the
        front until the error fits, so the consumer is guaranteed to see it.
        """
        while True:
            try:
                queue.put_nowait(error)
                break
            except asyncio.QueueFull:
                try:
                    queue.get_nowait()
                except asyncio.QueueEmpty:
                    pass

    def _broadcast_error(self, exception: Exception) -> None:
        """Send one _StreamError, tagged with the current generation, to all queues."""
        error = _StreamError(exception, self._stream_generation)
        for queue in self._get_unique_queues():
            self._put_error_nowait(queue, error)

    async def _recv_loop(self) -> None:
        """Receive responses from the stream and route them until it ends.

        * ``grpc.aio.EOF``: a ``_StreamEnd`` is sent to every queue and the
          loop returns.
        * A response carrying ``object_data_ranges``: delivered once to each
          queue that owns at least one of its read_ids; ids with no
          registered queue are logged and skipped.
        * Any other response: delivered to every registered queue.
        * An exception: a ``_StreamError`` is sent to every queue and the
          loop exits. Cancellation is re-raised untouched.
        """
        try:
            while True:
                response = await self._stream.recv()
                if response == grpc.aio.EOF:
                    await self._fan_out(self._get_unique_queues(), _StreamEnd())
                    return

                if response.object_data_ranges:
                    # A set, so a task owning several ranges in this response
                    # still receives the response only once.
                    queues_to_notify: Set[asyncio.Queue] = set()
                    for data_range in response.object_data_ranges:
                        read_id = data_range.read_range.read_id
                        queue = self._queues.get(read_id)
                        if queue is not None:
                            queues_to_notify.add(queue)
                        else:
                            logger.warning(
                                "Received data for unregistered read_id: %s",
                                read_id,
                            )
                    await self._fan_out(queues_to_notify, response)
                else:
                    await self._fan_out(self._get_unique_queues(), response)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.warning(
                "Stream multiplexer recv loop failed: %s", e, exc_info=True
            )
            self._broadcast_error(e)

    async def send(self, request: _storage_v2.BidiReadObjectRequest) -> int:
        """Send ``request`` on the shared stream.

        Makes sure the recv loop is running first.

        Returns:
            The stream generation read after the send completed.
        """
        self._ensure_recv_loop()
        await self._stream.send(request)
        return self._stream_generation

    async def reopen_stream(
        self,
        broken_generation: int,
        stream_factory: Callable[[], Awaitable[_AsyncReadObjectStream]],
    ) -> None:
        """Replace the stream, unless another caller already replaced it.

        Args:
            broken_generation: Generation the caller saw fail. If it no longer
                matches the current generation, the call is a no-op.
            stream_factory: Awaitable factory producing the replacement stream.

        Raises:
            Whatever ``stream_factory`` raises; in that case the generation is
            left unchanged and the recv loop is not restarted.
        """
        async with self._reopen_lock:
            if self._stream_generation != broken_generation:
                return
            await self._cancel_recv_loop()
            # Tagged with the old generation (the counter is bumped below).
            self._broadcast_error(Exception("Stream reopening"))
            try:
                await self._stream.close()
            except Exception:
                # Best effort: the stream is being discarded anyway.
                logger.debug(
                    "Ignoring error while closing the broken stream.",
                    exc_info=True,
                )
            self._stream = await stream_factory()
            self._stream_generation += 1
            self._ensure_recv_loop()

    async def close(self) -> None:
        """Stop the recv loop and send a _StreamError to every registered queue.

        The underlying stream itself is not closed here.
        """
        await self._cancel_recv_loop()
        self._broadcast_error(Exception("Multiplexer closed"))

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.