Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

perf: Improve isin performance #1203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 29, 2025
12 changes: 12 additions & 0 deletions 12 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,18 @@ def project_window_op(
output_name,
)

def isin(
    self, other: ArrayValue, lcol: str, rcol: str
) -> typing.Tuple[ArrayValue, str]:
    """Annotate this ArrayValue with a membership-test column.

    Args:
        other: ArrayValue providing the values to test membership against.
        lcol: Column id in `self` whose values are tested.
        rcol: Column id in `other` holding the candidate values.

    Returns:
        Tuple of the extended ArrayValue and the name of the new boolean
        indicator column (True where `lcol` matched some `rcol` value).
    """
    indicator_id = ids.ColumnId.unique()
    in_node = nodes.InNode(
        self.node,
        other.node,
        ex.deref(lcol),
        ex.deref(rcol),
        indicator_col=indicator_id,
    )
    return ArrayValue(in_node), indicator_id.name

def relational_join(
self,
other: ArrayValue,
Expand Down
16 changes: 4 additions & 12 deletions 16 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2036,23 +2036,15 @@ def isin(self, other: Block):
return block

def _isin_inner(self: Block, col: str, unique_values: core.ArrayValue) -> Block:
    """Replace column `col` with a boolean column flagging membership in `unique_values`."""
    # Delegate to the dedicated isin node; `matches` is the new indicator column id.
    expr, matches = self._expr.isin(unique_values, col, unique_values.column_ids[0])

    # Keep every value column as-is, except swap the tested column for the indicator.
    new_value_cols = tuple(
        matches if val_col == col else val_col for val_col in self.value_columns
    )
    expr = expr.select_columns((*self.index_columns, *new_value_cols))
    return Block(
        expr,
        index_columns=self.index_columns,
        column_labels=self.column_labels,
        index_labels=self._index_labels,
    )
Expand Down
12 changes: 12 additions & 0 deletions 12 bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.explode
import bigframes.core.compile.ibis_types
import bigframes.core.compile.isin
import bigframes.core.compile.scalar_op_compiler
import bigframes.core.compile.scalar_op_compiler as compile_scalar
import bigframes.core.compile.schema_translator
Expand Down Expand Up @@ -128,6 +129,17 @@ def compile_join(self, node: nodes.JoinNode):
conditions=condition_pairs,
)

@_compile_node.register
def compile_isin(self, node: nodes.InNode):
    """Compile an InNode: left rows pass through unchanged, plus a boolean
    indicator column marking which left keys appear among the right keys."""
    left_unordered = self.compile_node(node.left_child)
    right_unordered = self.compile_node(node.right_child)
    return bigframes.core.compile.isin.isin_unordered(
        left=left_unordered,
        right=right_unordered,
        indicator_col=node.indicator_col.sql,
        conditions=(node.left_col.id.sql, node.right_col.id.sql),
    )

@_compile_node.register
def compile_fromrange(self, node: nodes.FromRangeNode):
# Both start and end are single elements and do not inherently have an order
Expand Down
71 changes: 71 additions & 0 deletions 71 bigframes/core/compile/isin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers to join ArrayValue objects."""

from __future__ import annotations

import itertools
from typing import Tuple

import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.types as ibis_types

import bigframes.core.compile.compiled as compiled


def isin_unordered(
    left: compiled.UnorderedIR,
    right: compiled.UnorderedIR,
    indicator_col: str,
    conditions: Tuple[str, str],
) -> compiled.UnorderedIR:
    """Annotate `left` with a boolean column testing membership in `right`.

    Arguments:
        left: Expression whose rows are preserved in the output.
        right: Expression supplying the set of candidate values.
        indicator_col: Name for the new boolean indicator column.
        conditions: Pair of (left column id, right column id) to compare.
    Returns:
        `left` extended with the membership indicator column.
    """
    left_table = left._to_ibis_expr()
    right_table = right._to_ibis_expr()

    # Keys are normalized (see value_to_join_key) so NULLs match NULLs, as pandas does.
    left_key = value_to_join_key(left_table[conditions[0]])
    right_keys = value_to_join_key(right_table[conditions[1]])
    membership = left_key.isin(right_keys).name(indicator_col)

    existing_cols = tuple(left_table[col.get_name()] for col in left.columns)
    return compiled.UnorderedIR(
        left_table,
        columns=existing_cols + (membership,),
    )


def value_to_join_key(value: ibis_types.Value):
    """Normalize a value for key comparison: cast to string and replace NULLs
    with a sentinel, since SQL never matches NULL keys together but pandas does."""
    key = value if value.type().is_string() else value.cast(ibis_dtypes.str)
    sentinel = ibis_types.literal("$NULL_SENTINEL$")
    # Newer ibis exposes fill_null; older versions only have fillna.
    if hasattr(key, "fill_null"):
        return key.fill_null(sentinel)
    return key.fillna(sentinel)
161 changes: 158 additions & 3 deletions 161 bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ def explicitly_ordered(self) -> bool:
"""
...

@functools.cached_property
def height(self) -> int:
    """Longest path from this node down to a leaf; leaves have height 0."""
    child_heights = [child.height for child in self.child_nodes]
    if not child_heights:
        return 0
    return 1 + max(child_heights)

@functools.cached_property
def total_variables(self) -> int:
return self.variables_introduced + sum(
Expand Down Expand Up @@ -284,6 +290,34 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
return self.transform_children(lambda x: x.prune(used_cols))


class AdditiveNode:
    """Definition of additive - if you drop added_fields, you end up with the descendent.

    .. code-block:: text

        AdditiveNode (fields: a, b, c; added_fields: c)
        |
        | additive_base
        V
        BigFrameNode (fields: a, b)

    """

    @property
    @abc.abstractmethod
    def added_fields(self) -> Tuple[Field, ...]:
        """Fields introduced by this node on top of its additive base."""
        ...

    @property
    @abc.abstractmethod
    def additive_base(self) -> BigFrameNode:
        """The node whose fields this node extends."""
        ...

    # Fixed: parameter was declared as a bare name `BigFrameNode` (shadowing
    # the type, with no annotation); implementations take `node: BigFrameNode`.
    @abc.abstractmethod
    def replace_additive_base(self, node: BigFrameNode) -> BigFrameNode:
        """Return a copy of this node with its additive base swapped for `node`."""
        ...


@dataclasses.dataclass(frozen=True, eq=False)
class UnaryNode(BigFrameNode):
child: BigFrameNode
Expand Down Expand Up @@ -381,6 +415,106 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
return self


@dataclasses.dataclass(frozen=True, eq=False)
class InNode(BigFrameNode, AdditiveNode):
    """
    Special join that returns only left-side rows, extended with one boolean
    column indicating whether each row has a match on the right side.

    Modelled separately from JoinNode because this operation preserves the
    row identity of the left child.
    """

    left_child: BigFrameNode
    right_child: BigFrameNode
    left_col: ex.DerefOp
    right_col: ex.DerefOp
    indicator_col: bfet_ids.ColumnId

    def _validate(self):
        # Column ids must not collide across the two inputs.
        assert not (
            set(self.left_child.ids) & set(self.right_child.ids)
        ), "Join ids collide"

    @property
    def row_preserving(self) -> bool:
        # NOTE(review): left rows appear to be preserved here — confirm why
        # this flag is False.
        return False

    @property
    def non_local(self) -> bool:
        return True

    @property
    def child_nodes(self) -> typing.Sequence[BigFrameNode]:
        return (self.left_child, self.right_child)

    @property
    def order_ambiguous(self) -> bool:
        return False

    @property
    def explicitly_ordered(self) -> bool:
        # Output order is always the left child's order.
        return True

    @property
    def added_fields(self) -> Tuple[Field, ...]:
        return (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE),)

    @property
    def fields(self) -> Iterable[Field]:
        # All left-side fields, then the indicator column.
        return itertools.chain(self.left_child.fields, self.added_fields)

    @functools.cached_property
    def variables_introduced(self) -> int:
        """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
        return 1

    @property
    def joins(self) -> bool:
        return True

    @property
    def row_count(self) -> Optional[int]:
        # Row count is exactly the left side's, regardless of matches.
        return self.left_child.row_count

    @property
    def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
        return (self.indicator_col,)

    @property
    def additive_base(self) -> BigFrameNode:
        return self.left_child

    def replace_additive_base(self, node: BigFrameNode):
        return dataclasses.replace(self, left_child=node)

    def transform_children(
        self, t: Callable[[BigFrameNode], BigFrameNode]
    ) -> BigFrameNode:
        new_left = t(self.left_child)
        new_right = t(self.right_child)
        transformed = dataclasses.replace(
            self, left_child=new_left, right_child=new_right
        )
        if self == transformed:
            # Reusing the existing object speeds up eq and saves a small amount of memory.
            return self
        return transformed

    def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
        # No pruning performed for this node type.
        return self

    def remap_vars(
        self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]
    ) -> BigFrameNode:
        return dataclasses.replace(
            self, indicator_col=mappings.get(self.indicator_col, self.indicator_col)
        )

    def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
        new_left_col = self.left_col.remap_column_refs(
            mappings, allow_partial_bindings=True
        )
        new_right_col = self.right_col.remap_column_refs(
            mappings, allow_partial_bindings=True
        )
        return dataclasses.replace(self, left_col=new_left_col, right_col=new_right_col)  # type: ignore


@dataclasses.dataclass(frozen=True, eq=False)
class JoinNode(BigFrameNode):
left_child: BigFrameNode
Expand Down Expand Up @@ -926,7 +1060,7 @@ class CachedTableNode(ReadTableNode):

# Unary nodes
@dataclasses.dataclass(frozen=True, eq=False)
class PromoteOffsetsNode(UnaryNode):
class PromoteOffsetsNode(UnaryNode, AdditiveNode):
col_id: bigframes.core.identifiers.ColumnId

@property
Expand Down Expand Up @@ -959,6 +1093,13 @@ def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
def added_fields(self) -> Tuple[Field, ...]:
return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),)

@property
def additive_base(self) -> BigFrameNode:
    # AdditiveNode contract: the node this one adds its offsets column onto.
    return self.child

def replace_additive_base(self, node: BigFrameNode) -> BigFrameNode:
    # AdditiveNode contract: rebuild this node on top of a new base.
    return dataclasses.replace(self, child=node)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
if self.col_id not in used_cols:
return self.child.prune(used_cols)
Expand Down Expand Up @@ -1171,7 +1312,7 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):


@dataclasses.dataclass(frozen=True, eq=False)
class ProjectionNode(UnaryNode):
class ProjectionNode(UnaryNode, AdditiveNode):
"""Assigns new variables (without modifying existing ones)"""

assignments: typing.Tuple[
Expand Down Expand Up @@ -1212,6 +1353,13 @@ def row_count(self) -> Optional[int]:
def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
return tuple(id for _, id in self.assignments)

@property
def additive_base(self) -> BigFrameNode:
    # AdditiveNode contract: the node this projection adds its assignments onto.
    return self.child

def replace_additive_base(self, node: BigFrameNode) -> BigFrameNode:
    # AdditiveNode contract: rebuild this node on top of a new base.
    return dataclasses.replace(self, child=node)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
pruned_assignments = tuple(i for i in self.assignments if i[1] in used_cols)
if len(pruned_assignments) == 0:
Expand Down Expand Up @@ -1378,7 +1526,7 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):


@dataclasses.dataclass(frozen=True, eq=False)
class WindowOpNode(UnaryNode):
class WindowOpNode(UnaryNode, AdditiveNode):
expression: ex.Aggregation
window_spec: window.WindowSpec
output_name: bigframes.core.identifiers.ColumnId
Expand Down Expand Up @@ -1438,6 +1586,13 @@ def inherits_order(self) -> bool:
) and self.expression.op.implicitly_inherits_order
return op_inherits_order or self.window_spec.row_bounded

@property
def additive_base(self) -> BigFrameNode:
    # AdditiveNode contract: the node this window op adds its output column onto.
    return self.child

def replace_additive_base(self, node: BigFrameNode) -> BigFrameNode:
    # AdditiveNode contract: rebuild this node on top of a new base.
    return dataclasses.replace(self, child=node)

def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
if self.output_name not in used_cols:
return self.child.prune(used_cols)
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.