Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

test: ensure all remote_function APIs work in partial ordering mode #1000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 23, 2024
Merged
14 changes: 2 additions & 12 deletions 14 bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3473,11 +3473,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
raise ValueError(f"na_action={na_action} not supported")

# TODO(shobs): Support **kwargs
# Reproject as workaround to applying filter too late. This forces the
# filter to be applied before passing data to remote function,
# protecting from bad inputs causing errors.
reprojected_df = DataFrame(self._block._force_reproject())
return reprojected_df._apply_unary_op(
return self._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None))
)

Expand Down Expand Up @@ -3572,13 +3568,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)

series_list = [self[col] for col in self.columns]
# Reproject as workaround to applying filter too late. This forces the
# filter to be applied before passing data to remote function,
# protecting from bad inputs causing errors.
reprojected_series = bigframes.series.Series(
series_list[0]._block._force_reproject()
)
result_series = reprojected_series._apply_nary_op(
result_series = series_list[0]._apply_nary_op(
ops.NaryRemoteFunctionOp(func=func), series_list[1:]
)
result_series.name = None
Expand Down
6 changes: 1 addition & 5 deletions 6 bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1480,11 +1480,7 @@ def combine(
ex.message += f"\n{_remote_function_recommendation_message}"
raise

# Reproject as workaround to applying filter too late. This forces the
# filter to be applied before passing data to remote function,
# protecting from bad inputs causing errors.
reprojected_series = Series(self._block._force_reproject())
result_series = reprojected_series._apply_binary_op(
result_series = self._apply_binary_op(
other, ops.BinaryRemoteFunctionOp(func=func)
)

Expand Down
246 changes: 221 additions & 25 deletions 246 tests/system/small/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,37 @@ def add_one(x):
assert_pandas_df_equal(bf_result, pd_result)


@pytest.mark.flaky(retries=2, delay=120)
def test_dataframe_applymap_explicit_filter(
session_with_bq_connection, scalars_dfs, dataset_id_permanent
):
def add_one(x):
return x + 1

remote_add_one = session_with_bq_connection.remote_function(
[int], int, dataset_id_permanent, name=get_rf_name(add_one)
)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs
int64_cols = ["int64_col", "int64_too"]

bf_int64_df = scalars_df[int64_cols]
bf_int64_df_filtered = bf_int64_df[bf_int64_df["int64_col"].notnull()]
bf_result = bf_int64_df_filtered.applymap(remote_add_one).to_pandas()

pd_int64_df = scalars_pandas_df[int64_cols]
pd_int64_df_filtered = pd_int64_df[pd_int64_df["int64_col"].notnull()]
pd_result = pd_int64_df_filtered.applymap(add_one)
# TODO(shobs): Figure why pandas .applymap() changes the dtype, i.e.
# pd_int64_df_filtered.dtype is Int64Dtype()
# pd_int64_df_filtered.applymap(lambda x: x).dtype is int64.
# For this test let's force the pandas dtype to be same as input.
for col in pd_result:
pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype)

assert_pandas_df_equal(bf_result, pd_result)


@pytest.mark.flaky(retries=2, delay=120)
def test_dataframe_applymap_na_ignore(
session_with_bq_connection, scalars_dfs, dataset_id_permanent
Expand Down Expand Up @@ -1024,12 +1055,21 @@ def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_inde
repr(s.mask(should_mask, "REDACTED"))


@pytest.mark.parametrize(
("method",),
[
pytest.param("apply"),
pytest.param("map"),
pytest.param("mask"),
],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs):

def test_remote_function_unary_applied_after_filter(
session, dataset_id_permanent, scalars_dfs, method
):
# This function is deliberately written to not work with NA input
def plus_one(x: int) -> int:
return x + 1
def is_odd(x: int) -> bool:
return x % 2 == 1

scalars_df, scalars_pandas_df = scalars_dfs
int_col_name_with_nulls = "int64_col"
Expand All @@ -1038,47 +1078,203 @@ def plus_one(x: int) -> int:
assert any([pd.isna(val) for val in scalars_df[int_col_name_with_nulls]])

# create a remote function
plus_one_remote = session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(plus_one)
)(plus_one)
is_odd_remote = session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(is_odd)
)(is_odd)

# with nulls in the series the remote function application would fail
with pytest.raises(
google.api_core.exceptions.BadRequest, match="unsupported operand"
):
scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas()
bf_method = getattr(scalars_df[int_col_name_with_nulls], method)
bf_method(is_odd_remote).to_pandas()

# after filtering out nulls the remote function application should works
# after filtering out nulls the remote function application should work
# similar to pandas
pd_result = scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
int_col_name_with_nulls
].apply(plus_one)
bf_result = (
pd_method = getattr(
scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
int_col_name_with_nulls
],
method,
)
pd_result = pd_method(is_odd)
bf_method = getattr(
scalars_df[scalars_df[int_col_name_with_nulls].notnull()][
int_col_name_with_nulls
]
.apply(plus_one_remote)
],
method,
)
bf_result = bf_method(is_odd_remote).to_pandas()

# ignore any dtype difference
pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_binary_applied_after_filter(
session, dataset_id_permanent, scalars_dfs
):
# This function is deliberately written to not work with NA input
def add(x: int, y: int) -> int:
return x + y

scalars_df, scalars_pandas_df = scalars_dfs
int_col_name_with_nulls = "int64_col"
int_col_name_no_nulls = "int64_too"
bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]]
pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]]

# make sure there are NA values in the test column
assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]])

# create a remote function
add_remote = session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(add)
)(add)

# with nulls in the series the remote function application would fail
with pytest.raises(
google.api_core.exceptions.BadRequest, match="unsupported operand"
):
bf_df[int_col_name_with_nulls].combine(
bf_df[int_col_name_no_nulls], add_remote
).to_pandas()

# after filtering out nulls the remote function application should work
# similar to pandas
pd_filter = pd_df[int_col_name_with_nulls].notnull()
pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine(
pd_df[pd_filter][int_col_name_no_nulls], add
)
bf_filter = bf_df[int_col_name_with_nulls].notnull()
bf_result = (
bf_df[bf_filter][int_col_name_with_nulls]
.combine(bf_df[bf_filter][int_col_name_no_nulls], add_remote)
.to_pandas()
)

# ignore pandas "int64" vs bigframes "Int64" dtype difference
# ignore any dtype difference
pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent):
session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial"))
def test_remote_function_nary_applied_after_filter(
session, dataset_id_permanent, scalars_dfs
):
# This function is deliberately written to not work with NA input
def add(x: int, y: int, z: float) -> float:
return x + y + z

df = session.read_gbq("bigquery-public-data.baseball.schedules")[
scalars_df, scalars_pandas_df = scalars_dfs
int_col_name_with_nulls = "int64_col"
int_col_name_no_nulls = "int64_too"
float_col_name_with_nulls = "float64_col"
bf_df = scalars_df[
[int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls]
]
pd_df = scalars_pandas_df[
[int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls]
]

# make sure there are NA values in the test columns
assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]])
assert any([pd.isna(val) for val in bf_df[float_col_name_with_nulls]])

# create a remote function
add_remote = session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(add)
)(add)

# pandas does not support nary functions, so let's create a proxy function
# for testing purpose that takes a series and in turn calls the naray function
def add_pandas(s: pd.Series) -> float:
return add(
s[int_col_name_with_nulls],
s[int_col_name_no_nulls],
s[float_col_name_with_nulls],
)

# with nulls in the series the remote function application would fail
with pytest.raises(
google.api_core.exceptions.BadRequest, match="unsupported operand"
):
bf_df.apply(add_remote, axis=1).to_pandas()

# after filtering out nulls the remote function application should work
# similar to pandas
pd_filter = (
pd_df[int_col_name_with_nulls].notnull()
& pd_df[float_col_name_with_nulls].notnull()
)
pd_result = pd_df[pd_filter].apply(add_pandas, axis=1)
bf_filter = (
bf_df[int_col_name_with_nulls].notnull()
& bf_df[float_col_name_with_nulls].notnull()
)
bf_result = bf_df[bf_filter].apply(add_remote, axis=1).to_pandas()

# ignore any dtype difference
pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.parametrize(
("method",),
[
pytest.param("apply"),
pytest.param("map"),
pytest.param("mask"),
],
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_unary_partial_ordering_mode_assign(
unordered_session, dataset_id_permanent, method
):
df = unordered_session.read_gbq("bigquery-public-data.baseball.schedules")[
["duration_minutes"]
]

def plus_one(x: int) -> int:
return x + 1
def is_long_duration(minutes: int) -> bool:
return minutes >= 120

is_long_duration = unordered_session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(is_long_duration)
)(is_long_duration)

method = getattr(df["duration_minutes"], method)

df1 = df.assign(duration_meta=method(is_long_duration))
repr(df1)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_binary_partial_ordering_mode_assign(
unordered_session, dataset_id_permanent, scalars_df_index
):
def combiner(x: int, y: int) -> int:
if x is None:
return y
return x

combiner = unordered_session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(combiner)
)(combiner)

df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]]
df1 = df.assign(int64_combined=df["int64_col"].combine(df["int64_too"], combiner))
repr(df1)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_nary_partial_ordering_mode_assign(
unordered_session, dataset_id_permanent, scalars_df_index
):
def processor(x: int, y: int, z: float, w: str) -> str:
return f"I got x={x}, y={y}, z={z} and w={w}"

plus_one = session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(plus_one)
)(plus_one)
processor = unordered_session.remote_function(
dataset=dataset_id_permanent, name=get_rf_name(processor)
)(processor)

df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one))
df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]]
df1 = df.assign(combined=df.apply(processor, axis=1))
repr(df1)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.