Commit 89bfe7a

feat(experimental): flush the last chunk in append method (#1699)
Previously, the last chunk was flushed only when the close() method was called. Now the flush happens inside the append() method itself.
1 parent a57ea0e commit 89bfe7a
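
Based on the system test added in this commit, a typical write path now looks roughly like the sketch below. This is a hedged illustration, not library documentation: the bucket and object names are placeholders, and the import path for AsyncGrpcClient is assumed. The point is that every append() call now flushes its final chunk and records the persisted size, so no separate flush is needed before close().

import asyncio

from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)
# The AsyncGrpcClient import path below is assumed; adjust to the local layout.
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient


async def main():
    grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(grpc_client, "my-zonal-bucket", "my-object")

    await writer.open()
    # append() now flushes its last chunk and performs a state lookup,
    # so the data is persisted before close() is called.
    await writer.append(b"hello appendable objects")
    print("persisted bytes:", await writer.state_lookup())

    # Close without finalizing; the object can be appended to again later.
    await writer.close()


asyncio.run(main())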

5 files changed (+162, -27 lines)

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py (+30, -8)

@@ -188,8 +188,9 @@ async def append(self, data: bytes) -> None:
         ie. `self.offset` bytes relative to the begining of the object.
 
         This method sends the provided `data` to the GCS server in chunks.
-        and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
-        calling `self.simple_flush`.
+        and persists data in GCS at every `_DEFAULT_FLUSH_INTERVAL_BYTES` bytes
+        or at the last chunk whichever is earlier. Persisting is done by setting
+        `flush=True` on request.
 
         :type data: bytes
         :param data: The bytes to append to the object.
@@ -214,20 +215,33 @@ async def append(self, data: bytes) -> None:
         while start_idx < total_bytes:
             end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
             data_chunk = data[start_idx:end_idx]
+            is_last_chunk = end_idx == total_bytes
+            chunk_size = end_idx - start_idx
             await self.write_obj_stream.send(
                 _storage_v2.BidiWriteObjectRequest(
                     write_offset=self.offset,
                     checksummed_data=_storage_v2.ChecksummedData(
                         content=data_chunk,
                         crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
                     ),
+                    state_lookup=is_last_chunk,
+                    flush=is_last_chunk
+                    or (
+                        self.bytes_appended_since_last_flush + chunk_size
+                        >= self.flush_interval
+                    ),
                 )
             )
-            chunk_size = end_idx - start_idx
             self.offset += chunk_size
             self.bytes_appended_since_last_flush += chunk_size
+
            if self.bytes_appended_since_last_flush >= self.flush_interval:
-                await self.simple_flush()
+                self.bytes_appended_since_last_flush = 0
+
+            if is_last_chunk:
+                response = await self.write_obj_stream.recv()
+                self.persisted_size = response.persisted_size
+                self.offset = self.persisted_size
                 self.bytes_appended_since_last_flush = 0
             start_idx = end_idx
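
Read in isolation, the new control flow above amounts to the standalone sketch below. It is a simplified, hedged approximation: the constant values are assumed placeholders (only the 16 MiB flush interval is mentioned elsewhere in this commit), and the real method builds and sends BidiWriteObjectRequest messages rather than yielding tuples.

_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024            # assumed placeholder value
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024   # per the system test comment below


def plan_chunks(total_bytes,
                chunk_limit=_MAX_CHUNK_SIZE_BYTES,
                flush_interval=_DEFAULT_FLUSH_INTERVAL_BYTES):
    """Yield (chunk_size, flush, state_lookup) for each request append() would send."""
    bytes_since_last_flush = 0
    start = 0
    while start < total_bytes:
        end = min(start + chunk_limit, total_bytes)
        chunk_size = end - start
        is_last_chunk = end == total_bytes
        flush = is_last_chunk or (
            bytes_since_last_flush + chunk_size >= flush_interval
        )
        yield chunk_size, flush, is_last_chunk
        bytes_since_last_flush += chunk_size
        if bytes_since_last_flush >= flush_interval or is_last_chunk:
            bytes_since_last_flush = 0
        start = end


# A payload one byte longer than a single chunk yields two requests; only the
# second (last) one carries flush=True and state_lookup=True.
print(list(plan_chunks(_MAX_CHUNK_SIZE_BYTES + 1)))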

@@ -292,20 +306,24 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]
            raise ValueError("Stream is not open. Call open() before close().")
 
        if finalize_on_close:
-            await self.finalize()
-        else:
-            await self.flush()
+            return await self.finalize()
 
        await self.write_obj_stream.close()
 
        self._is_stream_open = False
        self.offset = None
-        return self.object_resource if finalize_on_close else self.persisted_size
+        return self.persisted_size
 
    async def finalize(self) -> _storage_v2.Object:
        """Finalizes the Appendable Object.
 
        Note: Once finalized no more data can be appended.
+        This method is different from `close`. if `.close()` is called data may
+        still be appended to object at a later point in time by opening with
+        generation number.
+        (i.e. `open(..., generation=<object_generation_number>)`.
+        However if `.finalize()` is called no more data can be appended to the
+        object.
 
        rtype: google.cloud.storage_v2.types.Object
        returns: The finalized object resource.
@@ -322,6 +340,10 @@ async def finalize(self) -> _storage_v2.Object:
        response = await self.write_obj_stream.recv()
        self.object_resource = response.resource
        self.persisted_size = self.object_resource.size
+        await self.write_obj_stream.close()
+
+        self._is_stream_open = False
+        self.offset = None
        return self.object_resource
 
    # helper methods.
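
The updated docstring draws a line between close() and finalize(); the hedged sketch below spells out that difference. It is illustrative only: the writer factory and the way the generation number is obtained are placeholders, and the open(..., generation=...) call follows the docstring above rather than a verified signature.

async def two_session_write(make_writer, generation=None):
    # Session 1: append and close WITHOUT finalizing. close() now returns
    # persisted_size, and the object stays appendable.
    writer = make_writer()
    await writer.open()
    await writer.append(b"first part")
    persisted = await writer.close()

    # Session 2 (possibly much later): reopen with the object's generation
    # number, append more, then finalize. finalize() now also closes the
    # stream, and afterwards no further appends are possible.
    writer = make_writer()
    await writer.open(generation=generation)
    await writer.append(b"second part")
    final_object = await writer.finalize()
    return persisted, final_object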

noxfile.py (+8, -1)

@@ -192,7 +192,14 @@ def system(session):
    # 2021-05-06: defer installing 'google-cloud-*' to after this package,
    # in order to work around Python 2.7 googolapis-common-protos
    # issue.
-    session.install("mock", "pytest", "pytest-rerunfailures", "-c", constraints_path)
+    session.install(
+        "mock",
+        "pytest",
+        "pytest-rerunfailures",
+        "pytest-asyncio",
+        "-c",
+        constraints_path,
+    )
    session.install("-e", ".", "-c", constraints_path)
    session.install(
        "google-cloud-testutils",

tests/system/test_zonal.py (+53, -0)

@@ -307,3 +307,56 @@ async def _read_and_verify(expected_content, generation=None):
    del mrd
    del mrd_2
    gc.collect()
+
+
+@pytest.mark.asyncio
+async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete):
+    """
+    System test for AsyncAppendableObjectWriter, verifying flushing behavior
+    for both small and large appends.
+    """
+    object_name = f"test-append-flush-varied-size-{uuid.uuid4()}"
+    grpc_client = AsyncGrpcClient().grpc_client
+    writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
+
+    # Schedule for cleanup
+    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+
+    # --- Part 1: Test with small data ---
+    small_data = b"small data"
+
+    await writer.open()
+    assert writer._is_stream_open
+
+    await writer.append(small_data)
+    persisted_size = await writer.state_lookup()
+    assert persisted_size == len(small_data)
+
+    # --- Part 2: Test with large data ---
+    large_data = os.urandom(38 * 1024 * 1024)
+
+    # Append data larger than the default flush interval (16 MiB).
+    # This should trigger the interval-based flushing logic.
+    await writer.append(large_data)
+
+    # Verify the total data has been persisted.
+    total_size = len(small_data) + len(large_data)
+    persisted_size = await writer.state_lookup()
+    assert persisted_size == total_size
+
+    # --- Part 3: Finalize and verify ---
+    final_object = await writer.close(finalize_on_close=True)
+
+    assert not writer._is_stream_open
+    assert final_object.size == total_size
+
+    # Verify the full content of the object.
+    full_data = small_data + large_data
+    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
+    buffer = BytesIO()
+    await mrd.open()
+    # (0, 0) means read the whole object
+    await mrd.download_ranges([(0, 0, buffer)])
+    await mrd.close()
+    content = buffer.getvalue()
+    assert content == full_data
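
As a hedged sanity check of the "interval-based flushing" comment in this test: the 38 MiB append crosses the 16 MiB flush interval twice before its final chunk, so two intermediate requests plus the last chunk should carry flush=True. The short simulation below reuses the bookkeeping from the new append() loop; the chunk size is an assumed placeholder, and only the 16 MiB interval comes from the test comment.

CHUNK = 2 * 1024 * 1024        # assumed placeholder for _MAX_CHUNK_SIZE_BYTES
INTERVAL = 16 * 1024 * 1024    # default flush interval, per the comment above
TOTAL = 38 * 1024 * 1024       # size of large_data in the test

flush_count = 0
since_flush = 0
start = 0
while start < TOTAL:
    end = min(start + CHUNK, TOTAL)
    size = end - start
    is_last = end == TOTAL
    if is_last or since_flush + size >= INTERVAL:
        flush_count += 1
    since_flush += size
    if since_flush >= INTERVAL or is_last:
        since_flush = 0
    start = end

print(flush_count)  # expected: 2 interval flushes + 1 final-chunk flush = 3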

tests/unit/asyncio/retry/test_writes_resumption_strategy.py (+3, -1)

@@ -206,7 +206,9 @@ def test_update_state_from_response(self):
        strategy.update_state_from_response(response2, state)
        self.assertEqual(write_state.persisted_size, 1024)
 
-        final_resource = storage_type.Object(name="test-object", bucket="b", size=2048, finalize_time=datetime.now())
+        final_resource = storage_type.Object(
+            name="test-object", bucket="b", size=2048, finalize_time=datetime.now()
+        )
        response3 = storage_type.BidiWriteObjectResponse(resource=final_resource)
        strategy.update_state_from_response(response3, state)
        self.assertEqual(write_state.persisted_size, 2048)

tests/unit/asyncio/test_async_appendable_object_writer.py (+68, -17)

@@ -364,6 +364,7 @@ async def test_close(mock_write_object_stream, mock_client):
    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
    writer._is_stream_open = True
    writer.offset = 1024
+    writer.persisted_size = 1024
    mock_stream = mock_write_object_stream.return_value
    mock_stream.send = mock.AsyncMock()
    mock_stream.recv = mock.AsyncMock(
@@ -435,16 +436,20 @@ async def test_finalize(mock_write_object_stream, mock_client):
    mock_stream.recv = mock.AsyncMock(
        return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
    )
+    mock_stream.close = mock.AsyncMock()
 
    gcs_object = await writer.finalize()
 
    mock_stream.send.assert_awaited_once_with(
        _storage_v2.BidiWriteObjectRequest(finish_write=True)
    )
    mock_stream.recv.assert_awaited_once()
+    mock_stream.close.assert_awaited_once()
    assert writer.object_resource == mock_resource
    assert writer.persisted_size == 123
    assert gcs_object == mock_resource
+    assert writer._is_stream_open is False
+    assert writer.offset is None
 
 
 @pytest.mark.asyncio
@@ -501,30 +506,39 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
    writer.persisted_size = 100
    mock_stream = mock_write_object_stream.return_value
    mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()
 
    data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            persisted_size=100 + len(data)
+        )
+    )
+
    await writer.append(data)
 
    assert mock_stream.send.await_count == 2
-    first_call = mock_stream.send.await_args_list[0]
-    second_call = mock_stream.send.await_args_list[1]
+    first_request = mock_stream.send.await_args_list[0].args[0]
+    second_request = mock_stream.send.await_args_list[1].args[0]
 
    # First chunk
-    assert first_call[0][0].write_offset == 100
-    assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
-    assert first_call[0][0].checksummed_data.crc32c == int.from_bytes(
+    assert first_request.write_offset == 100
+    assert len(first_request.checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
+    assert first_request.checksummed_data.crc32c == int.from_bytes(
        Checksum(data[:_MAX_CHUNK_SIZE_BYTES]).digest(), byteorder="big"
    )
-    # Second chunk
-    assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
-    assert len(second_call[0][0].checksummed_data.content) == 1
-    assert second_call[0][0].checksummed_data.crc32c == int.from_bytes(
+    assert not first_request.flush
+    assert not first_request.state_lookup
+
+    # Second chunk (last chunk)
+    assert second_request.write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
+    assert len(second_request.checksummed_data.content) == 1
+    assert second_request.checksummed_data.crc32c == int.from_bytes(
        Checksum(data[_MAX_CHUNK_SIZE_BYTES:]).digest(), byteorder="big"
    )
+    assert second_request.flush
+    assert second_request.state_lookup
 
    assert writer.offset == 100 + len(data)
-    writer.simple_flush.assert_not_awaited()
 
 
 @pytest.mark.asyncio
@@ -541,12 +555,25 @@ async def test_append_flushes_when_buffer_is_full(
    writer.persisted_size = 0
    mock_stream = mock_write_object_stream.return_value
    mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock()
 
    data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
    await writer.append(data)
 
-    writer.simple_flush.assert_awaited_once()
+    num_chunks = _DEFAULT_FLUSH_INTERVAL_BYTES // _MAX_CHUNK_SIZE_BYTES
+    assert mock_stream.send.await_count == num_chunks
+
+    # All but the last request should not have flush or state_lookup set.
+    for i in range(num_chunks - 1):
+        request = mock_stream.send.await_args_list[i].args[0]
+        assert not request.flush
+        assert not request.state_lookup
+
+    # The last request should have flush and state_lookup set.
+    last_request = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request.flush
+    assert last_request.state_lookup
+    assert writer.bytes_appended_since_last_flush == 0
 
 
 @pytest.mark.asyncio
@@ -561,12 +588,18 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
    writer.persisted_size = 0
    mock_stream = mock_write_object_stream.return_value
    mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock()
 
    data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
    await writer.append(data)
 
-    assert writer.simple_flush.await_count == 2
+    flushed_requests = [
+        call.args[0] for call in mock_stream.send.await_args_list if call.args[0].flush
+    ]
+    assert len(flushed_requests) == 3
+
+    last_request = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request.state_lookup
 
 
 @pytest.mark.asyncio
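
The assertion that exactly three requests carry flush=True follows from the same bookkeeping sketched earlier: a payload of 2 * _DEFAULT_FLUSH_INTERVAL_BYTES + 1 bytes crosses the interval twice, and the final one-byte chunk is always flushed. A hedged back-of-the-envelope check (the 16 MiB value is assumed from the system test comment):

INTERVAL = 16 * 1024 * 1024               # assumed _DEFAULT_FLUSH_INTERVAL_BYTES
data_len = 2 * INTERVAL + 1

interval_flushes = data_len // INTERVAL   # counter resets after each full interval -> 2
final_chunk_flush = 1                     # the last chunk of every append() is flushed
assert interval_flushes + final_chunk_flush == 3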
@@ -584,17 +617,35 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):
    writer.persisted_size = 0
    mock_stream = mock_write_object_stream.return_value
    mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()
 
    data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10)
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            persisted_size=len(data1)
+        )
+    )
    await writer.append(data1)
 
+    assert mock_stream.send.await_count == 2
+    last_request_data1 = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request_data1.flush
+    assert last_request_data1.state_lookup
+
    data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20)
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            persisted_size=len(data2) + len(data1)
+        )
+    )
    await writer.append(data2)
 
+    assert mock_stream.send.await_count == 4
+    last_request_data2 = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request_data2.flush
+    assert last_request_data2.state_lookup
+
    total_data_length = len(data1) + len(data2)
    assert writer.offset == total_data_length
-    assert writer.simple_flush.await_count == 0
 
 
 @pytest.mark.asyncio