Commit b6a85e4

feat(storage): full object checksum: integrate full-object checksum in AsyncMultiRangeDownloader (#17263)
### 1. Overview of the Solution

This solution implements end-to-end full-object checksum validation in `AsyncMultiRangeDownloader` for the asynchronous Google Cloud Storage Python client library. Asynchronous multiplexed downloads of non-contiguous ranges run concurrently over a single bidirectional gRPC connection; this feature incrementally calculates a rolling checksum as bytes arrive and validates it against the server's authoritative object checksum once the download completes.

The technical approach consists of three coordinated layers:

* **`_AsyncReadObjectStream` (Stream Ingestion)**: Extracts the authoritative server checksum (`full_obj_server_crc32c`) and finalization status (`is_finalized`) from the object metadata received in the first data payload response of the stream.
* **`_ReadResumptionStrategy` & `_DownloadState` (Verification Logic)**: Computes an isolated, persistent rolling checksum in each individual `_DownloadState` object so that calculations do not bleed across concurrent multiplexed ranges. The rolling hash is updated only *after* buffer writes succeed, which prevents state corruption during retry re-connects. A `DataCorruption` exception is raised on completion if a mismatch occurs.
* **`AsyncMultiRangeDownloader` (Orchestration & Cleanup)**: Detects candidate full-object ranges (e.g., `(0, 0)` or `(0, persisted_size)`), propagates checksum settings to the resumption strategy, and guarantees cleanup (closing the stream immediately and unregistering IDs) if data corruption or write errors occur.

### 2. What This PR Specifically Does

This PR implements **Step 3: Downloader Orchestration & End-to-End Integration/System Tests** of the solution:

* Relocates `raise_if_no_fast_crc32c()` validation to the execution phase (`download_ranges()`) instead of construction time.
* Propagates stream details (`is_finalized`, `full_obj_server_crc32c`) to the resumption state dictionary.
* Detects implicit full-object downloads (`(0, 0)`) or explicit full-object downloads (`(0, persisted_size)`) post-`open()`, and flags them for validation.
* Implements the cleanup guarantee in `download_ranges()`: wraps execution in `try...except` handling inside the `try...finally` block to close the stream immediately and unregister multiplexer range IDs upon a `DataCorruption` exception.
* Adds integration tests in `test_async_multi_range_downloader.py` and extensive end-to-end system tests in `test_zonal.py` checking finalized, unfinalized (appendable), explicit, implicit, and bypassed range downloads against live GCS buckets.
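
The verification layer described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the library's implementation: `RangeState`, `on_chunk`, and `on_complete` are hypothetical names, the local `DataCorruption` class only stands in for `google.cloud.storage.exceptions.DataCorruption`, and `zlib.crc32` (plain CRC-32) substitutes for CRC32C so the example runs with the standard library alone. Skipping validation when no server checksum is available is also an assumption.

```python
import zlib
from io import BytesIO


class DataCorruption(Exception):
    """Hypothetical stand-in for google.cloud.storage.exceptions.DataCorruption."""


class RangeState:
    """Hypothetical per-range state; each range keeps its own checksum."""

    def __init__(self, user_buffer, is_full_object_read):
        self.user_buffer = user_buffer
        # Only full-object reads carry a rolling checksum.
        self.rolling_checksum = 0 if is_full_object_read else None

    def on_chunk(self, chunk):
        # Write first. If the write raises, the checksum is left untouched,
        # so a retry that re-delivers the chunk does not hash it twice.
        self.user_buffer.write(chunk)
        if self.rolling_checksum is not None:
            # zlib.crc32 is a stand-in for CRC32C (assumption for this sketch).
            self.rolling_checksum = zlib.crc32(chunk, self.rolling_checksum)

    def on_complete(self, server_checksum, enable_checksum=True):
        # Assumption: validation is skipped when it is disabled, when the range
        # is not a full-object read, or when no server checksum is available.
        if not enable_checksum:
            return
        if self.rolling_checksum is None or server_checksum is None:
            return
        if self.rolling_checksum != server_checksum:
            raise DataCorruption(
                f"expected {server_checksum}, computed {self.rolling_checksum}"
            )


payload = b"full-object payload " * 64
state = RangeState(BytesIO(), is_full_object_read=True)
for start in range(0, len(payload), 100):
    state.on_chunk(payload[start : start + 100])
state.on_complete(zlib.crc32(payload))  # matches, so nothing is raised
```

Keeping the checksum on the per-range state object, rather than on the shared stream, is what keeps concurrent ranges from contaminating each other's hashes.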
1 parent 2361ba6 commit b6a85e4

6 files changed

257 additions & 11 deletions

packages/google-cloud-storage/google/cloud/storage/_helpers.py
3 additions & 1 deletion

@@ -33,9 +33,11 @@
 from google.cloud.exceptions import NotFound
 
 from google.cloud.storage._opentelemetry_tracing import (
-    create_trace_span as _base_create_trace_span,
     _is_bucket_metadata_disabled,
 )
+from google.cloud.storage._opentelemetry_tracing import (
+    create_trace_span as _base_create_trace_span,
+)
 from google.cloud.storage.constants import _DEFAULT_TIMEOUT
 from google.cloud.storage.retry import (
     DEFAULT_RETRY,

packages/google-cloud-storage/google/cloud/storage/_http.py
1 addition & 1 deletion

@@ -25,9 +25,9 @@
 from google.cloud.storage import __version__, _helpers
 from google.cloud.storage._opentelemetry_tracing import (
     HAS_OPENTELEMETRY,
+    _is_bucket_metadata_disabled,
     create_trace_span,
     enable_otel_traces,
-    _is_bucket_metadata_disabled,
 )
 
 logger = logging.getLogger(__name__)

packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py
36 additions & 7 deletions

@@ -44,6 +44,7 @@
     _DownloadState,
     _ReadResumptionStrategy,
 )
+from google.cloud.storage.exceptions import DataCorruption
 
 from ._utils import raise_if_no_fast_crc32c
 
@@ -219,8 +220,6 @@ def __init__(
                 )
             generation = kwargs.pop("generation_number")
 
-        raise_if_no_fast_crc32c()
-
         self.client = client
         self.bucket_name = bucket_name
         self.object_name = object_name
@@ -232,6 +231,8 @@ def __init__(
         self._multiplexer: Optional[_StreamMultiplexer] = None
         self.persisted_size: Optional[int] = None  # updated after opening the stream
         self._open_retries: int = 0
+        self.is_finalized: bool = False
+        self.full_obj_server_crc32c: Optional[int] = None
 
     async def __aenter__(self):
         """Opens the underlying bidi-gRPC connection to read from the object."""
@@ -327,6 +328,8 @@ async def _do_open():
             self.read_handle = self.read_obj_str.read_handle
             if self.read_obj_str.persisted_size is not None:
                 self.persisted_size = self.read_obj_str.persisted_size
+            self.is_finalized = self.read_obj_str.is_finalized
+            self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c
 
             self._is_stream_open = True
 
@@ -363,6 +366,8 @@ async def factory():
             self.generation = stream.generation_number
             if stream.read_handle:
                 self.read_handle = stream.read_handle
+            self.is_finalized = stream.is_finalized
+            self.full_obj_server_crc32c = stream.full_obj_server_crc32c
 
             self.read_obj_str = stream
             self._is_stream_open = True
@@ -377,6 +382,7 @@ async def download_ranges(
         lock: asyncio.Lock = None,
         retry_policy: Optional[AsyncRetry] = None,
         metadata: Optional[List[Tuple[str, str]]] = None,
+        enable_checksum: bool = True,
     ) -> None:
         """Downloads multiple byte ranges from the object into the buffers
         provided by user with automatic retries.
@@ -412,6 +418,9 @@ async def download_ranges(
                 "Invalid input - length of read_ranges cannot be more than 1000"
             )
 
+        if enable_checksum:
+            raise_if_no_fast_crc32c()
+
         if not self._is_stream_open:
             raise ValueError("Underlying bidi-gRPC stream is not open")
 
@@ -422,16 +431,30 @@ async def download_ranges(
         download_states = {}
         for read_range in read_ranges:
             read_id = generate_random_56_bit_integer()
+            # Unpack tuple into self-documenting variable names to improve readability.
+            offset, length, user_buffer = read_range
+
+            # Heuristic to detect full object reads:
+            # - Implicit full object read: start offset is 0 and length is 0 (read all).
+            # - Explicit full object read: start offset is 0 and length matches the exact persisted size.
+            is_full_object_read = (offset == 0 and length == 0) or (
+                self.persisted_size is not None
+                and offset == 0
+                and length == self.persisted_size
+            )
             download_states[read_id] = _DownloadState(
-                initial_offset=read_range[0],
-                initial_length=read_range[1],
-                user_buffer=read_range[2],
+                initial_offset=offset,
+                initial_length=length,
+                user_buffer=user_buffer,
+                is_full_object_read=is_full_object_read,
             )
 
         initial_state = {
             "download_states": download_states,
             "read_handle": self.read_handle,
             "routing_token": None,
+            "enable_checksum": enable_checksum,
+            "full_obj_server_crc32c": self.full_obj_server_crc32c,
         }
 
         read_ids = set(download_states.keys())
@@ -519,12 +542,18 @@ async def generator():
                 strategy, send_and_recv_via_multiplexer
             )
 
-            await retry_manager.execute(initial_state, retry_policy)
+            try:
+                await retry_manager.execute(initial_state, retry_policy)
+            except DataCorruption:
+                if self.is_stream_open:
+                    await self.close()
+                raise
 
             if initial_state.get("read_handle"):
                 self.read_handle = initial_state["read_handle"]
         finally:
-            self._multiplexer.unregister(read_ids)
+            if self._multiplexer is not None:
+                self._multiplexer.unregister(read_ids)
 
     async def close(self):
         """

packages/google-cloud-storage/tests/system/test_zonal.py
86 additions & 0 deletions

@@ -27,6 +27,7 @@
     ObjectCustomContextPayload,
 )
 
+
 pytestmark = pytest.mark.skipif(
     os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
     reason="Zonal system tests need to be explicitly enabled. This helps scheduling tests in Kokoro and Cloud Build.",
@@ -961,3 +962,88 @@ async def _run():
         blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
 
     event_loop.run_until_complete(_run())
+
+
+@pytest.mark.parametrize(
+    "read_start, read_length, enable_checksum",
+    [
+        (0, 0, True),
+        (0, 1024 * 1024, True),
+        (0, 0, False),
+    ],
+)
+def test_mrd_checksum_validation(
+    storage_client,
+    blobs_to_delete,
+    event_loop,
+    grpc_client_direct,
+    read_start,
+    read_length,
+    enable_checksum,
+):
+    """
+    Tests full downloads with specified offset, length, and enable_checksum toggle on finalized objects.
+    """
+    object_size = 1024 * 1024  # 1MB
+    object_name = f"test_mrd_chksum-{uuid.uuid4()}"
+
+    async def _run():
+        object_data = os.urandom(object_size)
+
+        writer = AsyncAppendableObjectWriter(
+            grpc_client_direct, _ZONAL_BUCKET, object_name
+        )
+        await writer.open()
+        await writer.append(object_data)
+        await writer.close(finalize_on_close=True)
+
+        async with AsyncMultiRangeDownloader(
+            grpc_client_direct, _ZONAL_BUCKET, object_name
+        ) as mrd:
+            buffer = BytesIO()
+            await mrd.download_ranges(
+                [(read_start, read_length, buffer)], enable_checksum=enable_checksum
+            )
+            assert buffer.getvalue() == object_data
+
+        # cleanup
+        del writer
+        gc.collect()
+        blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+
+    event_loop.run_until_complete(_run())
+
+
+def test_mrd_checksum_unfinalized_appendable_skipped(
+    storage_client, blobs_to_delete, event_loop, grpc_client_direct
+):
+    """
+    Verifies that live, unfinalized appendable objects skip the full-object checksum check
+    naturally without raising any exceptions.
+    """
+    object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}"
+
+    async def _run():
+        writer = AsyncAppendableObjectWriter(
+            grpc_client_direct, _ZONAL_BUCKET, object_name
+        )
+        await writer.open()
+        await writer.append(_BYTES_TO_UPLOAD)
+        await writer.flush()  # Flushed but not finalized!
+
+        # Download the unfinalized appendable object with enable_checksum=True
+        async with AsyncMultiRangeDownloader(
+            grpc_client_direct, _ZONAL_BUCKET, object_name
+        ) as mrd:
+            buffer = BytesIO()
+            # Since it's unfinalized, it should skip the checksum check without raising
+            await mrd.download_ranges([(0, 0, buffer)], enable_checksum=True)
+            assert buffer.getvalue() == _BYTES_TO_UPLOAD
+
+        # cleanup
+        await writer.close()
+        del writer
+        gc.collect()
+        blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+
+    event_loop.run_until_complete(_run())
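
The parametrized cases above exercise both range shapes that the downloader treats as a full-object read: `(0, 0)` and `(0, object_size)`. The heuristic can be restated as a small pure function for illustration. The function name `is_full_object_read` and its standalone signature are hypothetical; in the commit the same expression is computed inline inside `download_ranges()` using `self.persisted_size`.

```python
from typing import Optional


def is_full_object_read(offset: int, length: int, persisted_size: Optional[int]) -> bool:
    """Return True when a requested range covers the whole object.

    Hypothetical helper mirroring the inline heuristic in the diff.
    """
    # Implicit full read: offset 0 with length 0 means "read everything".
    implicit = offset == 0 and length == 0
    # Explicit full read: offset 0 and length equal to the known persisted size.
    explicit = (
        persisted_size is not None
        and offset == 0
        and length == persisted_size
    )
    return implicit or explicit


one_mib = 1024 * 1024
cases = [
    (0, 0, one_mib),        # implicit full read
    (0, one_mib, one_mib),  # explicit full read
    (10, 20, one_mib),      # partial range
    (0, one_mib, None),     # persisted size unknown
]
results = [is_full_object_read(*case) for case in cases]
```

Ranges that fail both checks are still downloaded; they are only excluded from full-object checksum validation.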

packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py
130 additions & 2 deletions

@@ -308,12 +308,16 @@ async def test_downloading_without_opening_should_throw_error(self):
         assert not mrd.is_stream_open
 
     @mock.patch("google.cloud.storage.asyncio._utils.google_crc32c")
-    def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
+    @pytest.mark.asyncio
+    async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(
+        self, mock_google_crc32c
+    ):
         mock_google_crc32c.implementation = "python"
         mock_client = mock.MagicMock()
+        mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
 
         with pytest.raises(exceptions.FailedPrecondition) as exc_info:
-            AsyncMultiRangeDownloader(mock_client, "bucket", "object")
+            await mrd.download_ranges([(0, 10, BytesIO())])
 
         assert "The google-crc32c package is not installed with C support" in str(
             exc_info.value
@@ -579,3 +583,127 @@ async def staged_recv():
 
         # Assert
         mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.")
+
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
+    )
+    @pytest.mark.asyncio
+    async def test_open_populates_checksum_properties(
+        self, mock_cls_async_read_object_stream
+    ):
+        # Arrange
+        mock_client = mock.MagicMock()
+        mock_client.grpc_client = mock.AsyncMock()
+        mock_stream = mock_cls_async_read_object_stream.return_value
+        mock_stream.open = AsyncMock()
+        mock_stream.generation_number = 123
+        mock_stream.persisted_size = 100
+        mock_stream.read_handle = b"h"
+        mock_stream.is_finalized = True
+        mock_stream.full_obj_server_crc32c = 999
+
+        mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
+        assert mrd.is_finalized is False
+        assert mrd.full_obj_server_crc32c is None
+
+        # Act
+        await mrd.open()
+
+        # Assert
+        assert mrd.is_finalized is True
+        assert mrd.full_obj_server_crc32c == 999
+
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
+    )
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
+    )
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
+    )
+    @pytest.mark.asyncio
+    async def test_download_ranges_configures_full_object_read_state(
+        self,
+        mock_cls_async_read_object_stream,
+        mock_retry_manager_cls,
+        mock_strategy_cls,
+    ):
+        # Arrange
+        mock_client = mock.MagicMock()
+        mock_client.grpc_client = mock.AsyncMock()
+        mock_stream = mock_cls_async_read_object_stream.return_value
+        mock_stream.open = AsyncMock()
+        mock_stream.persisted_size = 100
+        mock_stream.is_finalized = True
+        mock_stream.full_obj_server_crc32c = 999
+
+        mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")
+
+        mock_retry_manager = mock_retry_manager_cls.return_value
+        mock_retry_manager.execute = AsyncMock()
+
+        # Act
+        # Implicit full read (0, 0) and explicit full read (0, persisted_size=100)
+        ranges = [(0, 0, BytesIO()), (0, 100, BytesIO()), (10, 20, BytesIO())]
+        await mrd.download_ranges(ranges, enable_checksum=True)
+
+        # Assert
+        mock_retry_manager.execute.assert_called_once()
+        initial_state = mock_retry_manager.execute.call_args[0][0]
+
+        download_states = initial_state["download_states"]
+        assert len(download_states) == 3
+
+        states_list = list(download_states.values())
+        # First state: (0, 0) -> is_full_object_read is True
+        assert states_list[0].is_full_object_read is True
+        assert states_list[0].rolling_checksum is not None
+
+        # Second state: (0, 100) -> is_full_object_read is True
+        assert states_list[1].is_full_object_read is True
+        assert states_list[1].rolling_checksum is not None
+
+        # Third state: (10, 20) -> is_full_object_read is False
+        assert states_list[2].is_full_object_read is False
+        assert states_list[2].rolling_checksum is None
+
+        # State values for enable_checksum and crc32c
+        assert initial_state["enable_checksum"] is True
+        assert initial_state["full_obj_server_crc32c"] == 999
+
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
+    )
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
+    )
+    @mock.patch(
+        "google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
+    )
+    @pytest.mark.asyncio
+    async def test_download_ranges_closes_on_datacorruption(
+        self,
+        mock_cls_async_read_object_stream,
+        mock_retry_manager_cls,
+        mock_strategy_cls,
+    ):
+        # Arrange
+        mock_client = mock.MagicMock()
+        mock_client.grpc_client = mock.AsyncMock()
+        mock_stream = mock_cls_async_read_object_stream.return_value
+        mock_stream.open = AsyncMock()
+
+        mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")
+        mrd.close = AsyncMock()
+
+        mock_retry_manager = mock_retry_manager_cls.return_value
+        mock_retry_manager.execute = AsyncMock(
+            side_effect=DataCorruption(None, "corrupted")
+        )
+
+        # Act & Assert
+        with pytest.raises(DataCorruption):
+            await mrd.download_ranges([(0, 0, BytesIO())])
+
+        mrd.close.assert_called_once()
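
The cleanup behaviour that the last test checks can be reproduced without the storage library. The sketch below is only an illustration of the pattern: `FakeDownloader`, its `download` method, and the local `DataCorruption` class are hypothetical stand-ins, and the multiplexer is a plain `MagicMock`. It shows the `try`/`except` nested inside `try`/`finally`, with `AsyncMock(side_effect=...)` simulating a failing retry manager.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class DataCorruption(Exception):
    """Hypothetical stand-in for google.cloud.storage.exceptions.DataCorruption."""


class FakeDownloader:
    """Hypothetical downloader reduced to its cleanup logic."""

    def __init__(self, execute, multiplexer):
        self._execute = execute
        self._multiplexer = multiplexer
        self.is_stream_open = True
        self.close_calls = 0

    async def close(self):
        self.close_calls += 1
        self.is_stream_open = False

    async def download(self, read_ids):
        try:
            try:
                await self._execute()
            except DataCorruption:
                # Close the stream right away so corrupted state is not reused.
                if self.is_stream_open:
                    await self.close()
                raise
        finally:
            # Runs on success and on failure; guarded in case no multiplexer exists.
            if self._multiplexer is not None:
                self._multiplexer.unregister(read_ids)


async def demo():
    multiplexer = MagicMock()
    failing_execute = AsyncMock(side_effect=DataCorruption("corrupted"))
    downloader = FakeDownloader(failing_execute, multiplexer)
    try:
        await downloader.download({1, 2})
    except DataCorruption:
        pass  # The exception is re-raised after cleanup, as in the real test.
    return downloader, multiplexer


downloader, multiplexer = asyncio.run(demo())
```

Re-raising after `close()` keeps the failure visible to the caller, while the `finally` clause ensures range IDs are unregistered on every exit path.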

packages/google-cloud-storage/tests/unit/conftest.py
1 addition & 0 deletions

@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import asyncio
+
 import pytest