diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index 0bc9f2e370..0fabcff13b 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -191,19 +191,27 @@ def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp):
 
         return decorator
 
-    def register_nary_op(self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]]):
+    def register_nary_op(
+        self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]], pass_op: bool = False
+    ):
         """
         Decorator to register a nary op implementation.
 
         Args:
             op_ref (NaryOp or NaryOp type):
                 Class or instance of operator that is implemented by the decorated function.
+            pass_op (bool):
+                Set to True if the implementation takes the operator object as the last argument.
+                This is needed for parameterized ops whose parameters are part of the op object.
         """
         key = typing.cast(str, op_ref.name)
 
         def decorator(impl: typing.Callable[..., ibis_types.Value]):
             def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp):
-                return impl(*args)
+                if pass_op:
+                    return impl(*args, op=op)
+                else:
+                    return impl(*args)
 
             self._register(key, normalized_impl)
             return impl
@@ -1444,6 +1452,7 @@ def clip_op(
     )
 
 
+# N-ary Operations
@scalar_op_compiler.register_nary_op(ops.case_when_op)
 def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
     # ibis can handle most type coercions, but we need to force bool -> int
@@ -1463,6 +1472,19 @@ def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
     return case_val.end()
 
 
+@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True)
+def nary_remote_function_op_impl(
+    *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp
+):
+    ibis_node = getattr(op.func, "ibis_node", None)
+    if ibis_node is None:
+        raise TypeError(
+            f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
+        )
+    result = ibis_node(*operands)
+    return result
+
+
 # Helpers
 def is_null(value) -> bool:
     # float NaN/inf should be treated as distinct from 'true' null values
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 9789c7cf9f..9d3b153d3a 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -3433,9 +3433,9 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
             raise ValueError(f"na_action={na_action} not supported")
 
         # TODO(shobs): Support **kwargs
-        # Reproject as workaround to applying filter too late. This forces the filter
-        # to be applied before passing data to remote function, protecting from bad
-        # inputs causing errors.
+        # Reproject as a workaround to the filter otherwise being applied too
+        # late. This forces the filter to be applied before passing data to the
+        # remote function, protecting from bad inputs causing errors.
         reprojected_df = DataFrame(self._block._force_reproject())
         return reprojected_df._apply_unary_op(
             ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None))
@@ -3448,65 +3448,99 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
                 category=bigframes.exceptions.PreviewWarning,
             )
 
-            # Early check whether the dataframe dtypes are currently supported
-            # in the remote function
-            # NOTE: Keep in sync with the value converters used in the gcf code
-            # generated in remote_function_template.py
-            remote_function_supported_dtypes = (
-                bigframes.dtypes.INT_DTYPE,
-                bigframes.dtypes.FLOAT_DTYPE,
-                bigframes.dtypes.BOOL_DTYPE,
-                bigframes.dtypes.BYTES_DTYPE,
-                bigframes.dtypes.STRING_DTYPE,
-            )
-            supported_dtypes_types = tuple(
-                type(dtype)
-                for dtype in remote_function_supported_dtypes
-                if not isinstance(dtype, pandas.ArrowDtype)
-            )
-            # Check ArrowDtype separately since multiple BigQuery types map to
-            # ArrowDtype, including BYTES and TIMESTAMP.
-            supported_arrow_types = tuple(
-                dtype.pyarrow_dtype
-                for dtype in remote_function_supported_dtypes
-                if isinstance(dtype, pandas.ArrowDtype)
-            )
-            supported_dtypes_hints = tuple(
-                str(dtype) for dtype in remote_function_supported_dtypes
-            )
-
-            for dtype in self.dtypes:
-                if (
-                    # Not one of the pandas/numpy types.
-                    not isinstance(dtype, supported_dtypes_types)
-                    # And not one of the arrow types.
-                    and not (
-                        isinstance(dtype, pandas.ArrowDtype)
-                        and any(
-                            dtype.pyarrow_dtype.equals(arrow_type)
-                            for arrow_type in supported_arrow_types
-                        )
-                    )
-                ):
-                    raise NotImplementedError(
-                        f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1."
-                        f" Supported dtypes are {supported_dtypes_hints}."
-                    )
-
             # Check if the function is a remote function
             if not hasattr(func, "bigframes_remote_function"):
                 raise ValueError("For axis=1 a remote function must be used.")
 
-            # Serialize the rows as json values
-            block = self._get_block()
-            rows_as_json_series = bigframes.series.Series(
-                block._get_rows_as_json_values()
-            )
+            is_row_processor = getattr(func, "is_row_processor")
+            if is_row_processor:
+                # Early check whether the dataframe dtypes are currently supported
+                # in the remote function
+                # NOTE: Keep in sync with the value converters used in the gcf code
+                # generated in remote_function_template.py
+                remote_function_supported_dtypes = (
+                    bigframes.dtypes.INT_DTYPE,
+                    bigframes.dtypes.FLOAT_DTYPE,
+                    bigframes.dtypes.BOOL_DTYPE,
+                    bigframes.dtypes.BYTES_DTYPE,
+                    bigframes.dtypes.STRING_DTYPE,
+                )
+                supported_dtypes_types = tuple(
+                    type(dtype)
+                    for dtype in remote_function_supported_dtypes
+                    if not isinstance(dtype, pandas.ArrowDtype)
+                )
+                # Check ArrowDtype separately since multiple BigQuery types map to
+                # ArrowDtype, including BYTES and TIMESTAMP.
+                supported_arrow_types = tuple(
+                    dtype.pyarrow_dtype
+                    for dtype in remote_function_supported_dtypes
+                    if isinstance(dtype, pandas.ArrowDtype)
+                )
+                supported_dtypes_hints = tuple(
+                    str(dtype) for dtype in remote_function_supported_dtypes
+                )
 
-            # Apply the function
-            result_series = rows_as_json_series._apply_unary_op(
-                ops.RemoteFunctionOp(func=func, apply_on_null=True)
-            )
+                for dtype in self.dtypes:
+                    if (
+                        # Not one of the pandas/numpy types.
+                        not isinstance(dtype, supported_dtypes_types)
+                        # And not one of the arrow types.
+                        and not (
+                            isinstance(dtype, pandas.ArrowDtype)
+                            and any(
+                                dtype.pyarrow_dtype.equals(arrow_type)
+                                for arrow_type in supported_arrow_types
+                            )
+                        )
+                    ):
+                        raise NotImplementedError(
+                            f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1."
+                            f" Supported dtypes are {supported_dtypes_hints}."
+                        )
+
+                # Serialize the rows as json values
+                block = self._get_block()
+                rows_as_json_series = bigframes.series.Series(
+                    block._get_rows_as_json_values()
+                )
+
+                # Apply the function
+                result_series = rows_as_json_series._apply_unary_op(
+                    ops.RemoteFunctionOp(func=func, apply_on_null=True)
+                )
+            else:
+                # This is a special case where we are providing a non-pandas-like
+                # extension. If the remote function can take one or more params
+                # then we assume that the user's intention is to use the
+                # column values of the dataframe as arguments to the function.
+                # For this to work the following conditions must be true:
+                #   1. The number of input params in the function must be the same
+                #      as the number of columns in the dataframe
+                #   2. The dtypes of the columns in the dataframe must be
+                #      compatible with the data types of the input params
+                #   3. The order of the columns in the dataframe must correspond
+                #      to the order of the input params in the function
+                udf_input_dtypes = getattr(func, "input_dtypes")
+                if len(udf_input_dtypes) != len(self.columns):
+                    raise ValueError(
+                        f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns."
+                    )
+                if udf_input_dtypes != tuple(self.dtypes.to_list()):
+                    raise ValueError(
+                        f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}."
+                    )
+
+                series_list = [self[col] for col in self.columns]
+                # Reproject as a workaround to the filter otherwise being applied
+                # too late. This forces the filter to be applied before passing
+                # data to the remote function, protecting from bad inputs causing errors.
+                reprojected_series = bigframes.series.Series(
+                    series_list[0]._block._force_reproject()
+                )
+                result_series = reprojected_series._apply_nary_op(
+                    ops.NaryRemoteFunctionOp(func=func), series_list[1:]
+                )
 
         result_series.name = None
 
         # Return Series with materialized result so that any error in the remote
diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index 1d31749760..6c5b66bc47 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -57,3 +57,7 @@ class QueryComplexityError(RuntimeError):
 
 class TimeTravelDisabledWarning(Warning):
     """A query was reattempted without time travel."""
+
+
+class UnknownDataTypeWarning(Warning):
+    """Data type is unknown."""
diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index d84fbcdbab..b3c6aee1b3 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -66,6 +66,7 @@ from bigframes import clients
 import bigframes.constants as constants
 import bigframes.core.compile.ibis_types
+import bigframes.dtypes
 import bigframes.functions.remote_function_template
 
 logger = logging.getLogger(__name__)
@@ -895,8 +896,8 @@ def remote_function(
         reuse (bool, Optional):
             Reuse the remote function if already exists.
             `True` by default, which will result in reusing an existing remote
-            function and corresponding cloud function (if any) that was
-            previously created for the same udf.
+            function and corresponding cloud function that was previously
+            created (if any) for the same udf.
             Please note that for an unnamed (i.e.
            created without an explicit `name` argument) remote function, the
            BigQuery DataFrames session id is attached in the cloud artifacts
            names. So for the
@@ -1174,7 +1175,9 @@ def try_delattr(attr):
 
         try_delattr("bigframes_cloud_function")
         try_delattr("bigframes_remote_function")
+        try_delattr("input_dtypes")
         try_delattr("output_dtype")
+        try_delattr("is_row_processor")
         try_delattr("ibis_node")
 
         (
@@ -1216,12 +1219,20 @@ def try_delattr(attr):
                     rf_name
                 )
             )
-
+        func.input_dtypes = tuple(
+            [
+                bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(
+                    input_type
+                )
+                for input_type in ibis_signature.input_types
+            ]
+        )
         func.output_dtype = (
             bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(
                 ibis_signature.output_type
             )
         )
+        func.is_row_processor = is_row_processor
         func.ibis_node = node
 
         # If a new remote function was created, update the cloud artifacts
@@ -1305,6 +1316,29 @@ def func(*ignored_args, **ignored_kwargs):
         signature=(ibis_signature.input_types, ibis_signature.output_type),
     )
     func.bigframes_remote_function = str(routine_ref)  # type: ignore
+
+    # set input bigframes data types
+    has_unknown_dtypes = False
+    function_input_dtypes = []
+    for ibis_type in ibis_signature.input_types:
+        input_dtype = cast(bigframes.dtypes.Dtype, bigframes.dtypes.DEFAULT_DTYPE)
+        if ibis_type is None:
+            has_unknown_dtypes = True
+        else:
+            input_dtype = (
+                bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(
+                    ibis_type
+                )
+            )
+        function_input_dtypes.append(input_dtype)
+    if has_unknown_dtypes:
+        warnings.warn(
+            "The function has one or more missing input data types."
+            f" BigQuery DataFrames will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them.",
+            category=bigframes.exceptions.UnknownDataTypeWarning,
+        )
+    func.input_dtypes = tuple(function_input_dtypes)  # type: ignore
+
    func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(  # type: ignore
        ibis_signature.output_type
    )
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index 145c415ca0..4400288db2 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -659,6 +659,19 @@ def output_type(self, *input_types):
         raise AttributeError("output_dtype not defined")
 
 
+@dataclasses.dataclass(frozen=True)
+class NaryRemoteFunctionOp(NaryOp):
+    name: typing.ClassVar[str] = "nary_remote_function"
+    func: typing.Callable
+
+    def output_type(self, *input_types):
+        # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
+        if hasattr(self.func, "output_dtype"):
+            return self.func.output_dtype
+        else:
+            raise AttributeError("output_dtype not defined")
+
+
 add_op = AddOp()
 sub_op = SubOp()
 mul_op = create_binary_op(name="mul", type_signature=op_typing.BINARY_NUMERIC)
diff --git a/bigframes/series.py b/bigframes/series.py
index 1a5661529c..9e33801834 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1442,9 +1442,6 @@ def apply(
     ) -> Series:
         # TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs
         # is actually a ternary op
-        # Reproject as workaround to applying filter too late. This forces the filter
-        # to be applied before passing data to remote function, protecting from bad
-        # inputs causing errors.
         if by_row not in ["compat", False]:
             raise ValueError("Param by_row must be one of 'compat' or False")
 
@@ -1474,7 +1471,10 @@ def apply(
                 ex.message += f"\n{_remote_function_recommendation_message}"
                 raise
 
-        # We are working with remote function at this point
+        # We are working with a remote function at this point.
+        # Reproject as a workaround to the filter otherwise being applied too
+        # late. This forces the filter to be applied before passing data to the
+        # remote function, protecting from bad inputs causing errors.
         reprojected_series = Series(self._block._force_reproject())
         result_series = reprojected_series._apply_unary_op(
             ops.RemoteFunctionOp(func=func, apply_on_null=True)
@@ -1507,6 +1507,9 @@ def combine(
                 ex.message += f"\n{_remote_function_recommendation_message}"
                 raise
 
+        # Reproject as a workaround to the filter otherwise being applied too
+        # late. This forces the filter to be applied before passing data to the
+        # remote function, protecting from bad inputs causing errors.
         reprojected_series = Series(self._block._force_reproject())
         result_series = reprojected_series._apply_binary_op(
             other, ops.BinaryRemoteFunctionOp(func=func)
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 98cba867f2..233e6ef930 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1661,8 +1661,8 @@ def remote_function(
         reuse (bool, Optional):
             Reuse the remote function if already exists.
             `True` by default, which will result in reusing an existing remote
-            function and corresponding cloud function (if any) that was
-            previously created for the same udf.
+            function and corresponding cloud function that was previously
+            created (if any) for the same udf.
            Please note that for an unnamed (i.e. created without an explicit
            `name` argument) remote function, the BigQuery DataFrames session
            id is attached in the cloud artifacts names. So for the
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index 303c74f1fd..095f7059cd 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -28,6 +28,9 @@
 import test_utils.prefixer
 
 import bigframes
+import bigframes.dataframe
+import bigframes.dtypes
+import bigframes.exceptions
 import bigframes.functions.remote_function as bigframes_rf
 import bigframes.pandas as bpd
 import bigframes.series
@@ -363,7 +366,8 @@ def test_remote_function_input_types(session, scalars_dfs, input_types):
     def add_one(x):
         return x + 1
 
-    remote_add_one = session.remote_function(input_types, int)(add_one)
+    remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one)
+    assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,)
 
     scalars_df, scalars_pandas_df = scalars_dfs
 
@@ -1589,6 +1593,8 @@ def serialize_row(row):
         bigframes.series.Series, str, reuse=False
     )(serialize_row)
 
+    assert getattr(serialize_row_remote, "is_row_processor")
+
     bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas()
     pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1)
 
@@ -1622,7 +1628,11 @@ def analyze(row):
         }
     )
 
-    analyze_remote = session.remote_function(bigframes.series.Series, str)(analyze)
+    analyze_remote = session.remote_function(
+        bigframes.series.Series, str, reuse=False
+    )(analyze)
+
+    assert getattr(analyze_remote, "is_row_processor")
 
     bf_result = (
         scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas()
@@ -1727,6 +1737,8 @@ def serialize_row(row):
         bigframes.series.Series, str, reuse=False
     )(serialize_row)
 
+    assert getattr(serialize_row_remote, "is_row_processor")
+
     bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas()
     pd_result = pd_df.apply(serialize_row, axis=1)
 
@@ -1787,6 +1799,8 @@ def float_parser(row):
         bigframes.series.Series, float, reuse=False
     )(float_parser)
 
+    assert getattr(float_parser_remote, "is_row_processor")
+
     pd_result = pd_df.apply(float_parser, axis=1)
     bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas()
 
@@ -1913,7 +1927,7 @@ def test_remote_function_named_perists_w_session_cleanup():
         name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix()
 
         # create a named remote function in the session
-        @session.remote_function(name=name)
+        @session.remote_function(reuse=False, name=name)
         def foo(x: int) -> int:
             return x + 1
 
@@ -2004,3 +2018,137 @@ def foo_named(x: int) -> int:
         cleanup_remote_function_assets(
             session.bqclient, session.cloudfunctionsclient, foo_named
         )
+
+
+def test_df_apply_axis_1_multiple_params(session):
+    bf_df = bigframes.dataframe.DataFrame(
+        {
+            "Id": [1, 2, 3],
+            "Age": [22.5, 23, 23.5],
+            "Name": ["alpha", "beta", "gamma"],
+        }
+    )
+
+    expected_dtypes = (
+        bigframes.dtypes.INT_DTYPE,
+        bigframes.dtypes.FLOAT_DTYPE,
+        bigframes.dtypes.STRING_DTYPE,
+    )
+
+    # Assert the dataframe dtypes
+    assert tuple(bf_df.dtypes) == expected_dtypes
+
+    try:
+
+        @session.remote_function([int, float, str], str, reuse=False)
+        def foo(x, y, z):
+            return f"I got {x}, {y} and {z}"
+
+        assert getattr(foo, "is_row_processor") is False
+        assert getattr(foo, "input_dtypes") == expected_dtypes
+
+        # Fails to apply on dataframe with incompatible number of columns
+        with pytest.raises(
+            ValueError,
+            match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$",
+        ):
+            bf_df[["Id", "Age"]].apply(foo, axis=1)
+        with pytest.raises(
+            ValueError,
+            match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$",
+        ):
+            bf_df.assign(Country="lalaland").apply(foo, axis=1)
+
+        # Fails to apply on dataframe with incompatible column datatypes
+        with pytest.raises(
+            ValueError,
+            match="^Remote function takes arguments of types .* but DataFrame dtypes are .*",
+        ):
+            bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
+
+        # Successfully applies to dataframe with matching number of columns
+        # and their datatypes
+        bf_result = bf_df.apply(foo, axis=1).to_pandas()
+
+        # Since this scenario is not pandas-like, let's handcraft the
+        # expected result
+        expected_result = pandas.Series(
+            [
+                "I got 1, 22.5 and alpha",
+                "I got 2, 23 and beta",
+                "I got 3, 23.5 and gamma",
+            ]
+        )
+
+        pandas.testing.assert_series_equal(
+            expected_result, bf_result, check_dtype=False, check_index_type=False
+        )
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, foo
+        )
+
+
+def test_df_apply_axis_1_single_param_non_series(session):
+    bf_df = bigframes.dataframe.DataFrame(
+        {
+            "Id": [1, 2, 3],
+        }
+    )
+
+    expected_dtypes = (bigframes.dtypes.INT_DTYPE,)
+
+    # Assert the dataframe dtypes
+    assert tuple(bf_df.dtypes) == expected_dtypes
+
+    try:
+
+        @session.remote_function([int], str, reuse=False)
+        def foo(x):
+            return f"I got {x}"
+
+        assert getattr(foo, "is_row_processor") is False
+        assert getattr(foo, "input_dtypes") == expected_dtypes
+
+        # Fails to apply on dataframe with incompatible number of columns
+        with pytest.raises(
+            ValueError,
+            match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$",
+        ):
+            bf_df[[]].apply(foo, axis=1)
+        with pytest.raises(
+            ValueError,
+            match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$",
+        ):
+            bf_df.assign(Country="lalaland").apply(foo, axis=1)
+
+        # Fails to apply on dataframe with incompatible column datatypes
+        with pytest.raises(
+            ValueError,
+            match="^Remote function takes arguments of types .* but DataFrame dtypes are .*",
+        ):
+            bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1)
+
+        # Successfully applies to dataframe with matching number of columns
+        # and their datatypes
+        bf_result = bf_df.apply(foo, axis=1).to_pandas()
+
+        # Since this scenario is not pandas-like, let's handcraft the
+        # expected result
+        expected_result = pandas.Series(
+            [
+                "I got 1",
+                "I got 2",
+                "I got 3",
+            ]
+        )
+
+        pandas.testing.assert_series_equal(
+            expected_result, bf_result, check_dtype=False, check_index_type=False
+        )
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, foo
+        )
diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py
index c07a0afb44..8ecf9eb368 100644
--- a/tests/system/small/test_remote_function.py
+++ b/tests/system/small/test_remote_function.py
@@ -21,6 +21,7 @@
 import pytest
 
 import bigframes
+import bigframes.dtypes
 import bigframes.exceptions
 from bigframes.functions import remote_function as rf
 from tests.system.utils import assert_pandas_df_equal
@@ -708,6 +709,8 @@ def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id):
 
     # It should point to the named routine and yield the expected results.
     assert square.bigframes_remote_function == str(routine.reference)
+    assert square.input_dtypes == (bigframes.dtypes.INT_DTYPE,)
+    assert square.output_dtype == bigframes.dtypes.INT_DTYPE
 
     src = {"x": [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5]}
 
@@ -776,10 +779,14 @@ def test_read_gbq_function_enforces_explicit_types(
         str(both_types_specified.reference),
         session=session,
     )
-    rf.read_gbq_function(
-        str(only_return_type_specified.reference),
-        session=session,
-    )
+    with pytest.warns(
+        bigframes.exceptions.UnknownDataTypeWarning,
+        match="missing input data types.*assume default data type",
+    ):
+        rf.read_gbq_function(
+            str(only_return_type_specified.reference),
+            session=session,
+        )
     with pytest.raises(ValueError):
         rf.read_gbq_function(
             str(only_arg_type_specified.reference),
@@ -919,36 +926,41 @@ def add_ints(row):
         scalars_df[columns].apply(add_ints, axis=1)
 
 
-@pytest.mark.parametrize(
-    ("column"),
-    [
-        pytest.param("date_col"),
-        pytest.param("datetime_col"),
-        pytest.param("geography_col"),
-        pytest.param("numeric_col"),
-        pytest.param("time_col"),
-        pytest.param("timestamp_col"),
-    ],
-)
-def test_df_apply_axis_1_unsupported_dtype(scalars_dfs, column):
-    scalars_df, scalars_pandas_df = scalars_dfs
-
-    # It doesn't matter if it is a remote function or not, the dtype check
-    # is done even before the function type check with axis=1
-    def echo(row):
-        return row[column]
+@pytest.mark.flaky(retries=2, delay=120)
+def test_df_apply_axis_1_unsupported_dtype(session, scalars_dfs, dataset_id_permanent):
+    columns_with_not_supported_dtypes = [
+        "date_col",
+        "datetime_col",
+        "geography_col",
+        "numeric_col",
+        "time_col",
+        "timestamp_col",
+    ]
 
-    # pandas works
-    scalars_pandas_df[[column]].apply(echo, axis=1)
+    scalars_df, scalars_pandas_df = scalars_dfs
 
-    dtype = scalars_df[column].dtype
+    def echo_len(row):
+        return len(row)
 
-    with pytest.raises(
-        NotImplementedError,
-        match=re.escape(
-            f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are ("
-        ),
-    ), pytest.warns(
-        bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview."
-    ):
-        scalars_df[[column]].apply(echo, axis=1)
+    echo_len_remote = session.remote_function(
+        bigframes.series.Series,
+        float,
+        dataset_id_permanent,
+        name=get_rf_name(echo_len, is_row_processor=True),
+    )(echo_len)
+
+    for column in columns_with_not_supported_dtypes:
+        # pandas works
+        scalars_pandas_df[[column]].apply(echo_len, axis=1)
+
+        dtype = scalars_df[column].dtype
+
+        with pytest.raises(
+            NotImplementedError,
+            match=re.escape(
+                f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are ("
+            ),
+        ), pytest.warns(
+            bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview."
+        ):
+            scalars_df[[column]].apply(echo_len_remote, axis=1)
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index 7048d9c6dd..10565a2552 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -4361,9 +4361,50 @@ def apply(self, func, *, axis=0, args=(), **kwargs):
                 1    19
                 dtype: Int64
 
+        You could also apply a remote function that accepts multiple parameters
+        to every row of a DataFrame by using it with `axis=1` if the DataFrame
+        has a matching number of columns and data types. Note: This feature is
+        currently in **preview**.
+
+            >>> df = bpd.DataFrame({
+            ...     'col1': [1, 2],
+            ...     'col2': [3, 4],
+            ...     'col3': [5, 5]
+            ... })
+            >>> df
+               col1  col2  col3
+            0     1     3     5
+            1     2     4     5
+
+            [2 rows x 3 columns]
+
+            >>> @bpd.remote_function(reuse=False)
+            ... def foo(x: int, y: int, z: int) -> float:
+            ...     result = 1
+            ...     result += x
+            ...     result += y/z
+            ...     return result
+
+            >>> df.apply(foo, axis=1)
+            0    2.6
+            1    3.8
+            dtype: Float64
+
         Args:
             func (function):
-                Function to apply to each column or row.
+                Function to apply to each column or row. To apply to each row
+                (i.e. when `axis=1` is specified) the function can be one of
+                the two types:
+
+                (1). It accepts a single input parameter of type `Series`, in
+                which case each row is delivered to the function as a pandas
+                Series.
+
+                (2). It accepts one or more parameters, in which case column values
+                are delivered to the function as separate arguments (mapping
+                to those parameters) for each row. For this to work the
+                `DataFrame` must have the same number of columns and matching
+                data types.
             axis ({index (0), columns (1)}):
                 Axis along which the function is applied. Specify 0 or 'index'
                 to apply function to each column. Specify 1 or 'columns' to
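
For context, here is a minimal end-to-end sketch of the multi-parameter `axis=1` path this change introduces, mirroring `test_df_apply_axis_1_multiple_params` above. It assumes a configured BigQuery DataFrames session; the column names and data are hypothetical:

```python
import bigframes.pandas as bpd

# Column count, order, and dtypes line up with the remote function's
# parameters: (Int64, Float64, string).
df = bpd.DataFrame(
    {
        "id": [1, 2, 3],
        "score": [0.5, 0.75, 1.0],
        "label": ["a", "b", "c"],
    }
)

# Deploying attaches input_dtypes and is_row_processor=False to the wrapper,
# which DataFrame.apply(axis=1) uses to validate the DataFrame before
# dispatching column values as separate arguments for each row.
@bpd.remote_function([int, float, str], str, reuse=False)
def describe(id, score, label):
    return f"{label}: {id} scored {score}"

result = df.apply(describe, axis=1)  # one string per row
```

A mismatched column count or dtype order raises `ValueError` client-side, before any data reaches the cloud function.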