From 20666713d80add6ba07351131b20956af1184d4c Mon Sep 17 00:00:00 2001 From: Garrett Wu Date: Thu, 19 Sep 2024 23:08:07 +0000 Subject: [PATCH] feat: add ml.model_selection.KFold class --- bigframes/ml/model_selection.py | 74 +++++--- bigframes/ml/utils.py | 39 +++- tests/system/small/ml/test_model_selection.py | 173 ++++++++++++++++++ .../sklearn/model_selection/_split.py | 109 +++++++++++ 4 files changed, 367 insertions(+), 28 deletions(-) create mode 100644 third_party/bigframes_vendored/sklearn/model_selection/_split.py diff --git a/bigframes/ml/model_selection.py b/bigframes/ml/model_selection.py index a6553d13dc..e4c41b2a39 100644 --- a/bigframes/ml/model_selection.py +++ b/bigframes/ml/model_selection.py @@ -17,8 +17,12 @@ https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection.""" -from typing import cast, List, Union +import inspect +from typing import cast, Generator, List, Union +import bigframes_vendored.sklearn.model_selection._split as vendored_model_selection_split + +from bigframes.core import log_adapter from bigframes.ml import utils import bigframes.pandas as bpd @@ -30,30 +34,6 @@ def train_test_split( random_state: Union[int, None] = None, stratify: Union[bpd.Series, None] = None, ) -> List[Union[bpd.DataFrame, bpd.Series]]: - """Splits dataframes or series into random train and test subsets. - - Args: - *arrays (bigframes.dataframe.DataFrame or bigframes.series.Series): - A sequence of BigQuery DataFrames or Series that can be joined on - their indexes. - test_size (default None): - The proportion of the dataset to include in the test split. If - None, this will default to the complement of train_size. If both - are none, it will be set to 0.25. - train_size (default None): - The proportion of the dataset to include in the train split. If - None, this will default to the complement of test_size. - random_state (default None): - A seed to use for randomly choosing the rows of the split. If not - set, a random split will be generated each time. - stratify: (bigframes.series.Series or None, default None): - If not None, data is split in a stratified fashion, using this as the class labels. Each split has the same distribution of the class labels with the original dataset. - Default to None. - Note: By setting the stratify parameter, the memory consumption and generated SQL will be linear to the unique values in the Series. May return errors if the unique values size is too large. - - Returns: - List[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]]: A list of BigQuery DataFrames or Series. - """ # TODO(garrettwu): scikit-learn throws an error when the dataframes don't have the same # number of rows. We probably want to do something similar. Now the implementation is based @@ -123,3 +103,47 @@ def _stratify_split(df: bpd.DataFrame, stratify: bpd.Series) -> List[bpd.DataFra results.append(joined_df_test[columns]) return results + + +train_test_split.__doc__ = inspect.getdoc( + vendored_model_selection_split.train_test_split +) + + +@log_adapter.class_logger +class KFold(vendored_model_selection_split.KFold): + def __init__(self, n_splits: int = 5, *, random_state: Union[int, None] = None): + if n_splits < 2: + raise ValueError(f"n_splits must be at least 2. Got {n_splits}") + self._n_splits = n_splits + self._random_state = random_state + + def get_n_splits(self) -> int: + return self._n_splits + + def split( + self, + X: Union[bpd.DataFrame, bpd.Series], + y: Union[bpd.DataFrame, bpd.Series, None] = None, + ) -> Generator[tuple[Union[bpd.DataFrame, bpd.Series, None]], None, None]: + X_df = next(utils.convert_to_dataframe(X)) + y_df_or = next(utils.convert_to_dataframe(y)) if y is not None else None + joined_df = X_df.join(y_df_or, how="outer") if y_df_or is not None else X_df + + fracs = (1 / self._n_splits,) * self._n_splits + + dfs = joined_df._split(fracs=fracs, random_state=self._random_state) + + for i in range(len(dfs)): + train_df = bpd.concat(dfs[:i] + dfs[i + 1 :]) + test_df = dfs[i] + + X_train = train_df[X_df.columns] + y_train = train_df[y_df_or.columns] if y_df_or is not None else None + + X_test = test_df[X_df.columns] + y_test = test_df[y_df_or.columns] if y_df_or is not None else None + + yield utils.convert_to_types( + [X_train, X_test, y_train, y_test], [X, X, y, y] + ) diff --git a/bigframes/ml/utils.py b/bigframes/ml/utils.py index d754b1d002..96f0bc31e9 100644 --- a/bigframes/ml/utils.py +++ b/bigframes/ml/utils.py @@ -13,7 +13,7 @@ # limitations under the License. import typing -from typing import Any, Iterable, Literal, Mapping, Optional, Union +from typing import Any, Generator, Iterable, Literal, Mapping, Optional, Union import bigframes_vendored.constants as constants from google.cloud import bigquery @@ -25,7 +25,7 @@ ArrayType = Union[bpd.DataFrame, bpd.Series] -def convert_to_dataframe(*input: ArrayType) -> Iterable[bpd.DataFrame]: +def convert_to_dataframe(*input: ArrayType) -> Generator[bpd.DataFrame, None, None]: return (_convert_to_dataframe(frame) for frame in input) @@ -39,7 +39,7 @@ def _convert_to_dataframe(frame: ArrayType) -> bpd.DataFrame: ) -def convert_to_series(*input: ArrayType) -> Iterable[bpd.Series]: +def convert_to_series(*input: ArrayType) -> Generator[bpd.Series, None, None]: return (_convert_to_series(frame) for frame in input) @@ -60,6 +60,39 @@ def _convert_to_series(frame: ArrayType) -> bpd.Series: ) +def convert_to_types( + inputs: Iterable[Union[ArrayType, None]], + type_instances: Iterable[Union[ArrayType, None]], +) -> tuple[Union[ArrayType, None]]: + """Convert the DF, Series and None types of the input to corresponding type_instances types.""" + results = [] + for input, type_instance in zip(inputs, type_instances): + results.append(_convert_to_type(input, type_instance)) + return tuple(results) + + +def _convert_to_type( + input: Union[ArrayType, None], type_instance: Union[ArrayType, None] +): + if type_instance is None: + if input is not None: + raise ValueError( + f"Trying to convert not None type to None. {constants.FEEDBACK_LINK}" + ) + return None + if input is None: + raise ValueError( + f"Trying to convert None type to not None. {constants.FEEDBACK_LINK}" + ) + if isinstance(type_instance, bpd.DataFrame): + return _convert_to_dataframe(input) + if isinstance(type_instance, bpd.Series): + return _convert_to_series(input) + raise ValueError( + f"Unsupport converting to {type(type_instance)}. {constants.FEEDBACK_LINK}" + ) + + def parse_model_endpoint(model_endpoint: str) -> tuple[str, Optional[str]]: """Parse model endpoint string to model_name and version.""" model_name = model_endpoint diff --git a/tests/system/small/ml/test_model_selection.py b/tests/system/small/ml/test_model_selection.py index 47529565b7..e6b5f8cdc2 100644 --- a/tests/system/small/ml/test_model_selection.py +++ b/tests/system/small/ml/test_model_selection.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math + import pandas as pd import pytest @@ -302,3 +304,174 @@ def test_train_test_split_stratify(df_fixture, request): test_counts, check_index_type=False, ) + + +@pytest.mark.parametrize( + "n_splits", + (3, 5, 10), +) +def test_KFold_get_n_splits(n_splits): + kf = model_selection.KFold(n_splits) + assert kf.get_n_splits() == n_splits + + +@pytest.mark.parametrize( + "df_fixture", + ("penguins_df_default_index", "penguins_df_null_index"), +) +@pytest.mark.parametrize( + "n_splits", + (3, 5), +) +def test_KFold_split(df_fixture, n_splits, request): + df = request.getfixturevalue(df_fixture) + + kf = model_selection.KFold(n_splits=n_splits) + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + ] + ] + y = df["body_mass_g"] + + len_test_upper, len_test_lower = math.ceil(len(df) / n_splits), math.floor( + len(df) / n_splits + ) + len_train_upper, len_train_lower = ( + len(df) - len_test_lower, + len(df) - len_test_upper, + ) + + for X_train, X_test, y_train, y_test in kf.split(X, y): # type: ignore + assert isinstance(X_train, bpd.DataFrame) + assert isinstance(X_test, bpd.DataFrame) + assert isinstance(y_train, bpd.Series) + assert isinstance(y_test, bpd.Series) + + # Depend on the iteration, train/test can +-1 in size. + assert ( + X_train.shape == (len_train_upper, 3) + and y_train.shape == (len_train_upper,) + and X_test.shape == (len_test_lower, 3) + and y_test.shape == (len_test_lower,) + ) or ( + X_train.shape == (len_train_lower, 3) + and y_train.shape == (len_train_lower,) + and X_test.shape == (len_test_upper, 3) + and y_test.shape == (len_test_upper,) + ) + + +@pytest.mark.parametrize( + "df_fixture", + ("penguins_df_default_index", "penguins_df_null_index"), +) +@pytest.mark.parametrize( + "n_splits", + (3, 5), +) +def test_KFold_split_X_only(df_fixture, n_splits, request): + df = request.getfixturevalue(df_fixture) + + kf = model_selection.KFold(n_splits=n_splits) + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + ] + ] + + len_test_upper, len_test_lower = math.ceil(len(df) / n_splits), math.floor( + len(df) / n_splits + ) + len_train_upper, len_train_lower = ( + len(df) - len_test_lower, + len(df) - len_test_upper, + ) + + for X_train, X_test, y_train, y_test in kf.split(X, y=None): # type: ignore + assert isinstance(X_train, bpd.DataFrame) + assert isinstance(X_test, bpd.DataFrame) + assert y_train is None + assert y_test is None + + # Depend on the iteration, train/test can +-1 in size. + assert ( + X_train.shape == (len_train_upper, 3) + and X_test.shape == (len_test_lower, 3) + ) or ( + X_train.shape == (len_train_lower, 3) + and X_test.shape == (len_test_upper, 3) + ) + + +def test_KFold_seeded_correct_rows(session, penguins_pandas_df_default_index): + kf = model_selection.KFold(random_state=42) + # Note that we're using `penguins_pandas_df_default_index` as this test depends + # on a stable row order being present end to end + # filter down to the chunkiest penguins, to keep our test code a reasonable size + all_data = penguins_pandas_df_default_index[ + penguins_pandas_df_default_index.body_mass_g > 5500 + ] + + # Note that bigframes loses the index if it doesn't have a name + all_data.index.name = "rowindex" + + df = session.read_pandas(all_data) + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + ] + ] + y = df["body_mass_g"] + X_train, X_test, y_train, y_test = next(kf.split(X, y)) # type: ignore + + X_train_sorted = X_train.to_pandas().sort_index() + X_test_sorted = X_test.to_pandas().sort_index() + y_train_sorted = y_train.to_pandas().sort_index() + y_test_sorted = y_test.to_pandas().sort_index() + + train_index: pd.Index = pd.Index( + [ + 144, + 146, + 148, + 161, + 168, + 183, + 217, + 221, + 225, + 226, + 237, + 244, + 257, + 262, + 264, + 266, + 267, + 269, + 278, + 289, + 290, + 291, + ], + dtype="Int64", + name="rowindex", + ) + test_index: pd.Index = pd.Index( + [186, 240, 245, 260, 263, 268], dtype="Int64", name="rowindex" + ) + + pd.testing.assert_index_equal(X_train_sorted.index, train_index) + pd.testing.assert_index_equal(X_test_sorted.index, test_index) + pd.testing.assert_index_equal(y_train_sorted.index, train_index) + pd.testing.assert_index_equal(y_test_sorted.index, test_index) diff --git a/third_party/bigframes_vendored/sklearn/model_selection/_split.py b/third_party/bigframes_vendored/sklearn/model_selection/_split.py new file mode 100644 index 0000000000..280962473e --- /dev/null +++ b/third_party/bigframes_vendored/sklearn/model_selection/_split.py @@ -0,0 +1,109 @@ +""" +The :mod:`sklearn.model_selection._split` module includes classes and +functions to split the data based on a preset strategy. +""" + +# Author: Alexandre Gramfort +# Gael Varoquaux +# Olivier Grisel +# Raghav RV +# Leandro Hermida +# Rodion Martynov +# License: BSD 3 clause + + +from abc import ABCMeta + +from bigframes import constants + + +class _BaseKFold(metaclass=ABCMeta): + """Base class for K-Fold cross-validators.""" + + def split(self, X, y=None): + """Generate indices to split data into training and test set. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series): + BigFrames DataFrame or Series of shape (n_samples, n_features) + Training data, where `n_samples` is the number of samples + and `n_features` is the number of features. + + y (bigframes.dataframe.DataFrame, bigframes.series.Series or None): + BigFrames DataFrame, Series of shape (n_samples,) or None. + The target variable for supervised learning problems. Default to None. + + Yields: + X_train (bigframes.dataframe.DataFrame or bigframes.series.Series): + The training data for that split. + + X_test (bigframes.dataframe.DataFrame or bigframes.series.Series): + The testing data for that split. + + y_train (bigframes.dataframe.DataFrame, bigframes.series.Series or None): + The training label for that split. + + y_test (bigframes.dataframe.DataFrame, bigframes.series.Series or None): + The testing label for that split. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def get_n_splits(self): + """Returns the number of splitting iterations in the cross-validator. + + Returns: + int: the number of splitting iterations in the cross-validator. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + +class KFold(_BaseKFold): + """K-Fold cross-validator. + + Split data in train/test sets. Split dataset into k consecutive folds. + + Each fold is then used once as a validation while the k - 1 remaining + folds form the training set. + + Args: + n_splits (int): + Number of folds. Must be at least 2. Default to 5. + + random_state (Optional[int]): + A seed to use for randomly choosing the rows of the split. If not + set, a random split will be generated each time. Default to None. + """ + + +def train_test_split( + *arrays, + test_size=None, + train_size=None, + random_state=None, + stratify=None, +): + """Splits dataframes or series into random train and test subsets. + + Args: + *arrays (bigframes.dataframe.DataFrame or bigframes.series.Series): + A sequence of BigQuery DataFrames or Series that can be joined on + their indexes. + test_size (default None): + The proportion of the dataset to include in the test split. If + None, this will default to the complement of train_size. If both + are none, it will be set to 0.25. + train_size (default None): + The proportion of the dataset to include in the train split. If + None, this will default to the complement of test_size. + random_state (default None): + A seed to use for randomly choosing the rows of the split. If not + set, a random split will be generated each time. + stratify: (bigframes.series.Series or None, default None): + If not None, data is split in a stratified fashion, using this as the class labels. Each split has the same distribution of the class labels with the original dataset. + Default to None. + Note: By setting the stratify parameter, the memory consumption and generated SQL will be linear to the unique values in the Series. May return errors if the unique values size is too large. + + Returns: + List[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]]: A list of BigQuery DataFrames or Series. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)