fix: df.drop_na preserves columns dtype #457

Merged 5 commits on Mar 19, 2024
Changes from all commits
bigframes/core/block_transforms.py (10 additions, 27 deletions)
@@ -13,6 +13,7 @@
 # limitations under the License.
 from __future__ import annotations

+import functools
 import typing

 import pandas as pd
@@ -307,7 +308,7 @@ def drop_duplicates(
 ) -> blocks.Block:
     block, dupe_indicator_id = indicate_duplicates(block, columns, keep)
     block, keep_indicator_id = block.apply_unary_op(dupe_indicator_id, ops.invert_op)
-    return block.filter(keep_indicator_id).drop_columns(
+    return block.filter_by_id(keep_indicator_id).drop_columns(
         (dupe_indicator_id, keep_indicator_id)
     )

@@ -459,32 +460,14 @@ def dropna(
     """
     Drop na entries from block
     """
+    predicates = [ops.notnull_op.as_expr(column_id) for column_id in column_ids]
+    if len(predicates) == 0:
+        return block
     if how == "any":
-        filtered_block = block
-        for column in column_ids:
-            filtered_block, result_id = filtered_block.apply_unary_op(
-                column, ops.notnull_op
-            )
-            filtered_block = filtered_block.filter(result_id)
-            filtered_block = filtered_block.drop_columns([result_id])
-        return filtered_block
+        predicate = functools.reduce(ops.and_op.as_expr, predicates)
     else:  # "all"
-        filtered_block = block
-        predicate = None
-        for column in column_ids:
-            filtered_block, partial_predicate = filtered_block.apply_unary_op(
-                column, ops.notnull_op
-            )
-            if predicate:
-                filtered_block, predicate = filtered_block.apply_binary_op(
-                    partial_predicate, predicate, ops.or_op
-                )
-            else:
-                predicate = partial_predicate
-        if predicate:
-            filtered_block = filtered_block.filter(predicate)
-        filtered_block = filtered_block.select_columns(block.value_columns)
-        return filtered_block
+        predicate = functools.reduce(ops.or_op.as_expr, predicates)
+    return block.filter(predicate)


 def nsmallest(
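Note on the dropna hunk above: instead of materializing a boolean helper column per value column and filtering the block repeatedly, the new code folds one notnull expression per column into a single predicate with functools.reduce (and_op for how="any", since a row survives only if every column is non-null; or_op for how="all", since it survives if any column is non-null). Below is a minimal plain-pandas sketch of the same reduction pattern; dropna_sketch and its names are illustrative only, not bigframes API.

import functools
import operator

import pandas as pd


def dropna_sketch(df: pd.DataFrame, how: str = "any") -> pd.DataFrame:
    # One boolean mask per column, mirroring ops.notnull_op.as_expr(column_id).
    predicates = [df[col].notnull() for col in df.columns]
    if not predicates:
        return df
    # how="any": AND the masks (drop a row if any column is null).
    # how="all": OR the masks (drop a row only if every column is null).
    combine = operator.and_ if how == "any" else operator.or_
    keep = functools.reduce(combine, predicates)
    return df[keep]


df = pd.DataFrame({"a": [1.0, None, 3.0], "b": [None, None, 6.0]})
print(dropna_sketch(df, how="any"))  # keeps only the last row
print(dropna_sketch(df, how="all"))  # drops only the all-null middle row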
@@ -513,7 +496,7 @@ def nsmallest(
         window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
     )
     block, condition = block.project_expr(ops.le_op.as_expr(counter, ex.const(n)))
-    block = block.filter(condition)
+    block = block.filter_by_id(condition)
     return block.drop_columns([counter, condition])

@@ -543,7 +526,7 @@ def nlargest(
         window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
     )
     block, condition = block.project_expr(ops.le_op.as_expr(counter, ex.const(n)))
-    block = block.filter(condition)
+    block = block.filter_by_id(condition)
     return block.drop_columns([counter, condition])
bigframes/core/blocks.py (29 additions, 16 deletions)
@@ -37,6 +37,7 @@
 import bigframes.constants as constants
 import bigframes.core as core
 import bigframes.core.expression as ex
+import bigframes.core.expression as scalars
 import bigframes.core.guid as guid
 import bigframes.core.join_def as join_defs
 import bigframes.core.ordering as ordering
@@ -701,7 +702,7 @@ def project_expr(
         block = Block(
             array_val,
             index_columns=self.index_columns,
-            column_labels=[*self.column_labels, label],
+            column_labels=self.column_labels.insert(len(self.column_labels), label),
             index_labels=self.index.names,
         )
         return (block, result_id)
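Note: the column_labels changes in this file replace list concatenation ([*self.column_labels, label]) with pd.Index.insert. Unpacking into a Python list discards the Index object, so the labels' dtype gets re-inferred when the list is wrapped back into an Index; Index.insert stays inside pandas and keeps specialized dtypes. A standalone pandas sketch, using a categorical label index as one dtype where the difference is visible:

import pandas as pd

labels = pd.CategoricalIndex(["a", "b"])

# New pattern: append through the Index API; the dtype survives.
kept = labels.insert(len(labels), "a")
print(kept.dtype)  # category

# Old pattern: unpack to a plain list, then re-infer; the dtype degrades.
rebuilt = pd.Index([*labels, "a"])
print(rebuilt.dtype)  # object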
@@ -793,7 +794,7 @@ def apply_window_op(
         if skip_null_groups:
             for key in window_spec.grouping_keys:
                 block, not_null_id = block.apply_unary_op(key, ops.notnull_op)
-                block = block.filter(not_null_id).drop_columns([not_null_id])
+                block = block.filter_by_id(not_null_id).drop_columns([not_null_id])
         result_id = guid.generate_guid()
         expr = block._expr.project_window_op(
             column,
@@ -806,7 +807,9 @@
         block = Block(
             expr,
             index_columns=self.index_columns,
-            column_labels=[*self.column_labels, result_label],
+            column_labels=self.column_labels.insert(
+                len(self.column_labels), result_label
+            ),
             index_labels=self._index_labels,
         )
         return (block, result_id)
@@ -850,14 +853,22 @@ def assign_label(self, column_id: str, new_label: Label) -> Block:
         )
         return self.with_column_labels(new_labels)

-    def filter(self, column_id: str, keep_null: bool = False):
+    def filter_by_id(self, column_id: str, keep_null: bool = False):
         return Block(
             self._expr.filter_by_id(column_id, keep_null),
             index_columns=self.index_columns,
             column_labels=self.column_labels,
             index_labels=self.index.names,
         )

+    def filter(self, predicate: scalars.Expression):
+        return Block(
+            self._expr.filter(predicate),
+            index_columns=self.index_columns,
+            column_labels=self.column_labels,
+            index_labels=self.index.names,
+        )
+
     def aggregate_all_and_stack(
         self,
         operation: agg_ops.UnaryAggregateOp,
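Note on the hunk above: the old Block.filter, which takes the id of a materialized boolean column, is renamed to filter_by_id, and a new filter accepts a scalar expression directly, so callers such as dropna no longer need temporary predicate columns. A self-contained toy analogy of the split; ToyBlock is purely illustrative, not the bigframes class:

from dataclasses import dataclass
from typing import Callable, Dict, List

Row = Dict[str, object]


@dataclass
class ToyBlock:
    rows: List[Row]

    def filter_by_id(self, column_id: str, keep_null: bool = False) -> "ToyBlock":
        # Old-style entry point: consume a precomputed boolean column by name.
        return ToyBlock(
            [
                row
                for row in self.rows
                if row[column_id] or (keep_null and row[column_id] is None)
            ]
        )

    def filter(self, predicate: Callable[[Row], bool]) -> "ToyBlock":
        # New-style entry point: evaluate a predicate directly, no helper column.
        return ToyBlock([row for row in self.rows if predicate(row)])


rows = [{"x": 1, "mask": True}, {"x": None, "mask": False}]
print(ToyBlock(rows).filter_by_id("mask").rows)                  # keeps the first row
print(ToyBlock(rows).filter(lambda r: r["x"] is not None).rows)  # same row kept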
@@ -1086,8 +1097,11 @@ def summarize(
             unpivot_columns=tuple(columns),
             index_col_ids=tuple([label_col_id]),
         )
-        labels = self._get_labels_for_columns(column_ids)
-        return Block(expr, column_labels=labels, index_columns=[label_col_id])
+        return Block(
+            expr,
+            column_labels=self._get_labels_for_columns(column_ids),
+            index_columns=[label_col_id],
+        )

     def corr(self):
         """Returns a block object to compute the self-correlation on this block."""
@@ -1156,10 +1170,10 @@ def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.UnaryAggregateOp

         return stats

-    def _get_labels_for_columns(self, column_ids: typing.Sequence[str]):
+    def _get_labels_for_columns(self, column_ids: typing.Sequence[str]) -> pd.Index:
         """Get column label for value columns, or index name for index columns"""
-        lookup = self.col_id_to_label
-        return [lookup.get(col_id, None) for col_id in column_ids]
+        indices = [self.value_columns.index(col_id) for col_id in column_ids]
+        return self.column_labels.take(indices, allow_fill=False)

     def _normalize_expression(
         self,
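Note: this hunk is the heart of the fix named in the PR title. _get_labels_for_columns used to return a plain Python list of labels, so any Block built from it had to re-infer the columns dtype; returning self.column_labels.take(...) hands back a pd.Index that keeps the source dtype, including for an empty selection, where re-inference always degrades to object. A plain-pandas sketch of the difference:

import pandas as pd

column_labels = pd.RangeIndex(0, 4)  # e.g. default integer column labels

# New pattern: select positions through the Index; dtype is preserved.
print(column_labels.take([0, 2], allow_fill=False).dtype)  # int64
print(column_labels.take([], allow_fill=False).dtype)      # int64, even when empty

# Old pattern: collect labels into a list and re-infer later.
print(pd.Index([column_labels[i] for i in [0, 2]]).dtype)  # re-inferred as int64 here
print(pd.Index([]).dtype)                                  # object: dtype lost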
@@ -1255,7 +1269,7 @@ def _forward_slice(self, start: int = 0, stop=None, step: int = 1):

         for cond in conditions:
             block, cond_id = block.project_expr(cond)
-            block = block.filter(cond_id)
+            block = block.filter_by_id(cond_id)

         return block.select_columns(self.value_columns)
@@ -1292,7 +1306,7 @@ def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
             Block(
                 expr,
                 index_columns=self.index_columns,
-                column_labels=[label, *self.column_labels],
+                column_labels=self.column_labels.insert(0, label),
                 index_labels=self._index_labels,
             ),
             result_id,
@@ -1391,10 +1405,9 @@ def pivot(
         if values_in_index or len(values) > 1:
             value_labels = self._get_labels_for_columns(values)
             column_index = self._create_pivot_column_index(value_labels, columns_values)
+            return result_block.with_column_labels(column_index)
         else:
-            column_index = columns_values
-
-        return result_block.with_column_labels(column_index)
+            return result_block.with_column_labels(columns_values)

     def stack(self, how="left", levels: int = 1):
         """Unpivot last column axis level into row axis"""
@@ -1517,8 +1530,8 @@ def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype:

     @staticmethod
     def _create_pivot_column_index(
-        value_labels: Sequence[typing.Hashable], columns_values: pd.Index
-    ):
+        value_labels: pd.Index, columns_values: pd.Index
+    ) -> pd.Index:
         index_parts = []
         for value in value_labels:
             as_frame = columns_values.to_frame()
bigframes/core/indexes/index.py (1 addition, 1 deletion)
@@ -378,7 +378,7 @@ def drop(
         block, condition_id = block.project_expr(
             ops.ne_op.as_expr(level_id, ex.const(labels))
         )
-        block = block.filter(condition_id, keep_null=True)
+        block = block.filter_by_id(condition_id, keep_null=True)
         block = block.drop_columns([condition_id])
         return Index(block)
bigframes/dataframe.py (5 additions, 5 deletions)
@@ -525,7 +525,7 @@ def _getitem_bool_series(self, key: bigframes.series.Series) -> DataFrame:
         ) = self._block.join(key._block, how="left")
         block = combined_index
         filter_col_id = get_column_right[key._value_column]
-        block = block.filter(filter_col_id)
+        block = block.filter_by_id(filter_col_id)
         block = block.drop_columns([filter_col_id])
         return DataFrame(block)
@@ -1193,7 +1193,7 @@ def drop(
         block, condition_id = block.project_expr(
             ops.ne_op.as_expr(level_id, ex.const(index))
         )
-        block = block.filter(condition_id, keep_null=True).select_columns(
+        block = block.filter_by_id(condition_id, keep_null=True).select_columns(
             self._block.value_columns
         )
         if columns:
@@ -1214,7 +1214,7 @@ def _drop_by_index(self, index: indexes.Index) -> DataFrame:
             ops.isnull_op,
         )

-        drop_block = drop_block.filter(drop_col)
+        drop_block = drop_block.filter_by_id(drop_col)
         original_columns = [
             get_column_left[column] for column in self._block.value_columns
         ]
@@ -1558,7 +1558,7 @@ def _filter_rows(
                 label_string_id, ops.StrContainsRegexOp(pat=regex)
             )

-            block = block.filter(mask_id)
+            block = block.filter_by_id(mask_id)
             block = block.select_columns(self._block.value_columns)
             return DataFrame(block)
         elif items is not None:
@@ -1567,7 +1567,7 @@
             block, mask_id = block.apply_unary_op(
                 self._block.index_columns[0], ops.IsInOp(values=tuple(items))
             )
-            block = block.filter(mask_id)
+            block = block.filter_by_id(mask_id)
             block = block.select_columns(self._block.value_columns)
             return DataFrame(block)
         else:
bigframes/series.py (5 additions, 5 deletions)
@@ -354,7 +354,7 @@ def drop(
         block, condition_id = block.project_expr(
             ops.ne_op.as_expr(level_id, ex.const(index))
         )
-        block = block.filter(condition_id, keep_null=True)
+        block = block.filter_by_id(condition_id, keep_null=True)
         block = block.drop_columns([condition_id])
         return Series(block.select_column(self._value_column))
@@ -861,7 +861,7 @@ def mode(self) -> Series:
             max_value_count_col_id,
             ops.eq_op,
         )
-        block = block.filter(is_mode_col_id)
+        block = block.filter_by_id(is_mode_col_id)
         # use temporary name for reset_index to avoid collision, restore after dropping extra columns
         block = (
             block.with_index_labels(["mode_temp_internal"])
@@ -1032,7 +1032,7 @@ def __getitem__(self, indexer):
             return self.iloc[indexer]
         if isinstance(indexer, Series):
             (left, right, block) = self._align(indexer, "left")
-            block = block.filter(right)
+            block = block.filter_by_id(right)
             block = block.select_column(left)
             return Series(block)
         return self.loc[indexer]
@@ -1304,7 +1304,7 @@ def filter(
                 label_string_id, ops.StrContainsRegexOp(pat=regex)
             )

-            block = block.filter(mask_id)
+            block = block.filter_by_id(mask_id)
             block = block.select_columns([self._value_column])
             return Series(block)
         elif items is not None:
@@ -1313,7 +1313,7 @@
             block, mask_id = block.apply_unary_op(
                 self._block.index_columns[0], ops.IsInOp(values=tuple(items))
             )
-            block = block.filter(mask_id)
+            block = block.filter_by_id(mask_id)
             block = block.select_columns([self._value_column])
             return Series(block)
         else:
tests/system/small/test_dataframe.py (16 additions, 2 deletions)
@@ -842,6 +842,7 @@ def test_assign_callable_lambda(scalars_dfs):
     assert_pandas_df_equal(bf_result, pd_result)


+@skip_legacy_pandas
 @pytest.mark.parametrize(
     ("axis", "how", "ignore_index"),
     [
@@ -852,8 +853,6 @@
     ],
 )
 def test_df_dropna(scalars_dfs, axis, how, ignore_index):
-    if pd.__version__.startswith("1."):
-        pytest.skip("ignore_index parameter not supported in pandas 1.x.")
     scalars_df, scalars_pandas_df = scalars_dfs
     df = scalars_df.dropna(axis=axis, how=how, ignore_index=ignore_index)
     bf_result = df.to_pandas()
@@ -864,6 +863,21 @@
     pandas.testing.assert_frame_equal(bf_result, pd_result)


+@skip_legacy_pandas
+def test_df_dropna_range_columns(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df = scalars_df.copy()
+    scalars_pandas_df = scalars_pandas_df.copy()
+    scalars_df.columns = pandas.RangeIndex(0, len(scalars_df.columns))
+    scalars_pandas_df.columns = pandas.RangeIndex(0, len(scalars_pandas_df.columns))
+
+    df = scalars_df.dropna()
+    bf_result = df.to_pandas()
+    pd_result = scalars_pandas_df.dropna()
+
+    pandas.testing.assert_frame_equal(bf_result, pd_result)
+
+
 def test_df_interpolate(scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
     columns = ["int64_col", "int64_too", "float64_col"]
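For reference, the new test_df_dropna_range_columns pins bigframes to the plain-pandas behavior sketched below: dropping rows must leave the columns index, here an integer RangeIndex, and its dtype untouched. A small standalone sketch:

import pandas as pd

df = pd.DataFrame({"a": [1.0, None], "b": [2.0, 3.0]})
df.columns = pd.RangeIndex(0, len(df.columns))  # integer column labels

result = df.dropna()
# Row filtering must not disturb the columns axis or its dtype.
assert result.columns.dtype == df.columns.dtype  # int64 == int64
print(result.columns.dtype)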