diff --git a/bigframes/__init__.py b/bigframes/__init__.py
index bd1476957b..240608ebc2 100644
--- a/bigframes/__init__.py
+++ b/bigframes/__init__.py
@@ -17,6 +17,8 @@
 from bigframes._config import option_context, options
 from bigframes._config.bigquery_options import BigQueryOptions
 from bigframes.core.global_session import close_session, get_global_session
+import bigframes.enums as enums
+import bigframes.exceptions as exceptions
 from bigframes.session import connect, Session
 from bigframes.version import __version__
@@ -25,6 +27,8 @@
     "BigQueryOptions",
     "get_global_session",
     "close_session",
+    "enums",
+    "exceptions",
     "connect",
     "Session",
     "__version__",
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 4ff8a1836b..402581eb6f 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -116,10 +116,20 @@ def __init__(
             raise ValueError(
                 f"'index_columns' (size {len(index_columns)}) and 'index_labels' (size {len(index_labels)}) must have equal length"
             )
+
+        # If no index columns are set, create one.
+        #
+        # Note: get_index_cols_and_uniqueness in
+        # bigframes/session/_io/bigquery/read_gbq_table.py depends on this
+        # being a sequential integer index column. If this default behavior
+        # ever changes, please also update get_index_cols_and_uniqueness so
+        # that users who explicitly request a sequential integer index can
+        # still get one.
         if len(index_columns) == 0:
             new_index_col_id = guid.generate_guid()
             expr = expr.promote_offsets(new_index_col_id)
             index_columns = [new_index_col_id]
+
         self._index_columns = tuple(index_columns)
         # Index labels don't need complicated hierarchical access so can store as tuple
         self._index_labels = (
diff --git a/bigframes/enums.py b/bigframes/enums.py
new file mode 100644
index 0000000000..4bec75f5df
--- /dev/null
+++ b/bigframes/enums.py
@@ -0,0 +1,29 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Public enums used across BigQuery DataFrames."""
+
+# NOTE: This module should not depend on any others in the package.
+
+
+import enum
+
+
+class DefaultIndexKind(enum.Enum):
+    """Sentinel values used to override default indexing behavior."""
+
+    #: Use consecutive integers as the index. This is ``0``, ``1``, ``2``, ...,
+    #: ``n - 3``, ``n - 2``, ``n - 1``, where ``n`` is the number of items in
+    #: the index.
+    SEQUENTIAL_INT64 = enum.auto()
diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index 62122e79d2..d179914983 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -12,6 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Public exceptions and warnings used across BigQuery DataFrames."""
+
+# NOTE: This module should not depend on any others in the package.
+
 class UnknownLocationWarning(Warning):
     """The location is set to an unknown value."""
+
+
+class NoDefaultIndexError(ValueError):
+    """Unable to create a default index."""
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 48a4b0f68d..ce69f49c89 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -63,6 +63,7 @@
 import bigframes.core.reshape
 import bigframes.core.tools
 import bigframes.dataframe
+import bigframes.enums
 import bigframes.operations as ops
 import bigframes.series
 import bigframes.session
@@ -423,7 +424,13 @@ def read_csv(
         Union[MutableSequence[Any], numpy.ndarray[Any, Any], Tuple[Any, ...], range]
     ] = None,
     index_col: Optional[
-        Union[int, str, Sequence[Union[str, int]], Literal[False]]
+        Union[
+            int,
+            str,
+            Sequence[Union[str, int]],
+            bigframes.enums.DefaultIndexKind,
+            Literal[False],
+        ]
     ] = None,
     usecols: Optional[
         Union[
@@ -491,7 +498,7 @@ def read_json(
 def read_gbq(
     query_or_table: str,
     *,
-    index_col: Iterable[str] | str = (),
+    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
     columns: Iterable[str] = (),
     configuration: Optional[Dict] = None,
     max_results: Optional[int] = None,
@@ -529,7 +536,7 @@ def read_gbq_model(model_name: str):
 def read_gbq_query(
     query: str,
     *,
-    index_col: Iterable[str] | str = (),
+    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
     columns: Iterable[str] = (),
     configuration: Optional[Dict] = None,
     max_results: Optional[int] = None,
@@ -555,7 +562,7 @@ def read_gbq_query(
 def read_gbq_table(
     query: str,
     *,
-    index_col: Iterable[str] | str = (),
+    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
     columns: Iterable[str] = (),
     max_results: Optional[int] = None,
     filters: vendored_pandas_gbq.FiltersType = (),
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 0f5aa19592..6b84d838cf 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -294,7 +294,7 @@ def read_gbq(
         self,
         query_or_table: str,
         *,
-        index_col: Iterable[str] | str = (),
+        index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
         columns: Iterable[str] = (),
         configuration: Optional[Dict] = None,
         max_results: Optional[int] = None,
@@ -313,6 +313,9 @@ def read_gbq(
         filters = list(filters)
         if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table):
+            # TODO(b/338111344): This appears to be missing index_cols, which
+            # also need to be selected.
+            # TODO(b/338039517): Also, need to account for primary keys.
             query_or_table = self._to_query(query_or_table, columns, filters)
 
         if _is_query(query_or_table):
@@ -326,9 +329,6 @@ def read_gbq(
                 use_cache=use_cache,
             )
         else:
-            # TODO(swast): Query the snapshot table but mark it as a
-            # deterministic query so we can avoid serializing if we have a
-            # unique index.
             if configuration is not None:
                 raise ValueError(
                     "The 'configuration' argument is not allowed when "
@@ -359,6 +359,8 @@ def _to_query(
             else f"`{query_or_table}`"
         )
 
+        # TODO(b/338111344): Generate an index based on DefaultIndexKind if we
+        # don't have index columns specified.
select_clause = "SELECT " + ( ", ".join(f"`{column}`" for column in columns) if columns else "*" ) @@ -488,7 +490,7 @@ def read_gbq_query( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -566,7 +568,7 @@ def _read_gbq_query( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -598,7 +600,9 @@ def _read_gbq_query( True if use_cache is None else use_cache ) - if isinstance(index_col, str): + if isinstance(index_col, bigframes.enums.DefaultIndexKind): + index_cols = [] + elif isinstance(index_col, str): index_cols = [index_col] else: index_cols = list(index_col) @@ -628,7 +632,7 @@ def _read_gbq_query( return self.read_gbq_table( f"{destination.project}.{destination.dataset_id}.{destination.table_id}", - index_col=index_cols, + index_col=index_col, columns=columns, max_results=max_results, use_cache=configuration["query"]["useQueryCache"], @@ -638,7 +642,7 @@ def read_gbq_table( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), max_results: Optional[int] = None, filters: third_party_pandas_gbq.FiltersType = (), @@ -693,7 +697,7 @@ def _read_gbq_table( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), max_results: Optional[int] = None, api_name: str, @@ -821,10 +825,12 @@ def _read_bigquery_load_job( table: Union[bigquery.Table, bigquery.TableReference], *, job_config: bigquery.LoadJobConfig, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), ) -> dataframe.DataFrame: - if isinstance(index_col, str): + if isinstance(index_col, bigframes.enums.DefaultIndexKind): + index_cols = [] + elif isinstance(index_col, str): index_cols = [index_col] else: index_cols = list(index_col) @@ -1113,7 +1119,13 @@ def read_csv( Union[MutableSequence[Any], np.ndarray[Any, Any], Tuple[Any, ...], range] ] = None, index_col: Optional[ - Union[int, str, Sequence[Union[str, int]], Literal[False]] + Union[ + int, + str, + Sequence[Union[str, int]], + bigframes.enums.DefaultIndexKind, + Literal[False], + ] ] = None, usecols: Optional[ Union[ @@ -1143,18 +1155,37 @@ def read_csv( f"{constants.FEEDBACK_LINK}" ) - if index_col is not None and ( - not index_col or not isinstance(index_col, str) + # TODO(b/338089659): Looks like we can relax this 1 column + # restriction if we check the contents of an iterable are strings + # not integers. + if ( + # Empty tuples, None, and False are allowed and falsey. + index_col + and not isinstance(index_col, bigframes.enums.DefaultIndexKind) + and not isinstance(index_col, str) ): raise NotImplementedError( - "BigQuery engine only supports a single column name for `index_col`. " - f"{constants.FEEDBACK_LINK}" + "BigQuery engine only supports a single column name for `index_col`, " + f"got: {repr(index_col)}. {constants.FEEDBACK_LINK}" ) - # None value for index_col cannot be passed to read_gbq - if index_col is None: + # None and False cannot be passed to read_gbq. 
+            # TODO(b/338400133): When index_col is None, we should be using the
+            # first column of the CSV as the index to be compatible with the
+            # pandas engine. According to the pandas docs, only "False"
+            # indicates a default sequential index.
+            if not index_col:
                 index_col = ()
 
+            index_col = typing.cast(
+                Union[
+                    Sequence[str],  # Falsey values
+                    bigframes.enums.DefaultIndexKind,
+                    str,
+                ],
+                index_col,
+            )
+
             # usecols should only be an iterable of strings (column names) for use as columns in read_gbq.
             columns: Tuple[Any, ...] = tuple()
             if usecols is not None:
@@ -1199,6 +1230,11 @@ def read_csv(
                 columns=columns,
             )
         else:
+            if isinstance(index_col, bigframes.enums.DefaultIndexKind):
+                raise NotImplementedError(
+                    f"With index_col={repr(index_col)}, only engine='bigquery' is supported. "
+                    f"{constants.FEEDBACK_LINK}"
+                )
             if any(arg in kwargs for arg in ("chunksize", "iterator")):
                 raise NotImplementedError(
                     "'chunksize' and 'iterator' arguments are not supported. "
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 3235ca92e5..29d5a5567f 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -35,6 +35,7 @@
 
 import bigframes
 import bigframes.clients
+import bigframes.constants
 import bigframes.core as core
 import bigframes.core.compile
 import bigframes.core.guid as guid
@@ -206,12 +207,35 @@ def _get_primary_keys(
     return primary_keys
 
 
+def _is_table_clustered_or_partitioned(
+    table: bigquery.table.Table,
+) -> bool:
+    """Returns True if the table is clustered or partitioned."""
+
+    # Could be None or an empty tuple if it's not clustered, both of which are
+    # falsey.
+    if table.clustering_fields:
+        return True
+
+    if (
+        time_partitioning := table.time_partitioning
+    ) is not None and time_partitioning.type_ is not None:
+        return True
+
+    if (
+        range_partitioning := table.range_partitioning
+    ) is not None and range_partitioning.field is not None:
+        return True
+
+    return False
+
+
 def get_index_cols_and_uniqueness(
     bqclient: bigquery.Client,
     ibis_client: ibis.BaseBackend,
     table: bigquery.table.Table,
     table_expression: ibis_types.Table,
-    index_col: Iterable[str] | str,
+    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind,
     api_name: str,
 ) -> Tuple[List[str], bool]:
     """
@@ -222,7 +246,23 @@ def get_index_cols_and_uniqueness(
 
     # Transform index_col -> index_cols so we have a variable that is
     # always a list of column names (possibly empty).
-    if isinstance(index_col, str):
+    if isinstance(index_col, bigframes.enums.DefaultIndexKind):
+        if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
+            # User has explicitly asked for a default, sequential index.
+            # Use that, even if there are primary keys on the table.
+            #
+            # Note: This relies on the default behavior of the Block
+            # constructor to create a default sequential index. If that ever
+            # changes, this logic will need to be revisited.
+            return [], False
+        else:
+            # Note: It's actually quite difficult to mock this out to unit
+            # test, as it's not possible to subclass enums in Python. See:
+            # https://stackoverflow.com/a/33680021/101923
+            raise NotImplementedError(
+                f"Got unexpected index_col {repr(index_col)}. {bigframes.constants.FEEDBACK_LINK}"
+            )
+    elif isinstance(index_col, str):
         index_cols: List[str] = [index_col]
     else:
         index_cols = list(index_col)
@@ -230,14 +270,26 @@ def get_index_cols_and_uniqueness(
     # If there isn't an index selected, use the primary keys of the table as
     # the index. If there are no primary keys, we'll return an empty list.
     if len(index_cols) == 0:
-        index_cols = _get_primary_keys(table)
-
-        # TODO(b/335727141): If table has clustering/partitioning, fail if
-        # index_cols is empty.
+        primary_keys = _get_primary_keys(table)
+
+        # If table has clustering/partitioning, fail if we haven't been able to
+        # find index_cols to use. This is to avoid unexpected performance and
+        # resource utilization because of the default sequential index. See
+        # internal issue 335727141.
+        if _is_table_clustered_or_partitioned(table) and not primary_keys:
+            raise bigframes.exceptions.NoDefaultIndexError(
+                f"Table '{str(table.reference)}' is clustered and/or "
+                "partitioned, but BigQuery DataFrames was not able to find a "
+                "suitable index. To avoid this error, set at least one of: "
+                # TODO(b/338037499): Allow max_results to override this too,
+                # once we make it more efficient.
+                "`index_col` or `filters`."
+            )
 
         # If there are primary keys defined, the query engine assumes these
         # columns are unique, even if the constraint is not enforced. We make
         # the same assumption and use these columns as the total ordering keys.
+        index_cols = primary_keys
         is_index_unique = len(index_cols) != 0
     else:
         is_index_unique = _check_index_uniqueness(
diff --git a/docs/reference/bigframes/enums.rst b/docs/reference/bigframes/enums.rst
new file mode 100644
index 0000000000..b0a198e184
--- /dev/null
+++ b/docs/reference/bigframes/enums.rst
@@ -0,0 +1,8 @@
+
+=====
+Enums
+=====
+
+.. automodule:: bigframes.enums
+    :members:
+    :undoc-members:
diff --git a/docs/reference/bigframes/exceptions.rst b/docs/reference/bigframes/exceptions.rst
new file mode 100644
index 0000000000..c471aecdf7
--- /dev/null
+++ b/docs/reference/bigframes/exceptions.rst
@@ -0,0 +1,8 @@
+
+=======================
+Exceptions and Warnings
+=======================
+
+.. automodule:: bigframes.exceptions
+    :members:
+    :undoc-members:
diff --git a/docs/reference/bigframes/index.rst b/docs/reference/bigframes/index.rst
index d26db18c96..f56883dc8e 100644
--- a/docs/reference/bigframes/index.rst
+++ b/docs/reference/bigframes/index.rst
@@ -6,6 +6,8 @@ Core objects
 .. toctree::
     :maxdepth: 2
 
+    enums
+    exceptions
     options
diff --git a/docs/templates/toc.yml b/docs/templates/toc.yml
index 80ccc01fac..67e628eb7d 100644
--- a/docs/templates/toc.yml
+++ b/docs/templates/toc.yml
@@ -32,6 +32,10 @@
     - name: Session
       uid: bigframes.session.Session
     name: Session
+  - name: Enumerations
+    uid: bigframes.enums
+  - name: Exceptions and warnings
+    uid: bigframes.exceptions
   name: Core Objects
 - items:
   - name: DataFrame
diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py
index f26902f084..f36dd64cbe 100644
--- a/tests/system/small/test_dataframe_io.py
+++ b/tests/system/small/test_dataframe_io.py
@@ -117,8 +117,8 @@ def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
 
 
 @pytest.mark.parametrize(
-    ("index"),
-    [True, False],
+    ("index",),
+    [(True,), (False,)],
 )
 def test_to_csv_index(
     scalars_dfs: Tuple[bigframes.dataframe.DataFrame, pd.DataFrame],
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 1e76a8bd8b..2779874d6c 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -524,7 +524,11 @@ def test_read_csv_gcs_bq_engine(session, scalars_dfs, gcs_folder):
     scalars_df, _ = scalars_dfs
     path = gcs_folder + "test_read_csv_gcs_bq_engine_w_index*.csv"
     scalars_df.to_csv(path, index=False)
-    df = session.read_csv(path, engine="bigquery")
+    df = session.read_csv(
+        path,
+        engine="bigquery",
+        index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
+    )
 
     # TODO(chelsealin): If we serialize the index, can more easily compare values.
     pd.testing.assert_index_equal(df.columns, scalars_df.columns)
@@ -629,44 +633,24 @@ def test_read_csv_localbuffer_bq_engine(session, scalars_dfs):
     pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes)
 
 
-@pytest.mark.parametrize(
-    ("kwargs", "match"),
-    [
-        pytest.param(
-            {"engine": "bigquery", "names": []},
-            "BigQuery engine does not support these arguments",
-            id="with_names",
-        ),
-        pytest.param(
-            {"engine": "bigquery", "dtype": {}},
-            "BigQuery engine does not support these arguments",
-            id="with_dtype",
-        ),
-        pytest.param(
-            {"engine": "bigquery", "index_col": False},
-            "BigQuery engine only supports a single column name for `index_col`.",
-            id="with_index_col_false",
-        ),
-        pytest.param(
-            {"engine": "bigquery", "index_col": 5},
-            "BigQuery engine only supports a single column name for `index_col`.",
-            id="with_index_col_not_str",
-        ),
-        pytest.param(
-            {"engine": "bigquery", "usecols": [1, 2]},
-            "BigQuery engine only supports an iterable of strings for `usecols`.",
-            id="with_usecols_invalid",
-        ),
-        pytest.param(
-            {"engine": "bigquery", "encoding": "ASCII"},
-            "BigQuery engine only supports the following encodings",
-            id="with_encoding_invalid",
-        ),
-    ],
-)
-def test_read_csv_bq_engine_throws_not_implemented_error(session, kwargs, match):
-    with pytest.raises(NotImplementedError, match=match):
-        session.read_csv("", **kwargs)
+def test_read_csv_bq_engine_supports_index_col_false(
+    session, scalars_df_index, gcs_folder
+):
+    path = gcs_folder + "test_read_csv_bq_engine_supports_index_col_false*.csv"
+    read_path = utils.get_first_file_from_wildcard(path)
+    scalars_df_index.to_csv(path)
+
+    df = session.read_csv(
+        read_path,
+        # Normally, pandas uses the first column as the index. index_col=False
+        # turns off that behavior.
+        index_col=False,
+    )
+    assert df.shape[0] == scalars_df_index.shape[0]
+
+    # We use a default index because of index_col=False, so the previous index
+    # column is just loaded as a column.
+    assert len(df.columns) == len(scalars_df_index.columns) + 1
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index 34f185cafd..70a121435c 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
 import datetime
 import os
 import re
@@ -23,10 +24,129 @@
 import pytest
 
 import bigframes
+import bigframes.enums
 import bigframes.exceptions
 
 from .. import resources
 
+TABLE_REFERENCE = {
+    "projectId": "my-project",
+    "datasetId": "my_dataset",
+    "tableId": "my_table",
+}
+CLUSTERED_OR_PARTITIONED_TABLES = [
+    pytest.param(
+        google.cloud.bigquery.Table.from_api_repr(
+            {
+                "tableReference": TABLE_REFERENCE,
+                "clustering": {
+                    "fields": ["col1", "col2"],
+                },
+            },
+        ),
+        id="clustered",
+    ),
+    pytest.param(
+        google.cloud.bigquery.Table.from_api_repr(
+            {
+                "tableReference": TABLE_REFERENCE,
+                "rangePartitioning": {
+                    "field": "col1",
+                    "range": {
+                        "start": 1,
+                        "end": 100,
+                        "interval": 1,
+                    },
+                },
+            },
+        ),
+        id="range-partitioned",
+    ),
+    pytest.param(
+        google.cloud.bigquery.Table.from_api_repr(
+            {
+                "tableReference": TABLE_REFERENCE,
+                "timePartitioning": {
+                    "type": "MONTH",
+                    "field": "col1",
+                },
+            },
+        ),
+        id="time-partitioned",
+    ),
+    pytest.param(
+        google.cloud.bigquery.Table.from_api_repr(
+            {
+                "tableReference": TABLE_REFERENCE,
+                "clustering": {
+                    "fields": ["col1", "col2"],
+                },
+                "timePartitioning": {
+                    "type": "MONTH",
+                    "field": "col1",
+                },
+            },
+        ),
+        id="time-partitioned-and-clustered",
+    ),
+]
+
+
+@pytest.mark.parametrize(
+    ("kwargs", "match"),
+    [
+        pytest.param(
+            {"engine": "bigquery", "names": []},
+            "BigQuery engine does not support these arguments",
+            id="with_names",
+        ),
+        pytest.param(
+            {"engine": "bigquery", "dtype": {}},
+            "BigQuery engine does not support these arguments",
+            id="with_dtype",
+        ),
+        pytest.param(
+            {"engine": "bigquery", "index_col": 5},
+            "BigQuery engine only supports a single column name for `index_col`.",
+            id="with_index_col_not_str",
+        ),
+        pytest.param(
+            {"engine": "bigquery", "usecols": [1, 2]},
+            "BigQuery engine only supports an iterable of strings for `usecols`.",
+            id="with_usecols_invalid",
+        ),
+        pytest.param(
+            {"engine": "bigquery", "encoding": "ASCII"},
+            "BigQuery engine only supports the following encodings",
+            id="with_encoding_invalid",
+        ),
+    ],
+)
+def test_read_csv_bq_engine_throws_not_implemented_error(kwargs, match):
+    session = resources.create_bigquery_session()
+
+    with pytest.raises(NotImplementedError, match=match):
+        session.read_csv("", **kwargs)
+
+
+@pytest.mark.parametrize(
+    ("engine",),
+    (
+        ("c",),
+        ("python",),
+        ("pyarrow",),
+    ),
+)
+def test_read_csv_pandas_engines_index_col_sequential_int64_not_supported(engine):
+    session = resources.create_bigquery_session()
+
+    with pytest.raises(NotImplementedError, match="index_col"):
+        session.read_csv(
+            "path/to/csv.csv",
+            engine=engine,
+            index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
+        )
+
 
 @pytest.mark.parametrize("missing_parts_table_id", [(""), ("table")])
 def test_read_gbq_missing_parts(missing_parts_table_id):
@@ -65,14 +185,109 @@ def get_table_mock(table_ref):
     assert "1999-01-02T03:04:05.678901" in df.sql
 
 
-def test_read_gbq_clustered_table_ok_default_index_with_primary_key():
+@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES)
+def test_no_default_index_error_raised_by_read_gbq(table):
+    """Because of the windowing operation to create a default index, row
+    filters can't push down to the clustering column.
+
+    Raise an exception in this case so that the user is directed to supply a
+    unique index column or filter if possible.
+
+    See internal issue 335727141.
+    """
+    table = copy.deepcopy(table)
+    bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
+    bqclient.project = "test-project"
+    bqclient.get_table.return_value = table
+    session = resources.create_bigquery_session(bqclient=bqclient)
+    table._properties["location"] = session._location
+
+    with pytest.raises(bigframes.exceptions.NoDefaultIndexError):
+        session.read_gbq("my-project.my_dataset.my_table")
+
+
+@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES)
+def test_no_default_index_error_not_raised_by_read_gbq_index_col_sequential_int64(
+    table,
+):
+    """Because of the windowing operation to create a default index, row
+    filters can't push down to the clustering column.
+
+    Allow people to use the default index only if they explicitly request it.
+
+    See internal issue 335727141.
+    """
+    table = copy.deepcopy(table)
+    bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
+    bqclient.project = "test-project"
+    bqclient.get_table.return_value = table
+    session = resources.create_bigquery_session(bqclient=bqclient)
+    table._properties["location"] = session._location
+
+    # No exception raised because we explicitly requested the default index.
+    df = session.read_gbq(
+        "my-project.my_dataset.my_table",
+        index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
+    )
+
+    # We expect a window operation because we specifically requested a sequential index.
+    generated_sql = df.sql.casefold()
+    assert "OVER".casefold() in generated_sql
+    assert "ROW_NUMBER()".casefold() in generated_sql
+
+
+@pytest.mark.parametrize(
+    ("total_count", "distinct_count"),
+    (
+        (0, 0),
+        (123, 123),
+        # Should still have a positive effect, even if the index is not unique.
+        (123, 111),
+    ),
+)
+@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES)
+def test_no_default_index_error_not_raised_by_read_gbq_index_col_columns(
+    total_count,
+    distinct_count,
+    table,
+):
+    table = copy.deepcopy(table)
+    table.schema = (
+        google.cloud.bigquery.SchemaField("idx_1", "INT64"),
+        google.cloud.bigquery.SchemaField("idx_2", "INT64"),
+        google.cloud.bigquery.SchemaField("col_1", "INT64"),
+        google.cloud.bigquery.SchemaField("col_2", "INT64"),
+    )
+
+    bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
+    bqclient.project = "test-project"
+    bqclient.get_table.return_value = table
+    bqclient.query_and_wait.return_value = (
+        {"total_count": total_count, "distinct_count": distinct_count},
+    )
+    session = resources.create_bigquery_session(
+        bqclient=bqclient, table_schema=table.schema
+    )
+    table._properties["location"] = session._location
+
+    # No exception raised because there are columns to use as the index.
+    df = session.read_gbq(
+        "my-project.my_dataset.my_table", index_col=("idx_1", "idx_2")
+    )
+
+    # There should be no analytic operators to prevent row filtering pushdown.
+ assert "OVER" not in df.sql + assert tuple(df.index.names) == ("idx_1", "idx_2") + + +@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) +def test_no_default_index_error_not_raised_by_read_gbq_primary_key(table): """If a primary key is set on the table, we use that as the index column by default, no error should be raised in this case. See internal issue 335727141. """ - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] + table = copy.deepcopy(table) table.schema = ( google.cloud.bigquery.SchemaField("pk_1", "INT64"), google.cloud.bigquery.SchemaField("pk_2", "INT64"), @@ -95,6 +310,7 @@ def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): ) table._properties["location"] = session._location + # No exception raised because there is a primary key to use as the index. df = session.read_gbq("my-project.my_dataset.my_table") # There should be no analytic operators to prevent row filtering pushdown. diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 93cee71289..c25dd8776f 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -6,6 +6,7 @@ from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union from bigframes import constants +import bigframes.enums FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">", "LIKE"] FilterType = Tuple[str, FilterOps, Any] @@ -17,7 +18,7 @@ def read_gbq( self, query_or_table: str, *, - index_col: Iterable[str] | str = (), + index_col: Union[Iterable[str], str, bigframes.enums.DefaultIndexKind] = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -28,16 +29,23 @@ def read_gbq( """Loads a DataFrame from BigQuery. BigQuery tables are an unordered, unindexed data source. To add support - pandas-compatibility, the following indexing options are supported: - - * (Default behavior) Add an arbitrary sequential index and ordering - using an an analytic windowed operation that prevents filtering - push down. + pandas-compatibility, the following indexing options are supported via + the ``index_col`` parameter: + + * (Empty iterable, default) A default index. **Behavior may change.** + Explicitly set ``index_col`` if your application makes use of + specific index values. + + If a table has primary key(s), those are used as the index, + otherwise a sequential index is generated. + * (:attr:`bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64`) Add an + arbitrary sequential index and ordering. **Warning** This uses an + analytic windowed operation that prevents filtering push down. Avoid + using on large clustered or partitioned tables. * (Recommended) Set the ``index_col`` argument to one or more columns. Unique values for the row labels are recommended. Duplicate labels are possible, but note that joins on a non-unique index can duplicate - rows and operations like ``cumsum()`` that window across a non-unique - index can have some non-deternimism. + rows via pandas-like outer join behavior. .. note:: By default, even SQL query inputs with an ORDER BY clause create a @@ -107,11 +115,18 @@ def read_gbq( `project.dataset.tablename` or `dataset.tablename`. Can also take wildcard table name, such as `project.dataset.table_prefix*`. In tha case, will read all the matched table as one DataFrame. 
-            index_col (Iterable[str] or str):
+            index_col (Iterable[str], str, bigframes.enums.DefaultIndexKind):
                 Name of result column(s) to use for index in results DataFrame.
+
+                If an empty iterable, such as ``()``, a default index is
+                generated. Do not depend on specific index values in this case.
 
                 **New in bigframes version 1.3.0**: If ``index_col`` is not set,
                 the primary key(s) of the table are used as the index.
+
+                **New in bigframes version 1.4.0**: Support
+                :class:`bigframes.enums.DefaultIndexKind` to override default index
+                behavior.
             columns (Iterable[str]):
                 List of BigQuery column names in the desired order for results
                 DataFrame.
@@ -141,6 +156,11 @@ def read_gbq(
             col_order (Iterable[str]):
                 Alias for columns, retained for backwards compatibility.
 
+        Raises:
+            bigframes.exceptions.NoDefaultIndexError:
+                Raised when using the default index is discouraged, such as
+                with clustered or partitioned tables without primary keys.
+
         Returns:
             bigframes.dataframe.DataFrame: A DataFrame representing results of the query or table.
         """
diff --git a/third_party/bigframes_vendored/pandas/io/parsers/readers.py b/third_party/bigframes_vendored/pandas/io/parsers/readers.py
index e8ed6182a6..d147abfd22 100644
--- a/third_party/bigframes_vendored/pandas/io/parsers/readers.py
+++ b/third_party/bigframes_vendored/pandas/io/parsers/readers.py
@@ -21,6 +21,7 @@
 import numpy as np
 
 from bigframes import constants
+import bigframes.enums
 
 
 class ReaderIOMixin:
@@ -34,7 +35,13 @@ def read_csv(
             Union[MutableSequence[Any], np.ndarray[Any, Any], Tuple[Any, ...], range]
         ] = None,
         index_col: Optional[
-            Union[int, str, Sequence[Union[str, int]], Literal[False]]
+            Union[
+                int,
+                str,
+                Sequence[Union[str, int]],
+                bigframes.enums.DefaultIndexKind,
+                Literal[False],
+            ]
         ] = None,
         usecols=None,
        dtype: Optional[Dict] = None,
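
Reviewer note: below is a minimal, hypothetical usage sketch of the behavior this change introduces; it is not part of the patch. The project, dataset, table, and column names are made up, and the entry points shown (bigframes.pandas.read_gbq, bigframes.enums.DefaultIndexKind, bigframes.exceptions.NoDefaultIndexError) are the public symbols this diff adds or extends.

```python
import bigframes.pandas as bpd
from bigframes.enums import DefaultIndexKind
from bigframes.exceptions import NoDefaultIndexError

try:
    # With this change, reading a clustered or partitioned table that has no
    # primary key raises NoDefaultIndexError instead of silently creating a
    # sequential index (which would block filter pushdown on the clustering
    # column).
    df = bpd.read_gbq("my-project.my_dataset.my_clustered_table")
except NoDefaultIndexError:
    # Option 1: opt in to the sequential default index explicitly.
    df = bpd.read_gbq(
        "my-project.my_dataset.my_clustered_table",
        index_col=DefaultIndexKind.SEQUENTIAL_INT64,
    )

# Option 2 (recommended): name index columns so row filters can still push
# down to the clustering/partitioning columns. "event_id" is hypothetical.
df = bpd.read_gbq(
    "my-project.my_dataset.my_clustered_table",
    index_col=("event_id",),
)
```

Making the sequential index opt-in rather than implicit keeps the default read_gbq path free of the ROW_NUMBER() window operation, so reads of clustered and partitioned tables retain their pruning behavior unless the caller explicitly accepts the trade-off.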