Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: add Series.cov method #368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions 22 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,28 +261,6 @@ def aggregate(
)
)

def corr_aggregate(
    self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]]
) -> ArrayValue:
    """
    Compute the correlation for each requested pair of columns.

    Every entry pairs a left column id with a right column id; the resulting
    correlation is stored under that entry's output column id.
    This uses BigQuery's CORR under the hood, and thus only Pearson's method is used.

    Arguments:
        corr_aggregations: left_column_id, right_column_id, output_column_id tuples
    """
    aggregations = []
    for agg in corr_aggregations:
        # One binary CorrOp aggregation per (left, right) pair, tagged with
        # the id of the column that will hold its result.
        correlation = ex.BinaryAggregation(
            agg_ops.CorrOp(), ex.free_var(agg[0]), ex.free_var(agg[1])
        )
        aggregations.append((correlation, agg[2]))

    aggregate_node = nodes.AggregateNode(
        child=self.node, aggregations=tuple(aggregations)
    )
    return ArrayValue(aggregate_node)

def project_window_op(
self,
column_name: str,
Expand Down
19 changes: 11 additions & 8 deletions 19 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,26 +1040,29 @@ def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp):
self._stats_cache[column_id].update(stats_map)
return stats_map[stat.name]

def get_corr_stat(self, column_id_left: str, column_id_right: str):
def get_binary_stat(
self, column_id_left: str, column_id_right: str, stat: agg_ops.BinaryAggregateOp
):
# TODO(kemppeterson): Clean up the column names for DataFrames.corr support
# TODO(kemppeterson): Add a cache here.
corr_aggregations = [
aggregations = [
(
column_id_left,
column_id_right,
"corr_" + column_id_left + column_id_right,
ex.BinaryAggregation(
stat, ex.free_var(column_id_left), ex.free_var(column_id_right)
),
f"{stat.name}_{column_id_left}{column_id_right}",
)
]
expr = self.expr.corr_aggregate(corr_aggregations)
expr = self.expr.aggregate(aggregations)
offset_index_id = guid.generate_guid()
expr = expr.promote_offsets(offset_index_id)
block = Block(
expr,
index_columns=[offset_index_id],
column_labels=[a[2] for a in corr_aggregations],
column_labels=[a[1] for a in aggregations],
)
df, _ = block.to_pandas()
return df.loc[0, "corr_" + column_id_left + column_id_right]
return df.loc[0, f"{stat.name}_{column_id_left}{column_id_right}"]

def summarize(
self,
Expand Down
13 changes: 13 additions & 0 deletions 13 bigframes/core/compile/aggregate_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,19 @@ def _(
return cast(ibis_types.NumericColumn, bq_corr)


@compile_binary_agg.register
def _(
op: agg_ops.CovOp, left: ibis_types.Column, right: ibis_types.Column, window=None
) -> ibis_types.NumericValue:
# Will be null if all inputs are null. Pandas defaults to zero sum though.
left_numeric = cast(ibis_types.NumericColumn, left)
right_numeric = cast(ibis_types.NumericColumn, right)
bq_cov = _apply_window_if_present(
left_numeric.cov(right_numeric, how="sample"), window
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does how='sample' mean here? Does it mean the covariance is calculated from a sample of the series?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sample variance usually means population_variance * N / (N - 1), i.e. normalizing by N - 1 instead of N. This provides an unbiased estimator of the variance; see https://mathworld.wolfram.com/SampleVariance.html.

)
return cast(ibis_types.NumericColumn, bq_cov)


def _apply_window_if_present(value: ibis_types.Value, window):
    """Return ``value`` evaluated over ``window``, or ``value`` unchanged if ``window`` is None."""
    if window is None:
        return value
    return value.over(window)

Expand Down
5 changes: 5 additions & 0 deletions 5 bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ class CorrOp(BinaryAggregateOp):
name: ClassVar[str] = "corr"


# Binary aggregation op identifying the covariance of two columns; its
# registered op name is "cov".
@dataclasses.dataclass(frozen=True)
class CovOp(BinaryAggregateOp):
name: ClassVar[str] = "cov"


sum_op = SumOp()
mean_op = MeanOp()
median_op = MedianOp()
Expand Down
7 changes: 5 additions & 2 deletions 7 bigframes/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import bigframes.core.scalar as scalars
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series as series
import bigframes.session
import third_party.bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
Expand Down Expand Up @@ -188,10 +189,12 @@ def _apply_binary_op(
block, result_id = self._block.project_expr(expr, name)
return series.Series(block.select_column(result_id))

def _apply_corr_aggregation(self, other: series.Series) -> float:
def _apply_binary_aggregation(
self, other: series.Series, stat: agg_ops.BinaryAggregateOp
) -> float:
(left, right, block) = self._align(other, how="outer")

return block.get_corr_stat(left, right)
return block.get_binary_stat(left, right, stat)

def _align(self, other: series.Series, how="outer") -> tuple[str, str, blocks.Block]: # type: ignore
"""Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined tabled expression."""
Expand Down
9 changes: 6 additions & 3 deletions 9 bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,8 @@ def round(self, decimals=0) -> "Series":
return self._apply_binary_op(decimals, ops.round_op)

def corr(self, other: Series, method="pearson", min_periods=None) -> float:
# TODO(kemppeterson): Validate early that both are numeric
# TODO(kemppeterson): Handle partially-numeric columns
# TODO(tbergeron): Validate early that both are numeric
# TODO(tbergeron): Handle partially-numeric columns
if method != "pearson":
raise NotImplementedError(
f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}"
Expand All @@ -744,7 +744,10 @@ def corr(self, other: Series, method="pearson", min_periods=None) -> float:
raise NotImplementedError(
f"min_periods not yet supported. {constants.FEEDBACK_LINK}"
)
return self._apply_corr_aggregation(other)
return self._apply_binary_aggregation(other, agg_ops.CorrOp())

def cov(self, other: Series) -> float:
    # Covariance goes through the same binary-aggregation path as corr,
    # only with CovOp selected as the statistic.
    covariance_op = agg_ops.CovOp()
    return self._apply_binary_aggregation(other, covariance_op)

def all(self) -> bool:
return typing.cast(bool, self._apply_aggregation(agg_ops.all_op))
Expand Down
13 changes: 12 additions & 1 deletion 13 tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def test_mods(scalars_dfs, col_x, col_y, method):

# We work around a pandas bug that doesn't handle correlating nullable dtypes by doing this
# manually with dumb self-correlation instead of parameterized as test_mods is above.
def test_corr(scalars_dfs):
def test_series_corr(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_too"].corr(scalars_df["int64_too"])
pd_result = (
Expand All @@ -667,6 +667,17 @@ def test_corr(scalars_dfs):
assert math.isclose(pd_result, bf_result)


def test_series_cov(scalars_dfs):
    # Covariance of a column with itself must agree with pandas.
    # NOTE(review): the pandas side is cast to plain int64 first, presumably
    # for the same nullable-dtype workaround described above the corr test.
    bf_df, pd_df = scalars_dfs

    bf_left = bf_df["int64_too"]
    bf_right = bf_df["int64_too"]
    actual = bf_left.cov(bf_right)

    pd_left = pd_df["int64_too"].astype("int64")
    pd_right = pd_df["int64_too"].astype("int64")
    expected = pd_left.cov(pd_right)

    assert math.isclose(expected, actual)


@pytest.mark.parametrize(
("col_x",),
[
Expand Down
20 changes: 0 additions & 20 deletions 20 tests/unit/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,3 @@ def test_arrayvalue_to_ibis_expr_with_aggregate():
assert actual.columns[0] == "col1"
assert actual.columns[1] == "col4"
assert expr.columns[1].type().is_int64()


def test_arrayvalue_to_ibis_expr_with_corr_aggregate():
    # Correlating col1 with col3 should compile to exactly one float64
    # column carrying the requested output id ("col4").
    source_frame = pandas.DataFrame(
        {
            "col1": [1, 2, 3],
            "col2": ["a", "b", "c"],
            "col3": [0.1, 0.2, 0.3],
        }
    )
    value = resources.create_arrayvalue(
        source_frame,
        total_ordering_columns=["col1"],
    )

    correlated = value.corr_aggregate(corr_aggregations=[("col1", "col3", "col4")])
    expr = correlated._compile_ordered()
    actual = expr._to_ibis_expr(ordering_mode="unordered")

    assert len(expr.columns) == 1
    assert actual.columns[0] == "col4"
    assert expr.columns[0].type().is_float64()
21 changes: 21 additions & 0 deletions 21 third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,27 @@ def corr(self, other, method="pearson", min_periods=None) -> float:
"""
raise NotImplementedError("abstract method")

def cov(self, other) -> float:
    """
    Compute the covariance with another Series, excluding missing values.

    The two `Series` objects do not need to have the same length; they are
    aligned internally before the covariance is calculated.

    Args:
        other (Series):
            Series with which to compute the covariance.

    Returns:
        float:
            Covariance between this Series and ``other``, normalized by
            N-1 (unbiased estimator).
    """
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def diff(self) -> Series:
"""
First discrete difference of element.
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.