From 20e5c58868af8b18595d5635cb7722da4f622eb5 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 25 Oct 2024 16:19:47 -0700 Subject: [PATCH 1/8] feat: Add warning when user tries to access struct series fields with `__getitem__` (#1082) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add warning when accessing structs with wrong method * polishing words * move test from unit to system * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * add more tests * polish tests * upgrade test setup * define a dedicated warning and set a stack level at warning site * fix format * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix doc * Apply suggestions from code review * update doc --------- Co-authored-by: Owl Bot Co-authored-by: Tim Sweña (Swast) --- bigframes/core/indexers.py | 23 ++++++ bigframes/exceptions.py | 4 + tests/system/small/core/__init__.py | 13 ++++ tests/system/small/core/test_indexers.py | 94 ++++++++++++++++++++++++ 4 files changed, 134 insertions(+) create mode 100644 tests/system/small/core/__init__.py create mode 100644 tests/system/small/core/test_indexers.py diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py index 06d9c4bbab..6c65077528 100644 --- a/bigframes/core/indexers.py +++ b/bigframes/core/indexers.py @@ -16,6 +16,7 @@ import typing from typing import Tuple, Union +import warnings import bigframes_vendored.constants as constants import ibis @@ -27,6 +28,8 @@ import bigframes.core.indexes as indexes import bigframes.core.scalar import bigframes.dataframe +import bigframes.dtypes +import bigframes.exceptions import bigframes.operations as ops import bigframes.series @@ -370,6 +373,7 @@ def _perform_loc_list_join( # right join based on the old index so that the matching rows from the user's # original dataframe will be duplicated and reordered appropriately if isinstance(series_or_dataframe, bigframes.series.Series): + _struct_accessor_check_and_warn(series_or_dataframe, keys_index) original_name = series_or_dataframe.name name = series_or_dataframe.name if series_or_dataframe.name is not None else "0" result = typing.cast( @@ -391,6 +395,25 @@ def _perform_loc_list_join( return result +def _struct_accessor_check_and_warn( + series: bigframes.series.Series, index: indexes.Index +): + if not bigframes.dtypes.is_struct_like(series.dtype): + # No need to check series that do not have struct values + return + + if not bigframes.dtypes.is_string_like(index.dtype): + # No need to check indexing with non-string values. + return + + if not bigframes.dtypes.is_string_like(series.index.dtype): + warnings.warn( + "Are you trying to access struct fields? If so, please use Series.struct.field(...) 
method instead.", + category=bigframes.exceptions.BadIndexerKeyWarning, + stacklevel=7, # Stack depth from series.__getitem__ to here + ) + + @typing.overload def _iloc_getitem_series_or_dataframe( series_or_dataframe: bigframes.series.Series, key diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 462bdf2bdd..27f3508ff4 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -73,3 +73,7 @@ class UnknownDataTypeWarning(Warning): class ApiDeprecationWarning(FutureWarning): """The API has been deprecated.""" + + +class BadIndexerKeyWarning(Warning): + """The indexer key is not used correctly.""" diff --git a/tests/system/small/core/__init__.py b/tests/system/small/core/__init__.py new file mode 100644 index 0000000000..6d5e14bcf4 --- /dev/null +++ b/tests/system/small/core/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/system/small/core/test_indexers.py b/tests/system/small/core/test_indexers.py new file mode 100644 index 0000000000..2c670f790d --- /dev/null +++ b/tests/system/small/core/test_indexers.py @@ -0,0 +1,94 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import warnings + +import pyarrow as pa +import pytest + +import bigframes.exceptions +import bigframes.pandas as bpd + + +@pytest.fixture(scope="module") +def string_indexed_struct_series(session): + return bpd.Series( + [ + {"project": "pandas", "version": 1}, + ], + dtype=bpd.ArrowDtype( + pa.struct([("project", pa.string()), ("version", pa.int64())]) + ), + index=["a"], + session=session, + ) + + +@pytest.fixture(scope="module") +def number_series(session): + return bpd.Series( + [0], + dtype=bpd.Int64Dtype, + session=session, + ) + + +@pytest.fixture(scope="module") +def string_indexed_number_series(session): + return bpd.Series( + [0], + dtype=bpd.Int64Dtype, + index=["a"], + session=session, + ) + + +def test_non_string_indexed_struct_series_with_string_key_should_warn(session): + s = bpd.Series( + [ + {"project": "pandas", "version": 1}, + ], + dtype=bpd.ArrowDtype( + pa.struct([("project", pa.string()), ("version", pa.int64())]) + ), + session=session, + ) + + with pytest.warns(bigframes.exceptions.BadIndexerKeyWarning): + s["a"] + + +@pytest.mark.parametrize( + "series", + [ + "string_indexed_struct_series", + "number_series", + "string_indexed_number_series", + ], +) +@pytest.mark.parametrize( + "key", + [ + 0, + "a", + ], +) +def test_struct_series_indexers_should_not_warn(request, series, key): + s = request.getfixturevalue(series) + + with warnings.catch_warnings(): + warnings.simplefilter( + "error", category=bigframes.exceptions.BadIndexerKeyWarning + ) + s[key] From 5528b4deb2fb4a966707f8c2daaa14ca2f8f6240 Mon Sep 17 00:00:00 2001 From: jialuoo Date: Mon, 28 Oct 2024 09:44:37 -0700 Subject: [PATCH 2/8] add new rep locations (#1115) --- bigframes/constants.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/bigframes/constants.py b/bigframes/constants.py index 13636a4484..f344c6ecee 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -84,8 +84,17 @@ "europe-west9", "europe-west8", "me-central2", + "us-central1", + "us-central2", + "us-east1", "us-east4", + "us-east5", + "us-east7", + "us-south1", "us-west1", + "us-west2", + "us-west3", + "us-west4", } ) From 5f7b8b189c093629d176ffc99364767dc766397a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 28 Oct 2024 17:27:18 -0700 Subject: [PATCH 3/8] feat: support context manager for bigframes session (#1107) * feat: support context manager for bigframes session * add table cleanup tests for session context manager * rename ctor, split large test --- bigframes/session/__init__.py | 18 ++- tests/system/large/test_remote_function.py | 152 ++++++++++++++++++++ tests/system/large/test_session.py | 35 +++++ tests/unit/_config/test_bigquery_options.py | 8 +- 4 files changed, 208 insertions(+), 5 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 419e9d9a3b..f2f41b8463 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -275,7 +275,23 @@ def __init__( ) def __del__(self): - """Automatic cleanup of internal resources""" + """Automatic cleanup of internal resources.""" + self.close() + + def __enter__(self): + """Enter the runtime context of the Session object. + + See [With Statement Context Managers](https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers) + for more details. + """ + return self + + def __exit__(self, *_): + """Exit the runtime context of the Session object. 
+ + See [With Statement Context Managers](https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers) + for more details. + """ self.close() @property diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 2365002857..4368a6511d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -2245,3 +2245,155 @@ def test_remote_function_ingress_settings_unsupported(session): @session.remote_function(reuse=False, cloud_function_ingress_settings="unknown") def square(x: int) -> int: return x * x + + +@pytest.mark.parametrize( + ("session_creator"), + [ + pytest.param(bigframes.Session, id="session-constructor"), + pytest.param(bigframes.connect, id="connect-method"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_w_context_manager_unnamed( + scalars_dfs, dataset_id, bq_cf_connection, session_creator +): + def add_one(x: int) -> int: + return x + 1 + + scalars_df, scalars_pandas_df = scalars_dfs + pd_result = scalars_pandas_df["int64_too"].apply(add_one) + + temporary_bigquery_remote_function = None + temporary_cloud_run_function = None + + try: + with session_creator() as session: + # create a temporary remote function + add_one_remote_temp = session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + )(add_one) + + temporary_bigquery_remote_function = ( + add_one_remote_temp.bigframes_remote_function + ) + assert temporary_bigquery_remote_function is not None + assert ( + session.bqclient.get_routine(temporary_bigquery_remote_function) + is not None + ) + + temporary_cloud_run_function = add_one_remote_temp.bigframes_cloud_function + assert temporary_cloud_run_function is not None + assert ( + session.cloudfunctionsclient.get_function( + name=temporary_cloud_run_function + ) + is not None + ) + + bf_result = scalars_df["int64_too"].apply(add_one_remote_temp).to_pandas() + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # outside the with statement context manager the temporary BQ remote + # function and the underlying cloud run function should have been + # cleaned up + assert temporary_bigquery_remote_function is not None + with pytest.raises(google.api_core.exceptions.NotFound): + session.bqclient.get_routine(temporary_bigquery_remote_function) + # the deletion of cloud function happens in a non-blocking way, ensure that + # it either exists in a being-deleted state, or is already deleted + assert temporary_cloud_run_function is not None + try: + gcf = session.cloudfunctionsclient.get_function( + name=temporary_cloud_run_function + ) + assert gcf.state is functions_v2.Function.State.DELETING + except google.cloud.exceptions.NotFound: + pass + finally: + # clean up the gcp assets created for the temporary remote function, + # just in case it was not explicitly cleaned up in the try clause due + # to assertion failure or exception earlier than that + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, add_one_remote_temp + ) + + +@pytest.mark.parametrize( + ("session_creator"), + [ + pytest.param(bigframes.Session, id="session-constructor"), + pytest.param(bigframes.connect, id="connect-method"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_w_context_manager_named( + scalars_dfs, dataset_id, bq_cf_connection, session_creator +): + def add_one(x: int) -> int: + return x + 1 + + scalars_df, scalars_pandas_df = 
scalars_dfs + pd_result = scalars_pandas_df["int64_too"].apply(add_one) + + persistent_bigquery_remote_function = None + persistent_cloud_run_function = None + + try: + with session_creator() as session: + # create a persistent remote function + name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix() + add_one_remote_persist = session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + name=name, + )(add_one) + + persistent_bigquery_remote_function = ( + add_one_remote_persist.bigframes_remote_function + ) + assert persistent_bigquery_remote_function is not None + assert ( + session.bqclient.get_routine(persistent_bigquery_remote_function) + is not None + ) + + persistent_cloud_run_function = ( + add_one_remote_persist.bigframes_cloud_function + ) + assert persistent_cloud_run_function is not None + assert ( + session.cloudfunctionsclient.get_function( + name=persistent_cloud_run_function + ) + is not None + ) + + bf_result = ( + scalars_df["int64_too"].apply(add_one_remote_persist).to_pandas() + ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + + # outside the with statement context manager the persistent BQ remote + # function and the underlying cloud run function should still exist + assert persistent_bigquery_remote_function is not None + assert ( + session.bqclient.get_routine(persistent_bigquery_remote_function) + is not None + ) + assert persistent_cloud_run_function is not None + assert ( + session.cloudfunctionsclient.get_function( + name=persistent_cloud_run_function + ) + is not None + ) + finally: + # clean up the gcp assets created for the persistent remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, add_one_remote_persist + ) diff --git a/tests/system/large/test_session.py b/tests/system/large/test_session.py index 9f42c4ae94..7f13462cbe 100644 --- a/tests/system/large/test_session.py +++ b/tests/system/large/test_session.py @@ -133,3 +133,38 @@ def test_clean_up_by_session_id(): assert not any( [(session.session_id in table.full_table_id) for table in tables_after] ) + + +@pytest.mark.parametrize( + ("session_creator"), + [ + pytest.param(bigframes.Session, id="session-constructor"), + pytest.param(bigframes.connect, id="connect-method"), + ], +) +def test_clean_up_via_context_manager(session_creator): + # we will create two tables and confirm that they are deleted + # when the session is closed + with session_creator() as session: + bqclient = session.bqclient + + expiration = ( + datetime.datetime.now(datetime.timezone.utc) + + bigframes.constants.DEFAULT_EXPIRATION + ) + full_id_1 = bigframes.session._io.bigquery.create_temp_table( + session.bqclient, session._temp_storage_manager._random_table(), expiration + ) + full_id_2 = bigframes.session._io.bigquery.create_temp_table( + session.bqclient, session._temp_storage_manager._random_table(), expiration + ) + + # check that the tables were actually created + assert bqclient.get_table(full_id_1).created is not None + assert bqclient.get_table(full_id_2).created is not None + + # check that the tables are already deleted + with pytest.raises(google.cloud.exceptions.NotFound): + bqclient.delete_table(full_id_1) + with pytest.raises(google.cloud.exceptions.NotFound): + bqclient.delete_table(full_id_2) diff --git a/tests/unit/_config/test_bigquery_options.py b/tests/unit/_config/test_bigquery_options.py index d04b5bd575..44cf024219 100644 --- a/tests/unit/_config/test_bigquery_options.py +++ 
b/tests/unit/_config/test_bigquery_options.py @@ -98,7 +98,7 @@ def test_setter_if_session_started_but_setting_the_same_value(attribute): ) def test_location_set_to_valid_no_warning(valid_location): # test setting location through constructor - def set_location_in_ctor(): + def set_location_in_constructor(): bigquery_options.BigQueryOptions(location=valid_location) # test setting location property @@ -106,7 +106,7 @@ def set_location_property(): options = bigquery_options.BigQueryOptions() options.location = valid_location - for op in [set_location_in_ctor, set_location_property]: + for op in [set_location_in_constructor, set_location_property]: # Ensure that no warnings are emitted. # https://docs.pytest.org/en/7.0.x/how-to/capture-warnings.html#additional-use-cases-of-warnings-in-tests with warnings.catch_warnings(): @@ -136,7 +136,7 @@ def set_location_property(): ) def test_location_set_to_invalid_warning(invalid_location, possibility): # test setting location through constructor - def set_location_in_ctor(): + def set_location_in_constructor(): bigquery_options.BigQueryOptions(location=invalid_location) # test setting location property @@ -144,7 +144,7 @@ def set_location_property(): options = bigquery_options.BigQueryOptions() options.location = invalid_location - for op in [set_location_in_ctor, set_location_property]: + for op in [set_location_in_constructor, set_location_property]: with pytest.warns( bigframes.exceptions.UnknownLocationWarning, match=re.escape( From 254875c25f39df4bc477e1ed7339ecb30b395ab6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 28 Oct 2024 17:44:08 -0700 Subject: [PATCH 4/8] feat: allow `fit` to take additional eval data in linear and ensemble models (#1096) * feat: allow `fit` to take additional eval data The additional eval data would be used to measure the fitted model and attach the measurement to the underlying BQML model, which can be used as benchmark for the model consumers in BigQuery Studio and otherwise. * subclass from TrainablePredictor * add support for fit-time evaluation in ensemble models * fetch logistic regression eval numbers from multiClassClassificationMetrics * use the generic type template * update vendored docstrings for fit taking X_eval, y_eval * update key to fetch model eval metrics * enfore binary classification in the logistic regression test --- bigframes/ml/base.py | 34 ++++ bigframes/ml/ensemble.py | 56 ++++++- bigframes/ml/linear_model.py | 29 +++- bigframes/ml/utils.py | 38 ++++- tests/system/large/ml/test_linear_model.py | 156 ++++++++++++++++++ .../sklearn/ensemble/_forest.py | 7 + .../sklearn/linear_model/_base.py | 7 + .../sklearn/linear_model/_logistic.py | 8 + .../bigframes_vendored/xgboost/sklearn.py | 7 + 9 files changed, 327 insertions(+), 15 deletions(-) diff --git a/bigframes/ml/base.py b/bigframes/ml/base.py index 550b4a8178..5662e54d6d 100644 --- a/bigframes/ml/base.py +++ b/bigframes/ml/base.py @@ -164,6 +164,40 @@ def fit( return self._fit(X, y) +class TrainableWithEvaluationPredictor(TrainablePredictor): + """A BigQuery DataFrames ML Model base class that can be used to fit and predict outputs. 
+ + Additional evaluation data can be provided to measure the model in the fit phase.""" + + @abc.abstractmethod + def _fit(self, X, y, transforms=None, X_eval=None, y_eval=None): + pass + + @abc.abstractmethod + def score(self, X, y): + pass + + +class SupervisedTrainableWithEvaluationPredictor(TrainableWithEvaluationPredictor): + """A BigQuery DataFrames ML Supervised Model base class that can be used to fit and predict outputs. + + Need to provide both X and y in supervised tasks. + + Additional X_eval and y_eval can be provided to measure the model in the fit phase. + """ + + _T = TypeVar("_T", bound="SupervisedTrainableWithEvaluationPredictor") + + def fit( + self: _T, + X: utils.ArrayType, + y: utils.ArrayType, + X_eval: Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, + ) -> _T: + return self._fit(X, y, X_eval=X_eval, y_eval=y_eval) + + class UnsupervisedTrainablePredictor(TrainablePredictor): """A BigQuery DataFrames ML Unsupervised Model base class that can be used to fit and predict outputs. diff --git a/bigframes/ml/ensemble.py b/bigframes/ml/ensemble.py index 91c14e4336..253ef7c5c1 100644 --- a/bigframes/ml/ensemble.py +++ b/bigframes/ml/ensemble.py @@ -52,7 +52,7 @@ @log_adapter.class_logger class XGBRegressor( - base.SupervisedTrainablePredictor, + base.SupervisedTrainableWithEvaluationPredictor, bigframes_vendored.xgboost.sklearn.XGBRegressor, ): __doc__ = bigframes_vendored.xgboost.sklearn.XGBRegressor.__doc__ @@ -145,14 +145,24 @@ def _fit( X: utils.ArrayType, y: utils.ArrayType, transforms: Optional[List[str]] = None, + X_eval: Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, ) -> XGBRegressor: X, y = utils.convert_to_dataframe(X, y) + bqml_options = self._bqml_options + + if X_eval is not None and y_eval is not None: + X_eval, y_eval = utils.convert_to_dataframe(X_eval, y_eval) + X, y, bqml_options = utils.combine_training_and_evaluation_data( + X, y, X_eval, y_eval, bqml_options + ) + self._bqml_model = self._bqml_model_factory.create_model( X, y, transforms=transforms, - options=self._bqml_options, + options=bqml_options, ) return self @@ -200,7 +210,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> XGBRegressor: @log_adapter.class_logger class XGBClassifier( - base.SupervisedTrainablePredictor, + base.SupervisedTrainableWithEvaluationPredictor, bigframes_vendored.xgboost.sklearn.XGBClassifier, ): @@ -294,14 +304,24 @@ def _fit( X: utils.ArrayType, y: utils.ArrayType, transforms: Optional[List[str]] = None, + X_eval: Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, ) -> XGBClassifier: X, y = utils.convert_to_dataframe(X, y) + bqml_options = self._bqml_options + + if X_eval is not None and y_eval is not None: + X_eval, y_eval = utils.convert_to_dataframe(X_eval, y_eval) + X, y, bqml_options = utils.combine_training_and_evaluation_data( + X, y, X_eval, y_eval, bqml_options + ) + self._bqml_model = self._bqml_model_factory.create_model( X, y, transforms=transforms, - options=self._bqml_options, + options=bqml_options, ) return self @@ -347,7 +367,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> XGBClassifier: @log_adapter.class_logger class RandomForestRegressor( - base.SupervisedTrainablePredictor, + base.SupervisedTrainableWithEvaluationPredictor, bigframes_vendored.sklearn.ensemble._forest.RandomForestRegressor, ): @@ -430,14 +450,24 @@ def _fit( X: utils.ArrayType, y: utils.ArrayType, transforms: Optional[List[str]] = None, + X_eval: 
Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, ) -> RandomForestRegressor: X, y = utils.convert_to_dataframe(X, y) + bqml_options = self._bqml_options + + if X_eval is not None and y_eval is not None: + X_eval, y_eval = utils.convert_to_dataframe(X_eval, y_eval) + X, y, bqml_options = utils.combine_training_and_evaluation_data( + X, y, X_eval, y_eval, bqml_options + ) + self._bqml_model = self._bqml_model_factory.create_model( X, y, transforms=transforms, - options=self._bqml_options, + options=bqml_options, ) return self @@ -503,7 +533,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> RandomForestRegresso @log_adapter.class_logger class RandomForestClassifier( - base.SupervisedTrainablePredictor, + base.SupervisedTrainableWithEvaluationPredictor, bigframes_vendored.sklearn.ensemble._forest.RandomForestClassifier, ): @@ -586,14 +616,24 @@ def _fit( X: utils.ArrayType, y: utils.ArrayType, transforms: Optional[List[str]] = None, + X_eval: Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, ) -> RandomForestClassifier: X, y = utils.convert_to_dataframe(X, y) + bqml_options = self._bqml_options + + if X_eval is not None and y_eval is not None: + X_eval, y_eval = utils.convert_to_dataframe(X_eval, y_eval) + X, y, bqml_options = utils.combine_training_and_evaluation_data( + X, y, X_eval, y_eval, bqml_options + ) + self._bqml_model = self._bqml_model_factory.create_model( X, y, transforms=transforms, - options=self._bqml_options, + options=bqml_options, ) return self diff --git a/bigframes/ml/linear_model.py b/bigframes/ml/linear_model.py index 5665507286..85be54e596 100644 --- a/bigframes/ml/linear_model.py +++ b/bigframes/ml/linear_model.py @@ -47,7 +47,7 @@ @log_adapter.class_logger class LinearRegression( - base.SupervisedTrainablePredictor, + base.SupervisedTrainableWithEvaluationPredictor, bigframes_vendored.sklearn.linear_model._base.LinearRegression, ): __doc__ = bigframes_vendored.sklearn.linear_model._base.LinearRegression.__doc__ @@ -131,14 +131,24 @@ def _fit( X: utils.ArrayType, y: utils.ArrayType, transforms: Optional[List[str]] = None, + X_eval: Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, ) -> LinearRegression: X, y = utils.convert_to_dataframe(X, y) + bqml_options = self._bqml_options + + if X_eval is not None and y_eval is not None: + X_eval, y_eval = utils.convert_to_dataframe(X_eval, y_eval) + X, y, bqml_options = utils.combine_training_and_evaluation_data( + X, y, X_eval, y_eval, bqml_options + ) + self._bqml_model = self._bqml_model_factory.create_model( X, y, transforms=transforms, - options=self._bqml_options, + options=bqml_options, ) return self @@ -183,7 +193,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> LinearRegression: @log_adapter.class_logger class LogisticRegression( - base.SupervisedTrainablePredictor, + base.SupervisedTrainableWithEvaluationPredictor, bigframes_vendored.sklearn.linear_model._logistic.LogisticRegression, ): __doc__ = ( @@ -283,15 +293,24 @@ def _fit( X: utils.ArrayType, y: utils.ArrayType, transforms: Optional[List[str]] = None, + X_eval: Optional[utils.ArrayType] = None, + y_eval: Optional[utils.ArrayType] = None, ) -> LogisticRegression: - """Fit model with transforms.""" X, y = utils.convert_to_dataframe(X, y) + bqml_options = self._bqml_options + + if X_eval is not None and y_eval is not None: + X_eval, y_eval = utils.convert_to_dataframe(X_eval, y_eval) + X, y, bqml_options = utils.combine_training_and_evaluation_data( 
+ X, y, X_eval, y_eval, bqml_options + ) + self._bqml_model = self._bqml_model_factory.create_model( X, y, transforms=transforms, - options=self._bqml_options, + options=bqml_options, ) return self diff --git a/bigframes/ml/utils.py b/bigframes/ml/utils.py index bdca45e457..8daed169da 100644 --- a/bigframes/ml/utils.py +++ b/bigframes/ml/utils.py @@ -13,13 +13,13 @@ # limitations under the License. import typing -from typing import Any, Generator, Literal, Mapping, Optional, Union +from typing import Any, Generator, Literal, Mapping, Optional, Tuple, Union import bigframes_vendored.constants as constants from google.cloud import bigquery import pandas as pd -from bigframes.core import blocks +from bigframes.core import blocks, guid import bigframes.pandas as bpd from bigframes.session import Session @@ -155,3 +155,37 @@ def retrieve_params_from_bq_model( kwargs[bf_param] = bf_param_type(last_fitting[bqml_param]) return kwargs + + +def combine_training_and_evaluation_data( + X_train: bpd.DataFrame, + y_train: bpd.DataFrame, + X_eval: bpd.DataFrame, + y_eval: bpd.DataFrame, + bqml_options: dict, +) -> Tuple[bpd.DataFrame, bpd.DataFrame, dict]: + """ + Combine training data and labels with evlauation data and labels, and keep + them differentiated through a split column in the combined data and labels. + """ + + assert X_train.columns.equals(X_eval.columns) + assert y_train.columns.equals(y_eval.columns) + + # create a custom split column for BQML and supply the evaluation + # data along with the training data in a combined single table + # https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-create-dnn-models#data_split_col. + split_col = guid.generate_guid() + assert split_col not in X_train.columns + + X_train[split_col] = False + X_eval[split_col] = True + X = bpd.concat([X_train, X_eval]) + y = bpd.concat([y_train, y_eval]) + + # create options copy to not mutate the incoming one + bqml_options = bqml_options.copy() + bqml_options["data_split_method"] = "CUSTOM" + bqml_options["data_split_col"] = split_col + + return X, y, bqml_options diff --git a/tests/system/large/ml/test_linear_model.py b/tests/system/large/ml/test_linear_model.py index 273da97bc5..f6ca26e7e4 100644 --- a/tests/system/large/ml/test_linear_model.py +++ b/tests/system/large/ml/test_linear_model.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
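A minimal sketch of the fit-time evaluation flow exercised by the tests below (assuming `df` is a BigQuery DataFrames DataFrame; the column names are illustrative):

    from bigframes.ml import linear_model, model_selection

    X = df[["feature_a", "feature_b"]]
    y = df[["label"]]
    X_train, X_eval, y_train, y_eval = model_selection.train_test_split(X, y)

    model = linear_model.LinearRegression()
    # The eval frames are folded into the training table under a generated
    # split column, so BQML trains with data_split_method=CUSTOM and attaches
    # the evaluation metrics to the resulting model.
    model.fit(X_train, y_train, X_eval=X_eval, y_eval=y_eval)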
+import pandas as pd + +from bigframes.ml import model_selection import bigframes.ml.linear_model from tests.system import utils @@ -58,6 +61,85 @@ def test_linear_regression_configure_fit_score(penguins_df_default_index, datase assert reloaded_model.tol == 0.01 +def test_linear_regression_configure_fit_with_eval_score( + penguins_df_default_index, dataset_id +): + model = bigframes.ml.linear_model.LinearRegression() + + df = penguins_df_default_index.dropna() + X = df[ + [ + "species", + "island", + "culmen_length_mm", + "culmen_depth_mm", + "flipper_length_mm", + "sex", + ] + ] + y = df[["body_mass_g"]] + + X_train, X_eval, y_train, y_eval = model_selection.train_test_split(X, y) + + model.fit(X_train, y_train, X_eval=X_eval, y_eval=y_eval) + + # Check score to ensure the model was fitted + result = model.score(X_eval, y_eval).to_pandas() + utils.check_pandas_df_schema_and_index( + result, columns=utils.ML_REGRESSION_METRICS, index=1 + ) + + # save, load, check parameters to ensure configuration was kept + bq_model_name = f"{dataset_id}.temp_configured_model" + reloaded_model = model.to_gbq(bq_model_name, replace=True) + assert reloaded_model._bqml_model is not None + assert ( + f"{dataset_id}.temp_configured_model" in reloaded_model._bqml_model.model_name + ) + assert reloaded_model.optimize_strategy == "NORMAL_EQUATION" + assert reloaded_model.fit_intercept is True + assert reloaded_model.calculate_p_values is False + assert reloaded_model.enable_global_explain is False + assert reloaded_model.l1_reg is None + assert reloaded_model.l2_reg == 0.0 + assert reloaded_model.learning_rate is None + assert reloaded_model.learning_rate_strategy == "line_search" + assert reloaded_model.ls_init_learning_rate is None + assert reloaded_model.max_iterations == 20 + assert reloaded_model.tol == 0.01 + + # make sure the bqml model was internally created with custom split + bq_model = penguins_df_default_index._session.bqclient.get_model(bq_model_name) + last_fitting = bq_model.training_runs[-1]["trainingOptions"] + assert last_fitting["dataSplitMethod"] == "CUSTOM" + assert "dataSplitColumn" in last_fitting + + # make sure the bqml model has the same evaluation metrics attached as + # returned by model.score() + bq_model_expected_eval_metrics = result[utils.ML_REGRESSION_METRICS[:5]] + bq_model_eval_metrics = bq_model.training_runs[-1]["evaluationMetrics"][ + "regressionMetrics" + ] + bq_model_eval_metrics = pd.DataFrame( + [ + [ + bq_model_eval_metrics["meanAbsoluteError"], + bq_model_eval_metrics["meanSquaredError"], + bq_model_eval_metrics["meanSquaredLogError"], + bq_model_eval_metrics["medianAbsoluteError"], + bq_model_eval_metrics["rSquared"], + ] + ], + columns=utils.ML_REGRESSION_METRICS[:5], + ) + pd.testing.assert_frame_equal( + bq_model_expected_eval_metrics, + bq_model_eval_metrics, + check_dtype=False, + check_index_type=False, + ) + + def test_linear_regression_customized_params_fit_score( penguins_df_default_index, dataset_id ): @@ -216,6 +298,80 @@ def test_logistic_regression_configure_fit_score(penguins_df_default_index, data assert reloaded_model.class_weight is None +def test_logistic_regression_configure_fit_with_eval_score( + penguins_df_default_index, dataset_id +): + model = bigframes.ml.linear_model.LogisticRegression() + + df = penguins_df_default_index.dropna() + df = df[df["sex"].isin(["MALE", "FEMALE"])] + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + "culmen_depth_mm", + "flipper_length_mm", + "body_mass_g", + ] + ] + y = df[["sex"]] + + X_train, X_eval, 
y_train, y_eval = model_selection.train_test_split(X, y) + + model.fit(X_train, y_train, X_eval=X_eval, y_eval=y_eval) + + # Check score to ensure the model was fitted + result = model.score(X_eval, y_eval).to_pandas() + utils.check_pandas_df_schema_and_index( + result, columns=utils.ML_CLASSFICATION_METRICS, index=1 + ) + + # save, load, check parameters to ensure configuration was kept + bq_model_name = f"{dataset_id}.temp_configured_logistic_reg_model" + reloaded_model = model.to_gbq(bq_model_name, replace=True) + assert reloaded_model._bqml_model is not None + assert ( + f"{dataset_id}.temp_configured_logistic_reg_model" + in reloaded_model._bqml_model.model_name + ) + assert reloaded_model.fit_intercept is True + assert reloaded_model.class_weight is None + + # make sure the bqml model was internally created with custom split + bq_model = penguins_df_default_index._session.bqclient.get_model(bq_model_name) + last_fitting = bq_model.training_runs[-1]["trainingOptions"] + assert last_fitting["dataSplitMethod"] == "CUSTOM" + assert "dataSplitColumn" in last_fitting + + # make sure the bqml model has the same evaluation metrics attached as + # returned by model.score() + bq_model_expected_eval_metrics = result + bq_model_eval_metrics = bq_model.training_runs[-1]["evaluationMetrics"][ + "binaryClassificationMetrics" + ]["aggregateClassificationMetrics"] + bq_model_eval_metrics = pd.DataFrame( + [ + [ + bq_model_eval_metrics["precision"], + bq_model_eval_metrics["recall"], + bq_model_eval_metrics["accuracy"], + bq_model_eval_metrics["f1Score"], + bq_model_eval_metrics["logLoss"], + bq_model_eval_metrics["rocAuc"], + ] + ], + columns=utils.ML_CLASSFICATION_METRICS, + ) + pd.testing.assert_frame_equal( + bq_model_expected_eval_metrics, + bq_model_eval_metrics, + check_dtype=False, + check_index_type=False, + ) + + def test_logistic_regression_customized_params_fit_score( penguins_df_default_index, dataset_id ): diff --git a/third_party/bigframes_vendored/sklearn/ensemble/_forest.py b/third_party/bigframes_vendored/sklearn/ensemble/_forest.py index 1f6284c146..fb81bd6684 100644 --- a/third_party/bigframes_vendored/sklearn/ensemble/_forest.py +++ b/third_party/bigframes_vendored/sklearn/ensemble/_forest.py @@ -54,6 +54,13 @@ def fit(self, X, y): Series or DataFrame of shape (n_samples,) or (n_samples, n_targets). Target values. Will be cast to X's dtype if necessary. + X_eval (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or DataFrame of shape (n_samples, n_features). Evaluation data. + + y_eval (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or DataFrame of shape (n_samples,) or (n_samples, n_targets). + Evaluation target values. Will be cast to X_eval's dtype if necessary. + Returns: ForestModel: Fitted estimator. diff --git a/third_party/bigframes_vendored/sklearn/linear_model/_base.py b/third_party/bigframes_vendored/sklearn/linear_model/_base.py index fa8f28a656..d6b8a473bd 100644 --- a/third_party/bigframes_vendored/sklearn/linear_model/_base.py +++ b/third_party/bigframes_vendored/sklearn/linear_model/_base.py @@ -108,6 +108,13 @@ def fit( Series or DataFrame of shape (n_samples,) or (n_samples, n_targets). Target values. Will be cast to X's dtype if necessary. 
+ X_eval (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or DataFrame of shape (n_samples, n_features). Evaluation data. + + y_eval (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or DataFrame of shape (n_samples,) or (n_samples, n_targets). + Evaluation target values. Will be cast to X_eval's dtype if necessary. + Returns: LinearRegression: Fitted estimator. """ diff --git a/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py b/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py index f3419ba8a9..479be19596 100644 --- a/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py +++ b/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py @@ -79,6 +79,14 @@ def fit( y (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): DataFrame of shape (n_samples,). Target vector relative to X. + X_eval (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + Series or DataFrame of shape (n_samples, n_features). Evaluation vector, + where `n_samples` is the number of samples and `n_features` is + the number of features. + + y_eval (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): + DataFrame of shape (n_samples,). Target vector relative to X_eval. + Returns: LogisticRegression: Fitted estimator. diff --git a/third_party/bigframes_vendored/xgboost/sklearn.py b/third_party/bigframes_vendored/xgboost/sklearn.py index da1396af02..60a22e83d0 100644 --- a/third_party/bigframes_vendored/xgboost/sklearn.py +++ b/third_party/bigframes_vendored/xgboost/sklearn.py @@ -37,6 +37,13 @@ def fit(self, X, y): DataFrame of shape (n_samples,) or (n_samples, n_targets). Target values. Will be cast to X's dtype if necessary. + X_eval (bigframes.dataframe.DataFrame or bigframes.series.Series): + Series or DataFrame of shape (n_samples, n_features). Evaluation data. + + y_eval (bigframes.dataframe.DataFrame or bigframes.series.Series): + DataFrame of shape (n_samples,) or (n_samples, n_targets). + Evaluation target values. Will be cast to X_eval's dtype if necessary. + Returns: XGBModel: Fitted estimator. """ From 499f24a5f22ce484db96eb09cd3a0ce972398d81 Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Mon, 28 Oct 2024 21:43:01 -0700 Subject: [PATCH 5/8] =?UTF-8?q?perf:=20improve=20series.unique=20performan?= =?UTF-8?q?ce=20and=20replace=20drop=5Fduplicates=20i=E2=80=A6=20(#1108)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: improve series.unique performance and replace drop_duplicates in tpch test. * change variable name * update docstring and index. * update flag to control behavior. 
* update examples * delete empty line * keep order error * reduce query_count * improve q16 * benchmark updates --- bigframes/series.py | 13 ++++++-- .../bigframes_vendored/pandas/core/series.py | 18 +++++++++-- .../bigframes_vendored/tpch/queries/q10.py | 30 +++++++++++-------- .../bigframes_vendored/tpch/queries/q11.py | 12 ++++++-- .../bigframes_vendored/tpch/queries/q15.py | 9 ++++-- .../bigframes_vendored/tpch/queries/q16.py | 14 ++++++--- .../bigframes_vendored/tpch/queries/q17.py | 4 +-- .../bigframes_vendored/tpch/queries/q20.py | 7 ++--- .../bigframes_vendored/tpch/queries/q22.py | 17 ++++++----- .../bigframes_vendored/tpch/queries/q4.py | 5 +--- 10 files changed, 84 insertions(+), 45 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index 215f4473ee..d311742861 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1609,9 +1609,16 @@ def drop_duplicates(self, *, keep: str = "first") -> Series: block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) - @validations.requires_ordering() - def unique(self) -> Series: - return self.drop_duplicates() + def unique(self, keep_order=True) -> Series: + if keep_order: + validations.enforce_ordered(self, "unique(keep_order != False)") + return self.drop_duplicates() + block, result = self._block.aggregate( + [self._value_column], + [(self._value_column, agg_ops.AnyValueOp())], + dropna=False, + ) + return Series(block.select_columns(result).reset_index()) def duplicated(self, keep: str = "first") -> Series: if keep is not False: diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 845d623e2a..a3b85205a9 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -645,13 +645,18 @@ def nunique(self) -> int: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def unique(self) -> Series: + def unique(self, keep_order=True) -> Series: """ Return unique values of Series object. - Uniques are returned in order of appearance. Hash table-based unique, + By default, uniques are returned in order of appearance. Hash table-based unique, therefore does NOT sort. + Args: + keep_order (bool, default True): + If True, preserves the order of the first appearance of each unique value. + If False, returns the elements in ascending order, which can be faster. + **Examples:** >>> import bigframes.pandas as bpd @@ -664,12 +669,21 @@ def unique(self) -> Series: 2 3 3 3 Name: A, dtype: Int64 + + Example with order preservation: Slower, but keeps order >>> s.unique() 0 2 1 1 2 3 Name: A, dtype: Int64 + Example without order preservation: Faster, but loses original order + >>> s.unique(keep_order=False) + 0 1 + 1 2 + 2 3 + Name: A, dtype: Int64 + Returns: Series: The unique values returned as a Series. 
""" diff --git a/third_party/bigframes_vendored/tpch/queries/q10.py b/third_party/bigframes_vendored/tpch/queries/q10.py index 75a8f2de7f..1650e9ca34 100644 --- a/third_party/bigframes_vendored/tpch/queries/q10.py +++ b/third_party/bigframes_vendored/tpch/queries/q10.py @@ -28,8 +28,6 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): var1 = date(1993, 10, 1) var2 = date(1994, 1, 1) - q_final = customer.merge - q_final = ( customer.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY") .merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY") @@ -61,15 +59,21 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): as_index=False, ).agg(REVENUE=bpd.NamedAgg(column="INTERMEDIATE_REVENUE", aggfunc="sum")) - q_final[ - [ - "C_CUSTKEY", - "C_NAME", - "REVENUE", - "C_ACCTBAL", - "N_NAME", - "C_ADDRESS", - "C_PHONE", - "C_COMMENT", + q_final = ( + q_final[ + [ + "C_CUSTKEY", + "C_NAME", + "REVENUE", + "C_ACCTBAL", + "N_NAME", + "C_ADDRESS", + "C_PHONE", + "C_COMMENT", + ] ] - ].sort_values(by="REVENUE", ascending=False).head(20).to_gbq() + .sort_values(by="REVENUE", ascending=False) + .head(20) + ) + + q_final.to_gbq() diff --git a/third_party/bigframes_vendored/tpch/queries/q11.py b/third_party/bigframes_vendored/tpch/queries/q11.py index 484a7c0001..385393f781 100644 --- a/third_party/bigframes_vendored/tpch/queries/q11.py +++ b/third_party/bigframes_vendored/tpch/queries/q11.py @@ -30,10 +30,16 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): grouped["VALUE"] = grouped["VALUE"].round(2) - total_value = (filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).sum() - threshold = total_value * 0.0001 + total_value = ( + (filtered_df["PS_SUPPLYCOST"] * filtered_df["PS_AVAILQTY"]).to_frame().sum() + ) + threshold = (total_value * 0.0001).rename("THRESHOLD") + + grouped = grouped.merge(threshold, how="cross") - result_df = grouped[grouped["VALUE"] > threshold] + result_df = grouped[grouped["VALUE"] > grouped["THRESHOLD"]].drop( + columns="THRESHOLD" + ) result_df = result_df.sort_values(by="VALUE", ascending=False) diff --git a/third_party/bigframes_vendored/tpch/queries/q15.py b/third_party/bigframes_vendored/tpch/queries/q15.py index 042adbda8b..adf37f9892 100644 --- a/third_party/bigframes_vendored/tpch/queries/q15.py +++ b/third_party/bigframes_vendored/tpch/queries/q15.py @@ -36,8 +36,13 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): supplier, grouped_revenue, left_on="S_SUPPKEY", right_on="SUPPLIER_NO" ) - max_revenue = joined_data["TOTAL_REVENUE"].max() - max_revenue_suppliers = joined_data[joined_data["TOTAL_REVENUE"] == max_revenue] + max_revenue = joined_data[["TOTAL_REVENUE"]].max().rename("MAX_REVENUE") + + joined_data = joined_data.merge(max_revenue, how="cross") + + max_revenue_suppliers = joined_data[ + joined_data["TOTAL_REVENUE"] == joined_data["MAX_REVENUE"] + ] max_revenue_suppliers["TOTAL_REVENUE"] = max_revenue_suppliers[ "TOTAL_REVENUE" diff --git a/third_party/bigframes_vendored/tpch/queries/q16.py b/third_party/bigframes_vendored/tpch/queries/q16.py index 1bd2795c42..79f42ec42c 100644 --- a/third_party/bigframes_vendored/tpch/queries/q16.py +++ b/third_party/bigframes_vendored/tpch/queries/q16.py @@ -20,16 +20,22 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): var1 = "Brand#45" - supplier = supplier[ - supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True) - ]["S_SUPPKEY"] + supplier = ( + supplier[ + 
~supplier["S_COMMENT"].str.contains("Customer.*Complaints", regex=True) + ]["S_SUPPKEY"] + .unique(keep_order=False) + .to_frame() + ) q_filtered = part.merge(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY") q_filtered = q_filtered[q_filtered["P_BRAND"] != var1] q_filtered = q_filtered[~q_filtered["P_TYPE"].str.contains("MEDIUM POLISHED")] q_filtered = q_filtered[q_filtered["P_SIZE"].isin([49, 14, 23, 45, 19, 3, 36, 9])] - final_df = q_filtered[~q_filtered["PS_SUPPKEY"].isin(supplier)] + final_df = q_filtered.merge( + supplier, left_on=["PS_SUPPKEY"], right_on=["S_SUPPKEY"] + ) grouped = final_df.groupby(["P_BRAND", "P_TYPE", "P_SIZE"], as_index=False) result = grouped.agg( diff --git a/third_party/bigframes_vendored/tpch/queries/q17.py b/third_party/bigframes_vendored/tpch/queries/q17.py index 0bd1c44315..56289d57ad 100644 --- a/third_party/bigframes_vendored/tpch/queries/q17.py +++ b/third_party/bigframes_vendored/tpch/queries/q17.py @@ -33,8 +33,8 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): q_final = q_final[q_final["L_QUANTITY"] < q_final["AVG_QUANTITY"]] - q_final = bpd.DataFrame( - {"AVG_YEARLY": [(q_final["L_EXTENDEDPRICE"].sum() / 7.0).round(2)]} + q_final = ( + (q_final[["L_EXTENDEDPRICE"]].sum() / 7.0).round(2).to_frame(name="AVG_YEARLY") ) q_final.to_gbq() diff --git a/third_party/bigframes_vendored/tpch/queries/q20.py b/third_party/bigframes_vendored/tpch/queries/q20.py index 26651a31c4..fded5f5c97 100644 --- a/third_party/bigframes_vendored/tpch/queries/q20.py +++ b/third_party/bigframes_vendored/tpch/queries/q20.py @@ -46,7 +46,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): if not session._strictly_ordered: filtered_parts = filtered_parts[["P_PARTKEY"]].sort_values(by=["P_PARTKEY"]) - filtered_parts = filtered_parts[["P_PARTKEY"]].drop_duplicates() + filtered_parts = filtered_parts["P_PARTKEY"].unique(keep_order=False).to_frame() joined_parts = filtered_parts.merge( partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY" ) @@ -56,10 +56,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): ) final_filtered = final_join[final_join["PS_AVAILQTY"] > final_join["SUM_QUANTITY"]] - final_filtered = final_filtered[["PS_SUPPKEY"]] - if not session._strictly_ordered: - final_filtered = final_filtered.sort_values(by="PS_SUPPKEY") - final_filtered = final_filtered.drop_duplicates() + final_filtered = final_filtered["PS_SUPPKEY"].unique(keep_order=False).to_frame() final_result = final_filtered.merge(q3, left_on="PS_SUPPKEY", right_on="S_SUPPKEY") final_result = final_result[["S_NAME", "S_ADDRESS"]].sort_values(by="S_NAME") diff --git a/third_party/bigframes_vendored/tpch/queries/q22.py b/third_party/bigframes_vendored/tpch/queries/q22.py index 137a7d5c36..bc648ef392 100644 --- a/third_party/bigframes_vendored/tpch/queries/q22.py +++ b/third_party/bigframes_vendored/tpch/queries/q22.py @@ -18,13 +18,15 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): customer["CNTRYCODE"] = customer["C_PHONE"].str.slice(0, 2) - avg_acctbal = customer[ - (customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0) - ]["C_ACCTBAL"].mean() + avg_acctbal = ( + customer[ + (customer["CNTRYCODE"].isin(country_codes)) & (customer["C_ACCTBAL"] > 0) + ][["C_ACCTBAL"]] + .mean() + .rename("AVG_ACCTBAL") + ) - if not session._strictly_ordered: - orders = orders.sort_values(by="O_CUSTKEY") - orders_unique = orders.drop_duplicates(subset=["O_CUSTKEY"]) + orders_unique = 
orders["O_CUSTKEY"].unique(keep_order=False).to_frame() matched_customers = customer.merge( orders_unique, left_on="C_CUSTKEY", right_on="O_CUSTKEY" @@ -35,10 +37,11 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): matched_customers[["C_CUSTKEY", "IS_IN_ORDERS"]], on="C_CUSTKEY", how="left" ) customer["IS_IN_ORDERS"] = customer["IS_IN_ORDERS"].fillna(False) + customer = customer.merge(avg_acctbal, how="cross") filtered_customers = customer[ (customer["CNTRYCODE"].isin(country_codes)) - & (customer["C_ACCTBAL"] > avg_acctbal) + & (customer["C_ACCTBAL"] > customer["AVG_ACCTBAL"]) & (~customer["IS_IN_ORDERS"]) ] diff --git a/third_party/bigframes_vendored/tpch/queries/q4.py b/third_party/bigframes_vendored/tpch/queries/q4.py index b89f70845f..d149a71f71 100644 --- a/third_party/bigframes_vendored/tpch/queries/q4.py +++ b/third_party/bigframes_vendored/tpch/queries/q4.py @@ -26,10 +26,7 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session): jn = jn[(jn["O_ORDERDATE"] >= var1) & (jn["O_ORDERDATE"] < var2)] jn = jn[jn["L_COMMITDATE"] < jn["L_RECEIPTDATE"]] - if not session._strictly_ordered: - jn = jn.sort_values(by=["O_ORDERPRIORITY", "L_ORDERKEY"]) - - jn = jn.drop_duplicates(subset=["O_ORDERPRIORITY", "L_ORDERKEY"]) + jn = jn.groupby(["O_ORDERPRIORITY", "L_ORDERKEY"], as_index=False).agg("size") gb = jn.groupby("O_ORDERPRIORITY", as_index=False) agg = gb.agg(ORDER_COUNT=bpd.NamedAgg(column="L_ORDERKEY", aggfunc="count")) From b5ca1d9fa471754a770de944a87bcd0563406574 Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:56:10 -0700 Subject: [PATCH 6/8] chore: performance improvement for resample. (#1042) --- bigframes/core/compile/compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 19c18798c0..1fa727780a 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -147,8 +147,8 @@ def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True): ).name("labels") labels = ( typing.cast(ibis.expr.types.ArrayValue, labels_array_table) - .unnest() .as_table() + .unnest(["labels"]) ) if ordered: return compiled.OrderedIR( From ca02cd4b87d354c1e01c670cd9d4e36fa74896f5 Mon Sep 17 00:00:00 2001 From: Chelsea Lin <124939984+chelsea-lin@users.noreply.github.com> Date: Tue, 29 Oct 2024 13:50:24 -0700 Subject: [PATCH 7/8] feat: add the `ground_with_google_search` option for GeminiTextGenerator predict (#1119) * feat: add option for GeminiTextGenerator predict * add pricing link to the warning message --- bigframes/ml/llm.py | 13 ++- bigframes/operations/semantics.py | 152 ++++++++++++++++++++++++++---- tests/system/load/test_llm.py | 20 ++++ 3 files changed, 164 insertions(+), 21 deletions(-) diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index 5c5153e163..69f2f5bc62 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -913,6 +913,7 @@ def predict( max_output_tokens: int = 8192, top_k: int = 40, top_p: float = 1.0, + ground_with_google_search: bool = False, ) -> bpd.DataFrame: """Predict the result from input DataFrame. @@ -936,11 +937,20 @@ def predict( Specify a lower value for less random responses and a higher value for more random responses. Default 40. Possible values [1, 40]. - top_p (float, default 0.95):: + top_p (float, default 0.95): Top-P changes how the model selects tokens for output. 
Tokens are selected from the most (see top-K) to least probable until the sum of their probabilities equals the top-P value. For example, if tokens A, B, and C have a probability of 0.3, 0.2, and 0.1 and the top-P value is 0.5, then the model will select either A or B as the next token by using temperature and excludes C as a candidate. Specify a lower value for less random responses and a higher value for more random responses. Default 1.0. Possible values [0.0, 1.0]. + ground_with_google_search (bool, default False): + Enables Grounding with Google Search for the Vertex AI model. When set + to True, the model incorporates relevant information from Google Search + results into its responses, enhancing their accuracy and factualness. + This feature provides an additional column, `ml_generate_text_grounding_result`, + in the response output, detailing the sources used for grounding. + Note: Using this feature may impact billing costs. Refer to the pricing + page for details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models + The default is `False`. Returns: bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values. @@ -974,6 +984,7 @@ def predict( "top_k": top_k, "top_p": top_p, "flatten_json_output": True, + "ground_with_google_search": ground_with_google_search, } df = self._bqml_model.generate_text(X, options) diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py index 3e0be74889..46ed5b8b3e 100644 --- a/bigframes/operations/semantics.py +++ b/bigframes/operations/semantics.py @@ -16,6 +16,7 @@ import re import typing from typing import List, Optional +import warnings import numpy as np @@ -39,6 +40,7 @@ def agg( model, cluster_column: typing.Optional[str] = None, max_agg_rows: int = 10, + ground_with_google_search: bool = False, ): """ Performs an aggregation over all rows of the table. @@ -90,6 +92,14 @@ def agg( max_agg_rows (int, default 10): The maxinum number of rows to be aggregated at a time. + ground_with_google_search (bool, default False): + Enables Grounding with Google Search for the GeminiTextGenerator model. + When set to True, the model incorporates relevant information from Google + Search results into its responses, enhancing their accuracy and factualness. + Note: Using this feature may impact billing costs. Refer to the pricing + page for details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models + The default is `False`. + Returns: bigframes.dataframe.DataFrame: A new DataFrame with the aggregated answers. @@ -119,6 +129,12 @@ def agg( ) column = columns[0] + if ground_with_google_search: + warnings.warn( + "Enables Grounding with Google Search may impact billing cost. See pricing " + "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models" + ) + if max_agg_rows <= 1: raise ValueError( f"Invalid value for `max_agg_rows`: {max_agg_rows}." 
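A minimal sketch of the grounded prediction call enabled above (assuming `df` is a DataFrame whose first column holds the prompts; the setup is illustrative):

    from bigframes.ml import llm

    model = llm.GeminiTextGenerator()
    # The flag is forwarded to ML.GENERATE_TEXT; grounded responses draw on
    # Google Search results and the output gains an additional
    # `ml_generate_text_grounding_result` column listing the sources used.
    result = model.predict(df, ground_with_google_search=True)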
@@ -191,7 +207,12 @@ def agg(
             # Run model
             predict_df = typing.cast(
-                bigframes.dataframe.DataFrame, model.predict(prompt_s, temperature=0.0)
+                bigframes.dataframe.DataFrame,
+                model.predict(
+                    prompt_s,
+                    temperature=0.0,
+                    ground_with_google_search=ground_with_google_search,
+                ),
             )

             agg_df[column] = predict_df["ml_generate_text_llm_result"].combine_first(
                 single_row_df
@@ -284,7 +305,7 @@ def cluster_by(
         df[output_column] = clustered_result["CENTROID_ID"]
         return df

-    def filter(self, instruction: str, model):
+    def filter(self, instruction: str, model, ground_with_google_search: bool = False):
         """
         Filters the DataFrame with the semantics of the user instruction.

@@ -305,18 +326,26 @@ def filter(self, instruction: str, model):
         [1 rows x 2 columns]

         Args:
-            instruction:
+            instruction (str):
                 An instruction on how to filter the data. This value must contain
                 column references by name, which should be wrapped in a pair of braces.
                 For example, if you have a column "food", you can refer to this column
                 in the instructions like: "The {food} is healthy."

-            model:
+            model (bigframes.ml.llm.GeminiTextGenerator):
                 A GeminiTextGenerator provided by Bigframes ML package.

+            ground_with_google_search (bool, default False):
+                Enables Grounding with Google Search for the GeminiTextGenerator model.
+                When set to True, the model incorporates relevant information from Google
+                Search results into its responses, enhancing their accuracy and factualness.
+                Note: Using this feature may impact billing costs. Refer to the pricing
+                page for details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models
+                The default is `False`.
+
         Returns:
-            DataFrame filtered by the instruction.
+            bigframes.pandas.DataFrame: DataFrame filtered by the instruction.

         Raises:
             NotImplementedError: when the semantic operator experiment is off.
@@ -332,6 +361,12 @@ def filter(self, instruction: str, model):
             if column not in self._df.columns:
                 raise ValueError(f"Column {column} not found.")

+        if ground_with_google_search:
+            warnings.warn(
+                "Enabling Grounding with Google Search may impact billing costs. See pricing "
+                "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
+            )
+
         df: bigframes.dataframe.DataFrame = self._df[columns].copy()
         for column in columns:
             if df[column].dtype != dtypes.STRING_DTYPE:
@@ -345,6 +380,7 @@ def filter(self, instruction: str, model):
             model.predict(
                 self._make_prompt(df, columns, user_instruction, output_instruction),
                 temperature=0.0,
+                ground_with_google_search=ground_with_google_search,
             ),
         )

@@ -352,7 +388,13 @@ def filter(self, instruction: str, model):
             results["ml_generate_text_llm_result"].str.lower().str.contains("true")
         ]

-    def map(self, instruction: str, output_column: str, model):
+    def map(
+        self,
+        instruction: str,
+        output_column: str,
+        model,
+        ground_with_google_search: bool = False,
+    ):
         """
         Maps the DataFrame with the semantics of the user instruction.

@@ -376,21 +418,29 @@ def map(self, instruction: str, output_column: str, model):
         [2 rows x 3 columns]

         Args:
-            instruction:
+            instruction (str):
                 An instruction on how to map the data. This value must contain
                 column references by name, which should be wrapped in a pair of braces.
                 For example, if you have a column "food", you can refer to this column
                 in the instructions like: "Get the ingredients of {food}."

-            output_column:
+            output_column (str):
                 The column name of the mapping result.

-            model:
+            model (bigframes.ml.llm.GeminiTextGenerator):
                 A GeminiTextGenerator provided by Bigframes ML package.

+            ground_with_google_search (bool, default False):
+                Enables Grounding with Google Search for the GeminiTextGenerator model.
+                When set to True, the model incorporates relevant information from Google
+                Search results into its responses, enhancing their accuracy and factualness.
+                Note: Using this feature may impact billing costs. Refer to the pricing
+                page for details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models
+                The default is `False`.
+
         Returns:
-            DataFrame with attached mapping results.
+            bigframes.pandas.DataFrame: DataFrame with attached mapping results.

         Raises:
             NotImplementedError: when the semantic operator experiment is off.
@@ -406,6 +456,12 @@ def map(self, instruction: str, output_column: str, model):
             if column not in self._df.columns:
                 raise ValueError(f"Column {column} not found.")

+        if ground_with_google_search:
+            warnings.warn(
+                "Enabling Grounding with Google Search may impact billing costs. See pricing "
+                "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
+            )
+
         df: bigframes.dataframe.DataFrame = self._df[columns].copy()
         for column in columns:
             if df[column].dtype != dtypes.STRING_DTYPE:
@@ -421,6 +477,7 @@ def map(self, instruction: str, output_column: str, model):
             model.predict(
                 self._make_prompt(df, columns, user_instruction, output_instruction),
                 temperature=0.0,
+                ground_with_google_search=ground_with_google_search,
             )["ml_generate_text_llm_result"],
         )

@@ -428,7 +485,14 @@ def map(self, instruction: str, output_column: str, model):

         return concat([self._df, results.rename(output_column)], axis=1)

-    def join(self, other, instruction: str, model, max_rows: int = 1000):
+    def join(
+        self,
+        other,
+        instruction: str,
+        model,
+        max_rows: int = 1000,
+        ground_with_google_search: bool = False,
+    ):
         """
         Joins two dataframes by applying the instruction over each pair of rows from
         the left and right table.

@@ -455,10 +519,10 @@ def join(self, other, instruction: str, model, max_rows: int = 1000):
         [4 rows x 2 columns]

         Args:
-            other:
+            other (bigframes.pandas.DataFrame):
                 The other dataframe.

-            instruction:
+            instruction (str):
                 An instruction on how left and right rows can be joined. This value must contain
                 column references by name, which should be wrapped in a pair of braces.
                 For example: "The {city} belongs to the {country}".
@@ -467,15 +531,23 @@ def join(self, other, instruction: str, model, max_rows: int = 1000):
                 self joins. For example: "The {left.employee_name} reports to {right.employee_name}"
                 For unique column names, this prefix is optional.

-            model:
+            model (bigframes.ml.llm.GeminiTextGenerator):
                 A GeminiTextGenerator provided by Bigframes ML package.

-            max_rows:
+            max_rows (int, default 1000):
                 The maximum number of rows allowed to be sent to the model per call. If the result is too large, the method
                 call will end early with an error.

+            ground_with_google_search (bool, default False):
+                Enables Grounding with Google Search for the GeminiTextGenerator model.
+                When set to True, the model incorporates relevant information from Google
+                Search results into its responses, enhancing their accuracy and factualness.
+                Note: Using this feature may impact billing costs. Refer to the pricing
+                page for details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models
+                The default is `False`.
+
         Returns:
-            The joined dataframe.
+            bigframes.pandas.DataFrame: The joined dataframe.

         Raises:
             ValueError if the amount of data that will be sent for LLM processing is larger than max_rows.
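From the user's side, the threaded-through flag looks like this. A short sketch, assuming the semantic operator experiment is enabled via `bigframes.options.experiments.semantic_operators` and a GeminiTextGenerator is available; the data is illustrative.

    # A sketch, assuming the semantic operator experiment is enabled;
    # the DataFrame contents are illustrative, not from the source.
    import bigframes
    import bigframes.pandas as bpd
    from bigframes.ml import llm

    bigframes.options.experiments.semantic_operators = True

    model = llm.GeminiTextGenerator(model_name="gemini-pro")
    df = bpd.DataFrame({"country": ["USA", "Germany"], "city": ["Seattle", "Berlin"]})

    # Grounded filter: rows are kept when the model answers "true", and the
    # call now warns that grounding may impact billing costs.
    result = df.semantics.filter(
        "{city} is the capital of {country}",
        model,
        ground_with_google_search=True,
    )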
@@ -483,6 +555,12 @@ def join(self, other, instruction: str, model, max_rows: int = 1000):
         self._validate_model(model)
         columns = self._parse_columns(instruction)

+        if ground_with_google_search:
+            warnings.warn(
+                "Enabling Grounding with Google Search may impact billing costs. See pricing "
+                "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
+            )
+
         joined_table_rows = len(self._df) * len(other)

         if joined_table_rows > max_rows:
@@ -545,7 +623,9 @@ def join(self, other, instruction: str, model, max_rows: int = 1000):

         joined_df = self._df.merge(other, how="cross", suffixes=("_left", "_right"))

-        return joined_df.semantics.filter(instruction, model).reset_index(drop=True)
+        return joined_df.semantics.filter(
+            instruction, model, ground_with_google_search=ground_with_google_search
+        ).reset_index(drop=True)

     def search(
         self,
@@ -644,7 +724,13 @@ def search(

         return typing.cast(bigframes.dataframe.DataFrame, search_result)

-    def top_k(self, instruction: str, model, k=10):
+    def top_k(
+        self,
+        instruction: str,
+        model,
+        k: int = 10,
+        ground_with_google_search: bool = False,
+    ):
         """
         Ranks each tuple and returns the k best according to the instruction.

@@ -682,6 +768,14 @@ def top_k(self, instruction: str, model, k=10):
             k (int, default 10):
                 The number of rows to return.

+            ground_with_google_search (bool, default False):
+                Enables Grounding with Google Search for the GeminiTextGenerator model.
+                When set to True, the model incorporates relevant information from Google
+                Search results into its responses, enhancing their accuracy and factualness.
+                Note: Using this feature may impact billing costs. Refer to the pricing
+                page for details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models
+                The default is `False`.
+
         Returns:
             bigframes.dataframe.DataFrame: A new DataFrame with the top k rows.

@@ -703,6 +797,12 @@ def top_k(self, instruction: str, model, k=10):
                 "Semantic aggregations are limited to a single column."
             )

+        if ground_with_google_search:
+            warnings.warn(
+                "Enabling Grounding with Google Search may impact billing costs. See pricing "
+                "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
+            )
+
         df: bigframes.dataframe.DataFrame = self._df[columns].copy()
         column = columns[0]
         if df[column].dtype != dtypes.STRING_DTYPE:
@@ -743,6 +843,7 @@ def top_k(self, instruction: str, model, k=10):
                 user_instruction,
                 model,
                 k - num_selected,
+                ground_with_google_search,
             )
             num_selected += num_new_selected

@@ -757,7 +858,13 @@ def top_k(self, instruction: str, model, k=10):

     @staticmethod
     def _topk_partition(
-        df, column: str, status_column: str, user_instruction: str, model, k
+        df,
+        column: str,
+        status_column: str,
+        user_instruction: str,
+        model,
+        k: int,
+        ground_with_google_search: bool,
     ):
         output_instruction = (
             "Given a question and two documents, choose the document that best answers "
@@ -784,7 +891,12 @@ def _topk_partition(
         import bigframes.dataframe

         predict_df = typing.cast(
-            bigframes.dataframe.DataFrame, model.predict(prompt_s, temperature=0.0)
+            bigframes.dataframe.DataFrame,
+            model.predict(
+                prompt_s,
+                temperature=0.0,
+                ground_with_google_search=ground_with_google_search,
+            ),
         )

         marks = predict_df["ml_generate_text_llm_result"].str.contains("2")

diff --git a/tests/system/load/test_llm.py b/tests/system/load/test_llm.py
index 4b0f50973b..da583f41db 100644
--- a/tests/system/load/test_llm.py
+++ b/tests/system/load/test_llm.py
@@ -68,6 +68,26 @@ def test_llm_gemini_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_df):
 # TODO(ashleyxu b/335492787): After bqml rolled out version control: save, load, check parameters to ensure configuration was kept


+@pytest.mark.flaky(retries=2)
+def test_llm_gemini_w_ground_with_google_search(llm_remote_text_df):
+    model = llm.GeminiTextGenerator(model_name="gemini-pro", max_iterations=1)
+    df = model.predict(
+        llm_remote_text_df["prompt"],
+        ground_with_google_search=True,
+    ).to_pandas()
+    utils.check_pandas_df_schema_and_index(
+        df,
+        columns=[
+            "ml_generate_text_llm_result",
+            "ml_generate_text_rai_result",
+            "ml_generate_text_grounding_result",
+            "ml_generate_text_status",
+            "prompt",
+        ],
+        index=3,
+    )
+
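Beyond the load test above, the new cost warning itself is straightforward to pin down in a unit-style check. A sketch with pytest, assuming the semantic operator experiment is enabled; `session_df` and `gemini_model` are hypothetical fixtures, not from the source.

    # A sketch of asserting the cost warning; warnings.warn with no category
    # defaults to UserWarning, so pytest.warns(UserWarning, ...) matches it.
    import pytest

    def test_semantics_filter_warns_on_grounding(session_df, gemini_model):
        with pytest.warns(UserWarning, match="may impact billing costs"):
            session_df.semantics.filter(
                "{city} is the capital of {country}",
                gemini_model,
                ground_with_google_search=True,
            )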
 # (b/366290533): Claude models are of extremely low capacity. The tests should
 # reside in small tests. Moving these here just to protect BQML's shared
 # capacity (as load tests only run once per day) and make sure we still have
 # minimum coverage.
 @pytest.mark.parametrize(
     "model_name",

From 8962b4e1f6ba856e4e620e1c6730cba6da6c2b02 Mon Sep 17 00:00:00 2001
From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com>
Date: Tue, 29 Oct 2024 15:51:18 -0700
Subject: [PATCH 8/8] chore(main): release 1.25.0 (#1116)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
---
 CHANGELOG.md                              | 15 +++++++++++++++
 bigframes/version.py                      |  2 +-
 third_party/bigframes_vendored/version.py |  2 +-
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00f942f128..5f18459a24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,21 @@
 [1]: https://pypi.org/project/bigframes/#history

+## [1.25.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.24.0...v1.25.0) (2024-10-29)
+
+
+### Features
+
+* Add the `ground_with_google_search` option for GeminiTextGenerator predict ([#1119](https://github.com/googleapis/python-bigquery-dataframes/issues/1119)) ([ca02cd4](https://github.com/googleapis/python-bigquery-dataframes/commit/ca02cd4b87d354c1e01c670cd9d4e36fa74896f5))
+* Add warning when user tries to access struct series fields with `__getitem__` ([#1082](https://github.com/googleapis/python-bigquery-dataframes/issues/1082)) ([20e5c58](https://github.com/googleapis/python-bigquery-dataframes/commit/20e5c58868af8b18595d5635cb7722da4f622eb5))
+* Allow `fit` to take additional eval data in linear and ensemble models ([#1096](https://github.com/googleapis/python-bigquery-dataframes/issues/1096)) ([254875c](https://github.com/googleapis/python-bigquery-dataframes/commit/254875c25f39df4bc477e1ed7339ecb30b395ab6))
+* Support context manager for bigframes session ([#1107](https://github.com/googleapis/python-bigquery-dataframes/issues/1107)) ([5f7b8b1](https://github.com/googleapis/python-bigquery-dataframes/commit/5f7b8b189c093629d176ffc99364767dc766397a))
+
+
+### Performance Improvements
+
+* Improve series.unique performance and replace drop_duplicates i… ([#1108](https://github.com/googleapis/python-bigquery-dataframes/issues/1108)) ([499f24a](https://github.com/googleapis/python-bigquery-dataframes/commit/499f24a5f22ce484db96eb09cd3a0ce972398d81))
+
 ## [1.24.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.23.0...v1.24.0) (2024-10-24)

diff --git a/bigframes/version.py b/bigframes/version.py
index 501aa2bd9d..1a818f9057 100644
--- a/bigframes/version.py
+++ b/bigframes/version.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-__version__ = "1.24.0"
+__version__ = "1.25.0"

diff --git a/third_party/bigframes_vendored/version.py b/third_party/bigframes_vendored/version.py
index 501aa2bd9d..1a818f9057 100644
--- a/third_party/bigframes_vendored/version.py
+++ b/third_party/bigframes_vendored/version.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-__version__ = "1.24.0"
+__version__ = "1.25.0"