feat: add .agg support for size #792

Merged · 11 commits · Jun 26, 2024
bigframes/core/blocks.py: 46 additions & 9 deletions
@@ -995,7 +995,7 @@ def filter(self, predicate: scalars.Expression):

def aggregate_all_and_stack(
self,
operation: agg_ops.UnaryAggregateOp,
operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
*,
axis: int | str = 0,
value_col_id: str = "values",
@@ -1004,7 +1004,12 @@ def aggregate_all_and_stack(
axis_n = utils.get_axis_number(axis)
if axis_n == 0:
aggregations = [
(ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id)
(
ex.UnaryAggregation(operation, ex.free_var(col_id))
if isinstance(operation, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(operation),
col_id,
)
for col_id in self.value_columns
]
index_id = guid.generate_guid()
@@ -1033,6 +1038,11 @@ def aggregate_all_and_stack(
(ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id)
for col_id in [*self.index_columns]
]
# TODO: may need to add NullaryAggregation to main_aggregation
# when agg adds support for axis=1, needed for agg("size", axis=1)
assert isinstance(
operation, agg_ops.UnaryAggregateOp
), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)."
main_aggregation = (
ex.UnaryAggregation(operation, ex.free_var(value_col_id)),
value_col_id,
@@ -1125,7 +1135,11 @@ def remap_f(x):
def aggregate(
self,
by_column_ids: typing.Sequence[str] = (),
aggregations: typing.Sequence[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = (),
aggregations: typing.Sequence[
typing.Tuple[
str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
]
] = (),
*,
dropna: bool = True,
) -> typing.Tuple[Block, typing.Sequence[str]]:
@@ -1139,7 +1153,9 @@ def aggregate(
"""
agg_specs = [
(
ex.UnaryAggregation(operation, ex.free_var(input_id)),
ex.UnaryAggregation(operation, ex.free_var(input_id))
if isinstance(operation, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(operation),
guid.generate_guid(),
)
for input_id, operation in aggregations
@@ -1175,18 +1191,32 @@ def aggregate(
output_col_ids,
)

def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp):
def get_stat(
self,
column_id: str,
stat: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
):
"""Gets aggregates immediately, and caches it"""
if stat.name in self._stats_cache[column_id]:
return self._stats_cache[column_id][stat.name]

# TODO: Convert nonstandard stats into standard stats where possible (popvar, etc.)
# if getting a standard stat, just go get the rest of them
standard_stats = self._standard_stats(column_id)
standard_stats = typing.cast(
typing.Sequence[
typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
],
self._standard_stats(column_id),
)
stats_to_fetch = standard_stats if stat in standard_stats else [stat]

aggregations = [
(ex.UnaryAggregation(stat, ex.free_var(column_id)), stat.name)
(
ex.UnaryAggregation(stat, ex.free_var(column_id))
if isinstance(stat, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(stat),
stat.name,
)
for stat in stats_to_fetch
]
expr = self.expr.aggregate(aggregations)
@@ -1231,13 +1261,20 @@ def get_binary_stat(
def summarize(
self,
column_ids: typing.Sequence[str],
stats: typing.Sequence[agg_ops.UnaryAggregateOp],
stats: typing.Sequence[
typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
],
):
"""Get a list of stats as a deferred block object."""
label_col_id = guid.generate_guid()
labels = [stat.name for stat in stats]
aggregations = [
(ex.UnaryAggregation(stat, ex.free_var(col_id)), f"{col_id}-{stat.name}")
(
ex.UnaryAggregation(stat, ex.free_var(col_id))
if isinstance(stat, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(stat),
f"{col_id}-{stat.name}",
)
for stat in stats
for col_id in column_ids
]
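Note on the blocks.py hunks above: they all repeat one dispatch, wrapping an op in ex.UnaryAggregation when it reads a column and in ex.NullaryAggregation when, like size, it reads none. A minimal self-contained sketch of that pattern, using hypothetical stand-in classes rather than the real bigframes expression types:

from dataclasses import dataclass
from typing import Union

# Hypothetical stand-ins for bigframes' agg_ops and expression nodes,
# used only to illustrate the dispatch pattern repeated above.
@dataclass(frozen=True)
class UnaryAggregateOp:
    name: str  # e.g. "sum": consumes one input column

@dataclass(frozen=True)
class NullaryAggregateOp:
    name: str  # e.g. "size": consumes no input column

@dataclass(frozen=True)
class UnaryAggregation:
    op: UnaryAggregateOp
    column: str

@dataclass(frozen=True)
class NullaryAggregation:
    op: NullaryAggregateOp

def to_aggregation(
    op: Union[UnaryAggregateOp, NullaryAggregateOp], col_id: str
) -> Union[UnaryAggregation, NullaryAggregation]:
    # The same isinstance dispatch used in aggregate_all_and_stack,
    # aggregate, get_stat, and summarize: only unary ops bind a column.
    if isinstance(op, UnaryAggregateOp):
        return UnaryAggregation(op, col_id)
    return NullaryAggregation(op)

print(to_aggregation(UnaryAggregateOp("sum"), "int64_col"))
print(to_aggregation(NullaryAggregateOp("size"), "int64_col"))

The PR keeps this dispatch inline at each call site; the to_aggregation helper above is only for illustration.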
bigframes/core/groupby/__init__.py: 7 additions & 3 deletions
@@ -286,10 +286,10 @@ def expanding(self, min_periods: int = 1) -> windows.Window:
block, window_spec, self._selected_cols, drop_null_groups=self._dropna
)

def agg(self, func=None, **kwargs) -> df.DataFrame:
def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]:
if func:
if isinstance(func, str):
return self._agg_string(func)
return self.size() if func == "size" else self._agg_string(func)
elif utils.is_dict_like(func):
return self._agg_dict(func)
elif utils.is_list_like(func):
@@ -315,7 +315,11 @@ def _agg_string(self, func: str) -> df.DataFrame:
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
aggregations: typing.List[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = []
aggregations: typing.List[
typing.Tuple[
str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
]
] = []
column_labels = []

want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())
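The groupby change routes the string "size" to the existing size() method, which is why agg's return annotation widens to Union[DataFrame, Series]. pandas, which the system tests below compare against, has the same shape: agg("size") yields one row count per group as a Series, nulls included, while "count" stays per column and skips nulls. A quick pandas sketch of those semantics:

import pandas as pd

df = pd.DataFrame(
    {"string_col": ["a", "a", "b"], "int64_col": [1, None, 3]}
)

# agg("size") -> one row count per group (nulls included), as a Series.
print(df.groupby("string_col").agg("size"))   # a: 2, b: 1

# agg("count") -> per-column non-null counts, as a DataFrame.
print(df.groupby("string_col").agg("count"))  # int64_col: a: 1, b: 1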
bigframes/operations/aggregations.py: 8 additions & 2 deletions
@@ -487,7 +487,9 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:


# TODO: Alternative names and lookup from numpy function objects
_AGGREGATIONS_LOOKUP: dict[str, UnaryAggregateOp] = {
_AGGREGATIONS_LOOKUP: typing.Dict[
str, typing.Union[UnaryAggregateOp, NullaryAggregateOp]
] = {
op.name: op
for op in [
sum_op,
@@ -506,10 +508,14 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
ApproxQuartilesOp(2),
ApproxQuartilesOp(3),
]
+ [
# Add size_op separately to avoid Mypy type inference errors.
size_op,
]
}


def lookup_agg_func(key: str) -> UnaryAggregateOp:
def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]:
if callable(key):
raise NotImplementedError(
"Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)."
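The "Add size_op separately to avoid Mypy type inference errors" comment deserves a gloss. Our reading, an assumption rather than anything stated in the PR: a single list literal mixing both op kinds can be inferred as a wider element type than the declared Union, tripping the dict value type, while concatenating two homogeneous literals preserves the Union. A toy sketch with stand-in classes:

from typing import Dict, List, Union

# Toy stand-ins; the real ops live in bigframes/operations/aggregations.py.
class UnaryAggregateOp:
    name = "sum"

class NullaryAggregateOp:
    name = "size"

sum_op = UnaryAggregateOp()
size_op = NullaryAggregateOp()

# A single mixed literal such as [sum_op, size_op] can be inferred by
# Mypy as a type wider than the declared Union. Concatenating two
# homogeneous literals (as the PR does) keeps the element type a Union
# of the two op classes.
ops: List[Union[UnaryAggregateOp, NullaryAggregateOp]] = [sum_op] + [size_op]

_LOOKUP: Dict[str, Union[UnaryAggregateOp, NullaryAggregateOp]] = {
    op.name: op for op in ops
}
print(sorted(_LOOKUP))  # ['size', 'sum']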
bigframes/series.py: 3 additions & 2 deletions
@@ -968,7 +968,6 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series:
)
)
else:

return self._apply_aggregation(
agg_ops.lookup_agg_func(typing.cast(str, func))
)
@@ -1246,7 +1245,9 @@ def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scalar
values, index = self._align_n([other1, other2], how)
return (values[0], values[1], values[2], index)

def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> Any:
def _apply_aggregation(
self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp
) -> Any:
return self._block.get_stat(self._value_column, op)

def _apply_window_op(
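On the Series side, _apply_aggregation now accepts the nullary op that lookup_agg_func("size") returns. The resulting user-visible behavior, sketched with pandas (the comparison target of the tests below): size counts every row, nulls included, while count skips them.

import pandas as pd

s = pd.Series([1, None, 3], name="int64_col")
print(s.agg("size"))             # 3: nulls included
print(s.agg("count"))            # 2: nulls skipped
print(s.agg(["count", "size"]))  # labeled Series with one row per stat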
tests/system/small/test_dataframe.py: 11 additions & 3 deletions
@@ -2485,12 +2485,19 @@ def test_dataframe_agg_single_string(scalars_dfs):
)


def test_dataframe_agg_int_single_string(scalars_dfs):
@pytest.mark.parametrize(
("agg",),
(
("sum",),
("size",),
),
)
def test_dataframe_agg_int_single_string(scalars_dfs, agg):
numeric_cols = ["int64_col", "int64_too", "bool_col"]
scalars_df, scalars_pandas_df = scalars_dfs

bf_result = scalars_df[numeric_cols].agg("sum").to_pandas()
pd_result = scalars_pandas_df[numeric_cols].agg("sum")
bf_result = scalars_df[numeric_cols].agg(agg).to_pandas()
pd_result = scalars_pandas_df[numeric_cols].agg(agg)

assert bf_result.dtype == "Int64"
pd.testing.assert_series_equal(
@@ -2537,6 +2544,7 @@ def test_dataframe_agg_int_multi_string(scalars_dfs):
"sum",
"nunique",
"count",
"size",
]
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas()
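A note on the multi-string case exercised above: pandas, which these tests compare against, stacks one output row per requested stat, and "size" reports the total row count per column, nulls included, unlike "count". A minimal pandas illustration:

import pandas as pd

df = pd.DataFrame({"int64_col": [1, None, 3], "int64_too": [4, 5, 6]})
# "size" contributes a row of total row counts (nulls included) per
# column, alongside the other requested stats.
print(df.agg(["sum", "count", "size"]))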
tests/system/small/test_groupby.py: 30 additions & 13 deletions
@@ -140,11 +140,23 @@ def test_dataframe_groupby_agg_string(
)


def test_dataframe_groupby_agg_size_string(scalars_df_index, scalars_pandas_df_index):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = scalars_df_index[col_names].groupby("string_col").agg("size")
pd_result = scalars_pandas_df_index[col_names].groupby("string_col").agg("size")

pd.testing.assert_series_equal(pd_result, bf_result.to_pandas(), check_dtype=False)


def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = scalars_df_index[col_names].groupby("string_col").agg(["count", "min"])
bf_result = (
scalars_df_index[col_names].groupby("string_col").agg(["count", "min", "size"])
)
pd_result = (
scalars_pandas_df_index[col_names].groupby("string_col").agg(["count", "min"])
scalars_pandas_df_index[col_names]
.groupby("string_col")
.agg(["count", "min", "size"])
)
bf_result_computed = bf_result.to_pandas()

@@ -161,8 +173,8 @@ def test_dataframe_groupby_agg_list_w_column_multi_index(
pd_df = scalars_pandas_df_index[columns].copy()
pd_df.columns = multi_columns

bf_result = bf_df.groupby(level=0).agg(["count", "min"])
pd_result = pd_df.groupby(level=0).agg(["count", "min"])
bf_result = bf_df.groupby(level=0).agg(["count", "min", "size"])
pd_result = pd_df.groupby(level=0).agg(["count", "min", "size"])

bf_result_computed = bf_result.to_pandas()
pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)
@@ -182,12 +194,12 @@ def test_dataframe_groupby_agg_dict_with_list(
bf_result = (
scalars_df_index[col_names]
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
.agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"})
)
pd_result = (
scalars_pandas_df_index[col_names]
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
.agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"})
)
bf_result_computed = bf_result.to_pandas()

@@ -413,16 +425,21 @@ def test_dataframe_groupby_nonnumeric_with_mean():
# ==============


def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index):
@pytest.mark.parametrize(
("agg"),
[
("count"),
("size"),
],
)
def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index, agg):
bf_result = (
scalars_df_index["int64_col"]
.groupby(scalars_df_index["string_col"])
.agg("count")
scalars_df_index["int64_col"].groupby(scalars_df_index["string_col"]).agg(agg)
)
pd_result = (
scalars_pandas_df_index["int64_col"]
.groupby(scalars_pandas_df_index["string_col"])
.agg("count")
.agg(agg)
)
bf_result_computed = bf_result.to_pandas()

@@ -435,12 +452,12 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
bf_result = (
scalars_df_index["int64_col"]
.groupby(scalars_df_index["string_col"])
.agg(["sum", "mean"])
.agg(["sum", "mean", "size"])
)
pd_result = (
scalars_pandas_df_index["int64_col"]
.groupby(scalars_pandas_df_index["string_col"])
.agg(["sum", "mean"])
.agg(["sum", "mean", "size"])
)
bf_result_computed = bf_result.to_pandas()

tests/system/small/test_series.py: 21 additions & 4 deletions
@@ -506,15 +506,32 @@ def test_series_dropna(scalars_dfs, ignore_index):
pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False)


def test_series_agg_single_string(scalars_dfs):
@pytest.mark.parametrize(
("agg",),
(
("sum",),
("size",),
),
)
def test_series_agg_single_string(scalars_dfs, agg):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_col"].agg("sum")
pd_result = scalars_pandas_df["int64_col"].agg("sum")
bf_result = scalars_df["int64_col"].agg(agg)
pd_result = scalars_pandas_df["int64_col"].agg(agg)
assert math.isclose(pd_result, bf_result)


def test_series_agg_multi_string(scalars_dfs):
aggregations = ["sum", "mean", "std", "var", "min", "max", "nunique", "count"]
aggregations = [
"sum",
"mean",
"std",
"var",
"min",
"max",
"nunique",
"count",
"size",
]
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_col"].agg(aggregations).to_pandas()
pd_result = scalars_pandas_df["int64_col"].agg(aggregations)