Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d32aab2

Browse filesBrowse files
authored
Merge branch 'main' into trace-update-cases-from-review
2 parents 423e5bc + 0887eb4 commit d32aab2
Copy full SHA for d32aab2

File tree

Expand file treeCollapse file tree

12 files changed

+247
-72
lines changed
Filter options
Expand file treeCollapse file tree

12 files changed

+247
-72
lines changed

‎.gitignore

Copy file name to clipboardExpand all lines: .gitignore
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,7 @@ system_tests/local_test_setup
6262
# Make sure a generated file isn't accidentally committed.
6363
pylintrc
6464
pylintrc.test
65+
66+
67+
# Ignore coverage files
68+
.coverage*

‎google/cloud/spanner_dbapi/transaction_helper.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_dbapi/transaction_helper.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from google.cloud.spanner_dbapi.batch_dml_executor import BatchMode
2222
from google.cloud.spanner_dbapi.exceptions import RetryAborted
23-
from google.cloud.spanner_v1.session import _get_retry_delay
23+
from google.cloud.spanner_v1._helpers import _get_retry_delay
2424

2525
if TYPE_CHECKING:
2626
from google.cloud.spanner_dbapi import Connection, Cursor

‎google/cloud/spanner_v1/_helpers.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/_helpers.py
+75Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
from google.protobuf.internal.enum_type_wrapper import EnumTypeWrapper
2828

2929
from google.api_core import datetime_helpers
30+
from google.api_core.exceptions import Aborted
3031
from google.cloud._helpers import _date_from_iso8601_date
3132
from google.cloud.spanner_v1 import TypeCode
3233
from google.cloud.spanner_v1 import ExecuteSqlRequest
3334
from google.cloud.spanner_v1 import JsonObject
3435
from google.cloud.spanner_v1.request_id_header import with_request_id
36+
from google.rpc.error_details_pb2 import RetryInfo
37+
38+
import random
3539

3640
# Validation error messages
3741
NUMERIC_MAX_SCALE_ERR_MSG = (
@@ -460,6 +464,23 @@ def _metadata_with_prefix(prefix, **kw):
460464
return [("google-cloud-resource-prefix", prefix)]
461465

462466

467+
def _retry_on_aborted_exception(
468+
func,
469+
deadline,
470+
):
471+
"""
472+
Handles retry logic for Aborted exceptions, considering the deadline.
473+
"""
474+
attempts = 0
475+
while True:
476+
try:
477+
attempts += 1
478+
return func()
479+
except Aborted as exc:
480+
_delay_until_retry(exc, deadline=deadline, attempts=attempts)
481+
continue
482+
483+
463484
def _retry(
464485
func,
465486
retry_count=5,
@@ -529,6 +550,60 @@ def _metadata_with_leader_aware_routing(value, **kw):
529550
return ("x-goog-spanner-route-to-leader", str(value).lower())
530551

531552

553+
def _delay_until_retry(exc, deadline, attempts):
554+
"""Helper for :meth:`Session.run_in_transaction`.
555+
556+
Detect retryable abort, and impose server-supplied delay.
557+
558+
:type exc: :class:`google.api_core.exceptions.Aborted`
559+
:param exc: exception for aborted transaction
560+
561+
:type deadline: float
562+
:param deadline: maximum timestamp to continue retrying the transaction.
563+
564+
:type attempts: int
565+
:param attempts: number of call retries
566+
"""
567+
568+
cause = exc.errors[0]
569+
now = time.time()
570+
if now >= deadline:
571+
raise
572+
573+
delay = _get_retry_delay(cause, attempts)
574+
if delay is not None:
575+
if now + delay > deadline:
576+
raise
577+
578+
time.sleep(delay)
579+
580+
581+
def _get_retry_delay(cause, attempts):
582+
"""Helper for :func:`_delay_until_retry`.
583+
584+
:type exc: :class:`grpc.Call`
585+
:param exc: exception for aborted transaction
586+
587+
:rtype: float
588+
:returns: seconds to wait before retrying the transaction.
589+
590+
:type attempts: int
591+
:param attempts: number of call retries
592+
"""
593+
if hasattr(cause, "trailing_metadata"):
594+
metadata = dict(cause.trailing_metadata())
595+
else:
596+
metadata = {}
597+
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
598+
if retry_info_pb is not None:
599+
retry_info = RetryInfo()
600+
retry_info.ParseFromString(retry_info_pb)
601+
nanos = retry_info.retry_delay.nanos
602+
return retry_info.retry_delay.seconds + nanos / 1.0e9
603+
604+
return 2**attempts + random.random()
605+
606+
532607
class AtomicCounter:
533608
def __init__(self, start_value=0):
534609
self.__lock = threading.Lock()

‎google/cloud/spanner_v1/batch.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/batch.py
+13-3Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3030
from google.cloud.spanner_v1 import RequestOptions
3131
from google.cloud.spanner_v1._helpers import _retry
32+
from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception
3233
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
3334
from google.api_core.exceptions import InternalServerError
35+
import time
36+
37+
DEFAULT_RETRY_TIMEOUT_SECS = 30
3438

3539

3640
class _BatchBase(_SessionWrapper):
@@ -162,6 +166,7 @@ def commit(
162166
request_options=None,
163167
max_commit_delay=None,
164168
exclude_txn_from_change_streams=False,
169+
**kwargs,
165170
):
166171
"""Commit mutations to the database.
167172
@@ -227,9 +232,12 @@ def commit(
227232
request=request,
228233
metadata=metadata,
229234
)
230-
response = _retry(
235+
deadline = time.time() + kwargs.get(
236+
"timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS
237+
)
238+
response = _retry_on_aborted_exception(
231239
method,
232-
allowed_exceptions={InternalServerError: _check_rst_stream_error},
240+
deadline=deadline,
233241
)
234242
self.committed = response.commit_timestamp
235243
self.commit_stats = response.commit_stats
@@ -348,7 +356,9 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
348356
)
349357
response = _retry(
350358
method,
351-
allowed_exceptions={InternalServerError: _check_rst_stream_error},
359+
allowed_exceptions={
360+
InternalServerError: _check_rst_stream_error,
361+
},
352362
)
353363
self.committed = True
354364
return response

‎google/cloud/spanner_v1/database.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/database.py
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ def batch(
775775
request_options=None,
776776
max_commit_delay=None,
777777
exclude_txn_from_change_streams=False,
778+
**kw,
778779
):
779780
"""Return an object which wraps a batch.
780781
@@ -805,7 +806,11 @@ def batch(
805806
:returns: new wrapper
806807
"""
807808
return BatchCheckout(
808-
self, request_options, max_commit_delay, exclude_txn_from_change_streams
809+
self,
810+
request_options,
811+
max_commit_delay,
812+
exclude_txn_from_change_streams,
813+
**kw,
809814
)
810815

811816
def mutation_groups(self):
@@ -1166,6 +1171,7 @@ def __init__(
11661171
request_options=None,
11671172
max_commit_delay=None,
11681173
exclude_txn_from_change_streams=False,
1174+
**kw,
11691175
):
11701176
self._database = database
11711177
self._session = self._batch = None
@@ -1177,6 +1183,7 @@ def __init__(
11771183
self._request_options = request_options
11781184
self._max_commit_delay = max_commit_delay
11791185
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
1186+
self._kw = kw
11801187

11811188
def __enter__(self):
11821189
"""Begin ``with`` block."""
@@ -1197,6 +1204,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
11971204
request_options=self._request_options,
11981205
max_commit_delay=self._max_commit_delay,
11991206
exclude_txn_from_change_streams=self._exclude_txn_from_change_streams,
1207+
**self._kw,
12001208
)
12011209
finally:
12021210
if self._database.log_commit_stats and self._batch.commit_stats:

‎google/cloud/spanner_v1/session.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/session.py
+2-56Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
"""Wrapper for Cloud Spanner Session objects."""
1616

1717
from functools import total_ordering
18-
import random
1918
import time
2019
from datetime import datetime
2120

2221
from google.api_core.exceptions import Aborted
2322
from google.api_core.exceptions import GoogleAPICallError
2423
from google.api_core.exceptions import NotFound
2524
from google.api_core.gapic_v1 import method
26-
from google.rpc.error_details_pb2 import RetryInfo
25+
from google.cloud.spanner_v1._helpers import _delay_until_retry
26+
from google.cloud.spanner_v1._helpers import _get_retry_delay
2727

2828
from google.cloud.spanner_v1 import ExecuteSqlRequest
2929
from google.cloud.spanner_v1 import CreateSessionRequest
@@ -554,57 +554,3 @@ def run_in_transaction(self, func, *args, **kw):
554554
extra={"commit_stats": txn.commit_stats},
555555
)
556556
return return_value
557-
558-
559-
# Rational: this function factors out complex shared deadline / retry
560-
# handling from two `except:` clauses.
561-
def _delay_until_retry(exc, deadline, attempts):
562-
"""Helper for :meth:`Session.run_in_transaction`.
563-
564-
Detect retryable abort, and impose server-supplied delay.
565-
566-
:type exc: :class:`google.api_core.exceptions.Aborted`
567-
:param exc: exception for aborted transaction
568-
569-
:type deadline: float
570-
:param deadline: maximum timestamp to continue retrying the transaction.
571-
572-
:type attempts: int
573-
:param attempts: number of call retries
574-
"""
575-
cause = exc.errors[0]
576-
577-
now = time.time()
578-
579-
if now >= deadline:
580-
raise
581-
582-
delay = _get_retry_delay(cause, attempts)
583-
if delay is not None:
584-
if now + delay > deadline:
585-
raise
586-
587-
time.sleep(delay)
588-
589-
590-
def _get_retry_delay(cause, attempts):
591-
"""Helper for :func:`_delay_until_retry`.
592-
593-
:type exc: :class:`grpc.Call`
594-
:param exc: exception for aborted transaction
595-
596-
:rtype: float
597-
:returns: seconds to wait before retrying the transaction.
598-
599-
:type attempts: int
600-
:param attempts: number of call retries
601-
"""
602-
metadata = dict(cause.trailing_metadata())
603-
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
604-
if retry_info_pb is not None:
605-
retry_info = RetryInfo()
606-
retry_info.ParseFromString(retry_info_pb)
607-
nanos = retry_info.retry_delay.nanos
608-
return retry_info.retry_delay.seconds + nanos / 1.0e9
609-
610-
return 2**attempts + random.random()

‎google/cloud/spanner_v1/testing/mock_spanner.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/testing/mock_spanner.py
+13-4Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,19 @@ def __create_transaction(
213213
def Commit(self, request, context):
214214
self._requests.append(request)
215215
self.mock_spanner.pop_error(context)
216-
tx = self.transactions[request.transaction_id]
217-
if tx is None:
218-
raise ValueError(f"Transaction not found: {request.transaction_id}")
219-
del self.transactions[request.transaction_id]
216+
if not request.transaction_id == b"":
217+
tx = self.transactions[request.transaction_id]
218+
if tx is None:
219+
raise ValueError(f"Transaction not found: {request.transaction_id}")
220+
tx_id = request.transaction_id
221+
elif not request.single_use_transaction == TransactionOptions():
222+
tx = self.__create_transaction(
223+
request.session, request.single_use_transaction
224+
)
225+
tx_id = tx.id
226+
else:
227+
raise ValueError("Unsupported transaction type")
228+
del self.transactions[tx_id]
220229
return commit.CommitResponse()
221230

222231
def Rollback(self, request, context):

‎tests/mockserver_tests/test_aborted_transaction.py

Copy file name to clipboardExpand all lines: tests/mockserver_tests/test_aborted_transaction.py
+24Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,30 @@ def test_run_in_transaction_batch_dml_aborted(self):
9595
self.assertTrue(isinstance(requests[2], ExecuteBatchDmlRequest))
9696
self.assertTrue(isinstance(requests[3], CommitRequest))
9797

98+
def test_batch_commit_aborted(self):
99+
# Add an Aborted error for the Commit method on the mock server.
100+
add_error(SpannerServicer.Commit.__name__, aborted_status())
101+
with self.database.batch() as batch:
102+
batch.insert(
103+
table="Singers",
104+
columns=("SingerId", "FirstName", "LastName"),
105+
values=[
106+
(1, "Marc", "Richards"),
107+
(2, "Catalina", "Smith"),
108+
(3, "Alice", "Trentor"),
109+
(4, "Lea", "Martin"),
110+
(5, "David", "Lomond"),
111+
],
112+
)
113+
114+
# Verify that the transaction was retried.
115+
requests = self.spanner_service.requests
116+
self.assertEqual(3, len(requests), msg=requests)
117+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
118+
self.assertTrue(isinstance(requests[1], CommitRequest))
119+
# The transaction is aborted and retried.
120+
self.assertTrue(isinstance(requests[2], CommitRequest))
121+
98122

99123
def _insert_mutations(transaction: Transaction):
100124
transaction.insert("my_table", ["col1", "col2"], ["value1", "value2"])

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.