From be1e7a701d285111e1d868aa18adc3f9c8892a63 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Thu, 2 May 2024 17:35:27 +0000 Subject: [PATCH 1/3] feat: add strategy =quantile in KBinsDiscretizer --- bigframes/ml/compose.py | 1 + bigframes/ml/preprocessing.py | 53 +++++++++++------ bigframes/ml/sql.py | 11 +++- tests/system/small/ml/test_preprocessing.py | 58 +++++++++++++++++++ tests/unit/ml/test_sql.py | 7 +++ .../sklearn/preprocessing/_discretization.py | 2 +- 6 files changed, 112 insertions(+), 20 deletions(-) diff --git a/bigframes/ml/compose.py b/bigframes/ml/compose.py index 89969f23e7..77bfd76bde 100644 --- a/bigframes/ml/compose.py +++ b/bigframes/ml/compose.py @@ -38,6 +38,7 @@ "ML.MAX_ABS_SCALER": preprocessing.MaxAbsScaler, "ML.MIN_MAX_SCALER": preprocessing.MinMaxScaler, "ML.BUCKETIZE": preprocessing.KBinsDiscretizer, + "ML.QUANTILE_BUCKETIZE": preprocessing.KBinsDiscretizer, "ML.LABEL_ENCODER": preprocessing.LabelEncoder, } ) diff --git a/bigframes/ml/preprocessing.py b/bigframes/ml/preprocessing.py index 673ee27db0..bc5b765151 100644 --- a/bigframes/ml/preprocessing.py +++ b/bigframes/ml/preprocessing.py @@ -290,10 +290,6 @@ def __init__( n_bins: int = 5, strategy: Literal["uniform", "quantile"] = "quantile", ): - if strategy != "uniform": - raise NotImplementedError( - f"Only strategy = 'uniform' is supported now, input is {strategy}." - ) if n_bins < 2: raise ValueError( f"n_bins has to be larger than or equal to 2, input is {n_bins}." @@ -337,15 +333,31 @@ def _compile_to_sql( min_value + i * bin_size for i in range(self.n_bins - 1) ] - return [ - ( - self._base_sql_generator.ml_bucketize( - column, array_split_points[column], f"kbinsdiscretizer_{column}" - ), - f"kbinsdiscretizer_{column}", - ) - for column in columns - ] + return [ + ( + self._base_sql_generator.ml_bucketize( + column, array_split_points[column], f"kbinsdiscretizer_{column}" + ), + f"kbinsdiscretizer_{column}", + ) + for column in columns + ] + + if self.strategy == "quantile": + # quantile_size = 1 / self.n_bins + # quantile_df = X.quantile([quantile_size + i * quantile_size for i in range(self.n_bins - 1)]) + # for column in columns: + # array_split_points[column] = quantile_df[column].to_list() + + return [ + ( + self._base_sql_generator.ml_quantile_bucketize( + column, self.n_bins, f"kbinsdiscretizer_{column}" + ), + f"kbinsdiscretizer_{column}", + ) + for column in columns + ] @classmethod def _parse_from_sql(cls, sql: str) -> tuple[KBinsDiscretizer, str]: @@ -356,11 +368,16 @@ def _parse_from_sql(cls, sql: str) -> tuple[KBinsDiscretizer, str]: Returns: tuple(KBinsDiscretizer, column_label)""" - s = sql[sql.find("(") + 1 : sql.find(")")] - array_split_points = s[s.find("[") + 1 : s.find("]")] - col_label = s[: s.find(",")] - n_bins = array_split_points.count(",") + 2 - return cls(n_bins, "uniform"), col_label + if sql.startswith("ML.QUANTILE_BUCKETIZE"): + s = sql[sql.find("(") + 1 : sql.find(")")] + col_label, num_bins = s.split(", ") + return cls(int(num_bins), "quantile"), col_label + else: + s = sql[sql.find("(") + 1 : sql.find(")")] + array_split_points = s[s.find("[") + 1 : s.find("]")] + col_label = s[: s.find(",")] + n_bins = array_split_points.count(",") + 2 + return cls(n_bins, "uniform"), col_label def fit( self, diff --git a/bigframes/ml/sql.py b/bigframes/ml/sql.py index ea693e3437..b701ab301c 100644 --- a/bigframes/ml/sql.py +++ b/bigframes/ml/sql.py @@ -109,9 +109,18 @@ def ml_bucketize( array_split_points: Iterable[Union[int, float]], name: str, ) -> str: - """Encode ML.MIN_MAX_SCALER for BQML""" + """Encode ML.BUCKETIZE for BQML""" return f"""ML.BUCKETIZE({numeric_expr_sql}, {array_split_points}, FALSE) AS {name}""" + def ml_quantile_bucketize( + self, + numeric_expr_sql: str, + num_bucket: int, + name: str, + ) -> str: + """Encode ML.QUANTILE_BUCKETIZE for BQML""" + return f"""ML.QUANTILE_BUCKETIZE({numeric_expr_sql}, {num_bucket}) OVER() AS {name}""" + def ml_one_hot_encoder( self, numeric_expr_sql: str, diff --git a/tests/system/small/ml/test_preprocessing.py b/tests/system/small/ml/test_preprocessing.py index faa0cd7bbd..5b457cc9c0 100644 --- a/tests/system/small/ml/test_preprocessing.py +++ b/tests/system/small/ml/test_preprocessing.py @@ -373,6 +373,27 @@ def test_k_bins_discretizer_normalized_fit_transform_default_params(new_penguins pd.testing.assert_frame_equal(result, expected, rtol=0.1) +def test_k_bins_discretizer_normalized_fit_transform_default_params_quantile( + new_penguins_df, +): + discretizer = preprocessing.KBinsDiscretizer(strategy="quantile") + result = discretizer.fit_transform( + new_penguins_df[["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]] + ).to_pandas() + + expected = pd.DataFrame( + { + "kbinsdiscretizer_culmen_length_mm": ["bin_2", "bin_2", "bin_1"], + "kbinsdiscretizer_culmen_depth_mm": ["bin_2", "bin_1", "bin_2"], + "kbinsdiscretizer_flipper_length_mm": ["bin_2", "bin_1", "bin_2"], + }, + dtype="string[pyarrow]", + index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), + ) + + pd.testing.assert_frame_equal(result, expected, rtol=0.1) + + def test_k_bins_discretizer_series_normalizes( penguins_df_default_index, new_penguins_df ): @@ -395,6 +416,28 @@ def test_k_bins_discretizer_series_normalizes( pd.testing.assert_frame_equal(result, expected, rtol=0.1) +def test_k_bins_discretizer_series_normalizes_quantile( + penguins_df_default_index, new_penguins_df +): + discretizer = preprocessing.KBinsDiscretizer(strategy="quantile") + discretizer.fit(penguins_df_default_index["culmen_length_mm"]) + + result = discretizer.transform( + penguins_df_default_index["culmen_length_mm"] + ).to_pandas() + result = discretizer.transform(new_penguins_df).to_pandas() + + expected = pd.DataFrame( + { + "kbinsdiscretizer_culmen_length_mm": ["bin_2", "bin_2", "bin_1"], + }, + dtype="string[pyarrow]", + index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"), + ) + + pd.testing.assert_frame_equal(result, expected, rtol=0.1) + + def test_k_bins_discretizer_normalizes(penguins_df_default_index, new_penguins_df): # TODO(http://b/292431644): add a second test that compares output to sklearn.preprocessing.KBinsDiscretizer, when BQML's change is in prod. discretizer = preprocessing.KBinsDiscretizer(strategy="uniform") @@ -488,6 +531,21 @@ def test_k_bins_discretizer_save_load(new_penguins_df, dataset_id): pd.testing.assert_frame_equal(result, expected, rtol=0.1) +def test_k_bins_discretizer_save_load_quantile(new_penguins_df, dataset_id): + transformer = preprocessing.KBinsDiscretizer(n_bins=6, strategy="quantile") + transformer.fit( + new_penguins_df[["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]] + ) + + reloaded_transformer = transformer.to_gbq( + f"{dataset_id}.temp_configured_model", replace=True + ) + assert isinstance(reloaded_transformer, preprocessing.KBinsDiscretizer) + assert reloaded_transformer.n_bins == transformer.n_bins + assert reloaded_transformer.strategy == transformer.strategy + assert reloaded_transformer._bqml_model is not None + + def test_one_hot_encoder_default_params(new_penguins_df): encoder = preprocessing.OneHotEncoder() encoder.fit(new_penguins_df[["species", "sex"]]) diff --git a/tests/unit/ml/test_sql.py b/tests/unit/ml/test_sql.py index 4dd90b2c4a..07b247fb41 100644 --- a/tests/unit/ml/test_sql.py +++ b/tests/unit/ml/test_sql.py @@ -113,6 +113,13 @@ def test_k_bins_discretizer_correct( assert sql == "ML.BUCKETIZE(col_a, [1, 2, 3, 4], FALSE) AS scaled_col_a" +def test_k_bins_discretizer_quantile_correct( + base_sql_generator: ml_sql.BaseSqlGenerator, +): + sql = base_sql_generator.ml_quantile_bucketize("col_a", 5, "scaled_col_a") + assert sql == "ML.QUANTILE_BUCKETIZE(col_a, 5) OVER() AS scaled_col_a" + + def test_one_hot_encoder_correct( base_sql_generator: ml_sql.BaseSqlGenerator, ): diff --git a/third_party/bigframes_vendored/sklearn/preprocessing/_discretization.py b/third_party/bigframes_vendored/sklearn/preprocessing/_discretization.py index 98b9d0371f..54c81af71d 100644 --- a/third_party/bigframes_vendored/sklearn/preprocessing/_discretization.py +++ b/third_party/bigframes_vendored/sklearn/preprocessing/_discretization.py @@ -18,7 +18,7 @@ class KBinsDiscretizer(TransformerMixin, BaseEstimator): strategy ({'uniform', 'quantile'}, default='quantile'): Strategy used to define the widths of the bins. 'uniform': All bins in each feature have identical widths. 'quantile': All bins in each - feature have the same number of points. Only `uniform` is supported. + feature have the same number of points. """ def fit(self, X, y=None): From e58acdfeacb5db21acc8e6588cef276802e27278 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 7 May 2024 17:20:59 +0000 Subject: [PATCH 2/3] address comments --- bigframes/ml/preprocessing.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/bigframes/ml/preprocessing.py b/bigframes/ml/preprocessing.py index bc5b765151..5a35e4ec55 100644 --- a/bigframes/ml/preprocessing.py +++ b/bigframes/ml/preprocessing.py @@ -344,10 +344,6 @@ def _compile_to_sql( ] if self.strategy == "quantile": - # quantile_size = 1 / self.n_bins - # quantile_df = X.quantile([quantile_size + i * quantile_size for i in range(self.n_bins - 1)]) - # for column in columns: - # array_split_points[column] = quantile_df[column].to_list() return [ ( @@ -368,14 +364,14 @@ def _parse_from_sql(cls, sql: str) -> tuple[KBinsDiscretizer, str]: Returns: tuple(KBinsDiscretizer, column_label)""" + s = sql[sql.find("(") + 1 : sql.find(")")] + col_label = s[: s.find(",")] + if sql.startswith("ML.QUANTILE_BUCKETIZE"): - s = sql[sql.find("(") + 1 : sql.find(")")] - col_label, num_bins = s.split(", ") + num_bins = s.split(",")[1] return cls(int(num_bins), "quantile"), col_label else: - s = sql[sql.find("(") + 1 : sql.find(")")] array_split_points = s[s.find("[") + 1 : s.find("]")] - col_label = s[: s.find(",")] n_bins = array_split_points.count(",") + 2 return cls(n_bins, "uniform"), col_label From 59aec7ec8f24633cd987cd5a2f0a288eec993204 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 7 May 2024 18:32:32 +0000 Subject: [PATCH 3/3] address more comments --- bigframes/ml/preprocessing.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bigframes/ml/preprocessing.py b/bigframes/ml/preprocessing.py index 5a35e4ec55..954d5adff0 100644 --- a/bigframes/ml/preprocessing.py +++ b/bigframes/ml/preprocessing.py @@ -343,7 +343,7 @@ def _compile_to_sql( for column in columns ] - if self.strategy == "quantile": + elif self.strategy == "quantile": return [ ( @@ -355,12 +355,18 @@ def _compile_to_sql( for column in columns ] + else: + raise ValueError( + f"strategy should be set 'quantile' or 'uniform', but your input is {self.strategy}." + ) + @classmethod def _parse_from_sql(cls, sql: str) -> tuple[KBinsDiscretizer, str]: """Parse SQL to tuple(KBinsDiscretizer, column_label). Args: - sql: SQL string of format "ML.BUCKETIZE({col_label}, array_split_points, FALSE) OVER()" + sql: SQL string of format "ML.BUCKETIZE({col_label}, array_split_points, FALSE)" + or ML.QUANTILE_BUCKETIZE({col_label}, num_bucket) OVER()" Returns: tuple(KBinsDiscretizer, column_label)"""