diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 3fa690ef37..9e6b86fc30 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -354,10 +354,7 @@ def unpivot(
         *,
         passthrough_columns: typing.Sequence[str] = (),
         index_col_ids: typing.Sequence[str] = ["index"],
-        dtype: typing.Union[
-            bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
-        ] = pandas.Float64Dtype(),
-        how: typing.Literal["left", "right"] = "left",
+        join_side: typing.Literal["left", "right"] = "left",
     ) -> ArrayValue:
         """
         Unpivot ArrayValue columns.
@@ -367,23 +364,88 @@ def unpivot(
             unpivot_columns: Mapping of column id to list of input column ids. Lists of input columns may use None.
             passthrough_columns: Columns that will not be unpivoted. Column id will be preserved.
             index_col_id (str): The column id to be used for the row labels.
-            dtype (dtype or list of dtype): Dtype to use for the unpivot columns. If list, must be equal in number to unpivot_columns.
 
         Returns:
             ArrayValue: The unpivoted ArrayValue
         """
+        # There will be N labels, used to disambiguate which of N source columns produced each output row
+        explode_offsets_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
+        labels_array = self._create_unpivot_labels_array(row_labels, index_col_ids)
+        labels_array = labels_array.promote_offsets(explode_offsets_id)
+
+        # Unpivot creates N output rows for each input row; labels disambiguate these N rows
+        joined_array = self._cross_join_w_labels(labels_array, join_side)
+
+        # Build the output rows as a case statement that selects between the N input columns
+        unpivot_exprs = []
+        # Supports producing multiple stacked output columns for stacking only part of hierarchical index
+        for col_id, input_ids in unpivot_columns:
+            # row explode offset used to choose the input column
+            # we use offset instead of label as labels are not necessarily unique
+            cases = tuple(
+                (
+                    ops.eq_op.as_expr(explode_offsets_id, ex.const(i)),
+                    ex.free_var(id_or_null)
+                    if (id_or_null is not None)
+                    else ex.const(None),
+                )
+                for i, id_or_null in enumerate(input_ids)
+            )
+            col_expr = ops.case_when_op.as_expr(*cases)
+            unpivot_exprs.append((col_expr, col_id))
+
+        label_exprs = ((ex.free_var(id), id) for id in index_col_ids)
+        # passthrough columns are unchanged, just repeated N times each
+        passthrough_exprs = ((ex.free_var(id), id) for id in passthrough_columns)
         return ArrayValue(
-            nodes.UnpivotNode(
-                child=self.node,
-                row_labels=tuple(row_labels),
-                unpivot_columns=tuple(unpivot_columns),
-                passthrough_columns=tuple(passthrough_columns),
-                index_col_ids=tuple(index_col_ids),
-                dtype=dtype,
-                how=how,
+            nodes.ProjectionNode(
+                child=joined_array.node,
+                assignments=(*label_exprs, *unpivot_exprs, *passthrough_exprs),
             )
         )
 
+    def _cross_join_w_labels(
+        self, labels_array: ArrayValue, join_side: typing.Literal["left", "right"]
+    ) -> ArrayValue:
+        """
+        Convert each row in self to N rows, one for each label in labels array.
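Reviewer note: the rewrite above lowers unpivot to a cross join against a small labels table plus a case-when projection, instead of a dedicated plan node. A minimal sketch of the same decomposition in plain pandas (names like `unpivot_via_cross_join`, `df`, and `value_cols` are illustrative, not bigframes API):

```python
# Illustrative sketch only: unpivot expressed as cross join + case selection.
import pandas as pd

def unpivot_via_cross_join(df: pd.DataFrame, value_cols: list[str]) -> pd.DataFrame:
    # Labels table: one row per source column; the offset (not the label)
    # selects the source column, since labels need not be unique.
    labels = pd.DataFrame({"offset": range(len(value_cols)), "label": value_cols})
    # Cross join repeats every input row N times, once per label.
    joined = df.merge(labels, how="cross")
    # Case-when projection: the row carrying offset i reads source column i.
    joined["value"] = [
        row[value_cols[int(row["offset"])]] for _, row in joined.iterrows()
    ]
    return joined[["label", "value"]]

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
print(unpivot_via_cross_join(df, ["a", "b"]))
# One output row per (input row, label): (a, 1), (b, 3), (a, 2), (b, 4).
```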
+        """
+        table_join_side = (
+            join_def.JoinSide.LEFT if join_side == "left" else join_def.JoinSide.RIGHT
+        )
+        labels_join_side = table_join_side.inverse()
+        labels_mappings = tuple(
+            join_def.JoinColumnMapping(labels_join_side, id, id)
+            for id in labels_array.schema.names
+        )
+        table_mappings = tuple(
+            join_def.JoinColumnMapping(table_join_side, id, id)
+            for id in self.schema.names
+        )
+        join = join_def.JoinDefinition(
+            conditions=(), mappings=(*labels_mappings, *table_mappings), type="cross"
+        )
+        if join_side == "left":
+            joined_array = self.join(labels_array, join_def=join)
+        else:
+            joined_array = labels_array.join(self, join_def=join)
+        return joined_array
+
+    def _create_unpivot_labels_array(
+        self,
+        former_column_labels: typing.Sequence[typing.Hashable],
+        col_ids: typing.Sequence[str],
+    ) -> ArrayValue:
+        """Create an ArrayValue from a list of label tuples."""
+        rows = []
+        for row_offset in range(len(former_column_labels)):
+            row_label = former_column_labels[row_offset]
+            row_label = (row_label,) if not isinstance(row_label, tuple) else row_label
+            row = {col_ids[i]: row_label[i] for i in range(len(col_ids))}
+            rows.append(row)
+
+        return ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=self.session)
+
     def join(
         self,
         other: ArrayValue,
diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py
index 1eae73014c..562689a736 100644
--- a/bigframes/core/block_transforms.py
+++ b/bigframes/core/block_transforms.py
@@ -857,5 +857,5 @@ def _idx_extrema(
     # Stack the entire column axis to produce single-column result
     # Assumption: uniform dtype for stackability
     return block.aggregate_all_and_stack(
-        agg_ops.AnyValueOp(), dtype=block.dtypes[0]
+        agg_ops.AnyValueOp(),
    ).with_column_labels([original_block.index.name])
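Reviewer note: `_create_unpivot_labels_array` (added in core/__init__.py above) materializes the row labels as a local pyarrow table. A standalone sketch of that construction, with hypothetical labels and column ids:

```python
# Standalone sketch: one row per former column label, with tuple labels
# spread across the index columns, as the helper above does.
import pyarrow as pa

former_column_labels = [("a", 1), ("b", 2)]  # hypothetical multi-level labels
col_ids = ["index_0", "index_1"]

rows = []
for label in former_column_labels:
    label = label if isinstance(label, tuple) else (label,)
    rows.append({col_ids[i]: label[i] for i in range(len(col_ids))})

table = pa.Table.from_pylist(rows)
print(table)  # two columns (index_0, index_1), one row per label
```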
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f6850020df..0f9cacd83d 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -914,9 +914,6 @@ def aggregate_all_and_stack(
         axis: int | str = 0,
         value_col_id: str = "values",
         dropna: bool = True,
-        dtype: typing.Union[
-            bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
-        ] = pd.Float64Dtype(),
     ) -> Block:
         axis_n = utils.get_axis_number(axis)
         if axis_n == 0:
@@ -931,7 +928,6 @@ def aggregate_all_and_stack(
             row_labels=self.column_labels.to_list(),
             index_col_ids=index_col_ids,
             unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]),
-            dtype=dtype,
         )
         return Block(
             result_expr,
@@ -949,7 +945,6 @@ def aggregate_all_and_stack(
                 index_col_ids=[guid.generate_guid()],
                 unpivot_columns=[(value_col_id, tuple(self.value_columns))],
                 passthrough_columns=[*self.index_columns, offset_col],
-                dtype=dtype,
             )
         index_aggregations = [
             (ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id)
@@ -1512,13 +1507,10 @@ def stack(self, how="left", levels: int = 1):
 
         # Get matching columns
         unpivot_columns: List[Tuple[str, List[str]]] = []
-        dtypes = []
         for val in result_col_labels:
             col_id = guid.generate_guid("unpivot_")
             input_columns, dtype = self._create_stack_column(val, row_label_tuples)
             unpivot_columns.append((col_id, input_columns))
-            if dtype:
-                dtypes.append(dtype or pd.Float64Dtype())
 
         added_index_columns = [guid.generate_guid() for _ in range(row_labels.nlevels)]
         unpivot_expr = self._expr.unpivot(
@@ -1526,8 +1518,7 @@
             passthrough_columns=self.index_columns,
             unpivot_columns=unpivot_columns,
             index_col_ids=added_index_columns,
-            dtype=tuple(dtypes),
-            how=how,
+            join_side=how,
         )
         new_index_level_names = self.column_labels.names[-levels:]
         if how == "left":
@@ -1559,15 +1550,12 @@ def melt(
         value_labels = [self.col_id_to_label[col_id] for col_id in value_vars]
         id_labels = [self.col_id_to_label[col_id] for col_id in id_vars]
 
-        dtype = self._expr.get_column_type(value_vars[0])
-
         unpivot_expr = self._expr.unpivot(
             row_labels=value_labels,
             passthrough_columns=id_vars,
             unpivot_columns=(unpivot_col,),
             index_col_ids=var_col_ids,
-            dtype=dtype,
-            how="right",
+            join_side="right",
         )
         index_id = guid.generate_guid()
         unpivot_expr = unpivot_expr.promote_offsets(index_id)
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index f1c5d62010..a59d599679 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -40,10 +40,8 @@
     OrderingExpression,
 )
 import bigframes.core.schema as schemata
-import bigframes.core.utils as utils
 from bigframes.core.window_spec import WindowSpec
 import bigframes.dtypes
-import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 
 ORDER_ID_COLUMN = "bigframes_ordering_id"
@@ -109,36 +107,6 @@ def filter(self: T, predicate: ex.Expression) -> T:
         """Filter the table on a given expression, the predicate must be a boolean expression."""
         ...
 
-    @abc.abstractmethod
-    def unpivot(
-        self: T,
-        row_labels: typing.Sequence[typing.Hashable],
-        unpivot_columns: typing.Sequence[
-            typing.Tuple[str, typing.Sequence[typing.Optional[str]]]
-        ],
-        *,
-        passthrough_columns: typing.Sequence[str] = (),
-        index_col_ids: typing.Sequence[str] = ["index"],
-        dtype: typing.Union[
-            bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype]
-        ] = pandas.Float64Dtype(),
-        how="left",
-    ) -> T:
-        """
-        Unpivot ArrayValue columns.
-
-        Args:
-            row_labels: Identifies the source of the row. Must be equal to length to source column list in unpivot_columns argument.
-            unpivot_columns: Mapping of column id to list of input column ids. Lists of input columns may use None.
-            passthrough_columns: Columns that will not be unpivoted. Column id will be preserved.
-            index_col_id (str): The column id to be used for the row labels.
-            dtype (dtype or list of dtype): Dtype to use for the unpivot columns. If list, must be equal in number to unpivot_columns.
-
-        Returns:
-            ArrayValue: The unpivoted ArrayValue
-        """
-        ...
-
     @abc.abstractmethod
     def _reproject_to_table(self: T) -> T:
         """
@@ -332,115 +300,6 @@ def _filter(self, predicate_value: ibis_types.BooleanValue) -> UnorderedIR:
         expr.predicates = [*self._predicates, predicate_value]
         return expr.build()
 
-    def unpivot(
-        self,
-        row_labels: typing.Sequence[typing.Hashable],
-        unpivot_columns: typing.Sequence[
-            typing.Tuple[str, typing.Sequence[typing.Optional[str]]]
-        ],
-        *,
-        passthrough_columns: typing.Sequence[str] = (),
-        index_col_ids: typing.Sequence[str] = ["index"],
-        dtype: typing.Union[
-            bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype]
-        ] = pandas.Float64Dtype(),
-        how="left",
-    ) -> UnorderedIR:
-        if how not in ("left", "right"):
-            raise ValueError("'how' must be 'left' or 'right'")
-        table = self._to_ibis_expr()
-        row_n = len(row_labels)
-        if not all(
-            len(source_columns) == row_n for _, source_columns in unpivot_columns
-        ):
-            raise ValueError("Columns and row labels must all be same length.")
-
-        unpivot_offset_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
-        unpivot_table = table.cross_join(
-            ibis.memtable({unpivot_offset_id: range(row_n)})
-        )
-        # Use ibis memtable to infer type of rowlabels (if possible)
-        # TODO: Allow caller to specify dtype
-        if isinstance(row_labels[0], tuple):
-            labels_table = ibis.memtable(row_labels)
-            labels_ibis_types = [
-                labels_table[col].type() for col in labels_table.columns
-            ]
-        else:
-            labels_ibis_types = [ibis.memtable({"col": row_labels})["col"].type()]
-        labels_dtypes = [
-            bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type)
-            for ibis_type in labels_ibis_types
-        ]
-
-        label_columns = []
-        for label_part, (col_id, label_dtype) in enumerate(
-            zip(index_col_ids, labels_dtypes)
-        ):
-            # interpret as tuples even if it wasn't originally so can apply same logic for multi-column labels
-            labels_as_tuples = [
-                label if isinstance(label, tuple) else (label,) for label in row_labels
-            ]
-            cases = [
-                (
-                    i,
-                    bigframes.dtypes.literal_to_ibis_scalar(
-                        label_tuple[label_part],  # type:ignore
-                        force_dtype=label_dtype,  # type:ignore
-                    ),
-                )
-                for i, label_tuple in enumerate(labels_as_tuples)
-            ]
-            labels_value = (
-                typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
-                .cases(cases, default=None)  # type:ignore
-                .name(col_id)
-            )
-            label_columns.append(labels_value)
-
-        unpivot_values = []
-        for j in range(len(unpivot_columns)):
-            col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype
-            result_col, source_cols = unpivot_columns[j]
-            null_value = bigframes.dtypes.literal_to_ibis_scalar(
-                None, force_dtype=col_dtype
-            )
-            ibis_values = [
-                op_compiler.compile_row_op(
-                    ops.AsTypeOp(col_dtype), (unpivot_table[col],)
-                )
-                if col is not None
-                else null_value
-                for col in source_cols
-            ]
-            cases = [(i, ibis_values[i]) for i in range(len(ibis_values))]
-            unpivot_value = typing.cast(
-                ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id]
-            ).cases(
-                cases, default=null_value  # type:ignore
-            )
-            unpivot_values.append(unpivot_value.name(result_col))
-
-        unpivot_table = unpivot_table.select(
-            passthrough_columns,
-            *label_columns,
-            *unpivot_values,
-            unpivot_offset_id,
-        )
-
-        value_columns = [
-            unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns
-        ]
-        passthrough_values = [unpivot_table[col] for col in passthrough_columns]
-        return UnorderedIR(
-            table=unpivot_table,
-            columns=[
-                *[unpivot_table[col_id] for col_id in index_col_ids],
-                *value_columns,
-                *passthrough_values,
-            ],
-        )
-
     def aggregate(
         self,
         aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]],
@@ -920,149 +779,6 @@ def project_window_op(
         # TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation.
         return result._reproject_to_table() if not skip_reproject_unsafe else result
 
-    def unpivot(
-        self,
-        row_labels: typing.Sequence[typing.Hashable],
-        unpivot_columns: typing.Sequence[
-            typing.Tuple[str, typing.Sequence[typing.Optional[str]]]
-        ],
-        *,
-        passthrough_columns: typing.Sequence[str] = (),
-        index_col_ids: typing.Sequence[str] = ["index"],
-        dtype: typing.Union[
-            bigframes.dtypes.Dtype, typing.Sequence[bigframes.dtypes.Dtype]
-        ] = pandas.Float64Dtype(),
-        how="left",
-    ) -> OrderedIR:
-        if how not in ("left", "right"):
-            raise ValueError("'how' must be 'left' or 'right'")
-        table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True)
-        row_n = len(row_labels)
-        hidden_col_ids = self._hidden_ordering_column_names.keys()
-        if not all(
-            len(source_columns) == row_n for _, source_columns in unpivot_columns
-        ):
-            raise ValueError("Columns and row labels must all be same length.")
-
-        unpivot_offset_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
-        unpivot_table = table.cross_join(
-            ibis.memtable({unpivot_offset_id: range(row_n)})
-        )
-        # Use ibis memtable to infer type of rowlabels (if possible)
-        # TODO: Allow caller to specify dtype
-        if isinstance(row_labels[0], tuple):
-            labels_table = ibis.memtable(row_labels)
-            labels_ibis_types = [
-                labels_table[col].type() for col in labels_table.columns
-            ]
-        else:
-            labels_ibis_types = [ibis.memtable({"col": row_labels})["col"].type()]
-        labels_dtypes = [
-            bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type)
-            for ibis_type in labels_ibis_types
-        ]
-
-        label_columns = []
-        for label_part, (col_id, label_dtype) in enumerate(
-            zip(index_col_ids, labels_dtypes)
-        ):
-            # interpret as tuples even if it wasn't originally so can apply same logic for multi-column labels
-            labels_as_tuples = [
-                label if isinstance(label, tuple) else (label,) for label in row_labels
-            ]
-            cases = [
-                (
-                    i,
-                    bigframes.dtypes.literal_to_ibis_scalar(
-                        label_tuple[label_part],  # type:ignore
-                        force_dtype=label_dtype,  # type:ignore
-                    ),
-                )
-                for i, label_tuple in enumerate(labels_as_tuples)
-            ]
-            labels_value = (
-                typing.cast(ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id])
-                .cases(cases, default=None)  # type:ignore
-                .name(col_id)
-            )
-            label_columns.append(labels_value)
-
-        unpivot_values = []
-        for j in range(len(unpivot_columns)):
-            col_dtype = dtype[j] if utils.is_list_like(dtype) else dtype
-            result_col, source_cols = unpivot_columns[j]
-            null_value = bigframes.dtypes.literal_to_ibis_scalar(
-                None, force_dtype=col_dtype
-            )
-            ibis_values = [
-                op_compiler.compile_row_op(
-                    ops.AsTypeOp(col_dtype), (unpivot_table[col],)
-                )
-                if col is not None
-                else null_value
-                for col in source_cols
-            ]
-            cases = [(i, ibis_values[i]) for i in range(len(ibis_values))]
-            unpivot_value = typing.cast(
-                ibis_types.IntegerColumn, unpivot_table[unpivot_offset_id]
-            ).cases(
-                cases, default=null_value  # type:ignore
-            )
-            unpivot_values.append(unpivot_value.name(result_col))
-
-        unpivot_table = unpivot_table.select(
-            passthrough_columns,
-            *label_columns,
-            *unpivot_values,
-            *hidden_col_ids,
-            unpivot_offset_id,
-        )
-
-        # Extend the original ordering using unpivot_offset_id
-        old_ordering = self._ordering
-        if how == "left":
-            new_ordering = ExpressionOrdering(
-                ordering_value_columns=tuple(
-                    [
-                        *old_ordering.ordering_value_columns,
-                        ascending_over(unpivot_offset_id),
-                    ]
-                ),
-                total_ordering_columns=frozenset(
-                    [*old_ordering.total_ordering_columns, unpivot_offset_id]
-                ),
-            )
-        else:  # how=="right"
-            new_ordering = ExpressionOrdering(
-                ordering_value_columns=tuple(
-                    [
-                        ascending_over(unpivot_offset_id),
-                        *old_ordering.ordering_value_columns,
-                    ]
-                ),
-                total_ordering_columns=frozenset(
-                    [*old_ordering.total_ordering_columns, unpivot_offset_id]
-                ),
-            )
-        value_columns = [
-            unpivot_table[value_col_id] for value_col_id, _ in unpivot_columns
-        ]
-        passthrough_values = [unpivot_table[col] for col in passthrough_columns]
-        hidden_ordering_columns = [
-            unpivot_table[unpivot_offset_id],
-            *[unpivot_table[hidden_col] for hidden_col in hidden_col_ids],
-        ]
-        return OrderedIR(
-            table=unpivot_table,
-            columns=[
-                *[unpivot_table[col_id] for col_id in index_col_ids],
-                *value_columns,
-                *passthrough_values,
-            ],
-            hidden_ordering_columns=hidden_ordering_columns,
-            ordering=new_ordering,
-        )
-
     def _reproject_to_table(self) -> OrderedIR:
         table = self._to_ibis_expr(
             ordering_mode="unordered",
diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 638e3eacdd..a68023d13d 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -179,18 +179,6 @@ def compile_reproject(node: nodes.ReprojectOpNode, ordered: bool = True):
     return compile_node(node.child, ordered)._reproject_to_table()
 
 
-@_compile_node.register
-def compile_unpivot(node: nodes.UnpivotNode, ordered: bool = True):
-    return compile_node(node.child, ordered).unpivot(
-        node.row_labels,
-        node.unpivot_columns,
-        passthrough_columns=node.passthrough_columns,
-        index_col_ids=node.index_col_ids,
-        dtype=node.dtype,
-        how=node.how,
-    )
-
-
 @_compile_node.register
 def compiler_explode(node: nodes.ExplodeNode, ordered: bool = True):
     return compile_node(node.child, ordered).explode(node.column_ids)
diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index 53a25d63ed..072d974b39 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -189,6 +189,25 @@ def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp):
 
         return decorator
 
+    def register_nary_op(self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]]):
+        """
+        Decorator to register a nary op implementation.
+
+        Args:
+            op_ref (NaryOp or NaryOp type):
+                Class or instance of operator that is implemented by the decorated function.
+        """
+        key = typing.cast(str, op_ref.name)
+
+        def decorator(impl: typing.Callable[..., ibis_types.Value]):
+            def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp):
+                return impl(*args)
+
+            self._register(key, normalized_impl)
+            return impl
+
+        return decorator
+
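Reviewer note: `register_nary_op` adapts the compiler's `(args, op)` calling convention to a plain variadic call. A condensed standalone sketch of the pattern; the registry, `Value` alias, and names here are stand-ins, not the actual bigframes/ibis types:

```python
# Minimal sketch of the registration pattern used above.
from typing import Any, Callable, Sequence

Value = Any
_registry: dict[str, Callable[[Sequence[Value], Any], Value]] = {}

def register_nary_op(op_name: str):
    def decorator(impl: Callable[..., Value]):
        # Adapter: the compiler invokes implementations as (args, op), but a
        # variadic op implementation only wants the inputs splatted.
        def normalized_impl(args: Sequence[Value], op: Any) -> Value:
            return impl(*args)
        _registry[op_name] = normalized_impl
        return impl  # hand back the original so it stays directly callable
    return decorator

@register_nary_op("switch")
def switch_impl(*cases_and_outputs: Value) -> Value:
    # Placeholder body; the real implementation builds an ibis CASE expression.
    return cases_and_outputs[-1]
```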
     def _register(
         self,
         op_name: str,
@@ -1346,6 +1365,25 @@ def clip_op(
     )
 
 
+@scalar_op_compiler.register_nary_op(ops.case_when_op)
+def switch_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
+    # ibis can handle most type coercions, but we need to force bool -> int
+    # TODO: dispatch coercion depending on bigframes dtype schema
+    result_values = cases_and_outputs[1::2]
+    do_upcast_bool = any(t.type().is_numeric() for t in result_values)
+    if do_upcast_bool:
+        # Just need to upcast to int, ibis can handle further coercion
+        result_values = tuple(
+            val.cast(ibis_dtypes.int64) if val.type().is_boolean() else val
+            for val in result_values
+        )
+
+    case_val = ibis.case()
+    for predicate, output in zip(cases_and_outputs[::2], result_values):
+        case_val = case_val.when(predicate, output)
+    return case_val.end()
+
+
 # Helpers
 def is_null(value) -> bool:
     # float NaN/inf should be treated as distinct from 'true' null values
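Reviewer note: `switch_op` builds a single searched CASE expression over interleaved (predicate, output) arguments. A standalone ibis sketch of the shape it emits; the table and column names are hypothetical, and executing it assumes a local backend such as ibis's default DuckDB:

```python
# Illustrative only: the CASE expression switch_op produces.
import ibis

t = ibis.memtable({"offset": [0, 1, 0], "a": [1, 2, 3], "b": [4, 5, 6]})
case_val = (
    ibis.case()
    .when(t.offset == 0, t.a)  # offset-0 rows read column a
    .when(t.offset == 1, t.b)  # offset-1 rows read column b
    .end()
)
result = t.mutate(value=case_val)
print(result.execute())  # value column: 1, 5, 3
```

This is exactly the expression shape the new unpivot lowering relies on: the cross-join offset column drives which source column each output row reads.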
diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py
index 4980f5369d..70eb519a1b 100644
--- a/bigframes/core/expression.py
+++ b/bigframes/core/expression.py
@@ -190,9 +190,6 @@ class OpExpression(Expression):
     op: bigframes.operations.RowOp
     inputs: typing.Tuple[Expression, ...]
 
-    def __post_init__(self):
-        assert self.op.arguments == len(self.inputs)
-
     @property
     def unbound_variables(self) -> typing.Tuple[str, ...]:
         return tuple(
diff --git a/bigframes/core/join_def.py b/bigframes/core/join_def.py
index 4646a0d6ae..632a1864da 100644
--- a/bigframes/core/join_def.py
+++ b/bigframes/core/join_def.py
@@ -22,6 +22,11 @@ class JoinSide(enum.Enum):
     LEFT = 0
     RIGHT = 1
 
+    def inverse(self) -> JoinSide:
+        if self == JoinSide.LEFT:
+            return JoinSide.RIGHT
+        return JoinSide.LEFT
+
 
 JoinType = Literal["inner", "outer", "left", "right", "cross"]
 
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index a1072b0d68..688e165732 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -21,8 +21,6 @@
 import typing
 from typing import Callable, Tuple
 
-import pandas
-
 import bigframes.core.expression as ex
 import bigframes.core.guid
 from bigframes.core.join_def import JoinColumnMapping, JoinDefinition, JoinSide
@@ -579,88 +577,6 @@ def relation_ops_created(self) -> int:
         return 0
 
 
-@dataclass(frozen=True)
-class UnpivotNode(UnaryNode):
-    # TODO: Refactor unpivot
-    row_labels: typing.Tuple[typing.Hashable, ...]
-    unpivot_columns: typing.Tuple[
-        typing.Tuple[str, typing.Tuple[typing.Optional[str], ...]], ...
-    ]
-    passthrough_columns: typing.Tuple[str, ...] = ()
-    index_col_ids: typing.Tuple[str, ...] = ("index",)
-    dtype: typing.Union[
-        bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
-    ] = (pandas.Float64Dtype(),)
-    how: typing.Literal["left", "right"] = "left"
-
-    def __hash__(self):
-        return self._node_hash
-
-    @property
-    def row_preserving(self) -> bool:
-        return False
-
-    @property
-    def non_local(self) -> bool:
-        return True
-
-    @property
-    def joins(self) -> bool:
-        return True
-
-    @functools.cached_property
-    def schema(self) -> schemata.ArraySchema:
-        def infer_dtype(
-            values: typing.Iterable[typing.Hashable],
-        ) -> bigframes.dtypes.Dtype:
-            item_types = map(lambda x: bigframes.dtypes.infer_literal_type(x), values)
-            etype = functools.reduce(
-                lambda t1, t2: bigframes.dtypes.lcd_type(t1, t2)
-                if (t1 and t2)
-                else None,
-                item_types,
-            )
-            return bigframes.dtypes.dtype_for_etype(etype)
-
-        label_tuples = [
-            label if isinstance(label, tuple) else (label,) for label in self.row_labels
-        ]
-        idx_dtypes = [
-            infer_dtype(map(lambda x: typing.cast(tuple, x)[i], label_tuples))
-            for i in range(len(self.index_col_ids))
-        ]
-
-        index_items = [
-            schemata.SchemaItem(id, dtype)
-            for id, dtype in zip(self.index_col_ids, idx_dtypes)
-        ]
-        value_dtypes = (
-            self.dtype
-            if isinstance(self.dtype, tuple)
-            else (self.dtype,) * len(self.unpivot_columns)
-        )
-        value_items = [
-            schemata.SchemaItem(col[0], dtype)
-            for col, dtype in zip(self.unpivot_columns, value_dtypes)
-        ]
-        passthrough_items = [
-            schemata.SchemaItem(id, self.child.schema.get_type(id))
-            for id in self.passthrough_columns
-        ]
-        return schemata.ArraySchema((*index_items, *value_items, *passthrough_items))
-
-    @property
-    def variables_introduced(self) -> int:
-        return (
-            len(self.schema.items) - len(self.passthrough_columns) + OVERHEAD_VARIABLES
-        )
-
-    @property
-    def relation_ops_created(self) -> int:
-        # Unpivot is essentially a cross join and a projection.
-        return 2
-
-
 @dataclass(frozen=True)
 class RandomSampleNode(UnaryNode):
     fraction: float
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 953a89c34f..11e592542c 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1961,9 +1961,7 @@ def any(
             frame = self._raise_on_non_boolean("any")
         else:
             frame = self._drop_non_bool()
-        block = frame._block.aggregate_all_and_stack(
-            agg_ops.any_op, dtype=pandas.BooleanDtype(), axis=axis
-        )
+        block = frame._block.aggregate_all_and_stack(agg_ops.any_op, axis=axis)
         return bigframes.series.Series(block.select_column("values"))
 
     def all(
@@ -1973,9 +1971,7 @@ def all(
             frame = self._raise_on_non_boolean("all")
         else:
             frame = self._drop_non_bool()
-        block = frame._block.aggregate_all_and_stack(
-            agg_ops.all_op, dtype=pandas.BooleanDtype(), axis=axis
-        )
+        block = frame._block.aggregate_all_and_stack(agg_ops.all_op, axis=axis)
         return bigframes.series.Series(block.select_column("values"))
 
     def sum(
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index d631ba8508..a7c385a2b8 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -15,7 +15,9 @@
 from __future__ import annotations
 
 import dataclasses
+import functools
 import typing
+from typing import Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -34,11 +36,6 @@ class RowOp(typing.Protocol):
     def name(self) -> str:
         ...
 
-    @property
-    def arguments(self) -> int:
-        """The number of column argument the operation takes"""
-        ...
-
     def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
         ...
 
@@ -48,21 +45,29 @@ def order_preserving(self) -> bool:
         ...
 
-# These classes can be used to create simple ops that don't take local parameters
-# All is needed is a unique name, and to register an implementation in ibis_mappings.py
 @dataclasses.dataclass(frozen=True)
-class UnaryOp:
+class NaryOp:
     @property
     def name(self) -> str:
         raise NotImplementedError("RowOp abstract base class has no implementation")
 
+    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
+        raise NotImplementedError("Abstract operation has no output type")
+
+    @property
+    def order_preserving(self) -> bool:
+        """Whether the row operation preserves total ordering. Can be pruned from ordering expressions."""
+        return False
+
+
+# These classes can be used to create simple ops that don't take local parameters
+# All that is needed is a unique name and a registered implementation in ibis_mappings.py
+@dataclasses.dataclass(frozen=True)
+class UnaryOp(NaryOp):
     @property
     def arguments(self) -> int:
         return 1
 
-    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
-        raise NotImplementedError("Abstract operation has no output type")
-
     def as_expr(
         self, input_id: typing.Union[str, bigframes.core.expression.Expression] = "arg"
     ) -> bigframes.core.expression.Expression:
@@ -72,25 +77,13 @@ def as_expr(
             self, (_convert_expr_input(input_id),)
         )
 
-    @property
-    def order_preserving(self) -> bool:
-        """Whether the row operation preserves total ordering. Can be pruned from ordering expressions."""
-        return False
-
 
 @dataclasses.dataclass(frozen=True)
-class BinaryOp:
-    @property
-    def name(self) -> str:
-        raise NotImplementedError("RowOp abstract base class has no implementation")
-
+class BinaryOp(NaryOp):
     @property
     def arguments(self) -> int:
         return 2
 
-    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
-        raise NotImplementedError("Abstract operation has no output type")
-
     def as_expr(
         self,
         left_input: typing.Union[str, bigframes.core.expression.Expression] = "arg1",
@@ -106,25 +99,13 @@ def as_expr(
             ),
         )
 
-    @property
-    def order_preserving(self) -> bool:
-        """Whether the row operation preserves total ordering. Can be pruned from ordering expressions."""
-        return False
-
 
 @dataclasses.dataclass(frozen=True)
-class TernaryOp:
-    @property
-    def name(self) -> str:
-        raise NotImplementedError("RowOp abstract base class has no implementation")
-
+class TernaryOp(NaryOp):
     @property
     def arguments(self) -> int:
         return 3
 
-    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
-        raise NotImplementedError("Abstract operation has no output type")
-
     def as_expr(
         self,
         input1: typing.Union[str, bigframes.core.expression.Expression] = "arg1",
@@ -142,11 +123,6 @@ def as_expr(
             ),
         )
 
-    @property
-    def order_preserving(self) -> bool:
-        """Whether the row operation preserves total ordering. Can be pruned from ordering expressions."""
-        return False
-
 
 def _convert_expr_input(
     input: typing.Union[str, bigframes.core.expression.Expression]
@@ -664,6 +640,46 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
 clip_op = ClipOp()
 
 
+class CaseWhenOp(NaryOp):
+    name: typing.ClassVar[str] = "switch"
+
+    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
+        assert len(input_types) % 2 == 0
+        # predicate1, output1, predicate2, output2...
+        if not all(map(lambda x: x == dtypes.BOOL_DTYPE, input_types[::2])):
+            raise TypeError(f"Case inputs {input_types[::2]} must be boolean-valued")
+        output_expr_types = input_types[1::2]
+        return functools.reduce(
+            lambda t1, t2: dtypes.coerce_to_common(t1, t2),
+            output_expr_types,
+        )
+
+    def as_expr(
+        self,
+        *case_output_pairs: Tuple[
+            Union[str, bigframes.core.expression.Expression],
+            Union[str, bigframes.core.expression.Expression],
+        ],
+    ) -> bigframes.core.expression.Expression:
+        import bigframes.core.expression
+
+        # Keep this in sync with output_type and compilers
+        inputs: list[bigframes.core.expression.Expression] = []
+
+        for case, output in case_output_pairs:
+            inputs.append(_convert_expr_input(case))
+            inputs.append(_convert_expr_input(output))
+
+        return bigframes.core.expression.OpExpression(
+            self,
+            tuple(inputs),
+        )
+
+
+case_when_op = CaseWhenOp()
+
+
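Reviewer note: `CaseWhenOp` is variadic over interleaved (predicate, output) pairs. A short usage sketch built from expression helpers that appear elsewhere in this PR; the column ids are hypothetical:

```python
# Hypothetical usage: build a CASE WHEN expression the way unpivot does,
# pairing an offset predicate with the source column to read.
import bigframes.core.expression as ex
import bigframes.operations as ops

cases = (
    (ops.eq_op.as_expr("offset", ex.const(0)), ex.free_var("col_a")),
    (ops.eq_op.as_expr("offset", ex.const(1)), ex.free_var("col_b")),
)
expr = ops.case_when_op.as_expr(*cases)
# expr reads col_a on rows where offset == 0 and col_b where offset == 1;
# output_type coerces col_a/col_b to a common dtype, and every predicate
# must be boolean-typed or output_type raises TypeError.
```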
 # Just parameterless unary ops for now
 # TODO: Parameter mappings
 NUMPY_TO_OP: typing.Final = {
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 7fef7a9dc7..4c598a682d 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2987,10 +2987,14 @@ def test_dataframe_aggregates(scalars_df_index, scalars_pandas_df_index, op, ord
     bf_result = bf_series.to_pandas(ordered=ordered)
 
     # Pandas may produce narrower numeric types, but bigframes always produces Float64
-    pd_series = pd_series.astype("Float64")
     # Pandas has object index type
+    pd_series.index = pd_series.index.astype(pd.StringDtype(storage="pyarrow"))
     assert_series_equal(
-        pd_series, bf_result, check_index_type=False, ignore_order=not ordered
+        pd_series,
+        bf_result,
+        check_index_type=False,
+        ignore_order=not ordered,
+        check_dtype=False,
     )
 
 
@@ -3079,7 +3083,7 @@ def test_dataframe_bool_aggregates(scalars_df_index, scalars_pandas_df_index, op
     pd_series = op(scalars_pandas_df_index).astype("boolean")
     bf_result = bf_series.to_pandas()
 
-    # Pandas has object index type
+    pd_series.index = pd_series.index.astype(bf_result.index.dtype)
     pd.testing.assert_series_equal(pd_series, bf_result, check_index_type=False)
 
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index e894900646..c692bdbfec 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -4327,16 +4327,16 @@ def min(self, axis=0, *, numeric_only: bool = False):
         Finding the minimum value in each column (the default behavior without an explicit axis parameter).
 
         >>> df.min()
-        A    1.0
-        B    2.0
-        dtype: Float64
+        A    1
+        B    2
+        dtype: Int64
 
         Finding the minimum value in each row.
 
         >>> df.min(axis=1)
-        0    1.0
-        1    3.0
-        dtype: Float64
+        0    1
+        1    3
+        dtype: Int64
 
     Args:
         axis ({index (0), columns (1)}):
@@ -4372,16 +4372,16 @@ def max(self, axis=0, *, numeric_only: bool = False):
         Finding the maximum value in each column (the default behavior without an explicit axis parameter).
 
         >>> df.max()
-        A    3.0
-        B    4.0
-        dtype: Float64
+        A    3
+        B    4
+        dtype: Int64
 
         Finding the maximum value in each row.
 
         >>> df.max(axis=1)
-        0    2.0
-        1    4.0
-        dtype: Float64
+        0    2
+        1    4
+        dtype: Int64
 
     Args:
         axis ({index (0), columns (1)}):
@@ -4416,16 +4416,16 @@ def sum(self, axis=0, *, numeric_only: bool = False):
         Calculating the sum of each column (the default behavior without an explicit axis parameter).
 
         >>> df.sum()
-        A    4.0
-        B    6.0
-        dtype: Float64
+        A    4
+        B    6
+        dtype: Int64
 
         Calculating the sum of each row.
 
         >>> df.sum(axis=1)
-        0    3.0
-        1    7.0
-        dtype: Float64
+        0    3
+        1    7
+        dtype: Int64
 
     Args:
         axis ({index (0), columns (1)}):
@@ -4500,9 +4500,9 @@ def median(self, *, numeric_only: bool = False, exact: bool = False):
         Finding the median value of each column.
 
         >>> df.median()
-        A    1.0
-        B    2.0
-        dtype: Float64
+        A    1
+        B    2
+        dtype: Int64
 
     Args:
         numeric_only (bool. default False):
@@ -4748,10 +4748,10 @@ def count(self, *, numeric_only: bool = False):
         Counting non-NA values for each column:
 
         >>> df.count()
-        A    4.0
-        B    5.0
-        C    3.0
-        dtype: Float64
+        A    4
+        B    5
+        C    3
+        dtype: Int64
 
     Args:
         numeric_only (bool, default False):
@@ -5051,17 +5051,17 @@ def melt(self, id_vars, value_vars, var_name, value_name):
         Using `melt` with `id_vars` and `value_vars`:
 
         >>> df.melt(id_vars='A', value_vars=['B', 'C'])
-              A variable  value
-        0   1.0        B      1
-        1  <NA>        B      2
-        2   3.0        B      3
-        3   4.0        B      4
-        4   5.0        B      5
-        5   1.0        C   <NA>
-        6  <NA>        C      3
-        7   3.0        C   <NA>
-        8   4.0        C      4
-        9   5.0        C      5
+              A variable  value
+        0   1.0        B    1.0
+        1  <NA>        B    2.0
+        2   3.0        B    3.0
+        3   4.0        B    4.0
+        4   5.0        B    5.0
+        5   1.0        C   <NA>
+        6  <NA>        C    3.5
+        7   3.0        C   <NA>
+        8   4.0        C    4.5
+        9   5.0        C    5.0
 
         [10 rows x 3 columns]
 
@@ -5102,9 +5102,9 @@ def nunique(self):
         [3 rows x 2 columns]
 
         >>> df.nunique()
-        A    3.0
-        B    2.0
-        dtype: Float64
+        A    3
+        B    2
+        dtype: Int64
 
     Returns:
         bigframes.series.Series: Series with number of distinct elements.
@@ -5313,9 +5313,9 @@ def agg(self, func):
         Using a single function:
 
         >>> df.agg('sum')
-        A    6.0
-        B    6.0
-        dtype: Float64
+        A    6
+        B    6
+        dtype: Int64
 
         Using a list of functions:
 
diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py
index 5e3b4c46ef..edefb334b3 100644
--- a/third_party/bigframes_vendored/pandas/core/series.py
+++ b/third_party/bigframes_vendored/pandas/core/series.py
@@ -593,9 +593,9 @@ def agg(self, func):
         1
 
         >>> s.agg(['min', 'max'])
-        min    1.0
-        max    4.0
-        dtype: Float64
+        min    1
+        max    4
+        dtype: Int64
 
     Args:
         func (function):