diff --git a/bigframes/core/compile/sqlglot/__init__.py b/bigframes/core/compile/sqlglot/__init__.py
new file mode 100644
index 0000000000..2f40894975
--- /dev/null
+++ b/bigframes/core/compile/sqlglot/__init__.py
@@ -0,0 +1,18 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+from bigframes.core.compile.sqlglot.compiler import SQLGlotCompiler
+
+__all__ = ["SQLGlotCompiler"]
diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py
new file mode 100644
index 0000000000..fad0168e1d
--- /dev/null
+++ b/bigframes/core/compile/sqlglot/compiler.py
@@ -0,0 +1,264 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import dataclasses
+import functools
+import io
+import typing
+
+import pandas as pd
+import sqlglot as sg
+import sqlglot.expressions as sge
+
+import bigframes.core
+from bigframes.core import utils
+import bigframes.core.compile.sqlglot.scalar_op_compiler as scalar_op_compiler
+import bigframes.core.compile.sqlglot.sqlglot_types as sgt
+import bigframes.core.guid as guid
+import bigframes.core.identifiers as ids
+import bigframes.core.nodes as nodes
+import bigframes.core.ordering
+import bigframes.core.rewrite as rewrites
+import bigframes.dtypes as dtypes
+
+
+@dataclasses.dataclass(frozen=True)
+class SQLGlotCompiler:
+    """Compiles a BigFrameNode into a SQLGlot expression tree, recursively."""
+
+    # In strict mode, ordering is always deterministic. In non-strict mode,
+    # ordering from ReadTable or after joins may be ambiguous, trading
+    # determinism for query performance.
+    strict: bool = True
+    # Whether to always quote identifiers.
+    quoted: bool = True
+    # Scalar compiler for converting BigFrames expressions to SQLGlot expressions.
+    # TODO: the way scalar operations compile currently blocks a non-recursive compiler.
+    scalar_op_compiler = scalar_op_compiler.SQLGlotScalarOpCompiler()
+
+    # Creates sequential IDs with a separate counter per prefix (e.g., "t", "c").
+    # ID sequences are unique per instance of this class.
+    uid_generator = guid.SequentialUIDGenerator()
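+
+    # Minimal usage sketch (`node` is a hypothetical BigFrameNode; dialect
+    # handling is still a TODO in compile_sql below):
+    #
+    #   >>> compiler = SQLGlotCompiler()
+    #   >>> select = compiler.compile_sql(node, ordered=True)
+    #   >>> sql = select.sql(dialect="bigquery")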
+
+    # TODO: add BigQuery Dialect
+    def compile_sql(
+        self,
+        node: nodes.BigFrameNode,
+        ordered: bool,
+        limit: typing.Optional[int] = None,
+    ) -> sge.Select:
+        # Later steps might add ids, so snapshot the output names first.
+        output_ids = node.schema.names
+        if ordered:
+            # Need to do this before replacing unsupported ops, as that will
+            # rewrite slice ops.
+            node, pulled_up_limit = rewrites.pullup_limit_from_slice(node)
+            if (pulled_up_limit is not None) and (
+                (limit is None) or limit > pulled_up_limit
+            ):
+                limit = pulled_up_limit
+
+        node = self._replace_unsupported_ops(node)
+        # Prune before pulling up order to avoid unnecessary row_number() ops.
+        node = rewrites.column_pruning(node)
+        node, ordering = rewrites.pull_up_order(node, order_root=ordered)
+        # Final pruning to clean up any leftover unused values.
+        node = rewrites.column_pruning(node)
+
+        # TODO: apply `limit` and re-select `output_ids` on the final query.
+        select_node = self.compile_node(node)
+        order_expr = self.compile_row_ordering(ordering)
+        if order_expr:
+            select_node = select_node.order_by(order_expr)
+        return select_node
+
+    def _replace_unsupported_ops(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode:
+        # TODO: run all replacement rules as a single bottom-up pass.
+        node = nodes.bottom_up(node, rewrites.rewrite_slice)
+        node = nodes.bottom_up(node, rewrites.rewrite_timedelta_expressions)
+        return node
+
+    def compile_row_ordering(
+        self, node: bigframes.core.ordering.RowOrdering
+    ) -> typing.Optional[sge.Order]:
+        if len(node.all_ordering_columns) == 0:
+            return None
+
+        ordering_expr = [
+            sge.Ordered(
+                this=sge.Column(
+                    this=sge.to_identifier(
+                        col_ref.scalar_expression.id.sql, quoted=self.quoted
+                    )
+                ),
+                nulls_first=not col_ref.na_last,
+                desc=not col_ref.direction.is_ascending,
+            )
+            for col_ref in node.all_ordering_columns
+        ]
+        return sge.Order(expressions=ordering_expr)
+
+    @functools.singledispatchmethod
+    def compile_node(self, node: nodes.BigFrameNode) -> sge.Select:
+        """Compiles a BigFrameNode to a SQLGlot Select, dispatching on node type."""
+        raise ValueError(f"Can't compile unrecognized node: {node}")
+
+    @compile_node.register
+    def compile_selection(self, node: nodes.SelectionNode) -> sge.Select:
+        child = self.compile_node(node.child)
+        selected_cols = [
+            sge.Alias(
+                this=self.scalar_op_compiler.compile_expression(expr),
+                alias=sge.to_identifier(id.name, quoted=self.quoted),
+            )
+            for expr, id in node.input_output_pairs
+        ]
+        return child.select(*selected_cols, append=False)
+
+    @compile_node.register
+    def compile_projection(self, node: nodes.ProjectionNode) -> sge.Select:
+        child = self.compile_node(node.child)
+
+        new_cols = [
+            sge.Alias(
+                this=self.scalar_op_compiler.compile_expression(expr),
+                alias=sge.to_identifier(id.name, quoted=self.quoted),
+            )
+            for expr, id in node.assignments
+        ]
+
+        return child.select(*new_cols, append=True)
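+
+    # Illustrative only (plain sqlglot, hypothetical column names): selection
+    # replaces the projection list (append=False) while projection extends it
+    # (append=True), e.g.
+    #
+    #   >>> import sqlglot as sg
+    #   >>> sg.select("a").from_("t").select("a + 1 AS b", append=True).sql()
+    #   'SELECT a, a + 1 AS b FROM t'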
+
+    @compile_node.register
+    def compile_readlocal(self, node: nodes.ReadLocalNode) -> sge.Select:
+        array_as_pd = pd.read_feather(
+            io.BytesIO(node.feather_bytes),
+            columns=[item.source_id for item in node.scan_list.items],
+        )
+        scan_list_items = node.scan_list.items
+
+        # In ordered mode, add an offset column holding the row index (0 to N-1).
+        if node.offsets_col:
+            offsets_col_name = node.offsets_col.sql
+            array_as_pd[offsets_col_name] = range(len(array_as_pd))
+            scan_list_items = scan_list_items + (
+                nodes.ScanItem(
+                    ids.ColumnId(offsets_col_name), dtypes.INT_DTYPE, offsets_col_name
+                ),
+            )
+
+        # Convert timedeltas to microseconds for compatibility with BigQuery.
+        _ = utils.replace_timedeltas_with_micros(array_as_pd)
+
+        array_expr = sge.DataType(
+            this=sge.DataType.Type.STRUCT,
+            expressions=[
+                sge.ColumnDef(
+                    this=sge.to_identifier(item.source_id, quoted=self.quoted),
+                    kind=sgt.SQLGlotType.from_bigframes_dtype(item.dtype),
+                )
+                for item in scan_list_items
+            ],
+            nested=True,
+        )
+        array_values = [
+            sge.Tuple(
+                expressions=tuple(
+                    self.literal(
+                        value=value,
+                        dtype=sgt.SQLGlotType.from_bigframes_dtype(item.dtype),
+                    )
+                    for value, item in zip(row, scan_list_items)
+                )
+            )
+            for _, row in array_as_pd.iterrows()
+        ]
+        expr = sge.Unnest(
+            expressions=[
+                sge.DataType(
+                    this=sge.DataType.Type.ARRAY,
+                    expressions=[array_expr],
+                    nested=True,
+                    values=array_values,
+                ),
+            ],
+        )
+        return sg.select(sge.Star()).from_(expr)
+
+    @compile_node.register
+    def compile_filter(self, node: nodes.FilterNode) -> sge.Select:
+        child_expr = self.compile_node(node.child)
+        # TODO: compile node.predicate with scalar_op_compiler, wrap the child
+        # in a CTE (see create_cte_from_select) and apply the predicate via a
+        # WHERE clause. For now the predicate is not applied.
+        return child_expr
+
+    # TODO(refactor): Helpers to build SQLGlot expressions.
+    def cast(self, arg, to: str) -> sge.Cast:
+        return sge.Cast(this=sge.convert(arg), to=to)
+
+    def literal(self, value, dtype: str) -> sge.Expression:
+        if value is None:
+            return self.cast(sge.Null(), dtype)
+
+        # TODO: handle other types like visit_DefaultLiteral.
+        return sge.convert(value)
+
+    def create_cte_from_select(
+        self, select: sge.Select, cte_name: str
+    ) -> sge.With:
+        new_cte = sge.CTE(
+            this=select,
+            alias=sge.TableAlias(this=sg.to_identifier(cte_name, quoted=self.quoted)),
+        )
+
+        with_expr = select.args.pop("with", sge.With())
+        cte_list = with_expr.expressions + [new_cte]
+        return sge.With(expressions=cte_list)
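+
+    # Sketch of the CTE chaining that create_cte_from_select enables (plain
+    # sqlglot, hypothetical names; the caller re-attaches the WITH clause):
+    #
+    #   >>> import sqlglot as sg
+    #   >>> inner = sg.select("a").from_("t")
+    #   >>> sg.select("*").from_("t0").with_("t0", as_=inner).sql()
+    #   'WITH t0 AS (SELECT a FROM t) SELECT * FROM t0'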
diff --git a/bigframes/core/compile/sqlglot/scalar_op_compiler.py b/bigframes/core/compile/sqlglot/scalar_op_compiler.py
new file mode 100644
index 0000000000..631ecdfde8
--- /dev/null
+++ b/bigframes/core/compile/sqlglot/scalar_op_compiler.py
@@ -0,0 +1,97 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import dataclasses
+import functools
+
+import sqlglot.expressions as sge
+
+import bigframes.core.expression as ex
+import bigframes.dtypes as dtypes
+import bigframes.operations as ops
+
+
+@dataclasses.dataclass(frozen=True)
+class SQLGlotScalarOpCompiler:
+    """Scalar op compiler for converting BigFrames scalar op expressions to
+    SQLGlot expressions."""
+
+    # Whether to always quote identifiers.
+    quoted: bool = True
+
+    @functools.singledispatchmethod
+    def compile_expression(self, expr: ex.Expression) -> sge.Expression:
+        raise NotImplementedError(f"Cannot compile expression: {expr}")
+
+    @compile_expression.register
+    def compile_deref_op(self, expr: ex.DerefOp) -> sge.Expression:
+        return sge.Column(this=sge.to_identifier(expr.id.sql, quoted=self.quoted))
+
+    @compile_expression.register
+    def compile_scalar_constant_expression(
+        self, expr: ex.ScalarConstantExpression
+    ) -> sge.Expression:
+        return sge.Literal(this=expr.value, is_string=dtypes.is_string_like(expr.dtype))
+
+    @compile_expression.register
+    def compile_op_expression(self, expr: ex.OpExpression) -> sge.Expression:
+        op = expr.op
+        # TODO: compiling arguments recursively here blocks a non-recursive
+        # compiler design.
+        args = tuple(map(self.compile_expression, expr.inputs))
+
+        op_name = op.__class__.__name__
+        method_name = f"compile_{op_name}"
+        method = getattr(self, method_name, None)
+        if method is None:
+            raise NotImplementedError(f"Cannot compile operator {method_name}")
+
+        if isinstance(op, ops.UnaryOp):
+            return method(op, args[0])
+        elif isinstance(op, ops.BinaryOp):
+            return method(op, args[0], args[1])
+        elif isinstance(op, ops.TernaryOp):
+            return method(op, args[0], args[1], args[2])
+        elif isinstance(op, ops.NaryOp):
+            return method(op, *args)
+        else:
+            raise NotImplementedError(f"Cannot compile operator {method_name}")
+
+    # TODO: add parentheses to preserve operator precedence.
+    def compile_AddOp(
+        self, op: ops.AddOp, left: sge.Expression, right: sge.Expression
+    ) -> sge.Expression:
+        return sge.Add(this=left, expression=right)
+
+    # Named `compile_ge` rather than `compile_GeOp`, likely because ops created
+    # via `create_binary_op` (such as `ge`) have lowercase class names.
+    def compile_ge(
+        self, op, left: sge.Expression, right: sge.Expression
+    ) -> sge.Expression:
+        return sge.GTE(this=left, expression=right)
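+
+    # Pattern for new operators (sketch; `compile_op_expression` dispatches on
+    # the op's class name, so a hypothetical `FooOp` resolves to `compile_FooOp`):
+    #
+    #   def compile_FooOp(
+    #       self, op, arg: sge.Expression  # hypothetical unary op
+    #   ) -> sge.Expression:
+    #       return sge.Not(this=arg)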
diff --git a/bigframes/core/compile/sqlglot/sqlglot_types.py b/bigframes/core/compile/sqlglot/sqlglot_types.py
new file mode 100644
index 0000000000..06c78c1435
--- /dev/null
+++ b/bigframes/core/compile/sqlglot/sqlglot_types.py
@@ -0,0 +1,84 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import typing
+
+import bigframes_vendored.constants as constants
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+import sqlglot as sg
+
+import bigframes.dtypes
+
+
+class SQLGlotType:
+    @classmethod
+    def from_bigframes_dtype(
+        cls,
+        bigframes_dtype: typing.Union[
+            bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype, np.dtype[typing.Any]
+        ],
+    ) -> str:
+        if bigframes_dtype == bigframes.dtypes.INT_DTYPE:
+            return "INT64"
+        elif bigframes_dtype == bigframes.dtypes.FLOAT_DTYPE:
+            return "FLOAT64"
+        elif bigframes_dtype == bigframes.dtypes.STRING_DTYPE:
+            return "STRING"
+        elif bigframes_dtype == bigframes.dtypes.BOOL_DTYPE:
+            return "BOOLEAN"
+        elif bigframes_dtype == bigframes.dtypes.DATE_DTYPE:
+            return "DATE"
+        elif bigframes_dtype == bigframes.dtypes.TIME_DTYPE:
+            return "TIME"
+        elif bigframes_dtype == bigframes.dtypes.DATETIME_DTYPE:
+            return "DATETIME"
+        elif bigframes_dtype == bigframes.dtypes.TIMESTAMP_DTYPE:
+            return "TIMESTAMP"
+        elif bigframes_dtype == bigframes.dtypes.BYTES_DTYPE:
+            return "BYTES"
+        elif bigframes_dtype == bigframes.dtypes.NUMERIC_DTYPE:
+            return "NUMERIC"
+        elif bigframes_dtype == bigframes.dtypes.BIGNUMERIC_DTYPE:
+            return "BIGNUMERIC"
+        elif bigframes_dtype == bigframes.dtypes.JSON_DTYPE:
+            return "JSON"
+        elif bigframes_dtype == bigframes.dtypes.GEO_DTYPE:
+            return "GEOGRAPHY"
+        elif isinstance(bigframes_dtype, pd.ArrowDtype):
+            if pa.types.is_list(bigframes_dtype.pyarrow_dtype):
+                inner_bigframes_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
+                    bigframes_dtype.pyarrow_dtype.value_type
+                )
+                return (
+                    f"ARRAY<{SQLGlotType.from_bigframes_dtype(inner_bigframes_dtype)}>"
+                )
+            elif pa.types.is_struct(bigframes_dtype.pyarrow_dtype):
+                struct_type = typing.cast(pa.StructType, bigframes_dtype.pyarrow_dtype)
+                inner_fields: list[str] = []
+                for i in range(struct_type.num_fields):
+                    field = struct_type.field(i)
+                    key = sg.to_identifier(field.name).sql("bigquery")
+                    dtype = SQLGlotType.from_bigframes_dtype(
+                        bigframes.dtypes.arrow_dtype_to_bigframes_dtype(field.type)
+                    )
+                    inner_fields.append(f"{key} {dtype}")
+                return "STRUCT<{}>".format(", ".join(inner_fields))
+
+        raise ValueError(
+            f"Unsupported type for {bigframes_dtype}. {constants.FEEDBACK_LINK}"
+        )
diff --git a/bigframes/core/guid.py b/bigframes/core/guid.py
index 8930d0760a..289df4d6eb 100644
--- a/bigframes/core/guid.py
+++ b/bigframes/core/guid.py
@@ -19,3 +19,29 @@ def generate_guid(prefix="col_"):
     global _GUID_COUNTER
     _GUID_COUNTER += 1
     return f"bfuid_{prefix}{_GUID_COUNTER}"
+
+
+class SequentialUIDGenerator:
+    """Generates sequential UIDs with a separate counter per prefix,
+    e.g. "t0", "t1", "c0", "t2", etc.
+    """
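+
+    # For example (sketch):
+    #
+    #   >>> gen = SequentialUIDGenerator()
+    #   >>> gen.generate_sequential_uid("t"), gen.generate_sequential_uid("t")
+    #   ('t0', 't1')
+    #   >>> gen.generate_sequential_uid("c")
+    #   'c0'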
+
+    def __init__(self):
+        # Dictionary to store counters for each prefix.
+        self.prefix_counters = {}
+
+    def generate_sequential_uid(self, prefix: str) -> str:
+        """Generates a sequential UID with the specified prefix.
+
+        Args:
+            prefix: The prefix for the UID.
+
+        Returns:
+            str: The sequential UID.
+        """
+        if prefix not in self.prefix_counters:
+            self.prefix_counters[prefix] = 0
+
+        uid = f"{prefix}{self.prefix_counters[prefix]}"
+        self.prefix_counters[prefix] += 1
+        return uid
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 18d0b38fba..3ad0aa05bc 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -98,7 +98,8 @@
 
 
 # Inherits from pandas DataFrame so that we can use the same docstrings.
-@log_adapter.class_logger
+# TODO: re-enable class_logger (temporarily disabled in this draft).
+# @log_adapter.class_logger
 class DataFrame(vendored_pandas_frame.DataFrame):
     __doc__ = vendored_pandas_frame.DataFrame.__doc__
     # internal flag to disable cache at all
diff --git a/bigframes/series.py b/bigframes/series.py
index 33ba6f8599..bd20bf0674 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -76,7 +76,8 @@
 _list = list  # Type alias to escape Series.list property
 
 
-@log_adapter.class_logger
+# TODO: re-enable class_logger (temporarily disabled in this draft).
+# @log_adapter.class_logger
 class Series(bigframes.operations.base.SeriesMethods, vendored_pandas_series.Series):
     # Must be above 5000 for pandas to delegate to bigframes for binops
     __pandas_priority__ = 13000
@@ -2068,6 +2068,16 @@ def __array_ufunc__(
 
         return NotImplemented
 
+    @property
+    def sql(self) -> str:
+        """Compiles this Series's expression tree to SQL.
+
+        Returns:
+            str: A string representing the compiled SQL.
+        """
+        sql, _, _ = self._block.to_sql_query(include_index=False)
+        return sql
+
     @property
     def plot(self):
         return plotting.PlotAccessor(self)
diff --git a/tests/unit/core/compile/sqlglot/__init__.py b/tests/unit/core/compile/sqlglot/__init__.py
new file mode 100644
index 0000000000..0a2669d7a2
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/unit/core/compile/sqlglot/conftest.py b/tests/unit/core/compile/sqlglot/conftest.py
new file mode 100644
index 0000000000..bfde1925fe
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/conftest.py
@@ -0,0 +1,48 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pandas as pd
+import pyarrow as pa  # noqa: F401  (used by the commented-out intLists column)
+import pytest
+
+
+@pytest.fixture(scope="module")
+def sql_compiler_session():
+    from . import sql_compiler_session
+
+    return sql_compiler_session.SQLCompilerSession()
+
+
+@pytest.fixture(scope="module")
+def inline_pd_df() -> pd.DataFrame:
+    df = pd.DataFrame(
+        {
+            "int1": pd.Series([1, 2, 3], dtype="Int64"),
+            "int2": pd.Series([-10, 20, 30], dtype="Int64"),
+            "bools": pd.Series([True, None, False], dtype="boolean"),
+            "strings": pd.Series(["b", "aa", "ccc"], dtype="string[pyarrow]"),
+            # "intLists": pd.Series(
+            #     [[1, 2, 3], [4, 5, 6, 7], None],
+            #     dtype=pd.ArrowDtype(pa.list_(pa.int64())),
+            # ),
+        },
+    )
+    # TODO: use a more complex index.
+    df.index = df.index.astype("Int64")
+    return df
diff --git a/tests/unit/core/compile/sqlglot/sql_compiler_session.py b/tests/unit/core/compile/sqlglot/sql_compiler_session.py
new file mode 100644
index 0000000000..326cc747fb
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/sql_compiler_session.py
@@ -0,0 +1,98 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import dataclasses
+from typing import Optional, Union
+from unittest import mock
+import weakref
+
+from google.cloud import bigquery
+
+import bigframes
+import bigframes.core.blocks
+import bigframes.core.compile.sqlglot as sqlglot_compiler
+import bigframes.dataframe
+import bigframes.session.clients
+import bigframes.session.executor
+import bigframes.session.metrics
+
+
+# Supports to_sql only; export_gbq, export_gcs, dry_run, peek, head,
+# get_row_count and cached are not supported.
+@dataclasses.dataclass
+class SQLCompiler(bigframes.session.executor.Executor):
+    compiler = sqlglot_compiler.SQLGlotCompiler()
+
+    def to_sql(
+        self,
+        array_value: bigframes.core.ArrayValue,
+        offset_column: Optional[str] = None,
+        ordered: bool = True,
+        enable_cache: bool = False,
+    ) -> str:
+        if offset_column:
+            array_value, internal_offset_col = array_value.promote_offsets()
+        # TODO: honor enable_cache by replacing cached subtrees before compiling.
+        exp = self.compiler.compile_sql(array_value.node, ordered=ordered)
+        return exp.sql(dialect="bigquery", pretty=False)
+
+
+class SQLCompilerSession(bigframes.session.Session):
+    """A Session that only compiles SQL locally and never contacts BigQuery."""
+
+    def __init__(self):
+        self._location = None  # type: ignore
+        self._bq_kms_key_name = None  # type: ignore
+        self._clients_provider = None  # type: ignore
+        self.ibis_client = None  # type: ignore
+        self._bq_connection = None  # type: ignore
+        self._skip_bq_connection_check = True
+        self._session_id: str = "sqlglot_test_session"
+        self._objects: list[
+            weakref.ReferenceType[
+                Union[
+                    bigframes.core.indexes.Index,
+                    bigframes.series.Series,
+                    bigframes.dataframe.DataFrame,
+                ]
+            ]
+        ] = []
+        self._strictly_ordered: bool = True
+        self._allow_ambiguity = False  # type: ignore
+        self._default_index_type = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
+        self._metrics = bigframes.session.metrics.ExecutionMetrics()
+        self._remote_function_session = None  # type: ignore
+        self._temp_storage_manager = None  # type: ignore
+        self._executor = SQLCompiler()
+        self._loader = None  # type: ignore
+
+        self._clients_provider = bigframes.session.clients.ClientsProvider(
+            project="compile.sqlglot",
+            location="unit",
+        )
+        self._clients_provider._bqclient = mock.create_autospec(spec=bigquery.Client)
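+
+    # Usage sketch (mirrors the unit tests; no BigQuery requests are issued):
+    #
+    #   >>> import pandas as pd
+    #   >>> session = SQLCompilerSession()
+    #   >>> df = session.read_pandas(pd.DataFrame({"a": [1, 2]}))
+    #   >>> print(df.sql)  # compiled via SQLGlotCompiler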
+
+    def read_pandas(self, pandas_dataframe, write_engine="default"):
+        # Override read_pandas to always keep data local-only.
+        local_block = bigframes.core.blocks.Block.from_local(pandas_dataframe, self)
+        return bigframes.dataframe.DataFrame(local_block)
diff --git a/tests/unit/core/compile/sqlglot/test_compiler.py b/tests/unit/core/compile/sqlglot/test_compiler.py
new file mode 100644
index 0000000000..0572c8f2bc
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/test_compiler.py
@@ -0,0 +1,56 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pandas as pd
+
+import bigframes
+import bigframes.pandas as bpd
+
+
+def test_compile_local(
+    inline_pd_df: pd.DataFrame, sql_compiler_session: bigframes.Session
+):
+    bf_df = bpd.DataFrame(inline_pd_df, session=sql_compiler_session)
+    expected_sql = (
+        "SELECT `column_0` AS `int1`, `column_1` AS `int2`, `column_2` AS `bools`, "
+        "`column_3` AS `strings`, `bfuid_col_1` AS `bfuid_col_2` FROM UNNEST(ARRAY<"
+        "STRUCT<column_0 INT64, column_1 INT64, column_2 BOOLEAN, column_3 STRING, "
+        "bfuid_col_1 INT64>>[(1, -10, TRUE, 'b', 0), "
+        "(2, 20, CAST(NULL AS BOOLEAN), 'aa', 1), (3, 30, FALSE, 'ccc', 2)]) "
+        "ORDER BY `bfuid_col_2` ASC NULLS LAST"
+    )
+    assert bf_df.sql == expected_sql
+
+
+def test_compile_add(
+    inline_pd_df: pd.DataFrame, sql_compiler_session: bigframes.Session
+):
+    bf_df = bpd.DataFrame(inline_pd_df, session=sql_compiler_session)
+    bf_add = bf_df["int1"] + bf_df["int2"]
+    expected_sql = (
+        "SELECT `bfuid_col_4` AS `bigframes_unnamed_column`, "
+        "`bfuid_col_6` AS `bfuid_col_7` FROM UNNEST(ARRAY<"
+        "STRUCT<bfuid_col_4 INT64, bfuid_col_5 INT64, bfuid_col_6 INT64>>"
+        "[(1, -10, 0), (2, 20, 1), (3, 30, 2)]) "
+        "ORDER BY `bfuid_col_7` ASC NULLS LAST"
+    )
+    assert bf_add.sql == expected_sql
+
+
+def test_compile_order_by(
+    inline_pd_df: pd.DataFrame, sql_compiler_session: bigframes.Session
+):
+    bf_df = bpd.DataFrame(inline_pd_df, session=sql_compiler_session)
+    bf_sort_values = bf_df.sort_values("strings")
+    expected_sql = (
+        "SELECT `column_0` AS `int1`, `column_1` AS `int2`, `column_2` AS `bools`, "
+        "`column_3` AS `strings`, `bfuid_col_1` AS `bfuid_col_2` FROM UNNEST(ARRAY<"
+        "STRUCT<column_0 INT64, column_1 INT64, column_2 BOOLEAN, column_3 STRING, "
+        "bfuid_col_1 INT64>>[(1, -10, TRUE, 'b', 0), "
+        "(2, 20, CAST(NULL AS BOOLEAN), 'aa', 1), (3, 30, FALSE, 'ccc', 2)]) "
+        "ORDER BY `strings` ASC NULLS LAST, `bfuid_col_2` ASC NULLS LAST"
+    )
+    assert bf_sort_values.sql == expected_sql
+
+
+def test_compile_filter(
+    inline_pd_df: pd.DataFrame, sql_compiler_session: bigframes.Session
+):
+    bf_df = bpd.DataFrame(inline_pd_df, session=sql_compiler_session)
+    bf_filter = bf_df[bf_df["int2"] >= 1]
+    expected_sql = (
+        "SELECT `column_0` AS `int1`, `column_1` AS `int2`, `column_2` AS `bools`, "
+        "`column_3` AS `strings`, `bfuid_col_1` AS `bfuid_col_2` FROM UNNEST(ARRAY<"
+        "STRUCT<column_0 INT64, column_1 INT64, column_2 BOOLEAN, column_3 STRING, "
+        "bfuid_col_1 INT64>>[(1, -10, TRUE, 'b', 0), "
+        "(2, 20, CAST(NULL AS BOOLEAN), 'aa', 1), (3, 30, FALSE, 'ccc', 2)]) "
+        "ORDER BY `strings` ASC NULLS LAST, `bfuid_col_2` ASC NULLS LAST"
+    )
+    assert bf_filter.sql == expected_sql
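+
+
+# TODO: the expected SQL above appears copied from test_compile_order_by; once
+# FilterNode compiles a WHERE clause, assert a predicate such as
+# "WHERE `int2` >= 1" instead (sketch, not the current compiler output).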
diff --git a/tests/unit/core/compile/sqlglot/test_sqlglot_types.py b/tests/unit/core/compile/sqlglot/test_sqlglot_types.py
new file mode 100644
index 0000000000..562e8c80f4
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/test_sqlglot_types.py
@@ -0,0 +1,66 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pandas as pd
+import pyarrow as pa
+
+import bigframes.core.compile.sqlglot.sqlglot_types as sgt
+import bigframes.dtypes as dtypes
+
+
+def test_from_bigframes_simple_dtypes():
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.INT_DTYPE) == "INT64"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.FLOAT_DTYPE) == "FLOAT64"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.STRING_DTYPE) == "STRING"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.BOOL_DTYPE) == "BOOLEAN"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.DATE_DTYPE) == "DATE"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.TIME_DTYPE) == "TIME"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.DATETIME_DTYPE) == "DATETIME"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.TIMESTAMP_DTYPE) == "TIMESTAMP"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.BYTES_DTYPE) == "BYTES"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.NUMERIC_DTYPE) == "NUMERIC"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.BIGNUMERIC_DTYPE) == "BIGNUMERIC"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.JSON_DTYPE) == "JSON"
+    assert sgt.SQLGlotType.from_bigframes_dtype(dtypes.GEO_DTYPE) == "GEOGRAPHY"
+
+
+def test_from_bigframes_struct_dtypes():
+    fields = [pa.field("int_col", pa.int64()), pa.field("bool_col", pa.bool_())]
+    struct_type = pd.ArrowDtype(pa.struct(fields))
+    expected = "STRUCT<int_col INT64, bool_col BOOLEAN>"
+    assert sgt.SQLGlotType.from_bigframes_dtype(struct_type) == expected
+
+
+def test_from_bigframes_array_dtypes():
+    int_array_type = pd.ArrowDtype(pa.list_(pa.int64()))
+    assert sgt.SQLGlotType.from_bigframes_dtype(int_array_type) == "ARRAY<INT64>"
+
+    string_array_type = pd.ArrowDtype(pa.list_(pa.string()))
+    assert sgt.SQLGlotType.from_bigframes_dtype(string_array_type) == "ARRAY<STRING>"
+
+    # TODO: test JSON, GEOGRAPHY etc.
+
+
+def test_from_bigframes_struct_array_dtypes():
+    fields = [
+        pa.field("string_col", pa.string()),
+        pa.field("date_col", pa.date32()),
+        pa.field("array_col", pa.list_(pa.timestamp("us"))),
+    ]
+    array_type = pd.ArrowDtype(pa.list_(pa.struct(fields)))
+
+    expected = (
+        "ARRAY<STRUCT<string_col STRING, date_col DATE, array_col ARRAY<DATETIME>>>"
+    )
+    assert sgt.SQLGlotType.from_bigframes_dtype(array_type) == expected