From 604d95114339712193196edcfaf5e58b7a3648b7 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 18 Mar 2024 19:39:08 +0000 Subject: [PATCH 1/3] fix: df.drop_na preserves columns dtype --- bigframes/core/block_transforms.py | 37 ++++++++-------------------- bigframes/core/blocks.py | 29 +++++++++++++++------- bigframes/core/indexes/index.py | 2 +- bigframes/dataframe.py | 10 ++++---- bigframes/series.py | 10 ++++---- tests/system/small/test_dataframe.py | 20 +++++++++++++-- 6 files changed, 59 insertions(+), 49 deletions(-) diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 9cc0a05680..6b9a367f55 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import annotations +import functools import typing import pandas as pd @@ -307,7 +308,7 @@ def drop_duplicates( ) -> blocks.Block: block, dupe_indicator_id = indicate_duplicates(block, columns, keep) block, keep_indicator_id = block.apply_unary_op(dupe_indicator_id, ops.invert_op) - return block.filter(keep_indicator_id).drop_columns( + return block.filter_by_id(keep_indicator_id).drop_columns( (dupe_indicator_id, keep_indicator_id) ) @@ -459,32 +460,14 @@ def dropna( """ Drop na entries from block """ + predicates = [ops.notnull_op.as_expr(column_id) for column_id in column_ids] + if len(predicates) == 0: + return block if how == "any": - filtered_block = block - for column in column_ids: - filtered_block, result_id = filtered_block.apply_unary_op( - column, ops.notnull_op - ) - filtered_block = filtered_block.filter(result_id) - filtered_block = filtered_block.drop_columns([result_id]) - return filtered_block + predicate = functools.reduce(ops.and_op.as_expr, predicates) else: # "all" - filtered_block = block - predicate = None - for column in column_ids: - filtered_block, partial_predicate = filtered_block.apply_unary_op( - column, ops.notnull_op - ) - if predicate: - filtered_block, predicate = filtered_block.apply_binary_op( - partial_predicate, predicate, ops.or_op - ) - else: - predicate = partial_predicate - if predicate: - filtered_block = filtered_block.filter(predicate) - filtered_block = filtered_block.select_columns(block.value_columns) - return filtered_block + predicate = functools.reduce(ops.or_op.as_expr, predicates) + return block.filter(predicate) def nsmallest( @@ -513,7 +496,7 @@ def nsmallest( window_spec=windows.WindowSpec(ordering=tuple(order_refs)), ) block, condition = block.project_expr(ops.le_op.as_expr(counter, ex.const(n))) - block = block.filter(condition) + block = block.filter_by_id(condition) return block.drop_columns([counter, condition]) @@ -543,7 +526,7 @@ def nlargest( window_spec=windows.WindowSpec(ordering=tuple(order_refs)), ) block, condition = block.project_expr(ops.le_op.as_expr(counter, ex.const(n))) - block = block.filter(condition) + block = block.filter_by_id(condition) return block.drop_columns([counter, condition]) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 375ce7e7e0..4f90c4e463 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -37,6 +37,7 @@ import bigframes.constants as constants import bigframes.core as core import bigframes.core.expression as ex +import bigframes.core.expression as scalars import bigframes.core.guid as guid import bigframes.core.join_def as join_defs import bigframes.core.ordering as ordering @@ -701,7 +702,7 @@ def project_expr( block = Block( array_val, index_columns=self.index_columns, - column_labels=[*self.column_labels, label], + column_labels=self.column_labels.insert(len(self.column_labels), label), index_labels=self.index.names, ) return (block, result_id) @@ -793,7 +794,7 @@ def apply_window_op( if skip_null_groups: for key in window_spec.grouping_keys: block, not_null_id = block.apply_unary_op(key, ops.notnull_op) - block = block.filter(not_null_id).drop_columns([not_null_id]) + block = block.filter_by_id(not_null_id).drop_columns([not_null_id]) result_id = guid.generate_guid() expr = block._expr.project_window_op( column, @@ -806,7 +807,9 @@ def apply_window_op( block = Block( expr, index_columns=self.index_columns, - column_labels=[*self.column_labels, result_label], + column_labels=self.column_labels.insert( + len(self.column_labels), result_label + ), index_labels=self._index_labels, ) return (block, result_id) @@ -850,7 +853,7 @@ def assign_label(self, column_id: str, new_label: Label) -> Block: ) return self.with_column_labels(new_labels) - def filter(self, column_id: str, keep_null: bool = False): + def filter_by_id(self, column_id: str, keep_null: bool = False): return Block( self._expr.filter_by_id(column_id, keep_null), index_columns=self.index_columns, @@ -858,6 +861,14 @@ def filter(self, column_id: str, keep_null: bool = False): index_labels=self.index.names, ) + def filter(self, predicate: scalars.Expression): + return Block( + self._expr.filter(predicate), + index_columns=self.index_columns, + column_labels=self.column_labels, + index_labels=self.index.names, + ) + def aggregate_all_and_stack( self, operation: agg_ops.UnaryAggregateOp, @@ -1156,10 +1167,10 @@ def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.UnaryAggregateOp return stats - def _get_labels_for_columns(self, column_ids: typing.Sequence[str]): + def _get_labels_for_columns(self, column_ids: typing.Sequence[str]) -> pd.Index: """Get column label for value columns, or index name for index columns""" - lookup = self.col_id_to_label - return [lookup.get(col_id, None) for col_id in column_ids] + indices = [self.value_columns.index(col_id) for col_id in column_ids] + return self.column_labels.take(indices, allow_fill=False) def _normalize_expression( self, @@ -1255,7 +1266,7 @@ def _forward_slice(self, start: int = 0, stop=None, step: int = 1): for cond in conditions: block, cond_id = block.project_expr(cond) - block = block.filter(cond_id) + block = block.filter_by_id(cond_id) return block.select_columns(self.value_columns) @@ -1292,7 +1303,7 @@ def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: Block( expr, index_columns=self.index_columns, - column_labels=[label, *self.column_labels], + column_labels=self.column_labels.insert(0, label), index_labels=self._index_labels, ), result_id, diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index 328dd49397..c8cb07d339 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -378,7 +378,7 @@ def drop( block, condition_id = block.project_expr( ops.ne_op.as_expr(level_id, ex.const(labels)) ) - block = block.filter(condition_id, keep_null=True) + block = block.filter_by_id(condition_id, keep_null=True) block = block.drop_columns([condition_id]) return Index(block) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 5dae7a82f9..482e98ddd1 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -525,7 +525,7 @@ def _getitem_bool_series(self, key: bigframes.series.Series) -> DataFrame: ) = self._block.join(key._block, how="left") block = combined_index filter_col_id = get_column_right[key._value_column] - block = block.filter(filter_col_id) + block = block.filter_by_id(filter_col_id) block = block.drop_columns([filter_col_id]) return DataFrame(block) @@ -1193,7 +1193,7 @@ def drop( block, condition_id = block.project_expr( ops.ne_op.as_expr(level_id, ex.const(index)) ) - block = block.filter(condition_id, keep_null=True).select_columns( + block = block.filter_by_id(condition_id, keep_null=True).select_columns( self._block.value_columns ) if columns: @@ -1214,7 +1214,7 @@ def _drop_by_index(self, index: indexes.Index) -> DataFrame: ops.isnull_op, ) - drop_block = drop_block.filter(drop_col) + drop_block = drop_block.filter_by_id(drop_col) original_columns = [ get_column_left[column] for column in self._block.value_columns ] @@ -1558,7 +1558,7 @@ def _filter_rows( label_string_id, ops.StrContainsRegexOp(pat=regex) ) - block = block.filter(mask_id) + block = block.filter_by_id(mask_id) block = block.select_columns(self._block.value_columns) return DataFrame(block) elif items is not None: @@ -1567,7 +1567,7 @@ def _filter_rows( block, mask_id = block.apply_unary_op( self._block.index_columns[0], ops.IsInOp(values=tuple(items)) ) - block = block.filter(mask_id) + block = block.filter_by_id(mask_id) block = block.select_columns(self._block.value_columns) return DataFrame(block) else: diff --git a/bigframes/series.py b/bigframes/series.py index ef2feb4f92..2fcd5000df 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -354,7 +354,7 @@ def drop( block, condition_id = block.project_expr( ops.ne_op.as_expr(level_id, ex.const(index)) ) - block = block.filter(condition_id, keep_null=True) + block = block.filter_by_id(condition_id, keep_null=True) block = block.drop_columns([condition_id]) return Series(block.select_column(self._value_column)) @@ -861,7 +861,7 @@ def mode(self) -> Series: max_value_count_col_id, ops.eq_op, ) - block = block.filter(is_mode_col_id) + block = block.filter_by_id(is_mode_col_id) # use temporary name for reset_index to avoid collision, restore after dropping extra columns block = ( block.with_index_labels(["mode_temp_internal"]) @@ -1032,7 +1032,7 @@ def __getitem__(self, indexer): return self.iloc[indexer] if isinstance(indexer, Series): (left, right, block) = self._align(indexer, "left") - block = block.filter(right) + block = block.filter_by_id(right) block = block.select_column(left) return Series(block) return self.loc[indexer] @@ -1304,7 +1304,7 @@ def filter( label_string_id, ops.StrContainsRegexOp(pat=regex) ) - block = block.filter(mask_id) + block = block.filter_by_id(mask_id) block = block.select_columns([self._value_column]) return Series(block) elif items is not None: @@ -1313,7 +1313,7 @@ def filter( block, mask_id = block.apply_unary_op( self._block.index_columns[0], ops.IsInOp(values=tuple(items)) ) - block = block.filter(mask_id) + block = block.filter_by_id(mask_id) block = block.select_columns([self._value_column]) return Series(block) else: diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index be4211a2fc..0d35e03272 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -842,6 +842,7 @@ def test_assign_callable_lambda(scalars_dfs): assert_pandas_df_equal(bf_result, pd_result) +@skip_legacy_pandas @pytest.mark.parametrize( ("axis", "how", "ignore_index"), [ @@ -852,8 +853,6 @@ def test_assign_callable_lambda(scalars_dfs): ], ) def test_df_dropna(scalars_dfs, axis, how, ignore_index): - if pd.__version__.startswith("1."): - pytest.skip("ignore_index parameter not supported in pandas 1.x.") scalars_df, scalars_pandas_df = scalars_dfs df = scalars_df.dropna(axis=axis, how=how, ignore_index=ignore_index) bf_result = df.to_pandas() @@ -864,6 +863,23 @@ def test_df_dropna(scalars_dfs, axis, how, ignore_index): pandas.testing.assert_frame_equal(bf_result, pd_result) +@skip_legacy_pandas +def test_df_dropna_range_columns(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + scalars_df = scalars_df.copy() + scalars_pandas_df = scalars_pandas_df.copy() + scalars_df.columns = pandas.RangeIndex(0, len(scalars_df.columns)) + scalars_pandas_df.columns = pandas.RangeIndex(0, len(scalars_pandas_df.columns)) + + df = scalars_df.dropna() + bf_result = df.to_pandas() + pd_result = scalars_pandas_df.dropna() + + # Pandas uses int64 instead of Int64 (nullable) dtype. + # pd_result.index = pd_result.index.astype(pd.Int64Dtype()) + pandas.testing.assert_frame_equal(bf_result, pd_result) + + def test_df_interpolate(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs columns = ["int64_col", "int64_too", "float64_col"] From c73d2bc5a56d6fdd9dcd28ea0bd360e9e9ad19dc Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 18 Mar 2024 20:14:30 +0000 Subject: [PATCH 2/3] fix mypy errors --- bigframes/core/blocks.py | 14 ++++++++------ tests/system/small/test_dataframe.py | 2 -- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 4f90c4e463..a6b2468b1d 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1097,8 +1097,11 @@ def summarize( unpivot_columns=tuple(columns), index_col_ids=tuple([label_col_id]), ) - labels = self._get_labels_for_columns(column_ids) - return Block(expr, column_labels=labels, index_columns=[label_col_id]) + return Block( + expr, + column_labels=self._get_labels_for_columns(column_ids), + index_columns=[label_col_id], + ) def corr(self): """Returns a block object to compute the self-correlation on this block.""" @@ -1402,10 +1405,9 @@ def pivot( if values_in_index or len(values) > 1: value_labels = self._get_labels_for_columns(values) column_index = self._create_pivot_column_index(value_labels, columns_values) + return result_block.with_column_labels(column_index) else: - column_index = columns_values - - return result_block.with_column_labels(column_index) + return result_block.with_column_labels(columns_values) def stack(self, how="left", levels: int = 1): """Unpivot last column axis level into row axis""" @@ -1529,7 +1531,7 @@ def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype: @staticmethod def _create_pivot_column_index( value_labels: Sequence[typing.Hashable], columns_values: pd.Index - ): + ) -> pd.Index: index_parts = [] for value in value_labels: as_frame = columns_values.to_frame() diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 0d35e03272..371bd68df4 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -875,8 +875,6 @@ def test_df_dropna_range_columns(scalars_dfs): bf_result = df.to_pandas() pd_result = scalars_pandas_df.dropna() - # Pandas uses int64 instead of Int64 (nullable) dtype. - # pd_result.index = pd_result.index.astype(pd.Int64Dtype()) pandas.testing.assert_frame_equal(bf_result, pd_result) From e6ad80618ea276cbf268d89bfefcc2bd14b3e292 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 18 Mar 2024 20:58:56 +0000 Subject: [PATCH 3/3] fix mypy issue --- bigframes/core/blocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index a6b2468b1d..0ebbe48cc4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1530,7 +1530,7 @@ def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype: @staticmethod def _create_pivot_column_index( - value_labels: Sequence[typing.Hashable], columns_values: pd.Index + value_labels: pd.Index, columns_values: pd.Index ) -> pd.Index: index_parts = [] for value in value_labels: