diff --git a/bigframes/constants.py b/bigframes/constants.py
index 90837c79eb..82b48dc967 100644
--- a/bigframes/constants.py
+++ b/bigframes/constants.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
+
 """Constants used across BigQuery DataFrames.
 
 This module should not depend on any others in the package.
@@ -23,3 +25,5 @@
 )
 
 ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}"
+
+DEFAULT_EXPIRATION = datetime.timedelta(days=1)
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index ffcaf0d613..4932008f09 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -2289,25 +2289,51 @@ def to_json(
 
     def to_gbq(
         self,
-        destination_table: str,
+        destination_table: Optional[str] = None,
         *,
-        if_exists: Optional[Literal["fail", "replace", "append"]] = "fail",
+        if_exists: Optional[Literal["fail", "replace", "append"]] = None,
         index: bool = True,
         ordering_id: Optional[str] = None,
-    ) -> None:
-        if "." not in destination_table:
-            raise ValueError(
-                "Invalid Table Name. Should be of the form 'datasetId.tableId' or "
-                "'projectId.datasetId.tableId'"
-            )
-
+    ) -> str:
         dispositions = {
             "fail": bigquery.WriteDisposition.WRITE_EMPTY,
             "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
             "append": bigquery.WriteDisposition.WRITE_APPEND,
         }
+
+        if destination_table is None:
+            # TODO(swast): If there have been no modifications to the DataFrame
+            # since the last time it was written (cached), then return that.
+            # For `read_gbq` nodes, return the underlying table clone.
+            destination_table = bigframes.session._io.bigquery.create_temp_table(
+                self._session.bqclient,
+                self._session._anonymous_dataset,
+                # TODO(swast): allow custom expiration times, probably via session configuration.
+                constants.DEFAULT_EXPIRATION,
+            )
+
+            if if_exists is not None and if_exists != "replace":
+                raise ValueError(
+                    f"Got invalid value {repr(if_exists)} for if_exists. "
+                    "When no destination table is specified, a new table is always created. "
+                    "None or 'replace' are the only valid options in this case."
+                )
+            if_exists = "replace"
+
+        if "." not in destination_table:
+            raise ValueError(
+                f"Got invalid value for destination_table {repr(destination_table)}. "
+                "Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'."
+            )
+
+        if if_exists is None:
+            if_exists = "fail"
+
         if if_exists not in dispositions:
-            raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
+            raise ValueError(
+                f"Got invalid value {repr(if_exists)} for if_exists. "
+                f"Valid options include None or one of {dispositions.keys()}."
+            )
 
         job_config = bigquery.QueryJobConfig(
             write_disposition=dispositions[if_exists],
@@ -2318,6 +2344,7 @@ def to_gbq(
         )
 
         self._run_io_query(index=index, ordering_id=ordering_id, job_config=job_config)
+        return destination_table
 
     def to_numpy(
         self, dtype=None, copy=False, na_value=None, **kwargs
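For context, a minimal sketch of the two call paths the rewritten `to_gbq` supports; the project, dataset, and table names below are illustrative, not part of the change:

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})

# No destination: a temporary table is created in the session's anonymous
# dataset with DEFAULT_EXPIRATION, and its fully-qualified ID is returned,
# e.g. "my-project._anon_dataset.bqdf20231115_<random hex>".
temp_destination = df.to_gbq()

# Explicit destination: if_exists defaults to "fail" as before, and the same
# table ID is echoed back so both paths can be handled uniformly.
named_destination = df.to_gbq("my-project.my_dataset.my_table", if_exists="replace")

# Invalid: a temp table is always newly created, so "fail"/"append" are
# rejected when no destination_table is given.
try:
    df.to_gbq(if_exists="append")
except ValueError as e:
    print(e)
```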
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 5a61ed534f..a1eae69715 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -223,6 +223,17 @@ def _create_and_bind_bq_session(self):
         query_job.result()  # blocks until finished
         self._session_id = query_job.session_info.session_id
 
+        # The anonymous dataset is used by BigQuery to write query results and
+        # session tables. BigQuery DataFrames also writes temp tables directly
+        # to the dataset, no BigQuery Session required. Note: there is a
+        # different anonymous dataset per location. See:
+        # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
+        query_destination = query_job.destination
+        self._anonymous_dataset = bigquery.DatasetReference(
+            query_destination.project,
+            query_destination.dataset_id,
+        )
+
         self.bqclient.default_query_job_config = bigquery.QueryJobConfig(
             connection_properties=[
                 bigquery.ConnectionProperty("session_id", self._session_id)
diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py
index d47efbdddc..d200a9a861 100644
--- a/bigframes/session/_io/bigquery.py
+++ b/bigframes/session/_io/bigquery.py
@@ -18,10 +18,12 @@
 import textwrap
 import types
 from typing import Dict, Iterable, Union
+import uuid
 
 import google.cloud.bigquery as bigquery
 
 IO_ORDERING_ID = "bqdf_row_nums"
+TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}"
 
 
 def create_export_csv_statement(
@@ -90,6 +92,24 @@ def create_snapshot_sql(
     )
 
 
+def create_temp_table(
+    bqclient: bigquery.Client,
+    dataset: bigquery.DatasetReference,
+    expiration: datetime.timedelta,
+) -> str:
+    """Create an empty table with an expiration in the desired dataset."""
+    now = datetime.datetime.now(datetime.timezone.utc)
+    random_id = uuid.uuid4().hex
+    table_id = TEMP_TABLE_PREFIX.format(
+        date=now.strftime("%Y%m%d"), random_id=random_id
+    )
+    table_ref = dataset.table(table_id)
+    destination = bigquery.Table(table_ref)
+    destination.expires = now + expiration
+    bqclient.create_table(destination)
+    return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
+
+
 # BigQuery REST API returns types in Legacy SQL format
 # https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
 # names
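A quick sketch of the new helper in isolation, assuming a real client with credentials and a writable dataset (both names are illustrative):

```python
import datetime

import google.cloud.bigquery as bigquery

import bigframes.session._io.bigquery

bqclient = bigquery.Client()
dataset = bigquery.DatasetReference("my-project", "my_dataset")

# Creates an empty table that BigQuery deletes automatically after one day.
table_id = bigframes.session._io.bigquery.create_temp_table(
    bqclient, dataset, datetime.timedelta(days=1)
)

# The returned ID is fully qualified, e.g.
# "my-project.my_dataset.bqdf20231115_<32 hex chars>".
print(table_id)
```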
diff --git a/tests/unit/resources.py b/tests/unit/resources.py
index f660d774f0..8fc8acd175 100644
--- a/tests/unit/resources.py
+++ b/tests/unit/resources.py
@@ -19,17 +19,21 @@
 import google.cloud.bigquery
 import ibis
 import pandas
+import pytest
 
 import bigframes
 import bigframes.core as core
 import bigframes.core.ordering
+import bigframes.dataframe
 import bigframes.session.clients
 
 """Utilities for creating test resources."""
 
 
 def create_bigquery_session(
-    bqclient: Optional[google.cloud.bigquery.Client] = None, session_id: str = "abcxyz"
+    bqclient: Optional[mock.Mock] = None,
+    session_id: str = "abcxyz",
+    anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None,
 ) -> bigframes.Session:
     credentials = mock.create_autospec(
         google.auth.credentials.Credentials, instance=True
@@ -39,6 +43,21 @@ def create_bigquery_session(
         bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
         bqclient.project = "test-project"
 
+    if anonymous_dataset is None:
+        anonymous_dataset = google.cloud.bigquery.DatasetReference(
+            "test-project",
+            "test_dataset",
+        )
+
+    query_job = mock.create_autospec(google.cloud.bigquery.QueryJob)
+    type(query_job).destination = mock.PropertyMock(
+        return_value=anonymous_dataset.table("test_table"),
+    )
+    type(query_job).session_info = google.cloud.bigquery.SessionInfo(
+        {"sessionInfo": {"sessionId": session_id}},
+    )
+    bqclient.query.return_value = query_job
+
     clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
     type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)
     clients_provider._credentials = credentials
@@ -51,6 +70,19 @@
     return session
 
 
+def create_dataframe(
+    monkeypatch: pytest.MonkeyPatch, session: Optional[bigframes.Session] = None
+) -> bigframes.dataframe.DataFrame:
+    if session is None:
+        session = create_bigquery_session()
+
+    # Since this may create a ReadLocalNode, the session we explicitly pass in
+    # might not actually be used. Mock out the global session, too.
+    monkeypatch.setattr(bigframes.core.global_session, "_global_session", session)
+    bigframes.options.bigquery._session_started = True
+    return bigframes.dataframe.DataFrame({}, session=session)
+
+
 def create_pandas_session(tables: Dict[str, pandas.DataFrame]) -> bigframes.Session:
     # TODO(tswast): Refactor to make helper available for all tests. Consider
     # providing a proper "local Session" for use by downstream developers.
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index d2255d5edf..cb3003b1cc 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -14,6 +14,7 @@
 
 import datetime
 from typing import Iterable
+import unittest.mock as mock
 
 import google.cloud.bigquery as bigquery
 import pytest
@@ -37,7 +38,7 @@ def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
     assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
 
 
-def test_create_snapshot_sql_doesnt_timetravel_session_datasets():
+def test_create_snapshot_sql_doesnt_timetravel_session_tables():
     table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg")
 
     sql = bigframes.session._io.bigquery.create_snapshot_sql(
@@ -51,6 +52,29 @@
     assert "my-test-project" not in sql
 
 
+def test_create_temp_table_default_expiration():
+    """Make sure the created table has an expiration."""
+    bqclient = mock.create_autospec(bigquery.Client)
+    dataset = bigquery.DatasetReference("test-project", "test_dataset")
+    now = datetime.datetime.now(datetime.timezone.utc)
+    expiration = datetime.timedelta(days=3)
+    expected_expires = now + expiration
+
+    bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration)
+
+    bqclient.create_table.assert_called_once()
+    call_args = bqclient.create_table.call_args
+    table = call_args.args[0]
+    assert table.project == "test-project"
+    assert table.dataset_id == "test_dataset"
+    assert table.table_id.startswith("bqdf")
+    assert (
+        (expected_expires - datetime.timedelta(minutes=1))
+        < table.expires
+        < (expected_expires + datetime.timedelta(minutes=1))
+    )
+
+
 @pytest.mark.parametrize(
     ("schema", "expected"),
     (
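The expiration assertion above allows a ±1 minute window because `create_temp_table` reads the clock internally. If an exact assertion were ever needed, one hedged alternative is to pin `datetime.datetime.now` with a mock; this sketch is not part of the change and assumes the module imports `datetime` at top level (the hunk above suggests it does, since `create_temp_table` references `datetime.timedelta`):

```python
import datetime
import unittest.mock as mock

import google.cloud.bigquery as bigquery

import bigframes.session._io.bigquery


def test_create_temp_table_exact_expiration():
    bqclient = mock.create_autospec(bigquery.Client)
    dataset = bigquery.DatasetReference("test-project", "test_dataset")
    frozen = datetime.datetime(2023, 11, 15, tzinfo=datetime.timezone.utc)

    # Temporarily replace the datetime class seen by the module so that
    # now() returns a fixed instant; other datetime behavior passes through.
    with mock.patch.object(
        bigframes.session._io.bigquery.datetime, "datetime", wraps=datetime.datetime
    ) as dt:
        dt.now.return_value = frozen
        bigframes.session._io.bigquery.create_temp_table(
            bqclient, dataset, datetime.timedelta(days=3)
        )

    table = bqclient.create_table.call_args.args[0]
    assert table.expires == frozen + datetime.timedelta(days=3)
```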
diff --git a/tests/unit/test_dataframe.py b/tests/unit/test_dataframe.py
new file mode 100644
index 0000000000..17a8290889
--- /dev/null
+++ b/tests/unit/test_dataframe.py
@@ -0,0 +1,59 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google.cloud.bigquery
+import pytest
+
+from . import resources
+
+
+def test_dataframe_to_gbq_invalid_destination(monkeypatch: pytest.MonkeyPatch):
+    dataframe = resources.create_dataframe(monkeypatch)
+
+    with pytest.raises(ValueError, match="no_dataset_or_project"):
+        dataframe.to_gbq("no_dataset_or_project")
+
+
+def test_dataframe_to_gbq_invalid_if_exists(monkeypatch: pytest.MonkeyPatch):
+    dataframe = resources.create_dataframe(monkeypatch)
+
+    with pytest.raises(ValueError, match="notreallyanoption"):
+        # Even though the type is annotated with the literals we accept, users
+        # might not be using a type checker, especially not in an interactive
+        # notebook.
+        dataframe.to_gbq(if_exists="notreallyanoption")  # type: ignore
+
+
+def test_dataframe_to_gbq_invalid_if_exists_no_destination(
+    monkeypatch: pytest.MonkeyPatch,
+):
+    dataframe = resources.create_dataframe(monkeypatch)
+
+    with pytest.raises(ValueError, match="append"):
+        dataframe.to_gbq(if_exists="append")
+
+
+def test_dataframe_to_gbq_writes_to_anonymous_dataset(
+    monkeypatch: pytest.MonkeyPatch,
+):
+    anonymous_dataset_id = "my-anonymous-project.my_anonymous_dataset"
+    anonymous_dataset = google.cloud.bigquery.DatasetReference.from_string(
+        anonymous_dataset_id
+    )
+    session = resources.create_bigquery_session(anonymous_dataset=anonymous_dataset)
+    dataframe = resources.create_dataframe(monkeypatch, session=session)
+
+    destination = dataframe.to_gbq()
+
+    assert destination.startswith(anonymous_dataset_id)
diff --git a/tests/unit/test_pandas.py b/tests/unit/test_pandas.py
index 5d4f69c7c0..70c5441c68 100644
--- a/tests/unit/test_pandas.py
+++ b/tests/unit/test_pandas.py
@@ -116,7 +116,7 @@ def test_pandas_attribute():
     assert bpd.ArrowDtype is pd.ArrowDtype
 
 
-def test_close_session_after_bq_session_ended(monkeypatch):
+def test_close_session_after_bq_session_ended(monkeypatch: pytest.MonkeyPatch):
     bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
     bqclient.project = "test-project"
     session = resources.create_bigquery_session(
@@ -141,7 +141,7 @@ def test_close_session_after_bq_session_ended(monkeypatch):
         google.api_core.exceptions.BadRequest,
         match="Session JUST_A_TEST has expired and is no longer available.",
     ):
-        bpd.read_gbq("SELECT 1")
+        bpd.read_gbq("SELECT 'ABC'")
 
     # Even though the query to stop the session raises an exception, we should
     # still be able to close it without raising an error to the user.
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index 12bd053179..e267fac0f7 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -125,12 +125,12 @@ def to_numpy(self, dtype=None, copy=False, na_value=None, **kwargs) -> np.ndarra
 
     def to_gbq(
         self,
-        destination_table: str,
+        destination_table: Optional[str],
         *,
-        if_exists: Optional[Literal["fail", "replace", "append"]] = "fail",
+        if_exists: Optional[Literal["fail", "replace", "append"]] = None,
         index: bool = True,
         ordering_id: Optional[str] = None,
-    ) -> None:
+    ) -> str:
         """Write a DataFrame to a BigQuery table.
 
         **Examples:**
@@ -138,17 +138,40 @@ def to_gbq(
             >>> import bigframes.pandas as bpd
             >>> bpd.options.display.progress_bar = None
 
+        Write a DataFrame to a BigQuery table.
+
             >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
             >>> # destination_table = PROJECT_ID + "." + DATASET_ID + "." + TABLE_NAME
             >>> df.to_gbq("bigframes-dev.birds.test-numbers", if_exists="replace")
+            'bigframes-dev.birds.test-numbers'
+
+        Write a DataFrame to a temporary BigQuery table in the anonymous dataset.
+
+            >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
+            >>> destination = df.to_gbq(ordering_id="ordering_id")
+            >>> # The table created can be read outside of the current session.
+            >>> bpd.close_session()  # For demonstration only.
+            >>> bpd.read_gbq(destination, index_col="ordering_id")
+                         col1  col2
+            ordering_id
+            0               1     3
+            1               2     4
+
+            [2 rows x 2 columns]
 
         Args:
-            destination_table (str):
+            destination_table (Optional[str]):
                 Name of table to be written, in the form ``dataset.tablename``
                 or ``project.dataset.tablename``.
 
-            if_exists (str, default 'fail'):
-                Behavior when the destination table exists. Value can be one of:
+                If no ``destination_table`` is set, a new temporary table is
+                created in the BigQuery anonymous dataset.
+
+            if_exists (Optional[str]):
+                Behavior when the destination table exists. When
+                ``destination_table`` is set, this defaults to ``'fail'``. When
+                ``destination_table`` is not set, only ``None`` and ``'replace'``
+                are valid, and a new table is always created. Value can be one of:
 
                 ``'fail'``
                     If table exists raise pandas_gbq.gbq.TableCreationError.
@@ -163,6 +186,11 @@ def to_gbq(
             ordering_id (Optional[str], default None):
                 If set, write the ordering of the DataFrame as a column in the
                 result table with this name.
+
+        Returns:
+            str:
+                The fully-qualified ID for the written table, in the form
+                ``project.dataset.tablename``.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
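Usage note: the temporary table outlives the session but not `DEFAULT_EXPIRATION` (one day), so callers who need the data to persist should keep passing an explicit destination. A hedged recap of the docstring example above, with an illustrative table name:

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})

# Expires after constants.DEFAULT_EXPIRATION (currently one day), but can be
# read back later, even from a fresh session.
temp_destination = df.to_gbq(ordering_id="ordering_id")
bpd.close_session()
restored = bpd.read_gbq(temp_destination, index_col="ordering_id")

# Persists until explicitly deleted.
saved_destination = restored.to_gbq("my-project.my_dataset.my_table", if_exists="replace")
```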