Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

fix: include internally required packages in remote_function hash #799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions 55 bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ def _get_hash(def_, package_requirements=None):
return hashlib.md5(def_repr).hexdigest()


def _get_updated_package_requirements(package_requirements, is_row_processor):
requirements = [f"cloudpickle=={cloudpickle.__version__}"]
if is_row_processor:
# bigframes remote function will send an entire row of data as json,
# which would be converted to a pandas series and processed
# Ensure numpy versions match to avoid unpickling problems. See
# internal issue b/347934471.
requirements.append(f"numpy=={numpy.__version__}")
requirements.append(f"pandas=={pandas.__version__}")
requirements.append(f"pyarrow=={pyarrow.__version__}")

if package_requirements:
requirements.extend(package_requirements)

requirements = sorted(requirements)
return requirements


def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str:
return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}"

Expand All @@ -112,13 +130,22 @@ class IbisSignature(NamedTuple):
output_type: IbisDataType


def get_cloud_function_name(def_, uniq_suffix=None, package_requirements=None):
def get_cloud_function_name(
def_, uniq_suffix=None, package_requirements=None, is_row_processor=False
):
"Get a name for the cloud function for the given user defined function."

# Augment user package requirements with any internal package
# requirements
package_requirements = _get_updated_package_requirements(
package_requirements, is_row_processor
)

cf_name = _get_hash(def_, package_requirements)
cf_name = f"bigframes-{cf_name}" # for identification
if uniq_suffix:
cf_name = f"{cf_name}-{uniq_suffix}"
return cf_name
return cf_name, package_requirements


def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None):
Expand Down Expand Up @@ -277,21 +304,10 @@ def generate_cloud_function_code(
"""

# requirements.txt
requirements = ["cloudpickle >= 2.1.0"]
if is_row_processor:
# bigframes remote function will send an entire row of data as json,
# which would be converted to a pandas series and processed
# Ensure numpy versions match to avoid unpickling problems. See
# internal issue b/347934471.
requirements.append(f"numpy=={numpy.__version__}")
requirements.append(f"pandas=={pandas.__version__}")
requirements.append(f"pyarrow=={pyarrow.__version__}")
if package_requirements:
requirements.extend(package_requirements)
requirements = sorted(requirements)
requirements_txt = os.path.join(directory, "requirements.txt")
with open(requirements_txt, "w") as f:
f.write("\n".join(requirements))
requirements_txt = os.path.join(directory, "requirements.txt")
with open(requirements_txt, "w") as f:
f.write("\n".join(package_requirements))

# main.py
entry_point = bigframes.functions.remote_function_template.generate_cloud_function_main_code(
Expand Down Expand Up @@ -469,9 +485,10 @@ def provision_bq_remote_function(
)

# Derive the name of the cloud function underlying the intended BQ
# remote function
cloud_function_name = get_cloud_function_name(
def_, uniq_suffix, package_requirements
# remote function, also collect updated package requirements as
# determined in the name resolution
cloud_function_name, package_requirements = get_cloud_function_name(
def_, uniq_suffix, package_requirements, is_row_processor
)
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)

Expand Down
2 changes: 1 addition & 1 deletion 2 tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def add_one(x):
add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one)

# Expected cloud function name for the unique udf
add_one_uniq_cf_name = get_cloud_function_name(add_one_uniq)
add_one_uniq_cf_name, _ = get_cloud_function_name(add_one_uniq)

# There should be no cloud function yet for the unique udf
cloud_functions = list(
Expand Down
103 changes: 103 additions & 0 deletions 103 tests/system/small/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,109 @@ def test_read_gbq_function_enforces_explicit_types(
)


@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1(session, scalars_dfs):
columns = [
"bool_col",
"int64_col",
"int64_too",
"float64_col",
"string_col",
"bytes_col",
]
scalars_df, scalars_pandas_df = scalars_dfs

def add_ints(row):
return row["int64_col"] + row["int64_too"]

with pytest.warns(
bigframes.exceptions.PreviewWarning,
match="input_types=Series is in preview.",
):
add_ints_remote = session.remote_function(
bigframes.series.Series,
int,
)(add_ints)

with pytest.warns(
bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview."
):
bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas()

pd_result = scalars_pandas_df[columns].apply(add_ints, axis=1)

# bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this
# mismatch by using check_dtype=False.
#
# bf_result.to_numpy() produces an array of numpy.float64's
# (in system_prerelease tests), while pd_result.to_numpy() produces an
# array of ints, ignore this mismatch by using check_exact=False.
pd.testing.assert_series_equal(
pd_result, bf_result, check_dtype=False, check_exact=False
)


@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1_ordering(session, scalars_dfs):
columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
ordering_columns = ["bool_col", "int64_col"]
scalars_df, scalars_pandas_df = scalars_dfs

def add_ints(row):
return row["int64_col"] + row["int64_too"]

add_ints_remote = session.remote_function(bigframes.series.Series, int)(add_ints)

bf_result = (
scalars_df[columns]
.sort_values(ordering_columns)
.apply(add_ints_remote, axis=1)
.to_pandas()
)
pd_result = (
scalars_pandas_df[columns].sort_values(ordering_columns).apply(add_ints, axis=1)
)

# bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this
# mismatch by using check_dtype=False.
#
# bf_result.to_numpy() produces an array of numpy.float64's
# (in system_prerelease tests), while pd_result.to_numpy() produces an
# array of ints, ignore this mismatch by using check_exact=False.
pd.testing.assert_series_equal(
pd_result, bf_result, check_dtype=False, check_exact=False
)


@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1_multiindex(session):
pd_df = pd.DataFrame(
{"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]},
index=pd.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]),
)
bf_df = session.read_pandas(pd_df)

def add_numbers(row):
return row["x"] + row["y"]

add_numbers_remote = session.remote_function(bigframes.series.Series, float)(
add_numbers
)

bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas()
pd_result = pd_df.apply(add_numbers, axis=1)

# bf_result.dtype is 'Float64' while pd_result.dtype is 'float64', ignore this
# mismatch by using check_dtype=False.
#
# bf_result.index[0].dtype is 'string[pyarrow]' while
# pd_result.index[0].dtype is 'object', ignore this mismatch by using
# check_index_type=False.
pd.testing.assert_series_equal(
pd_result, bf_result, check_dtype=False, check_index_type=False
)


def test_df_apply_axis_1_unsupported_callable(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.