Commit 2e1a1eb

feat(samples): add samples for appendable objects writes and reads (#1705)

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

1 parent: ea0f5bf

14 files changed: +1010 / -10 lines
cloudbuild/run_zonal_tests.sh (1 addition & 0 deletions)

@@ -26,3 +26,4 @@ export RUN_ZONAL_SYSTEM_TESTS=True
 CURRENT_ULIMIT=$(ulimit -n)
 echo '--- Running Zonal tests on VM with ulimit set to ---' $CURRENT_ULIMIT
 pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py
+pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' samples/snippets/zonal_buckets/zonal_snippets_test.py
google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py (9 additions & 1 deletion)

@@ -303,11 +303,19 @@ async def download_ranges(
 
         :type read_ranges: List[Tuple[int, int, "BytesIO"]]
         :param read_ranges: A list of tuples, where each tuple represents a
-            combintaion of byte_range and writeable buffer in format -
+            combination of byte_range and writeable buffer in format -
             (`start_byte`, `bytes_to_read`, `writeable_buffer`). Buffer has
             to be provided by the user, and user has to make sure appropriate
             memory is available in the application to avoid out-of-memory crash.
 
+            Special cases:
+            if the value of `bytes_to_read` is 0, it'll be interpreted as
+            download all contents until the end of the file from `start_byte`.
+            Examples:
+            * (0, 0, buffer) : downloads 0 to end, i.e. entire object.
+            * (100, 0, buffer) : downloads from 100 to end.
+
+
         :type lock: asyncio.Lock
         :param lock: (Optional) An asyncio lock to synchronize sends and recvs
             on the underlying bidi-GRPC stream. This is required when multiple
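As a quick illustration of the read-to-end case documented above, the sketch below (not part of this commit) requests `(0, 0, buffer)` using the same `AsyncMultiRangeDownloader` calls that the new samples in this commit use; the bucket and object names are placeholders.

import asyncio
from io import BytesIO

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
    AsyncMultiRangeDownloader,
)


async def read_entire_object(bucket_name, object_name):
    """Reads a whole object by requesting (start_byte=0, bytes_to_read=0)."""
    grpc_client = AsyncGrpcClient().grpc_client
    mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)
    buffer = BytesIO()
    try:
        await mrd.open()
        # bytes_to_read=0 is the "download until the end of the object" special case.
        await mrd.download_ranges([(0, 0, buffer)])
    finally:
        if mrd.is_stream_open:
            await mrd.close()
    return buffer.getvalue()


# Placeholder names; substitute a real zonal bucket and object.
# asyncio.run(read_entire_object("my-bucket", "my-object"))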
google/cloud/storage/_experimental/asyncio/async_write_object_stream.py (10 additions & 2 deletions)

@@ -101,8 +101,16 @@ def __init__(
         self.object_resource: Optional[_storage_v2.Object] = None
 
     async def open(self) -> None:
-        """Opening an object for write , should do it's state lookup
-        to know what's the persisted size is.
+        """
+        Opens the bidi-gRPC connection to write to the object.
+
+        This method sends an initial request to start the stream and receives
+        the first response containing metadata and a write handle.
+
+        :rtype: None
+        :raises ValueError: If the stream is already open.
+        :raises google.api_core.exceptions.FailedPrecondition:
+            if `generation_number` is 0 and object already exists.
         """
         if self._is_stream_open:
            raise ValueError("Stream is already open")
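The new docstring above documents the `FailedPrecondition` case. The sketch below (not part of this commit) shows one way a caller might handle it, using the `AsyncAppendableObjectWriter` API from the samples added in this commit together with `google.api_core.exceptions`; the bucket and object names are placeholders.

import asyncio

from google.api_core import exceptions
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient


async def create_appendable_if_absent(bucket_name, object_name):
    """Creates a new appendable object, or reports that it already exists."""
    grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,  # 0 means the object must not exist yet.
    )
    try:
        await writer.open()
    except exceptions.FailedPrecondition:
        # Raised when generation is 0 and the object already exists.
        print(f"{object_name} already exists; nothing was written.")
        return
    await writer.append(b"first bytes")
    await writer.close()


# asyncio.run(create_appendable_if_absent("my-zonal-bucket", "my-object"))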
New sample file (77 additions & 0 deletions)

#!/usr/bin/env python

# Copyright 2026 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import asyncio

from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient


# [START storage_create_and_write_appendable_object]


async def storage_create_and_write_appendable_object(
    bucket_name, object_name, grpc_client=None
):
    """Uploads an appendable object to a zonal bucket.

    grpc_client: an existing grpc_client to use; this is only for testing.
    """

    if grpc_client is None:
        grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,  # throws `FailedPrecondition` if the object already exists.
    )
    # This creates a new appendable object of size 0 and opens it for appending.
    await writer.open()

    # Appends data to the object.
    # You can call `.append` multiple times as needed. Data will be appended
    # to the end of the object.
    await writer.append(b"Some data")

    # Once all appends are done, close the gRPC bidirectional stream.
    await writer.close()

    print(
        f"Appended object {object_name} created of size {writer.persisted_size} bytes."
    )


# [END storage_create_and_write_appendable_object]

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--bucket_name", help="Your Cloud Storage bucket name.")
    parser.add_argument("--object_name", help="Your Cloud Storage object name.")

    args = parser.parse_args()

    asyncio.run(
        storage_create_and_write_appendable_object(
            bucket_name=args.bucket_name,
            object_name=args.object_name,
        )
    )
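The `grpc_client` parameter in this sample exists so tests can inject an existing client. As a small sketch (not part of this commit), assuming the sample function above is importable, one client can be reused across several calls; the bucket and object names below are placeholders.

import asyncio

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient


async def create_two_objects_with_shared_client(bucket_name):
    """Reuses a single gRPC client for two calls to the sample above."""
    # Assumes storage_create_and_write_appendable_object (defined above) is in scope.
    grpc_client = AsyncGrpcClient().grpc_client
    await storage_create_and_write_appendable_object(
        bucket_name, "appendable-object-1", grpc_client=grpc_client
    )
    await storage_create_and_write_appendable_object(
        bucket_name, "appendable-object-2", grpc_client=grpc_client
    )


# asyncio.run(create_two_objects_with_shared_client("my-zonal-bucket"))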
New sample file (78 additions & 0 deletions)

#!/usr/bin/env python

# Copyright 2026 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import asyncio

from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient


# [START storage_finalize_appendable_object_upload]
async def storage_finalize_appendable_object_upload(
    bucket_name, object_name, grpc_client=None
):
    """Creates, writes to, and finalizes an appendable object.

    grpc_client: an existing grpc_client to use; this is only for testing.
    """

    if grpc_client is None:
        grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(
        client=grpc_client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=0,  # throws `FailedPrecondition` if the object already exists.
    )
    # This creates a new appendable object of size 0 and opens it for appending.
    await writer.open()

    # Appends data to the object.
    await writer.append(b"Some data")

    # Finalize the appendable object.
    # NOTE:
    # 1. Once finalized, no more appends can be done to the object.
    # 2. If you don't want to finalize, you can simply call `writer.close`.
    # 3. Calling `.finalize()` also closes the gRPC bidi stream; calling
    #    `.close` after `.finalize` may lead to undefined behavior.
    object_resource = await writer.finalize()

    print(f"Appendable object {object_name} created and finalized.")
    print("Object Metadata:")
    print(object_resource)


# [END storage_finalize_appendable_object_upload]

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--bucket_name", help="Your Cloud Storage bucket name.")
    parser.add_argument("--object_name", help="Your Cloud Storage object name.")

    args = parser.parse_args()

    asyncio.run(
        storage_finalize_appendable_object_upload(
            bucket_name=args.bucket_name,
            object_name=args.object_name,
        )
    )
New sample file (85 additions & 0 deletions)

#!/usr/bin/env python

# Copyright 2026 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Downloads a range of bytes from multiple objects concurrently."""
import argparse
import asyncio
from io import BytesIO

from google.cloud.storage._experimental.asyncio.async_grpc_client import (
    AsyncGrpcClient,
)
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
    AsyncMultiRangeDownloader,
)


# [START storage_open_multiple_objects_ranged_read]
async def storage_open_multiple_objects_ranged_read(
    bucket_name, object_names, grpc_client=None
):
    """Downloads a range of bytes from multiple objects concurrently.

    grpc_client: an existing grpc_client to use; this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient().grpc_client

    async def _download_range(object_name):
        """Helper coroutine to download a range from a single object."""
        mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)
        try:
            # Open the object; mrd always opens in read mode.
            await mrd.open()

            # Each object downloads the first 100 bytes.
            start_byte = 0
            size = 100

            # The requested range will be downloaded into this buffer; users may
            # provide their own buffer or file-like object.
            output_buffer = BytesIO()
            await mrd.download_ranges([(start_byte, size, output_buffer)])
        finally:
            if mrd.is_stream_open:
                await mrd.close()

        # The downloaded size can differ from the requested size if the object is smaller.
        # mrd will download at most up to the end of the object.
        downloaded_size = output_buffer.getbuffer().nbytes
        print(f"Downloaded {downloaded_size} bytes from {object_name}")

    download_tasks = [_download_range(name) for name in object_names]
    await asyncio.gather(*download_tasks)


# [END storage_open_multiple_objects_ranged_read]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--bucket_name", help="Your Cloud Storage bucket name.")
    parser.add_argument(
        "--object_names", nargs="+", help="Your Cloud Storage object name(s)."
    )

    args = parser.parse_args()

    asyncio.run(
        storage_open_multiple_objects_ranged_read(args.bucket_name, args.object_names)
    )
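The sample above opens one bidi stream per object and gathers them all at once. As a sketch of one possible variation (not part of this commit), an `asyncio.Semaphore` can bound how many downloads run concurrently; the names and the `max_in_flight` value are placeholders.

import asyncio
from io import BytesIO

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
    AsyncMultiRangeDownloader,
)


async def read_first_bytes_bounded(bucket_name, object_names, max_in_flight=8):
    """Reads the first 100 bytes of each object, at most `max_in_flight` at a time."""
    grpc_client = AsyncGrpcClient().grpc_client
    semaphore = asyncio.Semaphore(max_in_flight)

    async def _one(object_name):
        async with semaphore:  # limits how many streams are open concurrently
            mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)
            buffer = BytesIO()
            try:
                await mrd.open()
                await mrd.download_ranges([(0, 100, buffer)])
            finally:
                if mrd.is_stream_open:
                    await mrd.close()
            print(f"{object_name}: {buffer.getbuffer().nbytes} bytes")

    await asyncio.gather(*(_one(name) for name in object_names))


# asyncio.run(read_first_bytes_bounded("my-zonal-bucket", ["obj-1", "obj-2", "obj-3"]))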
New sample file (85 additions & 0 deletions)

#!/usr/bin/env python

# Copyright 2026 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import asyncio
from io import BytesIO

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
    AsyncMultiRangeDownloader,
)


# [START storage_open_object_multiple_ranged_read]
async def storage_open_object_multiple_ranged_read(
    bucket_name, object_name, grpc_client=None
):
    """Downloads multiple ranges of bytes from a single object into different buffers.

    grpc_client: an existing grpc_client to use; this is only for testing.
    """
    if grpc_client is None:
        grpc_client = AsyncGrpcClient().grpc_client

    mrd = AsyncMultiRangeDownloader(grpc_client, bucket_name, object_name)

    try:
        # Open the object; mrd always opens in read mode.
        await mrd.open()

        # Specify four different buffers to download ranges into.
        buffers = [BytesIO(), BytesIO(), BytesIO(), BytesIO()]

        # Define the ranges to download. Each range is a tuple of (start_byte, size, buffer).
        # All ranges will download 10 bytes from different starting positions.
        # We choose arbitrary start bytes for this example; the object should be large enough.
        # A user can choose any start byte between 0 and `object_size`.
        # If `start_byte` is greater than `object_size`, mrd will throw an error.
        ranges = [
            (0, 10, buffers[0]),
            (20, 10, buffers[1]),
            (40, 10, buffers[2]),
            (60, 10, buffers[3]),
        ]

        await mrd.download_ranges(ranges)

    finally:
        await mrd.close()

    # Print the downloaded content from each buffer.
    for i, output_buffer in enumerate(buffers):
        downloaded_size = output_buffer.getbuffer().nbytes
        print(
            f"Downloaded {downloaded_size} bytes into buffer {i + 1} from start byte {ranges[i][0]}: {output_buffer.getvalue()}"
        )


# [END storage_open_object_multiple_ranged_read]

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--bucket_name", help="Your Cloud Storage bucket name.")
    parser.add_argument("--object_name", help="Your Cloud Storage object name.")

    args = parser.parse_args()

    asyncio.run(
        storage_open_object_multiple_ranged_read(args.bucket_name, args.object_name)
    )
