From c71a061b6d8078b8be4f0357178c920e4cfc6fe3 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 20 Sep 2023 14:21:27 -0500
Subject: [PATCH 1/4] perf: inline small `Series` and `DataFrames` in query text

This prevents unnecessary load and query jobs.

Towards internal issue 296474170.
---
 bigframes/core/__init__.py     | 62 +++++++++++++++++++++-----
 bigframes/core/blocks.py       | 55 ++++++++++++-----------
 bigframes/dataframe.py         |  4 +-
 bigframes/dtypes.py            |  3 ++
 bigframes/operations/base.py   |  4 +-
 setup.py                       |  4 +-
 testing/constraints-3.9.txt    |  2 +-
 tests/unit/core/__init__.py    | 13 ++++++
 tests/unit/core/test_blocks.py | 79 ++++++++++++++++++++++++++++++++++
 9 files changed, 178 insertions(+), 48 deletions(-)
 create mode 100644 tests/unit/core/__init__.py
 create mode 100644 tests/unit/core/test_blocks.py

diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index dd91f80e63..d374ae97b2 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -144,21 +144,56 @@ def mem_expr_from_pandas(
         """
         Builds an in-memory only (SQL only) expr from a pandas dataframe.
 
-        Caution: If session is None, only a subset of expr functionality will be available (null Session is usually not supported).
+        Caution: If session is None, only a subset of expr functionality will
+        be available (null Session is usually not supported).
         """
-        # must set non-null column labels. these are not the user-facing labels
-        pd_df = pd_df.set_axis(
-            [column or bigframes.core.guid.generate_guid() for column in pd_df.columns],
-            axis="columns",
-        )
+        # We can't include any hidden columns in the ArrayValue constructor, so
+        # grab the column names before we add the hidden ordering column.
+        column_names = [str(column) for column in pd_df.columns]
+        # Make sure column names are all strings.
+        pd_df = pd_df.set_axis(column_names, axis="columns")
         pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
+
         # ibis memtable cannot handle NA, must convert to None
         pd_df = pd_df.astype("object")  # type: ignore
         pd_df = pd_df.where(pandas.notnull(pd_df), None)
+
+        # NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases.
         keys_memtable = ibis.memtable(pd_df)
+        schema = keys_memtable.schema()
+        new_schema = []
+        for column_index, column in enumerate(schema):
+            if column == ORDER_ID_COLUMN:
+                new_type = ibis_dtypes.int64
+            else:
+                column_type = schema[column]
+                # The autodetected type might not be one we can support, such
+                # as NULL type for empty rows, so convert to a type we do
+                # support.
+                new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype(
+                    bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type)
+                )
+            # TODO(swast): Ibis memtable doesn't use backticks in struct
+            # field names, so spaces and other characters aren't allowed in
+            # the memtable context. Blocked by
+            # https://github.com/ibis-project/ibis/issues/7187
+            column = f"col_{column_index}"
+            new_schema.append((column, new_type))
+
+        # must set non-null column labels. these are not the user-facing labels
+        pd_df = pd_df.set_axis(
+            [column for column, _ in new_schema],
+            axis="columns",
+        )
+        keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))
+
         return cls(
             session,  # type: ignore # Session cannot normally be none, see "caution" above
             keys_memtable,
+            columns=[
+                keys_memtable[f"col_{column_index}"].name(column)
+                for column_index, column in enumerate(column_names)
+            ],
             ordering=ExpressionOrdering(
                 ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
                 total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
@@ -426,11 +461,16 @@ def shape(self) -> typing.Tuple[int, int]:
         width = len(self.columns)
         count_expr = self._to_ibis_expr(ordering_mode="unordered").count()
         sql = self._session.ibis_client.compile(count_expr)
-        row_iterator, _ = self._session._start_query(
-            sql=sql,
-            max_results=1,
-        )
-        length = next(row_iterator)[0]
+
+        # Support in-memory engines for hermetic unit tests.
+        if not isinstance(sql, str):
+            length = self._session.ibis_client.execute(count_expr)
+        else:
+            row_iterator, _ = self._session._start_query(
+                sql=sql,
+                max_results=1,
+            )
+            length = next(row_iterator)[0]
         return (length, width)
 
     def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index e691a30f9c..f9f21208c7 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -44,6 +44,7 @@
 import bigframes.dtypes
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
+import third_party.bigframes_vendored.pandas.io.common as vendored_pandas_io_common
 
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
@@ -1522,37 +1523,35 @@ def _is_monotonic(
     return result
 
 
-def block_from_local(data, session=None, use_index=True) -> Block:
-    # TODO(tbergeron): Handle duplicate column labels
+def block_from_local(data, session=None) -> Block:
     pd_data = pd.DataFrame(data)
+    columns = pd_data.columns
 
-    column_labels = list(pd_data.columns)
-    if not all((label is None) or isinstance(label, str) for label in column_labels):
-        raise NotImplementedError(
-            f"Only string column labels supported. {constants.FEEDBACK_LINK}"
-        )
+    # Make a flattened version to treat as a table.
+    if len(pd_data.columns.names) > 1:
+        pd_data.columns = columns.to_flat_index()
 
-    if use_index:
-        if pd_data.index.nlevels > 1:
-            raise NotImplementedError(
-                f"multi-indices not supported. {constants.FEEDBACK_LINK}"
-            )
-        index_label = pd_data.index.name
-
-        index_id = guid.generate_guid()
-        pd_data = pd_data.reset_index(names=index_id)
-        keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
-        return Block(
-            keys_expr,
-            column_labels=column_labels,
-            index_columns=[index_id],
-            index_labels=[index_label],
-        )
-    else:
-        keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
-        keys_expr, offsets_id = keys_expr.promote_offsets()
-        # Constructor will create default range index
-        return Block(keys_expr, index_columns=[offsets_id], column_labels=column_labels)
+    index_labels = list(pd_data.index.names)
+    # The ArrayValue layer doesn't know about indexes, so make sure indexes
+    # are real columns with unique IDs.
+    pd_data = pd_data.reset_index(
+        names=[f"level_{level}" for level in range(len(index_labels))]
+    )
+    pd_data = pd_data.set_axis(
+        vendored_pandas_io_common.dedup_names(
+            pd_data.columns, is_potential_multiindex=False
+        ),
+        axis="columns",
+    )
+    index_ids = pd_data.columns[: len(index_labels)]
+
+    keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
+    return Block(
+        keys_expr,
+        column_labels=columns,
+        index_columns=index_ids,
+        index_labels=index_labels,
+    )
 
 
 def _align_block_to_schema(
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 0b741feff6..76377cd477 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -65,9 +65,7 @@
 
 # BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
 # TODO(tbergeron): Convert to bytes-based limit
-# TODO(swast): Address issues with string escaping and empty tables before
-# re-enabling inline data (ibis.memtable) feature.
-MAX_INLINE_DF_SIZE = -1
+MAX_INLINE_DF_SIZE = 5000
 
 LevelType = typing.Union[str, int]
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index af3209b0e1..9b07e85442 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -155,6 +155,9 @@ def ibis_dtype_to_bigframes_dtype(
 
     if ibis_dtype in IBIS_TO_BIGFRAMES:
         return IBIS_TO_BIGFRAMES[ibis_dtype]
+    elif isinstance(ibis_dtype, ibis_dtypes.Null):
+        # Fallback to STRING for NULL values for most flexibility in SQL.
+        return IBIS_TO_BIGFRAMES[ibis_dtypes.string]
     else:
         raise ValueError(
             f"Unexpected Ibis data type {ibis_dtype}. {constants.FEEDBACK_LINK}"
diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py
index 81a5bc4c41..add6af57f4 100644
--- a/bigframes/operations/base.py
+++ b/bigframes/operations/base.py
@@ -30,9 +30,7 @@
 
 # BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
 # TODO(tbergeron): Convert to bytes-based limit
-# TODO(swast): Address issues with string escaping and empty tables before
-# re-enabling inline data (ibis.memtable) feature.
-MAX_INLINE_SERIES_SIZE = -1
+MAX_INLINE_SERIES_SIZE = 5000
 
 
 class SeriesMethods:
diff --git a/setup.py b/setup.py
index 69b71c88f1..29eacb74a9 100644
--- a/setup.py
+++ b/setup.py
@@ -44,12 +44,12 @@
     "google-cloud-resource-manager >=1.10.3",
     "google-cloud-storage >=2.0.0",
     # TODO: Relax upper bound once we have fixed `system_prerelease` tests.
-    "ibis-framework[bigquery] >=6.0.0,<=6.1.0",
+    "ibis-framework[bigquery] >=6.2.0,<7.0.0dev",
    "pandas >=1.5.0",
     "pydata-google-auth >=1.8.2",
     "requests >=2.27.1",
     "scikit-learn >=1.2.2",
-    "sqlalchemy >=1.4,<3.0",
+    "sqlalchemy >=1.4,<3.0dev",
     "ipywidgets >=7.7.1",
     "humanize >= 4.6.0",
 ]
diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt
index cd69d45dc9..f43d3b4ca0 100644
--- a/testing/constraints-3.9.txt
+++ b/testing/constraints-3.9.txt
@@ -45,7 +45,7 @@ greenlet==2.0.2
 grpc-google-iam-v1==0.12.6
 grpcio==1.53.0
 grpcio-status==1.48.2
-ibis-framework==6.0.0
+ibis-framework==6.2.0
 humanize==4.6.0
 identify==2.5.22
 idna==3.4
diff --git a/tests/unit/core/__init__.py b/tests/unit/core/__init__.py
new file mode 100644
index 0000000000..1dc90d1848
--- /dev/null
+++ b/tests/unit/core/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py
new file mode 100644
index 0000000000..e58a00e30f
--- /dev/null
+++ b/tests/unit/core/test_blocks.py
@@ -0,0 +1,79 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pandas
+import pandas.testing
+import pytest
+
+import bigframes.core.blocks as blocks
+
+from .. import resources
+
+
+@pytest.mark.parametrize(
+    ("data",),
+    (
+        pytest.param(
+            {"test 1": [1, 2, 3], "test 2": [0.25, 0.5, 0.75]},
+            id="dict_spaces_in_column_names",
+        ),
+        pytest.param(
+            [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]],
+            id="nested_list",
+        ),
+        pytest.param(
+            pandas.concat(
+                [
+                    pandas.Series([1, 2, 3], name="some col"),
+                    pandas.Series([2, 3, 4], name="some col"),
+                ],
+                axis="columns",
+            ),
+            id="duplicate_column_names",
+        ),
+        pytest.param(
+            pandas.DataFrame(
+                {"test": [1, 2, 3]},
+                index=pandas.Index(["a", "b", "c"], name="string index"),
+            ),
+            id="string_index",
+        ),
+        pytest.param(
+            pandas.DataFrame(
+                [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]],
+                columns=pandas.MultiIndex.from_tuples(
+                    [(1, 1), (1, 2), (0, 0), (0, 1)],
+                    names=["some level", "another level"],
+                ),
+            ),
+            id="multiindex_columns",
+        ),
+        pytest.param(
+            pandas.DataFrame(
+                {"test": [1, 2, 3]},
+                index=pandas.MultiIndex.from_tuples([(1, 1), (1, 2), (0, 0)]),
+            ),
+            id="multiindex_rows",
+        ),
+    ),
+)
+def test_block_from_local(data):
+    expected = pandas.DataFrame(data)
+    session = resources.create_pandas_session({})
+
+    block = blocks.block_from_local(data, session=session)
+
+    pandas.testing.assert_index_equal(block.column_labels, expected.columns)
+    assert tuple(block.index_labels) == tuple(expected.index.names)
+    assert block.shape == expected.shape

From ca382a8eb8d7fe11cec4a664171f466a1b16de8b Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Thu, 21 Sep 2023 12:36:44 -0500
Subject: [PATCH 2/4] skip column multiindex test on pandas 1.x

---
 tests/unit/core/test_blocks.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py
index e58a00e30f..d0a6155ec4 100644
--- a/tests/unit/core/test_blocks.py
+++ b/tests/unit/core/test_blocks.py
@@ -57,6 +57,12 @@
                     names=["some level", "another level"],
                 ),
             ),
+            marks=[
+                pytest.mark.skipif(
+                    pandas.__version__.split() < ("2", "0", "0"),
+                    reason="pandas 1.5.3 treats column MultiIndex as Index of tuples",
+                ),
+            ],
             id="multiindex_columns",
         ),
         pytest.param(

From 0ce3825885f4519b080ef732f0580daf20e0c1ef Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Thu, 21 Sep 2023 13:01:28 -0500
Subject: [PATCH 3/4] fix version comparison

---
 tests/unit/core/test_blocks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py
index d0a6155ec4..a7e9b5a84b 100644
--- a/tests/unit/core/test_blocks.py
+++ b/tests/unit/core/test_blocks.py
@@ -59,7 +59,7 @@
             ),
             marks=[
                 pytest.mark.skipif(
-                    pandas.__version__.split() < ("2", "0", "0"),
+                    tuple(pandas.__version__.split()) < ("2", "0", "0"),
                     reason="pandas 1.5.3 treats column MultiIndex as Index of tuples",
                 ),
             ],

From b1dadadc91ecd85b1136ee8b72d2f86727650a57 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Thu, 21 Sep 2023 13:48:10 -0500
Subject: [PATCH 4/4] fix mypy

---
 bigframes/core/__init__.py |  2 +-
 bigframes/core/blocks.py   |  2 +-
 bigframes/dtypes.py        | 18 ++++++++++--------
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index d374ae97b2..3b3754642e 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -164,7 +164,7 @@ def mem_expr_from_pandas(
         new_schema = []
         for column_index, column in enumerate(schema):
             if column == ORDER_ID_COLUMN:
-                new_type = ibis_dtypes.int64
+                new_type: ibis_dtypes.DataType = ibis_dtypes.int64
             else:
                 column_type = schema[column]
                 # The autodetected type might not be one we can support, such
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f9f21208c7..ad4f72070f 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -1539,7 +1539,7 @@ def block_from_local(data, session=None) -> Block:
     )
     pd_data = pd_data.set_axis(
         vendored_pandas_io_common.dedup_names(
-            pd_data.columns, is_potential_multiindex=False
+            list(pd_data.columns), is_potential_multiindex=False
        ),
         axis="columns",
     )
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index 9b07e85442..271b8aa2f2 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -96,13 +96,13 @@
     ),
 )
 
-BIGFRAMES_TO_IBIS: Dict[Dtype, IbisDtype] = {
+BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
     pandas: ibis for ibis, pandas in BIDIRECTIONAL_MAPPINGS
 }
 
-IBIS_TO_BIGFRAMES: Dict[
-    Union[IbisDtype, ReadOnlyIbisDtype], Union[Dtype, np.dtype[Any]]
-] = {ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS}
+IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
+    ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
+}
 # Allow REQUIRED fields to map correctly.
 IBIS_TO_BIGFRAMES.update(
     {ibis.copy(nullable=False): pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS}
@@ -130,7 +130,7 @@
 
 
 def ibis_dtype_to_bigframes_dtype(
-    ibis_dtype: Union[IbisDtype, ReadOnlyIbisDtype]
+    ibis_dtype: ibis_dtypes.DataType,
 ) -> Union[Dtype, np.dtype[Any]]:
     """Converts an Ibis dtype to a BigQuery DataFrames dtype
 
@@ -188,8 +188,8 @@ def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
 
 
 def bigframes_dtype_to_ibis_dtype(
-    bigframes_dtype: Union[DtypeString, Dtype]
-) -> IbisDtype:
+    bigframes_dtype: Union[DtypeString, Dtype, np.dtype[Any]]
+) -> ibis_dtypes.DataType:
     """Converts a BigQuery DataFrames supported dtype to an Ibis dtype.
 
     Args:
@@ -284,7 +284,9 @@ def literal_to_ibis_scalar(
     return scalar_expr
 
 
-def cast_ibis_value(value: ibis_types.Value, to_type: IbisDtype) -> ibis_types.Value:
+def cast_ibis_value(
+    value: ibis_types.Value, to_type: ibis_dtypes.DataType
+) -> ibis_types.Value:
     """Perform compatible type casts of ibis values
 
     Args: