Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 686bda6

Browse filesBrowse files
authored
chore(x-goog-request-id): commit testing scaffold (#1366)
* chore(x-goog-request-id): commit testing scaffold This change commits the scaffolding for which testing will be used. This is a carve out of PRs #1264 and #1364, meant to make those changes lighter and much easier to review then merge. Updates #1261 * Use guard to keep x-goog-request-id interceptor docile in tests until activation later * AtomicCounter update * Remove duplicate unavailable_status that had been already committed into main
1 parent e53eaa2 commit 686bda6
Copy full SHA for 686bda6

File tree

Expand file treeCollapse file tree

8 files changed

+163
-7
lines changed
Filter options
Expand file treeCollapse file tree

8 files changed

+163
-7
lines changed

‎google/cloud/spanner_v1/_helpers.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/_helpers.py
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,10 @@ def __radd__(self, n):
707707
"""
708708
return self.__add__(n)
709709

710+
def reset(self):
711+
with self.__lock:
712+
self.__value = 0
713+
710714

711715
def _metadata_with_request_id(*args, **kwargs):
712716
return with_request_id(*args, **kwargs)

‎google/cloud/spanner_v1/client.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/client.py
+9Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
except ImportError: # pragma: NO COVER
7171
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
7272

73+
from google.cloud.spanner_v1._helpers import AtomicCounter
7374

7475
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
7576
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
@@ -182,6 +183,8 @@ class Client(ClientWithProject):
182183
SCOPE = (SPANNER_ADMIN_SCOPE,)
183184
"""The scopes required for Google Cloud Spanner."""
184185

186+
NTH_CLIENT = AtomicCounter()
187+
185188
def __init__(
186189
self,
187190
project=None,
@@ -263,6 +266,12 @@ def __init__(
263266
"default_transaction_options must be an instance of DefaultTransactionOptions"
264267
)
265268
self._default_transaction_options = default_transaction_options
269+
self._nth_client_id = Client.NTH_CLIENT.increment()
270+
self._nth_request = AtomicCounter(0)
271+
272+
@property
273+
def _next_nth_request(self):
274+
return self._nth_request.increment()
266275

267276
@property
268277
def credentials(self):

‎google/cloud/spanner_v1/request_id_header.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/request_id_header.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ def generate_rand_uint64():
3737

3838
def with_request_id(client_id, channel_id, nth_request, attempt, other_metadata=[]):
3939
req_id = f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}"
40-
all_metadata = other_metadata.copy()
40+
all_metadata = (other_metadata or []).copy()
4141
all_metadata.append((REQ_ID_HEADER_KEY, req_id))
4242
return all_metadata

‎google/cloud/spanner_v1/testing/database_test.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/testing/database_test.py
+9Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from google.cloud.spanner_v1.testing.interceptors import (
2626
MethodCountInterceptor,
2727
MethodAbortInterceptor,
28+
XGoogRequestIDHeaderInterceptor,
2829
)
2930

3031

@@ -34,6 +35,8 @@ class TestDatabase(Database):
3435
currently, and we don't want to make changes in the Database class for
3536
testing purpose as this is a hack to use interceptors in tests."""
3637

38+
_interceptors = []
39+
3740
def __init__(
3841
self,
3942
database_id,
@@ -74,6 +77,8 @@ def spanner_api(self):
7477
client_options = client._client_options
7578
if self._instance.emulator_host is not None:
7679
channel = grpc.insecure_channel(self._instance.emulator_host)
80+
self._x_goog_request_id_interceptor = XGoogRequestIDHeaderInterceptor()
81+
self._interceptors.append(self._x_goog_request_id_interceptor)
7782
channel = grpc.intercept_channel(channel, *self._interceptors)
7883
transport = SpannerGrpcTransport(channel=channel)
7984
self._spanner_api = SpannerClient(
@@ -110,3 +115,7 @@ def _create_spanner_client_for_tests(self, client_options, credentials):
110115
client_options=client_options,
111116
transport=transport,
112117
)
118+
119+
def reset(self):
120+
if self._x_goog_request_id_interceptor:
121+
self._x_goog_request_id_interceptor.reset()

‎google/cloud/spanner_v1/testing/interceptors.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/testing/interceptors.py
+71Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414

1515
from collections import defaultdict
16+
import threading
17+
1618
from grpc_interceptor import ClientInterceptor
1719
from google.api_core.exceptions import Aborted
1820

@@ -63,3 +65,72 @@ def reset(self):
6365
self._method_to_abort = None
6466
self._count = 0
6567
self._connection = None
68+
69+
70+
X_GOOG_REQUEST_ID = "x-goog-spanner-request-id"
71+
72+
73+
class XGoogRequestIDHeaderInterceptor(ClientInterceptor):
74+
# TODO:(@odeke-em): delete this guard when PR #1367 is merged.
75+
X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = False
76+
77+
def __init__(self):
78+
self._unary_req_segments = []
79+
self._stream_req_segments = []
80+
self.__lock = threading.Lock()
81+
82+
def intercept(self, method, request_or_iterator, call_details):
83+
metadata = call_details.metadata
84+
x_goog_request_id = None
85+
for key, value in metadata:
86+
if key == X_GOOG_REQUEST_ID:
87+
x_goog_request_id = value
88+
break
89+
90+
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id:
91+
raise Exception(
92+
f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}"
93+
)
94+
95+
response_or_iterator = method(request_or_iterator, call_details)
96+
streaming = getattr(response_or_iterator, "__iter__", None) is not None
97+
98+
if self.X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED:
99+
with self.__lock:
100+
if streaming:
101+
self._stream_req_segments.append(
102+
(call_details.method, parse_request_id(x_goog_request_id))
103+
)
104+
else:
105+
self._unary_req_segments.append(
106+
(call_details.method, parse_request_id(x_goog_request_id))
107+
)
108+
109+
return response_or_iterator
110+
111+
@property
112+
def unary_request_ids(self):
113+
return self._unary_req_segments
114+
115+
@property
116+
def stream_request_ids(self):
117+
return self._stream_req_segments
118+
119+
def reset(self):
120+
self._stream_req_segments.clear()
121+
self._unary_req_segments.clear()
122+
123+
124+
def parse_request_id(request_id_str):
125+
splits = request_id_str.split(".")
126+
version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list(
127+
map(lambda v: int(v), splits)
128+
)
129+
return (
130+
version,
131+
rand_process_id,
132+
client_id,
133+
channel_id,
134+
nth_request,
135+
nth_attempt,
136+
)

‎google/cloud/spanner_v1/testing/mock_spanner.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/testing/mock_spanner.py
+2-5Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
from google.cloud.spanner_v1 import (
2323
TransactionOptions,
2424
ResultSetMetadata,
25-
ExecuteSqlRequest,
26-
ExecuteBatchDmlRequest,
2725
)
2826
from google.cloud.spanner_v1.testing.mock_database_admin import DatabaseAdminServicer
2927
import google.cloud.spanner_v1.testing.spanner_database_admin_pb2_grpc as database_admin_grpc
@@ -107,6 +105,7 @@ def CreateSession(self, request, context):
107105

108106
def BatchCreateSessions(self, request, context):
109107
self._requests.append(request)
108+
self.mock_spanner.pop_error(context)
110109
sessions = []
111110
for i in range(request.session_count):
112111
sessions.append(
@@ -186,9 +185,7 @@ def BeginTransaction(self, request, context):
186185
self._requests.append(request)
187186
return self.__create_transaction(request.session, request.options)
188187

189-
def __maybe_create_transaction(
190-
self, request: ExecuteSqlRequest | ExecuteBatchDmlRequest
191-
):
188+
def __maybe_create_transaction(self, request):
192189
started_transaction = None
193190
if not request.transaction.begin == TransactionOptions():
194191
started_transaction = self.__create_transaction(

‎tests/mockserver_tests/mock_server_test_base.py

Copy file name to clipboardExpand all lines: tests/mockserver_tests/mock_server_test_base.py
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def setup_class(cls):
153153
def teardown_class(cls):
154154
if MockServerTestBase.server is not None:
155155
MockServerTestBase.server.stop(grace=None)
156+
Client.NTH_CLIENT.reset()
156157
MockServerTestBase.server = None
157158

158159
def setup_method(self, *args, **kwargs):
@@ -186,6 +187,8 @@ def instance(self) -> Instance:
186187
def database(self) -> Database:
187188
if self._database is None:
188189
self._database = self.instance.database(
189-
"test-database", pool=FixedSizePool(size=10)
190+
"test-database",
191+
pool=FixedSizePool(size=10),
192+
enable_interceptors_in_tests=True,
190193
)
191194
return self._database

‎tests/unit/test_transaction.py

Copy file name to clipboardExpand all lines: tests/unit/test_transaction.py
+63Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
from google.cloud.spanner_v1 import TypeCode
2222
from google.api_core.retry import Retry
2323
from google.api_core import gapic_v1
24+
from google.cloud.spanner_v1._helpers import (
25+
AtomicCounter,
26+
_metadata_with_request_id,
27+
)
2428

2529
from tests._helpers import (
2630
HAS_OPENTELEMETRY_INSTALLED,
@@ -197,6 +201,11 @@ def test_begin_ok(self):
197201
[
198202
("google-cloud-resource-prefix", database.name),
199203
("x-goog-spanner-route-to-leader", "true"),
204+
# TODO(@odeke-em): enable with PR #1367.
205+
# (
206+
# "x-goog-spanner-request-id",
207+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
208+
# ),
200209
],
201210
)
202211

@@ -301,6 +310,11 @@ def test_rollback_ok(self):
301310
[
302311
("google-cloud-resource-prefix", database.name),
303312
("x-goog-spanner-route-to-leader", "true"),
313+
# TODO(@odeke-em): enable with PR #1367.
314+
# (
315+
# "x-goog-spanner-request-id",
316+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
317+
# ),
304318
],
305319
)
306320

@@ -492,6 +506,11 @@ def _commit_helper(
492506
[
493507
("google-cloud-resource-prefix", database.name),
494508
("x-goog-spanner-route-to-leader", "true"),
509+
# TODO(@odeke-em): enable with PR #1367.
510+
# (
511+
# "x-goog-spanner-request-id",
512+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
513+
# ),
495514
],
496515
)
497516
self.assertEqual(actual_request_options, expected_request_options)
@@ -666,6 +685,11 @@ def _execute_update_helper(
666685
metadata=[
667686
("google-cloud-resource-prefix", database.name),
668687
("x-goog-spanner-route-to-leader", "true"),
688+
# TODO(@odeke-em): enable with PR #1367.
689+
# (
690+
# "x-goog-spanner-request-id",
691+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
692+
# ),
669693
],
670694
)
671695

@@ -859,6 +883,11 @@ def _batch_update_helper(
859883
metadata=[
860884
("google-cloud-resource-prefix", database.name),
861885
("x-goog-spanner-route-to-leader", "true"),
886+
# TODO(@odeke-em): enable with PR #1367.
887+
# (
888+
# "x-goog-spanner-request-id",
889+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
890+
# ),
862891
],
863892
retry=retry,
864893
timeout=timeout,
@@ -974,6 +1003,11 @@ def test_context_mgr_success(self):
9741003
[
9751004
("google-cloud-resource-prefix", database.name),
9761005
("x-goog-spanner-route-to-leader", "true"),
1006+
# TODO(@odeke-em): enable with PR #1367.
1007+
# (
1008+
# "x-goog-spanner-request-id",
1009+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1",
1010+
# ),
9771011
],
9781012
)
9791013

@@ -1004,11 +1038,19 @@ def test_context_mgr_failure(self):
10041038

10051039

10061040
class _Client(object):
1041+
NTH_CLIENT = AtomicCounter()
1042+
10071043
def __init__(self):
10081044
from google.cloud.spanner_v1 import ExecuteSqlRequest
10091045

10101046
self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1")
10111047
self.directed_read_options = None
1048+
self._nth_client_id = _Client.NTH_CLIENT.increment()
1049+
self._nth_request = AtomicCounter()
1050+
1051+
@property
1052+
def _next_nth_request(self):
1053+
return self._nth_request.increment()
10121054

10131055

10141056
class _Instance(object):
@@ -1024,6 +1066,27 @@ def __init__(self):
10241066
self._directed_read_options = None
10251067
self.default_transaction_options = DefaultTransactionOptions()
10261068

1069+
@property
1070+
def _next_nth_request(self):
1071+
return self._instance._client._next_nth_request
1072+
1073+
@property
1074+
def _nth_client_id(self):
1075+
return self._instance._client._nth_client_id
1076+
1077+
def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]):
1078+
return _metadata_with_request_id(
1079+
self._nth_client_id,
1080+
self._channel_id,
1081+
nth_request,
1082+
nth_attempt,
1083+
prior_metadata,
1084+
)
1085+
1086+
@property
1087+
def _channel_id(self):
1088+
return 1
1089+
10271090

10281091
class _Session(object):
10291092
_transaction = None

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.