From 728a4dec0154d2b08703e4abbd8f53b3ab54a151 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sun, 7 Apr 2024 06:09:02 +0000 Subject: [PATCH 1/4] fix: keep most relevant dtype for aggregates --- bigframes/core/blocks.py | 38 ++++++++-- tests/system/small/test_dataframe.py | 106 +++++++++++++++++++++++---- 2 files changed, 123 insertions(+), 21 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index c7b41e93eb..73df38793a 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -886,9 +886,11 @@ def aggregate_all_and_stack( axis: int | str = 0, value_col_id: str = "values", dropna: bool = True, - dtype: typing.Union[ - bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...] - ] = pd.Float64Dtype(), + dtype: Optional[ + typing.Union[ + bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...] + ] + ] = None, ) -> Block: axis_n = utils.get_axis_number(axis) if axis_n == 0: @@ -899,7 +901,22 @@ def aggregate_all_and_stack( index_col_ids = [ guid.generate_guid() for i in range(self.column_labels.nlevels) ] - result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot( + aggregate_expr = self.expr.aggregate(aggregations, dropna=dropna) + + # if all aggregates are of the same dtype then we should set that + # dtype in the result + if dtype is None: + aggregate_dtypes = set( + [ + aggregate_expr.get_column_type(c) + for c in aggregate_expr.column_ids + ] + ) + if len(aggregate_dtypes) == 1: + dtype = aggregate_dtypes.pop() + if dtype is None: + dtype = pd.Float64Dtype() + result_expr = aggregate_expr.unpivot( row_labels=self.column_labels.to_list(), index_col_ids=index_col_ids, unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]), @@ -916,6 +933,8 @@ def aggregate_all_and_stack( # TODO: Allow to promote identity/total_order columns instead for better perf offset_col = guid.generate_guid() expr_with_offsets = self.expr.promote_offsets(offset_col) + if dtype is None: + dtype = pd.Float64Dtype() stacked_expr = expr_with_offsets.unpivot( row_labels=self.column_labels.to_list(), index_col_ids=[guid.generate_guid()], @@ -1102,10 +1121,19 @@ def summarize( (col_id, tuple(f"{col_id}-{stat.name}" for stat in stats)) for col_id in column_ids ] - expr = self.expr.aggregate(aggregations).unpivot( + aggregate_expr = self.expr.aggregate(aggregations) + aggregate_dtypes = set( + [aggregate_expr.get_column_type(c) for c in aggregate_expr.column_ids] + ) + if len(aggregate_dtypes) == 1: + dtype = aggregate_dtypes.pop() + else: + dtype = pd.Float64Dtype() + expr = aggregate_expr.unpivot( labels, unpivot_columns=tuple(columns), index_col_ids=tuple([label_col_id]), + dtype=dtype, ) return Block( expr, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 5d6a859c11..6c5c92506b 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2397,11 +2397,23 @@ def test_dataframe_pct_change(scalars_df_index, scalars_pandas_df_index, periods def test_dataframe_agg_single_string(scalars_dfs): numeric_cols = ["int64_col", "int64_too", "float64_col"] scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df[numeric_cols].agg("sum").to_pandas() pd_result = scalars_pandas_df[numeric_cols].agg("sum") - # Pandas may produce narrower numeric types, but bigframes always produces Float64 - pd_result = pd_result.astype("Float64") + assert bf_result.dtype == "Float64" + pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False) + + +def 
test_dataframe_agg_int_single_string(scalars_dfs): + numeric_cols = ["int64_col", "int64_too", "bool_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df[numeric_cols].agg("sum").to_pandas() + pd_result = scalars_pandas_df[numeric_cols].agg("sum") + + assert bf_result.dtype == "Int64" + # Pandas has object index type pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False) @@ -2438,6 +2450,27 @@ def test_dataframe_agg_multi_string(scalars_dfs): ).all() +def test_dataframe_agg_int_multi_string(scalars_dfs): + numeric_cols = ["int64_col", "int64_too", "bool_col"] + aggregations = [ + "sum", + "nunique", + "count", + ] + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas() + pd_result = scalars_pandas_df[numeric_cols].agg(aggregations) + + for dtype in bf_result.dtypes: + assert dtype == "Int64" + + # Pandas may produce narrower numeric types + # Pandas has object index type + pd.testing.assert_frame_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + @skip_legacy_pandas def test_df_describe(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs @@ -2927,6 +2960,39 @@ def test_loc_setitem_bool_series_scalar_error(scalars_dfs): pd_df.loc[pd_df["int64_too"] == 1, "string_col"] = 99 +@pytest.mark.parametrize( + ("col", "op"), + [ + # Int aggregates + pytest.param("int64_col", lambda x: x.sum(), id="int-sum"), + pytest.param("int64_col", lambda x: x.min(), id="int-min"), + pytest.param("int64_col", lambda x: x.max(), id="int-max"), + pytest.param("int64_col", lambda x: x.count(), id="int-count"), + pytest.param("int64_col", lambda x: x.nunique(), id="int-nunique"), + # Float aggregates + pytest.param("float64_col", lambda x: x.count(), id="float-count"), + pytest.param("float64_col", lambda x: x.nunique(), id="float-nunique"), + # Bool aggregates + pytest.param("bool_col", lambda x: x.sum(), id="bool-sum"), + pytest.param("bool_col", lambda x: x.count(), id="bool-count"), + pytest.param("bool_col", lambda x: x.nunique(), id="bool-nunique"), + # String aggregates + pytest.param("string_col", lambda x: x.count(), id="string-count"), + pytest.param("string_col", lambda x: x.nunique(), id="string-nunique"), + ], +) +def test_dataframe_aggregate_int(scalars_df_index, scalars_pandas_df_index, col, op): + bf_result = op(scalars_df_index[[col]]).to_pandas() + pd_result = op(scalars_pandas_df_index[[col]]) + + # Check dtype separately + assert bf_result.dtype == "Int64" + + # Pandas may produce narrower numeric types + # Pandas has object index type + assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False) + + @pytest.mark.parametrize( ("ordered"), [ @@ -2935,30 +3001,38 @@ def test_loc_setitem_bool_series_scalar_error(scalars_dfs): ], ) @pytest.mark.parametrize( - ("op"), + ("op", "bf_dtype"), [ - (lambda x: x.sum(numeric_only=True)), - (lambda x: x.mean(numeric_only=True)), - (lambda x: x.min(numeric_only=True)), - (lambda x: x.max(numeric_only=True)), - (lambda x: x.std(numeric_only=True)), - (lambda x: x.var(numeric_only=True)), - (lambda x: x.count(numeric_only=False)), - (lambda x: x.nunique()), + (lambda x: x.sum(numeric_only=True), "Float64"), + (lambda x: x.mean(numeric_only=True), "Float64"), + (lambda x: x.min(numeric_only=True), "Float64"), + (lambda x: x.max(numeric_only=True), "Float64"), + (lambda x: x.std(numeric_only=True), "Float64"), + (lambda x: x.var(numeric_only=True), "Float64"), + (lambda x: 
x.count(numeric_only=False), "Int64"),
+        (lambda x: x.nunique(), "Int64"),
     ],
     ids=["sum", "mean", "min", "max", "std", "var", "count", "nunique"],
 )
-def test_dataframe_aggregates(scalars_df_index, scalars_pandas_df_index, op, ordered):
+def test_dataframe_aggregates(
+    scalars_df_index, scalars_pandas_df_index, op, bf_dtype, ordered
+):
     col_names = ["int64_too", "float64_col", "string_col", "int64_col", "bool_col"]
     bf_series = op(scalars_df_index[col_names])
-    pd_series = op(scalars_pandas_df_index[col_names])
     bf_result = bf_series.to_pandas(ordered=ordered)
+    pd_result = op(scalars_pandas_df_index[col_names])
 
-    # Pandas may produce narrower numeric types, but bigframes always produces Float64
-    pd_series = pd_series.astype("Float64")
+    # Check dtype separately
+    assert bf_result.dtype == bf_dtype
+
+    # Pandas may produce narrower numeric types
     # Pandas has object index type
     assert_series_equal(
-        pd_series, bf_result, check_index_type=False, ignore_order=not ordered
+        pd_result,
+        bf_result,
+        check_dtype=False,
+        check_index_type=False,
+        ignore_order=not ordered,
     )

From 257ef8a7ec1c51501def3bbf96af860bf1aa987f Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Mon, 8 Apr 2024 17:53:19 +0000
Subject: [PATCH 2/4] add aggregate tests for bool results

---
 tests/system/small/test_dataframe.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 6c5c92506b..7f6e65859b 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2993,6 +2993,25 @@ def test_dataframe_aggregate_int(scalars_df_index, scalars_pandas_df_index, col,
     assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False)
 
 
+@pytest.mark.parametrize(
+    ("col", "op"),
+    [
+        pytest.param("bool_col", lambda x: x.min(), id="bool-min"),
+        pytest.param("bool_col", lambda x: x.max(), id="bool-max"),
+    ],
+)
+def test_dataframe_aggregate_bool(scalars_df_index, scalars_pandas_df_index, col, op):
+    bf_result = op(scalars_df_index[[col]]).to_pandas()
+    pd_result = op(scalars_pandas_df_index[[col]])
+
+    # Check dtype separately
+    assert bf_result.dtype == "boolean"
+
+    # Pandas may produce narrower numeric types
+    # Pandas has object index type
+    assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False)
+
+
 @pytest.mark.parametrize(
     ("ordered"),
     [

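Taken together, the first two patches make aggregations whose results share a
single dtype keep that dtype instead of always widening to Float64. A minimal
sketch of the intended user-visible behavior (the table path is hypothetical;
the column names mirror the test fixtures above):

    import bigframes.pandas as bpd

    df = bpd.read_gbq("my-project.my_dataset.scalars")  # hypothetical table

    # All-integer inputs now aggregate to Int64 rather than Float64.
    assert df[["int64_col", "int64_too"]].sum().to_pandas().dtype == "Int64"

    # Boolean min/max keep the boolean dtype.
    assert df[["bool_col"]].min().to_pandas().dtype == "boolean"
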
From e7c30cd84d60063404fe355e421ad09743f9b14a Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Wed, 10 Apr 2024 23:26:03 +0000
Subject: [PATCH 3/4] refactor and reuse dtypes.lcd_type

---
 bigframes/core/blocks.py | 18 +++++++-----------
 bigframes/dtypes.py      | 17 +++++++++++------
 2 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 2f8c3eafe0..a7474b6ae4 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -934,14 +934,12 @@ def aggregate_all_and_stack(
-            # if all aggregates are of the same dtype then we should set that
-            # dtype in the result
+            # prefer the common supertype (lcd) of the aggregate dtypes for
+            # the result, falling back to Float64 when there is none
             if dtype is None:
-                aggregate_dtypes = set(
-                    [
+                dtype = bigframes.dtypes.lcd_type(
+                    *[
                         aggregate_expr.get_column_type(c)
                         for c in aggregate_expr.column_ids
                     ]
                 )
-                if len(aggregate_dtypes) == 1:
-                    dtype = aggregate_dtypes.pop()
             if dtype is None:
                 dtype = pd.Float64Dtype()
             result_expr = aggregate_expr.unpivot(
@@ -1150,12 +1148,10 @@ def summarize(
             for col_id in column_ids
         ]
         aggregate_expr = self.expr.aggregate(aggregations)
-        aggregate_dtypes = set(
-            [aggregate_expr.get_column_type(c) for c in aggregate_expr.column_ids]
+        dtype = bigframes.dtypes.lcd_type(
+            *[aggregate_expr.get_column_type(c) for c in aggregate_expr.column_ids]
         )
-        if len(aggregate_dtypes) == 1:
-            dtype = aggregate_dtypes.pop()
-        else:
+        if dtype is None:
             dtype = pd.Float64Dtype()
         expr = aggregate_expr.unpivot(
             labels,
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index 3b2092bf85..d2dc210e0d 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -658,10 +658,15 @@ def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:
     return None
 
 
-def lcd_type(dtype1: Dtype, dtype2: Dtype) -> Dtype:
-    """Get the supertype of the two types."""
-    if dtype1 == dtype2:
-        return dtype1
+def lcd_type(*dtypes: Dtype) -> typing.Optional[Dtype]:
+    """Get the supertype of the given dtypes, or None if there is none."""
+    if len(dtypes) < 1:
+        raise ValueError("at least one dtype must be provided")
+    if len(dtypes) == 1:
+        return dtypes[0]
+    unique_dtypes = set(dtypes)
+    if len(unique_dtypes) == 1:
+        return unique_dtypes.pop()
     # Implicit conversion currently only supported for numeric types
     hierarchy: list[Dtype] = [
         pd.BooleanDtype(),
         pd.Int64Dtype(),
         pd.ArrowDtype(pa.decimal128(38, 9)),
         pd.ArrowDtype(pa.decimal256(76, 38)),
         pd.Float64Dtype(),
     ]
-    if (dtype1 not in hierarchy) or (dtype2 not in hierarchy):
+    if any(dtype not in hierarchy for dtype in dtypes):
         return None
-    lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2))
+    lcd_index = max(hierarchy.index(dtype) for dtype in dtypes)
     return hierarchy[lcd_index]

From 7d5561bd69d0fc4c02eae04de136611a5b2e8ff4 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Thu, 11 Apr 2024 04:53:13 +0000
Subject: [PATCH 4/4] test: use check_dtype=False where dtype is asserted
 separately

---
 tests/system/small/test_dataframe.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 0407b818fd..183c939b0d 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2395,7 +2395,9 @@ def test_dataframe_agg_single_string(scalars_dfs):
     pd_result = scalars_pandas_df[numeric_cols].agg("sum")
 
     assert bf_result.dtype == "Float64"
-    pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False)
+    pd.testing.assert_series_equal(
+        pd_result, bf_result, check_dtype=False, check_index_type=False
+    )
 
 
 def test_dataframe_agg_int_single_string(scalars_dfs):
@@ -2406,8 +2408,9 @@ def test_dataframe_agg_int_single_string(scalars_dfs):
     pd_result = scalars_pandas_df[numeric_cols].agg("sum")
 
     assert bf_result.dtype == "Int64"
-    # Pandas has object index type
-    pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False)
+    pd.testing.assert_series_equal(
+        pd_result, bf_result, check_dtype=False, check_index_type=False
+    )
 
 
 def test_dataframe_agg_multi_string(scalars_dfs):
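For reference, a rough sketch of how the reworked lcd_type resolves result
dtypes after this series (the assertions mirror the hierarchy in patch 3;
pd.StringDtype() simply stands in for any dtype outside the numeric
hierarchy):

    import pandas as pd

    import bigframes.dtypes

    # Identical dtypes pass through unchanged.
    assert bigframes.dtypes.lcd_type(pd.Int64Dtype(), pd.Int64Dtype()) == pd.Int64Dtype()

    # Mixed dtypes promote along Boolean -> Int64 -> decimal -> Float64.
    assert bigframes.dtypes.lcd_type(pd.BooleanDtype(), pd.Int64Dtype()) == pd.Int64Dtype()
    assert bigframes.dtypes.lcd_type(pd.Int64Dtype(), pd.Float64Dtype()) == pd.Float64Dtype()

    # Anything outside the hierarchy has no common supertype; the callers in
    # blocks.py then fall back to pd.Float64Dtype().
    assert bigframes.dtypes.lcd_type(pd.StringDtype(), pd.Int64Dtype()) is None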