Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 423e5bc

Browse filesBrowse files
committed
fix(tracing): ensure nesting of Transaction.begin under commit + fix suggestions from feature review
This change ensures that: * If a transaction was not yet begin, that if .commit() is invoked the resulting span hierarchy has .begin nested under .commit * We use "CloudSpanner.Transaction.execute_sql" instead of "CloudSpanner.Transaction.execute_streaming_sql" * If we have a tracer_provider that produces non-recordings spans, that it won't crash due to lacking `span._status` Fixes #1286
1 parent 04a11a6 commit 423e5bc
Copy full SHA for 423e5bc

File tree

Expand file treeCollapse file tree

9 files changed

+403
-54
lines changed
Filter options
Expand file treeCollapse file tree

9 files changed

+403
-54
lines changed

‎google/cloud/spanner_v1/_opentelemetry_tracing.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/_opentelemetry_tracing.py
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
117117
# invoke .record_exception on our own else we shall have 2 exceptions.
118118
raise
119119
else:
120-
if (not span._status) or span._status.status_code == StatusCode.UNSET:
120+
# All spans still have set_status available even if for example
121+
# NonRecordingSpan doesn't have "_status".
122+
absent_span_status = getattr(span, "_status", None) is None
123+
if absent_span_status or span._status.status_code == StatusCode.UNSET:
121124
# OpenTelemetry-Python only allows a status change
122125
# if the current code is UNSET or ERROR. At the end
123126
# of the generator's consumption, only set it to OK

‎google/cloud/spanner_v1/snapshot.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/snapshot.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ def _get_streamed_result_set(
583583
iterator = _restart_on_unavailable(
584584
restart,
585585
request,
586-
f"CloudSpanner.{type(self).__name__}.execute_streaming_sql",
586+
f"CloudSpanner.{type(self).__name__}.execute_sql",
587587
self._session,
588588
trace_attributes,
589589
transaction=self,

‎google/cloud/spanner_v1/transaction.py

Copy file name to clipboardExpand all lines: google/cloud/spanner_v1/transaction.py
+34-32Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -242,39 +242,7 @@ def commit(
242242
:returns: timestamp of the committed changes.
243243
:raises ValueError: if there are no mutations to commit.
244244
"""
245-
self._check_state()
246-
if self._transaction_id is None and len(self._mutations) > 0:
247-
self.begin()
248-
elif self._transaction_id is None and len(self._mutations) == 0:
249-
raise ValueError("Transaction is not begun")
250-
251245
database = self._session._database
252-
api = database.spanner_api
253-
metadata = _metadata_with_prefix(database.name)
254-
if database._route_to_leader_enabled:
255-
metadata.append(
256-
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
257-
)
258-
259-
if request_options is None:
260-
request_options = RequestOptions()
261-
elif type(request_options) is dict:
262-
request_options = RequestOptions(request_options)
263-
if self.transaction_tag is not None:
264-
request_options.transaction_tag = self.transaction_tag
265-
266-
# Request tags are not supported for commit requests.
267-
request_options.request_tag = None
268-
269-
request = CommitRequest(
270-
session=self._session.name,
271-
mutations=self._mutations,
272-
transaction_id=self._transaction_id,
273-
return_commit_stats=return_commit_stats,
274-
max_commit_delay=max_commit_delay,
275-
request_options=request_options,
276-
)
277-
278246
trace_attributes = {"num_mutations": len(self._mutations)}
279247
observability_options = getattr(database, "observability_options", None)
280248
with trace_call(
@@ -283,6 +251,40 @@ def commit(
283251
trace_attributes,
284252
observability_options,
285253
) as span:
254+
self._check_state()
255+
if self._transaction_id is None and len(self._mutations) > 0:
256+
self.begin()
257+
elif self._transaction_id is None and len(self._mutations) == 0:
258+
raise ValueError("Transaction is not begun")
259+
260+
api = database.spanner_api
261+
metadata = _metadata_with_prefix(database.name)
262+
if database._route_to_leader_enabled:
263+
metadata.append(
264+
_metadata_with_leader_aware_routing(
265+
database._route_to_leader_enabled
266+
)
267+
)
268+
269+
if request_options is None:
270+
request_options = RequestOptions()
271+
elif type(request_options) is dict:
272+
request_options = RequestOptions(request_options)
273+
if self.transaction_tag is not None:
274+
request_options.transaction_tag = self.transaction_tag
275+
276+
# Request tags are not supported for commit requests.
277+
request_options.request_tag = None
278+
279+
request = CommitRequest(
280+
session=self._session.name,
281+
mutations=self._mutations,
282+
transaction_id=self._transaction_id,
283+
return_commit_stats=return_commit_stats,
284+
max_commit_delay=max_commit_delay,
285+
request_options=request_options,
286+
)
287+
286288
add_span_event(span, "Starting Commit")
287289

288290
method = functools.partial(

‎tests/_helpers.py

Copy file name to clipboardExpand all lines: tests/_helpers.py
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,20 @@ def get_finished_spans(self):
132132

133133
def reset(self):
134134
self.tearDown()
135+
136+
def finished_spans_events_statuses(self):
137+
span_list = self.get_finished_spans()
138+
# Some event attributes are noisy/highly ephemeral
139+
# and can't be directly compared against.
140+
got_all_events = []
141+
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
142+
for span in span_list:
143+
for event in span.events:
144+
evt_attributes = event.attributes.copy()
145+
for attr_name in imprecise_event_attributes:
146+
if attr_name in evt_attributes:
147+
evt_attributes[attr_name] = "EPHEMERAL"
148+
149+
got_all_events.append((event.name, evt_attributes))
150+
151+
return got_all_events

‎tests/system/test_observability_options.py

Copy file name to clipboardExpand all lines: tests/system/test_observability_options.py
+198-5Lines changed: 198 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def test_propagation(enable_extended_tracing):
107107
gotNames = [span.name for span in from_inject_spans]
108108
wantNames = [
109109
"CloudSpanner.CreateSession",
110-
"CloudSpanner.Snapshot.execute_streaming_sql",
110+
"CloudSpanner.Snapshot.execute_sql",
111111
]
112112
assert gotNames == wantNames
113113

@@ -216,8 +216,8 @@ def select_in_txn(txn):
216216
"CloudSpanner.Database.run_in_transaction",
217217
"CloudSpanner.CreateSession",
218218
"CloudSpanner.Session.run_in_transaction",
219-
"CloudSpanner.Transaction.execute_streaming_sql",
220-
"CloudSpanner.Transaction.execute_streaming_sql",
219+
"CloudSpanner.Transaction.execute_sql",
220+
"CloudSpanner.Transaction.execute_sql",
221221
"CloudSpanner.Transaction.commit",
222222
]
223223

@@ -262,13 +262,206 @@ def select_in_txn(txn):
262262
("CloudSpanner.Database.run_in_transaction", codes.OK, None),
263263
("CloudSpanner.CreateSession", codes.OK, None),
264264
("CloudSpanner.Session.run_in_transaction", codes.OK, None),
265-
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
266-
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
265+
("CloudSpanner.Transaction.execute_sql", codes.OK, None),
266+
("CloudSpanner.Transaction.execute_sql", codes.OK, None),
267267
("CloudSpanner.Transaction.commit", codes.OK, None),
268268
]
269269
assert got_statuses == want_statuses
270270

271271

272+
@pytest.mark.skipif(
273+
not _helpers.USE_EMULATOR,
274+
reason="Emulator needed to run this tests",
275+
)
276+
@pytest.mark.skipif(
277+
not HAS_OTEL_INSTALLED,
278+
reason="Tracing requires OpenTelemetry",
279+
)
280+
def test_transaction_update_implicit_begin_nested_inside_commit():
281+
# Tests to ensure that transaction.commit() without a began transaction
282+
# has transaction.begin() inlined and nested under the commit span.
283+
from google.auth.credentials import AnonymousCredentials
284+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
285+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
286+
InMemorySpanExporter,
287+
)
288+
from opentelemetry.trace.status import StatusCode
289+
from opentelemetry.sdk.trace import TracerProvider
290+
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
291+
292+
PROJECT = _helpers.EMULATOR_PROJECT
293+
CONFIGURATION_NAME = "config-name"
294+
INSTANCE_ID = _helpers.INSTANCE_ID
295+
DISPLAY_NAME = "display-name"
296+
DATABASE_ID = _helpers.unique_id("temp_db")
297+
NODE_COUNT = 5
298+
LABELS = {"test": "true"}
299+
300+
def tx_update(txn):
301+
txn.update(
302+
"Singers",
303+
columns=["SingerId", "FirstName"],
304+
values=[["1", "Bryan"], ["2", "Slash"]],
305+
)
306+
307+
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
308+
trace_exporter = InMemorySpanExporter()
309+
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
310+
observability_options = dict(
311+
tracer_provider=tracer_provider,
312+
enable_extended_tracing=True,
313+
)
314+
315+
client = Client(
316+
project=PROJECT,
317+
observability_options=observability_options,
318+
credentials=AnonymousCredentials(),
319+
)
320+
321+
instance = client.instance(
322+
INSTANCE_ID,
323+
CONFIGURATION_NAME,
324+
display_name=DISPLAY_NAME,
325+
node_count=NODE_COUNT,
326+
labels=LABELS,
327+
)
328+
329+
try:
330+
instance.create()
331+
except Exception:
332+
pass
333+
334+
db = instance.database(DATABASE_ID)
335+
try:
336+
db._ddl_statements = [
337+
"""CREATE TABLE Singers (
338+
SingerId INT64 NOT NULL,
339+
FirstName STRING(1024),
340+
LastName STRING(1024),
341+
SingerInfo BYTES(MAX),
342+
FullName STRING(2048) AS (
343+
ARRAY_TO_STRING([FirstName, LastName], " ")
344+
) STORED
345+
) PRIMARY KEY (SingerId)""",
346+
"""CREATE TABLE Albums (
347+
SingerId INT64 NOT NULL,
348+
AlbumId INT64 NOT NULL,
349+
AlbumTitle STRING(MAX),
350+
MarketingBudget INT64,
351+
) PRIMARY KEY (SingerId, AlbumId),
352+
INTERLEAVE IN PARENT Singers ON DELETE CASCADE""",
353+
]
354+
db.create()
355+
except Exception:
356+
pass
357+
358+
try:
359+
db.run_in_transaction(tx_update)
360+
except Exception:
361+
pass
362+
363+
span_list = trace_exporter.get_finished_spans()
364+
# Sort the spans by their start time in the hierarchy.
365+
span_list = sorted(span_list, key=lambda span: span.start_time)
366+
got_span_names = [span.name for span in span_list]
367+
want_span_names = [
368+
"CloudSpanner.Database.run_in_transaction",
369+
"CloudSpanner.CreateSession",
370+
"CloudSpanner.Session.run_in_transaction",
371+
"CloudSpanner.Transaction.commit",
372+
"CloudSpanner.Transaction.begin",
373+
]
374+
375+
assert got_span_names == want_span_names
376+
377+
# Our object is to ensure that .begin() is a child of .commit()
378+
span_tx_begin = span_list[-1]
379+
span_tx_commit = span_list[-2]
380+
assert span_tx_begin.parent.span_id == span_tx_commit.context.span_id
381+
382+
got_events = []
383+
got_statuses = []
384+
385+
# Some event attributes are noisy/highly ephemeral
386+
# and can't be directly compared against.
387+
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
388+
for span in span_list:
389+
got_statuses.append(
390+
(span.name, span.status.status_code, span.status.description)
391+
)
392+
for event in span.events:
393+
evt_attributes = event.attributes.copy()
394+
for attr_name in imprecise_event_attributes:
395+
if attr_name in evt_attributes:
396+
evt_attributes[attr_name] = "EPHEMERAL"
397+
398+
got_events.append((event.name, evt_attributes))
399+
400+
# Check for the series of events
401+
want_events = [
402+
("Acquiring session", {"kind": "BurstyPool"}),
403+
("Waiting for a session to become available", {"kind": "BurstyPool"}),
404+
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
405+
("Creating Session", {}),
406+
(
407+
"exception",
408+
{
409+
"exception.type": "google.api_core.exceptions.NotFound",
410+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
411+
"exception.stacktrace": "EPHEMERAL",
412+
"exception.escaped": "False",
413+
},
414+
),
415+
(
416+
"Transaction.commit failed due to GoogleAPICallError, not retrying",
417+
{"attempt": 1},
418+
),
419+
(
420+
"exception",
421+
{
422+
"exception.type": "google.api_core.exceptions.NotFound",
423+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
424+
"exception.stacktrace": "EPHEMERAL",
425+
"exception.escaped": "False",
426+
},
427+
),
428+
("Starting Commit", {}),
429+
(
430+
"exception",
431+
{
432+
"exception.type": "google.api_core.exceptions.NotFound",
433+
"exception.message": "404 Table Singers: Row {Int64(1)} not found.",
434+
"exception.stacktrace": "EPHEMERAL",
435+
"exception.escaped": "False",
436+
},
437+
),
438+
]
439+
assert got_events == want_events
440+
441+
# Check for the statues.
442+
codes = StatusCode
443+
want_statuses = [
444+
(
445+
"CloudSpanner.Database.run_in_transaction",
446+
codes.ERROR,
447+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
448+
),
449+
("CloudSpanner.CreateSession", codes.OK, None),
450+
(
451+
"CloudSpanner.Session.run_in_transaction",
452+
codes.ERROR,
453+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
454+
),
455+
(
456+
"CloudSpanner.Transaction.commit",
457+
codes.ERROR,
458+
"NotFound: 404 Table Singers: Row {Int64(1)} not found.",
459+
),
460+
("CloudSpanner.Transaction.begin", codes.OK, None),
461+
]
462+
assert got_statuses == want_statuses
463+
464+
272465
def _make_credentials():
273466
from google.auth.credentials import AnonymousCredentials
274467

‎tests/unit/test__opentelemetry_tracing.py

Copy file name to clipboardExpand all lines: tests/unit/test__opentelemetry_tracing.py
+30-1Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def test_trace_codeless_error(self):
159159
span = span_list[0]
160160
self.assertEqual(span.status.status_code, StatusCode.ERROR)
161161

162-
def test_trace_call_terminal_span_status(self):
162+
def test_trace_call_terminal_span_status_ALWAYS_ON_sampler(self):
163163
# Verify that we don't unconditionally set the terminal span status to
164164
# SpanStatus.OK per https://github.com/googleapis/python-spanner/issues/1246
165165
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -195,3 +195,32 @@ def test_trace_call_terminal_span_status(self):
195195
("VerifyTerminalSpanStatus", StatusCode.ERROR, "Our error exhibit"),
196196
]
197197
assert got_statuses == want_statuses
198+
199+
def test_trace_call_terminal_span_status_ALWAYS_OFF_sampler(self):
200+
# Verify that we get the correct status even when using the ALWAYS_OFF
201+
# sampler which produces the NonRecordingSpan per
202+
# https://github.com/googleapis/python-spanner/issues/1286
203+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
204+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
205+
InMemorySpanExporter,
206+
)
207+
from opentelemetry.sdk.trace import TracerProvider
208+
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
209+
210+
tracer_provider = TracerProvider(sampler=ALWAYS_OFF)
211+
trace_exporter = InMemorySpanExporter()
212+
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
213+
observability_options = dict(tracer_provider=tracer_provider)
214+
215+
session = _make_session()
216+
used_span = None
217+
with _opentelemetry_tracing.trace_call(
218+
"VerifyWithNonRecordingSpan",
219+
session,
220+
observability_options=observability_options,
221+
) as span:
222+
used_span = span
223+
224+
assert type(used_span).__name__ == "NonRecordingSpan"
225+
span_list = list(trace_exporter.get_finished_spans())
226+
assert span_list == []

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.