Commit dbd162b

feat(experimental): integrate writes strategy and appendable object writer (#1695)

Integrate all the components required for bidi writes retries.

1 parent: 5d9fafe

20 files changed: +1526 -1515 lines

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+261 -64 (large diff not rendered)
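
The writer diff above is not rendered, so the following is only a rough, hypothetical sketch of the control flow the commit description refers to: reopen the bidi stream, let the resumption strategy (see writes_resumption_strategy.py below) replay data from the last persisted byte, and feed failures back into the strategy. The strategy methods come from the diffs below; open_stream and send_one are made-up placeholders for the writer's transport calls, not the library's API.

# Hypothetical sketch only; the real wiring lives in async_appendable_object_writer.py.
async def upload_with_retries(strategy, state, open_stream, send_one):
    """Drive a bidi append upload, replaying from persisted_size after failures."""
    while True:
        try:
            # Reopen the stream. After a redirect, state["write_state"] already
            # carries the routing_token / write_handle set by the strategy.
            stream = await open_stream(state["write_state"])
            for request in strategy.generate_requests(state):
                response = await send_one(stream, request)
                strategy.update_state_from_response(response, state)
            return  # every buffered chunk was sent and acknowledged
        except Exception as error:
            # Typically an Aborted error carrying a BidiWriteObjectRedirectedError
            # detail; the strategy rewinds the user buffer to persisted_size.
            await strategy.recover_state_on_failure(error, state)
            # A real implementation would also bound the number of attempts.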

google/cloud/storage/_experimental/asyncio/async_grpc_client.py
+1 -0

@@ -42,6 +42,7 @@ class AsyncGrpcClient:
         (Optional) Whether to attempt to use DirectPath for gRPC connections.
         Defaults to ``True``.
     """
+
     def __init__(
         self,
         credentials=None,

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
+1 -1

@@ -493,4 +493,4 @@ async def close(self):

     @property
     def is_stream_open(self) -> bool:
-        return self._is_stream_open
+        return self._is_stream_open

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py
+1 -1

@@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:

     @property
     def is_stream_open(self) -> bool:
-        return self._is_stream_open
+        return self._is_stream_open

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+38 -30

@@ -21,9 +21,9 @@
 if you want to use these Rapid Storage APIs.

 """
-from typing import Optional
-from . import _utils
+from typing import List, Optional, Tuple
 from google.cloud import _storage_v2
+from google.cloud.storage._experimental.asyncio import _utils
 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
 from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
     _AsyncAbstractObjectStream,
@@ -72,6 +72,7 @@ def __init__(
         object_name: str,
         generation_number: Optional[int] = None,  # None means new object
         write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
+        routing_token: Optional[str] = None,
     ) -> None:
         if client is None:
             raise ValueError("client must be provided")
@@ -87,6 +88,7 @@ def __init__(
         )
         self.client: AsyncGrpcClient.grpc_client = client
         self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
+        self.routing_token: Optional[str] = routing_token

         self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"

@@ -101,7 +103,7 @@ def __init__(
         self.persisted_size = 0
         self.object_resource: Optional[_storage_v2.Object] = None

-    async def open(self) -> None:
+    async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
         """
         Opens the bidi-gRPC connection to write to the object.

@@ -110,7 +112,7 @@ async def open(self) -> None:

         :rtype: None
         :raises ValueError: If the stream is already open.
-        :raises google.api_core.exceptions.FailedPrecondition:
+        :raises google.api_core.exceptions.FailedPrecondition:
             if `generation_number` is 0 and object already exists.
         """
         if self._is_stream_open:
@@ -121,9 +123,6 @@ async def open(self) -> None:
         # Created object type would be Appendable Object.
         # if `generation_number` == 0 new object will be created only if there
         # isn't any existing object.
-        is_open_via_write_handle = (
-            self.write_handle is not None and self.generation_number
-        )
         if self.generation_number is None or self.generation_number == 0:
             self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
                 write_object_spec=_storage_v2.WriteObjectSpec(
@@ -140,44 +139,46 @@ async def open(self) -> None:
                     bucket=self._full_bucket_name,
                     object=self.object_name,
                     generation=self.generation_number,
-                    write_handle=self.write_handle,
+                    write_handle=self.write_handle if self.write_handle else None,
+                    routing_token=self.routing_token if self.routing_token else None,
                 ),
             )
+
+        request_param_values = [f"bucket={self._full_bucket_name}"]
+        final_metadata = []
+        if metadata:
+            for key, value in metadata:
+                if key == "x-goog-request-params":
+                    request_param_values.append(value)
+                else:
+                    final_metadata.append((key, value))
+
+        final_metadata.append(("x-goog-request-params", ",".join(request_param_values)))
+
         self.socket_like_rpc = AsyncBidiRpc(
-            self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata
+            self.rpc,
+            initial_request=self.first_bidi_write_req,
+            metadata=final_metadata,
         )

         await self.socket_like_rpc.open()  # this is actually 1 send
         response = await self.socket_like_rpc.recv()
         self._is_stream_open = True
-        if is_open_via_write_handle:
-            # Don't use if not response.persisted_size because this will be true
-            # if persisted_size==0 (0 is considered "Falsy" in Python)
-            if response.persisted_size is None:
-                raise ValueError(
-                    "Failed to obtain persisted_size after opening the stream via write_handle"
-                )
+
+        if response.persisted_size:
             self.persisted_size = response.persisted_size
-        else:
-            if not response.resource:
-                raise ValueError(
-                    "Failed to obtain object resource after opening the stream"
-                )
-            if not response.resource.generation:
-                raise ValueError(
-                    "Failed to obtain object generation after opening the stream"
-                )
+
+        if response.resource:
             if not response.resource.size:
                 # Appending to a 0 byte appendable object.
                 self.persisted_size = 0
             else:
                 self.persisted_size = response.resource.size

-            if not response.write_handle:
-                raise ValueError("Failed to obtain write_handle after opening the stream")
+            self.generation_number = response.resource.generation

-            self.generation_number = response.resource.generation
-            self.write_handle = response.write_handle
+        if response.write_handle:
+            self.write_handle = response.write_handle

     async def close(self) -> None:
         """Closes the bidi-gRPC connection."""
@@ -220,7 +221,14 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
         if not self._is_stream_open:
             raise ValueError("Stream is not open")
         response = await self.socket_like_rpc.recv()
-        _utils.update_write_handle_if_exists(self, response)
+        # Update write_handle if present in response
+        if response:
+            if response.write_handle:
+                self.write_handle = response.write_handle
+            if response.persisted_size is not None:
+                self.persisted_size = response.persisted_size
+            if response.resource and response.resource.size:
+                self.persisted_size = response.resource.size
         return response

     @property
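
The new open() logic merges any caller-supplied x-goog-request-params metadata into a single comma-separated header value alongside the mandatory bucket parameter. Below is a standalone restatement of that merging step, pulled out of the class for illustration only (the helper name is invented here):

from typing import List, Optional, Tuple

def merge_request_params(
    full_bucket_name: str,
    metadata: Optional[List[Tuple[str, str]]] = None,
) -> List[Tuple[str, str]]:
    # Collect every x-goog-request-params value into one comma-joined header,
    # always starting with the bucket routing parameter; pass other metadata through.
    request_param_values = [f"bucket={full_bucket_name}"]
    final_metadata = []
    for key, value in metadata or []:
        if key == "x-goog-request-params":
            request_param_values.append(value)
        else:
            final_metadata.append((key, value))
    final_metadata.append(("x-goog-request-params", ",".join(request_param_values)))
    return final_metadata

# merge_request_params("projects/_/buckets/b", [("x-goog-request-params", "routing_token=t")])
# -> [("x-goog-request-params", "bucket=projects/_/buckets/b,routing_token=t")]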

google/cloud/storage/_experimental/asyncio/retry/_helpers.py
+44 -2

@@ -18,12 +18,19 @@
 from typing import Tuple, Optional

 from google.api_core import exceptions
-from google.cloud._storage_v2.types import BidiReadObjectRedirectedError
+from google.cloud._storage_v2.types import (
+    BidiReadObjectRedirectedError,
+    BidiWriteObjectRedirectedError,
+)
 from google.rpc import status_pb2

 _BIDI_READ_REDIRECTED_TYPE_URL = (
     "type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
 )
+_BIDI_WRITE_REDIRECTED_TYPE_URL = (
+    "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
+)
+logger = logging.getLogger(__name__)


 def _handle_redirect(
@@ -78,6 +85,41 @@ def _handle_redirect(
                     read_handle = redirect_proto.read_handle
                     break
     except Exception as e:
-        logging.ERROR(f"Error unpacking redirect: {e}")
+        logger.error(f"Error unpacking redirect: {e}")

     return routing_token, read_handle
+
+
+def _extract_bidi_writes_redirect_proto(exc: Exception):
+    grpc_error = None
+    if isinstance(exc, exceptions.Aborted) and exc.errors:
+        grpc_error = exc.errors[0]
+
+    if grpc_error:
+        if isinstance(grpc_error, BidiWriteObjectRedirectedError):
+            return grpc_error
+
+        if hasattr(grpc_error, "trailing_metadata"):
+            trailers = grpc_error.trailing_metadata()
+            if not trailers:
+                return
+
+            status_details_bin = None
+            for key, value in trailers:
+                if key == "grpc-status-details-bin":
+                    status_details_bin = value
+                    break
+
+            if status_details_bin:
+                status_proto = status_pb2.Status()
+                try:
+                    status_proto.ParseFromString(status_details_bin)
+                    for detail in status_proto.details:
+                        if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
+                            redirect_proto = BidiWriteObjectRedirectedError.deserialize(
                                detail.value
+                            )
+                            return redirect_proto
+                except Exception:
+                    logger.error("Error unpacking redirect details from gRPC error.")
+                    pass
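
_extract_bidi_writes_redirect_proto mirrors the existing read-path handling: a server-initiated redirect arrives as an Aborted error whose grpc-status-details-bin trailer holds a serialized google.rpc.Status, and one of the Status details wraps a BidiWriteObjectRedirectedError. A minimal sketch of just that trailer-unpacking step, assuming the trailers are already available as (key, value) pairs (the helper name is illustrative):

from google.rpc import status_pb2

_REDIRECT_TYPE_URL = (
    "type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
)

def find_redirect_payload(trailers):
    # trailers: iterable of (key, value) pairs, e.g. from grpc_error.trailing_metadata().
    status_bin = next(
        (value for key, value in trailers if key == "grpc-status-details-bin"), None
    )
    if status_bin is None:
        return None
    status_proto = status_pb2.Status()
    status_proto.ParseFromString(status_bin)
    for detail in status_proto.details:
        if detail.type_url == _REDIRECT_TYPE_URL:
            # The caller turns this back into a message with
            # BidiWriteObjectRedirectedError.deserialize(detail.value).
            return detail.value
    return None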

google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
+38 -37

@@ -12,40 +12,44 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from typing import Any, Dict, IO, Iterable, Optional, Union
+from typing import Any, Dict, IO, List, Optional, Union

 import google_crc32c
 from google.cloud._storage_v2.types import storage as storage_type
 from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
 from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
     _BaseResumptionStrategy,
 )
+from google.cloud.storage._experimental.asyncio.retry._helpers import (
+    _extract_bidi_writes_redirect_proto,
+)


 class _WriteState:
     """A helper class to track the state of a single upload operation.

-    :type spec: :class:`google.cloud.storage_v2.types.AppendObjectSpec`
-    :param spec: The specification for the object to write.
-
     :type chunk_size: int
     :param chunk_size: The size of chunks to write to the server.

     :type user_buffer: IO[bytes]
     :param user_buffer: The data source.
+
+    :type flush_interval: int
+    :param flush_interval: The flush interval at which the data is flushed.
     """

     def __init__(
         self,
-        spec: Union[storage_type.AppendObjectSpec, storage_type.WriteObjectSpec],
         chunk_size: int,
         user_buffer: IO[bytes],
+        flush_interval: int,
     ):
-        self.spec = spec
         self.chunk_size = chunk_size
         self.user_buffer = user_buffer
         self.persisted_size: int = 0
         self.bytes_sent: int = 0
+        self.bytes_since_last_flush: int = 0
+        self.flush_interval: int = flush_interval
         self.write_handle: Union[bytes, storage_type.BidiWriteHandle, None] = None
         self.routing_token: Optional[str] = None
         self.is_finalized: bool = False
@@ -56,39 +60,22 @@ class _WriteResumptionStrategy(_BaseResumptionStrategy):

     def generate_requests(
         self, state: Dict[str, Any]
-    ) -> Iterable[storage_type.BidiWriteObjectRequest]:
+    ) -> List[storage_type.BidiWriteObjectRequest]:
         """Generates BidiWriteObjectRequests to resume or continue the upload.

-        For Appendable Objects, every stream opening should send an
-        AppendObjectSpec. If resuming, the `write_handle` is added to that spec.
-
         This method is not applicable for `open` methods.
         """
         write_state: _WriteState = state["write_state"]

-        initial_request = storage_type.BidiWriteObjectRequest()
-
-        # Determine if we need to send WriteObjectSpec or AppendObjectSpec
-        if isinstance(write_state.spec, storage_type.WriteObjectSpec):
-            initial_request.write_object_spec = write_state.spec
-        else:
-            if write_state.write_handle:
-                write_state.spec.write_handle = write_state.write_handle
-
-            if write_state.routing_token:
-                write_state.spec.routing_token = write_state.routing_token
-            initial_request.append_object_spec = write_state.spec
-
-        yield initial_request
-
+        requests = []
         # The buffer should already be seeked to the correct position (persisted_size)
         # by the `recover_state_on_failure` method before this is called.
         while not write_state.is_finalized:
             chunk = write_state.user_buffer.read(write_state.chunk_size)

             # End of File detection
             if not chunk:
-                return
+                break

             checksummed_data = storage_type.ChecksummedData(content=chunk)
             checksum = google_crc32c.Checksum(chunk)
@@ -98,16 +85,25 @@ def generate_requests(
                 write_offset=write_state.bytes_sent,
                 checksummed_data=checksummed_data,
             )
-            write_state.bytes_sent += len(chunk)
+            chunk_len = len(chunk)
+            write_state.bytes_sent += chunk_len
+            write_state.bytes_since_last_flush += chunk_len
+
+            if write_state.bytes_since_last_flush >= write_state.flush_interval:
+                request.flush = True
+                # reset counter after marking flush
+                write_state.bytes_since_last_flush = 0

-            yield request
+            requests.append(request)
+        return requests

     def update_state_from_response(
         self, response: storage_type.BidiWriteObjectResponse, state: Dict[str, Any]
     ) -> None:
         """Processes a server response and updates the write state."""
         write_state: _WriteState = state["write_state"]
-
+        if response is None:
+            return
         if response.persisted_size:
             write_state.persisted_size = response.persisted_size

@@ -129,18 +125,23 @@ async def recover_state_on_failure(
         last confirmed 'persisted_size' from the server.
         """
         write_state: _WriteState = state["write_state"]
-        cause = getattr(error, "cause", error)

-        # Extract routing token and potentially a new write handle for redirection.
-        if isinstance(cause, BidiWriteObjectRedirectedError):
-            if cause.routing_token:
-                write_state.routing_token = cause.routing_token
+        redirect_proto = None

-            redirect_handle = getattr(cause, "write_handle", None)
-            if redirect_handle:
-                write_state.write_handle = redirect_handle
+        if isinstance(error, BidiWriteObjectRedirectedError):
+            redirect_proto = error
+        else:
+            redirect_proto = _extract_bidi_writes_redirect_proto(error)
+
+        # Extract routing token and potentially a new write handle for redirection.
+        if redirect_proto:
+            if redirect_proto.routing_token:
+                write_state.routing_token = redirect_proto.routing_token
+            if redirect_proto.write_handle:
+                write_state.write_handle = redirect_proto.write_handle

         # We must assume any data sent beyond 'persisted_size' was lost.
         # Reset the user buffer to the last known good byte confirmed by the server.
         write_state.user_buffer.seek(write_state.persisted_size)
         write_state.bytes_sent = write_state.persisted_size
+        write_state.bytes_since_last_flush = 0
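
generate_requests now returns the full list of chunk requests instead of yielding them one by one, and marks flush on a request once at least flush_interval bytes have accumulated since the last flush. A simplified, self-contained restatement of that loop follows; the function name is invented for this sketch, and since the hunk above elides how the CRC32C value is attached to ChecksummedData, that line is a guess:

import io

import google_crc32c
from google.cloud._storage_v2.types import storage as storage_type

def build_chunk_requests(buffer, start_offset, chunk_size, flush_interval):
    # Simplified restatement of the generate_requests() chunking loop.
    requests = []
    bytes_sent = start_offset
    bytes_since_last_flush = 0
    while True:
        chunk = buffer.read(chunk_size)
        if not chunk:
            break  # end of file
        checksummed_data = storage_type.ChecksummedData(content=chunk)
        checksummed_data.crc32c = int.from_bytes(
            google_crc32c.Checksum(chunk).digest(), "big"
        )  # attachment of the checksum is elided in the hunk above; this line is a guess
        request = storage_type.BidiWriteObjectRequest(
            write_offset=bytes_sent,
            checksummed_data=checksummed_data,
        )
        bytes_sent += len(chunk)
        bytes_since_last_flush += len(chunk)
        if bytes_since_last_flush >= flush_interval:
            request.flush = True  # ask the server to persist what it has so far
            bytes_since_last_flush = 0
        requests.append(request)
    return requests

# build_chunk_requests(io.BytesIO(b"x" * 10), 0, 4, 8)
# -> three requests at offsets 0, 4, 8; the second one carries flush=True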
