Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2d5a7b1

Browse filesBrowse files
authored
fix(experimental): no state lookup while opening bidi-write stream (#1636)
fix(experimental): no state lookup while opening bidi-write stream
1 parent 4a609a4 commit 2d5a7b1
Copy full SHA for 2d5a7b1

File tree

Expand file treeCollapse file tree

5 files changed

+57
-20
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+57
-20
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+11-7Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,11 @@ def __init__(
114114
write_handle=self.write_handle,
115115
)
116116
self._is_stream_open: bool = False
117+
# `offset` is the latest size of the object without staleless.
117118
self.offset: Optional[int] = None
119+
# `persisted_size` is the total_bytes persisted in the GCS server.
120+
# Please note: `offset` and `persisted_size` are same when the stream is
121+
# opened.
118122
self.persisted_size: Optional[int] = None
119123

120124
async def state_lookup(self) -> int:
@@ -152,17 +156,17 @@ async def open(self) -> None:
152156
if self.generation is None:
153157
self.generation = self.write_obj_stream.generation_number
154158
self.write_handle = self.write_obj_stream.write_handle
155-
156-
# Update self.persisted_size
157-
_ = await self.state_lookup()
159+
self.persisted_size = self.write_obj_stream.persisted_size
158160

159161
async def append(self, data: bytes) -> None:
160162
"""Appends data to the Appendable object.
161163
162-
This method sends the provided data to the GCS server in chunks. It
163-
maintains an internal threshold `_MAX_BUFFER_SIZE_BYTES` and will
164-
automatically flush the data to make it visible to readers when that
165-
threshold has reached.
164+
calling `self.append` will append bytes at the end of the current size
165+
ie. `self.offset` bytes relative to the begining of the object.
166+
167+
This method sends the provided `data` to the GCS server in chunks.
168+
and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
169+
calling `self.simple_flush`.
166170
167171
:type data: bytes
168172
:param data: The bytes to append to the object.
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ async def download_ranges(
216216
217217
:type read_ranges: List[Tuple[int, int, "BytesIO"]]
218218
:param read_ranges: A list of tuples, where each tuple represents a
219-
byte range (start_byte, bytes_to_read, writeable_buffer). Buffer has
219+
combintaion of byte_range and writeable buffer in format -
220+
(`start_byte`, `bytes_to_read`, `writeable_buffer`). Buffer has
220221
to be provided by the user, and user has to make sure appropriate
221222
memory is available in the application to avoid out-of-memory crash.
222223
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+7-2Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ async def open(self) -> None:
117117
object=self.object_name,
118118
generation=self.generation_number,
119119
),
120-
state_lookup=True,
121120
)
122121

123122
self.socket_like_rpc = AsyncBidiRpc(
@@ -136,11 +135,17 @@ async def open(self) -> None:
136135
raise ValueError(
137136
"Failed to obtain object generation after opening the stream"
138137
)
139-
self.generation_number = response.resource.generation
140138

141139
if not response.write_handle:
142140
raise ValueError("Failed to obtain write_handle after opening the stream")
143141

142+
if not response.resource.size:
143+
# Appending to a 0 byte appendable object.
144+
self.persisted_size = 0
145+
else:
146+
self.persisted_size = response.resource.size
147+
148+
self.generation_number = response.resource.generation
144149
self.write_handle = response.write_handle
145150

146151
async def close(self) -> None:
Collapse file

‎tests/unit/asyncio/test_async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_appendable_object_writer.py
+31-10Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,10 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
133133
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
134134
mock_stream = mock_write_object_stream.return_value
135135
mock_stream.open = mock.AsyncMock()
136-
mock_stream.send = mock.AsyncMock()
137-
mock_stream.recv = mock.AsyncMock()
138-
139-
mock_state_response = mock.MagicMock()
140-
mock_state_response.persisted_size = 1024
141-
mock_stream.recv.return_value = mock_state_response
142136

143137
mock_stream.generation_number = GENERATION
144138
mock_stream.write_handle = WRITE_HANDLE
139+
mock_stream.persisted_size = 0
145140

146141
# Act
147142
await writer.open()
@@ -151,11 +146,37 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
151146
assert writer._is_stream_open
152147
assert writer.generation == GENERATION
153148
assert writer.write_handle == WRITE_HANDLE
149+
assert writer.persisted_size == 0
154150

155-
expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True)
156-
mock_stream.send.assert_awaited_once_with(expected_request)
157-
mock_stream.recv.assert_awaited_once()
158-
assert writer.persisted_size == 1024
151+
152+
@pytest.mark.asyncio
153+
@mock.patch(
154+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
155+
)
156+
async def test_open_appendable_object_writer_existing_object(
157+
mock_write_object_stream, mock_client
158+
):
159+
"""Test the open method."""
160+
# Arrange
161+
writer = AsyncAppendableObjectWriter(
162+
mock_client, BUCKET, OBJECT, generation=GENERATION
163+
)
164+
mock_stream = mock_write_object_stream.return_value
165+
mock_stream.open = mock.AsyncMock()
166+
167+
mock_stream.generation_number = GENERATION
168+
mock_stream.write_handle = WRITE_HANDLE
169+
mock_stream.persisted_size = PERSISTED_SIZE
170+
171+
# Act
172+
await writer.open()
173+
174+
# Assert
175+
mock_stream.open.assert_awaited_once()
176+
assert writer._is_stream_open
177+
assert writer.generation == GENERATION
178+
assert writer.write_handle == WRITE_HANDLE
179+
assert writer.persisted_size == PERSISTED_SIZE
159180

160181

161182
@pytest.mark.asyncio
Collapse file

‎tests/unit/asyncio/test_async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_write_object_stream.py
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope
5555
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
5656
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
5757
mock_response.resource.generation = GENERATION
58+
mock_response.resource.size = 0
5859
mock_response.write_handle = WRITE_HANDLE
5960
socket_like_rpc.recv = AsyncMock(return_value=mock_response)
6061

@@ -129,6 +130,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
129130
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
130131
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
131132
mock_response.resource.generation = GENERATION
133+
mock_response.resource.size = 0
132134
mock_response.write_handle = WRITE_HANDLE
133135
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
134136

@@ -143,6 +145,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
143145
socket_like_rpc.recv.assert_called_once()
144146
assert stream.generation_number == GENERATION
145147
assert stream.write_handle == WRITE_HANDLE
148+
assert stream.persisted_size == 0
146149

147150

148151
@pytest.mark.asyncio
@@ -158,6 +161,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
158161

159162
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
160163
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
164+
mock_response.resource.size = 1024
161165
mock_response.resource.generation = GENERATION
162166
mock_response.write_handle = WRITE_HANDLE
163167
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
@@ -175,6 +179,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
175179
socket_like_rpc.recv.assert_called_once()
176180
assert stream.generation_number == GENERATION
177181
assert stream.write_handle == WRITE_HANDLE
182+
assert stream.persisted_size == 1024
178183

179184

180185
@pytest.mark.asyncio
@@ -191,6 +196,7 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli
191196
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
192197
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
193198
mock_response.resource.generation = GENERATION
199+
mock_response.resource.size = 0
194200
mock_response.write_handle = WRITE_HANDLE
195201
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
196202

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.