diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f40dfc0071..379c661179 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -467,6 +467,36 @@ def _validate_result_schema(self, result_df: pd.DataFrame):
                 f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}"
             )
 
+    def to_arrow(
+        self,
+        *,
+        ordered: bool = True,
+    ) -> Tuple[pa.Table, bigquery.QueryJob]:
+        """Run query and download results as a pyarrow Table."""
+        # pa.Table.from_pandas puts index columns last, so update the expression to match.
+        expr = self.expr.select_columns(
+            list(self.value_columns) + list(self.index_columns)
+        )
+
+        _, query_job = self.session._query_to_destination(
+            self.session._to_sql(expr, ordered=ordered),
+            list(self.index_columns),
+            api_name="cached",
+            do_clustering=False,
+        )
+        results_iterator = query_job.result()
+        pa_table = results_iterator.to_arrow()
+
+        pa_index_labels = []
+        for index_level, index_label in enumerate(self._index_labels):
+            if isinstance(index_label, str):
+                pa_index_labels.append(index_label)
+            else:
+                pa_index_labels.append(f"__index_level_{index_level}__")
+
+        pa_table = pa_table.rename_columns(list(self.column_labels) + pa_index_labels)
+        return pa_table, query_job
+
     def to_pandas(
         self,
         max_download_size: Optional[int] = None,
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 75420ca957..274e176dd5 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -44,6 +44,7 @@
 import numpy
 import pandas
 import pandas.io.formats.format
+import pyarrow
 import tabulate
 
 import bigframes
@@ -1183,6 +1184,34 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:
 
         return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp()))
 
+    def to_arrow(
+        self,
+        *,
+        ordered: Optional[bool] = None,
+    ) -> pyarrow.Table:
+        """Write DataFrame to an Arrow table.
+
+        Args:
+            ordered (bool, default None):
+                Determines whether the resulting Arrow table will be deterministically ordered.
+                In some cases, unordered may result in a faster-executing query. If set to a value
+                other than None, this overrides the Session default.
+
+        Returns:
+            pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame.
+        """
+        warnings.warn(
+            "to_arrow is in preview. Types and unnamed or duplicate-name columns may change in the future.",
+            category=bigframes.exceptions.PreviewWarning,
+        )
+
+        self._optimize_query_complexity()
+        pa_table, query_job = self._block.to_arrow(
+            ordered=ordered if ordered is not None else self._session._strictly_ordered,
+        )
+        self._set_internal_query_job(query_job)
+        return pa_table
+
     def to_pandas(
         self,
         max_download_size: Optional[int] = None,
diff --git a/samples/polars/create_polars_df_with_to_arrow_test.py b/samples/polars/create_polars_df_with_to_arrow_test.py
new file mode 100644
index 0000000000..acb79f23c8
--- /dev/null
+++ b/samples/polars/create_polars_df_with_to_arrow_test.py
@@ -0,0 +1,40 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def test_create_polars_df() -> None:
+    # [START bigquery_dataframes_to_polars]
+    import polars
+
+    import bigframes.enums
+    import bigframes.pandas as bpd
+
+    bf_df = bpd.read_gbq_table(
+        "bigquery-public-data.usa_names.usa_1910_current",
+        # Setting index_col to either a unique column or NULL will give the
+        # best performance.
+        index_col=bigframes.enums.DefaultIndexKind.NULL,
+    )
+    # TODO(developer): Do some analysis using BigQuery DataFrames.
+    # ...
+
+    # Run the query and download the results as an Arrow table to convert into
+    # a Polars DataFrame. Use ordered=False if your Polars analysis is OK with
+    # non-deterministic ordering.
+    arrow_table = bf_df.to_arrow(ordered=False)
+    polars_df = polars.from_arrow(arrow_table)
+    # [END bigquery_dataframes_to_polars]
+
+    assert polars_df.shape == bf_df.shape
+    assert polars_df["number"].sum() == bf_df["number"].sum()
diff --git a/samples/polars/noxfile.py b/samples/polars/noxfile.py
new file mode 100644
index 0000000000..c36d5f2d81
--- /dev/null
+++ b/samples/polars/noxfile.py
@@ -0,0 +1,292 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import glob
+import os
+from pathlib import Path
+import sys
+from typing import Callable, Dict, Optional
+
+import nox
+
+# WARNING - WARNING - WARNING - WARNING - WARNING
+# WARNING - WARNING - WARNING - WARNING - WARNING
+# DO NOT EDIT THIS FILE EVER!
+# WARNING - WARNING - WARNING - WARNING - WARNING
+# WARNING - WARNING - WARNING - WARNING - WARNING
+
+BLACK_VERSION = "black==22.3.0"
+ISORT_VERSION = "isort==5.10.1"
+
+# Copy `noxfile_config.py` to your directory and modify it instead.
+
+# `TEST_CONFIG` dict is a configuration hook that allows users to
+# modify the test configurations. The values here should be in sync
+# with `noxfile_config.py`. Users will copy `noxfile_config.py` into
+# their directory and modify it.
+
+TEST_CONFIG = {
+    # You can opt out from the test for specific Python versions.
+    "ignored_versions": [],
+    # Old samples are opted out of enforcing Python type hints.
+    # All new samples should feature them.
+    "enforce_type_hints": False,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in to using a
+    # build-specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # If you need to use a specific version of pip,
+    # change pip_version_override to the string representation
+    # of the version number, for example, "20.2.4"
+    "pip_version_override": None,
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+    "envs": {},
+}
+
+
+try:
+    # Ensure we can import noxfile_config in the project's directory.
+    sys.path.append(".")
+    from noxfile_config import TEST_CONFIG_OVERRIDE
+except ImportError as e:
+    print("No user noxfile_config found: detail: {}".format(e))
+    TEST_CONFIG_OVERRIDE = {}
+
+# Update the TEST_CONFIG with the user supplied values.
+TEST_CONFIG.update(TEST_CONFIG_OVERRIDE)
+
+
+def get_pytest_env_vars() -> Dict[str, str]:
+    """Returns a dict for pytest invocation."""
+    ret = {}
+
+    # Override the GCLOUD_PROJECT and the alias.
+    env_key = TEST_CONFIG["gcloud_project_env"]
+    # This should error out if not set.
+    ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key]
+
+    # Apply user supplied envs.
+    ret.update(TEST_CONFIG["envs"])
+    return ret
+
+
+# DO NOT EDIT - automatically generated.
+# All versions used to test samples.
+ALL_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
+
+# Any default versions that should be ignored.
+IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"]
+
+TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS])
+
+INSTALL_LIBRARY_FROM_SOURCE = os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False) in (
+    "True",
+    "true",
+)
+
+# Error if a python version is missing
+nox.options.error_on_missing_interpreters = True
+
+#
+# Style Checks
+#
+
+
+# Linting with flake8.
+#
+# We ignore the following rules:
+#   E203: whitespace before ':'
+#   E266: too many leading '#' for block comment
+#   E501: line too long
+#   I202: Additional newline in a section of imports
+#
+# We also need to specify the rules which are ignored by default:
+# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121']
+FLAKE8_COMMON_ARGS = [
+    "--show-source",
+    "--builtin=gettext",
+    "--max-complexity=20",
+    "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py",
+    "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202",
+    "--max-line-length=88",
+]
+
+
+@nox.session
+def lint(session: nox.sessions.Session) -> None:
+    if not TEST_CONFIG["enforce_type_hints"]:
+        session.install("flake8")
+    else:
+        session.install("flake8", "flake8-annotations")
+
+    args = FLAKE8_COMMON_ARGS + [
+        ".",
+    ]
+    session.run("flake8", *args)
+
+
+#
+# Black
+#
+
+
+@nox.session
+def blacken(session: nox.sessions.Session) -> None:
+    """Run black. Format code to uniform standard."""
+    session.install(BLACK_VERSION)
+    python_files = [path for path in os.listdir(".") if path.endswith(".py")]
+
+    session.run("black", *python_files)
+
+
+#
+# format = isort + black
+#
+
+
+@nox.session
+def format(session: nox.sessions.Session) -> None:
+    """
+    Run isort to sort imports. Then run black
+    to format code to uniform standard.
+    """
+    session.install(BLACK_VERSION, ISORT_VERSION)
+    python_files = [path for path in os.listdir(".") if path.endswith(".py")]
+
+    # Use the --fss option to sort imports using strict alphabetical order.
+    # See https://pycqa.github.io/isort/docs/configuration/options.html#force-sort-within-sections
+    session.run("isort", "--fss", *python_files)
+    session.run("black", *python_files)
+
+
+#
+# Sample Tests
+#
+
+
+PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"]
+
+
+def _session_tests(
+    session: nox.sessions.Session, post_install: Optional[Callable] = None
+) -> None:
+    """Runs py.test for a particular project."""
+    # Check for the presence of tests.
+    test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob(
+        "**/test_*.py", recursive=True
+    )
+    test_list.extend(glob.glob("**/tests", recursive=True))
+
+    if len(test_list) == 0:
+        print("No tests found, skipping directory.")
+        return
+
+    if TEST_CONFIG["pip_version_override"]:
+        pip_version = TEST_CONFIG["pip_version_override"]
+        session.install(f"pip=={pip_version}")
+    concurrent_args = []
+    if os.path.exists("requirements.txt"):
+        if os.path.exists("constraints.txt"):
+            session.install("-r", "requirements.txt", "-c", "constraints.txt")
+        else:
+            session.install("-r", "requirements.txt")
+        with open("requirements.txt") as rfile:
+            packages = rfile.read()
+
+    if os.path.exists("requirements-test.txt"):
+        if os.path.exists("constraints-test.txt"):
+            session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt")
+        else:
+            session.install("-r", "requirements-test.txt")
+        with open("requirements-test.txt") as rtfile:
+            packages += rtfile.read()
+
+    if INSTALL_LIBRARY_FROM_SOURCE:
+        session.install("-e", _get_repo_root())
+
+    if post_install:
+        post_install(session)
+
+    if "pytest-parallel" in packages:
+        concurrent_args.extend(["--workers", "auto", "--tests-per-worker", "auto"])
+    elif "pytest-xdist" in packages:
+        concurrent_args.extend(["-n", "auto"])
+
+    session.run(
+        "pytest",
+        *(PYTEST_COMMON_ARGS + session.posargs + concurrent_args),
+        # Pytest will return 5 when no tests are collected. This can happen
+        # on travis where slow and flaky tests are excluded.
+        # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html
+        success_codes=[0, 5],
+        env=get_pytest_env_vars(),
+    )
+
+
+@nox.session(python=ALL_VERSIONS)
+def py(session: nox.sessions.Session) -> None:
+    """Runs py.test for a sample using the specified version of Python."""
+    if session.python in TESTED_VERSIONS:
+        _session_tests(session)
+    else:
+        session.skip(
+            "SKIPPED: {} tests are disabled for this sample.".format(session.python)
+        )
+
+
+#
+# Readmegen
+#
+
+
+def _get_repo_root() -> Optional[str]:
+    """Returns the root folder of the project."""
+    # Get root of this repository. Assume we don't have directories nested deeper than 10 items.
+    p = Path(os.getcwd())
+    for i in range(10):
+        if p is None:
+            break
+        if Path(p / ".git").exists():
+            return str(p)
+        # .git is not available in repos cloned via Cloud Build.
+        # setup.py is always in the library's root, so use that instead.
+        # https://github.com/googleapis/synthtool/issues/792
+        if Path(p / "setup.py").exists():
+            return str(p)
+        p = p.parent
+    raise Exception("Unable to detect repository root.")
+
+
+GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")])
+
+
+@nox.session
+@nox.parametrize("path", GENERATED_READMES)
+def readmegen(session: nox.sessions.Session, path: str) -> None:
+    """(Re-)generates the readme for a sample."""
+    session.install("jinja2", "pyyaml")
+    dir_ = os.path.dirname(path)
+
+    if os.path.exists(os.path.join(dir_, "requirements.txt")):
+        session.install("-r", os.path.join(dir_, "requirements.txt"))
+
+    in_file = os.path.join(dir_, "README.rst.in")
+    session.run(
+        "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file
+    )
diff --git a/samples/polars/noxfile_config.py b/samples/polars/noxfile_config.py
new file mode 100644
index 0000000000..91238e9e2f
--- /dev/null
+++ b/samples/polars/noxfile_config.py
@@ -0,0 +1,42 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be imported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    "ignored_versions": ["2.7", "3.7", "3.8"],
+    # Old samples are opted out of enforcing Python type hints.
+    # All new samples should feature them.
+    "enforce_type_hints": True,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in to using a
+    # build-specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT",
+    # If you need to use a specific version of pip,
+    # change pip_version_override to the string representation
+    # of the version number, for example, "20.2.4"
+    "pip_version_override": None,
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+ "envs": {}, +} diff --git a/samples/polars/requirements-test.txt b/samples/polars/requirements-test.txt new file mode 100644 index 0000000000..beca2e44d9 --- /dev/null +++ b/samples/polars/requirements-test.txt @@ -0,0 +1,3 @@ +# samples/snippets should be runnable with no "extras" +google-cloud-testutils==1.4.0 +pytest==8.2.0 diff --git a/samples/polars/requirements.txt b/samples/polars/requirements.txt new file mode 100644 index 0000000000..e3f886e7e3 --- /dev/null +++ b/samples/polars/requirements.txt @@ -0,0 +1,3 @@ +bigframes==1.6.0 +polars==0.20.31 +pyarrow==15.0.0 diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 8adbea88e4..ab1fdceae5 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -132,6 +132,67 @@ def test_sql_executes_and_includes_named_multiindex( ) +def test_to_arrow(scalars_df_default_index, scalars_pandas_df_default_index): + """Verify to_arrow() APIs returns the expected data.""" + expected = pa.Table.from_pandas( + scalars_pandas_df_default_index.drop(columns=["geography_col"]) + ) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="to_arrow", + ): + actual = scalars_df_default_index.drop(columns=["geography_col"]).to_arrow() + + # Make string_col match type. Otherwise, pa.Table.from_pandas uses + # LargeStringArray. LargeStringArray is unnecessary because our strings are + # less than 2 GB. + expected = expected.set_column( + expected.column_names.index("string_col"), + pa.field("string_col", pa.string()), + expected["string_col"].cast(pa.string()), + ) + + # Note: the final .equals assertion covers all these checks, but these + # finer-grained assertions are easier to debug. + assert actual.column_names == expected.column_names + for column in actual.column_names: + assert actual[column].equals(expected[column]) + assert actual.equals(expected) + + +def test_to_arrow_multiindex(scalars_df_index, scalars_pandas_df_index): + scalars_df_multiindex = scalars_df_index.set_index(["string_col", "int64_col"]) + scalars_pandas_df_multiindex = scalars_pandas_df_index.set_index( + ["string_col", "int64_col"] + ) + expected = pa.Table.from_pandas( + scalars_pandas_df_multiindex.drop(columns=["geography_col"]) + ) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="to_arrow", + ): + actual = scalars_df_multiindex.drop(columns=["geography_col"]).to_arrow() + + # Make string_col match type. Otherwise, pa.Table.from_pandas uses + # LargeStringArray. LargeStringArray is unnecessary because our strings are + # less than 2 GB. + expected = expected.set_column( + expected.column_names.index("string_col"), + pa.field("string_col", pa.string()), + expected["string_col"].cast(pa.string()), + ) + + # Note: the final .equals assertion covers all these checks, but these + # finer-grained assertions are easier to debug. + assert actual.column_names == expected.column_names + for column in actual.column_names: + assert actual[column].equals(expected[column]) + assert actual.equals(expected) + + def test_to_pandas_w_correct_dtypes(scalars_df_default_index): """Verify to_pandas() APIs returns the expected dtypes.""" actual = scalars_df_default_index.to_pandas().dtypes