From 6d53b66450e8688c0cf739f2d8f611aa20230ec0 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Mon, 6 May 2024 21:31:53 +0000 Subject: [PATCH 01/16] feat: bigframes.bigquery.array_agg(SeriesGroupBy|DataFrameGroupby) --- bigframes/bigquery/__init__.py | 6 + bigframes/core/compile/aggregate_compiler.py | 183 +++++++++++++++--- bigframes/core/compile/compiled.py | 80 ++++++-- bigframes/core/compile/compiler.py | 16 +- bigframes/operations/aggregations.py | 18 +- .../ibis/backends/bigquery/registry.py | 14 ++ .../ibis/expr/operations/__init__.py | 2 +- .../expr/operations/{generic.py => arrays.py} | 7 +- .../ibis/expr/operations/reductions.py | 20 +- 9 files changed, 284 insertions(+), 62 deletions(-) rename third_party/bigframes_vendored/ibis/expr/operations/{generic.py => arrays.py} (64%) diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index 197e0a83b5..e03d40eda6 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -22,7 +22,9 @@ import typing +import bigframes.core.groupby as groupby import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops if typing.TYPE_CHECKING: import bigframes.series as series @@ -58,3 +60,7 @@ def array_length(series: series.Series) -> series.Series: """ return series._apply_unary_op(ops.len_op) + + +def array_agg(groupby_series: groupby.SeriesGroupBy) -> series.Series: + return groupby_series._aggregate(agg_ops.ArrayAggOp()) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 98d296c779..4147cbbfcd 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -34,13 +34,11 @@ def compile_aggregate( aggregate: ex.Aggregation, bindings: typing.Dict[str, ibis_types.Value], + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: if isinstance(aggregate, ex.UnaryAggregation): input = 
scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) - return compile_unary_agg( - aggregate.op, - input, - ) + return compile_unary_agg(aggregate.op, input, agg_order_by=agg_order_by) elif isinstance(aggregate, ex.BinaryAggregation): left = scalar_compiler.compile_expression(aggregate.left, bindings=bindings) right = scalar_compiler.compile_expression(aggregate.right, bindings=bindings) @@ -54,6 +52,7 @@ def compile_analytic( window: window_spec.WindowSpec, bindings: typing.Dict[str, ibis_types.Value], ) -> ibis_types.Value: + # TODO: check how to trigger here! if isinstance(aggregate, ex.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) return compile_unary_agg(aggregate.op, input, window) @@ -77,13 +76,19 @@ def compile_unary_agg( op: agg_ops.WindowOp, input: ibis_types.Column, window: Optional[window_spec.WindowSpec] = None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: raise ValueError(f"Can't compile unrecognized operation: {op}") def numeric_op(operation): @functools.wraps(operation) - def constrained_op(op, column: ibis_types.Column, window=None): + def constrained_op( + op, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], + ): if column.type().is_boolean(): column = typing.cast( ibis_types.NumericColumn, column.cast(ibis_dtypes.int64) @@ -104,7 +109,10 @@ def constrained_op(op, column: ibis_types.Column, window=None): @compile_unary_agg.register @numeric_op def _( - op: agg_ops.SumOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.SumOp, + column: ibis_types.NumericColumn, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # Will be null if all inputs are null. Pandas defaults to zero sum though. 
bq_sum = _apply_window_if_present(column.sum(), window) @@ -116,7 +124,10 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.MedianOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.MedianOp, + column: ibis_types.NumericColumn, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". @@ -134,7 +145,10 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.ApproxQuartilesOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.ApproxQuartilesOp, + column: ibis_types.NumericColumn, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". @@ -151,7 +165,10 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.QuantileOp, + column: ibis_types.NumericColumn, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.quantile(op.q), window) @@ -159,7 +176,10 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.MeanOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.MeanOp, + column: ibis_types.NumericColumn, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.mean(), window) @@ -167,7 +187,10 @@ def _( @compile_unary_agg.register @numeric_op def _( - op: agg_ops.ProductOp, column: ibis_types.NumericColumn, window=None + op: agg_ops.ProductOp, + column: ibis_types.NumericColumn, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> 
ibis_types.NumericValue: # Need to short-circuit as log with zeroes is illegal sql is_zero = cast(ibis_types.BooleanColumn, (column == 0)) @@ -202,30 +225,55 @@ def _( @compile_unary_agg.register -def _(op: agg_ops.MaxOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.MaxOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: return _apply_window_if_present(column.max(), window) @compile_unary_agg.register -def _(op: agg_ops.MinOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.MinOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: return _apply_window_if_present(column.min(), window) @compile_unary_agg.register @numeric_op -def _(op: agg_ops.StdOp, x: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.StdOp, + x: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).std(), window) @compile_unary_agg.register @numeric_op -def _(op: agg_ops.VarOp, x: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.VarOp, + x: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).var(), window) @compile_unary_agg.register @numeric_op -def _(op: agg_ops.PopVarOp, x: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.PopVarOp, + x: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: return _apply_window_if_present( cast(ibis_types.NumericColumn, x).var(how="pop"), window ) @@ -233,13 +281,46 @@ def _(op: agg_ops.PopVarOp, x: ibis_types.Column, window=None) -> ibis_types.Val 
@compile_unary_agg.register def _( - op: agg_ops.CountOp, column: ibis_types.Column, window=None + op: agg_ops.CountOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.count(), window) @compile_unary_agg.register -def _(op: agg_ops.CutOp, x: ibis_types.Column, window=None): +def _( + op: agg_ops.ArrayAggOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.ArrayValue: + # BigQuery doesn't currently support using ARRAY_AGG with both window and aggregate + # functions simultaneously. Some aggregate functions (or its equivalent syntax) + # are more important, such as: + # - `IGNORE NULLS` is required to avoid a raised error if the final result + # contains a NULL element. + # - `ORDER BY` is required for the default ordering mode. + # To keep things simpler, windowing support is skipped for now. + if window is not None: + raise NotImplementedError( + f"ArrayAgg with windowing is not supported. 
{constants.FEEDBACK_LINK}" + ) + + return vendored_ibis_ops.ArrayAggregate( + column, + order_by_columns=agg_order_by, + ).to_expr() + + +@compile_unary_agg.register +def _( + op: agg_ops.CutOp, + x: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +): out = ibis.case() if isinstance(op.bins, int): col_min = _apply_window_if_present(x.min(), window) @@ -292,7 +373,10 @@ def _(op: agg_ops.CutOp, x: ibis_types.Column, window=None): @compile_unary_agg.register @numeric_op def _( - self: agg_ops.QcutOp, column: ibis_types.Column, window=None + self: agg_ops.QcutOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: if isinstance(self.quantiles, int): quantiles_ibis = dtypes.literal_to_ibis_scalar(self.quantiles) @@ -322,21 +406,30 @@ def _( @compile_unary_agg.register def _( - op: agg_ops.NuniqueOp, column: ibis_types.Column, window=None + op: agg_ops.NuniqueOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.nunique(), window) @compile_unary_agg.register def _( - op: agg_ops.AnyValueOp, column: ibis_types.Column, window=None + op: agg_ops.AnyValueOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.arbitrary(), window) @compile_unary_agg.register def _( - op: agg_ops.RankOp, column: ibis_types.Column, window=None + op: agg_ops.RankOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(ibis.rank(), window) + 1 @@ -344,7 +437,10 @@ def _( @compile_unary_agg.register def _( - op: agg_ops.DenseRankOp, column: ibis_types.Column, window=None + 
op: agg_ops.DenseRankOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(column.dense_rank(), window) + 1 @@ -357,7 +453,10 @@ def _(op: agg_ops.FirstOp, column: ibis_types.Column, window=None) -> ibis_types @compile_unary_agg.register def _( - op: agg_ops.FirstNonNullOp, column: ibis_types.Column, window=None + op: agg_ops.FirstNonNullOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.FirstNonNullValue(column).to_expr(), window # type: ignore @@ -365,13 +464,21 @@ def _( @compile_unary_agg.register -def _(op: agg_ops.LastOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.LastOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: return _apply_window_if_present(column.last(), window) @compile_unary_agg.register def _( - op: agg_ops.LastNonNullOp, column: ibis_types.Column, window=None + op: agg_ops.LastNonNullOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.LastNonNullValue(column).to_expr(), window # type: ignore @@ -379,7 +486,12 @@ def _( @compile_unary_agg.register -def _(op: agg_ops.ShiftOp, column: ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.ShiftOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: if op.periods == 0: # No-op return column if op.periods > 0: @@ -388,7 +500,12 @@ def _(op: agg_ops.ShiftOp, column: ibis_types.Column, window=None) -> ibis_types @compile_unary_agg.register -def _(op: agg_ops.DiffOp, column: 
ibis_types.Column, window=None) -> ibis_types.Value: +def _( + op: agg_ops.DiffOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.Value: shifted = compile_unary_agg(agg_ops.ShiftOp(op.periods), column, window) if column.type().is_boolean(): return cast(ibis_types.BooleanColumn, column) != cast( @@ -404,7 +521,10 @@ def _(op: agg_ops.DiffOp, column: ibis_types.Column, window=None) -> ibis_types. @compile_unary_agg.register def _( - op: agg_ops.AllOp, column: ibis_types.Column, window=None + op: agg_ops.AllOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be true in pandas. result = _is_true(column).all() @@ -416,7 +536,10 @@ def _( @compile_unary_agg.register def _( - op: agg_ops.AnyOp, column: ibis_types.Column, window=None + op: agg_ops.AnyOp, + column: ibis_types.Column, + window=None, + agg_order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be false in pandas. result = _is_true(column).any() diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index b57e0c4d35..445bbab3c5 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -305,13 +305,14 @@ def aggregate( aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], by_column_ids: typing.Sequence[str] = (), dropna: bool = True, - ) -> OrderedIR: + ) -> UnorderedIR: """ Apply aggregations to the expression. 
Arguments: aggregations: input_column_id, operation, output_column_id tuples by_column_id: column id of the aggregation key, this is preserved through the transform dropna: whether null keys should be dropped + TODO: test UnorderedIR """ table = self._to_ibis_expr() bindings = {col: table[col] for col in self.column_ids} @@ -321,32 +322,18 @@ def aggregate( } if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) - # Must have deterministic ordering, so order by the unique "by" column - ordering = ExpressionOrdering( - tuple([ascending_over(column_id) for column_id in by_column_ids]), - total_ordering_columns=frozenset(by_column_ids), - ) columns = tuple(result[key] for key in result.columns) - expr = OrderedIR(result, columns=columns, ordering=ordering) + expr = UnorderedIR(result, columns=columns) if dropna: for column_id in by_column_ids: expr = expr._filter(expr._get_ibis_column(column_id).notnull()) - # Can maybe remove this as Ordering id is redundant as by_column is unique after aggregation - return expr._project_offsets() + return expr else: aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id regardless as other ops(join etc.) expect it. 
- # TODO: Maybe can make completely empty - ordering = ExpressionOrdering( - ordering_value_columns=tuple([]), - total_ordering_columns=frozenset([]), - ) - return OrderedIR( + return UnorderedIR( result, columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, ) def _uniform_sampling(self, fraction: float) -> UnorderedIR: @@ -604,6 +591,60 @@ def reversed(self) -> OrderedIR: expr_builder.ordering = self._ordering.with_reverse() return expr_builder.build() + def aggregate( + self, + aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], + by_column_ids: typing.Sequence[str] = (), + dropna: bool = True, + ) -> OrderedIR: + """ + Apply aggregations to the expression. + Arguments: + aggregations: (aggregation expression, output_column_id) tuples + by_column_ids: column ids of the aggregation keys, these are preserved through the transform + dropna: whether null keys should be dropped + Returns: + OrderedIR + """ + table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) + bindings = {col: table[col] for col in self.column_ids} + agg_order_by = [table[col] for col in self._hidden_ordering_column_names] + stats = { + col_out: agg_compiler.compile_aggregate( + aggregate, bindings, agg_order_by=agg_order_by + ) + for aggregate, col_out in aggregations + } + if by_column_ids: + result = table.group_by(by_column_ids).aggregate(**stats) + # Must have deterministic ordering, so order by the unique "by" column + ordering = ExpressionOrdering( + tuple([ascending_over(column_id) for column_id in by_column_ids]), + total_ordering_columns=frozenset(by_column_ids), + ) + columns = tuple(result[key] for key in result.columns) + expr = OrderedIR(result, columns=columns, ordering=ordering) + if dropna: + for column_id in by_column_ids: + expr = expr._filter(expr._get_ibis_column(column_id).notnull()) + return expr + else: + aggregates = {**stats, ORDER_ID_COLUMN: 
ibis_types.literal(0)} + result = table.aggregate(**aggregates) + # Ordering is irrelevant for single-row output, but set ordering id + # regardless as other ops(join etc.) expect it. + # TODO: Maybe can make completely empty + ordering = ExpressionOrdering( + ordering_value_columns=tuple([]), + total_ordering_columns=frozenset([]), + ) + return OrderedIR( + result, + columns=[result[col_id] for col_id in [*stats.keys()]], + hidden_ordering_columns=[result[ORDER_ID_COLUMN]], + ordering=ordering, + ) + def _uniform_sampling(self, fraction: float) -> OrderedIR: """Sampling the table on given fraction. @@ -1069,7 +1110,8 @@ def _bake_ordering(self) -> OrderedIR: ) def _project_offsets(self) -> OrderedIR: - """Create a new expression that contains offsets. Should only be executed when offsets are needed for an operations. Has no effect on expression semantics.""" + """Create a new expression that contains offsets. Should only be executed when + offsets are needed for an operations. Has no effect on expression semantics.""" if self._ordering.is_sequential: return self table = self._to_ibis_expr( diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index a68023d13d..39f6741c79 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -155,10 +155,14 @@ def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True): @_compile_node.register def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): - result = compile_unordered_ir(node.child).aggregate( - node.aggregations, node.by_column_ids, node.dropna - ) - return result if ordered else result.to_unordered() + if ordered: + return compile_ordered_ir(node.child).aggregate( + node.aggregations, node.by_column_ids, node.dropna + ) + else: + return compile_unordered_ir(node.child).aggregate( + node.aggregations, node.by_column_ids, node.dropna + ) @_compile_node.register @@ -180,10 +184,10 @@ def compile_reproject(node: 
nodes.ReprojectOpNode, ordered: bool = True): @_compile_node.register -def compiler_explode(node: nodes.ExplodeNode, ordered: bool = True): +def compile_explode(node: nodes.ExplodeNode, ordered: bool = True): return compile_node(node.child, ordered).explode(node.column_ids) @_compile_node.register -def compiler_random_sample(node: nodes.RandomSampleNode, ordered: bool = True): +def compile_random_sample(node: nodes.RandomSampleNode, ordered: bool = True): return compile_node(node.child, ordered)._uniform_sampling(node.fraction) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index c57fac4112..509f0fa05e 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -115,7 +115,7 @@ class QuantileOp(UnaryAggregateOp): @property def name(self): - return f"{int(self.q*100)}%" + return f"{int(self.q * 100)}%" def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: return signatures.UNARY_REAL_NUMERIC.output_type(input_types[0]) @@ -127,7 +127,7 @@ class ApproxQuartilesOp(UnaryAggregateOp): @property def name(self): - return f"{self.quartile*25}%" + return f"{self.quartile * 25}%" def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: if not dtypes.is_orderable(input_types[0]): @@ -222,6 +222,20 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ).output_type(input_types[0]) +@dataclasses.dataclass(frozen=True) +class ArrayAggOp(UnaryAggregateOp): + name: ClassVar[str] = "arrayagg" + + @property + def skips_nulls(self): + return True + + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + return pd.ArrowDtype( + pa.list_(dtypes.bigframes_dtype_to_arrow_dtype(input_types[0])) + ) + + @dataclasses.dataclass(frozen=True) class CutOp(UnaryWindowOp): # TODO: Unintuitive, refactor into multiple ops? 
diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py index 64ef05366d..58bd3b95bc 100644 --- a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py +++ b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py @@ -43,6 +43,19 @@ def _quantile(translator, op: ibis_reductions.Quantile): return f"PERCENTILE_CONT({arg}, {quantile})" +def _array_aggregate(translator, op: vendored_ibis_ops.ArrayAggregate): + arg = translator.translate(op.arg) + + order_by_sql = "" + if len(op.order_by_columns) > 0: + order_by_columns = ", ".join( + [translator.translate(column) for column in op.order_by_columns] + ) + order_by_sql = f"ORDER BY {order_by_columns}" + + return f"ARRAY_AGG({arg} IGNORE NULLS {order_by_sql})" + + patched_ops = { vendored_ibis_ops.ApproximateMultiQuantile: _approx_quantiles, # type:ignore vendored_ibis_ops.FirstNonNullValue: _first_non_null_value, # type:ignore @@ -51,6 +64,7 @@ def _quantile(translator, op: ibis_reductions.Quantile): vendored_ibis_ops.GenerateArray: _generate_array, # type:ignore vendored_ibis_ops.SafeCastToDatetime: _safe_cast_to_datetime, # type:ignore ibis_reductions.Quantile: _quantile, # type:ignore + vendored_ibis_ops.ArrayAggregate: _array_aggregate, # type:ignore } OPERATION_REGISTRY.update(patched_ops) diff --git a/third_party/bigframes_vendored/ibis/expr/operations/__init__.py b/third_party/bigframes_vendored/ibis/expr/operations/__init__.py index 3d5a5a7fa0..3ae5fc10e4 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/__init__.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/__init__.py @@ -2,6 +2,6 @@ from __future__ import annotations from bigframes_vendored.ibis.expr.operations.analytic import * # noqa: F401 F403 -from bigframes_vendored.ibis.expr.operations.generic import * # noqa: F401 F403 +from bigframes_vendored.ibis.expr.operations.arrays import * # noqa: F401 F403 from 
bigframes_vendored.ibis.expr.operations.json import * # noqa: F401 F403 from bigframes_vendored.ibis.expr.operations.reductions import * # noqa: F401 F403 diff --git a/third_party/bigframes_vendored/ibis/expr/operations/generic.py b/third_party/bigframes_vendored/ibis/expr/operations/arrays.py similarity index 64% rename from third_party/bigframes_vendored/ibis/expr/operations/generic.py rename to third_party/bigframes_vendored/ibis/expr/operations/arrays.py index 98acaacfbd..a0ad915a9b 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/generic.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/arrays.py @@ -1,4 +1,4 @@ -# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/generic.py +# Contains code from https://github.com/ibis-project/ibis/blob/master/ibis/expr/operations/arrays.py from __future__ import annotations import ibis.expr.datatypes as dt @@ -6,6 +6,11 @@ class GenerateArray(Unary): + """ + Generates an array of values, similar to ibis.range(), but with simpler and + more efficient SQL generation. 
+ """ + dtype = dt.Array(dt.int64) diff --git a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py index e6644f477a..341b1c182a 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py @@ -2,6 +2,8 @@ from __future__ import annotations +import ibis.common.annotations as ibis_annotations +from ibis.common.typing import VarTuple import ibis.expr.datatypes as dt import ibis.expr.operations.core as ibis_ops_core from ibis.expr.operations.reductions import Filterable, Reduction @@ -18,6 +20,18 @@ class ApproximateMultiQuantile(Filterable, Reduction): dtype = dt.Array(dt.float64) -__all__ = [ - "ApproximateMultiQuantile", -] +class ArrayAggregate(Filterable, Reduction): + """ + Collects the elements of this expression into an ordered array. Similar to + the ibis `ArrayCollect`, but adds an `order_by_*` parameter. 
+ """ + + arg: ibis_ops_core.Column + order_by_columns: VarTuple[ibis_ops_core.Value] = () + + @ibis_annotations.attribute + def dtype(self): + return dt.Array(self.arg.dtype) + + +__all__ = ["ApproximateMultiQuantile", "ArrayAggregate"] From 3097c8a075409ed6c5eaf1749f61db3c9115fe0f Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Tue, 7 May 2024 23:55:23 +0000 Subject: [PATCH 02/16] supports dataframegroupby and adds tests/docs --- bigframes/bigquery/__init__.py | 58 +++++++++++++++++- tests/system/small/bigquery/test_array.py | 75 +++++++++++++++++++++-- 2 files changed, 127 insertions(+), 6 deletions(-) diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index e03d40eda6..623c7a300a 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -22,11 +22,13 @@ import typing +import bigframes.constants as constants import bigframes.core.groupby as groupby import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops if typing.TYPE_CHECKING: + import bigframes.dataframe as dataframe import bigframes.series as series @@ -54,6 +56,10 @@ def array_length(series: series.Series) -> series.Series: 2 2 dtype: Int64 + Args: + series (bigframes.series.Series): + A Series with array columns. + Returns: bigframes.series.Series: A Series of integer values indicating the length of each element in the Series. @@ -62,5 +68,53 @@ def array_length(series: series.Series) -> series.Series: return series._apply_unary_op(ops.len_op) -def array_agg(groupby_series: groupby.SeriesGroupBy) -> series.Series: - return groupby_series._aggregate(agg_ops.ArrayAggOp()) +def array_agg( + obj: groupby.SeriesGroupBy | groupby.DataFrameGroupBy, +) -> series.Series | dataframe.DataFrame: + """Group data and create arrays from selected columns, omitting NULLs to avoid + BigQuery errors (NULLs not allowed in arrays). 
+ + **Examples:** + + >>> import bigframes.pandas as bpd + >>> import bigframes.bigquery as bbq + >>> bpd.options.display.progress_bar = None + + For a SeriesGroupBy object: + + >>> lst = ['a', 'a', 'b', 'b', 'a'] + >>> s = bpd.Series([1, 2, 3, 4, np.nan], index=lst) + >>> bbq.array_agg(s.groupby(level=0)) + a [1. 2.] + b [3. 4.] + dtype: list[pyarrow] + + For a DataFrameGroupBy object: + + >>> l = [[1, 2, 3], [1, None, 4], [2, 1, 3], [1, 2, 2]] + >>> df = bpd.DataFrame(l, columns=["a", "b", "c"]) + >>> bbq.array_agg(df.groupby(by=["b"])) + b a c + 1.0 [2] [3] + 2.0 [1 1] [3 2] + 2 rows × 2 columns + + [2 rows x 2 columns in total] + + Args: + obj (groupby.SeriesGroupBy | groupby.DataFrameGroupBy): + A GroupBy object to be applied the function. + + Returns: + bigframes.series.Series | bigframes.dataframe.DataFrame: A Series or + DataFrame containing aggregated array columns, and indexed by the + original group columns. + """ + if isinstance(obj, groupby.SeriesGroupBy): + return obj._aggregate(agg_ops.ArrayAggOp()) + elif isinstance(obj, groupby.DataFrameGroupBy): + return obj._aggregate_all(agg_ops.ArrayAggOp(), numeric_only=False) + else: + raise ValueError( + f"Unsupported type {type(obj)} to apply `array_agg` function. {constants.FEEDBACK_LINK}" + ) diff --git a/tests/system/small/bigquery/test_array.py b/tests/system/small/bigquery/test_array.py index a91669cd88..a5bd50a7e7 100644 --- a/tests/system/small/bigquery/test_array.py +++ b/tests/system/small/bigquery/test_array.py @@ -14,6 +14,7 @@ import numpy as np import pandas as pd +import pytest import bigframes.bigquery as bbq import bigframes.pandas as bpd @@ -23,10 +24,76 @@ def test_array_length(): series = bpd.Series([["A", "AA", "AAA"], ["BB", "B"], np.nan, [], ["C"]]) # TODO(b/336880368): Allow for NULL values to be input for ARRAY columns. # Once we actually store NULL values, this will be NULL where the input is NULL. 
- expected = pd.Series([3, 2, 0, 0, 1]) + expected = bpd.Series([3, 2, 0, 0, 1]) pd.testing.assert_series_equal( bbq.array_length(series).to_pandas(), - expected, - check_dtype=False, - check_index_type=False, + expected.to_pandas(), + ) + + +@pytest.mark.parametrize( + ("input_data", "output_data"), + [ + pytest.param([1, 2, 3, 4, 5], [[1, 2], [3, 4], [5]], id="ints"), + pytest.param( + ["e", "d", "c", "b", "a"], + [["e", "d"], ["c", "b"], ["a"]], + id="reverse_strings", + ), + pytest.param( + [1.0, 2.0, np.nan, np.nan, np.nan], [[1.0, 2.0], [], []], id="nans" + ), + pytest.param( + [{"A": {"x": 1.0}}, {"A": {"z": 4.0}}, {}, {"B": "b"}, np.nan], + [[{"A": {"x": 1.0}}, {"A": {"z": 4.0}}], [{}, {"B": "b"}], []], + id="structs", + ), + ], +) +def test_array_agg_w_series(input_data, output_data): + input_index = ["a", "a", "b", "b", "c"] + series = bpd.Series(input_data, index=input_index) + result = bbq.array_agg(series.groupby(level=0)) + + expected = bpd.Series(output_data, index=["a", "b", "c"]) + pd.testing.assert_series_equal( + result.to_pandas(), + expected.to_pandas(), + ) + + +def test_array_agg_w_dataframe(): + data = { + "a": [1, 1, 2, 1], + "b": [2, None, 1, 2], + "c": [3, 4, 3, 2], + } + df = bpd.DataFrame(data) + result = bbq.array_agg(df.groupby(by=["b"])) + + expected_data = { + "b": [1.0, 2.0], + "a": [[2], [1, 1]], + "c": [[3], [3, 2]], + } + expected = bpd.DataFrame(expected_data).set_index("b") + + pd.testing.assert_frame_equal( + result.to_pandas(), + expected.to_pandas(), + ) + +def assert_array_agg_matches_after_explode(): + data = { + "index": np.arange(10), + "a": [np.random.randint(0, 10, 10) for _ in range(10)], + "b": [np.random.randint(0, 10, 10) for _ in range(10)], + } + df = bpd.DataFrame(data).set_index("index") + result = bbq.array_agg(df.explode(["a", "b"]).groupby(level=0)) + result.index.name = "index" + + pd.testing.assert_frame_equal( + result.to_pandas(), + df.to_pandas(), ) From 620adc1b0644e8c752b7160c74a98292ca7b50ec Mon Sep 
17 00:00:00 2001 From: Chelsea Lin Date: Wed, 8 May 2024 16:56:36 +0000 Subject: [PATCH 03/16] fix order_by syntax --- bigframes/core/compile/aggregate_compiler.py | 62 +++++++++---------- bigframes/core/compile/compiled.py | 3 +- tests/system/small/bigquery/test_array.py | 34 ++++++++++ .../ibis/backends/bigquery/registry.py | 8 +-- .../ibis/expr/operations/reductions.py | 2 +- 5 files changed, 70 insertions(+), 39 deletions(-) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 4147cbbfcd..88ae5863a1 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -34,11 +34,11 @@ def compile_aggregate( aggregate: ex.Aggregation, bindings: typing.Dict[str, ibis_types.Value], - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: if isinstance(aggregate, ex.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) - return compile_unary_agg(aggregate.op, input, agg_order_by=agg_order_by) + return compile_unary_agg(aggregate.op, input, order_by=order_by) elif isinstance(aggregate, ex.BinaryAggregation): left = scalar_compiler.compile_expression(aggregate.left, bindings=bindings) right = scalar_compiler.compile_expression(aggregate.right, bindings=bindings) @@ -76,7 +76,7 @@ def compile_unary_agg( op: agg_ops.WindowOp, input: ibis_types.Column, window: Optional[window_spec.WindowSpec] = None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: raise ValueError(f"Can't compile unrecognized operation: {op}") @@ -87,7 +87,7 @@ def constrained_op( op, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ): if column.type().is_boolean(): column = typing.cast( @@ -112,7 +112,7 @@ def 
_( op: agg_ops.SumOp, column: ibis_types.NumericColumn, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # Will be null if all inputs are null. Pandas defaults to zero sum though. bq_sum = _apply_window_if_present(column.sum(), window) @@ -127,7 +127,7 @@ def _( op: agg_ops.MedianOp, column: ibis_types.NumericColumn, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". @@ -148,7 +148,7 @@ def _( op: agg_ops.ApproxQuartilesOp, column: ibis_types.NumericColumn, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". 
@@ -168,7 +168,7 @@ def _( op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.quantile(op.q), window) @@ -179,7 +179,7 @@ def _( op: agg_ops.MeanOp, column: ibis_types.NumericColumn, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.mean(), window) @@ -190,7 +190,7 @@ def _( op: agg_ops.ProductOp, column: ibis_types.NumericColumn, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # Need to short-circuit as log with zeroes is illegal sql is_zero = cast(ibis_types.BooleanColumn, (column == 0)) @@ -229,7 +229,7 @@ def _( op: agg_ops.MaxOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(column.max(), window) @@ -239,7 +239,7 @@ def _( op: agg_ops.MinOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(column.min(), window) @@ -250,7 +250,7 @@ def _( op: agg_ops.StdOp, x: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).std(), window) @@ -261,7 +261,7 @@ def _( op: agg_ops.VarOp, x: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return 
_apply_window_if_present(cast(ibis_types.NumericColumn, x).var(), window) @@ -272,7 +272,7 @@ def _( op: agg_ops.PopVarOp, x: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( cast(ibis_types.NumericColumn, x).var(how="pop"), window @@ -284,7 +284,7 @@ def _( op: agg_ops.CountOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.count(), window) @@ -294,7 +294,7 @@ def _( op: agg_ops.ArrayAggOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.ArrayValue: # BigQuery doesn't currently support using ARRAY_AGG with both window and aggregate # functions simultaneously. Some aggregate functions (or its equivalent syntax) @@ -310,7 +310,7 @@ def _( return vendored_ibis_ops.ArrayAggregate( column, - order_by_columns=agg_order_by, + order_by=order_by, ).to_expr() @@ -319,7 +319,7 @@ def _( op: agg_ops.CutOp, x: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ): out = ibis.case() if isinstance(op.bins, int): @@ -376,7 +376,7 @@ def _( self: agg_ops.QcutOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: if isinstance(self.quantiles, int): quantiles_ibis = dtypes.literal_to_ibis_scalar(self.quantiles) @@ -409,7 +409,7 @@ def _( op: agg_ops.NuniqueOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return 
_apply_window_if_present(column.nunique(), window) @@ -419,7 +419,7 @@ def _( op: agg_ops.AnyValueOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.arbitrary(), window) @@ -429,7 +429,7 @@ def _( op: agg_ops.RankOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(ibis.rank(), window) + 1 @@ -440,7 +440,7 @@ def _( op: agg_ops.DenseRankOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(column.dense_rank(), window) + 1 @@ -456,7 +456,7 @@ def _( op: agg_ops.FirstNonNullOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.FirstNonNullValue(column).to_expr(), window # type: ignore @@ -468,7 +468,7 @@ def _( op: agg_ops.LastOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(column.last(), window) @@ -478,7 +478,7 @@ def _( op: agg_ops.LastNonNullOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.LastNonNullValue(column).to_expr(), window # type: ignore @@ 
-490,7 +490,7 @@ def _( op: agg_ops.ShiftOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: if op.periods == 0: # No-op return column @@ -504,7 +504,7 @@ def _( op: agg_ops.DiffOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: shifted = compile_unary_agg(agg_ops.ShiftOp(op.periods), column, window) if column.type().is_boolean(): @@ -524,7 +524,7 @@ def _( op: agg_ops.AllOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be true in pandas. result = _is_true(column).all() @@ -539,7 +539,7 @@ def _( op: agg_ops.AnyOp, column: ibis_types.Column, window=None, - agg_order_by: typing.Sequence[ibis_types.Value] = [], + order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be false in pandas. 
result = _is_true(column).any() diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 445bbab3c5..62dae2641e 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -608,10 +608,9 @@ def aggregate( """ table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) bindings = {col: table[col] for col in self.column_ids} - agg_order_by = [table[col] for col in self._hidden_ordering_column_names] stats = { col_out: agg_compiler.compile_aggregate( - aggregate, bindings, agg_order_by=agg_order_by + aggregate, bindings, order_by=self._ibis_order ) for aggregate, col_out in aggregations } diff --git a/tests/system/small/bigquery/test_array.py b/tests/system/small/bigquery/test_array.py index a5bd50a7e7..5dc91de110 100644 --- a/tests/system/small/bigquery/test_array.py +++ b/tests/system/small/bigquery/test_array.py @@ -83,6 +83,40 @@ def test_array_agg_w_dataframe(): expected.to_pandas(), ) + +@pytest.mark.parametrize( + ("ascending", "expected_b", "expected_c"), + [ + pytest.param( + True, [["a", "b"], ["e", "d", "c"]], [[4, 5], [1, 2, 3]], id="asc" + ), + pytest.param( + False, [["b", "a"], ["c", "d", "e"]], [[5, 4], [3, 2, 1]], id="des" + ), + ], +) +def test_array_agg_reserve_order(ascending, expected_b, expected_c): + data = { + "a": [1, 1, 2, 2, 2], + "b": ["a", "b", "c", "d", "e"], + "c": [4, 5, 3, 2, 1], + } + df = bpd.DataFrame(data) + + result = bbq.array_agg(df.sort_values("c", ascending=ascending).groupby(by=["a"])) + expected_data = { + "a": [1, 2], + "b": expected_b, + "c": expected_c, + } + expected = bpd.DataFrame(expected_data).set_index("a") + + pd.testing.assert_frame_equal( + result.to_pandas(), + expected.to_pandas(), + ) + + def assert_array_agg_matches_after_explode(): data = { "index": np.arange(10), diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py index 
58bd3b95bc..71e2b428df 100644 --- a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py +++ b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py @@ -47,11 +47,9 @@ def _array_aggregate(translator, op: vendored_ibis_ops.ArrayAggregate): arg = translator.translate(op.arg) order_by_sql = "" - if len(op.order_by_columns) > 0: - order_by_columns = ", ".join( - [translator.translate(column) for column in op.order_by_columns] - ) - order_by_sql = f"ORDER BY {order_by_columns}" + if len(op.order_by) > 0: + order_by = ", ".join([translator.translate(column) for column in op.order_by]) + order_by_sql = f"ORDER BY {order_by}" return f"ARRAY_AGG({arg} IGNORE NULLS {order_by_sql})" diff --git a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py index 341b1c182a..bd971e408a 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/reductions.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/reductions.py @@ -27,7 +27,7 @@ class ArrayAggregate(Filterable, Reduction): """ arg: ibis_ops_core.Column - order_by_columns: VarTuple[ibis_ops_core.Value] = () + order_by: VarTuple[ibis_ops_core.Value] = () @ibis_annotations.attribute def dtype(self): From 281bf2d0fe3c4f2f5676c3385bc4b1d969d37676 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 8 May 2024 17:02:34 +0000 Subject: [PATCH 04/16] remove debug notes --- bigframes/core/compile/aggregate_compiler.py | 1 - bigframes/core/compile/compiled.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 88ae5863a1..961b6fb7cc 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -52,7 +52,6 @@ def compile_analytic( window: window_spec.WindowSpec, bindings: typing.Dict[str, ibis_types.Value], ) -> ibis_types.Value: - # TODO: check how to 
trigger here! if isinstance(aggregate, ex.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) return compile_unary_agg(aggregate.op, input, window) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 62dae2641e..8cb19ec0b8 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -312,7 +312,8 @@ def aggregate( aggregations: input_column_id, operation, output_column_id tuples by_column_id: column id of the aggregation key, this is preserved through the transform dropna: whether null keys should be dropped - TODO: test UnorderedIR + Returns: + UnorderedIR """ table = self._to_ibis_expr() bindings = {col: table[col] for col in self.column_ids} From 0719248750b3da4bfa2b6e531973e7b1a3a9016a Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 8 May 2024 17:35:52 +0000 Subject: [PATCH 05/16] address comments offline --- bigframes/core/compile/aggregate_compiler.py | 4 ++- bigframes/core/compile/compiled.py | 33 +++++++++++++++----- bigframes/operations/aggregations.py | 8 +++++ 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 961b6fb7cc..7122da227a 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -38,7 +38,9 @@ def compile_aggregate( ) -> ibis_types.Value: if isinstance(aggregate, ex.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) - return compile_unary_agg(aggregate.op, input, order_by=order_by) + return compile_unary_agg( + aggregate.op, input, order_by=order_by if aggregate.op.can_order_by else [] + ) elif isinstance(aggregate, ex.BinaryAggregation): left = scalar_compiler.compile_expression(aggregate.left, bindings=bindings) right = scalar_compiler.compile_expression(aggregate.right, bindings=bindings) diff --git 
a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 8cb19ec0b8..5a874bc6d1 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -305,15 +305,17 @@ def aggregate( aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], by_column_ids: typing.Sequence[str] = (), dropna: bool = True, - ) -> UnorderedIR: + ) -> OrderedIR: """ Apply aggregations to the expression. Arguments: aggregations: input_column_id, operation, output_column_id tuples - by_column_id: column id of the aggregation key, this is preserved through the transform + by_column_id: column id of the aggregation key, this is preserved through + the transform dropna: whether null keys should be dropped Returns: - UnorderedIR + OrderedIR: the grouping key is a unique-valued column and has ordering + information. """ table = self._to_ibis_expr() bindings = {col: table[col] for col in self.column_ids} @@ -323,8 +325,13 @@ def aggregate( } if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) + # Must have deterministic ordering, so order by the unique "by" column + ordering = ExpressionOrdering( + tuple([ascending_over(column_id) for column_id in by_column_ids]), + total_ordering_columns=frozenset(by_column_ids), + ) columns = tuple(result[key] for key in result.columns) - expr = UnorderedIR(result, columns=columns) + expr = OrderedIR(result, columns=columns, ordering=ordering) if dropna: for column_id in by_column_ids: expr = expr._filter(expr._get_ibis_column(column_id).notnull()) @@ -332,9 +339,18 @@ def aggregate( else: aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} result = table.aggregate(**aggregates) + # Ordering is irrelevant for single-row output, but set ordering id regardless + # as other ops(join etc.) expect it. 
+ # TODO: Maybe can make completely empty + ordering = ExpressionOrdering( + ordering_value_columns=tuple([]), + total_ordering_columns=frozenset([]), + ) return UnorderedIR( result, columns=[result[col_id] for col_id in [*stats.keys()]], + hidden_ordering_columns=[result[ORDER_ID_COLUMN]], + ordering=ordering, ) def _uniform_sampling(self, fraction: float) -> UnorderedIR: @@ -523,7 +539,8 @@ def from_pandas( """ Builds an in-memory only (SQL only) expr from a pandas dataframe. - Assumed that the dataframe has unique string column names and bigframes-suppported dtypes. + Assumed that the dataframe has unique string column names and bigframes-suppported + dtypes. """ # ibis memtable cannot handle NA, must convert to None @@ -560,7 +577,8 @@ def _hidden_column_ids(self) -> typing.Sequence[str]: @property def _ibis_order(self) -> Sequence[ibis_types.Value]: - """Returns a sequence of ibis values which can be directly used to order a table expression. Has direction modifiers applied.""" + """Returns a sequence of ibis values which can be directly used to order a + table expression. Has direction modifiers applied.""" return _convert_ordering_to_table_values( {**self._column_names, **self._hidden_ordering_column_names}, self._ordering.all_ordering_columns, @@ -602,7 +620,8 @@ def aggregate( Apply aggregations to the expression. 
Arguments: aggregations: input_column_id, operation, output_column_id tuples - by_column_id: column id of the aggregation key, this is preserved through the transform + by_column_id: column id of the aggregation key, this is preserved through + the transform dropna: whether null keys should be dropped Returns: OrderedIR diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 509f0fa05e..2a911b34c0 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -52,6 +52,10 @@ def arguments(self) -> int: def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: return input_types[0] + @property + def can_order_by(self): + return False + @dataclasses.dataclass(frozen=True) class AggregateOp(WindowOp): @@ -226,6 +230,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT class ArrayAggOp(UnaryAggregateOp): name: ClassVar[str] = "arrayagg" + @property + def can_order_by(self): + return True + @property def skips_nulls(self): return True From 5da5040f1362b5730b843ff8940fd8896ecc3a13 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 8 May 2024 22:11:37 +0000 Subject: [PATCH 06/16] fix mypy --- bigframes/core/compile/compiled.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 5a874bc6d1..da7f67ea08 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -346,7 +346,7 @@ def aggregate( ordering_value_columns=tuple([]), total_ordering_columns=frozenset([]), ) - return UnorderedIR( + return OrderedIR( result, columns=[result[col_id] for col_id in [*stats.keys()]], hidden_ordering_columns=[result[ORDER_ID_COLUMN]], From dc3ce316a5c09a547bbe3e064f4a7217534f1be4 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 8 May 2024 22:28:01 +0000 Subject: [PATCH 07/16] fixing --- bigframes/core/compile/compiler.py | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 39f6741c79..8b8b5ab08a 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -162,7 +162,7 @@ def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): else: return compile_unordered_ir(node.child).aggregate( node.aggregations, node.by_column_ids, node.dropna - ) + ).to_unordered() @_compile_node.register From 444a63e17bf237a4e6094464e9f0b5c1d9171a08 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 8 May 2024 22:47:18 +0000 Subject: [PATCH 08/16] fixing --- bigframes/core/compile/compiler.py | 10 +++++++--- bigframes/operations/aggregations.py | 8 ++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 8b8b5ab08a..812abdcca8 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -155,14 +155,18 @@ def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True): @_compile_node.register def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): - if ordered: + has_orderred_aggregation_ops = any( + aggregate.op.can_order_by for aggregate, _ in node.aggregations + ) + if ordered and has_orderred_aggregation_ops: return compile_ordered_ir(node.child).aggregate( node.aggregations, node.by_column_ids, node.dropna ) else: - return compile_unordered_ir(node.child).aggregate( + result = compile_unordered_ir(node.child).aggregate( node.aggregations, node.by_column_ids, node.dropna - ).to_unordered() + ) + return result if ordered else result.to_unordered() @_compile_node.register diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 2a911b34c0..3b5310554b 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -38,6 +38,10 @@ def uses_total_row_ordering(self): 
"""Whether the operator needs total row ordering. (eg. lead, lag, array_agg)""" return False + @property + def can_order_by(self): + return False + @abc.abstractmethod def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: ... @@ -52,10 +56,6 @@ def arguments(self) -> int: def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: return input_types[0] - @property - def can_order_by(self): - return False - @dataclasses.dataclass(frozen=True) class AggregateOp(WindowOp): From 1a2be5a8a774a723f26e68873bd6d034c8909d7b Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Fri, 10 May 2024 19:59:39 +0000 Subject: [PATCH 09/16] remove redundant codes --- bigframes/core/compile/compiled.py | 147 +++++++++--------- bigframes/core/compile/compiler.py | 4 +- .../ibis/backends/bigquery/registry.py | 4 + 3 files changed, 81 insertions(+), 74 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index da7f67ea08..08d1d5ea49 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -102,6 +102,12 @@ def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: def _ibis_bindings(self) -> dict[str, ibis_types.Value]: return {col: self._get_ibis_column(col) for col in self.column_ids} + @property + @abc.abstractmethod + def is_ordered_ir(self: T) -> bool: + """Whether it is a OrderedIR or UnorderedIR.""" + ... 
+ @abc.abstractmethod def filter(self: T, predicate: ex.Expression) -> T: """Filter the table on a given expression, the predicate must be a boolean expression.""" @@ -163,6 +169,54 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type), ) + def _aggregate_helper( + self, + table: ibis_types.Table, + order_by: typing.Sequence[ibis_types.Value] = [], + aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]] = [], + by_column_ids: typing.Sequence[str] = (), + dropna: bool = True, + ) -> OrderedIR: + if not self.is_ordered_ir and len(order_by) > 0: + raise ValueError("Cannot apply 'order_by' to an UnorderedIR instance.") + + bindings = {col: table[col] for col in self.column_ids} + stats = { + col_out: agg_compiler.compile_aggregate( + aggregate, bindings, order_by=order_by + ) + for aggregate, col_out in aggregations + } + if by_column_ids: + result = table.group_by(by_column_ids).aggregate(**stats) + # Must have deterministic ordering, so order by the unique "by" column + ordering = ExpressionOrdering( + tuple([ascending_over(column_id) for column_id in by_column_ids]), + total_ordering_columns=frozenset(by_column_ids), + ) + columns = tuple(result[key] for key in result.columns) + expr = OrderedIR(result, columns=columns, ordering=ordering) + if dropna: + for column_id in by_column_ids: + expr = expr._filter(expr._get_ibis_column(column_id).notnull()) + return expr + else: + aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} + result = table.aggregate(**aggregates) + # Ordering is irrelevant for single-row output, but set ordering id regardless + # as other ops(join etc.) expect it. 
+ # TODO: Maybe can make completely empty + ordering = ExpressionOrdering( + ordering_value_columns=tuple([]), + total_ordering_columns=frozenset([]), + ) + return OrderedIR( + result, + columns=[result[col_id] for col_id in [*stats.keys()]], + hidden_ordering_columns=[result[ORDER_ID_COLUMN]], + ordering=ordering, + ) + # Ibis Implementations class UnorderedIR(BaseIbisIR): @@ -174,6 +228,10 @@ def __init__( ): super().__init__(table, columns, predicates) + @property + def is_ordered_ir(self) -> bool: + return False + def builder(self): """Creates a mutable builder for expressions.""" # Since ArrayValue is intended to be immutable (immutability offers @@ -310,7 +368,7 @@ def aggregate( Apply aggregations to the expression. Arguments: aggregations: input_column_id, operation, output_column_id tuples - by_column_id: column id of the aggregation key, this is preserved through + by_column_ids: column ids of the aggregation key, this is preserved through the transform dropna: whether null keys should be dropped Returns: @@ -318,40 +376,9 @@ def aggregate( information. 
""" table = self._to_ibis_expr() - bindings = {col: table[col] for col in self.column_ids} - stats = { - col_out: agg_compiler.compile_aggregate(aggregate, bindings) - for aggregate, col_out in aggregations - } - if by_column_ids: - result = table.group_by(by_column_ids).aggregate(**stats) - # Must have deterministic ordering, so order by the unique "by" column - ordering = ExpressionOrdering( - tuple([ascending_over(column_id) for column_id in by_column_ids]), - total_ordering_columns=frozenset(by_column_ids), - ) - columns = tuple(result[key] for key in result.columns) - expr = OrderedIR(result, columns=columns, ordering=ordering) - if dropna: - for column_id in by_column_ids: - expr = expr._filter(expr._get_ibis_column(column_id).notnull()) - return expr - else: - aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} - result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id regardless - # as other ops(join etc.) expect it. - # TODO: Maybe can make completely empty - ordering = ExpressionOrdering( - ordering_value_columns=tuple([]), - total_ordering_columns=frozenset([]), - ) - return OrderedIR( - result, - columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, - ) + return self._aggregate_helper( + table, aggregations=aggregations, by_column_ids=by_column_ids, dropna=dropna + ) def _uniform_sampling(self, fraction: float) -> UnorderedIR: """Sampling the table on given fraction. @@ -530,6 +557,10 @@ def __init__( if not ordering_valid: raise ValueError(f"Illegal ordering keys: {ordering.all_ordering_columns}") + @property + def is_ordered_ir(self) -> bool: + return True + @classmethod def from_pandas( cls, @@ -620,49 +651,21 @@ def aggregate( Apply aggregations to the expression. 
Arguments: aggregations: input_column_id, operation, output_column_id tuples - by_column_id: column id of the aggregation key, this is preserved through + by_column_ids: column ids of the aggregation key, this is preserved through the transform dropna: whether null keys should be dropped Returns: OrderedIR """ table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) - bindings = {col: table[col] for col in self.column_ids} - stats = { - col_out: agg_compiler.compile_aggregate( - aggregate, bindings, order_by=self._ibis_order - ) - for aggregate, col_out in aggregations - } - if by_column_ids: - result = table.group_by(by_column_ids).aggregate(**stats) - # Must have deterministic ordering, so order by the unique "by" column - ordering = ExpressionOrdering( - tuple([ascending_over(column_id) for column_id in by_column_ids]), - total_ordering_columns=frozenset(by_column_ids), - ) - columns = tuple(result[key] for key in result.columns) - expr = OrderedIR(result, columns=columns, ordering=ordering) - if dropna: - for column_id in by_column_ids: - expr = expr._filter(expr._get_ibis_column(column_id).notnull()) - return expr - else: - aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} - result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id - # regardless as other ops(join etc.) expect it. - # TODO: Maybe can make completely empty - ordering = ExpressionOrdering( - ordering_value_columns=tuple([]), - total_ordering_columns=frozenset([]), - ) - return OrderedIR( - result, - columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, - ) + order_by = self._ibis_order + return self._aggregate_helper( + table, + order_by=order_by, + aggregations=aggregations, + by_column_ids=by_column_ids, + dropna=dropna, + ) def _uniform_sampling(self, fraction: float) -> OrderedIR: """Sampling the table on given fraction. 
diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 812abdcca8..a9908192f3 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -155,10 +155,10 @@ def compile_rowcount(node: nodes.RowCountNode, ordered: bool = True): @_compile_node.register def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): - has_orderred_aggregation_ops = any( + has_ordered_aggregation_ops = any( aggregate.op.can_order_by for aggregate, _ in node.aggregations ) - if ordered and has_orderred_aggregation_ops: + if ordered and has_ordered_aggregation_ops: return compile_ordered_ir(node.child).aggregate( node.aggregations, node.by_column_ids, node.dropna ) diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py index 71e2b428df..ecef2115e5 100644 --- a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py +++ b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py @@ -44,6 +44,10 @@ def _quantile(translator, op: ibis_reductions.Quantile): def _array_aggregate(translator, op: vendored_ibis_ops.ArrayAggregate): + """This method provides the same functionality as the collect() method in Ibis, with + the added capability of ordering the results using order_by. 
+ https://github.com/ibis-project/ibis/issues/9170 + """ arg = translator.translate(op.arg) order_by_sql = "" From 74b6a5bb02720f417aca16d10531437c220d6921 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Tue, 14 May 2024 18:35:50 +0000 Subject: [PATCH 10/16] fix docs --- bigframes/bigquery/__init__.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index 623c7a300a..e9d2abc7ca 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -78,6 +78,7 @@ def array_agg( >>> import bigframes.pandas as bpd >>> import bigframes.bigquery as bbq + >>> import numpy as np >>> bpd.options.display.progress_bar = None For a SeriesGroupBy object: @@ -94,12 +95,12 @@ def array_agg( >>> l = [[1, 2, 3], [1, None, 4], [2, 1, 3], [1, 2, 2]] >>> df = bpd.DataFrame(l, columns=["a", "b", "c"]) >>> bbq.array_agg(df.groupby(by=["b"])) - b a c - 1.0 [2] [3] - 2.0 [1 1] [3 2] - 2 rows × 2 columns - - [2 rows x 2 columns in total] + a c + b + 1.0 [2] [3] + 2.0 [1 1] [3 2] + + [2 rows x 2 columns] Args: obj (groupby.SeriesGroupBy | groupby.DataFrameGroupBy): From e38b31641eebd68e9ca06ddb8b355f3494d7d184 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Tue, 14 May 2024 22:33:28 +0000 Subject: [PATCH 11/16] address comment --- bigframes/core/compile/compiled.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 08d1d5ea49..9b30091f05 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -169,7 +169,7 @@ def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_type), ) - def _aggregate_helper( + def _aggregate_base( self, table: ibis_types.Table, order_by: typing.Sequence[ibis_types.Value] = [], @@ -376,7 +376,7 @@ def aggregate( information. 
""" table = self._to_ibis_expr() - return self._aggregate_helper( + return self._aggregate_base( table, aggregations=aggregations, by_column_ids=by_column_ids, dropna=dropna ) @@ -659,7 +659,7 @@ def aggregate( """ table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) order_by = self._ibis_order - return self._aggregate_helper( + return self._aggregate_base( table, order_by=order_by, aggregations=aggregations, From 866266d8b67b07a1aeb1308acf33848ec33e0528 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Tue, 14 May 2024 22:50:40 +0000 Subject: [PATCH 12/16] increase test coverage --- tests/system/small/bigquery/test_array.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/system/small/bigquery/test_array.py b/tests/system/small/bigquery/test_array.py index 5dc91de110..0c4e4076e3 100644 --- a/tests/system/small/bigquery/test_array.py +++ b/tests/system/small/bigquery/test_array.py @@ -50,7 +50,7 @@ def test_array_length(): ), ], ) -def test_array_agg_w_series(input_data, output_data): +def test_array_agg_w_series_groupby(input_data, output_data): input_index = ["a", "a", "b", "b", "c"] series = bpd.Series(input_data, index=input_index) result = bbq.array_agg(series.groupby(level=0)) @@ -62,7 +62,7 @@ def test_array_agg_w_series(input_data, output_data): ) -def test_array_agg_w_dataframe(): +def test_array_agg_w_dataframe_groupby(): data = { "a": [1, 1, 2, 1], "b": [2, None, 1, 2], @@ -84,6 +84,12 @@ def test_array_agg_w_dataframe(): ) +def test_array_agg_w_series(): + series = bpd.Series([1, 2, 3, 4, 5], index=["a", "a", "b", "b", "c"]) + with pytest.raises(ValueError): + bbq.array_agg(series) + + @pytest.mark.parametrize( ("ascending", "expected_b", "expected_c"), [ From 0b55dc7c0264628cc675cef93b76c29c0887e341 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 14 May 2024 22:53:09 +0000 Subject: [PATCH 13/16] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/bigquery/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index e9d2abc7ca..6c9c04dca7 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -96,7 +96,7 @@ def array_agg( >>> df = bpd.DataFrame(l, columns=["a", "b", "c"]) >>> bbq.array_agg(df.groupby(by=["b"])) a c - b + b 1.0 [2] [3] 2.0 [1 1] [3 2] From 9054df6400c6a907afa9225759d81994a06195cf Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 15 May 2024 21:39:33 +0000 Subject: [PATCH 14/16] fixing ordering alias --- bigframes/core/compile/compiled.py | 17 ++++++++++++++--- tests/system/small/bigquery/test_array.py | 2 +- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 9b30091f05..1c2217c25a 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -177,8 +177,7 @@ def _aggregate_base( by_column_ids: typing.Sequence[str] = (), dropna: bool = True, ) -> OrderedIR: - if not self.is_ordered_ir and len(order_by) > 0: - raise ValueError("Cannot apply 'order_by' to an UnorderedIR instance.") + assert not self.is_ordered_ir or len(order_by) > 0 bindings = {col: table[col] for col in self.column_ids} stats = { @@ -658,7 +657,19 @@ def aggregate( OrderedIR """ table = self._to_ibis_expr(ordering_mode="unordered", expose_hidden_cols=True) - order_by = self._ibis_order + + all_columns = { + column_name: table[column_name] + for column_name in { + **self._column_names, + **self._hidden_ordering_column_names, + } + } + order_by = _convert_ordering_to_table_values( + all_columns, + self._ordering.all_ordering_columns, + ) + return self._aggregate_base( table, order_by=order_by, diff --git 
a/tests/system/small/bigquery/test_array.py b/tests/system/small/bigquery/test_array.py index 0c4e4076e3..a4f4e006c4 100644 --- a/tests/system/small/bigquery/test_array.py +++ b/tests/system/small/bigquery/test_array.py @@ -123,7 +123,7 @@ def test_array_agg_reserve_order(ascending, expected_b, expected_c): ) -def assert_array_agg_matches_after_explode(): +def test_array_agg_matches_after_explode(): data = { "index": np.arange(10), "a": [np.random.randint(0, 10, 10) for _ in range(10)], From f8ebae12a5324b4d7fa3e6932250f283bcbba7e5 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Thu, 16 May 2024 01:25:32 +0000 Subject: [PATCH 15/16] create compile_ordered_unary_agg dispatcher --- bigframes/core/compile/aggregate_compiler.py | 95 +++++++++----------- 1 file changed, 41 insertions(+), 54 deletions(-) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 7122da227a..c0b0562a54 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -38,9 +38,10 @@ def compile_aggregate( ) -> ibis_types.Value: if isinstance(aggregate, ex.UnaryAggregation): input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) - return compile_unary_agg( - aggregate.op, input, order_by=order_by if aggregate.op.can_order_by else [] - ) + if aggregate.op.can_order_by: + return compile_ordered_unary_agg(aggregate.op, input, order_by=order_by) + else: + return compile_unary_agg(aggregate.op, input) elif isinstance(aggregate, ex.BinaryAggregation): left = scalar_compiler.compile_expression(aggregate.left, bindings=bindings) right = scalar_compiler.compile_expression(aggregate.right, bindings=bindings) @@ -66,7 +67,8 @@ def compile_analytic( @functools.singledispatch def compile_binary_agg( op: agg_ops.WindowOp, - input: ibis_types.Column, + left: ibis_types.Column, + right: ibis_types.Column, window: Optional[window_spec.WindowSpec] = None, ) -> ibis_types.Value: raise 
ValueError(f"Can't compile unrecognized operation: {op}") @@ -77,6 +79,15 @@ def compile_unary_agg( op: agg_ops.WindowOp, input: ibis_types.Column, window: Optional[window_spec.WindowSpec] = None, +) -> ibis_types.Value: + raise ValueError(f"Can't compile unrecognized operation: {op}") + + +@functools.singledispatch +def compile_ordered_unary_agg( + op: agg_ops.WindowOp, + input: ibis_types.Column, + window: Optional[window_spec.WindowSpec] = None, order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: raise ValueError(f"Can't compile unrecognized operation: {op}") @@ -113,7 +124,6 @@ def _( op: agg_ops.SumOp, column: ibis_types.NumericColumn, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # Will be null if all inputs are null. Pandas defaults to zero sum though. bq_sum = _apply_window_if_present(column.sum(), window) @@ -128,7 +138,6 @@ def _( op: agg_ops.MedianOp, column: ibis_types.NumericColumn, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". @@ -149,7 +158,6 @@ def _( op: agg_ops.ApproxQuartilesOp, column: ibis_types.NumericColumn, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # PERCENTILE_CONT has very few allowed windows. For example, "window # framing clause is not allowed for analytic function percentile_cont". 
@@ -169,7 +177,6 @@ def _( op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.quantile(op.q), window) @@ -180,7 +187,7 @@ def _( op: agg_ops.MeanOp, column: ibis_types.NumericColumn, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], + # order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: return _apply_window_if_present(column.mean(), window) @@ -191,7 +198,6 @@ def _( op: agg_ops.ProductOp, column: ibis_types.NumericColumn, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.NumericValue: # Need to short-circuit as log with zeroes is illegal sql is_zero = cast(ibis_types.BooleanColumn, (column == 0)) @@ -230,7 +236,6 @@ def _( op: agg_ops.MaxOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(column.max(), window) @@ -240,7 +245,6 @@ def _( op: agg_ops.MinOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(column.min(), window) @@ -251,7 +255,6 @@ def _( op: agg_ops.StdOp, x: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).std(), window) @@ -262,7 +265,6 @@ def _( op: agg_ops.VarOp, x: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).var(), window) @@ -273,7 +275,6 @@ def _( op: agg_ops.PopVarOp, x: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( cast(ibis_types.NumericColumn, x).var(how="pop"), window @@ -285,42 +286,15 @@ def 
_( op: agg_ops.CountOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.count(), window) -@compile_unary_agg.register -def _( - op: agg_ops.ArrayAggOp, - column: ibis_types.Column, - window=None, - order_by: typing.Sequence[ibis_types.Value] = [], -) -> ibis_types.ArrayValue: - # BigQuery doesn't currently support using ARRAY_AGG with both window and aggregate - # functions simultaneously. Some aggregate functions (or its equivalent syntax) - # are more important, such as: - # - `IGNORE NULLS` are required to avoid an raised error if the final result - # contains a NULL element. - # - `ORDER BY` are required for the default ordering mode. - # To keep things simpler, windowing support is skipped for now. - if window is not None: - raise NotImplementedError( - f"ArrayAgg with windowing is not supported. {constants.FEEDBACK_LINK}" - ) - - return vendored_ibis_ops.ArrayAggregate( - column, - order_by=order_by, - ).to_expr() - - @compile_unary_agg.register def _( op: agg_ops.CutOp, x: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ): out = ibis.case() if isinstance(op.bins, int): @@ -377,7 +351,6 @@ def _( self: agg_ops.QcutOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: if isinstance(self.quantiles, int): quantiles_ibis = dtypes.literal_to_ibis_scalar(self.quantiles) @@ -410,7 +383,6 @@ def _( op: agg_ops.NuniqueOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.nunique(), window) @@ -420,7 +392,6 @@ def _( op: agg_ops.AnyValueOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.arbitrary(), window) @@ -430,7 +401,6 @@ def 
_( op: agg_ops.RankOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(ibis.rank(), window) + 1 @@ -441,7 +411,6 @@ def _( op: agg_ops.DenseRankOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.IntegerValue: # Ibis produces 0-based ranks, while pandas creates 1-based ranks return _apply_window_if_present(column.dense_rank(), window) + 1 @@ -457,7 +426,6 @@ def _( op: agg_ops.FirstNonNullOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.FirstNonNullValue(column).to_expr(), window # type: ignore @@ -469,7 +437,6 @@ def _( op: agg_ops.LastOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present(column.last(), window) @@ -479,7 +446,6 @@ def _( op: agg_ops.LastNonNullOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: return _apply_window_if_present( vendored_ibis_ops.LastNonNullValue(column).to_expr(), window # type: ignore @@ -491,7 +457,6 @@ def _( op: agg_ops.ShiftOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: if op.periods == 0: # No-op return column @@ -505,7 +470,6 @@ def _( op: agg_ops.DiffOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.Value: shifted = compile_unary_agg(agg_ops.ShiftOp(op.periods), column, window) if column.type().is_boolean(): @@ -525,7 +489,6 @@ def _( op: agg_ops.AllOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.BooleanValue: # BQ will 
return null for empty column, result would be true in pandas. result = _is_true(column).all() @@ -540,7 +503,6 @@ def _( op: agg_ops.AnyOp, column: ibis_types.Column, window=None, - order_by: typing.Sequence[ibis_types.Value] = [], ) -> ibis_types.BooleanValue: # BQ will return null for empty column, result would be false in pandas. result = _is_true(column).any() @@ -550,6 +512,31 @@ def _( ) +@compile_ordered_unary_agg.register +def _( + op: agg_ops.ArrayAggOp, + column: ibis_types.Column, + window=None, + order_by: typing.Sequence[ibis_types.Value] = [], +) -> ibis_types.ArrayValue: + # BigQuery doesn't currently support using ARRAY_AGG with both window and aggregate + # functions simultaneously. Some aggregate functions (or its equivalent syntax) + # are more important, such as: + # - `IGNORE NULLS` are required to avoid an raised error if the final result + # contains a NULL element. + # - `ORDER BY` are required for the default ordering mode. + # To keep things simpler, windowing support is skipped for now. + if window is not None: + raise NotImplementedError( + f"ArrayAgg with windowing is not supported. 
{constants.FEEDBACK_LINK}" + ) + + return vendored_ibis_ops.ArrayAggregate( + column, + order_by=order_by, + ).to_expr() + + @compile_binary_agg.register def _( op: agg_ops.CorrOp, left: ibis_types.Column, right: ibis_types.Column, window=None From 559609505eec4bc4ab65577b76c4b93d4b245b76 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Thu, 16 May 2024 17:35:21 +0000 Subject: [PATCH 16/16] fixing mypy --- tests/system/small/bigquery/test_array.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/system/small/bigquery/test_array.py b/tests/system/small/bigquery/test_array.py index a4f4e006c4..0664c31a3c 100644 --- a/tests/system/small/bigquery/test_array.py +++ b/tests/system/small/bigquery/test_array.py @@ -57,7 +57,7 @@ def test_array_agg_w_series_groupby(input_data, output_data): expected = bpd.Series(output_data, index=["a", "b", "c"]) pd.testing.assert_series_equal( - result.to_pandas(), + result.to_pandas(), # type: ignore expected.to_pandas(), ) @@ -79,15 +79,17 @@ def test_array_agg_w_dataframe_groupby(): expected = bpd.DataFrame(expected_data).set_index("b") pd.testing.assert_frame_equal( - result.to_pandas(), + result.to_pandas(), # type: ignore expected.to_pandas(), ) def test_array_agg_w_series(): series = bpd.Series([1, 2, 3, 4, 5], index=["a", "a", "b", "b", "c"]) + # Mypy error expected: array_agg currently incompatible with Series. + # Test for coverage. 
with pytest.raises(ValueError): - bbq.array_agg(series) + bbq.array_agg(series) # type: ignore @pytest.mark.parametrize( @@ -118,7 +120,7 @@ def test_array_agg_reserve_order(ascending, expected_b, expected_c): expected = bpd.DataFrame(expected_data).set_index("a") pd.testing.assert_frame_equal( - result.to_pandas(), + result.to_pandas(), # type: ignore expected.to_pandas(), ) @@ -134,6 +136,6 @@ def test_array_agg_matches_after_explode(): result.index.name = "index" pd.testing.assert_frame_equal( - result.to_pandas(), + result.to_pandas(), # type: ignore df.to_pandas(), )