diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 6e42ca9f48..2a7a900779 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -441,6 +441,7 @@ def create_cloud_function( timeout_seconds=600, max_instance_count=None, is_row_processor=False, + vpc_connector=None, ): """Create a cloud function from the given user defined function.""" @@ -519,6 +520,8 @@ def create_cloud_function( function.service_config.timeout_seconds = timeout_seconds if max_instance_count is not None: function.service_config.max_instance_count = max_instance_count + if vpc_connector is not None: + function.service_config.vpc_connector = vpc_connector function.service_config.service_account_email = ( self._cloud_function_service_account ) @@ -568,6 +571,7 @@ def provision_bq_remote_function( cloud_function_timeout, cloud_function_max_instance_count, is_row_processor, + cloud_function_vpc_connector, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -595,6 +599,7 @@ def provision_bq_remote_function( cloud_function_timeout, cloud_function_max_instance_count, is_row_processor, + cloud_function_vpc_connector, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -750,6 +755,7 @@ def remote_function( max_batching_rows: Optional[int] = 1000, cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, + cloud_function_vpc_connector: Optional[str] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -894,7 +900,12 @@ def remote_function( control the spike in the billing. Higher setting can help support processing larger scale data. When not specified, cloud function's default setting applies. 
For more details see - https://cloud.google.com/functions/docs/configuring/max-instances + https://cloud.google.com/functions/docs/configuring/max-instances. + cloud_function_vpc_connector (str, Optional): + The VPC connector you would like to configure for your cloud + function. This is useful if your code needs access to data or + service(s) that are on a VPC network. See for more details + https://cloud.google.com/functions/docs/networking/connecting-vpc. """ is_row_processor = False @@ -1041,6 +1052,7 @@ def wrapper(f): cloud_function_timeout, cloud_function_max_instances, is_row_processor, + cloud_function_vpc_connector, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 1d6da46fae..8d2c0b148c 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -654,6 +654,7 @@ def remote_function( max_batching_rows: Optional[int] = 1000, cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, + cloud_function_vpc_connector: Optional[str] = None, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -670,6 +671,7 @@ def remote_function( max_batching_rows=max_batching_rows, cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, + cloud_function_vpc_connector=cloud_function_vpc_connector, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 473fc4f098..727269e7ee 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1412,6 +1412,7 @@ def remote_function( max_batching_rows: Optional[int] = 1000, cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, + cloud_function_vpc_connector: Optional[str] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. 
Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1537,7 +1538,12 @@ def remote_function( control the spike in the billing. Higher setting can help support processing larger scale data. When not specified, cloud function's default setting applies. For more details see - https://cloud.google.com/functions/docs/configuring/max-instances + https://cloud.google.com/functions/docs/configuring/max-instances. + cloud_function_vpc_connector (str, Optional): + The VPC connector you would like to configure for your cloud + function. This is useful if your code needs access to data or + service(s) that are on a VPC network. See for more details + https://cloud.google.com/functions/docs/networking/connecting-vpc. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. The cloud assets can be @@ -1562,6 +1568,7 @@ def remote_function( max_batching_rows=max_batching_rows, cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, + cloud_function_vpc_connector=cloud_function_vpc_connector, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index e086903d03..b7d99ea36c 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -21,7 +21,7 @@ import tempfile import textwrap -from google.api_core.exceptions import BadRequest, NotFound +from google.api_core.exceptions import BadRequest, InvalidArgument, NotFound from google.cloud import bigquery, storage import pandas import pytest @@ -1333,6 +1333,79 @@ def square_num(x): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_via_session_vpc(scalars_dfs): + # TODO(shobs): Automate the following set-up during testing in the test project. 
+ # + # For upfront convenience, the following set up has been statically created + # in the project bigframes-dev-perf via cloud console: + # + # 1. Create a vpc connector as per + # https://cloud.google.com/vpc/docs/configure-serverless-vpc-access#gcloud + # + # $ gcloud compute networks vpc-access connectors create bigframes-vpc --project=bigframes-dev-perf --region=us-central1 --range 10.8.0.0/28 + # Create request issued for: [bigframes-vpc] + # Waiting for operation [projects/bigframes-dev-perf/locations/us-central1/operations/f9f90df6-7cf4-4420-8c2f-b3952775dcfb] to complete...done. + # Created connector [bigframes-vpc]. + # + # $ gcloud compute networks vpc-access connectors list --project=bigframes-dev-perf --region=us-central1 + # CONNECTOR_ID REGION NETWORK IP_CIDR_RANGE SUBNET SUBNET_PROJECT MACHINE_TYPE MIN_INSTANCES MAX_INSTANCES MIN_THROUGHPUT MAX_THROUGHPUT STATE + # bigframes-vpc us-central1 default 10.8.0.0/28 e2-micro 2 10 200 1000 READY + + project = "bigframes-dev-perf" + gcf_vpc_connector = "bigframes-vpc" + + rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) + + try: + + def square_num(x): + if x is None: + return x + return x * x + + square_num_remote = rf_session.remote_function( + [int], int, reuse=False, cloud_function_vpc_connector=gcf_vpc_connector + )(square_num) + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_int64_col = scalars_df["int64_col"] + bf_result_col = bf_int64_col.apply(square_num_remote) + bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas() + + pd_int64_col = scalars_pandas_df["int64_col"] + pd_result_col = pd_int64_col.apply(square_num) + pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + # Assert that the GCF is created with the intended vpc connector + gcf = rf_session.cloudfunctionsclient.get_function( + name=square_num_remote.bigframes_cloud_function + ) + assert
gcf.service_config.vpc_connector == gcf_vpc_connector + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + rf_session.bqclient, rf_session.cloudfunctionsclient, square_num_remote + ) + + +def test_remote_function_via_session_vpc_invalid(session): + with pytest.raises( + InvalidArgument, match="400.*Serverless VPC Access connector is not found" + ): + + @session.remote_function( + [int], int, reuse=False, cloud_function_vpc_connector="does-not-exist" + ) + def square_num(x): + if x is None: + return x + return x * x + + @pytest.mark.parametrize( ("max_batching_rows"), [