diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 0d047b366e..a72ca47190 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -24,6 +24,7 @@
 import google.cloud.bigquery
 import pandas as pd
 
+from bigframes.core import utils
 import bigframes.core.compile.compiled as compiled
 import bigframes.core.compile.concat as concat_impl
 import bigframes.core.compile.explode
@@ -173,6 +174,10 @@ def compile_readlocal(self, node: nodes.ReadLocalNode):
             io.BytesIO(node.feather_bytes),
             columns=[item.source_id for item in node.scan_list.items],
         )
+
+        # Convert timedeltas to microseconds for compatibility with BigQuery
+        _ = utils.replace_timedeltas_with_micros(array_as_pd)
+
         offsets = node.offsets_col.sql if node.offsets_col else None
         return compiled.UnorderedIR.from_pandas(
             array_as_pd, node.scan_list, offsets=offsets
diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index f665948be2..d891e385d5 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -59,6 +59,9 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_time64(type):
         # This is potentially lossy, but BigFrames doesn't support ns
         return pa.time64("us")
+    if pa.types.is_duration(type):
+        # This is potentially lossy, but BigFrames doesn't support ns
+        return pa.duration("us")
     if pa.types.is_decimal128(type):
         return pa.decimal128(38, 9)
     if pa.types.is_decimal256(type):
diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py
index 2b49f81d85..e3808dfffd 100644
--- a/bigframes/core/schema.py
+++ b/bigframes/core/schema.py
@@ -38,9 +38,13 @@ class ArraySchema:
     items: typing.Tuple[SchemaItem, ...]
 
     @classmethod
-    def from_bq_table(cls, table: google.cloud.bigquery.Table):
+    def from_bq_table(
+        cls,
+        table: google.cloud.bigquery.Table,
+        column_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {},
+    ):
         items = tuple(
-            SchemaItem(name, dtype)
+            SchemaItem(name, column_type_overrides.get(name, dtype))
             for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
                 table.schema
             ).items()
diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index f9ca6cb5f0..7cb2ec7535 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -19,6 +19,7 @@
 import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
 import pandas as pd
+import pandas.api.types as pdtypes
 import typing_extensions
 
 import bigframes.exceptions as bfe
 
@@ -184,3 +185,29 @@ def wrapper(*args, **kwargs):
         return wrapper
 
     return decorator
+
+
+def timedelta_to_micros(td: pd.Timedelta) -> int:
+    # td.value returns total nanoseconds.
+    return td.value // 1000
+
+
+def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
+    """
+    Replaces timedelta columns in place with integer values in microseconds. The nanosecond part is ignored.
+
+    Returns:
+        The names of the updated columns.
+    """
+    updated_columns = []
+
+    for col in dataframe.columns:
+        if pdtypes.is_timedelta64_dtype(dataframe[col].dtype):
+            dataframe[col] = dataframe[col].apply(timedelta_to_micros)
+            updated_columns.append(col)
+
+    if pdtypes.is_timedelta64_dtype(dataframe.index.dtype):
+        dataframe.index = dataframe.index.map(timedelta_to_micros)
+        updated_columns.append(dataframe.index.name)
+
+    return updated_columns
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index a8d9d60366..8b1ca3b0c8 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -409,11 +409,16 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
 
 def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
     if arrow_dtype in _ARROW_TO_BIGFRAMES:
         return _ARROW_TO_BIGFRAMES[arrow_dtype]
+
     if pa.types.is_list(arrow_dtype):
         return pd.ArrowDtype(arrow_dtype)
+
     if pa.types.is_struct(arrow_dtype):
         return pd.ArrowDtype(arrow_dtype)
+
+    if pa.types.is_duration(arrow_dtype):
+        return pd.ArrowDtype(arrow_dtype)
+
     # BigFrames doesn't distinguish between string and large_string because the
     # largest string (2 GB) is already larger than the largest BigQuery row.
     if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 24963bdcbc..c8c44be40b 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -120,6 +120,7 @@
     pandas.ArrowDtype(pa.timestamp("us", tz="UTC")),
     pandas.ArrowDtype(pa.decimal128(38, 9)),
     pandas.ArrowDtype(pa.decimal256(76, 38)),
+    pandas.ArrowDtype(pa.duration("us")),
 )
 
 
diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py
index 301e1c4ebb..532a909430 100644
--- a/bigframes/session/_io/pandas.py
+++ b/bigframes/session/_io/pandas.py
@@ -14,7 +14,7 @@
 from __future__ import annotations
 
 import dataclasses
-from typing import Collection, Union
+from typing import Collection, List, Union
 
 import bigframes_vendored.constants as constants
 import db_dtypes  # type: ignore
@@ -38,6 +38,7 @@ class DataFrameAndLabels:
     column_labels: Collection
     index_labels: Collection
     ordering_col: str
+    timedelta_cols: List[str]
 
 
 def _arrow_to_pandas_arrowdtype(
@@ -163,9 +164,12 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL
     pandas_dataframe_copy.columns = pandas.Index(new_col_ids)
     pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0])
 
+    timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy)
+
     return DataFrameAndLabels(
         df=pandas_dataframe_copy,
         column_labels=col_labels,
         index_labels=idx_labels,
         ordering_col=ordering_col,
+        timedelta_cols=timedelta_cols,
     )
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 0f6ea4afff..ba693696c3 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -176,10 +176,16 @@ def read_pandas_load_job(
         self._start_generic_job(load_job)
 
         destination_table = self._bqclient.get_table(load_table_destination)
+        col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
+            col: bigframes.dtypes.TIMEDETLA_DTYPE
+            for col in df_and_labels.timedelta_cols
+        }
         array_value = core.ArrayValue.from_table(
             table=destination_table,
-            # TODO: Generate this directly from original pandas df.
-            schema=schemata.ArraySchema.from_bq_table(destination_table),
+            # TODO(b/394156190): Generate this directly from original pandas df.
+            schema=schemata.ArraySchema.from_bq_table(
+                destination_table, col_type_overrides
+            ),
             session=self._session,
             offsets_col=ordering_col,
         ).drop_columns([ordering_col])
@@ -229,10 +235,16 @@ def read_pandas_streaming(
                 f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
             )
 
+        col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
+            col: bigframes.dtypes.TIMEDETLA_DTYPE
+            for col in df_and_labels.timedelta_cols
+        }
         array_value = (
             core.ArrayValue.from_table(
                 table=destination_table,
-                schema=schemata.ArraySchema.from_bq_table(destination_table),
+                schema=schemata.ArraySchema.from_bq_table(
+                    destination_table, col_type_overrides
+                ),
                 session=self._session,
                 # Don't set the offsets column because we want to group by it.
             )
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index e95509e033..a4acb72117 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -691,6 +691,62 @@ def test_read_pandas_tokyo(
     assert len(expected) == result.total_rows
 
 
+@pytest.mark.parametrize(
+    "write_engine",
+    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
+)
+def test_read_pandas_timedelta_dataframes(session, write_engine):
+    expected_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})
+
+    actual_result = (
+        session.read_pandas(expected_df, write_engine=write_engine)
+        .to_pandas()
+        .astype("timedelta64[ns]")
+    )
+
+    if write_engine == "bigquery_streaming":
+        expected_df.index = pd.Index([pd.NA] * 3, dtype="Int64")
+    pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False)
+
+
+@pytest.mark.parametrize(
+    "write_engine",
+    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
+)
+def test_read_pandas_timedelta_series(session, write_engine):
+    expected_series = pd.Series(pd.to_timedelta([1, 2, 3], unit="d"))
+
+    actual_result = (
+        session.read_pandas(expected_series, write_engine=write_engine)
+        .to_pandas()
+        .astype("timedelta64[ns]")
+    )
+
+    if write_engine == "bigquery_streaming":
+        expected_series.index = pd.Index([pd.NA] * 3, dtype="Int64")
+    pd.testing.assert_series_equal(
+        actual_result, expected_series, check_index_type=False
+    )
+
+
+@pytest.mark.parametrize(
+    "write_engine",
+    ["default", "bigquery_inline", "bigquery_load"],
+)
+def test_read_pandas_timedelta_index(session, write_engine):
+    expected_index = pd.to_timedelta(
+        [1, 2, 3], unit="d"
+    )  # to_timedelta returns an index
+
+    actual_result = (
+        session.read_pandas(expected_index, write_engine=write_engine)
+        .to_pandas()
+        .astype("timedelta64[ns]")
+    )
+
+    pd.testing.assert_index_equal(actual_result, expected_index)
+
+
 @utils.skip_legacy_pandas
 @pytest.mark.parametrize(
     ("write_engine",),
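Reviewer note: the round trip this patch implements is spread across several files, so here is a minimal, self-contained sketch of the conversion it performs. This is not part of the patch and is only illustrative; it assumes nothing beyond pandas and pyarrow, and it re-states timedelta_to_micros locally rather than importing from bigframes.

import pandas as pd
import pandas.api.types as pdtypes
import pyarrow as pa


def timedelta_to_micros(td: pd.Timedelta) -> int:
    # td.value is the total number of nanoseconds; floor division drops the
    # nanosecond remainder, matching the lossy behavior noted in local_data.py.
    return td.value // 1000


df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})
assert pdtypes.is_timedelta64_dtype(df["my_col"].dtype)

# Outbound: replace_timedeltas_with_micros rewrites timedelta columns to plain
# integers (microseconds) so the load job sends BigQuery-compatible INT64 data.
df["my_col"] = df["my_col"].apply(timedelta_to_micros)
print(df["my_col"].tolist())  # [86400000000, 172800000000, 259200000000]

# Inbound: the loader remembers the rewritten column names in timedelta_cols and
# overrides their schema entries back to an Arrow duration("us") dtype, so the
# values regain timedelta semantics on the BigFrames side.
restored = pa.array(df["my_col"].tolist(), type=pa.duration("us"))
print(restored.type)  # duration[us]

Microseconds are the natural target here because BigFrames already caps temporal precision at "us" elsewhere (see the time64 and timestamp mappings), so truncating the nanosecond part keeps timedeltas consistent with the rest of the type system.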