From fdee925531b45ae99ec48d9b71781c2e7994b484 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sun, 21 Jul 2024 06:09:22 +0000 Subject: [PATCH 01/10] feat: extend `df.apply(axis=1)` to support remote function with mutiple params --- bigframes/core/compile/scalar_op_compiler.py | 26 +++- bigframes/dataframe.py | 136 +++++++++++-------- bigframes/functions/remote_function.py | 15 +- bigframes/operations/__init__.py | 13 ++ tests/system/large/test_remote_function.py | 80 ++++++++++- tests/system/small/test_remote_function.py | 67 +++++---- 6 files changed, 248 insertions(+), 89 deletions(-) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 0bc9f2e370..0fabcff13b 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -191,19 +191,27 @@ def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp): return decorator - def register_nary_op(self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]]): + def register_nary_op( + self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]], pass_op: bool = False + ): """ Decorator to register a nary op implementation. Args: op_ref (NaryOp or NaryOp type): Class or instance of operator that is implemented by the decorated function. + pass_op (bool): + Set to true if implementation takes the operator object as the last argument. + This is needed for parameterized ops where parameters are part of op object. """ key = typing.cast(str, op_ref.name) def decorator(impl: typing.Callable[..., ibis_types.Value]): def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp): - return impl(*args) + if pass_op: + return impl(*args, op=op) + else: + return impl(*args) self._register(key, normalized_impl) return impl @@ -1444,6 +1452,7 @@ def clip_op( ) +# N-ary Operations @scalar_op_compiler.register_nary_op(ops.case_when_op) def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value: # ibis can handle most type coercions, but we need to force bool -> int @@ -1463,6 +1472,19 @@ def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value: return case_val.end() +@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True) +def nary_remote_function_op_impl( + *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp +): + ibis_node = getattr(op.func, "ibis_node", None) + if ibis_node is None: + raise TypeError( + f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}" + ) + result = ibis_node(*operands) + return result + + # Helpers def is_null(value) -> bool: # float NaN/inf should be treated as distinct from 'true' null values diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4dcc4414ed..dfb25a14ac 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3432,65 +3432,95 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): category=bigframes.exceptions.PreviewWarning, ) - # Early check whether the dataframe dtypes are currently supported - # in the remote function - # NOTE: Keep in sync with the value converters used in the gcf code - # generated in remote_function_template.py - remote_function_supported_dtypes = ( - bigframes.dtypes.INT_DTYPE, - bigframes.dtypes.FLOAT_DTYPE, - bigframes.dtypes.BOOL_DTYPE, - bigframes.dtypes.BYTES_DTYPE, - bigframes.dtypes.STRING_DTYPE, - ) - supported_dtypes_types = tuple( - type(dtype) - for dtype in remote_function_supported_dtypes - if not isinstance(dtype, pandas.ArrowDtype) - ) - # Check ArrowDtype separately since multiple BigQuery types map to - # ArrowDtype, including BYTES and TIMESTAMP. - supported_arrow_types = tuple( - dtype.pyarrow_dtype - for dtype in remote_function_supported_dtypes - if isinstance(dtype, pandas.ArrowDtype) - ) - supported_dtypes_hints = tuple( - str(dtype) for dtype in remote_function_supported_dtypes - ) + # Check if the function is a remote function + if not hasattr(func, "bigframes_remote_function"): + raise ValueError("For axis=1 a remote function must be used.") - for dtype in self.dtypes: - if ( - # Not one of the pandas/numpy types. - not isinstance(dtype, supported_dtypes_types) - # And not one of the arrow types. - and not ( - isinstance(dtype, pandas.ArrowDtype) - and any( - dtype.pyarrow_dtype.equals(arrow_type) - for arrow_type in supported_arrow_types - ) + udf_input_dtypes = getattr(func, "input_dtypes") + if len(udf_input_dtypes) > 1: + # This is a special case where we are providing not-pandas-like + # extension. If the remote function can take multiple params + # then we assume that here the user intention is to use the + # column values of the dataframe as arguments to the function. + # For this to work the following condition must be true: + # 1. The number or input params in the function must be same + # as the number of columns in the dataframe + # 2. The dtypes of the columns in the dataframe must be + # compatible with the data types of the input params + # 3. The order of the columns in the dataframe must correspond + # to the order of the input params in the function + if len(udf_input_dtypes) != len(self.columns): + raise ValueError( + f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." ) - ): - raise NotImplementedError( - f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." - f" Supported dtypes are {supported_dtypes_hints}." + if udf_input_dtypes != tuple(self.dtypes.to_list()): + raise ValueError( + f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." ) - # Check if the function is a remote function - if not hasattr(func, "bigframes_remote_function"): - raise ValueError("For axis=1 a remote function must be used.") + series_list = [self[col] for col in self.columns] + reprojected_series = bigframes.series.Series( + series_list[0]._block._force_reproject() + ) + result_series = reprojected_series._apply_nary_op( + ops.NaryRemoteFunctionOp(func=func), series_list[1:] + ) + else: + # Early check whether the dataframe dtypes are currently supported + # in the remote function + # NOTE: Keep in sync with the value converters used in the gcf code + # generated in remote_function_template.py + remote_function_supported_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.BOOL_DTYPE, + bigframes.dtypes.BYTES_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + supported_dtypes_types = tuple( + type(dtype) + for dtype in remote_function_supported_dtypes + if not isinstance(dtype, pandas.ArrowDtype) + ) + # Check ArrowDtype separately since multiple BigQuery types map to + # ArrowDtype, including BYTES and TIMESTAMP. + supported_arrow_types = tuple( + dtype.pyarrow_dtype + for dtype in remote_function_supported_dtypes + if isinstance(dtype, pandas.ArrowDtype) + ) + supported_dtypes_hints = tuple( + str(dtype) for dtype in remote_function_supported_dtypes + ) - # Serialize the rows as json values - block = self._get_block() - rows_as_json_series = bigframes.series.Series( - block._get_rows_as_json_values() - ) + for dtype in self.dtypes: + if ( + # Not one of the pandas/numpy types. + not isinstance(dtype, supported_dtypes_types) + # And not one of the arrow types. + and not ( + isinstance(dtype, pandas.ArrowDtype) + and any( + dtype.pyarrow_dtype.equals(arrow_type) + for arrow_type in supported_arrow_types + ) + ) + ): + raise NotImplementedError( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." + f" Supported dtypes are {supported_dtypes_hints}." + ) - # Apply the function - result_series = rows_as_json_series._apply_unary_op( - ops.RemoteFunctionOp(func=func, apply_on_null=True) - ) + # Serialize the rows as json values + block = self._get_block() + rows_as_json_series = bigframes.series.Series( + block._get_rows_as_json_values() + ) + + # Apply the function + result_series = rows_as_json_series._apply_unary_op( + ops.RemoteFunctionOp(func=func, apply_on_null=True) + ) result_series.name = None # Return Series with materialized result so that any error in the remote diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index f24ba1b5fb..022028142f 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1190,7 +1190,14 @@ def try_delattr(attr): rf_name ) ) - + func.input_dtypes = tuple( + [ + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + input_type + ) + for input_type in ibis_signature.input_types + ] + ) func.output_dtype = ( bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( ibis_signature.output_type @@ -1279,6 +1286,12 @@ def func(*ignored_args, **ignored_kwargs): signature=(ibis_signature.input_types, ibis_signature.output_type), ) func.bigframes_remote_function = str(routine_ref) # type: ignore + func.input_dtypes = tuple( # type: ignore + [ + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(input_type) + for input_type in ibis_signature.input_types + ] + ) func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type ) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 145c415ca0..4400288db2 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -659,6 +659,19 @@ def output_type(self, *input_types): raise AttributeError("output_dtype not defined") +@dataclasses.dataclass(frozen=True) +class NaryRemoteFunctionOp(NaryOp): + name: typing.ClassVar[str] = "nary_remote_function" + func: typing.Callable + + def output_type(self, *input_types): + # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method + if hasattr(self.func, "output_dtype"): + return self.func.output_dtype + else: + raise AttributeError("output_dtype not defined") + + add_op = AddOp() sub_op = SubOp() mul_op = create_binary_op(name="mul", type_signature=op_typing.BINARY_NUMERIC) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 303c74f1fd..edc09a2957 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -28,6 +28,8 @@ import test_utils.prefixer import bigframes +import bigframes.dataframe +import bigframes.dtypes import bigframes.functions.remote_function as bigframes_rf import bigframes.pandas as bpd import bigframes.series @@ -363,7 +365,8 @@ def test_remote_function_input_types(session, scalars_dfs, input_types): def add_one(x): return x + 1 - remote_add_one = session.remote_function(input_types, int)(add_one) + remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one) + assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,) scalars_df, scalars_pandas_df = scalars_dfs @@ -1622,7 +1625,9 @@ def analyze(row): } ) - analyze_remote = session.remote_function(bigframes.series.Series, str)(analyze) + analyze_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(analyze) bf_result = ( scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas() @@ -1913,7 +1918,7 @@ def test_remote_function_named_perists_w_session_cleanup(): name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix() # create an unnamed remote function in the session - @session.remote_function(name=name) + @session.remote_function(reuse=False, name=name) def foo(x: int) -> int: return x + 1 @@ -2004,3 +2009,72 @@ def foo_named(x: int) -> int: cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo_named ) + + +def test_df_apply_axis_1_multiple_params(session): + bf_df = bigframes.dataframe.DataFrame( + { + "Id": [1, 2, 3], + "Age": [22.5, 23, 23.5], + "Name": ["alpha", "beta", "gamma"], + } + ) + + expected_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + + # Assert the dataframe dtypes + assert tuple(bf_df.dtypes) == expected_dtypes + + try: + + @session.remote_function([int, float, str], str, reuse=False) + def foo(x, y, z): + return f"I got {x}, {y} and {z}" + + assert getattr(foo, "input_dtypes") == expected_dtypes + + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1, 22.5 and alpha", + "I got 2, 23 and beta", + "I got 3, 23.5 and gamma", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, foo + ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index d84d520988..dee8accb9b 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -21,6 +21,7 @@ import pytest import bigframes +import bigframes.dtypes import bigframes.exceptions from bigframes.functions import remote_function as rf from tests.system.utils import assert_pandas_df_equal @@ -658,6 +659,8 @@ def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id): # It should point to the named routine and yield the expected results. assert square.bigframes_remote_function == str(routine.reference) + assert square.input_dtypes == (bigframes.dtypes.INT_DTYPE,) + assert square.output_dtype == bigframes.dtypes.INT_DTYPE src = {"x": [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5]} @@ -859,36 +862,40 @@ def add_ints(row): scalars_df[columns].apply(add_ints, axis=1) -@pytest.mark.parametrize( - ("column"), - [ - pytest.param("date_col"), - pytest.param("datetime_col"), - pytest.param("geography_col"), - pytest.param("numeric_col"), - pytest.param("time_col"), - pytest.param("timestamp_col"), - ], -) -def test_df_apply_axis_1_unsupported_dtype(scalars_dfs, column): - scalars_df, scalars_pandas_df = scalars_dfs - - # It doesn't matter if it is a remote function or not, the dtype check - # is done even before the function type check with axis=1 - def echo(row): - return row[column] +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_unsupported_dtype(session, scalars_dfs, dataset_id_permanent): + columns_with_not_supported_dtypes = [ + "date_col", + "datetime_col", + "geography_col", + "numeric_col", + "time_col", + "timestamp_col", + ] - # pandas works - scalars_pandas_df[[column]].apply(echo, axis=1) + scalars_df, scalars_pandas_df = scalars_dfs - dtype = scalars_df[column].dtype + def echo_len(row): + return len(row) - with pytest.raises( - NotImplementedError, - match=re.escape( - f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are (" - ), - ), pytest.warns( - bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." - ): - scalars_df[[column]].apply(echo, axis=1) + echo_len_remote = session.remote_function( + bigframes.series.Series, + float, + dataset_id_permanent, + )(echo_len) + + for column in columns_with_not_supported_dtypes: + # pandas works + scalars_pandas_df[[column]].apply(echo_len, axis=1) + + dtype = scalars_df[column].dtype + + with pytest.raises( + NotImplementedError, + match=re.escape( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are (" + ), + ), pytest.warns( + bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." + ): + scalars_df[[column]].apply(echo_len_remote, axis=1) From 8b82a14b0b1050a1752ad5a24c8856e3e4fc6ac7 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 23 Jul 2024 02:03:07 +0000 Subject: [PATCH 02/10] add doctest, make small test remote function sticky --- tests/system/small/test_remote_function.py | 1 + .../bigframes_vendored/pandas/core/frame.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index ef2486d6cd..ca30372b51 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -942,6 +942,7 @@ def echo_len(row): bigframes.series.Series, float, dataset_id_permanent, + name=get_rf_name(echo_len, is_row_processor=True), )(echo_len) for column in columns_with_not_supported_dtypes: diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index f8088f8060..5de798578c 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4357,6 +4357,23 @@ def apply(self, func, *, axis=0, args=(), **kwargs): 1 19 dtype: Int64 + You could also apply a remote function which accepts multiple parameters + to every row of a DataFrame by using it with `axis=1` if the DataFrame + has matching number of columns and data types. Note: This feature is + currently in **preview**. + + >>> @bpd.remote_function(reuse=False) + ... def foo(val1: int, val2: int) -> float: + ... result = 1 + ... result += val1 + ... result += val2/2 + ... return result + + >>> df[["col1", "col2"]].apply(foo, axis=1) + 0 3.5 + 1 5.0 + dtype: Float64 + Args: func (function): Function to apply to each column or row. From c2d56813e5d774b4d948768d8c94b440029e78f5 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 23 Jul 2024 21:37:50 +0000 Subject: [PATCH 03/10] handle single param non-row-processing functions --- bigframes/dataframe.py | 61 ++++++++-------- bigframes/functions/remote_function.py | 1 + tests/system/large/test_remote_function.py | 73 +++++++++++++++++++ .../bigframes_vendored/pandas/core/frame.py | 13 +++- 4 files changed, 117 insertions(+), 31 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 7ee1f2b0dd..0b966f9be1 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3432,36 +3432,8 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): if not hasattr(func, "bigframes_remote_function"): raise ValueError("For axis=1 a remote function must be used.") - udf_input_dtypes = getattr(func, "input_dtypes") - if len(udf_input_dtypes) > 1: - # This is a special case where we are providing not-pandas-like - # extension. If the remote function can take multiple params - # then we assume that here the user intention is to use the - # column values of the dataframe as arguments to the function. - # For this to work the following condition must be true: - # 1. The number or input params in the function must be same - # as the number of columns in the dataframe - # 2. The dtypes of the columns in the dataframe must be - # compatible with the data types of the input params - # 3. The order of the columns in the dataframe must correspond - # to the order of the input params in the function - if len(udf_input_dtypes) != len(self.columns): - raise ValueError( - f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." - ) - if udf_input_dtypes != tuple(self.dtypes.to_list()): - raise ValueError( - f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." - ) - - series_list = [self[col] for col in self.columns] - reprojected_series = bigframes.series.Series( - series_list[0]._block._force_reproject() - ) - result_series = reprojected_series._apply_nary_op( - ops.NaryRemoteFunctionOp(func=func), series_list[1:] - ) - else: + is_row_processor = getattr(func, "is_row_processor") + if is_row_processor: # Early check whether the dataframe dtypes are currently supported # in the remote function # NOTE: Keep in sync with the value converters used in the gcf code @@ -3517,6 +3489,35 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): result_series = rows_as_json_series._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) ) + else: + # This is a special case where we are providing not-pandas-like + # extension. If the remote function can take one or more params + # then we assume that here the user intention is to use the + # column values of the dataframe as arguments to the function. + # For this to work the following condition must be true: + # 1. The number or input params in the function must be same + # as the number of columns in the dataframe + # 2. The dtypes of the columns in the dataframe must be + # compatible with the data types of the input params + # 3. The order of the columns in the dataframe must correspond + # to the order of the input params in the function + udf_input_dtypes = getattr(func, "input_dtypes") + if len(udf_input_dtypes) != len(self.columns): + raise ValueError( + f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." + ) + if udf_input_dtypes != tuple(self.dtypes.to_list()): + raise ValueError( + f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." + ) + + series_list = [self[col] for col in self.columns] + reprojected_series = bigframes.series.Series( + series_list[0]._block._force_reproject() + ) + result_series = reprojected_series._apply_nary_op( + ops.NaryRemoteFunctionOp(func=func), series_list[1:] + ) result_series.name = None # Return Series with materialized result so that any error in the remote diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 1259a88624..324aa1b013 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1208,6 +1208,7 @@ def try_delattr(attr): ibis_signature.output_type ) ) + func.is_row_processor = is_row_processor func.ibis_node = node # If a new remote function was created, update the cloud artifacts diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index edc09a2957..4d53bb8364 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1592,6 +1592,8 @@ def serialize_row(row): bigframes.series.Series, str, reuse=False )(serialize_row) + assert getattr(serialize_row_remote, "is_row_processor") + bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) @@ -1629,6 +1631,8 @@ def analyze(row): bigframes.series.Series, str, reuse=False )(analyze) + assert getattr(analyze_remote, "is_row_processor") + bf_result = ( scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas() ) @@ -1732,6 +1736,8 @@ def serialize_row(row): bigframes.series.Series, str, reuse=False )(serialize_row) + assert getattr(serialize_row_remote, "is_row_processor") + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() pd_result = pd_df.apply(serialize_row, axis=1) @@ -1792,6 +1798,8 @@ def float_parser(row): bigframes.series.Series, float, reuse=False )(float_parser) + assert getattr(float_parser_remote, "is_row_processor") + pd_result = pd_df.apply(float_parser, axis=1) bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() @@ -2035,6 +2043,7 @@ def test_df_apply_axis_1_multiple_params(session): def foo(x, y, z): return f"I got {x}, {y} and {z}" + assert getattr(foo, "is_row_processor") is False assert getattr(foo, "input_dtypes") == expected_dtypes # Fails to apply on dataframe with incompatible number of columns @@ -2078,3 +2087,67 @@ def foo(x, y, z): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo ) + + +def test_df_apply_axis_1_single_param_non_series(session): + bf_df = bigframes.dataframe.DataFrame( + { + "Id": [1, 2, 3], + } + ) + + expected_dtypes = (bigframes.dtypes.INT_DTYPE,) + + # Assert the dataframe dtypes + assert tuple(bf_df.dtypes) == expected_dtypes + + try: + + @session.remote_function([int], str, reuse=False) + def foo(x): + return f"I got {x}" + + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", + ): + bf_df[[]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1", + "I got 2", + "I got 3", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, foo + ) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 5de798578c..2af9f776fa 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4376,7 +4376,18 @@ def apply(self, func, *, axis=0, args=(), **kwargs): Args: func (function): - Function to apply to each column or row. + Function to apply to each column or row. To apply to each row + (i.e. when `axis=1` is specified) the function can be of one of + the two types: + + 1. It can accept a single input parameter of + type `Series`, in which case each row is delivered to the + function as a pandas Series. + + 2. It can accept multiple + params, in which case column values for each row are delivered + to the function separately (as those parameters). For this to work the `DataFrame` must have same number of + columns with matching data types. axis ({index (0), columns (1)}): Axis along which the function is applied. Specify 0 or 'index' to apply function to each column. Specify 1 or 'columns' to From c133a5d86967d44e125e66c91355f4661c02580e Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 23 Jul 2024 23:24:07 +0000 Subject: [PATCH 04/10] reword the documentation a bit --- .../bigframes_vendored/pandas/core/frame.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 2af9f776fa..94f8a76769 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4380,14 +4380,15 @@ def apply(self, func, *, axis=0, args=(), **kwargs): (i.e. when `axis=1` is specified) the function can be of one of the two types: - 1. It can accept a single input parameter of - type `Series`, in which case each row is delivered to the - function as a pandas Series. - - 2. It can accept multiple - params, in which case column values for each row are delivered - to the function separately (as those parameters). For this to work the `DataFrame` must have same number of - columns with matching data types. + (1). It accepts a single input parameter of type `Series`, in + which case each row is delivered to the function as a pandas + Series. + + (2). It accept one or more parameters, in which case column values + are delivered to the function as separate arguments (mapping + to those parameters) for each row. For this to work the + `DataFrame` must have same number of columns and matching + data types. axis ({index (0), columns (1)}): Axis along which the function is applied. Specify 0 or 'index' to apply function to each column. Specify 1 or 'columns' to From faeb9a1cab26e5eebc9df7a67f29995bd9298f3a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 24 Jul 2024 23:56:35 +0000 Subject: [PATCH 05/10] handle missing input dtype in read_gbq_function --- bigframes/exceptions.py | 4 +++ bigframes/functions/remote_function.py | 30 +++++++++++++++++----- tests/system/large/test_remote_function.py | 1 + tests/system/small/test_remote_function.py | 12 ++++++--- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 1d31749760..6c5b66bc47 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -57,3 +57,7 @@ class QueryComplexityError(RuntimeError): class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" + + +class UnknownDataTypeWarning(Warning): + """Data type is unknown.""" diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 324aa1b013..a54caf82b6 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -66,6 +66,7 @@ from bigframes import clients import bigframes.constants as constants import bigframes.core.compile.ibis_types +import bigframes.dtypes import bigframes.functions.remote_function_template logger = logging.getLogger(__name__) @@ -1292,12 +1293,29 @@ def func(*ignored_args, **ignored_kwargs): signature=(ibis_signature.input_types, ibis_signature.output_type), ) func.bigframes_remote_function = str(routine_ref) # type: ignore - func.input_dtypes = tuple( # type: ignore - [ - bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype(input_type) - for input_type in ibis_signature.input_types - ] - ) + + # set input bigframes data types + has_unknown_dtypes = False + function_input_dtypes = [] + for ibis_type in ibis_signature.input_types: + input_dtype = cast(bigframes.dtypes.Dtype, bigframes.dtypes.DEFAULT_DTYPE) + if ibis_type is None: + has_unknown_dtypes = True + else: + input_dtype = ( + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + ibis_type + ) + ) + function_input_dtypes.append(input_dtype) + if has_unknown_dtypes: + warnings.warn( + "The function has one or more missing input data types." + f" BigQuery DataFrames will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them.", + category=bigframes.exceptions.UnknownDataTypeWarning, + ) + func.input_dtypes = function_input_dtypes # type: ignore + func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type ) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 4d53bb8364..095f7059cd 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -30,6 +30,7 @@ import bigframes import bigframes.dataframe import bigframes.dtypes +import bigframes.exceptions import bigframes.functions.remote_function as bigframes_rf import bigframes.pandas as bpd import bigframes.series diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index ca30372b51..8ecf9eb368 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -779,10 +779,14 @@ def test_read_gbq_function_enforces_explicit_types( str(both_types_specified.reference), session=session, ) - rf.read_gbq_function( - str(only_return_type_specified.reference), - session=session, - ) + with pytest.warns( + bigframes.exceptions.UnknownDataTypeWarning, + match="missing input data types.*assume default data type", + ): + rf.read_gbq_function( + str(only_return_type_specified.reference), + session=session, + ) with pytest.raises(ValueError): rf.read_gbq_function( str(only_arg_type_specified.reference), From d926cacf691b647b870897573ba62f1cfa5defa2 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 25 Jul 2024 07:30:16 +0000 Subject: [PATCH 06/10] restore input types as tuple in read_gbq_function --- bigframes/functions/remote_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index a54caf82b6..424be7a068 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1314,7 +1314,7 @@ def func(*ignored_args, **ignored_kwargs): f" BigQuery DataFrames will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them.", category=bigframes.exceptions.UnknownDataTypeWarning, ) - func.input_dtypes = function_input_dtypes # type: ignore + func.input_dtypes = tuple(function_input_dtypes) # type: ignore func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type From ea5663d3a8cfe1b67c1bdb3fe09c03c90c4189e9 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 31 Jul 2024 01:13:58 +0000 Subject: [PATCH 07/10] clear previous remote function attributes --- bigframes/functions/remote_function.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 424be7a068..ec1318e901 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -1154,7 +1154,9 @@ def try_delattr(attr): try_delattr("bigframes_cloud_function") try_delattr("bigframes_remote_function") + try_delattr("input_dtypes") try_delattr("output_dtype") + try_delattr("is_row_processor") try_delattr("ibis_node") ( From f57e1e3e6479aa47c0214f47752e082a6f2bf76f Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 31 Jul 2024 01:17:28 +0000 Subject: [PATCH 08/10] reword documentation for clarity --- bigframes/functions/remote_function.py | 4 ++-- bigframes/session/__init__.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index c026c92f3e..b3c6aee1b3 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -896,8 +896,8 @@ def remote_function( reuse (bool, Optional): Reuse the remote function if already exists. `True` by default, which will result in reusing an existing remote - function and corresponding cloud function (if any) that was - previously created for the same udf. + function and corresponding cloud function that was previously + created (if any) for the same udf. Please note that for an unnamed (i.e. created without an explicit `name` argument) remote function, the BigQuery DataFrames session id is attached in the cloud artifacts names. So for the diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index dfec83a56a..eab2abfb64 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1627,8 +1627,8 @@ def remote_function( reuse (bool, Optional): Reuse the remote function if already exists. `True` by default, which will result in reusing an existing remote - function and corresponding cloud function (if any) that was - previously created for the same udf. + function and corresponding cloud function that was previously + created (if any) for the same udf. Please note that for an unnamed (i.e. created without an explicit `name` argument) remote function, the BigQuery DataFrames session id is attached in the cloud artifacts names. So for the From be7988da9c232b5d97d3fc0baa184c51e8ac93a0 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 1 Aug 2024 06:30:25 +0000 Subject: [PATCH 09/10] add/update comments to explain force reproject --- bigframes/dataframe.py | 9 ++++++--- bigframes/series.py | 11 +++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 830a6ce18d..9d3b153d3a 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3433,9 +3433,9 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: raise ValueError(f"na_action={na_action} not supported") # TODO(shobs): Support **kwargs - # Reproject as workaround to applying filter too late. This forces the filter - # to be applied before passing data to remote function, protecting from bad - # inputs causing errors. + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. reprojected_df = DataFrame(self._block._force_reproject()) return reprojected_df._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None)) @@ -3532,6 +3532,9 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) series_list = [self[col] for col in self.columns] + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. reprojected_series = bigframes.series.Series( series_list[0]._block._force_reproject() ) diff --git a/bigframes/series.py b/bigframes/series.py index 1a5661529c..9e33801834 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1442,9 +1442,6 @@ def apply( ) -> Series: # TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs # is actually a ternary op - # Reproject as workaround to applying filter too late. This forces the filter - # to be applied before passing data to remote function, protecting from bad - # inputs causing errors. if by_row not in ["compat", False]: raise ValueError("Param by_row must be one of 'compat' or False") @@ -1474,7 +1471,10 @@ def apply( ex.message += f"\n{_remote_function_recommendation_message}" raise - # We are working with remote function at this point + # We are working with remote function at this point. + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. reprojected_series = Series(self._block._force_reproject()) result_series = reprojected_series._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) @@ -1507,6 +1507,9 @@ def combine( ex.message += f"\n{_remote_function_recommendation_message}" raise + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. reprojected_series = Series(self._block._force_reproject()) result_series = reprojected_series._apply_binary_op( other, ops.BinaryRemoteFunctionOp(func=func) From 3f8ebcfab0d9ffceb3e6af4211a0ce3ae08f1be0 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 1 Aug 2024 07:02:52 +0000 Subject: [PATCH 10/10] make doctest example remote function with 3 params --- .../bigframes_vendored/pandas/core/frame.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index f58817ad6f..10565a2552 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4366,16 +4366,28 @@ def apply(self, func, *, axis=0, args=(), **kwargs): has matching number of columns and data types. Note: This feature is currently in **preview**. + >>> df = bpd.DataFrame({ + ... 'col1': [1, 2], + ... 'col2': [3, 4], + ... 'col3': [5, 5] + ... }) + >>> df + col1 col2 col3 + 0 1 3 5 + 1 2 4 5 + + [2 rows x 3 columns] + >>> @bpd.remote_function(reuse=False) - ... def foo(val1: int, val2: int) -> float: + ... def foo(x: int, y: int, z: int) -> float: ... result = 1 - ... result += val1 - ... result += val2/2 + ... result += x + ... result += y/z ... return result - >>> df[["col1", "col2"]].apply(foo, axis=1) - 0 3.5 - 1 5.0 + >>> df.apply(foo, axis=1) + 0 2.6 + 1 3.8 dtype: Float64 Args: