From b3039995c7f5d14649ac5c39023d0a39c9fd9df4 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 19 Apr 2024 21:32:08 +0000 Subject: [PATCH 01/24] docs: document index as a best practice --- bigframes/exceptions.py | 4 + bigframes/session/__init__.py | 25 ++--- setup.py | 2 +- testing/constraints-3.9.txt | 2 +- tests/system/small/test_session.py | 8 +- tests/unit/resources.py | 7 +- tests/unit/session/test_session.py | 98 +++++++++++++++++++ .../bigframes_vendored/pandas/io/gbq.py | 15 ++- 8 files changed, 133 insertions(+), 28 deletions(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 62122e79d2..0d9627d3b3 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -15,3 +15,7 @@ class UnknownLocationWarning(Warning): """The location is set to an unknown value.""" + + +class NoDefaultIndexError(ValueError): + """Unable to create a default index.""" diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b6d56006be..9f21814b6c 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -708,13 +708,15 @@ def _get_snapshot_sql_and_primary_key( f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" ) - # TODO(b/305264153): Use public properties to fetch primary keys once - # added to google-cloud-bigquery. - primary_keys = ( - table._properties.get("tableConstraints", {}) - .get("primaryKey", {}) - .get("columns") - ) + primary_keys = None + if ( + (table_constraints := table.table_constraints) is not None + and (primary_key := table_constraints.primary_key) is not None + # This will be False for either None or empty list. + # We want primary_keys = None if no primary keys are set. + and (columns := primary_key.columns) + ): + primary_keys = columns job_config = bigquery.QueryJobConfig() job_config.labels["bigframes-api"] = api_name @@ -777,12 +779,13 @@ def _read_gbq_table( query, default_project=self.bqclient.project ) - ( - table_expression, - total_ordering_cols, - ) = self._get_snapshot_sql_and_primary_key( + (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key( table_ref, api_name=api_name, use_cache=use_cache ) + total_ordering_cols = primary_keys + + if not index_col and primary_keys is not None: + index_col = primary_keys for key in columns: if key not in table_expression.columns: diff --git a/setup.py b/setup.py index 83049f9715..2ccf63259c 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ "gcsfs >=2023.3.0", "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0dev", - "google-cloud-bigquery[bqstorage,pandas] >=3.10.0", + "google-cloud-bigquery[bqstorage,pandas] >=3.16.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", "google-cloud-iam >=2.12.1", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 1e1f3a3e66..f5007ed564 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -4,7 +4,7 @@ fsspec==2023.3.0 gcsfs==2023.3.0 geopandas==0.12.2 google-auth==2.15.0 -google-cloud-bigquery==3.10.0 +google-cloud-bigquery==3.16.0 google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 google-cloud-iam==2.12.1 diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index ce415f9324..e79a16085e 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -237,13 +237,7 @@ def test_read_gbq_w_primary_keys_table( session: bigframes.Session, usa_names_grouped_table: 
bigquery.Table ): table = usa_names_grouped_table - # TODO(b/305264153): Use public properties to fetch primary keys once - # added to google-cloud-bigquery. - primary_keys = ( - table._properties.get("tableConstraints", {}) - .get("primaryKey", {}) - .get("columns") - ) + primary_keys = table.table_constraints.primary_key.columns assert len(primary_keys) != 0 df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}") diff --git a/tests/unit/resources.py b/tests/unit/resources.py index 6846659930..28b08e49dc 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -13,7 +13,7 @@ # limitations under the License. import datetime -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Sequence import unittest.mock as mock import google.auth.credentials @@ -37,6 +37,7 @@ def create_bigquery_session( bqclient: Optional[mock.Mock] = None, session_id: str = "abcxyz", + table_schema: Sequence[google.cloud.bigquery.SchemaField] = TEST_SCHEMA, anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None, ) -> bigframes.Session: credentials = mock.create_autospec( @@ -51,7 +52,7 @@ def create_bigquery_session( table = mock.create_autospec(google.cloud.bigquery.Table, instance=True) table._properties = {} type(table).location = mock.PropertyMock(return_value="test-region") - type(table).schema = mock.PropertyMock(return_value=TEST_SCHEMA) + type(table).schema = mock.PropertyMock(return_value=table_schema) bqclient.get_table.return_value = table if anonymous_dataset is None: @@ -72,7 +73,7 @@ def query_mock(query, *args, **kwargs): if query.startswith("SELECT CURRENT_TIMESTAMP()"): query_job.result = mock.MagicMock(return_value=[[datetime.datetime.now()]]) else: - type(query_job).schema = mock.PropertyMock(return_value=TEST_SCHEMA) + type(query_job).schema = mock.PropertyMock(return_value=table_schema) return query_job diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 3e2b28c200..a14029d83f 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -19,9 +19,11 @@ import google.api_core.exceptions import google.cloud.bigquery +import google.cloud.bigquery.table import pytest import bigframes +import bigframes.exceptions from .. import resources @@ -50,6 +52,102 @@ def test_read_gbq_cached_table(): assert "1999-01-02T03:04:05.678901" in df.sql +# START: Tests for NoDefaultIndexError on clustered/partitioned tables + + +def test_read_gbq_clustered_table_raises_no_default_index_error(): + """Because of the windowing operation to create a default index, row + filters can't push down to the clustering column. + + Raise an exception in this case so that the user is directed to supply a + unique index column or filter if possible. + + See internal issue 335727141. 
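+
+    An illustrative workaround (the table and column names are hypothetical,
+    not part of this change), assuming ``import bigframes.pandas as bpd``,
+    is to supply an explicit index column so no windowed default index is
+    needed::
+
+        bpd.read_gbq("my-project.my_dataset.my_table", index_col="col1")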
+ """ + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session(bqclient=bqclient) + table._properties["location"] = session._location + + with pytest.raises(bigframes.exceptions.NoDefaultIndexError): + session.read_gbq("my-project.my_dataset.my_table") + + +def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): + """If a primary key is set on the table, we use that as the index column + by default, no error should be raised in this case. + + See internal issue 335727141. + """ + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + table.schema = ( + google.cloud.bigquery.SchemaField("pk_1", "INT64"), + google.cloud.bigquery.SchemaField("pk_2", "INT64"), + google.cloud.bigquery.SchemaField("col_1", "INT64"), + google.cloud.bigquery.SchemaField("col_2", "INT64"), + ) + + # TODO(b/305264153): use setter for table_constraints in client library + # when available. + table._properties["tableConstraints"] = { + "primaryKey": { + "columns": ["pk_1", "pk_2"], + }, + } + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session( + bqclient=bqclient, table_schema=table.schema + ) + table._properties["location"] = session._location + + # No exception raised because there is a primary key to use as the index. + df = session.read_gbq("my-project.my_dataset.my_table") + assert "OVER" not in df.sql + assert tuple(df.index.names) == ("pk_1", "pk_2") + + +# def test_read_gbq_clustered_table_ok_default_index_with_filters(): +# bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) +# bqclient.project = "test-project" +# bqclient.get_table.side_effect = google.api_core.exceptions.NotFound( +# "table not found" +# ) +# session = resources.create_bigquery_session(bqclient=bqclient) +# +# with pytest.raises(google.api_core.exceptions.NotFound): +# session.read_gbq(not_found_table_id) +# +# +# def test_read_gbq_clustered_table_ok_default_index_with_force_true(): +# bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) +# bqclient.project = "test-project" +# bqclient.get_table.side_effect = google.api_core.exceptions.NotFound( +# "table not found" +# ) +# session = resources.create_bigquery_session(bqclient=bqclient) +# +# with pytest.raises(google.api_core.exceptions.NotFound): +# session.read_gbq(not_found_table_id) + + +# def test_read_gbq_partitioned_table_raises_no_default_index(): +# bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) +# bqclient.project = "test-project" +# bqclient.get_table.side_effect = google.api_core.exceptions.NotFound( +# "table not found" +# ) +# session = resources.create_bigquery_session(bqclient=bqclient) +# +# with pytest.raises(google.api_core.exceptions.NotFound): +# session.read_gbq(not_found_table_id) + + @pytest.mark.parametrize( "not_found_table_id", [("unknown.dataset.table"), ("project.unknown.table"), ("project.dataset.unknown")], diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index b5feeb13c5..0afa997acf 100644 --- 
a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -28,12 +28,14 @@ def read_gbq( """Loads a DataFrame from BigQuery. BigQuery tables are an unordered, unindexed data source. By default, - the DataFrame will have an arbitrary index and ordering. + the DataFrame will have an arbitrary index and ordering. Generating + the default index uses an analytic windowed operation that prevents + many filtering push down operations. As a best practice, set the + ``index_col`` argument to one or more columns, especially on large + tables. - Set the `index_col` argument to one or more columns to choose an - index. The resulting DataFrame is sorted by the index columns. For the - best performance, ensure the index columns don't contain duplicate - values. + Duplicate keys in an index are valid, but for the best performance, + ensure the index columns don't contain duplicate values. .. note:: By default, even SQL query inputs with an ORDER BY clause create a @@ -105,6 +107,9 @@ def read_gbq( In tha case, will read all the matched table as one DataFrame. index_col (Iterable[str] or str): Name of result column(s) to use for index in results DataFrame. + + **New in bigframes version 1.3.0**: If not set, the primary + key(s) on the table are used if available. columns (Iterable[str]): List of BigQuery column names in the desired order for results DataFrame. From 0ddd86b062d124a754dcb8087d114fbce7901970 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 19 Apr 2024 21:32:08 +0000 Subject: [PATCH 02/24] docs: set `index_cols` in `read_gbq` as a best practice --- third_party/bigframes_vendored/pandas/io/gbq.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index b5feeb13c5..520092204a 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -28,12 +28,14 @@ def read_gbq( """Loads a DataFrame from BigQuery. BigQuery tables are an unordered, unindexed data source. By default, - the DataFrame will have an arbitrary index and ordering. - - Set the `index_col` argument to one or more columns to choose an - index. The resulting DataFrame is sorted by the index columns. For the - best performance, ensure the index columns don't contain duplicate - values. + the DataFrame will have an arbitrary index and ordering. Generating + the default index uses an analytic windowed operation that prevents + many filtering push down operations. As a best practice, set the + ``index_col`` argument to one or more columns, especially on large + tables. + + Duplicate keys in an index are valid, but for the best performance, + ensure the index columns don't contain duplicate values. .. 
note:: By default, even SQL query inputs with an ORDER BY clause create a From 994a8f1bddf490133a9f321f253198ebd9f15bdc Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 19 Apr 2024 21:32:08 +0000 Subject: [PATCH 03/24] feat: support primary key(s) in `read_gbq` by using as the `index_col` by default --- bigframes/exceptions.py | 4 ++ bigframes/session/__init__.py | 25 ++++++------ setup.py | 2 +- testing/constraints-3.9.txt | 2 +- tests/system/small/test_session.py | 8 +--- tests/unit/resources.py | 7 ++-- tests/unit/session/test_session.py | 39 +++++++++++++++++++ .../bigframes_vendored/pandas/io/gbq.py | 3 ++ 8 files changed, 67 insertions(+), 23 deletions(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 62122e79d2..0d9627d3b3 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -15,3 +15,7 @@ class UnknownLocationWarning(Warning): """The location is set to an unknown value.""" + + +class NoDefaultIndexError(ValueError): + """Unable to create a default index.""" diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b6d56006be..12420bd637 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -708,13 +708,15 @@ def _get_snapshot_sql_and_primary_key( f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" ) - # TODO(b/305264153): Use public properties to fetch primary keys once - # added to google-cloud-bigquery. - primary_keys = ( - table._properties.get("tableConstraints", {}) - .get("primaryKey", {}) - .get("columns") - ) + primary_keys = None + if ( + (table_constraints := getattr(table, "table_constraints", None)) is not None + and (primary_key := table_constraints.primary_key) is not None + # This will be False for either None or empty list. + # We want primary_keys = None if no primary keys are set. 
+ and (columns := primary_key.columns) + ): + primary_keys = columns job_config = bigquery.QueryJobConfig() job_config.labels["bigframes-api"] = api_name @@ -777,12 +779,13 @@ def _read_gbq_table( query, default_project=self.bqclient.project ) - ( - table_expression, - total_ordering_cols, - ) = self._get_snapshot_sql_and_primary_key( + (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key( table_ref, api_name=api_name, use_cache=use_cache ) + total_ordering_cols = primary_keys + + if not index_col and primary_keys is not None: + index_col = primary_keys for key in columns: if key not in table_expression.columns: diff --git a/setup.py b/setup.py index 83049f9715..2ccf63259c 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ "gcsfs >=2023.3.0", "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0dev", - "google-cloud-bigquery[bqstorage,pandas] >=3.10.0", + "google-cloud-bigquery[bqstorage,pandas] >=3.16.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", "google-cloud-iam >=2.12.1", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 1e1f3a3e66..f5007ed564 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -4,7 +4,7 @@ fsspec==2023.3.0 gcsfs==2023.3.0 geopandas==0.12.2 google-auth==2.15.0 -google-cloud-bigquery==3.10.0 +google-cloud-bigquery==3.16.0 google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 google-cloud-iam==2.12.1 diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index ce415f9324..e79a16085e 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -237,13 +237,7 @@ def test_read_gbq_w_primary_keys_table( session: bigframes.Session, usa_names_grouped_table: bigquery.Table ): table = usa_names_grouped_table - # TODO(b/305264153): Use public properties to fetch primary keys once - # added to google-cloud-bigquery. - primary_keys = ( - table._properties.get("tableConstraints", {}) - .get("primaryKey", {}) - .get("columns") - ) + primary_keys = table.table_constraints.primary_key.columns assert len(primary_keys) != 0 df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}") diff --git a/tests/unit/resources.py b/tests/unit/resources.py index 6846659930..28b08e49dc 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -13,7 +13,7 @@ # limitations under the License. 
import datetime -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Sequence import unittest.mock as mock import google.auth.credentials @@ -37,6 +37,7 @@ def create_bigquery_session( bqclient: Optional[mock.Mock] = None, session_id: str = "abcxyz", + table_schema: Sequence[google.cloud.bigquery.SchemaField] = TEST_SCHEMA, anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None, ) -> bigframes.Session: credentials = mock.create_autospec( @@ -51,7 +52,7 @@ def create_bigquery_session( table = mock.create_autospec(google.cloud.bigquery.Table, instance=True) table._properties = {} type(table).location = mock.PropertyMock(return_value="test-region") - type(table).schema = mock.PropertyMock(return_value=TEST_SCHEMA) + type(table).schema = mock.PropertyMock(return_value=table_schema) bqclient.get_table.return_value = table if anonymous_dataset is None: @@ -72,7 +73,7 @@ def query_mock(query, *args, **kwargs): if query.startswith("SELECT CURRENT_TIMESTAMP()"): query_job.result = mock.MagicMock(return_value=[[datetime.datetime.now()]]) else: - type(query_job).schema = mock.PropertyMock(return_value=TEST_SCHEMA) + type(query_job).schema = mock.PropertyMock(return_value=table_schema) return query_job diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 3e2b28c200..543196066a 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -19,9 +19,11 @@ import google.api_core.exceptions import google.cloud.bigquery +import google.cloud.bigquery.table import pytest import bigframes +import bigframes.exceptions from .. import resources @@ -50,6 +52,43 @@ def test_read_gbq_cached_table(): assert "1999-01-02T03:04:05.678901" in df.sql +def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): + """If a primary key is set on the table, we use that as the index column + by default, no error should be raised in this case. + + See internal issue 335727141. + """ + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + table.schema = ( + google.cloud.bigquery.SchemaField("pk_1", "INT64"), + google.cloud.bigquery.SchemaField("pk_2", "INT64"), + google.cloud.bigquery.SchemaField("col_1", "INT64"), + google.cloud.bigquery.SchemaField("col_2", "INT64"), + ) + + # TODO(b/305264153): use setter for table_constraints in client library + # when available. + table._properties["tableConstraints"] = { + "primaryKey": { + "columns": ["pk_1", "pk_2"], + }, + } + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session( + bqclient=bqclient, table_schema=table.schema + ) + table._properties["location"] = session._location + + df = session.read_gbq("my-project.my_dataset.my_table") + + # There should be no analytic operators to prevent row filtering pushdown. 
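+    # (Illustrative note: the default sequential index is built with an
+    # analytic window function such as ROW_NUMBER() OVER (), so asserting
+    # that "OVER" is absent from the generated SQL confirms no window was
+    # added here.)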
+ assert "OVER" not in df.sql + assert tuple(df.index.names) == ("pk_1", "pk_2") + + @pytest.mark.parametrize( "not_found_table_id", [("unknown.dataset.table"), ("project.unknown.table"), ("project.dataset.unknown")], diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index b5feeb13c5..0e373e88af 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -105,6 +105,9 @@ def read_gbq( In tha case, will read all the matched table as one DataFrame. index_col (Iterable[str] or str): Name of result column(s) to use for index in results DataFrame. + + **New in bigframes version 1.3.0**: If ``index_cols`` is not + set, the primary key(s) of the table are used as the index. columns (Iterable[str]): List of BigQuery column names in the desired order for results DataFrame. From 5fcc5a026679a5e88864a9e0427f599706c8c840 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 19 Apr 2024 21:53:37 +0000 Subject: [PATCH 04/24] revert WIP commit --- bigframes/exceptions.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 0d9627d3b3..62122e79d2 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -15,7 +15,3 @@ class UnknownLocationWarning(Warning): """The location is set to an unknown value.""" - - -class NoDefaultIndexError(ValueError): - """Unable to create a default index.""" From 8c4e31c4eaf6559965dfbc00721c91c3af9dc090 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 22 Apr 2024 16:18:45 +0000 Subject: [PATCH 05/24] address type error in tests --- tests/system/small/test_session.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index e79a16085e..1e76a8bd8b 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -236,8 +236,13 @@ def test_read_gbq_w_anonymous_query_results_table(session: bigframes.Session): def test_read_gbq_w_primary_keys_table( session: bigframes.Session, usa_names_grouped_table: bigquery.Table ): + # Validate that the table we're querying has a primary key. table = usa_names_grouped_table - primary_keys = table.table_constraints.primary_key.columns + table_constraints = table.table_constraints + assert table_constraints is not None + primary_key = table_constraints.primary_key + assert primary_key is not None + primary_keys = primary_key.columns assert len(primary_keys) != 0 df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}") From b96cba38b29ced8a2814f4b2e1ecad2c72a9d6e0 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 22 Apr 2024 18:52:33 +0000 Subject: [PATCH 06/24] document behaviors --- .../bigframes_vendored/pandas/io/gbq.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 520092204a..c60a276338 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -27,15 +27,17 @@ def read_gbq( ): """Loads a DataFrame from BigQuery. - BigQuery tables are an unordered, unindexed data source. By default, - the DataFrame will have an arbitrary index and ordering. Generating - the default index uses an analytic windowed operation that prevents - many filtering push down operations. 
As a best practice, set the - ``index_col`` argument to one or more columns, especially on large - tables. - - Duplicate keys in an index are valid, but for the best performance, - ensure the index columns don't contain duplicate values. + BigQuery tables are an unordered, unindexed data source. To add support + pandas-compatibility, the following indexing options are supported: + + * (Default behavior) Add an arbitrary sequential index and ordering + using an an analytic windowed operation that prevents filtering + push down. + * (Recommended) Set the ``index_col`` argument to one or more columns. + Unique values for the row labels are recommended. Duplicate labels + are possible, but note that joins on a non-unique index can duplicate + rows and operations like ``cumsum()`` that window across a non-unique + index can have some non-deternimism. .. note:: By default, even SQL query inputs with an ORDER BY clause create a From 477a51618caf7aae26bf488eb633ac57cef7165d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 23 Apr 2024 17:00:35 +0000 Subject: [PATCH 07/24] update docs to reflect new default index behavior --- bigframes/__init__.py | 4 +++ bigframes/enums.py | 29 +++++++++++++++ bigframes/exceptions.py | 5 +++ docs/reference/bigframes/enums.rst | 8 +++++ docs/reference/bigframes/exceptions.rst | 8 +++++ docs/reference/bigframes/index.rst | 2 ++ docs/templates/toc.yml | 4 +++ tests/unit/session/test_session.py | 24 +++++++++++++ .../bigframes_vendored/pandas/io/gbq.py | 35 +++++++++++++++---- 9 files changed, 112 insertions(+), 7 deletions(-) create mode 100644 bigframes/enums.py create mode 100644 docs/reference/bigframes/enums.rst create mode 100644 docs/reference/bigframes/exceptions.rst diff --git a/bigframes/__init__.py b/bigframes/__init__.py index bd1476957b..240608ebc2 100644 --- a/bigframes/__init__.py +++ b/bigframes/__init__.py @@ -17,6 +17,8 @@ from bigframes._config import option_context, options from bigframes._config.bigquery_options import BigQueryOptions from bigframes.core.global_session import close_session, get_global_session +import bigframes.enums as enums +import bigframes.exceptions as exceptions from bigframes.session import connect, Session from bigframes.version import __version__ @@ -25,6 +27,8 @@ "BigQueryOptions", "get_global_session", "close_session", + "enums", + "exceptions", "connect", "Session", "__version__", diff --git a/bigframes/enums.py b/bigframes/enums.py new file mode 100644 index 0000000000..742628a3bc --- /dev/null +++ b/bigframes/enums.py @@ -0,0 +1,29 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Public enums used across BigQuery DataFrames. + +This module should not depend on any others in the package. +""" + +import enum + + +class IndexKind(enum.Enum): + """Sentinel values used to override default indexing behavior.""" + + #: Use consecutive integers as the index. This is ``0``, ``1``, ``2``, ..., + #: ``n - 3``, ``n - 2``, ``n - 1``, where ``n`` is the number of items in + #: the index. 
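+    #:
+    #: Illustrative usage (hypothetical table name; assumes ``read_gbq``
+    #: accepts this enum for ``index_col`` and that
+    #: ``import bigframes.pandas as bpd`` was run)::
+    #:
+    #:     bpd.read_gbq(
+    #:         "my_dataset.my_table",
+    #:         index_col=bigframes.enums.IndexKind.SEQUENTIAL_INT64,
+    #:     )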
+ SEQUENTIAL_INT64 = enum.auto() diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 0d9627d3b3..b2efd31c83 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Public exceptions and warnings used across BigQuery DataFrames. + +This module should not depend on any others in the package. +""" + class UnknownLocationWarning(Warning): """The location is set to an unknown value.""" diff --git a/docs/reference/bigframes/enums.rst b/docs/reference/bigframes/enums.rst new file mode 100644 index 0000000000..b0a198e184 --- /dev/null +++ b/docs/reference/bigframes/enums.rst @@ -0,0 +1,8 @@ + +===== +Enums +===== + +.. automodule:: bigframes.enums + :members: + :undoc-members: diff --git a/docs/reference/bigframes/exceptions.rst b/docs/reference/bigframes/exceptions.rst new file mode 100644 index 0000000000..c471aecdf7 --- /dev/null +++ b/docs/reference/bigframes/exceptions.rst @@ -0,0 +1,8 @@ + +======================= +Exceptions and Warnings +======================= + +.. automodule:: bigframes.exceptions + :members: + :undoc-members: diff --git a/docs/reference/bigframes/index.rst b/docs/reference/bigframes/index.rst index 76d64444fa..17d03346c6 100644 --- a/docs/reference/bigframes/index.rst +++ b/docs/reference/bigframes/index.rst @@ -5,6 +5,8 @@ Core objects .. toctree:: :maxdepth: 2 + enums + exceptions options diff --git a/docs/templates/toc.yml b/docs/templates/toc.yml index 4573296ec3..511a18001d 100644 --- a/docs/templates/toc.yml +++ b/docs/templates/toc.yml @@ -32,6 +32,10 @@ - name: Session uid: bigframes.session.Session name: Session + - name: Enumerations + uid: bigframes.enums + - name: Exceptions and warnings + uid: bigframes.exceptions name: Core Objects - items: - name: DataFrame diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index ae7fc5d49b..719526fa3e 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -76,6 +76,30 @@ def test_read_gbq_clustered_table_raises_no_default_index_error(): session.read_gbq("my-project.my_dataset.my_table") +def test_read_gbq_clustered_table_ok_default_index_with_force_true(): + """Because of the windowing operation to create a default index, row + filters can't push down to the clustering column. + + Raise an exception in this case so that the user is directed to supply a + unique index column or filter if possible. + + See internal issue 335727141. + """ + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session(bqclient=bqclient) + table._properties["location"] = session._location + + # No exception raised because we set the option allowing the default indexes. + with bigframes.option_context( + "compute.allow_default_index_on_clustered_partitioned_tables", True + ): + session.read_gbq("my-project.my_dataset.my_table") + + def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): """If a primary key is set on the table, we use that as the index column by default, no error should be raised in this case. 
diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 93cee71289..f9c9ce953b 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -6,6 +6,7 @@ from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union from bigframes import constants +import bigframes.enums FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">", "LIKE"] FilterType = Tuple[str, FilterOps, Any] @@ -17,7 +18,7 @@ def read_gbq( self, query_or_table: str, *, - index_col: Iterable[str] | str = (), + index_col: Union[Iterable[str], str, bigframes.enums.IndexKind] = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -28,11 +29,19 @@ def read_gbq( """Loads a DataFrame from BigQuery. BigQuery tables are an unordered, unindexed data source. To add support - pandas-compatibility, the following indexing options are supported: - - * (Default behavior) Add an arbitrary sequential index and ordering - using an an analytic windowed operation that prevents filtering - push down. + pandas-compatibility, the following indexing options are supported via + the ``index_col`` parameter: + + * (Empty iterable, default) A default index. **Behavior may change.** + Explicitly set ``index_col`` if your application makes use of + specific index values. + + If a table has primary key(s), those are used as the index, + otherwise a sequential index is generated. + * (:attr:`bigframes.enums.IndexKind.SEQUENTIAL_INT64`) Add an arbitrary + sequential index and ordering. **Warning** This uses an an analytic + windowed operation that prevents filtering push down. Do not use on + large clustered or partitioned tables. * (Recommended) Set the ``index_col`` argument to one or more columns. Unique values for the row labels are recommended. Duplicate labels are possible, but note that joins on a non-unique index can duplicate @@ -107,11 +116,18 @@ def read_gbq( `project.dataset.tablename` or `dataset.tablename`. Can also take wildcard table name, such as `project.dataset.table_prefix*`. In tha case, will read all the matched table as one DataFrame. - index_col (Iterable[str] or str): + index_col (Iterable[str], str, bigframes.enums.IndexKind): Name of result column(s) to use for index in results DataFrame. + If an empty iterable, such as ``()``, a default index is + generated. Do not depend on specific index values in this case. + **New in bigframes version 1.3.0**: If ``index_cols`` is not set, the primary key(s) of the table are used as the index. + + **New in bigframes version 1.4.0**: Support + :class:`bigframes.enums.TypeKind` to override default index + behavior. columns (Iterable[str]): List of BigQuery column names in the desired order for results DataFrame. @@ -141,6 +157,11 @@ def read_gbq( col_order (Iterable[str]): Alias for columns, retained for backwards compatibility. + Raises: + bigframes.exceptions.NoDefaultIndexError: + Using the default index is discouraged, such as with clustered + or partitioned tables without primary keys. + Returns: bigframes.dataframe.DataFrame: A DataFrame representing results of the query or table. 
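
        Example (illustrative only; the table and column names are
        hypothetical)::

            import bigframes.pandas as bpd

            # Recommended: choose one or more explicit, ideally unique,
            # index columns rather than relying on the default index.
            df = bpd.read_gbq(
                "my-project.my_dataset.my_table",
                index_col=["order_id", "line_item_id"],
            )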
""" From 2c5a0dd9d23997c6c1015a83e3d75d4b3a35a3eb Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 24 Apr 2024 16:06:58 +0000 Subject: [PATCH 08/24] add DefaultIndexKind to allowed `index_col` values --- bigframes/enums.py | 2 +- bigframes/pandas/__init__.py | 15 +++++-- bigframes/session/__init__.py | 41 +++++++++++++------ tests/unit/session/test_session.py | 16 +++++--- .../bigframes_vendored/pandas/io/gbq.py | 15 +++---- .../pandas/io/parsers/readers.py | 9 +++- 6 files changed, 68 insertions(+), 30 deletions(-) diff --git a/bigframes/enums.py b/bigframes/enums.py index 742628a3bc..c6821bebba 100644 --- a/bigframes/enums.py +++ b/bigframes/enums.py @@ -20,7 +20,7 @@ import enum -class IndexKind(enum.Enum): +class DefaultIndexKind(enum.Enum): """Sentinel values used to override default indexing behavior.""" #: Use consecutive integers as the index. This is ``0``, ``1``, ``2``, ..., diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 96af6ab1b3..6807a721cd 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -63,6 +63,7 @@ import bigframes.core.reshape import bigframes.core.tools import bigframes.dataframe +import bigframes.enums import bigframes.operations as ops import bigframes.series import bigframes.session @@ -423,7 +424,13 @@ def read_csv( Union[MutableSequence[Any], numpy.ndarray[Any, Any], Tuple[Any, ...], range] ] = None, index_col: Optional[ - Union[int, str, Sequence[Union[str, int]], Literal[False]] + Union[ + int, + str, + Sequence[Union[str, int]], + bigframes.enums.DefaultIndexKind, + Literal[False], + ] ] = None, usecols: Optional[ Union[ @@ -491,7 +498,7 @@ def read_json( def read_gbq( query_or_table: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -529,7 +536,7 @@ def read_gbq_model(model_name: str): def read_gbq_query( query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -555,7 +562,7 @@ def read_gbq_query( def read_gbq_table( query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), max_results: Optional[int] = None, filters: vendored_pandas_gbq.FiltersType = (), diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f3f1ffce16..98fea57133 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -294,7 +294,7 @@ def read_gbq( self, query_or_table: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -484,7 +484,7 @@ def read_gbq_query( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -562,7 +562,7 @@ def _read_gbq_query( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ 
-594,7 +594,9 @@ def _read_gbq_query( True if use_cache is None else use_cache ) - if isinstance(index_col, str): + if isinstance(index_col, bigframes.enums.DefaultIndexKind): + index_cols = [] + elif isinstance(index_col, str): index_cols = [index_col] else: index_cols = list(index_col) @@ -624,7 +626,7 @@ def _read_gbq_query( return self.read_gbq_table( f"{destination.project}.{destination.dataset_id}.{destination.table_id}", - index_col=index_cols, + index_col=index_col, columns=columns, max_results=max_results, use_cache=configuration["query"]["useQueryCache"], @@ -634,7 +636,7 @@ def read_gbq_table( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), max_results: Optional[int] = None, filters: third_party_pandas_gbq.FiltersType = (), @@ -764,7 +766,7 @@ def _read_gbq_table( self, query: str, *, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), max_results: Optional[int] = None, api_name: str, @@ -793,8 +795,10 @@ def _read_gbq_table( f"Column '{key}' of `columns` not found in this table." ) - if isinstance(index_col, str): - index_cols: List[str] = [index_col] + if isinstance(index_col, bigframes.enums.DefaultIndexKind): + index_cols: List[str] = [] + elif isinstance(index_col, str): + index_cols = [index_col] else: index_cols = list(index_col) @@ -899,10 +903,12 @@ def _read_bigquery_load_job( table: Union[bigquery.Table, bigquery.TableReference], *, job_config: bigquery.LoadJobConfig, - index_col: Iterable[str] | str = (), + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), columns: Iterable[str] = (), ) -> dataframe.DataFrame: - if isinstance(index_col, str): + if isinstance(index_col, bigframes.enums.DefaultIndexKind): + index_cols = [] + elif isinstance(index_col, str): index_cols = [index_col] else: index_cols = list(index_col) @@ -1191,7 +1197,13 @@ def read_csv( Union[MutableSequence[Any], np.ndarray[Any, Any], Tuple[Any, ...], range] ] = None, index_col: Optional[ - Union[int, str, Sequence[Union[str, int]], Literal[False]] + Union[ + int, + str, + Sequence[Union[str, int]], + bigframes.enums.DefaultIndexKind, + Literal[False], + ] ] = None, usecols: Optional[ Union[ @@ -1277,6 +1289,11 @@ def read_csv( columns=columns, ) else: + if isinstance(index_col, bigframes.enums.DefaultIndexKind): + raise NotImplementedError( + f"With index_col={repr(index_col)}, only engine='bigquery' is supported. " + f"{constants.FEEDBACK_LINK}" + ) if any(arg in kwargs for arg in ("chunksize", "iterator")): raise NotImplementedError( "'chunksize' and 'iterator' arguments are not supported. " diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 719526fa3e..687b9e7ff0 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -23,6 +23,7 @@ import pytest import bigframes +import bigframes.enums import bigframes.exceptions from .. import resources @@ -94,10 +95,15 @@ def test_read_gbq_clustered_table_ok_default_index_with_force_true(): table._properties["location"] = session._location # No exception raised because we set the option allowing the default indexes. 
- with bigframes.option_context( - "compute.allow_default_index_on_clustered_partitioned_tables", True - ): - session.read_gbq("my-project.my_dataset.my_table") + df = session.read_gbq( + "my-project.my_dataset.my_table", + index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64, + ) + + # We expect a window operation because we specificaly requested a sequential index. + generated_sql = df.sql.casefold() + assert "OVER".casefold() in generated_sql + assert "ROW_NUMBER()".casefold() in generated_sql def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): @@ -134,7 +140,7 @@ def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): df = session.read_gbq("my-project.my_dataset.my_table") # There should be no analytic operators to prevent row filtering pushdown. - assert "OVER" not in df.sql + assert "OVER".casefold() not in df.sql.casefold() assert tuple(df.index.names) == ("pk_1", "pk_2") diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index f9c9ce953b..0dc94138b5 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -18,7 +18,7 @@ def read_gbq( self, query_or_table: str, *, - index_col: Union[Iterable[str], str, bigframes.enums.IndexKind] = (), + index_col: Union[Iterable[str], str, bigframes.enums.DefaultIndexKind] = (), columns: Iterable[str] = (), configuration: Optional[Dict] = None, max_results: Optional[int] = None, @@ -38,15 +38,16 @@ def read_gbq( If a table has primary key(s), those are used as the index, otherwise a sequential index is generated. - * (:attr:`bigframes.enums.IndexKind.SEQUENTIAL_INT64`) Add an arbitrary - sequential index and ordering. **Warning** This uses an an analytic - windowed operation that prevents filtering push down. Do not use on - large clustered or partitioned tables. + * (:attr:`bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64`) Add an + arbitrary sequential index and ordering. **Warning** This uses an + analytic windowed operation that prevents filtering push down. Avoid + using on large clustered or partitioned tables. * (Recommended) Set the ``index_col`` argument to one or more columns. Unique values for the row labels are recommended. Duplicate labels are possible, but note that joins on a non-unique index can duplicate - rows and operations like ``cumsum()`` that window across a non-unique - index can have some non-deternimism. + rows via pandas-like outer join behavior. Operations like + ``cumsum()`` that window across a non-unique index can have some + unpredictability due to ambiguous ordering. .. 
note:: By default, even SQL query inputs with an ORDER BY clause create a diff --git a/third_party/bigframes_vendored/pandas/io/parsers/readers.py b/third_party/bigframes_vendored/pandas/io/parsers/readers.py index e8ed6182a6..d147abfd22 100644 --- a/third_party/bigframes_vendored/pandas/io/parsers/readers.py +++ b/third_party/bigframes_vendored/pandas/io/parsers/readers.py @@ -21,6 +21,7 @@ import numpy as np from bigframes import constants +import bigframes.enums class ReaderIOMixin: @@ -34,7 +35,13 @@ def read_csv( Union[MutableSequence[Any], np.ndarray[Any, Any], Tuple[Any, ...], range] ] = None, index_col: Optional[ - Union[int, str, Sequence[Union[str, int]], Literal[False]] + Union[ + int, + str, + Sequence[Union[str, int]], + bigframes.enums.DefaultIndexKind, + Literal[False], + ] ] = None, usecols=None, dtype: Optional[Dict] = None, From d816db38768bc458107afc69c25feea67b0fa0c5 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 24 Apr 2024 20:11:19 +0000 Subject: [PATCH 09/24] refactor: cache table metadata alongside snapshot time This ensures the cached `primary_keys` is more likely to be correct, in case the user called ALTER TABLE after we originally cached the snapshot time. --- bigframes/session/__init__.py | 52 +++++++++------------------- bigframes/session/_io/bigquery.py | 54 ++++++++++++++++++++++++++++++ tests/unit/session/test_session.py | 7 ++-- 3 files changed, 75 insertions(+), 38 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f3f1ffce16..1a0ea20e55 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -231,7 +231,9 @@ def __init__( # Now that we're starting the session, don't allow the options to be # changed. context._session_started = True - self._df_snapshot: Dict[bigquery.TableReference, datetime.datetime] = {} + self._df_snapshot: Dict[ + bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table] + ] = {} @property def bqclient(self): @@ -698,16 +700,25 @@ def _get_snapshot_sql_and_primary_key( column(s), then return those too so that ordering generation can be avoided. """ - # If there are primary keys defined, the query engine assumes these - # columns are unique, even if the constraint is not enforced. We make - # the same assumption and use these columns as the total ordering keys. - table = self.bqclient.get_table(table_ref) + ( + snapshot_timestamp, + table, + ) = bigframes_io.get_snapshot_datetime_and_table_metadata( + self.bqclient, + table_ref=table_ref, + api_name=api_name, + cache=self._df_snapshot, + use_cache=use_cache, + ) if table.location.casefold() != self._location.casefold(): raise ValueError( f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" ) + # If there are primary keys defined, the query engine assumes these + # columns are unique, even if the constraint is not enforced. We make + # the same assumption and use these columns as the total ordering keys. primary_keys = None if ( (table_constraints := getattr(table, "table_constraints", None)) is not None @@ -718,37 +729,6 @@ def _get_snapshot_sql_and_primary_key( ): primary_keys = columns - job_config = bigquery.QueryJobConfig() - job_config.labels["bigframes-api"] = api_name - if use_cache and table_ref in self._df_snapshot.keys(): - snapshot_timestamp = self._df_snapshot[table_ref] - - # Cache hit could be unexpected. See internal issue 329545805. 
- # Raise a warning with more information about how to avoid the - # problems with the cache. - warnings.warn( - f"Reading cached table from {snapshot_timestamp} to avoid " - "incompatibilies with previous reads of this table. To read " - "the latest version, set `use_cache=False` or close the " - "current session with Session.close() or " - "bigframes.pandas.close_session().", - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - stacklevel=6, - ) - else: - snapshot_timestamp = list( - self.bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - self._df_snapshot[table_ref] = snapshot_timestamp - try: table_expression = self.ibis_client.sql( bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index ac6ba4bae4..94576cfa12 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -23,6 +23,7 @@ import types from typing import Dict, Iterable, Optional, Sequence, Tuple, Union import uuid +import warnings import google.api_core.exceptions import google.cloud.bigquery as bigquery @@ -121,6 +122,59 @@ def table_ref_to_sql(table: bigquery.TableReference) -> str: return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" +def get_snapshot_datetime_and_table_metadata( + bqclient: bigquery.Client, + table_ref: bigquery.TableReference, + *, + api_name: str, + cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], + use_cache: bool = True, +) -> Tuple[datetime.datetime, bigquery.Table]: + cached_table = cache.get(table_ref) + if use_cache and cached_table is not None: + snapshot_timestamp, _ = cached_table + + # Cache hit could be unexpected. See internal issue 329545805. + # Raise a warning with more information about how to avoid the + # problems with the cache. + warnings.warn( + f"Reading cached table from {snapshot_timestamp} to avoid " + "incompatibilies with previous reads of this table. To read " + "the latest version, set `use_cache=False` or close the " + "current session with Session.close() or " + "bigframes.pandas.close_session().", + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + stacklevel=7, + ) + return cached_table + + # TODO(swast): It's possible that the table metadata is changed between now + # and when we run the CURRENT_TIMESTAMP() query to see when we can time + # travel to. Find a way to fetch the table metadata and BQ's current time + # atomically. + table = bqclient.get_table(table_ref) + + # TODO(b/336521938): Refactor to make sure we set the "bigframes-api" + # whereever we execute a query. 
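+    # Label the CURRENT_TIMESTAMP() job below with the calling API name so
+    # the query is attributable to the originating BigQuery DataFrames call.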
+ job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + snapshot_timestamp = list( + bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + cached_table = (snapshot_timestamp, table) + cache[table_ref] = cached_table + return cached_table + + def create_snapshot_sql( table_ref: bigquery.TableReference, current_timestamp: datetime.datetime ) -> str: diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 543196066a..4ba47190bd 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -42,8 +42,11 @@ def test_read_gbq_cached_table(): google.cloud.bigquery.DatasetReference("my-project", "my_dataset"), "my_table", ) - session._df_snapshot[table_ref] = datetime.datetime( - 1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc + table = google.cloud.bigquery.Table(table_ref) + table._properties["location"] = session._location + session._df_snapshot[table_ref] = ( + datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc), + table, ) with pytest.warns(UserWarning, match=re.escape("use_cache=False")): From 241dc608aad59bcb8c79bdebcb5740d948ef89f2 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 25 Apr 2024 19:01:10 +0000 Subject: [PATCH 10/24] add unit tests --- bigframes/enums.py | 6 +- bigframes/exceptions.py | 5 +- bigframes/session/__init__.py | 40 +++++- tests/system/small/test_dataframe_io.py | 2 +- tests/system/small/test_session.py | 6 +- tests/unit/session/test_session.py | 161 ++++++++++++++++++------ 6 files changed, 170 insertions(+), 50 deletions(-) diff --git a/bigframes/enums.py b/bigframes/enums.py index c6821bebba..4bec75f5df 100644 --- a/bigframes/enums.py +++ b/bigframes/enums.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Public enums used across BigQuery DataFrames. +"""Public enums used across BigQuery DataFrames.""" + +# NOTE: This module should not depend on any others in the package. -This module should not depend on any others in the package. -""" import enum diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index b2efd31c83..d179914983 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -12,10 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Public exceptions and warnings used across BigQuery DataFrames. +"""Public exceptions and warnings used across BigQuery DataFrames.""" -This module should not depend on any others in the package. -""" +# NOTE: This module should not depend on any others in the package. class UnknownLocationWarning(Warning): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 3d21d688ed..227156b567 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -766,6 +766,10 @@ def _read_gbq_table( ) total_ordering_cols = primary_keys + # TODO: warn if partitioned and/or clustered except if: + # primary_keys, index_col, or filters + # Except it looks like filters goes through the query path? 
+ if not index_col and primary_keys is not None: index_col = primary_keys @@ -870,7 +874,12 @@ def _check_index_uniqueness( SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` """ - results, _ = self._start_query(is_unique_sql) + # We just need the results, not any job stats, so use query_and_wait, + # which should also be faster for queries with small results such as + # this. + # + # It also happens to be easier to mock out in unit tests. + results = self._start_query_and_wait(is_unique_sql) row = next(iter(results)) total_count = row["total_count"] @@ -1213,12 +1222,17 @@ def read_csv( f"{constants.FEEDBACK_LINK}" ) - if index_col is not None and ( - not index_col or not isinstance(index_col, str) + # TODO(tswast): Looks like we can relax this 1 column restriction, + # but leaving it for now because I'm not sure why we have it. + if ( + # Empty tuples and None are both allowed and falsey + index_col + and not isinstance(index_col, bigframes.enums.DefaultIndexKind) + and not isinstance(index_col, str) ): raise NotImplementedError( - "BigQuery engine only supports a single column name for `index_col`. " - f"{constants.FEEDBACK_LINK}" + "BigQuery engine only supports a single column name for `index_col`, " + f"got: {repr(index_col)}. {constants.FEEDBACK_LINK}" ) # None value for index_col cannot be passed to read_gbq @@ -1782,6 +1796,22 @@ def _prepare_copy_job_config(self) -> bigquery.CopyJobConfig: return job_config + def _start_query_and_wait( + self, + sql: str, + job_config: Optional[bigquery.job.QueryJobConfig] = None, + max_results: Optional[int] = None, + ) -> bigquery.table.RowIterator: + """ + Starts BigQuery query with query_and_wait and waits for results. + """ + job_config = self._prepare_query_job_config(job_config) + return self.bqclient.query_and_wait( + sql, + job_config=job_config, + max_results=max_results, + ) + def _start_query( self, sql: str, diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index f26902f084..4124dfd23a 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -117,7 +117,7 @@ def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index): @pytest.mark.parametrize( - ("index"), + ("index", "index_col"), [True, False], ) def test_to_csv_index( diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 1e76a8bd8b..ec20459c31 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -524,7 +524,11 @@ def test_read_csv_gcs_bq_engine(session, scalars_dfs, gcs_folder): scalars_df, _ = scalars_dfs path = gcs_folder + "test_read_csv_gcs_bq_engine_w_index*.csv" scalars_df.to_csv(path, index=False) - df = session.read_csv(path, engine="bigquery") + df = session.read_csv( + path, + engine="bigquery", + index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64, + ) # TODO(chelsealin): If we serialize the index, can more easily compare values. pd.testing.assert_index_equal(df.columns, scalars_df.columns) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 952ae06dc9..c8bdc7fc5a 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -29,6 +29,25 @@ from .. 
import resources +@pytest.mark.parametrize( + ("engine",), + ( + ("c",), + ("python",), + ("pyarrow",), + ), +) +def test_read_csv_pandas_engines_index_col_sequential_int64_not_supported(engine): + session = resources.create_bigquery_session() + + with pytest.raises(NotImplementedError, match="index_col"): + session.read_csv( + "path/to/csv.csv", + engine=engine, + index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64, + ) + + @pytest.mark.parametrize("missing_parts_table_id", [(""), ("table")]) def test_read_gbq_missing_parts(missing_parts_table_id): session = resources.create_bigquery_session() @@ -56,10 +75,28 @@ def test_read_gbq_cached_table(): assert "1999-01-02T03:04:05.678901" in df.sql -# START: Tests for NoDefaultIndexError on clustered/partitioned tables +def test_no_default_index_error_raised_by_read_gbq_clustered_table(): + """Because of the windowing operation to create a default index, row + filters can't push down to the clustering column. + Raise an exception in this case so that the user is directed to supply a + unique index column or filter if possible. -def test_read_gbq_clustered_table_raises_no_default_index_error(): + See internal issue 335727141. + """ + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session(bqclient=bqclient) + table._properties["location"] = session._location + + with pytest.raises(bigframes.exceptions.NoDefaultIndexError): + session.read_gbq("my-project.my_dataset.my_table") + + +def test_no_default_index_error_raised_by_read_gbq_range_partitioned_table(): """Because of the windowing operation to create a default index, row filters can't push down to the clustering column. @@ -69,7 +106,9 @@ def test_read_gbq_clustered_table_raises_no_default_index_error(): See internal issue 335727141. """ table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] + table.time_partitioning = google.cloud.bigquery.table.RangePartitioning( + field="col1" + ) bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" bqclient.get_table.return_value = table @@ -80,13 +119,33 @@ def test_read_gbq_clustered_table_raises_no_default_index_error(): session.read_gbq("my-project.my_dataset.my_table") -def test_read_gbq_clustered_table_ok_default_index_with_force_true(): +def test_no_default_index_error_raised_by_read_gbq_time_partitioned_table(): """Because of the windowing operation to create a default index, row filters can't push down to the clustering column. Raise an exception in this case so that the user is directed to supply a unique index column or filter if possible. + See internal issue 335727141. 
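+
+    An illustrative alternative (hypothetical names and values): filtering
+    the table, for example on the partitioning column, avoids the error; the
+    default index is then generated over the filtered result instead of the
+    whole table. Assuming ``import bigframes.pandas as bpd``::
+
+        bpd.read_gbq(
+            "my-project.my_dataset.my_table",
+            filters=[("col1", ">=", "2024-01-01")],
+        )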
+ """ + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.time_partitioning = google.cloud.bigquery.table.TimePartitioning(field="col1") + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session(bqclient=bqclient) + table._properties["location"] = session._location + + with pytest.raises(bigframes.exceptions.NoDefaultIndexError): + session.read_gbq("my-project.my_dataset.my_table") + + +def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_sequential_int64(): + """Because of the windowing operation to create a default index, row + filters can't push down to the clustering column. + + Allow people to use the default index only if they explicitly request it. + See internal issue 335727141. """ table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") @@ -109,7 +168,50 @@ def test_read_gbq_clustered_table_ok_default_index_with_force_true(): assert "ROW_NUMBER()".casefold() in generated_sql -def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): +@pytest.mark.parametrize( + ("total_count", "distinct_count"), + ( + (0, 0), + (123, 123), + # Should still have a positive effect, even if the index is not unique. + (123, 111), + ), +) +def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_index_col( + total_count, + distinct_count, +): + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + table.schema = ( + google.cloud.bigquery.SchemaField("idx_1", "INT64"), + google.cloud.bigquery.SchemaField("idx_2", "INT64"), + google.cloud.bigquery.SchemaField("col_1", "INT64"), + google.cloud.bigquery.SchemaField("col_2", "INT64"), + ) + + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + bqclient.query_and_wait.return_value = ( + {"total_count": total_count, "distinct_count": distinct_count}, + ) + session = resources.create_bigquery_session( + bqclient=bqclient, table_schema=table.schema + ) + table._properties["location"] = session._location + + # No exception raised because there are columns to use as the index. + df = session.read_gbq( + "my-project.my_dataset.my_table", index_col=("idx_1", "idx_2") + ) + + # There should be no analytic operators to prevent row filtering pushdown. + assert "OVER" not in df.sql + assert tuple(df.index.names) == ("idx_1", "idx_2") + + +def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_primary_key(): """If a primary key is set on the table, we use that as the index column by default, no error should be raised in this case. 
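[Editor's note: the following usage sketch is not part of the patch. Based on the behavior exercised by the tests above, it illustrates the options a caller has for reading a clustered or partitioned table without hitting NoDefaultIndexError. The table and column names (`my-project.my_dataset.my_table`, `idx_1`, `col2`) are hypothetical.]

import bigframes.enums
import bigframes.pandas as bpd

# Option 1: name the index column(s), so no default index needs to be generated.
df = bpd.read_gbq("my-project.my_dataset.my_table", index_col=["idx_1"])

# Option 2: explicitly opt in to the sequential default index.
df = bpd.read_gbq(
    "my-project.my_dataset.my_table",
    index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
)

# Option 3: pass row filters; per the tests above, a filtered read is allowed to
# fall back to the default index.
df = bpd.read_gbq(
    "my-project.my_dataset.my_table",
    filters=[("col2", "<", 123)],
)

[End of editor's note; the patch continues below.]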
@@ -147,40 +249,25 @@ def test_read_gbq_clustered_table_ok_default_index_with_primary_key(): assert tuple(df.index.names) == ("pk_1", "pk_2") -# def test_read_gbq_clustered_table_ok_default_index_with_filters(): -# bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) -# bqclient.project = "test-project" -# bqclient.get_table.side_effect = google.api_core.exceptions.NotFound( -# "table not found" -# ) -# session = resources.create_bigquery_session(bqclient=bqclient) -# -# with pytest.raises(google.api_core.exceptions.NotFound): -# session.read_gbq(not_found_table_id) -# -# -# def test_read_gbq_clustered_table_ok_default_index_with_force_true(): -# bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) -# bqclient.project = "test-project" -# bqclient.get_table.side_effect = google.api_core.exceptions.NotFound( -# "table not found" -# ) -# session = resources.create_bigquery_session(bqclient=bqclient) -# -# with pytest.raises(google.api_core.exceptions.NotFound): -# session.read_gbq(not_found_table_id) +def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_filters(): + table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") + table.clustering_fields = ["col1", "col2"] + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + session = resources.create_bigquery_session(bqclient=bqclient) + table._properties["location"] = session._location + # No exception raised because we set the option allowing the default indexes. + df = session.read_gbq( + "my-project.my_dataset.my_table", + filters=[("col2", "<", 123)], + ) -# def test_read_gbq_partitioned_table_raises_no_default_index(): -# bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) -# bqclient.project = "test-project" -# bqclient.get_table.side_effect = google.api_core.exceptions.NotFound( -# "table not found" -# ) -# session = resources.create_bigquery_session(bqclient=bqclient) -# -# with pytest.raises(google.api_core.exceptions.NotFound): -# session.read_gbq(not_found_table_id) + # We expect a window operation because we specificaly requested a sequential index. + generated_sql = df.sql.casefold() + assert "OVER".casefold() in generated_sql + assert "ROW_NUMBER()".casefold() in generated_sql @pytest.mark.parametrize( From 613e660293d825902ae54f37c6290b744a3f74fb Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 25 Apr 2024 22:40:16 +0000 Subject: [PATCH 11/24] parametrize tables with clustered and partitioned --- bigframes/session/__init__.py | 13 ++- tests/unit/session/test_session.py | 140 +++++++++++++++++------------ 2 files changed, 92 insertions(+), 61 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 227156b567..af885661cb 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1222,8 +1222,8 @@ def read_csv( f"{constants.FEEDBACK_LINK}" ) - # TODO(tswast): Looks like we can relax this 1 column restriction, - # but leaving it for now because I'm not sure why we have it. + # TODO(tswast): Looks like we can relax this 1 column restriction if + # we check the contents of an iterable are strings not integers. if ( # Empty tuples and None are both allowed and falsey index_col @@ -1234,6 +1234,15 @@ def read_csv( "BigQuery engine only supports a single column name for `index_col`, " f"got: {repr(index_col)}. 
{constants.FEEDBACK_LINK}" ) + index_col = typing.cast( + Union[ + None, + Sequence[str], # Falsey values + bigframes.enums.DefaultIndexKind, + str, + ], + index_col, + ) # None value for index_col cannot be passed to read_gbq if index_col is None: diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index c8bdc7fc5a..2b4c0b7a34 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import datetime import os import re @@ -28,6 +29,68 @@ from .. import resources +TABLE_REFERENCE = { + "projectId": "my-project", + "datasetId": "my_dataset", + "tableId": "my_table", +} +CLUSTERED_OR_PARTITIONED_TABLES = [ + pytest.param( + google.cloud.bigquery.Table.from_api_repr( + { + "tableReference": TABLE_REFERENCE, + "clustering": { + "fields": ["col1", "col2"], + }, + }, + ), + id="clustered", + ), + pytest.param( + google.cloud.bigquery.Table.from_api_repr( + { + "tableReference": TABLE_REFERENCE, + "rangePartitioning": { + "field": "col1", + "range": { + "start": 1, + "end": 100, + "interval": 1, + }, + }, + }, + ), + id="range-partitioned", + ), + pytest.param( + google.cloud.bigquery.Table.from_api_repr( + { + "tableReference": TABLE_REFERENCE, + "timePartitioning": { + "type": "MONTH", + "field": "col1", + }, + }, + ), + id="time-partitioned", + ), + pytest.param( + google.cloud.bigquery.Table.from_api_repr( + { + "tableReference": TABLE_REFERENCE, + "clustering": { + "fields": ["col1", "col2"], + }, + "timePartitioning": { + "type": "MONTH", + "field": "col1", + }, + }, + ), + id="time-partitioned-and-clustered", + ), +] + @pytest.mark.parametrize( ("engine",), @@ -75,7 +138,8 @@ def test_read_gbq_cached_table(): assert "1999-01-02T03:04:05.678901" in df.sql -def test_no_default_index_error_raised_by_read_gbq_clustered_table(): +@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) +def test_no_default_index_error_raised_by_read_gbq(table): """Because of the windowing operation to create a default index, row filters can't push down to the clustering column. @@ -84,8 +148,7 @@ def test_no_default_index_error_raised_by_read_gbq_clustered_table(): See internal issue 335727141. """ - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] + table = copy.deepcopy(table) bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" bqclient.get_table.return_value = table @@ -96,51 +159,10 @@ def test_no_default_index_error_raised_by_read_gbq_clustered_table(): session.read_gbq("my-project.my_dataset.my_table") -def test_no_default_index_error_raised_by_read_gbq_range_partitioned_table(): - """Because of the windowing operation to create a default index, row - filters can't push down to the clustering column. - - Raise an exception in this case so that the user is directed to supply a - unique index column or filter if possible. - - See internal issue 335727141. 
- """ - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.time_partitioning = google.cloud.bigquery.table.RangePartitioning( - field="col1" - ) - bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) - bqclient.project = "test-project" - bqclient.get_table.return_value = table - session = resources.create_bigquery_session(bqclient=bqclient) - table._properties["location"] = session._location - - with pytest.raises(bigframes.exceptions.NoDefaultIndexError): - session.read_gbq("my-project.my_dataset.my_table") - - -def test_no_default_index_error_raised_by_read_gbq_time_partitioned_table(): - """Because of the windowing operation to create a default index, row - filters can't push down to the clustering column. - - Raise an exception in this case so that the user is directed to supply a - unique index column or filter if possible. - - See internal issue 335727141. - """ - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.time_partitioning = google.cloud.bigquery.table.TimePartitioning(field="col1") - bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) - bqclient.project = "test-project" - bqclient.get_table.return_value = table - session = resources.create_bigquery_session(bqclient=bqclient) - table._properties["location"] = session._location - - with pytest.raises(bigframes.exceptions.NoDefaultIndexError): - session.read_gbq("my-project.my_dataset.my_table") - - -def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_sequential_int64(): +@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) +def test_no_default_index_error_not_raised_by_read_gbq_index_col_sequential_int64( + table, +): """Because of the windowing operation to create a default index, row filters can't push down to the clustering column. @@ -148,8 +170,7 @@ def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_sequentia See internal issue 335727141. """ - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] + table = copy.deepcopy(table) bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" bqclient.get_table.return_value = table @@ -177,12 +198,13 @@ def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_sequentia (123, 111), ), ) -def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_index_col( +@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) +def test_no_default_index_error_not_raised_by_read_gbq_index_col_columns( total_count, distinct_count, + table, ): - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] + table = copy.deepcopy(table) table.schema = ( google.cloud.bigquery.SchemaField("idx_1", "INT64"), google.cloud.bigquery.SchemaField("idx_2", "INT64"), @@ -211,14 +233,14 @@ def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_index_col assert tuple(df.index.names) == ("idx_1", "idx_2") -def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_primary_key(): +@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) +def test_no_default_index_error_not_raised_by_read_gbq_primary_key(table): """If a primary key is set on the table, we use that as the index column by default, no error should be raised in this case. See internal issue 335727141. 
""" - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] + table = copy.deepcopy(table) table.schema = ( google.cloud.bigquery.SchemaField("pk_1", "INT64"), google.cloud.bigquery.SchemaField("pk_2", "INT64"), @@ -249,9 +271,9 @@ def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_primary_k assert tuple(df.index.names) == ("pk_1", "pk_2") -def test_no_default_index_error_not_raised_by_read_gbq_clustered_table_filters(): - table = google.cloud.bigquery.Table("my-project.my_dataset.my_table") - table.clustering_fields = ["col1", "col2"] +@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) +def test_no_default_index_error_not_raised_by_read_gbq_filters(table): + table = copy.deepcopy(table) bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" bqclient.get_table.return_value = table From f437dcf0788c355595e8a3cd872532142789d158 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 26 Apr 2024 16:14:42 +0000 Subject: [PATCH 12/24] refactor: split `read_gbq_table` implementation into functions and move to separate module add todos --- bigframes/session/__init__.py | 97 ++---- .../_io/{bigquery.py => bigquery/__init__.py} | 74 +---- .../session/_io/bigquery/read_gbq_table.py | 309 ++++++++++++++++++ 3 files changed, 333 insertions(+), 147 deletions(-) rename bigframes/session/_io/{bigquery.py => bigquery/__init__.py} (74%) create mode 100644 bigframes/session/_io/bigquery/read_gbq_table.py diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 34047ff155..f25007e2a7 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -692,59 +692,6 @@ def read_gbq_table( use_cache=use_cache, ) - def _get_snapshot_sql_and_primary_key( - self, - table: google.cloud.bigquery.table.Table, - *, - api_name: str, - use_cache: bool = True, - ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: - """Create a read-only Ibis table expression representing a table. - - If we can get a total ordering from the table, such as via primary key - column(s), then return those too so that ordering generation can be - avoided. - """ - ( - snapshot_timestamp, - table, - ) = bigframes_io.get_snapshot_datetime_and_table_metadata( - self.bqclient, - table_ref=table.reference, - api_name=api_name, - cache=self._df_snapshot, - use_cache=use_cache, - ) - - if table.location.casefold() != self._location.casefold(): - raise ValueError( - f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" - ) - - # If there are primary keys defined, the query engine assumes these - # columns are unique, even if the constraint is not enforced. We make - # the same assumption and use these columns as the total ordering keys. - primary_keys = None - if ( - (table_constraints := getattr(table, "table_constraints", None)) is not None - and (primary_key := table_constraints.primary_key) is not None - # This will be False for either None or empty list. - # We want primary_keys = None if no primary keys are set. 
- and (columns := primary_key.columns) - ): - primary_keys = columns - - try: - table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table.reference, snapshot_timestamp) - ) - except google.api_core.exceptions.Forbidden as ex: - if "Drive credentials" in ex.message: - ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." - raise - - return table_expression, primary_keys - def _read_gbq_table( self, query: str, @@ -757,16 +704,39 @@ def _read_gbq_table( ) -> dataframe.DataFrame: import bigframes.dataframe as dataframe + # TODO TODO TODO + # * Validations on types / value ranges + # 0. Transform input types, e.g. index_col -> index_cols + # 1. Get table metadata (possibly cached) + # 2. Create ibis Table with time travel. + # * Validations based on value columns. + # * Validations based on index columns. + # 4. Create index. + # 5. Create ordering. + # TODO TODO TODO + if max_results and max_results <= 0: raise ValueError("`max_results` should be a positive number.") table_ref = bigquery.table.TableReference.from_string( query, default_project=self.bqclient.project ) +<<<<<<< HEAD + ( + snapshot_timestamp, + table, + ) = bigframes_io.get_snapshot_datetime_and_table_metadata( + self.bqclient, + table_ref=table_ref, + api_name=api_name, + cache=self._df_snapshot, + use_cache=use_cache, +======= table = self.bqclient.get_table(table_ref) (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key( table, api_name=api_name, use_cache=use_cache +>>>>>>> 729cf0a7 (add todos) ) total_ordering_cols = primary_keys @@ -862,27 +832,6 @@ def _read_gbq_table( df.sort_index() return df - def _check_index_uniqueness( - self, table: ibis_types.Table, index_cols: List[str] - ) -> bool: - distinct_table = table.select(*index_cols).distinct() - is_unique_sql = f"""WITH full_table AS ( - {self.ibis_client.compile(table)} - ), - distinct_table AS ( - {self.ibis_client.compile(distinct_table)} - ) - - SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, - (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` - """ - results, _ = self._start_query(is_unique_sql) - row = next(iter(results)) - - total_count = row["total_count"] - distinct_count = row["distinct_count"] - return total_count == distinct_count - def _read_bigquery_load_job( self, filepath_or_buffer: str | IO["bytes"], diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery/__init__.py similarity index 74% rename from bigframes/session/_io/bigquery.py rename to bigframes/session/_io/bigquery/__init__.py index 94576cfa12..2cd2d8ff9a 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-"""Private module: Helpers for I/O operations.""" +"""Private module: Helpers for BigQuery I/O operations.""" from __future__ import annotations @@ -23,7 +23,6 @@ import types from typing import Dict, Iterable, Optional, Sequence, Tuple, Union import uuid -import warnings import google.api_core.exceptions import google.cloud.bigquery as bigquery @@ -122,77 +121,6 @@ def table_ref_to_sql(table: bigquery.TableReference) -> str: return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" -def get_snapshot_datetime_and_table_metadata( - bqclient: bigquery.Client, - table_ref: bigquery.TableReference, - *, - api_name: str, - cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], - use_cache: bool = True, -) -> Tuple[datetime.datetime, bigquery.Table]: - cached_table = cache.get(table_ref) - if use_cache and cached_table is not None: - snapshot_timestamp, _ = cached_table - - # Cache hit could be unexpected. See internal issue 329545805. - # Raise a warning with more information about how to avoid the - # problems with the cache. - warnings.warn( - f"Reading cached table from {snapshot_timestamp} to avoid " - "incompatibilies with previous reads of this table. To read " - "the latest version, set `use_cache=False` or close the " - "current session with Session.close() or " - "bigframes.pandas.close_session().", - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - # -> get_snapshot_datetime_and_table_metadata - stacklevel=7, - ) - return cached_table - - # TODO(swast): It's possible that the table metadata is changed between now - # and when we run the CURRENT_TIMESTAMP() query to see when we can time - # travel to. Find a way to fetch the table metadata and BQ's current time - # atomically. - table = bqclient.get_table(table_ref) - - # TODO(b/336521938): Refactor to make sure we set the "bigframes-api" - # whereever we execute a query. - job_config = bigquery.QueryJobConfig() - job_config.labels["bigframes-api"] = api_name - snapshot_timestamp = list( - bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - cached_table = (snapshot_timestamp, table) - cache[table_ref] = cached_table - return cached_table - - -def create_snapshot_sql( - table_ref: bigquery.TableReference, current_timestamp: datetime.datetime -) -> str: - """Query a table via 'time travel' for consistent reads.""" - # If we have an anonymous query results table, it can't be modified and - # there isn't any BigQuery time travel. - if table_ref.dataset_id.startswith("_"): - return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" - - return textwrap.dedent( - f""" - SELECT * - FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` - FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) - """ - ) - - def create_temp_table( bqclient: bigquery.Client, dataset: bigquery.DatasetReference, diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py new file mode 100644 index 0000000000..1d4eb7dece --- /dev/null +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -0,0 +1,309 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame. +""" + +from __future__ import annotations + +import datetime +import itertools +import os +import textwrap +import types +from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union +import uuid +import warnings + +import google.api_core.exceptions +import google.cloud.bigquery as bigquery +import ibis +import ibis.expr.types as ibis_types + +import bigframes +import bigframes._config.bigquery_options as bigquery_options +import bigframes.clients +import bigframes.constants as constants +from bigframes.core import log_adapter +import bigframes.core as core +import bigframes.core.blocks as blocks +import bigframes.core.compile +import bigframes.core.guid as guid +import bigframes.core.nodes as nodes +from bigframes.core.ordering import IntegerEncoding +import bigframes.core.ordering as order +import bigframes.core.tree_properties as traversals +import bigframes.core.tree_properties as tree_properties +import bigframes.core.utils as utils +import bigframes.dtypes +import bigframes.formatting_helpers as formatting_helpers +from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf +from bigframes.functions.remote_function import remote_function as bigframes_rf +import bigframes.session._io.bigquery as bigframes_io +import bigframes.session._io.bigquery.read_gbq_table +import bigframes.session.clients +import bigframes.version + +# Avoid circular imports. +if typing.TYPE_CHECKING: + import bigframes.dataframe as dataframe + + +def _check_index_uniqueness( + self, table: ibis_types.Table, index_cols: List[str] +) -> bool: + distinct_table = table.select(*index_cols).distinct() + is_unique_sql = f"""WITH full_table AS ( + {self.ibis_client.compile(table)} + ), + distinct_table AS ( + {self.ibis_client.compile(distinct_table)} + ) + + SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, + (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` + """ + results, _ = self._start_query(is_unique_sql) + row = next(iter(results)) + + total_count = row["total_count"] + distinct_count = row["distinct_count"] + return total_count == distinct_count + + +def create_snapshot_sql( + table_ref: bigquery.TableReference, current_timestamp: datetime.datetime +) -> str: + """Query a table via 'time travel' for consistent reads.""" + # If we have an anonymous query results table, it can't be modified and + # there isn't any BigQuery time travel. + if table_ref.dataset_id.startswith("_"): + return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" + + return textwrap.dedent( + f""" + SELECT * + FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` + FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) + """ + ) + + +def _get_snapshot_sql_and_primary_key( + self, + table: google.cloud.bigquery.table.Table, + *, + api_name: str, + use_cache: bool = True, +) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: + """Create a read-only Ibis table expression representing a table. 
+ + If we can get a total ordering from the table, such as via primary key + column(s), then return those too so that ordering generation can be + avoided. + """ + ( + snapshot_timestamp, + table, + ) = bigframes_io.get_snapshot_datetime_and_table_metadata( + self.bqclient, + table_ref=table.reference, + api_name=api_name, + cache=self._df_snapshot, + use_cache=use_cache, + ) + + if table.location.casefold() != self._location.casefold(): + raise ValueError( + f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" + ) + + # If there are primary keys defined, the query engine assumes these + # columns are unique, even if the constraint is not enforced. We make + # the same assumption and use these columns as the total ordering keys. + primary_keys = None + if ( + (table_constraints := getattr(table, "table_constraints", None)) is not None + and (primary_key := table_constraints.primary_key) is not None + # This will be False for either None or empty list. + # We want primary_keys = None if no primary keys are set. + and (columns := primary_key.columns) + ): + primary_keys = columns + + try: + table_expression = self.ibis_client.sql( + bigframes_io.create_snapshot_sql(table.reference, snapshot_timestamp) + ) + except google.api_core.exceptions.Forbidden as ex: + if "Drive credentials" in ex.message: + ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." + raise + + return table_expression, primary_keys + + +def get_index_and_maybe_total_ordering( + table: bigquery.table.Table, +): + """ + If we can get a total ordering from the table, such as via primary key + column(s), then return those too so that ordering generation can be + avoided. + """ + # If there are primary keys defined, the query engine assumes these + # columns are unique, even if the constraint is not enforced. We make + # the same assumption and use these columns as the total ordering keys. + primary_keys = None + if ( + (table_constraints := getattr(table, "table_constraints", None)) is not None + and (primary_key := table_constraints.primary_key) is not None + # This will be False for either None or empty list. + # We want primary_keys = None if no primary keys are set. + and (columns := primary_key.columns) + ): + primary_keys = columns + + total_ordering_cols = primary_keys + + # TODO: warn if partitioned and/or clustered except if: + # primary_keys, index_col, or filters + # Except it looks like filters goes through the query path? + + if not index_col and primary_keys is not None: + index_col = primary_keys + + if isinstance(index_col, str): + index_cols = [index_col] + else: + index_cols = list(index_col) + + return index_cols, total_ordering_cols + + +def get_time_travel_datetime_and_table_metadata( + bqclient: bigquery.Client, + table_ref: bigquery.TableReference, + *, + api_name: str, + cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], + use_cache: bool = True, +) -> Tuple[datetime.datetime, bigquery.Table]: + cached_table = cache.get(table_ref) + if use_cache and cached_table is not None: + snapshot_timestamp, _ = cached_table + + # Cache hit could be unexpected. See internal issue 329545805. + # Raise a warning with more information about how to avoid the + # problems with the cache. + warnings.warn( + f"Reading cached table from {snapshot_timestamp} to avoid " + "incompatibilies with previous reads of this table. 
To read " + "the latest version, set `use_cache=False` or close the " + "current session with Session.close() or " + "bigframes.pandas.close_session().", + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + stacklevel=7, + ) + return cached_table + + # TODO(swast): It's possible that the table metadata is changed between now + # and when we run the CURRENT_TIMESTAMP() query to see when we can time + # travel to. Find a way to fetch the table metadata and BQ's current time + # atomically. + table = bqclient.get_table(table_ref) + + # TODO(b/336521938): Refactor to make sure we set the "bigframes-api" + # whereever we execute a query. + job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + snapshot_timestamp = list( + bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + cached_table = (snapshot_timestamp, table) + cache[table_ref] = cached_table + return cached_table + + +def to_ibis_table_with_time_travel( + ibis_client: ibis.Backend, + table_ref: bigquery.table.TableReference, + snapshot_timestamp: datetime.datetime, +) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: + """Create a read-only Ibis table expression representing a table.""" + try: + table_expression = ibis_client.sql( + create_snapshot_sql(table_ref, snapshot_timestamp) + ) + except google.api_core.exceptions.Forbidden as ex: + if "Drive credentials" in ex.message: + ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." + raise + + return table_expression + + +def to_array_value_with_total_ordering( + self, + table: ibis_types.Table, +) -> core.ArrayValue: + # Since this might also be used as the index, don't use the default + # "ordering ID" name. 
+ ordering_hash_part = guid.generate_guid("bigframes_ordering_") + ordering_rand_part = guid.generate_guid("bigframes_ordering_") + + # All inputs into hash must be non-null or resulting hash will be null + str_values = list( + map(lambda col: _convert_to_nonnull_string(table[col]), table.columns) + ) + full_row_str = ( + str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0] + ) + full_row_hash = full_row_str.hash().name(ordering_hash_part) + # Used to disambiguate between identical rows (which will have identical hash) + random_value = ibis.random().name(ordering_rand_part) + + original_column_ids = table.columns + table_with_ordering = table.select( + itertools.chain(original_column_ids, [full_row_hash, random_value]) + ) + + ordering_ref1 = order.ascending_over(ordering_hash_part) + ordering_ref2 = order.ascending_over(ordering_rand_part) + ordering = order.ExpressionOrdering( + ordering_value_columns=(ordering_ref1, ordering_ref2), + total_ordering_columns=frozenset([ordering_hash_part, ordering_rand_part]), + ) + columns = [table_with_ordering[col] for col in original_column_ids] + hidden_columns = [ + table_with_ordering[ordering_hash_part], + table_with_ordering[ordering_rand_part], + ] + return core.ArrayValue.from_ibis( + self, + table_with_ordering, + columns, + hidden_ordering_columns=hidden_columns, + ordering=ordering, + ) From 0090dc0032c211ae0082e80d9173887cf3859f08 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 29 Apr 2024 17:22:11 +0000 Subject: [PATCH 13/24] refactor progress --- bigframes/session/__init__.py | 81 ++++++---- .../session/_io/bigquery/read_gbq_table.py | 151 ++++++++++-------- tests/unit/session/test_io_bigquery.py | 16 -- tests/unit/session/test_read_gbq_table.py | 37 +++++ 4 files changed, 174 insertions(+), 111 deletions(-) create mode 100644 tests/unit/session/test_read_gbq_table.py diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f25007e2a7..8853d24969 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -92,6 +92,7 @@ from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf from bigframes.functions.remote_function import remote_function as bigframes_rf import bigframes.session._io.bigquery as bigframes_io +import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table import bigframes.session.clients import bigframes.version @@ -704,44 +705,60 @@ def _read_gbq_table( ) -> dataframe.DataFrame: import bigframes.dataframe as dataframe - # TODO TODO TODO - # * Validations on types / value ranges - # 0. Transform input types, e.g. index_col -> index_cols - # 1. Get table metadata (possibly cached) - # 2. Create ibis Table with time travel. - # * Validations based on value columns. - # * Validations based on index columns. - # 4. Create index. - # 5. Create ordering. - # TODO TODO TODO + # ----------------------------- + # Validate and transform inputs + # ----------------------------- if max_results and max_results <= 0: - raise ValueError("`max_results` should be a positive number.") + raise ValueError( + f"`max_results` should be a positive number, got {max_results}." + ) + + # Transform index_col -> index_cols so we have a variable that is + # always a list of column names (possibly empty). 
+ if isinstance(index_col, str): + index_cols: List[str] = [index_col] + else: + index_cols = list(index_col) table_ref = bigquery.table.TableReference.from_string( query, default_project=self.bqclient.project ) -<<<<<<< HEAD - ( - snapshot_timestamp, - table, - ) = bigframes_io.get_snapshot_datetime_and_table_metadata( + + # --------------------------------- + # Fetch table metadata and validate + # --------------------------------- + + # TODO TODO TODO + # * Validations based on index columns. + # 4. Create index. + # 5. Create ordering. + # TODO TODO TODO + + (time_travel_timestamp, table,) = bf_read_gbq_table.get_table_metadata( self.bqclient, table_ref=table_ref, api_name=api_name, cache=self._df_snapshot, use_cache=use_cache, -======= - - table = self.bqclient.get_table(table_ref) - (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key( - table, api_name=api_name, use_cache=use_cache ->>>>>>> 729cf0a7 (add todos) ) - total_ordering_cols = primary_keys - if not index_col and primary_keys is not None: - index_col = primary_keys + if table.location.casefold() != self._location.casefold(): + raise ValueError( + f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" + ) + + # ----------------------------------------- + # Create Ibis table expression and validate + # ----------------------------------------- + + # Use a time travel to make sure the DataFrame is deterministic, even + # if the underlying table changes. + table_expression = bf_read_gbq_table.get_ibis_time_travel_table( + self.ibis_client, + table_ref, + time_travel_timestamp, + ) for key in columns: if key not in table_expression.columns: @@ -749,10 +766,11 @@ def _read_gbq_table( f"Column '{key}' of `columns` not found in this table." ) - if isinstance(index_col, str): - index_cols: List[str] = [index_col] - else: - index_cols = list(index_col) + # ------------------------- + # Create index and validate + # ------------------------- + + # TODO: Get primary keys from the table. for key in index_cols: if key not in table_expression.columns: @@ -760,6 +778,11 @@ def _read_gbq_table( f"Column `{key}` of `index_col` not found in this table." 
) + total_ordering_cols = primary_keys + + if not index_col and primary_keys is not None: + index_col = primary_keys + if columns: table_expression = table_expression.select([*index_cols, *columns]) diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 1d4eb7dece..8d69c9465e 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -23,6 +23,7 @@ import os import textwrap import types +import typing from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union import uuid import warnings @@ -30,6 +31,7 @@ import google.api_core.exceptions import google.cloud.bigquery as bigquery import ibis +import ibis.backends import ibis.expr.types as ibis_types import bigframes @@ -61,30 +63,63 @@ import bigframes.dataframe as dataframe -def _check_index_uniqueness( - self, table: ibis_types.Table, index_cols: List[str] -) -> bool: - distinct_table = table.select(*index_cols).distinct() - is_unique_sql = f"""WITH full_table AS ( - {self.ibis_client.compile(table)} - ), - distinct_table AS ( - {self.ibis_client.compile(distinct_table)} - ) +def get_table_metadata( + bqclient: bigquery.Client, + table_ref: google.cloud.bigquery.table.TableReference, + *, + api_name: str, + cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], + use_cache: bool = True, +) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]: + """Get the table metadata, either from cache or via REST API.""" - SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, - (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` - """ - results, _ = self._start_query(is_unique_sql) - row = next(iter(results)) + cached_table = cache.get(table_ref) + if use_cache and cached_table is not None: + snapshot_timestamp, _ = cached_table - total_count = row["total_count"] - distinct_count = row["distinct_count"] - return total_count == distinct_count + # Cache hit could be unexpected. See internal issue 329545805. + # Raise a warning with more information about how to avoid the + # problems with the cache. + warnings.warn( + f"Reading cached table from {snapshot_timestamp} to avoid " + "incompatibilies with previous reads of this table. To read " + "the latest version, set `use_cache=False` or close the " + "current session with Session.close() or " + "bigframes.pandas.close_session().", + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + stacklevel=7, + ) + return cached_table + + # TODO(swast): It's possible that the table metadata is changed between now + # and when we run the CURRENT_TIMESTAMP() query to see when we can time + # travel to. Find a way to fetch the table metadata and BQ's current time + # atomically. + table = bqclient.get_table(table_ref) + + # TODO(b/336521938): Refactor to make sure we set the "bigframes-api" + # whereever we execute a query. 
+ job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + snapshot_timestamp = list( + bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + cached_table = (snapshot_timestamp, table) + cache[table_ref] = cached_table + return cached_table -def create_snapshot_sql( - table_ref: bigquery.TableReference, current_timestamp: datetime.datetime +def _create_time_travel_sql( + table_ref: bigquery.TableReference, time_travel_timestamp: datetime.datetime ) -> str: """Query a table via 'time travel' for consistent reads.""" # If we have an anonymous query results table, it can't be modified and @@ -96,63 +131,47 @@ def create_snapshot_sql( f""" SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` - FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) + FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())}) """ ) -def _get_snapshot_sql_and_primary_key( - self, - table: google.cloud.bigquery.table.Table, - *, - api_name: str, - use_cache: bool = True, -) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: - """Create a read-only Ibis table expression representing a table. - - If we can get a total ordering from the table, such as via primary key - column(s), then return those too so that ordering generation can be - avoided. - """ - ( - snapshot_timestamp, - table, - ) = bigframes_io.get_snapshot_datetime_and_table_metadata( - self.bqclient, - table_ref=table.reference, - api_name=api_name, - cache=self._df_snapshot, - use_cache=use_cache, - ) - - if table.location.casefold() != self._location.casefold(): - raise ValueError( - f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" - ) - - # If there are primary keys defined, the query engine assumes these - # columns are unique, even if the constraint is not enforced. We make - # the same assumption and use these columns as the total ordering keys. - primary_keys = None - if ( - (table_constraints := getattr(table, "table_constraints", None)) is not None - and (primary_key := table_constraints.primary_key) is not None - # This will be False for either None or empty list. - # We want primary_keys = None if no primary keys are set. - and (columns := primary_key.columns) - ): - primary_keys = columns - +def get_ibis_time_travel_table( + ibis_client: ibis.BaseBackend, + table_ref: bigquery.TableReference, + time_travel_timestamp: datetime.datetime, +) -> ibis_types.Table: try: - table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table.reference, snapshot_timestamp) + return ibis_client.sql( + bigframes_io.create_snapshot_sql(table_ref, time_travel_timestamp) ) except google.api_core.exceptions.Forbidden as ex: + # Ibis does a dry run to get the types of the columns from the SQL. if "Drive credentials" in ex.message: ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." 
raise - return table_expression, primary_keys + +def _check_index_uniqueness( + self, table: ibis_types.Table, index_cols: List[str] +) -> bool: + distinct_table = table.select(*index_cols).distinct() + is_unique_sql = f"""WITH full_table AS ( + {self.ibis_client.compile(table)} + ), + distinct_table AS ( + {self.ibis_client.compile(distinct_table)} + ) + + SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, + (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` + """ + results, _ = self._start_query(is_unique_sql) + row = next(iter(results)) + + total_count = row["total_count"] + distinct_count = row["distinct_count"] + return total_count == distinct_count def get_index_and_maybe_total_ordering( diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 406de2b88e..eed1acb5a3 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -137,22 +137,6 @@ def test_create_job_configs_labels_length_limit_met(): assert "source" in labels.keys() -def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): - table_ref = bigquery.TableReference.from_string( - "my-test-project._e8166e0cdb.anonbb92cd" - ) - - sql = bigframes.session._io.bigquery.create_snapshot_sql( - table_ref, datetime.datetime.now(datetime.timezone.utc) - ) - - # Anonymous query results tables don't support time travel. - assert "SYSTEM_TIME" not in sql - - # Need fully-qualified table name. - assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql - - def test_create_temp_table_default_expiration(): """Make sure the created table has an expiration.""" bqclient = mock.create_autospec(bigquery.Client) diff --git a/tests/unit/session/test_read_gbq_table.py b/tests/unit/session/test_read_gbq_table.py new file mode 100644 index 0000000000..1d09769aec --- /dev/null +++ b/tests/unit/session/test_read_gbq_table.py @@ -0,0 +1,37 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for read_gbq_table helper functions.""" + +import datetime + +import google.cloud.bigquery as bigquery + +import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table + + +def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): + table_ref = bigquery.TableReference.from_string( + "my-test-project._e8166e0cdb.anonbb92cd" + ) + + sql = bf_read_gbq_table._create_time_travel_sql( + table_ref, datetime.datetime.now(datetime.timezone.utc) + ) + + # Anonymous query results tables don't support time travel. + assert "SYSTEM_TIME" not in sql + + # Need fully-qualified table name. 
+ assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql From 850db7ab79147d67c565a12cb2a0427be750de18 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 29 Apr 2024 19:32:36 +0000 Subject: [PATCH 14/24] add index_cols function --- bigframes/session/__init__.py | 12 ++--- .../session/_io/bigquery/read_gbq_table.py | 54 ++++++++++++++----- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 8853d24969..c4ad62f711 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -714,13 +714,6 @@ def _read_gbq_table( f"`max_results` should be a positive number, got {max_results}." ) - # Transform index_col -> index_cols so we have a variable that is - # always a list of column names (possibly empty). - if isinstance(index_col, str): - index_cols: List[str] = [index_col] - else: - index_cols = list(index_col) - table_ref = bigquery.table.TableReference.from_string( query, default_project=self.bqclient.project ) @@ -770,6 +763,11 @@ def _read_gbq_table( # Create index and validate # ------------------------- + index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness( + table, + index_col, + api_name=api_name, + ) # TODO: Get primary keys from the table. for key in index_cols: diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 8d69c9465e..2e4138c95a 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -143,7 +143,7 @@ def get_ibis_time_travel_table( ) -> ibis_types.Table: try: return ibis_client.sql( - bigframes_io.create_snapshot_sql(table_ref, time_travel_timestamp) + _create_time_travel_sql(table_ref, time_travel_timestamp) ) except google.api_core.exceptions.Forbidden as ex: # Ibis does a dry run to get the types of the columns from the SQL. @@ -174,18 +174,12 @@ def _check_index_uniqueness( return total_count == distinct_count -def get_index_and_maybe_total_ordering( +def _get_primary_keys( table: bigquery.table.Table, -): - """ - If we can get a total ordering from the table, such as via primary key - column(s), then return those too so that ordering generation can be - avoided. - """ - # If there are primary keys defined, the query engine assumes these - # columns are unique, even if the constraint is not enforced. We make - # the same assumption and use these columns as the total ordering keys. - primary_keys = None +) -> List[str]: + """Get primary keys from table if they are set.""" + + primary_keys: List[str] = [] if ( (table_constraints := getattr(table, "table_constraints", None)) is not None and (primary_key := table_constraints.primary_key) is not None @@ -193,7 +187,41 @@ def get_index_and_maybe_total_ordering( # We want primary_keys = None if no primary keys are set. and (columns := primary_key.columns) ): - primary_keys = columns + primary_keys = columns if columns is not None else [] + + return primary_keys + + +def get_index_cols_and_uniqueness( + table: bigquery.table.Table, + index_col: Iterable[str] | str = (), +) -> Tuple[List[str], bool]: + """ + If we can get a total ordering from the table, such as via primary key + column(s), then return those too so that ordering generation can be + avoided. + """ + + # Transform index_col -> index_cols so we have a variable that is + # always a list of column names (possibly empty). 
+ if isinstance(index_col, str): + index_cols: List[str] = [index_col] + else: + index_cols = list(index_col) + + # If the isn't an index selected, use the primary keys of the table as the + # index. If there are no primary keys, we'll return an empty list. + if len(index_cols) == 0: + index_cols = _get_primary_keys(table) + + # If there are primary keys defined, the query engine assumes these + # columns are unique, even if the constraint is not enforced. We make + # the same assumption and use these columns as the total ordering keys. + is_index_unique = True + else: + is_index_unique = _check_index_uniqueness(table, index_cols, api_name) + + return index_cols, is_index_unique total_ordering_cols = primary_keys From ab98d4abbe0315915101cb7e98a66405a3a5c1b9 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 29 Apr 2024 22:29:48 +0000 Subject: [PATCH 15/24] maybe ready for review --- bigframes/session/__init__.py | 182 ++++-------------- .../session/_io/bigquery/read_gbq_table.py | 168 +++++++++------- 2 files changed, 133 insertions(+), 217 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index c4ad62f711..0fe96d0bab 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -18,7 +18,6 @@ import copy import datetime -import itertools import logging import os import re @@ -43,7 +42,6 @@ # Even though the ibis.backends.bigquery import is unused, it's needed # to register new and replacement ops with the Ibis BigQuery backend. import bigframes_vendored.ibis.backends.bigquery # noqa -import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq import bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet import bigframes_vendored.pandas.io.parsers.readers as third_party_pandas_readers @@ -62,7 +60,6 @@ import google.cloud.storage as storage # type: ignore import ibis import ibis.backends.bigquery as ibis_bigquery -import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types import numpy as np import pandas @@ -80,7 +77,6 @@ import bigframes.core as core import bigframes.core.blocks as blocks import bigframes.core.compile -import bigframes.core.guid as guid import bigframes.core.nodes as nodes from bigframes.core.ordering import IntegerEncoding import bigframes.core.ordering as order @@ -705,9 +701,9 @@ def _read_gbq_table( ) -> dataframe.DataFrame: import bigframes.dataframe as dataframe - # ----------------------------- - # Validate and transform inputs - # ----------------------------- + # --------------------------------- + # Validate and transform parameters + # --------------------------------- if max_results and max_results <= 0: raise ValueError( @@ -759,16 +755,22 @@ def _read_gbq_table( f"Column '{key}' of `columns` not found in this table." ) - # ------------------------- - # Create index and validate - # ------------------------- + # --------------------------------------- + # Create a non-default index and validate + # --------------------------------------- + + # TODO(b/337925142): Move index_cols creation to before we create the + # Ibis table expression so we don't have a "SELECT *" subquery in the + # query that checks for index uniqueness. 
index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness( - table, - index_col, + bqclient=self.bqclient, + ibis_client=self.ibis_client, + table=table, + table_expression=table_expression, + index_col=index_col, api_name=api_name, ) - # TODO: Get primary keys from the table. for key in index_cols: if key not in table_expression.columns: @@ -776,67 +778,33 @@ def _read_gbq_table( f"Column `{key}` of `index_col` not found in this table." ) - total_ordering_cols = primary_keys - - if not index_col and primary_keys is not None: - index_col = primary_keys - + # TODO(b/337925142): We should push down column filters when we get the time + # travel table to avoid "SELECT *" subqueries. if columns: table_expression = table_expression.select([*index_cols, *columns]) - # If the index is unique and sortable, then we don't need to generate - # an ordering column. - ordering = None - if total_ordering_cols is not None: - # Note: currently, a table has a total ordering only when the - # primary key(s) are set on a table. The query engine assumes such - # columns are unique, even if not enforced. - ordering = order.ExpressionOrdering( - ordering_value_columns=tuple( - order.ascending_over(column_id) for column_id in total_ordering_cols - ), - total_ordering_columns=frozenset(total_ordering_cols), - ) - column_values = [table_expression[col] for col in table_expression.columns] - array_value = core.ArrayValue.from_ibis( - self, - table_expression, - columns=column_values, - hidden_ordering_columns=[], - ordering=ordering, - ) + # ---------------------------- + # Create ordering and validate + # ---------------------------- - elif len(index_cols) != 0: - # We have index columns, lets see if those are actually total_order_columns - ordering = order.ExpressionOrdering( - ordering_value_columns=tuple( - [order.ascending_over(column_id) for column_id in index_cols] - ), - total_ordering_columns=frozenset(index_cols), - ) - is_total_ordering = self._check_index_uniqueness( - table_expression, index_cols + if is_index_unique: + array_value = bf_read_gbq_table.to_array_value_with_total_ordering( + session=self, + table_expression=table_expression, + total_ordering_cols=index_cols, ) - if is_total_ordering: - column_values = [ - table_expression[col] for col in table_expression.columns - ] - array_value = core.ArrayValue.from_ibis( - self, - table_expression, - columns=column_values, - hidden_ordering_columns=[], - ordering=ordering, - ) - else: - array_value = self._create_total_ordering( - table_expression, table_rows=table.num_rows - ) else: - array_value = self._create_total_ordering( - table_expression, table_rows=table.num_rows + # Note: Even though we're adding a default ordering here, that's + # just so we have a deterministic total ordering. If the user + # specified a non-unique index, we still sort by that later. 
+ array_value = bf_read_gbq_table.to_array_value_with_default_ordering( + session=self, table=table_expression, table_rows=table.num_rows ) + # ---------------------------------------------------- + # Create Block & default index if len(index_cols) == 0 + # ---------------------------------------------------- + value_columns = [col for col in array_value.column_ids if col not in index_cols] block = blocks.Block( array_value, @@ -1432,66 +1400,6 @@ def _create_empty_temp_table( ) return bigquery.TableReference.from_string(table) - def _create_total_ordering( - self, - table: ibis_types.Table, - table_rows: Optional[int], - ) -> core.ArrayValue: - # Since this might also be used as the index, don't use the default - # "ordering ID" name. - - # For small tables, 64 bits is enough to avoid collisions, 128 bits will never ever collide no matter what - # Assume table is large if table row count is unknown - use_double_hash = ( - (table_rows is None) or (table_rows == 0) or (table_rows > 100000) - ) - - ordering_hash_part = guid.generate_guid("bigframes_ordering_") - ordering_hash_part2 = guid.generate_guid("bigframes_ordering_") - ordering_rand_part = guid.generate_guid("bigframes_ordering_") - - # All inputs into hash must be non-null or resulting hash will be null - str_values = list( - map(lambda col: _convert_to_nonnull_string(table[col]), table.columns) - ) - full_row_str = ( - str_values[0].concat(*str_values[1:]) - if len(str_values) > 1 - else str_values[0] - ) - full_row_hash = full_row_str.hash().name(ordering_hash_part) - # By modifying value slightly, we get another hash uncorrelated with the first - full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2) - # Used to disambiguate between identical rows (which will have identical hash) - random_value = ibis.random().name(ordering_rand_part) - - order_values = ( - [full_row_hash, full_row_hash_p2, random_value] - if use_double_hash - else [full_row_hash, random_value] - ) - - original_column_ids = table.columns - table_with_ordering = table.select( - itertools.chain(original_column_ids, order_values) - ) - - ordering = order.ExpressionOrdering( - ordering_value_columns=tuple( - order.ascending_over(col.get_name()) for col in order_values - ), - total_ordering_columns=frozenset(col.get_name() for col in order_values), - ) - columns = [table_with_ordering[col] for col in original_column_ids] - hidden_columns = [table_with_ordering[col.get_name()] for col in order_values] - return core.ArrayValue.from_ibis( - self, - table_with_ordering, - columns, - hidden_ordering_columns=hidden_columns, - ordering=ordering, - ) - def _ibis_to_temp_table( self, table: ibis_types.Table, @@ -2025,28 +1933,6 @@ def _can_cluster_bq(field: bigquery.SchemaField): ) -def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue: - col_type = column.type() - if ( - col_type.is_numeric() - or col_type.is_boolean() - or col_type.is_binary() - or col_type.is_temporal() - ): - result = column.cast(ibis_dtypes.String(nullable=True)) - elif col_type.is_geospatial(): - result = typing.cast(ibis_types.GeoSpatialColumn, column).as_text() - elif col_type.is_string(): - result = column - else: - # TO_JSON_STRING works with all data types, but isn't the most efficient - # Needed for JSON, STRUCT and ARRAY datatypes - result = vendored_ibis_ops.ToJsonString(column).to_expr() # type: ignore - # Escape backslashes and use backslash as delineator - escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # 
type: ignore - return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped) - - def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict: """ For backwards-compatibility, convert any previously client-side only diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 2e4138c95a..3235ca92e5 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -20,47 +20,55 @@ import datetime import itertools -import os import textwrap -import types import typing -from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union -import uuid +from typing import Dict, Iterable, List, Optional, Tuple import warnings +import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops import google.api_core.exceptions import google.cloud.bigquery as bigquery import ibis import ibis.backends +import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types import bigframes -import bigframes._config.bigquery_options as bigquery_options import bigframes.clients -import bigframes.constants as constants -from bigframes.core import log_adapter import bigframes.core as core -import bigframes.core.blocks as blocks import bigframes.core.compile import bigframes.core.guid as guid -import bigframes.core.nodes as nodes -from bigframes.core.ordering import IntegerEncoding import bigframes.core.ordering as order -import bigframes.core.tree_properties as traversals -import bigframes.core.tree_properties as tree_properties -import bigframes.core.utils as utils import bigframes.dtypes -import bigframes.formatting_helpers as formatting_helpers -from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf -from bigframes.functions.remote_function import remote_function as bigframes_rf -import bigframes.session._io.bigquery as bigframes_io import bigframes.session._io.bigquery.read_gbq_table import bigframes.session.clients import bigframes.version # Avoid circular imports. 
if typing.TYPE_CHECKING: - import bigframes.dataframe as dataframe + import bigframes.session + + +def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue: + col_type = column.type() + if ( + col_type.is_numeric() + or col_type.is_boolean() + or col_type.is_binary() + or col_type.is_temporal() + ): + result = column.cast(ibis_dtypes.String(nullable=True)) + elif col_type.is_geospatial(): + result = typing.cast(ibis_types.GeoSpatialColumn, column).as_text() + elif col_type.is_string(): + result = column + else: + # TO_JSON_STRING works with all data types, but isn't the most efficient + # Needed for JSON, STRUCT and ARRAY datatypes + result = vendored_ibis_ops.ToJsonString(column).to_expr() # type: ignore + # Escape backslashes and use backslash as delineator + escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore + return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped) def get_table_metadata( @@ -153,20 +161,26 @@ def get_ibis_time_travel_table( def _check_index_uniqueness( - self, table: ibis_types.Table, index_cols: List[str] + bqclient: bigquery.Client, + ibis_client: ibis.BaseBackend, + table: ibis_types.Table, + index_cols: List[str], + api_name: str, ) -> bool: distinct_table = table.select(*index_cols).distinct() is_unique_sql = f"""WITH full_table AS ( - {self.ibis_client.compile(table)} + {ibis_client.compile(table)} ), distinct_table AS ( - {self.ibis_client.compile(distinct_table)} + {ibis_client.compile(distinct_table)} ) SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` """ - results, _ = self._start_query(is_unique_sql) + job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + results = bqclient.query_and_wait(is_unique_sql, job_config=job_config) row = next(iter(results)) total_count = row["total_count"] @@ -193,8 +207,12 @@ def _get_primary_keys( def get_index_cols_and_uniqueness( + bqclient: bigquery.Client, + ibis_client: ibis.BaseBackend, table: bigquery.table.Table, - index_col: Iterable[str] | str = (), + table_expression: ibis_types.Table, + index_col: Iterable[str] | str, + api_name: str, ) -> Tuple[List[str], bool]: """ If we can get a total ordering from the table, such as via primary key @@ -214,31 +232,26 @@ def get_index_cols_and_uniqueness( if len(index_cols) == 0: index_cols = _get_primary_keys(table) + # TODO(b/335727141): If table has clustering/partitioning, fail if + # index_cols is empty. + # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. - is_index_unique = True + is_index_unique = len(index_cols) != 0 else: - is_index_unique = _check_index_uniqueness(table, index_cols, api_name) + is_index_unique = _check_index_uniqueness( + bqclient=bqclient, + ibis_client=ibis_client, + # TODO(b/337925142): Avoid a "SELECT *" subquery here by using + # _create_time_travel_sql with just index_cols. + table=table_expression, + index_cols=index_cols, + api_name=api_name, + ) return index_cols, is_index_unique - total_ordering_cols = primary_keys - - # TODO: warn if partitioned and/or clustered except if: - # primary_keys, index_col, or filters - # Except it looks like filters goes through the query path? 
- - if not index_col and primary_keys is not None: - index_col = primary_keys - - if isinstance(index_col, str): - index_cols = [index_col] - else: - index_cols = list(index_col) - - return index_cols, total_ordering_cols - def get_time_travel_datetime_and_table_metadata( bqclient: bigquery.Client, @@ -293,31 +306,43 @@ def get_time_travel_datetime_and_table_metadata( return cached_table -def to_ibis_table_with_time_travel( - ibis_client: ibis.Backend, - table_ref: bigquery.table.TableReference, - snapshot_timestamp: datetime.datetime, -) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: - """Create a read-only Ibis table expression representing a table.""" - try: - table_expression = ibis_client.sql( - create_snapshot_sql(table_ref, snapshot_timestamp) - ) - except google.api_core.exceptions.Forbidden as ex: - if "Drive credentials" in ex.message: - ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." - raise - - return table_expression +def to_array_value_with_total_ordering( + session: bigframes.session.Session, + table_expression: ibis_types.Table, + total_ordering_cols: List[str], +) -> core.ArrayValue: + """Create an ArrayValue, assuming we already have a total ordering.""" + ordering = order.ExpressionOrdering( + ordering_value_columns=tuple( + order.ascending_over(column_id) for column_id in total_ordering_cols + ), + total_ordering_columns=frozenset(total_ordering_cols), + ) + column_values = [table_expression[col] for col in table_expression.columns] + return core.ArrayValue.from_ibis( + session, + table_expression, + columns=column_values, + hidden_ordering_columns=[], + ordering=ordering, + ) -def to_array_value_with_total_ordering( - self, +def to_array_value_with_default_ordering( + session: bigframes.session.Session, table: ibis_types.Table, + table_rows: Optional[int], ) -> core.ArrayValue: + """Create an ArrayValue with a deterministic default ordering.""" # Since this might also be used as the index, don't use the default # "ordering ID" name. 
+ + # For small tables, 64 bits is enough to avoid collisions, 128 bits will never ever collide no matter what + # Assume table is large if table row count is unknown + use_double_hash = (table_rows is None) or (table_rows == 0) or (table_rows > 100000) + ordering_hash_part = guid.generate_guid("bigframes_ordering_") + ordering_hash_part2 = guid.generate_guid("bigframes_ordering_") ordering_rand_part = guid.generate_guid("bigframes_ordering_") # All inputs into hash must be non-null or resulting hash will be null @@ -328,27 +353,32 @@ def to_array_value_with_total_ordering( str_values[0].concat(*str_values[1:]) if len(str_values) > 1 else str_values[0] ) full_row_hash = full_row_str.hash().name(ordering_hash_part) + # By modifying value slightly, we get another hash uncorrelated with the first + full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2) # Used to disambiguate between identical rows (which will have identical hash) random_value = ibis.random().name(ordering_rand_part) + order_values = ( + [full_row_hash, full_row_hash_p2, random_value] + if use_double_hash + else [full_row_hash, random_value] + ) + original_column_ids = table.columns table_with_ordering = table.select( - itertools.chain(original_column_ids, [full_row_hash, random_value]) + itertools.chain(original_column_ids, order_values) ) - ordering_ref1 = order.ascending_over(ordering_hash_part) - ordering_ref2 = order.ascending_over(ordering_rand_part) ordering = order.ExpressionOrdering( - ordering_value_columns=(ordering_ref1, ordering_ref2), - total_ordering_columns=frozenset([ordering_hash_part, ordering_rand_part]), + ordering_value_columns=tuple( + order.ascending_over(col.get_name()) for col in order_values + ), + total_ordering_columns=frozenset(col.get_name() for col in order_values), ) columns = [table_with_ordering[col] for col in original_column_ids] - hidden_columns = [ - table_with_ordering[ordering_hash_part], - table_with_ordering[ordering_rand_part], - ] + hidden_columns = [table_with_ordering[col.get_name()] for col in order_values] return core.ArrayValue.from_ibis( - self, + session, table_with_ordering, columns, hidden_ordering_columns=hidden_columns, From 05771317c30d377958617e76c9b09064c9f22fed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Tue, 30 Apr 2024 09:28:41 -0500 Subject: [PATCH 16/24] Update bigframes/session/__init__.py --- bigframes/session/__init__.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index e675eeaff1..0f5aa19592 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -718,12 +718,6 @@ def _read_gbq_table( # Fetch table metadata and validate # --------------------------------- - # TODO TODO TODO - # * Validations based on index columns. - # 4. Create index. - # 5. Create ordering. 
- # TODO TODO TODO - (time_travel_timestamp, table,) = bf_read_gbq_table.get_table_metadata( self.bqclient, table_ref=table_ref, From 204b2db0c790d2a31d770765c7b4a79c1f901890 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 30 Apr 2024 20:31:45 +0000 Subject: [PATCH 17/24] remove some todos --- bigframes/core/blocks.py | 10 +++++++++ bigframes/session/__init__.py | 21 +++---------------- .../session/_io/bigquery/read_gbq_table.py | 8 ++++++- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 4ff8a1836b..402581eb6f 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -116,10 +116,20 @@ def __init__( raise ValueError( f"'index_columns' (size {len(index_columns)}) and 'index_labels' (size {len(index_labels)}) must have equal length" ) + + # If no index columns are set, create one. + # + # Note: get_index_cols_and_uniqueness in + # bigframes/session/_io/bigquery/read_gbq_table.py depends on this + # being as sequential integer index column. If this default behavior + # ever changes, please also update get_index_cols_and_uniqueness so + # that users who explicitly request a sequential integer index can + # still get one. if len(index_columns) == 0: new_index_col_id = guid.generate_guid() expr = expr.promote_offsets(new_index_col_id) index_columns = [new_index_col_id] + self._index_columns = tuple(index_columns) # Index labels don't need complicated hierarchical access so can store as tuple self._index_labels = ( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index fe16eb2072..f217e001e8 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1153,8 +1153,9 @@ def read_csv( f"{constants.FEEDBACK_LINK}" ) - # TODO(tswast): Looks like we can relax this 1 column restriction if - # we check the contents of an iterable are strings not integers. + # TODO(b/338089659): Looks like we can relax this 1 column + # restriction if we check the contents of an iterable are strings + # not integers. if ( # Empty tuples and None are both allowed and falsey index_col @@ -1703,22 +1704,6 @@ def _prepare_copy_job_config(self) -> bigquery.CopyJobConfig: return job_config - def _start_query_and_wait( - self, - sql: str, - job_config: Optional[bigquery.job.QueryJobConfig] = None, - max_results: Optional[int] = None, - ) -> bigquery.table.RowIterator: - """ - Starts BigQuery query with query_and_wait and waits for results. - """ - job_config = self._prepare_query_job_config(job_config) - return self.bqclient.query_and_wait( - sql, - job_config=job_config, - max_results=max_results, - ) - def _start_query( self, sql: str, diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 5ddfb01008..2a200277d4 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -227,9 +227,15 @@ def get_index_cols_and_uniqueness( if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64: # User has explicity asked for a default, sequential index. # Use that, even if there are primary keys on the table. + # + # Note: This relies on the default behavior of the Block + # constructor to create a default sequential index. If that ever + # changes, this logic will need to be revisited. return [], False else: - # TODO: Can we test this with a mock? + # Note: It's actually quite difficult to mock this out to unit + # test, as it's not possible to subclass enums in Python. 
See: + # https://stackoverflow.com/a/33680021/101923 raise NotImplementedError( f"Got unexpected index_col {repr(index_col)}. {bigframes.constants.FEEDBACK_LINK}" ) From adaf664b64c599c7c15273554e3ae40e6b15a9c2 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 30 Apr 2024 22:28:40 +0000 Subject: [PATCH 18/24] add error raising plus todos --- bigframes/session/__init__.py | 8 ++-- .../session/_io/bigquery/read_gbq_table.py | 43 +++++++++++++++++-- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f217e001e8..2a00c2da7e 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -313,6 +313,9 @@ def read_gbq( filters = list(filters) if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table): + # TODO(b/338111344): This appears to be missing index_cols, which + # are necessary to be selected. + # TODO(b/338039517): Also, need to account for primary keys. query_or_table = self._to_query(query_or_table, columns, filters) if _is_query(query_or_table): @@ -326,9 +329,6 @@ def read_gbq( use_cache=use_cache, ) else: - # TODO(swast): Query the snapshot table but mark it as a - # deterministic query so we can avoid serializing if we have a - # unique index. if configuration is not None: raise ValueError( "The 'configuration' argument is not allowed when " @@ -359,6 +359,8 @@ def _to_query( else f"`{query_or_table}`" ) + # TODO(b/338111344): Generate an index based on DefaultIndexKind if we + # don't have index columns specified. select_clause = "SELECT " + ( ", ".join(f"`{column}`" for column in columns) if columns else "*" ) diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 2a200277d4..29d5a5567f 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -207,6 +207,29 @@ def _get_primary_keys( return primary_keys +def _is_table_clustered_or_partitioned( + table: bigquery.table.Table, +) -> bool: + """Returns True if the table is clustered or partitioned.""" + + # Could be None or an empty tuple if it's not clustered, both of which are + # falsey. + if table.clustering_fields: + return True + + if ( + time_partitioning := table.time_partitioning + ) is not None and time_partitioning.type_ is not None: + return True + + if ( + range_partitioning := table.range_partitioning + ) is not None and range_partitioning.field is not None: + return True + + return False + + def get_index_cols_and_uniqueness( bqclient: bigquery.Client, ibis_client: ibis.BaseBackend, @@ -247,14 +270,26 @@ def get_index_cols_and_uniqueness( # If the isn't an index selected, use the primary keys of the table as the # index. If there are no primary keys, we'll return an empty list. if len(index_cols) == 0: - index_cols = _get_primary_keys(table) - - # TODO(b/335727141): If table has clustering/partitioning, fail if - # index_cols is empty. + primary_keys = _get_primary_keys(table) + + # If table has clustering/partitioning, fail if we haven't been able to + # find index_cols to use. This is to avoid unexpected performance and + # resource utilization because of the default sequential index. See + # internal issue 335727141. + if _is_table_clustered_or_partitioned(table) and not primary_keys: + raise bigframes.exceptions.NoDefaultIndexError( + f"Table '{str(table.reference)}' is clustered and/or " + "partitioned, but BigQuery DataFrames was not able to find a " + "suitable index. 
To avoid this error, set at least one of: " + # TODO(b/338037499): Allow max_results to override this too, + # once we make it more efficient. + "`index_col` or `filters`." + ) # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. + index_cols = primary_keys is_index_unique = len(index_cols) != 0 else: is_index_unique = _check_index_uniqueness( From d028bc526503e01618373db674885c05523c9db6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 1 May 2024 15:32:23 +0000 Subject: [PATCH 19/24] add TODO for ROW_NUMBER() in the query we generate --- tests/unit/session/test_session.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 7bbd5dee05..1debd659f2 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -296,7 +296,11 @@ def test_no_default_index_error_not_raised_by_read_gbq_filters(table): filters=[("col2", "<", 123)], ) - # We expect a window operation because we specificaly requested a sequential index. + # We expect a window operation because we specificaly requested a + # sequential index. + # TODO(b/338111344): Since this should be falling back to a query for + # filters, we really should be generating ROW_NUMBER() in that query, not + # the query for the DataFrame pointing to the temp table from that query. generated_sql = df.sql.casefold() assert "OVER".casefold() in generated_sql assert "ROW_NUMBER()".casefold() in generated_sql From 658f61d8cffb4bfd9846716ba9124e6bf411b772 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 1 May 2024 16:06:36 +0000 Subject: [PATCH 20/24] remove filters unit test for now --- tests/unit/session/test_session.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 1debd659f2..46e4dc555b 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -281,31 +281,6 @@ def test_no_default_index_error_not_raised_by_read_gbq_primary_key(table): assert tuple(df.index.names) == ("pk_1", "pk_2") -@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES) -def test_no_default_index_error_not_raised_by_read_gbq_filters(table): - table = copy.deepcopy(table) - bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) - bqclient.project = "test-project" - bqclient.get_table.return_value = table - session = resources.create_bigquery_session(bqclient=bqclient) - table._properties["location"] = session._location - - # No exception raised because we set the option allowing the default indexes. - df = session.read_gbq( - "my-project.my_dataset.my_table", - filters=[("col2", "<", 123)], - ) - - # We expect a window operation because we specificaly requested a - # sequential index. - # TODO(b/338111344): Since this should be falling back to a query for - # filters, we really should be generating ROW_NUMBER() in that query, not - # the query for the DataFrame pointing to the temp table from that query. 
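For illustration, a short sketch of how this new error surfaces and the supported ways around it; the table and column names are made up, and the three options follow the error message plus the DefaultIndexKind and filters code paths in this series.

import bigframes.enums
import bigframes.exceptions
import bigframes.pandas as bpd

# Hypothetical clustered table; reading it with no usable index would
# otherwise generate the default sequential index and defeat cluster pruning.
table_id = "my-project.my_dataset.clustered_events"

try:
    df = bpd.read_gbq(table_id)
except bigframes.exceptions.NoDefaultIndexError:
    # Option 1: name an index column (ideally unique) so no default index
    # needs to be generated.
    df = bpd.read_gbq(table_id, index_col="event_id")

# Option 2: explicitly request the sequential default index, accepting the
# windowed scan that the error is guarding against.
df = bpd.read_gbq(
    table_id,
    index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64,
)

# Option 3: filters route the read through a query instead of the raw
# table, so the clustered/partitioned check does not apply.
df = bpd.read_gbq(table_id, filters=[("event_date", ">", "2024-01-01")])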
- generated_sql = df.sql.casefold() - assert "OVER".casefold() in generated_sql - assert "ROW_NUMBER()".casefold() in generated_sql - - @pytest.mark.parametrize( "not_found_table_id", [("unknown.dataset.table"), ("project.unknown.table"), ("project.dataset.unknown")], From f1b3f88e5b286bc027c6774283f8d36a87b656c1 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 1 May 2024 17:57:54 +0000 Subject: [PATCH 21/24] docstring fixes --- third_party/bigframes_vendored/pandas/io/gbq.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 0dc94138b5..c25dd8776f 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -45,9 +45,7 @@ def read_gbq( * (Recommended) Set the ``index_col`` argument to one or more columns. Unique values for the row labels are recommended. Duplicate labels are possible, but note that joins on a non-unique index can duplicate - rows via pandas-like outer join behavior. Operations like - ``cumsum()`` that window across a non-unique index can have some - unpredictability due to ambiguous ordering. + rows via pandas-like outer join behavior. .. note:: By default, even SQL query inputs with an ORDER BY clause create a @@ -117,7 +115,7 @@ def read_gbq( `project.dataset.tablename` or `dataset.tablename`. Can also take wildcard table name, such as `project.dataset.table_prefix*`. In tha case, will read all the matched table as one DataFrame. - index_col (Iterable[str], str, bigframes.enums.IndexKind): + index_col (Iterable[str], str, bigframes.enums.DefaultIndexKind): Name of result column(s) to use for index in results DataFrame. If an empty iterable, such as ``()``, a default index is @@ -127,7 +125,7 @@ def read_gbq( set, the primary key(s) of the table are used as the index. **New in bigframes version 1.4.0**: Support - :class:`bigframes.enums.TypeKind` to override default index + :class:`bigframes.enums.DefaultIndexKind` to override default index behavior. columns (Iterable[str]): List of BigQuery column names in the desired order for results From 9f3e14923071e7dde56ec901b2b42dbbef838e89 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 2 May 2024 15:49:16 +0000 Subject: [PATCH 22/24] feat: support `index_col=False` in `read_csv` and `engine="bigquery"` --- bigframes/session/__init__.py | 16 +++++---- tests/system/small/test_session.py | 56 ++++++++++-------------------- tests/unit/session/test_session.py | 37 ++++++++++++++++++++ 3 files changed, 65 insertions(+), 44 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 2a00c2da7e..6b84d838cf 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1159,7 +1159,7 @@ def read_csv( # restriction if we check the contents of an iterable are strings # not integers. if ( - # Empty tuples and None are both allowed and falsey + # Empty tuples, None, and False are allowed and falsey. index_col and not isinstance(index_col, bigframes.enums.DefaultIndexKind) and not isinstance(index_col, str) @@ -1168,9 +1168,17 @@ def read_csv( "BigQuery engine only supports a single column name for `index_col`, " f"got: {repr(index_col)}. {constants.FEEDBACK_LINK}" ) + + # None and False cannot be passed to read_gbq. + # TODO(b/338400133): When index_col is None, we should be using the + # first column of the CSV as the index to be compatible with the + # pandas engine. 
According to the pandas docs, only "False" + # indicates a default sequential index. + if not index_col: + index_col = () + index_col = typing.cast( Union[ - None, Sequence[str], # Falsey values bigframes.enums.DefaultIndexKind, str, @@ -1178,10 +1186,6 @@ def read_csv( index_col, ) - # None value for index_col cannot be passed to read_gbq - if index_col is None: - index_col = () - # usecols should only be an iterable of strings (column names) for use as columns in read_gbq. columns: Tuple[Any, ...] = tuple() if usecols is not None: diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index ec20459c31..2779874d6c 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -633,44 +633,24 @@ def test_read_csv_localbuffer_bq_engine(session, scalars_dfs): pd.testing.assert_series_equal(df.dtypes, scalars_df.dtypes) -@pytest.mark.parametrize( - ("kwargs", "match"), - [ - pytest.param( - {"engine": "bigquery", "names": []}, - "BigQuery engine does not support these arguments", - id="with_names", - ), - pytest.param( - {"engine": "bigquery", "dtype": {}}, - "BigQuery engine does not support these arguments", - id="with_dtype", - ), - pytest.param( - {"engine": "bigquery", "index_col": False}, - "BigQuery engine only supports a single column name for `index_col`.", - id="with_index_col_false", - ), - pytest.param( - {"engine": "bigquery", "index_col": 5}, - "BigQuery engine only supports a single column name for `index_col`.", - id="with_index_col_not_str", - ), - pytest.param( - {"engine": "bigquery", "usecols": [1, 2]}, - "BigQuery engine only supports an iterable of strings for `usecols`.", - id="with_usecols_invalid", - ), - pytest.param( - {"engine": "bigquery", "encoding": "ASCII"}, - "BigQuery engine only supports the following encodings", - id="with_encoding_invalid", - ), - ], -) -def test_read_csv_bq_engine_throws_not_implemented_error(session, kwargs, match): - with pytest.raises(NotImplementedError, match=match): - session.read_csv("", **kwargs) +def test_read_csv_bq_engine_supports_index_col_false( + session, scalars_df_index, gcs_folder +): + path = gcs_folder + "test_read_csv_bq_engine_supports_index_col_false*.csv" + read_path = utils.get_first_file_from_wildcard(path) + scalars_df_index.to_csv(path) + + df = session.read_csv( + read_path, + # Normally, pandas uses the first column as the index. index_col=False + # turns off that behavior. + index_col=False, + ) + assert df.shape[0] == scalars_df_index.shape[0] + + # We use a default index because of index_col=False, so the previous index + # column is just loaded as a column. 
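A minimal usage sketch of the behavior this test exercises; the bucket and object path are hypothetical.

import bigframes.pandas as bpd

df = bpd.read_csv(
    # Hypothetical Cloud Storage path; wildcards are handled by the
    # BigQuery load job.
    "gs://my-bucket/data/events-*.csv",
    engine="bigquery",
    # As in pandas, index_col=False means the first column is not used as
    # the index; a default sequential index is created instead.
    index_col=False,
)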
+ assert len(df.columns) == len(scalars_df_index.columns) + 1 @pytest.mark.parametrize( diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 46e4dc555b..70a121435c 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -92,6 +92,43 @@ ] +@pytest.mark.parametrize( + ("kwargs", "match"), + [ + pytest.param( + {"engine": "bigquery", "names": []}, + "BigQuery engine does not support these arguments", + id="with_names", + ), + pytest.param( + {"engine": "bigquery", "dtype": {}}, + "BigQuery engine does not support these arguments", + id="with_dtype", + ), + pytest.param( + {"engine": "bigquery", "index_col": 5}, + "BigQuery engine only supports a single column name for `index_col`.", + id="with_index_col_not_str", + ), + pytest.param( + {"engine": "bigquery", "usecols": [1, 2]}, + "BigQuery engine only supports an iterable of strings for `usecols`.", + id="with_usecols_invalid", + ), + pytest.param( + {"engine": "bigquery", "encoding": "ASCII"}, + "BigQuery engine only supports the following encodings", + id="with_encoding_invalid", + ), + ], +) +def test_read_csv_bq_engine_throws_not_implemented_error(kwargs, match): + session = resources.create_bigquery_session() + + with pytest.raises(NotImplementedError, match=match): + session.read_csv("", **kwargs) + + @pytest.mark.parametrize( ("engine",), ( From e7c4d9319b13444c9ceef8c8cf6ea7d9a076ce1a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 2 May 2024 18:48:14 +0000 Subject: [PATCH 23/24] revert typo --- tests/system/small/test_dataframe_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 4124dfd23a..e1b0b98357 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -117,7 +117,7 @@ def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index): @pytest.mark.parametrize( - ("index", "index_col"), + ("index",), [True, False], ) def test_to_csv_index( From d136bc0438cf117d66be8d5e3840d526f4b99906 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 2 May 2024 21:21:27 +0000 Subject: [PATCH 24/24] attempt 2 --- tests/system/small/test_dataframe_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index e1b0b98357..f36dd64cbe 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -118,7 +118,7 @@ def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index): @pytest.mark.parametrize( ("index",), - [True, False], + [(True,), (False,)], ) def test_to_csv_index( scalars_dfs: Tuple[bigframes.dataframe.DataFrame, pd.DataFrame],