diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 2a888125f8..5965c96374 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -27,7 +27,7 @@
 import os
 import random
 import typing
-from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple
+from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union
 import warnings
 
 import google.cloud.bigquery as bigquery
@@ -105,6 +105,8 @@ def __init__(
         index_columns: Iterable[str],
         column_labels: typing.Union[pd.Index, typing.Iterable[Label]],
         index_labels: typing.Union[pd.Index, typing.Iterable[Label], None] = None,
+        *,
+        transpose_cache: Optional[Block] = None,
     ):
         """Construct a block object, will create default index if no index columns specified."""
         index_columns = list(index_columns)
@@ -144,6 +146,7 @@ def __init__(
         # TODO(kemppeterson) Add a cache for corr to parallel the single-column stats.
         self._stats_cache[" ".join(self.index_columns)] = {}
+        self._transpose_cache: Optional[Block] = transpose_cache
 
     @classmethod
     def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
@@ -716,6 +719,15 @@ def with_column_labels(
             index_labels=self.index.names,
         )
 
+    def with_transpose_cache(self, transposed: Block):
+        return Block(
+            self._expr,
+            index_columns=self.index_columns,
+            column_labels=self._column_labels,
+            index_labels=self.index.names,
+            transpose_cache=transposed,
+        )
+
     def with_index_labels(self, value: typing.Sequence[Label]) -> Block:
         if len(value) != len(self.index_columns):
             raise ValueError(
@@ -804,18 +816,35 @@ def multi_apply_window_op(
     def multi_apply_unary_op(
         self,
         columns: typing.Sequence[str],
-        op: ops.UnaryOp,
+        op: Union[ops.UnaryOp, ex.Expression],
     ) -> Block:
+        if isinstance(op, ops.UnaryOp):
+            input_varname = guid.generate_guid()
+            expr = op.as_expr(input_varname)
+        else:
+            input_varnames = op.unbound_variables
+            assert len(input_varnames) == 1
+            expr = op
+            input_varname = input_varnames[0]
+
         block = self
-        for i, col_id in enumerate(columns):
+        for col_id in columns:
             label = self.col_id_to_label[col_id]
-            block, result_id = block.apply_unary_op(
-                col_id,
-                op,
-                result_label=label,
+            block, result_id = block.project_expr(
+                expr.bind_all_variables({input_varname: ex.free_var(col_id)}),
+                label=label,
             )
             block = block.copy_values(result_id, col_id)
             block = block.drop_columns([result_id])
+        # Special case: we can preserve the transpose cache for full-frame unary ops
+        if (self._transpose_cache is not None) and set(self.value_columns) == set(
+            columns
+        ):
+            transpose_columns = self._transpose_cache.value_columns
+            new_transpose_cache = self._transpose_cache.multi_apply_unary_op(
+                transpose_columns, op
+            )
+            block = block.with_transpose_cache(new_transpose_cache)
         return block
 
     def apply_window_op(
@@ -922,20 +951,17 @@ def aggregate_all_and_stack(
                 (ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id)
                 for col_id in self.value_columns
             ]
-            index_col_ids = [
-                guid.generate_guid() for i in range(self.column_labels.nlevels)
-            ]
-            result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
-                row_labels=self.column_labels.to_list(),
-                index_col_ids=index_col_ids,
-                unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]),
-            )
+            index_id = guid.generate_guid()
+            result_expr = self.expr.aggregate(
+                aggregations, dropna=dropna
+            ).assign_constant(index_id, None, None)
+            # Transpose as the last operation so that the final block has a valid transpose cache
            return Block(
                 result_expr,
-                index_columns=index_col_ids,
-                column_labels=[None],
-                index_labels=self.column_labels.names,
-            )
+                index_columns=[index_id],
+                column_labels=self.column_labels,
+                index_labels=[None],
+            ).transpose(original_row_index=pd.Index([None]))
         else:  # axis_n == 1
             # using offsets as identity to group on.
             # TODO: Allow to promote identity/total_order columns instead for better perf
@@ -1575,10 +1601,19 @@ def melt(
             index_columns=[index_id],
         )
 
-    def transpose(self) -> Block:
-        """Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows"""
+    def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
+        """Transpose the block. Will fail if dtypes aren't coercible to a common type or if there are too many rows.
+        Can provide original_row_index directly if it is already known; otherwise a query is needed.
+        """
+        if self._transpose_cache is not None:
+            return self._transpose_cache.with_transpose_cache(self)
+
         original_col_index = self.column_labels
-        original_row_index = self.index.to_pandas()
+        original_row_index = (
+            original_row_index
+            if original_row_index is not None
+            else self.index.to_pandas()
+        )
         original_row_count = len(original_row_index)
         if original_row_count > bigframes.constants.MAX_COLUMNS:
             raise NotImplementedError(
@@ -1619,6 +1654,7 @@ def transpose(self) -> Block:
             result.with_column_labels(original_row_index)
             .order_by([ordering.ascending_over(result.index_columns[-1])])
             .drop_levels([result.index_columns[-1]])
+            .with_transpose_cache(self)
         )
 
     def _create_stack_column(
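The blocks.py changes above amount to mutual memoization: a block and its transpose each hold a reference to the other, so after the first `transpose()` every later round trip (including `t.transpose().transpose()`) is a pointer swap instead of a fresh pivot. A minimal standalone sketch of the pattern, assuming nothing about bigframes internals (`Grid` is a toy stand-in for `Block`, not real API):

```python
from __future__ import annotations

from typing import List, Optional


class Grid:
    """Toy stand-in for Block: immutable data plus an optional cached transpose."""

    def __init__(self, rows: List[List[int]], transpose_cache: Optional[Grid] = None):
        self.rows = rows
        self._transpose_cache = transpose_cache

    def with_transpose_cache(self, transposed: Grid) -> Grid:
        # Same data, but now remembering a precomputed transpose.
        return Grid(self.rows, transpose_cache=transposed)

    def transpose(self) -> Grid:
        if self._transpose_cache is not None:
            # Cheap path: reuse the cached transpose, caching ourselves on it
            # so that transposing back is free as well.
            return self._transpose_cache.with_transpose_cache(self)
        # Expensive path: actually flip the data, then link the two grids.
        return Grid([list(col) for col in zip(*self.rows)], transpose_cache=self)


g = Grid([[1, 2], [3, 4]])
t = g.transpose()                    # computed once
assert t.transpose().rows == g.rows  # the round trip is now a cache hit
```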
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index a59d599679..88c1006c79 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -823,17 +823,14 @@ def to_sql(
                 for col in baked_ir.column_ids
             ]
             selection = ", ".join(map(lambda col_id: f"`{col_id}`", output_columns))
-            order_by_clause = baked_ir._ordering_clause(
-                baked_ir._ordering.all_ordering_columns
-            )
-            sql = textwrap.dedent(
-                f"SELECT {selection}\n"
-                "FROM (\n"
-                f"{sql}\n"
-                ")\n"
-                f"{order_by_clause}\n"
-            )
+            sql = textwrap.dedent(f"SELECT {selection}\n" "FROM (\n" f"{sql}\n" ")\n")
+            # Single row frames may not have any ordering columns
+            if len(baked_ir._ordering.all_ordering_columns) > 0:
+                order_by_clause = baked_ir._ordering_clause(
+                    baked_ir._ordering.all_ordering_columns
+                )
+                sql += f"{order_by_clause}\n"
         else:
             sql = ibis_bigquery.Backend().compile(
                 self._to_ibis_expr(
diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py
index 2543a3b722..9009e31be3 100644
--- a/bigframes/core/ordering.py
+++ b/bigframes/core/ordering.py
@@ -167,6 +167,8 @@ def _truncate_ordering(
             truncated_refs.append(order_part)
             if columns_seen.issuperset(must_see):
                 return tuple(truncated_refs)
+        if len(must_see) == 0:
+            return ()
         raise ValueError("Ordering did not contain all total_order_cols")
 
     def with_reverse(self):
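The compiled.py and ordering.py changes handle a degenerate case the new code path creates: a single-row frame (such as the aggregate row built in `aggregate_all_and_stack`) may have no ordering columns at all, and a bare `ORDER BY` with nothing after it is invalid SQL, so the clause is emitted only when ordering columns exist. A rough sketch of the guard, with simplified stand-ins for the compiled-IR helpers (`wrap_with_selection` is illustrative, not the real `to_sql`):

```python
from typing import Sequence


def wrap_with_selection(
    inner_sql: str, output_columns: Sequence[str], ordering_columns: Sequence[str]
) -> str:
    selection = ", ".join(f"`{col}`" for col in output_columns)
    sql = f"SELECT {selection}\nFROM (\n{inner_sql}\n)\n"
    # Single-row frames may have an empty ordering; skip the clause entirely
    # rather than emit "ORDER BY" with no columns.
    if len(ordering_columns) > 0:
        order_by = "ORDER BY " + ", ".join(f"`{col}` ASC" for col in ordering_columns)
        sql += f"{order_by}\n"
    return sql


print(wrap_with_selection("SELECT 1 AS `a`", ["a"], []))     # no ORDER BY clause
print(wrap_with_selection("SELECT 1 AS `a`", ["a"], ["a"]))  # ends with ORDER BY
```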
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index a55b7f569b..48c4af7a37 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -693,18 +693,19 @@ def _apply_binop(
     def _apply_scalar_binop(
         self, other: float | int, op: ops.BinaryOp, reverse: bool = False
     ) -> DataFrame:
-        block = self._block
-        for column_id, label in zip(
-            self._block.value_columns, self._block.column_labels
-        ):
-            expr = (
-                op.as_expr(ex.const(other), column_id)
-                if reverse
-                else op.as_expr(column_id, ex.const(other))
+        if reverse:
+            expr = op.as_expr(
+                left_input=ex.const(other),
+                right_input=bigframes.core.guid.generate_guid(),
             )
-            block, _ = block.project_expr(expr, label)
-            block = block.drop_columns([column_id])
-        return DataFrame(block)
+        else:
+            expr = op.as_expr(
+                left_input=bigframes.core.guid.generate_guid(),
+                right_input=ex.const(other),
+            )
+        return DataFrame(
+            self._block.multi_apply_unary_op(self._block.value_columns, expr)
+        )
 
     def _apply_series_binop_axis_0(
         self,
@@ -1974,7 +1975,7 @@ def any(
         else:
             frame = self._drop_non_bool()
             block = frame._block.aggregate_all_and_stack(agg_ops.any_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def all(
         self, axis: typing.Union[str, int] = 0, *, bool_only: bool = False
@@ -1984,7 +1985,7 @@ def all(
         else:
             frame = self._drop_non_bool()
             block = frame._block.aggregate_all_and_stack(agg_ops.all_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def sum(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -1994,7 +1995,7 @@ def sum(
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.sum_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def mean(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -2004,7 +2005,7 @@ def mean(
         else:
             frame = self._drop_non_numeric()
            block = frame._block.aggregate_all_and_stack(agg_ops.mean_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def median(
         self, *, numeric_only: bool = False, exact: bool = True
@@ -2019,7 +2020,7 @@ def median(
             return result
         else:
             block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
-            return bigframes.series.Series(block.select_column("values"))
+            return bigframes.series.Series(block)
 
     def quantile(
         self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
@@ -2052,7 +2053,7 @@ def std(
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.std_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def var(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -2062,7 +2063,7 @@ def var(
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.var_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def min(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -2072,7 +2073,7 @@ def min(
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.min_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def max(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -2082,7 +2083,7 @@ def max(
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.max_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def prod(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -2092,7 +2093,7 @@ def prod(
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.product_op, axis=axis)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     product = prod
     product.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.prod)
@@ -2103,11 +2104,11 @@ def count(self, *, numeric_only: bool = False) -> bigframes.series.Series:
         else:
             frame = self._drop_non_numeric()
             block = frame._block.aggregate_all_and_stack(agg_ops.count_op)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def nunique(self) -> bigframes.series.Series:
         block = self._block.aggregate_all_and_stack(agg_ops.nunique_op)
-        return bigframes.series.Series(block.select_column("values"))
+        return bigframes.series.Series(block)
 
     def agg(
         self, func: str | typing.Sequence[str]
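`_apply_scalar_binop` now builds one expression with a single unbound variable (a generated guid serving as the placeholder name) and hands it to `multi_apply_unary_op`, which rebinds that variable to each column id in turn; because the op touches every value column, the full-frame special case in blocks.py can then propagate the transpose cache through scalar arithmetic like `df.transpose() + i`. A simplified model of the rebinding, where `Const`/`FreeVar`/`OpExpr` are toy stand-ins for the `bigframes.core.expression` types:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Tuple, Union

Node = Union["Const", "FreeVar", "OpExpr"]


@dataclass(frozen=True)
class Const:
    value: float


@dataclass(frozen=True)
class FreeVar:
    name: str


@dataclass(frozen=True)
class OpExpr:
    op: str
    inputs: Tuple[Node, ...]

    @property
    def unbound_variables(self) -> Tuple[str, ...]:
        out = []
        for node in self.inputs:
            if isinstance(node, FreeVar):
                out.append(node.name)
            elif isinstance(node, OpExpr):
                out.extend(node.unbound_variables)
        return tuple(out)

    def bind_all_variables(self, bindings: Dict[str, FreeVar]) -> OpExpr:
        def substitute(node: Node) -> Node:
            if isinstance(node, FreeVar):
                return bindings.get(node.name, node)
            if isinstance(node, OpExpr):
                return node.bind_all_variables(bindings)
            return node

        return OpExpr(self.op, tuple(substitute(n) for n in self.inputs))


# `10 - <input>`: one template with a single placeholder free variable, like
# op.as_expr(left_input=ex.const(10), right_input=<generated guid>).
template = OpExpr("sub", (Const(10.0), FreeVar("__input__")))
assert template.unbound_variables == ("__input__",)

# The loop in multi_apply_unary_op rebinds that one variable per column id.
for col_id in ("col_a", "col_b"):
    print(template.bind_all_variables({"__input__": FreeVar(col_id)}))
```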
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index f41a21add0..d33cb7d148 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2488,6 +2488,20 @@ def test_df_transpose_error():
         dataframe.DataFrame([[1, "hello"], [2, "world"]]).transpose()
 
 
+def test_df_transpose_repeated_uses_cache():
+    bf_df = dataframe.DataFrame([[1, 2.5], [2, 3.5]])
+    pd_df = pandas.DataFrame([[1, 2.5], [2, 3.5]])
+    # Transpose many times so that the operation will fail from complexity if the cache is not used
+    for i in range(10):
+        # The cache still works even with a simple scalar binop between transposes
+        bf_df = bf_df.transpose() + i
+        pd_df = pd_df.transpose() + i
+
+    pd.testing.assert_frame_equal(
+        pd_df, bf_df.to_pandas(), check_dtype=False, check_index_type=False
+    )
+
+
 @pytest.mark.parametrize(
     ("ordered"),
     [
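The test's loop is sized so that, without the cache, each `transpose()` would re-pivot the output of the previous one and the query would fail from sheer plan complexity; with the cache, only the very first transpose does real work. A self-contained toy model of that cost profile (counting flips; not bigframes' actual planner), run over the same transpose-plus-scalar-add loop as the test:

```python
from __future__ import annotations

from typing import List, Optional

FLIPS = 0  # counts only the expensive, non-cached transposes


class Grid:
    def __init__(self, rows: List[List[float]], cache: Optional[Grid] = None):
        self.rows, self._cache = rows, cache

    def transpose(self) -> Grid:
        global FLIPS
        if self._cache is not None:
            # Cache hit: hand back the precomputed transpose, linked back to us.
            return Grid(self._cache.rows, cache=self)
        FLIPS += 1  # the only expensive path
        return Grid([list(col) for col in zip(*self.rows)], cache=self)

    def add_scalar(self, i: float) -> Grid:
        # Full-frame unary op: also applied to the cached transpose, mirroring
        # the special case added to multi_apply_unary_op.
        cache = self._cache.add_scalar(i) if self._cache is not None else None
        return Grid([[v + i for v in row] for row in self.rows], cache=cache)


g = Grid([[1.0, 2.5], [2.0, 3.5]])
for i in range(10):
    g = g.transpose().add_scalar(i)
print(FLIPS)  # 1 -- every transpose after the first was a cache hit
```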