Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: support ingress settings in remote_function #1011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions 23 bigframes/functions/_remote_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import string
import sys
import tempfile
import types
from typing import cast, Tuple, TYPE_CHECKING

from bigframes_vendored import constants
Expand All @@ -43,6 +44,15 @@

logger = logging.getLogger(__name__)

# https://cloud.google.com/sdk/gcloud/reference/functions/deploy#--ingress-settings
_INGRESS_SETTINGS_MAP = types.MappingProxyType(
{
"all": functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL,
"internal-only": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY,
"internal-and-gclb": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB,
}
)


class RemoteFunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation
Expand Down Expand Up @@ -228,6 +238,7 @@ def create_cloud_function(
is_row_processor=False,
vpc_connector=None,
memory_mib=1024,
ingress_settings="all",
):
"""Create a cloud function from the given user defined function.

Expand Down Expand Up @@ -324,6 +335,16 @@ def create_cloud_function(
function.service_config.service_account_email = (
self._cloud_function_service_account
)
if ingress_settings not in _INGRESS_SETTINGS_MAP:
raise ValueError(
"'{}' not one of the supported ingress settings values: {}".format(
ingress_settings, list(_INGRESS_SETTINGS_MAP)
)
)
function.service_config.ingress_settings = cast(
functions_v2.ServiceConfig.IngressSettings,
_INGRESS_SETTINGS_MAP[ingress_settings],
)
function.kms_key_name = self._cloud_function_kms_key_name
create_function_request.function = function

Expand Down Expand Up @@ -372,6 +393,7 @@ def provision_bq_remote_function(
is_row_processor,
cloud_function_vpc_connector,
cloud_function_memory_mib,
cloud_function_ingress_settings,
):
"""Provision a BigQuery remote function."""
# Augment user package requirements with any internal package
Expand Down Expand Up @@ -418,6 +440,7 @@ def provision_bq_remote_function(
is_row_processor=is_row_processor,
vpc_connector=cloud_function_vpc_connector,
memory_mib=cloud_function_memory_mib,
ingress_settings=cloud_function_ingress_settings,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
Expand Down
21 changes: 20 additions & 1 deletion 21 bigframes/functions/_remote_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@
import inspect
import sys
import threading
from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union
from typing import (
Any,
cast,
Dict,
Literal,
Mapping,
Optional,
Sequence,
TYPE_CHECKING,
Union,
)
import warnings

import bigframes_vendored.constants as constants
Expand Down Expand Up @@ -110,6 +120,9 @@ def remote_function(
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "all",
):
"""Decorator to turn a user defined function into a BigQuery remote function.

Expand Down Expand Up @@ -280,6 +293,11 @@ def remote_function(
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
cloud_function_ingress_settings (str, Optional):
Ingress settings controls dictating what traffic can reach the
function. By default `all` will be used. It must be one of:
`all`, `internal-only`, `internal-and-gclb`. See for more details
https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings.
"""
# Some defaults may be used from the session if not provided otherwise
import bigframes.exceptions as bf_exceptions
Expand Down Expand Up @@ -504,6 +522,7 @@ def try_delattr(attr):
is_row_processor=is_row_processor,
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_ingress_settings=cloud_function_ingress_settings,
)

# TODO(shobs): Find a better way to support udfs with param named "name".
Expand Down
4 changes: 4 additions & 0 deletions 4 bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@ def remote_function(
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "all",
):
return global_session.with_default_session(
bigframes.session.Session.remote_function,
Expand All @@ -687,6 +690,7 @@ def remote_function(
cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_ingress_settings=cloud_function_ingress_settings,
)


Expand Down
9 changes: 9 additions & 0 deletions 9 bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,9 @@ def remote_function(
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
cloud_function_memory_mib: Optional[int] = 1024,
cloud_function_ingress_settings: Literal[
"all", "internal-only", "internal-and-gclb"
] = "all",
):
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
Expand Down Expand Up @@ -1194,6 +1197,11 @@ def remote_function(
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
cloud_function_ingress_settings (str, Optional):
Ingress settings controls dictating what traffic can reach the
function. By default `all` will be used. It must be one of:
`all`, `internal-only`, `internal-and-gclb`. See for more details
https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings.
Returns:
callable: A remote function object pointing to the cloud assets created
in the background to support the remote execution. The cloud assets can be
Expand All @@ -1220,6 +1228,7 @@ def remote_function(
cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_ingress_settings=cloud_function_ingress_settings,
)

def read_gbq_function(
Expand Down
66 changes: 66 additions & 0 deletions 66 tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -2173,3 +2173,69 @@ def foo(x):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, foo
)


@pytest.mark.parametrize(
("ingress_settings_args", "effective_ingress_settings"),
[
pytest.param(
{}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, id="no-set"
),
pytest.param(
{"cloud_function_ingress_settings": "all"},
functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL,
id="set-all",
),
pytest.param(
{"cloud_function_ingress_settings": "internal-only"},
functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY,
id="set-internal-only",
),
pytest.param(
{"cloud_function_ingress_settings": "internal-and-gclb"},
functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB,
id="set-internal-and-gclb",
),
],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_ingress_settings(
session, scalars_dfs, ingress_settings_args, effective_ingress_settings
):
try:

def square(x: int) -> int:
return x * x

square_remote = session.remote_function(reuse=False, **ingress_settings_args)(
square
)

# Assert that the GCF is created with the intended maximum timeout
gcf = session.cloudfunctionsclient.get_function(
name=square_remote.bigframes_cloud_function
)
assert gcf.service_config.ingress_settings == effective_ingress_settings

scalars_df, scalars_pandas_df = scalars_dfs

bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
pd_result = scalars_pandas_df["int64_too"].apply(square)

pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square_remote
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_ingress_settings_unsupported(session):
with pytest.raises(
ValueError, match="'unknown' not one of the supported ingress settings values"
):

@session.remote_function(reuse=False, cloud_function_ingress_settings="unknown")
def square(x: int) -> int:
return x * x
Morty Proxy This is a proxified and sanitized view of the page, visit original site.