diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 8c90828091..f40dfc0071 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -995,7 +995,7 @@ def filter(self, predicate: scalars.Expression):
 
     def aggregate_all_and_stack(
         self,
-        operation: agg_ops.UnaryAggregateOp,
+        operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
         *,
         axis: int | str = 0,
         value_col_id: str = "values",
@@ -1004,7 +1004,12 @@ def aggregate_all_and_stack(
         axis_n = utils.get_axis_number(axis)
         if axis_n == 0:
             aggregations = [
-                (ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id)
+                (
+                    ex.UnaryAggregation(operation, ex.free_var(col_id))
+                    if isinstance(operation, agg_ops.UnaryAggregateOp)
+                    else ex.NullaryAggregation(operation),
+                    col_id,
+                )
                 for col_id in self.value_columns
             ]
             index_id = guid.generate_guid()
@@ -1033,6 +1038,11 @@ def aggregate_all_and_stack(
                 (ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id)
                 for col_id in [*self.index_columns]
             ]
+            # TODO: Also support NullaryAggregation in main_aggregation once
+            # agg supports axis=1; needed for agg("size", axis=1).
+            assert isinstance(
+                operation, agg_ops.UnaryAggregateOp
+            ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)."
             main_aggregation = (
                 ex.UnaryAggregation(operation, ex.free_var(value_col_id)),
                 value_col_id,
@@ -1125,7 +1135,11 @@ def remap_f(x):
     def aggregate(
         self,
         by_column_ids: typing.Sequence[str] = (),
-        aggregations: typing.Sequence[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = (),
+        aggregations: typing.Sequence[
+            typing.Tuple[
+                str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
+            ]
+        ] = (),
         *,
         dropna: bool = True,
     ) -> typing.Tuple[Block, typing.Sequence[str]]:
@@ -1139,7 +1153,9 @@ def aggregate(
         """
         agg_specs = [
             (
-                ex.UnaryAggregation(operation, ex.free_var(input_id)),
+                ex.UnaryAggregation(operation, ex.free_var(input_id))
+                if isinstance(operation, agg_ops.UnaryAggregateOp)
+                else ex.NullaryAggregation(operation),
                 guid.generate_guid(),
             )
             for input_id, operation in aggregations
@@ -1175,18 +1191,32 @@ def aggregate(
             output_col_ids,
         )
 
-    def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp):
+    def get_stat(
+        self,
+        column_id: str,
+        stat: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
+    ):
         """Gets aggregates immediately, and caches it"""
         if stat.name in self._stats_cache[column_id]:
             return self._stats_cache[column_id][stat.name]
 
         # TODO: Convert nonstandard stats into standard stats where possible (popvar, etc.)
         # if getting a standard stat, just go get the rest of them
-        standard_stats = self._standard_stats(column_id)
+        standard_stats = typing.cast(
+            typing.Sequence[
+                typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
+            ],
+            self._standard_stats(column_id),
+        )
         stats_to_fetch = standard_stats if stat in standard_stats else [stat]
 
         aggregations = [
-            (ex.UnaryAggregation(stat, ex.free_var(column_id)), stat.name)
+            (
+                ex.UnaryAggregation(stat, ex.free_var(column_id))
+                if isinstance(stat, agg_ops.UnaryAggregateOp)
+                else ex.NullaryAggregation(stat),
+                stat.name,
+            )
             for stat in stats_to_fetch
         ]
         expr = self.expr.aggregate(aggregations)
@@ -1231,13 +1261,20 @@ def get_binary_stat(
     def summarize(
         self,
         column_ids: typing.Sequence[str],
-        stats: typing.Sequence[agg_ops.UnaryAggregateOp],
+        stats: typing.Sequence[
+            typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
+        ],
     ):
         """Get a list of stats as a deferred block object."""
         label_col_id = guid.generate_guid()
         labels = [stat.name for stat in stats]
         aggregations = [
-            (ex.UnaryAggregation(stat, ex.free_var(col_id)), f"{col_id}-{stat.name}")
+            (
+                ex.UnaryAggregation(stat, ex.free_var(col_id))
+                if isinstance(stat, agg_ops.UnaryAggregateOp)
+                else ex.NullaryAggregation(stat),
+                f"{col_id}-{stat.name}",
+            )
             for stat in stats
             for col_id in column_ids
         ]
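The `blocks.py` hunks above all repeat one dispatch: wrap the operation in `ex.UnaryAggregation` when it consumes an input column, otherwise fall back to `ex.NullaryAggregation`. A minimal sketch of that pattern as a standalone helper (the helper itself is hypothetical and not part of this patch; the `ex`/`agg_ops` names follow the patch's own imports):

```python
import typing

from bigframes.core import expression as ex
from bigframes.operations import aggregations as agg_ops


def _as_aggregation(
    operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
    column_id: typing.Optional[str] = None,
):
    """Hypothetical helper: build the right aggregation expression for an op."""
    if isinstance(operation, agg_ops.UnaryAggregateOp):
        # Unary ops (sum, mean, count, ...) read exactly one input column.
        assert column_id is not None
        return ex.UnaryAggregation(operation, ex.free_var(column_id))
    # Nullary ops (size) take no input column at all.
    return ex.NullaryAggregation(operation)
```

The patch keeps the conditional inline in `aggregate_all_and_stack`, `aggregate`, `get_stat`, and `summarize` rather than factoring it out like this.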
diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py
index 9c2bf18caa..2b23ccf0e4 100644
--- a/bigframes/core/groupby/__init__.py
+++ b/bigframes/core/groupby/__init__.py
@@ -286,10 +286,10 @@ def expanding(self, min_periods: int = 1) -> windows.Window:
             block, window_spec, self._selected_cols, drop_null_groups=self._dropna
         )
 
-    def agg(self, func=None, **kwargs) -> df.DataFrame:
+    def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]:
         if func:
             if isinstance(func, str):
-                return self._agg_string(func)
+                return self.size() if func == "size" else self._agg_string(func)
             elif utils.is_dict_like(func):
                 return self._agg_dict(func)
             elif utils.is_list_like(func):
@@ -315,7 +315,11 @@ def _agg_string(self, func: str) -> df.DataFrame:
         return dataframe if self._as_index else self._convert_index(dataframe)
 
     def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
-        aggregations: typing.List[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = []
+        aggregations: typing.List[
+            typing.Tuple[
+                str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
+            ]
+        ] = []
         column_labels = []
 
         want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())
diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py
index 783abfd788..619183287f 100644
--- a/bigframes/operations/aggregations.py
+++ b/bigframes/operations/aggregations.py
@@ -487,7 +487,9 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
 
 
 # TODO: Alternative names and lookup from numpy function objects
-_AGGREGATIONS_LOOKUP: dict[str, UnaryAggregateOp] = {
+_AGGREGATIONS_LOOKUP: typing.Dict[
+    str, typing.Union[UnaryAggregateOp, NullaryAggregateOp]
+] = {
     op.name: op
     for op in [
         sum_op,
@@ -506,10 +508,14 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
         ApproxQuartilesOp(2),
         ApproxQuartilesOp(3),
     ]
+    + [
+        # Add size_op separately to avoid Mypy type inference errors.
+        size_op,
+    ]
 }
 
 
-def lookup_agg_func(key: str) -> UnaryAggregateOp:
+def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]:
     if callable(key):
         raise NotImplementedError(
             "Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)."
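With `size_op` registered in `_AGGREGATIONS_LOOKUP` and the return type widened, `lookup_agg_func` may now hand back a nullary op. An illustrative check, assuming `size_op` is an instance of a `NullaryAggregateOp` subclass, as the widened table type implies:

```python
from bigframes.operations.aggregations import (
    NullaryAggregateOp,
    UnaryAggregateOp,
    lookup_agg_func,
)

# "size" now resolves to a nullary op; column-wise aggregates stay unary.
assert isinstance(lookup_agg_func("size"), NullaryAggregateOp)
assert isinstance(lookup_agg_func("sum"), UnaryAggregateOp)
```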
diff --git a/bigframes/series.py b/bigframes/series.py
index 57543abef3..3334321158 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -968,7 +968,6 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series:
                 )
             )
         else:
-
             return self._apply_aggregation(
                 agg_ops.lookup_agg_func(typing.cast(str, func))
             )
@@ -1246,7 +1245,9 @@ def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scal
         values, index = self._align_n([other1, other2], how)
         return (values[0], values[1], values[2], index)
 
-    def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> Any:
+    def _apply_aggregation(
+        self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp
+    ) -> Any:
         return self._block.get_stat(self._value_column, op)
 
     def _apply_window_op(
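The test changes below pin the new string alias to pandas semantics, where `"size"` counts rows including NULLs while `"count"` excludes them. For reference, a plain-pandas sketch of the behavior being matched (no BigQuery session required):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 2], "b": [1.0, None, 3.0]})

print(df["b"].agg("size"))          # 3 -- all rows, NULL included
print(df["b"].agg("count"))         # 2 -- NULLs excluded
print(df.agg("size"))               # Series: row count per column
print(df.groupby("a").agg("size"))  # Series: rows per group (1, 2)
```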
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 0aac9e2578..a5c810b91b 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2485,12 +2485,19 @@ def test_dataframe_agg_single_string(scalars_dfs):
     )
 
 
-def test_dataframe_agg_int_single_string(scalars_dfs):
+@pytest.mark.parametrize(
+    ("agg",),
+    (
+        ("sum",),
+        ("size",),
+    ),
+)
+def test_dataframe_agg_int_single_string(scalars_dfs, agg):
     numeric_cols = ["int64_col", "int64_too", "bool_col"]
     scalars_df, scalars_pandas_df = scalars_dfs
 
-    bf_result = scalars_df[numeric_cols].agg("sum").to_pandas()
-    pd_result = scalars_pandas_df[numeric_cols].agg("sum")
+    bf_result = scalars_df[numeric_cols].agg(agg).to_pandas()
+    pd_result = scalars_pandas_df[numeric_cols].agg(agg)
 
     assert bf_result.dtype == "Int64"
     pd.testing.assert_series_equal(
@@ -2537,6 +2544,7 @@ def test_dataframe_agg_int_multi_string(scalars_dfs):
         "sum",
         "nunique",
         "count",
+        "size",
     ]
     scalars_df, scalars_pandas_df = scalars_dfs
     bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas()
diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py
index 960dc10948..8e3baff4c2 100644
--- a/tests/system/small/test_groupby.py
+++ b/tests/system/small/test_groupby.py
@@ -140,11 +140,23 @@ def test_dataframe_groupby_agg_string(
     )
 
 
+def test_dataframe_groupby_agg_size_string(scalars_df_index, scalars_pandas_df_index):
+    col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
+    bf_result = scalars_df_index[col_names].groupby("string_col").agg("size")
+    pd_result = scalars_pandas_df_index[col_names].groupby("string_col").agg("size")
+
+    pd.testing.assert_series_equal(pd_result, bf_result.to_pandas(), check_dtype=False)
+
+
 def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
     col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
-    bf_result = scalars_df_index[col_names].groupby("string_col").agg(["count", "min"])
+    bf_result = (
+        scalars_df_index[col_names].groupby("string_col").agg(["count", "min", "size"])
+    )
     pd_result = (
-        scalars_pandas_df_index[col_names].groupby("string_col").agg(["count", "min"])
+        scalars_pandas_df_index[col_names]
+        .groupby("string_col")
+        .agg(["count", "min", "size"])
     )
     bf_result_computed = bf_result.to_pandas()
 
@@ -161,8 +173,8 @@ def test_dataframe_groupby_agg_list_w_column_multi_index(
     pd_df = scalars_pandas_df_index[columns].copy()
     pd_df.columns = multi_columns
 
-    bf_result = bf_df.groupby(level=0).agg(["count", "min"])
-    pd_result = pd_df.groupby(level=0).agg(["count", "min"])
+    bf_result = bf_df.groupby(level=0).agg(["count", "min", "size"])
+    pd_result = pd_df.groupby(level=0).agg(["count", "min", "size"])
     bf_result_computed = bf_result.to_pandas()
 
     pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)
@@ -182,12 +194,12 @@ def test_dataframe_groupby_agg_dict_with_list(
     bf_result = (
         scalars_df_index[col_names]
         .groupby("string_col", as_index=as_index)
-        .agg({"int64_too": ["mean", "max"], "string_col": "count"})
+        .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"})
     )
     pd_result = (
         scalars_pandas_df_index[col_names]
         .groupby("string_col", as_index=as_index)
-        .agg({"int64_too": ["mean", "max"], "string_col": "count"})
+        .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"})
     )
     bf_result_computed = bf_result.to_pandas()
 
@@ -413,16 +425,21 @@ def test_dataframe_groupby_nonnumeric_with_mean():
 # ==============
 
 
-def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index):
+@pytest.mark.parametrize(
+    ("agg"),
+    [
+        ("count"),
+        ("size"),
+    ],
+)
+def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index, agg):
     bf_result = (
-        scalars_df_index["int64_col"]
-        .groupby(scalars_df_index["string_col"])
-        .agg("count")
+        scalars_df_index["int64_col"].groupby(scalars_df_index["string_col"]).agg(agg)
     )
     pd_result = (
         scalars_pandas_df_index["int64_col"]
         .groupby(scalars_pandas_df_index["string_col"])
-        .agg("count")
+        .agg(agg)
     )
     bf_result_computed = bf_result.to_pandas()
 
@@ -435,12 +452,12 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
     bf_result = (
         scalars_df_index["int64_col"]
         .groupby(scalars_df_index["string_col"])
-        .agg(["sum", "mean"])
+        .agg(["sum", "mean", "size"])
     )
     pd_result = (
         scalars_pandas_df_index["int64_col"]
         .groupby(scalars_pandas_df_index["string_col"])
-        .agg(["sum", "mean"])
+        .agg(["sum", "mean", "size"])
     )
     bf_result_computed = bf_result.to_pandas()
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index cb28686d59..10fcec63ce 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -506,15 +506,32 @@ def test_series_dropna(scalars_dfs, ignore_index):
     pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False)
 
 
-def test_series_agg_single_string(scalars_dfs):
+@pytest.mark.parametrize(
+    ("agg",),
+    (
+        ("sum",),
+        ("size",),
+    ),
+)
+def test_series_agg_single_string(scalars_dfs, agg):
     scalars_df, scalars_pandas_df = scalars_dfs
-    bf_result = scalars_df["int64_col"].agg("sum")
-    pd_result = scalars_pandas_df["int64_col"].agg("sum")
+    bf_result = scalars_df["int64_col"].agg(agg)
+    pd_result = scalars_pandas_df["int64_col"].agg(agg)
     assert math.isclose(pd_result, bf_result)
 
 
 def test_series_agg_multi_string(scalars_dfs):
-    aggregations = ["sum", "mean", "std", "var", "min", "max", "nunique", "count"]
+    aggregations = [
+        "sum",
+        "mean",
+        "std",
+        "var",
+        "min",
+        "max",
+        "nunique",
+        "count",
+        "size",
+    ]
     scalars_df, scalars_pandas_df = scalars_dfs
     bf_result = scalars_df["int64_col"].agg(aggregations).to_pandas()
     pd_result = scalars_pandas_df["int64_col"].agg(aggregations)
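Taken together, the patch makes the `"size"` string alias work through `Series.agg`, `DataFrame.agg`, and the groupby `agg` paths. A rough end-to-end sketch of the API this enables (illustrative only; running it requires a configured BigQuery DataFrames session):

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})

df["b"].agg("size")          # scalar row count (NULLs included)
df.agg(["sum", "size"])      # DataFrame with one row per aggregate
df.groupby("a").agg("size")  # Series of group sizes
```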