Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3271831

Browse filesBrowse files
authored
feat(storage): Add support for blob object in AAOW (#16577)
Add support for adding custom metadata, content type and kms key name in appendable objects
1 parent aa43c83 commit 3271831
Copy full SHA for 3271831

6 files changed

+288-5Lines changed: 288 additions & 5 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file
+40Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from google.cloud import _storage_v2
16+
17+
# Map Python Blob attributes to GCS V2 Object proto field names.
18+
_BLOB_ATTR_TO_PROTO_FIELD = {
19+
"content_type": "content_type",
20+
"metadata": "metadata",
21+
"kms_key_name": "kms_key",
22+
}
23+
24+
25+
def blob_to_proto(blob):
26+
"""Converts a Blob instance to a GCS V2 Object proto message."""
27+
28+
resource_params = {
29+
"name": blob.name,
30+
}
31+
32+
if blob.bucket:
33+
resource_params["bucket"] = f"projects/_/buckets/{blob.bucket.name}"
34+
35+
for attr_name, proto_field in _BLOB_ATTR_TO_PROTO_FIELD.items():
36+
value = getattr(blob, attr_name, None)
37+
if value is not None:
38+
resource_params[proto_field] = value
39+
40+
return _storage_v2.Object(**resource_params)
Collapse file

‎packages/google-cloud-storage/google/cloud/storage/asyncio/async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/google/cloud/storage/asyncio/async_appendable_object_writer.py
+56Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from google.cloud import _storage_v2
2525
from google.cloud._storage_v2.types import BidiWriteObjectRedirectedError
2626
from google.cloud._storage_v2.types.storage import BidiWriteObjectRequest
27+
from google.cloud.storage import Blob
2728
from google.cloud.storage.asyncio.async_grpc_client import (
2829
AsyncGrpcClient,
2930
)
@@ -211,6 +212,60 @@ def __init__(
211212
self._routing_token: Optional[str] = None
212213
self.object_resource: Optional[_storage_v2.Object] = None
213214
self._flush_count = 0
215+
self.blob: Optional[Blob] = None
216+
217+
@classmethod
218+
def from_blob(
219+
cls,
220+
client: AsyncGrpcClient,
221+
blob: Blob,
222+
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
223+
writer_options: Optional[dict] = None,
224+
) -> "AsyncAppendableObjectWriter":
225+
"""Creates an AsyncAppendableObjectWriter from an existing Blob object.
226+
227+
This factory method extracts the bucket and object names directly from
228+
the provided blob instance.
229+
230+
.. code-block:: python
231+
232+
from google.cloud.storage.bucket import Bucket
233+
from google.cloud.storage.blob import Blob
234+
235+
bucket = Bucket(client, name="my-bucket")
236+
blob = Blob(name="my-object.txt", bucket=bucket)
237+
238+
writer = AsyncAppendableObjectWriter.from_blob(
239+
client=client,
240+
blob=blob
241+
)
242+
243+
:type client: :class:`~google.cloud.storage.client.AsyncGrpcClient`
244+
:param client: The async gRPC client to use for write operations.
245+
246+
:type blob: :class:`~google.cloud.storage.blob.Blob`
247+
:param blob: The blob instance providing the target path.
248+
249+
:type write_handle: :class:`~google.storage.v2.BidiWriteHandle`
250+
:param write_handle: (Optional) An existing BidiWriteHandle to resume a session.
251+
252+
:type writer_options: dict
253+
:param writer_options: (Optional) Configuration settings for the underlying
254+
appendable writer.
255+
256+
:rtype: :class:`AsyncAppendableObjectWriter`
257+
:returns: An initialized writer instance.
258+
"""
259+
instance = cls(
260+
client=client,
261+
bucket_name=blob.bucket.name,
262+
object_name=blob.name,
263+
generation=blob.generation,
264+
write_handle=write_handle,
265+
writer_options=writer_options,
266+
)
267+
instance.blob = blob
268+
return instance
214269

215270
async def state_lookup(self) -> int:
216271
"""Returns the persisted_size
@@ -297,6 +352,7 @@ async def _do_open():
297352
client=self.client.grpc_client,
298353
bucket_name=self.bucket_name,
299354
object_name=self.object_name,
355+
blob=self.blob,
300356
generation_number=self.generation,
301357
write_handle=self.write_handle,
302358
routing_token=self._routing_token,
Collapse file

‎packages/google-cloud-storage/google/cloud/storage/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/google/cloud/storage/asyncio/async_write_object_stream.py
+11-4Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from google.api_core.bidi_async import AsyncBidiRpc
1919

2020
from google.cloud import _storage_v2
21+
from google.cloud.storage import Blob
22+
from google.cloud.storage import _grpc_conversions
2123
from google.cloud.storage.asyncio import _utils
2224
from google.cloud.storage.asyncio.async_abstract_object_stream import (
2325
_AsyncAbstractObjectStream,
@@ -67,6 +69,7 @@ def __init__(
6769
generation_number: Optional[int] = None, # None means new object
6870
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
6971
routing_token: Optional[str] = None,
72+
blob: Optional[Blob] = None,
7073
) -> None:
7174
if client is None:
7275
raise ValueError("client must be provided")
@@ -83,7 +86,7 @@ def __init__(
8386
self.client: AsyncGrpcClient.grpc_client = client
8487
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
8588
self.routing_token: Optional[str] = routing_token
86-
89+
self.blob: Optional[Blob] = blob
8790
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
8891

8992
self.rpc = self.client._client._transport._wrapped_methods[
@@ -118,11 +121,15 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
118121
# if `generation_number` == 0 new object will be created only if there
119122
# isn't any existing object.
120123
if self.generation_number is None or self.generation_number == 0:
124+
if self.blob:
125+
resource = _grpc_conversions.blob_to_proto(self.blob)
126+
else:
127+
resource = _storage_v2.Object(
128+
name=self.object_name, bucket=self._full_bucket_name
129+
)
121130
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
122131
write_object_spec=_storage_v2.WriteObjectSpec(
123-
resource=_storage_v2.Object(
124-
name=self.object_name, bucket=self._full_bucket_name
125-
),
132+
resource=resource,
126133
appendable=True,
127134
if_generation_match=self.generation_number,
128135
),
Collapse file

‎packages/google-cloud-storage/tests/system/test_zonal.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/system/test_zonal.py
+131-1Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818

1919
# current library imports
20+
from google.cloud import kms
2021
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
2122
from google.cloud.storage.asyncio.async_multi_range_downloader import (
2223
AsyncMultiRangeDownloader,
@@ -29,7 +30,7 @@
2930

3031

3132
# TODO: replace this with a fixture once zonal bucket creation / deletion
32-
# is supported in grpc client or json client client.
33+
# is supported in grpc client or json client.
3334
_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET")
3435
_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET")
3536
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
@@ -40,6 +41,58 @@ async def create_async_grpc_client(attempt_direct_path=True):
4041
return AsyncGrpcClient(attempt_direct_path=attempt_direct_path)
4142

4243

44+
@pytest.fixture(scope="session")
45+
def zonal_kms_key(storage_client, kms_client):
46+
"""Provisions a KMS key in the same location as of the zonal bucket."""
47+
# Get the zonal bucket and extract its location
48+
bucket = storage_client.get_bucket(_ZONAL_BUCKET)
49+
location = bucket.location.lower()
50+
51+
project = storage_client.project
52+
keyring_name = "gcs-test-zonal-ring"
53+
key_name = "gcs-test-zonal-key"
54+
55+
keyring_path = kms_client.key_ring_path(project, location, keyring_name)
56+
57+
# Create the KeyRing if it doesn't exist
58+
try:
59+
kms_client.get_key_ring(name=keyring_path)
60+
except NotFound:
61+
parent = f"projects/{project}/locations/{location}"
62+
kms_client.create_key_ring(
63+
request={"parent": parent, "key_ring_id": keyring_name, "key_ring": {}}
64+
)
65+
66+
# Grant GCS service account permissions to use the key
67+
service_account_email = storage_client.get_service_account_email()
68+
policy = {
69+
"bindings": [
70+
{
71+
"role": "roles/cloudkms.cryptoKeyEncrypterDecrypter",
72+
"members": [f"serviceAccount:{service_account_email}"],
73+
}
74+
]
75+
}
76+
kms_client.set_iam_policy(request={"resource": keyring_path, "policy": policy})
77+
78+
# Create the CryptoKey if it doesn't exist
79+
key_path = kms_client.crypto_key_path(project, location, keyring_name, key_name)
80+
try:
81+
kms_client.get_crypto_key(name=key_path)
82+
except NotFound:
83+
purpose = kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT
84+
key = {"purpose": purpose}
85+
kms_client.create_crypto_key(
86+
request={
87+
"parent": keyring_path,
88+
"crypto_key_id": key_name,
89+
"crypto_key": key,
90+
}
91+
)
92+
93+
return key_path
94+
95+
4396
@pytest.fixture(scope="session")
4497
def event_loop():
4598
"""Redefine pytest-asyncio's event_loop fixture to be session-scoped."""
@@ -286,6 +339,83 @@ async def _run():
286339
event_loop.run_until_complete(_run())
287340

288341

342+
def test_write_from_blob(
343+
storage_client,
344+
blobs_to_delete,
345+
event_loop,
346+
grpc_client,
347+
):
348+
object_name = f"test_from_blob-{str(uuid.uuid4())[:4]}"
349+
content_type = "text/plain"
350+
metadata = {"environment": "system-test"}
351+
test_data = b"system-test-data"
352+
353+
async def _run():
354+
# 1. Create a Blob instance
355+
blob = storage_client.bucket(_ZONAL_BUCKET).blob(object_name)
356+
blob.content_type = content_type
357+
blob.metadata = metadata
358+
359+
# 2. Use from_blob to create the writer
360+
writer = AsyncAppendableObjectWriter.from_blob(grpc_client, blob)
361+
await writer.open()
362+
await writer.append(test_data)
363+
await writer.close(finalize_on_close=True)
364+
365+
# 3. Verify the object metadata
366+
obj = await grpc_client.get_object(
367+
bucket_name=_ZONAL_BUCKET,
368+
object_name=object_name,
369+
)
370+
371+
assert obj.content_type == content_type
372+
assert obj.metadata["environment"] == "system-test"
373+
374+
blobs_to_delete.append(blob)
375+
376+
event_loop.run_until_complete(_run())
377+
378+
379+
def test_write_from_blob_with_kms_key(
380+
storage_client,
381+
blobs_to_delete,
382+
event_loop,
383+
grpc_client,
384+
zonal_kms_key,
385+
):
386+
"""Verifies AsyncAppendableObjectWriter.from_blob correctly applies KMS encryption."""
387+
388+
object_name = f"test_from_blob_kms-{str(uuid.uuid4())[:4]}"
389+
test_data = b"kms-protected-data"
390+
391+
async def _run():
392+
# Create a local Blob instance with the KMS key
393+
blob = storage_client.bucket(_ZONAL_BUCKET).blob(
394+
object_name, kms_key_name=zonal_kms_key
395+
)
396+
397+
writer = AsyncAppendableObjectWriter.from_blob(grpc_client, blob)
398+
399+
await writer.open()
400+
await writer.append(test_data)
401+
402+
await writer.close(finalize_on_close=True)
403+
404+
# Verify the encryption metadata
405+
obj = await grpc_client.get_object(
406+
bucket_name=_ZONAL_BUCKET,
407+
object_name=object_name,
408+
)
409+
410+
# Assert that the object was encrypted with the correct key
411+
# GCS appends a version suffix, so we use startswith()
412+
assert obj.kms_key.startswith(zonal_kms_key)
413+
414+
blobs_to_delete.append(blob)
415+
416+
event_loop.run_until_complete(_run())
417+
418+
289419
def test_read_unfinalized_appendable_object(
290420
storage_client, blobs_to_delete, event_loop, grpc_client_direct
291421
):
Collapse file

‎packages/google-cloud-storage/tests/unit/asyncio/test_async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-storage/tests/unit/asyncio/test_async_appendable_object_writer.py
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import pytest
2020
from google.api_core import exceptions
2121
from google.rpc import status_pb2
22+
from google.cloud.storage import Blob
2223

2324
from google.cloud._storage_v2.types import storage as storage_type
2425
from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
@@ -166,6 +167,24 @@ def test_init_raises_if_crc32c_missing(self, mock_appendable_writer):
166167
with pytest.raises(exceptions.FailedPrecondition):
167168
self._make_one(mock_appendable_writer["mock_client"])
168169

170+
def test_from_blob(self, mock_appendable_writer):
171+
mock_blob = mock.Mock(spec=Blob)
172+
mock_blob.name = OBJECT
173+
mock_blob.bucket.name = BUCKET
174+
mock_blob.generation = GENERATION
175+
176+
writer = AsyncAppendableObjectWriter.from_blob(
177+
mock_appendable_writer["mock_client"],
178+
mock_blob,
179+
writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB},
180+
)
181+
182+
assert writer.bucket_name == BUCKET
183+
assert writer.object_name == OBJECT
184+
assert writer.generation == GENERATION
185+
assert writer.flush_interval == EIGHT_MIB
186+
assert writer.blob == mock_blob
187+
169188
# -------------------------------------------------------------------------
170189
# Stream Lifecycle Tests
171190
# -------------------------------------------------------------------------

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.