Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: Allow windowing in 'partial' ordering mode #861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions 26 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,17 @@ def promote_offsets(self, col_id: str) -> ArrayValue:
"""
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
"""
if self.node.order_ambiguous and not self.session._strictly_ordered:
raise ValueError("Generating offsets not supported in unordered mode")
if self.node.order_ambiguous and not (self.session._strictly_ordered):
if not self.session._allows_ambiguity:
raise ValueError(
"Generating offsets not supported in partial ordering mode"
)
else:
warnings.warn(
"Window ordering may be ambiguous, this can cause unstable results.",
bigframes.exceptions.AmbiguousWindowWarning,
)

return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id))

def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
Expand Down Expand Up @@ -347,9 +356,16 @@ def project_window_op(
# TODO: Support non-deterministic windowing
if window_spec.row_bounded or not op.order_independent:
if self.node.order_ambiguous and not self.session._strictly_ordered:
raise ValueError(
"Order-dependent windowed ops not supported in unordered mode"
)
if not self.session._allows_ambiguity:
raise ValueError(
"Generating offsets not supported in partial ordering mode"
)
else:
warnings.warn(
"Window ordering may be ambiguous, this can cause unstable results.",
bigframes.exceptions.AmbiguousWindowWarning,
)

return ArrayValue(
nodes.WindowOpNode(
child=self.node,
Expand Down
4 changes: 4 additions & 0 deletions 4 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ def index_name_to_col_id(self) -> typing.Mapping[Label, typing.Sequence[str]]:
mapping[label] = (*mapping.get(label, ()), id)
return mapping

@property
def explicitly_ordered(self) -> bool:
return self.expr.node.explicitly_ordered

def cols_matching_label(self, partial_label: Label) -> typing.Sequence[str]:
"""
Unlike label_to_col_id, this works with partial labels for multi-index.
Expand Down
2 changes: 1 addition & 1 deletion 2 bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def to_sql(
ordered: bool = False,
) -> str:
if offset_column or ordered:
raise ValueError("Cannot produce sorted sql in unordered mode")
raise ValueError("Cannot produce sorted sql in partial ordering mode")
sql = ibis_bigquery.Backend().compile(
self._to_ibis_expr(
col_id_overrides=col_id_overrides,
Expand Down
38 changes: 19 additions & 19 deletions 38 bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __getitem__(
dropna=self._dropna,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def head(self, n: int = 5) -> df.DataFrame:
block = self._block
if self._dropna:
Expand Down Expand Up @@ -235,25 +235,25 @@ def count(self) -> df.DataFrame:
def nunique(self) -> df.DataFrame:
return self._aggregate_all(agg_ops.nunique_op)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
if not numeric_only:
self._raise_on_non_numeric("cumsum")
return self._apply_window_op(agg_ops.sum_op, numeric_only=True)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumprod(self, *args, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.product_op, numeric_only=True)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def shift(self, periods=1) -> series.Series:
window = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
Expand All @@ -262,7 +262,7 @@ def shift(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def diff(self, periods=1) -> series.Series:
window = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
Expand All @@ -271,7 +271,7 @@ def diff(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = window_specs.rows(
Expand All @@ -287,7 +287,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
block, window_spec, self._selected_cols, drop_null_groups=self._dropna
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def expanding(self, min_periods: int = 1) -> windows.Window:
window_spec = window_specs.cumulative_rows(
grouping_keys=tuple(self._by_col_ids),
Expand Down Expand Up @@ -532,7 +532,7 @@ def __init__(
def _session(self) -> core.Session:
return self._block.session

@validations.requires_strict_ordering()
@validations.requires_ordering()
def head(self, n: int = 5) -> series.Series:
block = self._block
if self._dropna:
Expand Down Expand Up @@ -650,31 +650,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]:

aggregate = agg

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumsum(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.sum_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumprod(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.product_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummax(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.max_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cummin(self, *args, **kwargs) -> series.Series:
return self._apply_window_op(
agg_ops.min_op,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def cumcount(self, *args, **kwargs) -> series.Series:
return (
self._apply_window_op(
Expand All @@ -684,7 +684,7 @@ def cumcount(self, *args, **kwargs) -> series.Series:
- 1
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def shift(self, periods=1) -> series.Series:
"""Shift index by desired number of periods."""
window = window_specs.rows(
Expand All @@ -694,7 +694,7 @@ def shift(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def diff(self, periods=1) -> series.Series:
window = window_specs.rows(
grouping_keys=tuple(self._by_col_ids),
Expand All @@ -703,7 +703,7 @@ def diff(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
window_spec = window_specs.rows(
Expand All @@ -723,7 +723,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window:
is_series=True,
)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def expanding(self, min_periods: int = 1) -> windows.Window:
window_spec = window_specs.cumulative_rows(
grouping_keys=tuple(self._by_col_ids),
Expand Down
8 changes: 4 additions & 4 deletions 8 bigframes/core/indexes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def empty(self) -> bool:
return self.shape[0] == 0

@property
@validations.requires_strict_ordering()
@validations.requires_ordering()
def is_monotonic_increasing(self) -> bool:
"""
Return a boolean if the values are equal or increasing.
Expand All @@ -198,7 +198,7 @@ def is_monotonic_increasing(self) -> bool:
)

@property
@validations.requires_strict_ordering()
@validations.requires_ordering()
def is_monotonic_decreasing(self) -> bool:
"""
Return a boolean if the values are equal or decreasing.
Expand Down Expand Up @@ -348,7 +348,7 @@ def max(self) -> typing.Any:
def min(self) -> typing.Any:
return self._apply_aggregation(agg_ops.min_op)

@validations.requires_strict_ordering()
@validations.requires_ordering()
def argmax(self) -> int:
block, row_nums = self._block.promote_offsets()
block = block.order_by(
Expand All @@ -361,7 +361,7 @@ def argmax(self) -> int:

return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0])

@validations.requires_strict_ordering()
@validations.requires_ordering()
def argmin(self) -> int:
block, row_nums = self._block.promote_offsets()
block = block.order_by(
Expand Down
42 changes: 42 additions & 0 deletions 42 bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def order_ambiguous(self) -> bool:
"""
...

@property
@abc.abstractmethod
def explicitly_ordered(self) -> bool:
"""
Whether row ordering is potentially ambiguous. For example, ReadTable (without a primary key) could be ordered in different ways.
"""
...

@functools.cached_property
def total_variables(self) -> int:
return self.variables_introduced + sum(
Expand Down Expand Up @@ -180,6 +188,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
return self.child.schema

@property
def explicitly_ordered(self) -> bool:
return self.child.explicitly_ordered

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
Expand Down Expand Up @@ -212,6 +224,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def order_ambiguous(self) -> bool:
return True

@property
def explicitly_ordered(self) -> bool:
return False

def __hash__(self):
return self._node_hash

Expand Down Expand Up @@ -267,6 +283,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def order_ambiguous(self) -> bool:
return any(child.order_ambiguous for child in self.children)

@property
def explicitly_ordered(self) -> bool:
return all(child.explicitly_ordered for child in self.children)

def __hash__(self):
return self._node_hash

Expand Down Expand Up @@ -317,6 +337,10 @@ def variables_introduced(self) -> int:
def order_ambiguous(self) -> bool:
return False

@property
def explicitly_ordered(self) -> bool:
return True

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
Expand Down Expand Up @@ -378,6 +402,10 @@ def relation_ops_created(self) -> int:
def order_ambiguous(self) -> bool:
return len(self.total_order_cols) == 0

@property
def explicitly_ordered(self) -> bool:
return len(self.total_order_cols) > 0

@functools.cached_property
def variables_introduced(self) -> int:
return len(self.schema.items) + 1
Expand Down Expand Up @@ -449,6 +477,12 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
def order_ambiguous(self) -> bool:
return not isinstance(self.ordering, orderings.TotalOrdering)

@property
def explicitly_ordered(self) -> bool:
return (self.ordering is not None) and len(
self.ordering.all_ordering_columns
) > 0

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
Expand Down Expand Up @@ -523,6 +557,10 @@ def relation_ops_created(self) -> int:
# Doesnt directly create any relational operations
return 0

@property
def explicitly_ordered(self) -> bool:
return True


@dataclass(frozen=True)
class ReversedNode(UnaryNode):
Expand Down Expand Up @@ -636,6 +674,10 @@ def variables_introduced(self) -> int:
def order_ambiguous(self) -> bool:
return False

@property
def explicitly_ordered(self) -> bool:
return True


@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
Expand Down
17 changes: 15 additions & 2 deletions 17 bigframes/core/validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@

if TYPE_CHECKING:
from bigframes import Session
from bigframes.core.blocks import Block


class HasSession(Protocol):
@property
def _session(self) -> Session:
...

@property
def _block(self) -> Block:
...


def requires_strict_ordering(suggestion: Optional[str] = None):
def requires_ordering(suggestion: Optional[str] = None):
def decorator(meth):
@functools.wraps(meth)
def guarded_meth(object: HasSession, *args, **kwargs):
Expand All @@ -47,8 +52,16 @@ def guarded_meth(object: HasSession, *args, **kwargs):
def enforce_ordered(
object: HasSession, opname: str, suggestion: Optional[str] = None
) -> None:
if not object._session._strictly_ordered:
session = object._session
if session._strictly_ordered or not object._block.expr.node.order_ambiguous:
# No ambiguity for how to calculate ordering, so no error or warning
return None
if not session._allows_ambiguity:
suggestion_substr = suggestion + " " if suggestion else ""
raise bigframes.exceptions.OrderRequiredError(
f"Op {opname} not supported when strict ordering is disabled. {suggestion_substr}{bigframes.constants.FEEDBACK_LINK}"
)
if not object._block.explicitly_ordered:
raise bigframes.exceptions.OrderRequiredError(
f"Op {opname} requires an ordering. Use .sort_values or .sort_index to provide an ordering. {bigframes.constants.FEEDBACK_LINK}"
)
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.