From 42c70f5ef85c8f310fee5e83768b3e107be85df9 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 13 Oct 2023 17:05:24 +0000 Subject: [PATCH 1/5] fix: use table clone instead of system time for `read_gbq_table` --- bigframes/constants.py | 2 +- bigframes/session/__init__.py | 53 ++++++++----------- bigframes/session/_io/bigquery.py | 70 ++++++++++++++++++-------- tests/unit/session/test_io_bigquery.py | 8 +-- 4 files changed, 74 insertions(+), 59 deletions(-) diff --git a/bigframes/constants.py b/bigframes/constants.py index 82b48dc967..a1ffd2b755 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -26,4 +26,4 @@ ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}" -DEFAULT_EXPIRATION = datetime.timedelta(days=1) +DEFAULT_EXPIRATION = datetime.timedelta(days=7) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index a1eae69715..6212ae7fc5 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -430,7 +430,9 @@ def _read_gbq_query( index_cols = list(index_col) destination, query_job = self._query_to_destination( - query, index_cols, api_name="read_gbq_query" + query, + index_cols, + api_name=api_name, ) # If there was no destination table, that means the query must have @@ -508,25 +510,29 @@ def _read_gbq_table_to_ibis_with_total_ordering( If we can get a total ordering from the table, such as via primary key column(s), then return those too so that ordering generation can be avoided. - """ - if table_ref.dataset_id.upper() == "_SESSION": - # _SESSION tables aren't supported by the tables.get REST API. 
- return ( - self.ibis_client.sql( - f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" - ), - None, - ) + For tables that aren't already read-only, this creates a table + clone so that any changes to the underlying table don't affect the + DataFrame and break our assumptions, especially with regards to unique + index and ordering. See: + https://cloud.google.com/bigquery/docs/table-clones-create + """ + destination = bigframes_io.create_table_clone( + table_ref, + self._anonymous_dataset, + constants.DEFAULT_EXPIRATION, + self, + api_name, + ) table_expression = self.ibis_client.table( - table_ref.table_id, - database=f"{table_ref.project}.{table_ref.dataset_id}", + destination.table_id, + database=f"{destination.project}.{destination.dataset_id}", ) # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. - table = self.bqclient.get_table(destination) + table = self.bqclient.get_table(destination) # TODO(b/305264153): Use public properties to fetch primary keys once # added to google-cloud-bigquery. @@ -535,23 +541,7 @@ def _read_gbq_table_to_ibis_with_total_ordering( .get("primaryKey", {}) .get("columns") ) - - if not primary_keys: - return table_expression, None - else: - # Read from a snapshot since we won't have to copy the table data to create a total ordering. 
- job_config = bigquery.QueryJobConfig() - job_config.labels["bigframes-api"] = api_name - current_timestamp = list( - self.bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table_ref, current_timestamp) - ) - return table_expression, primary_keys + return table_expression, primary_keys def _read_gbq_table( self, @@ -672,9 +662,6 @@ def _read_gbq_table( # The job finished, so we should have a start time. assert current_timestamp is not None - table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table_ref, current_timestamp) - ) else: # Make sure when we generate an ordering, the row_number() # coresponds to the index columns. diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index d200a9a861..f20db5fc9c 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -14,14 +14,21 @@ """Private module: Helpers for I/O operations.""" +from __future__ import annotations + import datetime import textwrap import types +import typing from typing import Dict, Iterable, Union import uuid import google.cloud.bigquery as bigquery +if typing.TYPE_CHECKING: + import bigframes.session + + IO_ORDERING_ID = "bqdf_row_nums" TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}" @@ -69,27 +76,52 @@ def create_export_data_statement( ) -def create_snapshot_sql( - table_ref: bigquery.TableReference, current_timestamp: datetime.datetime -) -> str: - """Query a table via 'time travel' for consistent reads.""" +def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: + now = datetime.datetime.now(datetime.timezone.utc) + random_id = uuid.uuid4().hex + table_id = TEMP_TABLE_PREFIX.format( + date=now.strftime("%Y%m%d"), random_id=random_id + ) + return dataset.table(table_id) - # If we have a _SESSION table, assume that it's already a copy. 
Nothing to do here. - if table_ref.dataset_id.upper() == "_SESSION": - return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" - # If we have an anonymous query results table, it can't be modified and - # there isn't any BigQuery time travel. - if table_ref.dataset_id.startswith("_"): - return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" +def table_ref_to_sql(table: bigquery.TableReference) -> str: + return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" - return textwrap.dedent( + +def create_table_clone( + source: bigquery.TableReference, + dataset: bigquery.DatasetReference, + expiration: datetime.timedelta, + session: bigframes.session.Session, + api_name: str, +) -> bigquery.TableReference: + """Create a table clone for consistent reads.""" + now = datetime.datetime.now(datetime.timezone.utc) + expiration_timestamp = now + expiration + fully_qualified_source_id = table_ref_to_sql(source) + destination = random_table(dataset) + fully_qualified_destination_id = table_ref_to_sql(destination) + + # Include a label so that Dataplex Lineage can identify temporary + # tables that BigQuery DataFrames creates. Googlers: See internal issue + # 296779699. 
+ ddl = textwrap.dedent( f""" - SELECT * - FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` - FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) + CREATE OR REPLACE TABLE + {fully_qualified_destination_id} + CLONE {fully_qualified_source_id} + OPTIONS( + expiration_timestamp=TIMESTAMP "{expiration_timestamp.isoformat()}", + labels=[ + ("source", "bigquery-dataframes-temp"), + ("bigframes-api", {repr(api_name)}) + ] + ) """ ) + session._start_query(ddl) + return destination def create_temp_table( @@ -98,13 +130,9 @@ def create_temp_table( expiration: datetime.timedelta, ) -> str: """Create an empty table with an expiration in the desired dataset.""" - now = datetime.datetime.now(datetime.timezone.utc) - random_id = uuid.uuid4().hex - table_id = TEMP_TABLE_PREFIX.format( - date=now.strftime("%Y%m%d"), random_id=random_id - ) - table_ref = dataset.table(table_id) + table_ref = random_table(dataset) destination = bigquery.Table(table_ref) + now = datetime.datetime.now(datetime.timezone.utc) destination.expires = now + expiration bqclient.create_table(destination) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index cb3003b1cc..ff3354ccdd 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -22,12 +22,12 @@ import bigframes.session._io.bigquery -def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): +def test_create_table_clone_doesnt_timetravel_anonymous_datasets(): table_ref = bigquery.TableReference.from_string( "my-test-project._e8166e0cdb.anonbb92cd" ) - sql = bigframes.session._io.bigquery.create_snapshot_sql( + sql = bigframes.core.io.create_table_clone( table_ref, datetime.datetime.now(datetime.timezone.utc) ) @@ -38,10 +38,10 @@ def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): assert 
"`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql -def test_create_snapshot_sql_doesnt_timetravel_session_tables(): +def test_create_table_clone_doesnt_timetravel_session_datasets(): table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg") - sql = bigframes.session._io.bigquery.create_snapshot_sql( + sql = bigframes.core.io.create_table_clone( table_ref, datetime.datetime.now(datetime.timezone.utc) ) From 8a3791c3f95c29b64ba9cc48b31af3954a8891af Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 2 Nov 2023 21:49:09 +0000 Subject: [PATCH 2/5] accept expiration datetime instead of timedelta for easier testing --- bigframes/dataframe.py | 4 +- bigframes/session/__init__.py | 5 +- bigframes/session/_io/bigquery.py | 20 ++++---- tests/system/small/test_session.py | 3 -- tests/unit/session/test_io_bigquery.py | 64 +++++++++++++++++--------- 5 files changed, 60 insertions(+), 36 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4932008f09..45dbcdc78d 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -16,6 +16,7 @@ from __future__ import annotations +import datetime import re import textwrap import typing @@ -2309,7 +2310,8 @@ def to_gbq( self._session.bqclient, self._session._anonymous_dataset, # TODO(swast): allow custom expiration times, probably via session configuration. - constants.DEFAULT_EXPIRATION, + datetime.datetime.now(datetime.timezone.utc) + + constants.DEFAULT_EXPIRATION, ) if if_exists is not None and if_exists != "replace": diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 6212ae7fc5..f540073a6a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,6 +16,7 @@ from __future__ import annotations +import datetime import logging import os import re @@ -517,10 +518,12 @@ def _read_gbq_table_to_ibis_with_total_ordering( index and ordering. 
See: https://cloud.google.com/bigquery/docs/table-clones-create """ + now = datetime.datetime.now(datetime.timezone.utc) destination = bigframes_io.create_table_clone( table_ref, self._anonymous_dataset, - constants.DEFAULT_EXPIRATION, + # TODO(swast): Allow the default expiration to be configured. + now + constants.DEFAULT_EXPIRATION, self, api_name, ) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index f20db5fc9c..9f9a2119be 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -92,13 +92,16 @@ def table_ref_to_sql(table: bigquery.TableReference) -> str: def create_table_clone( source: bigquery.TableReference, dataset: bigquery.DatasetReference, - expiration: datetime.timedelta, + expiration: datetime.datetime, session: bigframes.session.Session, api_name: str, ) -> bigquery.TableReference: """Create a table clone for consistent reads.""" - now = datetime.datetime.now(datetime.timezone.utc) - expiration_timestamp = now + expiration + # If we have an anonymous query results table, it can't be modified and + # there isn't any BigQuery time travel. 
+ if source.dataset_id.startswith("_"): + return source + fully_qualified_source_id = table_ref_to_sql(source) destination = random_table(dataset) fully_qualified_destination_id = table_ref_to_sql(destination) @@ -112,7 +115,7 @@ def create_table_clone( {fully_qualified_destination_id} CLONE {fully_qualified_source_id} OPTIONS( - expiration_timestamp=TIMESTAMP "{expiration_timestamp.isoformat()}", + expiration_timestamp=TIMESTAMP "{expiration.isoformat()}", labels=[ ("source", "bigquery-dataframes-temp"), ("bigframes-api", {repr(api_name)}) @@ -120,20 +123,21 @@ def create_table_clone( ) """ ) - session._start_query(ddl) + job_config = bigquery.QueryJobConfig() + job_config.labels = {"bigframes-api": api_name} + session._start_query(ddl, job_config=job_config) return destination def create_temp_table( bqclient: bigquery.Client, dataset: bigquery.DatasetReference, - expiration: datetime.timedelta, + expiration: datetime.datetime, ) -> str: """Create an empty table with an expiration in the desired dataset.""" table_ref = random_table(dataset) destination = bigquery.Table(table_ref) - now = datetime.datetime.now(datetime.timezone.utc) - destination.expires = now + expiration + destination.expires = expiration bqclient.create_table(destination) return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index bf72e444eb..28486a1269 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -252,9 +252,6 @@ def test_read_gbq_w_primary_keys_table( sorted_result = result.sort_values(primary_keys) pd.testing.assert_frame_equal(result, sorted_result) - # Verify that we're working from a snapshot rather than a copy of the table. 
- assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql - @pytest.mark.parametrize( ("query_or_table", "max_results"), diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index ff3354ccdd..7a8691232b 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -19,46 +19,63 @@ import google.cloud.bigquery as bigquery import pytest +import bigframes.session import bigframes.session._io.bigquery -def test_create_table_clone_doesnt_timetravel_anonymous_datasets(): - table_ref = bigquery.TableReference.from_string( +def test_create_table_clone_doesnt_clone_anonymous_datasets(): + session = mock.create_autospec(bigframes.session.Session) + source = bigquery.TableReference.from_string( "my-test-project._e8166e0cdb.anonbb92cd" ) - sql = bigframes.core.io.create_table_clone( - table_ref, datetime.datetime.now(datetime.timezone.utc) + destination = bigframes.session._io.bigquery.create_table_clone( + source, + bigquery.DatasetReference("other-project", "other_dataset"), + datetime.datetime(2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc), + session, + "test_api", ) - # Anonymous query results tables don't support time travel. - assert "SYSTEM_TIME" not in sql + # Anonymous query results tables don't support CLONE + assert destination is source + session._start_query.assert_not_called() - # Need fully-qualified table name. 
- assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql - -def test_create_table_clone_doesnt_timetravel_session_datasets(): - table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg") - - sql = bigframes.core.io.create_table_clone( - table_ref, datetime.datetime.now(datetime.timezone.utc) +def test_create_table_clone_sets_expiration(): + session = mock.create_autospec(bigframes.session.Session) + source = bigquery.TableReference.from_string( + "my-test-project.test_dataset.some_table" ) - # We aren't modifying _SESSION tables, so don't use time travel. - assert "SYSTEM_TIME" not in sql + expiration = datetime.datetime( + 2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc + ) + bigframes.session._io.bigquery.create_table_clone( + source, + bigquery.DatasetReference("other-project", "other_dataset"), + expiration, + session, + "test_api", + ) - # Don't need the project ID for _SESSION tables. - assert "my-test-project" not in sql + session._start_query.assert_called_once() + call_args = session._start_query.call_args + query = call_args.args[0] + assert "CREATE OR REPLACE TABLE" in query + assert "CLONE" in query + assert f'expiration_timestamp=TIMESTAMP "{expiration.isoformat()}"' in query + assert '("source", "bigquery-dataframes-temp")' in query + assert call_args.kwargs["job_config"].labels["bigframes-api"] == "test_api" def test_create_temp_table_default_expiration(): """Make sure the created table has an expiration.""" bqclient = mock.create_autospec(bigquery.Client) dataset = bigquery.DatasetReference("test-project", "test_dataset") - now = datetime.datetime.now(datetime.timezone.utc) - expiration = datetime.timedelta(days=3) - expected_expires = now + expiration + expiration = datetime.datetime( + 2023, 11, 2, 13, 44, 55, 678901, datetime.timezone.utc + ) bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration) @@ -68,10 +85,11 @@ def test_create_temp_table_default_expiration(): assert table.project 
== "test-project" assert table.dataset_id == "test_dataset" assert table.table_id.startswith("bqdf") + # TODO(swast): Why isn't the expiration exactly what we set it to? assert ( - (expected_expires - datetime.timedelta(minutes=1)) + (expiration - datetime.timedelta(minutes=1)) < table.expires - < (expected_expires + datetime.timedelta(minutes=1)) + < (expiration + datetime.timedelta(minutes=1)) ) From 808b6ef978904c7cb89a4d2bf1f2e5c00b7fb10e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 3 Nov 2023 00:41:15 +0000 Subject: [PATCH 3/5] don't use table clone on _session tables --- bigframes/session/__init__.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f540073a6a..b4143ce14b 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -518,6 +518,15 @@ def _read_gbq_table_to_ibis_with_total_ordering( index and ordering. See: https://cloud.google.com/bigquery/docs/table-clones-create """ + if table_ref.dataset_id.upper() == "_SESSION": + # _SESSION tables aren't supported by the tables.get REST API. + return ( + self.ibis_client.sql( + f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" + ), + None, + ) + now = datetime.datetime.now(datetime.timezone.utc) destination = bigframes_io.create_table_clone( table_ref, From 5d0308a3c7d4ff6dc488cbd3a70834a8dcb37847 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 3 Nov 2023 00:45:50 +0000 Subject: [PATCH 4/5] remove unnecessary assert --- bigframes/session/__init__.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b4143ce14b..12ee91a13a 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -664,17 +664,7 @@ def _read_gbq_table( total_ordering_columns=frozenset(index_cols), ) - # We have a total ordering, so query via "time travel" so that - # the underlying data doesn't mutate. 
- if is_total_ordering: - # Get the timestamp from the job metadata rather than the query - # text so that the query for determining uniqueness of the ID - # columns can be cached. - current_timestamp = query_job.started - - # The job finished, so we should have a start time. - assert current_timestamp is not None - else: + if not is_total_ordering: # Make sure when we generate an ordering, the row_number() # coresponds to the index columns. table_expression = table_expression.order_by(index_cols) From bc74273fc5ad5f533ff11225370345975312d7b7 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 3 Nov 2023 15:44:39 +0000 Subject: [PATCH 5/5] add docstrings --- bigframes/session/_io/bigquery.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index 9f9a2119be..fd3b1c59a7 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -77,6 +77,17 @@ def create_export_data_statement( def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: + """Generate a random table ID with BigQuery DataFrames prefix. + + Args: + dataset (google.cloud.bigquery.DatasetReference): + The dataset to make the table reference in. Usually the anonymous + dataset for the session. + + Returns: + google.cloud.bigquery.TableReference: + Fully qualified table ID of a table that doesn't exist. 
+ """ now = datetime.datetime.now(datetime.timezone.utc) random_id = uuid.uuid4().hex table_id = TEMP_TABLE_PREFIX.format( @@ -86,6 +97,7 @@ def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: def table_ref_to_sql(table: bigquery.TableReference) -> str: + """Format a table reference as escaped SQL.""" return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" @@ -124,7 +136,10 @@ def create_table_clone( """ ) job_config = bigquery.QueryJobConfig() - job_config.labels = {"bigframes-api": api_name} + job_config.labels = { + "source": "bigquery-dataframes-temp", + "bigframes-api": api_name, + } session._start_query(ddl, job_config=job_config) return destination