From f257dd2f730821835ba1a100d5264b955060eece Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 3 Jul 2024 01:40:37 +0000 Subject: [PATCH 1/6] feat: support remote function cleanup with `session.close` --- bigframes/functions/remote_function.py | 111 ++++++++++++++++----- bigframes/session/__init__.py | 16 ++- tests/system/large/test_remote_function.py | 85 +++++++++++++++- 3 files changed, 179 insertions(+), 33 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index c1878b6c31..632bf1bed0 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -24,9 +24,11 @@ import string import sys import tempfile +import threading from typing import ( Any, cast, + Dict, List, Mapping, NamedTuple, @@ -71,6 +73,36 @@ # https://docs.python.org/3/library/pickle.html#data-stream-format _pickle_protocol_version = 4 +# Module level mapping of session-id to remote function artifacts +_temp_session_artifacts: Dict[str, Dict[str, str]] = {} +_session_artifacts_lock = threading.Lock() + + +def _update_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str): + """Update remote function artifacts for a session id.""" + global _temp_session_artifacts, _session_artifacts_lock + + with _session_artifacts_lock: + if session_id not in _temp_session_artifacts: + _temp_session_artifacts[session_id] = {} + _temp_session_artifacts[session_id][bqrf_routine] = gcf_path + + +def _clean_up_session_artifacts( + bqclient: bigquery.Client, + gcfclient: functions_v2.FunctionServiceClient, + session_id: str, +): + """Delete remote function artifacts for a session id.""" + global _temp_session_artifacts, _session_artifacts_lock + + with _session_artifacts_lock: + if session_id in _temp_session_artifacts: + for bqrf_routine, gcf_path in _temp_session_artifacts[session_id].items(): + bqclient.delete_routine(bqrf_routine) + gcfclient.delete_function(name=gcf_path) + _temp_session_artifacts.pop(session_id) + def get_remote_function_locations(bq_location): """Get BQ location and cloud functions region given a BQ client.""" @@ -102,7 +134,9 @@ def _get_hash(def_, package_requirements=None): return hashlib.md5(def_repr).hexdigest() -def _get_updated_package_requirements(package_requirements, is_row_processor): +def _get_updated_package_requirements( + package_requirements=None, is_row_processor=False +): requirements = [f"cloudpickle=={cloudpickle.__version__}"] if is_row_processor: # bigframes remote function will send an entire row of data as json, @@ -130,28 +164,17 @@ class IbisSignature(NamedTuple): output_type: IbisDataType -def get_cloud_function_name( - def_, uniq_suffix=None, package_requirements=None, is_row_processor=False -): +def get_cloud_function_name(function_hash, uniq_suffix=None): "Get a name for the cloud function for the given user defined function." 
- - # Augment user package requirements with any internal package - # requirements - package_requirements = _get_updated_package_requirements( - package_requirements, is_row_processor - ) - - cf_name = _get_hash(def_, package_requirements) - cf_name = f"bigframes-{cf_name}" # for identification + cf_name = f"bigframes-{function_hash}" # for identification if uniq_suffix: cf_name = f"{cf_name}-{uniq_suffix}" - return cf_name, package_requirements + return cf_name -def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None): +def get_remote_function_name(function_hash, uniq_suffix=None): "Get a name for the BQ remote function for the given user defined function." - bq_rf_name = _get_hash(def_, package_requirements) - bq_rf_name = f"bigframes_{bq_rf_name}" # for identification + bq_rf_name = f"bigframes_{function_hash}" # for identification if uniq_suffix: bq_rf_name = f"{bq_rf_name}_{uniq_suffix}" return bq_rf_name @@ -272,6 +295,10 @@ def get_cloud_function_fully_qualified_name(self, name): self._gcp_project_id, self._cloud_function_region, name ) + def get_remote_function_fully_qualilfied_name(self, name): + "Get the fully qualilfied name for a BQ remote function." + return f"{self._gcp_project_id}.{self._bq_dataset}.{name}" + def get_cloud_function_endpoint(self, name): """Get the http endpoint of a cloud function if it exists.""" fully_qualified_name = self.get_cloud_function_fully_qualified_name(name) @@ -462,6 +489,21 @@ def create_cloud_function( ) return endpoint + def _record_session_artifacts( + self, remote_function_name: str, cloud_function_name: str + ): + remote_function_full_name = self.get_remote_function_fully_qualilfied_name( + remote_function_name + ) + cloud_function_full_name = self.get_cloud_function_fully_qualified_name( + cloud_function_name + ) + _update_session_artifacts( + self._session.session_id, + remote_function_full_name, + cloud_function_full_name, + ) + def provision_bq_remote_function( self, def_, @@ -487,12 +529,19 @@ def provision_bq_remote_function( random.choices(string.ascii_lowercase + string.digits, k=8) ) + # Augment user package requirements with any internal package + # requirements + package_requirements = _get_updated_package_requirements( + package_requirements, is_row_processor + ) + + # Compute a unique hash representing the user code + function_hash = _get_hash(def_, package_requirements) + # Derive the name of the cloud function underlying the intended BQ # remote function, also collect updated package requirements as # determined in the name resolution - cloud_function_name, package_requirements = get_cloud_function_name( - def_, uniq_suffix, package_requirements, is_row_processor - ) + cloud_function_name = get_cloud_function_name(function_hash, uniq_suffix) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) # Create the cloud function if it does not exist @@ -515,9 +564,7 @@ def provision_bq_remote_function( # Derive the name of the remote function remote_function_name = name if not remote_function_name: - remote_function_name = get_remote_function_name( - def_, uniq_suffix, package_requirements - ) + remote_function_name = get_remote_function_name(function_hash, uniq_suffix) rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name) # Create the BQ remote function in following circumstances: @@ -540,6 +587,18 @@ def provision_bq_remote_function( remote_function_name, max_batching_rows, ) + + # Update module level mapping of session id to the cloud artifacts + # created. 
This would be used to clean up any resources for a + # session. Note that we need to do this only for the case where an + # explicit name was not provided by the user and we used an internal + # name. For the cases where the user provided an explicit name, we + # are assuming that the user wants to persist them with that name + # and would directly manage their lifecycle. + if not name: + self._record_session_artifacts( + remote_function_name, cloud_function_name + ) else: logger.info(f"Remote function {remote_function_name} already exists.") @@ -926,7 +985,7 @@ def remote_function( " For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin" ) - bq_connection_manager = None if session is None else session.bqconnectionmanager + bq_connection_manager = session.bqconnectionmanager def wrapper(func): nonlocal input_types, output_type @@ -1054,7 +1113,9 @@ def try_delattr(attr): func.bigframes_cloud_function = ( remote_function_client.get_cloud_function_fully_qualified_name(cf_name) ) - func.bigframes_remote_function = str(dataset_ref.routine(rf_name)) # type: ignore + func.bigframes_remote_function = ( + remote_function_client.get_remote_function_fully_qualilfied_name(rf_name) + ) func.output_dtype = ( bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 867bdedf1c..57bbe3d09c 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -95,8 +95,7 @@ import bigframes.dtypes import bigframes.exceptions import bigframes.formatting_helpers as formatting_helpers -from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf -from bigframes.functions.remote_function import remote_function as bigframes_rf +import bigframes.functions.remote_function as bigframes_rf import bigframes.session._io.bigquery as bf_io_bigquery import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table import bigframes.session.clients @@ -383,7 +382,7 @@ def __hash__(self): # Stable hash needed to use in expression tree return hash(str(self._anonymous_dataset)) - def close(self): + def _clean_up_tables(self): """Delete tables that were created with this session's session_id.""" client = self.bqclient project_id = self._anonymous_dataset.project @@ -393,6 +392,13 @@ def close(self): full_id = ".".join([project_id, dataset_id, table_id]) client.delete_table(full_id, not_found_ok=True) + def close(self): + """Delete resources that were created with this session's session_id.""" + self._clean_up_tables() + bigframes_rf._clean_up_session_artifacts( + self.bqclient, self.cloudfunctionsclient, self.session_id + ) + def read_gbq( self, query_or_table: str, @@ -1689,7 +1695,7 @@ def remote_function( `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`. """ - return bigframes_rf( + return bigframes_rf.remote_function( input_types, output_type, session=self, @@ -1769,7 +1775,7 @@ def read_gbq_function( not including the `bigframes_cloud_function` property. 
""" - return bigframes_rgf( + return bigframes_rf.read_gbq_function( function_name=function_name, session=self, ) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index ef8b9811df..e31423121d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -22,13 +22,13 @@ import textwrap import google.api_core.exceptions -from google.cloud import bigquery, storage +from google.cloud import bigquery, functions_v2, storage import pandas import pytest import test_utils.prefixer import bigframes -from bigframes.functions.remote_function import get_cloud_function_name +import bigframes.functions.remote_function as bigframes_rf import bigframes.series from tests.system.utils import ( assert_pandas_df_equal, @@ -590,7 +590,9 @@ def add_one(x): add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one) # Expected cloud function name for the unique udf - add_one_uniq_cf_name, _ = get_cloud_function_name(add_one_uniq) + package_requirements = bigframes_rf._get_updated_package_requirements() + add_one_uniq_hash = bigframes_rf._get_hash(add_one_uniq, package_requirements) + add_one_uniq_cf_name = bigframes_rf.get_cloud_function_name(add_one_uniq_hash) # There should be no cloud function yet for the unique udf cloud_functions = list( @@ -1860,3 +1862,80 @@ def test_remote_function_gcf_memory_unsupported(session, memory_mib): @session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib) def square(x: int) -> int: return x * x + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_unnamed_removed_w_session_cleanup(): + # create a clean session + session = bigframes.connect() + + # create an unnamed remote function in the session + @session.remote_function(reuse=False) + def foo(x: int) -> int: + return x + 1 + + # ensure that remote function artifacts are created + assert foo.bigframes_remote_function is not None + session.bqclient.get_routine(foo.bigframes_remote_function) is not None + assert foo.bigframes_cloud_function is not None + session.cloudfunctionsclient.get_function( + name=foo.bigframes_cloud_function + ) is not None + + # explicitly close the session + session.close() + + # ensure that the bq remote function is deleted + with pytest.raises(google.cloud.exceptions.NotFound): + session.bqclient.get_routine(foo.bigframes_remote_function) + + # the deletion of cloud function happens in a non-blocking way, ensure that + # it either exists in a being-deleted state, or is already deleted + try: + gcf = session.cloudfunctionsclient.get_function( + name=foo.bigframes_cloud_function + ) + assert gcf.state is functions_v2.Function.State.DELETING + except google.cloud.exceptions.NotFound: + pass + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_named_perists_w_session_cleanup(): + try: + # create a clean session + session = bigframes.connect() + + # create a name for the remote function + name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix() + + # create an unnamed remote function in the session + @session.remote_function(name=name) + def foo(x: int) -> int: + return x + 1 + + # ensure that remote function artifacts are created + assert foo.bigframes_remote_function is not None + session.bqclient.get_routine(foo.bigframes_remote_function) is not None + assert foo.bigframes_cloud_function is not None + session.cloudfunctionsclient.get_function( + name=foo.bigframes_cloud_function + ) is not None + + # explicitly close the session + 
session.close() + + # ensure that the bq remote function still exists + session.bqclient.get_routine(foo.bigframes_remote_function) is not None + + # the deletion of cloud function happens in a non-blocking way, ensure + # that it exists in active state + gcf = session.cloudfunctionsclient.get_function( + name=foo.bigframes_cloud_function + ) + assert gcf.state is functions_v2.Function.State.ACTIVE + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, foo + ) From 5d160563127d45898e3471eae455c4bd2d667349 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 3 Jul 2024 21:23:46 +0000 Subject: [PATCH 2/6] accept the possibility that the artifact may have already been deleted --- bigframes/functions/remote_function.py | 13 +++++++++++-- tests/system/large/test_remote_function.py | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 632bf1bed0..9d2159927c 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -99,8 +99,17 @@ def _clean_up_session_artifacts( with _session_artifacts_lock: if session_id in _temp_session_artifacts: for bqrf_routine, gcf_path in _temp_session_artifacts[session_id].items(): - bqclient.delete_routine(bqrf_routine) - gcfclient.delete_function(name=gcf_path) + # Let's accept the possibility that the remote function may have + # been deleted directly by the user + bqclient.delete_routine(bqrf_routine, not_found_ok=True) + + # Let's accept the possibility that the cloud function may have + # been deleted directly by the user + try: + gcfclient.delete_function(name=gcf_path) + except google.api_core.exceptions.NotFound: + pass + _temp_session_artifacts.pop(session_id) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index e31423121d..8eaa122683 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1929,7 +1929,7 @@ def foo(x: int) -> int: session.bqclient.get_routine(foo.bigframes_remote_function) is not None # the deletion of cloud function happens in a non-blocking way, ensure - # that it exists in active state + # that it was not deleted and still exists in active state gcf = session.cloudfunctionsclient.get_function( name=foo.bigframes_cloud_function ) From c713e468f6a61651741ce1e927e5799c52e9cca1 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 9 Jul 2024 09:48:48 +0000 Subject: [PATCH 3/6] add cleanup by previous session id --- bigframes/functions/remote_function.py | 124 +++++++++++++++++---- bigframes/pandas/__init__.py | 10 +- bigframes/session/__init__.py | 2 +- tests/system/large/test_remote_function.py | 66 ++++++++++- tests/system/large/test_session.py | 10 +- 5 files changed, 177 insertions(+), 35 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 9d2159927c..07783b3f36 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -34,6 +34,7 @@ NamedTuple, Optional, Sequence, + Set, Tuple, TYPE_CHECKING, Union, @@ -69,6 +70,11 @@ logger = logging.getLogger(__name__) +# Naming convention for the remote function artifacts +_BIGFRAMES_REMOTE_FUNCTION_PREFIX = "bigframes" +_BQ_FUNCTION_NAME_SEPERATOR = "_" +_GCF_FUNCTION_NAME_SEPERATOR = "-" + # Protocol version 4 is available in python version 3.4 and 
above # https://docs.python.org/3/library/pickle.html#data-stream-format _pickle_protocol_version = 4 @@ -78,8 +84,8 @@ _session_artifacts_lock = threading.Lock() -def _update_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str): - """Update remote function artifacts for a session id.""" +def _update_alive_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str): + """Update remote function artifacts for a session id in the current runtime.""" global _temp_session_artifacts, _session_artifacts_lock with _session_artifacts_lock: @@ -88,12 +94,12 @@ def _update_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str) _temp_session_artifacts[session_id][bqrf_routine] = gcf_path -def _clean_up_session_artifacts( +def _clean_up_alive_session( bqclient: bigquery.Client, gcfclient: functions_v2.FunctionServiceClient, session_id: str, ): - """Delete remote function artifacts for a session id.""" + """Delete remote function artifacts for a session id in the current runtime.""" global _temp_session_artifacts, _session_artifacts_lock with _session_artifacts_lock: @@ -113,6 +119,70 @@ def _clean_up_session_artifacts( _temp_session_artifacts.pop(session_id) +def _clean_up_by_session_id( + bqclient: bigquery.Client, + gcfclient: functions_v2.FunctionServiceClient, + dataset: bigquery.DatasetReference, + session_id: str, +): + """Delete remote function artifacts for a session id, where the session id + was not necessarily created in the current runtime. This is useful if the + user worked with a BigQuery DataFrames session previously and remembered the + session id, and now wants to clean up its temporary resources at a later + point in time. + """ + + # First clean up the BQ remote functions and then the underlying + # cloud functions, so that at no point we are left with a remote function + # that is pointing to a cloud function that does not exist + + endpoints_to_be_deleted: Set[str] = set() + match_prefix = "".join( + [ + _BIGFRAMES_REMOTE_FUNCTION_PREFIX, + _BQ_FUNCTION_NAME_SEPERATOR, + session_id, + _BQ_FUNCTION_NAME_SEPERATOR, + ] + ) + for routine in bqclient.list_routines(dataset): + routine = cast(bigquery.Routine, routine) + + # skip past the routines not belonging to the given session id, or + # non-remote-function routines + if ( + routine.type_ != bigquery.RoutineType.SCALAR_FUNCTION + or not cast(str, routine.routine_id).startswith(match_prefix) + or not routine.remote_function_options + or not routine.remote_function_options.endpoint + ): + continue + + # Let's forgive the edge case possibility that the BQ remote function + # may have been deleted at the same time directly by the user + bqclient.delete_routine(routine, not_found_ok=True) + endpoints_to_be_deleted.add(routine.remote_function_options.endpoint) + + # Now clean up the cloud functions + bq_location = bqclient.get_dataset(dataset).location + bq_location, gcf_location = get_remote_function_locations(bq_location) + parent_path = gcfclient.common_location_path( + project=dataset.project, location=gcf_location + ) + for gcf in gcfclient.list_functions(parent=parent_path): + # skip past the cloud functions not attached to any BQ remote function + # belonging to the given session id + if gcf.service_config.uri not in endpoints_to_be_deleted: + continue + + # Let's forgive the edge case possibility that the cloud function + # may have been deleted at the same time directly by the user + try: + gcfclient.delete_function(name=gcf.name) + except google.api_core.exceptions.NotFound: + pass + + def 
get_remote_function_locations(bq_location): """Get BQ location and cloud functions region given a BQ client.""" # TODO(shobs, b/274647164): Find the best way to determine default location. @@ -173,20 +243,20 @@ class IbisSignature(NamedTuple): output_type: IbisDataType -def get_cloud_function_name(function_hash, uniq_suffix=None): +def get_cloud_function_name(function_hash, session_id, uniq_suffix=None): "Get a name for the cloud function for the given user defined function." - cf_name = f"bigframes-{function_hash}" # for identification + parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash] if uniq_suffix: - cf_name = f"{cf_name}-{uniq_suffix}" - return cf_name + parts.append(uniq_suffix) + return _GCF_FUNCTION_NAME_SEPERATOR.join(parts) -def get_remote_function_name(function_hash, uniq_suffix=None): +def get_remote_function_name(function_hash, session_id, uniq_suffix=None): "Get a name for the BQ remote function for the given user defined function." - bq_rf_name = f"bigframes_{function_hash}" # for identification + parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash] if uniq_suffix: - bq_rf_name = f"{bq_rf_name}_{uniq_suffix}" - return bq_rf_name + parts.append(uniq_suffix) + return _BQ_FUNCTION_NAME_SEPERATOR.join(parts) class RemoteFunctionClient: @@ -507,7 +577,7 @@ def _record_session_artifacts( cloud_function_full_name = self.get_cloud_function_fully_qualified_name( cloud_function_name ) - _update_session_artifacts( + _update_alive_session_artifacts( self._session.session_id, remote_function_full_name, cloud_function_full_name, @@ -529,15 +599,6 @@ def provision_bq_remote_function( cloud_function_memory_mib, ): """Provision a BigQuery remote function.""" - # If reuse of any existing function with the same name (indicated by the - # same hash of its source code) is not intended, then attach a unique - # suffix to the intended function name to make it unique. - uniq_suffix = None - if not reuse: - uniq_suffix = "".join( - random.choices(string.ascii_lowercase + string.digits, k=8) - ) - # Augment user package requirements with any internal package # requirements package_requirements = _get_updated_package_requirements( @@ -547,10 +608,23 @@ def provision_bq_remote_function( # Compute a unique hash representing the user code function_hash = _get_hash(def_, package_requirements) + # If reuse of any existing function with the same name (indicated by the + # same hash of its source code) is not intended, then attach a unique + # suffix to the intended function name to make it unique. 
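        # --- Editorial sketch (not part of the patch) ---
        # With the session-scoped naming scheme introduced above, and purely
        # hypothetical values session_id="abc123", function_hash="0f3e9c2a",
        # uniq_suffix="x7q9" (the suffix is only generated when reuse=False),
        # the helpers would return roughly:
        #   get_cloud_function_name(...)  -> "bigframes-abc123-0f3e9c2a-x7q9"
        #   get_remote_function_name(...) -> "bigframes_abc123_0f3e9c2a_x7q9"
        # i.e. the session id becomes part of every internally named artifact,
        # which is what later allows clean up by session id.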
+ uniq_suffix = None + if not reuse: + # use 4 digits as a unique suffix which should suffice for + # uniqueness per session + uniq_suffix = "".join( + random.choices(string.ascii_lowercase + string.digits, k=4) + ) + # Derive the name of the cloud function underlying the intended BQ # remote function, also collect updated package requirements as # determined in the name resolution - cloud_function_name = get_cloud_function_name(function_hash, uniq_suffix) + cloud_function_name = get_cloud_function_name( + function_hash, self._session.session_id, uniq_suffix + ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) # Create the cloud function if it does not exist @@ -573,7 +647,9 @@ def provision_bq_remote_function( # Derive the name of the remote function remote_function_name = name if not remote_function_name: - remote_function_name = get_remote_function_name(function_hash, uniq_suffix) + remote_function_name = get_remote_function_name( + function_hash, self._session.session_id, uniq_suffix + ) rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name) # Create the BQ remote function in following circumstances: diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index faba0f3aa3..2aab57c0d9 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -63,6 +63,7 @@ import bigframes.core.tools import bigframes.dataframe import bigframes.enums +import bigframes.functions.remote_function as bigframes_rf import bigframes.operations as ops import bigframes.series import bigframes.session @@ -794,7 +795,6 @@ def clean_up_by_session_id( None """ session = get_global_session() - client = session.bqclient if (location is None) != (project is None): raise ValueError( @@ -804,14 +804,18 @@ def clean_up_by_session_id( dataset = session._anonymous_dataset else: dataset = bigframes.session._io.bigquery.create_bq_dataset_reference( - client, + session.bqclient, location=location, project=project, api_name="clean_up_by_session_id", ) bigframes.session._io.bigquery.delete_tables_matching_session_id( - client, dataset, session_id + session.bqclient, dataset, session_id + ) + + bigframes_rf._clean_up_by_session_id( + session.bqclient, session.cloudfunctionsclient, dataset, session_id ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 13bbd3bf05..a829a8fb48 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -395,7 +395,7 @@ def _clean_up_tables(self): def close(self): """Delete resources that were created with this session's session_id.""" self._clean_up_tables() - bigframes_rf._clean_up_session_artifacts( + bigframes_rf._clean_up_alive_session( self.bqclient, self.cloudfunctionsclient, self.session_id ) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 8eaa122683..68ba657f18 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -29,6 +29,7 @@ import bigframes import bigframes.functions.remote_function as bigframes_rf +import bigframes.pandas as bpd import bigframes.series from tests.system.utils import ( assert_pandas_df_equal, @@ -592,7 +593,9 @@ def add_one(x): # Expected cloud function name for the unique udf package_requirements = bigframes_rf._get_updated_package_requirements() add_one_uniq_hash = bigframes_rf._get_hash(add_one_uniq, package_requirements) - add_one_uniq_cf_name = bigframes_rf.get_cloud_function_name(add_one_uniq_hash) + add_one_uniq_cf_name = 
bigframes_rf.get_cloud_function_name( + add_one_uniq_hash, session.session_id + ) # There should be no cloud function yet for the unique udf cloud_functions = list( @@ -1939,3 +1942,64 @@ def foo(x: int) -> int: cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo ) + + +def test_remote_function_clean_up_by_session_id(): + # Use a brand new session to avoid conflict with other tests + session = bigframes.Session() + session_id = session.session_id + try: + # we will create remote functions, one with explicit name and another + # without it, and later confirm that the former is deleted when the session + # is cleaned up by session id, but the latter remains + ## unnamed + @session.remote_function(reuse=False) + def foo_unnamed(x: int) -> int: + return x + 1 + + ## named + rf_name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix() + + @session.remote_function(reuse=False, name=rf_name) + def foo_named(x: int) -> int: + return x + 2 + + # check that BQ remote functiosn were created with corresponding cloud + # functions + for foo in [foo_unnamed, foo_named]: + assert foo.bigframes_remote_function is not None + session.bqclient.get_routine(foo.bigframes_remote_function) is not None + assert foo.bigframes_cloud_function is not None + session.cloudfunctionsclient.get_function( + name=foo.bigframes_cloud_function + ) is not None + + # clean up using explicit session id + bpd.clean_up_by_session_id( + session_id, location=session._location, project=session._project + ) + + # ensure that the unnamed bq remote function is deleted along with its + # corresponding cloud function + with pytest.raises(google.cloud.exceptions.NotFound): + session.bqclient.get_routine(foo_unnamed.bigframes_remote_function) + try: + gcf = session.cloudfunctionsclient.get_function( + name=foo_unnamed.bigframes_cloud_function + ) + assert gcf.state is functions_v2.Function.State.DELETING + except google.cloud.exceptions.NotFound: + pass + + # ensure that the named bq remote function still exists along with its + # corresponding cloud function + session.bqclient.get_routine(foo_named.bigframes_remote_function) is not None + gcf = session.cloudfunctionsclient.get_function( + name=foo_named.bigframes_cloud_function + ) + assert gcf.state is functions_v2.Function.State.ACTIVE + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, foo_named + ) diff --git a/tests/system/large/test_session.py b/tests/system/large/test_session.py index c7a19dc26e..2b82d0133b 100644 --- a/tests/system/large/test_session.py +++ b/tests/system/large/test_session.py @@ -19,6 +19,7 @@ import bigframes import bigframes.pandas as bpd +import bigframes.session._io.bigquery @pytest.mark.parametrize( @@ -93,8 +94,7 @@ def test_clean_up_by_session_id(): session_id = session.session_id # we will create two tables and confirm that they are deleted - # when the session is closed by id - + # when the session is cleaned up by id bqclient = session.bqclient dataset = session._anonymous_dataset expiration = ( @@ -110,9 +110,7 @@ def test_clean_up_by_session_id(): max_results=bigframes.session._io.bigquery._LIST_TABLES_LIMIT, page_size=bigframes.session._io.bigquery._LIST_TABLES_LIMIT, ) - assert any( - [(session.session_id in table.full_table_id) for table in list(tables_before)] - ) + assert any([(session.session_id in table.full_table_id) for table in tables_before]) bpd.clean_up_by_session_id( session_id, 
location=session._location, project=session._project @@ -125,5 +123,5 @@ def test_clean_up_by_session_id(): page_size=bigframes.session._io.bigquery._LIST_TABLES_LIMIT, ) assert not any( - [(session.session_id in table.full_table_id) for table in list(tables_after)] + [(session.session_id in table.full_table_id) for table in tables_after] ) From 9626fed00281c47108cda5820098c60a3a807d78 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 9 Jul 2024 23:41:54 +0000 Subject: [PATCH 4/6] add more documentation --- bigframes/functions/remote_function.py | 8 +++++++- bigframes/pandas/__init__.py | 7 +++++-- bigframes/session/__init__.py | 12 ++++++++++-- tests/system/large/test_remote_function.py | 1 + 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 07783b3f36..521aa630b8 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -908,7 +908,13 @@ def remote_function( Explicit name of the persisted BigQuery remote function. Use it with caution, because two users working in the same project and dataset could overwrite each other's remote functions if they use the same - persistent name. + persistent name. When an explicit name is provided, any session + specific clean up (``bigframes.session.Session.close``/ + ``bigframes.pandas.close_session``/ + ``bigframes.pandas.reset_session``/ + ``bigframes.pandas.clean_up_by_session_id``) does not clean up + the function, and leaves it for the user to manage the function + and the associated cloud function directly. packages (str[], Optional): Explicit name of the external package dependencies. Each dependency is added to the `requirements.txt` as is, and can be of the form diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 2aab57c0d9..eb990d2393 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -769,8 +769,11 @@ def clean_up_by_session_id( location: Optional[str] = None, project: Optional[str] = None, ) -> None: - """Searches through table names in BigQuery and deletes tables - found matching the expected format. + """Searches through BigQuery tables and routines and deletes the ones + created during the session with the given session id. The match is + determined by having the session id present in the resource name or + metadata. The cloud functions serving the cleaned up routines are also + cleaned up. This could be useful if the session object has been lost. Calling `session.close()` or `bigframes.pandas.close_session()` diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 3a215cebb0..c7284e3001 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -393,7 +393,9 @@ def _clean_up_tables(self): client.delete_table(full_id, not_found_ok=True) def close(self): - """Delete resources that were created with this session's session_id.""" + """Delete resources that were created with this session's session_id. + This includes BigQuery tables, remote functions and cloud functions + serving the remote functions""" self._clean_up_tables() bigframes_rf._clean_up_alive_session( self.bqclient, self.cloudfunctionsclient, self.session_id @@ -1619,7 +1621,13 @@ def remote_function( Explicit name of the persisted BigQuery remote function. Use it with caution, because two users working in the same project and dataset could overwrite each other's remote functions if they use the same - persistent name. 
+ persistent name. When an explicit name is provided, any session + specific clean up (``bigframes.session.Session.close``/ + ``bigframes.pandas.close_session``/ + ``bigframes.pandas.reset_session``/ + ``bigframes.pandas.clean_up_by_session_id``) does not clean up + the function, and leaves it for the user to manage the function + and the associated cloud function directly. packages (str[], Optional): Explicit name of the external package dependencies. Each dependency is added to the `requirements.txt` as is, and can be of the form diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 68ba657f18..303c74f1fd 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1944,6 +1944,7 @@ def foo(x: int) -> int: ) +@pytest.mark.flaky(retries=2, delay=120) def test_remote_function_clean_up_by_session_id(): # Use a brand new session to avoid conflict with other tests session = bigframes.Session() From bb6227a3b9bd33ba6f15b460050ab244486b46ed Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 13 Jul 2024 01:07:16 +0000 Subject: [PATCH 5/6] hold session artifacts in a remote function session class --- bigframes/functions/remote_function.py | 933 +++++++++++++------------ bigframes/session/__init__.py | 6 +- 2 files changed, 472 insertions(+), 467 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 521aa630b8..a369b3d69c 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -79,45 +79,6 @@ # https://docs.python.org/3/library/pickle.html#data-stream-format _pickle_protocol_version = 4 -# Module level mapping of session-id to remote function artifacts -_temp_session_artifacts: Dict[str, Dict[str, str]] = {} -_session_artifacts_lock = threading.Lock() - - -def _update_alive_session_artifacts(session_id: str, bqrf_routine: str, gcf_path: str): - """Update remote function artifacts for a session id in the current runtime.""" - global _temp_session_artifacts, _session_artifacts_lock - - with _session_artifacts_lock: - if session_id not in _temp_session_artifacts: - _temp_session_artifacts[session_id] = {} - _temp_session_artifacts[session_id][bqrf_routine] = gcf_path - - -def _clean_up_alive_session( - bqclient: bigquery.Client, - gcfclient: functions_v2.FunctionServiceClient, - session_id: str, -): - """Delete remote function artifacts for a session id in the current runtime.""" - global _temp_session_artifacts, _session_artifacts_lock - - with _session_artifacts_lock: - if session_id in _temp_session_artifacts: - for bqrf_routine, gcf_path in _temp_session_artifacts[session_id].items(): - # Let's accept the possibility that the remote function may have - # been deleted directly by the user - bqclient.delete_routine(bqrf_routine, not_found_ok=True) - - # Let's accept the possibility that the cloud function may have - # been deleted directly by the user - try: - gcfclient.delete_function(name=gcf_path) - except google.api_core.exceptions.NotFound: - pass - - _temp_session_artifacts.pop(session_id) - def _clean_up_by_session_id( bqclient: bigquery.Client, @@ -568,21 +529,6 @@ def create_cloud_function( ) return endpoint - def _record_session_artifacts( - self, remote_function_name: str, cloud_function_name: str - ): - remote_function_full_name = self.get_remote_function_fully_qualilfied_name( - remote_function_name - ) - cloud_function_full_name = self.get_cloud_function_fully_qualified_name( - 
cloud_function_name - ) - _update_alive_session_artifacts( - self._session.session_id, - remote_function_full_name, - cloud_function_full_name, - ) - def provision_bq_remote_function( self, def_, @@ -656,6 +602,7 @@ def provision_bq_remote_function( # 1. It does not exist # 2. It exists but the existing remote function has different # configuration than intended + created_new = False if not rf_endpoint or ( rf_endpoint != cf_endpoint or rf_conn != self._bq_connection_id ): @@ -673,21 +620,11 @@ def provision_bq_remote_function( max_batching_rows, ) - # Update module level mapping of session id to the cloud artifacts - # created. This would be used to clean up any resources for a - # session. Note that we need to do this only for the case where an - # explicit name was not provided by the user and we used an internal - # name. For the cases where the user provided an explicit name, we - # are assuming that the user wants to persist them with that name - # and would directly manage their lifecycle. - if not name: - self._record_session_artifacts( - remote_function_name, cloud_function_name - ) + created_new = True else: logger.info(f"Remote function {remote_function_name} already exists.") - return remote_function_name, cloud_function_name + return remote_function_name, cloud_function_name, created_new def get_remote_function_specs(self, remote_function_name): """Check whether a remote function already exists for the udf.""" @@ -789,434 +726,500 @@ def get_routine_reference( return dataset_ref.routine(routine_ref_str) -# Inspired by @udf decorator implemented in ibis-bigquery package -# https://github.com/ibis-project/ibis-bigquery/blob/main/ibis_bigquery/udf/__init__.py -# which has moved as @js to the ibis package -# https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py -def remote_function( - input_types: Union[None, type, Sequence[type]] = None, - output_type: Optional[type] = None, - session: Optional[Session] = None, - bigquery_client: Optional[bigquery.Client] = None, - bigquery_connection_client: Optional[ - bigquery_connection_v1.ConnectionServiceClient - ] = None, - cloud_functions_client: Optional[functions_v2.FunctionServiceClient] = None, - resource_manager_client: Optional[resourcemanager_v3.ProjectsClient] = None, - dataset: Optional[str] = None, - bigquery_connection: Optional[str] = None, - reuse: bool = True, - name: Optional[str] = None, - packages: Optional[Sequence[str]] = None, - cloud_function_service_account: Optional[str] = None, - cloud_function_kms_key_name: Optional[str] = None, - cloud_function_docker_repository: Optional[str] = None, - max_batching_rows: Optional[int] = 1000, - cloud_function_timeout: Optional[int] = 600, - cloud_function_max_instances: Optional[int] = None, - cloud_function_vpc_connector: Optional[str] = None, - cloud_function_memory_mib: Optional[int] = 1024, -): - """Decorator to turn a user defined function into a BigQuery remote function. - - .. deprecated:: 0.0.1 - This is an internal method. Please use :func:`bigframes.pandas.remote_function` instead. - - .. note:: - Please make sure following is setup before using this API: - - 1. 
Have the below APIs enabled for your project: - - * BigQuery Connection API - * Cloud Functions API - * Cloud Run API - * Cloud Build API - * Artifact Registry API - * Cloud Resource Manager API - - This can be done from the cloud console (change `PROJECT_ID` to yours): - https://console.cloud.google.com/apis/enableflow?apiid=bigqueryconnection.googleapis.com,cloudfunctions.googleapis.com,run.googleapis.com,cloudbuild.googleapis.com,artifactregistry.googleapis.com,cloudresourcemanager.googleapis.com&project=PROJECT_ID - - Or from the gcloud CLI: - - `$ gcloud services enable bigqueryconnection.googleapis.com cloudfunctions.googleapis.com run.googleapis.com cloudbuild.googleapis.com artifactregistry.googleapis.com cloudresourcemanager.googleapis.com` - - 2. Have following IAM roles enabled for you: - - * BigQuery Data Editor (roles/bigquery.dataEditor) - * BigQuery Connection Admin (roles/bigquery.connectionAdmin) - * Cloud Functions Developer (roles/cloudfunctions.developer) - * Service Account User (roles/iam.serviceAccountUser) on the service account `PROJECT_NUMBER-compute@developer.gserviceaccount.com` - * Storage Object Viewer (roles/storage.objectViewer) - * Project IAM Admin (roles/resourcemanager.projectIamAdmin) (Only required if the bigquery connection being used is not pre-created and is created dynamically with user credentials.) - - 3. Either the user has setIamPolicy privilege on the project, or a BigQuery connection is pre-created with necessary IAM role set: - - 1. To create a connection, follow https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_connection - 2. To set up IAM, follow https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function - - Alternatively, the IAM could also be setup via the gcloud CLI: - - `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. - - Args: - input_types (None, type, or sequence(type)): - For scalar user defined function it should be the input type or - sequence of input types. For row processing user defined function, - type `Series` should be specified. - output_type (Optional[type]): - Data type of the output in the user defined function. - session (bigframes.Session, Optional): - BigQuery DataFrames session to use for getting default project, - dataset and BigQuery connection. - bigquery_client (google.cloud.bigquery.Client, Optional): - Client to use for BigQuery operations. If this param is not provided - then bigquery client from the session would be used. - bigquery_connection_client (google.cloud.bigquery_connection_v1.ConnectionServiceClient, Optional): - Client to use for BigQuery connection operations. If this param is - not provided then bigquery connection client from the session would - be used. - cloud_functions_client (google.cloud.functions_v2.FunctionServiceClient, Optional): - Client to use for cloud functions operations. If this param is not - provided then the functions client from the session would be used. - resource_manager_client (google.cloud.resourcemanager_v3.ProjectsClient, Optional): - Client to use for cloud resource management operations, e.g. for - getting and setting IAM roles on cloud resources. If this param is - not provided then resource manager client from the session would be - used. - dataset (str, Optional.): - Dataset in which to create a BigQuery remote function. It should be in - `.` or `` format. 
If this - parameter is not provided then session dataset id is used. - bigquery_connection (str, Optional): - Name of the BigQuery connection in the form of `CONNECTION_ID` or - `LOCATION.CONNECTION_ID` or `PROJECT_ID.LOCATION.CONNECTION_ID`. - If this param is not provided then the bigquery connection from the session - would be used. If it is pre created in the same location as the - `bigquery_client.location` then it would be used, otherwise it is created - dynamically using the `bigquery_connection_client` assuming the user has necessary - priviliges. The PROJECT_ID should be the same as the BigQuery connection project. - reuse (bool, Optional): - Reuse the remote function if is already exists. - `True` by default, which results in reusing an existing remote - function and corresponding cloud function (if any) that was - previously created for the same udf. - Setting it to `False` forces the creation of a unique remote function. - If the required remote function does not exist then it would be - created irrespective of this param. - name (str, Optional): - Explicit name of the persisted BigQuery remote function. Use it with - caution, because two users working in the same project and dataset - could overwrite each other's remote functions if they use the same - persistent name. When an explicit name is provided, any session - specific clean up (``bigframes.session.Session.close``/ - ``bigframes.pandas.close_session``/ - ``bigframes.pandas.reset_session``/ - ``bigframes.pandas.clean_up_by_session_id``) does not clean up - the function, and leaves it for the user to manage the function - and the associated cloud function directly. - packages (str[], Optional): - Explicit name of the external package dependencies. Each dependency - is added to the `requirements.txt` as is, and can be of the form - supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. - cloud_function_service_account (str, Optional): - Service account to use for the cloud functions. If not provided then - the default service account would be used. See - https://cloud.google.com/functions/docs/securing/function-identity - for more details. Please make sure the service account has the - necessary IAM permissions configured as described in - https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration. - cloud_function_kms_key_name (str, Optional): - Customer managed encryption key to protect cloud functions and - related data at rest. This is of the format - projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY. - Read https://cloud.google.com/functions/docs/securing/cmek for - more details including granting necessary service accounts - access to the key. - cloud_function_docker_repository (str, Optional): - Docker repository created with the same encryption key as - `cloud_function_kms_key_name` to store encrypted artifacts - created to support the cloud function. This is of the format - projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME. - For more details see - https://cloud.google.com/functions/docs/securing/cmek#before_you_begin. - max_batching_rows (int, Optional): - The maximum number of rows to be batched for processing in the - BQ remote function. Default value is 1000. A lower number can be - passed to avoid timeouts in case the user code is too complex to - process large number of rows fast enough. A higher number can be - used to increase throughput in case the user code is fast enough. 
- `None` can be passed to let BQ remote functions service apply - default batching. See for more details - https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request. - cloud_function_timeout (int, Optional): - The maximum amount of time (in seconds) BigQuery should wait for - the cloud function to return a response. See for more details - https://cloud.google.com/functions/docs/configuring/timeout. - Please note that even though the cloud function (2nd gen) itself - allows seeting up to 60 minutes of timeout, BigQuery remote - function can wait only up to 20 minutes, see for more details - https://cloud.google.com/bigquery/quotas#remote_function_limits. - By default BigQuery DataFrames uses a 10 minute timeout. `None` - can be passed to let the cloud functions default timeout take effect. - cloud_function_max_instances (int, Optional): - The maximumm instance count for the cloud function created. This - can be used to control how many cloud function instances can be - active at max at any given point of time. Lower setting can help - control the spike in the billing. Higher setting can help - support processing larger scale data. When not specified, cloud - function's default setting applies. For more details see - https://cloud.google.com/functions/docs/configuring/max-instances. - cloud_function_vpc_connector (str, Optional): - The VPC connector you would like to configure for your cloud - function. This is useful if your code needs access to data or - service(s) that are on a VPC network. See for more details - https://cloud.google.com/functions/docs/networking/connecting-vpc. - cloud_function_memory_mib (int, Optional): - The amounts of memory (in mebibytes) to allocate for the cloud - function (2nd gen) created. This also dictates a corresponding - amount of allocated CPU for the function. By default a memory of - 1024 MiB is set for the cloud functions created to support - BigQuery DataFrames remote function. If you want to let the - default memory of cloud functions be allocated, pass `None`. See - for more details - https://cloud.google.com/functions/docs/configuring/memory. - """ - # Some defaults may be used from the session if not provided otherwise - import bigframes.exceptions as bf_exceptions - import bigframes.pandas as bpd - import bigframes.series as bf_series - import bigframes.session - - session = cast(bigframes.session.Session, session or bpd.get_global_session()) - - # A BigQuery client is required to perform BQ operations - if not bigquery_client: - bigquery_client = session.bqclient - if not bigquery_client: - raise ValueError( - "A bigquery client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) - - # A BigQuery connection client is required to perform BQ connection operations - if not bigquery_connection_client: - bigquery_connection_client = session.bqconnectionclient - if not bigquery_connection_client: - raise ValueError( - "A bigquery connection client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) +class _RemoteFunctionSession: + """Session to manage remote functions.""" - # A cloud functions client is required to perform cloud functions operations - if not cloud_functions_client: - cloud_functions_client = session.cloudfunctionsclient - if not cloud_functions_client: - raise ValueError( - "A cloud functions client must be provided, either directly or via session. 
" - f"{constants.FEEDBACK_LINK}" - ) + def __init__(self): + # Session level mapping of remote function artifacts + self._temp_session_artifacts: Dict[str, str] = dict() - # A resource manager client is required to get/set IAM operations - if not resource_manager_client: - resource_manager_client = session.resourcemanagerclient - if not resource_manager_client: - raise ValueError( - "A resource manager client must be provided, either directly or via session. " - f"{constants.FEEDBACK_LINK}" - ) + # Lock to synchronize the update of the session level mapping + self._session_artifacts_lock = threading.Lock() - # BQ remote function must be persisted, for which we need a dataset - # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#:~:text=You%20cannot%20create%20temporary%20remote%20functions. - if dataset: - dataset_ref = bigquery.DatasetReference.from_string( - dataset, default_project=bigquery_client.project - ) - else: - dataset_ref = session._anonymous_dataset + def _update_artifacts(self, bqrf_routine: str, gcf_path: str): + """Update remote function artifacts in the current session.""" + with self._session_artifacts_lock: + self._temp_session_artifacts[bqrf_routine] = gcf_path - bq_location, cloud_function_region = get_remote_function_locations( - bigquery_client.location - ) + def clean_up( + self, + bqclient: bigquery.Client, + gcfclient: functions_v2.FunctionServiceClient, + session_id: str, + ): + """Delete remote function artifacts in the current session.""" + with self._session_artifacts_lock: + for bqrf_routine, gcf_path in self._temp_session_artifacts.items(): + # Let's accept the possibility that the remote function may have + # been deleted directly by the user + bqclient.delete_routine(bqrf_routine, not_found_ok=True) - # A connection is required for BQ remote function - # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function - if not bigquery_connection: - bigquery_connection = session._bq_connection # type: ignore + # Let's accept the possibility that the cloud function may have + # been deleted directly by the user + try: + gcfclient.delete_function(name=gcf_path) + except google.api_core.exceptions.NotFound: + pass - bigquery_connection = clients.resolve_full_bq_connection_name( - bigquery_connection, - default_project=dataset_ref.project, - default_location=bq_location, - ) - # Guaranteed to be the form of .. - ( - gcp_project_id, - bq_connection_location, - bq_connection_id, - ) = bigquery_connection.split(".") - if gcp_project_id.casefold() != dataset_ref.project.casefold(): - raise ValueError( - "The project_id does not match BigQuery connection gcp_project_id: " - f"{dataset_ref.project}." - ) - if bq_connection_location.casefold() != bq_location.casefold(): - raise ValueError( - "The location does not match BigQuery connection location: " - f"{bq_location}." 
- ) + self._temp_session_artifacts.clear() - # If any CMEK is intended then check that a docker repository is also specified - if ( - cloud_function_kms_key_name is not None - and cloud_function_docker_repository is None + # Inspired by @udf decorator implemented in ibis-bigquery package + # https://github.com/ibis-project/ibis-bigquery/blob/main/ibis_bigquery/udf/__init__.py + # which has moved as @js to the ibis package + # https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py + def remote_function( + self, + input_types: Union[None, type, Sequence[type]] = None, + output_type: Optional[type] = None, + session: Optional[Session] = None, + bigquery_client: Optional[bigquery.Client] = None, + bigquery_connection_client: Optional[ + bigquery_connection_v1.ConnectionServiceClient + ] = None, + cloud_functions_client: Optional[functions_v2.FunctionServiceClient] = None, + resource_manager_client: Optional[resourcemanager_v3.ProjectsClient] = None, + dataset: Optional[str] = None, + bigquery_connection: Optional[str] = None, + reuse: bool = True, + name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, + cloud_function_service_account: Optional[str] = None, + cloud_function_kms_key_name: Optional[str] = None, + cloud_function_docker_repository: Optional[str] = None, + max_batching_rows: Optional[int] = 1000, + cloud_function_timeout: Optional[int] = 600, + cloud_function_max_instances: Optional[int] = None, + cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): - raise ValueError( - "cloud_function_docker_repository must be specified with cloud_function_kms_key_name." - " For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin" - ) + """Decorator to turn a user defined function into a BigQuery remote function. + + .. deprecated:: 0.0.1 + This is an internal method. Please use :func:`bigframes.pandas.remote_function` instead. + + .. note:: + Please make sure following is setup before using this API: + + 1. Have the below APIs enabled for your project: + + * BigQuery Connection API + * Cloud Functions API + * Cloud Run API + * Cloud Build API + * Artifact Registry API + * Cloud Resource Manager API - bq_connection_manager = session.bqconnectionmanager + This can be done from the cloud console (change `PROJECT_ID` to yours): + https://console.cloud.google.com/apis/enableflow?apiid=bigqueryconnection.googleapis.com,cloudfunctions.googleapis.com,run.googleapis.com,cloudbuild.googleapis.com,artifactregistry.googleapis.com,cloudresourcemanager.googleapis.com&project=PROJECT_ID - def wrapper(func): - nonlocal input_types, output_type + Or from the gcloud CLI: - if not callable(func): - raise TypeError("f must be callable, got {}".format(func)) + `$ gcloud services enable bigqueryconnection.googleapis.com cloudfunctions.googleapis.com run.googleapis.com cloudbuild.googleapis.com artifactregistry.googleapis.com cloudresourcemanager.googleapis.com` - if sys.version_info >= (3, 10): - # Add `eval_str = True` so that deferred annotations are turned into their - # corresponding type objects. Need Python 3.10 for eval_str parameter. - # https://docs.python.org/3/library/inspect.html#inspect.signature - signature_kwargs: Mapping[str, Any] = {"eval_str": True} + 2. 
Have following IAM roles enabled for you: + + * BigQuery Data Editor (roles/bigquery.dataEditor) + * BigQuery Connection Admin (roles/bigquery.connectionAdmin) + * Cloud Functions Developer (roles/cloudfunctions.developer) + * Service Account User (roles/iam.serviceAccountUser) on the service account `PROJECT_NUMBER-compute@developer.gserviceaccount.com` + * Storage Object Viewer (roles/storage.objectViewer) + * Project IAM Admin (roles/resourcemanager.projectIamAdmin) (Only required if the bigquery connection being used is not pre-created and is created dynamically with user credentials.) + + 3. Either the user has setIamPolicy privilege on the project, or a BigQuery connection is pre-created with necessary IAM role set: + + 1. To create a connection, follow https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_connection + 2. To set up IAM, follow https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function + + Alternatively, the IAM could also be setup via the gcloud CLI: + + `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. + + Args: + input_types (None, type, or sequence(type)): + For scalar user defined function it should be the input type or + sequence of input types. For row processing user defined function, + type `Series` should be specified. + output_type (Optional[type]): + Data type of the output in the user defined function. + session (bigframes.Session, Optional): + BigQuery DataFrames session to use for getting default project, + dataset and BigQuery connection. + bigquery_client (google.cloud.bigquery.Client, Optional): + Client to use for BigQuery operations. If this param is not provided + then bigquery client from the session would be used. + bigquery_connection_client (google.cloud.bigquery_connection_v1.ConnectionServiceClient, Optional): + Client to use for BigQuery connection operations. If this param is + not provided then bigquery connection client from the session would + be used. + cloud_functions_client (google.cloud.functions_v2.FunctionServiceClient, Optional): + Client to use for cloud functions operations. If this param is not + provided then the functions client from the session would be used. + resource_manager_client (google.cloud.resourcemanager_v3.ProjectsClient, Optional): + Client to use for cloud resource management operations, e.g. for + getting and setting IAM roles on cloud resources. If this param is + not provided then resource manager client from the session would be + used. + dataset (str, Optional.): + Dataset in which to create a BigQuery remote function. It should be in + `.` or `` format. If this + parameter is not provided then session dataset id is used. + bigquery_connection (str, Optional): + Name of the BigQuery connection in the form of `CONNECTION_ID` or + `LOCATION.CONNECTION_ID` or `PROJECT_ID.LOCATION.CONNECTION_ID`. + If this param is not provided then the bigquery connection from the session + would be used. If it is pre created in the same location as the + `bigquery_client.location` then it would be used, otherwise it is created + dynamically using the `bigquery_connection_client` assuming the user has necessary + priviliges. The PROJECT_ID should be the same as the BigQuery connection project. + reuse (bool, Optional): + Reuse the remote function if is already exists. 
+                `True` by default, which results in reusing an existing remote
+                function and corresponding cloud function (if any) that was
+                previously created for the same udf.
+                Setting it to `False` forces the creation of a unique remote function.
+                If the required remote function does not exist then it would be
+                created irrespective of this param.
+            name (str, Optional):
+                Explicit name of the persisted BigQuery remote function. Use it with
+                caution, because two users working in the same project and dataset
+                could overwrite each other's remote functions if they use the same
+                persistent name. When an explicit name is provided, any session
+                specific clean up (``bigframes.session.Session.close``/
+                ``bigframes.pandas.close_session``/
+                ``bigframes.pandas.reset_session``/
+                ``bigframes.pandas.clean_up_by_session_id``) does not clean up
+                the function, and leaves it for the user to manage the function
+                and the associated cloud function directly.
+            packages (str[], Optional):
+                Explicit name of the external package dependencies. Each dependency
+                is added to the `requirements.txt` as is, and can be of the form
+                supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/.
+            cloud_function_service_account (str, Optional):
+                Service account to use for the cloud functions. If not provided then
+                the default service account would be used. See
+                https://cloud.google.com/functions/docs/securing/function-identity
+                for more details. Please make sure the service account has the
+                necessary IAM permissions configured as described in
+                https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration.
+            cloud_function_kms_key_name (str, Optional):
+                Customer managed encryption key to protect cloud functions and
+                related data at rest. This is of the format
+                projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY.
+                Read https://cloud.google.com/functions/docs/securing/cmek for
+                more details including granting necessary service accounts
+                access to the key.
+            cloud_function_docker_repository (str, Optional):
+                Docker repository created with the same encryption key as
+                `cloud_function_kms_key_name` to store encrypted artifacts
+                created to support the cloud function. This is of the format
+                projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME.
+                For more details see
+                https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.
+            max_batching_rows (int, Optional):
+                The maximum number of rows to be batched for processing in the
+                BQ remote function. Default value is 1000. A lower number can be
+                passed to avoid timeouts in case the user code is too complex to
+                process a large number of rows fast enough. A higher number can be
+                used to increase throughput in case the user code is fast enough.
+                `None` can be passed to let the BQ remote functions service apply
+                default batching. See for more details
+                https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request.
+            cloud_function_timeout (int, Optional):
+                The maximum amount of time (in seconds) BigQuery should wait for
+                the cloud function to return a response. See for more details
+                https://cloud.google.com/functions/docs/configuring/timeout.
+                Please note that even though the cloud function (2nd gen) itself
+                allows setting up to 60 minutes of timeout, a BigQuery remote
+                function can wait only up to 20 minutes, see for more details
+                https://cloud.google.com/bigquery/quotas#remote_function_limits.
+                By default BigQuery DataFrames uses a 10 minute timeout. `None`
+                can be passed to let the cloud functions default timeout take effect.
+            cloud_function_max_instances (int, Optional):
+                The maximum instance count for the cloud function created. This
+                can be used to control how many cloud function instances can be
+                active at any given point of time. A lower setting can help
+                control the spike in the billing. A higher setting can help
+                support processing larger scale data. When not specified, the
+                cloud function's default setting applies. For more details see
+                https://cloud.google.com/functions/docs/configuring/max-instances.
+            cloud_function_vpc_connector (str, Optional):
+                The VPC connector you would like to configure for your cloud
+                function. This is useful if your code needs access to data or
+                service(s) that are on a VPC network. See for more details
+                https://cloud.google.com/functions/docs/networking/connecting-vpc.
+            cloud_function_memory_mib (int, Optional):
+                The amount of memory (in mebibytes) to allocate for the cloud
+                function (2nd gen) created. This also dictates a corresponding
+                amount of allocated CPU for the function. By default a memory of
+                1024 MiB is set for the cloud functions created to support
+                BigQuery DataFrames remote functions. If you want to let the
+                default memory of cloud functions be allocated, pass `None`. See
+                for more details
+                https://cloud.google.com/functions/docs/configuring/memory.
+        """
+        # Some defaults may be used from the session if not provided otherwise
+        import bigframes.exceptions as bf_exceptions
+        import bigframes.pandas as bpd
+        import bigframes.series as bf_series
+        import bigframes.session
+
+        session = cast(bigframes.session.Session, session or bpd.get_global_session())
+
+        # A BigQuery client is required to perform BQ operations
+        if not bigquery_client:
+            bigquery_client = session.bqclient
+        if not bigquery_client:
+            raise ValueError(
+                "A bigquery client must be provided, either directly or via session. "
+                f"{constants.FEEDBACK_LINK}"
+            )
+
+        # A BigQuery connection client is required to perform BQ connection operations
+        if not bigquery_connection_client:
+            bigquery_connection_client = session.bqconnectionclient
+        if not bigquery_connection_client:
+            raise ValueError(
+                "A bigquery connection client must be provided, either directly or via session. "
+                f"{constants.FEEDBACK_LINK}"
+            )
+
+        # A cloud functions client is required to perform cloud functions operations
+        if not cloud_functions_client:
+            cloud_functions_client = session.cloudfunctionsclient
+        if not cloud_functions_client:
+            raise ValueError(
+                "A cloud functions client must be provided, either directly or via session. "
+                f"{constants.FEEDBACK_LINK}"
+            )
+
+        # A resource manager client is required to get/set IAM operations
+        if not resource_manager_client:
+            resource_manager_client = session.resourcemanagerclient
+        if not resource_manager_client:
+            raise ValueError(
+                "A resource manager client must be provided, either directly or via session. "
+                f"{constants.FEEDBACK_LINK}"
+            )
+
+        # BQ remote function must be persisted, for which we need a dataset
+        # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#:~:text=You%20cannot%20create%20temporary%20remote%20functions.
+ if dataset: + dataset_ref = bigquery.DatasetReference.from_string( + dataset, default_project=bigquery_client.project + ) else: - signature_kwargs = {} + dataset_ref = session._anonymous_dataset - signature = inspect.signature( - func, - **signature_kwargs, + bq_location, cloud_function_region = get_remote_function_locations( + bigquery_client.location ) - # Try to get input types via type annotations. - if input_types is None: - input_types = [] - for parameter in signature.parameters.values(): - if (param_type := parameter.annotation) is inspect.Signature.empty: + # A connection is required for BQ remote function + # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function + if not bigquery_connection: + bigquery_connection = session._bq_connection # type: ignore + + bigquery_connection = clients.resolve_full_bq_connection_name( + bigquery_connection, + default_project=dataset_ref.project, + default_location=bq_location, + ) + # Guaranteed to be the form of .. + ( + gcp_project_id, + bq_connection_location, + bq_connection_id, + ) = bigquery_connection.split(".") + if gcp_project_id.casefold() != dataset_ref.project.casefold(): + raise ValueError( + "The project_id does not match BigQuery connection gcp_project_id: " + f"{dataset_ref.project}." + ) + if bq_connection_location.casefold() != bq_location.casefold(): + raise ValueError( + "The location does not match BigQuery connection location: " + f"{bq_location}." + ) + + # If any CMEK is intended then check that a docker repository is also specified + if ( + cloud_function_kms_key_name is not None + and cloud_function_docker_repository is None + ): + raise ValueError( + "cloud_function_docker_repository must be specified with cloud_function_kms_key_name." + " For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin" + ) + + bq_connection_manager = session.bqconnectionmanager + + def wrapper(func): + nonlocal input_types, output_type + + if not callable(func): + raise TypeError("f must be callable, got {}".format(func)) + + if sys.version_info >= (3, 10): + # Add `eval_str = True` so that deferred annotations are turned into their + # corresponding type objects. Need Python 3.10 for eval_str parameter. + # https://docs.python.org/3/library/inspect.html#inspect.signature + signature_kwargs: Mapping[str, Any] = {"eval_str": True} + else: + signature_kwargs = {} + + signature = inspect.signature( + func, + **signature_kwargs, + ) + + # Try to get input types via type annotations. + if input_types is None: + input_types = [] + for parameter in signature.parameters.values(): + if (param_type := parameter.annotation) is inspect.Signature.empty: + raise ValueError( + "'input_types' was not set and parameter " + f"'{parameter.name}' is missing a type annotation. " + "Types are required to use @remote_function." + ) + input_types.append(param_type) + elif not isinstance(input_types, collections.abc.Sequence): + input_types = [input_types] + + if output_type is None: + if ( + output_type := signature.return_annotation + ) is inspect.Signature.empty: raise ValueError( - "'input_types' was not set and parameter " - f"'{parameter.name}' is missing a type annotation. " - "Types are required to use @remote_function." + "'output_type' was not set and function is missing a " + "return type annotation. Types are required to use " + "@remote_function." 
) - input_types.append(param_type) - elif not isinstance(input_types, collections.abc.Sequence): - input_types = [input_types] - if output_type is None: - if (output_type := signature.return_annotation) is inspect.Signature.empty: - raise ValueError( - "'output_type' was not set and function is missing a " - "return type annotation. Types are required to use " - "@remote_function." + # The function will actually be receiving a pandas Series, but allow both + # BigQuery DataFrames and pandas object types for compatibility. + is_row_processor = False + if len(input_types) == 1 and ( + (input_type := input_types[0]) == bf_series.Series + or input_type == pandas.Series + ): + warnings.warn( + "input_types=Series is in preview.", + stacklevel=1, + category=bf_exceptions.PreviewWarning, ) - # The function will actually be receiving a pandas Series, but allow both - # BigQuery DataFrames and pandas object types for compatibility. - is_row_processor = False - if len(input_types) == 1 and ( - (input_type := input_types[0]) == bf_series.Series - or input_type == pandas.Series - ): - warnings.warn( - "input_types=Series is in preview.", - stacklevel=1, - category=bf_exceptions.PreviewWarning, - ) + # we will model the row as a json serialized string containing the data + # and the metadata representing the row + input_types = [str] + is_row_processor = True + elif isinstance(input_types, type): + input_types = [input_types] - # we will model the row as a json serialized string containing the data - # and the metadata representing the row - input_types = [str] - is_row_processor = True - elif isinstance(input_types, type): - input_types = [input_types] + # TODO(b/340898611): fix type error + ibis_signature = ibis_signature_from_python_signature( + signature, input_types, output_type # type: ignore + ) - # TODO(b/340898611): fix type error - ibis_signature = ibis_signature_from_python_signature( - signature, input_types, output_type # type: ignore - ) + remote_function_client = RemoteFunctionClient( + dataset_ref.project, + cloud_function_region, + cloud_functions_client, + bq_location, + dataset_ref.dataset_id, + bigquery_client, + bq_connection_id, + bq_connection_manager, + cloud_function_service_account, + cloud_function_kms_key_name, + cloud_function_docker_repository, + session=session, # type: ignore + ) - remote_function_client = RemoteFunctionClient( - dataset_ref.project, - cloud_function_region, - cloud_functions_client, - bq_location, - dataset_ref.dataset_id, - bigquery_client, - bq_connection_id, - bq_connection_manager, - cloud_function_service_account, - cloud_function_kms_key_name, - cloud_function_docker_repository, - session=session, # type: ignore - ) + # In the unlikely case where the user is trying to re-deploy the same + # function, cleanup the attributes we add below, first. This prevents + # the pickle from having dependencies that might not otherwise be + # present such as ibis or pandas. + def try_delattr(attr): + try: + delattr(func, attr) + except AttributeError: + pass - # In the unlikely case where the user is trying to re-deploy the same - # function, cleanup the attributes we add below, first. This prevents - # the pickle from having dependencies that might not otherwise be - # present such as ibis or pandas. 
- def try_delattr(attr): - try: - delattr(func, attr) - except AttributeError: - pass - - try_delattr("bigframes_cloud_function") - try_delattr("bigframes_remote_function") - try_delattr("output_dtype") - try_delattr("ibis_node") - - rf_name, cf_name = remote_function_client.provision_bq_remote_function( - func, - input_types=tuple( - third_party_ibis_bqtypes.BigQueryType.from_ibis(type_) - for type_ in ibis_signature.input_types - ), - output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis( - ibis_signature.output_type - ), - reuse=reuse, - name=name, - package_requirements=packages, - max_batching_rows=max_batching_rows, - cloud_function_timeout=cloud_function_timeout, - cloud_function_max_instance_count=cloud_function_max_instances, - is_row_processor=is_row_processor, - cloud_function_vpc_connector=cloud_function_vpc_connector, - cloud_function_memory_mib=cloud_function_memory_mib, - ) + try_delattr("bigframes_cloud_function") + try_delattr("bigframes_remote_function") + try_delattr("output_dtype") + try_delattr("ibis_node") + + ( + rf_name, + cf_name, + created_new, + ) = remote_function_client.provision_bq_remote_function( + func, + input_types=tuple( + third_party_ibis_bqtypes.BigQueryType.from_ibis(type_) + for type_ in ibis_signature.input_types + ), + output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis( + ibis_signature.output_type + ), + reuse=reuse, + name=name, + package_requirements=packages, + max_batching_rows=max_batching_rows, + cloud_function_timeout=cloud_function_timeout, + cloud_function_max_instance_count=cloud_function_max_instances, + is_row_processor=is_row_processor, + cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, + ) - # TODO: Move ibis logic to compiler step - node = ibis.udf.scalar.builtin( - func, - name=rf_name, - schema=f"{dataset_ref.project}.{dataset_ref.dataset_id}", - signature=(ibis_signature.input_types, ibis_signature.output_type), - ) - func.bigframes_cloud_function = ( - remote_function_client.get_cloud_function_fully_qualified_name(cf_name) - ) - func.bigframes_remote_function = ( - remote_function_client.get_remote_function_fully_qualilfied_name(rf_name) - ) + # TODO: Move ibis logic to compiler step + node = ibis.udf.scalar.builtin( + func, + name=rf_name, + schema=f"{dataset_ref.project}.{dataset_ref.dataset_id}", + signature=(ibis_signature.input_types, ibis_signature.output_type), + ) + func.bigframes_cloud_function = ( + remote_function_client.get_cloud_function_fully_qualified_name(cf_name) + ) + func.bigframes_remote_function = ( + remote_function_client.get_remote_function_fully_qualilfied_name( + rf_name + ) + ) - func.output_dtype = ( - bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( - ibis_signature.output_type + func.output_dtype = ( + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + ibis_signature.output_type + ) ) - ) - func.ibis_node = node - return func + func.ibis_node = node + + # If a new remote function was created, update the cloud artifacts + # created in the session. This would be used to clean up any + # resources in the session. Note that we need to do this only for + # the case where an explicit name was not provided by the user and + # we used an internal name. For the cases where the user provided an + # explicit name, we are assuming that the user wants to persist them + # with that name and would directly manage their lifecycle. 
+ if created_new and (not name): + self._update_artifacts( + func.bigframes_remote_function, func.bigframes_cloud_function + ) + return func + + return wrapper + + +def remote_function(*args, **kwargs): + remote_function_session = _RemoteFunctionSession() + remote_function_session.remote_function(*args, **kwargs) + - return wrapper +remote_function.__doc__ = _RemoteFunctionSession.remote_function.__doc__ def read_gbq_function( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index c7284e3001..10c0797873 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -305,6 +305,8 @@ def __init__( else bigframes.enums.DefaultIndexKind.NULL ) + self._remote_function_session = bigframes_rf._RemoteFunctionSession() + @property def bqclient(self): return self._clients_provider.bqclient @@ -397,7 +399,7 @@ def close(self): This includes BigQuery tables, remote functions and cloud functions serving the remote functions""" self._clean_up_tables() - bigframes_rf._clean_up_alive_session( + self._remote_function_session.clean_up( self.bqclient, self.cloudfunctionsclient, self.session_id ) @@ -1703,7 +1705,7 @@ def remote_function( `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`. """ - return bigframes_rf.remote_function( + return self._remote_function_session.remote_function( input_types, output_type, session=self, From a1365855cfe07ce9cc4f567e84a781fbfe9ca239 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 13 Jul 2024 02:30:51 +0000 Subject: [PATCH 6/6] fix the missing return keyword --- bigframes/functions/remote_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index a369b3d69c..f24ba1b5fb 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1216,7 +1216,7 @@ def try_delattr(attr): def remote_function(*args, **kwargs): remote_function_session = _RemoteFunctionSession() - remote_function_session.remote_function(*args, **kwargs) + return remote_function_session.remote_function(*args, **kwargs) remote_function.__doc__ = _RemoteFunctionSession.remote_function.__doc__
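For reference, a minimal usage sketch of the session-scoped cleanup this patch enables. It assumes a GCP project with the required APIs enabled and a BigQuery connection already configured; the function, column, and variable names below are placeholders for illustration, not part of the patch.

    import bigframes.pandas as bpd

    session = bpd.get_global_session()

    # Deploying a remote function through the session records the BigQuery
    # routine and the backing cloud function against this session's id
    # (unless an explicit `name=` is passed, in which case the artifacts are
    # left for the user to manage directly).
    @session.remote_function(input_types=[int], output_type=int, reuse=False)
    def add_one(x: int) -> int:
        return x + 1

    df = bpd.DataFrame({"x": [1, 2, 3]})
    result = df["x"].apply(add_one).to_pandas()

    # Closing the session now also deletes the BigQuery routine and the
    # cloud function that were created for `add_one`.
    session.close()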