Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: Add DataFrame.corrwith method #1315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions 16 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,7 @@ def merge(

def _align_both_axes(
self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
# Join rows
aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
# join columns schema
Expand All @@ -2161,7 +2161,7 @@ def _align_both_axes(
columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
else:
columns, lcol_indexer, rcol_indexer = self.column_labels.join(
other.column_labels, how="outer", return_indexers=True
other.column_labels, how=how, return_indexers=True
)
lcol_indexer = (
lcol_indexer if (lcol_indexer is not None) else range(len(columns))
Expand All @@ -2183,11 +2183,11 @@ def _align_both_axes(

left_inputs = [left_input_lookup(i) for i in lcol_indexer]
right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore

def _align_axis_0(
self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.DerefOp, ex.DerefOp]]]:
assert len(other.value_columns) == 1
aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)

Expand All @@ -2203,7 +2203,7 @@ def _align_axis_0(

def _align_series_block_axis_1(
self, other: Block, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
assert len(other.value_columns) == 1
if other._transpose_cache is None:
raise ValueError(
Expand Down Expand Up @@ -2244,11 +2244,11 @@ def _align_series_block_axis_1(

left_inputs = [left_input_lookup(i) for i in lcol_indexer]
right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
return aligned_block, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore

def _align_pd_series_axis_1(
self, other: pd.Series, how: str
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
if self.column_labels.equals(other.index):
columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
else:
Expand All @@ -2275,7 +2275,7 @@ def _align_pd_series_axis_1(

left_inputs = [left_input_lookup(i) for i in lcol_indexer]
right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
return self, columns, tuple(zip(left_inputs, right_inputs))
return self, columns, tuple(zip(left_inputs, right_inputs)) # type: ignore

def _apply_binop(
self,
Expand Down
3 changes: 3 additions & 0 deletions 3 bigframes/core/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,6 @@ def deterministic(self) -> bool:
return (
all(input.deterministic for input in self.inputs) and self.op.deterministic
)


RefOrConstant = Union[DerefOp, ScalarConstantExpression]
42 changes: 42 additions & 0 deletions 42 bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,48 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:

return result

def corrwith(
self,
other: typing.Union[DataFrame, bigframes.series.Series],
*,
numeric_only: bool = False,
):
other_frame = other if isinstance(other, DataFrame) else other.to_frame()
if numeric_only:
l_frame = self._drop_non_numeric()
r_frame = other_frame._drop_non_numeric()
else:
l_frame = self._raise_on_non_numeric("corrwith")
r_frame = other_frame._raise_on_non_numeric("corrwith")

l_block = l_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block
r_block = r_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block

if isinstance(other, DataFrame):
block, labels, expr_pairs = l_block._align_both_axes(r_block, how="inner")
else:
assert isinstance(other, bigframes.series.Series)
block, labels, expr_pairs = l_block._align_axis_0(r_block, how="inner")

na_cols = l_block.column_labels.join(
r_block.column_labels, how="outer"
).difference(labels)

block, _ = block.aggregate(
aggregations=tuple(
ex.BinaryAggregation(agg_ops.CorrOp(), left_ex, right_ex)
for left_ex, right_ex in expr_pairs
),
column_labels=labels,
)
block = block.project_exprs(
(ex.const(float("nan")),) * len(na_cols), labels=na_cols
)
block = block.transpose(
original_row_index=pandas.Index([None]), single_row_mode=True
)
return bigframes.pandas.Series(block)

def to_arrow(
self,
*,
Expand Down
66 changes: 66 additions & 0 deletions 66 tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,72 @@ def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
)


def test_df_corrwith_df(scalars_dfs_maybe_ordered):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered

l_cols = ["int64_col", "float64_col", "int64_too"]
r_cols = ["int64_too", "float64_col"]

bf_result = scalars_df[l_cols].corrwith(scalars_df[r_cols]).to_pandas()
pd_result = scalars_pandas_df[l_cols].corrwith(scalars_pandas_df[r_cols])

# BigFrames and Pandas differ in their data type handling:
# - Column types: BigFrames uses Float64, Pandas uses float64.
# - Index types: BigFrames uses strign, Pandas uses object.
pd.testing.assert_series_equal(
bf_result, pd_result, check_dtype=False, check_index_type=False
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should add two more cases:

numeric_only = True, two dfs contain only numeric columns => computation proceeds successfully
numeric_only = True, one of the dfs has a non-numeric column => an error is raised.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


def test_df_corrwith_df_numeric_only(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs

l_cols = ["int64_col", "float64_col", "int64_too", "string_col"]
r_cols = ["int64_too", "float64_col", "bool_col"]

bf_result = (
scalars_df[l_cols].corrwith(scalars_df[r_cols], numeric_only=True).to_pandas()
)
pd_result = scalars_pandas_df[l_cols].corrwith(
scalars_pandas_df[r_cols], numeric_only=True
)

# BigFrames and Pandas differ in their data type handling:
# - Column types: BigFrames uses Float64, Pandas uses float64.
# - Index types: BigFrames uses strign, Pandas uses object.
pd.testing.assert_series_equal(
bf_result, pd_result, check_dtype=False, check_index_type=False
)


def test_df_corrwith_df_non_numeric_error(scalars_dfs):
scalars_df, _ = scalars_dfs

l_cols = ["int64_col", "float64_col", "int64_too", "string_col"]
r_cols = ["int64_too", "float64_col", "bool_col"]

with pytest.raises(NotImplementedError):
scalars_df[l_cols].corrwith(scalars_df[r_cols], numeric_only=False)


@skip_legacy_pandas
def test_df_corrwith_series(scalars_dfs_maybe_ordered):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered

l_cols = ["int64_col", "float64_col", "int64_too"]
r_col = "float64_col"

bf_result = scalars_df[l_cols].corrwith(scalars_df[r_col]).to_pandas()
pd_result = scalars_pandas_df[l_cols].corrwith(scalars_pandas_df[r_col])

# BigFrames and Pandas differ in their data type handling:
# - Column types: BigFrames uses Float64, Pandas uses float64.
# - Index types: BigFrames uses strign, Pandas uses object.
pd.testing.assert_series_equal(
bf_result, pd_result, check_dtype=False, check_index_type=False
)


@pytest.mark.parametrize(
("op"),
[
Expand Down
41 changes: 41 additions & 0 deletions 41 third_party/bigframes_vendored/pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -4146,6 +4146,47 @@ def cov(self, *, numeric_only) -> DataFrame:
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def corrwith(
self,
other,
*,
numeric_only: bool = False,
):
"""
Compute pairwise correlation.

Pairwise correlation is computed between rows or columns of
DataFrame with rows or columns of Series or DataFrame. DataFrames
are first aligned along both axes before computing the
correlations.

**Examples:**
>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None

>>> index = ["a", "b", "c", "d", "e"]
>>> columns = ["one", "two", "three", "four"]
>>> df1 = bpd.DataFrame(np.arange(20).reshape(5, 4), index=index, columns=columns)
>>> df2 = bpd.DataFrame(np.arange(16).reshape(4, 4), index=index[:4], columns=columns)
>>> df1.corrwith(df2)
one 1.0
two 1.0
three 1.0
four 1.0
dtype: Float64

Args:
other (DataFrame, Series):
Object with which to compute correlations.

numeric_only (bool, default False):
Include only `float`, `int` or `boolean` data.

Returns:
bigframes.pandas.Series: Pairwise correlations.
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def update(
self, other, join: str = "left", overwrite: bool = True, filter_func=None
) -> DataFrame:
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.