fix: read_gbq_table respects primary keys even when filters are set #689


Merged
9 commits merged on May 16, 2024
.pre-commit-config.yaml (3 changes: 2 additions & 1 deletion)
@@ -35,7 +35,8 @@ repos:
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.1.1
rev: v1.10.0
hooks:
- id: mypy
additional_dependencies: [types-requests, types-tabulate, pandas-stubs]
args: ["--check-untyped-defs", "--explicit-package-bases", '--exclude="^third_party"', "--ignore-missing-imports"]
bigframes/core/blocks.py (4 changes: 2 additions & 2 deletions)
@@ -122,10 +122,10 @@ def __init__(

# If no index columns are set, create one.
#
# Note: get_index_cols_and_uniqueness in
# Note: get_index_cols in
# bigframes/session/_io/bigquery/read_gbq_table.py depends on this
# being as sequential integer index column. If this default behavior
# ever changes, please also update get_index_cols_and_uniqueness so
# ever changes, please also update get_index_cols so
# that users who explicitly request a sequential integer index can
# still get one.
if len(index_columns) == 0:
bigframes/core/sql.py (8 changes: 3 additions & 5 deletions)
@@ -33,7 +33,7 @@


### Writing SQL Values (literals, column references, table references, etc.)
def simple_literal(value: str | int | bool | float):
def simple_literal(value: str | int | bool | float | datetime.datetime):
"""Return quoted input string."""
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#literals
if isinstance(value, str):
@@ -50,6 +50,8 @@ def simple_literal(value: str | int | bool | float):
if value == -math.inf:
return 'CAST("-inf" as FLOAT)'
return str(value)
if isinstance(value, datetime.datetime):
return f"TIMESTAMP('{value.isoformat()}')"
else:
raise ValueError(f"Cannot produce literal for {value}")

@@ -156,7 +158,3 @@ def ordering_clause(
part = f"`{ordering_expr.id}` {asc_desc} {null_clause}"
parts.append(part)
return f"ORDER BY {' ,'.join(parts)}"


def snapshot_clause(time_travel_timestamp: datetime.datetime):
return f"FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())})"
bigframes/pandas/__init__.py (2 changes: 2 additions & 0 deletions)
@@ -549,6 +549,7 @@ def read_gbq_query(
max_results: Optional[int] = None,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
filters: vendored_pandas_gbq.FiltersType = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query)
return global_session.with_default_session(
@@ -560,6 +561,7 @@
max_results=max_results,
use_cache=use_cache,
col_order=col_order,
filters=filters,
)


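bigframes.pandas.read_gbq_query now accepts and forwards a filters argument. A hedged usage sketch; the public table and the filter values are for illustration only, and the filter tuples follow the pandas-gbq-style (column, operator, value) convention declared by FiltersType:

import bigframes.pandas as bpd

df = bpd.read_gbq_query(
    "SELECT name, number, state FROM `bigquery-public-data.usa_names.usa_1910_2013`",
    # Applied by rewriting the query server-side rather than filtering client-side.
    filters=[("state", "==", "TX"), ("number", ">", 1000)],
)
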
bigframes/session/__init__.py (177 changes: 111 additions & 66 deletions)
@@ -62,6 +62,7 @@
import ibis
import ibis.backends.bigquery as ibis_bigquery
import ibis.expr.types as ibis_types
import jellyfish
import numpy as np
import pandas
from pandas._typing import (
@@ -339,19 +340,6 @@ def read_gbq(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(
query_or_table
):
# TODO(b/338111344): This appears to be missing index_cols, which
# are necessary to be selected.
# TODO(b/338039517): Refactor this to be called inside both
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
# so we can make sure index_col/index_cols reflects primary keys.
query_or_table = bf_io_bigquery.to_query(
query_or_table, _to_index_cols(index_col), columns, filters
)

if bf_io_bigquery.is_query(query_or_table):
return self._read_gbq_query(
query_or_table,
@@ -361,6 +349,7 @@
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache,
filters=filters,
)
else:
if configuration is not None:
@@ -377,6 +366,7 @@ def read_gbq_table(
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache if use_cache is not None else True,
filters=filters,
)

def _query_to_destination(
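
The read_gbq hunks above are the core of the fix: instead of eagerly rewriting the SQL whenever filters are present (which skipped primary-key detection), read_gbq now forwards filters to _read_gbq_query and _read_gbq_table, which pick index columns first and only then fold the filters into a query via bf_io_bigquery.to_query. As a rough illustration only (not the actual to_query implementation), pandas-gbq-style filter tuples conceptually become a WHERE clause along these lines:

from typing import Sequence, Tuple

def filters_to_where(filters: Sequence[Tuple[str, str, object]]) -> str:
    # Hypothetical helper for illustration; the real rewrite also handles
    # "in"/"not in", literal escaping, index_cols, max_results, and time travel.
    clauses = []
    for column, op, value in filters:
        sql_op = "=" if op == "==" else op
        literal = f"'{value}'" if isinstance(value, str) else str(value)
        clauses.append(f"`{column}` {sql_op} {literal}")
    return " AND ".join(clauses)

print(filters_to_where([("state", "==", "TX"), ("number", ">", 1000)]))
# `state` = 'TX' AND `number` > 1000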
@@ -451,6 +441,7 @@ def read_gbq_query(
max_results: Optional[int] = None,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
filters: third_party_pandas_gbq.FiltersType = (),
) -> dataframe.DataFrame:
"""Turn a SQL query into a DataFrame.

@@ -517,6 +508,7 @@
max_results=max_results,
api_name="read_gbq_query",
use_cache=use_cache,
filters=filters,
)

def _read_gbq_query(
@@ -529,6 +521,7 @@ def _read_gbq_query(
max_results: Optional[int] = None,
api_name: str = "read_gbq_query",
use_cache: Optional[bool] = None,
filters: third_party_pandas_gbq.FiltersType = (),
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

@@ -557,6 +550,21 @@

index_cols = _to_index_cols(index_col)

filters = list(filters)
if len(filters) != 0 or max_results is not None:
# TODO(b/338111344): If we are running a query anyway, we might as
# well generate ROW_NUMBER() at the same time.
query = bf_io_bigquery.to_query(
query,
index_cols,
columns,
filters,
max_results=max_results,
# We're executing the query, so we don't need time travel for
# determinism.
time_travel_timestamp=None,
)

destination, query_job = self._query_to_destination(
query,
index_cols,
@@ -580,12 +588,14 @@
session=self,
)

return self.read_gbq_table(
return self._read_gbq_table(
f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
index_col=index_col,
columns=columns,
max_results=max_results,
use_cache=configuration["query"]["useQueryCache"],
api_name=api_name,
# max_results and filters are omitted because they are already
# handled by to_query(), above.
)

def read_gbq_table(
@@ -621,31 +631,14 @@ def read_gbq_table(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(query):
# TODO(b/338039517): Refactor this to be called inside both
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
# so we can make sure index_col/index_cols reflects primary keys.
query = bf_io_bigquery.to_query(
query, _to_index_cols(index_col), columns, filters
)

return self._read_gbq_query(
query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
)

return self._read_gbq_table(
query=query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
filters=filters,
)

def _read_gbq_table(
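
With the hunk above, read_gbq_table always delegates to _read_gbq_table and passes filters straight through, so a table's primary key can still be detected and used for the default index even when filters are set, which is the behavior named in the PR title. A hedged usage sketch with a hypothetical table:

import bigframes.pandas as bpd

# Hypothetical table; assumes `my-project.my_dataset.orders` declares a
# primary key on `id`. After this change, `id` can still become the index
# even though the filters force a query rewrite under the hood.
df = bpd.read_gbq(
    "my-project.my_dataset.orders",
    filters=[("status", "==", "shipped")],
)
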
@@ -657,6 +650,7 @@ def _read_gbq_table(
max_results: Optional[int] = None,
api_name: str,
use_cache: bool = True,
filters: third_party_pandas_gbq.FiltersType = (),
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

@@ -673,6 +667,9 @@
query, default_project=self.bqclient.project
)

columns = list(columns)
filters = list(filters)

# ---------------------------------
# Fetch table metadata and validate
# ---------------------------------
@@ -684,62 +681,110 @@
cache=self._df_snapshot,
use_cache=use_cache,
)
table_column_names = {field.name for field in table.schema}

if table.location.casefold() != self._location.casefold():
raise ValueError(
f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
)

# -----------------------------------------
# Create Ibis table expression and validate
# -----------------------------------------

# Use a time travel to make sure the DataFrame is deterministic, even
# if the underlying table changes.
table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
self.ibis_client,
table_ref,
time_travel_timestamp,
)

for key in columns:
if key not in table_expression.columns:
if key not in table_column_names:
possibility = min(
table_column_names,
key=lambda item: jellyfish.levenshtein_distance(key, item),
)
raise ValueError(
f"Column '{key}' of `columns` not found in this table."
f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?"
)

# ---------------------------------------
# Create a non-default index and validate
# ---------------------------------------

# TODO(b/337925142): Move index_cols creation to before we create the
# Ibis table expression so we don't have a "SELECT *" subquery in the
# query that checks for index uniqueness.

index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness(
bqclient=self.bqclient,
ibis_client=self.ibis_client,
# Converting index_col into a list of column names requires
# the table metadata because we might use the primary keys
# when constructing the index.
index_cols = bf_read_gbq_table.get_index_cols(
table=table,
table_expression=table_expression,
index_col=index_col,
api_name=api_name,
)

for key in index_cols:
if key not in table_expression.columns:
if key not in table_column_names:
possibility = min(
table_column_names,
key=lambda item: jellyfish.levenshtein_distance(key, item),
)
raise ValueError(
f"Column `{key}` of `index_col` not found in this table."
f"Column '{key}' of `index_col` not found in this table. Did you mean '{possibility}'?"
)

# TODO(b/337925142): We should push down column filters when we get the time
# travel table to avoid "SELECT *" subqueries.
if columns:
table_expression = table_expression.select([*index_cols, *columns])
# -----------------------------
# Optionally, execute the query
# -----------------------------

# max_results introduces non-determinism and limits the cost on
# clustered tables, so fallback to a query. We do this here so that
# the index is consistent with tables that have primary keys, even
# when max_results is set.
# TODO(b/338419730): We don't need to fallback to a query for wildcard
# tables if we allow some non-determinism when time travel isn't supported.
if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
query
):
# TODO(b/338111344): If we are running a query anyway, we might as
# well generate ROW_NUMBER() at the same time.
query = bf_io_bigquery.to_query(
query,
index_cols=index_cols,
columns=columns,
filters=filters,
max_results=max_results,
# We're executing the query, so we don't need time travel for
# determinism.
time_travel_timestamp=None,
)

return self._read_gbq_query(
query,
index_col=index_cols,
columns=columns,
api_name="read_gbq_table",
use_cache=use_cache,
)

# -----------------------------------------
# Create Ibis table expression and validate
# -----------------------------------------

# Use a time travel to make sure the DataFrame is deterministic, even
# if the underlying table changes.
# TODO(b/340540991): If a dry run query fails with time travel but
# succeeds without it, omit the time travel clause and raise a warning
# about potential non-determinism if the underlying tables are modified.
table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
ibis_client=self.ibis_client,
table_ref=table_ref,
index_cols=index_cols,
columns=columns,
filters=filters,
time_travel_timestamp=time_travel_timestamp,
)

# ----------------------------
# Create ordering and validate
# ----------------------------

# TODO(b/337925142): Generate a new subquery with just the index_cols
# in the Ibis table expression so we don't have a "SELECT *" subquery
# in the query that checks for index uniqueness.
# TODO(b/338065601): Provide a way to assume uniqueness and avoid this
# check.
is_index_unique = bf_read_gbq_table.are_index_cols_unique(
bqclient=self.bqclient,
ibis_client=self.ibis_client,
table=table,
index_cols=index_cols,
api_name=api_name,
)

if is_index_unique:
array_value = bf_read_gbq_table.to_array_value_with_total_ordering(
session=self,
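
The new column validation in _read_gbq_table (for both columns and index_col) suggests the closest existing column name when a lookup fails, using Levenshtein distance from the newly imported jellyfish package. A minimal standalone sketch of the same idea, with a hypothetical schema:

import jellyfish

table_column_names = {"species", "island", "body_mass_g"}  # hypothetical schema
key = "boddy_mass_g"  # a user's typo

if key not in table_column_names:
    # Suggest the known column with the smallest edit distance to the typo.
    possibility = min(
        table_column_names,
        key=lambda item: jellyfish.levenshtein_distance(key, item),
    )
    raise ValueError(
        f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?"
    )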