Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6dc711d

Browse filesBrowse files
authored
fix: add system test for opening with read_handle (#1672)
fix: add system test for opening with read_handle
1 parent 0e2961b commit 6dc711d
Copy full SHA for 6dc711d

File tree

Expand file treeCollapse file tree

4 files changed

+120
-22
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+120
-22
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
+8-9Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,13 @@ async def open(self) -> None:
193193
"""
194194
if self._is_stream_open:
195195
raise ValueError("Underlying bidi-gRPC stream is already open")
196-
197-
if self.read_obj_str is None:
198-
self.read_obj_str = _AsyncReadObjectStream(
199-
client=self.client,
200-
bucket_name=self.bucket_name,
201-
object_name=self.object_name,
202-
generation_number=self.generation_number,
203-
read_handle=self.read_handle,
204-
)
196+
self.read_obj_str = _AsyncReadObjectStream(
197+
client=self.client,
198+
bucket_name=self.bucket_name,
199+
object_name=self.object_name,
200+
generation_number=self.generation_number,
201+
read_handle=self.read_handle,
202+
)
205203
await self.read_obj_str.open()
206204
self._is_stream_open = True
207205
if self.generation_number is None:
@@ -342,6 +340,7 @@ async def close(self):
342340
if not self._is_stream_open:
343341
raise ValueError("Underlying bidi-gRPC stream is not open")
344342
await self.read_obj_str.close()
343+
self.read_obj_str = None
345344
self._is_stream_open = False
346345

347346
@property
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_read_object_stream.py
+8-6Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,6 @@ def __init__(
8484
self.rpc = self.client._client._transport._wrapped_methods[
8585
self.client._client._transport.bidi_read_object
8686
]
87-
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
88-
read_object_spec=_storage_v2.BidiReadObjectSpec(
89-
bucket=self._full_bucket_name, object=object_name
90-
),
91-
)
9287
self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
9388
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
9489
self._is_stream_open: bool = False
@@ -102,14 +97,21 @@ async def open(self) -> None:
10297
"""
10398
if self._is_stream_open:
10499
raise ValueError("Stream is already open")
100+
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
101+
read_object_spec=_storage_v2.BidiReadObjectSpec(
102+
bucket=self._full_bucket_name,
103+
object=self.object_name,
104+
read_handle=self.read_handle,
105+
),
106+
)
105107
self.socket_like_rpc = AsyncBidiRpc(
106108
self.rpc, initial_request=self.first_bidi_read_req, metadata=self.metadata
107109
)
108110
await self.socket_like_rpc.open() # this is actually 1 send
109111
response = await self.socket_like_rpc.recv()
110112
# populated only in the first response of bidi-stream and when opened
111113
# without using `read_handle`
112-
if response.metadata:
114+
if hasattr(response, "metadata") and response.metadata:
113115
if self.generation_number is None:
114116
self.generation_number = response.metadata.generation
115117
# update persisted size
Collapse file

‎tests/system/test_zonal.py‎

Copy file name to clipboardExpand all lines: tests/system/test_zonal.py
+53Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# py standard imports
2+
import asyncio
23
import os
34
import uuid
45
from io import BytesIO
@@ -27,6 +28,36 @@
2728
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
2829

2930

31+
async def write_one_appendable_object(
32+
bucket_name: str,
33+
object_name: str,
34+
data: bytes,
35+
) -> None:
36+
"""Helper to write an appendable object."""
37+
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
38+
writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
39+
await writer.open()
40+
await writer.append(data)
41+
await writer.close()
42+
43+
44+
@pytest.fixture(scope="function")
45+
def appendable_object(storage_client, blobs_to_delete):
46+
"""Fixture to create and cleanup an appendable object."""
47+
object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
48+
asyncio.run(
49+
write_one_appendable_object(
50+
_ZONAL_BUCKET,
51+
object_name,
52+
_BYTES_TO_UPLOAD,
53+
)
54+
)
55+
yield object_name
56+
57+
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
58+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
59+
60+
3061
@pytest.mark.asyncio
3162
@pytest.mark.parametrize(
3263
"attempt_direct_path",
@@ -85,3 +116,25 @@ async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delet
85116

86117
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
87118
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
119+
120+
121+
@pytest.mark.asyncio
122+
async def test_mrd_open_with_read_handle(appendable_object):
123+
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
124+
125+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
126+
await mrd.open()
127+
read_handle = mrd.read_handle
128+
await mrd.close()
129+
130+
# Open a new MRD using the `read_handle` obtained above
131+
new_mrd = AsyncMultiRangeDownloader(
132+
grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
133+
)
134+
await new_mrd.open()
135+
# persisted_size not set when opened with read_handle
136+
assert new_mrd.persisted_size is None
137+
buffer = BytesIO()
138+
await new_mrd.download_ranges([(0, 0, buffer)])
139+
await new_mrd.close()
140+
assert buffer.getvalue() == _BYTES_TO_UPLOAD
Collapse file

‎tests/unit/asyncio/test_async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_read_object_stream.py
+51-7Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
_TEST_GENERATION_NUMBER = 12345
2828
_TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB
2929
_TEST_READ_HANDLE = b"test-read-handle"
30+
_TEST_READ_HANDLE_NEW = b"test-read-handle-new"
3031

3132

3233
async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True):
@@ -54,6 +55,30 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open
5455
return read_obj_stream
5556

5657

58+
async def instantiate_read_obj_stream_with_read_handle(
59+
mock_client, mock_cls_async_bidi_rpc, open=True
60+
):
61+
"""Helper to create an instance of _AsyncReadObjectStream and open it by default."""
62+
socket_like_rpc = AsyncMock()
63+
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
64+
socket_like_rpc.open = AsyncMock()
65+
66+
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
67+
recv_response.read_handle = _TEST_READ_HANDLE_NEW
68+
socket_like_rpc.recv = AsyncMock(return_value=recv_response)
69+
70+
read_obj_stream = _AsyncReadObjectStream(
71+
client=mock_client,
72+
bucket_name=_TEST_BUCKET_NAME,
73+
object_name=_TEST_OBJECT_NAME,
74+
)
75+
76+
if open:
77+
await read_obj_stream.open()
78+
79+
return read_obj_stream
80+
81+
5782
@mock.patch(
5883
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
5984
)
@@ -67,12 +92,6 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
6792
mock_client._client._transport._wrapped_methods = {
6893
"bidi_read_object_rpc": rpc_sentinel,
6994
}
70-
full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}"
71-
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
72-
read_object_spec=_storage_v2.BidiReadObjectSpec(
73-
bucket=full_bucket_name, object=_TEST_OBJECT_NAME
74-
),
75-
)
7695

7796
# Act
7897
read_obj_stream = _AsyncReadObjectStream(
@@ -88,7 +107,6 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
88107
assert read_obj_stream.object_name == _TEST_OBJECT_NAME
89108
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
90109
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
91-
assert read_obj_stream.first_bidi_read_req == first_bidi_read_req
92110
assert read_obj_stream.rpc == rpc_sentinel
93111

94112

@@ -118,6 +136,32 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc):
118136
assert read_obj_stream.is_stream_open
119137

120138

139+
@mock.patch(
140+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
141+
)
142+
@mock.patch(
143+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
144+
)
145+
@pytest.mark.asyncio
146+
async def test_open_with_read_handle(mock_client, mock_cls_async_bidi_rpc):
147+
# arrange
148+
read_obj_stream = await instantiate_read_obj_stream_with_read_handle(
149+
mock_client, mock_cls_async_bidi_rpc, open=False
150+
)
151+
152+
# act
153+
await read_obj_stream.open()
154+
155+
# assert
156+
read_obj_stream.socket_like_rpc.open.assert_called_once()
157+
read_obj_stream.socket_like_rpc.recv.assert_called_once()
158+
159+
assert read_obj_stream.generation_number is None
160+
assert read_obj_stream.persisted_size is None
161+
assert read_obj_stream.read_handle == _TEST_READ_HANDLE_NEW
162+
assert read_obj_stream.is_stream_open
163+
164+
121165
@mock.patch(
122166
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
123167
)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.