Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: add ml.model_selection.KFold class #1001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 deletions 74 bigframes/ml/model_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection."""


from typing import cast, List, Union
import inspect
from typing import cast, Generator, List, Union

import bigframes_vendored.sklearn.model_selection._split as vendored_model_selection_split

from bigframes.core import log_adapter
from bigframes.ml import utils
import bigframes.pandas as bpd

Expand All @@ -30,30 +34,6 @@ def train_test_split(
random_state: Union[int, None] = None,
stratify: Union[bpd.Series, None] = None,
) -> List[Union[bpd.DataFrame, bpd.Series]]:
"""Splits dataframes or series into random train and test subsets.

Args:
*arrays (bigframes.dataframe.DataFrame or bigframes.series.Series):
A sequence of BigQuery DataFrames or Series that can be joined on
their indexes.
test_size (default None):
The proportion of the dataset to include in the test split. If
None, this will default to the complement of train_size. If both
are none, it will be set to 0.25.
train_size (default None):
The proportion of the dataset to include in the train split. If
None, this will default to the complement of test_size.
random_state (default None):
A seed to use for randomly choosing the rows of the split. If not
set, a random split will be generated each time.
stratify: (bigframes.series.Series or None, default None):
If not None, data is split in a stratified fashion, using this as the class labels. Each split has the same distribution of the class labels with the original dataset.
Default to None.
Note: By setting the stratify parameter, the memory consumption and generated SQL will be linear to the unique values in the Series. May return errors if the unique values size is too large.

Returns:
List[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]]: A list of BigQuery DataFrames or Series.
"""

# TODO(garrettwu): scikit-learn throws an error when the dataframes don't have the same
# number of rows. We probably want to do something similar. Now the implementation is based
Expand Down Expand Up @@ -123,3 +103,47 @@ def _stratify_split(df: bpd.DataFrame, stratify: bpd.Series) -> List[bpd.DataFra
results.append(joined_df_test[columns])

return results


# Take the user-facing docstring from the vendored train_test_split (normalized
# by inspect.getdoc) rather than keeping a hand-written copy in this module.
train_test_split.__doc__ = inspect.getdoc(
    vendored_model_selection_split.train_test_split
)


@log_adapter.class_logger
class KFold(vendored_model_selection_split.KFold):
    # NOTE(review): no docstrings are defined on this class or its methods,
    # presumably so that documentation is inherited from the vendored base
    # class -- confirm before adding any here.

    def __init__(self, n_splits: int = 5, *, random_state: Union[int, None] = None):
        # A single fold would leave no rows for training, so reject it early.
        if n_splits < 2:
            raise ValueError(f"n_splits must be at least 2. Got {n_splits}")
        self._n_splits = n_splits
        self._random_state = random_state

    def get_n_splits(self) -> int:
        return self._n_splits

    def split(
        self,
        X: Union[bpd.DataFrame, bpd.Series],
        y: Union[bpd.DataFrame, bpd.Series, None] = None,
    ) -> Generator[tuple[Union[bpd.DataFrame, bpd.Series, None], ...], None, None]:
        # Each yielded item is the 4-tuple (X_train, X_test, y_train, y_test);
        # the y entries are None when no y was supplied. The annotation uses a
        # variable-length tuple because a bare ``tuple[T]`` means a 1-tuple.
        X_df = next(utils.convert_to_dataframe(X))
        y_df_or = next(utils.convert_to_dataframe(y)) if y is not None else None
        # Join X and y before splitting so that a row's features and its label
        # always land in the same fold.
        joined_df = X_df.join(y_df_or, how="outer") if y_df_or is not None else X_df

        # One equal fraction per fold.
        fracs = (1 / self._n_splits,) * self._n_splits

        dfs = joined_df._split(fracs=fracs, random_state=self._random_state)

        for i, test_df in enumerate(dfs):
            # Fold i is held out for testing; all remaining folds form the
            # training set.
            train_df = bpd.concat(dfs[:i] + dfs[i + 1 :])

            X_train = train_df[X_df.columns]
            y_train = train_df[y_df_or.columns] if y_df_or is not None else None

            X_test = test_df[X_df.columns]
            y_test = test_df[y_df_or.columns] if y_df_or is not None else None

            # Convert each part back to the kind of object (DataFrame/Series/
            # None) that the caller passed in for X and y.
            yield utils.convert_to_types(
                [X_train, X_test, y_train, y_test], [X, X, y, y]
            )
39 changes: 36 additions & 3 deletions 39 bigframes/ml/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import typing
from typing import Any, Iterable, Literal, Mapping, Optional, Union
from typing import Any, Generator, Iterable, Literal, Mapping, Optional, Union

import bigframes_vendored.constants as constants
from google.cloud import bigquery
Expand All @@ -25,7 +25,7 @@
ArrayType = Union[bpd.DataFrame, bpd.Series]


def convert_to_dataframe(*input: ArrayType) -> Generator[bpd.DataFrame, None, None]:
    """Lazily yield ``_convert_to_dataframe(frame)`` for each argument, in order."""
    for frame in input:
        yield _convert_to_dataframe(frame)


Expand All @@ -39,7 +39,7 @@ def _convert_to_dataframe(frame: ArrayType) -> bpd.DataFrame:
)


def convert_to_series(*input: ArrayType) -> Generator[bpd.Series, None, None]:
    """Lazily yield ``_convert_to_series(frame)`` for each argument, in order."""
    for frame in input:
        yield _convert_to_series(frame)


Expand All @@ -60,6 +60,39 @@ def _convert_to_series(frame: ArrayType) -> bpd.Series:
)


def convert_to_types(
    inputs: Iterable[Union[ArrayType, None]],
    type_instances: Iterable[Union[ArrayType, None]],
) -> tuple[Union[ArrayType, None]]:
    """Convert each input to the kind of object of its paired type instance.

    ``inputs`` and ``type_instances`` are walked in lockstep; every input is
    passed with its counterpart to ``_convert_to_type``. Pairing stops at the
    shorter of the two iterables.

    Returns:
        A tuple with one converted value per processed pair, in input order.
    """
    # NOTE(review): the return annotation spells a 1-tuple although the result
    # has one entry per pair; ``tuple[..., ...]`` looks intended -- confirm.
    return tuple(
        _convert_to_type(value, template)
        for value, template in zip(inputs, type_instances)
    )


def _convert_to_type(
    input: Union[ArrayType, None], type_instance: Union[ArrayType, None]
):
    """Convert ``input`` to the same kind of object as ``type_instance``.

    Returns:
        None when both arguments are None; otherwise the result of
        ``_convert_to_dataframe`` or ``_convert_to_series``, chosen by whether
        ``type_instance`` is a DataFrame or a Series.

    Raises:
        ValueError: If exactly one of the two arguments is None, or if
            ``type_instance`` is neither a DataFrame nor a Series.
    """
    # The None-ness of the value and of its template must agree.
    if type_instance is None and input is None:
        return None
    if type_instance is None:
        raise ValueError(
            f"Trying to convert not None type to None. {constants.FEEDBACK_LINK}"
        )
    if input is None:
        raise ValueError(
            f"Trying to convert None type to not None. {constants.FEEDBACK_LINK}"
        )

    # Dispatch on the template's concrete type; DataFrame is checked first.
    if isinstance(type_instance, bpd.DataFrame):
        converted = _convert_to_dataframe(input)
    elif isinstance(type_instance, bpd.Series):
        converted = _convert_to_series(input)
    else:
        raise ValueError(
            f"Unsupport converting to {type(type_instance)}. {constants.FEEDBACK_LINK}"
        )
    return converted


def parse_model_endpoint(model_endpoint: str) -> tuple[str, Optional[str]]:
"""Parse model endpoint string to model_name and version."""
model_name = model_endpoint
Expand Down
173 changes: 173 additions & 0 deletions 173 tests/system/small/ml/test_model_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math

import pandas as pd
import pytest

Expand Down Expand Up @@ -302,3 +304,174 @@ def test_train_test_split_stratify(df_fixture, request):
test_counts,
check_index_type=False,
)


@pytest.mark.parametrize("n_splits", (3, 5, 10))
def test_KFold_get_n_splits(n_splits):
    """get_n_splits() reports the fold count given to the constructor."""
    splitter = model_selection.KFold(n_splits)
    assert splitter.get_n_splits() == n_splits


@pytest.mark.parametrize(
    "df_fixture",
    ("penguins_df_default_index", "penguins_df_null_index"),
)
@pytest.mark.parametrize("n_splits", (3, 5))
def test_KFold_split(df_fixture, n_splits, request):
    """Each fold yields typed X/y parts sized within one row of an even split."""
    df = request.getfixturevalue(df_fixture)
    feature_columns = ["species", "island", "culmen_length_mm"]
    X = df[feature_columns]
    y = df["body_mass_g"]

    splitter = model_selection.KFold(n_splits=n_splits)

    # A fold's test part holds either floor(N / k) or ceil(N / k) rows; the
    # matching train part holds the remainder.
    total_rows = len(df)
    max_test_rows = math.ceil(total_rows / n_splits)
    min_test_rows = math.floor(total_rows / n_splits)
    max_train_rows = total_rows - min_test_rows
    min_train_rows = total_rows - max_test_rows

    for X_train, X_test, y_train, y_test in splitter.split(X, y):  # type: ignore
        assert isinstance(X_train, bpd.DataFrame)
        assert isinstance(X_test, bpd.DataFrame)
        assert isinstance(y_train, bpd.Series)
        assert isinstance(y_test, bpd.Series)

        # Depending on the iteration, train/test sizes can differ by one row.
        assert (
            X_train.shape == (max_train_rows, 3)
            and y_train.shape == (max_train_rows,)
            and X_test.shape == (min_test_rows, 3)
            and y_test.shape == (min_test_rows,)
        ) or (
            X_train.shape == (min_train_rows, 3)
            and y_train.shape == (min_train_rows,)
            and X_test.shape == (max_test_rows, 3)
            and y_test.shape == (max_test_rows,)
        )


@pytest.mark.parametrize(
    "df_fixture",
    ("penguins_df_default_index", "penguins_df_null_index"),
)
@pytest.mark.parametrize("n_splits", (3, 5))
def test_KFold_split_X_only(df_fixture, n_splits, request):
    """Without y, each fold yields X parts only and None for both y parts."""
    df = request.getfixturevalue(df_fixture)
    feature_columns = ["species", "island", "culmen_length_mm"]
    X = df[feature_columns]

    splitter = model_selection.KFold(n_splits=n_splits)

    # A fold's test part holds either floor(N / k) or ceil(N / k) rows; the
    # matching train part holds the remainder.
    total_rows = len(df)
    max_test_rows = math.ceil(total_rows / n_splits)
    min_test_rows = math.floor(total_rows / n_splits)
    max_train_rows = total_rows - min_test_rows
    min_train_rows = total_rows - max_test_rows

    for X_train, X_test, y_train, y_test in splitter.split(X, y=None):  # type: ignore
        assert isinstance(X_train, bpd.DataFrame)
        assert isinstance(X_test, bpd.DataFrame)
        assert y_train is None
        assert y_test is None

        # Depending on the iteration, train/test sizes can differ by one row.
        assert (
            X_train.shape == (max_train_rows, 3)
            and X_test.shape == (min_test_rows, 3)
        ) or (
            X_train.shape == (min_train_rows, 3)
            and X_test.shape == (max_test_rows, 3)
        )


def test_KFold_seeded_correct_rows(session, penguins_pandas_df_default_index):
    """With a fixed seed, the first fold contains exactly the expected rows."""
    kf = model_selection.KFold(random_state=42)
    # Note that we're using `penguins_pandas_df_default_index` as this test depends
    # on a stable row order being present end to end
    # filter down to the chunkiest penguins, to keep our test code a reasonable size
    heavy_mask = penguins_pandas_df_default_index.body_mass_g > 5500
    all_data = penguins_pandas_df_default_index[heavy_mask]

    # Note that bigframes loses the index if it doesn't have a name
    all_data.index.name = "rowindex"

    df = session.read_pandas(all_data)

    feature_columns = ["species", "island", "culmen_length_mm"]
    X = df[feature_columns]
    y = df["body_mass_g"]
    X_train, X_test, y_train, y_test = next(kf.split(X, y))  # type: ignore

    # Download each part and sort by index so comparisons ignore row order.
    X_train_sorted, X_test_sorted, y_train_sorted, y_test_sorted = [
        part.to_pandas().sort_index()
        for part in (X_train, X_test, y_train, y_test)
    ]

    expected_train_rows = [
        144, 146, 148, 161, 168, 183, 217, 221, 225, 226, 237,
        244, 257, 262, 264, 266, 267, 269, 278, 289, 290, 291,
    ]
    expected_test_rows = [186, 240, 245, 260, 263, 268]
    train_index: pd.Index = pd.Index(
        expected_train_rows, dtype="Int64", name="rowindex"
    )
    test_index: pd.Index = pd.Index(
        expected_test_rows, dtype="Int64", name="rowindex"
    )

    checks = (
        (X_train_sorted, train_index),
        (X_test_sorted, test_index),
        (y_train_sorted, train_index),
        (y_test_sorted, test_index),
    )
    for actual, expected_index in checks:
        pd.testing.assert_index_equal(actual.index, expected_index)
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.