feat: Add a unique session_id to Session and allow cleaning up sessions #553

Merged May 3, 2024 (59 commits)

Commits
91ec183
feat: Add a unique session_id to Session, allowing manual cleanup of …
milkshakeiii Mar 30, 2024
d617ef0
test not closing by default
milkshakeiii Mar 30, 2024
faa4c3f
do close in session test for test
milkshakeiii Mar 30, 2024
3e3a47c
skip both tests
milkshakeiii Mar 31, 2024
e0029b5
move tests to large and use page size to speed up
milkshakeiii Mar 31, 2024
f01416d
skip cleanup in auth failure test
milkshakeiii Mar 31, 2024
6794fe7
fix a doctest
milkshakeiii Mar 31, 2024
06de060
try to fix kokoro e2e
milkshakeiii Apr 1, 2024
b11ec2a
only skip one test
milkshakeiii Apr 1, 2024
d9cb6d8
skip expensive cleanup in tests
milkshakeiii Apr 1, 2024
3e34042
skip both tests
milkshakeiii Apr 1, 2024
1ddff94
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 1, 2024
d43398a
don't scan all tables usually
milkshakeiii Apr 1, 2024
d66fc57
update comment
milkshakeiii Apr 1, 2024
379622e
update comment
milkshakeiii Apr 2, 2024
9bfa435
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 2, 2024
a5e48af
update comment
milkshakeiii Apr 2, 2024
9cbbdb1
Update third_party/bigframes_vendored/pandas/core/frame.py
milkshakeiii Apr 2, 2024
96ec88a
Update bigframes/core/global_session.py
milkshakeiii Apr 2, 2024
8462363
Update bigframes/pandas/__init__.py
milkshakeiii Apr 2, 2024
db2c809
Update bigframes/session/__init__.py
milkshakeiii Apr 2, 2024
b672331
Update bigframes/pandas/__init__.py
milkshakeiii Apr 2, 2024
41b2965
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 2, 2024
cd1925b
Update bigframes/pandas/__init__.py
milkshakeiii Apr 2, 2024
5bb1fe6
Address some of the comments
milkshakeiii Apr 2, 2024
27ee64b
add max_results to list calls and unskip test
milkshakeiii Apr 2, 2024
90dcc31
check for special purpose datasets
milkshakeiii Apr 2, 2024
ceff5f3
warn when global session is closed with expired credentials
milkshakeiii Apr 2, 2024
367f78d
use yield fixture to clean up session
milkshakeiii Apr 3, 2024
5a9e687
improve comment
milkshakeiii Apr 3, 2024
ebc6522
move slow test to different region
milkshakeiii Apr 3, 2024
b517014
Merge branch 'main' into b331971774-clean-up-tables
tswast Apr 3, 2024
34862e2
Update bigframes/core/global_session.py
milkshakeiii Apr 3, 2024
6d0217d
Update bigframes/dataframe.py
milkshakeiii Apr 3, 2024
4dc057b
traceback
milkshakeiii Apr 3, 2024
e6a5e45
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 8, 2024
5cbb797
Update bigframes/pandas/__init__.py
milkshakeiii Apr 8, 2024
179f2c4
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 12, 2024
d222536
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 12, 2024
f9e2888
don't scan datasets
milkshakeiii Apr 16, 2024
7e8acd7
change default param behavior
milkshakeiii Apr 16, 2024
e422cd3
try 10000 max results
milkshakeiii Apr 16, 2024
0421509
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 17, 2024
1ce15ac
format fix
milkshakeiii Apr 17, 2024
d0aff59
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 17, 2024
10cada8
Merge branch 'b331971774-clean-up-tables' of https://github.com/googl…
gcf-owl-bot[bot] Apr 17, 2024
1551354
still use alternative region
milkshakeiii Apr 17, 2024
ed2bf70
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii Apr 23, 2024
529e015
Update bigframes/pandas/__init__.py
milkshakeiii Apr 23, 2024
6fda625
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 23, 2024
75c7cae
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii May 1, 2024
deebc64
address some of the comments
milkshakeiii May 1, 2024
66f014c
docstring fix
milkshakeiii May 1, 2024
c827465
add warning
milkshakeiii May 1, 2024
07fe6b3
don't use global session in test_clean_up_by_session_id
milkshakeiii May 1, 2024
76f73fa
Update bigframes/pandas/__init__.py
tswast May 2, 2024
3669d97
Merge branch 'main' into b331971774-clean-up-tables
tswast May 2, 2024
ac24209
avoid possible changing global session
milkshakeiii May 2, 2024
c1c11f6
Merge branch 'main' into b331971774-clean-up-tables
milkshakeiii May 2, 2024
bigframes/core/global_session.py (+18 −2)
@@ -15,7 +15,11 @@
"""Utilities for managing a default, globally available Session object."""

import threading
import traceback
from typing import Callable, Optional, TypeVar
import warnings

import google.auth.exceptions

import bigframes._config
import bigframes.session
@@ -27,7 +31,8 @@
def close_session() -> None:
"""Start a fresh session the next time a function requires a session.

Closes the current session if it was already started.
Closes the current session if it was already started, deleting any
temporary tables that were created.

Returns:
None
@@ -36,7 +41,18 @@ def close_session() -> None:

with _global_session_lock:
if _global_session is not None:
_global_session.close()
try:
_global_session.close()
except google.auth.exceptions.RefreshError as e:
session_id = _global_session.session_id
location = _global_session._location
project_id = _global_session._project
warnings.warn(
f"Session cleanup failed for session with id: {session_id}, "
f"location: {location}, project: {project_id}",
category=bigframes.exceptions.CleanupFailedWarning,
)
traceback.print_tb(e.__traceback__)
_global_session = None

bigframes._config.options.bigquery._session_started = False
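The error-handling path added to `close_session` above can be sketched in isolation. This is a minimal stand-alone illustration of the pattern, not the real implementation: `FakeRefreshError` and `FakeSession` are hypothetical stand-ins for `google.auth.exceptions.RefreshError` and the actual `Session`.

```python
import warnings


class CleanupFailedWarning(Warning):
    """Mirrors bigframes.exceptions.CleanupFailedWarning."""


class FakeRefreshError(Exception):
    """Stand-in for google.auth.exceptions.RefreshError."""


class FakeSession:
    """Stand-in session whose credentials have expired."""

    session_id = "sessiona1b2c"
    _location = "us"
    _project = "my-project"

    def close(self):
        # Simulate expired credentials failing the cleanup call.
        raise FakeRefreshError("credentials expired")


def close_session(session) -> None:
    """Close the session; warn instead of raising if credentials expired."""
    try:
        session.close()
    except FakeRefreshError:
        warnings.warn(
            f"Session cleanup failed for session with id: {session.session_id}, "
            f"location: {session._location}, project: {session._project}",
            category=CleanupFailedWarning,
        )


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    close_session(FakeSession())
```

The design point is that a failure to delete temporary tables is downgraded from a hard error to a warning, so closing a session never raises just because credentials went stale.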
bigframes/dataframe.py (+4 −2)
@@ -2928,8 +2928,10 @@ def to_gbq(
)
if_exists = "replace"

temp_table_ref = bigframes.session._io.bigquery.random_table(
self._session._anonymous_dataset
temp_table_ref = self._session._random_table(
# The client code owns this table reference now, so skip_cleanup=True
# to not clean it up when we close the session.
skip_cleanup=True,
)
destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"

bigframes/exceptions.py (+4 −0)
@@ -21,5 +21,9 @@ class UnknownLocationWarning(Warning):
"""The location is set to an unknown value."""


class CleanupFailedWarning(Warning):
"""Bigframes failed to clean up a table resource."""


class NoDefaultIndexError(ValueError):
"""Unable to create a default index."""
bigframes/pandas/__init__.py (+62 −0)
@@ -706,6 +706,68 @@ def to_datetime(
to_datetime.__doc__ = vendored_pandas_datetimes.to_datetime.__doc__


def get_default_session_id() -> str:
"""Gets the session id that is used whenever a custom session
has not been provided.

It is the session id of the default global session. It is prefixed to
the table id of all temporary tables created in the global session.

Returns:
str, the default global session id, ex. 'sessiona1b2c'
"""
return get_global_session().session_id


def clean_up_by_session_id(
session_id: str,
location: Optional[str] = None,
project: Optional[str] = None,
) -> None:
"""Searches through table names in BigQuery and deletes tables
found matching the expected format.

This could be useful if the session object has been lost.
Calling `session.close()` or `bigframes.pandas.close_session()`
is preferred in most cases.

Args:
session_id (str):
The session id to clean up. Can be found using
session.session_id or get_default_session_id().

location (str, default None):
The location of the session to clean up. If given, used
together with project kwarg to determine the dataset
to search through for tables to clean up.

project (str, default None):
The project id associated with the session to clean up.
If given, used together with location kwarg to determine
the dataset to search through for tables to clean up.

Returns:
None
"""
session = get_global_session()
client = session.bqclient

if (location is None) != (project is None):
raise ValueError(
"Only one of project or location was given. Must specify both or neither."
)
elif location is None and project is None:
dataset = session._anonymous_dataset
else:
dataset = bigframes.session._io.bigquery.create_bq_dataset_reference(
client, location=location, project=project
)

bigframes.session._io.bigquery.delete_tables_matching_session_id(
client, dataset, session_id
)
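The both-or-neither argument check in `clean_up_by_session_id` can be isolated into a small sketch. `validate_cleanup_args` is an illustrative name, not part of the PR; it reproduces only the validation logic, returning `True` when the session's own anonymous dataset should be used.

```python
from typing import Optional


def validate_cleanup_args(
    location: Optional[str] = None, project: Optional[str] = None
) -> bool:
    """Mirror the argument check in clean_up_by_session_id.

    Returns True when neither argument is given (use the session's
    anonymous dataset); raises if exactly one of the two is given.
    """
    # XOR on None-ness: exactly one argument supplied is an error.
    if (location is None) != (project is None):
        raise ValueError(
            "Only one of project or location was given. "
            "Must specify both or neither."
        )
    return location is None
```

The `!=` on the two `is None` checks is a compact exclusive-or: it fires only when exactly one of `location` and `project` was supplied.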


# pandas dtype attributes
NA = pandas.NA
BooleanDtype = pandas.BooleanDtype
bigframes/session/__init__.py (+59 −25)
@@ -21,6 +21,7 @@
import logging
import os
import re
import secrets
import typing
from typing import (
Any,
@@ -37,6 +38,7 @@
Tuple,
Union,
)
import uuid
import warnings

# Even though the ibis.backends.bigquery import is unused, it's needed
@@ -100,6 +102,8 @@

_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"

_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"

_MAX_CLUSTER_COLUMNS = 4

# TODO(swast): Need to connect to regional endpoints when performing remote
@@ -203,7 +207,11 @@ def __init__(
bq_kms_key_name=self._bq_kms_key_name,
)

self._create_bq_datasets()
self._anonymous_dataset = (
bigframes.session._io.bigquery.create_bq_dataset_reference(
self.bqclient, location=self._location
)
)

# TODO(shobs): Remove this logic after https://github.com/ibis-project/ibis/issues/8494
# has been fixed. The ibis client changes the default query job config
@@ -233,6 +241,13 @@ def __init__(
bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
] = {}

# unique session identifier, short enough to be human readable
# only needs to be unique among sessions created by the same user
# at the same time in the same region
self._session_id: str = "session" + secrets.token_hex(3)
self._table_ids: List[str] = []
# store table ids and delete them when the session is closed

@property
def bqclient(self):
return self._clients_provider.bqclient
@@ -263,6 +278,10 @@ def bqconnectionmanager(self):
)
return self._bq_connection_manager

@property
def session_id(self):
return self._session_id

@property
def _project(self):
return self.bqclient.project
@@ -271,24 +290,15 @@ def __hash__(self):
# Stable hash needed to use in expression tree
return hash(str(self._anonymous_dataset))

def _create_bq_datasets(self):
"""Create and identify dataset(s) for temporary BQ resources."""
query_job = self.bqclient.query("SELECT 1", location=self._location)
query_job.result() # blocks until finished

# The anonymous dataset is used by BigQuery to write query results and
# session tables. BigQuery DataFrames also writes temp tables directly
# to the dataset, no BigQuery Session required. Note: there is a
# different anonymous dataset per location. See:
# https://cloud.google.com/bigquery/docs/cached-results#how_cached_results_are_stored
query_destination = query_job.destination
self._anonymous_dataset = bigquery.DatasetReference(
query_destination.project,
query_destination.dataset_id,
)

def close(self):
"""No-op. Temporary resources are deleted after 7 days."""
"""Delete tables that were created with this session's session_id."""
client = self.bqclient
project_id = self._anonymous_dataset.project
dataset_id = self._anonymous_dataset.dataset_id

for table_id in self._table_ids:
full_id = ".".join([project_id, dataset_id, table_id])
client.delete_table(full_id, not_found_ok=True)

def read_gbq(
self,
@@ -1063,7 +1073,7 @@ def _read_pandas_load_job(

job_config.labels = {"bigframes-api": api_name}

load_table_destination = bigframes_io.random_table(self._anonymous_dataset)
load_table_destination = self._random_table()
load_job = self.bqclient.load_table_from_dataframe(
pandas_dataframe_copy,
load_table_destination,
@@ -1145,7 +1155,7 @@ def read_csv(
encoding: Optional[str] = None,
**kwargs,
) -> dataframe.DataFrame:
table = bigframes_io.random_table(self._anonymous_dataset)
table = self._random_table()

if engine is not None and engine == "bigquery":
if any(param is not None for param in (dtype, names)):
@@ -1282,7 +1292,7 @@ def read_parquet(
*,
engine: str = "auto",
) -> dataframe.DataFrame:
table = bigframes_io.random_table(self._anonymous_dataset)
table = self._random_table()

if engine == "bigquery":
job_config = self._prepare_load_job_config()
@@ -1319,7 +1329,7 @@ def read_json(
engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson",
**kwargs,
) -> dataframe.DataFrame:
table = bigframes_io.random_table(self._anonymous_dataset)
table = self._random_table()

if engine == "bigquery":

@@ -1416,14 +1426,12 @@ def _create_empty_temp_table(
) -> bigquery.TableReference:
# Can't set a table in _SESSION as destination via query job API, so we
# run DDL, instead.
dataset = self._anonymous_dataset
expiration = (
datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
)

table = bigframes_io.create_temp_table(
self.bqclient,
dataset,
self,
expiration,
schema=schema,
cluster_columns=cluster_cols,
@@ -1939,6 +1947,32 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()

def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
"""Generate a random table ID with BigQuery DataFrames prefix.

The generated ID will be stored and checked for deletion when the
session is closed, unless skip_cleanup is True.

Args:
skip_cleanup (bool, default False):
If True, do not add the generated ID to the list of tables
to clean up when the session is closed.

Returns:
google.cloud.bigquery.TableReference:
Fully qualified table ID of a table that doesn't exist.
"""
dataset = self._anonymous_dataset
session_id = self.session_id
now = datetime.datetime.now(datetime.timezone.utc)
random_id = uuid.uuid4().hex
table_id = _TEMP_TABLE_ID_FORMAT.format(
date=now.strftime("%Y%m%d"), session_id=session_id, random_id=random_id
)
if not skip_cleanup:
self._table_ids.append(table_id)
return dataset.table(table_id)


def connect(context: Optional[bigquery_options.BigQueryOptions] = None) -> Session:
return Session(context)
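The session-ID and temporary-table naming scheme introduced by this PR can be sketched outside the `Session` class. The constants and format mirror the diff (`"session" + secrets.token_hex(3)` and `_TEMP_TABLE_ID_FORMAT`); the function names `make_session_id` and `random_table_id` are illustrative, and the tracked-list parameter stands in for `Session._table_ids`.

```python
import datetime
import secrets
import uuid
from typing import List

# Same format as the new module-level constant in bigframes/session/__init__.py.
_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"


def make_session_id() -> str:
    # "session" plus 3 random bytes as hex: short enough to be human
    # readable, unique enough among concurrent sessions in one region.
    return "session" + secrets.token_hex(3)


def random_table_id(
    session_id: str, table_ids: List[str], skip_cleanup: bool = False
) -> str:
    """Generate a prefixed temp table ID, tracking it for later cleanup."""
    now = datetime.datetime.now(datetime.timezone.utc)
    table_id = _TEMP_TABLE_ID_FORMAT.format(
        date=now.strftime("%Y%m%d"),
        session_id=session_id,
        random_id=uuid.uuid4().hex,
    )
    # skip_cleanup=True mirrors to_gbq handing ownership to the caller:
    # the table is not deleted when the session closes.
    if not skip_cleanup:
        table_ids.append(table_id)
    return table_id


sid = make_session_id()
tracked: List[str] = []
tid = random_table_id(sid, tracked)
owned = random_table_id(sid, tracked, skip_cleanup=True)
```

Because the session ID is embedded in every table ID, `clean_up_by_session_id` can later find a lost session's tables by name alone, without the `Session` object.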