Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fdd0a50

Browse filesBrowse files
authored
feat(zb-experimental): Add Async_appendable_object_writer.py (#1616)
feat(zb-experimental): Add Async_appendable_object_writer.py
1 parent 6da1186 commit fdd0a50
Copy full SHA for fdd0a50

File tree

Expand file treeCollapse file tree

2 files changed

+273
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+273
-0
lines changed
Open diff view settings
Collapse file
+158Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""
15+
NOTE:
16+
This is _experimental module for upcoming support for Rapid Storage.
17+
(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new)
18+
19+
APIs may not work as intended and are not stable yet. Feature is not
20+
GA(Generally Available) yet, please contact your TAM (Technical Account Manager)
21+
if you want to use these Rapid Storage APIs.
22+
23+
"""
24+
from typing import Optional
25+
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
26+
AsyncGrpcClient,
27+
)
28+
from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
29+
_AsyncWriteObjectStream,
30+
)
31+
32+
33+
class AsyncAppendableObjectWriter:
34+
"""Class for appending data to a GCS Appendable Object asynchronously."""
35+
36+
def __init__(
37+
self,
38+
client: AsyncGrpcClient.grpc_client,
39+
bucket_name: str,
40+
object_name: str,
41+
generation=None,
42+
write_handle=None,
43+
):
44+
"""
45+
Class for appending data to a GCS Appendable Object.
46+
47+
Example usage:
48+
49+
```
50+
51+
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
52+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter
53+
import asyncio
54+
55+
client = AsyncGrpcClient().grpc_client
56+
bucket_name = "my-bucket"
57+
object_name = "my-appendable-object"
58+
59+
# instantiate the writer
60+
writer = AsyncAppendableObjectWriter(client, bucket_name, object_name)
61+
# open the writer, (underlying gRPC bidi-stream will be opened)
62+
await writer.open()
63+
64+
# append data, it can be called multiple times.
65+
await writer.append(b"hello world")
66+
await writer.append(b"some more data")
67+
68+
# optionally flush data to persist.
69+
await writer.flush()
70+
71+
# close the gRPC stream.
72+
# Please note closing the program will also close the stream,
73+
# however it's recommended to close the stream if no more data to append
74+
# to clean up gRPC connection (which means CPU/memory/network resources)
75+
await writer.close()
76+
```
77+
78+
:type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
79+
:param client: async grpc client to use for making API requests.
80+
81+
:type bucket_name: str
82+
:param bucket_name: The name of the GCS bucket containing the object.
83+
84+
:type object_name: str
85+
:param object_name: The name of the GCS Appendable Object to be written.
86+
87+
:type generation: int
88+
:param generation: (Optional) If present, selects a specific revision of
89+
that object.
90+
If None, a new object is created.
91+
If None and Object already exists then it'll will be
92+
overwritten.
93+
94+
:type write_handle: bytes
95+
:param write_handle: (Optional) An existing handle for writing the object.
96+
If provided, opening the bidi-gRPC connection will be faster.
97+
"""
98+
self.client = client
99+
self.bucket_name = bucket_name
100+
self.object_name = object_name
101+
self.write_handle = write_handle
102+
self.generation = generation
103+
104+
self.write_obj_stream = _AsyncWriteObjectStream(
105+
client=self.client,
106+
bucket_name=self.bucket_name,
107+
object_name=self.object_name,
108+
generation_number=self.generation,
109+
write_handle=self.write_handle,
110+
)
111+
self._is_stream_open: bool = False
112+
self.offset: Optional[int] = None
113+
self.persisted_size: Optional[int] = None
114+
115+
async def state_lookup(self):
116+
"""Returns the persisted_size."""
117+
raise NotImplementedError("state_lookup is not implemented yet.")
118+
119+
async def open(self) -> None:
120+
"""Opens the underlying bidi-gRPC stream."""
121+
raise NotImplementedError("open is not implemented yet.")
122+
123+
async def append(self, data: bytes):
124+
raise NotImplementedError("append is not implemented yet.")
125+
126+
async def flush(self) -> int:
127+
"""Returns persisted_size"""
128+
raise NotImplementedError("flush is not implemented yet.")
129+
130+
async def close(self, finalize_on_close=False) -> int:
131+
"""Returns persisted_size"""
132+
raise NotImplementedError("close is not implemented yet.")
133+
134+
async def finalize(self) -> int:
135+
"""Returns persisted_size
136+
Note: Once finalized no more data can be appended.
137+
"""
138+
raise NotImplementedError("finalize is not implemented yet.")
139+
140+
# helper methods.
141+
async def append_from_string(self, data: str):
142+
"""
143+
str data will be encoded to bytes using utf-8 encoding calling
144+
145+
self.append(data.encode("utf-8"))
146+
"""
147+
raise NotImplementedError("append_from_string is not implemented yet.")
148+
149+
async def append_from_stream(self, stream_obj):
150+
"""
151+
At a time read a chunk of data (16MiB) from `stream_obj`
152+
and call self.append(chunk)
153+
"""
154+
raise NotImplementedError("append_from_stream is not implemented yet.")
155+
156+
async def append_from_file(self, file_path: str):
157+
"""Create a file object from `file_path` and call append_from_stream(file_obj)"""
158+
raise NotImplementedError("append_from_file is not implemented yet.")
Collapse file
+115Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
from unittest import mock
17+
18+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
19+
AsyncAppendableObjectWriter,
20+
)
21+
22+
BUCKET = "test-bucket"
23+
OBJECT = "test-object"
24+
GENERATION = 123
25+
WRITE_HANDLE = b"test-write-handle"
26+
27+
28+
@pytest.fixture
29+
def mock_client():
30+
"""Mock the async gRPC client."""
31+
return mock.AsyncMock()
32+
33+
34+
@mock.patch(
35+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
36+
)
37+
def test_init(mock_write_object_stream, mock_client):
38+
"""Test the constructor of AsyncAppendableObjectWriter."""
39+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
40+
41+
assert writer.client == mock_client
42+
assert writer.bucket_name == BUCKET
43+
assert writer.object_name == OBJECT
44+
assert writer.generation is None
45+
assert writer.write_handle is None
46+
assert not writer._is_stream_open
47+
assert writer.offset is None
48+
assert writer.persisted_size is None
49+
50+
mock_write_object_stream.assert_called_once_with(
51+
client=mock_client,
52+
bucket_name=BUCKET,
53+
object_name=OBJECT,
54+
generation_number=None,
55+
write_handle=None,
56+
)
57+
assert writer.write_obj_stream == mock_write_object_stream.return_value
58+
59+
60+
@mock.patch(
61+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
62+
)
63+
def test_init_with_optional_args(mock_write_object_stream, mock_client):
64+
"""Test the constructor with optional arguments."""
65+
writer = AsyncAppendableObjectWriter(
66+
mock_client,
67+
BUCKET,
68+
OBJECT,
69+
generation=GENERATION,
70+
write_handle=WRITE_HANDLE,
71+
)
72+
73+
assert writer.generation == GENERATION
74+
assert writer.write_handle == WRITE_HANDLE
75+
76+
mock_write_object_stream.assert_called_once_with(
77+
client=mock_client,
78+
bucket_name=BUCKET,
79+
object_name=OBJECT,
80+
generation_number=GENERATION,
81+
write_handle=WRITE_HANDLE,
82+
)
83+
84+
85+
@pytest.mark.asyncio
86+
async def test_unimplemented_methods_raise_error(mock_client):
87+
"""Test that all currently unimplemented methods raise NotImplementedError."""
88+
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
89+
90+
with pytest.raises(NotImplementedError):
91+
await writer.state_lookup()
92+
93+
with pytest.raises(NotImplementedError):
94+
await writer.open()
95+
96+
with pytest.raises(NotImplementedError):
97+
await writer.append(b"data")
98+
99+
with pytest.raises(NotImplementedError):
100+
await writer.flush()
101+
102+
with pytest.raises(NotImplementedError):
103+
await writer.close()
104+
105+
with pytest.raises(NotImplementedError):
106+
await writer.finalize()
107+
108+
with pytest.raises(NotImplementedError):
109+
await writer.append_from_string("data")
110+
111+
with pytest.raises(NotImplementedError):
112+
await writer.append_from_stream(mock.Mock())
113+
114+
with pytest.raises(NotImplementedError):
115+
await writer.append_from_file("file.txt")

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.