Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: expose max_batching_rows in remote_function #622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions 46 bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,13 @@ def __init__(
self._cloud_function_docker_repository = cloud_function_docker_repository

def create_bq_remote_function(
self, input_args, input_types, output_type, endpoint, bq_function_name
self,
input_args,
input_types,
output_type,
endpoint,
bq_function_name,
max_batching_rows,
):
"""Create a BigQuery remote function given the artifacts of a user defined
function and the http endpoint of a corresponding cloud function."""
Expand All @@ -169,14 +175,25 @@ def create_bq_remote_function(
bq_function_args.append(
f"{name} {third_party_ibis_bqtypes.BigQueryType.from_ibis(input_types[idx])}"
)

remote_function_options = {
"endpoint": endpoint,
"max_batching_rows": max_batching_rows,
}

remote_function_options_str = ", ".join(
[
f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
for key, val in remote_function_options.items()
if val is not None
]
)

create_function_ddl = f"""
CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}`
OPTIONS (
endpoint = "{endpoint}",
max_batching_rows = 1000
)"""
OPTIONS ({remote_function_options_str})"""

logger.info(f"Creating BQ remote function: {create_function_ddl}")

Expand Down Expand Up @@ -438,6 +455,7 @@ def provision_bq_remote_function(
reuse,
name,
package_requirements,
max_batching_rows,
):
"""Provision a BigQuery remote function."""
# If reuse of any existing function with the same name (indicated by the
Expand Down Expand Up @@ -485,7 +503,12 @@ def provision_bq_remote_function(
"Exactly one type should be provided for every input arg."
)
self.create_bq_remote_function(
input_args, input_types, output_type, cf_endpoint, remote_function_name
input_args,
input_types,
output_type,
cf_endpoint,
remote_function_name,
max_batching_rows,
)
else:
logger.info(f"Remote function {remote_function_name} already exists.")
Expand Down Expand Up @@ -607,6 +630,7 @@ def remote_function(
cloud_function_service_account: Optional[str] = None,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
):
"""Decorator to turn a user defined function into a BigQuery remote function.

Expand Down Expand Up @@ -723,6 +747,15 @@ def remote_function(
projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME.
For more details see
https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.
max_batching_rows (int, Optional):
The maximum number of rows to be batched for processing in the
BQ remote function. Default value is 1000. A lower number can be
passed to avoid timeouts in case the user code is too complex to
process large number of rows fast enough. A higher number can be
used to increase throughput in case the user code is fast enough.
`None` can be passed to let BQ remote functions service apply
default batching. See for more details
https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request.
"""
import bigframes.pandas as bpd

Expand Down Expand Up @@ -846,6 +879,7 @@ def wrapper(f):
reuse,
name,
packages,
max_batching_rows,
)

# TODO: Move ibis logic to compiler step
Expand Down
2 changes: 2 additions & 0 deletions 2 bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ def remote_function(
cloud_function_service_account: Optional[str] = None,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
):
return global_session.with_default_session(
bigframes.session.Session.remote_function,
Expand All @@ -656,6 +657,7 @@ def remote_function(
cloud_function_service_account=cloud_function_service_account,
cloud_function_kms_key_name=cloud_function_kms_key_name,
cloud_function_docker_repository=cloud_function_docker_repository,
max_batching_rows=max_batching_rows,
)


Expand Down
11 changes: 11 additions & 0 deletions 11 bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,7 @@ def remote_function(
cloud_function_service_account: Optional[str] = None,
cloud_function_kms_key_name: Optional[str] = None,
cloud_function_docker_repository: Optional[str] = None,
max_batching_rows: Optional[int] = 1000,
):
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
Expand Down Expand Up @@ -1635,6 +1636,15 @@ def remote_function(
projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME.
For more details see
https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.
max_batching_rows (int, Optional):
The maximum number of rows to be batched for processing in the
BQ remote function. Default value is 1000. A lower number can be
passed to avoid timeouts in case the user code is too complex to
process large number of rows fast enough. A higher number can be
used to increase throughput in case the user code is fast enough.
`None` can be passed to let BQ remote functions service apply
default batching. See for more details
https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request.
Returns:
callable: A remote function object pointing to the cloud assets created
in the background to support the remote execution. The cloud assets can be
Expand All @@ -1656,6 +1666,7 @@ def remote_function(
cloud_function_service_account=cloud_function_service_account,
cloud_function_kms_key_name=cloud_function_kms_key_name,
cloud_function_docker_repository=cloud_function_docker_repository,
max_batching_rows=max_batching_rows,
)

def read_gbq_function(
Expand Down
36 changes: 36 additions & 0 deletions 36 tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,3 +1300,39 @@ def square_num(x):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square_num
)


@pytest.mark.parametrize(
    ("max_batching_rows"),
    [
        10_000,
        None,
    ],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_max_batching_rows(session, scalars_dfs, max_batching_rows):
    """System test: `max_batching_rows` is honored by `session.remote_function`.

    Checks two things for each parametrized value (an explicit limit and
    `None`, i.e. let the BQ remote functions service pick its default):
      1. The created BigQuery routine's remote function options report the
         requested `max_batching_rows` value.
      2. The remote function still computes correct results end to end
         (BigFrames apply vs. local pandas apply).

    Args:
        session: BigFrames session fixture (provides bqclient and
            cloudfunctionsclient used for creation and cleanup).
        scalars_dfs: fixture yielding a (BigFrames DataFrame, pandas
            DataFrame) pair with identical data.
        max_batching_rows: parametrized batching limit under test.
    """
    try:

        def square(x):
            return x * x

        square_remote = session.remote_function(
            [int], int, reuse=False, max_batching_rows=max_batching_rows
        )(square)

        # Inspect the deployed BQ routine: its remote function options must
        # echo the requested batching limit (None when the service default
        # is in effect).
        bq_routine = session.bqclient.get_routine(
            square_remote.bigframes_remote_function
        )
        assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows

        scalars_df, scalars_pandas_df = scalars_dfs

        # End-to-end correctness: the remote apply must match local pandas.
        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
        pd_result = scalars_pandas_df["int64_too"].apply(square)

        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
    finally:
        # clean up the gcp assets created for the remote function
        cleanup_remote_function_assets(
            session.bqclient, session.cloudfunctionsclient, square_remote
        )
Morty Proxy This is a proxified and sanitized view of the page, visit original site.