Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

refactor: Simplify legacy aligner #1217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
Loading
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 107 additions & 16 deletions 123 bigframes/core/rewrite/implicit_align.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import bigframes.core.identifiers
import bigframes.core.join_def
import bigframes.core.nodes
import bigframes.core.ordering
import bigframes.core.rewrite.predicates
import bigframes.core.window_spec
import bigframes.operations.aggregations

Expand All @@ -30,12 +32,23 @@
bigframes.core.nodes.WindowOpNode,
bigframes.core.nodes.PromoteOffsetsNode,
)
# Node types that express row ordering. pull_up_selection re-parents these onto
# the rewritten child and remaps their refs, leaving the child's selection as-is.
ORDERING_NODES = (
bigframes.core.nodes.OrderByNode,
bigframes.core.nodes.ReversedNode,
)
# Combination of selects and additive nodes can be merged as an explicit keyless "row join"
ALIGNABLE_NODES = (
*ADDITIVE_NODES,
bigframes.core.nodes.SelectionNode,
)

# Tuple of predicate expressions, e.g. the conjuncts collected by pull_up_filters.
Predicates = Tuple[bigframes.core.expression.Expression, ...]
# Tuple of (column reference, output column id) pairs.
Selection = Tuple[
Tuple[bigframes.core.expression.DerefOp, bigframes.core.identifiers.ColumnId],
...,
]
# Tuple of ordering expressions, e.g. the ordering collected by pull_up_order.
Ordering = Tuple[bigframes.core.ordering.OrderingExpression, ...]


@dataclasses.dataclass(frozen=True)
class ExpressionSpec:
Expand Down Expand Up @@ -68,6 +81,15 @@ def get_expression_spec(
expression = expression.bind_refs(
proj_mappings, allow_partial_bindings=True
)
elif isinstance(
curr_node,
(
bigframes.core.nodes.FilterNode,
bigframes.core.nodes.OrderByNode,
bigframes.core.nodes.ReversedNode,
),
):
pass
elif isinstance(
curr_node,
(
Expand Down Expand Up @@ -116,24 +138,29 @@ def try_row_join(
) # Rename only right vars to avoid collisions with left vars
combined_selection = (*l_selection, *r_selection)

def _linearize_trees(
base_tree: bigframes.core.nodes.BigFrameNode,
append_tree: bigframes.core.nodes.BigFrameNode,
) -> bigframes.core.nodes.BigFrameNode:
"""Linearize two divergent tree who only diverge through different additive nodes."""
# base case: append tree does not have any divergent nodes to linearize
if append_tree == divergent_node:
return base_tree
else:
assert isinstance(append_tree, ADDITIVE_NODES)
return append_tree.replace_child(
_linearize_trees(base_tree, append_tree.child)
)

merged_node = _linearize_trees(l_node, r_node)
merged_node = linearize_trees(l_node, r_node, divergent_node)
return bigframes.core.nodes.SelectionNode(merged_node, combined_selection)


def linearize_trees(
    base_tree: bigframes.core.nodes.BigFrameNode,
    append_tree: bigframes.core.nodes.BigFrameNode,
    divergent_node: Optional[bigframes.core.nodes.BigFrameNode] = None,
) -> bigframes.core.nodes.BigFrameNode:
    """Linearize two divergent trees that only diverge through additive nodes.

    The nodes of ``append_tree`` that sit above ``divergent_node`` are
    re-parented, in their existing order, on top of ``base_tree``.

    Args:
        base_tree: Tree that ends up at the bottom of the merged chain.
        append_tree: Tree whose nodes above ``divergent_node`` are stacked
            onto ``base_tree``.
        divergent_node: Node at which the walk down ``append_tree`` stops.
            When omitted, it is looked up with ``first_shared_descendent``
            over ``ADDITIVE_NODES``.

    Returns:
        ``base_tree`` itself when ``append_tree`` equals ``divergent_node``;
        otherwise ``append_tree`` with its child chain rebuilt on top of
        ``base_tree``.

    Raises:
        AssertionError: If a node of ``append_tree`` above ``divergent_node``
            is not one of ``ADDITIVE_NODES``.
    """
    if divergent_node is None:
        divergent_node = first_shared_descendent(base_tree, append_tree, ADDITIVE_NODES)

    # Base case: append tree does not have any divergent nodes to linearize.
    if append_tree == divergent_node:
        return base_tree

    # Only additive nodes can be stacked onto another tree without conflict.
    assert isinstance(append_tree, ADDITIVE_NODES)
    # Pass the resolved divergent node down so it is not looked up again.
    return append_tree.replace_child(
        linearize_trees(base_tree, append_tree.child, divergent_node)
    )


def pull_up_selection(
node: bigframes.core.nodes.BigFrameNode,
stop: bigframes.core.nodes.BigFrameNode,
Expand Down Expand Up @@ -161,7 +188,7 @@ def pull_up_selection(
(bigframes.core.expression.DerefOp(field.id), field.id)
for field in node.fields
)
assert isinstance(node, (bigframes.core.nodes.SelectionNode, *ADDITIVE_NODES))
assert isinstance(node, bigframes.core.nodes.UnaryNode)
child_node, child_selections = pull_up_selection(
node.child, stop, rename_vars=rename_vars
)
Expand All @@ -187,6 +214,8 @@ def pull_up_selection(
)
new_selection = (*child_selections, *added_selections)
return new_node, new_selection
elif isinstance(node, (*ORDERING_NODES, bigframes.core.nodes.FilterNode)):
return node.replace_child(child_node).remap_refs(mapping), child_selections
elif isinstance(node, bigframes.core.nodes.SelectionNode):
new_selection = tuple(
(
Expand All @@ -199,6 +228,68 @@ def pull_up_selection(
raise ValueError(f"Couldn't pull up select from node: {node}")


def pull_up_filters(
    root: bigframes.core.nodes.BigFrameNode,
    stop: bigframes.core.nodes.BigFrameNode,
) -> Tuple[bigframes.core.nodes.BigFrameNode, Predicates]:
    """Strip filter nodes from the tree section between ``root`` and ``stop``.

    Args:
        root: Top node of the section to rewrite.
        stop: Node at which the walk ends; it is returned untouched.

    Returns:
        A pair ``(tree, predicates)``: the rewritten section with its filter
        nodes removed, and the conjuncts of the removed filter predicates,
        deepest filter first.

    Raises:
        ValueError: If a node in the section is neither a filter node nor one
            of ``ADDITIVE_NODES``.
    """
    if root == stop:
        return root, ()

    if isinstance(root, bigframes.core.nodes.FilterNode):
        # Break this filter's predicate into its conjuncts, then drop the
        # node by returning the rewritten child in its place.
        own_predicates = bigframes.core.rewrite.predicates.decompose_conjunction(
            root.predicate
        )
        subtree, inherited = pull_up_filters(root.child, stop)
        return subtree, (*inherited, *own_predicates)

    if not isinstance(root, ADDITIVE_NODES):
        raise ValueError(f"Unexpected node: {root}")

    # Additive node: rebuild it over the filter-free child and mask it with
    # the predicates that were removed from beneath it.
    subtree, inherited = pull_up_filters(root.child, stop)
    rebuilt = root.replace_child(subtree)
    masked = bigframes.core.rewrite.predicates.mask_node(rebuilt, inherited)
    return masked, inherited


def pull_up_order(
    root: bigframes.core.nodes.BigFrameNode,
    stop: bigframes.core.nodes.BigFrameNode,
) -> Tuple[bigframes.core.nodes.BigFrameNode, Ordering]:
    """Pull order-by nodes out of a tree section.

    Args:
        root: Top node of the section to rewrite.
        stop: Node at which the walk ends; it is returned untouched.

    Returns:
        A pair ``(tree, ordering)``: the rewritten section with its
        ``OrderByNode`` instances removed, and the collected ordering
        expressions, with those of outer order-by nodes first.

    Raises:
        ValueError: If a node in the section is not a ``ReversedNode``,
            ``OrderByNode`` or ``ProjectionNode``.
    """
    if root == stop:
        return root, ()

    if isinstance(root, bigframes.core.nodes.ReversedNode):
        child_result, child_order = pull_up_order(root.child, stop)
        # NOTE(review): the ReversedNode stays in the tree while the collected
        # ordering is also flipped; confirm the reversal is not applied twice
        # by callers.
        return root.replace_child(child_result), tuple(
            order_part.with_reverse() for order_part in child_order
        )

    if isinstance(root, bigframes.core.nodes.OrderByNode):
        child_result, child_order = pull_up_order(root.child, stop)
        # Drop the node; its keys go ahead of those collected from below.
        return child_result, (*root.by, *child_order)

    if isinstance(root, bigframes.core.nodes.ProjectionNode):
        child_result, child_order = pull_up_order(root.child, stop)
        return root.replace_child(child_result), child_order

    raise ValueError(f"Unexpected node: {root}")


def pull_up_reverse(
    root: bigframes.core.nodes.BigFrameNode,
    stop: bigframes.core.nodes.BigFrameNode,
) -> Tuple[bigframes.core.nodes.BigFrameNode, bool]:
    """Pull reversal nodes out of a tree section.

    Args:
        root: Top node of the section to rewrite.
        stop: Node at which the walk ends; it is returned untouched.

    Returns:
        A pair ``(tree, reversed)``: the rewritten section with its
        ``ReversedNode`` instances removed, and ``True`` when an odd number
        of them was removed (pairs of reversals cancel out).

    Raises:
        ValueError: If a node in the section is neither a ``ReversedNode``
            nor one of ``ADDITIVE_NODES``.
    """
    if root == stop:
        return root, False

    if isinstance(root, bigframes.core.nodes.ReversedNode):
        child_result, child_reverse = pull_up_reverse(root.child, stop)
        # Drop the node and toggle the parity carried up from below.
        return child_result, not child_reverse

    if isinstance(root, ADDITIVE_NODES):
        child_result, child_reverse = pull_up_reverse(root.child, stop)
        return root.replace_child(child_result), child_reverse

    raise ValueError(f"Unexpected node: {root}")


## Traversal helpers
def first_shared_descendent(
left: bigframes.core.nodes.BigFrameNode,
Expand Down
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.