diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index f65509e5b7..b4074dd94f 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -17,9 +17,8 @@
 import datetime
 import functools
 import io
-import itertools
 import typing
-from typing import Iterable, Optional, Sequence
+from typing import Iterable, Optional, Sequence, Tuple
 import warnings
 
 import google.cloud.bigquery
@@ -191,19 +190,14 @@ def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
             nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
         )
 
-    def project_to_id(self, expression: ex.Expression, output_id: str):
+    def compute_values(self, assignments: Sequence[Tuple[ex.Expression, str]]):
         return ArrayValue(
-            nodes.ProjectionNode(
-                child=self.node,
-                assignments=(
-                    (
-                        expression,
-                        output_id,
-                    ),
-                ),
-            )
+            nodes.ProjectionNode(child=self.node, assignments=tuple(assignments))
         )
 
+    def project_to_id(self, expression: ex.Expression, output_id: str):
+        return self.compute_values(((expression, output_id),))
+
     def assign(self, source_id: str, destination_id: str) -> ArrayValue:
         if destination_id in self.column_ids:  # Mutate case
             exprs = [
@@ -341,124 +335,33 @@ def _reproject_to_table(self) -> ArrayValue:
             )
         )
 
-    def unpivot(
-        self,
-        row_labels: typing.Sequence[typing.Hashable],
-        unpivot_columns: typing.Sequence[
-            typing.Tuple[str, typing.Tuple[typing.Optional[str], ...]]
-        ],
-        *,
-        passthrough_columns: typing.Sequence[str] = (),
-        index_col_ids: typing.Sequence[str] = ["index"],
-        join_side: typing.Literal["left", "right"] = "left",
-    ) -> ArrayValue:
-        """
-        Unpivot ArrayValue columns.
-
-        Args:
-            row_labels: Identifies the source of the row. Must be equal to length to source column list in unpivot_columns argument.
-            unpivot_columns: Mapping of column id to list of input column ids. Lists of input columns may use None.
-            passthrough_columns: Columns that will not be unpivoted. Column id will be preserved.
-            index_col_id (str): The column id to be used for the row labels.
-
-        Returns:
-            ArrayValue: The unpivoted ArrayValue
-        """
-        # There will be N labels, used to disambiguate which of N source columns produced each output row
-        explode_offsets_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
-        labels_array = self._create_unpivot_labels_array(
-            row_labels, index_col_ids, explode_offsets_id
-        )
-
-        # Unpivot creates N output rows for each input row, labels disambiguate these N rows
-        joined_array = self._cross_join_w_labels(labels_array, join_side)
-
-        # Build the output rows as a case statment that selects between the N input columns
-        unpivot_exprs = []
-        # Supports producing multiple stacked ouput columns for stacking only part of hierarchical index
-        for col_id, input_ids in unpivot_columns:
-            # row explode offset used to choose the input column
-            # we use offset instead of label as labels are not necessarily unique
-            cases = itertools.chain(
-                *(
-                    (
-                        ops.eq_op.as_expr(explode_offsets_id, ex.const(i)),
-                        ex.free_var(id_or_null)
-                        if (id_or_null is not None)
-                        else ex.const(None),
-                    )
-                    for i, id_or_null in enumerate(input_ids)
-                )
-            )
-            col_expr = ops.case_when_op.as_expr(*cases)
-            unpivot_exprs.append((col_expr, col_id))
-
-        unpivot_col_ids = [id for id, _ in unpivot_columns]
-        return ArrayValue(
-            nodes.ProjectionNode(
-                child=joined_array.node,
-                assignments=(*unpivot_exprs,),
-            )
-        ).select_columns([*index_col_ids, *unpivot_col_ids, *passthrough_columns])
-
-    def _cross_join_w_labels(
-        self, labels_array: ArrayValue, join_side: typing.Literal["left", "right"]
-    ) -> ArrayValue:
-        """
-        Convert each row in self to N rows, one for each label in labels array.
-        """
-        table_join_side = (
-            join_def.JoinSide.LEFT if join_side == "left" else join_def.JoinSide.RIGHT
-        )
-        labels_join_side = table_join_side.inverse()
-        labels_mappings = tuple(
-            join_def.JoinColumnMapping(labels_join_side, id, id)
-            for id in labels_array.schema.names
-        )
-        table_mappings = tuple(
-            join_def.JoinColumnMapping(table_join_side, id, id)
-            for id in self.schema.names
-        )
-        join = join_def.JoinDefinition(
-            conditions=(), mappings=(*labels_mappings, *table_mappings), type="cross"
-        )
-        if join_side == "left":
-            joined_array = self.relational_join(labels_array, join_def=join)
-        else:
-            joined_array = labels_array.relational_join(self, join_def=join)
-        return joined_array
-
-    def _create_unpivot_labels_array(
-        self,
-        former_column_labels: typing.Sequence[typing.Hashable],
-        col_ids: typing.Sequence[str],
-        offsets_id: str,
-    ) -> ArrayValue:
-        """Create an ArrayValue from a list of label tuples."""
-        rows = []
-        for row_offset in range(len(former_column_labels)):
-            row_label = former_column_labels[row_offset]
-            row_label = (row_label,) if not isinstance(row_label, tuple) else row_label
-            row = {
-                col_ids[i]: (row_label[i] if pandas.notnull(row_label[i]) else None)
-                for i in range(len(col_ids))
-            }
-            row[offsets_id] = row_offset
-            rows.append(row)
-
-        return ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=self.session)
-
     def relational_join(
         self,
         other: ArrayValue,
-        join_def: join_def.JoinDefinition,
-    ) -> ArrayValue:
+        conditions: typing.Tuple[typing.Tuple[str, str], ...] = (),
+        type: typing.Literal["inner", "outer", "left", "right", "cross"] = "inner",
+    ) -> typing.Tuple[ArrayValue, typing.Tuple[dict[str, str], dict[str, str]]]:
         join_node = nodes.JoinNode(
             left_child=self.node,
             right_child=other.node,
-            join=join_def,
+            conditions=conditions,
+            type=type,
         )
-        return ArrayValue(join_node)
+        # Maps input ids to output ids for caller convenience
+        l_size = len(self.node.schema)
+        l_mapping = {
+            lcol: ocol
+            for lcol, ocol in zip(
+                self.node.schema.names, join_node.schema.names[:l_size]
+            )
+        }
+        r_mapping = {
+            rcol: ocol
+            for rcol, ocol in zip(
+                other.node.schema.names, join_node.schema.names[l_size:]
+            )
+        }
+        return ArrayValue(join_node), (l_mapping, r_mapping)
 
     def try_align_as_projection(
         self,
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 4db171ec70..42b1a0aeb0 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -44,6 +44,7 @@
 import bigframes.core.expression as ex
 import bigframes.core.expression as scalars
 import bigframes.core.guid as guid
+import bigframes.core.identifiers
 import bigframes.core.join_def as join_defs
 import bigframes.core.ordering as ordering
 import bigframes.core.schema as bf_schema
@@ -1050,7 +1051,6 @@ def aggregate_all_and_stack(
         operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
         *,
         axis: int | str = 0,
-        value_col_id: str = "values",
         dropna: bool = True,
     ) -> Block:
         axis_n = utils.get_axis_number(axis)
@@ -1080,15 +1080,18 @@ def aggregate_all_and_stack(
             # TODO: Allow to promote identity/total_order columns instead for better perf
             offset_col = guid.generate_guid()
             expr_with_offsets = self.expr.promote_offsets(offset_col)
-            stacked_expr = expr_with_offsets.unpivot(
-                row_labels=self.column_labels.to_list(),
-                index_col_ids=[guid.generate_guid()],
-                unpivot_columns=[(value_col_id, tuple(self.value_columns))],
+            stacked_expr, (_, value_col_ids, passthrough_cols,) = unpivot(
+                expr_with_offsets,
+                row_labels=self.column_labels,
+                unpivot_columns=[tuple(self.value_columns)],
                 passthrough_columns=[*self.index_columns, offset_col],
             )
+            # These correspond to the passthrough_columns provided to unpivot
+            index_cols = passthrough_cols[:-1]
+            og_offset_col = passthrough_cols[-1]
             index_aggregations = [
                 (ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id)
-                for col_id in [*self.index_columns]
+                for col_id in index_cols
             ]
             # TODO: may need add NullaryAggregation in main_aggregation
             # when agg add support for axis=1, needed for agg("size", axis=1)
@@ -1096,17 +1099,18 @@
             assert isinstance(
                 operation, agg_ops.UnaryAggregateOp
             ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)."
             main_aggregation = (
-                ex.UnaryAggregation(operation, ex.free_var(value_col_id)),
-                value_col_id,
+                ex.UnaryAggregation(operation, ex.free_var(value_col_ids[0])),
+                value_col_ids[0],
             )
+            # Drop row identity after aggregating over it
             result_expr = stacked_expr.aggregate(
                 [*index_aggregations, main_aggregation],
-                by_column_ids=[offset_col],
+                by_column_ids=[og_offset_col],
                 dropna=dropna,
-            )
+            ).drop_columns([og_offset_col])
             return Block(
-                result_expr.drop_columns([offset_col]),
-                self.index_columns,
+                result_expr,
+                index_columns=index_cols,
                 column_labels=[None],
                 index_labels=self.index.names,
             )
@@ -1318,8 +1322,7 @@ def summarize(
         ],
     ):
         """Get a list of stats as a deferred block object."""
-        label_col_id = guid.generate_guid()
-        labels = [stat.name for stat in stats]
+        labels = pd.Index([stat.name for stat in stats])
         aggregations = [
             (
                 ex.UnaryAggregation(stat, ex.free_var(col_id))
@@ -1331,18 +1334,17 @@
             for col_id in column_ids
         ]
         columns = [
-            (col_id, tuple(f"{col_id}-{stat.name}" for stat in stats))
-            for col_id in column_ids
+            (tuple(f"{col_id}-{stat.name}" for stat in stats)) for col_id in column_ids
         ]
-        expr = self.expr.aggregate(aggregations).unpivot(
+        expr, (index_cols, _, _) = unpivot(
+            self.expr.aggregate(aggregations),
             labels,
             unpivot_columns=tuple(columns),
-            index_col_ids=tuple([label_col_id]),
         )
         return Block(
             expr,
             column_labels=self._get_labels_for_columns(column_ids),
-            index_columns=[label_col_id],
+            index_columns=index_cols,
         )
 
     def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
@@ -1368,23 +1370,17 @@ def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
         ]
 
         expr = self.expr.aggregate(aggregations)
-        index_col_ids = [
-            guid.generate_guid() for i in range(self.column_labels.nlevels)
-        ]
         input_count = len(self.value_columns)
         unpivot_columns = tuple(
-            (
-                guid.generate_guid(),
-                tuple(expr.column_ids[input_count * i : input_count * (i + 1)]),
-            )
+            tuple(expr.column_ids[input_count * i : input_count * (i + 1)])
             for i in range(input_count)
        )
         labels = self._get_labels_for_columns(self.value_columns)
 
         # TODO(b/340896143): fix type error
-        expr = expr.unpivot(
-            row_labels=labels,  # type: ignore
-            index_col_ids=index_col_ids,
+        expr, (index_col_ids, _, _) = unpivot(
+            expr,
+            row_labels=labels,
             unpivot_columns=unpivot_columns,
         )
 
@@ -1604,7 +1600,7 @@ def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
             Block(
                 expr,
                 index_columns=self.index_columns,
-                column_labels=self.column_labels.insert(0, label),
+                column_labels=self.column_labels.insert(len(self.column_labels), label),
                 index_labels=self._index_labels,
             ),
             result_id,
@@ -1722,8 +1718,6 @@ def stack(self, how="left", levels: int = 1):
         col_labels, row_labels = utils.split_index(self.column_labels, levels=levels)
         row_labels = row_labels.drop_duplicates()
 
-        row_label_tuples = utils.index_as_tuples(row_labels)
-
         if col_labels is None:
             result_index: pd.Index = pd.Index([None])
             result_col_labels: Sequence[Tuple] = list([()])
@@ -1737,26 +1731,24 @@ def stack(self, how="left", levels: int = 1):
             result_col_labels = utils.index_as_tuples(result_index)
 
         # Get matching columns
-        unpivot_columns: List[Tuple[str, List[str]]] = []
+        unpivot_columns: List[Tuple[Optional[str], ...]] = []
         for val in result_col_labels:
-            col_id = guid.generate_guid("unpivot_")
-            input_columns, dtype = self._create_stack_column(val, row_label_tuples)
-            unpivot_columns.append((col_id, input_columns))
+            input_columns, _ = self._create_stack_column(val, row_labels)
+            unpivot_columns.append(input_columns)
 
-        added_index_columns = [guid.generate_guid() for _ in range(row_labels.nlevels)]
-        unpivot_expr = self._expr.unpivot(
-            row_labels=row_label_tuples,
+        unpivot_expr, (added_index_columns, _, passthrough_cols) = unpivot(
+            self._expr,
+            row_labels=row_labels,
             passthrough_columns=self.index_columns,
             unpivot_columns=unpivot_columns,
-            index_col_ids=added_index_columns,
             join_side=how,
         )
         new_index_level_names = self.column_labels.names[-levels:]
         if how == "left":
-            index_columns = [*self.index_columns, *added_index_columns]
+            index_columns = [*passthrough_cols, *added_index_columns]
             index_labels = [*self._index_labels, *new_index_level_names]
         else:
-            index_columns = [*added_index_columns, *self.index_columns]
+            index_columns = [*added_index_columns, *passthrough_cols]
             index_labels = [*new_index_level_names, *self._index_labels]
 
         return Block(
@@ -1780,18 +1772,16 @@ def melt(
 
         Arguments correspond to pandas.melt arguments.
         """
         # TODO: Implement col_level and ignore_index
-        unpivot_col_id = guid.generate_guid()
-        var_col_ids = tuple([guid.generate_guid() for _ in var_names])
-        # single unpivot col
-        unpivot_col = (unpivot_col_id, tuple(value_vars))
-        value_labels = [self.col_id_to_label[col_id] for col_id in value_vars]
+        value_labels: pd.Index = pd.Index(
+            [self.col_id_to_label[col_id] for col_id in value_vars]
+        )
         id_labels = [self.col_id_to_label[col_id] for col_id in id_vars]
-        unpivot_expr = self._expr.unpivot(
+        unpivot_expr, (var_col_ids, unpivot_out, passthrough_cols) = unpivot(
+            self._expr,
             row_labels=value_labels,
             passthrough_columns=id_vars,
-            unpivot_columns=(unpivot_col,),
-            index_col_ids=var_col_ids,
+            unpivot_columns=(tuple(value_vars),),  # single unpivot col
             join_side="right",
         )
@@ -1804,7 +1794,7 @@
 
         # Need to reorder to get id_vars before var_col and unpivot_col
         unpivot_expr = unpivot_expr.select_columns(
-            [*index_cols, *id_vars, *var_col_ids, unpivot_col_id]
+            [*index_cols, *passthrough_cols, *var_col_ids, *unpivot_out]
         )
 
         return Block(
@@ -1859,6 +1849,7 @@ def transpose(
             value_vars=block.value_columns,
             create_offsets_index=False,
         )
+        row_offset = stacked_block.value_columns[0]
         col_labels = stacked_block.value_columns[-2 - original_col_index.nlevels : -2]
         col_offset = stacked_block.value_columns[-2]  # disambiguator we created earlier
         cell_values = stacked_block.value_columns[-1]
@@ -1867,7 +1858,7 @@
             [*col_labels, col_offset]
         )  # col index is now row index
         result = stacked_block.pivot(
-            columns=[offsets],
+            columns=[row_offset],
             values=[cell_values],
             columns_unique_values=tuple(range(original_row_count)),
         )
@@ -1879,12 +1870,10 @@
             .with_transpose_cache(self)
         )
 
-    def _create_stack_column(
-        self, col_label: typing.Tuple, stack_labels: typing.Sequence[typing.Tuple]
-    ):
+    def _create_stack_column(self, col_label: typing.Tuple, stack_labels: pd.Index):
         dtype = None
         input_columns: list[Optional[str]] = []
-        for uvalue in stack_labels:
+        for uvalue in utils.index_as_tuples(stack_labels):
             label_to_match = (*col_label, *uvalue)
             label_to_match = (
                 label_to_match[0] if len(label_to_match) == 1 else label_to_match
@@ -2013,38 +2002,16 @@ def merge(
         sort: bool,
         suffixes: tuple[str, str] = ("_x", "_y"),
     ) -> Block:
-        left_mappings = [
-            join_defs.JoinColumnMapping(
-                source_table=join_defs.JoinSide.LEFT,
-                source_id=id,
-                destination_id=guid.generate_guid(),
-            )
-            for id in self.expr.column_ids
-        ]
-        right_mappings = [
-            join_defs.JoinColumnMapping(
-                source_table=join_defs.JoinSide.RIGHT,
-                source_id=id,
-                destination_id=guid.generate_guid(),
-            )
-            for id in other.expr.column_ids
-        ]
-
-        join_def = join_defs.JoinDefinition(
-            conditions=tuple(
-                join_defs.JoinCondition(left, right)
-                for left, right in zip(left_join_ids, right_join_ids)
-            ),
-            mappings=(*left_mappings, *right_mappings),
-            type=how,
+        conditions = tuple(
+            (lid, rid) for lid, rid in zip(left_join_ids, right_join_ids)
+        )
+        joined_expr, (get_column_left, get_column_right) = self.expr.relational_join(
+            other.expr, type=how, conditions=conditions
         )
-        joined_expr = self.expr.relational_join(other.expr, join_def=join_def)
         result_columns = []
         matching_join_labels = []
         coalesced_ids = []
-        get_column_left = join_def.get_left_mapping()
-        get_column_right = join_def.get_right_mapping()
         for left_id, right_id in zip(left_join_ids, right_join_ids):
             coalesced_id = guid.generate_guid()
             joined_expr = joined_expr.project_to_id(
@@ -2748,34 +2715,10 @@ def join_with_single_row(
     left_expr = left.expr
     # ignore index columns by dropping them
     right_expr = single_row_block.expr.select_columns(single_row_block.value_columns)
-    left_mappings = [
-        join_defs.JoinColumnMapping(
-            source_table=join_defs.JoinSide.LEFT,
-            source_id=id,
-            destination_id=guid.generate_guid(),
-        )
-        for id in left_expr.column_ids
-    ]
-    right_mappings = [
-        join_defs.JoinColumnMapping(
-            source_table=join_defs.JoinSide.RIGHT,
-            source_id=id,
-            destination_id=guid.generate_guid(),
-        )
-        for id in right_expr.column_ids  # skip index column
-    ]
-
-    join_def = join_defs.JoinDefinition(
-        conditions=(),
-        mappings=(*left_mappings, *right_mappings),
-        type="cross",
-    )
-    combined_expr = left_expr.relational_join(
+    combined_expr, (get_column_left, get_column_right) = left_expr.relational_join(
         right_expr,
-        join_def=join_def,
+        type="cross",
     )
-    get_column_left = join_def.get_left_mapping()
-    get_column_right = join_def.get_right_mapping()
 
     # Drop original indices from each side. and used the coalesced combination generated by the join.
     index_cols_post_join = [get_column_left[id] for id in left.index_columns]
@@ -2800,38 +2743,15 @@ def join_mono_indexed(
 ) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
     left_expr = left.expr
     right_expr = right.expr
-    left_mappings = [
-        join_defs.JoinColumnMapping(
-            source_table=join_defs.JoinSide.LEFT,
-            source_id=id,
-            destination_id=guid.generate_guid(),
-        )
-        for id in left_expr.column_ids
-    ]
-    right_mappings = [
-        join_defs.JoinColumnMapping(
-            source_table=join_defs.JoinSide.RIGHT,
-            source_id=id,
-            destination_id=guid.generate_guid(),
-        )
-        for id in right_expr.column_ids
-    ]
-    join_def = join_defs.JoinDefinition(
+    combined_expr, (get_column_left, get_column_right) = left_expr.relational_join(
+        right_expr,
+        type=how,
         conditions=(
             join_defs.JoinCondition(left.index_columns[0], right.index_columns[0]),
        ),
-        mappings=(*left_mappings, *right_mappings),
-        type=how,
     )
-    combined_expr = left_expr.relational_join(
-        right_expr,
-        join_def=join_def,
-    )
-
-    get_column_left = join_def.get_left_mapping()
-    get_column_right = join_def.get_right_mapping()
     left_index = get_column_left[left.index_columns[0]]
     right_index = get_column_right[right.index_columns[0]]
     # Drop original indices from each side. and used the coalesced combination generated by the join.
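The `merge` and `join_*` refactors above all follow the same caller pattern, sketched below with illustrative names (`left_expr`, `right_expr`, and `"user_id"` are assumptions for the example, not code from this patch): `relational_join` now takes bare `(left_id, right_id)` condition tuples plus a join-type literal, and returns the two id mappings that `JoinDefinition.get_left_mapping()` / `get_right_mapping()` used to provide.

```python
# Sketch of the new caller pattern (names illustrative):
joined_expr, (get_column_left, get_column_right) = left_expr.relational_join(
    right_expr,
    type="inner",
    conditions=(("user_id", "user_id"),),  # (left_id, right_id) pairs
)
# Post-join column ids are positional ("col_0", "col_1", ...), so callers
# translate their pre-join ids through the returned mappings:
left_key_post_join = get_column_left["user_id"]
right_key_post_join = get_column_right["user_id"]
```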
@@ -2886,39 +2806,15 @@ def join_multi_indexed(
     left_expr = left.expr
     right_expr = right.expr
-    left_mappings = [
-        join_defs.JoinColumnMapping(
-            source_table=join_defs.JoinSide.LEFT,
-            source_id=id,
-            destination_id=guid.generate_guid(),
-        )
-        for id in left_expr.column_ids
-    ]
-    right_mappings = [
-        join_defs.JoinColumnMapping(
-            source_table=join_defs.JoinSide.RIGHT,
-            source_id=id,
-            destination_id=guid.generate_guid(),
-        )
-        for id in right_expr.column_ids
-    ]
-
-    join_def = join_defs.JoinDefinition(
+    combined_expr, (get_column_left, get_column_right) = left_expr.relational_join(
+        right_expr,
+        type=how,
         conditions=tuple(
             join_defs.JoinCondition(left, right)
             for left, right in zip(left_join_ids, right_join_ids)
         ),
-        mappings=(*left_mappings, *right_mappings),
-        type=how,
     )
-    combined_expr = left_expr.relational_join(
-        right_expr,
-        join_def=join_def,
-    )
-
-    get_column_left = join_def.get_left_mapping()
-    get_column_right = join_def.get_right_mapping()
     left_ids_post_join = [get_column_left[id] for id in left_join_ids]
     right_ids_post_join = [get_column_right[id] for id in right_join_ids]
     # Drop original indices from each side. and used the coalesced combination generated by the join.
@@ -3114,3 +3010,94 @@ def _get_block_schema(
     for label, dtype in zip(block.column_labels, block.dtypes):
         result[label] = typing.cast(bigframes.dtypes.Dtype, dtype)
     return result
+
+
+## Unpivot helpers
+def unpivot(
+    array_value: core.ArrayValue,
+    row_labels: pd.Index,
+    unpivot_columns: Sequence[Tuple[Optional[str], ...]],
+    *,
+    passthrough_columns: typing.Sequence[str] = (),
+    join_side: Literal["left", "right"] = "left",
+) -> Tuple[core.ArrayValue, Tuple[Tuple[str, ...], Tuple[str, ...], Tuple[str, ...]]]:
+    """
+    Unpivot ArrayValue columns.
+
+    Args:
+        row_labels: Identifies the source of the row. Must be equal in length to the source column lists in the unpivot_columns argument.
+        unpivot_columns: Sequence of column id tuples. Each tuple of input columns will be combined into a single output column.
+        passthrough_columns: Columns that will not be unpivoted. Column id will be preserved.
+        join_side: Whether the row labels are joined to the left or right of the source columns, which determines output row ordering.
+
+    Returns:
+        ArrayValue, (index_cols, unpivot_cols, passthrough_cols): The unpivoted ArrayValue and resulting column ids.
+ """ + # There will be N labels, used to disambiguate which of N source columns produced each output row + labels_array = _pd_index_to_array_value( + session=array_value.session, index=row_labels + ) + + # Unpivot creates N output rows for each input row, labels disambiguate these N rows + # Join_side is necessary to produce desired row ordering + if join_side == "left": + joined_array, (column_mapping, labels_mapping) = array_value.relational_join( + labels_array, type="cross" + ) + else: + joined_array, (labels_mapping, column_mapping) = labels_array.relational_join( + array_value, type="cross" + ) + new_passthrough_cols = [column_mapping[col] for col in passthrough_columns] + # Last column is offsets + index_col_ids = [labels_mapping[col] for col in labels_array.column_ids[:-1]] + explode_offsets_id = labels_mapping[labels_array.column_ids[-1]] + + # Build the output rows as a case statment that selects between the N input columns + unpivot_exprs: List[Tuple[ex.Expression, str]] = [] + # Supports producing multiple stacked ouput columns for stacking only part of hierarchical index + for input_ids in unpivot_columns: + # row explode offset used to choose the input column + # we use offset instead of label as labels are not necessarily unique + cases = itertools.chain( + *( + ( + ops.eq_op.as_expr(explode_offsets_id, ex.const(i)), + ex.free_var(column_mapping[id_or_null]) + if (id_or_null is not None) + else ex.const(None), + ) + for i, id_or_null in enumerate(input_ids) + ) + ) + col_expr = ops.case_when_op.as_expr(*cases) + unpivot_exprs.append((col_expr, guid.generate_guid())) + + unpivot_col_ids = [id for _, id in unpivot_exprs] + + return joined_array.compute_values(unpivot_exprs).select_columns( + [*index_col_ids, *unpivot_col_ids, *new_passthrough_cols] + ), (tuple(index_col_ids), tuple(unpivot_col_ids), tuple(new_passthrough_cols)) + + +def _pd_index_to_array_value( + session: core.Session, + index: pd.Index, +) -> core.ArrayValue: + """ + Create an ArrayValue from a list of label tuples. + The last column will be row offsets. + """ + rows = [] + labels_as_tuples = utils.index_as_tuples(index) + for row_offset in range(len(index)): + id_gen = bigframes.core.identifiers.standard_identifiers() + row_label = labels_as_tuples[row_offset] + row_label = (row_label,) if not isinstance(row_label, tuple) else row_label + row = {} + for label_part, id in zip(row_label, id_gen): + row[id] = label_part if pd.notnull(label_part) else None + row[next(id_gen)] = row_offset + rows.append(row) + + return core.ArrayValue.from_pyarrow(pa.Table.from_pylist(rows), session=session) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 9a9f598e89..38b8fb50e3 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -791,10 +791,10 @@ def promote_offsets(self, col_id: str) -> OrderedIR: if ordering.is_sequential and (ordering.total_order_col is not None): expr_builder = self.builder() expr_builder.columns = [ + *self.columns, self._compile_expression( ordering.total_order_col.scalar_expression ).name(col_id), - *self.columns, ] return expr_builder.build() # Cannot nest analytic expressions, so reproject to cte first if needed. 
diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 80d5f5a893..950c2c2cc7 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -81,7 +81,8 @@ def compile_join(self, node: nodes.JoinNode, ordered: bool = True):
             return bigframes.core.compile.single_column.join_by_column_ordered(
                 left=left_ordered,
                 right=right_ordered,
-                join=node.join,
+                type=node.type,
+                conditions=node.conditions,
             )
         else:
             left_unordered = self.compile_unordered_ir(node.left_child)
@@ -89,7 +90,8 @@ def compile_join(self, node: nodes.JoinNode, ordered: bool = True):
             return bigframes.core.compile.single_column.join_by_column_unordered(
                 left=left_unordered,
                 right=right_unordered,
-                join=node.join,
+                type=node.type,
+                conditions=node.conditions,
             )
 
     @_compile_node.register
diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py
index 9b621c9c79..26af969b74 100644
--- a/bigframes/core/compile/single_column.py
+++ b/bigframes/core/compile/single_column.py
@@ -16,20 +16,23 @@
 
 from __future__ import annotations
 
+from typing import Literal, Tuple
+
 import ibis
 import ibis.expr.datatypes as ibis_dtypes
 import ibis.expr.types as ibis_types
 
 import bigframes.core.compile.compiled as compiled
 import bigframes.core.guid as guids
-import bigframes.core.join_def as join_defs
+import bigframes.core.identifiers as ids
 import bigframes.core.ordering as orderings
 
 
 def join_by_column_ordered(
     left: compiled.OrderedIR,
     right: compiled.OrderedIR,
-    join: join_defs.JoinDefinition,
+    conditions: Tuple[Tuple[str, str], ...],
+    type: Literal["inner", "outer", "left", "right", "cross"],
 ) -> compiled.OrderedIR:
     """Join two expressions by column equality.
 
@@ -48,6 +51,11 @@ def join_by_column_ordered(
         finally, all the right columns.
     """
+    # Share one id generator across both sides so the value column ids never collide
+    id_generator = ids.standard_identifiers()
+    l_value_mapping = dict(zip(left.column_ids, id_generator))
+    r_value_mapping = dict(zip(right.column_ids, id_generator))
+
     l_hidden_mapping = {
         id: guids.generate_guid("hidden_") for id in left._hidden_column_ids
     }
@@ -55,8 +63,8 @@ def join_by_column_ordered(
         id: guids.generate_guid("hidden_") for id in right._hidden_column_ids
     }
 
-    l_mapping = {**join.get_left_mapping(), **l_hidden_mapping}
-    r_mapping = {**join.get_right_mapping(), **r_hidden_mapping}
+    l_mapping = {**l_value_mapping, **l_hidden_mapping}
+    r_mapping = {**r_value_mapping, **r_hidden_mapping}
 
     left_table = left._to_ibis_expr(
         ordering_mode="unordered",
@@ -71,14 +79,14 @@ def join_by_column_ordered(
     join_conditions = [
         value_to_join_key(left_table[l_mapping[left_index]])
         == value_to_join_key(right_table[r_mapping[right_index]])
-        for left_index, right_index in join.conditions
+        for left_index, right_index in conditions
     ]
 
     combined_table = ibis.join(
         left_table,
         right_table,
        predicates=join_conditions,
-        how=join.type,  # type: ignore
+        how=type,  # type: ignore
     )
 
     # Preserve ordering accross joins.
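Both compile paths now derive value-column names from a shared `ids.standard_identifiers()` generator, so the left and right mappings can never collide and they line up positionally with `JoinNode.schema`. A minimal runnable sketch of that property (the `"a"`/`"b"`/`"c"` column ids are illustrative):

```python
def standard_identifiers():  # mirrors bigframes/core/identifiers.py
    i = 0
    while True:
        yield f"col_{i}"
        i += 1

gen = standard_identifiers()  # one generator, not reset between sides
l_mapping = dict(zip(["a", "b"], gen))  # {'a': 'col_0', 'b': 'col_1'}
r_mapping = dict(zip(["a", "c"], gen))  # {'a': 'col_2', 'c': 'col_3'}
assert set(l_mapping.values()).isdisjoint(r_mapping.values())
```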
@@ -87,7 +95,7 @@ def join_by_column_ordered(
         right._ordering,
         l_mapping,
         r_mapping,
-        left_order_dominates=(join.type != "right"),
+        left_order_dominates=(type != "right"),
     )
 
     # We could filter out the original join columns, but predicates/ordering
@@ -116,7 +124,8 @@ def join_by_column_ordered(
 def join_by_column_unordered(
     left: compiled.UnorderedIR,
     right: compiled.UnorderedIR,
-    join: join_defs.JoinDefinition,
+    conditions: Tuple[Tuple[str, str], ...],
+    type: Literal["inner", "outer", "left", "right", "cross"],
 ) -> compiled.UnorderedIR:
     """Join two expressions by column equality.
 
@@ -134,9 +143,9 @@ def join_by_column_unordered(
         first the coalesced join keys, then, all the left columns, and
         finally, all the right columns.
     """
-    # Value column mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result
-    l_mapping = join.get_left_mapping()
-    r_mapping = join.get_right_mapping()
+    id_generator = ids.standard_identifiers()
+    l_mapping = dict(zip(left.column_ids, id_generator))
+    r_mapping = dict(zip(right.column_ids, id_generator))
 
     left_table = left._to_ibis_expr(
         col_id_overrides=l_mapping,
@@ -146,14 +155,14 @@ def join_by_column_unordered(
     join_conditions = [
         value_to_join_key(left_table[l_mapping[left_index]])
         == value_to_join_key(right_table[r_mapping[right_index]])
-        for left_index, right_index in join.conditions
+        for left_index, right_index in conditions
     ]
 
     combined_table = ibis.join(
         left_table,
         right_table,
         predicates=join_conditions,
-        how=join.type,  # type: ignore
+        how=type,  # type: ignore
     )
     # We could filter out the original join columns, but predicates/ordering
     # might still reference them in implicit joins.
diff --git a/bigframes/core/guid.py b/bigframes/core/guid.py
index 4eb6c7a9d6..8930d0760a 100644
--- a/bigframes/core/guid.py
+++ b/bigframes/core/guid.py
@@ -18,4 +18,4 @@
 def generate_guid(prefix="col_"):
     global _GUID_COUNTER
     _GUID_COUNTER += 1
-    return prefix + str(_GUID_COUNTER)
+    return f"bfuid_{prefix}{_GUID_COUNTER}"
diff --git a/bigframes/core/identifiers.py b/bigframes/core/identifiers.py
new file mode 100644
index 0000000000..9239c41248
--- /dev/null
+++ b/bigframes/core/identifiers.py
@@ -0,0 +1,26 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Later, plan on migrating ids to use integers to reduce memory usage and allow use of bitmaps to represent column sets
+
+from typing import Generator
+
+ID_TYPE = str
+
+
+def standard_identifiers() -> Generator[ID_TYPE, None, None]:
+    i = 0
+    while True:
+        yield f"col_{i}"
+        i = i + 1
diff --git a/bigframes/core/join_def.py b/bigframes/core/join_def.py
index 4079abc8fa..5b7b7e45dd 100644
--- a/bigframes/core/join_def.py
+++ b/bigframes/core/join_def.py
@@ -15,7 +15,9 @@
 import dataclasses
 import enum
-from typing import Literal, Mapping, NamedTuple, Tuple
+from typing import Literal, NamedTuple
+
+import bigframes.core.identifiers as ids
 
 
 class JoinSide(enum.Enum):
@@ -32,42 +34,21 @@ def inverse(self) -> JoinSide:
 
 
 class JoinCondition(NamedTuple):
-    left_id: str
-    right_id: str
+    left_id: ids.ID_TYPE
+    right_id: ids.ID_TYPE
 
 
 @dataclasses.dataclass(frozen=True)
 class JoinColumnMapping:
     source_table: JoinSide
-    source_id: str
-    destination_id: str
+    source_id: ids.ID_TYPE
+    destination_id: ids.ID_TYPE
 
 
 @dataclasses.dataclass(frozen=True)
 class CoalescedColumnMapping:
     """Special column mapping used only by implicit joiner only"""
 
-    left_source_id: str
-    right_source_id: str
-    destination_id: str
-
-
-@dataclasses.dataclass(frozen=True)
-class JoinDefinition:
-    conditions: Tuple[JoinCondition, ...]
-    mappings: Tuple[JoinColumnMapping, ...]
-    type: JoinType
-
-    def get_left_mapping(self) -> Mapping[str, str]:
-        return {
-            i.source_id: i.destination_id
-            for i in self.mappings
-            if i.source_table == JoinSide.LEFT
-        }
-
-    def get_right_mapping(self) -> Mapping[str, str]:
-        return {
-            i.source_id: i.destination_id
-            for i in self.mappings
-            if i.source_table == JoinSide.RIGHT
-        }
+    left_source_id: ids.ID_TYPE
+    right_source_id: ids.ID_TYPE
+    destination_id: ids.ID_TYPE
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 27e76c7910..e90ecd06b6 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -26,7 +26,7 @@
 
 import bigframes.core.expression as ex
 import bigframes.core.guid
-from bigframes.core.join_def import JoinColumnMapping, JoinDefinition, JoinSide
+import bigframes.core.identifiers as bfet_ids
 from bigframes.core.ordering import OrderingExpression
 import bigframes.core.schema as schemata
 import bigframes.core.window_spec as window
@@ -206,7 +206,8 @@ class JoinNode(BigFrameNode):
     left_child: BigFrameNode
     right_child: BigFrameNode
-    join: JoinDefinition
+    conditions: typing.Tuple[typing.Tuple[str, str], ...]
+    type: typing.Literal["inner", "outer", "left", "right", "cross"]
 
     @property
     def row_preserving(self) -> bool:
@@ -233,19 +234,14 @@ def __hash__(self):
 
     @functools.cached_property
     def schema(self) -> schemata.ArraySchema:
-        def join_mapping_to_schema_item(mapping: JoinColumnMapping):
-            result_id = mapping.destination_id
-            result_dtype = (
-                self.left_child.schema.get_type(mapping.source_id)
-                if mapping.source_table == JoinSide.LEFT
-                else self.right_child.schema.get_type(mapping.source_id)
-            )
-            return schemata.SchemaItem(result_id, result_dtype)
-
-        items = tuple(
-            join_mapping_to_schema_item(mapping) for mapping in self.join.mappings
+        items = []
+        schema_items = itertools.chain(
+            self.left_child.schema.items, self.right_child.schema.items
         )
-        return schemata.ArraySchema(items)
+        identifiers = bfet_ids.standard_identifiers()
+        for id, item in zip(identifiers, schema_items):
+            items.append(schemata.SchemaItem(id, item.dtype))
+        return schemata.ArraySchema(tuple(items))
 
     @functools.cached_property
     def variables_introduced(self) -> int:
@@ -545,7 +541,7 @@ def non_local(self) -> bool:
 
     @property
     def schema(self) -> schemata.ArraySchema:
-        return self.child.schema.prepend(
+        return self.child.schema.append(
             schemata.SchemaItem(self.col_id, bigframes.dtypes.INT_DTYPE)
         )
 
@@ -626,6 +622,10 @@ def relation_ops_created(self) -> int:
 class SelectionNode(UnaryNode):
     input_output_pairs: typing.Tuple[typing.Tuple[str, str], ...]
 
+    def __post_init__(self):
+        for input, _ in self.input_output_pairs:
+            assert input in self.child.schema.names
+
     def __hash__(self):
         return self._node_hash
diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py
index ee27c6ff30..03e4de8993 100644
--- a/bigframes/core/schema.py
+++ b/bigframes/core/schema.py
@@ -92,3 +92,6 @@ def update_dtype(
 
     def get_type(self, id: ColumnIdentifierType):
         return self._mapping[id]
+
+    def __len__(self) -> int:
+        return len(self.items)
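Taken together, the `JoinNode.schema` and `generate_guid` changes appear to establish two disjoint id namespaces. The sketch below works through the positional renaming that `JoinNode.schema` performs and that `relational_join`'s returned mappings expose; the input schema names are illustrative, not from this patch.

```python
left_names = ["id", "val"]     # illustrative input schemas
right_names = ["id", "score"]
# JoinNode.schema chains the left items before the right items and renames
# them with standard_identifiers(), so output ids are purely positional:
output_names = [f"col_{i}" for i in range(len(left_names) + len(right_names))]
l_size = len(left_names)
l_mapping = dict(zip(left_names, output_names[:l_size]))   # {'id': 'col_0', 'val': 'col_1'}
r_mapping = dict(zip(right_names, output_names[l_size:]))  # {'id': 'col_2', 'score': 'col_3'}
# Meanwhile generate_guid() now returns "bfuid_<prefix><n>", which keeps ad hoc
# guids (e.g. coalesced merge keys) out of the positional "col_<n>" namespace.
```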