Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2ef6339

Browse filesBrowse files
authored
fix: receive eof while closing reads stream (#1733)
When `writer.close()` is called without setting finalize_on_close flag, we need to get two responses: 1) to get the persisted_size 2) eof response That's why added a check if the first response is not eof, then again receive the response from the stream.
1 parent c8dd7a0 commit 2ef6339
Copy full SHA for 2ef6339

File tree

Expand file treeCollapse file tree

2 files changed

+59
-2
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+59
-2
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/asyncio/async_write_object_stream.py
+11-2Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from typing import List, Optional, Tuple
16+
import grpc
1617
from google.cloud import _storage_v2
1718
from google.cloud.storage.asyncio import _utils
1819
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
@@ -181,9 +182,17 @@ async def close(self) -> None:
181182

182183
async def requests_done(self):
183184
"""Signals that all requests have been sent."""
184-
185185
await self.socket_like_rpc.send(None)
186-
_utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv())
186+
187+
# The server may send a final "EOF" response immediately, or it may
188+
# first send an intermediate response followed by the EOF response depending on whether the object was finalized or not.
189+
first_resp = await self.socket_like_rpc.recv()
190+
_utils.update_write_handle_if_exists(self, first_resp)
191+
192+
if first_resp != grpc.aio.EOF:
193+
self.persisted_size = first_resp.persisted_size
194+
second_resp = await self.socket_like_rpc.recv()
195+
assert second_resp == grpc.aio.EOF
187196

188197
async def send(
189198
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
Collapse file

‎tests/unit/asyncio/test_async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_write_object_stream.py
+48Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import unittest.mock as mock
1616
from unittest.mock import AsyncMock, MagicMock
1717
import pytest
18+
import grpc
19+
1820

1921
from google.cloud.storage.asyncio.async_write_object_stream import (
2022
_AsyncWriteObjectStream,
@@ -194,11 +196,57 @@ async def test_close_success(self, mock_client):
194196
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
195197
stream._is_stream_open = True
196198
stream.socket_like_rpc = AsyncMock()
199+
200+
stream.socket_like_rpc.send = AsyncMock()
201+
first_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=100)
202+
stream.socket_like_rpc.recv = AsyncMock(side_effect=[first_resp, grpc.aio.EOF])
197203
stream.socket_like_rpc.close = AsyncMock()
198204

199205
await stream.close()
200206
stream.socket_like_rpc.close.assert_awaited_once()
201207
assert not stream.is_stream_open
208+
assert stream.persisted_size == 100
209+
210+
@pytest.mark.asyncio
211+
async def test_close_with_persisted_size_then_eof(self, mock_client):
212+
"""Test close when first recv has persisted_size, second is EOF."""
213+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
214+
stream._is_stream_open = True
215+
stream.socket_like_rpc = AsyncMock()
216+
217+
# First response has persisted_size (NOT EOF, intermediate)
218+
persisted_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=500)
219+
# Second response is EOF (None)
220+
eof_resp = grpc.aio.EOF
221+
222+
stream.socket_like_rpc.send = AsyncMock()
223+
stream.socket_like_rpc.recv = AsyncMock(side_effect=[persisted_resp, eof_resp])
224+
stream.socket_like_rpc.close = AsyncMock()
225+
226+
await stream.close()
227+
228+
# Verify two recv calls: first has persisted_size (NOT EOF), so read second (EOF)
229+
assert stream.socket_like_rpc.recv.await_count == 2
230+
assert stream.persisted_size == 500
231+
assert not stream.is_stream_open
232+
233+
@pytest.mark.asyncio
234+
async def test_close_with_grpc_aio_eof_response(self, mock_client):
235+
"""Test close when first recv is grpc.aio.EOF sentinel."""
236+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
237+
stream._is_stream_open = True
238+
stream.socket_like_rpc = AsyncMock()
239+
240+
# First recv returns grpc.aio.EOF (explicit sentinel from finalize)
241+
stream.socket_like_rpc.send = AsyncMock()
242+
stream.socket_like_rpc.recv = AsyncMock(return_value=grpc.aio.EOF)
243+
stream.socket_like_rpc.close = AsyncMock()
244+
245+
await stream.close()
246+
247+
# Verify only one recv call (grpc.aio.EOF=EOF, so don't read second)
248+
assert stream.socket_like_rpc.recv.await_count == 1
249+
assert not stream.is_stream_open
202250

203251
@pytest.mark.asyncio
204252
async def test_methods_require_open_raises(self, mock_client):

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.