Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 5d9fafe

Browse filesBrowse files
authored
fix: update write handle on every recv() (#1716)
fix: update `write_handle` on every `recv()` from write object stream.
1 parent 2bc15fa commit 5d9fafe
Copy full SHA for 5d9fafe

File tree

Expand file treeCollapse file tree

4 files changed

+116
-7
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+116
-7
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/_utils.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/_utils.py
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,9 @@ def raise_if_no_fast_crc32c():
3333
"C extension is required for faster data integrity checks."
3434
"For more information, see https://github.com/googleapis/python-crc32c."
3535
)
36+
37+
38+
def update_write_handle_if_exists(obj, response):
39+
"""Update the write_handle attribute of an object if it exists in the response."""
40+
if hasattr(response, "write_handle") and response.write_handle is not None:
41+
obj.write_handle = response.write_handle
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+6-2Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from google_crc32c import Checksum
2828
from google.api_core import exceptions
2929

30-
from ._utils import raise_if_no_fast_crc32c
30+
from . import _utils
3131
from google.cloud import _storage_v2
3232
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
3333
AsyncGrpcClient,
@@ -121,7 +121,7 @@ def __init__(
121121
servers. Default is `_DEFAULT_FLUSH_INTERVAL_BYTES`.
122122
Must be a multiple of `_MAX_CHUNK_SIZE_BYTES`.
123123
"""
124-
raise_if_no_fast_crc32c()
124+
_utils.raise_if_no_fast_crc32c()
125125
self.client = client
126126
self.bucket_name = bucket_name
127127
self.object_name = object_name
@@ -175,6 +175,7 @@ async def state_lookup(self) -> int:
175175
)
176176
)
177177
response = await self.write_obj_stream.recv()
178+
_utils.update_write_handle_if_exists(self, response)
178179
self.persisted_size = response.persisted_size
179180
return self.persisted_size
180181

@@ -253,6 +254,7 @@ async def append(self, data: bytes) -> None:
253254

254255
if is_last_chunk:
255256
response = await self.write_obj_stream.recv()
257+
_utils.update_write_handle_if_exists(self, response)
256258
self.persisted_size = response.persisted_size
257259
self.offset = self.persisted_size
258260
self.bytes_appended_since_last_flush = 0
@@ -295,6 +297,7 @@ async def flush(self) -> int:
295297
)
296298
)
297299
response = await self.write_obj_stream.recv()
300+
_utils.update_write_handle_if_exists(self, response)
298301
self.persisted_size = response.persisted_size
299302
self.offset = self.persisted_size
300303
return self.persisted_size
@@ -351,6 +354,7 @@ async def finalize(self) -> _storage_v2.Object:
351354
_storage_v2.BidiWriteObjectRequest(finish_write=True)
352355
)
353356
response = await self.write_obj_stream.recv()
357+
_utils.update_write_handle_if_exists(self, response)
354358
self.object_resource = response.resource
355359
self.persisted_size = self.object_resource.size
356360
await self.write_obj_stream.close()
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
2323
"""
2424
from typing import Optional
25+
from . import _utils
2526
from google.cloud import _storage_v2
2627
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
2728
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
@@ -190,7 +191,7 @@ async def requests_done(self):
190191
"""Signals that all requests have been sent."""
191192

192193
await self.socket_like_rpc.send(None)
193-
await self.socket_like_rpc.recv()
194+
_utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv())
194195

195196
async def send(
196197
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
@@ -218,7 +219,9 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
218219
"""
219220
if not self._is_stream_open:
220221
raise ValueError("Stream is not open")
221-
return await self.socket_like_rpc.recv()
222+
response = await self.socket_like_rpc.recv()
223+
_utils.update_write_handle_if_exists(self, response)
224+
return response
222225

223226
@property
224227
def is_stream_open(self) -> bool:
Collapse file

‎tests/unit/asyncio/test_async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_write_object_stream.py
+99-3Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
BUCKET = "my-bucket"
2525
OBJECT = "my-object"
2626
GENERATION = 12345
27-
WRITE_HANDLE = b"test-handle"
28-
WRITE_HANDLE_PROTO = _storage_v2.BidiWriteHandle(handle=WRITE_HANDLE)
27+
WRITE_HANDLE_BYTES = b"test-handle"
28+
NEW_WRITE_HANDLE_BYTES = b"new-test-handle"
29+
WRITE_HANDLE_PROTO = _storage_v2.BidiWriteHandle(handle=WRITE_HANDLE_BYTES)
30+
NEW_WRITE_HANDLE_PROTO = _storage_v2.BidiWriteHandle(handle=NEW_WRITE_HANDLE_BYTES)
2931

3032

3133
@pytest.fixture
@@ -151,7 +153,9 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
151153
@mock.patch(
152154
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
153155
)
154-
async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, mock_client):
156+
async def test_open_for_new_object_with_generation_zero(
157+
mock_async_bidi_rpc, mock_client
158+
):
155159
"""Test opening a stream for a new object."""
156160
# Arrange
157161
socket_like_rpc = mock.AsyncMock()
@@ -487,3 +491,95 @@ async def test_requests_done(mock_cls_async_bidi_rpc, mock_client):
487491
# Assert
488492
write_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
489493
write_obj_stream.socket_like_rpc.recv.assert_called_once()
494+
495+
496+
@pytest.mark.asyncio
497+
@mock.patch(
498+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
499+
)
500+
async def test_open_for_existing_object_with_none_size(
501+
mock_async_bidi_rpc, mock_client
502+
):
503+
"""Test opening a stream for an existing object where size is None."""
504+
# Arrange
505+
socket_like_rpc = mock.AsyncMock()
506+
mock_async_bidi_rpc.return_value = socket_like_rpc
507+
socket_like_rpc.open = mock.AsyncMock()
508+
509+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
510+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
511+
mock_response.resource.size = None
512+
mock_response.resource.generation = GENERATION
513+
mock_response.write_handle = WRITE_HANDLE_PROTO
514+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
515+
516+
stream = _AsyncWriteObjectStream(
517+
mock_client, BUCKET, OBJECT, generation_number=GENERATION
518+
)
519+
520+
# Act
521+
await stream.open()
522+
523+
# Assert
524+
assert stream.persisted_size == 0
525+
526+
527+
@pytest.mark.asyncio
528+
@mock.patch(
529+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
530+
)
531+
async def test_recv_updates_write_handle(mock_cls_async_bidi_rpc, mock_client):
532+
"""Test that recv updates the write_handle if present in the response."""
533+
# Arrange
534+
write_obj_stream = await instantiate_write_obj_stream(
535+
mock_client, mock_cls_async_bidi_rpc, open=True
536+
)
537+
538+
assert write_obj_stream.write_handle == WRITE_HANDLE_PROTO # Initial handle
539+
540+
# GCS can periodicallly update write handle in their responses.
541+
bidi_write_object_response = _storage_v2.BidiWriteObjectResponse(
542+
write_handle=NEW_WRITE_HANDLE_PROTO
543+
)
544+
write_obj_stream.socket_like_rpc.recv = AsyncMock(
545+
return_value=bidi_write_object_response
546+
)
547+
548+
# Act
549+
response = await write_obj_stream.recv()
550+
551+
# Assert
552+
write_obj_stream.socket_like_rpc.recv.assert_called_once()
553+
assert response == bidi_write_object_response
554+
# asserts that new write handle has been updated.
555+
assert write_obj_stream.write_handle == NEW_WRITE_HANDLE_PROTO
556+
557+
558+
@pytest.mark.asyncio
559+
@mock.patch(
560+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
561+
)
562+
async def test_requests_done_updates_write_handle(mock_cls_async_bidi_rpc, mock_client):
563+
"""Test that requests_done updates the write_handle if present in the response."""
564+
# Arrange
565+
write_obj_stream = await instantiate_write_obj_stream(
566+
mock_client, mock_cls_async_bidi_rpc, open=True
567+
)
568+
assert write_obj_stream.write_handle == WRITE_HANDLE_PROTO # Initial handle
569+
570+
# new_write_handle = b"new-test-handle"
571+
bidi_write_object_response = _storage_v2.BidiWriteObjectResponse(
572+
write_handle=NEW_WRITE_HANDLE_PROTO
573+
)
574+
write_obj_stream.socket_like_rpc.send = AsyncMock()
575+
write_obj_stream.socket_like_rpc.recv = AsyncMock(
576+
return_value=bidi_write_object_response
577+
)
578+
579+
# Act
580+
await write_obj_stream.requests_done()
581+
582+
# Assert
583+
write_obj_stream.socket_like_rpc.send.assert_called_once_with(None)
584+
write_obj_stream.socket_like_rpc.recv.assert_called_once()
585+
assert write_obj_stream.write_handle == NEW_WRITE_HANDLE_PROTO

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.