Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 786af55

Browse filesBrowse files
authored
feat(zb-experimental): implement "open" for write_object_stream (#1613)
feat(zb-experimental): implement "open" for write_object_stream
1 parent 0b70a28 commit 786af55
Copy full SHA for 786af55

File tree

Expand file treeCollapse file tree

2 files changed

+211
-5
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+211
-5
lines changed
Open diff view settings
Collapse file

‎google/cloud/storage/_experimental/asyncio/async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+50-2Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,54 @@ async def open(self) -> None:
9595
"""Opening an object for write , should do it's state lookup
9696
to know what's the persisted size is.
9797
"""
98-
raise NotImplementedError(
99-
"open() is not implemented yet in _AsyncWriteObjectStream"
98+
if self._is_stream_open:
99+
raise ValueError("Stream is already open")
100+
101+
# Create a new object or overwrite existing one if generation_number
102+
# is None. This makes it consistent with GCS JSON API behavior.
103+
# Created object type would be Appendable Object.
104+
if self.generation_number is None:
105+
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
106+
write_object_spec=_storage_v2.WriteObjectSpec(
107+
resource=_storage_v2.Object(
108+
name=self.object_name, bucket=self._full_bucket_name
109+
),
110+
appendable=True,
111+
),
112+
)
113+
else:
114+
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
115+
append_object_spec=_storage_v2.AppendObjectSpec(
116+
bucket=self._full_bucket_name,
117+
object=self.object_name,
118+
generation=self.generation_number,
119+
),
120+
state_lookup=True,
121+
)
122+
123+
self.socket_like_rpc = AsyncBidiRpc(
124+
self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata
100125
)
101126

127+
await self.socket_like_rpc.open() # this is actually 1 send
128+
response = await self.socket_like_rpc.recv()
129+
self._is_stream_open = True
130+
131+
if not response.resource:
132+
raise ValueError(
133+
"Failed to obtain object resource after opening the stream"
134+
)
135+
if not response.resource.generation:
136+
raise ValueError(
137+
"Failed to obtain object generation after opening the stream"
138+
)
139+
self.generation_number = response.resource.generation
140+
141+
if not response.write_handle:
142+
raise ValueError("Failed to obtain write_handle after opening the stream")
143+
144+
self.write_handle = response.write_handle
145+
102146
async def close(self) -> None:
103147
"""Closes the bidi-gRPC connection."""
104148
raise NotImplementedError(
@@ -132,3 +176,7 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
132176
raise NotImplementedError(
133177
"recv() is not implemented yet in _AsyncWriteObjectStream"
134178
)
179+
180+
@property
181+
def is_stream_open(self) -> bool:
182+
return self._is_stream_open
Collapse file

‎tests/unit/asyncio/test_async_write_object_stream.py‎

Copy file name to clipboardExpand all lines: tests/unit/asyncio/test_async_write_object_stream.py
+161-3Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
BUCKET = "my-bucket"
2424
OBJECT = "my-object"
25+
GENERATION = 12345
26+
WRITE_HANDLE = b"test-handle"
2527

2628

2729
@pytest.fixture
@@ -91,13 +93,169 @@ def test_async_write_object_stream_init_raises_value_error():
9193

9294

9395
@pytest.mark.asyncio
94-
async def test_unimplemented_methods_raise_error(mock_client):
95-
"""Test that unimplemented methods raise NotImplementedError."""
96+
@mock.patch(
97+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
98+
)
99+
async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
100+
"""Test opening a stream for a new object."""
101+
# Arrange
102+
socket_like_rpc = mock.AsyncMock()
103+
mock_async_bidi_rpc.return_value = socket_like_rpc
104+
socket_like_rpc.open = mock.AsyncMock()
105+
106+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
107+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
108+
mock_response.resource.generation = GENERATION
109+
mock_response.write_handle = WRITE_HANDLE
110+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
111+
96112
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
97113

98-
with pytest.raises(NotImplementedError):
114+
# Act
115+
await stream.open()
116+
117+
# Assert
118+
assert stream._is_stream_open
119+
socket_like_rpc.open.assert_called_once()
120+
socket_like_rpc.recv.assert_called_once()
121+
assert stream.generation_number == GENERATION
122+
assert stream.write_handle == WRITE_HANDLE
123+
124+
125+
@pytest.mark.asyncio
126+
@mock.patch(
127+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
128+
)
129+
async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
130+
"""Test opening a stream for an existing object."""
131+
# Arrange
132+
socket_like_rpc = mock.AsyncMock()
133+
mock_async_bidi_rpc.return_value = socket_like_rpc
134+
socket_like_rpc.open = mock.AsyncMock()
135+
136+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
137+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
138+
mock_response.resource.generation = GENERATION
139+
mock_response.write_handle = WRITE_HANDLE
140+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
141+
142+
stream = _AsyncWriteObjectStream(
143+
mock_client, BUCKET, OBJECT, generation_number=GENERATION
144+
)
145+
146+
# Act
147+
await stream.open()
148+
149+
# Assert
150+
assert stream._is_stream_open
151+
socket_like_rpc.open.assert_called_once()
152+
socket_like_rpc.recv.assert_called_once()
153+
assert stream.generation_number == GENERATION
154+
assert stream.write_handle == WRITE_HANDLE
155+
156+
157+
@pytest.mark.asyncio
158+
@mock.patch(
159+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
160+
)
161+
async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_client):
162+
"""Test that opening an already open stream raises a ValueError."""
163+
# Arrange
164+
socket_like_rpc = mock.AsyncMock()
165+
mock_async_bidi_rpc.return_value = socket_like_rpc
166+
socket_like_rpc.open = mock.AsyncMock()
167+
168+
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
169+
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
170+
mock_response.resource.generation = GENERATION
171+
mock_response.write_handle = WRITE_HANDLE
172+
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
173+
174+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
175+
await stream.open()
176+
177+
# Act & Assert
178+
with pytest.raises(ValueError, match="Stream is already open"):
99179
await stream.open()
100180

181+
182+
@pytest.mark.asyncio
183+
@mock.patch(
184+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
185+
)
186+
async def test_open_raises_error_on_missing_object_resource(
187+
mock_async_bidi_rpc, mock_client
188+
):
189+
"""Test that open raises ValueError if object_resource is not in the response."""
190+
socket_like_rpc = mock.AsyncMock()
191+
mock_async_bidi_rpc.return_value = socket_like_rpc
192+
193+
mock_reponse = mock.AsyncMock()
194+
type(mock_reponse).resource = mock.PropertyMock(return_value=None)
195+
socket_like_rpc.recv.return_value = mock_reponse
196+
197+
# Note: Don't use below code as unittest library automatically assigns an
198+
# `AsyncMock` object to an attribute, if not set.
199+
# socket_like_rpc.recv.return_value = mock.AsyncMock(
200+
# return_value=_storage_v2.BidiWriteObjectResponse(resource=None)
201+
# )
202+
203+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
204+
with pytest.raises(
205+
ValueError, match="Failed to obtain object resource after opening the stream"
206+
):
207+
await stream.open()
208+
209+
210+
@pytest.mark.asyncio
211+
@mock.patch(
212+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
213+
)
214+
async def test_open_raises_error_on_missing_generation(
215+
mock_async_bidi_rpc, mock_client
216+
):
217+
"""Test that open raises ValueError if generation is not in the response."""
218+
socket_like_rpc = mock.AsyncMock()
219+
mock_async_bidi_rpc.return_value = socket_like_rpc
220+
221+
# Configure the mock response object
222+
mock_response = mock.AsyncMock()
223+
type(mock_response.resource).generation = mock.PropertyMock(return_value=None)
224+
socket_like_rpc.recv.return_value = mock_response
225+
226+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
227+
with pytest.raises(
228+
ValueError, match="Failed to obtain object generation after opening the stream"
229+
):
230+
await stream.open()
231+
# assert stream.generation_number is None
232+
233+
234+
@pytest.mark.asyncio
235+
@mock.patch(
236+
"google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
237+
)
238+
async def test_open_raises_error_on_missing_write_handle(
239+
mock_async_bidi_rpc, mock_client
240+
):
241+
"""Test that open raises ValueError if write_handle is not in the response."""
242+
socket_like_rpc = mock.AsyncMock()
243+
mock_async_bidi_rpc.return_value = socket_like_rpc
244+
socket_like_rpc.recv = mock.AsyncMock(
245+
return_value=_storage_v2.BidiWriteObjectResponse(
246+
resource=_storage_v2.Object(generation=GENERATION), write_handle=None
247+
)
248+
)
249+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
250+
with pytest.raises(ValueError, match="Failed to obtain write_handle"):
251+
await stream.open()
252+
253+
254+
@pytest.mark.asyncio
255+
async def test_unimplemented_methods_raise_error(mock_client):
256+
"""Test that unimplemented methods raise NotImplementedError."""
257+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
258+
101259
with pytest.raises(NotImplementedError):
102260
await stream.close()
103261

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.