chore: support timedeltas for read_pandas() #1349

Merged · 12 commits · Feb 4, 2025
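For context, the behavior this PR enables can be exercised end to end roughly like this (a minimal sketch, not part of the diff; it assumes a configured session and uses the public `bigframes.pandas.read_pandas` entry point):

```python
import pandas as pd

import bigframes.pandas as bpd

pdf = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})

# Before this change, timedelta64 columns had no BigQuery-compatible mapping.
bdf = bpd.read_pandas(pdf)

# The round trip comes back at microsecond precision (an Arrow duration dtype),
# which is why the tests at the bottom of this diff cast to timedelta64[ns]
# before comparing.
result = bdf.to_pandas().astype("timedelta64[ns]")
pd.testing.assert_series_equal(result["my_col"], pdf["my_col"], check_index_type=False)
```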
5 changes: 5 additions & 0 deletions — bigframes/core/compile/compiler.py

@@ -24,6 +24,7 @@
import google.cloud.bigquery
import pandas as pd

from bigframes.core import utils
import bigframes.core.compile.compiled as compiled
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.explode
@@ -173,6 +174,10 @@ def compile_readlocal(self, node: nodes.ReadLocalNode):
io.BytesIO(node.feather_bytes),
columns=[item.source_id for item in node.scan_list.items],
)

# Convert timedeltas to microseconds for compatibility with BigQuery
_ = utils.replace_timedeltas_with_micros(array_as_pd)

offsets = node.offsets_col.sql if node.offsets_col else None
return compiled.UnorderedIR.from_pandas(
array_as_pd, node.scan_list, offsets=offsets
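(Note that the compile path deliberately discards the helper's return value: only the in-place conversion matters here, whereas the load paths in `bigframes/session/loader.py` below also record which columns were rewritten so their dtypes can be restored.)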
3 changes: 3 additions & 0 deletions — bigframes/core/local_data.py

@@ -59,6 +59,9 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_time64(type):
# This is potentially lossy, but BigFrames doesn't support ns
return pa.time64("us")
if pa.types.is_duration(type):
# This is potentially lossy, but BigFrames doesn't support ns
return pa.duration("us")
if pa.types.is_decimal128(type):
return pa.decimal128(38, 9)
if pa.types.is_decimal256(type):
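A small illustration of the new branch (hypothetical driver code; the import path is assumed from the file shown above). Any duration unit is normalized to microseconds, mirroring the existing `time64` handling:

```python
import pyarrow as pa

from bigframes.core.local_data import arrow_type_replacements  # assumed import path

# pandas timedeltas arrive as nanosecond durations; BigQuery's finest
# granularity is microseconds, so the type is downcast (potentially lossy).
assert arrow_type_replacements(pa.duration("ns")) == pa.duration("us")
assert arrow_type_replacements(pa.duration("s")) == pa.duration("us")
```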
8 changes: 6 additions & 2 deletions — bigframes/core/schema.py

@@ -38,9 +38,13 @@ class ArraySchema:
items: typing.Tuple[SchemaItem, ...]

@classmethod
-    def from_bq_table(cls, table: google.cloud.bigquery.Table):
+    def from_bq_table(
+        cls,
+        table: google.cloud.bigquery.Table,
+        column_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {},
+    ):
items = tuple(
-            SchemaItem(name, dtype)
+            SchemaItem(name, column_type_overrides.get(name, dtype))
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
table.schema
).items()
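A hypothetical call site for the new parameter: columns named in `column_type_overrides` take the supplied dtype, while everything else keeps the dtype derived from the BigQuery schema. (The `{}` default is safe only because the method reads, and never mutates, the mapping.)

```python
from google.cloud import bigquery

import bigframes.dtypes
from bigframes.core import schema as schemata  # assumed import path

table = bigquery.Client().get_table("my-project.my_dataset.my_table")  # placeholder table

# Spelling of the constant follows the codebase at this time.
array_schema = schemata.ArraySchema.from_bq_table(
    table,
    column_type_overrides={"my_col": bigframes.dtypes.TIMEDETLA_DTYPE},
)
# "my_col" surfaces as a timedelta even though the table stores INT64.
```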
27 changes: 27 additions & 0 deletions — bigframes/core/utils.py

@@ -19,6 +19,7 @@

import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
import pandas as pd
import pandas.api.types as pdtypes
import typing_extensions

import bigframes.exceptions as bfe
@@ -184,3 +185,29 @@ def wrapper(*args, **kwargs):
return wrapper

return decorator


def timedelta_to_micros(td: pd.Timedelta) -> int:
# td.value returns total nanoseconds.
return td.value // 1000


def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
"""
Replaces timedelta columns in place with integer values in microseconds; any sub-microsecond remainder is truncated.

Returns:
The names of the updated columns.
"""
updated_columns = []

for col in dataframe.columns:
if pdtypes.is_timedelta64_dtype(dataframe[col].dtype):
dataframe[col] = dataframe[col].apply(timedelta_to_micros)
updated_columns.append(col)

if pdtypes.is_timedelta64_dtype(dataframe.index.dtype):
dataframe.index = dataframe.index.map(timedelta_to_micros)
updated_columns.append(dataframe.index.name)

return updated_columns
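A quick sketch of the helper pair in isolation (hypothetical driver code, not in the diff): `Timedelta.value` is total nanoseconds, so floor division by 1,000 yields microseconds and drops any sub-microsecond remainder.

```python
import pandas as pd

from bigframes.core import utils

df = pd.DataFrame({"dur": pd.to_timedelta(["1s", "2min"])})

# 1s   ->   1_000_000_000 ns // 1000 ==   1_000_000 us
# 2min -> 120_000_000_000 ns // 1000 == 120_000_000 us
updated = utils.replace_timedeltas_with_micros(df)

assert updated == ["dur"]
assert df["dur"].tolist() == [1_000_000, 120_000_000]
```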
5 changes: 5 additions & 0 deletions — bigframes/dtypes.py

@@ -409,11 +409,16 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
if arrow_dtype in _ARROW_TO_BIGFRAMES:
return _ARROW_TO_BIGFRAMES[arrow_dtype]

if pa.types.is_list(arrow_dtype):
return pd.ArrowDtype(arrow_dtype)

if pa.types.is_struct(arrow_dtype):
return pd.ArrowDtype(arrow_dtype)

if pa.types.is_duration(arrow_dtype):
return pd.ArrowDtype(arrow_dtype)

# BigFrames doesn't distinguish between string and large_string because the
# largest string (2 GB) is already larger than the largest BigQuery row.
if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):
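With the new branch, a microsecond duration maps cleanly to a pandas ArrowDtype instead of falling through to the unsupported-type path (a hedged sketch):

```python
import pandas as pd
import pyarrow as pa

import bigframes.dtypes

bf_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(pa.duration("us"))
assert bf_dtype == pd.ArrowDtype(pa.duration("us"))
```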
1 change: 1 addition & 0 deletions — bigframes/session/__init__.py

@@ -120,6 +120,7 @@
pandas.ArrowDtype(pa.timestamp("us", tz="UTC")),
pandas.ArrowDtype(pa.decimal128(38, 9)),
pandas.ArrowDtype(pa.decimal256(76, 38)),
pandas.ArrowDtype(pa.duration("us")),
)


6 changes: 5 additions & 1 deletion — bigframes/session/_io/pandas.py

@@ -14,7 +14,7 @@
from __future__ import annotations

import dataclasses
-from typing import Collection, Union
+from typing import Collection, List, Union

import bigframes_vendored.constants as constants
import db_dtypes # type: ignore
@@ -38,6 +38,7 @@ class DataFrameAndLabels:
column_labels: Collection
index_labels: Collection
ordering_col: str
timedelta_cols: List[str]


def _arrow_to_pandas_arrowdtype(
@@ -163,9 +164,12 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndLabels:
pandas_dataframe_copy.columns = pandas.Index(new_col_ids)
pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0])

timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy)

return DataFrameAndLabels(
df=pandas_dataframe_copy,
column_labels=col_labels,
index_labels=idx_labels,
ordering_col=ordering_col,
timedelta_cols=timedelta_cols,
)
18 changes: 15 additions & 3 deletions — bigframes/session/loader.py

@@ -176,10 +176,16 @@ def read_pandas_load_job(
self._start_generic_job(load_job)

destination_table = self._bqclient.get_table(load_table_destination)
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
col: bigframes.dtypes.TIMEDETLA_DTYPE
for col in df_and_labels.timedelta_cols
}
array_value = core.ArrayValue.from_table(
table=destination_table,
-        # TODO: Generate this directly from original pandas df.
-        schema=schemata.ArraySchema.from_bq_table(destination_table),
+        # TODO (b/394156190): Generate this directly from original pandas df.
+        schema=schemata.ArraySchema.from_bq_table(
+            destination_table, col_type_overrides
+        ),
session=self._session,
offsets_col=ordering_col,
).drop_columns([ordering_col])
@@ -229,10 +235,16 @@ def read_pandas_streaming(
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
)

col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
col: bigframes.dtypes.TIMEDETLA_DTYPE
for col in df_and_labels.timedelta_cols
}
array_value = (
core.ArrayValue.from_table(
table=destination_table,
-            schema=schemata.ArraySchema.from_bq_table(destination_table),
+            schema=schemata.ArraySchema.from_bq_table(
+                destination_table, col_type_overrides
+            ),
session=self._session,
# Don't set the offsets column because we want to group by it.
)
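Both load paths build the same override map. Conceptually (a standalone sketch; `TIMEDETLA_DTYPE` is the constant's spelling in the codebase at this time): the load job writes the converted columns as plain INT64, since BigQuery has no timedelta type, and the schema override re-labels them as timedeltas on the way back.

```python
import typing

import bigframes.dtypes

# As returned by utils.replace_timedeltas_with_micros on the upload copy.
timedelta_cols = ["dur"]

col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
    col: bigframes.dtypes.TIMEDETLA_DTYPE for col in timedelta_cols
}
assert col_type_overrides == {"dur": bigframes.dtypes.TIMEDETLA_DTYPE}
```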
56 changes: 56 additions & 0 deletions — tests/system/small/test_session.py

@@ -691,6 +691,62 @@ def test_read_pandas_tokyo(
assert len(expected) == result.total_rows


@pytest.mark.parametrize(
"write_engine",
["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
)
def test_read_pandas_timedelta_dataframes(session, write_engine):
expected_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})

actual_result = (
session.read_pandas(expected_df, write_engine=write_engine)
.to_pandas()
.astype("timedelta64[ns]")
)

if write_engine == "bigquery_streaming":
expected_df.index = pd.Index([pd.NA] * 3, dtype="Int64")
pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False)


@pytest.mark.parametrize(
"write_engine",
["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
)
def test_read_pandas_timedelta_series(session, write_engine):
expected_series = pd.Series(pd.to_timedelta([1, 2, 3], unit="d"))

actual_result = (
session.read_pandas(expected_series, write_engine=write_engine)
.to_pandas()
.astype("timedelta64[ns]")
)

if write_engine == "bigquery_streaming":
expected_series.index = pd.Index([pd.NA] * 3, dtype="Int64")
pd.testing.assert_series_equal(
actual_result, expected_series, check_index_type=False
)


@pytest.mark.parametrize(
"write_engine",
["default", "bigquery_inline", "bigquery_load"],
)
def test_read_pandas_timedelta_index(session, write_engine):
expected_index = pd.to_timedelta(
[1, 2, 3], unit="d"
) # to_timedelta returns an index

actual_result = (
session.read_pandas(expected_index, write_engine=write_engine)
.to_pandas()
.astype("timedelta64[ns]")
)

pd.testing.assert_index_equal(actual_result, expected_index)


@utils.skip_legacy_pandas
@pytest.mark.parametrize(
("write_engine",),