diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index af05f4423c..4fd6488c9c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -35,7 +35,8 @@ repos:
     hooks:
       - id: flake8
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.1.1
+    rev: v1.10.0
     hooks:
       - id: mypy
         additional_dependencies: [types-requests, types-tabulate, pandas-stubs]
+        args: ["--check-untyped-defs", "--explicit-package-bases", '--exclude="^third_party"', "--ignore-missing-imports"]
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 6cd93eec12..6cdb0021f5 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -122,10 +122,10 @@ def __init__(
         # If no index columns are set, create one.
         #
-        # Note: get_index_cols_and_uniqueness in
+        # Note: get_index_cols in
         # bigframes/session/_io/bigquery/read_gbq_table.py depends on this
         # being as sequential integer index column. If this default behavior
-        # ever changes, please also update get_index_cols_and_uniqueness so
+        # ever changes, please also update get_index_cols so
         # that users who explicitly request a sequential integer index can
         # still get one.
         if len(index_columns) == 0:
diff --git a/bigframes/core/sql.py b/bigframes/core/sql.py
index 3ad06610b6..cacf86d234 100644
--- a/bigframes/core/sql.py
+++ b/bigframes/core/sql.py
@@ -33,7 +33,7 @@
 ### Writing SQL Values (literals, column references, table references, etc.)


-def simple_literal(value: str | int | bool | float):
+def simple_literal(value: str | int | bool | float | datetime.datetime):
     """Return quoted input string."""
     # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#literals
     if isinstance(value, str):
@@ -50,6 +50,8 @@ def simple_literal(value: str | int | bool | float):
         if value == -math.inf:
             return 'CAST("-inf" as FLOAT)'
         return str(value)
+    if isinstance(value, datetime.datetime):
+        return f"TIMESTAMP('{value.isoformat()}')"
     else:
         raise ValueError(f"Cannot produce literal for {value}")
@@ -156,7 +158,3 @@ def ordering_clause(
         part = f"`{ordering_expr.id}` {asc_desc} {null_clause}"
         parts.append(part)
     return f"ORDER BY {' ,'.join(parts)}"
-
-
-def snapshot_clause(time_travel_timestamp: datetime.datetime):
-    return f"FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())})"
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index c18d14b3dd..3628ecf67b 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -549,6 +549,7 @@ def read_gbq_query(
     max_results: Optional[int] = None,
     use_cache: Optional[bool] = None,
     col_order: Iterable[str] = (),
+    filters: vendored_pandas_gbq.FiltersType = (),
 ) -> bigframes.dataframe.DataFrame:
     _set_default_session_location_if_possible(query)
     return global_session.with_default_session(
@@ -560,6 +561,7 @@ def read_gbq_query(
         max_results=max_results,
         use_cache=use_cache,
         col_order=col_order,
+        filters=filters,
     )
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 727269e7ee..89637644cf 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -62,6 +62,7 @@
 import ibis
 import ibis.backends.bigquery as ibis_bigquery
 import ibis.expr.types as ibis_types
+import jellyfish
 import numpy as np
 import pandas
 from pandas._typing import (
@@ -339,19 +340,6 @@ def read_gbq(
         elif col_order:
             columns = col_order

-        filters = list(filters)
-        if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(
-            query_or_table
-        ):
-            # TODO(b/338111344): This appears to be missing index_cols, which
-            # are necessary to be selected.
-            # TODO(b/338039517): Refactor this to be called inside both
-            # _read_gbq_query and _read_gbq_table (after detecting primary keys)
-            # so we can make sure index_col/index_cols reflects primary keys.
-            query_or_table = bf_io_bigquery.to_query(
-                query_or_table, _to_index_cols(index_col), columns, filters
-            )
-
         if bf_io_bigquery.is_query(query_or_table):
             return self._read_gbq_query(
                 query_or_table,
@@ -361,6 +349,7 @@
                 max_results=max_results,
                 api_name="read_gbq",
                 use_cache=use_cache,
+                filters=filters,
             )
         else:
             if configuration is not None:
@@ -377,6 +366,7 @@
                 max_results=max_results,
                 api_name="read_gbq",
                 use_cache=use_cache if use_cache is not None else True,
+                filters=filters,
             )

     def _query_to_destination(
@@ -451,6 +441,7 @@ def read_gbq_query(
         max_results: Optional[int] = None,
         use_cache: Optional[bool] = None,
         col_order: Iterable[str] = (),
+        filters: third_party_pandas_gbq.FiltersType = (),
     ) -> dataframe.DataFrame:
         """Turn a SQL query into a DataFrame.
@@ -517,6 +508,7 @@
             max_results=max_results,
             api_name="read_gbq_query",
             use_cache=use_cache,
+            filters=filters,
         )

     def _read_gbq_query(
@@ -529,6 +521,7 @@ def _read_gbq_query(
         max_results: Optional[int] = None,
         api_name: str = "read_gbq_query",
         use_cache: Optional[bool] = None,
+        filters: third_party_pandas_gbq.FiltersType = (),
     ) -> dataframe.DataFrame:
         import bigframes.dataframe as dataframe
@@ -557,6 +550,21 @@
         index_cols = _to_index_cols(index_col)

+        filters = list(filters)
+        if len(filters) != 0 or max_results is not None:
+            # TODO(b/338111344): If we are running a query anyway, we might as
+            # well generate ROW_NUMBER() at the same time.
+            query = bf_io_bigquery.to_query(
+                query,
+                index_cols,
+                columns,
+                filters,
+                max_results=max_results,
+                # We're executing the query, so we don't need time travel for
+                # determinism.
+                time_travel_timestamp=None,
+            )
+
         destination, query_job = self._query_to_destination(
             query,
             index_cols,
@@ -580,12 +588,14 @@
             session=self,
         )

-        return self.read_gbq_table(
+        return self._read_gbq_table(
             f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
             index_col=index_col,
             columns=columns,
-            max_results=max_results,
             use_cache=configuration["query"]["useQueryCache"],
+            api_name=api_name,
+            # max_results and filters are omitted because they are already
+            # handled by to_query(), above.
         )

     def read_gbq_table(
@@ -621,24 +631,6 @@
         elif col_order:
             columns = col_order

-        filters = list(filters)
-        if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(query):
-            # TODO(b/338039517): Refactor this to be called inside both
-            # _read_gbq_query and _read_gbq_table (after detecting primary keys)
-            # so we can make sure index_col/index_cols reflects primary keys.
-            query = bf_io_bigquery.to_query(
-                query, _to_index_cols(index_col), columns, filters
-            )
-
-            return self._read_gbq_query(
-                query,
-                index_col=index_col,
-                columns=columns,
-                max_results=max_results,
-                api_name="read_gbq_table",
-                use_cache=use_cache,
-            )
-
         return self._read_gbq_table(
             query=query,
             index_col=index_col,
@@ -646,6 +638,7 @@
             max_results=max_results,
             api_name="read_gbq_table",
             use_cache=use_cache,
+            filters=filters,
         )

     def _read_gbq_table(
@@ -657,6 +650,7 @@
         max_results: Optional[int] = None,
         api_name: str,
         use_cache: bool = True,
+        filters: third_party_pandas_gbq.FiltersType = (),
     ) -> dataframe.DataFrame:
         import bigframes.dataframe as dataframe
@@ -673,6 +667,9 @@
             query, default_project=self.bqclient.project
         )

+        columns = list(columns)
+        filters = list(filters)
+
         # ---------------------------------
         # Fetch table metadata and validate
         # ---------------------------------
@@ -684,62 +681,110 @@
             cache=self._df_snapshot,
             use_cache=use_cache,
         )
+        table_column_names = {field.name for field in table.schema}

         if table.location.casefold() != self._location.casefold():
             raise ValueError(
                 f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
             )

-        # -----------------------------------------
-        # Create Ibis table expression and validate
-        # -----------------------------------------
-
-        # Use a time travel to make sure the DataFrame is deterministic, even
-        # if the underlying table changes.
-        table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
-            self.ibis_client,
-            table_ref,
-            time_travel_timestamp,
-        )
-
         for key in columns:
-            if key not in table_expression.columns:
+            if key not in table_column_names:
+                possibility = min(
+                    table_column_names,
+                    key=lambda item: jellyfish.levenshtein_distance(key, item),
+                )
                 raise ValueError(
-                    f"Column '{key}' of `columns` not found in this table."
+                    f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?"
                 )

-        # ---------------------------------------
-        # Create a non-default index and validate
-        # ---------------------------------------
-
-        # TODO(b/337925142): Move index_cols creation to before we create the
-        # Ibis table expression so we don't have a "SELECT *" subquery in the
-        # query that checks for index uniqueness.
-
-        index_cols, is_index_unique = bf_read_gbq_table.get_index_cols_and_uniqueness(
-            bqclient=self.bqclient,
-            ibis_client=self.ibis_client,
+        # Converting index_col into a list of column names requires
+        # the table metadata because we might use the primary keys
+        # when constructing the index.
+        index_cols = bf_read_gbq_table.get_index_cols(
             table=table,
-            table_expression=table_expression,
             index_col=index_col,
-            api_name=api_name,
         )

         for key in index_cols:
-            if key not in table_expression.columns:
+            if key not in table_column_names:
+                possibility = min(
+                    table_column_names,
+                    key=lambda item: jellyfish.levenshtein_distance(key, item),
+                )
                 raise ValueError(
-                    f"Column `{key}` of `index_col` not found in this table."
+                    f"Column '{key}' of `index_col` not found in this table. Did you mean '{possibility}'?"
                 )

-        # TODO(b/337925142): We should push down column filters when we get the time
-        # travel table to avoid "SELECT *" subqueries.
-        if columns:
-            table_expression = table_expression.select([*index_cols, *columns])
+        # -----------------------------
+        # Optionally, execute the query
+        # -----------------------------
+
+        # max_results introduces non-determinism and limits the cost on
+        # clustered tables, so fallback to a query. We do this here so that
+        # the index is consistent with tables that have primary keys, even
+        # when max_results is set.
+        # TODO(b/338419730): We don't need to fallback to a query for wildcard
+        # tables if we allow some non-determinism when time travel isn't supported.
+        if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
+            query
+        ):
+            # TODO(b/338111344): If we are running a query anyway, we might as
+            # well generate ROW_NUMBER() at the same time.
+            query = bf_io_bigquery.to_query(
+                query,
+                index_cols=index_cols,
+                columns=columns,
+                filters=filters,
+                max_results=max_results,
+                # We're executing the query, so we don't need time travel for
+                # determinism.
+                time_travel_timestamp=None,
+            )
+
+            return self._read_gbq_query(
+                query,
+                index_col=index_cols,
+                columns=columns,
+                api_name="read_gbq_table",
+                use_cache=use_cache,
+            )
+
+        # -----------------------------------------
+        # Create Ibis table expression and validate
+        # -----------------------------------------
+
+        # Use a time travel to make sure the DataFrame is deterministic, even
+        # if the underlying table changes.
+        # TODO(b/340540991): If a dry run query fails with time travel but
+        # succeeds without it, omit the time travel clause and raise a warning
+        # about potential non-determinism if the underlying tables are modified.
+        table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
+            ibis_client=self.ibis_client,
+            table_ref=table_ref,
+            index_cols=index_cols,
+            columns=columns,
+            filters=filters,
+            time_travel_timestamp=time_travel_timestamp,
+        )

         # ----------------------------
         # Create ordering and validate
         # ----------------------------

+        # TODO(b/337925142): Generate a new subquery with just the index_cols
+        # in the Ibis table expression so we don't have a "SELECT *" subquery
+        # in the query that checks for index uniqueness.
+        # TODO(b/338065601): Provide a way to assume uniqueness and avoid this
+        # check.
+        is_index_unique = bf_read_gbq_table.are_index_cols_unique(
+            bqclient=self.bqclient,
+            ibis_client=self.ibis_client,
+            table=table,
+            index_cols=index_cols,
+            api_name=api_name,
+        )
+
         if is_index_unique:
             array_value = bf_read_gbq_table.to_array_value_with_total_ordering(
                 session=self,
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 95ab16fecf..28eed47965 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -337,6 +337,8 @@ def to_query(
     index_cols: Iterable[str],
     columns: Iterable[str],
     filters: third_party_pandas_gbq.FiltersType,
+    max_results: Optional[int],
+    time_travel_timestamp: Optional[datetime.datetime],
 ) -> str:
     """Compile query_or_table with conditions(filters, wildcards) to query."""
     filters = list(filters)
@@ -354,6 +356,15 @@
     else:
         select_clause = "SELECT *"

+    time_travel_clause = ""
+    if time_travel_timestamp is not None:
+        time_travel_literal = bigframes.core.sql.simple_literal(time_travel_timestamp)
+        time_travel_clause = f" FOR SYSTEM_TIME AS OF {time_travel_literal}"
+
+    limit_clause = ""
+    if max_results is not None:
+        limit_clause = f" LIMIT {bigframes.core.sql.simple_literal(max_results)}"
+
     filter_string = ""
     if filters:
         valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
@@ -382,7 +393,7 @@
             for filter_item in group:
                 if not isinstance(filter_item, tuple) or (len(filter_item) != 3):
                     raise ValueError(
-                        f"Filter condition should be a tuple of length 3, {filter_item} is not valid."
+                        f"Elements of filters must be tuples of length 3, but got {repr(filter_item)}.",
                     )

                 column, operator, value = filter_item
@@ -419,7 +430,12 @@
     else:
         filter_string = and_expression

+    where_clause = ""
     if filter_string:
-        return f"{select_clause} FROM {sub_query} AS sub WHERE {filter_string}"
-    else:
-        return f"{select_clause} FROM {sub_query} AS sub"
+        where_clause = f" WHERE {filter_string}"
+
+    return (
+        f"{select_clause} "
+        f"FROM {sub_query}"
+        f"{time_travel_clause}{where_clause}{limit_clause}"
+    )
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 370ee546d7..87083529ce 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -20,12 +20,12 @@
 import datetime
 import itertools
-import textwrap
 import typing
 from typing import Dict, Iterable, List, Optional, Tuple
 import warnings

 import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
+import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions
 import google.cloud.bigquery as bigquery
 import ibis
@@ -42,7 +42,7 @@
 import bigframes.core.ordering as order
 import bigframes.core.sql
 import bigframes.dtypes
-import bigframes.session._io.bigquery.read_gbq_table
+import bigframes.session._io.bigquery
 import bigframes.session.clients
 import bigframes.version
@@ -126,32 +126,33 @@ def get_table_metadata(
     return cached_table


-def _create_time_travel_sql(
-    table_ref: bigquery.TableReference, time_travel_timestamp: datetime.datetime
-) -> str:
-    """Query a table via 'time travel' for consistent reads."""
-    # If we have an anonymous query results table, it can't be modified and
-    # there isn't any BigQuery time travel.
-    selection = bigframes.core.sql.select_table(table_ref)
-    if table_ref.dataset_id.startswith("_"):
-        return selection
-
-    return textwrap.dedent(
-        f"""
-        {selection}
-        {bigframes.core.sql.snapshot_clause(time_travel_timestamp)}
-        """
-    )
-
-
 def get_ibis_time_travel_table(
     ibis_client: ibis.BaseBackend,
     table_ref: bigquery.TableReference,
-    time_travel_timestamp: datetime.datetime,
+    index_cols: Iterable[str],
+    columns: Iterable[str],
+    filters: third_party_pandas_gbq.FiltersType,
+    time_travel_timestamp: Optional[datetime.datetime],
 ) -> ibis_types.Table:
+    # If we have an anonymous query results table, it can't be modified and
+    # there isn't any BigQuery time travel.
+    if table_ref.dataset_id.startswith("_"):
+        time_travel_timestamp = None
+
     try:
-        sql = _create_time_travel_sql(table_ref, time_travel_timestamp)
-        return ibis_client.sql(sql)
+        return ibis_client.sql(
+            bigframes.session._io.bigquery.to_query(
+                f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
+                index_cols=index_cols,
+                columns=columns,
+                filters=filters,
+                time_travel_timestamp=time_travel_timestamp,
+                # If we've made it this far, we know we don't have any
+                # max_results to worry about, because in that case we will
+                # have executed a query with a LIMIT clause.
+                max_results=None,
+            )
+        )
     except google.api_core.exceptions.Forbidden as ex:
         # Ibis does a dry run to get the types of the columns from the SQL.
         if "Drive credentials" in ex.message:
@@ -159,13 +160,21 @@
         raise


-def _check_index_uniqueness(
+def are_index_cols_unique(
     bqclient: bigquery.Client,
     ibis_client: ibis.BaseBackend,
-    table: ibis_types.Table,
+    table: bigquery.table.Table,
     index_cols: List[str],
     api_name: str,
 ) -> bool:
+    # If index_cols contain the primary_keys, the query engine assumes they
+    # provide a unique index.
+    primary_keys = frozenset(_get_primary_keys(table))
+    if primary_keys <= frozenset(index_cols):
+        return True
+
+    # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring
+    # table_expression only selects just index_cols.
     table_sql = ibis_client.compile(table)
     is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table_sql)
     job_config = bigquery.QueryJobConfig()
@@ -217,14 +226,10 @@ def _is_table_clustered_or_partitioned(
     return False


-def get_index_cols_and_uniqueness(
-    bqclient: bigquery.Client,
-    ibis_client: ibis.BaseBackend,
+def get_index_cols(
     table: bigquery.table.Table,
-    table_expression: ibis_types.Table,
     index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind,
-    api_name: str,
-) -> Tuple[List[str], bool]:
+) -> List[str]:
     """
     If we can get a total ordering from the table, such as via primary key
     column(s), then return those too so that ordering generation can be
@@ -241,7 +246,7 @@
             # Note: This relies on the default behavior of the Block
             # constructor to create a default sequential index. If that ever
             # changes, this logic will need to be revisited.
-            return [], False
+            return []
         else:
             # Note: It's actually quite difficult to mock this out to unit
             # test, as it's not possible to subclass enums in Python. See:
@@ -278,19 +283,8 @@
         # columns are unique, even if the constraint is not enforced. We make
         # the same assumption and use these columns as the total ordering keys.
         index_cols = primary_keys
-        is_index_unique = len(index_cols) != 0
-    else:
-        is_index_unique = _check_index_uniqueness(
-            bqclient=bqclient,
-            ibis_client=ibis_client,
-            # TODO(b/337925142): Avoid a "SELECT *" subquery here by using
-            # _create_time_travel_sql with just index_cols.
-            table=table_expression,
-            index_cols=index_cols,
-            api_name=api_name,
-        )

-    return index_cols, is_index_unique
+    return index_cols


 def get_time_travel_datetime_and_table_metadata(
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 052ef27384..2b7c6178ff 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -14,11 +14,12 @@

 import io
 import random
+import re
 import tempfile
 import textwrap
 import time
 import typing
-from typing import List, Sequence
+from typing import List, Optional, Sequence

 import google
 import google.cloud.bigquery as bigquery
@@ -68,15 +69,6 @@ def test_read_gbq_tokyo(
             ["my_strings"],
             id="one_cols_in_query",
         ),
-        pytest.param(
-            "{scalars_table_id}",
-            ["unknown"],
-            marks=pytest.mark.xfail(
-                raises=ValueError,
-                reason="Column `unknown` not found in this table.",
-            ),
-            id="unknown_col",
-        ),
     ],
 )
 def test_read_gbq_w_columns(
@@ -91,6 +83,38 @@ def test_read_gbq_w_columns(
     assert df.columns.tolist() == columns


+def test_read_gbq_w_unknown_column(
+    session: bigframes.Session,
+    scalars_table_id: str,
+):
+    with pytest.raises(
+        ValueError,
+        match=re.escape(
+            "Column 'int63_col' of `columns` not found in this table. Did you mean 'int64_col'?"
+        ),
+    ):
+        session.read_gbq(
+            scalars_table_id,
+            columns=["string_col", "int63_col", "bool_col"],
+        )
+
+
+def test_read_gbq_w_unknown_index_col(
+    session: bigframes.Session,
+    scalars_table_id: str,
+):
+    with pytest.raises(
+        ValueError,
+        match=re.escape(
+            "Column 'int64_two' of `index_col` not found in this table. Did you mean 'int64_too'?"
+        ),
+    ):
+        session.read_gbq(
+            scalars_table_id,
+            index_col=["int64_col", "int64_two"],
+        )
+
+
 @pytest.mark.parametrize(
     ("query_or_table", "index_col"),
     [
@@ -248,6 +272,9 @@ def test_read_gbq_w_primary_keys_table(
     df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}")
     result = df.head(100).to_pandas()

+    # Verify that primary keys are used as the index.
+    assert list(result.index.names) == list(primary_keys)
+
     # Verify that the DataFrame is already sorted by primary keys.
     sorted_result = result.sort_values(primary_keys)
     pd.testing.assert_frame_equal(result, sorted_result)
@@ -256,6 +283,39 @@
     assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql


+def test_read_gbq_w_primary_keys_table_and_filters(
+    session: bigframes.Session, usa_names_grouped_table: bigquery.Table
+):
+    """
+    Verify fix for internal issue 338039517, where using filters didn't use the
+    primary keys for indexing / ordering.
+    """
+    # Validate that the table we're querying has a primary key.
+    table = usa_names_grouped_table
+    table_constraints = table.table_constraints
+    assert table_constraints is not None
+    primary_key = table_constraints.primary_key
+    assert primary_key is not None
+    primary_keys = primary_key.columns
+    assert len(primary_keys) != 0
+
+    df = session.read_gbq(
+        f"{table.project}.{table.dataset_id}.{table.table_id}",
+        filters=[
+            ("name", "LIKE", "W%"),
+            ("total_people", ">", 100),
+        ],  # type: ignore
+    )
+    result = df.to_pandas()
+
+    # Verify that primary keys are used as the index.
+    assert list(result.index.names) == list(primary_keys)
+
+    # Verify that the DataFrame is already sorted by primary keys.
+    sorted_result = result.sort_values(primary_keys)
+    pd.testing.assert_frame_equal(result, sorted_result)
+
+
 @pytest.mark.parametrize(
     ("query_or_table", "max_results"),
     [
@@ -350,13 +410,14 @@ def test_read_gbq_table_clustered_with_filter(session: bigframes.Session):
     ["read_gbq", "read_gbq_table"],
 )
 @pytest.mark.parametrize(
-    ("filters", "table_id", "index_col", "columns"),
+    ("filters", "table_id", "index_col", "columns", "max_results"),
     [
         pytest.param(
             [("_table_suffix", ">=", "1930"), ("_table_suffix", "<=", "1939")],
             _GSOD_ALL_TABLES,
             ["stn", "wban", "year", "mo", "da"],
             ["temp", "max", "min"],
+            100,
             id="all",
         ),
         pytest.param(
@@ -364,6 +425,7 @@
             _GSOD_1930S,
             (),  # index_col
             ["temp", "max", "min"],
+            None,  # max_results
             id="columns",
         ),
         pytest.param(
@@ -371,6 +433,7 @@
             _GSOD_ALL_TABLES,
             (),  # index_col,
             (),  # columns
+            None,  # max_results
             id="filters",
         ),
         pytest.param(
@@ -378,8 +441,17 @@
             _GSOD_1930S,
             ["stn", "wban", "year", "mo", "da"],
             (),  # columns
+            None,  # max_results
             id="index_col",
         ),
+        pytest.param(
+            (),  # filters
+            _GSOD_1930S,
+            (),  # index_col
+            (),  # columns
+            100,  # max_results
+            id="max_results",
+        ),
     ],
 )
@@ -389,10 +461,17 @@ def test_read_gbq_wildcard(
     table_id: str,
     index_col: Sequence[str],
     columns: Sequence[str],
+    max_results: Optional[int],
 ):
     table_metadata = session.bqclient.get_table(table_id)
     method = getattr(session, api_method)
-    df = method(table_id, filters=filters, index_col=index_col, columns=columns)
+    df = method(
+        table_id,
+        filters=filters,
+        index_col=index_col,
+        columns=columns,
+        max_results=max_results,
+    )
     num_rows, num_columns = df.shape

     if index_col:
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index 57f9e00363..8ba13a7276 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 import datetime
+import re
 from typing import Iterable

 import google.cloud.bigquery as bigquery
@@ -205,19 +206,16 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str)
 @pytest.mark.parametrize(
-    ("query_or_table", "index_cols", "columns", "filters", "expected_output"),
+    (
+        "query_or_table",
+        "index_cols",
+        "columns",
+        "filters",
+        "max_results",
+        "time_travel_timestamp",
+        "expected_output",
+    ),
     [
-        pytest.param(
-            "test_table",
-            [],
-            [],
-            ["date_col", ">", "2022-10-20"],
-            None,
-            marks=pytest.mark.xfail(
-                raises=ValueError,
-            ),
-            id="raise_error",
-        ),
         pytest.param(
             "test_table",
             ["row_index"],
             ["string_col"],
             [
@@ -226,30 +224,42 @@
                 (("rowindex", "not in", [0, 6]),),
                 (("string_col", "in", ["Hello, World!", "こんにちは"]),),
             ],
+            123,  # max_results
+            datetime.datetime(2024, 5, 14, 12, 42, 36, 125125),
             (
-                "SELECT `row_index`, `string_col` FROM `test_table` AS sub WHERE "
-                "`rowindex` NOT IN (0, 6) OR `string_col` IN ('Hello, World!', "
-                "'こんにちは')"
+                "SELECT `row_index`, `string_col` FROM `test_table` "
+                "FOR SYSTEM_TIME AS OF TIMESTAMP('2024-05-14T12:42:36.125125') "
+                "WHERE `rowindex` NOT IN (0, 6) OR `string_col` IN ('Hello, World!', "
+                "'こんにちは') LIMIT 123"
             ),
             id="table-all_params-filter_or_operation",
         ),
         pytest.param(
-            """SELECT
-                rowindex,
-                string_col,
-                FROM `test_table` AS t
-                """,
+            (
+                """SELECT
                rowindex,
                string_col,
                FROM `test_table` AS t
                """
+            ),
             ["rowindex"],
             ["string_col"],
             [
                 ("rowindex", "<", 4),
                 ("string_col", "==", "Hello, World!"),
             ],
-            """SELECT `rowindex`, `string_col` FROM (SELECT
-                rowindex,
-                string_col,
-                FROM `test_table` AS t
-                ) AS sub WHERE `rowindex` < 4 AND `string_col` = 'Hello, World!'""",
+            123,  # max_results
+            datetime.datetime(2024, 5, 14, 12, 42, 36, 125125),
+            (
+                """SELECT `rowindex`, `string_col` FROM (SELECT
                rowindex,
                string_col,
                FROM `test_table` AS t
                ) """
+                "FOR SYSTEM_TIME AS OF TIMESTAMP('2024-05-14T12:42:36.125125') "
+                "WHERE `rowindex` < 4 AND `string_col` = 'Hello, World!' "
+                "LIMIT 123"
+            ),
             id="subquery-all_params-filter_and_operation",
         ),
         pytest.param(
@@ -257,7 +267,9 @@
             "test_table",
             [],
             ["col_a", "col_b"],
             [],
-            "SELECT `col_a`, `col_b` FROM `test_table` AS sub",
+            None,  # max_results
+            None,  # time_travel_timestamp
+            "SELECT `col_a`, `col_b` FROM `test_table`",
             id="table-columns",
         ),
         pytest.param(
@@ -265,7 +277,9 @@
             "test_table",
             [],
             [],
             [("date_col", ">", "2022-10-20")],
-            "SELECT * FROM `test_table` AS sub WHERE `date_col` > '2022-10-20'",
+            None,  # max_results
+            None,  # time_travel_timestamp
+            "SELECT * FROM `test_table` WHERE `date_col` > '2022-10-20'",
             id="table-filter",
         ),
         pytest.param(
@@ -273,7 +287,9 @@
             "test_table*",
             [],
             [],
             [],
-            "SELECT * FROM `test_table*` AS sub",
+            None,  # max_results
+            None,  # time_travel_timestamp
+            "SELECT * FROM `test_table*`",
             id="wildcard-no_params",
         ),
         pytest.param(
@@ -281,30 +297,49 @@
             "test_table*",
             [],
             [],
             [("_TABLE_SUFFIX", ">", "2022-10-20")],
-            "SELECT * FROM `test_table*` AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'",
+            None,  # max_results
+            None,  # time_travel_timestamp
+            "SELECT * FROM `test_table*` WHERE `_TABLE_SUFFIX` > '2022-10-20'",
             id="wildcard-filter",
         ),
     ],
 )
-def test_to_query(query_or_table, index_cols, columns, filters, expected_output):
+def test_to_query(
+    query_or_table,
+    index_cols,
+    columns,
+    filters,
+    max_results,
+    time_travel_timestamp,
+    expected_output,
+):
     query = io_bq.to_query(
         query_or_table,
-        index_cols,
-        columns,
-        filters,
+        index_cols=index_cols,
+        columns=columns,
+        filters=filters,
+        max_results=max_results,
+        time_travel_timestamp=time_travel_timestamp,
     )
     assert query == expected_output


 @pytest.mark.parametrize(
-    ("query_or_table", "filters", "expected_output"),
-    [],
+    ("filters", "expected_message"),
+    (
+        pytest.param(
+            ["date_col", ">", "2022-10-20"],
+            "Elements of filters must be tuples of length 3, but got 'd'",
+        ),
+    ),
 )
-def test_to_query_with_wildcard_table(query_or_table, filters, expected_output):
-    query = io_bq.to_query(
-        query_or_table,
-        (),  # index_cols
-        (),  # columns
-        filters,
-    )
-    assert query == expected_output
+def test_to_query_fails_with_bad_filters(filters, expected_message):
+    with pytest.raises(ValueError, match=re.escape(expected_message)):
+        io_bq.to_query(
+            "test_table",
+            index_cols=(),
+            columns=(),
+            filters=filters,
+            max_results=None,
+            time_travel_timestamp=None,
+        )
diff --git a/tests/unit/session/test_read_gbq_table.py b/tests/unit/session/test_read_gbq_table.py
index 1d09769aec..5afd6bb135 100644
--- a/tests/unit/session/test_read_gbq_table.py
+++ b/tests/unit/session/test_read_gbq_table.py
@@ -20,18 +20,28 @@

 import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table

+from .. import resources
+
+
+def test_get_ibis_time_travel_table_doesnt_timetravel_anonymous_datasets():
+    bqsession = resources.create_bigquery_session()

-def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
     table_ref = bigquery.TableReference.from_string(
         "my-test-project._e8166e0cdb.anonbb92cd"
     )

-    sql = bf_read_gbq_table._create_time_travel_sql(
-        table_ref, datetime.datetime.now(datetime.timezone.utc)
+    table_expression = bf_read_gbq_table.get_ibis_time_travel_table(
+        bqsession.ibis_client,
+        table_ref,
+        index_cols=(),
+        columns=(),
+        filters=(),
+        time_travel_timestamp=datetime.datetime.now(datetime.timezone.utc),
     )
+    sql = table_expression.compile()

     # Anonymous query results tables don't support time travel.
     assert "SYSTEM_TIME" not in sql

     # Need fully-qualified table name.
-    assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
+    assert "my-test-project" in sql
diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py
index 38ea208eaf..47a6013c4c 100644
--- a/third_party/bigframes_vendored/pandas/io/gbq.py
+++ b/third_party/bigframes_vendored/pandas/io/gbq.py
@@ -96,15 +96,15 @@ def read_gbq(
         Reading data with `columns` and `filters` parameters:

             >>> columns = ['pitcherFirstName', 'pitcherLastName', 'year', 'pitchSpeed']
-            >>> filters = [('year', '==', 2016), ('pitcherFirstName', 'in', ['John', 'Doe']), ('pitcherLastName', 'in', ['Gant'])]
+            >>> filters = [('year', '==', 2016), ('pitcherFirstName', 'in', ['John', 'Doe']), ('pitcherLastName', 'in', ['Gant']), ('pitchSpeed', '>', 94)]
             >>> df = bpd.read_gbq(
             ...     "bigquery-public-data.baseball.games_wide",
             ...     columns=columns,
             ...     filters=filters,
             ... )
             >>> df.head(1)
-              pitcherFirstName pitcherLastName  year  pitchSpeed
-            0             John            Gant  2016          82
+              pitcherFirstName pitcherLastName  year  pitchSpeed
+            0             John            Gant  2016          95

             [1 rows x 4 columns]
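
Reviewer note (not part of the patch): a minimal sketch of how the consolidated to_query() helper composes its clauses after this change. The table name, columns, filter, and timestamp below are illustrative; the expected statement mirrors the parametrized cases in tests/unit/session/test_io_bigquery.py.

import datetime

import bigframes.session._io.bigquery as io_bq

sql = io_bq.to_query(
    "test_table",
    index_cols=["row_index"],
    columns=["string_col"],
    filters=[("string_col", "==", "Hello, World!")],
    max_results=123,
    time_travel_timestamp=datetime.datetime(2024, 5, 14, 12, 42, 36, 125125),
)
# Produces one statement with the time travel, filter, and limit clauses appended
# in that order (assuming the behavior exercised by the unit tests above):
# SELECT `row_index`, `string_col` FROM `test_table`
#   FOR SYSTEM_TIME AS OF TIMESTAMP('2024-05-14T12:42:36.125125')
#   WHERE `string_col` = 'Hello, World!' LIMIT 123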