feat: to_gbq without a destination table writes to a temporary table #158

Merged · 5 commits · Nov 2, 2023

Changes from all commits

bigframes/constants.py (4 additions, 0 deletions)

@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
+
 """Constants used across BigQuery DataFrames.
 
 This module should not depend on any others in the package.
@@ -23,3 +25,5 @@
 )
 
 ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}"
+
+DEFAULT_EXPIRATION = datetime.timedelta(days=1)
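
Note: a minimal sketch of what this constant means in practice. The arithmetic mirrors create_temp_table in bigframes/session/_io/bigquery.py further down, which sets the new table's expires property to the creation time plus this timedelta.

import datetime

DEFAULT_EXPIRATION = datetime.timedelta(days=1)

# A table created "now" with this expiration is deleted by BigQuery
# roughly one day later.
now = datetime.datetime.now(datetime.timezone.utc)
print("temp table would expire at:", now + DEFAULT_EXPIRATION)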

bigframes/dataframe.py (37 additions, 10 deletions)

@@ -2289,25 +2289,51 @@ def to_json(
 
     def to_gbq(
         self,
-        destination_table: str,
+        destination_table: Optional[str] = None,
         *,
-        if_exists: Optional[Literal["fail", "replace", "append"]] = "fail",
+        if_exists: Optional[Literal["fail", "replace", "append"]] = None,
         index: bool = True,
         ordering_id: Optional[str] = None,
-    ) -> None:
-        if "." not in destination_table:
-            raise ValueError(
-                "Invalid Table Name. Should be of the form 'datasetId.tableId' or "
-                "'projectId.datasetId.tableId'"
-            )
-
+    ) -> str:
         dispositions = {
             "fail": bigquery.WriteDisposition.WRITE_EMPTY,
             "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
             "append": bigquery.WriteDisposition.WRITE_APPEND,
         }
 
+        if destination_table is None:
+            # TODO(swast): If there have been no modifications to the DataFrame
+            # since the last time it was written (cached), then return that.
+            # For `read_gbq` nodes, return the underlying table clone.
+            destination_table = bigframes.session._io.bigquery.create_temp_table(
+                self._session.bqclient,
+                self._session._anonymous_dataset,
+                # TODO(swast): allow custom expiration times, probably via session configuration.
+                constants.DEFAULT_EXPIRATION,
+            )
+
+            if if_exists is not None and if_exists != "replace":
+                raise ValueError(
+                    f"Got invalid value {repr(if_exists)} for if_exists. "
+                    "When no destination table is specified, a new table is always created. "
+                    "None or 'replace' are the only valid options in this case."
+                )
+            if_exists = "replace"
+
+        if "." not in destination_table:
+            raise ValueError(
+                f"Got invalid value for destination_table {repr(destination_table)}. "
+                "Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'."
+            )
+
+        if if_exists is None:
+            if_exists = "fail"
+
         if if_exists not in dispositions:
-            raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
+            raise ValueError(
+                f"Got invalid value {repr(if_exists)} for if_exists. "
+                f"Valid options include None or one of {dispositions.keys()}."
+            )
 
         job_config = bigquery.QueryJobConfig(
             write_disposition=dispositions[if_exists],
@@ -2318,6 +2344,7 @@ def to_gbq(
         )
 
         self._run_io_query(index=index, ordering_id=ordering_id, job_config=job_config)
+        return destination_table
 
     def to_numpy(
         self, dtype=None, copy=False, na_value=None, **kwargs
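
Note: a sketch of how the changed to_gbq API reads from the caller's side. The query, project, and table names are illustrative only, and bpd is the usual bigframes.pandas alias.

import bigframes.pandas as bpd

df = bpd.read_gbq("SELECT 1 AS x")  # illustrative query

# An explicit destination works as before, and the method now also returns
# the table name. Leaving if_exists=None behaves like "fail" in this case.
name = df.to_gbq("my_dataset.my_table", if_exists="replace")

# New behavior: with no destination, an expiring table is created in the
# session's anonymous dataset and its fully qualified ID is returned,
# something like "my-project._anon_dataset.bqdf20231102_<hex>".
temp_name = df.to_gbq()

# Without a destination, only None or "replace" are accepted for if_exists;
# "fail" or "append" raise ValueError because a new table is always created.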

bigframes/session/__init__.py (11 additions, 0 deletions)

@@ -223,6 +223,17 @@ def _create_and_bind_bq_session(self):
         query_job.result()  # blocks until finished
         self._session_id = query_job.session_info.session_id
 
+        # The anonymous dataset is used by BigQuery to write query results and
+        # session tables. BigQuery DataFrames also writes temp tables directly
+        # to the dataset, no BigQuery Session required. Note: there is a
+        # different anonymous dataset per location. See:
+        # https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
+        query_destination = query_job.destination
+        self._anonymous_dataset = bigquery.DatasetReference(
+            query_destination.project,
+            query_destination.dataset_id,
+        )
+
         self.bqclient.default_query_job_config = bigquery.QueryJobConfig(
             connection_properties=[
                 bigquery.ConnectionProperty("session_id", self._session_id)
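
Note: a standalone sketch of the same derivation, outside of a BigQuery Session and assuming default application credentials. Any query without an explicit destination exposes the per-location anonymous dataset via the finished job's destination property.

import google.cloud.bigquery as bigquery

client = bigquery.Client()

# Results of destination-less queries are cached in a hidden, per-location
# anonymous dataset owned by the current project.
query_job = client.query("SELECT 1")
query_job.result()  # wait for the job so destination is populated

destination = query_job.destination
anonymous_dataset = bigquery.DatasetReference(
    destination.project, destination.dataset_id
)
print(anonymous_dataset)  # dataset_id looks like "_c0ffee..." (illustrative)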

bigframes/session/_io/bigquery.py (20 additions, 0 deletions)

@@ -18,10 +18,12 @@
 import textwrap
 import types
 from typing import Dict, Iterable, Union
+import uuid
 
 import google.cloud.bigquery as bigquery
 
 IO_ORDERING_ID = "bqdf_row_nums"
+TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}"
 
 
 def create_export_csv_statement(
@@ -90,6 +92,24 @@ def create_snapshot_sql(
     )
 
 
+def create_temp_table(
+    bqclient: bigquery.Client,
+    dataset: bigquery.DatasetReference,
+    expiration: datetime.timedelta,
+) -> str:
+    """Create an empty table with an expiration in the desired dataset."""
+    now = datetime.datetime.now(datetime.timezone.utc)
+    random_id = uuid.uuid4().hex
+    table_id = TEMP_TABLE_PREFIX.format(
+        date=now.strftime("%Y%m%d"), random_id=random_id
+    )
+    table_ref = dataset.table(table_id)
+    destination = bigquery.Table(table_ref)
+    destination.expires = now + expiration
+    bqclient.create_table(destination)
+    return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
+
+
 # BigQuery REST API returns types in Legacy SQL format
 # https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
 # names
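
Note: a sketch of how the new helper might be exercised on its own. The project and dataset IDs are placeholders; in the PR, the session supplies its anonymous dataset and constants.DEFAULT_EXPIRATION.

import datetime

import google.cloud.bigquery as bigquery

import bigframes.session._io.bigquery

client = bigquery.Client()
dataset = bigquery.DatasetReference("my-project", "my_scratch_dataset")  # placeholders

# Creates an empty table named like "bqdf20231102_<32 hex chars>" that
# BigQuery deletes automatically once the expiration passes.
full_id = bigframes.session._io.bigquery.create_temp_table(
    client, dataset, datetime.timedelta(hours=1)
)
print(full_id)  # "my-project.my_scratch_dataset.bqdf20231102_..."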

tests/unit/resources.py (33 additions, 1 deletion)

@@ -19,17 +19,21 @@
 import google.cloud.bigquery
 import ibis
 import pandas
+import pytest
 
 import bigframes
 import bigframes.core as core
 import bigframes.core.ordering
+import bigframes.dataframe
 import bigframes.session.clients
 
 """Utilities for creating test resources."""
 
 
 def create_bigquery_session(
-    bqclient: Optional[google.cloud.bigquery.Client] = None, session_id: str = "abcxyz"
+    bqclient: Optional[mock.Mock] = None,
+    session_id: str = "abcxyz",
+    anonymous_dataset: Optional[google.cloud.bigquery.DatasetReference] = None,
 ) -> bigframes.Session:
     credentials = mock.create_autospec(
         google.auth.credentials.Credentials, instance=True
@@ -39,6 +43,21 @@ def create_bigquery_session(
         bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
         bqclient.project = "test-project"
 
+    if anonymous_dataset is None:
+        anonymous_dataset = google.cloud.bigquery.DatasetReference(
+            "test-project",
+            "test_dataset",
+        )
+
+    query_job = mock.create_autospec(google.cloud.bigquery.QueryJob)
+    type(query_job).destination = mock.PropertyMock(
+        return_value=anonymous_dataset.table("test_table"),
+    )
+    type(query_job).session_info = google.cloud.bigquery.SessionInfo(
+        {"sessionInfo": {"sessionId": session_id}},
+    )
+    bqclient.query.return_value = query_job
+
     clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
     type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)
     clients_provider._credentials = credentials
@@ -51,6 +70,19 @@ def create_bigquery_session(
     return session
 
 
+def create_dataframe(
+    monkeypatch: pytest.MonkeyPatch, session: Optional[bigframes.Session] = None
+) -> bigframes.dataframe.DataFrame:
+    if session is None:
+        session = create_bigquery_session()
+
+    # Since this may create a ReadLocalNode, the session we explicitly pass in
+    # might not actually be used. Mock out the global session, too.
+    monkeypatch.setattr(bigframes.core.global_session, "_global_session", session)
+    bigframes.options.bigquery._session_started = True
+    return bigframes.dataframe.DataFrame({}, session=session)
+
+
 def create_pandas_session(tables: Dict[str, pandas.DataFrame]) -> bigframes.Session:
     # TODO(tswast): Refactor to make helper available for all tests. Consider
     # providing a proper "local Session" for use by downstream developers.
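
Note: together, the two helpers let a unit test construct a DataFrame and call to_gbq with no network access. A sketch mirroring the tests added in tests/unit/test_dataframe.py below (the import path here is an assumption; the real tests use a relative import):

import pytest

from tests.unit import resources


def test_to_gbq_defaults_to_anonymous_dataset(monkeypatch: pytest.MonkeyPatch):
    # Every BigQuery call is satisfied by autospec'd mocks, including the
    # fake query job whose destination points at test-project.test_dataset.
    dataframe = resources.create_dataframe(monkeypatch)

    destination = dataframe.to_gbq()

    assert destination.startswith("test-project.test_dataset")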

tests/unit/session/test_io_bigquery.py (25 additions, 1 deletion)

@@ -14,6 +14,7 @@
 
 import datetime
 from typing import Iterable
+import unittest.mock as mock
 
 import google.cloud.bigquery as bigquery
 import pytest
@@ -37,7 +38,7 @@ def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
     assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
 
 
-def test_create_snapshot_sql_doesnt_timetravel_session_datasets():
+def test_create_snapshot_sql_doesnt_timetravel_session_tables():
     table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg")
 
     sql = bigframes.session._io.bigquery.create_snapshot_sql(
@@ -51,6 +52,29 @@ def test_create_snapshot_sql_doesnt_timetravel_session_datasets():
     assert "my-project" not in sql
 
 
+def test_create_temp_table_default_expiration():
+    """Make sure the created table has an expiration."""
+    bqclient = mock.create_autospec(bigquery.Client)
+    dataset = bigquery.DatasetReference("test-project", "test_dataset")
+    now = datetime.datetime.now(datetime.timezone.utc)
+    expiration = datetime.timedelta(days=3)
+    expected_expires = now + expiration
+
+    bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration)
+
+    bqclient.create_table.assert_called_once()
+    call_args = bqclient.create_table.call_args
+    table = call_args.args[0]
+    assert table.project == "test-project"
+    assert table.dataset_id == "test_dataset"
+    assert table.table_id.startswith("bqdf")
+    assert (
+        (expected_expires - datetime.timedelta(minutes=1))
+        < table.expires
+        < (expected_expires + datetime.timedelta(minutes=1))
+    )
+
+
 @pytest.mark.parametrize(
     ("schema", "expected"),
     (
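
Note: the one-minute window in the expiration assertion absorbs the wall-clock drift between the now captured in the test and the one captured inside create_temp_table; an exact equality check would be flaky. The assertion style also relies on a handy autospec property, sketched here: the Table object handed to the mocked create_table can be recovered from call_args and inspected directly.

import unittest.mock as mock

import google.cloud.bigquery as bigquery

bqclient = mock.create_autospec(bigquery.Client)
bqclient.create_table(bigquery.Table("proj.dset.tbl"))

# call_args.args holds the positional arguments of the recorded call.
table = bqclient.create_table.call_args.args[0]
assert table.table_id == "tbl"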

tests/unit/test_dataframe.py (new file, 59 additions)

@@ -0,0 +1,59 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import google.cloud.bigquery
+import pytest
+
+from . import resources
+
+
+def test_dataframe_to_gbq_invalid_destination(monkeypatch: pytest.MonkeyPatch):
+    dataframe = resources.create_dataframe(monkeypatch)
+
+    with pytest.raises(ValueError, match="no_dataset_or_project"):
+        dataframe.to_gbq("no_dataset_or_project")
+
+
+def test_dataframe_to_gbq_invalid_if_exists(monkeypatch: pytest.MonkeyPatch):
+    dataframe = resources.create_dataframe(monkeypatch)
+
+    with pytest.raises(ValueError, match="notreallyanoption"):
+        # Even though the type is annotated with the literals we accept, users
+        # might not be using a type checker, especially not in an interactive
+        # notebook.
+        dataframe.to_gbq(if_exists="notreallyanoption")  # type: ignore
+
+
+def test_dataframe_to_gbq_invalid_if_exists_no_destination(
+    monkeypatch: pytest.MonkeyPatch,
+):
+    dataframe = resources.create_dataframe(monkeypatch)
+
+    with pytest.raises(ValueError, match="append"):
+        dataframe.to_gbq(if_exists="append")
+
+
+def test_dataframe_to_gbq_writes_to_anonymous_dataset(
+    monkeypatch: pytest.MonkeyPatch,
+):
+    anonymous_dataset_id = "my-anonymous-project.my_anonymous_dataset"
+    anonymous_dataset = google.cloud.bigquery.DatasetReference.from_string(
+        anonymous_dataset_id
+    )
+    session = resources.create_bigquery_session(anonymous_dataset=anonymous_dataset)
+    dataframe = resources.create_dataframe(monkeypatch, session=session)
+
+    destination = dataframe.to_gbq()
+
+    assert destination.startswith(anonymous_dataset_id)

tests/unit/test_pandas.py (2 additions, 2 deletions)

@@ -116,7 +116,7 @@ def test_pandas_attribute():
     assert bpd.ArrowDtype is pd.ArrowDtype
 
 
-def test_close_session_after_bq_session_ended(monkeypatch):
+def test_close_session_after_bq_session_ended(monkeypatch: pytest.MonkeyPatch):
     bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)
     bqclient.project = "test-project"
     session = resources.create_bigquery_session(
@@ -141,7 +141,7 @@ def test_close_session_after_bq_session_ended(monkeypatch):
         google.api_core.exceptions.BadRequest,
         match="Session JUST_A_TEST has expired and is no longer available.",
     ):
-        bpd.read_gbq("SELECT 1")
+        bpd.read_gbq("SELECT 'ABC'")
 
     # Even though the query to stop the session raises an exception, we should
     # still be able to close it without raising an error to the user.