fix: add df snapshots lookup for read_gbq (#229)
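This change threads a new `use_cache: bool = True` flag through `read_gbq`, `read_gbq_query`, and `read_gbq_table`, forwards it to BigQuery's result cache via `job_config.use_query_cache`, and records a per-session snapshot timestamp for each table in `_df_snapshot` so repeated reads of the same table see the same time-travel snapshot. A minimal usage sketch follows; it assumes an already-constructed `Session` object (session construction is outside this diff), and the table name is hypothetical:

```python
# Assumption: `session` is a bigframes Session built elsewhere; only the
# `use_cache` argument below is introduced by this PR.
table = "my-project.my_dataset.my_table"  # hypothetical table

# The first read records a snapshot timestamp for the table in _df_snapshot;
# later reads with the default use_cache=True replay the same timestamp.
df = session.read_gbq(table)
df_again = session.read_gbq(table)  # same snapshot as `df`

# use_cache=False takes a fresh CURRENT_TIMESTAMP() for the snapshot; for
# query inputs it also disables BigQuery's query cache on the underlying job.
df_fresh = session.read_gbq(table, use_cache=False)
```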
@@ -177,6 +177,7 @@ def __init__(
         # Now that we're starting the session, don't allow the options to be
         # changed.
         context._session_started = True
+        self._df_snapshot: Dict[bigquery.TableReference, datetime.datetime] = {}

     @property
     def bqclient(self):
@@ -232,6 +233,7 @@ def read_gbq(
         index_col: Iterable[str] | str = (),
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
+        use_cache: bool = True,
         # Add a verify index argument that fails if the index is not unique.
     ) -> dataframe.DataFrame:
         # TODO(b/281571214): Generate prompt to show the progress of read_gbq.

Review thread on the new `use_cache` parameter:

> Can we pass this to

> Unresolving. You didn't populate the
@@ -242,6 +244,7 @@ def read_gbq(
                 col_order=col_order,
                 max_results=max_results,
                 api_name="read_gbq",
+                use_cache=use_cache,
             )
         else:
             # TODO(swast): Query the snapshot table but mark it as a
@@ -253,13 +256,15 @@ def read_gbq(
                 col_order=col_order,
                 max_results=max_results,
                 api_name="read_gbq",
+                use_cache=use_cache,
             )

     def _query_to_destination(
         self,
         query: str,
         index_cols: List[str],
         api_name: str,
+        use_cache: bool = True,
     ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
         # If a dry_run indicates this is not a query type job, then don't
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
@@ -284,6 +289,7 @@ def _query_to_destination(
         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
         job_config.destination = temp_table
+        job_config.use_query_cache = use_cache

         try:
             # Write to temp table to workaround BigQuery 10 GB query results
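In the hunk above, the flag also reaches the BigQuery job itself: `QueryJobConfig.use_query_cache` is the standard google-cloud-bigquery option that tells the service whether it may serve results from its query cache. A standalone sketch of the same configuration (the label value and query here are illustrative, not from the diff):

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes ambient credentials and default project

job_config = bigquery.QueryJobConfig()
job_config.labels["bigframes-api"] = "read_gbq"
job_config.use_query_cache = False  # what use_cache=False turns off

# With the cache disabled, BigQuery recomputes the result instead of
# serving it from previously cached query results.
rows = client.query("SELECT 1 AS x", job_config=job_config).result()
```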
@@ -305,6 +311,7 @@ def read_gbq_query(
         index_col: Iterable[str] | str = (),
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         """Turn a SQL query into a DataFrame.
@@ -362,6 +369,7 @@ def read_gbq_query(
             col_order=col_order,
             max_results=max_results,
             api_name="read_gbq_query",
+            use_cache=use_cache,
         )

     def _read_gbq_query(
@@ -372,14 +380,18 @@ def _read_gbq_query(
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
         api_name: str = "read_gbq_query",
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         if isinstance(index_col, str):
             index_cols = [index_col]
         else:
             index_cols = list(index_col)

         destination, query_job = self._query_to_destination(
-            query, index_cols, api_name=api_name
+            query,
+            index_cols,
+            api_name=api_name,
+            use_cache=use_cache,
         )

         # If there was no destination table, that means the query must have
@@ -403,6 +415,7 @@ def _read_gbq_query(
             index_col=index_cols,
             col_order=col_order,
             max_results=max_results,
+            use_cache=use_cache,
         )

     def read_gbq_table(
@@ -412,6 +425,7 @@ def read_gbq_table(
         index_col: Iterable[str] | str = (),
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         """Turn a BigQuery table into a DataFrame.
@@ -434,33 +448,22 @@ def read_gbq_table(
             col_order=col_order,
             max_results=max_results,
             api_name="read_gbq_table",
+            use_cache=use_cache,
         )

     def _get_snapshot_sql_and_primary_key(
         self,
         table_ref: bigquery.table.TableReference,
         *,
         api_name: str,
+        use_cache: bool = True,
     ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]:
         """Create a read-only Ibis table expression representing a table.

         If we can get a total ordering from the table, such as via primary key
         column(s), then return those too so that ordering generation can be
         avoided.
         """
-        if table_ref.dataset_id.upper() == "_SESSION":
-            # _SESSION tables aren't supported by the tables.get REST API.
-            return (
-                self.ibis_client.sql(
-                    f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
-                ),
-                None,
-            )
-        table_expression = self.ibis_client.table(
-            table_ref.table_id,
-            database=f"{table_ref.project}.{table_ref.dataset_id}",
-        )
-
         # If there are primary keys defined, the query engine assumes these
         # columns are unique, even if the constraint is not enforced. We make
         # the same assumption and use these columns as the total ordering keys.
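With the deleted branch gone, the Ibis expression this function returns is built only from `bigframes_io.create_snapshot_sql(...)` in the next hunk. That helper lives in `bigframes_io` and is not shown in this diff; the following is a plausible reconstruction only, assuming it relies on BigQuery time travel (`FOR SYSTEM_TIME AS OF`):

```python
import datetime

from google.cloud import bigquery


def create_snapshot_sql(
    table_ref: bigquery.TableReference, timestamp: datetime.datetime
) -> str:
    """Hypothetical sketch: pin a read to one point in time via time travel."""
    return (
        "SELECT * FROM "
        f"`{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` "
        f"FOR SYSTEM_TIME AS OF TIMESTAMP('{timestamp.isoformat()}')"
    )
```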
@@ -481,14 +484,18 @@ def _get_snapshot_sql_and_primary_key(

         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
-        current_timestamp = list(
-            self.bqclient.query(
-                "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-                job_config=job_config,
-            ).result()
-        )[0][0]
+        if use_cache and table_ref in self._df_snapshot.keys():
+            snapshot_timestamp = self._df_snapshot[table_ref]
+        else:
+            snapshot_timestamp = list(
+                self.bqclient.query(
+                    "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
+                    job_config=job_config,
+                ).result()
+            )[0][0]

+        self._df_snapshot[table_ref] = snapshot_timestamp
         table_expression = self.ibis_client.sql(
-            bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
+            bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp)
         )
         return table_expression, primary_keys
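The hunk above is the heart of the fix: the first read of a table records a server-side `CURRENT_TIMESTAMP()` in `_df_snapshot`, and every later read of the same table with `use_cache=True` replays that timestamp, so the session sees one consistent snapshot and BigQuery can reuse cached results for the identical snapshot query. A self-contained sketch of the same lookup pattern (the names mirror the diff, but this helper itself is illustrative, not part of the PR):

```python
import datetime
from typing import Dict

from google.cloud import bigquery

# Maps each table to the snapshot timestamp first used for it in this session.
_df_snapshot: Dict[bigquery.TableReference, datetime.datetime] = {}


def _snapshot_timestamp(
    client: bigquery.Client,
    table_ref: bigquery.TableReference,
    use_cache: bool = True,
) -> datetime.datetime:
    """Illustrative helper: reuse the cached timestamp unless asked not to."""
    if use_cache and table_ref in _df_snapshot:
        return _df_snapshot[table_ref]
    # Ask BigQuery for the current time so the snapshot matches server time.
    timestamp = list(
        client.query("SELECT CURRENT_TIMESTAMP() AS `current_timestamp`").result()
    )[0][0]
    _df_snapshot[table_ref] = timestamp
    return timestamp
```

Note that `table_ref in self._df_snapshot` would behave identically to the diff's `table_ref in self._df_snapshot.keys()`; `TableReference` is hashable, which is what makes it usable as the dictionary key.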
@@ -500,20 +507,21 @@ def _read_gbq_table(
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
         api_name: str,
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         if max_results and max_results <= 0:
             raise ValueError("`max_results` should be a positive number.")

         # TODO(swast): Can we re-use the temp table from other reads in the
         # session, if the original table wasn't modified?
         table_ref = bigquery.table.TableReference.from_string(
             query, default_project=self.bqclient.project
         )

         (
             table_expression,
             total_ordering_cols,
-        ) = self._get_snapshot_sql_and_primary_key(table_ref, api_name=api_name)
+        ) = self._get_snapshot_sql_and_primary_key(
+            table_ref, api_name=api_name, use_cache=use_cache
+        )

         for key in col_order:
             if key not in table_expression.columns: