From f6068a14a3c117e37f68aad37fa92659171b3fda Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Sat, 1 Feb 2025 00:53:14 +0000 Subject: [PATCH 01/10] chore: support timedeltas for read_pandas() --- bigframes/core/compile/compiler.py | 10 ++++++ bigframes/core/local_data.py | 3 ++ bigframes/core/schema.py | 8 +++-- bigframes/core/utils.py | 4 +++ bigframes/dtypes.py | 5 +++ bigframes/session/__init__.py | 1 + bigframes/session/_io/pandas.py | 28 +++++++++++++++- bigframes/session/loader.py | 16 +++++++-- tests/system/small/test_session.py | 52 ++++++++++++++++++++++++++++++ 9 files changed, 122 insertions(+), 5 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 0d047b366e..85abed7ec5 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -23,7 +23,9 @@ import bigframes_vendored.ibis.expr.types as ibis_types import google.cloud.bigquery import pandas as pd +import pandas.api.types as pdtypes +from bigframes.core import utils import bigframes.core.compile.compiled as compiled import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.explode @@ -173,6 +175,14 @@ def compile_readlocal(self, node: nodes.ReadLocalNode): io.BytesIO(node.feather_bytes), columns=[item.source_id for item in node.scan_list.items], ) + + # Convert timedeltas to microseconds for compatibility with BigQuery + for col in array_as_pd.columns: + if pdtypes.is_timedelta64_dtype(array_as_pd[col].dtype): + array_as_pd[col] = array_as_pd[col].map(utils.timedelta_to_micros) + if pdtypes.is_timedelta64_dtype(array_as_pd.index.dtype): + array_as_pd.index = array_as_pd.index.map(utils.timedelta_to_micros) + offsets = node.offsets_col.sql if node.offsets_col else None return compiled.UnorderedIR.from_pandas( array_as_pd, node.scan_list, offsets=offsets diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index f665948be2..d891e385d5 100644 --- a/bigframes/core/local_data.py 
+++ b/bigframes/core/local_data.py @@ -59,6 +59,9 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType: if pa.types.is_time64(type): # This is potentially lossy, but BigFrames doesn't support ns return pa.time64("us") + if pa.types.is_duration(type): + # This is potentially lossy, but BigFrames doesn't support ns + return pa.duration("us") if pa.types.is_decimal128(type): return pa.decimal128(38, 9) if pa.types.is_decimal256(type): diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py index 2b49f81d85..4e55b0dcb8 100644 --- a/bigframes/core/schema.py +++ b/bigframes/core/schema.py @@ -38,9 +38,13 @@ class ArraySchema: items: typing.Tuple[SchemaItem, ...] @classmethod - def from_bq_table(cls, table: google.cloud.bigquery.Table): + def from_bq_table( + cls, + table: google.cloud.bigquery.Table, + override_types: typing.Dict[str, bigframes.dtypes.Dtype] = {}, + ): items = tuple( - SchemaItem(name, dtype) + SchemaItem(name, override_types.get(name, dtype)) for name, dtype in bigframes.dtypes.bf_type_from_type_kind( table.schema ).items() diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index f9ca6cb5f0..074a90b8c1 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -184,3 +184,7 @@ def wrapper(*args, **kwargs): return wrapper return decorator + + +def timedelta_to_micros(td: pd.Timedelta) -> int: + return round(td.total_seconds() * 1_000_000) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index a8d9d60366..8b1ca3b0c8 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -409,11 +409,16 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype: def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype: if arrow_dtype in _ARROW_TO_BIGFRAMES: return _ARROW_TO_BIGFRAMES[arrow_dtype] + if pa.types.is_list(arrow_dtype): return pd.ArrowDtype(arrow_dtype) + if pa.types.is_struct(arrow_dtype): return pd.ArrowDtype(arrow_dtype) + if pa.types.is_duration(arrow_dtype): + return 
pd.ArrowDtype(arrow_dtype) + # BigFrames doesn't distinguish between string and large_string because the # largest string (2 GB) is already larger than the largest BigQuery row. if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 24963bdcbc..c8c44be40b 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -120,6 +120,7 @@ pandas.ArrowDtype(pa.timestamp("us", tz="UTC")), pandas.ArrowDtype(pa.decimal128(38, 9)), pandas.ArrowDtype(pa.decimal256(76, 38)), + pandas.ArrowDtype(pa.duration("us")), ) diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index 301e1c4ebb..0e32f705cc 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -14,13 +14,14 @@ from __future__ import annotations import dataclasses -from typing import Collection, Union +from typing import Collection, List, Union import bigframes_vendored.constants as constants import db_dtypes # type: ignore import geopandas # type: ignore import numpy as np import pandas +import pandas.api.types as pdtypes import pandas.arrays import pyarrow # type: ignore import pyarrow.compute # type: ignore @@ -38,6 +39,7 @@ class DataFrameAndLabels: column_labels: Collection index_labels: Collection ordering_col: str + timedelta_cols: Collection def _arrow_to_pandas_arrowdtype( @@ -163,9 +165,33 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL pandas_dataframe_copy.columns = pandas.Index(new_col_ids) pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) + timedelta_cols = _replace_timedeltas_with_micros(pandas_dataframe_copy) + return DataFrameAndLabels( df=pandas_dataframe_copy, column_labels=col_labels, index_labels=idx_labels, ordering_col=ordering_col, + timedelta_cols=timedelta_cols, ) + + +def _replace_timedeltas_with_micros(dataframe: pandas.DataFrame) -> List[str]: + """ + 
Replaces in-place timedeltas to their nearest integer values in microseconds. + + Returns: + The names of updated columns + """ + updated_columns = [] + + for col in dataframe.columns: + if pdtypes.is_timedelta64_dtype(dataframe[col].dtype): + dataframe[col] = dataframe[col].apply(utils.timedelta_to_micros) + updated_columns.append(col) + + if pdtypes.is_timedelta64_dtype(dataframe.index.dtype): + dataframe.index = dataframe.index.map(utils.timedelta_to_micros) + updated_columns.append(dataframe.index.name) + + return updated_columns diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 43faae37c3..9710802caf 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -176,10 +176,16 @@ def read_pandas_load_job( self._start_generic_job(load_job) destination_table = self._bqclient.get_table(load_table_destination) + col_type_overrides = { + col: bigframes.dtypes.TIMEDETLA_DTYPE + for col in df_and_labels.timedelta_cols + } array_value = core.ArrayValue.from_table( table=destination_table, # TODO: Generate this directly from original pandas df. - schema=schemata.ArraySchema.from_bq_table(destination_table), + schema=schemata.ArraySchema.from_bq_table( + destination_table, col_type_overrides + ), session=self._session, offsets_col=ordering_col, ).drop_columns([ordering_col]) @@ -229,10 +235,16 @@ def read_pandas_streaming( f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}" ) + col_type_overrides = { + col: bigframes.dtypes.TIMEDETLA_DTYPE + for col in df_and_labels.timedelta_cols + } array_value = ( core.ArrayValue.from_table( table=destination_table, - schema=schemata.ArraySchema.from_bq_table(destination_table), + schema=schemata.ArraySchema.from_bq_table( + destination_table, col_type_overrides + ), session=self._session, # Don't set the offsets column because we want to group by it. 
) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index e95509e033..3f9a04ad69 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -691,6 +691,58 @@ def test_read_pandas_tokyo( assert len(expected) == result.total_rows +@pytest.mark.parametrize( + "write_engine", + ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"], +) +def test_read_pandas_timedelta_dataframes(session, write_engine): + expected_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")}) + + actual_result = ( + session.read_pandas(expected_df, write_engine=write_engine) + .to_pandas() + .astype("timedelta64[ns]") + ) + + if write_engine == 'bigquery_streaming': + expected_df.index = pd.Index([pd.NA] * 3, dtype='Int64') + pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False) + + +@pytest.mark.parametrize( + "write_engine", + ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"], +) +def test_read_pandas_timedelta_series(session, write_engine): + expected_series = pd.Series(pd.to_timedelta([1, 2, 3], unit="d")) + + actual_result = ( + session.read_pandas(expected_series, write_engine=write_engine) + .to_pandas() + .astype("timedelta64[ns]") + ) + + if write_engine == 'bigquery_streaming': + expected_series.index = pd.Index([pd.NA] * 3, dtype='Int64') + pd.testing.assert_series_equal(actual_result, expected_series, check_index_type=False) + + +@pytest.mark.parametrize( + "write_engine", + ["default", "bigquery_inline", "bigquery_load"], +) +def test_read_pandas_timedelta_index(session, write_engine): + expected_index = pd.to_timedelta([1, 2, 3], unit="d") # to_timedelta returns an index + + actual_result = ( + session.read_pandas(expected_index, write_engine=write_engine) + .to_pandas() + .astype("timedelta64[ns]") + ) + + pd.testing.assert_index_equal(actual_result, expected_index) + + @utils.skip_legacy_pandas @pytest.mark.parametrize( ("write_engine",), 
From cd97a94ec2e576e13c817a8f98d2bdd6e7fd5858 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Sat, 1 Feb 2025 00:54:36 +0000 Subject: [PATCH 02/10] fix format --- tests/system/small/test_session.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 3f9a04ad69..a4acb72117 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -704,8 +704,8 @@ def test_read_pandas_timedelta_dataframes(session, write_engine): .astype("timedelta64[ns]") ) - if write_engine == 'bigquery_streaming': - expected_df.index = pd.Index([pd.NA] * 3, dtype='Int64') + if write_engine == "bigquery_streaming": + expected_df.index = pd.Index([pd.NA] * 3, dtype="Int64") pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False) @@ -722,9 +722,11 @@ def test_read_pandas_timedelta_series(session, write_engine): .astype("timedelta64[ns]") ) - if write_engine == 'bigquery_streaming': - expected_series.index = pd.Index([pd.NA] * 3, dtype='Int64') - pd.testing.assert_series_equal(actual_result, expected_series, check_index_type=False) + if write_engine == "bigquery_streaming": + expected_series.index = pd.Index([pd.NA] * 3, dtype="Int64") + pd.testing.assert_series_equal( + actual_result, expected_series, check_index_type=False + ) @pytest.mark.parametrize( @@ -732,7 +734,9 @@ def test_read_pandas_timedelta_series(session, write_engine): ["default", "bigquery_inline", "bigquery_load"], ) def test_read_pandas_timedelta_index(session, write_engine): - expected_index = pd.to_timedelta([1, 2, 3], unit="d") # to_timedelta returns an index + expected_index = pd.to_timedelta( + [1, 2, 3], unit="d" + ) # to_timedelta returns an index actual_result = ( session.read_pandas(expected_index, write_engine=write_engine) From 665288c4bc157413078e755cfedc5b71744ed87f Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Sat, 1 Feb 2025 01:22:20 +0000 
Subject: [PATCH 03/10] fix mypy error --- bigframes/session/_io/pandas.py | 2 +- bigframes/session/loader.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index 0e32f705cc..c5b4bf2e45 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -39,7 +39,7 @@ class DataFrameAndLabels: column_labels: Collection index_labels: Collection ordering_col: str - timedelta_cols: Collection + timedelta_cols: List[str] def _arrow_to_pandas_arrowdtype( diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 9710802caf..fe4813d223 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -176,7 +176,7 @@ def read_pandas_load_job( self._start_generic_job(load_job) destination_table = self._bqclient.get_table(load_table_destination) - col_type_overrides = { + col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { col: bigframes.dtypes.TIMEDETLA_DTYPE for col in df_and_labels.timedelta_cols } @@ -235,7 +235,7 @@ def read_pandas_streaming( f"Problem loading at least one row from DataFrame: {errors}. 
{constants.FEEDBACK_LINK}" ) - col_type_overrides = { + col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { col: bigframes.dtypes.TIMEDETLA_DTYPE for col in df_and_labels.timedelta_cols } From 9139be6b18d303f693ef506fba1a1aa316deb6c0 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 3 Feb 2025 22:47:29 +0000 Subject: [PATCH 04/10] centralize timedelta to microsecs replacement logic --- bigframes/core/compile/compiler.py | 6 +----- bigframes/core/schema.py | 4 ++-- bigframes/core/utils.py | 21 +++++++++++++++++++++ bigframes/session/_io/pandas.py | 23 +---------------------- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 85abed7ec5..8a82a5ec8a 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -177,11 +177,7 @@ def compile_readlocal(self, node: nodes.ReadLocalNode): ) # Convert timedeltas to microseconds for compatibility with BigQuery - for col in array_as_pd.columns: - if pdtypes.is_timedelta64_dtype(array_as_pd[col].dtype): - array_as_pd[col] = array_as_pd[col].map(utils.timedelta_to_micros) - if pdtypes.is_timedelta64_dtype(array_as_pd.index.dtype): - array_as_pd.index = array_as_pd.index.map(utils.timedelta_to_micros) + _ = utils.replace_timedeltas_with_micros(array_as_pd) offsets = node.offsets_col.sql if node.offsets_col else None return compiled.UnorderedIR.from_pandas( diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py index 4e55b0dcb8..e3808dfffd 100644 --- a/bigframes/core/schema.py +++ b/bigframes/core/schema.py @@ -41,10 +41,10 @@ class ArraySchema: def from_bq_table( cls, table: google.cloud.bigquery.Table, - override_types: typing.Dict[str, bigframes.dtypes.Dtype] = {}, + column_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {}, ): items = tuple( - SchemaItem(name, override_types.get(name, dtype)) + SchemaItem(name, column_type_overrides.get(name, dtype)) for name, dtype in 
bigframes.dtypes.bf_type_from_type_kind( table.schema ).items() diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index 074a90b8c1..bd34cc5ade 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -20,6 +20,7 @@ import bigframes_vendored.pandas.io.common as vendored_pandas_io_common import pandas as pd import typing_extensions +import pandas.api.types as pdtypes import bigframes.exceptions as bfe @@ -188,3 +189,23 @@ def wrapper(*args, **kwargs): def timedelta_to_micros(td: pd.Timedelta) -> int: return round(td.total_seconds() * 1_000_000) + +def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: + """ + Replaces in-place timedeltas to their nearest integer values in microseconds. + + Returns: + The names of updated columns + """ + updated_columns = [] + + for col in dataframe.columns: + if pdtypes.is_timedelta64_dtype(dataframe[col].dtype): + dataframe[col] = dataframe[col].apply(timedelta_to_micros) + updated_columns.append(col) + + if pdtypes.is_timedelta64_dtype(dataframe.index.dtype): + dataframe.index = dataframe.index.map(timedelta_to_micros) + updated_columns.append(dataframe.index.name) + + return updated_columns \ No newline at end of file diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index c5b4bf2e45..cc54cbd35c 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -165,7 +165,7 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL pandas_dataframe_copy.columns = pandas.Index(new_col_ids) pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) - timedelta_cols = _replace_timedeltas_with_micros(pandas_dataframe_copy) + timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy) return DataFrameAndLabels( df=pandas_dataframe_copy, @@ -174,24 +174,3 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL ordering_col=ordering_col, 
timedelta_cols=timedelta_cols, ) - - -def _replace_timedeltas_with_micros(dataframe: pandas.DataFrame) -> List[str]: - """ - Replaces in-place timedeltas to their nearest integer values in microseconds. - - Returns: - The names of updated columns - """ - updated_columns = [] - - for col in dataframe.columns: - if pdtypes.is_timedelta64_dtype(dataframe[col].dtype): - dataframe[col] = dataframe[col].apply(utils.timedelta_to_micros) - updated_columns.append(col) - - if pdtypes.is_timedelta64_dtype(dataframe.index.dtype): - dataframe.index = dataframe.index.map(utils.timedelta_to_micros) - updated_columns.append(dataframe.index.name) - - return updated_columns From 1f8526af336a5e8d5359986ca2978eadb89d5a9c Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 3 Feb 2025 22:49:45 +0000 Subject: [PATCH 05/10] fix format --- bigframes/core/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index bd34cc5ade..85da993544 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -19,8 +19,8 @@ import bigframes_vendored.pandas.io.common as vendored_pandas_io_common import pandas as pd -import typing_extensions import pandas.api.types as pdtypes +import typing_extensions import bigframes.exceptions as bfe @@ -190,6 +190,7 @@ def wrapper(*args, **kwargs): def timedelta_to_micros(td: pd.Timedelta) -> int: return round(td.total_seconds() * 1_000_000) + def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: """ Replaces in-place timedeltas to their nearest integer values in microseconds. 
@@ -208,4 +209,4 @@ def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: dataframe.index = dataframe.index.map(timedelta_to_micros) updated_columns.append(dataframe.index.name) - return updated_columns \ No newline at end of file + return updated_columns From 4a24ed6d75affb5505e3d42c49278f4fe56d1545 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 3 Feb 2025 22:50:36 +0000 Subject: [PATCH 06/10] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/core/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index bd34cc5ade..85da993544 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -19,8 +19,8 @@ import bigframes_vendored.pandas.io.common as vendored_pandas_io_common import pandas as pd -import typing_extensions import pandas.api.types as pdtypes +import typing_extensions import bigframes.exceptions as bfe @@ -190,6 +190,7 @@ def wrapper(*args, **kwargs): def timedelta_to_micros(td: pd.Timedelta) -> int: return round(td.total_seconds() * 1_000_000) + def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: """ Replaces in-place timedeltas to their nearest integer values in microseconds. 
@@ -208,4 +209,4 @@ def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: dataframe.index = dataframe.index.map(timedelta_to_micros) updated_columns.append(dataframe.index.name) - return updated_columns \ No newline at end of file + return updated_columns From 10bd59342242878420d90ad42e578f5623e4a125 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 3 Feb 2025 22:53:01 +0000 Subject: [PATCH 07/10] remove redundant imports --- bigframes/core/compile/compiler.py | 1 - bigframes/session/_io/pandas.py | 1 - 2 files changed, 2 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 8a82a5ec8a..a72ca47190 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -23,7 +23,6 @@ import bigframes_vendored.ibis.expr.types as ibis_types import google.cloud.bigquery import pandas as pd -import pandas.api.types as pdtypes from bigframes.core import utils import bigframes.core.compile.compiled as compiled diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index cc54cbd35c..532a909430 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -21,7 +21,6 @@ import geopandas # type: ignore import numpy as np import pandas -import pandas.api.types as pdtypes import pandas.arrays import pyarrow # type: ignore import pyarrow.compute # type: ignore From 3c25a0451d258270bcf8712a437cc974876b92ca Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 3 Feb 2025 23:08:07 +0000 Subject: [PATCH 08/10] polish todo comment --- bigframes/session/loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index dafaf57293..ba693696c3 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -182,7 +182,7 @@ def read_pandas_load_job( } array_value = core.ArrayValue.from_table( table=destination_table, - # TODO: Generate this directly from 
original pandas df. + # TODO (b/394156190): Generate this directly from original pandas df. schema=schemata.ArraySchema.from_bq_table( destination_table, col_type_overrides ), From 26b6e210ddb169dcfc80330379d7be6ae009a315 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 3 Feb 2025 23:28:50 +0000 Subject: [PATCH 09/10] update timedelta to microsecond conversion algo --- bigframes/core/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index 85da993544..413488b3d3 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -188,7 +188,8 @@ def wrapper(*args, **kwargs): def timedelta_to_micros(td: pd.Timedelta) -> int: - return round(td.total_seconds() * 1_000_000) + # td.value returns total nanoseconds. + return td.value // 1000 def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: From 5cb7c3f9be305fb52d599f05876e4933f0f1a7f8 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 3 Feb 2025 23:37:03 +0000 Subject: [PATCH 10/10] update python doc --- bigframes/core/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index 413488b3d3..7cb2ec7535 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -194,7 +194,7 @@ def timedelta_to_micros(td: pd.Timedelta) -> int: def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: """ - Replaces in-place timedeltas to their nearest integer values in microseconds. + Replaces in-place timedeltas to integer values in microseconds. Nanosecond part is ignored. Returns: The names of updated columns