From 3d94c1437da95bd57b462ffac6f8d71dac1d531f Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 17 Jan 2024 00:24:05 +0000 Subject: [PATCH 1/3] refactor: simplify filter and join nodes --- bigframes/core/__init__.py | 33 +++++-------- bigframes/core/blocks.py | 35 +++++++++---- bigframes/core/compile/compiled.py | 32 +++--------- bigframes/core/compile/compiler.py | 18 +++---- bigframes/core/compile/row_identity.py | 41 +++++++++------- bigframes/core/compile/single_column.py | 61 ++++++++--------------- bigframes/core/indexes/index.py | 65 ++++++++++++++++++++----- bigframes/core/join_def.py | 58 ++++++++++++++++++++++ bigframes/core/joins/__init__.py | 4 +- bigframes/core/joins/name_resolution.py | 5 -- bigframes/core/nodes.py | 14 ++---- 11 files changed, 210 insertions(+), 156 deletions(-) create mode 100644 bigframes/core/join_def.py diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 8c08698b93..8c08d073d7 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -16,7 +16,7 @@ from dataclasses import dataclass import io import typing -from typing import Iterable, Literal, Sequence +from typing import Iterable, Sequence import ibis.expr.types as ibis_types import pandas @@ -24,12 +24,14 @@ import bigframes.core.compile as compiling import bigframes.core.expression as ex import bigframes.core.guid +import bigframes.core.join_def as join_def import bigframes.core.nodes as nodes from bigframes.core.ordering import OrderingColumnReference import bigframes.core.ordering as orderings import bigframes.core.utils from bigframes.core.window_spec import WindowSpec import bigframes.dtypes +import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops import bigframes.session._io.bigquery @@ -114,13 +116,15 @@ def row_count(self) -> ArrayValue: return ArrayValue(nodes.RowCountNode(child=self.node)) # Operations - def filter(self, predicate_id: str, keep_null: bool = False) -> ArrayValue: + def filter_by_id(self, predicate_id: str, keep_null: bool = False) -> ArrayValue: """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression.""" - return ArrayValue( - nodes.FilterNode( - child=self.node, predicate_id=predicate_id, keep_null=keep_null - ) - ) + predicate = ex.free_var(predicate_id) + if keep_null: + predicate = ops.fillna_op.as_expr(predicate, ex.const(True)) + return self.filter(predicate) + + def filter(self, predicate: ex.Expression): + return ArrayValue(nodes.FilterNode(child=self.node, predicate=predicate)) def order_by(self, by: Sequence[OrderingColumnReference]) -> ArrayValue: return ArrayValue(nodes.OrderByNode(child=self.node, by=tuple(by))) @@ -356,26 +360,15 @@ def unpivot( def join( self, - self_column_ids: typing.Sequence[str], other: ArrayValue, - other_column_ids: typing.Sequence[str], - *, - how: Literal[ - "inner", - "left", - "outer", - "right", - "cross", - ], + join_def: join_def.JoinDefinition, allow_row_identity_join: bool = True, ): return ArrayValue( nodes.JoinNode( left_child=self.node, right_child=other.node, - left_column_ids=tuple(self_column_ids), - right_column_ids=tuple(other_column_ids), - how=how, + join=join_def, allow_row_identity_join=allow_row_identity_join, ) ) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 8c59f8106b..0e64a73997 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -38,7 +38,7 @@ import bigframes.core.expression as ex import bigframes.core.guid as guid import bigframes.core.indexes as indexes -import bigframes.core.joins.name_resolution as join_names +import bigframes.core.join_def as join_defs import bigframes.core.ordering as ordering import bigframes.core.utils import bigframes.core.utils as utils @@ -826,7 +826,7 @@ def assign_label(self, column_id: str, new_label: Label) -> Block: def filter(self, column_id: str, keep_null: bool = False): return Block( - self._expr.filter(column_id, keep_null), + self._expr.filter_by_id(column_id, keep_null), index_columns=self.index_columns, column_labels=self.column_labels, index_labels=self.index.names, @@ -1542,19 +1542,34 @@ def merge( sort: bool, suffixes: tuple[str, str] = ("_x", "_y"), ) -> Block: - joined_expr = self.expr.join( - left_join_ids, - other.expr, - right_join_ids, - how=how, - ) - get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( - self.expr.column_ids, other.expr.column_ids + left_mappings = [ + join_defs.JoinColumnMapping( + join_defs.JoinSide.LEFT, id, guid.generate_guid() + ) + for id in self.expr.column_ids + ] + right_mappings = [ + join_defs.JoinColumnMapping( + join_defs.JoinSide.RIGHT, id, guid.generate_guid() + ) + for id in other.expr.column_ids + ] + + join_def = join_defs.JoinDefinition( + conditions=tuple( + join_defs.JoinCondition(left, right) + for left, right in zip(left_join_ids, right_join_ids) + ), + mappings=(*left_mappings, *right_mappings), + type=how, ) + joined_expr = self.expr.join(other.expr, join_def=join_def) result_columns = [] matching_join_labels = [] coalesced_ids = [] + get_column_left = join_def.get_left_mapping() + get_column_right = join_def.get_right_mapping() for left_id, right_id in zip(left_join_ids, right_join_ids): coalesced_id = guid.generate_guid() joined_expr = joined_expr.project_to_id( diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 2cab6fb95d..eaaf692a17 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -96,8 +96,8 @@ def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: ) @abc.abstractmethod - def filter(self: T, predicate_id: str, keep_null: bool = False) -> T: - """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression.""" + def filter(self: T, predicate: ex.Expression) -> T: + """Filter the table on a given expression, the predicate must be a boolean expression.""" ... @abc.abstractmethod @@ -305,17 +305,9 @@ def _to_ibis_expr( table = table.filter(ibis.random() < ibis.literal(fraction)) return table - def filter(self, predicate_id: str, keep_null: bool = False) -> UnorderedIR: - condition = typing.cast( - ibis_types.BooleanValue, self._get_ibis_column(predicate_id) - ) - if keep_null: - condition = typing.cast( - ibis_types.BooleanValue, - condition.fillna( - typing.cast(ibis_types.BooleanScalar, ibis_types.literal(True)) - ), - ) + def filter(self, predicate: ex.Expression) -> UnorderedIR: + bindings = {col: self._get_ibis_column(col) for col in self.column_ids} + condition = op_compiler.compile_expression(predicate, bindings) return self._filter(condition) def _filter(self, predicate_value: ibis_types.BooleanValue) -> UnorderedIR: @@ -1140,17 +1132,9 @@ def _to_ibis_expr( table = table.filter(ibis.random() < ibis.literal(fraction)) return table - def filter(self, predicate_id: str, keep_null: bool = False) -> OrderedIR: - condition = typing.cast( - ibis_types.BooleanValue, self._get_ibis_column(predicate_id) - ) - if keep_null: - condition = typing.cast( - ibis_types.BooleanValue, - condition.fillna( - typing.cast(ibis_types.BooleanScalar, ibis_types.literal(True)) - ), - ) + def filter(self, predicate: ex.Expression) -> OrderedIR: + bindings = {col: self._get_ibis_column(col) for col in self.column_ids} + condition = op_compiler.compile_expression(predicate, bindings) return self._filter(condition) def _filter(self, predicate_value: ibis_types.BooleanValue) -> OrderedIR: diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 18fcd73d19..c948f0bdef 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -59,22 +59,18 @@ def compile_join(node: nodes.JoinNode, ordered: bool = True): left_ordered = compile_ordered(node.left_child) right_ordered = compile_ordered(node.right_child) return bigframes.core.compile.single_column.join_by_column_ordered( - left_ordered, - node.left_column_ids, - right_ordered, - node.right_column_ids, - how=node.how, + left=left_ordered, + right=right_ordered, + join=node.join, allow_row_identity_join=node.allow_row_identity_join, ) else: left_unordered = compile_unordered(node.left_child) right_unordered = compile_unordered(node.right_child) return bigframes.core.compile.single_column.join_by_column_unordered( - left_unordered, - node.left_column_ids, - right_unordered, - node.right_column_ids, - how=node.how, + left=left_unordered, + right=right_unordered, + join=node.join, allow_row_identity_join=node.allow_row_identity_join, ) @@ -113,7 +109,7 @@ def compile_promote_offsets(node: nodes.PromoteOffsetsNode, ordered: bool = True @_compile_node.register def compile_filter(node: nodes.FilterNode, ordered: bool = True): - return compile_node(node.child, ordered).filter(node.predicate_id, node.keep_null) + return compile_node(node.child, ordered).filter(node.predicate) @_compile_node.register diff --git a/bigframes/core/compile/row_identity.py b/bigframes/core/compile/row_identity.py index 7a87a435fe..f46e2f9463 100644 --- a/bigframes/core/compile/row_identity.py +++ b/bigframes/core/compile/row_identity.py @@ -24,6 +24,7 @@ import bigframes.constants as constants import bigframes.core.compile.compiled as compiled +import bigframes.core.join_def as join_def import bigframes.core.joins as joining import bigframes.core.ordering as orderings @@ -33,11 +34,10 @@ def join_by_row_identity_unordered( left: compiled.UnorderedIR, right: compiled.UnorderedIR, - *, - how: str, + join_def: join_def.JoinDefinition, ) -> compiled.UnorderedIR: """Compute join when we are joining by row identity not a specific column.""" - if how not in SUPPORTED_ROW_IDENTITY_HOW: + if join_def.type not in SUPPORTED_ROW_IDENTITY_HOW: raise NotImplementedError( f"Only how='outer','left','inner' currently supported. {constants.FEEDBACK_LINK}" ) @@ -60,17 +60,20 @@ def join_by_row_identity_unordered( combined_predicates = [] if left_predicates or right_predicates: joined_predicates = _join_predicates( - left_predicates, right_predicates, join_type=how + left_predicates, right_predicates, join_type=join_def.type ) combined_predicates = list(joined_predicates) # builder expects mutable list - left_mask = left_relative_predicates if how in ["right", "outer"] else None - right_mask = right_relative_predicates if how in ["left", "outer"] else None + left_mask = ( + left_relative_predicates if join_def.type in ["right", "outer"] else None + ) + right_mask = ( + right_relative_predicates if join_def.type in ["left", "outer"] else None + ) # Public mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result - map_left_id, map_right_id = joining.JOIN_NAME_REMAPPER( - left.column_ids, right.column_ids - ) + map_left_id = join_def.get_left_mapping() + map_right_id = join_def.get_right_mapping() joined_columns = [ _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id[key]) for key in left.column_ids @@ -90,11 +93,10 @@ def join_by_row_identity_unordered( def join_by_row_identity_ordered( left: compiled.OrderedIR, right: compiled.OrderedIR, - *, - how: str, + join_def: join_def.JoinDefinition, ) -> compiled.OrderedIR: """Compute join when we are joining by row identity not a specific column.""" - if how not in SUPPORTED_ROW_IDENTITY_HOW: + if join_def.type not in SUPPORTED_ROW_IDENTITY_HOW: raise NotImplementedError( f"Only how='outer','left','inner' currently supported. {constants.FEEDBACK_LINK}" ) @@ -117,17 +119,20 @@ def join_by_row_identity_ordered( combined_predicates = [] if left_predicates or right_predicates: joined_predicates = _join_predicates( - left_predicates, right_predicates, join_type=how + left_predicates, right_predicates, join_type=join_def.type ) combined_predicates = list(joined_predicates) # builder expects mutable list - left_mask = left_relative_predicates if how in ["right", "outer"] else None - right_mask = right_relative_predicates if how in ["left", "outer"] else None + left_mask = ( + left_relative_predicates if join_def.type in ["right", "outer"] else None + ) + right_mask = ( + right_relative_predicates if join_def.type in ["left", "outer"] else None + ) # Public mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result - lpublicmapping, rpublicmapping = joining.JOIN_NAME_REMAPPER( - left.column_ids, right.column_ids - ) + lpublicmapping = join_def.get_left_mapping() + rpublicmapping = join_def.get_right_mapping() lhiddenmapping, rhiddenmapping = joining.JoinNameRemapper(namespace="hidden")( left._hidden_column_ids, right._hidden_column_ids ) diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py index a9088feb49..d26e71d1b4 100644 --- a/bigframes/core/compile/single_column.py +++ b/bigframes/core/compile/single_column.py @@ -16,8 +16,7 @@ from __future__ import annotations -import typing -from typing import Literal, Mapping +from typing import Mapping import ibis import ibis.expr.datatypes as ibis_dtypes @@ -25,23 +24,15 @@ import bigframes.core.compile.compiled as compiled import bigframes.core.compile.row_identity +import bigframes.core.join_def as join_defs import bigframes.core.joins as joining import bigframes.core.ordering as orderings def join_by_column_ordered( left: compiled.OrderedIR, - left_column_ids: typing.Sequence[str], right: compiled.OrderedIR, - right_column_ids: typing.Sequence[str], - *, - how: Literal[ - "inner", - "left", - "outer", - "right", - "cross", - ], + join: join_defs.JoinDefinition, allow_row_identity_join: bool = True, ) -> compiled.OrderedIR: """Join two expressions by column equality. @@ -62,7 +53,7 @@ def join_by_column_ordered( """ if ( allow_row_identity_join - and how in bigframes.core.compile.row_identity.SUPPORTED_ROW_IDENTITY_HOW + and join.type in bigframes.core.compile.row_identity.SUPPORTED_ROW_IDENTITY_HOW and left._table.equals(right._table) # Make sure we're joining on exactly the same column(s), at least with # regards to value its possible that they both have the same names but @@ -71,22 +62,18 @@ def join_by_column_ordered( left._get_ibis_column(lcol) .name("index") .equals(right._get_ibis_column(rcol).name("index")) - for lcol, rcol in zip(left_column_ids, right_column_ids) + for lcol, rcol in join.conditions ) ): return bigframes.core.compile.row_identity.join_by_row_identity_ordered( - left, right, how=how + left, right, join_def=join ) else: - # Value column mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result - l_public_mapping, r_public_mapping = joining.JOIN_NAME_REMAPPER( - left.column_ids, right.column_ids - ) l_hidden_mapping, r_hidden_mapping = joining.JoinNameRemapper( namespace="hidden" )(left._hidden_column_ids, right._hidden_column_ids) - l_mapping = {**l_public_mapping, **l_hidden_mapping} - r_mapping = {**r_public_mapping, **r_hidden_mapping} + l_mapping = {**join.get_left_mapping(), **l_hidden_mapping} + r_mapping = {**join.get_right_mapping(), **r_hidden_mapping} left_table = left._to_ibis_expr( ordering_mode="unordered", @@ -101,14 +88,14 @@ def join_by_column_ordered( join_conditions = [ value_to_join_key(left_table[l_mapping[left_index]]) == value_to_join_key(right_table[r_mapping[right_index]]) - for left_index, right_index in zip(left_column_ids, right_column_ids) + for left_index, right_index in join.conditions ] combined_table = ibis.join( left_table, right_table, predicates=join_conditions, - how=how, # type: ignore + how=join.type, # type: ignore ) # Preserve ordering accross joins. @@ -117,7 +104,7 @@ def join_by_column_ordered( right._ordering, l_mapping, r_mapping, - left_order_dominates=(how != "right"), + left_order_dominates=(join.type != "right"), ) # We could filter out the original join columns, but predicates/ordering @@ -145,17 +132,8 @@ def join_by_column_ordered( def join_by_column_unordered( left: compiled.UnorderedIR, - left_column_ids: typing.Sequence[str], right: compiled.UnorderedIR, - right_column_ids: typing.Sequence[str], - *, - how: Literal[ - "inner", - "left", - "outer", - "right", - "cross", - ], + join: join_defs.JoinDefinition, allow_row_identity_join: bool = True, ) -> compiled.UnorderedIR: """Join two expressions by column equality. @@ -176,7 +154,7 @@ def join_by_column_unordered( """ if ( allow_row_identity_join - and how in bigframes.core.compile.row_identity.SUPPORTED_ROW_IDENTITY_HOW + and join.type in bigframes.core.compile.row_identity.SUPPORTED_ROW_IDENTITY_HOW and left._table.equals(right._table) # Make sure we're joining on exactly the same column(s), at least with # regards to value its possible that they both have the same names but @@ -185,17 +163,16 @@ def join_by_column_unordered( left._get_ibis_column(lcol) .name("index") .equals(right._get_ibis_column(rcol).name("index")) - for lcol, rcol in zip(left_column_ids, right_column_ids) + for lcol, rcol in join.conditions ) ): return bigframes.core.compile.row_identity.join_by_row_identity_unordered( - left, right, how=how + left, right, join_def=join ) else: # Value column mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result - l_mapping, r_mapping = joining.JOIN_NAME_REMAPPER( - left.column_ids, right.column_ids - ) + l_mapping = join.get_left_mapping() + r_mapping = join.get_right_mapping() left_table = left._to_ibis_expr( col_id_overrides=l_mapping, ) @@ -205,14 +182,14 @@ def join_by_column_unordered( join_conditions = [ value_to_join_key(left_table[l_mapping[left_index]]) == value_to_join_key(right_table[r_mapping[right_index]]) - for left_index, right_index in zip(left_column_ids, right_column_ids) + for left_index, right_index in join.conditions ] combined_table = ibis.join( left_table, right_table, predicates=join_conditions, - how=how, # type: ignore + how=join.type, # type: ignore ) # We could filter out the original join columns, but predicates/ordering # might still reference them in implicit joins. diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index 4ec11cb163..f454ba4452 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -27,7 +27,8 @@ import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.expression as ex -import bigframes.core.joins as joining +import bigframes.core.guid +import bigframes.core.join_def as join_defs import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.dtypes @@ -480,16 +481,35 @@ def join_mono_indexed( ) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: left_expr = left._block.expr right_expr = right._block.expr - get_column_left, get_column_right = joining.JOIN_NAME_REMAPPER( - left_expr.column_ids, right_expr.column_ids + left_mappings = [ + join_defs.JoinColumnMapping( + join_defs.JoinSide.LEFT, id, bigframes.core.guid.generate_guid() + ) + for id in left_expr.column_ids + ] + right_mappings = [ + join_defs.JoinColumnMapping( + join_defs.JoinSide.RIGHT, id, bigframes.core.guid.generate_guid() + ) + for id in right_expr.column_ids + ] + + join_def = join_defs.JoinDefinition( + conditions=( + join_defs.JoinCondition( + left._block.index_columns[0], right._block.index_columns[0] + ), + ), + mappings=(*left_mappings, *right_mappings), + type=how, ) - combined_expr = left._block.expr.join( - left._block.index_columns, - right._block.expr, - right._block.index_columns, - how=how, + combined_expr = left_expr.join( + right_expr, + join_def=join_def, allow_row_identity_join=(not block_identity_join), ) + get_column_left = join_def.get_left_mapping() + get_column_right = join_def.get_right_mapping() # Drop original indices from each side. and used the coalesced combination generated by the join. left_index = get_column_left[left._block.index_columns[0]] right_index = get_column_right[right._block.index_columns[0]] @@ -538,19 +558,38 @@ def join_multi_indexed( left_expr = left._block.expr right_expr = right._block.expr - get_column_left, get_column_right = joining.JOIN_NAME_REMAPPER( - left_expr.column_ids, right_expr.column_ids + + left_mappings = [ + join_defs.JoinColumnMapping( + join_defs.JoinSide.LEFT, id, bigframes.core.guid.generate_guid() + ) + for id in left_expr.column_ids + ] + right_mappings = [ + join_defs.JoinColumnMapping( + join_defs.JoinSide.RIGHT, id, bigframes.core.guid.generate_guid() + ) + for id in right_expr.column_ids + ] + + join_def = join_defs.JoinDefinition( + conditions=tuple( + join_defs.JoinCondition(left, right) + for left, right in zip(left_join_ids, right_join_ids) + ), + mappings=(*left_mappings, *right_mappings), + type=how, ) combined_expr = left_expr.join( - left_join_ids, right_expr, - right_join_ids, - how=how, + join_def=join_def, # If we're only joining on a subset of the index columns, we need to # perform a true join. allow_row_identity_join=(names_fully_match and not block_identity_join), ) + get_column_left = join_def.get_left_mapping() + get_column_right = join_def.get_right_mapping() left_ids_post_join = [get_column_left[id] for id in left_join_ids] right_ids_post_join = [get_column_right[id] for id in right_join_ids] # Drop original indices from each side. and used the coalesced combination generated by the join. diff --git a/bigframes/core/join_def.py b/bigframes/core/join_def.py new file mode 100644 index 0000000000..fd85677613 --- /dev/null +++ b/bigframes/core/join_def.py @@ -0,0 +1,58 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import dataclasses +import enum +from typing import Literal, Mapping, NamedTuple, Tuple + + +class JoinSide(enum.Enum): + LEFT = 0 + RIGHT = 1 + + +JoinType = Literal["inner", "outer", "left", "right", "cross"] + + +class JoinCondition(NamedTuple): + left_id: str + right_id: str + + +class JoinColumnMapping(NamedTuple): + source_table: JoinSide + source_id: str + destination_id: str + + +@dataclasses.dataclass(frozen=True) +class JoinDefinition: + conditions: Tuple[JoinCondition, ...] + mappings: Tuple[JoinColumnMapping, ...] + type: JoinType + + def get_left_mapping(self) -> Mapping[str, str]: + return { + i.source_id: i.destination_id + for i in self.mappings + if i.source_table == JoinSide.LEFT + } + + def get_right_mapping(self) -> Mapping[str, str]: + return { + i.source_id: i.destination_id + for i in self.mappings + if i.source_table == JoinSide.RIGHT + } diff --git a/bigframes/core/joins/__init__.py b/bigframes/core/joins/__init__.py index 5d407ec22b..415ee4e49d 100644 --- a/bigframes/core/joins/__init__.py +++ b/bigframes/core/joins/__init__.py @@ -15,6 +15,6 @@ """Helpers to join ArrayValue objects.""" from bigframes.core.joins.merge import merge -from bigframes.core.joins.name_resolution import JOIN_NAME_REMAPPER, JoinNameRemapper +from bigframes.core.joins.name_resolution import JoinNameRemapper -__all__ = ("merge", "JoinNameRemapper", "JOIN_NAME_REMAPPER") +__all__ = ("merge", "JoinNameRemapper") diff --git a/bigframes/core/joins/name_resolution.py b/bigframes/core/joins/name_resolution.py index df946b3a59..f648d28ad2 100644 --- a/bigframes/core/joins/name_resolution.py +++ b/bigframes/core/joins/name_resolution.py @@ -39,8 +39,3 @@ def __call__( col: f"{self._namespace}_r_{i}" for i, col in enumerate(right_column_ids) } return new_left_ids, new_right_ids - - -# Defines how column ids are remapped, regardless of join strategy or ordering mode -# Use this remapper for all value column remappings. -JOIN_NAME_REMAPPER = JoinNameRemapper("bfjoin") diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index d30db9a7f7..bf261b62f4 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -23,6 +23,7 @@ import bigframes.core.expression as ex import bigframes.core.guid +from bigframes.core.join_def import JoinDefinition from bigframes.core.ordering import OrderingColumnReference import bigframes.core.window_spec as window import bigframes.dtypes @@ -87,15 +88,7 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: class JoinNode(BigFrameNode): left_child: BigFrameNode right_child: BigFrameNode - left_column_ids: typing.Tuple[str, ...] - right_column_ids: typing.Tuple[str, ...] - how: typing.Literal[ - "inner", - "left", - "outer", - "right", - "cross", - ] + join: JoinDefinition allow_row_identity_join: bool = True @property @@ -155,8 +148,7 @@ def __hash__(self): @dataclass(frozen=True) class FilterNode(UnaryNode): - predicate_id: str - keep_null: bool = False + predicate: ex.Expression def __hash__(self): return self._node_hash From 3482e1af76cd6cb43c0c9d7a6c782a4b9febd1c8 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 17 Jan 2024 20:45:45 +0000 Subject: [PATCH 2/3] force kwarg for JoinColumnMapping constructor --- bigframes/core/blocks.py | 8 ++++++-- bigframes/core/indexes/index.py | 16 ++++++++++++---- bigframes/core/join_def.py | 3 ++- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 0e64a73997..3ec0419c6d 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1544,13 +1544,17 @@ def merge( ) -> Block: left_mappings = [ join_defs.JoinColumnMapping( - join_defs.JoinSide.LEFT, id, guid.generate_guid() + source_table=join_defs.JoinSide.LEFT, + source_id=id, + destination_id=guid.generate_guid(), ) for id in self.expr.column_ids ] right_mappings = [ join_defs.JoinColumnMapping( - join_defs.JoinSide.RIGHT, id, guid.generate_guid() + source_table=join_defs.JoinSide.RIGHT, + source_id=id, + destination_id=guid.generate_guid(), ) for id in other.expr.column_ids ] diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index f454ba4452..8b3613d82c 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -483,13 +483,17 @@ def join_mono_indexed( right_expr = right._block.expr left_mappings = [ join_defs.JoinColumnMapping( - join_defs.JoinSide.LEFT, id, bigframes.core.guid.generate_guid() + source_table=join_defs.JoinSide.LEFT, + source_id=id, + destination_id=bigframes.core.guid.generate_guid(), ) for id in left_expr.column_ids ] right_mappings = [ join_defs.JoinColumnMapping( - join_defs.JoinSide.RIGHT, id, bigframes.core.guid.generate_guid() + source_table=join_defs.JoinSide.RIGHT, + source_id=id, + destination_id=bigframes.core.guid.generate_guid(), ) for id in right_expr.column_ids ] @@ -561,13 +565,17 @@ def join_multi_indexed( left_mappings = [ join_defs.JoinColumnMapping( - join_defs.JoinSide.LEFT, id, bigframes.core.guid.generate_guid() + source_table=join_defs.JoinSide.LEFT, + source_id=id, + destination_id=bigframes.core.guid.generate_guid(), ) for id in left_expr.column_ids ] right_mappings = [ join_defs.JoinColumnMapping( - join_defs.JoinSide.RIGHT, id, bigframes.core.guid.generate_guid() + source_table=join_defs.JoinSide.RIGHT, + source_id=id, + destination_id=bigframes.core.guid.generate_guid(), ) for id in right_expr.column_ids ] diff --git a/bigframes/core/join_def.py b/bigframes/core/join_def.py index fd85677613..aa060ddd1a 100644 --- a/bigframes/core/join_def.py +++ b/bigframes/core/join_def.py @@ -31,7 +31,8 @@ class JoinCondition(NamedTuple): right_id: str -class JoinColumnMapping(NamedTuple): +@dataclasses.dataclass(frozen=True, kw_only=True) +class JoinColumnMapping: source_table: JoinSide source_id: str destination_id: str From 514e5ab1f0450d4493eaaed94053cd8727f7bd8a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 17 Jan 2024 20:55:21 +0000 Subject: [PATCH 3/3] remove kw_only dataclass arg not compatible with old python --- bigframes/core/join_def.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/join_def.py b/bigframes/core/join_def.py index aa060ddd1a..4646a0d6ae 100644 --- a/bigframes/core/join_def.py +++ b/bigframes/core/join_def.py @@ -31,7 +31,7 @@ class JoinCondition(NamedTuple): right_id: str -@dataclasses.dataclass(frozen=True, kw_only=True) +@dataclasses.dataclass(frozen=True) class JoinColumnMapping: source_table: JoinSide source_id: str