Commit d461297

kien-truong and Linchin authored
feat: support setting max_stream_count when fetching query result (#2051)
* feat: support setting max_stream_count when fetching query result

  Allow users to set max_stream_count when fetching results using the
  BigQuery Storage API with RowIterator's incremental methods:
  * to_arrow_iterable
  * to_dataframe_iterable

* docs: update docs about max_stream_count for ordered query
* fix: add max_stream_count params to _EmptyRowIterator's methods
* test: add tests for RowIterator's max_stream_count parameter
* docs: add notes on valid max_stream_count range in docstring
* use a different way to iterate result

Co-authored-by: Lingqing Gan <lingqing.gan@gmail.com>
1 parent fffe6ba commit d461297
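For context, here is a minimal usage sketch of the new parameter (not part of the commit; the project, dataset, and query are placeholders): a RowIterator obtained from a query result can now cap the number of BigQuery Storage API download streams used by its incremental methods.

from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

# RowIterator over a (hypothetical) large result set.
rows = client.query("SELECT * FROM `my-project.my_dataset.big_table`").result()

# Cap the number of parallel Storage API download streams to bound memory use;
# for an ORDER BY query this is ignored and a single stream is used.
for frame in rows.to_dataframe_iterable(
    bqstorage_client=bqstorage_client,
    max_stream_count=2,
):
    print(f"downloaded a chunk with {len(frame)} rows")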

File tree


2 files changed: +114 lines, −0 lines

google/cloud/bigquery/table.py

44 lines changed: 44 additions & 0 deletions
@@ -1812,6 +1812,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream.
@@ -1836,6 +1837,22 @@ def to_arrow_iterable(
                 created by the server. If ``max_queue_size`` is :data:`None`, the queue
                 size is infinite.

+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using BigQuery Storage API. Ignored if
+                BigQuery Storage API is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY,
+                in which case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of download
+                streams is determined by the BigQuery server. However, this behaviour
+                can require a lot of memory to store temporary download results,
+                especially with very large queries. In that case,
+                setting this parameter to a value > 0 can help
+                reduce system resource consumption.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.
@@ -1852,6 +1869,7 @@ def to_arrow_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema

@@ -1978,6 +1996,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a stream.
@@ -2008,6 +2027,22 @@ def to_dataframe_iterable(

                 .. versionadded:: 2.14.0

+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using BigQuery Storage API. Ignored if
+                BigQuery Storage API is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY,
+                in which case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of download
+                streams is determined by the BigQuery server. However, this behaviour
+                can require a lot of memory to store temporary download results,
+                especially with very large queries. In that case,
+                setting this parameter to a value > 0 can help
+                reduce system resource consumption.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.
@@ -2034,6 +2069,7 @@ def to_dataframe_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,

@@ -2690,6 +2726,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2705,6 +2742,9 @@ def to_dataframe_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.

+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.

@@ -2719,6 +2759,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2731,6 +2772,9 @@ def to_arrow_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.

+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
         """

tests/unit/test_table.py

70 lines changed: 70 additions & 0 deletions
@@ -5822,3 +5822,73 @@ def test_table_reference_to_bqstorage_v1_stable(table_path):
     for klass in (mut.TableReference, mut.Table, mut.TableListItem):
         got = klass.from_string(table_path).to_bqstorage()
         assert got == expected
+
+
+@pytest.mark.parametrize("preserve_order", [True, False])
+def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order):
+    pytest.importorskip("pandas")
+    pytest.importorskip("google.cloud.bigquery_storage")
+    from google.cloud.bigquery import schema
+    from google.cloud.bigquery import table as mut
+    from google.cloud import bigquery_storage
+
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    bqstorage_client.create_read_session.return_value = session
+
+    row_iterator = mut.RowIterator(
+        _mock_client(),
+        api_request=None,
+        path=None,
+        schema=[
+            schema.SchemaField("colA", "INTEGER"),
+        ],
+        table=mut.TableReference.from_string("proj.dset.tbl"),
+    )
+    row_iterator._preserve_order = preserve_order
+
+    max_stream_count = 132
+    result_iterable = row_iterator.to_arrow_iterable(
+        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
+    )
+    list(result_iterable)
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=mock.ANY,
+        read_session=mock.ANY,
+        max_stream_count=max_stream_count if not preserve_order else 1,
+    )
+
+
+@pytest.mark.parametrize("preserve_order", [True, False])
+def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order):
+    pytest.importorskip("pandas")
+    pytest.importorskip("google.cloud.bigquery_storage")
+    from google.cloud.bigquery import schema
+    from google.cloud.bigquery import table as mut
+    from google.cloud import bigquery_storage
+
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    bqstorage_client.create_read_session.return_value = session
+
+    row_iterator = mut.RowIterator(
+        _mock_client(),
+        api_request=None,
+        path=None,
+        schema=[
+            schema.SchemaField("colA", "INTEGER"),
+        ],
+        table=mut.TableReference.from_string("proj.dset.tbl"),
+    )
+    row_iterator._preserve_order = preserve_order
+
+    max_stream_count = 132
+    result_iterable = row_iterator.to_dataframe_iterable(
+        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
+    )
+    list(result_iterable)
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=mock.ANY,
+        read_session=mock.ANY,
+        max_stream_count=max_stream_count if not preserve_order else 1,
+    )
