diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py
index a74880041c..09f0d5956c 100644
--- a/bigframes/core/indexers.py
+++ b/bigframes/core/indexers.py
@@ -21,6 +21,7 @@ import pandas as pd
 
 import bigframes.constants as constants
+import bigframes.core.blocks
 import bigframes.core.guid as guid
 import bigframes.core.indexes as indexes
 import bigframes.core.scalar
@@ -214,7 +215,7 @@ def __getitem__(self, key: tuple) -> bigframes.core.scalar.Scalar:
             raise ValueError(error_message)
         if len(key) != 2:
             raise TypeError(error_message)
-        block = self._dataframe._block
+        block: bigframes.core.blocks.Block = self._dataframe._block
         column_block = block.select_columns([block.value_columns[key[1]]])
         column = bigframes.series.Series(column_block)
         return column.iloc[key[0]]
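The indexers.py change above pins down the type of `block` where `iat`-style scalar access is implemented: select a one-column block, wrap it in a Series, then index into it positionally. A minimal sketch of that two-step flow, using plain pandas as a stand-in for the Block-backed bigframes DataFrame (`iat_like` is a hypothetical helper, not bigframes API):

```python
import pandas as pd

def iat_like(df: pd.DataFrame, row: int, col: int):
    # Select the single column by position, mirroring
    # block.select_columns([block.value_columns[key[1]]]) in the hunk above.
    column = df.iloc[:, col]
    # Then take the row positionally, mirroring column.iloc[key[0]].
    return column.iloc[row]

df = pd.DataFrame({"name": ["Mary", "Anna"], "year": [1910, 1911]})
assert iat_like(df, 1, 0) == df.iat[1, 0] == "Anna"
```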
diff --git a/bigframes/session.py b/bigframes/session.py
index 4f509f0704..6c1160c88e 100644
--- a/bigframes/session.py
+++ b/bigframes/session.py
@@ -498,6 +498,8 @@ def read_gbq_query(
 
         See also: :meth:`Session.read_gbq`.
         """
+        # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
+        # these docstrings are inline.
         return self._read_gbq_query(
             query=query,
             index_col=index_col,
@@ -515,8 +517,6 @@ def _read_gbq_query(
         max_results: Optional[int] = None,
         api_name: str,
     ) -> dataframe.DataFrame:
-        # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
-        # these docstrings are inline.
         if isinstance(index_col, str):
             index_cols = [index_col]
         else:
@@ -561,6 +561,8 @@ def read_gbq_table(
 
         See also: :meth:`Session.read_gbq`.
         """
+        # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
+        # these docstrings are inline.
        return self._read_gbq_table(
             query=query,
             index_col=index_col,
@@ -569,6 +571,62 @@ def read_gbq_table(
             api_name="read_gbq_table",
         )
 
+    def _read_gbq_table_to_ibis_with_total_ordering(
+        self,
+        table_ref: bigquery.table.TableReference,
+        *,
+        api_name: str,
+    ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]:
+        """Create a read-only Ibis table expression representing a table.
+
+        If we can get a total ordering from the table, such as via primary key
+        column(s), then return those too so that ordering generation can be
+        avoided.
+        """
+        if table_ref.dataset_id.upper() == "_SESSION":
+            # _SESSION tables aren't supported by the tables.get REST API.
+            return (
+                self.ibis_client.sql(
+                    f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
+                ),
+                None,
+            )
+
+        table_expression = self.ibis_client.table(
+            table_ref.table_id,
+            database=f"{table_ref.project}.{table_ref.dataset_id}",
+        )
+
+        # If there are primary keys defined, the query engine assumes these
+        # columns are unique, even if the constraint is not enforced. We make
+        # the same assumption and use these columns as the total ordering keys.
+        table = self.bqclient.get_table(table_ref)
+
+        # TODO(b/305264153): Use public properties to fetch primary keys once
+        # added to google-cloud-bigquery.
+        primary_keys = (
+            table._properties.get("tableConstraints", {})
+            .get("primaryKey", {})
+            .get("columns")
+        )
+
+        if not primary_keys:
+            return table_expression, None
+        else:
+            # Read from a snapshot since we won't have to copy the table data to create a total ordering.
+            job_config = bigquery.QueryJobConfig()
+            job_config.labels["bigframes-api"] = api_name
+            current_timestamp = list(
+                self.bqclient.query(
+                    "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
+                    job_config=job_config,
+                ).result()
+            )[0][0]
+            table_expression = self.ibis_client.sql(
+                bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
+            )
+            return table_expression, primary_keys
+
     def _read_gbq_table(
         self,
         query: str,
@@ -581,24 +639,19 @@
         if max_results and max_results <= 0:
             raise ValueError("`max_results` should be a positive number.")
 
-        # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so
-        # these docstrings are inline.
         # TODO(swast): Can we re-use the temp table from other reads in the
         # session, if the original table wasn't modified?
         table_ref = bigquery.table.TableReference.from_string(
             query, default_project=self.bqclient.project
         )
-        if table_ref.dataset_id.upper() == "_SESSION":
-            # _SESSION tables aren't supported by the tables.get REST API.
-            table_expression = self.ibis_client.sql(
-                f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
-            )
-        else:
-            table_expression = self.ibis_client.table(
-                table_ref.table_id,
-                database=f"{table_ref.project}.{table_ref.dataset_id}",
-            )
+        (
+            table_expression,
+            total_ordering_cols,
+        ) = self._read_gbq_table_to_ibis_with_total_ordering(
+            table_ref,
+            api_name=api_name,
+        )
 
         for key in col_order:
             if key not in table_expression.columns:
@@ -624,7 +677,34 @@
 
         ordering = None
         is_total_ordering = False
-        if len(index_cols) != 0:
+        if total_ordering_cols is not None:
+            # Note: currently, a table has a total ordering only when the
+            # primary key(s) are set on a table. The query engine assumes such
+            # columns are unique, even if not enforced.
+            is_total_ordering = True
+            ordering = core.ExpressionOrdering(
+                ordering_value_columns=[
+                    core.OrderingColumnReference(column_id)
+                    for column_id in total_ordering_cols
+                ],
+                total_ordering_columns=frozenset(total_ordering_cols),
+            )
+
+            if len(index_cols) != 0:
+                index_labels = typing.cast(List[Optional[str]], index_cols)
+            else:
+                # Use the total_ordering_cols to project offsets to use as the default index.
+                table_expression = table_expression.order_by(total_ordering_cols)
+                default_index_id = guid.generate_guid("bigframes_index_")
+                default_index_col = (
+                    ibis.row_number().cast(ibis_dtypes.int64).name(default_index_id)
+                )
+                table_expression = table_expression.mutate(
+                    **{default_index_id: default_index_col}
+                )
+                index_cols = [default_index_id]
+                index_labels = [None]
+        elif len(index_cols) != 0:
             index_labels = typing.cast(List[Optional[str]], index_cols)
             distinct_table = table_expression.select(*index_cols).distinct()
             is_unique_sql = f"""WITH full_table AS (
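The new `_read_gbq_table_to_ibis_with_total_ordering` helper leans on two BigQuery features: primary-key metadata surfaced under `tableConstraints` in the table resource, and point-in-time reads via `FOR SYSTEM_TIME AS OF`. A rough sketch of both pieces using only `google-cloud-bigquery`; `create_snapshot_sql` below is a stand-in for the internal `bigframes_io.create_snapshot_sql`, not necessarily its exact implementation:

```python
import datetime
from typing import List, Optional

import google.cloud.bigquery as bigquery

def get_primary_keys(table: bigquery.Table) -> Optional[List[str]]:
    # Same private-property access as the diff; TODO(b/305264153) tracks
    # switching to public properties once google-cloud-bigquery exposes them.
    return (
        table._properties.get("tableConstraints", {})
        .get("primaryKey", {})
        .get("columns")
    )

def create_snapshot_sql(
    table_ref: bigquery.TableReference, at: datetime.datetime
) -> str:
    # FOR SYSTEM_TIME AS OF pins the read to a consistent snapshot, so a
    # total ordering can be derived without first copying the table.
    return (
        f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`"
        f".`{table_ref.table_id}` "
        f"FOR SYSTEM_TIME AS OF TIMESTAMP({at.isoformat()!r})"
    )
```

When `get_primary_keys` returns columns, the session treats them as the total ordering and skips ordering generation entirely; otherwise `_read_gbq_table` falls back to the existing uniqueness checks on the index columns.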
diff --git a/noxfile.py b/noxfile.py
index 84e5ab11bb..54ccdb9a87 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -89,7 +89,6 @@
     "system",
     "doctest",
     "cover",
-    "release_dry_run",
 ]
 
 # Error if a python version is missing
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index ed22a3e8da..f36a29b0ab 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -17,6 +17,7 @@
 import logging
 import math
 import pathlib
+import textwrap
 import typing
 from typing import Dict, Optional
 
@@ -795,6 +796,36 @@ def penguins_randomforest_classifier_model_name(
     return model_name
 
 
+@pytest.fixture(scope="session")
+def usa_names_grouped_table(
+    session: bigframes.Session, dataset_id_permanent
+) -> bigquery.Table:
+    """Provides a table with primary key(s) set."""
+    table_id = f"{dataset_id_permanent}.usa_names_grouped"
+    try:
+        return session.bqclient.get_table(table_id)
+    except google.cloud.exceptions.NotFound:
+        query = textwrap.dedent(
+            f"""
+            CREATE TABLE `{dataset_id_permanent}.usa_names_grouped`
+            (
+                total_people INT64,
+                name STRING,
+                gender STRING,
+                year INT64,
+                PRIMARY KEY(name, gender, year) NOT ENFORCED
+            )
+            AS
+            SELECT SUM(`number`) AS total_people, name, gender, year
+            FROM `bigquery-public-data.usa_names.usa_1910_2013`
+            GROUP BY name, gender, year
+            """
+        )
+        job = session.bqclient.query(query)
+        job.result()
+        return session.bqclient.get_table(table_id)
+
+
 @pytest.fixture()
 def deferred_repr():
     bigframes.options.display.repr_mode = "deferred"
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index bfe9bc8d0f..127a88a760 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -20,6 +20,7 @@
 from typing import List
 
 import google.api_core.exceptions
+import google.cloud.bigquery as bigquery
 import numpy as np
 import pandas as pd
 import pytest
@@ -231,6 +232,30 @@ def test_read_gbq_w_anonymous_query_results_table(session: bigframes.Session):
     pd.testing.assert_frame_equal(result, expected, check_dtype=False)
 
 
+def test_read_gbq_w_primary_keys_table(
+    session: bigframes.Session, usa_names_grouped_table: bigquery.Table
+):
+    table = usa_names_grouped_table
+    # TODO(b/305264153): Use public properties to fetch primary keys once
+    # added to google-cloud-bigquery.
+    primary_keys = (
+        table._properties.get("tableConstraints", {})
+        .get("primaryKey", {})
+        .get("columns")
+    )
+    assert len(primary_keys) != 0
+
+    df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}")
+    result = df.head(100).to_pandas()
+
+    # Verify that the DataFrame is already sorted by primary keys.
+    sorted_result = result.sort_values(primary_keys)
+    pd.testing.assert_frame_equal(result, sorted_result)
+
+    # Verify that we're working from a snapshot rather than a copy of the table.
+    assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql
+
+
 @pytest.mark.parametrize(
     ("query_or_table", "max_results"),
     [
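End to end, a sketch of what the new test exercises from the user's side, assuming a configured BigQuery DataFrames environment and a primary-keyed table like the `usa_names_grouped` fixture (the project and dataset below are hypothetical):

```python
import bigframes.pandas as bpd

# The fixture above creates this table with
# PRIMARY KEY(name, gender, year) NOT ENFORCED.
df = bpd.read_gbq("my-project.my_dataset.usa_names_grouped")

# Primary keys double as the total ordering, so no ordering column needs to
# be generated, and the compiled SQL reads from a point-in-time snapshot
# rather than a temp-table copy.
assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql
print(df.head(5).to_pandas())
```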