Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2361ba6

Browse files
authored
feat(storage): full object checksum: implement rolling checksum and verification in reads resumption strategy (#17262)
### 1. Overview of the Solution

This solution implements end-to-end full-object checksum validation in `AsyncMultiRangeDownloader` for the asynchronous Google Cloud Storage Python client library. As asynchronous multiplexed downloads of non-contiguous ranges are performed concurrently over a single bidirectional gRPC connection, this feature automatically and incrementally calculates a rolling checksum as bytes arrive and validates it against the server's authoritative object checksum once the download completes.

The technical approach consists of three coordinated layers:

* **`_AsyncReadObjectStream` (Stream Ingestion)**: Safely extracts the authoritative server checksum (`full_obj_server_crc32c`) and finalization status (`is_finalized`) from the object metadata received in the first data payload response of the stream.
* **`_ReadResumptionStrategy` & `_DownloadState` (Verification Logic)**: Computes an isolated, persistent rolling checksum in the individual `_DownloadState` object to ensure calculations do not bleed across concurrent multiplexed ranges. Crucially, the rolling hash updates only *after* buffer writes succeed to prevent state corruption during retry re-connects, raising a `DataCorruption` exception on completion if a mismatch occurs.
* **`AsyncMultiRangeDownloader` (Orchestration & Cleanup)**: Detects candidate full-object ranges (e.g., `(0, 0)` or `(0, persisted_size)`), propagates checksum settings to the resumption strategy, and guarantees robust cleanup (closing the stream immediately and unregistering IDs) if data corruption or write errors occur.

### 2. What This PR Specifically Does

This PR implements **Step 2: Full-Object Rolling Checksum & Resumption Verification Logic** of the solution:

* Upgrades `_DownloadState` to track `is_full_object_read` and initialize an isolated `google_crc32c.Checksum()` rolling instance.
* Updates `_ReadResumptionStrategy.update_state_from_response()` to run buffer writes *before* updating the rolling checksum, ensuring transactional safety during connection failures and retry reconnects.
* Optimizes performance by bypassing rolling checksum calculations entirely if `enable_checksum` is `False`.
* Performs the final validation match at `range_end` against the server's authoritative checksum, raising a `DataCorruption` exception if a mismatch is found.
* Adds comprehensive unit tests in `test_reads_resumption_strategy.py` to verify successful validation, failure exceptions, and bypassed checks when validation is disabled.
1 parent e4a207d commit 2361ba6
Copy full SHA for 2361ba6

2 files changed

+153 -5 — Lines changed: 153 additions & 5 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py
+39-3Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,25 @@ class _DownloadState:
3636
"""A helper class to track the state of a single range download."""
3737

3838
def __init__(
39-
self, initial_offset: int, initial_length: int, user_buffer: IO[bytes]
39+
self,
40+
initial_offset: int,
41+
initial_length: int,
42+
user_buffer: IO[bytes],
43+
is_full_object_read: bool = False,
44+
enable_checksum: bool = True,
4045
):
4146
self.initial_offset = initial_offset
4247
self.initial_length = initial_length
4348
self.user_buffer = user_buffer
4449
self.bytes_written = 0
4550
self.next_expected_offset = initial_offset
4651
self.is_complete = False
52+
self.is_full_object_read = is_full_object_read
53+
self.rolling_checksum = (
54+
google_crc32c.Checksum()
55+
if (is_full_object_read and enable_checksum)
56+
else None
57+
)
4758

4859

4960
class _ReadResumptionStrategy(_BaseResumptionStrategy):
@@ -90,6 +101,7 @@ def update_state_from_response(
90101
)
91102

92103
download_states = state["download_states"]
104+
checksum_enabled = state.get("enable_checksum", True)
93105

94106
for object_data_range in proto.object_data_ranges:
95107
# Ignore empty ranges or ranges for IDs not in our state
@@ -125,7 +137,7 @@ def update_state_from_response(
125137
checksummed_data = object_data_range.checksummed_data
126138
data = checksummed_data.content
127139

128-
if checksummed_data.HasField("crc32c"):
140+
if checksum_enabled and checksummed_data.HasField("crc32c"):
129141
server_checksum = checksummed_data.crc32c
130142
client_checksum = google_crc32c.value(data)
131143
if server_checksum != client_checksum:
@@ -138,10 +150,14 @@ def update_state_from_response(
138150
# Update State & Write Data
139151
chunk_size = len(data)
140152
read_state.user_buffer.write(data)
153+
154+
# Commit updates only after the write succeeds
155+
if checksum_enabled and read_state.rolling_checksum is not None:
156+
read_state.rolling_checksum.update(data)
141157
read_state.bytes_written += chunk_size
142158
read_state.next_expected_offset += chunk_size
143159

144-
# Final Byte Count Verification
160+
# Final Byte Count & Full Object Checksum Verification
145161
if object_data_range.range_end:
146162
read_state.is_complete = True
147163
if (
@@ -154,6 +170,26 @@ def update_state_from_response(
154170
f"Expected {read_state.initial_length}, got {read_state.bytes_written}",
155171
)
156172

173+
# Perform full-object checksum verification once the stream finishes.
174+
if (
175+
read_state.is_full_object_read
176+
and checksum_enabled
177+
and read_state.rolling_checksum is not None
178+
):
179+
full_obj_server_crc32c = state.get("full_obj_server_crc32c")
180+
if full_obj_server_crc32c is not None:
181+
# Use standard big-endian byte conversion to retrieve the rolling checksum value.
182+
client_checksum = int.from_bytes(
183+
read_state.rolling_checksum.digest(),
184+
byteorder="big",
185+
)
186+
if client_checksum != full_obj_server_crc32c:
187+
raise DataCorruption(
188+
response,
189+
f"Full object checksum mismatch for read_id {read_id}. "
190+
f"Server authoritative crc32c: {full_obj_server_crc32c}, client calculated rolling: {client_checksum}.",
191+
)
192+
157193
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
158194
"""Handles BidiReadObjectRedirectedError for reads."""
159195
routing_token, read_handle = _handle_redirect(error)
Collapse file

‎packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/unit/asyncio/retry/test_reads_resumption_strategy.py
+114-2Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,48 @@ def test_initialization(self):
4545
self.assertEqual(state.bytes_written, 0)
4646
self.assertEqual(state.next_expected_offset, initial_offset)
4747
self.assertFalse(state.is_complete)
48+
self.assertFalse(state.is_full_object_read)
49+
self.assertIsNone(state.rolling_checksum)
50+
51+
def test_initialization_with_full_object_read(self):
52+
"""Test that _DownloadState initializes correctly when is_full_object_read is True."""
53+
initial_offset = 10
54+
initial_length = 100
55+
user_buffer = io.BytesIO()
56+
state_full = _DownloadState(
57+
initial_offset, initial_length, user_buffer, is_full_object_read=True
58+
)
59+
60+
self.assertEqual(state_full.initial_offset, initial_offset)
61+
self.assertEqual(state_full.initial_length, initial_length)
62+
self.assertEqual(state_full.user_buffer, user_buffer)
63+
self.assertEqual(state_full.bytes_written, 0)
64+
self.assertEqual(state_full.next_expected_offset, initial_offset)
65+
self.assertFalse(state_full.is_complete)
66+
self.assertTrue(state_full.is_full_object_read)
67+
self.assertIsNotNone(state_full.rolling_checksum)
68+
69+
def test_initialization_with_full_object_read_and_checksum_disabled(self):
70+
"""Test that _DownloadState does not initialize rolling_checksum when enable_checksum is False."""
71+
initial_offset = 10
72+
initial_length = 100
73+
user_buffer = io.BytesIO()
74+
state_full = _DownloadState(
75+
initial_offset,
76+
initial_length,
77+
user_buffer,
78+
is_full_object_read=True,
79+
enable_checksum=False,
80+
)
81+
82+
self.assertEqual(state_full.initial_offset, initial_offset)
83+
self.assertEqual(state_full.initial_length, initial_length)
84+
self.assertEqual(state_full.user_buffer, user_buffer)
85+
self.assertEqual(state_full.bytes_written, 0)
86+
self.assertEqual(state_full.next_expected_offset, initial_offset)
87+
self.assertFalse(state_full.is_complete)
88+
self.assertTrue(state_full.is_full_object_read)
89+
self.assertIsNone(state_full.rolling_checksum)
4890

4991

5092
class TestReadResumptionStrategy(unittest.TestCase):
@@ -53,12 +95,24 @@ def setUp(self):
5395

5496
self.state = {"download_states": {}, "read_handle": None, "routing_token": None}
5597

56-
def _add_download(self, read_id, offset=0, length=100, buffer=None):
98+
def _add_download(
99+
self,
100+
read_id,
101+
offset=0,
102+
length=100,
103+
buffer=None,
104+
is_full_object_read=False,
105+
enable_checksum=True,
106+
):
57107
"""Helper to inject a download state into the correct nested location."""
58108
if buffer is None:
59109
buffer = io.BytesIO()
60110
state = _DownloadState(
61-
initial_offset=offset, initial_length=length, user_buffer=buffer
111+
initial_offset=offset,
112+
initial_length=length,
113+
user_buffer=buffer,
114+
is_full_object_read=is_full_object_read,
115+
enable_checksum=enable_checksum,
62116
)
63117
self.state["download_states"][read_id] = state
64118
return state
@@ -358,3 +412,61 @@ async def run():
358412

359413
# Token should remain unchanged
360414
self.assertEqual(self.state["routing_token"], "existing-token")
415+
416+
def test_update_state_full_object_checksum_success(self):
417+
"""Test that full object checksum verification succeeds on range_end."""
418+
read_state = self._add_download(
419+
_READ_ID, offset=0, length=9, is_full_object_read=True
420+
)
421+
self.state["enable_checksum"] = True
422+
self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1")
423+
424+
resp1 = self._create_response(b"test", _READ_ID, offset=0)
425+
self.strategy.update_state_from_response(resp1, self.state)
426+
427+
resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True)
428+
self.strategy.update_state_from_response(resp2, self.state)
429+
430+
self.assertTrue(read_state.is_complete)
431+
self.assertEqual(read_state.bytes_written, 9)
432+
433+
def test_update_state_full_object_checksum_failure(self):
434+
"""Test that full object checksum verification raises DataCorruption on mismatch at range_end."""
435+
self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True)
436+
self.state["enable_checksum"] = True
437+
self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum!
438+
439+
resp1 = self._create_response(b"test", _READ_ID, offset=0)
440+
self.strategy.update_state_from_response(resp1, self.state)
441+
442+
resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True)
443+
with self.assertRaisesRegex(DataCorruption, "Full object checksum mismatch"):
444+
self.strategy.update_state_from_response(resp2, self.state)
445+
446+
def test_update_state_checksum_mismatch_ignored_when_disabled(self):
447+
"""Test that a CRC32C mismatch is ignored when enable_checksum is False."""
448+
self._add_download(_READ_ID)
449+
self.state["enable_checksum"] = False
450+
response = self._create_response(b"data", _READ_ID, offset=0, crc=999999)
451+
452+
# Should NOT raise DataCorruption!
453+
self.strategy.update_state_from_response(response, self.state)
454+
455+
def test_update_state_full_object_checksum_mismatch_ignored_when_disabled(self):
456+
"""Test that a full-object CRC32C mismatch is ignored when enable_checksum is False."""
457+
self._add_download(
458+
_READ_ID,
459+
offset=0,
460+
length=9,
461+
is_full_object_read=True,
462+
enable_checksum=False,
463+
)
464+
self.state["enable_checksum"] = False
465+
self.state["full_obj_server_crc32c"] = 111111 # Wrong server checksum!
466+
467+
resp1 = self._create_response(b"test", _READ_ID, offset=0)
468+
self.strategy.update_state_from_response(resp1, self.state)
469+
470+
resp2 = self._create_response(b"data1", _READ_ID, offset=4, range_end=True)
471+
# Should NOT raise DataCorruption!
472+
self.strategy.update_state_from_response(resp2, self.state)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.