feat: support array output in remote_function #1057


Merged Jan 15, 2025 (54 commits)

Commits
82307b1
feat: support array output in `remote_function`
shobsi Oct 7, 2024
8962270
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 8, 2024
5049db6
add multiindex test
shobsi Oct 8, 2024
bedb16c
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 10, 2024
f65e84f
move array type conversion to bigquery module, test multiindex
shobsi Oct 10, 2024
8fd0dc9
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 14, 2024
b785bd5
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 15, 2024
eb555b4
add `bigframes.bigquery.json_extract_string_array`, support int and s…
shobsi Oct 16, 2024
5c48d31
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 16, 2024
5607974
increase cleanup rate
shobsi Oct 16, 2024
1987bac
update input and output types doc
shobsi Oct 17, 2024
440cff0
support array output in DataFrame.apply
shobsi Oct 17, 2024
0f86c52
support read_gbq_function on a remote function created for array output
shobsi Oct 18, 2024
bb0d777
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 18, 2024
ea667fd
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 21, 2024
7e746d2
fix the json_set after variable renaming
shobsi Oct 21, 2024
18cbea1
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 22, 2024
b8c53d5
add tests for output_type in read_gbq_function
shobsi Oct 22, 2024
285bc93
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 22, 2024
5725e8f
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 23, 2024
2485aa3
temporarily exclude system 3.9 tests and include 3.10 and 3.11
shobsi Oct 23, 2024
f4b2f15
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 23, 2024
1f4ad12
Revert "temporarily exclude system 3.9 tests and include 3.10 and 3.11"
shobsi Oct 23, 2024
fe010cb
add more info in the unexpected exception
shobsi Oct 23, 2024
86fe316
more debug info
shobsi Oct 23, 2024
00a7194
use unique routine name across tests
shobsi Oct 24, 2024
75b02c4
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 24, 2024
1baf7af
Revert "more debug info"
shobsi Oct 24, 2024
fda3d26
Revert "add more info in the unexpected exception"
shobsi Oct 24, 2024
8621a5d
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 24, 2024
5a910d2
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Oct 29, 2024
32f4f97
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Nov 7, 2024
a4d70e5
support array output in binary remote function operations
shobsi Nov 26, 2024
b03e2cd
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Nov 26, 2024
c02d758
support array output in nary remote function operations
shobsi Nov 26, 2024
c375dfd
preserve array output type in function description to avoid explit ou…
shobsi Dec 6, 2024
b539134
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Dec 6, 2024
4d925a5
fix one failing read_gbq_function test
shobsi Dec 9, 2024
5244593
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Dec 9, 2024
18e6ddc
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Dec 10, 2024
3a6b10d
make test parameterization order deterministic
shobsi Dec 10, 2024
2f7a91b
fix sorting of types for mypy
shobsi Dec 10, 2024
5f23aa2
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Dec 11, 2024
41b4dfc
remove test parameterization with sorting inside
shobsi Dec 11, 2024
18d08d1
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Dec 11, 2024
d65a1e3
include partial ordering mode testing for read_gbq_function
shobsi Dec 13, 2024
3b894a8
add remote function array out test in partial ordering mode
shobsi Dec 13, 2024
3ec74d4
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Dec 20, 2024
1dfce56
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Jan 3, 2025
28e94aa
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Jan 13, 2025
7beceb3
avoid repr-eval for output type serialization/deserialization
shobsi Jan 15, 2025
d4dca60
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Jan 15, 2025
84f1aa7
remove unsupported scenarios system tests, use common exception for u…
shobsi Jan 15, 2025
bd9248d
Merge remote-tracking branch 'refs/remotes/github/main' into shobs-rf…
shobsi Jan 15, 2025
bigframes/clients.py (1 addition, 0 deletions)
@@ -21,6 +21,7 @@
from typing import cast, Optional

import google.api_core.exceptions
import google.api_core.retry
from google.cloud import bigquery_connection_v1, resourcemanager_v3
from google.iam.v1 import iam_policy_pb2, policy_pb2

bigframes/core/compile/ibis_types.py (15 additions, 2 deletions)
@@ -14,6 +14,7 @@
from __future__ import annotations

import textwrap
import typing
from typing import Any, cast, Dict, Iterable, Optional, Tuple, Union
import warnings

@@ -22,7 +23,7 @@
import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
from bigframes_vendored.ibis.expr.datatypes.core import (
dtype as python_type_to_bigquery_type,
dtype as python_type_to_ibis_type,
)
import bigframes_vendored.ibis.expr.types as ibis_types
import geopandas as gpd # type: ignore
@@ -472,12 +473,24 @@ class UnsupportedTypeError(ValueError):
def __init__(self, type_, supported_types):
self.type = type_
self.supported_types = supported_types
super().__init__(
f"'{type_}' is not one of the supported types {supported_types}"
)


def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
if t not in bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES:
raise UnsupportedTypeError(t, bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES)
return python_type_to_bigquery_type(t)
return python_type_to_ibis_type(t)


def ibis_array_output_type_from_python_type(t: type) -> ibis_dtypes.DataType:
array_of = typing.get_args(t)[0]
if array_of not in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES:
raise UnsupportedTypeError(
array_of, bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
)
return python_type_to_ibis_type(t)


def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
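The new `ibis_array_output_type_from_python_type` above leans on `typing.get_args` to pull the element type out of `list[T]` before validating it. A standalone sketch of that introspection — the `SUPPORTED` set and function name here are illustrative stand-ins for `RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES` and the real helper:

```python
import typing

# Illustrative mirror of bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
SUPPORTED = {bool, float, int, str}


def validate_array_output(t: type) -> type:
    """Return the element type of list[T], raising for unsupported T."""
    if typing.get_origin(t) is not list:
        raise TypeError(f"{t} is not a list type")
    element_type = typing.get_args(t)[0]
    if element_type not in SUPPORTED:
        raise TypeError(f"{element_type} is not a supported array element type")
    return element_type
```

Calling this with `list[int]` yields `int`, while `list[bytes]` raises — analogous to how the real helper raises `UnsupportedTypeError`.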
bigframes/dataframe.py (13 additions, 0 deletions)
@@ -4014,6 +4014,19 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
ops.NaryRemoteFunctionOp(func=func), series_list[1:]
)
result_series.name = None

# if the output is an array, reconstruct it from the json serialized
# string form
if bigframes.dtypes.is_array_like(func.output_dtype):
[Review comment — Contributor]: Do we actually handle any array-like dtype?

[Author reply — shobsi]: Um, in this PR we are looking to support types like list[int] on the output side? Or I didn't get you?

import bigframes.bigquery as bbq

result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
func.output_dtype.pyarrow_dtype.value_type
)
result_series = bbq.json_extract_string_array(
result_series, value_dtype=result_dtype
)

return result_series

# Per-column apply
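The reconstruction step above depends on the cloud function returning the array as a JSON-serialized string, which `bbq.json_extract_string_array` then unpacks server-side with the intended value dtype. A client-side analogue of that round trip — the helper name is hypothetical, for illustration only:

```python
import json


def parse_json_array(serialized: str, value_type: type) -> list:
    """Parse a JSON-serialized array string and cast each element to the
    intended output type, mimicking the server-side extraction step."""
    return [value_type(v) for v in json.loads(serialized)]
```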
bigframes/dtypes.py (7 additions, 0 deletions)
@@ -701,6 +701,13 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}

# Support array output types in BigQuery DataFrames remote functions even though
# it is not currently (2024-10-06) supported in BigQuery remote functions.
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
# TODO(b/284515241): remove this special handling when BigQuery remote functions
# support array.
RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES = {bool, float, int, str}

RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
"BOOLEAN",
"BOOL",
bigframes/functions/_remote_function_client.py (11 additions, 17 deletions)
@@ -95,6 +95,7 @@ def create_bq_remote_function(
endpoint,
bq_function_name,
max_batching_rows,
metadata,
):
"""Create a BigQuery remote function given the artifacts of a user defined
function and the http endpoint of a corresponding cloud function."""
@@ -120,9 +121,14 @@
"max_batching_rows": max_batching_rows,
}

if metadata:
# We are using the description field to store this structured
# bigframes specific metadata for the lack of a better option
remote_function_options["description"] = metadata

remote_function_options_str = ", ".join(
[
f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
f"{key}='{val}'" if isinstance(val, str) else f"{key}={val}"
for key, val in remote_function_options.items()
if val is not None
]
@@ -200,14 +206,7 @@ def generate_cloud_function_code(
package_requirements=None,
is_row_processor=False,
):
"""Generate the cloud function code for a given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Types of the output scalar as a BigQuery SQL data type name.
"""
"""Generate the cloud function code for a given user defined function."""

# requirements.txt
if package_requirements:
@@ -240,14 +239,7 @@ def create_cloud_function(
memory_mib=1024,
ingress_settings="all",
):
"""Create a cloud function from the given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Types of the output scalar as a BigQuery SQL data type name.
"""
"""Create a cloud function from the given user defined function."""

# Build and deploy folder structure containing cloud function
with tempfile.TemporaryDirectory() as directory:
@@ -394,6 +386,7 @@ def provision_bq_remote_function(
cloud_function_vpc_connector,
cloud_function_memory_mib,
cloud_function_ingress_settings,
bq_metadata,
):
"""Provision a BigQuery remote function."""
# Augment user package requirements with any internal package
@@ -473,6 +466,7 @@ def provision_bq_remote_function(
cf_endpoint,
remote_function_name,
max_batching_rows,
bq_metadata,
)

created_new = True
bigframes/functions/_remote_function_session.py (34 additions, 9 deletions)
@@ -34,6 +34,7 @@

import bigframes_vendored.constants as constants
import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.operations.udf as ibis_udf
import cloudpickle
import google.api_core.exceptions
@@ -167,12 +168,19 @@ def remote_function(
`$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`.

Args:
input_types (None, type, or sequence(type)):
input_types (type or sequence(type), Optional):
For scalar user defined function it should be the input type or
sequence of input types. For row processing user defined function,
type `Series` should be specified.
output_type (Optional[type]):
Data type of the output in the user defined function.
sequence of input types. The supported scalar input types are
`bool`, `bytes`, `float`, `int`, `str`. For row processing user
defined function (i.e. functions that receive a single input
representing a row in form of a Series), type `Series` should be
specified.
output_type (type, Optional):
Data type of the output in the user defined function. If the
user defined function returns an array, then `list[type]` should
be specified. The supported output types are `bool`, `bytes`,
`float`, `int`, `str`, `list[bool]`, `list[float]`, `list[int]`
and `list[str]`.
session (bigframes.Session, Optional):
BigQuery DataFrames session to use for getting default project,
dataset and BigQuery connection.
@@ -497,6 +505,24 @@ def try_delattr(attr):
try_delattr("is_row_processor")
try_delattr("ibis_node")

# resolve the output type that can be supported in the bigframes,
# ibis, BQ remote functions and cloud functions integration
ibis_output_type_for_bqrf = ibis_signature.output_type
bqrf_metadata = None
if isinstance(ibis_signature.output_type, ibis_dtypes.Array):
# TODO(b/284515241): remove this special handling to support
# array output types once BQ remote functions support ARRAY.
# Until then, use json serialized strings at the cloud function
# and BQ level, and parse that to the intended output type at
# the bigframes level.
ibis_output_type_for_bqrf = ibis_dtypes.String()
bqrf_metadata = _utils.get_bigframes_metadata(
python_output_type=output_type
)
bqrf_output_type = third_party_ibis_bqtypes.BigQueryType.from_ibis(
ibis_output_type_for_bqrf
)

(
rf_name,
cf_name,
@@ -508,9 +534,7 @@
for type_ in ibis_signature.input_types
if type_ is not None
),
output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis(
ibis_signature.output_type
),
output_type=bqrf_output_type,
reuse=reuse,
name=name,
package_requirements=packages,
@@ -521,6 +545,7 @@
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_ingress_settings=cloud_function_ingress_settings,
bq_metadata=bqrf_metadata,
)

# TODO(shobs): Find a better way to support udfs with param named "name".
@@ -541,7 +566,7 @@
name=rf_name,
catalog=dataset_ref.project,
database=dataset_ref.dataset_id,
signature=(ibis_signature.input_types, ibis_signature.output_type),
signature=(ibis_signature.input_types, ibis_output_type_for_bqrf),
) # type: ignore
func.bigframes_cloud_function = (
remote_function_client.get_cloud_function_fully_qualified_name(cf_name)
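The fallback above — declare the remote function's output as STRING and stash the intended array type in the routine's description metadata — can be sketched in isolation. The function name and return convention here are hypothetical; the real code builds an ibis `String()` type and delegates serialization to `_utils.get_bigframes_metadata`:

```python
import json
import typing


def resolve_output_for_bqrf(python_output_type: type):
    """If the logical output is an array (unsupported by BQ remote
    functions at the time of this PR), fall back to STRING and record
    the intended element type as JSON metadata; otherwise no override."""
    if typing.get_origin(python_output_type) is list:
        element = typing.get_args(python_output_type)[0]
        metadata = json.dumps(
            {"value": {"python_array_output_type": element.__name__}}
        )
        return "STRING", metadata
    return None, None  # non-array types pass through unchanged (sketch)
```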
bigframes/functions/_utils.py (75 additions, 7 deletions)
@@ -15,6 +15,8 @@

import hashlib
import inspect
import json
import typing
from typing import cast, List, NamedTuple, Optional, Sequence, Set

import bigframes_vendored.ibis.expr.datatypes.core as ibis_dtypes
@@ -26,6 +28,7 @@
import pyarrow

import bigframes.core.compile.ibis_types
import bigframes.dtypes

# Naming convention for the remote function artifacts
_BIGFRAMES_REMOTE_FUNCTION_PREFIX = "bigframes"
@@ -194,6 +197,7 @@ class IbisSignature(NamedTuple):
parameter_names: List[str]
input_types: List[Optional[ibis_dtypes.DataType]]
output_type: ibis_dtypes.DataType
output_type_override: Optional[ibis_dtypes.DataType] = None


def ibis_signature_from_python_signature(
@@ -202,13 +206,77 @@ def ibis_signature_from_python_signature(
output_type: type,
) -> IbisSignature:

ibis_input_types: List[Optional[ibis_dtypes.DataType]] = [
bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
for t in input_types
]

if typing.get_origin(output_type) is list:
ibis_output_type = (
bigframes.core.compile.ibis_types.ibis_array_output_type_from_python_type(
output_type
)
)
else:
ibis_output_type = bigframes.core.compile.ibis_types.ibis_type_from_python_type(
output_type
)

return IbisSignature(
parameter_names=list(signature.parameters.keys()),
input_types=[
bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
for t in input_types
],
output_type=bigframes.core.compile.ibis_types.ibis_type_from_python_type(
output_type
),
input_types=ibis_input_types,
output_type=ibis_output_type,
)


def get_python_output_type_from_bigframes_metadata(
metadata_text: str,
) -> Optional[type]:
try:
metadata_dict = json.loads(metadata_text)
except (TypeError, json.decoder.JSONDecodeError):
return None

try:
output_type = metadata_dict["value"]["python_array_output_type"]
except KeyError:
return None

for (
python_output_array_type
) in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES:
if python_output_array_type.__name__ == output_type:
return list[python_output_array_type] # type: ignore

return None


def get_bigframes_metadata(*, python_output_type: Optional[type] = None) -> str:
# Let's keep the actual metadata inside one level of nesting so that in
# future we can use a top level key "version" (parallel to "value"), based
# on which "value" can be interpreted according to the "version". The
# absence of "version" should be interpreted as default version.
inner_metadata = {}
if typing.get_origin(python_output_type) is list:
python_output_array_type = typing.get_args(python_output_type)[0]
if (
python_output_array_type
in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
):
inner_metadata[
"python_array_output_type"
] = python_output_array_type.__name__

metadata = {"value": inner_metadata}
metadata_ser = json.dumps(metadata)

# let's make sure the serialized value is deserializable
if (
get_python_output_type_from_bigframes_metadata(metadata_ser)
!= python_output_type
):
raise ValueError(
f"python_output_type {python_output_type} is not serializable."
)

return metadata_ser
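The deserialization path above is deliberately forgiving: malformed, missing, or unrecognized metadata yields `None` rather than an error, so ordinary routine descriptions don't break `read_gbq_function`. A condensed sketch under the same versioned `{"value": ...}` layout — names are illustrative:

```python
import json
from typing import Optional

# Illustrative mirror of the supported array element types
SUPPORTED_ELEMENTS = {bool, float, int, str}


def python_type_from_metadata(metadata_text: str) -> Optional[type]:
    """Recover list[T] from serialized metadata, tolerating any
    malformed input by returning None instead of raising."""
    try:
        name = json.loads(metadata_text)["value"]["python_array_output_type"]
    except (TypeError, KeyError, json.JSONDecodeError):
        return None
    for element in SUPPORTED_ELEMENTS:
        if element.__name__ == name:
            return list[element]  # type: ignore
    return None
```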