diff --git a/.librarian/state.yaml b/.librarian/state.yaml index 71bcf16ad..efce633f2 100644 --- a/.librarian/state.yaml +++ b/.librarian/state.yaml @@ -1,7 +1,7 @@ image: us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:c8612d3fffb3f6a32353b2d1abd16b61e87811866f7ec9d65b59b02eb452a620 libraries: - id: google-cloud-bigquery - version: 3.40.0 + version: 3.40.1 last_generated_commit: "" apis: [] source_roots: diff --git a/CHANGELOG.md b/CHANGELOG.md index 242165933..083dbfc4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ [1]: https://pypi.org/project/google-cloud-bigquery/#history +## [3.40.1](https://github.com/googleapis/google-cloud-python/compare/google-cloud-bigquery-v3.40.0...google-cloud-bigquery-v3.40.1) (2026-02-12) + + +### Documentation + +* clarify that only jobs.query and jobs.getQueryResults are affec… (#2349) ([73228432a3c821db05d898ea4a4788adf15b033d](https://github.com/googleapis/google-cloud-python/commit/73228432a3c821db05d898ea4a4788adf15b033d)) + + +### Bug Fixes + +* updates timeout/retry code to respect hanging server (#2408) ([24d45d0d5bf89762f253ba6bd6fdbee9d5993422](https://github.com/googleapis/google-cloud-python/commit/24d45d0d5bf89762f253ba6bd6fdbee9d5993422)) +* add timeout parameter to to_dataframe and to_arrow met… (#2354) ([4f67ba20b49159e81f645ed98e401b9bb1359c1a](https://github.com/googleapis/google-cloud-python/commit/4f67ba20b49159e81f645ed98e401b9bb1359c1a)) + ## [3.40.0](https://github.com/googleapis/google-cloud-python/compare/google-cloud-bigquery-v3.39.0...google-cloud-bigquery-v3.40.0) (2026-01-08) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 2dab03a06..7bd9f99b6 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -26,12 +26,14 @@ import logging import queue import threading +import time import warnings from typing import Any, Union, Optional, Callable, Generator, List from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers +from google.cloud.bigquery import retry as bq_retry from google.cloud.bigquery import schema @@ -739,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types): return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) -def download_arrow_row_iterator(pages, bq_schema): +def download_arrow_row_iterator(pages, bq_schema, timeout=None): """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. Args: @@ -750,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema): Mapping[str, Any] \ ]]): A decription of the fields in result pages. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Yields: :class:`pyarrow.RecordBatch` The next page of records as a ``pyarrow`` record batch. @@ -758,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema): column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] - for page in pages: - yield _row_iterator_page_to_arrow(page, column_names, arrow_types) + if timeout is None: + for page in pages: + yield _row_iterator_page_to_arrow(page, column_names, arrow_types) + else: + start_time = time.monotonic() + for page in pages: + if time.monotonic() - start_time > timeout: + raise concurrent.futures.TimeoutError() + + yield _row_iterator_page_to_arrow(page, column_names, arrow_types) def _row_iterator_page_to_dataframe(page, column_names, dtypes): @@ -777,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes): return pandas.DataFrame(columns, columns=column_names) -def download_dataframe_row_iterator(pages, bq_schema, dtypes): +def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None): """Use HTTP JSON RowIterator to construct a DataFrame. Args: @@ -791,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes): dtypes(Mapping[str, numpy.dtype]): The types of columns in result data to hint construction of the resulting DataFrame. Not all column types have to be specified. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Yields: :class:`pandas.DataFrame` The next page of records as a ``pandas.DataFrame`` record batch. """ bq_schema = schema._to_schema_fields(bq_schema) column_names = [field.name for field in bq_schema] - for page in pages: - yield _row_iterator_page_to_dataframe(page, column_names, dtypes) + + if timeout is None: + for page in pages: + yield _row_iterator_page_to_dataframe(page, column_names, dtypes) + else: + start_time = time.monotonic() + for page in pages: + if time.monotonic() - start_time > timeout: + raise concurrent.futures.TimeoutError() + + yield _row_iterator_page_to_dataframe(page, column_names, dtypes) def _bqstorage_page_to_arrow(page): @@ -869,6 +896,7 @@ def _download_table_bqstorage( max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, max_stream_count: Optional[int] = None, download_state: Optional[_DownloadState] = None, + timeout: Optional[float] = None, ) -> Generator[Any, None, None]: """Downloads a BigQuery table using the BigQuery Storage API. @@ -899,6 +927,9 @@ def _download_table_bqstorage( download_state (Optional[_DownloadState]): A threadsafe state object which can be used to observe the behavior of the worker threads created by this method. + timeout (Optional[float]): + The number of seconds to wait for the download to complete. + If None, wait indefinitely. Yields: pandas.DataFrame: Pandas DataFrames, one for each chunk of data @@ -906,6 +937,8 @@ def _download_table_bqstorage( Raises: ValueError: If attempting to read from a specific partition or snapshot. + concurrent.futures.TimeoutError: + If the download does not complete within the specified timeout. Note: This method requires the `google-cloud-bigquery-storage` library @@ -921,6 +954,7 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") + start_time = time.monotonic() requested_streams = determine_requested_streams(preserve_order, max_stream_count) requested_session = bigquery_storage.types.stream.ReadSession( @@ -937,10 +971,16 @@ def _download_table_bqstorage( ArrowSerializationOptions.CompressionCodec(1) ) + retry_policy = ( + bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None + ) + session = bqstorage_client.create_read_session( parent="projects/{}".format(project_id), read_session=requested_session, max_stream_count=requested_streams, + retry=retry_policy, + timeout=timeout, ) _LOGGER.debug( @@ -973,60 +1013,71 @@ def _download_table_bqstorage( worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size) - with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: - try: - # Manually submit jobs and wait for download to complete rather - # than using pool.map because pool.map continues running in the - # background even if there is an exception on the main thread. - # See: https://github.com/googleapis/google-cloud-python/pull/7698 - not_done = [ - pool.submit( - _download_table_bqstorage_stream, - download_state, - bqstorage_client, - session, - stream, - worker_queue, - page_to_item, - ) - for stream in session.streams - ] - - while not_done: - # Don't block on the worker threads. For performance reasons, - # we want to block on the queue's get method, instead. This - # prevents the queue from filling up, because the main thread - # has smaller gaps in time between calls to the queue's get - # method. For a detailed explanation, see: - # https://friendliness.dev/2019/06/18/python-nowait/ - done, not_done = _nowait(not_done) - for future in done: - # Call result() on any finished threads to raise any - # exceptions encountered. - future.result() - - try: - frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) - yield frame - except queue.Empty: # pragma: NO COVER - continue + # Manually manage the pool to control shutdown behavior on timeout. + pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams)) + wait_on_shutdown = True + try: + # Manually submit jobs and wait for download to complete rather + # than using pool.map because pool.map continues running in the + # background even if there is an exception on the main thread. + # See: https://github.com/googleapis/google-cloud-python/pull/7698 + not_done = [ + pool.submit( + _download_table_bqstorage_stream, + download_state, + bqstorage_client, + session, + stream, + worker_queue, + page_to_item, + ) + for stream in session.streams + ] + + while not_done: + # Check for timeout + if timeout is not None: + elapsed = time.monotonic() - start_time + if elapsed > timeout: + wait_on_shutdown = False + raise concurrent.futures.TimeoutError( + f"Download timed out after {timeout} seconds." + ) + + # Don't block on the worker threads. For performance reasons, + # we want to block on the queue's get method, instead. This + # prevents the queue from filling up, because the main thread + # has smaller gaps in time between calls to the queue's get + # method. For a detailed explanation, see: + # https://friendliness.dev/2019/06/18/python-nowait/ + done, not_done = _nowait(not_done) + for future in done: + # Call result() on any finished threads to raise any + # exceptions encountered. + future.result() + + try: + frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) + yield frame + except queue.Empty: # pragma: NO COVER + continue - # Return any remaining values after the workers finished. - while True: # pragma: NO COVER - try: - frame = worker_queue.get_nowait() - yield frame - except queue.Empty: # pragma: NO COVER - break - finally: - # No need for a lock because reading/replacing a variable is - # defined to be an atomic operation in the Python language - # definition (enforced by the global interpreter lock). - download_state.done = True + # Return any remaining values after the workers finished. + while True: # pragma: NO COVER + try: + frame = worker_queue.get_nowait() + yield frame + except queue.Empty: # pragma: NO COVER + break + finally: + # No need for a lock because reading/replacing a variable is + # defined to be an atomic operation in the Python language + # definition (enforced by the global interpreter lock). + download_state.done = True - # Shutdown all background threads, now that they should know to - # exit early. - pool.shutdown(wait=True) + # Shutdown all background threads, now that they should know to + # exit early. + pool.shutdown(wait=wait_on_shutdown) def download_arrow_bqstorage( @@ -1037,6 +1088,7 @@ def download_arrow_bqstorage( selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, max_stream_count=None, + timeout=None, ): return _download_table_bqstorage( project_id, @@ -1047,6 +1099,7 @@ def download_arrow_bqstorage( page_to_item=_bqstorage_page_to_arrow, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) @@ -1060,6 +1113,7 @@ def download_dataframe_bqstorage( selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, max_stream_count=None, + timeout=None, ): page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) return _download_table_bqstorage( @@ -1071,6 +1125,7 @@ def download_dataframe_bqstorage( page_to_item=page_to_item, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index e3a3cdb11..54c8886cd 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3655,6 +3655,11 @@ def query_and_wait( page_size (Optional[int]): The maximum number of rows in each page of results from the initial jobs.query request. Non-positive values are ignored. + + This parameter only affects the jobs.query and + jobs.getQueryResults API calls. Large results downloaded with + the BigQuery Storage Read API are intentionally unaffected + by this parameter. max_results (Optional[int]): The maximum total number of rows from this request. diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py index 014a6825e..bffd7678f 100644 --- a/google/cloud/bigquery/dbapi/cursor.py +++ b/google/cloud/bigquery/dbapi/cursor.py @@ -323,6 +323,8 @@ def _bqstorage_fetch(self, bqstorage_client): read_session=requested_session, # a single stream only, as DB API is not well-suited for multithreading max_stream_count=1, + retry=None, + timeout=None, ) if not read_session.streams: diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 38b8a7148..e82deb1ef 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1857,6 +1857,7 @@ def to_arrow( bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, create_bqstorage_client: bool = True, max_results: Optional[int] = None, + timeout: Optional[float] = None, ) -> "pyarrow.Table": """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1904,6 +1905,10 @@ def to_arrow( .. versionadded:: 2.21.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pyarrow.Table A :class:`pyarrow.Table` populated with row data and column @@ -1921,6 +1926,7 @@ def to_arrow( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, + timeout=timeout, ) # If changing the signature of this method, make sure to apply the same @@ -1949,6 +1955,7 @@ def to_dataframe( range_timestamp_dtype: Union[ Any, None ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Return a pandas DataFrame from a QueryJob @@ -2141,6 +2148,10 @@ def to_dataframe( .. versionadded:: 3.21.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data @@ -2174,6 +2185,7 @@ def to_dataframe( range_date_dtype=range_date_dtype, range_datetime_dtype=range_datetime_dtype, range_timestamp_dtype=range_timestamp_dtype, + timeout=timeout, ) # If changing the signature of this method, make sure to apply the same @@ -2191,6 +2203,7 @@ def to_geodataframe( int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE, float_dtype: Union[Any, None] = None, string_dtype: Union[Any, None] = None, + timeout: Optional[float] = None, ) -> "geopandas.GeoDataFrame": """Return a GeoPandas GeoDataFrame from a QueryJob @@ -2269,6 +2282,9 @@ def to_geodataframe( then the data type will be ``numpy.dtype("object")``. BigQuery String type can be found at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. Returns: geopandas.GeoDataFrame: @@ -2296,6 +2312,7 @@ def to_geodataframe( int_dtype=int_dtype, float_dtype=float_dtype, string_dtype=string_dtype, + timeout=timeout, ) def __iter__(self): diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 19012efd6..6fd458df5 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from google.api_core import exceptions from google.api_core import retry import google.api_core.future.polling from google.auth import exceptions as auth_exceptions # type: ignore import requests.exceptions +_LOGGER = logging.getLogger(__name__) _RETRYABLE_REASONS = frozenset( ["rateLimitExceeded", "backendError", "internalError", "badGateway"] @@ -61,14 +64,17 @@ def _should_retry(exc): """Predicate for determining when to retry. - We retry if and only if the 'reason' is 'backendError' - or 'rateLimitExceeded'. + We retry if and only if the 'reason' is in _RETRYABLE_REASONS or is + in _UNSTRUCTURED_RETRYABLE_TYPES. """ - if not hasattr(exc, "errors") or len(exc.errors) == 0: - # Check for unstructured error returns, e.g. from GFE + try: + reason = exc.errors[0]["reason"] + except (AttributeError, IndexError, TypeError, KeyError): + # Fallback for when errors attribute is missing, empty, or not a dict + # or doesn't contain "reason" (e.g. gRPC exceptions). + _LOGGER.debug("Inspecting unstructured error for retry: %r", exc) return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) - reason = exc.errors[0]["reason"] return reason in _RETRYABLE_REASONS diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 5efcb1958..88b673a8b 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -2087,6 +2087,7 @@ def to_arrow_iterable( bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> Iterator["pyarrow.RecordBatch"]: """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream. @@ -2127,6 +2128,10 @@ def to_arrow_iterable( setting this parameter value to a value > 0 can help reduce system resource consumption. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pyarrow.RecordBatch: A generator of :class:`~pyarrow.RecordBatch`. @@ -2144,9 +2149,13 @@ def to_arrow_iterable( selected_fields=self._selected_fields, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) tabledata_list_download = functools.partial( - _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema + _pandas_helpers.download_arrow_row_iterator, + iter(self.pages), + self.schema, + timeout=timeout, ) return self._to_page_iterable( bqstorage_download, @@ -2161,6 +2170,7 @@ def to_arrow( progress_bar_type: Optional[str] = None, bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, create_bqstorage_client: bool = True, + timeout: Optional[float] = None, ) -> "pyarrow.Table": """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -2202,6 +2212,9 @@ def to_arrow( This argument does nothing if ``bqstorage_client`` is supplied. .. versionadded:: 1.24.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. Returns: pyarrow.Table @@ -2236,7 +2249,7 @@ def to_arrow( record_batches = [] for record_batch in self.to_arrow_iterable( - bqstorage_client=bqstorage_client + bqstorage_client=bqstorage_client, timeout=timeout ): record_batches.append(record_batch) @@ -2271,6 +2284,7 @@ def to_dataframe_iterable( dtypes: Optional[Dict[str, Any]] = None, max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -2317,6 +2331,10 @@ def to_dataframe_iterable( setting this parameter value to a value > 0 can help reduce system resource consumption. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pandas.DataFrame: A generator of :class:`~pandas.DataFrame`. @@ -2344,12 +2362,14 @@ def to_dataframe_iterable( selected_fields=self._selected_fields, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) tabledata_list_download = functools.partial( _pandas_helpers.download_dataframe_row_iterator, iter(self.pages), self.schema, dtypes, + timeout=timeout, ) return self._to_page_iterable( bqstorage_download, @@ -2381,6 +2401,7 @@ def to_dataframe( range_timestamp_dtype: Union[ Any, None ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create a pandas DataFrame by loading all pages of a query. @@ -2577,6 +2598,10 @@ def to_dataframe( .. versionadded:: 3.21.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data and column @@ -2690,6 +2715,7 @@ def to_dataframe( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, + timeout=timeout, ) # Default date dtype is `db_dtypes.DateDtype()` that could cause out of bounds error, @@ -2768,6 +2794,7 @@ def to_geodataframe( int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE, float_dtype: Union[Any, None] = None, string_dtype: Union[Any, None] = None, + timeout: Optional[float] = None, ) -> "geopandas.GeoDataFrame": """Create a GeoPandas GeoDataFrame by loading all pages of a query. @@ -2902,6 +2929,7 @@ def to_geodataframe( int_dtype=int_dtype, float_dtype=float_dtype, string_dtype=string_dtype, + timeout=timeout, ) return geopandas.GeoDataFrame( @@ -2917,9 +2945,6 @@ class _EmptyRowIterator(RowIterator): statements. """ - pages = () - total_rows = 0 - def __init__( self, client=None, api_request=None, path=None, schema=(), *args, **kwargs ): @@ -2931,12 +2956,14 @@ def __init__( *args, **kwargs, ) + self._total_rows = 0 def to_arrow( self, progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, + timeout: Optional[float] = None, ) -> "pyarrow.Table": """[Beta] Create an empty class:`pyarrow.Table`. @@ -2944,6 +2971,7 @@ def to_arrow( progress_bar_type (str): Ignored. Added for compatibility with RowIterator. bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): Ignored. Added for compatibility with RowIterator. Returns: pyarrow.Table: An empty :class:`pyarrow.Table`. @@ -2970,6 +2998,7 @@ def to_dataframe( range_date_dtype=None, range_datetime_dtype=None, range_timestamp_dtype=None, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create an empty dataframe. @@ -2990,6 +3019,7 @@ def to_dataframe( range_date_dtype (Any): Ignored. Added for compatibility with RowIterator. range_datetime_dtype (Any): Ignored. Added for compatibility with RowIterator. range_timestamp_dtype (Any): Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. @@ -3008,6 +3038,7 @@ def to_geodataframe( int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE, float_dtype: Union[Any, None] = None, string_dtype: Union[Any, None] = None, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create an empty dataframe. @@ -3021,6 +3052,7 @@ def to_geodataframe( int_dtype (Any): Ignored. Added for compatibility with RowIterator. float_dtype (Any): Ignored. Added for compatibility with RowIterator. string_dtype (Any): Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. @@ -3038,6 +3070,7 @@ def to_dataframe_iterable( dtypes: Optional[Dict[str, Any]] = None, max_queue_size: Optional[int] = None, max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> Iterator["pandas.DataFrame"]: """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -3056,6 +3089,9 @@ def to_dataframe_iterable( max_stream_count: Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): + Ignored. Added for compatibility with RowIterator. + Returns: An iterator yielding a single empty :class:`~pandas.DataFrame`. @@ -3071,6 +3107,7 @@ def to_arrow_iterable( bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, max_queue_size: Optional[int] = None, max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> Iterator["pyarrow.RecordBatch"]: """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -3086,6 +3123,9 @@ def to_arrow_iterable( max_stream_count: Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): + Ignored. Added for compatibility with RowIterator. + Returns: An iterator yielding a single empty :class:`~pyarrow.RecordBatch`. """ diff --git a/google/cloud/bigquery/version.py b/google/cloud/bigquery/version.py index 6b0fa0fba..2519009bf 100644 --- a/google/cloud/bigquery/version.py +++ b/google/cloud/bigquery/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "3.40.0" +__version__ = "3.40.1" diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index ec5c7f2af..5f4d686b3 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -10,7 +10,7 @@ db-dtypes==1.4.3 Fiona==1.10.1 geojson==3.2.0 geopandas===1.0.1; python_version <= '3.9' -geopandas==1.1.1; python_version >= '3.10' +geopandas==1.1.2; python_version >= '3.10' google-api-core==2.25.2 google-auth==2.41.1 google-cloud-bigquery==3.38.0 @@ -27,7 +27,7 @@ packaging==25.0 pandas==2.3.3 proto-plus==1.26.1 pyarrow==21.0.0 -pyasn1==0.6.1 +pyasn1==0.6.2 pyasn1-modules==0.4.2 pycparser==2.23 pyparsing==3.2.5 @@ -41,4 +41,4 @@ Shapely==2.1.2; python_version >= '3.10' six==1.17.0 typing-extensions==4.15.0 typing-inspect==0.9.0 -urllib3==2.6.0 +urllib3==2.6.3 diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index a6c59b158..e0e0438f5 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -179,6 +179,8 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg): parent="projects/test-project", read_session=expected_session, max_stream_count=1, # Use a single stream to preserve row order. + retry=None, + timeout=None, ) @@ -593,6 +595,8 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg): parent="projects/bqstorage-billing-project", read_session=expected_session, max_stream_count=0, # Use default number of streams for best performance. + retry=None, + timeout=None, ) bqstorage_client.read_rows.assert_called_once_with(stream_id) @@ -644,6 +648,8 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression(): parent="projects/bqstorage-billing-project", read_session=expected_session, max_stream_count=0, + retry=None, + timeout=None, ) @@ -1023,5 +1029,38 @@ def test_query_job_to_geodataframe_delegation(wait_for_query): int_dtype=DefaultPandasDTypes.INT_DTYPE, float_dtype=None, string_dtype=None, + timeout=None, ) assert df is row_iterator.to_geodataframe.return_value + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@mock.patch("google.cloud.bigquery.job.query.wait_for_query") +def test_query_job_to_dataframe_delegation(wait_for_query): + job = _make_job() + bqstorage_client = object() + timeout = 123.45 + + job.to_dataframe(bqstorage_client=bqstorage_client, timeout=timeout) + + wait_for_query.assert_called_once_with(job, None, max_results=None) + row_iterator = wait_for_query.return_value + row_iterator.to_dataframe.assert_called_once() + call_args = row_iterator.to_dataframe.call_args + assert call_args.kwargs["timeout"] == timeout + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +@mock.patch("google.cloud.bigquery.job.query.wait_for_query") +def test_query_job_to_arrow_delegation(wait_for_query): + job = _make_job() + bqstorage_client = object() + timeout = 123.45 + + job.to_arrow(bqstorage_client=bqstorage_client, timeout=timeout) + + wait_for_query.assert_called_once_with(job, None, max_results=None) + row_iterator = wait_for_query.return_value + row_iterator.to_arrow.assert_called_once() + call_args = row_iterator.to_arrow.call_args + assert call_args.kwargs["timeout"] == timeout diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index bc94f5f54..6ec62c0b6 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -13,12 +13,14 @@ # limitations under the License. import collections +import concurrent.futures import datetime import decimal import functools import gc import operator import queue +import time from typing import Union from unittest import mock import warnings @@ -2177,3 +2179,207 @@ def test_determine_requested_streams_invalid_max_stream_count(): """Tests that a ValueError is raised if max_stream_count is negative.""" with pytest.raises(ValueError): determine_requested_streams(preserve_order=False, max_stream_count=-1) + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires google-cloud-bigquery-storage" +) +def test__download_table_bqstorage_w_timeout_error(module_under_test): + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + from unittest import mock + + mock_bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + fake_session = mock.Mock(streams=[mock.Mock()]) + mock_bqstorage_client.create_read_session.return_value = fake_session + + table_ref = table.TableReference( + dataset.DatasetReference("project-x", "dataset-y"), + "table-z", + ) + + def slow_download_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item + ): + # Block until the main thread sets done=True (which it will on timeout) + while not download_state.done: + time.sleep(0.01) + + with mock.patch.object( + module_under_test, "_download_table_bqstorage_stream", new=slow_download_stream + ): + # Use a very small timeout + result_gen = module_under_test._download_table_bqstorage( + "some-project", table_ref, mock_bqstorage_client, timeout=0.01 + ) + with pytest.raises(concurrent.futures.TimeoutError, match="timed out"): + list(result_gen) + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires google-cloud-bigquery-storage" +) +def test__download_table_bqstorage_w_timeout_success(module_under_test): + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + from unittest import mock + + mock_bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + fake_session = mock.Mock(streams=["stream/s0"]) + mock_bqstorage_client.create_read_session.return_value = fake_session + + table_ref = table.TableReference( + dataset.DatasetReference("project-x", "dataset-y"), + "table-z", + ) + + def fast_download_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item + ): + worker_queue.put("result_page") + + with mock.patch.object( + module_under_test, "_download_table_bqstorage_stream", new=fast_download_stream + ): + # Use a generous timeout + result_gen = module_under_test._download_table_bqstorage( + "some-project", table_ref, mock_bqstorage_client, timeout=10.0 + ) + results = list(result_gen) + + assert results == ["result_page"] + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +@pytest.mark.parametrize( + "sleep_time, timeout, should_timeout", + [ + (0.1, 0.05, True), # Timeout case + (0, 10.0, False), # Success case + ], +) +def test_download_arrow_row_iterator_with_timeout( + module_under_test, sleep_time, timeout, should_timeout +): + bq_schema = [schema.SchemaField("name", "STRING")] + + # Mock page with to_arrow method + mock_page = mock.Mock() + mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays( + [pyarrow.array(["foo"])], + names=["name"], + ) + mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] + + def pages_gen(): + # First page yields quickly + yield mock_page + if sleep_time > 0: + time.sleep(sleep_time) + yield mock_page + + iterator = module_under_test.download_arrow_row_iterator( + pages_gen(), bq_schema, timeout=timeout + ) + + # First item should always succeed + next(iterator) + + if should_timeout: + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + else: + # Should succeed and complete + results = list(iterator) + assert len(results) == 1 # 1 remaining item + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +@pytest.mark.parametrize( + "sleep_time, timeout, should_timeout", + [ + (0.1, 0.05, True), # Timeout case + (0, 10.0, False), # Success case + ], +) +def test_download_dataframe_row_iterator_with_timeout( + module_under_test, sleep_time, timeout, should_timeout +): + bq_schema = [schema.SchemaField("name", "STRING")] + dtypes = {} + + # Mock page + mock_page = mock.Mock() + # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page)) + mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] + + def pages_gen(): + yield mock_page + if sleep_time > 0: + time.sleep(sleep_time) + yield mock_page + + iterator = module_under_test.download_dataframe_row_iterator( + pages_gen(), bq_schema, dtypes, timeout=timeout + ) + + next(iterator) + + if should_timeout: + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + else: + results = list(iterator) + assert len(results) == 1 + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test_download_arrow_bqstorage_passes_timeout_to_create_read_session( + module_under_test, +): + # Mock dependencies + project_id = "test-project" + table = mock.Mock() + table.table_id = "test_table" + table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test" + + bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + # Mock create_read_session to return a session with no streams so the function returns early + # (Checking start of loop logic vs empty streams return) + session = mock.Mock() + # If streams is empty, _download_table_bqstorage returns early, which is fine for this test + session.streams = [] + bqstorage_client.create_read_session.return_value = session + + # Call the function + timeout = 123.456 + # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution + list( + module_under_test.download_arrow_bqstorage( + project_id, table, bqstorage_client, timeout=timeout + ) + ) + + # Verify timeout and retry were passed + bqstorage_client.create_read_session.assert_called_once() + _, kwargs = bqstorage_client.create_read_session.call_args + assert "timeout" in kwargs + assert kwargs["timeout"] == timeout + + assert "retry" in kwargs + retry_policy = kwargs["retry"] + assert retry_policy is not None + # Check if deadline is set correctly in the retry policy + assert retry_policy._deadline == timeout diff --git a/tests/unit/test_client_retry.py b/tests/unit/test_client_retry.py index 6e49cc464..f0e7ac88f 100644 --- a/tests/unit/test_client_retry.py +++ b/tests/unit/test_client_retry.py @@ -23,6 +23,11 @@ PROJECT = "test-project" +# A deadline > 1.0s is required because the default retry (google.api_core.retry.Retry) +# has an initial delay of 1.0s. If the deadline is <= 1.0s, the first retry attempt +# (scheduled for now + 1.0s) will be rejected immediately as exceeding the deadline. +_RETRY_DEADLINE = 10.0 + def _make_credentials(): import google.auth.credentials @@ -83,7 +88,7 @@ def test_call_api_applying_custom_retry_on_timeout(global_time_lock): "api_request", side_effect=[TimeoutError, "result"], ) - retry = DEFAULT_RETRY.with_deadline(1).with_predicate( + retry = DEFAULT_RETRY.with_deadline(_RETRY_DEADLINE).with_predicate( lambda exc: isinstance(exc, TimeoutError) ) diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py index 6fca4cec0..c5cad8c91 100644 --- a/tests/unit/test_dbapi_cursor.py +++ b/tests/unit/test_dbapi_cursor.py @@ -480,7 +480,11 @@ def fake_ensure_bqstorage_client(bqstorage_client=None, **kwargs): data_format=bigquery_storage.DataFormat.ARROW, ) mock_bqstorage_client.create_read_session.assert_called_once_with( - parent="projects/P", read_session=expected_session, max_stream_count=1 + parent="projects/P", + read_session=expected_session, + max_stream_count=1, + retry=None, + timeout=None, ) # Check the data returned. diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index af31d116b..a8397247d 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -2495,6 +2495,20 @@ def test_to_geodataframe(self): else: assert not hasattr(df, "crs") + def test_methods_w_timeout(self): + pytest.importorskip("pyarrow") + pytest.importorskip("geopandas") + # Ensure that the timeout parameter is accepted by all methods without raising a TypeError, + # even though the _EmptyRowIterator implementations do not use the timeout value. + timeout = 42.0 + + # Call each type to ensure no TypeError is raised + self._make_one().to_arrow(timeout=timeout) + self._make_one().to_arrow_iterable(timeout=timeout) + self._make_one().to_dataframe(timeout=timeout) + self._make_one().to_dataframe_iterable(timeout=timeout) + self._make_one().to_geodataframe(timeout=timeout) + class TestRowIterator(unittest.TestCase): PYARROW_MINIMUM_VERSION = str(_versions_helpers._MIN_PYARROW_VERSION) @@ -4111,6 +4125,10 @@ def test_to_dataframe_tqdm_error(self): # Warn that a progress bar was requested, but creating the tqdm # progress bar failed. for warning in warned: # pragma: NO COVER + # Pyparsing warnings appear to be coming from a transitive + # dependency and are unrelated to the code under test. + if "Pyparsing" in warning.category.__name__: + continue self.assertIn( warning.category, [UserWarning, DeprecationWarning, tqdm.TqdmExperimentalWarning], @@ -5665,6 +5683,7 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe): int_dtype=DefaultPandasDTypes.INT_DTYPE, float_dtype=None, string_dtype=None, + timeout=None, ) self.assertIsInstance(df, geopandas.GeoDataFrame) @@ -6838,6 +6857,8 @@ def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order): parent=mock.ANY, read_session=mock.ANY, max_stream_count=max_stream_count if not preserve_order else 1, + retry=None, + timeout=None, ) @@ -6873,4 +6894,6 @@ def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order): parent=mock.ANY, read_session=mock.ANY, max_stream_count=max_stream_count if not preserve_order else 1, + retry=None, + timeout=None, ) diff --git a/tests/unit/test_table_pandas.py b/tests/unit/test_table_pandas.py index a4fa3fa39..64d8b1451 100644 --- a/tests/unit/test_table_pandas.py +++ b/tests/unit/test_table_pandas.py @@ -301,6 +301,7 @@ def test_rowiterator_to_geodataframe_with_default_dtypes( int_dtype=bigquery.enums.DefaultPandasDTypes.INT_DTYPE, float_dtype=None, string_dtype=None, + timeout=None, ) mock_geopandas.GeoDataFrame.assert_called_once_with( mock_df, crs="EPSG:4326", geometry="geo_col" @@ -358,6 +359,7 @@ def test_rowiterator_to_geodataframe_with_custom_dtypes( int_dtype=custom_int_dtype, float_dtype=custom_float_dtype, string_dtype=custom_string_dtype, + timeout=None, ) mock_geopandas.GeoDataFrame.assert_called_once_with( mock_df, crs="EPSG:4326", geometry="geo_col"