Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ea0f5bf

Browse filesBrowse files
feat(ZonalBuckets): add support for generation=0 to avoid overwriting existing objects and add is_stream_open support (#1709)
feat(ZonalBuckets): add support for `generation=0` to prevent overwriting existing objects feat(ZonalBuckets): add `is_stream_open` property to AsyncAppendableObjectWriter for stream status check --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 6c16079 commit ea0f5bf
Copy full SHA for ea0f5bf

File tree

Expand file treeCollapse file tree

5 files changed

+155
-18
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+155
-18
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+26-8Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,29 @@ def __init__(
9797
:param object_name: The name of the GCS Appendable Object to be written.
9898
9999
:type generation: int
100-
:param generation: (Optional) If present, selects a specific revision of
101-
that object.
102-
If None, a new object is created.
103-
If None and Object already exists then it'll will be
104-
overwritten.
100+
:param generation: (Optional) If present, creates writer for that
101+
specific revision of that object. Use this to append data to an
102+
existing Appendable Object.
103+
104+
Setting to ``0`` makes `writer.open()` succeed only if the
105+
object doesn't exist in the bucket (useful for not accidentally
106+
overwriting existing objects).
107+
108+
Warning: If `None`, a new object is created. If an object with the
109+
same name already exists, it will be overwritten the moment
110+
`writer.open()` is called.
105111
106112
:type write_handle: bytes
107-
:param write_handle: (Optional) An existing handle for writing the object.
108-
If provided, opening the bidi-gRPC connection will be faster.
113+
:param write_handle: (Optional) A handle for writing the object.
114+
If provided, opening the bidi-gRPC connection will be faster.
115+
116+
:type writer_options: dict
117+
:param writer_options: (Optional) A dictionary of writer options.
118+
Supported options:
119+
- "FLUSH_INTERVAL_BYTES": int
120+
The number of bytes to append before "persisting" data in GCS
121+
servers. Default is `_DEFAULT_FLUSH_INTERVAL_BYTES`.
122+
Must be a multiple of `_MAX_CHUNK_SIZE_BYTES`.
109123
"""
110124
raise_if_no_fast_crc32c()
111125
self.client = client
@@ -133,7 +147,6 @@ def __init__(
133147
self.flush_interval = writer_options.get(
134148
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
135149
)
136-
# TODO: add test case for this.
137150
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
138151
raise exceptions.OutOfRange(
139152
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}"
@@ -346,6 +359,11 @@ async def finalize(self) -> _storage_v2.Object:
346359
self.offset = None
347360
return self.object_resource
348361

362+
@property
363+
def is_stream_open(self) -> bool:
364+
return self._is_stream_open
365+
366+
349367
# helper methods.
350368
async def append_from_string(self, data: str):
351369
"""
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+15-4Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,17 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
4747
:param object_name: The name of the GCS ``Appendable Object`` to be written.
4848
4949
:type generation_number: int
50-
:param generation_number: (Optional) If present, selects a specific revision of
51-
this object. If None, a new object is created.
50+
:param generation_number: (Optional) If present, creates writer for that
51+
specific revision of that object. Use this to append data to an
52+
existing Appendable Object.
53+
54+
Setting to ``0`` makes `writer.open()` succeed only if the
55+
object doesn't exist in the bucket (useful for not accidentally
56+
overwriting existing objects).
57+
58+
Warning: If `None`, a new object is created. If an object with the
59+
same name already exists, it will be overwritten the moment
60+
`writer.open()` is called.
5261
5362
:type write_handle: bytes
5463
:param write_handle: (Optional) An existing handle for writing the object.
@@ -101,13 +110,16 @@ async def open(self) -> None:
101110
# Create a new object or overwrite existing one if generation_number
102111
# is None. This makes it consistent with GCS JSON API behavior.
103112
# Created object type would be Appendable Object.
104-
if self.generation_number is None:
113+
# If `generation_number` is 0, a new object is created only if no
114+
# object with the same name already exists.
115+
if self.generation_number is None or self.generation_number == 0:
105116
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
106117
write_object_spec=_storage_v2.WriteObjectSpec(
107118
resource=_storage_v2.Object(
108119
name=self.object_name, bucket=self._full_bucket_name
109120
),
110121
appendable=True,
122+
if_generation_match=self.generation_number,
111123
),
112124
)
113125
else:
@@ -118,7 +130,6 @@ async def open(self) -> None:
118130
generation=self.generation_number,
119131
),
120132
)
121-
122133
self.socket_like_rpc = AsyncBidiRpc(
123134
self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata
124135
)
Collapse file

‎tests/system/test_zonal.py‎

Copy file name to clipboardExpand all lines: tests/system/test_zonal.py
+72Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
1919
AsyncMultiRangeDownloader,
2020
)
21+
from google.api_core.exceptions import FailedPrecondition
2122

2223

2324
pytestmark = pytest.mark.skipif(
@@ -360,3 +361,74 @@ async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete):
360361
await mrd.close()
361362
content = buffer.getvalue()
362363
assert content == full_data
364+
365+
@pytest.mark.asyncio
366+
async def test_open_with_generation_zero(storage_client, blobs_to_delete):
367+
"""Tests that using `generation=0` fails if the object already exists.
368+
369+
This test verifies that:
370+
1. An object can be created using `AsyncAppendableObjectWriter` with `generation=0`.
371+
2. Attempting to create the same object again with `generation=0` raises a
372+
`FailedPrecondition` error with a 400 status code, because the
373+
precondition (object must not exist) is not met.
374+
"""
375+
object_name = f"test_append_with_generation-{uuid.uuid4()}"
376+
grpc_client = AsyncGrpcClient().grpc_client
377+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0)
378+
379+
# Empty object is created.
380+
await writer.open()
381+
assert writer.is_stream_open
382+
383+
await writer.close()
384+
assert not writer.is_stream_open
385+
386+
387+
with pytest.raises(FailedPrecondition) as exc_info:
388+
writer = AsyncAppendableObjectWriter(
389+
grpc_client, _ZONAL_BUCKET, object_name, generation=0
390+
)
391+
await writer.open()
392+
assert exc_info.value.code == 400
393+
394+
# cleanup
395+
del writer
396+
gc.collect()
397+
398+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
399+
400+
@pytest.mark.asyncio
401+
async def test_open_existing_object_with_gen_None_overrides_existing(storage_client, blobs_to_delete):
402+
"""
403+
Test that a new writer opened with `generation=None` overwrites the existing object.
404+
"""
405+
object_name = f"test_append_with_generation-{uuid.uuid4()}"
406+
407+
grpc_client = AsyncGrpcClient().grpc_client
408+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0)
409+
410+
# Empty object is created.
411+
await writer.open()
412+
assert writer.is_stream_open
413+
old_gen = writer.generation
414+
415+
416+
await writer.close()
417+
assert not writer.is_stream_open
418+
419+
420+
421+
new_writer = AsyncAppendableObjectWriter(
422+
grpc_client, _ZONAL_BUCKET, object_name, generation=None
423+
)
424+
await new_writer.open()
425+
assert new_writer.generation != old_gen
426+
427+
# assert exc_info.value.code == 400
428+
429+
# cleanup
430+
del writer
431+
del new_writer
432+
gc.collect()
433+
434+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
Collapse file

‎tests/unit/asyncio/test_async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_appendable_object_writer.py
+6-6Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def test_init(mock_write_object_stream, mock_client):
5555
assert writer.object_name == OBJECT
5656
assert writer.generation is None
5757
assert writer.write_handle is None
58-
assert not writer._is_stream_open
58+
assert not writer.is_stream_open
5959
assert writer.offset is None
6060
assert writer.persisted_size is None
6161
assert writer.bytes_appended_since_last_flush == 0
@@ -225,7 +225,7 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
225225

226226
# Assert
227227
mock_stream.open.assert_awaited_once()
228-
assert writer._is_stream_open
228+
assert writer.is_stream_open
229229
assert writer.generation == GENERATION
230230
assert writer.write_handle == WRITE_HANDLE
231231
assert writer.persisted_size == 0
@@ -255,7 +255,7 @@ async def test_open_appendable_object_writer_existing_object(
255255

256256
# Assert
257257
mock_stream.open.assert_awaited_once()
258-
assert writer._is_stream_open
258+
assert writer.is_stream_open
259259
assert writer.generation == GENERATION
260260
assert writer.write_handle == WRITE_HANDLE
261261
assert writer.persisted_size == PERSISTED_SIZE
@@ -379,7 +379,7 @@ async def test_close(mock_write_object_stream, mock_client):
379379
mock_stream.close.assert_awaited_once()
380380
assert writer.offset is None
381381
assert persisted_size == 1024
382-
assert not writer._is_stream_open
382+
assert not writer.is_stream_open
383383

384384

385385
@pytest.mark.asyncio
@@ -415,7 +415,7 @@ async def test_finalize_on_close(mock_write_object_stream, mock_client):
415415

416416
# Assert
417417
mock_stream.close.assert_awaited_once()
418-
assert not writer._is_stream_open
418+
assert not writer.is_stream_open
419419
assert writer.offset is None
420420
assert writer.object_resource == mock_resource
421421
assert writer.persisted_size == 2048
@@ -448,7 +448,7 @@ async def test_finalize(mock_write_object_stream, mock_client):
448448
assert writer.object_resource == mock_resource
449449
assert writer.persisted_size == 123
450450
assert gcs_object == mock_resource
451-
assert writer._is_stream_open is False
451+
assert not writer.is_stream_open
452452
assert writer.offset is None
453453

454454

Collapse file

‎tests/unit/asyncio/test_async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_write_object_stream.py
+36Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,42 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
148148
assert stream.persisted_size == 0
149149

150150

151+
@pytest.mark.asyncio
152+
@mock.patch(
153+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
154+
)
155+
async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, mock_client):
156+
"""Test opening a stream for a new object with `generation_number=0`."""
157+
# Arrange
158+
socket_like_rpc = mock.AsyncMock()
159+
mock_async_bidi_rpc.return_value = socket_like_rpc
160+
socket_like_rpc.open = mock.AsyncMock()
161+
162+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
163+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
164+
mock_response.resource.generation = GENERATION
165+
mock_response.resource.size = 0
166+
mock_response.write_handle = WRITE_HANDLE
167+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
168+
169+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT, generation_number=0)
170+
171+
# Act
172+
await stream.open()
173+
174+
# Assert
175+
mock_async_bidi_rpc.assert_called_once()
176+
_, call_kwargs = mock_async_bidi_rpc.call_args
177+
initial_request = call_kwargs["initial_request"]
178+
assert initial_request.write_object_spec.if_generation_match == 0
179+
assert stream._is_stream_open
180+
socket_like_rpc.open.assert_called_once()
181+
socket_like_rpc.recv.assert_called_once()
182+
assert stream.generation_number == GENERATION
183+
assert stream.write_handle == WRITE_HANDLE
184+
assert stream.persisted_size == 0
185+
186+
151187
@pytest.mark.asyncio
152188
@mock.patch(
153189
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.