Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6c16079

Browse filesBrowse files
authored
fix(experimental): implement requests_done method to signal end of requests in async streams. Gracefully close streams. (#1700)
fix(experimental): implement requests_done method to signal end of requests in async streams. Gracefully close streams.
1 parent 16cf512 commit 6c16079
Copy full SHA for 6c16079

File tree

Expand file treeCollapse file tree

4 files changed

+64
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+64
-0
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_read_object_stream.py
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,16 @@ async def close(self) -> None:
151151
"""Closes the bidi-gRPC connection."""
152152
if not self._is_stream_open:
153153
raise ValueError("Stream is not open")
154+
await self.requests_done()
154155
await self.socket_like_rpc.close()
155156
self._is_stream_open = False
156157

158+
async def requests_done(self):
159+
"""Signals that all requests have been sent."""
160+
161+
await self.socket_like_rpc.send(None)
162+
await self.socket_like_rpc.recv()
163+
157164
async def send(
158165
self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest
159166
) -> None:
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,16 @@ async def close(self) -> None:
152152
"""Closes the bidi-gRPC connection."""
153153
if not self._is_stream_open:
154154
raise ValueError("Stream is not open")
155+
await self.requests_done()
155156
await self.socket_like_rpc.close()
156157
self._is_stream_open = False
157158

159+
async def requests_done(self):
160+
"""Signals that all requests have been sent."""
161+
162+
await self.socket_like_rpc.send(None)
163+
await self.socket_like_rpc.recv()
164+
158165
async def send(
159166
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
160167
) -> None:
@@ -186,3 +193,4 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
186193
@property
187194
def is_stream_open(self) -> bool:
188195
return self._is_stream_open
196+
Collapse file

‎tests/unit/asyncio/test_async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_read_object_stream.py
+26Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,41 @@ async def test_close(mock_client, mock_cls_async_bidi_rpc):
197197
read_obj_stream = await instantiate_read_obj_stream(
198198
mock_client, mock_cls_async_bidi_rpc, open=True
199199
)
200+
read_obj_stream.requests_done = AsyncMock()
200201

201202
# act
202203
await read_obj_stream.close()
203204

204205
# assert
206+
read_obj_stream.requests_done.assert_called_once()
205207
read_obj_stream.socket_like_rpc.close.assert_called_once()
206208
assert not read_obj_stream.is_stream_open
207209

208210

211+
@mock.patch(
212+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
213+
)
214+
@mock.patch(
215+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
216+
)
217+
@pytest.mark.asyncio
218+
async def test_requests_done(mock_client, mock_cls_async_bidi_rpc):
219+
"""Test that requests_done signals the end of requests."""
220+
# Arrange
221+
read_obj_stream = await instantiate_read_obj_stream(
222+
mock_client, mock_cls_async_bidi_rpc, open=True
223+
)
224+
read_obj_stream.socket_like_rpc.send = AsyncMock()
225+
read_obj_stream.socket_like_rpc.recv = AsyncMock()
226+
227+
# Act
228+
await read_obj_stream.requests_done()
229+
230+
# Assert
231+
read_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
232+
read_obj_stream.socket_like_rpc.recv.assert_called_once()
233+
234+
209235
@mock.patch(
210236
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
211237
)
Collapse file

‎tests/unit/asyncio/test_async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_write_object_stream.py
+23Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,13 @@ async def test_close(mock_cls_async_bidi_rpc, mock_client):
289289
write_obj_stream = await instantiate_write_obj_stream(
290290
mock_client, mock_cls_async_bidi_rpc, open=True
291291
)
292+
write_obj_stream.requests_done = AsyncMock()
292293

293294
# Act
294295
await write_obj_stream.close()
295296

296297
# Assert
298+
write_obj_stream.requests_done.assert_called_once()
297299
write_obj_stream.socket_like_rpc.close.assert_called_once()
298300
assert not write_obj_stream.is_stream_open
299301

@@ -394,3 +396,24 @@ async def test_recv_without_open_should_raise_error(
394396
# Act & Assert
395397
with pytest.raises(ValueError, match="Stream is not open"):
396398
await write_obj_stream.recv()
399+
400+
401+
@pytest.mark.asyncio
402+
@mock.patch(
403+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
404+
)
405+
async def test_requests_done(mock_cls_async_bidi_rpc, mock_client):
406+
"""Test that requests_done signals the end of requests."""
407+
# Arrange
408+
write_obj_stream = await instantiate_write_obj_stream(
409+
mock_client, mock_cls_async_bidi_rpc, open=True
410+
)
411+
write_obj_stream.socket_like_rpc.send = AsyncMock()
412+
write_obj_stream.socket_like_rpc.recv = AsyncMock()
413+
414+
# Act
415+
await write_obj_stream.requests_done()
416+
417+
# Assert
418+
write_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
419+
write_obj_stream.socket_like_rpc.recv.assert_called_once()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.