diff --git a/bigframes/ml/compose.py b/bigframes/ml/compose.py index 6d4fa5b76d..cd233589d6 100644 --- a/bigframes/ml/compose.py +++ b/bigframes/ml/compose.py @@ -18,16 +18,21 @@ from __future__ import annotations +import re +import types import typing -from typing import List, Optional, Tuple, Union +from typing import cast, List, Optional, Tuple, Union import bigframes_vendored.sklearn.compose._column_transformer +from google.cloud import bigquery +import bigframes +from bigframes import constants from bigframes.core import log_adapter from bigframes.ml import base, core, globals, preprocessing, utils import bigframes.pandas as bpd -CompilablePreprocessorType = Union[ +_PREPROCESSING_TYPES = Union[ preprocessing.OneHotEncoder, preprocessing.StandardScaler, preprocessing.MaxAbsScaler, @@ -36,6 +41,17 @@ preprocessing.LabelEncoder, ] +_BQML_TRANSFROM_TYPE_MAPPING = types.MappingProxyType( + { + "ML.STANDARD_SCALER": preprocessing.StandardScaler, + "ML.ONE_HOT_ENCODER": preprocessing.OneHotEncoder, + "ML.MAX_ABS_SCALER": preprocessing.MaxAbsScaler, + "ML.MIN_MAX_SCALER": preprocessing.MinMaxScaler, + "ML.BUCKETIZE": preprocessing.KBinsDiscretizer, + "ML.LABEL_ENCODER": preprocessing.LabelEncoder, + } +) + @log_adapter.class_logger class ColumnTransformer( @@ -51,7 +67,7 @@ def __init__( transformers: List[ Tuple[ str, - CompilablePreprocessorType, + _PREPROCESSING_TYPES, Union[str, List[str]], ] ], @@ -66,12 +82,12 @@ def __init__( @property def transformers_( self, - ) -> List[Tuple[str, CompilablePreprocessorType, str,]]: + ) -> List[Tuple[str, _PREPROCESSING_TYPES, str,]]: """The collection of transformers as tuples of (name, transformer, column).""" result: List[ Tuple[ str, - CompilablePreprocessorType, + _PREPROCESSING_TYPES, str, ] ] = [] @@ -89,6 +105,96 @@ def transformers_( return result + @classmethod + def _from_bq( + cls, session: bigframes.Session, model: bigquery.Model + ) -> ColumnTransformer: + col_transformer = cls._extract_from_bq_model(model) + col_transformer._bqml_model = core.BqmlModel(session, model) + + return col_transformer + + @classmethod + def _extract_from_bq_model( + cls, + bq_model: bigquery.Model, + ) -> ColumnTransformer: + """Extract transformers as ColumnTransformer obj from a BQ Model. Keep the _bqml_model field as None.""" + assert "transformColumns" in bq_model._properties + + transformers: List[ + Tuple[ + str, + _PREPROCESSING_TYPES, + Union[str, List[str]], + ] + ] = [] + + def camel_to_snake(name): + name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) + return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() + + for transform_col in bq_model._properties["transformColumns"]: + # pass the columns that are not transformed + if "transformSql" not in transform_col: + continue + transform_sql: str = cast(dict, transform_col)["transformSql"] + if not transform_sql.startswith("ML."): + continue + + found_transformer = False + for prefix in _BQML_TRANSFROM_TYPE_MAPPING: + if transform_sql.startswith(prefix): + transformer_cls = _BQML_TRANSFROM_TYPE_MAPPING[prefix] + transformers.append( + ( + camel_to_snake(transformer_cls.__name__), + *transformer_cls._parse_from_sql(transform_sql), # type: ignore + ) + ) + + found_transformer = True + break + if not found_transformer: + raise NotImplementedError( + f"Unsupported transformer type. {constants.FEEDBACK_LINK}" + ) + + return cls(transformers=transformers) + + def _merge( + self, bq_model: bigquery.Model + ) -> Union[ + ColumnTransformer, + preprocessing.StandardScaler, + preprocessing.OneHotEncoder, + preprocessing.MaxAbsScaler, + preprocessing.MinMaxScaler, + preprocessing.KBinsDiscretizer, + preprocessing.LabelEncoder, + ]: + """Try to merge the column transformer to a simple transformer. Depends on all the columns in bq_model are transformed with the same transformer.""" + transformers = self.transformers_ + + assert len(transformers) > 0 + _, transformer_0, column_0 = transformers[0] + columns = [column_0] + for _, transformer, column in transformers[1:]: + # all transformers are the same + if transformer != transformer_0: + return self + columns.append(column) + # all feature columns are transformed + if sorted( + [ + cast(str, feature_column.name) + for feature_column in bq_model.feature_columns + ] + ) == sorted(columns): + return transformer_0 + + return self + def _compile_to_sql( self, columns: List[str], @@ -143,3 +249,20 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame: bpd.DataFrame, df[self._output_names], ) + + def to_gbq(self, model_name: str, replace: bool = False) -> ColumnTransformer: + """Save the transformer as a BigQuery model. + + Args: + model_name (str): + the name of the model. + replace (bool, default False): + whether to replace if the model already exists. Default to False. + + Returns: + ColumnTransformer: saved model.""" + if not self._bqml_model: + raise RuntimeError("A transformer must be fitted before it can be saved") + + new_model = self._bqml_model.copy(model_name, replace) + return new_model.session.read_gbq_model(model_name) diff --git a/bigframes/ml/loader.py b/bigframes/ml/loader.py index 31912a0129..508003a98d 100644 --- a/bigframes/ml/loader.py +++ b/bigframes/ml/loader.py @@ -23,6 +23,7 @@ import bigframes.constants as constants from bigframes.ml import ( cluster, + compose, decomposition, ensemble, forecasting, @@ -79,6 +80,7 @@ def from_bq( llm.PaLM2TextGenerator, llm.PaLM2TextEmbeddingGenerator, pipeline.Pipeline, + compose.ColumnTransformer, ]: """Load a BQML model to BigQuery DataFrames ML. @@ -89,22 +91,32 @@ def from_bq( Returns: A BigQuery DataFrames ML model object. """ + # TODO(garrettwu): the entire condition only to TRANSFORM_ONLY when b/331679273 is fixed. + if ( + bq_model.model_type == "TRANSFORM_ONLY" + or bq_model.model_type == "MODEL_TYPE_UNSPECIFIED" + and "transformColumns" in bq_model._properties + and not _is_bq_model_remote(bq_model) + ): + return _transformer_from_bq(session, bq_model) + if _is_bq_model_pipeline(bq_model): return pipeline.Pipeline._from_bq(session, bq_model) return _model_from_bq(session, bq_model) +def _transformer_from_bq(session: bigframes.Session, bq_model: bigquery.Model): + # TODO(garrettwu): add other transformers + return compose.ColumnTransformer._from_bq(session, bq_model) + + def _model_from_bq(session: bigframes.Session, bq_model: bigquery.Model): if bq_model.model_type in _BQML_MODEL_TYPE_MAPPING: return _BQML_MODEL_TYPE_MAPPING[bq_model.model_type]._from_bq( # type: ignore session=session, model=bq_model ) - if ( - bq_model.model_type == "MODEL_TYPE_UNSPECIFIED" - and "remoteModelInfo" in bq_model._properties - and "endpoint" in bq_model._properties["remoteModelInfo"] - ): + if _is_bq_model_remote(bq_model): # Parse the remote model endpoint bqml_endpoint = bq_model._properties["remoteModelInfo"]["endpoint"] model_endpoint = bqml_endpoint.split("/")[-1] @@ -121,3 +133,11 @@ def _model_from_bq(session: bigframes.Session, bq_model: bigquery.Model): def _is_bq_model_pipeline(bq_model: bigquery.Model) -> bool: return "transformColumns" in bq_model._properties + + +def _is_bq_model_remote(bq_model: bigquery.Model) -> bool: + return ( + bq_model.model_type == "MODEL_TYPE_UNSPECIFIED" + and "remoteModelInfo" in bq_model._properties + and "endpoint" in bq_model._properties["remoteModelInfo"] + ) diff --git a/bigframes/ml/pipeline.py b/bigframes/ml/pipeline.py index 9289b613b8..92a3bae77d 100644 --- a/bigframes/ml/pipeline.py +++ b/bigframes/ml/pipeline.py @@ -18,7 +18,7 @@ from __future__ import annotations -from typing import cast, List, Optional, Tuple, Union +from typing import List, Optional, Tuple, Union import bigframes_vendored.sklearn.pipeline from google.cloud import bigquery @@ -83,8 +83,8 @@ def __init__(self, steps: List[Tuple[str, base.BaseEstimator]]): @classmethod def _from_bq(cls, session: bigframes.Session, bq_model: bigquery.Model) -> Pipeline: - col_transformer = _extract_as_column_transformer(bq_model) - transform = _merge_column_transformer(bq_model, col_transformer) + col_transformer = compose.ColumnTransformer._extract_from_bq_model(bq_model) + transform = col_transformer._merge(bq_model) estimator = loader._model_from_bq(session, bq_model) return cls([("transform", transform), ("estimator", estimator)]) @@ -138,110 +138,3 @@ def to_gbq(self, model_name: str, replace: bool = False) -> Pipeline: new_model = self._estimator._bqml_model.copy(model_name, replace) return new_model.session.read_gbq_model(model_name) - - -def _extract_as_column_transformer( - bq_model: bigquery.Model, -) -> compose.ColumnTransformer: - """Extract transformers as ColumnTransformer obj from a BQ Model.""" - assert "transformColumns" in bq_model._properties - - transformers: List[ - Tuple[ - str, - Union[ - preprocessing.OneHotEncoder, - preprocessing.StandardScaler, - preprocessing.MaxAbsScaler, - preprocessing.MinMaxScaler, - preprocessing.KBinsDiscretizer, - preprocessing.LabelEncoder, - ], - Union[str, List[str]], - ] - ] = [] - for transform_col in bq_model._properties["transformColumns"]: - # pass the columns that are not transformed - if "transformSql" not in transform_col: - continue - - transform_sql: str = cast(dict, transform_col)["transformSql"] - if transform_sql.startswith("ML.STANDARD_SCALER"): - transformers.append( - ( - "standard_scaler", - *preprocessing.StandardScaler._parse_from_sql(transform_sql), - ) - ) - elif transform_sql.startswith("ML.ONE_HOT_ENCODER"): - transformers.append( - ( - "ont_hot_encoder", - *preprocessing.OneHotEncoder._parse_from_sql(transform_sql), - ) - ) - elif transform_sql.startswith("ML.MAX_ABS_SCALER"): - transformers.append( - ( - "max_abs_scaler", - *preprocessing.MaxAbsScaler._parse_from_sql(transform_sql), - ) - ) - elif transform_sql.startswith("ML.MIN_MAX_SCALER"): - transformers.append( - ( - "min_max_scaler", - *preprocessing.MinMaxScaler._parse_from_sql(transform_sql), - ) - ) - elif transform_sql.startswith("ML.BUCKETIZE"): - transformers.append( - ( - "k_bins_discretizer", - *preprocessing.KBinsDiscretizer._parse_from_sql(transform_sql), - ) - ) - elif transform_sql.startswith("ML.LABEL_ENCODER"): - transformers.append( - ( - "label_encoder", - *preprocessing.LabelEncoder._parse_from_sql(transform_sql), - ) - ) - else: - raise NotImplementedError( - f"Unsupported transformer type. {constants.FEEDBACK_LINK}" - ) - - return compose.ColumnTransformer(transformers=transformers) - - -def _merge_column_transformer( - bq_model: bigquery.Model, column_transformer: compose.ColumnTransformer -) -> Union[ - compose.ColumnTransformer, - preprocessing.StandardScaler, - preprocessing.OneHotEncoder, - preprocessing.MaxAbsScaler, - preprocessing.MinMaxScaler, - preprocessing.KBinsDiscretizer, - preprocessing.LabelEncoder, -]: - """Try to merge the column transformer to a simple transformer.""" - transformers = column_transformer.transformers_ - - assert len(transformers) > 0 - _, transformer_0, column_0 = transformers[0] - columns = [column_0] - for _, transformer, column in transformers[1:]: - # all transformers are the same - if transformer != transformer_0: - return column_transformer - columns.append(column) - # all feature columns are transformed - if sorted( - [cast(str, feature_column.name) for feature_column in bq_model.feature_columns] - ) == sorted(columns): - return transformer_0 - - return column_transformer diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 479b3a7bac..515b74bd7f 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -952,7 +952,7 @@ def read_gbq_model(self, model_name: str): to load from the default project. Returns: - A bigframes.ml Model wrapping the model. + A bigframes.ml Model, Transformer or Pipeline wrapping the model. """ import bigframes.ml.loader diff --git a/tests/system/large/ml/test_compose.py b/tests/system/large/ml/test_compose.py index bb9a4d8f64..d7c49ca95a 100644 --- a/tests/system/large/ml/test_compose.py +++ b/tests/system/large/ml/test_compose.py @@ -14,31 +14,27 @@ import pandas -import bigframes.ml.cluster -import bigframes.ml.compose -import bigframes.ml.linear_model -import bigframes.ml.pipeline -import bigframes.ml.preprocessing +from bigframes.ml import compose, preprocessing def test_columntransformer_standalone_fit_and_transform( penguins_df_default_index, new_penguins_df ): - transformer = bigframes.ml.compose.ColumnTransformer( + transformer = compose.ColumnTransformer( [ ( "onehot", - bigframes.ml.preprocessing.OneHotEncoder(), + preprocessing.OneHotEncoder(), "species", ), ( "starndard_scale", - bigframes.ml.preprocessing.StandardScaler(), + preprocessing.StandardScaler(), ["culmen_length_mm", "flipper_length_mm"], ), ( "min_max_scale", - bigframes.ml.preprocessing.MinMaxScaler(), + preprocessing.MinMaxScaler(), ["culmen_length_mm"], ), ] @@ -76,16 +72,16 @@ def test_columntransformer_standalone_fit_and_transform( def test_columntransformer_standalone_fit_transform(new_penguins_df): - transformer = bigframes.ml.compose.ColumnTransformer( + transformer = compose.ColumnTransformer( [ ( "onehot", - bigframes.ml.preprocessing.OneHotEncoder(), + preprocessing.OneHotEncoder(), "species", ), ( "standard_scale", - bigframes.ml.preprocessing.StandardScaler(), + preprocessing.StandardScaler(), ["culmen_length_mm", "flipper_length_mm"], ), ] @@ -118,3 +114,40 @@ def test_columntransformer_standalone_fit_transform(new_penguins_df): ) pandas.testing.assert_frame_equal(result, expected, rtol=0.1, check_dtype=False) + + +def test_columntransformer_save_load(new_penguins_df, dataset_id): + transformer = compose.ColumnTransformer( + [ + ( + "onehot", + preprocessing.OneHotEncoder(), + "species", + ), + ( + "standard_scale", + preprocessing.StandardScaler(), + ["culmen_length_mm", "flipper_length_mm"], + ), + ] + ) + transformer.fit( + new_penguins_df[["species", "culmen_length_mm", "flipper_length_mm"]] + ) + + reloaded_transformer = transformer.to_gbq( + f"{dataset_id}.temp_configured_model", replace=True + ) + + assert isinstance(reloaded_transformer, compose.ColumnTransformer) + + expected = [ + ( + "one_hot_encoder", + preprocessing.OneHotEncoder(max_categories=1000001, min_frequency=0), + "species", + ), + ("standard_scaler", preprocessing.StandardScaler(), "culmen_length_mm"), + ("standard_scaler", preprocessing.StandardScaler(), "flipper_length_mm"), + ] + assert reloaded_transformer.transformers_ == expected diff --git a/tests/system/large/ml/test_pipeline.py b/tests/system/large/ml/test_pipeline.py index c128469bd2..c460efa75f 100644 --- a/tests/system/large/ml/test_pipeline.py +++ b/tests/system/large/ml/test_pipeline.py @@ -646,7 +646,7 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id compose.ColumnTransformer( [ ( - "ont_hot_encoder", + "one_hot_encoder", preprocessing.OneHotEncoder( drop="most_frequent", min_frequency=5, @@ -699,7 +699,7 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id transformers = pl_loaded._transform.transformers_ expected = [ ( - "ont_hot_encoder", + "one_hot_encoder", preprocessing.OneHotEncoder( drop="most_frequent", max_categories=100, min_frequency=5 ),