diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 79c6bb6495..0a2936419f 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -456,6 +456,19 @@ def join(
             return ArrayValue(bigframes.core.rewrite.maybe_rewrite_join(join_node))
         return ArrayValue(join_node)
 
+    def try_align_as_projection(
+        self,
+        other: ArrayValue,
+        join_type: join_def.JoinType,
+        mappings: typing.Tuple[join_def.JoinColumnMapping, ...],
+    ) -> typing.Optional[ArrayValue]:
+        left_side = bigframes.core.rewrite.SquashedSelect.from_node(self.node)
+        right_side = bigframes.core.rewrite.SquashedSelect.from_node(other.node)
+        result = left_side.maybe_merge(right_side, join_type, mappings)
+        if result is not None:
+            return ArrayValue(result.expand())
+        return None
+
     def explode(self, column_ids: typing.Sequence[str]) -> ArrayValue:
         assert len(column_ids) > 0
         for column_id in column_ids:
diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py
index e12e6bf054..eaee2e2cc0 100644
--- a/bigframes/core/block_transforms.py
+++ b/bigframes/core/block_transforms.py
@@ -597,9 +597,11 @@ def skew(
     block = block.select_columns(skew_ids).with_column_labels(column_labels)
 
     if not grouping_column_ids:
-        # When ungrouped, stack everything into single column so can be returned as series
-        block = block.stack()
-        block = block.drop_levels([block.index_columns[0]])
+        # When ungrouped, transpose the result row into a series.
+        # Perform the transpose last, so as not to invalidate the cache.
+        block, index_col = block.create_constant(None, None)
+        block = block.set_index([index_col])
+        return block.transpose(original_row_index=pd.Index([None]))
 
     return block
 
@@ -637,9 +639,11 @@ def kurt(
     block = block.select_columns(kurt_ids).with_column_labels(column_labels)
 
     if not grouping_column_ids:
-        # When ungrouped, stack everything into single column so can be returned as series
-        block = block.stack()
-        block = block.drop_levels([block.index_columns[0]])
+        # When ungrouped, transpose the result row into a series.
+        # Perform the transpose last, so as not to invalidate the cache.
+        block, index_col = block.create_constant(None, None)
+        block = block.set_index([index_col])
+        return block.transpose(original_row_index=pd.Index([None]))
 
     return block
 
@@ -820,7 +824,8 @@ def idxmax(block: blocks.Block) -> blocks.Block:
 def _idx_extrema(
     block: blocks.Block, min_or_max: typing.Literal["min", "max"]
 ) -> blocks.Block:
-    if len(block.index_columns) != 1:
+    block._throw_if_null_index("idx")
+    if len(block.index_columns) > 1:
         # TODO: Need support for tuple dtype
         raise NotImplementedError(
             f"idxmin not support for multi-index. {constants.FEEDBACK_LINK}"
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 6cdb0021f5..0bbb8a0b61 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -51,6 +51,7 @@
 import bigframes.core.utils as utils
 import bigframes.core.window_spec as window_specs
 import bigframes.dtypes
+import bigframes.exceptions
 import bigframes.features
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
@@ -120,19 +121,11 @@ def __init__(
                 f"'index_columns' (size {len(index_columns)}) and 'index_labels' (size {len(index_labels)}) must have equal length"
             )
 
-        # If no index columns are set, create one.
-        #
-        # Note: get_index_cols in
-        # bigframes/session/_io/bigquery/read_gbq_table.py depends on this
-        # being as sequential integer index column. If this default behavior
-        # ever changes, please also update get_index_cols so
-        # that users who explicitly request a sequential integer index can
-        # still get one.
         if len(index_columns) == 0:
-            new_index_col_id = guid.generate_guid()
-            expr = expr.promote_offsets(new_index_col_id)
-            index_columns = [new_index_col_id]
-
+            warnings.warn(
+                "Creating object with Null Index. Null Index is a preview feature.",
+                category=bigframes.exceptions.PreviewWarning,
+            )
         self._index_columns = tuple(index_columns)
         # Index labels don't need complicated hierarchical access so can store as tuple
         self._index_labels = (
@@ -517,7 +510,8 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
 
         Warning: This method modifies ``df`` inplace.
         """
-        if self.index_columns:
+        # Note: If the BigQuery DataFrame has a null index, a default one will be created for the local materialization.
+        if len(self.index_columns) > 0:
             df.set_index(list(self.index_columns), inplace=True)
             # Pandas names is annotated as list[str] rather than the more
             # general Sequence[Label] that BigQuery DataFrames has.
@@ -1093,16 +1087,25 @@ def aggregate(
         aggregate_labels = self._get_labels_for_columns(
             [agg[0] for agg in aggregations]
         )
+
         names: typing.List[Label] = []
-        for by_col_id in by_column_ids:
-            if by_col_id in self.value_columns:
-                names.append(self.col_id_to_label[by_col_id])
-            else:
-                names.append(self.col_id_to_index_name[by_col_id])
+        if len(by_column_ids) == 0:
+            label_id = guid.generate_guid()
+            result_expr = result_expr.assign_constant(label_id, 0, pd.Int64Dtype())
+            index_columns = (label_id,)
+            names = [None]
+        else:
+            index_columns = tuple(by_column_ids)  # type: ignore
+            for by_col_id in by_column_ids:
+                if by_col_id in self.value_columns:
+                    names.append(self.col_id_to_label[by_col_id])
+                else:
+                    names.append(self.col_id_to_index_name[by_col_id])
+
         return (
             Block(
                 result_expr,
-                index_columns=by_column_ids,
+                index_columns=index_columns,
                 column_labels=aggregate_labels,
                 index_labels=names,
             ),
@@ -1256,11 +1259,12 @@ def explode(
         expr = self.expr.explode(column_ids)
 
         if ignore_index:
+            new_index_ids = guid.generate_guid()
             return Block(
-                expr.drop_columns(self.index_columns),
+                expr.drop_columns(self.index_columns).promote_offsets(new_index_ids),
                 column_labels=self.column_labels,
                 # Initiates default index creation using the block constructor.
-                index_columns=[],
+                index_columns=[new_index_ids],
             )
         else:
             return Block(
@@ -1423,7 +1427,8 @@ def retrieve_repr_request_results(
         computed_df, query_job = head_block.to_pandas()
         formatted_df = computed_df.set_axis(self.column_labels, axis=1)
         # we reset the axis and substitute the bf index name(s) for the default
-        formatted_df.index.names = self.index.names  # type: ignore
+        if len(self.index.names) > 0:
+            formatted_df.index.names = self.index.names  # type: ignore
         return formatted_df, count, query_job
 
     def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
@@ -1907,9 +1912,26 @@ def join(
         other: Block,
         *,
         how="left",
-        sort=False,
+        sort: bool = False,
         block_identity_join: bool = False,
     ) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
+        """
+        Join two block objects together, and provide mappings between source columns and output columns.
+
+        Args:
+            other (Block):
+                The right operand of the join operation.
+            how (str):
+                Describes the join type. 'inner', 'outer', 'left', or 'right'.
+            sort (bool):
+                If true, the result will be sorted by index.
+            block_identity_join (bool):
+                If true, will not convert the join to a projection (implicitly assuming unique indices).
+
+        Returns:
+            Block, (left_mapping, right_mapping): Result block and mappers from input column ids to result column ids.
+        """
+
         if not isinstance(other, Block):
             # TODO(swast): We need to improve this error message to be more
             # actionable for the user. For example, it's possible they
@@ -1923,6 +1945,16 @@ def join(
             raise NotImplementedError(
                 f"Only how='outer','left','right','inner' currently supported. {constants.FEEDBACK_LINK}"
             )
+        # Special case for null index.
+        if (
+            (self.index.nlevels == other.index.nlevels == 0)
+            and not sort
+            and not block_identity_join
+        ):
+            return join_indexless(self, other, how=how)
+
+        self._throw_if_null_index("join")
+        other._throw_if_null_index("join")
         if self.index.nlevels == other.index.nlevels == 1:
             return join_mono_indexed(
                 self, other, how=how, sort=sort, block_identity_join=block_identity_join
@@ -2071,6 +2103,12 @@ def _is_monotonic(
         self._stats_cache[column_name].update({op_name: result})
         return result
 
+    def _throw_if_null_index(self, opname: str):
+        if len(self.index_columns) == 0:
+            raise bigframes.exceptions.NullIndexError(
+                f"Cannot do {opname} without an index. Set an index using set_index."
+            )
+
     def _get_rows_as_json_values(self) -> Block:
         # We want to preserve any ordering currently present before turning to
         # direct SQL manipulation. We will restore the ordering when we rebuild
@@ -2211,6 +2249,10 @@ def __repr__(self) -> str:
 
     def to_pandas(self) -> pd.Index:
         """Executes deferred operations and downloads the results."""
+        if len(self.column_ids) == 0:
+            raise bigframes.exceptions.NullIndexError(
+                "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
+            )
         # Project down to only the index column. So the query can be cached to visualize other data.
         index_columns = list(self._block.index_columns)
         dtypes = dict(zip(index_columns, self.dtypes))
@@ -2252,6 +2294,53 @@ def is_uniquely_named(self: BlockIndexProperties):
         return len(set(self.names)) == len(self.names)
 
 
+def join_indexless(
+    left: Block,
+    right: Block,
+    *,
+    how="left",
+) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
+    """Joins two blocks without index columns, by aligning them as projections of a common source."""
+    left_expr = left.expr
+    right_expr = right.expr
+    left_mappings = [
+        join_defs.JoinColumnMapping(
+            source_table=join_defs.JoinSide.LEFT,
+            source_id=id,
+            destination_id=guid.generate_guid(),
+        )
+        for id in left_expr.column_ids
+    ]
+    right_mappings = [
+        join_defs.JoinColumnMapping(
+            source_table=join_defs.JoinSide.RIGHT,
+            source_id=id,
+            destination_id=guid.generate_guid(),
+        )
+        for id in right_expr.column_ids
+    ]
+    combined_expr = left_expr.try_align_as_projection(
+        right_expr,
+        join_type=how,
+        mappings=(*left_mappings, *right_mappings),
+    )
+    if combined_expr is None:
+        raise bigframes.exceptions.NullIndexError(
+            "Cannot implicitly align objects. Set an explicit index using set_index."
+        )
+    get_column_left = {m.source_id: m.destination_id for m in left_mappings}
+    get_column_right = {m.source_id: m.destination_id for m in right_mappings}
+    block = Block(
+        combined_expr,
+        column_labels=[*left.column_labels, *right.column_labels],
+        index_columns=(),
+    )
+    return (
+        block,
+        (get_column_left, get_column_right),
+    )
+
+
 def join_mono_indexed(
     left: Block,
     right: Block,
diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py
index 2db3e0791f..7f2c56c20a 100644
--- a/bigframes/core/indexes/base.py
+++ b/bigframes/core/indexes/base.py
@@ -101,6 +101,10 @@ def __new__(
     def from_frame(
         cls, frame: Union[bigframes.series.Series, bigframes.dataframe.DataFrame]
     ) -> Index:
+        if len(frame._block.index_columns) == 0:
+            raise bigframes.exceptions.NullIndexError(
+                "Cannot access index properties with Null Index. Set an index using set_index."
+            )
         index = Index(frame._block)
         index._linked_frame = frame
         return index
diff --git a/bigframes/core/rewrite.py b/bigframes/core/rewrite.py
index e3a07c04b4..15999c0558 100644
--- a/bigframes/core/rewrite.py
+++ b/bigframes/core/rewrite.py
@@ -98,12 +98,12 @@ def order_with(self, by: Tuple[order.OrderingExpression, ...]):
             self.root, self.columns, self.predicate, new_ordering, self.reverse_root
         )
 
-    def maybe_join(
+    def can_join(
         self, right: SquashedSelect, join_def: join_defs.JoinDefinition
-    ) -> Optional[SquashedSelect]:
+    ) -> bool:
         if join_def.type == "cross":
             # Cannot convert cross join to projection
-            return None
+            return False
 
         r_exprs_by_id = {id: expr for expr, id in right.columns}
         l_exprs_by_id = {id: expr for expr, id in self.columns}
@@ -113,10 +113,17 @@ def maybe_join(
         if (self.root != right.root) or any(
             l_expr != r_expr for l_expr, r_expr in zip(l_join_exprs, r_join_exprs)
         ):
+            return False
+        return True
+
+    def maybe_merge(
+        self,
+        right: SquashedSelect,
+        join_type: join_defs.JoinType,
+        mappings: Tuple[join_defs.JoinColumnMapping, ...],
+    ) -> Optional[SquashedSelect]:
+        if self.root != right.root:
             return None
-
-        join_type = join_def.type
-
         # Mask columns and remap names to expected schema
         lselection = self.columns
         rselection = right.columns
@@ -136,7 +143,7 @@ def maybe_join(
             lselection = tuple((apply_mask(expr, lmask), id) for expr, id in lselection)
         if rmask is not None:
             rselection = tuple((apply_mask(expr, rmask), id) for expr, id in rselection)
-        new_columns = remap_names(join_def, lselection, rselection)
+        new_columns = remap_names(mappings, lselection, rselection)
 
         # Reconstruct ordering
         reverse_root = self.reverse_root
@@ -201,20 +208,27 @@ def maybe_squash_projection(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
 def maybe_rewrite_join(join_node: nodes.JoinNode) -> nodes.BigFrameNode:
     left_side = SquashedSelect.from_node(join_node.left_child)
     right_side = SquashedSelect.from_node(join_node.right_child)
-    joined = left_side.maybe_join(right_side, join_node.join)
-    if joined is not None:
-        return joined.expand()
+    if left_side.can_join(right_side, join_node.join):
+        merged = left_side.maybe_merge(
+            right_side, join_node.join.type, join_node.join.mappings
+        )
+        assert (
+            merged is not None
+        ), "Couldn't merge nodes. This shouldn't happen. Please share full stacktrace with the BigQuery DataFrames team at bigframes-feedback@google.com."
+        return merged.expand()
     else:
         return join_node
 
 
 def remap_names(
-    join: join_defs.JoinDefinition, lselection: Selection, rselection: Selection
+    mappings: Tuple[join_defs.JoinColumnMapping, ...],
+    lselection: Selection,
+    rselection: Selection,
 ) -> Selection:
     new_selection: Selection = tuple()
     l_exprs_by_id = {id: expr for expr, id in lselection}
     r_exprs_by_id = {id: expr for expr, id in rselection}
-    for mapping in join.mappings:
+    for mapping in mappings:
         if mapping.source_table == join_defs.JoinSide.LEFT:
             expr = l_exprs_by_id[mapping.source_id]
         else:  # Right
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 2ac423e394..105588de2f 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import datetime
+import functools
 import inspect
 import re
 import sys
@@ -42,6 +43,7 @@
 import google.cloud.bigquery as bigquery
 import numpy
 import pandas
+import pandas.io.formats.format
 import tabulate
 
 import bigframes
@@ -87,6 +89,15 @@
 )
 
 
+def requires_index(meth):
+    @functools.wraps(meth)
+    def guarded_meth(df: DataFrame, *args, **kwargs):
+        df._throw_if_null_index(meth.__name__)
+        return meth(df, *args, **kwargs)
+
+    return guarded_meth
+
+
 # Inherits from pandas DataFrame so that we can use the same docstrings.
 @log_adapter.class_logger
 class DataFrame(vendored_pandas_frame.DataFrame):
@@ -244,6 +255,7 @@ def _sql_names(
         return results
 
     @property
+    @requires_index
     def index(
         self,
     ) -> indexes.Index:
@@ -259,6 +271,7 @@ def index(self, value):
         self.index.name = value.name if hasattr(value, "name") else None
 
     @property
+    @requires_index
     def loc(self) -> indexers.LocDataFrameIndexer:
         return indexers.LocDataFrameIndexer(self)
 
@@ -271,6 +284,7 @@ def iat(self) -> indexers.IatDataFrameIndexer:
         return indexers.IatDataFrameIndexer(self)
 
     @property
+    @requires_index
     def at(self) -> indexers.AtDataFrameIndexer:
         return indexers.AtDataFrameIndexer(self)
 
@@ -317,10 +331,15 @@ def bqclient(self) -> bigframes.Session:
     def _session(self) -> bigframes.Session:
         return self._get_block().expr.session
 
+    @property
+    def _has_index(self) -> bool:
+        return len(self._block.index_columns) > 0
+
     @property
     def T(self) -> DataFrame:
         return DataFrame(self._get_block().transpose())
 
+    @requires_index
     def transpose(self) -> DataFrame:
         return self.T
 
@@ -613,7 +632,15 @@ def __repr__(self) -> str:
         column_count = len(pandas_df.columns)
 
         with display_options.pandas_repr(opts):
-            repr_string = repr(pandas_df)
+            import pandas.io.formats
+
+            # Safe to mutate: this dict is owned by this code and does not affect global config.
+            to_string_kwargs = (
+                pandas.io.formats.format.get_dataframe_repr_params()  # type: ignore
+            )
+            if not self._has_index:
+                to_string_kwargs.update({"index": False})
+            repr_string = pandas_df.to_string(**to_string_kwargs)
 
         # Modify the end of the string to reflect count.
         lines = repr_string.split("\n")
@@ -813,15 +840,18 @@ def _apply_dataframe_binop(
         )
         # join columns schema
         # indexers will be none for exact match
-        columns, lcol_indexer, rcol_indexer = self.columns.join(
-            other.columns, how=how, return_indexers=True
-        )
+        if self.columns.equals(other.columns):
+            columns, lcol_indexer, rcol_indexer = self.columns, None, None
+        else:
+            columns, lcol_indexer, rcol_indexer = self.columns.join(
+                other.columns, how=how, return_indexers=True
+            )
 
         binop_result_ids = []
 
         column_indices = zip(
             lcol_indexer if (lcol_indexer is not None) else range(len(columns)),
-            rcol_indexer if (lcol_indexer is not None) else range(len(columns)),
+            rcol_indexer if (rcol_indexer is not None) else range(len(columns)),
         )
 
         for left_index, right_index in column_indices:
@@ -1329,6 +1359,7 @@ def drop(
 
         block = self._block
         if index is not None:
+            self._throw_if_null_index("drop(axis=0)")
             level_id = self._resolve_levels(level or 0)[0]
 
             if utils.is_list_like(index):
@@ -1603,6 +1634,7 @@ def set_index(
         col_ids_strs: List[str] = [col_id for col_id in col_ids if col_id is not None]
         return DataFrame(self._block.set_index(col_ids_strs, append=append, drop=drop))
 
+    @requires_index
     def sort_index(
         self, ascending: bool = True, na_position: Literal["first", "last"] = "last"
     ) -> DataFrame:
@@ -1804,6 +1836,7 @@ def reindex(
         if columns is not None:
             return self._reindex_columns(columns)
 
+    @requires_index
     def _reindex_rows(
         self,
         index,
@@ -1850,9 +1883,11 @@ def _reindex_columns(self, columns):
             result_df.columns = new_column_index
         return result_df
 
+    @requires_index
     def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None):
         return self.reindex(index=other.index, columns=other.columns, validate=validate)
 
+    @requires_index
     def interpolate(self, method: str = "linear") -> DataFrame:
         if method == "pad":
             return self.ffill()
@@ -2044,14 +2079,13 @@ def quantile(
         if multi_q:
             return DataFrame(result.stack()).droplevel(0)
         else:
-            result_df = (
-                DataFrame(result)
-                .stack(list(range(0, frame.columns.nlevels)))
-                .droplevel(0)
+            # Drop the last level, which contains q; it is unnecessary since there is only one q.
+            result = result.with_column_labels(result.column_labels.droplevel(-1))
+            result, index_col = result.create_constant(q, None)
+            result = result.set_index([index_col])
+            return bigframes.series.Series(
+                result.transpose(original_row_index=pandas.Index([q]))
             )
-            result_series = bigframes.series.Series(result_df._block)
-            result_series.name = q
-            return result_series
 
     def std(
         self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
@@ -2146,9 +2180,11 @@ def agg(
 
     aggregate = agg
     aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg)
 
+    @requires_index
     def idxmin(self) -> bigframes.series.Series:
         return bigframes.series.Series(block_ops.idxmin(self._block))
 
+    @requires_index
     def idxmax(self) -> bigframes.series.Series:
         return bigframes.series.Series(block_ops.idxmax(self._block))
 
@@ -2255,6 +2291,7 @@ def _pivot(
         )
         return DataFrame(pivot_block)
 
+    @requires_index
     def pivot(
         self,
         *,
@@ -2268,6 +2305,7 @@ def pivot(
     ) -> DataFrame:
         return self._pivot(columns=columns, index=index, values=values)
 
+    @requires_index
     def pivot_table(
         self,
         values: typing.Optional[
@@ -2366,6 +2404,7 @@ def _stack_multi(self, level: LevelsType = -1):
         block = block.stack(levels=len(level))
         return DataFrame(block)
 
+    @requires_index
     def unstack(self, level: LevelsType = -1):
         if not utils.is_list_like(level):
             level = [level]
@@ -2613,6 +2652,7 @@ def groupby(
         else:
            raise TypeError("You have to supply one of 'by' and 'level'")
 
+    @requires_index
     def _groupby_level(
         self,
         level: LevelsType,
@@ -3578,3 +3618,9 @@ def __matmul__(self, other) -> DataFrame:
         return self.dot(other)
 
     __matmul__.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.__matmul__)
+
+    def _throw_if_null_index(self, opname: str):
+        if not self._has_index:
+            raise bigframes.exceptions.NullIndexError(
+                f"DataFrame cannot perform {opname} as it has no index. Set an index using set_index."
+            )
diff --git a/bigframes/enums.py b/bigframes/enums.py
index 4bec75f5df..9501d3f13e 100644
--- a/bigframes/enums.py
+++ b/bigframes/enums.py
@@ -27,3 +27,5 @@ class DefaultIndexKind(enum.Enum):
     #: ``n - 3``, ``n - 2``, ``n - 1``, where ``n`` is the number of items in
     #: the index.
     SEQUENTIAL_INT64 = enum.auto()
+    #: A completely null index incapable of indexing or alignment.
+    NULL = enum.auto()
diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index eae021b4cd..027b3a4236 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -37,3 +37,7 @@ class DefaultIndexWarning(Warning):
 
 class PreviewWarning(Warning):
     """The feature is in preview."""
+
+
+class NullIndexError(ValueError):
+    """Object has no index."""
diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py
index 75d14f3fbc..49ef7f76ee 100644
--- a/bigframes/operations/base.py
+++ b/bigframes/operations/base.py
@@ -14,6 +14,7 @@
 
 from __future__ import annotations
 
+import functools
 import typing
 from typing import List, Sequence
 
@@ -34,6 +35,15 @@
 import bigframes.session
 
 
+def requires_index(meth):
+    @functools.wraps(meth)
+    def guarded_meth(df: SeriesMethods, *args, **kwargs):
+        df._throw_if_null_index(meth.__name__)
+        return meth(df, *args, **kwargs)
+
+    return guarded_meth
+
+
 class SeriesMethods:
     def __init__(
         self,
@@ -266,3 +276,9 @@ def _align_n(
             block, constant_col_id = block.create_constant(other, dtype=dtype)
             value_ids = [*value_ids, constant_col_id]
         return (value_ids, block)
+
+    def _throw_if_null_index(self, opname: str):
+        if len(self._block.index_columns) == 0:
+            raise bigframes.exceptions.NullIndexError(
+                f"Series cannot perform {opname} as it has no index. Set an index using set_index."
+            )
diff --git a/bigframes/series.py b/bigframes/series.py
index d1fb0d679b..4595164e80 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -51,6 +51,7 @@
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.operations.base
+from bigframes.operations.base import requires_index
 import bigframes.operations.datetimes as dt
 import bigframes.operations.plotting as plotting
 import bigframes.operations.strings as strings
@@ -85,6 +86,7 @@ def dtypes(self):
         return self._dtype
 
     @property
+    @requires_index
     def loc(self) -> bigframes.core.indexers.LocSeriesIndexer:
         return bigframes.core.indexers.LocSeriesIndexer(self)
 
@@ -97,6 +99,7 @@ def iat(self) -> bigframes.core.indexers.IatSeriesIndexer:
         return bigframes.core.indexers.IatSeriesIndexer(self)
 
     @property
+    @requires_index
     def at(self) -> bigframes.core.indexers.AtSeriesIndexer:
         return bigframes.core.indexers.AtSeriesIndexer(self)
 
@@ -135,6 +138,7 @@ def values(self) -> numpy.ndarray:
         return self.to_numpy()
 
     @property
+    @requires_index
     def index(self) -> indexes.Index:
         return indexes.Index.from_frame(self)
 
@@ -236,6 +240,7 @@ def rename(
 
         raise ValueError(f"Unsupported type of parameter index: {type(index)}")
 
+    @requires_index
     def rename_axis(
         self,
         mapper: typing.Union[blocks.Label, typing.Sequence[blocks.Label]],
@@ -288,7 +293,17 @@ def __repr__(self) -> str:
         pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
         self._set_internal_query_job(query_job)
 
-        return repr(pandas_df.iloc[:, 0])
+        pd_series = pandas_df.iloc[:, 0]
+
+        import pandas.io.formats
+
+        # Safe to mutate: this dict is owned by this code and does not affect global config.
+        to_string_kwargs = pandas.io.formats.format.get_series_repr_params()  # type: ignore
+        if len(self._block.index_columns) == 0:
+            to_string_kwargs.update({"index": False})
+        repr_string = pd_series.to_string(**to_string_kwargs)
+
+        return repr_string
 
     def astype(
         self,
@@ -379,10 +394,12 @@ def drop(
             block = block.drop_columns([condition_id])
         return Series(block.select_column(self._value_column))
 
+    @requires_index
     def droplevel(self, level: LevelsType, axis: int | str = 0):
         resolved_level_ids = self._resolve_levels(level)
         return Series(self._block.drop_levels(resolved_level_ids))
 
+    @requires_index
     def swaplevel(self, i: int = -2, j: int = -1):
         level_i = self._block.index_columns[i]
         level_j = self._block.index_columns[j]
@@ -392,6 +409,7 @@ def swaplevel(self, i: int = -2, j: int = -1):
         ]
         return Series(self._block.reorder_levels(reordering))
 
+    @requires_index
     def reorder_levels(self, order: LevelsType, axis: int | str = 0):
         resolved_level_ids = self._resolve_levels(order)
         return Series(self._block.reorder_levels(resolved_level_ids))
@@ -570,6 +588,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]):
         )
         return Series(block.select_column(result))
 
+    @requires_index
     def interpolate(self, method: str = "linear") -> Series:
         if method == "pad":
             return self.ffill()
@@ -986,9 +1005,13 @@ def quantile(self, q: Union[float, Sequence[float]] = 0.5) -> Union[Series, floa
         qs = tuple(q) if utils.is_list_like(q) else (q,)
         result = block_ops.quantile(self._block, (self._value_column,), qs=qs)
         if utils.is_list_like(q):
-            result = result.stack()
-            result = result.drop_levels([result.index_columns[0]])
-            return Series(result)
+            # Drop the first level, since there is only one column.
+            result = result.with_column_labels(result.column_labels.droplevel(0))
+            result, index_col = result.create_constant(self.name, None)
+            result = result.set_index([index_col])
+            return Series(
+                result.transpose(original_row_index=pandas.Index([self.name]))
+            )
         else:
             return cast(float, Series(result).to_pandas().squeeze())
@@ -1064,6 +1087,7 @@ def argmin(self) -> int:
             scalars.Scalar, Series(block.select_column(row_nums)).iloc[0]
         )
 
+    @requires_index
     def unstack(self, level: LevelsType = -1):
         if isinstance(level, int) or isinstance(level, str):
             level = [level]
@@ -1087,6 +1111,7 @@ def unstack(self, level: LevelsType = -1):
         )
         return bigframes.dataframe.DataFrame(pivot_block)
 
+    @requires_index
     def idxmax(self) -> blocks.Label:
         block = self._block.order_by(
             [
@@ -1100,6 +1125,7 @@ def idxmax(self) -> blocks.Label:
         block = block.slice(0, 1)
         return indexes.Index(block).to_pandas()[0]
 
+    @requires_index
     def idxmin(self) -> blocks.Label:
         block = self._block.order_by(
             [
@@ -1209,6 +1235,7 @@ def sort_values(
         )
         return Series(block)
 
+    @requires_index
     def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series:
         # TODO(tbergeron): Support level parameter once multi-index introduced.
         if na_position not in ["first", "last"]:
@@ -1269,6 +1296,7 @@ def groupby(
         else:
             raise TypeError("You have to supply one of 'by' and 'level'")
 
+    @requires_index
     def _groupby_level(
         self,
         level: int | str | typing.Sequence[int] | typing.Sequence[str],
@@ -1406,9 +1434,11 @@ def combine(
             materialized_series = result_series._cached()
         return materialized_series
 
+    @requires_index
     def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
         return Series(self._get_block().add_prefix(prefix))
 
+    @requires_index
     def add_suffix(self, suffix: str, axis: int | str | None = None) -> Series:
         return Series(self._get_block().add_suffix(suffix))
 
@@ -1460,6 +1490,7 @@ def filter(
         else:
             raise ValueError("Need to provide 'items', 'like', or 'regex'")
 
+    @requires_index
     def reindex(self, index=None, *, validate: typing.Optional[bool] = None):
         if validate and not self.index.is_unique:
             raise ValueError("Original index must be unique to reindex")
@@ -1488,6 +1519,7 @@ def reindex(self, index=None, *, validate: typing.Optional[bool] = None):
             )._block
         return Series(result_block)
 
+    @requires_index
     def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None):
         return self.reindex(other.index, validate=validate)
 
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 89637644cf..834ebaae13 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -26,6 +26,7 @@
     Any,
     Callable,
     Dict,
+    Hashable,
     IO,
     Iterable,
     List,
@@ -79,6 +80,7 @@
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.compile
+import bigframes.core.guid
 import bigframes.core.nodes as nodes
 from bigframes.core.ordering import IntegerEncoding
 import bigframes.core.ordering as order
@@ -800,15 +802,28 @@ def _read_gbq_table(
         )
 
         # ----------------------------------------------------
-        # Create Block & default index if len(index_cols) == 0
+        # Create default sequential index if we still have no index
        # ----------------------------------------------------
+
+        # If no index columns were provided or found, fall back to a sequential index.
+        if (index_col != bigframes.enums.DefaultIndexKind.NULL) and len(
+            index_cols
+        ) == 0:
+            index_col = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
+
+        index_names: Sequence[Hashable] = index_cols
+        if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
+            sequential_index_col = bigframes.core.guid.generate_guid("index_")
+            array_value = array_value.promote_offsets(sequential_index_col)
+            index_cols = [sequential_index_col]
+            index_names = [None]
+
         value_columns = [col for col in array_value.column_ids if col not in index_cols]
         block = blocks.Block(
             array_value,
             index_columns=index_cols,
             column_labels=value_columns,
-            index_labels=index_cols,
+            index_labels=index_names,
         )
         if max_results:
             block = block.slice(stop=max_results)
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 87083529ce..e4aa143400 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -242,10 +242,8 @@ def get_index_cols(
     if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
         # User has explicity asked for a default, sequential index.
         # Use that, even if there are primary keys on the table.
-        #
-        # Note: This relies on the default behavior of the Block
-        # constructor to create a default sequential index. If that ever
-        # changes, this logic will need to be revisited.
+        return []
+    if index_col == bigframes.enums.DefaultIndexKind.NULL:
         return []
     else:
         # Note: It's actually quite difficult to mock this out to unit
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index a040f2b19b..ecf633b27f 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -391,6 +391,16 @@ def scalars_df_index(
     return session.read_gbq(scalars_table_id, index_col="rowindex")
 
 
+@pytest.fixture(scope="session")
+def scalars_df_empty_index(
+    scalars_table_id: str, session: bigframes.Session
+) -> bigframes.dataframe.DataFrame:
+    """DataFrame pointing at test data."""
+    return session.read_gbq(
+        scalars_table_id, index_col=bigframes.enums.DefaultIndexKind.NULL
+    ).sort_values("rowindex")
+
+
 @pytest.fixture(scope="session")
 def scalars_df_2_default_index(
     scalars_df_2_index: bigframes.dataframe.DataFrame,
diff --git a/tests/system/small/test_empty_index.py b/tests/system/small/test_empty_index.py
new file mode 100644
index 0000000000..7a1715e3d1
--- /dev/null
+++ b/tests/system/small/test_empty_index.py
@@ -0,0 +1,212 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pandas as pd
+import pytest
+
+import bigframes.exceptions
+import bigframes.pandas as bpd
+from tests.system.utils import skip_legacy_pandas
+
+
+def test_empty_index_materialize(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = scalars_df_empty_index.to_pandas()
+    pd.testing.assert_frame_equal(
+        bf_result, scalars_pandas_df_default_index, check_index_type=False
+    )
+
+
+def test_empty_index_series_repr(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = scalars_df_empty_index["int64_too"].head(5).__repr__()
+    pd_result = (
+        scalars_pandas_df_default_index["int64_too"]
+        .head(5)
+        .to_string(dtype=True, index=False, length=False, name=True)
+    )
+    assert bf_result == pd_result
+
+
+def test_empty_index_dataframe_repr(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = scalars_df_empty_index[["int64_too", "int64_col"]].head(5).__repr__()
+    pd_result = (
+        scalars_pandas_df_default_index[["int64_too", "int64_col"]]
+        .head(5)
+        .to_string(index=False)
+    )
+    assert bf_result == pd_result + "\n\n[5 rows x 2 columns]"
+
+
+def test_empty_index_reset_index(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = scalars_df_empty_index.reset_index().to_pandas()
+    pd_result = scalars_pandas_df_default_index.reset_index(drop=True)
+    pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)
+
+
+def test_empty_index_set_index(scalars_df_empty_index, scalars_pandas_df_default_index):
+    bf_result = scalars_df_empty_index.set_index("int64_col").to_pandas()
+    pd_result = scalars_pandas_df_default_index.set_index("int64_col")
+    pd.testing.assert_frame_equal(bf_result, pd_result)
+
+
+def test_empty_index_concat(scalars_df_empty_index, scalars_pandas_df_default_index):
+    bf_result = bpd.concat(
+        [scalars_df_empty_index, scalars_df_empty_index], axis=0
+    ).to_pandas()
+    pd_result = pd.concat(
+        [scalars_pandas_df_default_index, scalars_pandas_df_default_index], axis=0
+    )
+    pd.testing.assert_frame_equal(bf_result, pd_result.reset_index(drop=True))
+
+
+def test_empty_index_aggregate(scalars_df_empty_index, scalars_pandas_df_default_index):
+    bf_result = scalars_df_empty_index.count().to_pandas()
+    pd_result = scalars_pandas_df_default_index.count()
+
+    pd_result.index = pd_result.index.astype("string[pyarrow]")
+
+    pd.testing.assert_series_equal(
+        bf_result, pd_result, check_dtype=False, check_index_type=False
+    )
+
+
+def test_empty_index_groupby_aggregate(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = scalars_df_empty_index.groupby("int64_col").count().to_pandas()
+    pd_result = scalars_pandas_df_default_index.groupby("int64_col").count()
+
+    pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)
+
+
+@skip_legacy_pandas
+def test_empty_index_analytic(scalars_df_empty_index, scalars_pandas_df_default_index):
+    bf_result = scalars_df_empty_index["int64_col"].cumsum().to_pandas()
+    pd_result = scalars_pandas_df_default_index["int64_col"].cumsum()
+    pd.testing.assert_series_equal(
+        bf_result, pd_result.reset_index(drop=True), check_dtype=False
+    )
+
+
+def test_empty_index_groupby_analytic(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = (
+        scalars_df_empty_index.groupby("bool_col")["int64_col"].cummax().to_pandas()
+    )
+    pd_result = scalars_pandas_df_default_index.groupby("bool_col")[
+        "int64_col"
+    ].cummax()
+    pd.testing.assert_series_equal(
+        bf_result, pd_result.reset_index(drop=True), check_dtype=False
+    )
+
+
+@skip_legacy_pandas
+def test_empty_index_stack(scalars_df_empty_index, scalars_pandas_df_default_index):
+    stacking_cols = ["int64_col", "int64_too"]
+    bf_result = scalars_df_empty_index[stacking_cols].stack().to_pandas()
+    pd_result = (
+        scalars_pandas_df_default_index[stacking_cols]
+        .stack(future_stack=True)
+        .droplevel(level=0, axis=0)
+    )
+    pd_result.index = pd_result.index.astype(bf_result.index.dtype)
+    pd.testing.assert_series_equal(
+        bf_result,
+        pd_result,
+        check_dtype=False,
+    )
+
+
+def test_empty_index_series_self_aligns(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = (
+        scalars_df_empty_index["int64_col"] + scalars_df_empty_index["int64_too"]
+    )
+    pd_result = (
+        scalars_pandas_df_default_index["int64_col"]
+        + scalars_pandas_df_default_index["int64_too"]
+    )
+    pd.testing.assert_series_equal(
+        bf_result.to_pandas(), pd_result.reset_index(drop=True), check_dtype=False
+    )
+
+
+def test_empty_index_df_self_aligns(
+    scalars_df_empty_index, scalars_pandas_df_default_index
+):
+    bf_result = (
+        scalars_df_empty_index[["int64_col", "float64_col"]]
+        + scalars_df_empty_index[["int64_col", "float64_col"]]
+    )
+    pd_result = (
+        scalars_pandas_df_default_index[["int64_col", "float64_col"]]
+        + scalars_pandas_df_default_index[["int64_col", "float64_col"]]
+    )
+    pd.testing.assert_frame_equal(
+        bf_result.to_pandas(), pd_result.reset_index(drop=True), check_dtype=False
+    )
+
+
+def test_empty_index_df_concat(scalars_df_empty_index, scalars_pandas_df_default_index):
+    bf_result = bpd.concat([scalars_df_empty_index, scalars_df_empty_index])
+    pd_result = pd.concat(
+        [scalars_pandas_df_default_index, scalars_pandas_df_default_index]
+    )
+    pd.testing.assert_frame_equal(
+        bf_result.to_pandas(), pd_result.reset_index(drop=True), check_dtype=False
+    )
+
+
+def test_empty_index_align_error(scalars_df_empty_index):
+    with pytest.raises(bigframes.exceptions.NullIndexError):
+        _ = (
+            scalars_df_empty_index["int64_col"]
+            + scalars_df_empty_index["int64_col"].cumsum()
+        )
+
+
+def test_empty_index_loc_error(scalars_df_empty_index):
+    with pytest.raises(bigframes.exceptions.NullIndexError):
+        scalars_df_empty_index["int64_col"].loc[1]
+
+
+def test_empty_index_at_error(scalars_df_empty_index):
+    with pytest.raises(bigframes.exceptions.NullIndexError):
+        scalars_df_empty_index["int64_col"].at[1]
+
+
+def test_empty_index_idxmin_error(scalars_df_empty_index):
+    with pytest.raises(bigframes.exceptions.NullIndexError):
+        scalars_df_empty_index[["int64_col", "int64_too"]].idxmin()
+
+
+def test_empty_index_index_property(scalars_df_empty_index):
+    with pytest.raises(bigframes.exceptions.NullIndexError):
+        _ = scalars_df_empty_index.index
+
+
+def test_empty_index_transpose(scalars_df_empty_index):
+    with pytest.raises(bigframes.exceptions.NullIndexError):
+        _ = scalars_df_empty_index.T
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index 2df7ab86b6..dbc8ddec6f 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -1430,13 +1430,13 @@ def test_numeric_literal(scalars_dfs):
     assert bf_result.dtype == pd.ArrowDtype(pa.decimal128(38, 9))
 
 
-def test_repr(scalars_dfs):
+def test_series_small_repr(scalars_dfs):
     scalars_df, scalars_pandas_df = scalars_dfs
     col_name = "int64_col"
     bf_series = scalars_df[col_name]
     pd_series = scalars_pandas_df[col_name]
 
-    assert repr(bf_series) == repr(pd_series)
+    assert repr(bf_series) == pd_series.to_string(length=False, dtype=True, name=True)
 
 
 def test_sum(scalars_dfs):
@@ -3688,10 +3688,10 @@ def test_series_explode_reserve_order(ignore_index, ordered):
 
     res = s.explode(ignore_index=ignore_index).to_pandas(ordered=ordered)  # type: ignore # TODO(b/340885567): fix type error
     pd_res = pd_s.explode(ignore_index=ignore_index).astype(pd.Int64Dtype())  # type: ignore
+    pd_res.index = pd_res.index.astype(pd.Int64Dtype())
     pd.testing.assert_series_equal(
         res if ordered else res.sort_index(),
         pd_res,
-        check_index_type=False,
     )
 
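A minimal usage sketch of the null-index behavior this change introduces (not part of the patch). It assumes only APIs visible in the diff: read_gbq with index_col=bigframes.enums.DefaultIndexKind.NULL, set_index as the escape hatch named in the error messages, and the new bigframes.exceptions.NullIndexError. The table and column names are placeholders.

    import bigframes.enums
    import bigframes.exceptions
    import bigframes.pandas as bpd

    # Opt into a null index: no index column is materialized or maintained.
    df = bpd.read_gbq(
        "my-project.my_dataset.my_table",  # placeholder table name
        index_col=bigframes.enums.DefaultIndexKind.NULL,
    )

    # Column-wise operations on the same frame still self-align as projections;
    # the repr simply omits the index column.
    total = df["int64_col"] + df["int64_too"]  # placeholder column names

    # Index-dependent operations raise NullIndexError until an index is set.
    try:
        df.sort_index()
    except bigframes.exceptions.NullIndexError:
        df = df.set_index("int64_col")  # promote a column to the index

The design point worth noting: rather than silently generating a sequential index, the null index defers that cost entirely, and join_indexless only succeeds when both sides can be rewritten as projections of a common source; otherwise the NullIndexError above asks the user to set an explicit index.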