From 49e726eaaf522853840fb578a09558fc0a8ff87f Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 18 Jun 2024 22:56:28 +0000 Subject: [PATCH 1/2] fix: include internally required packages in `remote_function` id computation --- bigframes/functions/remote_function.py | 34 +++++++++++++++++--------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 472ac07547..cf552643ac 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -276,18 +276,10 @@ def generate_cloud_function_code( """ # requirements.txt - requirements = ["cloudpickle >= 2.1.0"] - if is_row_processor: - # bigframes remote function will send an entire row of data as json, - # which would be converted to a pandas series and processed - requirements.append(f"pandas=={pandas.__version__}") - requirements.append(f"pyarrow=={pyarrow.__version__}") if package_requirements: - requirements.extend(package_requirements) - requirements = sorted(requirements) - requirements_txt = os.path.join(directory, "requirements.txt") - with open(requirements_txt, "w") as f: - f.write("\n".join(requirements)) + requirements_txt = os.path.join(directory, "requirements.txt") + with open(requirements_txt, "w") as f: + f.write("\n".join(package_requirements)) # main.py entry_point = bigframes.functions.remote_function_template.generate_cloud_function_main_code( @@ -440,6 +432,20 @@ def create_cloud_function( ) return endpoint + def _get_updated_package_requirements(self, package_requirements, is_row_processor): + requirements = ["cloudpickle>=2.1.0"] + if is_row_processor: + # bigframes remote function will send an entire row of data as json, + # which would be converted to a pandas series and processed + requirements.append(f"pandas=={pandas.__version__}") + requirements.append(f"pyarrow=={pyarrow.__version__}") + + if package_requirements: + requirements.extend(package_requirements) + + requirements = sorted(requirements) + return requirements + def provision_bq_remote_function( self, def_, @@ -464,6 +470,12 @@ def provision_bq_remote_function( random.choices(string.ascii_lowercase + string.digits, k=8) ) + # Augment user package requirements with any internal package + # requirements + package_requirements = self._get_updated_package_requirements( + package_requirements, is_row_processor + ) + # Derive the name of the cloud function underlying the intended BQ # remote function cloud_function_name = get_cloud_function_name( From 337697a87f8461b32bb89c15715a0af9a8cf528f Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 19 Jun 2024 00:46:09 +0000 Subject: [PATCH 2/2] refactor to keep the tests supported --- bigframes/functions/remote_function.py | 55 ++++++++++++---------- tests/system/large/test_remote_function.py | 2 +- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index cf552643ac..ecc6c2e100 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -101,6 +101,21 @@ def _get_hash(def_, package_requirements=None): return hashlib.md5(def_repr).hexdigest() +def _get_updated_package_requirements(package_requirements, is_row_processor): + requirements = [f"cloudpickle=={cloudpickle.__version__}"] + if is_row_processor: + # bigframes remote function will send an entire row of data as json, + # which would be converted to a pandas series and processed + requirements.append(f"pandas=={pandas.__version__}") + requirements.append(f"pyarrow=={pyarrow.__version__}") + + if package_requirements: + requirements.extend(package_requirements) + + requirements = sorted(requirements) + return requirements + + def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str: return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}" @@ -111,13 +126,22 @@ class IbisSignature(NamedTuple): output_type: IbisDataType -def get_cloud_function_name(def_, uniq_suffix=None, package_requirements=None): +def get_cloud_function_name( + def_, uniq_suffix=None, package_requirements=None, is_row_processor=False +): "Get a name for the cloud function for the given user defined function." + + # Augment user package requirements with any internal package + # requirements + package_requirements = _get_updated_package_requirements( + package_requirements, is_row_processor + ) + cf_name = _get_hash(def_, package_requirements) cf_name = f"bigframes-{cf_name}" # for identification if uniq_suffix: cf_name = f"{cf_name}-{uniq_suffix}" - return cf_name + return cf_name, package_requirements def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None): @@ -432,20 +456,6 @@ def create_cloud_function( ) return endpoint - def _get_updated_package_requirements(self, package_requirements, is_row_processor): - requirements = ["cloudpickle>=2.1.0"] - if is_row_processor: - # bigframes remote function will send an entire row of data as json, - # which would be converted to a pandas series and processed - requirements.append(f"pandas=={pandas.__version__}") - requirements.append(f"pyarrow=={pyarrow.__version__}") - - if package_requirements: - requirements.extend(package_requirements) - - requirements = sorted(requirements) - return requirements - def provision_bq_remote_function( self, def_, @@ -470,16 +480,11 @@ def provision_bq_remote_function( random.choices(string.ascii_lowercase + string.digits, k=8) ) - # Augment user package requirements with any internal package - # requirements - package_requirements = self._get_updated_package_requirements( - package_requirements, is_row_processor - ) - # Derive the name of the cloud function underlying the intended BQ - # remote function - cloud_function_name = get_cloud_function_name( - def_, uniq_suffix, package_requirements + # remote function, also collect updated package requirements as + # determined in the name resolution + cloud_function_name, package_requirements = get_cloud_function_name( + def_, uniq_suffix, package_requirements, is_row_processor ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 6bfc9f0da3..3f4bfea97e 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -590,7 +590,7 @@ def add_one(x): add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one) # Expected cloud function name for the unique udf - add_one_uniq_cf_name = get_cloud_function_name(add_one_uniq) + add_one_uniq_cf_name, _ = get_cloud_function_name(add_one_uniq) # There should be no cloud function yet for the unique udf cloud_functions = list(