Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2bc15fa

Browse filesBrowse files
authored
feat: Add support for opening via write_handle and fix write_handle type (#1715)
feat: Add support for opening via `write_handle` and fix `write_handle` type
1 parent f751f1e commit 2bc15fa
Copy full SHA for 2bc15fa

File tree

Expand file treeCollapse file tree

7 files changed

+146
-53
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+146
-53
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class _AsyncAbstractObjectStream(abc.ABC):
3232
:param generation_number: (Optional) If present, selects a specific revision of
3333
this object.
3434
35-
:type handle: bytes
35+
:type handle: Any
3636
:param handle: (Optional) The handle for the object, could be read_handle or
3737
write_handle, based on how the stream is used.
3838
"""
@@ -42,13 +42,13 @@ def __init__(
4242
bucket_name: str,
4343
object_name: str,
4444
generation_number: Optional[int] = None,
45-
handle: Optional[bytes] = None,
45+
handle: Optional[Any] = None,
4646
) -> None:
4747
super().__init__()
4848
self.bucket_name: str = bucket_name
4949
self.object_name: str = object_name
5050
self.generation_number: Optional[int] = generation_number
51-
self.handle: Optional[bytes] = handle
51+
self.handle: Optional[Any] = handle
5252

5353
@abc.abstractmethod
5454
async def open(self) -> None:
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+5-6Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ def __init__(
4949
client: AsyncGrpcClient.grpc_client,
5050
bucket_name: str,
5151
object_name: str,
52-
generation=None,
53-
write_handle=None,
52+
generation: Optional[int] = None,
53+
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
5454
writer_options: Optional[dict] = None,
5555
):
5656
"""
@@ -96,7 +96,7 @@ def __init__(
9696
:type object_name: str
9797
:param object_name: The name of the GCS Appendable Object to be written.
9898
99-
:type generation: int
99+
:type generation: Optional[int]
100100
:param generation: (Optional) If present, creates writer for that
101101
specific revision of that object. Use this to append data to an
102102
existing Appendable Object.
@@ -106,10 +106,10 @@ def __init__(
106106
overwriting existing objects).
107107
108108
Warning: If `None`, a new object is created. If an object with the
109-
same name already exists, it will be overwritten the moment
109+
same name already exists, it will be overwritten the moment
110110
`writer.open()` is called.
111111
112-
:type write_handle: bytes
112+
:type write_handle: _storage_v2.BidiWriteHandle
113113
:param write_handle: (Optional) An handle for writing the object.
114114
If provided, opening the bidi-gRPC connection will be faster.
115115
@@ -363,7 +363,6 @@ async def finalize(self) -> _storage_v2.Object:
363363
def is_stream_open(self) -> bool:
364364
return self._is_stream_open
365365

366-
367366
# helper methods.
368367
async def append_from_string(self, data: str):
369368
"""
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
+6-6Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ async def create_mrd(
129129
bucket_name: str,
130130
object_name: str,
131131
generation_number: Optional[int] = None,
132-
read_handle: Optional[bytes] = None,
132+
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
133133
retry_policy: Optional[AsyncRetry] = None,
134134
metadata: Optional[List[Tuple[str, str]]] = None,
135135
) -> AsyncMultiRangeDownloader:
@@ -149,7 +149,7 @@ async def create_mrd(
149149
:param generation_number: (Optional) If present, selects a specific
150150
revision of this object.
151151
152-
:type read_handle: bytes
152+
:type read_handle: _storage_v2.BidiReadHandle
153153
:param read_handle: (Optional) An existing handle for reading the object.
154154
If provided, opening the bidi-gRPC connection will be faster.
155155
@@ -172,7 +172,7 @@ def __init__(
172172
bucket_name: str,
173173
object_name: str,
174174
generation_number: Optional[int] = None,
175-
read_handle: Optional[bytes] = None,
175+
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
176176
) -> None:
177177
"""Constructor for AsyncMultiRangeDownloader, clients are not adviced to
178178
use it directly. Instead it's adviced to use the classmethod `create_mrd`.
@@ -190,7 +190,7 @@ def __init__(
190190
:param generation_number: (Optional) If present, selects a specific revision of
191191
this object.
192192
193-
:type read_handle: bytes
193+
:type read_handle: _storage_v2.BidiReadHandle
194194
:param read_handle: (Optional) An existing read handle.
195195
"""
196196

@@ -200,7 +200,7 @@ def __init__(
200200
self.bucket_name = bucket_name
201201
self.object_name = object_name
202202
self.generation_number = generation_number
203-
self.read_handle = read_handle
203+
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
204204
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
205205
self._is_stream_open: bool = False
206206
self._routing_token: Optional[str] = None
@@ -493,4 +493,4 @@ async def close(self):
493493

494494
@property
495495
def is_stream_open(self) -> bool:
496-
return self._is_stream_open
496+
return self._is_stream_open
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_read_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_read_object_stream.py
+4-4Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream):
5151
:param generation_number: (Optional) If present, selects a specific revision of
5252
this object.
5353
54-
:type read_handle: bytes
54+
:type read_handle: _storage_v2.BidiReadHandle
5555
:param read_handle: (Optional) An existing handle for reading the object.
5656
If provided, opening the bidi-gRPC connection will be faster.
5757
"""
@@ -62,7 +62,7 @@ def __init__(
6262
bucket_name: str,
6363
object_name: str,
6464
generation_number: Optional[int] = None,
65-
read_handle: Optional[bytes] = None,
65+
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
6666
) -> None:
6767
if client is None:
6868
raise ValueError("client must be provided")
@@ -77,7 +77,7 @@ def __init__(
7777
generation_number=generation_number,
7878
)
7979
self.client: AsyncGrpcClient.grpc_client = client
80-
self.read_handle: Optional[bytes] = read_handle
80+
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
8181

8282
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
8383

@@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:
195195

196196
@property
197197
def is_stream_open(self) -> bool:
198-
return self._is_stream_open
198+
return self._is_stream_open
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+29-19Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
5959
same name already exists, it will be overwritten the moment
6060
`writer.open()` is called.
6161
62-
:type write_handle: bytes
62+
:type write_handle: _storage_v2.BidiWriteHandle
6363
:param write_handle: (Optional) An existing handle for writing the object.
6464
If provided, opening the bidi-gRPC connection will be faster.
6565
"""
@@ -70,7 +70,7 @@ def __init__(
7070
bucket_name: str,
7171
object_name: str,
7272
generation_number: Optional[int] = None, # None means new object
73-
write_handle: Optional[bytes] = None,
73+
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
7474
) -> None:
7575
if client is None:
7676
raise ValueError("client must be provided")
@@ -85,7 +85,7 @@ def __init__(
8585
generation_number=generation_number,
8686
)
8787
self.client: AsyncGrpcClient.grpc_client = client
88-
self.write_handle: Optional[bytes] = write_handle
88+
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
8989

9090
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
9191

@@ -120,6 +120,9 @@ async def open(self) -> None:
120120
# Created object type would be Appendable Object.
121121
# if `generation_number` == 0 new object will be created only if there
122122
# isn't any existing object.
123+
is_open_via_write_handle = (
124+
self.write_handle is not None and self.generation_number
125+
)
123126
if self.generation_number is None or self.generation_number == 0:
124127
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
125128
write_object_spec=_storage_v2.WriteObjectSpec(
@@ -136,6 +139,7 @@ async def open(self) -> None:
136139
bucket=self._full_bucket_name,
137140
object=self.object_name,
138141
generation=self.generation_number,
142+
write_handle=self.write_handle,
139143
),
140144
)
141145
self.socket_like_rpc = AsyncBidiRpc(
@@ -145,25 +149,32 @@ async def open(self) -> None:
145149
await self.socket_like_rpc.open() # this is actually 1 send
146150
response = await self.socket_like_rpc.recv()
147151
self._is_stream_open = True
148-
149-
if not response.resource:
150-
raise ValueError(
151-
"Failed to obtain object resource after opening the stream"
152-
)
153-
if not response.resource.generation:
154-
raise ValueError(
155-
"Failed to obtain object generation after opening the stream"
156-
)
152+
if is_open_via_write_handle:
153+
# Don't use if not response.persisted_size because this will be true
154+
# if persisted_size==0 (0 is considered "Falsy" in Python)
155+
if response.persisted_size is None:
156+
raise ValueError(
157+
"Failed to obtain persisted_size after opening the stream via write_handle"
158+
)
159+
self.persisted_size = response.persisted_size
160+
else:
161+
if not response.resource:
162+
raise ValueError(
163+
"Failed to obtain object resource after opening the stream"
164+
)
165+
if not response.resource.generation:
166+
raise ValueError(
167+
"Failed to obtain object generation after opening the stream"
168+
)
169+
if not response.resource.size:
170+
# Appending to a 0 byte appendable object.
171+
self.persisted_size = 0
172+
else:
173+
self.persisted_size = response.resource.size
157174

158175
if not response.write_handle:
159176
raise ValueError("Failed to obtain write_handle after opening the stream")
160177

161-
if not response.resource.size:
162-
# Appending to a 0 byte appendable object.
163-
self.persisted_size = 0
164-
else:
165-
self.persisted_size = response.resource.size
166-
167178
self.generation_number = response.resource.generation
168179
self.write_handle = response.write_handle
169180

@@ -212,4 +223,3 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
212223
@property
213224
def is_stream_open(self) -> bool:
214225
return self._is_stream_open
215-
Collapse file

‎tests/system/test_zonal.py‎

Copy file name to clipboardExpand all lines: tests/system/test_zonal.py
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,56 @@ async def _run():
333333
event_loop.run_until_complete(_run())
334334

335335

336+
def test_wrd_open_with_write_handle(
337+
event_loop, grpc_client_direct, storage_client, blobs_to_delete
338+
):
339+
object_name = f"test_write_handl-{str(uuid.uuid4())[:4]}"
340+
341+
async def _run():
342+
# 1. Create an object and get its write_handle
343+
writer = AsyncAppendableObjectWriter(
344+
grpc_client_direct, _ZONAL_BUCKET, object_name
345+
)
346+
await writer.open()
347+
write_handle = writer.write_handle
348+
await writer.close()
349+
350+
# 2. Open a new writer using the obtained `write_handle` and generation
351+
new_writer = AsyncAppendableObjectWriter(
352+
grpc_client_direct,
353+
_ZONAL_BUCKET,
354+
object_name,
355+
write_handle=write_handle,
356+
generation=writer.generation,
357+
)
358+
await new_writer.open()
359+
# Verify that the new writer is open and has the same write_handle
360+
assert new_writer.is_stream_open
361+
assert new_writer.generation == writer.generation
362+
363+
# 3. Append some data using the new writer
364+
test_data = b"data_from_new_writer"
365+
await new_writer.append(test_data)
366+
await new_writer.close()
367+
368+
# 4. Verify the data was written correctly by reading it back
369+
mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name)
370+
buffer = BytesIO()
371+
await mrd.open()
372+
await mrd.download_ranges([(0, 0, buffer)])
373+
await mrd.close()
374+
assert buffer.getvalue() == test_data
375+
376+
# Clean up
377+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
378+
del writer
379+
del new_writer
380+
del mrd
381+
gc.collect()
382+
383+
event_loop.run_until_complete(_run())
384+
385+
336386
def test_read_unfinalized_appendable_object_with_generation(
337387
storage_client, blobs_to_delete, event_loop, grpc_client_direct
338388
):

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.