From f60a8c02cabdc60c330b33c77aebeadd03a64bb3 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Sat, 15 Feb 2025 00:02:27 +0000 Subject: [PATCH 1/3] [WIP] support time series diff --- bigframes/core/compile/aggregate_compiler.py | 18 +++++++++++ bigframes/core/rewrite/timedeltas.py | 33 ++++++++++++++++++++ bigframes/operations/aggregations.py | 19 +++++++++++ 3 files changed, 70 insertions(+) diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 02c7ae128b..4914027bdf 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -553,6 +553,24 @@ def _( raise TypeError(f"Cannot perform diff on type{column.type()}") +@compile_unary_agg.register +def _( + op: agg_ops.TimeSeriesDiffOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.Value: + if not column.type().is_timestamp(): + raise TypeError(f"Cannot perform time series diff on type{column.type()}") + + original_column = cast(ibis_types.TimestampColumn, column) + shifted_column = cast( + ibis_types.TimestampColumn, + compile_unary_agg(agg_ops.ShiftOp(op.periods), column, window), + ) + + return original_column.delta(shifted_column, part="microsecond") + + @compile_unary_agg.register def _( op: agg_ops.AllOp, diff --git a/bigframes/core/rewrite/timedeltas.py b/bigframes/core/rewrite/timedeltas.py index db3a426635..6d32466521 100644 --- a/bigframes/core/rewrite/timedeltas.py +++ b/bigframes/core/rewrite/timedeltas.py @@ -22,6 +22,7 @@ from bigframes import operations as ops from bigframes.core import expression as ex from bigframes.core import nodes, schema, utils +from bigframes.operations import aggregations as aggs @dataclasses.dataclass @@ -59,6 +60,16 @@ def rewrite_timedelta_expressions(root: nodes.BigFrameNode) -> nodes.BigFrameNod by = tuple(_rewrite_ordering_expr(x, root.schema) for x in root.by) return nodes.OrderByNode(root.child, by) + if isinstance(root, nodes.WindowOpNode): + return nodes.WindowOpNode( + root.child, + _rewrite_aggregation(root.expression, root.schema), + root.window_spec, + root.output_name, + root.never_skip_nulls, + root.skip_reproject_unsafe, + ) + return root @@ -126,3 +137,25 @@ def _rewrite_add_op(left: _TypedExpr, right: _TypedExpr) -> _TypedExpr: return _TypedExpr.create_op_expr(ops.timestamp_add_op, right, left) return _TypedExpr.create_op_expr(ops.add_op, left, right) + + +@functools.cache +def _rewrite_aggregation( + aggregation: ex.Aggregation, schema: schema.ArraySchema +) -> ex.Aggregation: + if not isinstance(aggregation, ex.UnaryAggregation): + return aggregation + if not isinstance(aggregation.op, aggs.DiffOp): + return aggregation + + if isinstance(aggregation.arg, ex.DerefOp): + input_type = schema.get_type(aggregation.arg.id.sql) + else: + input_type = aggregation.arg.dtype + + if dtypes.is_datetime_like(input_type): + return ex.UnaryAggregation( + aggs.TimeSeriesDiffOp(aggregation.op.periods), aggregation.arg + ) + + return aggregation diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 5f32cb980a..e9d102b42d 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -484,6 +484,25 @@ class DiffOp(UnaryWindowOp): def skips_nulls(self): return False + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + if dtypes.is_datetime_like(input_types[0]): + return dtypes.TIMEDELTA_DTYPE + return super().output_type(*input_types) + + +@dataclasses.dataclass(frozen=True) +class TimeSeriesDiffOp(UnaryWindowOp): + periods: int + + @property + def skips_nulls(self): + return False + + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + if dtypes.is_datetime_like(input_types[0]): + return dtypes.TIMEDELTA_DTYPE + raise TypeError(f"expect datetime-like types, but got {input_types[0]}") + @dataclasses.dataclass(frozen=True) class AllOp(UnaryAggregateOp): From c4aeee080c083fa00496551d35ce36b47310c991 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 18 Feb 2025 20:41:03 +0000 Subject: [PATCH 2/3] add tests --- tests/system/small/operations/test_datetimes.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/system/small/operations/test_datetimes.py b/tests/system/small/operations/test_datetimes.py index 936becff76..ca83604dd5 100644 --- a/tests/system/small/operations/test_datetimes.py +++ b/tests/system/small/operations/test_datetimes.py @@ -448,3 +448,15 @@ def test_timestamp_diff_literal_sub_series(scalars_dfs, column, value): expected_result = value - pd_series assert_series_equal(actual_result, expected_result) + + +@pytest.mark.parametrize("column", ["timestamp_col", "datetime_col"]) +def test_timestamp_series_diff_agg(scalars_dfs, column): + bf_df, pd_df = scalars_dfs + bf_series = bf_df[column] + pd_series = pd_df[column] + + actual_result = bf_series.diff().to_pandas() + + expected_result = pd_series.diff() + assert_series_equal(actual_result, expected_result) From d1078a9f4ca1dae7b081f83b50041bf9c1d0947f Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 19 Feb 2025 18:54:45 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/core/rewrite/timedeltas.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/bigframes/core/rewrite/timedeltas.py b/bigframes/core/rewrite/timedeltas.py index d05653b084..dad474e5a1 100644 --- a/bigframes/core/rewrite/timedeltas.py +++ b/bigframes/core/rewrite/timedeltas.py @@ -199,6 +199,3 @@ def _rewrite_aggregation( ) return aggregation - - -