diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index 178c911591..f866575a26 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -145,7 +145,13 @@ def __init__(
         self._cloud_function_docker_repository = cloud_function_docker_repository

     def create_bq_remote_function(
-        self, input_args, input_types, output_type, endpoint, bq_function_name
+        self,
+        input_args,
+        input_types,
+        output_type,
+        endpoint,
+        bq_function_name,
+        max_batching_rows,
     ):
         """Create a BigQuery remote function given the artifacts of a user defined
         function and the http endpoint of a corresponding cloud function."""
@@ -169,14 +175,25 @@ def create_bq_remote_function(
             bq_function_args.append(
                 f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}"
             )
+
+        remote_function_options = {
+            "endpoint": endpoint,
+            "max_batching_rows": max_batching_rows,
+        }
+
+        remote_function_options_str = ", ".join(
+            [
+                f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
+                for key, val in remote_function_options.items()
+                if val is not None
+            ]
+        )
+
         create_function_ddl = f"""
         CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}({','.join(bq_function_args)})
         RETURNS {bq_function_return_type}
         REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}`
-        OPTIONS (
-          endpoint = "{endpoint}",
-          max_batching_rows = 1000
-        )"""
+        OPTIONS ({remote_function_options_str})"""

         logger.info(f"Creating BQ remote function: {create_function_ddl}")

@@ -438,6 +455,7 @@ def provision_bq_remote_function(
         reuse,
         name,
         package_requirements,
+        max_batching_rows,
     ):
         """Provision a BigQuery remote function."""
         # If reuse of any existing function with the same name (indicated by the
@@ -485,7 +503,12 @@ def provision_bq_remote_function(
                     "Exactly one type should be provided for every input arg."
                 )
             self.create_bq_remote_function(
-                input_args, input_types, output_type, cf_endpoint, remote_function_name
+                input_args,
+                input_types,
+                output_type,
+                cf_endpoint,
+                remote_function_name,
+                max_batching_rows,
             )
         else:
             logger.info(f"Remote function {remote_function_name} already exists.")
@@ -607,6 +630,7 @@ def remote_function(
     cloud_function_service_account: Optional[str] = None,
     cloud_function_kms_key_name: Optional[str] = None,
     cloud_function_docker_repository: Optional[str] = None,
+    max_batching_rows: Optional[int] = 1000,
 ):
     """Decorator to turn a user defined function into a BigQuery remote function.

@@ -723,6 +747,15 @@ def remote_function(
             projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME.
             For more details see
             https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.
+        max_batching_rows (int, Optional):
+            The maximum number of rows to be batched for processing in the
+            BQ remote function. Default value is 1000. A lower number can be
+            passed to avoid timeouts in case the user code is too complex to
+            process a large number of rows fast enough. A higher number can
+            be used to increase throughput if the user code is fast enough.
+            Pass `None` to let the BQ remote functions service apply its
+            default batching. For more details see
+            https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request.
     """
     import bigframes.pandas as bpd

@@ -846,6 +879,7 @@ def wrapper(f):
             reuse,
             name,
             packages,
+            max_batching_rows,
         )

         # TODO: Move ibis logic to compiler step
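Reviewer note: a minimal standalone sketch of the option-string assembly this file now performs (the endpoint URL is made up). String-valued options are quoted, numeric ones are rendered bare, and `None`-valued options are dropped, which is how `max_batching_rows=None` defers to the service-side default instead of the previously hardcoded `max_batching_rows = 1000`:

```python
# Sketch of the OPTIONS-string assembly from the hunk above; the endpoint
# value is hypothetical.
endpoint = "https://example-endpoint.run.app"
max_batching_rows = None  # try 1000 to see both options emitted

remote_function_options = {
    "endpoint": endpoint,
    "max_batching_rows": max_batching_rows,
}

remote_function_options_str = ", ".join(
    f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
    for key, val in remote_function_options.items()
    if val is not None
)

# None case: OPTIONS (endpoint="https://example-endpoint.run.app")
# 1000 case: OPTIONS (endpoint="https://example-endpoint.run.app", max_batching_rows=1000)
print(f"OPTIONS ({remote_function_options_str})")
```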
""" import bigframes.pandas as bpd @@ -846,6 +879,7 @@ def wrapper(f): reuse, name, packages, + max_batching_rows, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 91c3eb603b..96af6ab1b3 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -643,6 +643,7 @@ def remote_function( cloud_function_service_account: Optional[str] = None, cloud_function_kms_key_name: Optional[str] = None, cloud_function_docker_repository: Optional[str] = None, + max_batching_rows: Optional[int] = 1000, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -656,6 +657,7 @@ def remote_function( cloud_function_service_account=cloud_function_service_account, cloud_function_kms_key_name=cloud_function_kms_key_name, cloud_function_docker_repository=cloud_function_docker_repository, + max_batching_rows=max_batching_rows, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b6d56006be..64bcebb6cc 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1541,6 +1541,7 @@ def remote_function( cloud_function_service_account: Optional[str] = None, cloud_function_kms_key_name: Optional[str] = None, cloud_function_docker_repository: Optional[str] = None, + max_batching_rows: Optional[int] = 1000, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1635,6 +1636,15 @@ def remote_function( projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME. For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin. + max_batching_rows (int, Optional): + The maximum number of rows to be batched for processing in the + BQ remote function. Default value is 1000. A lower number can be + passed to avoid timeouts in case the user code is too complex to + process large number of rows fast enough. A higher number can be + used to increase throughput in case the user code is fast enough. + `None` can be passed to let BQ remote functions service apply + default batching. See for more details + https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. 
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index cf6b2a01f8..ec9acc292e 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -1300,3 +1300,39 @@ def square_num(x):
         cleanup_remote_function_assets(
             session.bqclient, session.cloudfunctionsclient, square_num
         )
+
+
+@pytest.mark.parametrize(
+    ("max_batching_rows"),
+    [
+        10_000,
+        None,
+    ],
+)
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_max_batching_rows(session, scalars_dfs, max_batching_rows):
+    try:
+
+        def square(x):
+            return x * x
+
+        square_remote = session.remote_function(
+            [int], int, reuse=False, max_batching_rows=max_batching_rows
+        )(square)
+
+        bq_routine = session.bqclient.get_routine(
+            square_remote.bigframes_remote_function
+        )
+        assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+        pd_result = scalars_pandas_df["int64_too"].apply(square)
+
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square_remote
+        )
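The routine check the test performs also works interactively; a small sketch assuming an active `session` and a remote function object `square_remote` like the one created above:

```python
# Fetch the BQ routine backing the remote function and inspect the batching
# option actually applied (None means the service-side default).
routine = session.bqclient.get_routine(square_remote.bigframes_remote_function)
print(routine.remote_function_options.max_batching_rows)
```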