diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index ea642c20fd..3e5f10eca4 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -742,6 +742,11 @@ def timestamp_diff_op_impl(x: ibis_types.TimestampValue, y: ibis_types.Timestamp
     return x.delta(y, "microsecond")
 
 
+@scalar_op_compiler.register_binary_op(ops.timestamp_add_op)
+def timestamp_add_op_impl(x: ibis_types.TimestampValue, y: ibis_types.IntegerValue):
+    return x + y.to_interval("us")
+
+
 @scalar_op_compiler.register_unary_op(ops.FloorDtOp, pass_op=True)
 def floor_dt_op_impl(x: ibis_types.Value, op: ops.FloorDtOp):
     supported_freqs = ["Y", "Q", "M", "W", "D", "h", "min", "s", "ms", "us", "ns"]
diff --git a/bigframes/core/rewrite/operators.py b/bigframes/core/rewrite/operators.py
index 3145a9e9ae..136e9cc220 100644
--- a/bigframes/core/rewrite/operators.py
+++ b/bigframes/core/rewrite/operators.py
@@ -19,7 +19,7 @@
 from bigframes import dtypes
 from bigframes import operations as ops
 from bigframes.core import expression as ex
-from bigframes.core import nodes, schema
+from bigframes.core import nodes, schema, utils
 
 
 @dataclasses.dataclass
@@ -50,7 +50,7 @@ def _rewrite_expressions(expr: ex.Expression, schema: schema.ArraySchema) -> _Ty
         return _TypedExpr(expr, schema.get_type(expr.id.sql))
 
     if isinstance(expr, ex.ScalarConstantExpression):
-        return _TypedExpr(expr, expr.dtype)
+        return _rewrite_scalar_constant_expr(expr)
 
     if isinstance(expr, ex.OpExpression):
         updated_inputs = tuple(
@@ -61,12 +61,23 @@
     raise AssertionError(f"Unexpected expression type: {type(expr)}")
 
 
+def _rewrite_scalar_constant_expr(expr: ex.ScalarConstantExpression) -> _TypedExpr:
+    if expr.dtype is dtypes.TIMEDELTA_DTYPE:
+        int_repr = utils.timedelta_to_micros(expr.value)  # type: ignore
+        return _TypedExpr(ex.const(int_repr, expr.dtype), expr.dtype)
+
+    return _TypedExpr(expr, expr.dtype)
+
+
 def _rewrite_op_expr(
     expr: ex.OpExpression, inputs: typing.Tuple[_TypedExpr, ...]
 ) -> _TypedExpr:
     if isinstance(expr.op, ops.SubOp):
         return _rewrite_sub_op(inputs[0], inputs[1])
 
+    if isinstance(expr.op, ops.AddOp):
+        return _rewrite_add_op(inputs[0], inputs[1])
+
     input_types = tuple(map(lambda x: x.dtype, inputs))
     return _TypedExpr(expr, expr.op.output_type(*input_types))
 
@@ -80,3 +91,24 @@ def _rewrite_sub_op(left: _TypedExpr, right: _TypedExpr) -> _TypedExpr:
         result_op.as_expr(left.expr, right.expr),
         result_op.output_type(left.dtype, right.dtype),
     )
+
+
+def _rewrite_add_op(left: _TypedExpr, right: _TypedExpr) -> _TypedExpr:
+    if dtypes.is_datetime_like(left.dtype) and right.dtype is dtypes.TIMEDELTA_DTYPE:
+        return _TypedExpr(
+            ops.timestamp_add_op.as_expr(left.expr, right.expr),
+            ops.timestamp_add_op.output_type(left.dtype, right.dtype),
+        )
+
+    if left.dtype is dtypes.TIMEDELTA_DTYPE and dtypes.is_datetime_like(right.dtype):
+        # Re-arrange operands such that timestamp is always on the left and timedelta is
+        # always on the right.
+            return _TypedExpr(
+                ops.timestamp_add_op.as_expr(right.expr, left.expr),
+                ops.timestamp_add_op.output_type(right.dtype, left.dtype),
+            )
+
+    return _TypedExpr(
+        ops.add_op.as_expr(left.expr, right.expr),
+        ops.add_op.output_type(left.dtype, right.dtype),
+    )
diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index 7cb2ec7535..0198f12537 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import datetime
 import functools
 import re
 import typing
@@ -18,6 +19,7 @@
 import warnings
 
 import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
+import numpy as np
 import pandas as pd
 import pandas.api.types as pdtypes
 import typing_extensions
@@ -187,9 +189,22 @@ def wrapper(*args, **kwargs):
     return decorator
 
 
-def timedelta_to_micros(td: pd.Timedelta) -> int:
-    # td.value returns total nanoseconds.
-    return td.value // 1000
+def timedelta_to_micros(
+    timedelta: typing.Union[pd.Timedelta, datetime.timedelta, np.timedelta64]
+) -> int:
+    if isinstance(timedelta, pd.Timedelta):
+        # pd.Timedelta.value returns total nanoseconds.
+        return timedelta.value // 1000
+
+    if isinstance(timedelta, np.timedelta64):
+        return timedelta.astype("timedelta64[us]").astype(np.int64)
+
+    if isinstance(timedelta, datetime.timedelta):
+        return (
+            (timedelta.days * 3600 * 24) + timedelta.seconds
+        ) * 1_000_000 + timedelta.microseconds
+
+    raise TypeError(f"Unrecognized input type: {type(timedelta)}")
 
 
 def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index d5be2ca584..eed45e1dde 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -105,6 +105,9 @@
     pd.Timestamp,
     datetime.date,
     datetime.time,
+    pd.Timedelta,
+    datetime.timedelta,
+    np.timedelta64,
 ]
 LOCAL_SCALAR_TYPES = typing.get_args(LOCAL_SCALAR_TYPE)
 
@@ -420,7 +423,7 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
         return pd.ArrowDtype(arrow_dtype)
 
     if pa.types.is_duration(arrow_dtype):
-        return pd.ArrowDtype(arrow_dtype)
+        return TIMEDELTA_DTYPE
 
     # BigFrames doesn't distinguish between string and large_string because the
     # largest string (2 GB) is already larger than the largest BigQuery row.
@@ -562,6 +565,10 @@ def _is_bigframes_dtype(dtype) -> bool:
 
 
 def _infer_dtype_from_python_type(type: type) -> Dtype:
+    if type in (datetime.timedelta, pd.Timedelta, np.timedelta64):
+        # Must check timedelta type first. Otherwise other branches will evaluate to true.
+        # E.g. np.timedelta64 is a subclass of np.integer.
+        return TIMEDELTA_DTYPE
     if issubclass(type, (bool, np.bool_)):
         return BOOL_DTYPE
     if issubclass(type, (int, np.integer)):
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index d35fa2c5c2..88406317fe 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -103,6 +103,7 @@
 from bigframes.operations.numeric_ops import (
     abs_op,
     add_op,
+    AddOp,
     arccos_op,
     arccosh_op,
     arcsin_op,
@@ -177,7 +178,7 @@
 )
 from bigframes.operations.struct_ops import StructFieldOp, StructOp
 from bigframes.operations.time_ops import hour_op, minute_op, normalize_op, second_op
-from bigframes.operations.timedelta_ops import ToTimedeltaOp
+from bigframes.operations.timedelta_ops import timestamp_add_op, ToTimedeltaOp
 
 __all__ = [
     # Base ops
@@ -249,6 +250,7 @@
     "second_op",
     "normalize_op",
     # Timedelta ops
+    "timestamp_add_op",
     "ToTimedeltaOp",
     # Datetime ops
     "date_op",
@@ -263,6 +265,7 @@
     # Numeric ops
     "abs_op",
     "add_op",
+    "AddOp",
     "arccos_op",
     "arccosh_op",
     "arcsin_op",
diff --git a/bigframes/operations/numeric_ops.py b/bigframes/operations/numeric_ops.py
index 413d8d66e1..5183e5c4c5 100644
--- a/bigframes/operations/numeric_ops.py
+++ b/bigframes/operations/numeric_ops.py
@@ -116,12 +116,18 @@ def output_type(self, *input_types):
         if all(map(dtypes.is_string_like, input_types)) and len(set(input_types)) == 1:
             # String addition
             return input_types[0]
+
+        # Timestamp addition.
+        if dtypes.is_datetime_like(left_type) and right_type is dtypes.TIMEDELTA_DTYPE:
+            return left_type
+        if left_type is dtypes.TIMEDELTA_DTYPE and dtypes.is_datetime_like(right_type):
+            return right_type
+
         if (left_type is None or dtypes.is_numeric(left_type)) and (
             right_type is None or dtypes.is_numeric(right_type)
         ):
             # Numeric addition
             return dtypes.coerce_to_common(left_type, right_type)
-        # TODO: Add temporal addition once delta types supported
         raise TypeError(f"Cannot add dtypes {left_type} and {right_type}")
diff --git a/bigframes/operations/timedelta_ops.py b/bigframes/operations/timedelta_ops.py
index f5b82c2331..69e054fa5c 100644
--- a/bigframes/operations/timedelta_ops.py
+++ b/bigframes/operations/timedelta_ops.py
@@ -25,7 +25,32 @@ class ToTimedeltaOp(base_ops.UnaryOp):
     name: typing.ClassVar[str] = "to_timedelta"
     unit: typing.Literal["us", "ms", "s", "m", "h", "d", "W"]
 
-    def output_type(self, *input_types):
+    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
         if input_types[0] in (dtypes.INT_DTYPE, dtypes.FLOAT_DTYPE):
             return dtypes.TIMEDELTA_DTYPE
         raise TypeError("expected integer or float input")
+
+
+@dataclasses.dataclass(frozen=True)
+class TimestampAdd(base_ops.BinaryOp):
+    name: typing.ClassVar[str] = "timestamp_add"
+
+    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
+        # timestamp + timedelta => timestamp
+        if (
+            dtypes.is_datetime_like(input_types[0])
+            and input_types[1] is dtypes.TIMEDELTA_DTYPE
+        ):
+            return input_types[0]
+        # timedelta + timestamp => timestamp
+        if input_types[0] is dtypes.TIMEDELTA_DTYPE and dtypes.is_datetime_like(
+            input_types[1]
+        ):
+            return input_types[1]
+
+        raise TypeError(
+            f"unsupported types for timestamp_add. left: {input_types[0]} right: {input_types[1]}"
+        )
+
+
+timestamp_add_op = TimestampAdd()
diff --git a/tests/data/scalars.jsonl b/tests/data/scalars.jsonl
index 172a55ec11..03755c94b7 100644
--- a/tests/data/scalars.jsonl
+++ b/tests/data/scalars.jsonl
@@ -6,4 +6,4 @@
 {"bool_col": false, "bytes_col": "R8O8dGVuIFRhZw==", "date_col": "1980-03-14", "datetime_col": "1980-03-14 15:16:17", "geography_col": null, "int64_col": "55555", "int64_too": "0", "numeric_col": "5.555555", "float64_col": "555.555", "rowindex": 5, "rowindex_2": 5, "string_col": "Güten Tag!", "time_col": "15:16:17.181921", "timestamp_col": "1980-03-14T15:16:17.181921Z"}
 {"bool_col": true, "bytes_col": "SGVsbG8JQmlnRnJhbWVzIQc=", "date_col": "2023-05-23", "datetime_col": "2023-05-23 11:37:01", "geography_col": "MULTIPOINT (20 20, 10 40, 40 30, 30 10)", "int64_col": "101202303", "int64_too": "2", "numeric_col": "-10.090807", "float64_col": "-123.456", "rowindex": 6, "rowindex_2": 6, "string_col": "capitalize, This ", "time_col": "01:02:03.456789", "timestamp_col": "2023-05-23T11:42:55.000001Z"}
 {"bool_col": true, "bytes_col": null, "date_col": "2038-01-20", "datetime_col": "2038-01-19 03:14:08", "geography_col": null, "int64_col": "-214748367", "int64_too": "2", "numeric_col": "11111111.1", "float64_col": "42.42", "rowindex": 7, "rowindex_2": 7, "string_col": " سلام", "time_col": "12:00:00.000001", "timestamp_col": "2038-01-19T03:14:17.999999Z"}
-{"bool_col": false, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": "2", "int64_too": "1", "numeric_col": null, "float64_col": "6.87", "rowindex": 8, "rowindex_2": 8, "string_col": "T", "time_col": null, "timestamp_col": null}
+{"bool_col": false, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": "2", "int64_too": "1", "numeric_col": null, "float64_col": "6.87", "rowindex": 8, "rowindex_2": 8, "string_col": "T", "time_col": null, "timestamp_col": null}
\ No newline at end of file
diff --git a/tests/system/small/operations/test_timedeltas.py b/tests/system/small/operations/test_timedeltas.py
new file mode 100644
index 0000000000..6c44a62686
--- /dev/null
+++ b/tests/system/small/operations/test_timedeltas.py
@@ -0,0 +1,166 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import datetime
+
+import numpy as np
+import pandas as pd
+import pandas.testing
+import pytest
+
+
+@pytest.fixture(scope="module")
+def temporal_dfs(session):
+    pandas_df = pd.DataFrame(
+        {
+            "datetime_col": [
+                pd.Timestamp("2025-02-01 01:00:01"),
+                pd.Timestamp("2019-01-02 02:00:00"),
+            ],
+            "timestamp_col": [
+                pd.Timestamp("2023-01-01 01:00:01", tz="UTC"),
+                pd.Timestamp("2024-01-02 02:00:00", tz="UTC"),
+            ],
+            "timedelta_col": [pd.Timedelta(3, "s"), pd.Timedelta(-4, "d")],
+        }
+    )
+
+    bigframes_df = session.read_pandas(pandas_df)
+
+    return bigframes_df, pandas_df
+
+
+@pytest.mark.parametrize(
+    ("column", "pd_dtype"),
+    [
+        ("datetime_col", "