diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 0a03575491..bdbc00e620 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -2719,7 +2719,8 @@ def _get_block(self) -> blocks.Block:
         return self._block
 
     def _cached(self) -> DataFrame:
-        return DataFrame(self._block.cached())
+        self._set_block(self._block.cached())
+        return self
 
 
 _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries")
diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py
index 4c5a48cf62..39d01fca8d 100644
--- a/bigframes/ml/core.py
+++ b/bigframes/ml/core.py
@@ -126,7 +126,7 @@ def generate_text_embedding(
 
     def forecast(self) -> bpd.DataFrame:
         sql = self._model_manipulation_sql_generator.ml_forecast()
-        return self._session.read_gbq(sql)
+        return self._session.read_gbq(sql, index_col="forecast_timestamp").reset_index()
 
     def evaluate(self, input_data: Optional[bpd.DataFrame] = None):
         # TODO: validate input data schema
@@ -139,14 +139,18 @@ def centroids(self) -> bpd.DataFrame:
         sql = self._model_manipulation_sql_generator.ml_centroids()
-        return self._session.read_gbq(sql)
+        return self._session.read_gbq(
+            sql, index_col=["centroid_id", "feature"]
+        ).reset_index()
 
     def principal_components(self) -> bpd.DataFrame:
         assert self._model.model_type == "PCA"
 
         sql = self._model_manipulation_sql_generator.ml_principal_components()
-        return self._session.read_gbq(sql)
+        return self._session.read_gbq(
+            sql, index_col=["principal_component_id", "feature"]
+        ).reset_index()
 
     def principal_component_info(self) -> bpd.DataFrame:
         assert self._model.model_type == "PCA"
@@ -228,10 +232,12 @@ def create_model(
     Returns: a BqmlModel, wrapping a trained model in BigQuery
     """
     options = dict(options)
+    # Cache the DataFrames so that the base table is not a snapshot;
+    # a cached DataFrame creates a full copy and never uses a snapshot.
     if y_train is None:
-        input_data = X_train
+        input_data = X_train._cached()
     else:
-        input_data = X_train.join(y_train, how="outer")
+        input_data = X_train._cached().join(y_train._cached(), how="outer")
         options.update({"INPUT_LABEL_COLS": y_train.columns.tolist()})
 
     session = X_train._session
@@ -259,7 +265,9 @@ def create_time_series_model(
     ), "Time stamp data input must only contain 1 column."
 
     options = dict(options)
-    input_data = X_train.join(y_train, how="outer")
+    # Cache the DataFrames so that the base table is not a snapshot;
+    # a cached DataFrame creates a full copy and never uses a snapshot.
+    input_data = X_train._cached().join(y_train._cached(), how="outer")
     options.update({"TIME_SERIES_TIMESTAMP_COL": X_train.columns.tolist()[0]})
     options.update({"TIME_SERIES_DATA_COL": y_train.columns.tolist()[0]})
diff --git a/bigframes/series.py b/bigframes/series.py
index 4fab1fe943..28290d591e 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1503,7 +1503,8 @@ def _slice(
     )
 
     def _cached(self) -> Series:
-        return Series(self._block.cached())
+        self._set_block(self._block.cached())
+        return self
 
 
 def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]:
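Both `DataFrame._cached` and `Series._cached` now materialize the underlying block in place via `_set_block` and return `self`, so every live reference to the object sees the materialized table; `create_model` and `create_time_series_model` rely on this so that `CREATE MODEL` reads from a physical copy rather than a time-travel snapshot. A minimal sketch of the mutate-in-place idiom, using a simplified stand-in class rather than the real bigframes internals:

    # Sketch only: Block.cached() is assumed to return a block backed by a
    # fully materialized copy of the data.
    class Frame:
        def __init__(self, block):
            self._block = block

        def _set_block(self, block):
            self._block = block

        def _cached(self):
            # Swap the cached block into *this* object instead of building a
            # new Frame, so callers holding the same reference benefit too.
            self._set_block(self._block.cached())
            return self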
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 9b881de9a0..82c5a1c8d0 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -17,10 +17,10 @@
 from __future__ import annotations
 
 import datetime
+import itertools
 import logging
 import os
 import re
-import textwrap
 import typing
 from typing import (
     Any,
@@ -81,6 +81,7 @@
 # Even though the ibis.backends.bigquery.registry import is unused, it's needed
 # to register new and replacement ops with the Ibis BigQuery backend.
 import third_party.bigframes_vendored.ibis.backends.bigquery.registry  # noqa
+import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
 import third_party.bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import third_party.bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet
 import third_party.bigframes_vendored.pandas.io.parsers.readers as third_party_pandas_readers
@@ -378,12 +379,6 @@ def read_gbq_query(
             ...     pitchSpeed,
             ...     FROM `bigquery-public-data.baseball.games_wide`
             ...     ''')
-            >>> df.head(2)
-              pitcherFirstName pitcherLastName  pitchSpeed
-            0                                            0
-            1                                            0
-
-            [2 rows x 3 columns]
 
         Preserve ordering in a query input.
@@ -480,16 +475,6 @@ def read_gbq_table(
         Read a whole table, with arbitrary ordering or ordering corresponding to the primary key(s).
 
         >>> df = bpd.read_gbq_table("bigquery-public-data.ml_datasets.penguins")
-        >>> df.head(2)
-                                       species island  culmen_length_mm  \\
-        0  Adelie Penguin (Pygoscelis adeliae)  Dream              36.6
-        1  Adelie Penguin (Pygoscelis adeliae)  Dream              39.8
-
-           culmen_depth_mm  flipper_length_mm  body_mass_g     sex
-        0             18.4              184.0       3475.0  FEMALE
-        1             19.1              184.0       4650.0    MALE
-
-        [2 rows x 7 columns]
 
         See also: :meth:`Session.read_gbq`.
         """
@@ -503,7 +488,7 @@
             api_name="read_gbq_table",
         )
 
-    def _read_gbq_table_to_ibis_with_total_ordering(
+    def _get_snapshot_sql_and_primary_key(
         self,
         table_ref: bigquery.table.TableReference,
         *,
@@ -523,7 +508,6 @@
             ),
             None,
         )
-
         table_expression = self.ibis_client.table(
             table_ref.table_id,
             database=f"{table_ref.project}.{table_ref.dataset_id}",
@@ -534,6 +518,11 @@
         # the same assumption and use these columns as the total ordering keys.
         table = self.bqclient.get_table(table_ref)
 
+        if table.location.casefold() != self._location.casefold():
+            raise ValueError(
+                f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}"
+            )
+
         # TODO(b/305264153): Use public properties to fetch primary keys once
         # added to google-cloud-bigquery.
         primary_keys = (
             table._properties.get("tableConstraints", {})
             .get("primaryKey", {})
             .get("columns")
         )
 
-        if not primary_keys:
-            return table_expression, None
-        else:
-            # Read from a snapshot since we won't have to copy the table data to create a total ordering.
-            job_config = bigquery.QueryJobConfig()
-            job_config.labels["bigframes-api"] = api_name
-            current_timestamp = list(
-                self.bqclient.query(
-                    "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-                    job_config=job_config,
-                ).result()
-            )[0][0]
-            table_expression = self.ibis_client.sql(
-                bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
-            )
-            return table_expression, primary_keys
+        job_config = bigquery.QueryJobConfig()
+        job_config.labels["bigframes-api"] = api_name
+        current_timestamp = list(
+            self.bqclient.query(
+                "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
+                job_config=job_config,
+            ).result()
+        )[0][0]
+        table_expression = self.ibis_client.sql(
+            bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
+        )
+        return table_expression, primary_keys
 
     def _read_gbq_table(
         self,
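With primary keys fetched unconditionally, `_get_snapshot_sql_and_primary_key` now always reads through a snapshot pinned to `CURRENT_TIMESTAMP()`. The body of `bigframes_io.create_snapshot_sql` is not shown in this patch, but the statement it builds is BigQuery time travel; a hypothetical sketch of its shape:

    # Assumed shape of the generated snapshot SQL (the real helper's output is
    # not shown here); FOR SYSTEM_TIME AS OF is BigQuery's time-travel clause.
    import datetime

    def create_snapshot_sql_sketch(full_table_id: str, at: datetime.datetime) -> str:
        return (
            f"SELECT * FROM `{full_table_id}` "
            f"FOR SYSTEM_TIME AS OF TIMESTAMP '{at.isoformat()}'"
        )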
@@ -580,10 +565,7 @@
         (
             table_expression,
             total_ordering_cols,
-        ) = self._read_gbq_table_to_ibis_with_total_ordering(
-            table_ref,
-            api_name=api_name,
-        )
+        ) = self._get_snapshot_sql_and_primary_key(table_ref, api_name=api_name)
 
         for key in col_order:
             if key not in table_expression.columns:
@@ -596,24 +578,22 @@
         else:
             index_cols = list(index_col)
 
-        hidden_cols: typing.Sequence[str] = ()
-
         for key in index_cols:
             if key not in table_expression.columns:
                 raise ValueError(
                     f"Column `{key}` of `index_col` not found in this table."
                 )
 
+        if col_order:
+            table_expression = table_expression.select([*index_cols, *col_order])
+
         # If the index is unique and sortable, then we don't need to generate
         # an ordering column.
         ordering = None
-        is_total_ordering = False
-
         if total_ordering_cols is not None:
-            # Note: currently, this a table has a total ordering only when the
+            # Note: currently, a table has a total ordering only when the
             # primary key(s) are set on a table. The query engine assumes such
             # columns are unique, even if not enforced.
-            is_total_ordering = True
             ordering = orderings.ExpressionOrdering(
                 ordering_value_columns=tuple(
                     core.OrderingColumnReference(column_id)
                     for column_id in total_ordering_cols
                 ),
                 total_ordering_columns=frozenset(total_ordering_cols),
             )
-
-            if len(index_cols) != 0:
-                index_labels = typing.cast(List[Optional[str]], index_cols)
-            else:
-                # Use the total_ordering_cols to project offsets to use as the default index.
-                table_expression = table_expression.order_by(index_cols)
-                default_index_id = guid.generate_guid("bigframes_index_")
-                default_index_col = (
-                    ibis.row_number().cast(ibis_dtypes.int64).name(default_index_id)
-                )
-                table_expression = table_expression.mutate(
-                    **{default_index_id: default_index_col}
-                )
-                index_cols = [default_index_id]
-                index_labels = [None]
-        elif len(index_cols) != 0:
-            index_labels = typing.cast(List[Optional[str]], index_cols)
-            distinct_table = table_expression.select(*index_cols).distinct()
-            is_unique_sql = f"""WITH full_table AS (
-                {self.ibis_client.compile(table_expression)}
-            ),
-            distinct_table AS (
-                {self.ibis_client.compile(distinct_table)}
+            column_values = [table_expression[col] for col in table_expression.columns]
+            array_value = core.ArrayValue.from_ibis(
+                self,
+                table_expression,
+                columns=column_values,
+                hidden_ordering_columns=[],
+                ordering=ordering,
             )
-            SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`,
-                (SELECT COUNT(*) FROM distinct_table) AS `distinct_count`
-            """
-            results, query_job = self._start_query(is_unique_sql)
-            row = next(iter(results))
-
-            total_count = row["total_count"]
-            distinct_count = row["distinct_count"]
-            is_total_ordering = total_count == distinct_count
-
+        elif len(index_cols) != 0:
+            # We have index columns; let's see if those are actually total_order_columns.
             ordering = orderings.ExpressionOrdering(
                 ordering_value_columns=tuple(
                     [
@@ -665,142 +621,61 @@
                         core.OrderingColumnReference(column_id)
                         for column_id in index_cols
                     ]
                 ),
                 total_ordering_columns=frozenset(index_cols),
             )
-
-            # We have a total ordering, so query via "time travel" so that
-            # the underlying data doesn't mutate.
+            is_total_ordering = self._check_index_uniqueness(
+                table_expression, index_cols
+            )
             if is_total_ordering:
-                # Get the timestamp from the job metadata rather than the query
-                # text so that the query for determining uniqueness of the ID
-                # columns can be cached.
-                current_timestamp = query_job.started
-
-                # The job finished, so we should have a start time.
-                assert current_timestamp is not None
-                table_expression = self.ibis_client.sql(
-                    bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
+                column_values = [
+                    table_expression[col] for col in table_expression.columns
+                ]
+                array_value = core.ArrayValue.from_ibis(
+                    self,
+                    table_expression,
+                    columns=column_values,
+                    hidden_ordering_columns=[],
+                    ordering=ordering,
                 )
             else:
-                # Make sure when we generate an ordering, the row_number()
-                # coresponds to the index columns.
-                table_expression = table_expression.order_by(index_cols)
-                warnings.warn(
-                    textwrap.dedent(
-                        f"""
-                        Got a non-unique index. A consistent ordering is not
-                        guaranteed. DataFrame has {total_count} rows,
-                        but only {distinct_count} distinct index values.
-                        """,
-                    )
-                )
-
-            # When ordering by index columns, apply limit after ordering to
-            # make limit more predictable.
-            if max_results is not None:
-                table_expression = table_expression.limit(max_results)
+                array_value = self._create_total_ordering(table_expression)
         else:
-            if max_results is not None:
-                # Apply limit before generating rownums and creating temp table
-                # This makes sure the offsets are valid and limits the number of
-                # rows for which row numbers must be generated
-                table_expression = table_expression.limit(max_results)
-            table_expression, ordering = self._create_sequential_ordering(
-                table=table_expression,
-                api_name=api_name,
-            )
-            hidden_cols = (
-                (ordering.total_order_col.column_id,)
-                if ordering.total_order_col
-                else ()
-            )
-            assert len(ordering.ordering_value_columns) > 0
-            is_total_ordering = True
-            # Block constructor will generate default index if passed empty
-            index_cols = []
-            index_labels = []
-
-        return self._read_gbq_with_ordering(
-            table_expression=table_expression,
-            col_order=col_order,
-            index_cols=index_cols,
-            index_labels=index_labels,
-            hidden_cols=hidden_cols,
-            ordering=ordering,
-            is_total_ordering=is_total_ordering,
-            api_name=api_name,
+            array_value = self._create_total_ordering(table_expression)
+
+        value_columns = [col for col in array_value.column_ids if col not in index_cols]
+        block = blocks.Block(
+            array_value,
+            index_columns=index_cols,
+            column_labels=value_columns,
+            index_labels=index_cols,
         )
+        if max_results:
+            block = block.slice(stop=max_results)
+        df = dataframe.DataFrame(block)
 
-    def _read_gbq_with_ordering(
-        self,
-        table_expression: ibis_types.Table,
-        *,
-        col_order: Iterable[str] = (),
-        col_labels: Iterable[Optional[str]] = (),
-        index_cols: Iterable[str] = (),
-        index_labels: Iterable[Optional[str]] = (),
-        hidden_cols: Iterable[str] = (),
-        ordering: orderings.ExpressionOrdering,
-        is_total_ordering: bool = False,
-        api_name: str,
-    ) -> dataframe.DataFrame:
-        """Internal helper method that loads DataFrame from Google BigQuery given an ordering column.
+        # If the user provided index columns, sort by them.
+        if len(index_cols) > 0:
+            df = df.sort_index()
+        return df
 
-        Args:
-            table_expression:
-                an ibis table expression to be executed in BigQuery.
-            col_order:
-                List of BigQuery column ids in the desired order for results DataFrame.
-            col_labels:
-                List of column labels as the column names.
-            index_cols:
-                List of index ids to use as the index or multi-index.
-            index_labels:
-                List of index labels as names of index.
-            hidden_cols:
-                Columns that should be hidden. Ordering columns may (not always) be hidden
-            ordering:
-                Column name to be used for ordering. If not supplied, a default ordering is generated.
-            api_name:
-                The name of the API method.
+    def _check_index_uniqueness(
+        self, table: ibis_types.Table, index_cols: List[str]
+    ) -> bool:
+        distinct_table = table.select(*index_cols).distinct()
+        is_unique_sql = f"""WITH full_table AS (
+            {self.ibis_client.compile(table)}
+        ),
+        distinct_table AS (
+            {self.ibis_client.compile(distinct_table)}
+        )
 
-        Returns:
-            A DataFrame representing results of the query or table.
+        SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`,
+        (SELECT COUNT(*) FROM distinct_table) AS `distinct_count`
         """
-        index_cols, index_labels = list(index_cols), list(index_labels)
-        if len(index_cols) != len(index_labels):
-            raise ValueError(
-                "Needs same number of index labels are there are index columns. "
-                f"Got {len(index_labels)}, expected {len(index_cols)}."
-            )
-
-        # Logic:
-        # no total ordering, index -> create sequential order, ordered by index, use for both ordering and index
-        # total ordering, index -> use ordering as ordering, index as index
+        results, _ = self._start_query(is_unique_sql)
+        row = next(iter(results))
 
-        # This code block ensures the existence of a total ordering.
-        column_keys = list(col_order)
-        if len(column_keys) == 0:
-            non_value_columns = set([*index_cols, *hidden_cols])
-            column_keys = [
-                key for key in table_expression.columns if key not in non_value_columns
-            ]
-        if not is_total_ordering:
-            # Rows are not ordered, we need to generate a default ordering and materialize it
-            table_expression, ordering = self._create_sequential_ordering(
-                table=table_expression,
-                index_cols=index_cols,
-                api_name=api_name,
-            )
-        index_col_values = [table_expression[index_id] for index_id in index_cols]
-        if not col_labels:
-            col_labels = column_keys
-        return self._read_ibis(
-            table_expression,
-            index_col_values,
-            index_labels,
-            column_keys,
-            col_labels,
-            ordering=ordering,
-        )
+        total_count = row["total_count"]
+        distinct_count = row["distinct_count"]
+        return total_count == distinct_count
 
     def _read_bigquery_load_job(
         self,
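`_check_index_uniqueness` decides whether user-supplied index columns form a total ordering by comparing the table's row count against the distinct count of the index keys. The same probe restated as a self-contained sketch, where the `bqclient` handle and the two input SQL strings are illustrative stand-ins for the session internals:

    # An index is usable as a total ordering only if no two rows share a key:
    # (total row count) == (distinct index-key count).
    def is_index_unique(bqclient, table_sql: str, distinct_sql: str) -> bool:
        sql = f"""WITH full_table AS ({table_sql}),
        distinct_table AS ({distinct_sql})
        SELECT (SELECT COUNT(*) FROM full_table) AS total_count,
               (SELECT COUNT(*) FROM distinct_table) AS distinct_count"""
        row = next(iter(bqclient.query(sql).result()))
        return row["total_count"] == row["distinct_count"]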
@@ -853,40 +728,6 @@
             col_order=col_order,
         )
 
-    def _read_ibis(
-        self,
-        table_expression: ibis_types.Table,
-        index_cols: Iterable[ibis_types.Value],
-        index_labels: Iterable[blocks.Label],
-        column_keys: Iterable[str],
-        column_labels: Iterable[blocks.Label],
-        ordering: orderings.ExpressionOrdering,
-    ) -> dataframe.DataFrame:
-        """Turns a table expression (plus index column) into a DataFrame."""
-
-        columns = list(index_cols)
-        for key in column_keys:
-            if key not in table_expression.columns:
-                raise ValueError(f"Column '{key}' not found in this table.")
-            columns.append(table_expression[key])
-
-        non_hidden_ids = [col.get_name() for col in columns]
-        hidden_ordering_columns = []
-        for ref in ordering.all_ordering_columns:
-            if ref.column_id not in non_hidden_ids:
-                hidden_ordering_columns.append(table_expression[ref.column_id])
-
-        block = blocks.Block(
-            core.ArrayValue.from_ibis(
-                self, table_expression, columns, hidden_ordering_columns, ordering
-            ),
-            index_columns=[index_col.get_name() for index_col in index_cols],
-            column_labels=column_labels,
-            index_labels=index_labels,
-        )
-
-        return dataframe.DataFrame(block)
-
     def read_gbq_model(self, model_name: str):
         """Loads a BigQuery ML model from BigQuery.
@@ -1009,17 +850,26 @@ def _read_pandas(
         ):
             new_idx_ids, idx_labels = [], []
 
-        df = self._read_gbq_with_ordering(
-            table_expression=table_expression,
-            col_labels=col_labels,
-            index_cols=new_idx_ids,
-            index_labels=idx_labels,
-            hidden_cols=(ordering_col,),
+        column_values = [
+            table_expression[col]
+            for col in table_expression.columns
+            if col != ordering_col
+        ]
+        array_value = core.ArrayValue.from_ibis(
+            self,
+            table_expression,
+            columns=column_values,
+            hidden_ordering_columns=[table_expression[ordering_col]],
             ordering=ordering,
-            is_total_ordering=True,
-            api_name=api_name,
         )
-        return df
+
+        block = blocks.Block(
+            array_value,
+            index_columns=new_idx_ids,
+            column_labels=col_labels,
+            index_labels=idx_labels,
+        )
+        return dataframe.DataFrame(block)
 
     def read_csv(
         self,
@@ -1299,34 +1149,50 @@ def _create_empty_temp_table(
         )
         return bigquery.TableReference.from_string(table)
 
-    def _create_sequential_ordering(
+    def _create_total_ordering(
         self,
         table: ibis_types.Table,
-        index_cols: Iterable[str] = (),
-        api_name: str = "",
-    ) -> Tuple[ibis_types.Table, orderings.ExpressionOrdering]:
+    ) -> core.ArrayValue:
         # Since this might also be used as the index, don't use the default
         # "ordering ID" name.
-        default_ordering_name = guid.generate_guid("bigframes_ordering_")
-        default_ordering_col = (
-            ibis.row_number().cast(ibis_dtypes.int64).name(default_ordering_name)
+        ordering_hash_part = guid.generate_guid("bigframes_ordering_")
+        ordering_rand_part = guid.generate_guid("bigframes_ordering_")
+
+        str_values = list(
+            map(lambda col: _convert_to_string(table[col]), table.columns)
         )
-        table = table.mutate(**{default_ordering_name: default_ordering_col})
-        table_ref = self._ibis_to_temp_table(
-            table,
-            cluster_cols=list(index_cols) + [default_ordering_name],
-            api_name=api_name,
+        full_row_str = (
+            str_values[0].concat(*str_values[1:])
+            if len(str_values) > 1
+            else str_values[0]
        )
-        table = self.ibis_client.table(
-            f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
+        full_row_hash = full_row_str.hash().name(ordering_hash_part)
+        # Used to disambiguate between identical rows (which will have an identical hash).
+        random_value = ibis.random().name(ordering_rand_part)
+
+        original_column_ids = table.columns
+        table_with_ordering = table.select(
+            itertools.chain(original_column_ids, [full_row_hash, random_value])
         )
-        ordering_reference = core.OrderingColumnReference(default_ordering_name)
+
+        ordering_ref1 = core.OrderingColumnReference(ordering_hash_part)
+        ordering_ref2 = core.OrderingColumnReference(ordering_rand_part)
         ordering = orderings.ExpressionOrdering(
-            ordering_value_columns=tuple([ordering_reference]),
-            total_ordering_columns=frozenset([default_ordering_name]),
-            integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True),
+            ordering_value_columns=(ordering_ref1, ordering_ref2),
+            total_ordering_columns=frozenset([ordering_hash_part, ordering_rand_part]),
+        )
+        columns = [table_with_ordering[col] for col in original_column_ids]
+        hidden_columns = [
+            table_with_ordering[ordering_hash_part],
+            table_with_ordering[ordering_rand_part],
+        ]
+        return core.ArrayValue.from_ibis(
+            self,
+            table_with_ordering,
+            columns,
+            hidden_ordering_columns=hidden_columns,
+            ordering=ordering,
        )
-        return table, ordering
 
     def _ibis_to_temp_table(
         self,
@@ -1561,3 +1427,23 @@ def _can_cluster_bq(field: bigquery.SchemaField):
         "BOOL",
         "BOOLEAN",
     )
+
+
+def _convert_to_string(column: ibis_types.Column) -> ibis_types.StringColumn:
+    col_type = column.type()
+    if (
+        col_type.is_numeric()
+        or col_type.is_boolean()
+        or col_type.is_binary()
+        or col_type.is_temporal()
+    ):
+        result = column.cast(ibis_dtypes.String(nullable=True))
+    elif col_type.is_geospatial():
+        result = typing.cast(ibis_types.GeoSpatialColumn, column).as_text()
+    elif col_type.is_string():
+        result = column
+    else:
+        # TO_JSON_STRING works with all data types, but isn't the most efficient.
+        # Needed for the JSON, STRUCT and ARRAY data types.
+        result = vendored_ibis_ops.ToJsonString(column).to_expr()  # type: ignore
+    return typing.cast(ibis_types.StringColumn, result)
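`_create_total_ordering` replaces the old materialize-a-`row_number()`-column strategy: it stringifies every column, concatenates and hashes the result, and appends a random value purely to break ties between duplicate rows, so no temp table is needed. A rough standalone sketch in plain ibis, using a naive `cast("string")` where the patch uses `_convert_to_string` (column names illustrative):

    import ibis

    def with_total_ordering_keys(table):
        # Hash the whole row's string form; identical rows collide, so a
        # random column is added solely as a tie-breaker.
        row_str = table[table.columns[0]].cast("string")
        for name in table.columns[1:]:
            row_str = row_str.concat(table[name].cast("string"))
        return table.mutate(
            ordering_hash=row_str.hash(),
            ordering_rand=ibis.random(),
        )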
diff --git a/tests/system/small/ml/test_cluster.py b/tests/system/small/ml/test_cluster.py
index caeffa7768..266a38e3ee 100644
--- a/tests/system/small/ml/test_cluster.py
+++ b/tests/system/small/ml/test_cluster.py
@@ -89,59 +89,67 @@ def test_kmeans_score(session, penguins_kmeans_model: cluster.KMeans):
 
 
 def test_kmeans_cluster_centers(penguins_kmeans_model: cluster.KMeans):
-    result = penguins_kmeans_model.cluster_centers_.to_pandas()
-    expected = pd.DataFrame(
-        {
-            "centroid_id": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
-            "feature": [
-                "culmen_length_mm",
-                "culmen_depth_mm",
-                "flipper_length_mm",
-                "sex",
-            ]
-            * 3,
-            "numerical_value": [
-                47.509677,
-                14.993548,
-                217.040123,
-                pd.NA,
-                38.207813,
-                18.03125,
-                187.992188,
-                pd.NA,
-                47.036346,
-                18.834808,
-                197.1612,
-                pd.NA,
-            ],
-            "categorical_value": [
-                [],
-                [],
-                [],
-                [
-                    {"category": ".", "value": 0.008064516129032258},
-                    {"category": "MALE", "value": 0.49193548387096775},
-                    {"category": "FEMALE", "value": 0.47580645161290325},
-                    {"category": "_null_filler", "value": 0.024193548387096774},
-                ],
-                [],
-                [],
-                [],
-                [
-                    {"category": "MALE", "value": 0.34375},
-                    {"category": "FEMALE", "value": 0.625},
-                    {"category": "_null_filler", "value": 0.03125},
-                ],
-                [],
-                [],
-                [],
-                [
-                    {"category": "MALE", "value": 0.6847826086956522},
-                    {"category": "FEMALE", "value": 0.2826086956521739},
-                    {"category": "_null_filler", "value": 0.03260869565217391},
-                ],
-            ],
-        },
-    )
+    result = (
+        penguins_kmeans_model.cluster_centers_.to_pandas()
+        .sort_values(["centroid_id", "feature"])
+        .reset_index(drop=True)
+    )
+    expected = (
+        pd.DataFrame(
+            {
+                "centroid_id": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
+                "feature": [
+                    "culmen_length_mm",
+                    "culmen_depth_mm",
+                    "flipper_length_mm",
+                    "sex",
+                ]
+                * 3,
+                "numerical_value": [
+                    47.509677,
+                    14.993548,
+                    217.040123,
+                    pd.NA,
+                    38.207813,
+                    18.03125,
+                    187.992188,
+                    pd.NA,
+                    47.036346,
+                    18.834808,
+                    197.1612,
+                    pd.NA,
+                ],
+                "categorical_value": [
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": ".", "value": 0.008064516129032258},
+                        {"category": "MALE", "value": 0.49193548387096775},
+                        {"category": "FEMALE", "value": 0.47580645161290325},
+                        {"category": "_null_filler", "value": 0.024193548387096774},
+                    ],
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": "MALE", "value": 0.34375},
+                        {"category": "FEMALE", "value": 0.625},
+                        {"category": "_null_filler", "value": 0.03125},
+                    ],
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": "MALE", "value": 0.6847826086956522},
+                        {"category": "FEMALE", "value": 0.2826086956521739},
+                        {"category": "_null_filler", "value": 0.03260869565217391},
+                    ],
+                ],
+            },
+        )
+        .sort_values(["centroid_id", "feature"])
+        .reset_index(drop=True)
+    )
     pd.testing.assert_frame_equal(
         result,
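Because BQML result rows no longer arrive in a guaranteed order, the test now asserts order-insensitively: sort both frames on the key columns and drop the positional index before comparing. The pattern in isolation, with toy data:

    import pandas as pd

    result = pd.DataFrame({"centroid_id": [2, 1], "feature": ["a", "b"]})
    expected = pd.DataFrame({"centroid_id": [1, 2], "feature": ["b", "a"]})

    key_cols = ["centroid_id", "feature"]
    pd.testing.assert_frame_equal(
        result.sort_values(key_cols).reset_index(drop=True),
        expected.sort_values(key_cols).reset_index(drop=True),
    )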
diff --git a/tests/system/small/ml/test_core.py b/tests/system/small/ml/test_core.py
index ec1f351d87..be34a4871c 100644
--- a/tests/system/small/ml/test_core.py
+++ b/tests/system/small/ml/test_core.py
@@ -78,58 +78,62 @@ def test_model_eval_with_data(penguins_bqml_linear_model, penguins_df_default_index):
 
 def test_model_centroids(penguins_bqml_kmeans_model: core.BqmlModel):
     result = penguins_bqml_kmeans_model.centroids().to_pandas()
-    expected = pd.DataFrame(
-        {
-            "centroid_id": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
-            "feature": [
-                "culmen_length_mm",
-                "culmen_depth_mm",
-                "flipper_length_mm",
-                "sex",
-            ]
-            * 3,
-            "numerical_value": [
-                47.509677,
-                14.993548,
-                217.040123,
-                pd.NA,
-                38.207813,
-                18.03125,
-                187.992188,
-                pd.NA,
-                47.036346,
-                18.834808,
-                197.1612,
-                pd.NA,
-            ],
-            "categorical_value": [
-                [],
-                [],
-                [],
-                [
-                    {"category": ".", "value": 0.008064516129032258},
-                    {"category": "MALE", "value": 0.49193548387096775},
-                    {"category": "FEMALE", "value": 0.47580645161290325},
-                    {"category": "_null_filler", "value": 0.024193548387096774},
-                ],
-                [],
-                [],
-                [],
-                [
-                    {"category": "MALE", "value": 0.34375},
-                    {"category": "FEMALE", "value": 0.625},
-                    {"category": "_null_filler", "value": 0.03125},
-                ],
-                [],
-                [],
-                [],
-                [
-                    {"category": "MALE", "value": 0.6847826086956522},
-                    {"category": "FEMALE", "value": 0.2826086956521739},
-                    {"category": "_null_filler", "value": 0.03260869565217391},
-                ],
-            ],
-        },
-    )
+    expected = (
+        pd.DataFrame(
+            {
+                "centroid_id": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
+                "feature": [
+                    "culmen_length_mm",
+                    "culmen_depth_mm",
+                    "flipper_length_mm",
+                    "sex",
+                ]
+                * 3,
+                "numerical_value": [
+                    47.509677,
+                    14.993548,
+                    217.040123,
+                    pd.NA,
+                    38.207813,
+                    18.03125,
+                    187.992188,
+                    pd.NA,
+                    47.036346,
+                    18.834808,
+                    197.1612,
+                    pd.NA,
+                ],
+                "categorical_value": [
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": ".", "value": 0.008064516129032258},
+                        {"category": "MALE", "value": 0.49193548387096775},
+                        {"category": "FEMALE", "value": 0.47580645161290325},
+                        {"category": "_null_filler", "value": 0.024193548387096774},
+                    ],
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": "MALE", "value": 0.34375},
+                        {"category": "FEMALE", "value": 0.625},
+                        {"category": "_null_filler", "value": 0.03125},
+                    ],
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": "MALE", "value": 0.6847826086956522},
+                        {"category": "FEMALE", "value": 0.2826086956521739},
+                        {"category": "_null_filler", "value": 0.03260869565217391},
+                    ],
+                ],
+            },
+        )
+        .sort_values(["centroid_id", "feature"])
+        .reset_index(drop=True)
+    )
     pd.testing.assert_frame_equal(
         result,
@@ -148,59 +152,63 @@ def test_pca_model_principal_components(penguins_bqml_pca_model: core.BqmlModel):
     # result is too long, only check the first principal component here.
     result = result.head(7)
 
-    expected = pd.DataFrame(
-        {
-            "principal_component_id": [0] * 7,
-            "feature": [
-                "species",
-                "island",
-                "culmen_length_mm",
-                "culmen_depth_mm",
-                "flipper_length_mm",
-                "body_mass_g",
-                "sex",
-            ],
-            "numerical_value": [
-                pd.NA,
-                pd.NA,
-                0.401489,
-                -0.377482,
-                0.524052,
-                0.501174,
-                pd.NA,
-            ],
-            "categorical_value": [
-                [
-                    {
-                        "category": "Gentoo penguin (Pygoscelis papua)",
-                        "value": 0.25068877125667804,
-                    },
-                    {
-                        "category": "Adelie Penguin (Pygoscelis adeliae)",
-                        "value": -0.20622291900416198,
-                    },
-                    {
-                        "category": "Chinstrap penguin (Pygoscelis antarctica)",
-                        "value": -0.030161149275185855,
-                    },
-                ],
-                [
-                    {"category": "Biscoe", "value": 0.19761120114410635},
-                    {"category": "Dream", "value": -0.11264736305259061},
-                    {"category": "Torgersen", "value": -0.07065913511418596},
-                ],
-                [],
-                [],
-                [],
-                [],
-                [
-                    {"category": ".", "value": 0.0015916894448071784},
-                    {"category": "MALE", "value": 0.06869704739750442},
-                    {"category": "FEMALE", "value": -0.052521171596813174},
-                    {"category": "_null_filler", "value": -0.0034628622681684906},
-                ],
-            ],
-        },
-    )
+    expected = (
+        pd.DataFrame(
+            {
+                "principal_component_id": [0] * 7,
+                "feature": [
+                    "species",
+                    "island",
+                    "culmen_length_mm",
+                    "culmen_depth_mm",
+                    "flipper_length_mm",
+                    "body_mass_g",
+                    "sex",
+                ],
+                "numerical_value": [
+                    pd.NA,
+                    pd.NA,
+                    0.401489,
+                    -0.377482,
+                    0.524052,
+                    0.501174,
+                    pd.NA,
+                ],
+                "categorical_value": [
+                    [
+                        {
+                            "category": "Gentoo penguin (Pygoscelis papua)",
+                            "value": 0.25068877125667804,
+                        },
+                        {
+                            "category": "Adelie Penguin (Pygoscelis adeliae)",
+                            "value": -0.20622291900416198,
+                        },
+                        {
+                            "category": "Chinstrap penguin (Pygoscelis antarctica)",
+                            "value": -0.030161149275185855,
+                        },
+                    ],
+                    [
+                        {"category": "Biscoe", "value": 0.19761120114410635},
+                        {"category": "Dream", "value": -0.11264736305259061},
+                        {"category": "Torgersen", "value": -0.07065913511418596},
+                    ],
+                    [],
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": ".", "value": 0.0015916894448071784},
+                        {"category": "MALE", "value": 0.06869704739750442},
+                        {"category": "FEMALE", "value": -0.052521171596813174},
+                        {"category": "_null_filler", "value": -0.0034628622681684906},
+                    ],
+                ],
+            },
+        )
+        .sort_values(["principal_component_id", "feature"])
+        .reset_index(drop=True)
+    )
     pd.testing.assert_frame_equal(
         result,
diff --git a/tests/system/small/ml/test_decomposition.py b/tests/system/small/ml/test_decomposition.py
index cc4d2e5801..42fea66cf8 100644
--- a/tests/system/small/ml/test_decomposition.py
+++ b/tests/system/small/ml/test_decomposition.py
@@ -57,59 +57,63 @@ def test_pca_components_(penguins_pca_model: decomposition.PCA):
     # result is too long, only check the first principal component here.
     result = result.head(7)
 
-    expected = pd.DataFrame(
-        {
-            "principal_component_id": [0] * 7,
-            "feature": [
-                "species",
-                "island",
-                "culmen_length_mm",
-                "culmen_depth_mm",
-                "flipper_length_mm",
-                "body_mass_g",
-                "sex",
-            ],
-            "numerical_value": [
-                pd.NA,
-                pd.NA,
-                0.401489,
-                -0.377482,
-                0.524052,
-                0.501174,
-                pd.NA,
-            ],
-            "categorical_value": [
-                [
-                    {
-                        "category": "Gentoo penguin (Pygoscelis papua)",
-                        "value": 0.25068877125667804,
-                    },
-                    {
-                        "category": "Adelie Penguin (Pygoscelis adeliae)",
-                        "value": -0.20622291900416198,
-                    },
-                    {
-                        "category": "Chinstrap penguin (Pygoscelis antarctica)",
-                        "value": -0.030161149275185855,
-                    },
-                ],
-                [
-                    {"category": "Biscoe", "value": 0.19761120114410635},
-                    {"category": "Dream", "value": -0.11264736305259061},
-                    {"category": "Torgersen", "value": -0.07065913511418596},
-                ],
-                [],
-                [],
-                [],
-                [],
-                [
-                    {"category": ".", "value": 0.0015916894448071784},
-                    {"category": "MALE", "value": 0.06869704739750442},
-                    {"category": "FEMALE", "value": -0.052521171596813174},
-                    {"category": "_null_filler", "value": -0.0034628622681684906},
-                ],
-            ],
-        },
-    )
+    expected = (
+        pd.DataFrame(
+            {
+                "principal_component_id": [0] * 7,
+                "feature": [
+                    "species",
+                    "island",
+                    "culmen_length_mm",
+                    "culmen_depth_mm",
+                    "flipper_length_mm",
+                    "body_mass_g",
+                    "sex",
+                ],
+                "numerical_value": [
+                    pd.NA,
+                    pd.NA,
+                    0.401489,
+                    -0.377482,
+                    0.524052,
+                    0.501174,
+                    pd.NA,
+                ],
+                "categorical_value": [
+                    [
+                        {
+                            "category": "Gentoo penguin (Pygoscelis papua)",
+                            "value": 0.25068877125667804,
+                        },
+                        {
+                            "category": "Adelie Penguin (Pygoscelis adeliae)",
+                            "value": -0.20622291900416198,
+                        },
+                        {
+                            "category": "Chinstrap penguin (Pygoscelis antarctica)",
+                            "value": -0.030161149275185855,
+                        },
+                    ],
+                    [
+                        {"category": "Biscoe", "value": 0.19761120114410635},
+                        {"category": "Dream", "value": -0.11264736305259061},
+                        {"category": "Torgersen", "value": -0.07065913511418596},
+                    ],
+                    [],
+                    [],
+                    [],
+                    [],
+                    [
+                        {"category": ".", "value": 0.0015916894448071784},
+                        {"category": "MALE", "value": 0.06869704739750442},
+                        {"category": "FEMALE", "value": -0.052521171596813174},
+                        {"category": "_null_filler", "value": -0.0034628622681684906},
+                    ],
+                ],
+            },
+        )
+        .sort_values(["principal_component_id", "feature"])
+        .reset_index(drop=True)
+    )
     pd.testing.assert_frame_equal(
         result,
diff --git a/tests/system/small/ml/test_forecasting.py b/tests/system/small/ml/test_forecasting.py
index cb27dd388c..55079c94cf 100644
--- a/tests/system/small/ml/test_forecasting.py
+++ b/tests/system/small/ml/test_forecasting.py
@@ -36,6 +36,7 @@ def test_model_predict(time_series_arima_plus_model):
     expected["forecast_timestamp"] = expected["forecast_timestamp"].astype(
         pd.ArrowDtype(pa.timestamp("us", tz="UTC"))
     )
+
     pd.testing.assert_frame_equal(
         predictions,
         expected,
diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py
index d700d93be9..fb9fb7bb89 100644
--- a/tests/system/small/test_dataframe_io.py
+++ b/tests/system/small/test_dataframe_io.py
@@ -80,6 +80,24 @@ def test_to_pandas_array_struct_correct_result(session):
     )
 
 
+def test_load_json(session):
+    df = session.read_gbq(
+        """SELECT
+    JSON_OBJECT('foo', 10, 'bar', TRUE) AS json_column
+    """
+    )
+
+    result = df.to_pandas()
+    expected = pd.DataFrame(
+        {
+            "json_column": ['{"bar":true,"foo":10}'],
+        }
+    )
+    expected.index = expected.index.astype("Int64")
+    pd.testing.assert_series_equal(result.dtypes, expected.dtypes)
+    pd.testing.assert_series_equal(result["json_column"], expected["json_column"])
+
+
 def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
     """Verify to_pandas_batches() APIs returns the expected dtypes."""
     expected = scalars_df_default_index.dtypes
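`test_load_json` exercises the new `ToJsonString` fallback end to end: a JSON column round-trips as its `TO_JSON_STRING` serialization. A hedged usage sketch, assuming an active BigQuery session:

    import bigframes.pandas as bpd

    df = bpd.read_gbq("SELECT JSON_OBJECT('foo', 10, 'bar', TRUE) AS json_column")
    # JSON values surface as JSON-serialized strings, e.g. '{"bar":true,"foo":10}'.
    print(df.to_pandas()["json_column"].iloc[0])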
diff --git a/tests/system/small/test_pandas_options.py b/tests/system/small/test_pandas_options.py
index ca67710d4e..c410d70fe7 100644
--- a/tests/system/small/test_pandas_options.py
+++ b/tests/system/small/test_pandas_options.py
@@ -74,7 +74,7 @@ def test_read_gbq_start_sets_session_location(
 
     # Now read_gbq* from another location should fail
     with pytest.raises(
-        google.api_core.exceptions.NotFound,
+        (google.api_core.exceptions.NotFound, ValueError),
         match=dataset_id_permanent,
     ):
         read_method(query)
@@ -99,7 +99,7 @@ def test_read_gbq_start_sets_session_location(
 
     # Now read_gbq* from another location should fail
     with pytest.raises(
-        google.api_core.exceptions.NotFound,
+        (google.api_core.exceptions.NotFound, ValueError),
         match=dataset_id_permanent_tokyo,
     ):
         read_method(query_tokyo)
@@ -145,7 +145,7 @@ def test_read_gbq_after_session_start_must_comply_with_default_location(
 
     # Doing read_gbq* from a table in another location should fail
     with pytest.raises(
-        google.api_core.exceptions.NotFound,
+        (google.api_core.exceptions.NotFound, ValueError),
         match=dataset_id_permanent_tokyo,
     ):
         read_method(query_tokyo)
@@ -193,7 +193,7 @@ def test_read_gbq_must_comply_with_set_location_US(
 
     # Starting user journey with read_gbq* from another location should fail
     with pytest.raises(
-        google.api_core.exceptions.NotFound,
+        (google.api_core.exceptions.NotFound, ValueError),
         match=dataset_id_permanent_tokyo,
     ):
         read_method(query_tokyo)
@@ -243,7 +243,7 @@ def test_read_gbq_must_comply_with_set_location_non_US(
 
     # Starting user journey with read_gbq* from another location should fail
     with pytest.raises(
-        google.api_core.exceptions.NotFound,
+        (google.api_core.exceptions.NotFound, ValueError),
         match=dataset_id_permanent,
     ):
         read_method(query)
diff --git a/tests/system/small/test_progress_bar.py b/tests/system/small/test_progress_bar.py
index 30ea63b483..c6eee82053 100644
--- a/tests/system/small/test_progress_bar.py
+++ b/tests/system/small/test_progress_bar.py
@@ -52,14 +52,6 @@ def test_progress_bar_scalar(penguins_df_default_index: bf.dataframe.DataFrame,
     assert_loading_msg_exist(capsys.readouterr().out)
 
 
-def test_progress_bar_read_gbq(session: bf.Session, penguins_table_id: str, capsys):
-    bf.options.display.progress_bar = "terminal"
-    capsys.readouterr()  # clear output
-    session.read_gbq(penguins_table_id)
-
-    assert_loading_msg_exist(capsys.readouterr().out)
-
-
 def test_progress_bar_extract_jobs(
     penguins_df_default_index: bf.dataframe.DataFrame, gcs_folder, capsys
 ):
diff --git a/tests/unit/ml/test_golden_sql.py b/tests/unit/ml/test_golden_sql.py
index 3ca7e144a5..700eb500ff 100644
--- a/tests/unit/ml/test_golden_sql.py
+++ b/tests/unit/ml/test_golden_sql.py
@@ -38,6 +38,7 @@ def mock_session():
 def mock_y():
     mock_y = mock.create_autospec(spec=bpd.DataFrame)
     mock_y.columns = pd.Index(["input_column_label"])
+    mock_y._cached.return_value = mock_y
 
     return mock_y
 
@@ -57,6 +58,7 @@ def mock_X(mock_y, mock_session):
         ["index_column_id"],
         ["index_column_label"],
     )
+    mock_X._cached.return_value = mock_X
 
     return mock_X
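The golden-SQL unit tests stub the new `_cached` call so no real session is needed; since `_cached` now returns the same object, the mocks mirror that contract. The pattern in isolation:

    from unittest import mock

    import bigframes.pandas as bpd

    mock_df = mock.create_autospec(spec=bpd.DataFrame)
    mock_df._cached.return_value = mock_df  # _cached() returns the object itself
    assert mock_df._cached() is mock_df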
diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py
index a4e61ca0f9..e1b28690d7 100644
--- a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py
+++ b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py
@@ -22,10 +22,16 @@ def _last_non_null_value(translator, op: vendored_ibis_ops.LastNonNullValue):
     return f"LAST_VALUE({arg} IGNORE NULLS)"
 
 
+def _to_json_string(translator, op: vendored_ibis_ops.ToJsonString):
+    arg = translator.translate(op.arg)
+    return f"TO_JSON_STRING({arg})"
+
+
 patched_ops = {
-    vendored_ibis_ops.ApproximateMultiQuantile: _approx_quantiles,
-    vendored_ibis_ops.FirstNonNullValue: _first_non_null_value,
-    vendored_ibis_ops.LastNonNullValue: _last_non_null_value,
+    vendored_ibis_ops.ApproximateMultiQuantile: _approx_quantiles,  # type:ignore
+    vendored_ibis_ops.FirstNonNullValue: _first_non_null_value,  # type:ignore
+    vendored_ibis_ops.LastNonNullValue: _last_non_null_value,  # type:ignore
+    vendored_ibis_ops.ToJsonString: _to_json_string,  # type:ignore
 }
 
 OPERATION_REGISTRY.update(patched_ops)
diff --git a/third_party/bigframes_vendored/ibis/expr/operations/__init__.py b/third_party/bigframes_vendored/ibis/expr/operations/__init__.py
index 1612d9c12e..8219701392 100644
--- a/third_party/bigframes_vendored/ibis/expr/operations/__init__.py
+++ b/third_party/bigframes_vendored/ibis/expr/operations/__init__.py
@@ -1,5 +1,6 @@
 # Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/__init__.py
 from __future__ import annotations
 
-from third_party.bigframes_vendored.ibis.expr.operations.analytic import *  # noqa: F403
-from third_party.bigframes_vendored.ibis.expr.operations.reductions import *  # noqa: F403
+from third_party.bigframes_vendored.ibis.expr.operations.analytic import *  # noqa: F401 F403
+from third_party.bigframes_vendored.ibis.expr.operations.json import *  # noqa: F401 F403
+from third_party.bigframes_vendored.ibis.expr.operations.reductions import *  # noqa: F401 F403
diff --git a/third_party/bigframes_vendored/ibis/expr/operations/json.py b/third_party/bigframes_vendored/ibis/expr/operations/json.py
new file mode 100644
index 0000000000..dbb3fa3066
--- /dev/null
+++ b/third_party/bigframes_vendored/ibis/expr/operations/json.py
@@ -0,0 +1,9 @@
+# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/json.py
+from __future__ import annotations
+
+import ibis.expr.datatypes as dt
+from ibis.expr.operations.core import Unary
+
+
+class ToJsonString(Unary):
+    output_dtype = dt.string
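With the `ToJsonString` op vendored and its BigQuery translation registered, any ibis column can be coerced to its `TO_JSON_STRING(...)` form from Python; compiling the expression for BigQuery then emits the function call. A small sketch built only from pieces this patch adds:

    import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops

    def to_json_string(column):
        # Wrap the column in the vendored unary op and turn it back into an
        # expression; the registry translates it to TO_JSON_STRING(<column>).
        return vendored_ibis_ops.ToJsonString(column).to_expr()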
diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py
index 575c501618..2161310b07 100644
--- a/third_party/bigframes_vendored/pandas/io/gbq.py
+++ b/third_party/bigframes_vendored/pandas/io/gbq.py
@@ -45,16 +45,6 @@ def read_gbq(
         If the input is a table ID:
 
         >>> df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
-        >>> df.head(2)
-                                       species island  culmen_length_mm  \\
-        0  Adelie Penguin (Pygoscelis adeliae)  Dream              36.6
-        1  Adelie Penguin (Pygoscelis adeliae)  Dream              39.8
-
-           culmen_depth_mm  flipper_length_mm  body_mass_g     sex
-        0             18.4              184.0       3475.0  FEMALE
-        1             19.1              184.0       4650.0    MALE
-
-        [2 rows x 7 columns]
 
         Preserve ordering in a query input.
diff --git a/third_party/bigframes_vendored/pandas/io/parquet.py b/third_party/bigframes_vendored/pandas/io/parquet.py
index f97bd386a4..0f664e70fc 100644
--- a/third_party/bigframes_vendored/pandas/io/parquet.py
+++ b/third_party/bigframes_vendored/pandas/io/parquet.py
@@ -24,12 +24,6 @@ def read_parquet(
 
         >>> gcs_path = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
        >>> df = bpd.read_parquet(path=gcs_path)
-        >>> df.head(2)
-             name post_abbr
-        0  Alabama        AL
-        1   Alaska        AK
-
-        [2 rows x 2 columns]
 
     Args:
         path (str):
diff --git a/third_party/bigframes_vendored/pandas/io/pickle.py b/third_party/bigframes_vendored/pandas/io/pickle.py
index 053ba4871c..096d9b13d6 100644
--- a/third_party/bigframes_vendored/pandas/io/pickle.py
+++ b/third_party/bigframes_vendored/pandas/io/pickle.py
@@ -32,16 +32,6 @@ def read_pickle(
 
         >>> gcs_path = "gs://bigframes-dev-testing/test_pickle.pkl"
         >>> df = bpd.read_pickle(filepath_or_buffer=gcs_path)
-        >>> df.head(2)
-                                       species island  culmen_length_mm  \\
-        0  Adelie Penguin (Pygoscelis adeliae)  Dream              36.6
-        1  Adelie Penguin (Pygoscelis adeliae)  Dream              39.8
-
-           culmen_depth_mm  flipper_length_mm  body_mass_g     sex
-        0             18.4              184.0       3475.0  FEMALE
-        1             19.1              184.0       4650.0    MALE
-
-        [2 rows x 7 columns]
 
     Args:
         filepath_or_buffer (str, path object, or file-like object):