Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: support gcf vpc connector in remote_function #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion 14 bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ def create_cloud_function(
timeout_seconds=600,
max_instance_count=None,
is_row_processor=False,
vpc_connector=None,
):
"""Create a cloud function from the given user defined function."""

Expand Down Expand Up @@ -519,6 +520,8 @@ def create_cloud_function(
function.service_config.timeout_seconds = timeout_seconds
if max_instance_count is not None:
function.service_config.max_instance_count = max_instance_count
if vpc_connector is not None:
function.service_config.vpc_connector = vpc_connector
function.service_config.service_account_email = (
self._cloud_function_service_account
)
Expand Down Expand Up @@ -568,6 +571,7 @@ def provision_bq_remote_function(
cloud_function_timeout,
cloud_function_max_instance_count,
is_row_processor,
cloud_function_vpc_connector,
):
"""Provision a BigQuery remote function."""
# If reuse of any existing function with the same name (indicated by the
Expand Down Expand Up @@ -595,6 +599,7 @@ def provision_bq_remote_function(
cloud_function_timeout,
cloud_function_max_instance_count,
is_row_processor,
cloud_function_vpc_connector,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
Expand Down Expand Up @@ -750,6 +755,7 @@ def remote_function(
max_batching_rows: Optional[int] = 1000,
cloud_function_timeout: Optional[int] = 600,
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
):
"""Decorator to turn a user defined function into a BigQuery remote function.

Expand Down Expand Up @@ -894,7 +900,12 @@ def remote_function(
control the spike in the billing. Higher setting can help
support processing larger scale data. When not specified, cloud
function's default setting applies. For more details see
https://cloud.google.com/functions/docs/configuring/max-instances
https://cloud.google.com/functions/docs/configuring/max-instances.
cloud_function_vpc_connector (str, Optional):
The VPC connector you would like to configure for your cloud
function. This is useful if your code needs access to data or
service(s) that are on a VPC network. See for more details
https://cloud.google.com/functions/docs/networking/connecting-vpc.
"""
is_row_processor = False

Expand Down Expand Up @@ -1041,6 +1052,7 @@ def wrapper(f):
cloud_function_timeout,
cloud_function_max_instances,
is_row_processor,
cloud_function_vpc_connector,
)

# TODO: Move ibis logic to compiler step
Expand Down
2 changes: 2 additions & 0 deletions 2 bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ def remote_function(
max_batching_rows: Optional[int] = 1000,
cloud_function_timeout: Optional[int] = 600,
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
):
return global_session.with_default_session(
bigframes.session.Session.remote_function,
Expand All @@ -670,6 +671,7 @@ def remote_function(
max_batching_rows=max_batching_rows,
cloud_function_timeout=cloud_function_timeout,
cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector,
)


Expand Down
9 changes: 8 additions & 1 deletion 9 bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,7 @@ def remote_function(
max_batching_rows: Optional[int] = 1000,
cloud_function_timeout: Optional[int] = 600,
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
):
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
Expand Down Expand Up @@ -1537,7 +1538,12 @@ def remote_function(
control the spike in the billing. Higher setting can help
support processing larger scale data. When not specified, cloud
function's default setting applies. For more details see
https://cloud.google.com/functions/docs/configuring/max-instances
https://cloud.google.com/functions/docs/configuring/max-instances.
cloud_function_vpc_connector (str, Optional):
The VPC connector you would like to configure for your cloud
function. This is useful if your code needs access to data or
service(s) that are on a VPC network. See for more details
https://cloud.google.com/functions/docs/networking/connecting-vpc.
Returns:
callable: A remote function object pointing to the cloud assets created
in the background to support the remote execution. The cloud assets can be
Expand All @@ -1562,6 +1568,7 @@ def remote_function(
max_batching_rows=max_batching_rows,
cloud_function_timeout=cloud_function_timeout,
cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector,
)

def read_gbq_function(
Expand Down
75 changes: 74 additions & 1 deletion 75 tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tempfile
import textwrap

from google.api_core.exceptions import BadRequest, NotFound
from google.api_core.exceptions import BadRequest, InvalidArgument, NotFound
from google.cloud import bigquery, storage
import pandas
import pytest
Expand Down Expand Up @@ -1333,6 +1333,79 @@ def square_num(x):
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_via_session_vpc(scalars_dfs):
    """System test: a remote function deployed with a VPC connector works and
    the backing cloud function is configured with that connector."""
    # TODO(shobs): Automate the following set-up during testing in the test project.
    #
    # For upfront convenience, the following set up has been statically created
    # in the project bigframes-dev-perf via cloud console:
    #
    # 1. Create a vpc connector as per
    #    https://cloud.google.com/vpc/docs/configure-serverless-vpc-access#gcloud
    #
    #    $ gcloud compute networks vpc-access connectors create bigframes-vpc --project=bigframes-dev-perf --region=us-central1 --range 10.8.0.0/28
    #    Create request issued for: [bigframes-vpc]
    #    Waiting for operation [projects/bigframes-dev-perf/locations/us-central1/operations/f9f90df6-7cf4-4420-8c2f-b3952775dcfb] to complete...done.
    #    Created connector [bigframes-vpc].
    #
    #    $ gcloud compute networks vpc-access connectors list --project=bigframes-dev-perf --region=us-central1
    #    CONNECTOR_ID   REGION       NETWORK  IP_CIDR_RANGE  SUBNET  SUBNET_PROJECT  MACHINE_TYPE  MIN_INSTANCES  MAX_INSTANCES  MIN_THROUGHPUT  MAX_THROUGHPUT  STATE
    #    bigframes-vpc  us-central1  default  10.8.0.0/28                            e2-micro      2              10             200             1000            READY

    project = "bigframes-dev-perf"
    gcf_vpc_connector = "bigframes-vpc"

    rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project))

    # Pre-initialize so the `finally` cleanup below cannot raise NameError
    # (masking the real failure) if remote function creation itself fails
    # before the assignment completes.
    square_num_remote = None

    try:

        def square_num(x):
            if x is None:
                return x
            return x * x

        square_num_remote = rf_session.remote_function(
            [int], int, reuse=False, cloud_function_vpc_connector=gcf_vpc_connector
        )(square_num)

        scalars_df, scalars_pandas_df = scalars_dfs

        bf_int64_col = scalars_df["int64_col"]
        bf_result_col = bf_int64_col.apply(square_num_remote)
        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

        pd_int64_col = scalars_pandas_df["int64_col"]
        pd_result_col = pd_int64_col.apply(square_num)
        pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

        assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)

        # Assert that the GCF is created with the intended vpc connector
        gcf = rf_session.cloudfunctionsclient.get_function(
            name=square_num_remote.bigframes_cloud_function
        )
        assert gcf.service_config.vpc_connector == gcf_vpc_connector
    finally:
        # clean up the gcp assets created for the remote function, if any
        if square_num_remote is not None:
            cleanup_remote_function_assets(
                rf_session.bqclient, rf_session.cloudfunctionsclient, square_num_remote
            )


def test_remote_function_via_session_vpc_invalid(session):
    """Deploying with a non-existent VPC connector surfaces InvalidArgument."""
    # The backend rejects the deployment with a 400 naming the missing
    # Serverless VPC Access connector.
    expected_error = "400.*Serverless VPC Access connector is not found"

    with pytest.raises(InvalidArgument, match=expected_error):

        @session.remote_function(
            [int], int, reuse=False, cloud_function_vpc_connector="does-not-exist"
        )
        def square_num(x):
            return x if x is None else x * x


@pytest.mark.parametrize(
("max_batching_rows"),
[
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.