Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit 4caf74c

Browse filesBrowse files
feat: Add cloud_function_cpus option to remote_function (#2475)
1 parent cb00daa commit 4caf74c
Copy full SHA for 4caf74c

5 files changed

+120-17Lines changed: 120 additions & 17 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎bigframes/functions/_function_client.py‎

Copy file name to clipboardExpand all lines: bigframes/functions/_function_client.py
+67-1Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
# BQ managed functions (@udf) currently only support Python 3.11.
6666
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
6767

68+
_DEFAULT_FUNCTION_MEMORY_MIB = 1024
69+
6870

6971
class FunctionClient:
7072
# Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -402,8 +404,12 @@ def create_cloud_function(
402404
is_row_processor=False,
403405
vpc_connector=None,
404406
vpc_connector_egress_settings="private-ranges-only",
405-
memory_mib=1024,
407+
memory_mib=None,
408+
cpus=None,
406409
ingress_settings="internal-only",
410+
workers=None,
411+
threads=None,
412+
concurrency=None,
407413
):
408414
"""Create a cloud function from the given user defined function."""
409415

@@ -486,6 +492,8 @@ def create_cloud_function(
486492
function.service_config = functions_v2.ServiceConfig()
487493
if memory_mib is not None:
488494
function.service_config.available_memory = f"{memory_mib}Mi"
495+
if cpus is not None:
496+
function.service_config.available_cpu = str(cpus)
489497
if timeout_seconds is not None:
490498
if timeout_seconds > 1200:
491499
raise bf_formatting.create_exception_with_feedback_link(
@@ -517,6 +525,20 @@ def create_cloud_function(
517525
function.service_config.service_account_email = (
518526
self._cloud_function_service_account
519527
)
528+
if concurrency:
529+
function.service_config.max_instance_request_concurrency = concurrency
530+
531+
# Functions framework use environment variables to pass config to gunicorn
532+
# See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241
533+
# Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43
534+
env_vars = {}
535+
if workers:
536+
env_vars["WORKERS"] = str(workers)
537+
if threads:
538+
env_vars["THREADS"] = str(threads)
539+
if env_vars:
540+
function.service_config.environment_variables = env_vars
541+
520542
if ingress_settings not in _INGRESS_SETTINGS_MAP:
521543
raise bf_formatting.create_exception_with_feedback_link(
522544
ValueError,
@@ -581,6 +603,7 @@ def provision_bq_remote_function(
581603
cloud_function_vpc_connector,
582604
cloud_function_vpc_connector_egress_settings,
583605
cloud_function_memory_mib,
606+
cloud_function_cpus,
584607
cloud_function_ingress_settings,
585608
bq_metadata,
586609
):
@@ -616,6 +639,21 @@ def provision_bq_remote_function(
616639
)
617640
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)
618641

642+
if cloud_function_memory_mib is None:
643+
cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB
644+
645+
# assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL
646+
# therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker
647+
expected_milli_cpus = (
648+
int(cloud_function_cpus * 1000)
649+
if (cloud_function_cpus is not None)
650+
else _infer_milli_cpus_from_memory(cloud_function_memory_mib)
651+
)
652+
workers = -(expected_milli_cpus // -1000) # ceil(cpus) without invoking floats
653+
threads = 4 # (per worker)
654+
# max concurrency==1 for vcpus < 1 hard limit from cloud run
655+
concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1
656+
619657
# Create the cloud function if it does not exist
620658
if not cf_endpoint:
621659
cf_endpoint = self.create_cloud_function(
@@ -630,7 +668,11 @@ def provision_bq_remote_function(
630668
vpc_connector=cloud_function_vpc_connector,
631669
vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
632670
memory_mib=cloud_function_memory_mib,
671+
cpus=cloud_function_cpus,
633672
ingress_settings=cloud_function_ingress_settings,
673+
workers=workers,
674+
threads=threads,
675+
concurrency=concurrency,
634676
)
635677
else:
636678
logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -696,3 +738,27 @@ def get_remote_function_specs(self, remote_function_name):
696738
# Note: list_routines doesn't make an API request until we iterate on the response object.
697739
pass
698740
return (http_endpoint, bq_connection)
741+
742+
743+
def _infer_milli_cpus_from_memory(memory_mib: int) -> int:
744+
# observed values, not formally documented by cloud run functions
745+
if memory_mib < 128:
746+
raise ValueError("Cloud run supports at minimum 128MiB per instance")
747+
elif memory_mib == 128:
748+
return 83
749+
elif memory_mib <= 256:
750+
return 167
751+
elif memory_mib <= 512:
752+
return 333
753+
elif memory_mib <= 1024:
754+
return 583
755+
elif memory_mib <= 2048:
756+
return 1000
757+
elif memory_mib <= 8192:
758+
return 2000
759+
elif memory_mib <= 16384:
760+
return 4000
761+
elif memory_mib <= 32768:
762+
return 8000
763+
else:
764+
raise ValueError("Cloud run supports at most 32768MiB per instance")
Collapse file

‎bigframes/functions/_function_session.py‎

Copy file name to clipboardExpand all lines: bigframes/functions/_function_session.py
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ def remote_function(
248248
cloud_function_vpc_connector_egress_settings: Optional[
249249
Literal["all", "private-ranges-only", "unspecified"]
250250
] = None,
251-
cloud_function_memory_mib: Optional[int] = 1024,
251+
cloud_function_memory_mib: Optional[int] = None,
252+
cloud_function_cpus: Optional[float] = None,
252253
cloud_function_ingress_settings: Literal[
253254
"all", "internal-only", "internal-and-gclb"
254255
] = "internal-only",
@@ -444,6 +445,10 @@ def remote_function(
444445
default memory of cloud functions be allocated, pass `None`. See
445446
for more details
446447
https://cloud.google.com/functions/docs/configuring/memory.
448+
cloud_function_cpus (float, Optional):
449+
The number of cpus to allocate for the cloud
450+
function (2nd gen) created.
451+
https://docs.cloud.google.com/run/docs/configuring/services/cpu.
447452
cloud_function_ingress_settings (str, Optional):
448453
Ingress settings controls dictating what traffic can reach the
449454
function. Options are: `all`, `internal-only`, or `internal-and-gclb`.
@@ -638,6 +643,7 @@ def wrapper(func):
638643
cloud_function_vpc_connector=cloud_function_vpc_connector,
639644
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
640645
cloud_function_memory_mib=cloud_function_memory_mib,
646+
cloud_function_cpus=cloud_function_cpus,
641647
cloud_function_ingress_settings=cloud_function_ingress_settings,
642648
bq_metadata=bqrf_metadata,
643649
)
Collapse file

‎bigframes/pandas/__init__.py‎

Copy file name to clipboardExpand all lines: bigframes/pandas/__init__.py
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ def remote_function(
8888
cloud_function_vpc_connector_egress_settings: Optional[
8989
Literal["all", "private-ranges-only", "unspecified"]
9090
] = None,
91-
cloud_function_memory_mib: Optional[int] = 1024,
91+
cloud_function_memory_mib: Optional[int] = None,
92+
cloud_function_cpus: Optional[float] = None,
9293
cloud_function_ingress_settings: Literal[
9394
"all", "internal-only", "internal-and-gclb"
9495
] = "internal-only",
@@ -112,6 +113,7 @@ def remote_function(
112113
cloud_function_vpc_connector=cloud_function_vpc_connector,
113114
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
114115
cloud_function_memory_mib=cloud_function_memory_mib,
116+
cloud_function_cpus=cloud_function_cpus,
115117
cloud_function_ingress_settings=cloud_function_ingress_settings,
116118
cloud_build_service_account=cloud_build_service_account,
117119
)
Collapse file

‎bigframes/session/__init__.py‎

Copy file name to clipboardExpand all lines: bigframes/session/__init__.py
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,8 @@ def remote_function(
15361536
cloud_function_vpc_connector_egress_settings: Optional[
15371537
Literal["all", "private-ranges-only", "unspecified"]
15381538
] = None,
1539-
cloud_function_memory_mib: Optional[int] = 1024,
1539+
cloud_function_memory_mib: Optional[int] = None,
1540+
cloud_function_cpus: Optional[float] = None,
15401541
cloud_function_ingress_settings: Literal[
15411542
"all", "internal-only", "internal-and-gclb"
15421543
] = "internal-only",
@@ -1717,6 +1718,10 @@ def remote_function(
17171718
default memory of cloud functions be allocated, pass `None`. See
17181719
for more details
17191720
https://cloud.google.com/functions/docs/configuring/memory.
1721+
cloud_function_cpus (float, Optional):
1722+
The number of cpus to allocate for the cloud
1723+
function (2nd gen) created.
1724+
https://docs.cloud.google.com/run/docs/configuring/services/cpu.
17201725
cloud_function_ingress_settings (str, Optional):
17211726
Ingress settings controls dictating what traffic can reach the
17221727
function. Options are: `all`, `internal-only`, or `internal-and-gclb`.
@@ -1767,6 +1772,7 @@ def remote_function(
17671772
cloud_function_vpc_connector=cloud_function_vpc_connector,
17681773
cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
17691774
cloud_function_memory_mib=cloud_function_memory_mib,
1775+
cloud_function_cpus=cloud_function_cpus,
17701776
cloud_function_ingress_settings=cloud_function_ingress_settings,
17711777
cloud_build_service_account=cloud_build_service_account,
17721778
)
Collapse file

‎tests/system/large/functions/test_remote_function.py‎

Copy file name to clipboardExpand all lines: tests/system/large/functions/test_remote_function.py
+36-13Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2089,19 +2089,40 @@ def foo_list(x: pandas.Series, y0: float, y1, y2) -> list[str]:
20892089

20902090

20912091
@pytest.mark.parametrize(
2092-
("memory_mib_args", "expected_memory"),
2092+
(
2093+
"memory_mib_args",
2094+
"expected_memory",
2095+
"expected_cpus",
2096+
),
20932097
[
2094-
pytest.param({}, "1024Mi", id="no-set"),
2095-
pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"),
2096-
pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"),
2097-
pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"),
2098-
pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"),
2099-
pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"),
2098+
pytest.param({}, "1024Mi", None, id="no-set"),
2099+
pytest.param(
2100+
{"cloud_function_memory_mib": None}, "1024Mi", None, id="set-None"
2101+
),
2102+
pytest.param({"cloud_function_memory_mib": 128}, "128Mi", None, id="set-128"),
2103+
pytest.param(
2104+
{"cloud_function_memory_mib": 512, "cloud_function_cpus": 0.6},
2105+
"512Mi",
2106+
"0.6",
2107+
id="set-512",
2108+
),
2109+
pytest.param(
2110+
{"cloud_function_memory_mib": 1024}, "1024Mi", None, id="set-1024"
2111+
),
2112+
pytest.param(
2113+
{"cloud_function_memory_mib": 4096, "cloud_function_cpus": 4},
2114+
"4096Mi",
2115+
"4",
2116+
id="set-4096",
2117+
),
2118+
pytest.param(
2119+
{"cloud_function_memory_mib": 32768}, "32768Mi", None, id="set-32768"
2120+
),
21002121
],
21012122
)
21022123
@pytest.mark.flaky(retries=2, delay=120)
21032124
def test_remote_function_gcf_memory(
2104-
session, scalars_dfs, memory_mib_args, expected_memory
2125+
session, scalars_dfs, memory_mib_args, expected_memory, expected_cpus
21052126
):
21062127
try:
21072128

@@ -2117,6 +2138,12 @@ def square(x: int) -> int:
21172138
name=square_remote.bigframes_cloud_function
21182139
)
21192140
assert gcf.service_config.available_memory == expected_memory
2141+
if expected_cpus is not None:
2142+
assert gcf.service_config.available_cpu == expected_cpus
2143+
if float(gcf.service_config.available_cpu) >= 1.0:
2144+
assert gcf.service_config.max_instance_request_concurrency >= float(
2145+
gcf.service_config.available_cpu
2146+
)
21202147

21212148
scalars_df, scalars_pandas_df = scalars_dfs
21222149

@@ -2138,12 +2165,8 @@ def square(x: int) -> int:
21382165
pytest.param(32769, id="set-32769-too-high"),
21392166
],
21402167
)
2141-
@pytest.mark.flaky(retries=2, delay=120)
21422168
def test_remote_function_gcf_memory_unsupported(session, memory_mib):
2143-
with pytest.raises(
2144-
google.api_core.exceptions.InvalidArgument,
2145-
match="Invalid value specified for container memory",
2146-
):
2169+
with pytest.raises(ValueError, match="Cloud run supports"):
21472170

21482171
@session.remote_function(
21492172
reuse=False,

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.