This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit 24d45d0

fix: updates timeout/retry code to respect hanging server (#2408)
**Description**

This PR fixes a crash when handling `_InactiveRpcError` during retry logic and ensures proper `timeout` propagation in `RowIterator.to_dataframe`.

**Fixes**

- **Retry logic crash**: Addressed an issue in `google/cloud/bigquery/retry.py` where `_should_retry` would raise a `TypeError` when inspecting unstructured gRPC errors (such as `_InactiveRpcError`). The fix adds robust error inspection that falls back gracefully when `exc.errors` is not subscriptable.
- **Timeout propagation**: Added the missing `timeout` parameter to `RowIterator.to_dataframe` in `google/cloud/bigquery/table.py`, so the user-specified `timeout` is passed down to the underlying `to_arrow` call, preventing the client from hanging indefinitely when the Storage API is unresponsive.

**Changes**

- Modified `google/cloud/bigquery/retry.py`: updated `_should_retry` to handle `TypeError` and `KeyError` when accessing `exc.errors`.
- Modified `google/cloud/bigquery/table.py`: updated the `RowIterator.to_dataframe` signature and implementation to accept and pass the `timeout` parameter.

The first half of this work was completed in PR #2354.
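As a quick illustration of the user-facing effect (a hedged sketch: the query and the 30-second value are invented for the example, but `RowIterator.to_dataframe` and its `timeout` parameter are what this PR touches):

```python
from google.cloud import bigquery

client = bigquery.Client()
rows = client.query("SELECT 1 AS x").result()  # result() returns a RowIterator

# Before this fix, an unresponsive Storage API could hang this call forever.
# With the change, the timeout propagates to the underlying to_arrow call and
# a concurrent.futures.TimeoutError is raised once it elapses.
df = rows.to_dataframe(timeout=30.0)
```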
1 parent 7b8ceea commit 24d45d0

9 files changed: +214 −17 lines changed
google/cloud/bigquery/_pandas_helpers.py (+40 −9)

```diff
@@ -33,6 +33,7 @@
 from google.cloud.bigquery import _pyarrow_helpers
 from google.cloud.bigquery import _versions_helpers
+from google.cloud.bigquery import retry as bq_retry
 from google.cloud.bigquery import schema
 
 
@@ -740,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types):
     return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
 
 
-def download_arrow_row_iterator(pages, bq_schema):
+def download_arrow_row_iterator(pages, bq_schema, timeout=None):
     """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
 
     Args:
@@ -751,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema):
             Mapping[str, Any] \
         ]]):
             A decription of the fields in result pages.
+        timeout (Optional[float]):
+            The number of seconds to wait for the underlying download to complete.
+            If ``None``, wait indefinitely.
+
     Yields:
         :class:`pyarrow.RecordBatch`
             The next page of records as a ``pyarrow`` record batch.
@@ -759,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema):
     column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
     arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
 
-    for page in pages:
-        yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
+    if timeout is None:
+        for page in pages:
+            yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
+    else:
+        start_time = time.monotonic()
+        for page in pages:
+            if time.monotonic() - start_time > timeout:
+                raise concurrent.futures.TimeoutError()
+
+            yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
 
 
 def _row_iterator_page_to_dataframe(page, column_names, dtypes):
@@ -778,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes):
     return pandas.DataFrame(columns, columns=column_names)
 
 
-def download_dataframe_row_iterator(pages, bq_schema, dtypes):
+def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None):
     """Use HTTP JSON RowIterator to construct a DataFrame.
 
     Args:
@@ -792,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes):
         dtypes(Mapping[str, numpy.dtype]):
             The types of columns in result data to hint construction of the
             resulting DataFrame. Not all column types have to be specified.
+        timeout (Optional[float]):
+            The number of seconds to wait for the underlying download to complete.
+            If ``None``, wait indefinitely.
+
     Yields:
         :class:`pandas.DataFrame`
             The next page of records as a ``pandas.DataFrame`` record batch.
     """
     bq_schema = schema._to_schema_fields(bq_schema)
     column_names = [field.name for field in bq_schema]
-    for page in pages:
-        yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
+
+    if timeout is None:
+        for page in pages:
+            yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
+    else:
+        start_time = time.monotonic()
+        for page in pages:
+            if time.monotonic() - start_time > timeout:
+                raise concurrent.futures.TimeoutError()
+
+            yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
 
 
 def _bqstorage_page_to_arrow(page):
@@ -928,6 +954,7 @@ def _download_table_bqstorage(
     if "@" in table.table_id:
         raise ValueError("Reading from a specific snapshot is not currently supported.")
 
+    start_time = time.monotonic()
     requested_streams = determine_requested_streams(preserve_order, max_stream_count)
 
     requested_session = bigquery_storage.types.stream.ReadSession(
@@ -944,10 +971,16 @@ def _download_table_bqstorage(
             ArrowSerializationOptions.CompressionCodec(1)
         )
 
+    retry_policy = (
+        bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None
+    )
+
     session = bqstorage_client.create_read_session(
         parent="projects/{}".format(project_id),
         read_session=requested_session,
         max_stream_count=requested_streams,
+        retry=retry_policy,
+        timeout=timeout,
     )
 
     _LOGGER.debug(
@@ -983,8 +1016,6 @@ def _download_table_bqstorage(
     # Manually manage the pool to control shutdown behavior on timeout.
     pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
     wait_on_shutdown = True
-    start_time = time.time()
-
     try:
         # Manually submit jobs and wait for download to complete rather
         # than using pool.map because pool.map continues running in the
@@ -1006,7 +1037,7 @@ def _download_table_bqstorage(
         while not_done:
             # Check for timeout
             if timeout is not None:
-                elapsed = time.time() - start_time
+                elapsed = time.monotonic() - start_time
                 if elapsed > timeout:
                     wait_on_shutdown = False
                     raise concurrent.futures.TimeoutError(
```
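The download iterators above guard a blocking loop with a simple elapsed-time check, and the BQ Storage path switches from `time.time()` to `time.monotonic()`. A standalone sketch of the same pattern (illustrative names, not library API); note that the guard only fires between items, which is why the `create_read_session` call additionally gets its own `timeout`:

```python
import concurrent.futures
import time
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def iter_with_deadline(items: Iterable[T], timeout: Optional[float]) -> Iterator[T]:
    """Yield items, raising TimeoutError once ``timeout`` seconds have elapsed.

    time.monotonic() is used rather than time.time() because the wall clock
    can jump (NTP corrections, DST changes), which would corrupt the
    elapsed-time arithmetic; the monotonic clock only ever moves forward.
    """
    if timeout is None:
        yield from items
        return
    start = time.monotonic()
    for item in items:
        # The check runs between items, so an individual fetch that hangs
        # inside next() is not interrupted here.
        if time.monotonic() - start > timeout:
            raise concurrent.futures.TimeoutError()
        yield item
```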
google/cloud/bigquery/dbapi/cursor.py (+2 −0)

```diff
@@ -323,6 +323,8 @@ def _bqstorage_fetch(self, bqstorage_client):
             read_session=requested_session,
             # a single stream only, as DB API is not well-suited for multithreading
             max_stream_count=1,
+            retry=None,
+            timeout=None,
         )
 
         if not read_session.streams:
```
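Passing `retry=None, timeout=None` explicitly is meaningful for GAPIC-generated clients: omitting the arguments means "use the method's configured defaults" (the `DEFAULT` sentinel), while an explicit `None` disables client-side retry and timeout altogether. A small hedged illustration of the distinction:

```python
from google.api_core.gapic_v1 import method

# DEFAULT is a sentinel object, distinct from None: omitting the argument
# picks up the per-method retry/timeout configuration, while an explicit
# None turns the behavior off. The cursor change pins the DB API path to
# "off", which also keeps the keyword-argument assertions in the tests exact.
assert method.DEFAULT is not None
```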
google/cloud/bigquery/retry.py (+11 −5)

```diff
@@ -12,12 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from google.api_core import exceptions
 from google.api_core import retry
 import google.api_core.future.polling
 from google.auth import exceptions as auth_exceptions  # type: ignore
 import requests.exceptions
 
+_LOGGER = logging.getLogger(__name__)
 
 _RETRYABLE_REASONS = frozenset(
     ["rateLimitExceeded", "backendError", "internalError", "badGateway"]
@@ -61,14 +64,17 @@
 def _should_retry(exc):
     """Predicate for determining when to retry.
 
-    We retry if and only if the 'reason' is 'backendError'
-    or 'rateLimitExceeded'.
+    We retry if and only if the 'reason' is in _RETRYABLE_REASONS or is
+    in _UNSTRUCTURED_RETRYABLE_TYPES.
     """
-    if not hasattr(exc, "errors") or len(exc.errors) == 0:
-        # Check for unstructured error returns, e.g. from GFE
+    try:
+        reason = exc.errors[0]["reason"]
+    except (AttributeError, IndexError, TypeError, KeyError):
+        # Fallback for when errors attribute is missing, empty, or not a dict
+        # or doesn't contain "reason" (e.g. gRPC exceptions).
+        _LOGGER.debug("Inspecting unstructured error for retry: %r", exc)
         return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)
 
-    reason = exc.errors[0]["reason"]
     return reason in _RETRYABLE_REASONS
```
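The predicate now follows the EAFP style: attempt the structured lookup and treat every failure mode (missing `errors` attribute, empty list, a non-subscriptable gRPC error object, missing `"reason"` key) as an unstructured error. A minimal hedged sketch of how the two shapes get classified, using `google.api_core` exception classes (which accept an `errors` argument):

```python
from google.api_core import exceptions

from google.cloud.bigquery.retry import _should_retry

# Structured BigQuery error: exc.errors[0]["reason"] resolves and
# "backendError" is in _RETRYABLE_REASONS, so the call is retried.
structured = exceptions.InternalServerError(
    "backend error", errors=[{"reason": "backendError"}]
)
assert _should_retry(structured)

# Unstructured transient error: errors is empty, so the lookup raises
# IndexError and the predicate falls back to the isinstance() check
# against _UNSTRUCTURED_RETRYABLE_TYPES (which includes ServiceUnavailable).
unstructured = exceptions.ServiceUnavailable("connection reset by peer")
assert _should_retry(unstructured)
```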
google/cloud/bigquery/table.py (+5 −1)

```diff
@@ -2152,7 +2152,10 @@ def to_arrow_iterable(
             timeout=timeout,
         )
         tabledata_list_download = functools.partial(
-            _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
+            _pandas_helpers.download_arrow_row_iterator,
+            iter(self.pages),
+            self.schema,
+            timeout=timeout,
         )
         return self._to_page_iterable(
             bqstorage_download,
@@ -2366,6 +2369,7 @@ def to_dataframe_iterable(
             iter(self.pages),
             self.schema,
             dtypes,
+            timeout=timeout,
         )
         return self._to_page_iterable(
             bqstorage_download,
```
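The `table.py` plumbing is plain `functools.partial`: the new `timeout` keyword is bound when the download callable is constructed, so `_to_page_iterable` can invoke it later without knowing about timeouts. A tiny generic sketch of the pattern (hypothetical names):

```python
import functools


def download(pages, schema, timeout=None):
    # Hypothetical stand-in for _pandas_helpers.download_arrow_row_iterator.
    print(f"downloading with timeout={timeout}")


# The timeout is captured at construction time; the caller later invokes
# the partial with no extra arguments and the bound value travels along.
tabledata_list_download = functools.partial(download, iter([]), ["schema"], timeout=5.0)
tabledata_list_download()  # -> downloading with timeout=5.0
```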
tests/unit/job/test_query_pandas.py (+6 −0)

```diff
@@ -179,6 +179,8 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
         parent="projects/test-project",
         read_session=expected_session,
         max_stream_count=1,  # Use a single stream to preserve row order.
+        retry=None,
+        timeout=None,
     )
 
 
@@ -593,6 +595,8 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
         parent="projects/bqstorage-billing-project",
         read_session=expected_session,
         max_stream_count=0,  # Use default number of streams for best performance.
+        retry=None,
+        timeout=None,
     )
     bqstorage_client.read_rows.assert_called_once_with(stream_id)
 
@@ -644,6 +648,8 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
         parent="projects/bqstorage-billing-project",
         read_session=expected_session,
         max_stream_count=0,
+        retry=None,
+        timeout=None,
     )
```
tests/unit/test__pandas_helpers.py (+131 −0)

```diff
@@ -2252,3 +2252,134 @@ def fast_download_stream(
     results = list(result_gen)
 
     assert results == ["result_page"]
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
+@pytest.mark.parametrize(
+    "sleep_time, timeout, should_timeout",
+    [
+        (0.1, 0.05, True),  # Timeout case
+        (0, 10.0, False),  # Success case
+    ],
+)
+def test_download_arrow_row_iterator_with_timeout(
+    module_under_test, sleep_time, timeout, should_timeout
+):
+    bq_schema = [schema.SchemaField("name", "STRING")]
+
+    # Mock page with to_arrow method
+    mock_page = mock.Mock()
+    mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
+        [pyarrow.array(["foo"])],
+        names=["name"],
+    )
+    mock_page.__iter__ = lambda self: iter(["row1"])
+    mock_page._columns = [["foo"]]
+
+    def pages_gen():
+        # First page yields quickly
+        yield mock_page
+        if sleep_time > 0:
+            time.sleep(sleep_time)
+        yield mock_page
+
+    iterator = module_under_test.download_arrow_row_iterator(
+        pages_gen(), bq_schema, timeout=timeout
+    )
+
+    # First item should always succeed
+    next(iterator)
+
+    if should_timeout:
+        with pytest.raises(concurrent.futures.TimeoutError):
+            next(iterator)
+    else:
+        # Should succeed and complete
+        results = list(iterator)
+        assert len(results) == 1  # 1 remaining item
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
+@pytest.mark.parametrize(
+    "sleep_time, timeout, should_timeout",
+    [
+        (0.1, 0.05, True),  # Timeout case
+        (0, 10.0, False),  # Success case
+    ],
+)
+def test_download_dataframe_row_iterator_with_timeout(
+    module_under_test, sleep_time, timeout, should_timeout
+):
+    bq_schema = [schema.SchemaField("name", "STRING")]
+    dtypes = {}
+
+    # Mock page
+    mock_page = mock.Mock()
+    # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page))
+    mock_page.__iter__ = lambda self: iter(["row1"])
+    mock_page._columns = [["foo"]]
+
+    def pages_gen():
+        yield mock_page
+        if sleep_time > 0:
+            time.sleep(sleep_time)
+        yield mock_page
+
+    iterator = module_under_test.download_dataframe_row_iterator(
+        pages_gen(), bq_schema, dtypes, timeout=timeout
+    )
+
+    next(iterator)
+
+    if should_timeout:
+        with pytest.raises(concurrent.futures.TimeoutError):
+            next(iterator)
+    else:
+        results = list(iterator)
+        assert len(results) == 1
+
+
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+def test_download_arrow_bqstorage_passes_timeout_to_create_read_session(
+    module_under_test,
+):
+    # Mock dependencies
+    project_id = "test-project"
+    table = mock.Mock()
+    table.table_id = "test_table"
+    table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test"
+
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage.BigQueryReadClient, instance=True
+    )
+    # Mock create_read_session to return a session with no streams so the function returns early
+    # (Checking start of loop logic vs empty streams return)
+    session = mock.Mock()
+    # If streams is empty, _download_table_bqstorage returns early, which is fine for this test
+    session.streams = []
+    bqstorage_client.create_read_session.return_value = session
+
+    # Call the function
+    timeout = 123.456
+    # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution
+    list(
+        module_under_test.download_arrow_bqstorage(
+            project_id, table, bqstorage_client, timeout=timeout
+        )
+    )
+
+    # Verify timeout and retry were passed
+    bqstorage_client.create_read_session.assert_called_once()
+    _, kwargs = bqstorage_client.create_read_session.call_args
+    assert "timeout" in kwargs
+    assert kwargs["timeout"] == timeout
+
+    assert "retry" in kwargs
+    retry_policy = kwargs["retry"]
+    assert retry_policy is not None
+    # Check if deadline is set correctly in the retry policy
+    assert retry_policy._deadline == timeout
```
tests/unit/test_client_retry.py (+6 −1)

```diff
@@ -23,6 +23,11 @@
 
 PROJECT = "test-project"
 
+# A deadline > 1.0s is required because the default retry (google.api_core.retry.Retry)
+# has an initial delay of 1.0s. If the deadline is <= 1.0s, the first retry attempt
+# (scheduled for now + 1.0s) will be rejected immediately as exceeding the deadline.
+_RETRY_DEADLINE = 10.0
+
 
 def _make_credentials():
     import google.auth.credentials
@@ -83,7 +88,7 @@ def test_call_api_applying_custom_retry_on_timeout(global_time_lock):
         "api_request",
         side_effect=[TimeoutError, "result"],
     )
-    retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
+    retry = DEFAULT_RETRY.with_deadline(_RETRY_DEADLINE).with_predicate(
         lambda exc: isinstance(exc, TimeoutError)
    )
```
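A worked restatement of the comment above (hedged; it assumes the default `google.api_core.retry.Retry` backoff, whose initial delay is 1.0 seconds):

```python
from google.cloud.bigquery.retry import DEFAULT_RETRY

_RETRY_DEADLINE = 10.0

# Attempt 1 runs at t=0 and fails with TimeoutError. The default backoff
# schedules attempt 2 at roughly t=1.0s; with with_deadline(1) that attempt
# would already exceed the deadline, so the mocked second response
# ("result") could never be consumed. 10.0 seconds leaves ample headroom.
retry = DEFAULT_RETRY.with_deadline(_RETRY_DEADLINE).with_predicate(
    lambda exc: isinstance(exc, TimeoutError)
)
```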
tests/unit/test_dbapi_cursor.py (+5 −1)

```diff
@@ -480,7 +480,11 @@ def fake_ensure_bqstorage_client(bqstorage_client=None, **kwargs):
         data_format=bigquery_storage.DataFormat.ARROW,
     )
     mock_bqstorage_client.create_read_session.assert_called_once_with(
-        parent="projects/P", read_session=expected_session, max_stream_count=1
+        parent="projects/P",
+        read_session=expected_session,
+        max_stream_count=1,
+        retry=None,
+        timeout=None,
     )
 
     # Check the data returned.
```
