diff --git a/bigframes/core/reshape/__init__.py b/bigframes/core/reshape/__init__.py
index 339ce7466a..dc61c3baad 100644
--- a/bigframes/core/reshape/__init__.py
+++ b/bigframes/core/reshape/__init__.py
@@ -20,6 +20,7 @@
 import bigframes.core as core
 import bigframes.core.utils as utils
 import bigframes.dataframe
+import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.series
 
@@ -118,3 +119,65 @@ def cut(
             f"Only labels=False is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}"
         )
     return x._apply_window_op(agg_ops.CutOp(bins), window_spec=core.WindowSpec())
+
+
+def qcut(
+    x: bigframes.series.Series,
+    q: typing.Union[int, typing.Sequence[float]],
+    *,
+    labels: Optional[bool] = None,
+    duplicates: typing.Literal["drop", "error"] = "error",
+) -> bigframes.series.Series:
+    """Bin the values of ``x`` into quantile-based buckets.
+
+    Only ``labels=False`` and ``duplicates="drop"`` are implemented, so both
+    must be passed explicitly; the defaults raise ``NotImplementedError``.
+
+    Args:
+        x: Series whose values are bucketed.
+        q: A positive number of quantiles, or a non-empty sequence of
+            quantile edges.
+        labels: Must be ``False``.
+        duplicates: Must be ``"drop"``.
+
+    Returns:
+        A Series, labelled like ``x``, built from the ``QcutOp`` window
+        result.
+
+    Raises:
+        ValueError: If ``q`` is a non-positive integer or an empty sequence.
+        NotImplementedError: If ``labels`` is not ``False`` or ``duplicates``
+            is not ``"drop"``.
+    """
+    if isinstance(q, int):
+        if q <= 0:
+            raise ValueError("`q` should be a positive integer.")
+    elif len(q) == 0:
+        # QcutOp reads the first edge unconditionally, so an empty sequence
+        # would otherwise surface as an IndexError raised from QcutOp.
+        raise ValueError("`q` should be a non-empty sequence of quantiles.")
+
+    if labels is not False:
+        raise NotImplementedError(
+            f"Only labels=False is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}"
+        )
+    if duplicates != "drop":
+        raise NotImplementedError(
+            f"Only duplicates='drop' is supported in BigQuery DataFrames so far. {constants.FEEDBACK_LINK}"
+        )
+    block = x._block
+    label = block.col_id_to_label[x._value_column]
+    # The notnull column is used as the window grouping key, so null and
+    # non-null rows are bucketed in separate groups.
+    block, nullity_id = block.apply_unary_op(x._value_column, ops.notnull_op)
+    block, result = block.apply_window_op(
+        x._value_column,
+        agg_ops.QcutOp(q),
+        window_spec=core.WindowSpec(grouping_keys=(nullity_id,)),
+    )
+    # NOTE(review): presumably where_op keeps `result` where `nullity_id` is
+    # true and substitutes None elsewhere -- confirm against ops.where_op.
+    block, result = block.apply_binary_op(
+        result, nullity_id, ops.partial_arg3(ops.where_op, None), result_label=label
+    )
+    return bigframes.series.Series(block.select_column(result))
diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py
index 23271e8220..465d188724 100644
--- a/bigframes/operations/aggregations.py
+++ b/bigframes/operations/aggregations.py
@@ -254,6 +254,59 @@ def handles_ties(self):
         return True
 
 
+class QcutOp(WindowOp):
+    """Window op that assigns values to buckets by their percent rank.
+
+    ``quantiles`` is either a number of buckets or a sequence of quantile
+    edges.
+    """
+
+    def __init__(self, quantiles: typing.Union[int, typing.Sequence[float]]):
+        self.name = f"qcut-{quantiles}"
+        self._quantiles = quantiles
+
+    @numeric_op
+    def _as_ibis(
+        self, column: ibis_types.Column, window=None
+    ) -> ibis_types.IntegerValue:
+        # Both forms of `quantiles` bucket on the same percent rank, so it is
+        # built once up front.
+        percent_ranks = typing.cast(
+            ibis_types.FloatingColumn,
+            _apply_window_if_present(column.percent_rank(), window),
+        )
+        if isinstance(self._quantiles, int):
+            quantiles_ibis = dtypes.literal_to_ibis_scalar(self._quantiles)
+            float_bucket = typing.cast(
+                ibis_types.FloatingColumn, (percent_ranks * quantiles_ibis)
+            )
+            # ceil(rank * n) - 1 is the zero-based bucket; the clip keeps a
+            # percent rank of exactly 0 in bucket 0 rather than -1.
+            return float_bucket.ceil().clip(lower=_ibis_num(1)) - _ibis_num(1)
+
+        out = ibis.case()
+        # Ranks below the first edge belong to no bucket.
+        first_ibis_quantile = dtypes.literal_to_ibis_scalar(self._quantiles[0])
+        out = out.when(percent_ranks < first_ibis_quantile, None)
+        # Bucket i is bounded above (inclusively) by edge i + 1.
+        for bucket_n, upper_edge in enumerate(self._quantiles[1:]):
+            out = out.when(
+                percent_ranks <= dtypes.literal_to_ibis_scalar(upper_edge),
+                dtypes.literal_to_ibis_scalar(bucket_n, force_dtype=Int64Dtype()),
+            )
+        # Ranks above the last edge belong to no bucket either.
+        out = out.else_(None)
+        return out.end()
+
+    @property
+    def skips_nulls(self):
+        return False
+
+    @property
+    def handles_ties(self):
+        return True
+
+
 class NuniqueOp(AggregateOp):
     name = "nunique"
 
@@ -491,3 +544,8 @@ def lookup_agg_func(key: str) -> AggregateOp:
         return _AGGREGATIONS_LOOKUP[key]
     else:
         raise ValueError(f"Unrecognize aggregate function: {key}")
+
+
+def _ibis_num(number: float):
+    """Wrap a Python number in an ibis literal typed as a numeric value."""
+    return typing.cast(ibis_types.NumericValue, ibis_types.literal(number))
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 5c1928e6f0..8d9726312f 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -134,6 +134,19 @@ def cut(
 cut.__doc__ = vendored_pandas_tile.cut.__doc__
 
 
+def qcut(
+    x: bigframes.series.Series,
+    q: typing.Union[int, typing.Sequence[float]],
+    *,
+    labels: Optional[bool] = None,
+    duplicates: typing.Literal["drop", "error"] = "error",
+) -> bigframes.series.Series:
+    return bigframes.core.reshape.qcut(x, q, labels=labels, duplicates=duplicates)
+
+
+qcut.__doc__ = vendored_pandas_tile.qcut.__doc__
+
+
 def merge(
     left: DataFrame,
     right: DataFrame,
diff --git a/tests/system/small/test_pandas.py b/tests/system/small/test_pandas.py
index a429c6551d..f8fa78587f 100644
--- a/tests/system/small/test_pandas.py
+++ b/tests/system/small/test_pandas.py
@@ -223,3 +223,28 @@ def test_cut(scalars_dfs):
     bf_result = bf_result.to_pandas()
     pd_result = pd_result.astype("Int64")
     pd.testing.assert_series_equal(bf_result, pd_result)
+
+
+@pytest.mark.parametrize(
+    ("q",),
+    [
+        (1,),
+        (2,),
+        (7,),
+        (32,),
+        ([0, 0.1, 0.3, 0.4, 0.9, 1.0],),
+        ([0.5, 0.9],),
+    ],
+)
+def test_qcut(scalars_dfs, q):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    pd_result = pd.qcut(
+        scalars_pandas_df["float64_col"], q, labels=False, duplicates="drop"
+    )
+    bf_result = bpd.qcut(scalars_df["float64_col"], q, labels=False, duplicates="drop")
+
+    bf_result = bf_result.to_pandas()
+    pd_result = pd_result.astype("Int64")
+
+    pd.testing.assert_series_equal(bf_result, pd_result)
diff --git a/third_party/bigframes_vendored/pandas/core/reshape/tile.py b/third_party/bigframes_vendored/pandas/core/reshape/tile.py
index 4f5f2efef0..24ea655a5f 100644
--- a/third_party/bigframes_vendored/pandas/core/reshape/tile.py
+++ b/third_party/bigframes_vendored/pandas/core/reshape/tile.py
@@ -65,3 +65,35 @@ def cut(
             False : returns an ndarray of integers.
     """
     raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
+
+def qcut(x, q, *, labels=None, duplicates="error"):
+    """
+    Quantile-based discretization function.
+
+    Discretize variable into equal-sized buckets based on rank or based
+    on sample quantiles. For example 1000 values for 10 quantiles would
+    produce a Categorical object indicating quantile membership for each data point.
+
+    Args:
+        x (Series):
+            The input Series to be binned. Must be 1-dimensional.
+        q (int or list-like of float):
+            Number of quantiles. 10 for deciles, 4 for quartiles, etc. Alternately
+            array of quantiles, e.g. [0, .25, .5, .75, 1.] for quartiles.
+        labels (bool, default None):
+            Used as labels for the resulting bins. Must be of the same length as
+            the resulting bins. If False, return only integer indicators of the
+            bins. If True, raises an error.
+            Only ``False`` is currently supported.
+        duplicates ({default 'error', 'drop'}, optional):
+            If bin edges are not unique, raise ValueError or drop non-uniques.
+            Only ``'drop'`` is currently supported.
+
+    Returns:
+        Series: Categorical or Series of integers if labels is False
+        The return type (Categorical or Series) depends on the input: a Series
+        of type category if input is a Series else Categorical. Bins are
+        represented as categories when categorical data is returned.
+    """
+    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)