diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 817a02f492..49a668f008 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -3473,11 +3473,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
             raise ValueError(f"na_action={na_action} not supported")
 
         # TODO(shobs): Support **kwargs
-        # Reproject as workaround to applying filter too late. This forces the
-        # filter to be applied before passing data to remote function,
-        # protecting from bad inputs causing errors.
-        reprojected_df = DataFrame(self._block._force_reproject())
-        return reprojected_df._apply_unary_op(
+        return self._apply_unary_op(
             ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None))
         )
 
@@ -3572,13 +3568,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
             )
 
         series_list = [self[col] for col in self.columns]
-        # Reproject as workaround to applying filter too late. This forces the
-        # filter to be applied before passing data to remote function,
-        # protecting from bad inputs causing errors.
-        reprojected_series = bigframes.series.Series(
-            series_list[0]._block._force_reproject()
-        )
-        result_series = reprojected_series._apply_nary_op(
+        result_series = series_list[0]._apply_nary_op(
             ops.NaryRemoteFunctionOp(func=func), series_list[1:]
         )
         result_series.name = None
diff --git a/bigframes/series.py b/bigframes/series.py
index 82fb6c5089..193eea7ee3 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1480,11 +1480,7 @@ def combine(
                 ex.message += f"\n{_remote_function_recommendation_message}"
             raise
 
-        # Reproject as workaround to applying filter too late. This forces the
-        # filter to be applied before passing data to remote function,
-        # protecting from bad inputs causing errors.
-        reprojected_series = Series(self._block._force_reproject())
-        result_series = reprojected_series._apply_binary_op(
+        result_series = self._apply_binary_op(
             other, ops.BinaryRemoteFunctionOp(func=func)
         )
 
diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py
index 5ffda56f92..f68589f431 100644
--- a/tests/system/small/test_remote_function.py
+++ b/tests/system/small/test_remote_function.py
@@ -498,6 +498,37 @@ def add_one(x):
     assert_pandas_df_equal(bf_result, pd_result)
 
 
+@pytest.mark.flaky(retries=2, delay=120)
+def test_dataframe_applymap_explicit_filter(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
+    def add_one(x):
+        return x + 1
+
+    remote_add_one = session_with_bq_connection.remote_function(
+        [int], int, dataset_id_permanent, name=get_rf_name(add_one)
+    )(add_one)
+
+    scalars_df, scalars_pandas_df = scalars_dfs
+    int64_cols = ["int64_col", "int64_too"]
+
+    bf_int64_df = scalars_df[int64_cols]
+    bf_int64_df_filtered = bf_int64_df[bf_int64_df["int64_col"].notnull()]
+    bf_result = bf_int64_df_filtered.applymap(remote_add_one).to_pandas()
+
+    pd_int64_df = scalars_pandas_df[int64_cols]
+    pd_int64_df_filtered = pd_int64_df[pd_int64_df["int64_col"].notnull()]
+    pd_result = pd_int64_df_filtered.applymap(add_one)
+    # TODO(shobs): Figure out why pandas .applymap() changes the dtype, i.e.
+    # pd_int64_df_filtered.dtypes is Int64Dtype(), but
+    # pd_int64_df_filtered.applymap(lambda x: x).dtypes is int64.
+    # For this test let's force the pandas dtype to be the same as the input.
+    for col in pd_result:
+        pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype)
+
+    assert_pandas_df_equal(bf_result, pd_result)
+
+
 @pytest.mark.flaky(retries=2, delay=120)
 def test_dataframe_applymap_na_ignore(
     session_with_bq_connection, scalars_dfs, dataset_id_permanent
@@ -1024,12 +1055,21 @@ def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_inde
     repr(s.mask(should_mask, "REDACTED"))
 
 
+@pytest.mark.parametrize(
+    ("method",),
+    [
+        pytest.param("apply"),
+        pytest.param("map"),
+        pytest.param("mask"),
+    ],
+)
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs):
-
+def test_remote_function_unary_applied_after_filter(
+    session, dataset_id_permanent, scalars_dfs, method
+):
     # This function is deliberately written to not work with NA input
-    def plus_one(x: int) -> int:
-        return x + 1
+    def is_odd(x: int) -> bool:
+        return x % 2 == 1
 
     scalars_df, scalars_pandas_df = scalars_dfs
     int_col_name_with_nulls = "int64_col"
@@ -1038,47 +1078,204 @@ def plus_one(x: int) -> int:
 
     assert any([pd.isna(val) for val in scalars_df[int_col_name_with_nulls]])
 
     # create a remote function
-    plus_one_remote = session.remote_function(
-        dataset=dataset_id_permanent, name=get_rf_name(plus_one)
-    )(plus_one)
+    is_odd_remote = session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(is_odd)
+    )(is_odd)
 
     # with nulls in the series the remote function application would fail
     with pytest.raises(
         google.api_core.exceptions.BadRequest, match="unsupported operand"
     ):
-        scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas()
+        bf_method = getattr(scalars_df[int_col_name_with_nulls], method)
+        bf_method(is_odd_remote).to_pandas()
 
-    # after filtering out nulls the remote function application should works
+    # after filtering out nulls the remote function application should work
     # similar to pandas
-    pd_result = scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
-        int_col_name_with_nulls
-    ].apply(plus_one)
-    bf_result = (
+    pd_method = getattr(
+        scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
+            int_col_name_with_nulls
+        ],
+        method,
+    )
+    pd_result = pd_method(is_odd)
+    bf_method = getattr(
         scalars_df[scalars_df[int_col_name_with_nulls].notnull()][
             int_col_name_with_nulls
-        ]
-        .apply(plus_one_remote)
+        ],
+        method,
+    )
+    bf_result = bf_method(is_odd_remote).to_pandas()
+
+    # ignore any dtype difference
+    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_binary_applied_after_filter(
+    session, dataset_id_permanent, scalars_dfs
+):
+    # This function is deliberately written to not work with NA input
+    def add(x: int, y: int) -> int:
+        return x + y
+
+    scalars_df, scalars_pandas_df = scalars_dfs
+    int_col_name_with_nulls = "int64_col"
+    int_col_name_no_nulls = "int64_too"
+    bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]]
+    pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]]
+
+    # make sure there are NA values in the test column
+    assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]])
+
+    # create a remote function
+    add_remote = session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(add)
+    )(add)
+
+    # with nulls in the series the remote function application would fail
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest, match="unsupported operand"
+    ):
+        bf_df[int_col_name_with_nulls].combine(
+            bf_df[int_col_name_no_nulls], add_remote
+        ).to_pandas()
+
+    # after filtering out nulls the remote function application should work
+    # similar to pandas
+    pd_filter = pd_df[int_col_name_with_nulls].notnull()
+    pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine(
+        pd_df[pd_filter][int_col_name_no_nulls], add
+    )
+    bf_filter = bf_df[int_col_name_with_nulls].notnull()
+    bf_result = (
+        bf_df[bf_filter][int_col_name_with_nulls]
+        .combine(bf_df[bf_filter][int_col_name_no_nulls], add_remote)
         .to_pandas()
     )
 
-    # ignore pandas "int64" vs bigframes "Int64" dtype difference
+    # ignore any dtype difference
     pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent):
-    session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial"))
+def test_remote_function_nary_applied_after_filter(
+    session, dataset_id_permanent, scalars_dfs
+):
+    # This function is deliberately written to not work with NA input
+    def add(x: int, y: int, z: float) -> float:
+        return x + y + z
 
-    df = session.read_gbq("bigquery-public-data.baseball.schedules")[
+    scalars_df, scalars_pandas_df = scalars_dfs
+    int_col_name_with_nulls = "int64_col"
+    int_col_name_no_nulls = "int64_too"
+    float_col_name_with_nulls = "float64_col"
+    bf_df = scalars_df[
+        [int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls]
+    ]
+    pd_df = scalars_pandas_df[
+        [int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls]
+    ]
+
+    # make sure there are NA values in the test columns
+    assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]])
+    assert any([pd.isna(val) for val in bf_df[float_col_name_with_nulls]])
+
+    # create a remote function
+    add_remote = session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(add)
+    )(add)
+
+    # pandas does not support nary functions, so let's create a proxy function
+    # for testing purposes that takes a series and in turn calls the nary function
+    def add_pandas(s: pd.Series) -> float:
+        return add(
+            s[int_col_name_with_nulls],
+            s[int_col_name_no_nulls],
+            s[float_col_name_with_nulls],
+        )
+
+    # with nulls in the series the remote function application would fail
+    with pytest.raises(
+        google.api_core.exceptions.BadRequest, match="unsupported operand"
+    ):
+        bf_df.apply(add_remote, axis=1).to_pandas()
+
+    # after filtering out nulls the remote function application should work
+    # similar to pandas
+    pd_filter = (
+        pd_df[int_col_name_with_nulls].notnull()
+        & pd_df[float_col_name_with_nulls].notnull()
+    )
+    pd_result = pd_df[pd_filter].apply(add_pandas, axis=1)
+    bf_filter = (
+        bf_df[int_col_name_with_nulls].notnull()
+        & bf_df[float_col_name_with_nulls].notnull()
+    )
+    bf_result = bf_df[bf_filter].apply(add_remote, axis=1).to_pandas()
+
+    # ignore any dtype difference
+    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+
+@pytest.mark.parametrize(
+    ("method",),
+    [
+        pytest.param("apply"),
+        pytest.param("map"),
+        pytest.param("mask"),
+    ],
+)
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_unary_partial_ordering_mode_assign(
+    unordered_session, dataset_id_permanent, method
+):
+    df = unordered_session.read_gbq("bigquery-public-data.baseball.schedules")[
         ["duration_minutes"]
     ]
 
-    def plus_one(x: int) -> int:
-        return x + 1
+    def is_long_duration(minutes: int) -> bool:
+        return minutes >= 120
+
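+    # create a remote function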
+    is_long_duration = unordered_session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(is_long_duration)
+    )(is_long_duration)
+
+    method = getattr(df["duration_minutes"], method)
+
+    df1 = df.assign(duration_meta=method(is_long_duration))
+    repr(df1)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_binary_partial_ordering_mode_assign(
+    unordered_session, dataset_id_permanent, scalars_df_index
+):
+    def combiner(x: int, y: int) -> int:
+        if x is None:
+            return y
+        return x
+
+    combiner = unordered_session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(combiner)
+    )(combiner)
+
+    df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]]
+    df1 = df.assign(int64_combined=df["int64_col"].combine(df["int64_too"], combiner))
+    repr(df1)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_nary_partial_ordering_mode_assign(
+    unordered_session, dataset_id_permanent, scalars_df_index
+):
+    def processor(x: int, y: int, z: float, w: str) -> str:
+        return f"I got x={x}, y={y}, z={z} and w={w}"
 
-    plus_one = session.remote_function(
-        dataset=dataset_id_permanent, name=get_rf_name(plus_one)
-    )(plus_one)
+    processor = unordered_session.remote_function(
+        dataset=dataset_id_permanent, name=get_rf_name(processor)
+    )(processor)
 
-    df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one))
+    df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]]
+    df1 = df.assign(combined=df.apply(processor, axis=1))
     repr(df1)
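Note: the partial ordering tests above take their session from an unordered_session fixture instead of constructing one inline as the removed test did. That fixture is defined elsewhere in the test suite; a minimal sketch of what it presumably looks like, based on the inline construction removed in this change (the fixture name and scope here are assumptions, not the suite's actual definition):

    import bigframes
    import pytest

    @pytest.fixture(scope="session")
    def unordered_session():
        # assumed equivalent of the removed inline construction:
        # a session running in partial ordering mode
        return bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial"))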