From d56bc74af357825d417ac2caef46c8e6ad3fc417 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 17 Jun 2024 19:51:16 +0000 Subject: [PATCH 1/7] feat: add .agg support for size --- bigframes/core/blocks.py | 55 +++++++++++++++++++++++----- bigframes/core/groupby/__init__.py | 6 ++- bigframes/operations/aggregations.py | 8 +++- bigframes/series.py | 5 ++- tests/system/small/test_dataframe.py | 14 +++++-- tests/system/small/test_groupby.py | 43 +++++++++++++++------- tests/system/small/test_series.py | 25 +++++++++++-- 7 files changed, 121 insertions(+), 35 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 2e7d6eb913..bd82f359b5 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -994,16 +994,21 @@ def filter(self, predicate: scalars.Expression): def aggregate_all_and_stack( self, - operation: agg_ops.UnaryAggregateOp, + operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp], *, - axis: int | str = 0, + axis: int | str = 1, value_col_id: str = "values", dropna: bool = True, ) -> Block: axis_n = utils.get_axis_number(axis) if axis_n == 0: aggregations = [ - (ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id) + ( + ex.UnaryAggregation(operation, ex.free_var(col_id)) + if isinstance(operation, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(operation), + col_id, + ) for col_id in self.value_columns ] index_id = guid.generate_guid() @@ -1032,6 +1037,9 @@ def aggregate_all_and_stack( (ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id) for col_id in [*self.index_columns] ] + # TODO: may need add NullaryAggregation in main_aggregation + # when agg add support for axis=1, needed for agg("size", axis=1) + assert isinstance(operation, agg_ops.UnaryAggregateOp) main_aggregation = ( ex.UnaryAggregation(operation, ex.free_var(value_col_id)), value_col_id, @@ -1124,7 +1132,11 @@ def remap_f(x): def aggregate( self, by_column_ids: typing.Sequence[str] = (), - aggregations: typing.Sequence[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = (), + aggregations: typing.Sequence[ + typing.Tuple[ + str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ] + ] = (), *, dropna: bool = True, ) -> typing.Tuple[Block, typing.Sequence[str]]: @@ -1138,7 +1150,9 @@ def aggregate( """ agg_specs = [ ( - ex.UnaryAggregation(operation, ex.free_var(input_id)), + ex.UnaryAggregation(operation, ex.free_var(input_id)) + if isinstance(operation, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(operation), guid.generate_guid(), ) for input_id, operation in aggregations @@ -1174,18 +1188,32 @@ def aggregate( output_col_ids, ) - def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp): + def get_stat( + self, + column_id: str, + stat: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp], + ): """Gets aggregates immediately, and caches it""" if stat.name in self._stats_cache[column_id]: return self._stats_cache[column_id][stat.name] # TODO: Convert nonstandard stats into standard stats where possible (popvar, etc.) # if getting a standard stat, just go get the rest of them - standard_stats = self._standard_stats(column_id) + standard_stats = typing.cast( + typing.Sequence[ + typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ], + self._standard_stats(column_id), + ) stats_to_fetch = standard_stats if stat in standard_stats else [stat] aggregations = [ - (ex.UnaryAggregation(stat, ex.free_var(column_id)), stat.name) + ( + ex.UnaryAggregation(stat, ex.free_var(column_id)) + if isinstance(stat, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(stat), + stat.name, + ) for stat in stats_to_fetch ] expr = self.expr.aggregate(aggregations) @@ -1230,13 +1258,20 @@ def get_binary_stat( def summarize( self, column_ids: typing.Sequence[str], - stats: typing.Sequence[agg_ops.UnaryAggregateOp], + stats: typing.Sequence[ + typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ], ): """Get a list of stats as a deferred block object.""" label_col_id = guid.generate_guid() labels = [stat.name for stat in stats] aggregations = [ - (ex.UnaryAggregation(stat, ex.free_var(col_id)), f"{col_id}-{stat.name}") + ( + ex.UnaryAggregation(stat, ex.free_var(col_id)) + if isinstance(stat, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(stat), + f"{col_id}-{stat.name}", + ) for stat in stats for col_id in column_ids ] diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index ee120635d3..0eb5ecc186 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -303,7 +303,11 @@ def _agg_string(self, func: str) -> df.DataFrame: return dataframe if self._as_index else self._convert_index(dataframe) def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: - aggregations: typing.List[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = [] + aggregations: typing.List[ + typing.Tuple[ + str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ] + ] = [] column_labels = [] want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values()) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 783abfd788..c2acd02e91 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -487,7 +487,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT # TODO: Alternative names and lookup from numpy function objects -_AGGREGATIONS_LOOKUP: dict[str, UnaryAggregateOp] = { +# Add size_op separately to avoid Mypy type inference errors. +_AGGREGATIONS_LOOKUP: typing.Dict[ + str, typing.Union[UnaryAggregateOp, NullaryAggregateOp] +] = { op.name: op for op in [ sum_op, @@ -506,10 +509,11 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ApproxQuartilesOp(2), ApproxQuartilesOp(3), ] + + [size_op] } -def lookup_agg_func(key: str) -> UnaryAggregateOp: +def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]: if callable(key): raise NotImplementedError( "Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)." diff --git a/bigframes/series.py b/bigframes/series.py index cb56319471..80797747c2 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -933,7 +933,6 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: ) ) else: - return self._apply_aggregation( agg_ops.lookup_agg_func(typing.cast(str, func)) ) @@ -1211,7 +1210,9 @@ def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scal values, index = self._align_n([other1, other2], how) return (values[0], values[1], values[2], index) - def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> Any: + def _apply_aggregation( + self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp + ) -> Any: return self._block.get_stat(self._value_column, op) def _apply_window_op( diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 782ef2d5ea..af77e189b0 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2485,12 +2485,19 @@ def test_dataframe_agg_single_string(scalars_dfs): ) -def test_dataframe_agg_int_single_string(scalars_dfs): +@pytest.mark.parametrize( + ("agg",), + ( + ("sum",), + ("size",), + ), +) +def test_dataframe_agg_int_single_string(scalars_dfs, agg): numeric_cols = ["int64_col", "int64_too", "bool_col"] scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df[numeric_cols].agg("sum").to_pandas() - pd_result = scalars_pandas_df[numeric_cols].agg("sum") + bf_result = scalars_df[numeric_cols].agg(agg).to_pandas() + pd_result = scalars_pandas_df[numeric_cols].agg(agg) assert bf_result.dtype == "Int64" pd.testing.assert_series_equal( @@ -2537,6 +2544,7 @@ def test_dataframe_agg_int_multi_string(scalars_dfs): "sum", "nunique", "count", + "size", ] scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas() diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index b332d48574..66ee0ff527 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -133,11 +133,23 @@ def test_dataframe_groupby_agg_string( ) +def test_dataframe_groupby_agg_size_string(scalars_df_index, scalars_pandas_df_index): + col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] + bf_result = scalars_df_index[col_names].groupby("string_col").agg("size") + pd_result = scalars_pandas_df_index[col_names].groupby("string_col").agg("size") + + pd.testing.assert_series_equal(pd_result, bf_result.to_pandas(), check_dtype=False) + + def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] - bf_result = scalars_df_index[col_names].groupby("string_col").agg(["count", "min"]) + bf_result = ( + scalars_df_index[col_names].groupby("string_col").agg(["count", "min", "size"]) + ) pd_result = ( - scalars_pandas_df_index[col_names].groupby("string_col").agg(["count", "min"]) + scalars_pandas_df_index[col_names] + .groupby("string_col") + .agg(["count", "min", "size"]) ) bf_result_computed = bf_result.to_pandas() @@ -154,8 +166,8 @@ def test_dataframe_groupby_agg_list_w_column_multi_index( pd_df = scalars_pandas_df_index[columns].copy() pd_df.columns = multi_columns - bf_result = bf_df.groupby(level=0).agg(["count", "min"]) - pd_result = pd_df.groupby(level=0).agg(["count", "min"]) + bf_result = bf_df.groupby(level=0).agg(["count", "min", "size"]) + pd_result = pd_df.groupby(level=0).agg(["count", "min", "size"]) bf_result_computed = bf_result.to_pandas() pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) @@ -175,12 +187,12 @@ def test_dataframe_groupby_agg_dict_with_list( bf_result = ( scalars_df_index[col_names] .groupby("string_col", as_index=as_index) - .agg({"int64_too": ["mean", "max"], "string_col": "count"}) + .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"}) ) pd_result = ( scalars_pandas_df_index[col_names] .groupby("string_col", as_index=as_index) - .agg({"int64_too": ["mean", "max"], "string_col": "count"}) + .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"}) ) bf_result_computed = bf_result.to_pandas() @@ -406,16 +418,21 @@ def test_dataframe_groupby_nonnumeric_with_mean(): # ============== -def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index): +@pytest.mark.parametrize( + ("agg"), + [ + ("count"), + ("size"), + ], +) +def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index, agg): bf_result = ( - scalars_df_index["int64_col"] - .groupby(scalars_df_index["string_col"]) - .agg("count") + scalars_df_index["int64_col"].groupby(scalars_df_index["string_col"]).agg(agg) ) pd_result = ( scalars_pandas_df_index["int64_col"] .groupby(scalars_pandas_df_index["string_col"]) - .agg("count") + .agg(agg) ) bf_result_computed = bf_result.to_pandas() @@ -428,12 +445,12 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): bf_result = ( scalars_df_index["int64_col"] .groupby(scalars_df_index["string_col"]) - .agg(["sum", "mean"]) + .agg(["sum", "mean", "size"]) ) pd_result = ( scalars_pandas_df_index["int64_col"] .groupby(scalars_pandas_df_index["string_col"]) - .agg(["sum", "mean"]) + .agg(["sum", "mean", "size"]) ) bf_result_computed = bf_result.to_pandas() diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 3e21418f2f..dc48f7a990 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -506,15 +506,32 @@ def test_series_dropna(scalars_dfs, ignore_index): pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False) -def test_series_agg_single_string(scalars_dfs): +@pytest.mark.parametrize( + ("agg",), + ( + ("sum",), + ("size",), + ), +) +def test_series_agg_single_string(scalars_dfs, agg): scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_col"].agg("sum") - pd_result = scalars_pandas_df["int64_col"].agg("sum") + bf_result = scalars_df["int64_col"].agg(agg) + pd_result = scalars_pandas_df["int64_col"].agg(agg) assert math.isclose(pd_result, bf_result) def test_series_agg_multi_string(scalars_dfs): - aggregations = ["sum", "mean", "std", "var", "min", "max", "nunique", "count"] + aggregations = [ + "sum", + "mean", + "std", + "var", + "min", + "max", + "nunique", + "count", + "size", + ] scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df["int64_col"].agg(aggregations).to_pandas() pd_result = scalars_pandas_df["int64_col"].agg(aggregations) From 721544e543e835e656727fa8dd256950efdcc880 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 17 Jun 2024 20:23:47 +0000 Subject: [PATCH 2/7] undo test change. --- bigframes/core/blocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index bd82f359b5..10f9708c92 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -996,7 +996,7 @@ def aggregate_all_and_stack( self, operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp], *, - axis: int | str = 1, + axis: int | str = 0, value_col_id: str = "values", dropna: bool = True, ) -> Block: From 5a31af02c43ec6b62e93bead1c0ced19c81d1d10 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 17 Jun 2024 22:58:35 +0000 Subject: [PATCH 3/7] logic fix. --- bigframes/core/groupby/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 0eb5ecc186..e0fcd29936 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -277,7 +277,7 @@ def expanding(self, min_periods: int = 1) -> windows.Window: def agg(self, func=None, **kwargs) -> df.DataFrame: if func: if isinstance(func, str): - return self._agg_string(func) + return self.size() if func == "size" else self._agg_string(func) elif utils.is_dict_like(func): return self._agg_dict(func) elif utils.is_list_like(func): From b8cae0048dc4a5cd8a80483f779c642abdfaba15 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Tue, 18 Jun 2024 00:08:46 +0000 Subject: [PATCH 4/7] type update --- bigframes/core/groupby/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index e0fcd29936..c700c737db 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -274,7 +274,7 @@ def expanding(self, min_periods: int = 1) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) - def agg(self, func=None, **kwargs) -> df.DataFrame: + def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]: if func: if isinstance(func, str): return self.size() if func == "size" else self._agg_string(func) From d3d6b091929079daa9ef38bab0563e31fac1873a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Wed, 26 Jun 2024 13:01:42 -0500 Subject: [PATCH 5/7] Apply suggestions from code review --- bigframes/core/blocks.py | 2 +- bigframes/operations/aggregations.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 15178c848b..795b8fb75b 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1039,7 +1039,7 @@ def aggregate_all_and_stack( ] # TODO: may need add NullaryAggregation in main_aggregation # when agg add support for axis=1, needed for agg("size", axis=1) - assert isinstance(operation, agg_ops.UnaryAggregateOp) + assert isinstance(operation, agg_ops.UnaryAggregateOp), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)." main_aggregation = ( ex.UnaryAggregation(operation, ex.free_var(value_col_id)), value_col_id, diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index c2acd02e91..619183287f 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -487,7 +487,6 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT # TODO: Alternative names and lookup from numpy function objects -# Add size_op separately to avoid Mypy type inference errors. _AGGREGATIONS_LOOKUP: typing.Dict[ str, typing.Union[UnaryAggregateOp, NullaryAggregateOp] ] = { @@ -509,7 +508,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ApproxQuartilesOp(2), ApproxQuartilesOp(3), ] - + [size_op] + + [ + # Add size_op separately to avoid Mypy type inference errors. + size_op, + ] } From 59eee909ec4f61c36d0eec85de787c7648942b67 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 26 Jun 2024 18:03:36 +0000 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/core/blocks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 7968791a2d..f40dfc0071 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1040,7 +1040,9 @@ def aggregate_all_and_stack( ] # TODO: may need add NullaryAggregation in main_aggregation # when agg add support for axis=1, needed for agg("size", axis=1) - assert isinstance(operation, agg_ops.UnaryAggregateOp), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)." + assert isinstance( + operation, agg_ops.UnaryAggregateOp + ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)." main_aggregation = ( ex.UnaryAggregation(operation, ex.free_var(value_col_id)), value_col_id, From f2a45b931ea100828df5639a18531bcb3ad40332 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 26 Jun 2024 18:03:47 +0000 Subject: [PATCH 7/7] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/core/blocks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 7968791a2d..f40dfc0071 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1040,7 +1040,9 @@ def aggregate_all_and_stack( ] # TODO: may need add NullaryAggregation in main_aggregation # when agg add support for axis=1, needed for agg("size", axis=1) - assert isinstance(operation, agg_ops.UnaryAggregateOp), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)." + assert isinstance( + operation, agg_ops.UnaryAggregateOp + ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)." main_aggregation = ( ex.UnaryAggregation(operation, ex.free_var(value_col_id)), value_col_id,