Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: expose gcf memory param in remote_function #803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 27, 2024
17 changes: 16 additions & 1 deletion 17 bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ def create_cloud_function(
max_instance_count=None,
is_row_processor=False,
vpc_connector=None,
memory_mib=1024,
):
"""Create a cloud function from the given user defined function.

Expand Down Expand Up @@ -410,7 +411,8 @@ def create_cloud_function(
self._cloud_function_docker_repository
)
function.service_config = functions_v2.ServiceConfig()
function.service_config.available_memory = "1024M"
if memory_mib is not None:
function.service_config.available_memory = f"{memory_mib}Mi"
if timeout_seconds is not None:
if timeout_seconds > 1200:
raise ValueError(
Expand Down Expand Up @@ -473,6 +475,7 @@ def provision_bq_remote_function(
cloud_function_max_instance_count,
is_row_processor,
cloud_function_vpc_connector,
cloud_function_memory_mib,
):
"""Provision a BigQuery remote function."""
# If reuse of any existing function with the same name (indicated by the
Expand Down Expand Up @@ -504,6 +507,7 @@ def provision_bq_remote_function(
max_instance_count=cloud_function_max_instance_count,
is_row_processor=is_row_processor,
vpc_connector=cloud_function_vpc_connector,
memory_mib=cloud_function_memory_mib,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
Expand Down Expand Up @@ -667,6 +671,7 @@ def remote_function(
cloud_function_timeout: Optional[int] = 600,
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
cloud_function_memory_mib: Optional[int] = 1024,
):
"""Decorator to turn a user defined function into a BigQuery remote function.

Expand Down Expand Up @@ -817,6 +822,15 @@ def remote_function(
function. This is useful if your code needs access to data or
service(s) that are on a VPC network. See for more details
https://cloud.google.com/functions/docs/networking/connecting-vpc.
cloud_function_memory_mib (int, Optional):
The amounts of memory (in mebibytes) to allocate for the cloud
function (2nd gen) created. This also dictates a corresponding
amount of allocated CPU for the function. By default a memory of
1024 MiB is set for the cloud functions created to support
BigQuery DataFrames remote function. If you want to let the
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
"""
# Some defaults may be used from the session if not provided otherwise
import bigframes.exceptions as bf_exceptions
Expand Down Expand Up @@ -1027,6 +1041,7 @@ def try_delattr(attr):
cloud_function_max_instance_count=cloud_function_max_instances,
is_row_processor=is_row_processor,
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
)

# TODO: Move ibis logic to compiler step
Expand Down
2 changes: 2 additions & 0 deletions 2 bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def remote_function(
cloud_function_timeout: Optional[int] = 600,
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
cloud_function_memory_mib: Optional[int] = 1024,
):
return global_session.with_default_session(
bigframes.session.Session.remote_function,
Expand All @@ -683,6 +684,7 @@ def remote_function(
cloud_function_timeout=cloud_function_timeout,
cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
)


Expand Down
11 changes: 11 additions & 0 deletions 11 bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,7 @@ def remote_function(
cloud_function_timeout: Optional[int] = 600,
cloud_function_max_instances: Optional[int] = None,
cloud_function_vpc_connector: Optional[str] = None,
cloud_function_memory_mib: Optional[int] = 1024,
):
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
Expand Down Expand Up @@ -1670,6 +1671,15 @@ def remote_function(
function. This is useful if your code needs access to data or
service(s) that are on a VPC network. See for more details
https://cloud.google.com/functions/docs/networking/connecting-vpc.
cloud_function_memory_mib (int, Optional):
The amounts of memory (in mebibytes) to allocate for the cloud
function (2nd gen) created. This also dictates a corresponding
amount of allocated CPU for the function. By default a memory of
1024 MiB is set for the cloud functions created to support
BigQuery DataFrames remote function. If you want to let the
default memory of cloud functions be allocated, pass `None`. See
for more details
https://cloud.google.com/functions/docs/configuring/memory.
Returns:
callable: A remote function object pointing to the cloud assets created
in the background to support the remote execution. The cloud assets can be
Expand All @@ -1695,6 +1705,7 @@ def remote_function(
cloud_function_timeout=cloud_function_timeout,
cloud_function_max_instances=cloud_function_max_instances,
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
)

def read_gbq_function(
Expand Down
60 changes: 60 additions & 0 deletions 60 tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,3 +1800,63 @@ def float_parser(row):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, float_parser_remote
)


@pytest.mark.parametrize(
    ("memory_mib_args", "expected_memory"),
    [
        # Default (param not passed at all): 1024 MiB is applied by bigframes.
        pytest.param({}, "1024Mi", id="no-set"),
        # Explicit None: bigframes sets nothing, so GCF's own default (256M) applies.
        pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"),
        pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"),
        pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"),
        pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"),
        pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"),
    ],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_gcf_memory(
    session, scalars_dfs, memory_mib_args, expected_memory
):
    """Verify that ``cloud_function_memory_mib`` controls the memory of the
    cloud function backing a remote function, and that the function still
    produces correct results.

    Args:
        session: BigQuery DataFrames session fixture.
        scalars_dfs: (bigframes DataFrame, pandas DataFrame) fixture pair.
        memory_mib_args: kwargs to forward to ``session.remote_function``.
        expected_memory: memory string expected on the deployed GCF.
    """
    # Initialize before the try block so the finally-cleanup cannot raise
    # NameError (masking the real failure) if remote function creation fails.
    square_remote = None
    try:

        def square(x: int) -> int:
            return x * x

        square_remote = session.remote_function(reuse=False, **memory_mib_args)(square)

        # Assert that the GCF is created with the intended memory
        gcf = session.cloudfunctionsclient.get_function(
            name=square_remote.bigframes_cloud_function
        )
        assert gcf.service_config.available_memory == expected_memory

        scalars_df, scalars_pandas_df = scalars_dfs

        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
        pd_result = scalars_pandas_df["int64_too"].apply(square)

        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
    finally:
        # clean up the gcp assets created for the remote function, if any
        if square_remote is not None:
            cleanup_remote_function_assets(
                session.bqclient, session.cloudfunctionsclient, square_remote
            )


@pytest.mark.parametrize(
    ("memory_mib",),
    [
        pytest.param(127, id="127-too-low"),
        pytest.param(32769, id="set-32769-too-high"),
    ],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_gcf_memory_unsupported(session, memory_mib):
    """An out-of-range ``cloud_function_memory_mib`` should surface the
    backend's InvalidArgument error at remote function creation time."""

    def square(x: int) -> int:
        return x * x

    # The error is raised while the cloud function is being deployed, i.e.
    # when the decorator factory is applied to the user function.
    with pytest.raises(
        google.api_core.exceptions.InvalidArgument,
        match="Invalid value specified for container memory",
    ):
        session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib)(
            square
        )
Morty Proxy This is a proxified and sanitized view of the page, visit original site.