Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: add ml.preprocessing.MinMaxScaler #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 bigframes/ml/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
preprocessing.OneHotEncoder,
preprocessing.StandardScaler,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
]

Expand Down
12 changes: 11 additions & 1 deletion 12 bigframes/ml/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, steps: List[Tuple[str, base.BaseEstimator]]):
preprocessing.StandardScaler,
preprocessing.OneHotEncoder,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
),
):
Expand Down Expand Up @@ -149,6 +150,7 @@ def _extract_as_column_transformer(
preprocessing.OneHotEncoder,
preprocessing.StandardScaler,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
],
Union[str, List[str]],
Expand Down Expand Up @@ -177,10 +179,17 @@ def _extract_as_column_transformer(
elif transform_sql.startswith("ML.MAX_ABS_SCALER"):
transformers.append(
(
"max_abs_encoder",
"max_abs_scaler",
*preprocessing.MaxAbsScaler._parse_from_sql(transform_sql),
)
)
elif transform_sql.startswith("ML.MIN_MAX_SCALER"):
transformers.append(
(
"min_max_scaler",
*preprocessing.MinMaxScaler._parse_from_sql(transform_sql),
)
)
elif transform_sql.startswith("ML.LABEL_ENCODER"):
transformers.append(
(
Expand All @@ -203,6 +212,7 @@ def _merge_column_transformer(
preprocessing.StandardScaler,
preprocessing.OneHotEncoder,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
]:
"""Try to merge the column transformer to a simple transformer."""
Expand Down
84 changes: 82 additions & 2 deletions 84 bigframes/ml/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:

@classmethod
def _parse_from_sql(cls, sql: str) -> tuple[MaxAbsScaler, str]:
    """Parse SQL to tuple(MaxAbsScaler, column_label).

    Args:
        sql: SQL string of format "ML.MAX_ABS_SCALER({col_label}) OVER()"

    Returns:
        tuple(MaxAbsScaler, column_label)"""
    # The column label sits between the first "(" and the first ")".
    open_paren = sql.find("(")
    close_paren = sql.find(")")
    return cls(), sql[open_paren + 1 : close_paren]

Expand Down Expand Up @@ -187,6 +187,86 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
)


class MinMaxScaler(
    base.Transformer,
    third_party.bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler,
):
    # Reuse the vendored sklearn docstring so the public API docs match sklearn's.
    __doc__ = (
        third_party.bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler.__doc__
    )

    def __init__(self):
        # BQML model backing this transformer; populated by fit(), None before.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    # TODO(garrettwu): implement __hash__
    def __eq__(self, other: Any) -> bool:
        # Two scalers compare equal iff they wrap the same fitted BQML model.
        return type(other) is MinMaxScaler and self._bqml_model == other._bqml_model

    def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
        """Compile this transformer to a list of SQL expressions that can be included in
        a BQML TRANSFORM clause

        Args:
            columns: a list of column names to transform

        Returns: a list of tuples of (sql_expression, output_name)"""
        # Output columns are named "min_max_scaled_{input_column}".
        return [
            (
                self._base_sql_generator.ml_min_max_scaler(
                    column, f"min_max_scaled_{column}"
                ),
                f"min_max_scaled_{column}",
            )
            for column in columns
        ]

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[MinMaxScaler, str]:
        """Parse SQL to tuple(MinMaxScaler, column_label).

        Args:
            sql: SQL string of format "ML.MIN_MAX_SCALER({col_label}) OVER()"

        Returns:
            tuple(MinMaxScaler, column_label)"""
        # The column label sits between the first "(" and the first ")".
        col_label = sql[sql.find("(") + 1 : sql.find(")")]
        return cls(), col_label

    def fit(
        self,
        X: Union[bpd.DataFrame, bpd.Series],
        y=None,  # ignored
    ) -> MinMaxScaler:
        """Fit the scaler by creating a transform-only BQML model over X.

        Args:
            X: DataFrame or Series of numeric feature columns.
            y: Ignored; present for sklearn fit() API compatibility.

        Returns:
            self, fitted."""
        (X,) = utils.convert_to_dataframe(X)

        compiled_transforms = self._compile_to_sql(X.columns.tolist())
        transform_sqls = [transform_sql for transform_sql, _ in compiled_transforms]

        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=transform_sqls,
        )

        # The schema of TRANSFORM output is not available in the model API, so save it during fitting
        self._output_names = [name for _, name in compiled_transforms]
        return self

    def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
        """Apply the fitted min-max scaling to X.

        Args:
            X: DataFrame or Series to transform.

        Returns:
            DataFrame containing only the scaled columns saved during fit().

        Raises:
            RuntimeError: if called before fit()."""
        if not self._bqml_model:
            raise RuntimeError("Must be fitted before transform")

        (X,) = utils.convert_to_dataframe(X)

        df = self._bqml_model.transform(X)
        return typing.cast(
            bpd.DataFrame,
            df[self._output_names],
        )


class OneHotEncoder(
base.Transformer,
third_party.bigframes_vendored.sklearn.preprocessing._encoder.OneHotEncoder,
Expand Down
4 changes: 4 additions & 0 deletions 4 bigframes/ml/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def ml_max_abs_scaler(self, numeric_expr_sql: str, name: str) -> str:
"""Encode ML.MAX_ABS_SCALER for BQML"""
return f"""ML.MAX_ABS_SCALER({numeric_expr_sql}) OVER() AS {name}"""

def ml_min_max_scaler(self, numeric_expr_sql: str, name: str) -> str:
    """Encode ML.MIN_MAX_SCALER for BQML"""
    # Window over the whole table (empty OVER clause), aliased to the output name.
    scaler_expr = f"ML.MIN_MAX_SCALER({numeric_expr_sql}) OVER()"
    return f"{scaler_expr} AS {name}"

def ml_one_hot_encoder(
self,
numeric_expr_sql: str,
Expand Down
46 changes: 43 additions & 3 deletions 46 tests/system/large/ml/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,11 @@ def test_pipeline_columntransformer_fit_predict(session, penguins_df_default_ind
preprocessing.MaxAbsScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"min_max_scale",
preprocessing.MinMaxScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"label",
preprocessing.LabelEncoder(),
Expand Down Expand Up @@ -647,6 +652,11 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id
preprocessing.MaxAbsScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"min_max_scale",
preprocessing.MinMaxScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"label",
preprocessing.LabelEncoder(),
Expand Down Expand Up @@ -684,9 +694,11 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id
"species",
),
("standard_scaler", preprocessing.StandardScaler(), "culmen_length_mm"),
("max_abs_encoder", preprocessing.MaxAbsScaler(), "culmen_length_mm"),
("max_abs_scaler", preprocessing.MaxAbsScaler(), "culmen_length_mm"),
("min_max_scaler", preprocessing.MinMaxScaler(), "culmen_length_mm"),
("standard_scaler", preprocessing.StandardScaler(), "flipper_length_mm"),
("max_abs_encoder", preprocessing.MaxAbsScaler(), "flipper_length_mm"),
("max_abs_scaler", preprocessing.MaxAbsScaler(), "flipper_length_mm"),
("min_max_scaler", preprocessing.MinMaxScaler(), "flipper_length_mm"),
]

assert transformers == expected
Expand Down Expand Up @@ -743,14 +755,42 @@ def test_pipeline_max_abs_scaler_to_gbq(penguins_df_default_index, dataset_id):
pl.fit(X_train, y_train)

pl_loaded = pl.to_gbq(
f"{dataset_id}.test_penguins_pipeline_standard_scaler", replace=True
f"{dataset_id}.test_penguins_pipeline_min_max_scaler", replace=True
)
assert isinstance(pl_loaded._transform, preprocessing.MaxAbsScaler)

assert isinstance(pl_loaded._estimator, linear_model.LinearRegression)
assert pl_loaded._estimator.fit_intercept is False


def test_pipeline_min_max_scaler_to_gbq(penguins_df_default_index, dataset_id):
    """Round-trip a MinMaxScaler pipeline through BigQuery and check it reloads intact."""
    steps = [
        ("transform", preprocessing.MinMaxScaler()),
        ("estimator", linear_model.LinearRegression(fit_intercept=False)),
    ]
    pipe = pipeline.Pipeline(steps)

    training_data = penguins_df_default_index.dropna()
    feature_columns = [
        "culmen_length_mm",
        "culmen_depth_mm",
        "flipper_length_mm",
    ]
    pipe.fit(training_data[feature_columns], training_data[["body_mass_g"]])

    reloaded = pipe.to_gbq(
        f"{dataset_id}.test_penguins_pipeline_min_max_scaler", replace=True
    )

    # The persisted model should reconstruct both pipeline stages with their options.
    assert isinstance(reloaded._transform, preprocessing.MinMaxScaler)
    assert isinstance(reloaded._estimator, linear_model.LinearRegression)
    assert reloaded._estimator.fit_intercept is False


def test_pipeline_one_hot_encoder_to_gbq(penguins_df_default_index, dataset_id):
pl = pipeline.Pipeline(
[
Expand Down
93 changes: 93 additions & 0 deletions 93 tests/system/small/ml/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,99 @@ def test_max_abs_scaler_series_normalizes(penguins_df_default_index, new_penguin
pd.testing.assert_frame_equal(result, expected, rtol=1e-3)


def test_min_max_scaler_normalizeds_fit_transform(new_penguins_df):
    # NOTE(review): "normalizeds" in the name looks like a typo for "normalized";
    # kept as-is so test selection/history by name is unaffected.
    feature_columns = ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]
    transformer = bigframes.ml.preprocessing.MinMaxScaler()
    observed = transformer.fit_transform(new_penguins_df[feature_columns]).to_pandas()

    # TODO: bug? feature columns seem to be in nondeterministic random order
    # workaround: sort columns by name. Can't repro it in pantheon, so could
    # be a bigframes issue...
    observed = observed.reindex(sorted(observed.columns), axis=1)

    want = pd.DataFrame(
        {
            "min_max_scaled_culmen_depth_mm": [1.0, 0.0, 0.5625],
            "min_max_scaled_culmen_length_mm": [1.0, 0.375, 0.0],
            "min_max_scaled_flipper_length_mm": [1.0, 0.0, 0.466667],
        },
        dtype="Float64",
        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
    )

    pd.testing.assert_frame_equal(observed, want, rtol=1e-3)


def test_min_max_scaler_series_normalizes(penguins_df_default_index, new_penguins_df):
    """Fit on a single Series, then transform both the fit data and unseen rows."""
    transformer = bigframes.ml.preprocessing.MinMaxScaler()
    transformer.fit(penguins_df_default_index["culmen_length_mm"])

    observed = transformer.transform(
        penguins_df_default_index["culmen_length_mm"]
    ).to_pandas()

    # If minmax-scaled correctly, min should be 0 and max should be 1.
    for col in observed.columns:
        assert math.isclose(observed[col].max(), 1.0, abs_tol=1e-3)
        assert math.isclose(observed[col].min(), 0.0, abs_tol=1e-3)

    observed = transformer.transform(new_penguins_df).to_pandas()

    # TODO: bug? feature columns seem to be in nondeterministic random order
    # workaround: sort columns by name. Can't repro it in pantheon, so could
    # be a bigframes issue...
    observed = observed.reindex(sorted(observed.columns), axis=1)

    want = pd.DataFrame(
        {
            "min_max_scaled_culmen_length_mm": [0.269091, 0.232727, 0.210909],
        },
        dtype="Float64",
        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
    )

    pd.testing.assert_frame_equal(observed, want, rtol=1e-3)


def test_min_max_scaler_normalizes(penguins_df_default_index, new_penguins_df):
    """Fit on the full penguins table, then transform both the fit data and unseen rows."""
    # TODO(http://b/292431644): add a second test that compares output to
    # sklearn.preprocessing.MinMaxScaler, when BQML's change is in prod.
    scaler = bigframes.ml.preprocessing.MinMaxScaler()
    scaler.fit(
        penguins_df_default_index[
            ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]
        ]
    )

    result = scaler.transform(
        penguins_df_default_index[
            ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]
        ]
    ).to_pandas()

    # If minmax-scaled correctly, min should be 0 and max should be 1.
    for column in result.columns:
        assert math.isclose(result[column].max(), 1.0, abs_tol=1e-3)
        assert math.isclose(result[column].min(), 0.0, abs_tol=1e-3)

    result = scaler.transform(new_penguins_df).to_pandas()

    # TODO: bug? feature columns seem to be in nondeterministic random order
    # workaround: sort columns by name. Can't repro it in pantheon, so could
    # be a bigframes issue...
    result = result.reindex(sorted(result.columns), axis=1)

    expected = pd.DataFrame(
        {
            "min_max_scaled_culmen_depth_mm": [0.678571, 0.4880952, 0.595238],
            "min_max_scaled_culmen_length_mm": [0.269091, 0.232727, 0.210909],
            "min_max_scaled_flipper_length_mm": [0.40678, 0.152542, 0.271186],
        },
        dtype="Float64",
        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
    )

    pd.testing.assert_frame_equal(result, expected, rtol=1e-3)


def test_one_hot_encoder_default_params(new_penguins_df):
encoder = bigframes.ml.preprocessing.OneHotEncoder()
encoder.fit(new_penguins_df[["species", "sex"]])
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.