diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index cef426aea8..da1c174bc4 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -265,7 +265,13 @@ def _adapt_pandas_series(
 ) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
     # Mostly rely on pyarrow conversions, but have to convert geo without its help.
     if series.dtype == bigframes.dtypes.GEO_DTYPE:
-        series = geopandas.GeoSeries(series).to_wkt(rounding_precision=-1)
+        # geoseries produces e.g. "POINT (1 1)", while bq uses the style "POINT(1 1)";
+        # we normalize to bq style for consistency
+        series = (
+            geopandas.GeoSeries(series)
+            .to_wkt(rounding_precision=-1)
+            .str.replace(r"(\w+) \(", repl=r"\1(", regex=True)
+        )
         return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
     try:
         return _adapt_arrow_array(pa.array(series))
@@ -326,7 +332,7 @@ def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtyp
         return new_value.fill_null([]), bigframes.dtypes.list_type(values_type)
     if array.type == bigframes.dtypes.JSON_ARROW_TYPE:
         return _canonicalize_json(array), bigframes.dtypes.JSON_DTYPE
-    target_type = _logical_type_replacements(array.type)
+    target_type = logical_type_replacements(array.type)
     if target_type != array.type:
         # TODO: Maybe warn if lossy conversion?
         array = array.cast(target_type)
@@ -372,6 +378,10 @@ def recursive_f(type: pa.DataType) -> pa.DataType:
         if new_field_t != type.value_type:
             return pa.list_(new_field_t)
         return type
+    # polars can produce large lists, and we want to map these down to regular lists
+    if pa.types.is_large_list(type):
+        new_field_t = recursive_f(type.value_type)
+        return pa.list_(new_field_t)
     if pa.types.is_struct(type):
         struct_type = cast(pa.StructType, type)
         new_fields: list[pa.Field] = []
@@ -385,7 +395,7 @@ def recursive_f(type: pa.DataType) -> pa.DataType:
 
 
 @_recursive_map_types
-def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
+def logical_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_timestamp(type):
         # This is potentially lossy, but BigFrames doesn't support ns
         new_tz = "UTC" if (type.tz is not None) else None
@@ -403,8 +413,11 @@ def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_large_string(type):
         # simple string type can handle the largest strings needed
         return pa.string()
+    if pa.types.is_large_binary(type):
+        # simple binary type can handle the largest binary values needed
+        return pa.binary()
     if pa.types.is_dictionary(type):
-        return _logical_type_replacements(type.value_type)
+        return logical_type_replacements(type.value_type)
     if pa.types.is_null(type):
         # null as a type not allowed, default type is float64 for bigframes
         return pa.float64()
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 9e5ed12dfe..9dcd74182b 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -601,6 +601,10 @@ class ScanList:
     items: typing.Tuple[ScanItem, ...]
 
+    @classmethod
+    def from_items(cls, items: Iterable[ScanItem]) -> ScanList:
+        return cls(tuple(items))
+
     def filter_cols(
         self,
         ids: AbstractSet[identifiers.ColumnId],
diff --git a/bigframes/core/pyarrow_utils.py b/bigframes/core/pyarrow_utils.py
index bcbffdc78c..4196e68304 100644
--- a/bigframes/core/pyarrow_utils.py
+++ b/bigframes/core/pyarrow_utils.py
@@ -94,3 +94,9 @@ def append_offsets(
     return pa_table.append_column(
         offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
     )
+
+
+def as_nullable(pa_table: pa.Table) -> pa.Table:
+    """Normalizes schema to nullable for value-wise comparisons."""
+    nullable_schema = pa.schema(field.with_nullable(True) for field in pa_table.schema)
+    return pa_table.cast(nullable_schema)
diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py
new file mode 100644
index 0000000000..4b19f7441d
--- /dev/null
+++ b/bigframes/session/direct_gbq_execution.py
@@ -0,0 +1,76 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+from typing import Optional, Tuple
+
+from google.cloud import bigquery
+import google.cloud.bigquery.job as bq_job
+import google.cloud.bigquery.table as bq_table
+
+from bigframes.core import compile, nodes
+from bigframes.session import executor, semi_executor
+import bigframes.session._io.bigquery as bq_io
+
+
+# Used only in testing right now; BigQueryCachingExecutor is the fully featured engine.
+# Simplified: does not handle large (>10 GB) query results, error handling, the global
+# config, or metrics recording. Also skips caching and most pre-compile rewrites, to
+# better serve as a reference for validating more complex executors.
+class DirectGbqExecutor(semi_executor.SemiExecutor):
+    def __init__(self, bqclient: bigquery.Client):
+        self.bqclient = bqclient
+
+    def execute(
+        self,
+        plan: nodes.BigFrameNode,
+        ordered: bool,
+        peek: Optional[int] = None,
+    ) -> executor.ExecuteResult:
+        """Just execute the plan as-is, without further caching or decomposition."""
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
+
+        compiled = compile.compile_sql(
+            compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
+        )
+        iterator, query_job = self._run_execute_query(
+            sql=compiled.sql,
+        )
+
+        return executor.ExecuteResult(
+            arrow_batches=iterator.to_arrow_iterable(),
+            schema=plan.schema,
+            query_job=query_job,
+            total_rows=iterator.total_rows,
+        )
+
+    def _run_execute_query(
+        self,
+        sql: str,
+        job_config: Optional[bq_job.QueryJobConfig] = None,
+    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
+        """
+        Starts BigQuery query job and waits for results.
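+
+        Returns the row iterator plus the query job, if any (with
+        query_with_job=False, short queries may run without creating a job).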
+        """
+        return bq_io.start_query_with_client(
+            self.bqclient,
+            sql,
+            job_config=job_config or bq_job.QueryJobConfig(),
+            project=None,
+            location=None,
+            timeout=None,
+            metrics=None,
+            query_with_job=False,
+        )
diff --git a/bigframes/session/polars_executor.py b/bigframes/session/polars_executor.py
new file mode 100644
index 0000000000..e215866874
--- /dev/null
+++ b/bigframes/session/polars_executor.py
@@ -0,0 +1,80 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+from typing import Optional, TYPE_CHECKING
+
+import pyarrow as pa
+
+from bigframes.core import array_value, bigframe_node, local_data, nodes
+from bigframes.session import executor, semi_executor
+
+if TYPE_CHECKING:
+    import polars as pl
+
+
+_COMPATIBLE_NODES = (
+    nodes.ReadLocalNode,
+    nodes.OrderByNode,
+    nodes.ReversedNode,
+    nodes.SelectionNode,
+    nodes.FilterNode,  # partial support
+    nodes.ProjectionNode,  # partial support
+)
+
+
+class PolarsExecutor(semi_executor.SemiExecutor):
+    def __init__(self):
+        # This will error out if polars is not installed
+        from bigframes.core.compile.polars import PolarsCompiler
+
+        self._compiler = PolarsCompiler()
+
+    def execute(
+        self,
+        plan: bigframe_node.BigFrameNode,
+        ordered: bool,
+        peek: Optional[int] = None,
+    ) -> Optional[executor.ExecuteResult]:
+        if not self._can_execute(plan):
+            return None
+        # Note: ignoring the ordered flag, as executing in total order is always valid.
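+        # Compilation can still fail on expressions the polars compiler doesn't
+        # support even when the node types look compatible, so compile defensively
+        # and return None to let the caller fall back to another executor.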
+        try:
+            lazy_frame: pl.LazyFrame = self._compiler.compile(
+                array_value.ArrayValue(plan)
+            )
+        except Exception:
+            return None
+        if peek is not None:
+            lazy_frame = lazy_frame.limit(peek)
+        pa_table = lazy_frame.collect().to_arrow()
+        return executor.ExecuteResult(
+            arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
+            schema=plan.schema,
+            total_bytes=pa_table.nbytes,
+            total_rows=pa_table.num_rows,
+        )
+
+    def _can_execute(self, plan: bigframe_node.BigFrameNode):
+        return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes())
+
+    def _adapt_array(self, array: pa.Array) -> pa.Array:
+        target_type = local_data.logical_type_replacements(array.type)
+        if target_type != array.type:
+            return array.cast(target_type)
+        return array
+
+    def _adapt_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
+        new_arrays = [self._adapt_array(arr) for arr in batch.columns]
+        return pa.RecordBatch.from_arrays(new_arrays, names=batch.column_names)
diff --git a/noxfile.py b/noxfile.py
index e3cfbf83a4..dee5f929b7 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -108,8 +108,8 @@ SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
     "3.9": ["tests"],
     "3.10": ["tests"],
-    "3.12": ["tests", "scikit-learn"],
-    "3.13": ["tests"],
+    "3.12": ["tests", "scikit-learn", "polars"],
+    "3.13": ["tests", "polars"],
 }
 
 LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
diff --git a/tests/system/small/engines/__init__.py b/tests/system/small/engines/__init__.py
new file mode 100644
index 0000000000..0a2669d7a2
--- /dev/null
+++ b/tests/system/small/engines/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/system/small/engines/conftest.py b/tests/system/small/engines/conftest.py
new file mode 100644
index 0000000000..2a72cb2196
--- /dev/null
+++ b/tests/system/small/engines/conftest.py
@@ -0,0 +1,81 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
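+
+# Fixtures for the engine-equivalence tests: data sources are wrapped as
+# ManagedArrowTable, and the parametrized "engine" fixture yields each executor
+# implementation under test.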
+import pathlib
+from typing import Generator
+
+from google.cloud import bigquery
+import pandas as pd
+import pytest
+
+import bigframes
+from bigframes.core import local_data
+from bigframes.session import (
+    direct_gbq_execution,
+    local_scan_executor,
+    polars_executor,
+    semi_executor,
+)
+
+CURRENT_DIR = pathlib.Path(__file__).parent
+DATA_DIR = CURRENT_DIR.parent.parent.parent / "data"
+
+
+@pytest.fixture(scope="module")
+def fake_session() -> Generator[bigframes.Session, None, None]:
+    import bigframes.core.global_session
+
+    # It's a "polars session", but we are bypassing session-provided execution;
+    # we just want a minimal placeholder session without expensive setup.
+    from bigframes.testing import polars_session
+
+    session = polars_session.TestSession()
+    with bigframes.core.global_session._GlobalSessionContext(session):
+        yield session
+
+
+@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq"])
+def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor:
+    if request.param == "pyarrow":
+        return local_scan_executor.LocalScanExecutor()
+    if request.param == "polars":
+        return polars_executor.PolarsExecutor()
+    if request.param == "bq":
+        return direct_gbq_execution.DirectGbqExecutor(bigquery_client)
+    raise ValueError(f"Unrecognized param: {request.param}")
+
+
+@pytest.fixture(scope="module")
+def managed_data_source(
+    scalars_pandas_df_index: pd.DataFrame,
+) -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(scalars_pandas_df_index)
+
+
+@pytest.fixture(scope="module")
+def zero_row_source() -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(pd.DataFrame({"a": [], "b": []}))
+
+
+@pytest.fixture(scope="module")
+def nested_data_source(
+    nested_pandas_df: pd.DataFrame,
+) -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(nested_pandas_df)
+
+
+@pytest.fixture(scope="module")
+def repeated_data_source(
+    repeated_pandas_df: pd.DataFrame,
+) -> local_data.ManagedArrowTable:
+    return local_data.ManagedArrowTable.from_pandas(repeated_pandas_df)
diff --git a/tests/system/small/engines/test_read_local.py b/tests/system/small/engines/test_read_local.py
new file mode 100644
index 0000000000..7bf1316a44
--- /dev/null
+++ b/tests/system/small/engines/test_read_local.py
@@ -0,0 +1,132 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+import bigframes
+from bigframes.core import identifiers, local_data, nodes
+from bigframes.session import polars_executor, semi_executor
+
+pytest.importorskip("polars")
+
+# Polars is used as the reference engine since it's fast and local. Generally
+# though, prefer the gbq engine where they disagree.
+REFERENCE_ENGINE = polars_executor.PolarsExecutor()
+
+
+def ensure_equivalence(
+    node: nodes.BigFrameNode,
+    engine1: semi_executor.SemiExecutor,
+    engine2: semi_executor.SemiExecutor,
+):
+    e1_result = engine1.execute(node, ordered=True)
+    e2_result = engine2.execute(node, ordered=True)
+    assert e1_result is not None
+    assert e2_result is not None
+    # Schemas might have extra nullity markers; normalize to the node's expected
+    # schema, which should be looser.
+    e1_table = e1_result.to_arrow_table().cast(node.schema.to_pyarrow())
+    e2_table = e2_result.to_arrow_table().cast(node.schema.to_pyarrow())
+    assert e1_table.equals(e2_table), f"{e1_table} is not equal to {e2_table}"
+
+
+def test_engines_read_local(
+    fake_session: bigframes.Session,
+    managed_data_source: local_data.ManagedArrowTable,
+    engine,
+):
+    scan_list = nodes.ScanList.from_items(
+        nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
+        for item in managed_data_source.schema.items
+    )
+    local_node = nodes.ReadLocalNode(
+        managed_data_source, scan_list, fake_session, offsets_col=None
+    )
+    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
+
+
+def test_engines_read_local_w_offsets(
+    fake_session: bigframes.Session,
+    managed_data_source: local_data.ManagedArrowTable,
+    engine,
+):
+    scan_list = nodes.ScanList.from_items(
+        nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
+        for item in managed_data_source.schema.items
+    )
+    local_node = nodes.ReadLocalNode(
+        managed_data_source,
+        scan_list,
+        fake_session,
+        offsets_col=identifiers.ColumnId("offsets"),
+    )
+    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
+
+
+def test_engines_read_local_w_col_subset(
+    fake_session: bigframes.Session,
+    managed_data_source: local_data.ManagedArrowTable,
+    engine,
+):
+    scan_list = nodes.ScanList.from_items(
+        nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
+        for item in managed_data_source.schema.items[::-2]
+    )
+    local_node = nodes.ReadLocalNode(
+        managed_data_source, scan_list, fake_session, offsets_col=None
+    )
+    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
+
+
+def test_engines_read_local_w_zero_row_source(
+    fake_session: bigframes.Session,
+    zero_row_source: local_data.ManagedArrowTable,
+    engine,
+):
+    scan_list = nodes.ScanList.from_items(
+        nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
+        for item in zero_row_source.schema.items
+    )
+    local_node = nodes.ReadLocalNode(
+        zero_row_source, scan_list, fake_session, offsets_col=None
+    )
+    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
+
+
+def test_engines_read_local_w_nested_source(
+    fake_session: bigframes.Session,
+    nested_data_source: local_data.ManagedArrowTable,
+    engine,
+):
+    scan_list = nodes.ScanList.from_items(
+        nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
+        for item in nested_data_source.schema.items
+    )
+    local_node = nodes.ReadLocalNode(
+        nested_data_source, scan_list, fake_session, offsets_col=None
+    )
+    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
+
+
+def test_engines_read_local_w_repeated_source(
+    fake_session: bigframes.Session,
+    repeated_data_source: local_data.ManagedArrowTable,
+    engine,
+):
+    scan_list = nodes.ScanList.from_items(
+        nodes.ScanItem(identifiers.ColumnId(item.column), item.dtype, item.column)
+        for item in repeated_data_source.schema.items
+    )
+    local_node = nodes.ReadLocalNode(
+        repeated_data_source, scan_list, fake_session, offsets_col=None
+    )
+    ensure_equivalence(local_node, REFERENCE_ENGINE, engine)
diff --git a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal/out.sql b/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal/out.sql
index d7e47b6032..a34f3526d6 100644
--- a/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal/out.sql
+++ b/tests/unit/core/compile/sqlglot/snapshots/test_compile_readlocal/test_compile_readlocal/out.sql
@@ -7,7 +7,7 @@ WITH `bfcte_0` AS (
       CAST(b'Hello, World!' AS BYTES),
       CAST('2021-07-21' AS DATE),
       CAST('2021-07-21T11:39:45' AS DATETIME),
-      ST_GEOGFROMTEXT('POINT (-122.0838511 37.3860517)'),
+      ST_GEOGFROMTEXT('POINT(-122.0838511 37.3860517)'),
       123456789,
       0,
       CAST(1.234567890 AS NUMERIC),
@@ -24,7 +24,7 @@ WITH `bfcte_0` AS (
       CAST(b'\xe3\x81\x93\xe3\x82\x93\xe3\x81\xab\xe3\x81\xa1\xe3\x81\xaf' AS BYTES),
       CAST('1991-02-03' AS DATE),
       CAST('1991-01-02T03:45:06' AS DATETIME),
-      ST_GEOGFROMTEXT('POINT (-71.104 42.315)'),
+      ST_GEOGFROMTEXT('POINT(-71.104 42.315)'),
       -987654321,
       1,
       CAST(1.234567890 AS NUMERIC),
@@ -41,7 +41,7 @@ WITH `bfcte_0` AS (
       CAST(b'\xc2\xa1Hola Mundo!' AS BYTES),
       CAST('2023-03-01' AS DATE),
       CAST('2023-03-01T10:55:13' AS DATETIME),
-      ST_GEOGFROMTEXT('POINT (-0.124474760143016 51.5007826749545)'),
+      ST_GEOGFROMTEXT('POINT(-0.124474760143016 51.5007826749545)'),
       314159,
       0,
       CAST(101.101010100 AS NUMERIC),
@@ -109,7 +109,7 @@ WITH `bfcte_0` AS (
      CAST(b'Hello\tBigFrames!\x07' AS BYTES),
      CAST('2023-05-23' AS DATE),
      CAST('2023-05-23T11:37:01' AS DATETIME),
-     ST_GEOGFROMTEXT('LINESTRING (-0.127959 51.507728, -0.127026 51.507473)'),
+     ST_GEOGFROMTEXT('LINESTRING(-0.127959 51.507728, -0.127026 51.507473)'),
      101202303,
      2,
      CAST(-10.090807000 AS NUMERIC),
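
A quick illustration of the WKT normalization in local_data.py above; this is a minimal standalone sketch (not part of the patch), assuming only pandas, with sample values taken from the snapshot diff:

import pandas as pd

wkt = pd.Series(
    [
        "POINT (-122.0838511 37.3860517)",
        "LINESTRING (-0.127959 51.507728, -0.127026 51.507473)",
    ]
)
# Drop the space between the geometry keyword and the opening parenthesis,
# matching BigQuery's "POINT(x y)" spelling.
normalized = wkt.str.replace(r"(\w+) \(", repl=r"\1(", regex=True)
print(list(normalized))
# ['POINT(-122.0838511 37.3860517)',
#  'LINESTRING(-0.127959 51.507728, -0.127026 51.507473)']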
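The type normalizations are similarly easy to see with plain pyarrow: logical_type_replacements maps the 64-bit-offset "large" types that polars emits down to their plain counterparts, and as_nullable loosens nullability so that schemas differing only in nullity compare equal value-wise. A minimal sketch, assuming only pyarrow:

import pyarrow as pa

# large_string / large_list (as produced by polars) -> plain string / list.
print(pa.array(["a", "b"], type=pa.large_string()).cast(pa.string()).type)
print(
    pa.array([[1], [2, 3]], type=pa.large_list(pa.int64()))
    .cast(pa.list_(pa.int64()))
    .type
)

# as_nullable: mark every field nullable before value-wise comparison.
strict = pa.schema([pa.field("x", pa.int64(), nullable=False)])
table = pa.Table.from_arrays([pa.array([1, 2])], schema=strict)
nullable_schema = pa.schema(f.with_nullable(True) for f in table.schema)
print(table.cast(nullable_schema).schema)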