diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 89ef5f525e..00a36b9c05 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -344,10 +344,12 @@ def project_window_op( never_skip_nulls: will disable null skipping for operators that would otherwise do so skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ - if not self.session._strictly_ordered: - # TODO: Support unbounded windows with aggregate ops and some row-order-independent analytic ops - # TODO: Support non-deterministic windowing - raise ValueError("Windowed ops not supported in unordered mode") + # TODO: Support non-deterministic windowing + if window_spec.row_bounded or not op.order_independent: + if not self.session._strictly_ordered: + raise ValueError( + "Order-dependent windowed ops not supported in unordered mode" + ) return ArrayValue( nodes.WindowOpNode( child=self.node, diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 2b23ccf0e4..11a5d43ba0 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -27,6 +27,7 @@ import bigframes.core.blocks as blocks import bigframes.core.ordering as order import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df @@ -72,6 +73,10 @@ def __init__( if col_id not in self._by_col_ids ] + @property + def _session(self) -> core.Session: + return self._block.session + def __getitem__( self, key: typing.Union[ @@ -229,20 +234,25 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) + @validations.requires_strict_ordering() def cumsum(self, *args, 
numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("cumsum") return self._apply_window_op(agg_ops.sum_op, numeric_only=True) + @validations.requires_strict_ordering() def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only) + @validations.requires_strict_ordering() def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only) + @validations.requires_strict_ordering() def cumprod(self, *args, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.product_op, numeric_only=True) + @validations.requires_strict_ordering() def shift(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -251,6 +261,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) + @validations.requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -259,6 +270,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = window_specs.rows( @@ -274,6 +286,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), @@ -514,6 +527,10 @@ def __init__( self._value_name = value_name self._dropna = dropna # Applies to aggregations but not windowing + @property + def _session(self) -> core.Session: + return self._block.session + def head(self, n: int = 5) -> series.Series: block = self._block if self._dropna: @@ -631,26 +648,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: aggregate = agg + @validations.requires_strict_ordering() def cumsum(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.sum_op, ) + @validations.requires_strict_ordering() def cumprod(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.product_op, ) + @validations.requires_strict_ordering() def cummax(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.max_op, ) + @validations.requires_strict_ordering() def cummin(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.min_op, ) + @validations.requires_strict_ordering() def cumcount(self, *args, **kwargs) -> series.Series: return ( self._apply_window_op( @@ -660,6 +682,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: - 1 ) + @validations.requires_strict_ordering() def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = window_specs.rows( @@ -669,6 +692,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) + @validations.requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( 
grouping_keys=tuple(self._by_col_ids), @@ -677,6 +701,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -696,6 +721,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: is_series=True, ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index cfb22929c8..696742180b 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -30,6 +30,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as order import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops @@ -114,6 +115,10 @@ def from_frame( index._linked_frame = frame return index + @property + def _session(self): + return self._block.session + @property def name(self) -> blocks.Label: names = self.names @@ -179,6 +184,7 @@ def empty(self) -> bool: return self.shape[0] == 0 @property + @validations.requires_strict_ordering() def is_monotonic_increasing(self) -> bool: """ Return a boolean if the values are equal or increasing. @@ -192,6 +198,7 @@ def is_monotonic_increasing(self) -> bool: ) @property + @validations.requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: """ Return a boolean if the values are equal or decreasing. 
@@ -341,6 +348,7 @@ def max(self) -> typing.Any: def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op) + @validations.requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -353,6 +361,7 @@ def argmax(self) -> int: return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0]) + @validations.requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -424,6 +433,8 @@ def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index: return Index(result) def drop_duplicates(self, *, keep: str = "first") -> Index: + if keep is not False: + validations.enforce_ordered(self, "drop_duplicates") block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep) return Index(block) diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py new file mode 100644 index 0000000000..dc22047e3b --- /dev/null +++ b/bigframes/core/validations.py @@ -0,0 +1,51 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Decorators and helpers that enforce a session has strict ordering enabled.""" + +from __future__ import annotations + +import functools +from typing import Protocol, TYPE_CHECKING + +import bigframes.constants +import bigframes.exceptions + +if TYPE_CHECKING: + from bigframes import Session + + +class HasSession(Protocol): + @property + def _session(self) -> Session: + ... + + +def requires_strict_ordering(): + def decorator(meth): + @functools.wraps(meth) + def guarded_meth(object: HasSession, *args, **kwargs): + enforce_ordered(object, meth.__name__) + return meth(object, *args, **kwargs) + + return guarded_meth + + return decorator + + +def enforce_ordered(object: HasSession, opname: str) -> None: + if not object._session._strictly_ordered: + raise bigframes.exceptions.OrderRequiredError( + f"Op {opname} not supported when strict ordering is disabled. {bigframes.constants.FEEDBACK_LINK}" + ) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 71e88a4c3d..57c57b451a 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -152,3 +152,13 @@ class WindowSpec: ordering: Tuple[orderings.OrderingExpression, ...] = tuple() bounds: Union[RowsWindowBounds, RangeWindowBounds, None] = None min_periods: int = 0 + + @property + def row_bounded(self): + """ + Whether the window is bounded by row offsets. + + This is relevant for determining whether the window requires a total order + to calculate deterministically.
+ """ + return isinstance(self.bounds, RowsWindowBounds) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 274e176dd5..4dcc4414ed 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -62,6 +62,7 @@ import bigframes.core.indexes as indexes import bigframes.core.ordering as order import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.core.window import bigframes.core.window_spec as window_spec import bigframes.dtypes @@ -278,10 +279,12 @@ def loc(self) -> indexers.LocDataFrameIndexer: return indexers.LocDataFrameIndexer(self) @property + @validations.requires_strict_ordering() def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) @property + @validations.requires_strict_ordering() def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) @@ -338,10 +341,12 @@ def _has_index(self) -> bool: return len(self._block.index_columns) > 0 @property + @validations.requires_strict_ordering() def T(self) -> DataFrame: return DataFrame(self._get_block().transpose()) @requires_index + @validations.requires_strict_ordering() def transpose(self) -> DataFrame: return self.T @@ -1293,6 +1298,7 @@ def copy(self) -> DataFrame: def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n]) + @validations.requires_strict_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[-n:]) @@ -1335,6 +1341,8 @@ def nlargest( ) -> DataFrame: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + validations.enforce_ordered(self, "nlargest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nlargest(self._block, n, column_ids, keep=keep)) @@ -1346,6 +1354,8 @@ def nsmallest( ) -> DataFrame: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if 
keep != "all": + validations.enforce_ordered(self, "nsmallest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nsmallest(self._block, n, column_ids, keep=keep)) @@ -1528,6 +1538,7 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) + @validations.requires_strict_ordering() def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): @@ -1925,6 +1936,7 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) + @validations.requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": @@ -1950,10 +1962,12 @@ def replace( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) ) + @validations.requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) + @validations.requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -2219,13 +2233,16 @@ def agg( aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg) @requires_index + @validations.requires_strict_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block)) @requires_index + @validations.requires_strict_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block)) + @validations.requires_strict_ordering() def melt( self, id_vars:
typing.Optional[typing.Iterable[typing.Hashable]] = None, @@ -2330,6 +2347,7 @@ def _pivot( return DataFrame(pivot_block) @requires_index + @validations.requires_strict_ordering() def pivot( self, *, @@ -2344,6 +2362,7 @@ def pivot( return self._pivot(columns=columns, index=index, values=values) @requires_index + @validations.requires_strict_ordering() def pivot_table( self, values: typing.Optional[ @@ -2443,6 +2462,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) @requires_index + @validations.requires_strict_ordering() def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] @@ -2653,6 +2673,7 @@ def _perform_join_by_index( block, _ = self._block.join(other._block, how=how, block_identity_join=True) return DataFrame(block) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. window_def = window_spec.rows( @@ -2662,6 +2683,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_def, self._block.value_columns ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window = window_spec.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( @@ -2764,6 +2786,7 @@ def notna(self) -> DataFrame: notnull = notna notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna) + @validations.requires_strict_ordering() def cumsum(self): is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2776,6 +2799,7 @@ def cumsum(self): window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def cumprod(self) -> DataFrame: is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2788,18 +2812,21 @@ def cumprod(self) -> DataFrame: window_spec.cumulative_rows(), ) + 
@validations.requires_strict_ordering() def cummin(self) -> DataFrame: return self._apply_window_op( agg_ops.min_op, window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def cummax(self) -> DataFrame: return self._apply_window_op( agg_ops.max_op, window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def shift(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2807,6 +2834,7 @@ def shift(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) + @validations.requires_strict_ordering() def diff(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2814,6 +2842,7 @@ def diff(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) + @validations.requires_strict_ordering() def pct_change(self, periods: int = 1) -> DataFrame: # Future versions of pandas will not perfrom ffill automatically df = self.ffill() @@ -2831,6 +2860,7 @@ def _apply_window_op( ) return DataFrame(block.select_columns(result_ids)) + @validations.requires_strict_ordering() def sample( self, n: Optional[int] = None, @@ -3486,6 +3516,8 @@ def drop_duplicates( *, keep: str = "first", ) -> DataFrame: + if keep is not False: + validations.enforce_ordered(self, "drop_duplicates(keep != False)") if subset is None: column_ids = self._block.value_columns elif utils.is_list_like(subset): @@ -3499,6 +3531,8 @@ def drop_duplicates( return DataFrame(block) def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Series: + if keep is not False: + validations.enforce_ordered(self, "duplicated(keep != False)") if subset is None: column_ids = self._block.value_columns else: @@ -3592,6 +3626,7 @@ def _optimize_query_complexity(self): _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") + @validations.requires_strict_ordering() def dot(self, 
other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError( diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index bae239b6da..bc0d83b4f6 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -47,5 +47,9 @@ class NullIndexError(ValueError): """Object has no index.""" +class OrderRequiredError(ValueError): + """Operation requires total row ordering to be enabled.""" + + class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 619183287f..675ead1188 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -42,6 +42,15 @@ def uses_total_row_ordering(self): def can_order_by(self): return False + @property + def order_independent(self): + """ + True if the output of the operator does not depend on the ordering of input rows. + + Navigation functions are a notable case that are not order independent. + """ + return False + @abc.abstractmethod def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: ... @@ -78,6 +87,15 @@ def name(self) -> str: def arguments(self) -> int: ... + @property + def order_independent(self): + """ + True if results don't depend on the order of the input. + + Almost all aggregation functions are order independent, excepting ``array_agg`` and ``string_agg``. 
+ """ + return not self.can_order_by + @dataclasses.dataclass(frozen=True) class NullaryAggregateOp(AggregateOp, NullaryWindowOp): @@ -294,6 +312,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ) return pd.ArrowDtype(pa_type) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class QcutOp(UnaryWindowOp): @@ -312,6 +334,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class NuniqueOp(UnaryAggregateOp): @@ -349,6 +375,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class DenseRankOp(UnaryWindowOp): @@ -361,6 +391,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class FirstOp(UnaryWindowOp): diff --git a/bigframes/series.py b/bigframes/series.py index 3334321158..c325783e96 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -43,6 +43,7 @@ import bigframes.core.ordering as order import bigframes.core.scalar as scalars import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.core.window import bigframes.core.window_spec import bigframes.dataframe @@ -92,10 +93,12 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property + @validations.requires_strict_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return 
bigframes.core.indexers.IlocSeriesIndexer(self) @property + @validations.requires_strict_ordering() def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) @@ -160,6 +163,7 @@ def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) @property + @validations.requires_strict_ordering() def T(self) -> Series: return self.transpose() @@ -171,6 +175,7 @@ def _info_axis(self) -> indexes.Index: def _session(self) -> bigframes.Session: return self._get_block().expr.session + @validations.requires_strict_ordering() def transpose(self) -> Series: return self @@ -266,6 +271,7 @@ def equals( return False return block_ops.equals(self._block, other._block) + @validations.requires_strict_ordering() def reset_index( self, *, @@ -454,11 +460,13 @@ def case_when(self, caselist) -> Series: ignore_self=True, ) + @validations.requires_strict_ordering() def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -466,25 +474,30 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill) + @validations.requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) + @validations.requires_strict_ordering() def cummax(self) -> Series: return self._apply_window_op( agg_ops.max_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def cummin(self) -> Series: return self._apply_window_op( agg_ops.min_op, 
bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def cumprod(self) -> Series: return self._apply_window_op( agg_ops.product_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def shift(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -492,6 +505,7 @@ def shift(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) + @validations.requires_strict_ordering() def diff(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -499,11 +513,13 @@ def diff(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) + @validations.requires_strict_ordering() def pct_change(self, periods: int = 1) -> Series: # Future versions of pandas will not perfrom ffill automatically series = self.ffill() return Series(block_ops.pct_change(series._block, periods=periods)) + @validations.requires_strict_ordering() def rank( self, axis=0, @@ -595,6 +611,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): ) return Series(block.select_column(result)) + @validations.requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -617,9 +634,11 @@ def dropna( result = result.reset_index() return Series(result) + @validations.requires_strict_ordering() def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n]) + @validations.requires_strict_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) @@ -660,6 +679,8 @@ def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: def nlargest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 
'all'") + if keep != "all": + validations.enforce_ordered(self, "nlargest(keep != 'all')") return Series( block_ops.nlargest(self._block, n, [self._value_column], keep=keep) ) @@ -667,6 +688,8 @@ def nlargest(self, n: int = 5, keep: str = "first") -> Series: def nsmallest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + validations.enforce_ordered(self, "nsmallest(keep != 'all')") return Series( block_ops.nsmallest(self._block, n, [self._value_column], keep=keep) ) @@ -1116,6 +1139,7 @@ def clip(self, lower, upper): ) return Series(block.select_column(result_id).with_column_labels([self.name])) + @validations.requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1128,6 +1152,7 @@ def argmax(self) -> int: scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] ) + @validations.requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1193,12 +1218,14 @@ def idxmin(self) -> blocks.Label: return indexes.Index(block).to_pandas()[0] @property + @validations.requires_strict_ordering() def is_monotonic_increasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property + @validations.requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) @@ -1306,6 +1333,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: block = block.order_by(ordering) return Series(block) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = bigframes.core.window_spec.rows( @@ -1315,6 +1343,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_spec, self._block.value_columns, is_series=True ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = bigframes.core.window_spec.cumulative_rows( min_periods=min_periods @@ -1579,13 +1608,18 @@ def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None) return self.reindex(other.index, validate=validate) def drop_duplicates(self, *, keep: str = "first") -> Series: + if keep is not False: + validations.enforce_ordered(self, "drop_duplicates(keep != False)") block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) + @validations.requires_strict_ordering() def unique(self) -> Series: return self.drop_duplicates() def duplicated(self, keep: str = "first") -> Series: + if keep is not False: + validations.enforce_ordered(self, "duplicated(keep != False)") block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep ) @@ -1751,6 +1785,7 @@ def map( result_df = self_df.join(map_df, on="series") return result_df[self.name] + @validations.requires_strict_ordering() def sample( self, n: Optional[int] = None, diff --git a/tests/system/conftest.py b/tests/system/conftest.py index a41e6dc6b7..df4ff9aff0 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -149,9 +149,7 @@ def unordered_session() -> Generator[bigframes.Session, None, None]: @pytest.fixture(scope="session") def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, None]: - context = bigframes.BigQueryOptions( - location=tokyo_location, - ) + context = bigframes.BigQueryOptions(location=tokyo_location) session = bigframes.Session(context=context) yield session session.close() # close generated session at cleanup type diff --git 
a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index d555cedcc0..36bf2a2585 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -13,7 +13,9 @@ # limitations under the License. import pandas as pd import pyarrow as pa +import pytest +import bigframes.exceptions import bigframes.pandas as bpd from tests.system.utils import assert_pandas_df_equal, skip_legacy_pandas @@ -59,3 +61,52 @@ def test_unordered_mode_read_gbq(unordered_session): ) # Don't need ignore_order as there is only 1 row assert_pandas_df_equal(df.to_pandas(), expected) + + +@pytest.mark.parametrize( + ("keep"), + [ + pytest.param( + "first", + marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), + ), + pytest.param( + False, + ), + ], +) +def test_unordered_drop_duplicates(unordered_session, keep): + pd_df = pd.DataFrame({"a": [1, 1, 3], "b": [4, 4, 6]}, dtype=pd.Int64Dtype()) + bf_df = bpd.DataFrame(pd_df, session=unordered_session) + + bf_result = bf_df.drop_duplicates(keep=keep) + pd_result = pd_df.drop_duplicates(keep=keep) + + assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True) + + +@pytest.mark.parametrize( + ("function"), + [ + pytest.param( + lambda x: x.cumsum(), + id="cumsum", + ), + pytest.param( + lambda x: x.idxmin(), + id="idxmin", + ), + pytest.param( + lambda x: x.a.iloc[1::2], + id="series_iloc", + ), + ], +) +def test_unordered_mode_blocks_windowing(unordered_session, function): + pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + with pytest.raises( + bigframes.exceptions.OrderRequiredError, + match=r"Op.*not supported when strict ordering is disabled", + ): + function(df)