Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

fix: support read_gbq_function for axis=1 application #950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 2 bigframes/functions/_remote_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def remote_function(
getting and setting IAM roles on cloud resources. If this param is
not provided then resource manager client from the session would be
used.
dataset (str, Optional.):
dataset (str, Optional):
Dataset in which to create a BigQuery remote function. It should be in
`<project_id>.<dataset_name>` or `<dataset_name>` format. If this
parameter is not provided then session dataset id is used.
Expand Down
2 changes: 2 additions & 0 deletions 2 bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def read_gbq_function(
function_name: str,
*,
session: Session,
is_row_processor: bool = False,
):
"""
Read an existing BigQuery function and prepare it for use in future queries.
Expand Down Expand Up @@ -194,5 +195,6 @@ def func(*ignored_args, **ignored_kwargs):
func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore
ibis_signature.output_type
)
func.is_row_processor = is_row_processor # type: ignore
func.ibis_node = node # type: ignore
return func
3 changes: 2 additions & 1 deletion 3 bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,11 @@ def remote_function(
remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function)


def read_gbq_function(function_name: str):
def read_gbq_function(function_name: str, is_row_processor: bool = False):
return global_session.with_default_session(
bigframes.session.Session.read_gbq_function,
function_name=function_name,
is_row_processor=is_row_processor,
)


Expand Down
38 changes: 32 additions & 6 deletions 38 bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,7 @@ def remote_function(
def read_gbq_function(
self,
function_name: str,
is_row_processor: bool = False,
):
"""Loads a BigQuery function from BigQuery.

Expand Down Expand Up @@ -1255,7 +1256,7 @@ def read_gbq_function(
>>> func('AURÉLIE')
'aurÉlie'

You can apply it to a BigQuery DataFrame Series.
You can apply it to a BigQuery DataFrames Series.

>>> df = bpd.DataFrame({'id': [1, 2, 3], 'name': ['AURÉLIE', 'CÉLESTINE', 'DAPHNÉ']})
>>> df
Expand All @@ -1275,13 +1276,33 @@ def read_gbq_function(
<BLANKLINE>
[3 rows x 3 columns]

You can even use a function with multiple inputs. For example, let's use
[cw_instr4](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_instr4source-string-search-string-position-int64-ocurrence-int64)
You can even use a function with multiple inputs. For example,
[cw_regexp_replace_5](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_regexp_replace_5haystack-string-regexp-string-replacement-string-offset-int64-occurrence-int64)
from Community UDFs.

>>> func = bpd.read_gbq_function("bqutil.fn.cw_instr4")
>>> func('TestStr123456Str', 'Str', 1, 2)
14
>>> func = bpd.read_gbq_function("bqutil.fn.cw_regexp_replace_5")
>>> func('TestStr123456', 'Str', 'Cad$', 1, 1)
'TestCad$123456'

>>> df = bpd.DataFrame({
... "haystack" : ["TestStr123456", "TestStr123456Str", "TestStr123456Str"],
... "regexp" : ["Str", "Str", "Str"],
... "replacement" : ["Cad$", "Cad$", "Cad$"],
... "offset" : [1, 1, 1],
... "occurrence" : [1, 2, 1]
... })
>>> df
haystack regexp replacement offset occurrence
0 TestStr123456 Str Cad$ 1 1
1 TestStr123456Str Str Cad$ 1 2
2 TestStr123456Str Str Cad$ 1 1
<BLANKLINE>
[3 rows x 5 columns]
>>> df.apply(func, axis=1)
0 TestCad$123456
1 TestStr123456Cad$
2 TestCad$123456Str
dtype: string

Args:
function_name (str):
Expand All @@ -1290,6 +1311,10 @@ def read_gbq_function(
`dataset_id.function_name` to load from the default project, or
`function_name` to load from the default project and the dataset
associated with the current session.
is_row_processor (bool, default False):
Whether the function is a row processor. This is set to True
for a function which receives an entire row of a DataFrame as
a pandas Series.

Returns:
callable: A function object pointing to the BigQuery function read
Expand All @@ -1303,6 +1328,7 @@ def read_gbq_function(
return bigframes_rf.read_gbq_function(
function_name=function_name,
session=self,
is_row_processor=is_row_processor,
)

def _prepare_copy_job_config(self) -> bigquery.CopyJobConfig:
Expand Down
14 changes: 14 additions & 0 deletions 14 tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,13 @@ def serialize_row(row):
# bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
# , ignore this mismatch by using check_dtype=False.
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

# Let's make sure the read_gbq_function path works for this function
serialize_row_reuse = session.read_gbq_function(
serialize_row_remote.bigframes_remote_function, is_row_processor=True
)
bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
Expand Down Expand Up @@ -2085,6 +2092,13 @@ def foo(x, y, z):
pandas.testing.assert_series_equal(
expected_result, bf_result, check_dtype=False, check_index_type=False
)

# Let's make sure the read_gbq_function path works for this function
foo_reuse = session.read_gbq_function(foo.bigframes_remote_function)
bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas()
pandas.testing.assert_series_equal(
expected_result, bf_result, check_dtype=False, check_index_type=False
)
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.