From fc80e1774f02cc31af6ddd41744561059edc2b99 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 20 Nov 2024 02:01:05 +0000 Subject: [PATCH 01/13] perf: update df.corr, df.cov to be used with more than 30 columns case. --- bigframes/core/blocks.py | 44 ------- bigframes/dataframe.py | 188 ++++++++++++++++++++++++++- tests/system/conftest.py | 31 +++++ tests/system/small/test_dataframe.py | 11 +- 4 files changed, 223 insertions(+), 51 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 4fc663817c..72078c9c50 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1317,50 +1317,6 @@ def summarize( index_columns=index_cols, ) - def calculate_pairwise_metric(self, op=agg_ops.CorrOp()): - """ - Returns a block object to compute pairwise metrics among all value columns in this block. - - The metric to be computed is specified by the `op` parameter, which can be either a - correlation operation (default) or a covariance operation. - """ - if len(self.value_columns) > 30: - raise NotImplementedError( - "This function supports dataframes with 30 columns or fewer. " - f"Provided dataframe has {len(self.value_columns)} columns. {constants.FEEDBACK_LINK}" - ) - - aggregations = [ - ( - ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col)), - f"{left_col}-{right_col}", - ) - for left_col in self.value_columns - for right_col in self.value_columns - ] - expr = self.expr.aggregate(aggregations) - - input_count = len(self.value_columns) - unpivot_columns = tuple( - tuple(expr.column_ids[input_count * i : input_count * (i + 1)]) - for i in range(input_count) - ) - labels = self._get_labels_for_columns(self.value_columns) - - # TODO(b/340896143): fix type error - expr, (index_col_ids, _, _) = unpivot( - expr, - row_labels=labels, - unpivot_columns=unpivot_columns, - ) - - return Block( - expr, - column_labels=self.column_labels, - index_columns=index_col_ids, - index_labels=self.column_labels.names, - ) - def explode( self, column_ids: typing.Sequence[str], diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 0b639a5649..42214d0bfc 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1197,7 +1197,105 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr else: frame = self._drop_non_numeric() - return DataFrame(frame._block.calculate_pairwise_metric(op=agg_ops.CorrOp())) + orig_columns = frame.columns + # Replace column names with 0 to n - 1 to keep order + # and avoid the influence of duplicated column name + frame.columns = pandas.Index(range(len(orig_columns))) + frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE) + block = frame._block + + # A new column that uniquely identifies each row + block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx") + + val_col_ids = [ + col_id for col_id in block.value_columns if col_id != ordering_col + ] + + block = block.melt( + [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value" + ) + + block = block.merge( + block, + left_join_ids=[ordering_col], + right_join_ids=[ordering_col], + how="inner", + sort=False, + ) + + frame = DataFrame(block) + frame = frame.dropna(subset=["_bigframes_value_x", "_bigframes_value_y"]) + + paired_mean_frame = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_paired_mean_x=bigframes.pandas.NamedAgg( + column="_bigframes_value_x", aggfunc="mean" + ), + _bigframes_paired_mean_y=bigframes.pandas.NamedAgg( + column="_bigframes_value_y", aggfunc="mean" + ), + ) + .reset_index() + ) + + frame = frame.merge( + paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"] + ) + frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"] + frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"] + + frame["_bigframes_dividend"] = ( + frame["_bigframes_value_x"] * frame["_bigframes_value_y"] + ) + frame["_bigframes_x_square"] = ( + frame["_bigframes_value_x"] * frame["_bigframes_value_x"] + ) + frame["_bigframes_y_square"] = ( + frame["_bigframes_value_y"] * frame["_bigframes_value_y"] + ) + + result = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_dividend_sum=bigframes.pandas.NamedAgg( + column="_bigframes_dividend", aggfunc="sum" + ), + _bigframes_x_square_sum=bigframes.pandas.NamedAgg( + column="_bigframes_x_square", aggfunc="sum" + ), + _bigframes_y_square_sum=bigframes.pandas.NamedAgg( + column="_bigframes_y_square", aggfunc="sum" + ), + ) + .reset_index() + ) + result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / ( + ( + result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"] + )._apply_unary_op(ops.sqrt_op) + ) + result = result._pivot( + index="_bigframes_variable_x", + columns="_bigframes_variable_y", + values="_bigframes_corr", + ) + + result.columns = orig_columns + + map_df = bigframes.dataframe.DataFrame( + { + "_biframes_keys": range(len(orig_columns)), + "_bigframes_new_index": orig_columns, + }, + session=self._get_block().expr.session, + ).set_index("_biframes_keys") + result = result.join(map_df) + result = result.sort_index() + result = result.set_index("_bigframes_new_index") + result.index.name = None + + return result def cov(self, *, numeric_only: bool = False) -> DataFrame: if not numeric_only: @@ -1205,7 +1303,93 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: else: frame = self._drop_non_numeric() - return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp())) + orig_columns = frame.columns + # Replace column names with 0 to n - 1 to keep order + # and avoid the influence of duplicated column name + frame.columns = pandas.Index(range(len(orig_columns))) + frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE) + block = frame._block + + # A new column that uniquely identifies each row + block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx") + + val_col_ids = [ + col_id for col_id in block.value_columns if col_id != ordering_col + ] + + block = block.melt( + [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value" + ) + block = block.merge( + block, + left_join_ids=[ordering_col], + right_join_ids=[ordering_col], + how="inner", + sort=False, + ) + + frame = DataFrame(block) + frame = frame.dropna(subset=["_bigframes_value_x", "_bigframes_value_y"]) + + paired_mean_frame = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_paired_mean_x=bigframes.pandas.NamedAgg( + column="_bigframes_value_x", aggfunc="mean" + ), + _bigframes_paired_mean_y=bigframes.pandas.NamedAgg( + column="_bigframes_value_y", aggfunc="mean" + ), + ) + .reset_index() + ) + + frame = frame.merge( + paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"] + ) + frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"] + frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"] + + frame["_bigframes_dividend"] = ( + frame["_bigframes_value_x"] * frame["_bigframes_value_y"] + ) + + result = ( + frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) + .agg( + _bigframes_dividend_sum=bigframes.pandas.NamedAgg( + column="_bigframes_dividend", aggfunc="sum" + ), + _bigframes_dividend_count=bigframes.pandas.NamedAgg( + column="_bigframes_dividend", aggfunc="count" + ), + ) + .reset_index() + ) + result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / ( + result["_bigframes_dividend_count"] - 1 + ) + result = result._pivot( + index="_bigframes_variable_x", + columns="_bigframes_variable_y", + values="_bigframes_cov", + ) + + result.columns = orig_columns + + map_df = bigframes.dataframe.DataFrame( + { + "_biframes_keys": range(len(orig_columns)), + "_bigframes_new_index": orig_columns, + }, + session=self._get_block().expr.session, + ).set_index("_biframes_keys") + result = result.join(map_df) + result = result.sort_index() + result = result.set_index("_bigframes_new_index") + result.index.name = None + + return result def to_arrow( self, diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 5e95b8ee5d..a4c1d28d96 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -506,6 +506,37 @@ def json_pandas_df() -> pd.DataFrame: return df +@pytest.fixture(scope="session") +def scalars_df_numeric_150_columns_maybe_ordered( + scalars_dfs_maybe_ordered: bigframes.dataframe.DataFrame, +): + """DataFrame pointing at test data.""" + # TODO(b/379911038): After the error fixed, add numeric type. + df, pandas_df = scalars_dfs_maybe_ordered + df = df.reset_index(drop=False)[ + [ + "rowindex", + "rowindex_2", + "float64_col", + "int64_col", + "int64_too", + ] + ] + df = bpd.concat([df] * 30, axis=1) + + pandas_df = pandas_df.reset_index(drop=False)[ + [ + "rowindex", + "rowindex_2", + "float64_col", + "int64_col", + "int64_too", + ] + * 30 + ] + return (df, pandas_df) + + @pytest.fixture(scope="session") def scalars_df_default_index( scalars_df_index: bigframes.dataframe.DataFrame, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index f69eb2eb4a..233f5e32b0 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2061,8 +2061,8 @@ def test_combine_first( ), ], ) -def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only): - scalars_df, scalars_pandas_df = scalars_dfs +def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas() pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only) @@ -2101,11 +2101,12 @@ def test_corr_w_invalid_parameters(scalars_dfs): ), ], ) -def test_cov_w_numeric_only(scalars_dfs, columns, numeric_only): - scalars_df, scalars_pandas_df = scalars_dfs +def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas() + print(bf_result) pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only) - + print(pd_result) # BigFrames and Pandas differ in their data type handling: # - Column types: BigFrames uses Float64, Pandas uses float64. # - Index types: BigFrames uses strign, Pandas uses object. From 52aaa380b190ee6b3c12d9bd8aa402ec9d03a1f5 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 20 Nov 2024 02:02:19 +0000 Subject: [PATCH 02/13] add large test --- tests/system/large/test_dataframe.py | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 tests/system/large/test_dataframe.py diff --git a/tests/system/large/test_dataframe.py b/tests/system/large/test_dataframe.py new file mode 100644 index 0000000000..855a613138 --- /dev/null +++ b/tests/system/large/test_dataframe.py @@ -0,0 +1,29 @@ +import pandas as pd + + +def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): + scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered + bf_result = scalars_df.corr(numeric_only=True).to_pandas() + pd_result = scalars_pandas_df.corr(numeric_only=True) + + pd.testing.assert_frame_equal( + bf_result, + pd_result, + check_dtype=False, + check_index_type=False, + check_column_type=False, + ) + + +def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): + scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered + bf_result = scalars_df.cov(numeric_only=True).to_pandas() + pd_result = scalars_pandas_df.cov(numeric_only=True) + + pd.testing.assert_frame_equal( + bf_result, + pd_result, + check_dtype=False, + check_index_type=False, + check_column_type=False, + ) From 359aea941a357f4b6bc6d3bef36a7c1e57ad181a Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 20 Nov 2024 02:12:35 +0000 Subject: [PATCH 03/13] remove print --- tests/system/small/test_dataframe.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 233f5e32b0..be04a47129 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2104,9 +2104,7 @@ def test_corr_w_invalid_parameters(scalars_dfs): def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas() - print(bf_result) pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only) - print(pd_result) # BigFrames and Pandas differ in their data type handling: # - Column types: BigFrames uses Float64, Pandas uses float64. # - Index types: BigFrames uses strign, Pandas uses object. From 9c2b99729dbadfce7439cca9fbb3afa5d9308171 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 20 Nov 2024 04:20:10 +0000 Subject: [PATCH 04/13] fix_index --- bigframes/dataframe.py | 42 +++++++++++++++------------ tests/system/small/test_multiindex.py | 8 +++-- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 42214d0bfc..ae35fa61db 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1281,19 +1281,22 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr values="_bigframes_corr", ) - result.columns = orig_columns - + n_levels = len(orig_columns.levels) + map_data = { + f"_bigframes_level_{i}": orig_columns.get_level_values(i) + for i in range(n_levels) + } + map_data["_bigframes_keys"] = range(len(orig_columns)) map_df = bigframes.dataframe.DataFrame( - { - "_biframes_keys": range(len(orig_columns)), - "_bigframes_new_index": orig_columns, - }, + map_data, session=self._get_block().expr.session, - ).set_index("_biframes_keys") + ).set_index("_bigframes_keys") result = result.join(map_df) result = result.sort_index() - result = result.set_index("_bigframes_new_index") - result.index.name = None + index_columns = [f"_bigframes_level_{i}" for i in range(n_levels)] + result = result.set_index(index_columns) + result.index.names = orig_columns.names + result.columns = orig_columns return result @@ -1375,19 +1378,22 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: values="_bigframes_cov", ) - result.columns = orig_columns - + n_levels = len(orig_columns.levels) + map_data = { + f"_bigframes_level_{i}": orig_columns.get_level_values(i) + for i in range(n_levels) + } + map_data["_bigframes_keys"] = range(len(orig_columns)) map_df = bigframes.dataframe.DataFrame( - { - "_biframes_keys": range(len(orig_columns)), - "_bigframes_new_index": orig_columns, - }, + map_data, session=self._get_block().expr.session, - ).set_index("_biframes_keys") + ).set_index("_bigframes_keys") result = result.join(map_df) result = result.sort_index() - result = result.set_index("_bigframes_new_index") - result.index.name = None + index_columns = [f"_bigframes_level_{i}" for i in range(n_levels)] + result = result.set_index(index_columns) + result.index.names = orig_columns.names + result.columns = orig_columns return result diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index cab74f617d..1c78ac63d9 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -910,7 +910,9 @@ def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index): def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index): columns = ["int64_too", "float64_col", "int64_col"] - multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2])) + multi_columns = pandas.MultiIndex.from_tuples( + zip(["a", "b", "b"], [1, 2, 2]), names=[None, "level_2"] + ) bf = scalars_df_index[columns].copy() bf.columns = multi_columns @@ -931,7 +933,9 @@ def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index): def test_cov_w_multi_index(scalars_df_index, scalars_pandas_df_index): columns = ["int64_too", "float64_col", "int64_col"] - multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2])) + multi_columns = pandas.MultiIndex.from_tuples( + zip(["a", "b", "b"], [1, 2, 2]), names=["level_1", None] + ) bf = scalars_df_index[columns].copy() bf.columns = multi_columns From dcb1eb21ee6cde0f67e9c1ebf8532caf92662cfa Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 20 Nov 2024 04:49:21 +0000 Subject: [PATCH 05/13] fix index --- bigframes/dataframe.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index ae35fa61db..3d144725e9 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1280,11 +1280,9 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr columns="_bigframes_variable_y", values="_bigframes_corr", ) - - n_levels = len(orig_columns.levels) map_data = { f"_bigframes_level_{i}": orig_columns.get_level_values(i) - for i in range(n_levels) + for i in range(orig_columns.nlevels) } map_data["_bigframes_keys"] = range(len(orig_columns)) map_df = bigframes.dataframe.DataFrame( @@ -1293,7 +1291,7 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr ).set_index("_bigframes_keys") result = result.join(map_df) result = result.sort_index() - index_columns = [f"_bigframes_level_{i}" for i in range(n_levels)] + index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)] result = result.set_index(index_columns) result.index.names = orig_columns.names result.columns = orig_columns @@ -1378,10 +1376,9 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: values="_bigframes_cov", ) - n_levels = len(orig_columns.levels) map_data = { f"_bigframes_level_{i}": orig_columns.get_level_values(i) - for i in range(n_levels) + for i in range(orig_columns.nlevels) } map_data["_bigframes_keys"] = range(len(orig_columns)) map_df = bigframes.dataframe.DataFrame( @@ -1390,7 +1387,7 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: ).set_index("_bigframes_keys") result = result.join(map_df) result = result.sort_index() - index_columns = [f"_bigframes_level_{i}" for i in range(n_levels)] + index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)] result = result.set_index(index_columns) result.index.names = orig_columns.names result.columns = orig_columns From b657bbb137c4c6ec0ddaf20272bcfcd0f324c2b6 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 20 Nov 2024 22:06:34 +0000 Subject: [PATCH 06/13] test fix --- tests/system/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index a4c1d28d96..603484d911 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -522,7 +522,9 @@ def scalars_df_numeric_150_columns_maybe_ordered( "int64_too", ] ] - df = bpd.concat([df] * 30, axis=1) + + # Cache to avoid RecursionError + df = bpd.concat([df] * 30, axis=1).cache() pandas_df = pandas_df.reset_index(drop=False)[ [ From 951580331d68b957f5b8a7bdf841d54ae2fa774f Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 21 Nov 2024 18:53:41 +0000 Subject: [PATCH 07/13] fix test --- tests/system/conftest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 603484d911..414d461d62 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -523,8 +523,9 @@ def scalars_df_numeric_150_columns_maybe_ordered( ] ] - # Cache to avoid RecursionError - df = bpd.concat([df] * 30, axis=1).cache() + # concat twice to avoid RecursionError + df = bpd.concat([df] * 5, axis=1) + df = bpd.concat([df] * 6, axis=1) pandas_df = pandas_df.reset_index(drop=False)[ [ From 493e80ecbd65b4ac2f8a53ff53f60acd660d0696 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 21 Nov 2024 19:43:48 +0000 Subject: [PATCH 08/13] fix test --- tests/system/conftest.py | 56 ++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 414d461d62..14abd2420d 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -506,40 +506,6 @@ def json_pandas_df() -> pd.DataFrame: return df -@pytest.fixture(scope="session") -def scalars_df_numeric_150_columns_maybe_ordered( - scalars_dfs_maybe_ordered: bigframes.dataframe.DataFrame, -): - """DataFrame pointing at test data.""" - # TODO(b/379911038): After the error fixed, add numeric type. - df, pandas_df = scalars_dfs_maybe_ordered - df = df.reset_index(drop=False)[ - [ - "rowindex", - "rowindex_2", - "float64_col", - "int64_col", - "int64_too", - ] - ] - - # concat twice to avoid RecursionError - df = bpd.concat([df] * 5, axis=1) - df = bpd.concat([df] * 6, axis=1) - - pandas_df = pandas_df.reset_index(drop=False)[ - [ - "rowindex", - "rowindex_2", - "float64_col", - "int64_col", - "int64_too", - ] - * 30 - ] - return (df, pandas_df) - - @pytest.fixture(scope="session") def scalars_df_default_index( scalars_df_index: bigframes.dataframe.DataFrame, @@ -640,6 +606,28 @@ def scalars_dfs_maybe_ordered( ) +@pytest.fixture(scope="session") +def scalars_df_numeric_150_columns_maybe_ordered( + maybe_ordered_session, + scalars_pandas_df_index, +): + """DataFrame pointing at test data.""" + # TODO(b/379911038): After the error fixed, add numeric type. + pandas_df = scalars_pandas_df_index.reset_index(drop=False)[ + [ + "rowindex", + "rowindex_2", + "float64_col", + "int64_col", + "int64_too", + ] + * 30 + ] + + df = maybe_ordered_session.read_pandas(pandas_df) + return (df, pandas_df) + + @pytest.fixture(scope="session") def hockey_df( hockey_table_id: str, session: bigframes.Session From 1abd51ee7b7999d710f432c3fc003b135bb15be5 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 21 Nov 2024 20:26:39 +0000 Subject: [PATCH 09/13] slightly improve multi_apply_unary_op to avoid RecursionError --- bigframes/core/blocks.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 72078c9c50..daa1245fcb 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -912,6 +912,8 @@ def multi_apply_unary_op( input_varname = input_varnames[0] block = self + + result_ids = [] for col_id in columns: label = self.col_id_to_label[col_id] block, result_id = block.project_expr( @@ -919,7 +921,8 @@ def multi_apply_unary_op( label=label, ) block = block.copy_values(result_id, col_id) - block = block.drop_columns([result_id]) + result_ids.append(result_id) + block = block.drop_columns(result_ids) # Special case, we can preserve transpose cache for full-frame unary ops if (self._transpose_cache is not None) and set(self.value_columns) == set( columns From 0ffa77890e7a272a25834bd354157b37829a9792 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 21 Nov 2024 22:28:12 +0000 Subject: [PATCH 10/13] update recursion limit for nox session --- bigframes/dataframe.py | 1 + noxfile.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 3d144725e9..28705a2233 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1280,6 +1280,7 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr columns="_bigframes_variable_y", values="_bigframes_corr", ) + map_data = { f"_bigframes_level_{i}": orig_columns.get_level_values(i) for i in range(orig_columns.nlevels) diff --git a/noxfile.py b/noxfile.py index 341de704e5..9c6c699ecf 100644 --- a/noxfile.py +++ b/noxfile.py @@ -22,6 +22,7 @@ import pathlib import re import shutil +import sys import time from typing import Dict, List import warnings @@ -112,6 +113,9 @@ # Error if a python version is missing nox.options.error_on_missing_interpreters = True +# Set recursionlimit to 10000 +sys.setrecursionlimit(10000) + @nox.session(python=DEFAULT_PYTHON_VERSION) def lint(session): From 52f0dfe95f2ad4fe50bfd0f274d8caa39e546064 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Fri, 22 Nov 2024 21:30:03 +0000 Subject: [PATCH 11/13] skip the test in e2e/python 3.12 --- noxfile.py | 4 ---- tests/system/large/test_dataframe.py | 13 +++++++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/noxfile.py b/noxfile.py index 9c6c699ecf..341de704e5 100644 --- a/noxfile.py +++ b/noxfile.py @@ -22,7 +22,6 @@ import pathlib import re import shutil -import sys import time from typing import Dict, List import warnings @@ -113,9 +112,6 @@ # Error if a python version is missing nox.options.error_on_missing_interpreters = True -# Set recursionlimit to 10000 -sys.setrecursionlimit(10000) - @nox.session(python=DEFAULT_PYTHON_VERSION) def lint(session): diff --git a/tests/system/large/test_dataframe.py b/tests/system/large/test_dataframe.py index 855a613138..20d383463a 100644 --- a/tests/system/large/test_dataframe.py +++ b/tests/system/large/test_dataframe.py @@ -1,6 +1,14 @@ +import sys + import pandas as pd +import pytest +@pytest.mark.skipif( + sys.version_info >= (3, 12), + # See: https://github.com/python/cpython/issues/112282 + reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", +) def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered bf_result = scalars_df.corr(numeric_only=True).to_pandas() @@ -15,6 +23,11 @@ def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): ) +@pytest.mark.skipif( + sys.version_info >= (3, 12), + # See: https://github.com/python/cpython/issues/112282 + reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", +) def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered bf_result = scalars_df.cov(numeric_only=True).to_pandas() From 19764ad3db93013fe436aa5ac369d82d26c7cec8 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 4 Dec 2024 01:01:10 +0000 Subject: [PATCH 12/13] simplify code --- bigframes/dataframe.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 28705a2233..68ee4a2677 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1223,8 +1223,9 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr sort=False, ) - frame = DataFrame(block) - frame = frame.dropna(subset=["_bigframes_value_x", "_bigframes_value_y"]) + frame = DataFrame(block).dropna( + subset=["_bigframes_value_x", "_bigframes_value_y"] + ) paired_mean_frame = ( frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) @@ -1330,8 +1331,9 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: sort=False, ) - frame = DataFrame(block) - frame = frame.dropna(subset=["_bigframes_value_x", "_bigframes_value_y"]) + frame = DataFrame(block).dropna( + subset=["_bigframes_value_x", "_bigframes_value_y"] + ) paired_mean_frame = ( frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"]) From bd46366ffdb9dfb0744388e69a62fb48c594dfac Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 4 Dec 2024 01:06:31 +0000 Subject: [PATCH 13/13] simplify code --- bigframes/dataframe.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 68ee4a2677..d04d09a4c0 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1291,8 +1291,7 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr map_data, session=self._get_block().expr.session, ).set_index("_bigframes_keys") - result = result.join(map_df) - result = result.sort_index() + result = result.join(map_df).sort_index() index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)] result = result.set_index(index_columns) result.index.names = orig_columns.names @@ -1388,8 +1387,7 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: map_data, session=self._get_block().expr.session, ).set_index("_bigframes_keys") - result = result.join(map_df) - result = result.sort_index() + result = result.join(map_df).sort_index() index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)] result = result.set_index(index_columns) result.index.names = orig_columns.names