Commit a0da993

feat(storage): Enhance Otel Span Attributes with BucketId and Location details for every Bucket/Blob operation
# feat(storage): Enhance Otel Span Attributes with BucketId and Location details for every Bucket/Blob operation as part of ACO (App-centric Observability)

This PR implements **App-centric Observability (ACO)** tracing compatibility for the GCS Python SDK (`google-cloud-storage`). All OpenTelemetry trace spans produced by bucket and blob operations now seamlessly incorporate mandatory destination resource annotations (`gcp.resource.destination.id` and `gcp.resource.destination.location`).

---

## Core Architecture & Design

### 1. Centralized, DRY Telemetry Helper (`_helpers.py`)

- All OpenTelemetry span context generation, attribute injection, and exception trapping are centralized in a module-level context manager `create_trace_span_helper` in [`_helpers.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/google/cloud/storage/_helpers.py).
- **Zero modifications to the core tracing module**: [`_opentelemetry_tracing.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/google/cloud/storage/_opentelemetry_tracing.py) remains completely pristine and identical to `main`.
- Seamlessly wrapped all critical read/write operations across `blob.py`, `bucket.py`, and `client.py` (e.g., `download_as_bytes`, `upload_from_string`, `get_bucket`, `lookup_bucket`, etc.).

### 2. Bounded LRU Metadata Cache (`_lru_cache.py`, `_bucket_metadata_cache.py`)

- **LRU Capacity Bounding**: Implemented `LRUCache` utilizing an `OrderedDict` to support O(1) operations and strict capacity bounding to eliminate memory leaks in long-running applications.
- **Concurrent Singleflight Warming**: Implemented `BucketMetadataCache` to store bucket locations and project numbers. On cache misses, it spawns background threads (`_fetch_background`) using singleflight tracking (`_inflight_fetches`) to prevent server stampedes / thundering herds.
- **Fallback Annotations on 403**: On GCS `403 Forbidden` permissions errors, the cache permanently registers fallback annotations (`projects/_/buckets/{name}`) to completely avoid retry storms on subsequent API calls.

### 3. Resilient 404 Existence Eviction (`_http.py`, `_helpers.py`, `bucket.py`)

- **Smart Out-of-band 404 Verification**: When a `404 NotFound` error occurs during media transfers or REST calls, a background thread is spawned (with concurrency protection via `_inflight_checks`) to check if the bucket was deleted out-of-band (`bucket.exists()`). If `exists()` returns `False`, the bucket is cleanly evicted from the cache.
- **Instant Synchronous Eviction**: Direct `Bucket.delete()` calls synchronously and instantly evict the bucket name from the cache, ensuring real-time consistency.

---

## Extensive Testing Suite

### 1. 100% Sleep-Free System Tests (`test_aco_observability.py`)

Added a comprehensive system test suite [`test_aco_observability.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/system/test_aco_observability.py) executing against a live GCS backend:

- **Sequential Priming**: Verifies cache miss return times, background priming, and subsequent span enrichment.
- **403 Fallback**: Verifies minimal fallback registration on Forbidden responses.
- **Cache Stampede Protection**: Simulates 15 concurrent threads on a cache miss and asserts only 1 GCS call is fired.
- **Smart 404 Eviction**: Deletes a bucket out-of-band and verifies async cache clean-up on 404.
- **Synchronous Delete Eviction**: Asserts immediate cache eviction on SDK deletion.
- **LRU Capacity Bounding**: Populates the cache beyond its limits and verifies proper LRU eviction.
- **Deterministic Synchronization**: Uses **`threading.Event` (zero static sleeps)** for thread coordination, guaranteeing thundering-fast execution and completely eliminating timing flakiness.

### 2. Robust Unit Tests

- Added [`test__lru_cache.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/unit/test__lru_cache.py) (LRU correctness, bounding, eviction).
- Added [`test__bucket_metadata_cache.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py) (concurrency, location resolution, 403 fallback, singleflight).
- Added `test_delete_hit_evicts_from_cache` inside [`test_bucket.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/unit/test_bucket.py).

---

## Validation Results

All checks, unit tests, and live GCS system tests pass flawlessly:

- **Unit Tests**: 835 passed in 17.82s
- **System Tests**: 8 passed in 26.94s
- **Format & Linter**: 100% clean (`black` / `flake8`)
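
The `_lru_cache.py` module itself does not appear in this view of the commit, so the following is only a minimal sketch of how an `OrderedDict`-backed cache can give O(1) lookups with a strict capacity bound. The class name `LRUCacheSketch` is hypothetical, and its method set (`get`, `put`, `delete`, `clear`, `in`) is an assumption inferred from how `BucketMetadataCache` calls its cache; the real `LRUCache` may differ.

```python
from collections import OrderedDict


class LRUCacheSketch:
    """Hypothetical stand-in for LRUCache; not the code from _lru_cache.py."""

    def __init__(self, max_size):
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self._max_size = max_size
        self._data = OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __len__(self):
        return len(self._data)

    def get(self, key, default=None):
        if key not in self._data:
            return default
        # A read counts as a use, so move the key to the "newest" end.
        self._data.move_to_end(key)
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._max_size:
            # popitem(last=False) removes the least recently used entry.
            self._data.popitem(last=False)

    def delete(self, key):
        self._data.pop(key, None)

    def clear(self):
        self._data.clear()
```

This sketch is not thread-safe on its own; in the commit, locking is handled by the caller (`BucketMetadataCache` wraps every cache access in `self._lock`).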
1 parent 4d64ebc commit a0da993

14 files changed

1,584 additions & 86 deletions

150 additions & 0 deletions

@@ -0,0 +1,150 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""In-memory LRU cache for bucket metadata supporting App-centric Observability (ACO)."""
+
+import logging
+import threading
+
+from google.api_core import exceptions as api_exceptions
+from google.cloud.exceptions import NotFound
+from google.cloud.storage._lru_cache import LRUCache
+
+logger = logging.getLogger(__name__)
+
+
+class BucketMetadataCache:
+    """Thread-safe LRU cache for storing GCS bucket metadata (project number and location).
+
+    Supports Singleflight asynchronous background fetching to prevent stampedes on cache misses.
+    """
+
+    def __init__(self, client, max_size=10000):
+        self._client = client
+        self._cache = LRUCache(max_size)
+        self._lock = threading.Lock()
+        self._inflight_fetches = set()
+        self._inflight_checks = set()
+
+    def get(self, bucket_name):
+        """Thread-safely retrieve cached metadata without queueing fetch."""
+        with self._lock:
+            return self._cache.get(bucket_name)
+
+    def get_or_queue_fetch(self, bucket_name):
+        """Retrieve bucket metadata or queue a background fetch on cache miss.
+
+        Returns None immediately on cache miss so caller does not block.
+        """
+        with self._lock:
+            if bucket_name in self._cache:
+                return self._cache.get(bucket_name)
+            elif bucket_name in self._inflight_fetches:
+                # This handles a thundering herd where 'n' threads
+                # simultaneously experience a cache miss while 1 is already
+                # fetching metadata. The remaining n - 1 threads should
+                # bypass starting duplicate fetches.
+                return None
+            else:
+                # fire a background thread and get bucket metadata.
+                self._inflight_fetches.add(bucket_name)
+                threading.Thread(
+                    target=self._fetch_background, args=(bucket_name,), daemon=True
+                ).start()
+                return None
+
+    def check_and_evict(self, bucket_name):
+        """Asynchronously verify if a bucket exists on 404 and evict if deleted."""
+        with self._lock:
+            if bucket_name not in self._cache:
+                return
+            if bucket_name in self._inflight_checks:
+                return
+            self._inflight_checks.add(bucket_name)
+            threading.Thread(
+                target=self._verify_existence_background,
+                args=(bucket_name,),
+                daemon=True,
+            ).start()
+
+    def _verify_existence_background(self, bucket_name):
+        try:
+            bucket = self._client.bucket(bucket_name)
+            if not bucket.exists():
+                self.evict(bucket_name)
+        except Exception as e:
+            logger.debug(
+                f"Background verification for bucket existence failed for {bucket_name}: {e}"
+            )
+        finally:
+            with self._lock:
+                self._inflight_checks.discard(bucket_name)
+
+    def _fetch_background(self, bucket_name):
+        """Asynchronously fetch bucket metadata and update the cache."""
+        try:
+            bucket = self._client.get_bucket(bucket_name, timeout=10.0)
+            self.update_from_bucket(bucket)
+        except (NotFound, api_exceptions.NotFound):
+            self.evict(bucket_name)
+        except api_exceptions.Forbidden:
+            # On 403 (Forbidden), cache fallback values permanently to avoid retry storms
+            self.update_cache(
+                bucket_name, f"projects/_/buckets/{bucket_name}", "global"
+            )
+        except Exception as e:
+            logger.debug(
+                f"Background fetch for bucket metadata failed for {bucket_name}: {e}"
+            )
+        finally:
+            with self._lock:
+                self._inflight_fetches.discard(bucket_name)
+
+    def update_from_bucket(self, bucket):
+        """Update cache from a Bucket instance."""
+        if not bucket or not bucket.name:
+            return
+
+        project_number = getattr(bucket, "project_number", None)
+        location = getattr(bucket, "location", None) or "global"
+        location = location.lower()
+        location_type = getattr(bucket, "location_type", None) or "region"
+        location_type = location_type.lower()
+
+        if location_type in ("multi-region", "dual-region"):
+            location = "global"
+
+        if project_number:
+            destination_id = f"projects/{project_number}/buckets/{bucket.name}"
+        else:
+            destination_id = f"projects/_/buckets/{bucket.name}"
+
+        self.update_cache(bucket.name, destination_id, location)
+
+    def update_cache(self, bucket_name, destination_id, location):
+        """Thread-safely update or insert a cache entry with bounded size."""
+        with self._lock:
+            self._cache.put(bucket_name, (destination_id, location))
+
+    def evict(self, bucket_name):
+        """Remove a bucket from the cache (e.g., on 404)."""
+        with self._lock:
+            self._cache.delete(bucket_name)
+
+    def clear(self):
+        """Clear all cached metadata."""
+        with self._lock:
+            self._cache.clear()
+            self._inflight_fetches.clear()
+            self._inflight_checks.clear()
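
To make the singleflight behaviour of `get_or_queue_fetch` easier to see in isolation, here is a small self-contained sketch that runs without GCS. `SingleflightSketch`, `slow_fetch`, and `demo` are hypothetical names introduced for illustration, and the plain dict stands in for the LRU cache. The fetch is held open with a `threading.Event` so that every caller arrives while the first fetch is still in flight, roughly in the spirit of the stampede test described in the commit message.

```python
import threading


class SingleflightSketch:
    """Hypothetical reduction of the in-flight tracking; not BucketMetadataCache."""

    def __init__(self, fetch):
        self._fetch = fetch  # callable(key) -> value, run on a worker thread
        self._lock = threading.Lock()
        self._values = {}
        self._inflight = set()
        self._workers = []

    def get_or_queue_fetch(self, key):
        with self._lock:
            if key in self._values:
                return self._values[key]
            if key not in self._inflight:
                # Only the first caller to miss starts a fetch for this key.
                self._inflight.add(key)
                worker = threading.Thread(target=self._run, args=(key,), daemon=True)
                self._workers.append(worker)
                worker.start()
            return None  # a miss never blocks the caller

    def _run(self, key):
        try:
            value = self._fetch(key)
            with self._lock:
                self._values[key] = value
        finally:
            with self._lock:
                self._inflight.discard(key)

    def join(self):
        with self._lock:
            workers = list(self._workers)
        for worker in workers:
            worker.join()


def demo(callers=15):
    release = threading.Event()
    calls = []

    def slow_fetch(key):
        calls.append(key)
        release.wait(timeout=5)  # stay in flight until every caller has returned
        return ("projects/_/buckets/" + key, "global")

    cache = SingleflightSketch(slow_fetch)
    threads = [
        threading.Thread(target=cache.get_or_queue_fetch, args=("b",))
        for _ in range(callers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    release.set()
    cache.join()
    return len(calls), cache.get_or_queue_fetch("b")
```

Calling `demo()` returns the number of fetches that were started and the value that a later lookup sees. The trade-off of this design is that the first few operations on a bucket produce spans without the destination attributes, in exchange for never adding a metadata round trip to the caller's own request.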

‎packages/google-cloud-storage/google/cloud/storage/_helpers.py‎

102 additions & 0 deletions

@@ -19,20 +19,30 @@

 import base64
 import datetime
+import logging
 import os
 import secrets
 import sys
+from contextlib import contextmanager
 from hashlib import md5
 from urllib.parse import urlsplit, urlunsplit
 from uuid import uuid4

+from google.api_core import exceptions as api_exceptions
+from google.cloud.exceptions import NotFound
+
 from google.auth import environment_vars

 from google.cloud.storage.constants import _DEFAULT_TIMEOUT
 from google.cloud.storage.retry import (
     DEFAULT_RETRY,
     DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
 )
+from google.cloud.storage._opentelemetry_tracing import (
+    create_trace_span as _base_create_trace_span,
+)
+
+_logger = logging.getLogger(__name__)

 STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST"  # Despite name, includes scheme.
 """Environment variable defining host for Storage emulator."""
@@ -137,6 +147,62 @@ def _validate_name(name):
     return name


+@contextmanager
+def create_trace_span_helper(client, bucket_name, name, attributes=None, **kwargs):
+    span_attrs = dict(attributes) if attributes else {}
+
+    if (
+        bucket_name
+        and isinstance(bucket_name, str)
+        and client
+        and hasattr(client, "_bucket_metadata_cache")
+        and client._bucket_metadata_cache
+    ):
+        try:
+            if name in (
+                "Storage.Client.getBucket",
+                "Storage.Client.lookupBucket",
+                "Storage.Bucket.reload",
+                "Storage.Bucket.exists",
+            ):
+                cached = client._bucket_metadata_cache.get(bucket_name)
+            else:
+                cached = client._bucket_metadata_cache.get_or_queue_fetch(bucket_name)
+
+            if cached and isinstance(cached, tuple) and len(cached) == 2:
+                dest_id, loc = cached
+                span_attrs.update(
+                    {
+                        "gcp.resource.destination.id": dest_id,
+                        "gcp.resource.destination.location": loc,
+                    }
+                )
+        except Exception as e:
+            _logger.debug(f"Failed cache lookup in create_trace_span_helper: {e}")
+
+    if "client" not in kwargs and client:
+        kwargs["client"] = client
+
+    with _base_create_trace_span(name, attributes=span_attrs, **kwargs) as span:
+        try:
+            yield span
+        except (NotFound, api_exceptions.NotFound):
+            if (
+                bucket_name
+                and isinstance(bucket_name, str)
+                and client
+                and hasattr(client, "_bucket_metadata_cache")
+                and client._bucket_metadata_cache
+            ):
+                try:
+                    client._bucket_metadata_cache.check_and_evict(bucket_name)
+                except Exception as e:
+                    _logger.debug(
+                        f"Failed cache eviction on 404 in create_trace_span_helper: {e}"
+                    )
+            raise
+
+
 class _PropertyMixin(object):
     """Abstract mixin for cloud storage classes with associated properties.

@@ -185,6 +251,42 @@ def _require_client(self, client):
             client = self.client
         return client

+    @contextmanager
+    def _create_trace_span(self, name, attributes=None, **kwargs):
+        from google.cloud.storage.blob import Blob
+        from google.cloud.storage.bucket import Bucket
+
+        if isinstance(self, Bucket):
+            client = self.client
+            bucket_name = self.name
+        elif isinstance(self, Blob):
+            bucket = getattr(self, "bucket", None)
+            client = (
+                getattr(bucket, "client", None)
+                if bucket and hasattr(bucket, "client")
+                else None
+            )
+            bucket_name = getattr(bucket, "name", None) if bucket else None
+        else:
+            client = None
+            bucket_name = None
+
+        if callable(bucket_name):
+            try:
+                bucket_name = bucket_name()
+            except Exception as e:
+                _logger.debug(
+                    f"Failed callable bucket_name resolution in _create_trace_span: {e}"
+                )
+
+        client_override = kwargs.pop("client", None)
+        active_client = client_override or client
+
+        with create_trace_span_helper(
+            active_client, bucket_name, name, attributes=attributes, **kwargs
+        ) as span:
+            yield span
+
     def _encryption_headers(self):
         """Return any encryption headers needed to fetch the object.


‎packages/google-cloud-storage/google/cloud/storage/_http.py‎

52 additions & 3 deletions

@@ -15,10 +15,20 @@
 """Create / interact with Google Cloud Storage connections."""

 import functools
+import logging
+import re

+from google.api_core import exceptions as api_exceptions
 from google.cloud import _http
+from google.cloud.exceptions import NotFound
 from google.cloud.storage import __version__, _helpers
-from google.cloud.storage._opentelemetry_tracing import create_trace_span
+from google.cloud.storage._opentelemetry_tracing import (
+    create_trace_span,
+    enable_otel_traces,
+    HAS_OPENTELEMETRY,
+)
+
+logger = logging.getLogger(__name__)


 class Connection(_http.JSONConnection):
@@ -71,11 +81,30 @@ def api_request(self, *args, **kwargs):
         span_attributes = {
             "gccl-invocation-id": invocation_id,
         }
+        client = self._client
+        if (
+            HAS_OPENTELEMETRY
+            and enable_otel_traces
+            and hasattr(client, "_bucket_metadata_cache")
+            and client._bucket_metadata_cache
+        ):
+            path = kwargs.get("path") or ""
+            match = re.search(r"/b/([^/?#]+)", path)
+            if match:
+                try:
+                    cached = client._bucket_metadata_cache.get(match.group(1))
+                    if cached and isinstance(cached, tuple) and len(cached) == 2:
+                        dest_id, loc = cached
+                        span_attributes["gcp.resource.destination.id"] = dest_id
+                        span_attributes["gcp.resource.destination.location"] = loc
+                except Exception as e:
+                    logger.debug(f"Failed cache.get_or_queue_fetch in api_request: {e}")
+
         call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
         with create_trace_span(
             name="Storage.Connection.api_request",
             attributes=span_attributes,
-            client=self._client,
+            client=client,
             api_request=kwargs,
             retry=retry,
         ):
@@ -87,4 +116,24 @@ def api_request(self, *args, **kwargs):
                     pass
                 if retry:
                     call = retry(call)
-            return call()
+            try:
+                return call()
+            except (NotFound, api_exceptions.NotFound):
+                if (
+                    HAS_OPENTELEMETRY
+                    and enable_otel_traces
+                    and hasattr(client, "_bucket_metadata_cache")
+                    and client._bucket_metadata_cache
+                ):
+                    path = kwargs.get("path") or ""
+                    match = re.search(r"/b/([^/?#]+)", path)
+                    if match:
+                        try:
+                            client._bucket_metadata_cache.check_and_evict(
+                                match.group(1)
+                            )
+                        except Exception as e:
+                            logger.debug(
+                                f"Failed cache.check_and_evict on 404 in api_request: {e}"
+                            )
+                raise
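
Both new branches in `api_request` recover the bucket name from the request path with the same regular expression, `/b/([^/?#]+)`. The short sketch below shows what that pattern captures for a few path shapes; `bucket_name_from_path` is a hypothetical helper written for illustration and is not part of the commit.

```python
import re

# Same pattern as in the diff: the path segment that follows "/b/",
# stopping at the next "/", "?" or "#".
_BUCKET_IN_PATH = re.compile(r"/b/([^/?#]+)")


def bucket_name_from_path(path):
    """Return the bucket segment of an API path, or None (hypothetical helper)."""
    match = _BUCKET_IN_PATH.search(path or "")
    return match.group(1) if match else None


if __name__ == "__main__":
    for sample in ("/b/my-bucket/o/file.txt", "/b/my-bucket?projection=noAcl", "/b"):
        print(sample, "->", bucket_name_from_path(sample))
```

Because `search` returns the leftmost match, the bucket segment wins even if a later part of the path happens to contain `/b/`. Paths such as `/b` (the bucket-list and bucket-create endpoint) yield no match, so those requests skip both the cache lookup and the eviction check.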
