diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index afc03dbdea..727ee013f8 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -2152,7 +2152,7 @@ def merge(
 
     def _align_both_axes(
         self, other: Block, how: str
-    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
         # Join rows
         aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
         # join columns schema
@@ -2161,7 +2161,7 @@ def _align_both_axes(
             columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
         else:
             columns, lcol_indexer, rcol_indexer = self.column_labels.join(
-                other.column_labels, how="outer", return_indexers=True
+                other.column_labels, how=how, return_indexers=True
             )
             lcol_indexer = (
                 lcol_indexer if (lcol_indexer is not None) else range(len(columns))
@@ -2183,11 +2183,11 @@ def _align_both_axes(
 
         left_inputs = [left_input_lookup(i) for i in lcol_indexer]
         right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
-        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
+        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))  # type: ignore
 
     def _align_axis_0(
         self, other: Block, how: str
-    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.DerefOp, ex.DerefOp]]]:
         assert len(other.value_columns) == 1
         aligned_block, (get_column_left, get_column_right) = self.join(other, how=how)
 
@@ -2203,7 +2203,7 @@ def _align_axis_0(
 
     def _align_series_block_axis_1(
         self, other: Block, how: str
-    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
         assert len(other.value_columns) == 1
         if other._transpose_cache is None:
             raise ValueError(
@@ -2244,11 +2244,11 @@ def _align_series_block_axis_1(
 
         left_inputs = [left_input_lookup(i) for i in lcol_indexer]
         right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
-        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))
+        return aligned_block, columns, tuple(zip(left_inputs, right_inputs))  # type: ignore
 
     def _align_pd_series_axis_1(
         self, other: pd.Series, how: str
-    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.Expression, ex.Expression]]]:
+    ) -> Tuple[Block, pd.Index, Sequence[Tuple[ex.RefOrConstant, ex.RefOrConstant]]]:
         if self.column_labels.equals(other.index):
             columns, lcol_indexer, rcol_indexer = self.column_labels, None, None
         else:
@@ -2275,7 +2275,7 @@ def _align_pd_series_axis_1(
 
         left_inputs = [left_input_lookup(i) for i in lcol_indexer]
         right_inputs = [righ_input_lookup(i) for i in rcol_indexer]
-        return self, columns, tuple(zip(left_inputs, right_inputs))
+        return self, columns, tuple(zip(left_inputs, right_inputs))  # type: ignore
 
     def _apply_binop(
         self,
diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py
index 2d561657cb..9173bebfc4 100644
--- a/bigframes/core/expression.py
+++ b/bigframes/core/expression.py
@@ -420,3 +420,6 @@ def deterministic(self) -> bool:
         return (
             all(input.deterministic for input in self.inputs) and self.op.deterministic
         )
+
+
+RefOrConstant = Union[DerefOp, ScalarConstantExpression]
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 7f60f1c769..6c866ad4b5 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1473,6 +1473,53 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:
 
         return result
 
+    def corrwith(
+        self,
+        other: typing.Union[DataFrame, bigframes.series.Series],
+        *,
+        numeric_only: bool = False,
+    ) -> bigframes.series.Series:
+        other_frame = other if isinstance(other, DataFrame) else other.to_frame()
+        if numeric_only:
+            l_frame = self._drop_non_numeric()
+            r_frame = other_frame._drop_non_numeric()
+        else:
+            l_frame = self._raise_on_non_numeric("corrwith")
+            r_frame = other_frame._raise_on_non_numeric("corrwith")
+
+        # Correlate on a common float dtype regardless of the input dtypes.
+        l_block = l_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block
+        r_block = r_frame.astype(bigframes.dtypes.FLOAT_DTYPE)._block
+
+        if isinstance(other, DataFrame):
+            block, labels, expr_pairs = l_block._align_both_axes(r_block, how="inner")
+        else:
+            assert isinstance(other, bigframes.series.Series)
+            block, labels, expr_pairs = l_block._align_axis_0(r_block, how="inner")
+
+        # Labels found in either operand but missing from the aligned `labels`
+        # have no pair to correlate; they are reported as NaN below.
+        # NOTE(review): with a Series `other`, its name also enters this outer
+        # join -- confirm it cannot yield a spurious NaN entry.
+        na_cols = l_block.column_labels.join(
+            r_block.column_labels, how="outer"
+        ).difference(labels)
+
+        block, _ = block.aggregate(
+            aggregations=tuple(
+                ex.BinaryAggregation(agg_ops.CorrOp(), left_ex, right_ex)
+                for left_ex, right_ex in expr_pairs
+            ),
+            column_labels=labels,
+        )
+        block = block.project_exprs(
+            (ex.const(float("nan")),) * len(na_cols), labels=na_cols
+        )
+        block = block.transpose(
+            original_row_index=pandas.Index([None]), single_row_mode=True
+        )
+        return bigframes.pandas.Series(block)
+
     def to_arrow(
         self,
         *,
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 93c865536c..4266cdba88 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2246,6 +2246,72 @@ def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
     )
 
 
+def test_df_corrwith_df(scalars_dfs_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
+
+    l_cols = ["int64_col", "float64_col", "int64_too"]
+    r_cols = ["int64_too", "float64_col"]
+
+    bf_result = scalars_df[l_cols].corrwith(scalars_df[r_cols]).to_pandas()
+    pd_result = scalars_pandas_df[l_cols].corrwith(scalars_pandas_df[r_cols])
+
+    # BigFrames and Pandas differ in their data type handling:
+    # - Column types: BigFrames uses Float64, Pandas uses float64.
+    # - Index types: BigFrames uses string, Pandas uses object.
+    pd.testing.assert_series_equal(
+        bf_result, pd_result, check_dtype=False, check_index_type=False
+    )
+
+
+def test_df_corrwith_df_numeric_only(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    l_cols = ["int64_col", "float64_col", "int64_too", "string_col"]
+    r_cols = ["int64_too", "float64_col", "bool_col"]
+
+    bf_result = (
+        scalars_df[l_cols].corrwith(scalars_df[r_cols], numeric_only=True).to_pandas()
+    )
+    pd_result = scalars_pandas_df[l_cols].corrwith(
+        scalars_pandas_df[r_cols], numeric_only=True
+    )
+
+    # BigFrames and Pandas differ in their data type handling:
+    # - Column types: BigFrames uses Float64, Pandas uses float64.
+    # - Index types: BigFrames uses string, Pandas uses object.
+    pd.testing.assert_series_equal(
+        bf_result, pd_result, check_dtype=False, check_index_type=False
+    )
+
+
+def test_df_corrwith_df_non_numeric_error(scalars_dfs):
+    scalars_df, _ = scalars_dfs
+
+    l_cols = ["int64_col", "float64_col", "int64_too", "string_col"]
+    r_cols = ["int64_too", "float64_col", "bool_col"]
+
+    with pytest.raises(NotImplementedError):
+        scalars_df[l_cols].corrwith(scalars_df[r_cols], numeric_only=False)
+
+
+@skip_legacy_pandas
+def test_df_corrwith_series(scalars_dfs_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
+
+    l_cols = ["int64_col", "float64_col", "int64_too"]
+    r_col = "float64_col"
+
+    bf_result = scalars_df[l_cols].corrwith(scalars_df[r_col]).to_pandas()
+    pd_result = scalars_pandas_df[l_cols].corrwith(scalars_pandas_df[r_col])
+
+    # BigFrames and Pandas differ in their data type handling:
+    # - Column types: BigFrames uses Float64, Pandas uses float64.
+    # - Index types: BigFrames uses string, Pandas uses object.
+    pd.testing.assert_series_equal(
+        bf_result, pd_result, check_dtype=False, check_index_type=False
+    )
+
+
 @pytest.mark.parametrize(
     ("op"),
     [
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index bf4d2f2d0c..f5aa23d00b 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -4146,6 +4146,48 @@ def cov(self, *, numeric_only) -> DataFrame:
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
 
+    def corrwith(
+        self,
+        other,
+        *,
+        numeric_only: bool = False,
+    ):
+        """
+        Compute pairwise correlation.
+
+        Pairwise correlation is computed between rows or columns of
+        DataFrame with rows or columns of Series or DataFrame. DataFrames
+        are first aligned along both axes before computing the
+        correlations.
+
+        **Examples:**
+            >>> import bigframes.pandas as bpd
+            >>> import numpy as np
+            >>> bpd.options.display.progress_bar = None
+
+            >>> index = ["a", "b", "c", "d", "e"]
+            >>> columns = ["one", "two", "three", "four"]
+            >>> df1 = bpd.DataFrame(np.arange(20).reshape(5, 4), index=index, columns=columns)
+            >>> df2 = bpd.DataFrame(np.arange(16).reshape(4, 4), index=index[:4], columns=columns)
+            >>> df1.corrwith(df2)
+            one      1.0
+            two      1.0
+            three    1.0
+            four     1.0
+            dtype: Float64
+
+        Args:
+            other (DataFrame, Series):
+                Object with which to compute correlations.
+
+            numeric_only (bool, default False):
+                Include only `float`, `int` or `boolean` data.
+
+        Returns:
+            bigframes.pandas.Series: Pairwise correlations.
+        """
+        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
     def update(
         self, other, join: str = "left", overwrite: bool = True, filter_func=None
     ) -> DataFrame: