Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 72c7a27

Browse filesBrowse files
authored
feat(storage): Enable full object checksum PR 1/3 : parse finalize_time and server crc32c in async object stream (#17261)
### 1. Overview of the Solution This solution implements end-to-end full-object checksum validation in `AsyncMultiRangeDownloader` for the asynchronous Google Cloud Storage Python client library. As asynchronous multiplexed downloads of non-contiguous ranges are performed concurrently over a single bidirectional gRPC connection, this feature automatically and incrementally calculates a rolling checksum as bytes arrive and validates it against the server's authoritative object checksum once the download completes. The technical approach consists of three coordinated layers: * **`_AsyncReadObjectStream` (Stream Ingestion)**: Safely extracts the authoritative server checksum (`full_obj_server_crc32c`) and finalization status (`is_finalized`) from the object metadata received in the first data payload response of the stream. * **`_ReadResumptionStrategy` & `_DownloadState` (Verification Logic)**: Computes an isolated, persistent rolling checksum in the individual `_DownloadState` object to ensure calculations do not bleed across concurrent multiplexed ranges. Crucially, the rolling hash updates only *after* buffer writes succeed to prevent state corruption during retry re-connects, raising a `DataCorruption` exception on completion if a mismatch occurs. * **`AsyncMultiRangeDownloader` (Orchestration & Cleanup)**: Detects candidate full-object ranges (e.g., `(0, 0)` or `(0, persisted_size)`), propagates checksum settings to the resumption strategy, and guarantees robust cleanup (closing the stream immediately and unregistering IDs) if data corruption or write errors occur. ### 2. What This PR Specifically Does This PR implements **Step 1: Stream Metadata Ingestion** of the solution: * Modifies `_AsyncReadObjectStream` to safely parse GCS object metadata from the first data payload of the response. * Populates `is_finalized`, `full_obj_server_crc32c`, and `object_metadata` attributes in `_AsyncReadObjectStream.open()`. * Adds an autouse pytest event loop fixture in `tests/unit/conftest.py` to resolve compatibility issues with `pytest-asyncio` under Python 3.11+. * Adds unit tests in `test_async_read_object_stream.py` to verify that finalization status and server-authoritative checksums are correctly extracted or skipped for unfinalized objects.
1 parent d01a4ba commit 72c7a27
Copy full SHA for 72c7a27

3 files changed

+85-1Lines changed: 85 additions & 1 deletion

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/google/cloud/storage/asyncio/async_read_object_stream.py
+15Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ def __init__(
7979
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
8080
self._is_stream_open: bool = False
8181
self.persisted_size: Optional[int] = None
82+
self.is_finalized: bool = False
83+
self.full_obj_server_crc32c: Optional[int] = None
84+
self.object_metadata: Optional[_storage_v2.Object] = None
8285

8386
async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
8487
"""Opens the bidi-gRPC connection to read from the object.
@@ -132,6 +135,18 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
132135
self.generation_number = response.metadata.generation
133136
# update persisted size
134137
self.persisted_size = response.metadata.size
138+
self.object_metadata = response.metadata
139+
if (
140+
hasattr(response.metadata, "finalize_time")
141+
and response.metadata.finalize_time
142+
and response.metadata.finalize_time.second > 0
143+
):
144+
self.is_finalized = True
145+
if (
146+
hasattr(response.metadata, "checksums")
147+
and response.metadata.checksums
148+
):
149+
self.full_obj_server_crc32c = response.metadata.checksums.crc32c
135150

136151
if response and response.read_handle:
137152
self.read_handle = response.read_handle
Collapse file

‎packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/unit/asyncio/test_async_read_object_stream.py
+38-1Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open
3838
socket_like_rpc.open = AsyncMock()
3939

4040
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
41-
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
41+
recv_response.metadata = mock.MagicMock()
4242
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
4343
recv_response.metadata.size = _TEST_OBJECT_SIZE
44+
recv_response.metadata.finalize_time.second = 30
45+
recv_response.metadata.checksums.crc32c = 98765
4446
recv_response.read_handle = _TEST_READ_HANDLE
4547
socket_like_rpc.recv = AsyncMock(return_value=recv_response)
4648

@@ -130,6 +132,8 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc):
130132
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
131133
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
132134
assert read_obj_stream.persisted_size == _TEST_OBJECT_SIZE
135+
assert read_obj_stream.is_finalized is True
136+
assert read_obj_stream.full_obj_server_crc32c == 98765
133137
assert read_obj_stream.is_stream_open
134138

135139

@@ -381,3 +385,36 @@ async def test_recv_updates_read_handle_on_refresh(
381385

382386
await stream.recv()
383387
assert stream.read_handle == refreshed_handle
388+
389+
390+
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
391+
@mock.patch(
392+
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
393+
)
394+
@pytest.mark.asyncio
395+
async def test_open_unfinalized_object_skips_checksum(
396+
mock_client, mock_cls_async_bidi_rpc
397+
):
398+
socket_like_rpc = AsyncMock()
399+
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
400+
socket_like_rpc.open = AsyncMock()
401+
402+
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
403+
recv_response.metadata = mock.MagicMock()
404+
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
405+
recv_response.metadata.size = _TEST_OBJECT_SIZE
406+
recv_response.metadata.finalize_time.second = 0 # NOT finalized!
407+
recv_response.metadata.checksums.crc32c = 98765
408+
recv_response.read_handle = _TEST_READ_HANDLE
409+
socket_like_rpc.recv = AsyncMock(return_value=recv_response)
410+
411+
read_obj_stream = _AsyncReadObjectStream(
412+
client=mock_client,
413+
bucket_name=_TEST_BUCKET_NAME,
414+
object_name=_TEST_OBJECT_NAME,
415+
)
416+
417+
await read_obj_stream.open()
418+
419+
assert read_obj_stream.is_finalized is False
420+
assert read_obj_stream.full_obj_server_crc32c is None
Collapse file
+32Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import asyncio
17+
import pytest
18+
19+
20+
@pytest.fixture(autouse=True)
21+
def set_event_loop():
22+
try:
23+
asyncio.get_running_loop()
24+
yield
25+
except RuntimeError:
26+
loop = asyncio.new_event_loop()
27+
asyncio.set_event_loop(loop)
28+
try:
29+
yield
30+
finally:
31+
loop.close()
32+
asyncio.set_event_loop(None)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.