diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 0ab19ca353..c69e430836 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -176,7 +176,7 @@ def remote_function( getting and setting IAM roles on cloud resources. If this param is not provided then resource manager client from the session would be used. - dataset (str, Optional.): + dataset (str, Optional): Dataset in which to create a BigQuery remote function. It should be in `<project_id>.<dataset_name>` or `<dataset_name>` format. If this parameter is not provided then session dataset id is used. diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index ddb36a9bef..39e3bfd8f0 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -108,6 +108,7 @@ def read_gbq_function( function_name: str, *, session: Session, + is_row_processor: bool = False, ): """ Read an existing BigQuery function and prepare it for use in future queries. 
@@ -194,5 +195,6 @@ def func(*ignored_args, **ignored_kwargs): func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type ) + func.is_row_processor = is_row_processor # type: ignore func.ibis_node = node # type: ignore return func diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 08d808572d..9f33a8a1ea 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -692,10 +692,11 @@ def remote_function( remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function) -def read_gbq_function(function_name: str): +def read_gbq_function(function_name: str, is_row_processor: bool = False): return global_session.with_default_session( bigframes.session.Session.read_gbq_function, function_name=function_name, + is_row_processor=is_row_processor, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index e52e2ef17f..045483bd53 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1225,6 +1225,7 @@ def remote_function( def read_gbq_function( self, function_name: str, + is_row_processor: bool = False, ): """Loads a BigQuery function from BigQuery. @@ -1255,7 +1256,7 @@ def read_gbq_function( >>> func('AURÉLIE') 'aurÉlie' - You can apply it to a BigQuery DataFrame Series. + You can apply it to a BigQuery DataFrames Series. >>> df = bpd.DataFrame({'id': [1, 2, 3], 'name': ['AURÉLIE', 'CÉLESTINE', 'DAPHNÉ']}) >>> df @@ -1275,13 +1276,33 @@ def read_gbq_function( [3 rows x 3 columns] - You can even use a function with multiple inputs. For example, let's use - [cw_instr4](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_instr4source-string-search-string-position-int64-ocurrence-int64) + You can even use a function with multiple inputs. 
For example, + [cw_regexp_replace_5](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_regexp_replace_5haystack-string-regexp-string-replacement-string-offset-int64-occurrence-int64) from Community UDFs. - >>> func = bpd.read_gbq_function("bqutil.fn.cw_instr4") - >>> func('TestStr123456Str', 'Str', 1, 2) - 14 + >>> func = bpd.read_gbq_function("bqutil.fn.cw_regexp_replace_5") + >>> func('TestStr123456', 'Str', 'Cad$', 1, 1) + 'TestCad$123456' + + >>> df = bpd.DataFrame({ + ... "haystack" : ["TestStr123456", "TestStr123456Str", "TestStr123456Str"], + ... "regexp" : ["Str", "Str", "Str"], + ... "replacement" : ["Cad$", "Cad$", "Cad$"], + ... "offset" : [1, 1, 1], + ... "occurrence" : [1, 2, 1] + ... }) + >>> df + haystack regexp replacement offset occurrence + 0 TestStr123456 Str Cad$ 1 1 + 1 TestStr123456Str Str Cad$ 1 2 + 2 TestStr123456Str Str Cad$ 1 1 + + [3 rows x 5 columns] + >>> df.apply(func, axis=1) + 0 TestCad$123456 + 1 TestStr123456Cad$ + 2 TestCad$123456Str + dtype: string Args: function_name (str): @@ -1290,6 +1311,10 @@ def read_gbq_function( `dataset_id.function_name` to load from the default project, or `function_name` to load from the default project and the dataset associated with the current session. + is_row_processor (bool, default False): + Whether the function is a row processor. This is set to True + for a function which receives an entire row of a DataFrame as + a pandas Series. 
Returns: callable: A function object pointing to the BigQuery function read @@ -1303,6 +1328,7 @@ def read_gbq_function( return bigframes_rf.read_gbq_function( function_name=function_name, session=self, + is_row_processor=is_row_processor, ) def _prepare_copy_job_config(self) -> bigquery.CopyJobConfig: diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index d6eefc1e31..77ea4627ec 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1603,6 +1603,13 @@ def serialize_row(row): # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' # , ignore this mismatch by using check_dtype=False. pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + # Let's make sure the read_gbq_function path works for this function + serialize_row_reuse = session.read_gbq_function( + serialize_row_remote.bigframes_remote_function, is_row_processor=True + ) + bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets( @@ -2085,6 +2092,13 @@ def foo(x, y, z): pandas.testing.assert_series_equal( expected_result, bf_result, check_dtype=False, check_index_type=False ) + + # Let's make sure the read_gbq_function path works for this function + foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets(