Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: Add groupby.rank() #1433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions 37 bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import bigframes.core.expression as ex
import bigframes.core.ordering as ordering
import bigframes.core.window_spec as windows
import bigframes.dtypes
import bigframes.dtypes as dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
Expand Down Expand Up @@ -409,6 +410,8 @@ def rank(
method: str = "average",
na_option: str = "keep",
ascending: bool = True,
grouping_cols: tuple[str, ...] = (),
columns: tuple[str, ...] = (),
):
if method not in ["average", "min", "max", "first", "dense"]:
raise ValueError(
Expand All @@ -417,8 +420,8 @@ def rank(
if na_option not in ["keep", "top", "bottom"]:
raise ValueError("na_option must be one of 'keep', 'top', or 'bottom'")

columns = block.value_columns
labels = block.column_labels
columns = columns or tuple(col for col in block.value_columns)
labels = [block.col_id_to_label[id] for id in columns]
# Step 1: Calculate row numbers for each row
# Identify null values to be treated according to na_option param
rownum_col_ids = []
Expand All @@ -442,9 +445,13 @@ def rank(
block, rownum_id = block.apply_window_op(
col if na_option == "keep" else nullity_col_id,
agg_ops.dense_rank_op if method == "dense" else agg_ops.count_op,
window_spec=windows.unbound(ordering=window_ordering)
window_spec=windows.unbound(
grouping_keys=grouping_cols, ordering=window_ordering
)
if method == "dense"
else windows.rows(following=0, ordering=window_ordering),
else windows.rows(
following=0, ordering=window_ordering, grouping_keys=grouping_cols
),
skip_reproject_unsafe=(col != columns[-1]),
)
rownum_col_ids.append(rownum_id)
Expand All @@ -462,12 +469,32 @@ def rank(
block, result_id = block.apply_window_op(
rownum_col_ids[i],
agg_op,
window_spec=windows.unbound(grouping_keys=(columns[i],)),
window_spec=windows.unbound(grouping_keys=(columns[i], *grouping_cols)),
skip_reproject_unsafe=(i < (len(columns) - 1)),
)
post_agg_rownum_col_ids.append(result_id)
rownum_col_ids = post_agg_rownum_col_ids

# Pandas masks all values where any grouping column is null
# Note: we use pd.NA instead of float('nan')
if grouping_cols:
predicate = functools.reduce(
ops.and_op.as_expr,
[ops.notnull_op.as_expr(column_id) for column_id in grouping_cols],
)
block = block.project_exprs(
[
ops.where_op.as_expr(
ex.deref(col),
predicate,
ex.const(None),
)
for col in rownum_col_ids
],
labels=labels,
)
rownum_col_ids = list(block.value_columns[-len(rownum_col_ids) :])

# Step 3: post processing: mask null values and cast to float
if method in ["min", "max", "first", "dense"]:
# Pandas rank always produces Float64, so must cast for aggregation types that produce ints
Expand Down
1 change: 1 addition & 0 deletions 1 bigframes/core/compile/ibis_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def literal_to_ibis_scalar(
)
# "correct" way would be to use ibis.array, but this produces invalid BQ SQL syntax
return tuple(literal)

if not pd.api.types.is_list_like(literal) and pd.isna(literal):
if ibis_dtype:
return bigframes_vendored.ibis.null().cast(ibis_dtype)
Expand Down
28 changes: 28 additions & 0 deletions 28 bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ def median(self, numeric_only: bool = False, *, exact: bool = True) -> df.DataFr
return self.quantile(0.5)
return self._aggregate_all(agg_ops.median_op, numeric_only=True)

def rank(
    self, method="average", ascending: bool = True, na_option: str = "keep"
) -> df.DataFrame:
    """Rank values within each group, mirroring pandas ``GroupBy.rank``.

    Delegates the heavy lifting to ``block_ops.rank``, passing the group-key
    columns so ranking windows are partitioned per group.
    """
    ranked_block = block_ops.rank(
        self._block,
        method,
        na_option,
        ascending,
        grouping_cols=tuple(self._by_col_ids),
        columns=tuple(self._selected_cols),
    )
    return df.DataFrame(ranked_block)

def quantile(
self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
) -> df.DataFrame:
Expand Down Expand Up @@ -574,6 +588,20 @@ def sum(self, *args) -> series.Series:
def mean(self, *args) -> series.Series:
return self._aggregate(agg_ops.mean_op)

def rank(
    self, method="average", ascending: bool = True, na_option: str = "keep"
) -> series.Series:
    """Rank values of the selected column within each group.

    Mirrors pandas ``SeriesGroupBy.rank``; the single value column and the
    group-key columns are forwarded to the shared ``block_ops.rank`` helper.
    """
    ranked_block = block_ops.rank(
        self._block,
        method,
        na_option,
        ascending,
        grouping_cols=tuple(self._by_col_ids),
        columns=(self._value_column,),
    )
    return series.Series(ranked_block)

def median(
self,
*args,
Expand Down
134 changes: 133 additions & 1 deletion 134 tests/system/small/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pytest

import bigframes.pandas as bpd
from tests.system.utils import assert_pandas_df_equal
from tests.system.utils import assert_pandas_df_equal, skip_legacy_pandas

# =================
# DataFrame.groupby
Expand Down Expand Up @@ -94,6 +94,72 @@ def test_dataframe_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q
)


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("na_option", "method", "ascending"),
    [
        ("keep", "average", True),
        ("top", "min", False),
        ("bottom", "max", False),
        ("top", "first", False),
        ("bottom", "dense", False),
    ],
)
def test_dataframe_groupby_rank(
    scalars_df_index,
    scalars_pandas_df_index,
    na_option,
    method,
    ascending,
):
    # Verify DataFrameGroupBy.rank matches pandas for each method/na_option combo.
    col_names = ["int64_too", "float64_col", "int64_col", "string_col"]
    bf_grouped = scalars_df_index[col_names].groupby("string_col")
    bf_result = bf_grouped.rank(
        na_option=na_option,
        method=method,
        ascending=ascending,
    ).to_pandas()

    pd_grouped = scalars_pandas_df_index[col_names].groupby("string_col")
    # Cast through float64 to the nullable Float64 dtype so the pandas result
    # is comparable with the BigFrames output dtype.
    pd_result = (
        pd_grouped.rank(
            na_option=na_option,
            method=method,
            ascending=ascending,
        )
        .astype("float64")
        .astype("Float64")
    )

    pd.testing.assert_frame_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


@pytest.mark.parametrize(
("operator"),
[
Expand Down Expand Up @@ -534,6 +600,72 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
)


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("na_option", "method", "ascending"),
    [
        ("keep", "average", True),
        ("top", "min", False),
        ("bottom", "max", False),
        ("top", "first", False),
        ("bottom", "dense", False),
    ],
)
def test_series_groupby_rank(
    scalars_df_index,
    scalars_pandas_df_index,
    na_option,
    method,
    ascending,
):
    # Verify SeriesGroupBy.rank matches pandas for each method/na_option combo.
    col_names = ["int64_col", "string_col"]
    bf_grouped = scalars_df_index[col_names].groupby("string_col")["int64_col"]
    bf_result = bf_grouped.rank(
        na_option=na_option,
        method=method,
        ascending=ascending,
    ).to_pandas()

    pd_grouped = scalars_pandas_df_index[col_names].groupby("string_col")["int64_col"]
    # Cast through float64 to the nullable Float64 dtype so the pandas result
    # is comparable with the BigFrames output dtype.
    pd_result = (
        pd_grouped.rank(
            na_option=na_option,
            method=method,
            ascending=ascending,
        )
        .astype("float64")
        .astype("Float64")
    )

    pd.testing.assert_series_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


@pytest.mark.parametrize("dropna", [True, False])
def test_series_groupby_head(scalars_df_index, scalars_pandas_df_index, dropna):
bf_result = (
Expand Down
71 changes: 71 additions & 0 deletions 71 third_party/bigframes_vendored/pandas/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,77 @@ def var(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def rank(
    self,
    method: str = "average",
    ascending: bool = True,
    na_option: str = "keep",
):
    """
    Provide the rank of values within each group.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> import numpy as np
        >>> bpd.options.display.progress_bar = None

        >>> df = bpd.DataFrame(
        ...     {
        ...         "group": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"],
        ...         "value": [2, 4, 2, 3, 5, 1, 2, 4, 1, 5],
        ...     }
        ... )
        >>> df
          group  value
        0     a      2
        1     a      4
        2     a      2
        3     a      3
        4     a      5
        5     b      1
        6     b      2
        7     b      4
        8     b      1
        9     b      5
        <BLANKLINE>
        [10 rows x 2 columns]
        >>> for method in ['average', 'min', 'max', 'dense', 'first']:
        ...     df[f'{method}_rank'] = df.groupby('group')['value'].rank(method)
        >>> df
          group  value  average_rank  min_rank  max_rank  dense_rank  first_rank
        0     a      2           1.5       1.0       2.0         1.0         1.0
        1     a      4           4.0       4.0       4.0         3.0         4.0
        2     a      2           1.5       1.0       2.0         1.0         2.0
        3     a      3           3.0       3.0       3.0         2.0         3.0
        4     a      5           5.0       5.0       5.0         4.0         5.0
        5     b      1           1.5       1.0       2.0         1.0         1.0
        6     b      2           3.0       3.0       3.0         2.0         3.0
        7     b      4           4.0       4.0       4.0         3.0         4.0
        8     b      1           1.5       1.0       2.0         1.0         2.0
        9     b      5           5.0       5.0       5.0         4.0         5.0
        <BLANKLINE>
        [10 rows x 7 columns]

    Args:
        method ({'average', 'min', 'max', 'first', 'dense'}, default 'average'):
            * average: average rank of group.
            * min: lowest rank in group.
            * max: highest rank in group.
            * first: ranks assigned in order they appear in the array.
            * dense: like 'min', but rank always increases by 1 between groups.
        ascending (bool, default True):
            False for ranks by high (1) to low (N).
        na_option ({'keep', 'top', 'bottom'}, default 'keep'):
            * keep: leave NA values where they are.
            * top: smallest rank if ascending.
            * bottom: smallest rank if descending.

    Returns:
        DataFrame with ranking of values within each group
    """
    # Abstract stub: concrete behavior is supplied by the subclass override.
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def skew(
self,
*,
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.