diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 12ee91a13a..f564cbf174 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,7 +16,6 @@ from __future__ import annotations -import datetime import logging import os import re @@ -511,12 +510,6 @@ def _read_gbq_table_to_ibis_with_total_ordering( If we can get a total ordering from the table, such as via primary key column(s), then return those too so that ordering generation can be avoided. - - For tables that aren't already read-only, this creates Create a table - clone so that any changes to the underlying table don't affect the - DataFrame and break our assumptions, especially with regards to unique - index and ordering. See: - https://cloud.google.com/bigquery/docs/table-clones-create """ if table_ref.dataset_id.upper() == "_SESSION": # _SESSION tables aren't supported by the tables.get REST API. @@ -527,24 +520,15 @@ def _read_gbq_table_to_ibis_with_total_ordering( None, ) - now = datetime.datetime.now(datetime.timezone.utc) - destination = bigframes_io.create_table_clone( - table_ref, - self._anonymous_dataset, - # TODO(swast): Allow the default expiration to be configured. - now + constants.DEFAULT_EXPIRATION, - self, - api_name, - ) table_expression = self.ibis_client.table( - destination.table_id, - database=f"{destination.project}.{destination.dataset_id}", + table_ref.table_id, + database=f"{table_ref.project}.{table_ref.dataset_id}", ) # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. - table = self.bqclient.get_table(destination) + table = self.bqclient.get_table(table_ref) # TODO(b/305264153): Use public properties to fetch primary keys once # added to google-cloud-bigquery. @@ -553,7 +537,23 @@ def _read_gbq_table_to_ibis_with_total_ordering( .get("primaryKey", {}) .get("columns") ) - return table_expression, primary_keys + + if not primary_keys: + return table_expression, None + else: + # Read from a snapshot since we won't have to copy the table data to create a total ordering. + job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + current_timestamp = list( + self.bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + table_expression = self.ibis_client.sql( + bigframes_io.create_snapshot_sql(table_ref, current_timestamp) + ) + return table_expression, primary_keys def _read_gbq_table( self, @@ -664,7 +664,20 @@ def _read_gbq_table( total_ordering_columns=frozenset(index_cols), ) - if not is_total_ordering: + # We have a total ordering, so query via "time travel" so that + # the underlying data doesn't mutate. + if is_total_ordering: + # Get the timestamp from the job metadata rather than the query + # text so that the query for determining uniqueness of the ID + # columns can be cached. + current_timestamp = query_job.started + + # The job finished, so we should have a start time. + assert current_timestamp is not None + table_expression = self.ibis_client.sql( + bigframes_io.create_snapshot_sql(table_ref, current_timestamp) + ) + else: # Make sure when we generate an ordering, the row_number() # coresponds to the index columns. table_expression = table_expression.order_by(index_cols) @@ -790,7 +803,7 @@ def _read_gbq_with_ordering( def _read_bigquery_load_job( self, filepath_or_buffer: str | IO["bytes"], - table: bigquery.Table, + table: Union[bigquery.Table, bigquery.TableReference], *, job_config: bigquery.LoadJobConfig, index_col: Iterable[str] | str = (), @@ -1026,7 +1039,7 @@ def read_csv( encoding: Optional[str] = None, **kwargs, ) -> dataframe.DataFrame: - table = bigquery.Table(self._create_session_table()) + table = bigframes_io.random_table(self._anonymous_dataset) if engine is not None and engine == "bigquery": if any(param is not None for param in (dtype, names)): @@ -1140,7 +1153,7 @@ def read_parquet( # Note: "engine" is omitted because it is redundant. Loading a table # from a pandas DataFrame will just create another parquet file + load # job anyway. - table = bigquery.Table(self._create_session_table()) + table = bigframes_io.random_table(self._anonymous_dataset) job_config = bigquery.LoadJobConfig() job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED @@ -1163,7 +1176,7 @@ def read_json( engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson", **kwargs, ) -> dataframe.DataFrame: - table = bigquery.Table(self._create_session_table()) + table = bigframes_io.random_table(self._anonymous_dataset) if engine == "bigquery": diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index fd3b1c59a7..06d240fec6 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -19,16 +19,11 @@ import datetime import textwrap import types -import typing from typing import Dict, Iterable, Union import uuid import google.cloud.bigquery as bigquery -if typing.TYPE_CHECKING: - import bigframes.session - - IO_ORDERING_ID = "bqdf_row_nums" TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}" @@ -78,12 +73,10 @@ def create_export_data_statement( def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference: """Generate a random table ID with BigQuery DataFrames prefix. - Args: dataset (google.cloud.bigquery.DatasetReference): The dataset to make the table reference in. Usually the anonymous dataset for the session. - Returns: google.cloud.bigquery.TableReference: Fully qualified table ID of a table that doesn't exist. @@ -101,47 +94,27 @@ def table_ref_to_sql(table: bigquery.TableReference) -> str: return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" -def create_table_clone( - source: bigquery.TableReference, - dataset: bigquery.DatasetReference, - expiration: datetime.datetime, - session: bigframes.session.Session, - api_name: str, -) -> bigquery.TableReference: - """Create a table clone for consistent reads.""" +def create_snapshot_sql( + table_ref: bigquery.TableReference, current_timestamp: datetime.datetime +) -> str: + """Query a table via 'time travel' for consistent reads.""" + + # If we have a _SESSION table, assume that it's already a copy. Nothing to do here. + if table_ref.dataset_id.upper() == "_SESSION": + return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" + # If we have an anonymous query results table, it can't be modified and # there isn't any BigQuery time travel. - if source.dataset_id.startswith("_"): - return source - - fully_qualified_source_id = table_ref_to_sql(source) - destination = random_table(dataset) - fully_qualified_destination_id = table_ref_to_sql(destination) + if table_ref.dataset_id.startswith("_"): + return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" - # Include a label so that Dataplex Lineage can identify temporary - # tables that BigQuery DataFrames creates. Googlers: See internal issue - # 296779699. - ddl = textwrap.dedent( + return textwrap.dedent( f""" - CREATE OR REPLACE TABLE - {fully_qualified_destination_id} - CLONE {fully_qualified_source_id} - OPTIONS( - expiration_timestamp=TIMESTAMP "{expiration.isoformat()}", - labels=[ - ("source", "bigquery-dataframes-temp"), - ("bigframes-api", {repr(api_name)}) - ] - ) + SELECT * + FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` + FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())}) """ ) - job_config = bigquery.QueryJobConfig() - job_config.labels = { - "source": "bigquery-dataframes-temp", - "bigframes-api": api_name, - } - session._start_query(ddl, job_config=job_config) - return destination def create_temp_table( diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 28486a1269..bf72e444eb 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -252,6 +252,9 @@ def test_read_gbq_w_primary_keys_table( sorted_result = result.sort_values(primary_keys) pd.testing.assert_frame_equal(result, sorted_result) + # Verify that we're working from a snapshot rather than a copy of the table. + assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql + @pytest.mark.parametrize( ("query_or_table", "max_results"), diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 7a8691232b..03470208e4 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -19,54 +19,37 @@ import google.cloud.bigquery as bigquery import pytest -import bigframes.session import bigframes.session._io.bigquery -def test_create_table_clone_doesnt_clone_anonymous_datasets(): - session = mock.create_autospec(bigframes.session.Session) - source = bigquery.TableReference.from_string( +def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): + table_ref = bigquery.TableReference.from_string( "my-test-project._e8166e0cdb.anonbb92cd" ) - destination = bigframes.session._io.bigquery.create_table_clone( - source, - bigquery.DatasetReference("other-project", "other_dataset"), - datetime.datetime(2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc), - session, - "test_api", + sql = bigframes.session._io.bigquery.create_snapshot_sql( + table_ref, datetime.datetime.now(datetime.timezone.utc) ) - # Anonymous query results tables don't support CLONE - assert destination is source - session._start_query.assert_not_called() + # Anonymous query results tables don't support time travel. + assert "SYSTEM_TIME" not in sql + # Need fully-qualified table name. + assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql -def test_create_table_clone_sets_expiration(): - session = mock.create_autospec(bigframes.session.Session) - source = bigquery.TableReference.from_string( - "my-test-project.test_dataset.some_table" - ) - expiration = datetime.datetime( - 2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc - ) - bigframes.session._io.bigquery.create_table_clone( - source, - bigquery.DatasetReference("other-project", "other_dataset"), - expiration, - session, - "test_api", +def test_create_snapshot_sql_doesnt_timetravel_session_tables(): + table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg") + + sql = bigframes.session._io.bigquery.create_snapshot_sql( + table_ref, datetime.datetime.now(datetime.timezone.utc) ) - session._start_query.assert_called_once() - call_args = session._start_query.call_args - query = call_args.args[0] - assert "CREATE OR REPLACE TABLE" in query - assert "CLONE" in query - assert f'expiration_timestamp=TIMESTAMP "{expiration.isoformat()}"' in query - assert '("source", "bigquery-dataframes-temp")' in query - assert call_args.kwargs["job_config"].labels["bigframes-api"] == "test_api" + # We aren't modifying _SESSION tables, so don't use time travel. + assert "SYSTEM_TIME" not in sql + + # Don't need the project ID for _SESSION tables. + assert "my-test-project" not in sql def test_create_temp_table_default_expiration(): @@ -85,7 +68,6 @@ def test_create_temp_table_default_expiration(): assert table.project == "test-project" assert table.dataset_id == "test_dataset" assert table.table_id.startswith("bqdf") - # TODO(swast): Why isn't the expiration exactly what we set it to? assert ( (expiration - datetime.timedelta(minutes=1)) < table.expires