From 0d57e0d2e999a900813481eb24333792be0b4e64 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 25 Jul 2024 00:45:41 +0000 Subject: [PATCH 1/5] refactor: Internal flag to permit window ambiguity --- bigframes/core/__init__.py | 24 +++++++++++++++++++----- bigframes/core/validations.py | 3 ++- bigframes/exceptions.py | 4 ++++ bigframes/session/__init__.py | 5 +++++ 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index aa66129572..29b96e0b24 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -194,8 +194,15 @@ def promote_offsets(self, col_id: str) -> ArrayValue: """ Convenience function to promote copy of column offsets to a value column. Can be used to reset index. """ - if self.node.order_ambiguous and not self.session._strictly_ordered: - raise ValueError("Generating offsets not supported in unordered mode") + if self.node.order_ambiguous and not (self.session._strictly_ordered): + if not self.session.allow_ambiguity: + raise ValueError("Generating offsets not supported in unordered mode") + else: + warnings.warn( + "Window ordering may be ambiguous, this can cause unstable results.", + bigframes.exceptions.AmbiguousWindowWarning, + ) + return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id)) def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue: @@ -347,9 +354,16 @@ def project_window_op( # TODO: Support non-deterministic windowing if window_spec.row_bounded or not op.order_independent: if self.node.order_ambiguous and not self.session._strictly_ordered: - raise ValueError( - "Order-dependent windowed ops not supported in unordered mode" - ) + if not self.session.allow_ambiguity: + raise ValueError( + "Generating offsets not supported in unordered mode" + ) + else: + warnings.warn( + "Window ordering may be ambiguous, this can cause unstable results.", + bigframes.exceptions.AmbiguousWindowWarning, + ) + return ArrayValue( nodes.WindowOpNode( child=self.node, diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py index dc22047e3b..06ee55a4d2 100644 --- a/bigframes/core/validations.py +++ b/bigframes/core/validations.py @@ -45,7 +45,8 @@ def guarded_meth(object: HasSession, *args, **kwargs): def enforce_ordered(object: HasSession, opname: str) -> None: - if not object._session._strictly_ordered: + session = object._session + if not (session._strictly_ordered or session.allow_ambiguity): raise bigframes.exceptions.OrderRequiredError( f"Op {opname} not supported when strict ordering is disabled. 
{bigframes.constants.FEEDBACK_LINK}" ) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 1d31749760..7493364296 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -57,3 +57,7 @@ class QueryComplexityError(RuntimeError): class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" + + +class AmbiguousWindowWarning(Warning): + """A query may produce nondeterministic results as the window may be ambiguously ordered.""" diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 9c953ee594..631f7de01e 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -307,6 +307,7 @@ def __init__( self._compiler = bigframes.core.compile.SQLCompiler( strict=context._strictly_ordered ) + self._allow_ambiguity = False self._remote_function_session = bigframes_rf._RemoteFunctionSession() @@ -371,6 +372,10 @@ def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" return self._slot_millis_sum + @property + def allow_ambiguity(self) -> bool: + return self._allow_ambiguity + def _add_bytes_processed(self, amount: int): """Increment bytes_processed_sum by amount.""" self._bytes_processed_sum += amount From 3266a0df780cf8cd4dd39eab61235c4d57f0acd2 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 29 Jul 2024 22:25:22 +0000 Subject: [PATCH 2/5] allow windowing in partially ordered mode --- bigframes/core/blocks.py | 4 ++ bigframes/core/groupby/__init__.py | 38 ++++++++-------- bigframes/core/indexes/base.py | 8 ++-- bigframes/core/nodes.py | 42 +++++++++++++++++ bigframes/core/validations.py | 16 ++++++- bigframes/dataframe.py | 54 +++++++++++----------- bigframes/series.py | 52 ++++++++++----------- bigframes/session/__init__.py | 2 +- tests/system/small/test_unordered.py | 67 +++++++++++++++------------- 9 files changed, 173 insertions(+), 110 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 2d7c543678..8840d7918a 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -280,6 +280,10 @@ def index_name_to_col_id(self) -> typing.Mapping[Label, typing.Sequence[str]]: mapping[label] = (*mapping.get(label, ()), id) return mapping + @property + def explicitly_ordered(self) -> bool: + return self.expr.node.explicitly_ordered + def cols_matching_label(self, partial_label: Label) -> typing.Sequence[str]: """ Unlike label_to_col_id, this works with partial labels for multi-index. 
diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 02bf201ca0..2b80d0389e 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -109,7 +109,7 @@ def __getitem__( dropna=self._dropna, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def head(self, n: int = 5) -> df.DataFrame: block = self._block if self._dropna: @@ -235,25 +235,25 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("cumsum") return self._apply_window_op(agg_ops.sum_op, numeric_only=True) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self, *args, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.product_op, numeric_only=True) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -262,7 +262,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -271,7 +271,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = window_specs.rows( @@ -287,7 +287,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), @@ -532,7 +532,7 @@ def __init__( def _session(self) -> core.Session: return self._block.session - @validations.requires_strict_ordering() + @validations.requires_ordering() def head(self, n: int = 5) -> series.Series: block = self._block if self._dropna: @@ -650,31 +650,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: aggregate = agg - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.sum_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.product_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.max_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.min_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumcount(self, *args, **kwargs) -> series.Series: return ( self._apply_window_op( @@ -684,7 +684,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: - 1 ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = window_specs.rows( @@ -694,7 +694,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -703,7 +703,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -723,7 +723,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: is_series=True, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 8b039707c2..0376e37f96 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -184,7 +184,7 @@ def empty(self) -> bool: return self.shape[0] == 0 @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_increasing(self) -> bool: """ Return a boolean if the values are equal or increasing. 
@@ -198,7 +198,7 @@ def is_monotonic_increasing(self) -> bool: ) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_decreasing(self) -> bool: """ Return a boolean if the values are equal or decreasing. @@ -348,7 +348,7 @@ def max(self) -> typing.Any: def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -361,7 +361,7 @@ def argmax(self) -> int: return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0]) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index a979e07972..30edc7740a 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -135,6 +135,14 @@ def order_ambiguous(self) -> bool: """ ... + @property + @abc.abstractmethod + def explicitly_ordered(self) -> bool: + """ + Whether row ordering is potentially ambiguous. For example, ReadTable (without a primary key) could be ordered in different ways. + """ + ... + @functools.cached_property def total_variables(self) -> int: return self.variables_introduced + sum( @@ -180,6 +188,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def schema(self) -> schemata.ArraySchema: return self.child.schema + @property + def explicitly_ordered(self) -> bool: + return self.child.explicitly_ordered + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -212,6 +224,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def order_ambiguous(self) -> bool: return True + @property + def explicitly_ordered(self) -> bool: + return False + def __hash__(self): return self._node_hash @@ -267,6 +283,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def order_ambiguous(self) -> bool: return any(child.order_ambiguous for child in self.children) + @property + def explicitly_ordered(self) -> bool: + return all(child.explicitly_ordered for child in self.children) + def __hash__(self): return self._node_hash @@ -317,6 +337,10 @@ def variables_introduced(self) -> int: def order_ambiguous(self) -> bool: return False + @property + def explicitly_ordered(self) -> bool: + return True + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -378,6 +402,10 @@ def relation_ops_created(self) -> int: def order_ambiguous(self) -> bool: return len(self.total_order_cols) == 0 + @property + def explicitly_ordered(self) -> bool: + return len(self.total_order_cols) > 0 + @functools.cached_property def variables_introduced(self) -> int: return len(self.schema.items) + 1 @@ -449,6 +477,12 @@ def hidden_columns(self) -> typing.Tuple[str, ...]: def order_ambiguous(self) -> bool: return not isinstance(self.ordering, orderings.TotalOrdering) + @property + def explicitly_ordered(self) -> bool: + return (self.ordering is not None) and len( + self.ordering.all_ordering_columns + ) > 0 + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -523,6 +557,10 @@ def relation_ops_created(self) -> int: # Doesnt directly create any relational operations return 0 + @property + def explicitly_ordered(self) -> bool: + return True + @dataclass(frozen=True) class ReversedNode(UnaryNode): @@ -636,6 
+674,10 @@ def variables_introduced(self) -> int: def order_ambiguous(self) -> bool: return False + @property + def explicitly_ordered(self) -> bool: + return True + @dataclass(frozen=True) class WindowOpNode(UnaryNode): diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py index 06682142ba..f4c0bf7f4a 100644 --- a/bigframes/core/validations.py +++ b/bigframes/core/validations.py @@ -24,6 +24,7 @@ if TYPE_CHECKING: from bigframes import Session + from bigframes.core.blocks import Block class HasSession(Protocol): @@ -31,8 +32,12 @@ class HasSession(Protocol): def _session(self) -> Session: ... + @property + def _block(self) -> Block: + ... + -def requires_strict_ordering(suggestion: Optional[str] = None): +def requires_ordering(suggestion: Optional[str] = None): def decorator(meth): @functools.wraps(meth) def guarded_meth(object: HasSession, *args, **kwargs): @@ -48,8 +53,15 @@ def enforce_ordered( object: HasSession, opname: str, suggestion: Optional[str] = None ) -> None: session = object._session - if not (session._strictly_ordered or session.allow_ambiguity): + if session._strictly_ordered or not object._block.expr.node.order_ambiguous: + # No ambiguity for how to calculate ordering, so no error or warning + return None + if not session.allow_ambiguity: suggestion_substr = suggestion + " " if suggestion else "" raise bigframes.exceptions.OrderRequiredError( f"Op {opname} not supported when strict ordering is disabled. {suggestion_substr}{bigframes.constants.FEEDBACK_LINK}" ) + if not object._block.explicitly_ordered: + raise bigframes.exceptions.OrderRequiredError( + f"Op {opname} requires an ordering. Use .sort_values or .sort_index to provide an ordering. {bigframes.constants.FEEDBACK_LINK}" + ) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 717549316a..fdfa080be1 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -280,12 +280,12 @@ def loc(self) -> indexers.LocDataFrameIndexer: return indexers.LocDataFrameIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) @@ -342,12 +342,12 @@ def _has_index(self) -> bool: return len(self._block.index_columns) > 0 @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def T(self) -> DataFrame: return DataFrame(self._get_block().transpose()) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def transpose(self) -> DataFrame: return self.T @@ -1294,11 +1294,11 @@ def _compute_dry_run(self) -> bigquery.QueryJob: def copy(self) -> DataFrame: return DataFrame(self._block) - @validations.requires_strict_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) + @validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n]) - @validations.requires_strict_ordering() + @validations.requires_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[-n:]) @@ -1538,7 +1538,7 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def equals(self, other: 
typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): @@ -1936,7 +1936,7 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) - @validations.requires_strict_ordering() + @validations.requires_ordering() @requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": @@ -1962,12 +1962,12 @@ def replace( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -2233,16 +2233,16 @@ def agg( aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block)) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def melt( self, id_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, @@ -2347,7 +2347,7 @@ def _pivot( return DataFrame(pivot_block) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def pivot( self, *, @@ -2362,7 +2362,7 @@ def pivot( return self._pivot(columns=columns, index=index, values=values) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def pivot_table( self, values: typing.Optional[ @@ -2462,7 +2462,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] @@ -2673,7 +2673,7 @@ def _perform_join_by_index( block, _ = self._block.join(other._block, how=how, block_identity_join=True) return DataFrame(block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_def = window_spec.rows( @@ -2683,7 +2683,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_def, self._block.value_columns ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window = window_spec.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( @@ -2786,7 +2786,7 @@ def notna(self) -> DataFrame: notnull = notna notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self): is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2799,7 +2799,7 @@ def cumsum(self): window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self) -> DataFrame: is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2812,21 +2812,21 @@ def cumprod(self) -> DataFrame: window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self) -> DataFrame: return self._apply_window_op( agg_ops.min_op, window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self) -> DataFrame: return self._apply_window_op( agg_ops.max_op, window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2834,7 +2834,7 @@ def shift(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2842,7 +2842,7 @@ def diff(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def pct_change(self, periods: int = 1) -> DataFrame: # Future versions of pandas will not perfrom ffill automatically df = self.ffill() @@ -2860,7 +2860,7 @@ def _apply_window_op( ) return DataFrame(block.select_columns(result_ids)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def sample( self, n: Optional[int] = None, @@ -3626,7 +3626,7 @@ def _optimize_query_complexity(self): _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") - @validations.requires_strict_ordering() + @validations.requires_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError( diff --git a/bigframes/series.py b/bigframes/series.py index 7c530b9612..7a07d5290a 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -93,12 +93,12 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return 
bigframes.core.indexers.IatSeriesIndexer(self) @@ -163,7 +163,7 @@ def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def T(self) -> Series: return self.transpose() @@ -175,7 +175,7 @@ def _info_axis(self) -> indexes.Index: def _session(self) -> bigframes.Session: return self._get_block().expr.session - @validations.requires_strict_ordering() + @validations.requires_ordering() def transpose(self) -> Series: return self @@ -271,7 +271,7 @@ def equals( return False return block_ops.equals(self._block, other._block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def reset_index( self, *, @@ -459,13 +459,13 @@ def case_when(self, caselist) -> Series: ignore_self=True, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -473,30 +473,30 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill) - @validations.requires_strict_ordering() + @validations.requires_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self) -> Series: return self._apply_window_op( agg_ops.max_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self) -> Series: return self._apply_window_op( agg_ops.min_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self) -> Series: return self._apply_window_op( agg_ops.product_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -504,7 +504,7 @@ def shift(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -512,13 +512,13 @@ def diff(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def pct_change(self, periods: int = 1) -> Series: # Future versions of pandas will not perfrom ffill automatically series = self.ffill() return Series(block_ops.pct_change(series._block, periods=periods)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rank( self, axis=0, @@ -610,7 +610,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): ) return Series(block.select_column(result)) - 
@validations.requires_strict_ordering() + @validations.requires_ordering() @requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -633,11 +633,11 @@ def dropna( result = result.reset_index() return Series(result) - @validations.requires_strict_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) + @validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n]) - @validations.requires_strict_ordering() + @validations.requires_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) @@ -1138,7 +1138,7 @@ def clip(self, lower, upper): ) return Series(block.select_column(result_id).with_column_labels([self.name])) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1151,7 +1151,7 @@ def argmax(self) -> int: scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1217,14 +1217,14 @@ def idxmin(self) -> blocks.Label: return indexes.Index(block).to_pandas()[0] @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_increasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) @@ -1332,7 +1332,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: block = block.order_by(ordering) return Series(block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = bigframes.core.window_spec.rows( @@ -1342,7 +1342,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_spec, self._block.value_columns, is_series=True ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = bigframes.core.window_spec.cumulative_rows( min_periods=min_periods @@ -1612,7 +1612,7 @@ def drop_duplicates(self, *, keep: str = "first") -> Series: block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def unique(self) -> Series: return self.drop_duplicates() @@ -1784,7 +1784,7 @@ def map( result_df = self_df.join(map_df, on="series") return result_df[self.name] - @validations.requires_strict_ordering() + @validations.requires_ordering() def sample( self, n: Optional[int] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 4a8ce57a7d..a9e5bee745 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -313,7 +313,7 @@ def __init__( self._compiler = bigframes.core.compile.SQLCompiler( strict=self._strictly_ordered ) - self._allow_ambiguity = False + self._allow_ambiguity = not self._strictly_ordered self._remote_function_session = bigframes_rf._RemoteFunctionSession() diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index 2e97078ef5..f4f38bb813 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import warnings + import pandas as pd import pyarrow as pa import pytest @@ -116,37 +118,6 @@ def test_unordered_drop_duplicates(unordered_session, keep): assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True) -@pytest.mark.parametrize( - ("function"), - [ - pytest.param( - lambda x: x.cumsum(), - id="cumsum", - ), - pytest.param( - lambda x: x.idxmin(), - id="idxmin", - ), - pytest.param( - lambda x: x.a.iloc[1::2], - id="series_iloc", - ), - pytest.param( - lambda x: x.head(3), - id="head", - ), - ], -) -def test_unordered_mode_blocks_windowing(unordered_session, function): - pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) - df = bpd.DataFrame(pd_df, session=unordered_session) - with pytest.raises( - bigframes.exceptions.OrderRequiredError, - match=r"Op.*not supported when strict ordering is disabled", - ): - function(df) - - def test_unordered_mode_cache_preserves_order(unordered_session): pd_df = pd.DataFrame( {"a": [1, 2, 3, 4, 5, 6], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() @@ -159,3 +130,37 @@ def test_unordered_mode_cache_preserves_order(unordered_session): # B is unique so unstrict order mode result here should be equivalent to strictly ordered assert_pandas_df_equal(bf_result, pd_result, ignore_order=False) + + +def test_unordered_mode_no_ordering_error(unordered_session): + pd_df = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() + ) + pd_df.index = pd_df.index.astype(pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + + with pytest.raises(bigframes.exceptions.OrderRequiredError): + df.merge(df, on="a").head(3) + + +def test_unordered_mode_ambiguity_warning(unordered_session): + pd_df = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() + ) + pd_df.index = pd_df.index.astype(pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + + with pytest.warns(bigframes.exceptions.AmbiguousWindowWarning): + df.merge(df, on="a").sort_values("b_x").head(3) + + +def test_unordered_mode_no_ambiguity_warning(unordered_session): + pd_df = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() + ) + pd_df.index = pd_df.index.astype(pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + + with warnings.catch_warnings(): + warnings.simplefilter("error") + df.groupby("a").head(3) From 84f4d90ce6f24d1a34dec6101ad728f62857f789 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 2 Aug 2024 20:36:10 +0000 Subject: [PATCH 3/5] remove windowing blocked tests --- tests/system/small/test_unordered.py | 31 ---------------------------- 1 file changed, 31 deletions(-) diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index 00a8e1fb78..9f85ec99f9 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -139,37 +139,6 @@ def test_unordered_merge(unordered_session): assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True) -@pytest.mark.parametrize( - ("function"), - [ - pytest.param( - lambda x: x.cumsum(), - id="cumsum", - ), - pytest.param( - lambda x: x.idxmin(), - id="idxmin", - ), - pytest.param( - lambda x: x.a.iloc[1::2], - id="series_iloc", - ), - pytest.param( - lambda x: x.head(3), - id="head", - ), - ], -) -def test_unordered_mode_blocks_windowing(unordered_session, function): - pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) - df = 
bpd.DataFrame(pd_df, session=unordered_session) - with pytest.raises( - bigframes.exceptions.OrderRequiredError, - match=r"Op.*not supported when strict ordering is disabled", - ): - function(df) - - def test_unordered_mode_cache_preserves_order(unordered_session): pd_df = pd.DataFrame( {"a": [1, 2, 3, 4, 5, 6], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() From e544d3adb1fc9f7d64707eff35943f15316edeb4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 2 Aug 2024 20:50:20 +0000 Subject: [PATCH 4/5] rename session.allow_ambiguity to private property --- bigframes/core/__init__.py | 4 ++-- bigframes/core/validations.py | 2 +- bigframes/session/__init__.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 29b96e0b24..d9222fd81f 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -195,7 +195,7 @@ def promote_offsets(self, col_id: str) -> ArrayValue: Convenience function to promote copy of column offsets to a value column. Can be used to reset index. """ if self.node.order_ambiguous and not (self.session._strictly_ordered): - if not self.session.allow_ambiguity: + if not self.session._allows_ambiguity: raise ValueError("Generating offsets not supported in unordered mode") else: warnings.warn( @@ -354,7 +354,7 @@ def project_window_op( # TODO: Support non-deterministic windowing if window_spec.row_bounded or not op.order_independent: if self.node.order_ambiguous and not self.session._strictly_ordered: - if not self.session.allow_ambiguity: + if not self.session._allows_ambiguity: raise ValueError( "Generating offsets not supported in unordered mode" ) diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py index f4c0bf7f4a..9c03ddb930 100644 --- a/bigframes/core/validations.py +++ b/bigframes/core/validations.py @@ -56,7 +56,7 @@ def enforce_ordered( if session._strictly_ordered or not object._block.expr.node.order_ambiguous: # No ambiguity for how to calculate ordering, so no error or warning return None - if not session.allow_ambiguity: + if not session._allows_ambiguity: suggestion_substr = suggestion + " " if suggestion else "" raise bigframes.exceptions.OrderRequiredError( f"Op {opname} not supported when strict ordering is disabled. 
{suggestion_substr}{bigframes.constants.FEEDBACK_LINK}" diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 45a0a7c623..dc1da488a1 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -380,7 +380,7 @@ def slot_millis_sum(self): return self._slot_millis_sum @property - def allow_ambiguity(self) -> bool: + def _allows_ambiguity(self) -> bool: return self._allow_ambiguity def _add_bytes_processed(self, amount: int): From 7c0464ad603da363173484fbd9eea04b3e040027 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 2 Aug 2024 23:24:49 +0000 Subject: [PATCH 5/5] unordered mode now called partial ordering mode everywhere --- bigframes/core/__init__.py | 6 ++++-- bigframes/core/compile/compiled.py | 2 +- tests/system/small/test_dataframe.py | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index d9222fd81f..2e9b5fa994 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -196,7 +196,9 @@ def promote_offsets(self, col_id: str) -> ArrayValue: """ if self.node.order_ambiguous and not (self.session._strictly_ordered): if not self.session._allows_ambiguity: - raise ValueError("Generating offsets not supported in unordered mode") + raise ValueError( + "Generating offsets not supported in partial ordering mode" + ) else: warnings.warn( "Window ordering may be ambiguous, this can cause unstable results.", @@ -356,7 +358,7 @@ def project_window_op( if self.node.order_ambiguous and not self.session._strictly_ordered: if not self.session._allows_ambiguity: raise ValueError( - "Generating offsets not supported in unordered mode" + "Generating offsets not supported in partial ordering mode" ) else: warnings.warn( diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index c822dd331c..538789f9d7 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -263,7 +263,7 @@ def to_sql( ordered: bool = False, ) -> str: if offset_column or ordered: - raise ValueError("Cannot produce sorted sql in unordered mode") + raise ValueError("Cannot produce sorted sql in partial ordering mode") sql = ibis_bigquery.Backend().compile( self._to_ibis_expr( col_id_overrides=col_id_overrides, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 3a7eff621f..d838251dca 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2273,7 +2273,7 @@ def test_series_binop_add_different_table( def test_join_same_table(scalars_dfs_maybe_ordered, how): bf_df, pd_df = scalars_dfs_maybe_ordered if not bf_df._session._strictly_ordered and how == "cross": - pytest.skip("Cross join not supported in unordered mode.") + pytest.skip("Cross join not supported in partial ordering mode.") bf_df_a = bf_df.set_index("int64_too")[["string_col", "int64_col"]] bf_df_a = bf_df_a.sort_index()
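
Taken together, these patches let partial ordering mode run order-dependent window
operations instead of always rejecting them: when a frame has no explicit ordering at
all, enforce_ordered still raises OrderRequiredError and points the user at
.sort_values / .sort_index, and when an explicit but potentially ambiguous ordering
exists, the operation proceeds and emits AmbiguousWindowWarning. A minimal sketch of
the resulting user-facing behavior, mirroring the tests above; `unordered_session`
stands in for a Session created with strict ordering disabled (the test fixture) and
is not constructed here:

    import warnings

    import pandas as pd

    import bigframes.exceptions
    import bigframes.pandas as bpd

    # `unordered_session` is assumed to be a partially ordered Session
    # (strict ordering disabled), e.g. the fixture used by the tests above.
    pd_df = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype()
    )
    df = bpd.DataFrame(pd_df, session=unordered_session)

    # A self-merge makes row order ambiguous and leaves no explicit ordering,
    # so an order-dependent op like head() raises OrderRequiredError.
    try:
        df.merge(df, on="a").head(3)
    except bigframes.exceptions.OrderRequiredError:
        pass  # "Op head requires an ordering. Use .sort_values or .sort_index ..."

    # Sorting supplies an explicit ordering; head() now runs, but warns that
    # ties in the sort key can still make the result unstable.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        df.merge(df, on="a").sort_values("b_x").head(3)
    assert any(
        issubclass(w.category, bigframes.exceptions.AmbiguousWindowWarning)
        for w in caught
    )

The split between error and warning follows the two branches added to enforce_ordered:
the hard error is reserved for frames with no explicit ordering to fall back on, while
an explicit sort downgrades the problem to a warning about possible nondeterminism.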