Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 7073be1

Browse filesBrowse files
authored
perf(storage): implement fast-path for queue delivery in _StreamMultiplexer (#16718)
## Summary This PR optimizes the message delivery logic in `_StreamMultiplexer` to reduce latency and event loop overhead. ### Performance Improvements: 1. **Fast-path Delivery**: Implemented a "fast-path" that attempts `queue.put_nowait(item)` for all target queues. For queues with available capacity, this is a synchronous operation that avoids: - Creating and scheduling a coroutine. - Yielding to the event loop. - Overhead associated with `asyncio.wait_for`. 2. **Single-Queue Slow-path Optimization**: In cases where exactly one queue is full, the multiplexer now directly awaits the `_put_with_timeout` coroutine. This bypasses the overhead of `asyncio.gather`, which is now only used when multiple queues are full simultaneously. 3. **Reduced Event Loop Pressure**: By minimizing the number of tasks created and yields performed during high-throughput streaming, these changes help the multiplexer keep up with fast-arriving gRPC responses.
1 parent c6461a4 commit 7073be1
Copy full SHA for 7073be1

1 file changed

+34-19Lines changed: 34 additions & 19 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py
+34-19Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ def _get_unique_queues(self) -> Set[asyncio.Queue]:
8888
return set(self._queues.values())
8989

9090
async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
91+
"""Slow-path put: wait up to _DEFAULT_PUT_TIMEOUT_SECONDS, else drop.
92+
93+
Callers should attempt ``queue.put_nowait(item)`` first and only call
94+
this when it raises :class:`asyncio.QueueFull`.
95+
"""
9196
try:
9297
await asyncio.wait_for(
9398
queue.put(item), timeout=_DEFAULT_PUT_TIMEOUT_SECONDS
@@ -100,6 +105,32 @@ async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
100105
"Queue full for too long. Dropping item to prevent multiplexer hang."
101106
)
102107

108+
async def _put_to_queues(self, queues, item) -> None:
109+
"""Deliver ``item`` to each queue.
110+
111+
Fast path: ``put_nowait`` for queues with capacity (no Task, no
112+
timer handle, no coroutine yield). Slow path: ``_put_with_timeout``
113+
only for queues that were full, and a single direct ``await`` when
114+
exactly one queue needs the slow path (skips ``asyncio.gather``).
115+
"""
116+
slow_queues = None
117+
for q in queues:
118+
try:
119+
q.put_nowait(item)
120+
except asyncio.QueueFull:
121+
if slow_queues is None:
122+
slow_queues = [q]
123+
else:
124+
slow_queues.append(q)
125+
if slow_queues is None:
126+
return
127+
if len(slow_queues) == 1:
128+
await self._put_with_timeout(slow_queues[0], item)
129+
else:
130+
await asyncio.gather(
131+
*(self._put_with_timeout(q, item) for q in slow_queues)
132+
)
133+
103134
def _ensure_recv_loop(self) -> None:
104135
if self._recv_task is None or self._recv_task.done():
105136
self._recv_task = asyncio.create_task(self._recv_loop())
@@ -124,13 +155,7 @@ async def _recv_loop(self) -> None:
124155
while True:
125156
response = await self._stream.recv()
126157
if response == grpc.aio.EOF:
127-
sentinel = _StreamEnd()
128-
await asyncio.gather(
129-
*(
130-
self._put_with_timeout(queue, sentinel)
131-
for queue in self._get_unique_queues()
132-
)
133-
)
158+
await self._put_to_queues(self._get_unique_queues(), _StreamEnd())
134159
return
135160

136161
if response.object_data_ranges:
@@ -144,19 +169,9 @@ async def _recv_loop(self) -> None:
144169
logger.warning(
145170
f"Received data for unregistered read_id: {read_id}"
146171
)
147-
await asyncio.gather(
148-
*(
149-
self._put_with_timeout(queue, response)
150-
for queue in queues_to_notify
151-
)
152-
)
172+
await self._put_to_queues(queues_to_notify, response)
153173
else:
154-
await asyncio.gather(
155-
*(
156-
self._put_with_timeout(queue, response)
157-
for queue in self._get_unique_queues()
158-
)
159-
)
174+
await self._put_to_queues(self._get_unique_queues(), response)
160175
except asyncio.CancelledError:
161176
raise
162177
except Exception as e:

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.