refactor: Add function to make all column ids in a tree unique and sequential #1094

Merged: 16 commits, Nov 5, 2024.

Summary: this PR replaces the per-call col_id_overrides plumbing with new Compiler entry points (compile_sql, compile_peek_sql, compile_raw) that can remap every column id in the node tree to unique, sequential ids and then restore the requested output names through a final SelectionNode.
bigframes/core/__init__.py (7 additions, 1 deletion)
@@ -268,7 +268,13 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
     def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
         """Append together multiple ArrayValue objects."""
         return ArrayValue(
-            nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
+            nodes.ConcatNode(
+                children=tuple([self.node, *[val.node for val in other]]),
+                output_ids=tuple(
+                    ids.ColumnId(bigframes.core.guid.generate_guid())
+                    for id in self.column_ids
+                ),
+            )
         )

     def compute_values(self, assignments: Sequence[ex.Expression]):
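Why ConcatNode now takes explicit output_ids: a concat result cannot safely reuse any child's column ids, since the same id may appear in several of the concatenated subtrees, so the node mints one fresh, globally unique id per output column. A minimal sketch of that minting step, using uuid as a stand-in for bigframes.core.guid.generate_guid (whose actual format is not shown in this diff):

    import uuid
    from dataclasses import dataclass


    @dataclass(frozen=True)
    class ColumnId:
        """Simplified stand-in for bigframes.core.identifiers.ColumnId."""

        name: str


    def generate_guid(prefix: str = "bfcol_") -> str:
        # Hypothetical generator: any scheme works as long as ids never collide.
        return prefix + uuid.uuid4().hex[:8]


    def concat_output_ids(column_ids):
        # One fresh id per output column, independent of the children's ids.
        return tuple(ColumnId(generate_guid()) for _ in column_ids)


    print(concat_output_ids(["a", "b"]))  # two ColumnIds with fresh, unique names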
bigframes/core/blocks.py (1 addition, 1 deletion)
@@ -3139,7 +3139,7 @@ def _pd_index_to_array_value(
     rows = []
     labels_as_tuples = utils.index_as_tuples(index)
     for row_offset in range(len(index)):
-        id_gen = bigframes.core.identifiers.standard_identifiers()
+        id_gen = bigframes.core.identifiers.standard_id_strings()
         row_label = labels_as_tuples[row_offset]
         row_label = (row_label,) if not isinstance(row_label, tuple) else row_label
         row = {}
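The rename from standard_identifiers to standard_id_strings signals that this call site consumes plain strings rather than ColumnId objects. A sketch of a sequential string-id generator of that shape, assuming a "col_N" format (the real helper's prefix is not shown in this diff):

    import itertools


    def standard_id_strings(prefix: str = "col_"):
        # Deterministic, sequential, string-typed ids: col_0, col_1, ...
        return (f"{prefix}{i}" for i in itertools.count())


    id_gen = standard_id_strings()
    print(next(id_gen), next(id_gen))  # col_0 col_1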
bigframes/core/compile/api.py (13 additions, 15 deletions)
@@ -18,14 +18,15 @@
 import google.cloud.bigquery as bigquery

 import bigframes.core.compile.compiler as compiler
-import bigframes.core.rewrite as rewrites

 if TYPE_CHECKING:
     import bigframes.core.nodes
     import bigframes.core.ordering
     import bigframes.core.schema

-_STRICT_COMPILER = compiler.Compiler(strict=True)
+_STRICT_COMPILER = compiler.Compiler(
+    strict=True, enable_pruning=True, enable_densify_ids=True
+)


 class SQLCompiler:
@@ -34,7 +35,7 @@ def __init__(self, strict: bool = True):

     def compile_peek(self, node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
         """Compile node into sql that selects N arbitrary rows, may not execute deterministically."""
-        return self._compiler.compile_unordered_ir(node).peek_sql(n_rows)
+        return self._compiler.compile_peek_sql(node, n_rows)

     def compile_unordered(
         self,
@@ -44,9 +45,8 @@ def compile_unordered(
     ) -> str:
         """Compile node into sql where rows are unsorted, and no ordering information is preserved."""
         # TODO: Enable limit pullup, but only if not being used to write to clustered table.
-        return self._compiler.compile_unordered_ir(node).to_sql(
-            col_id_overrides=col_id_overrides
-        )
+        output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
+        return self._compiler.compile_sql(node, ordered=False, output_ids=output_ids)

     def compile_ordered(
         self,
@@ -56,10 +56,8 @@
     ) -> str:
         """Compile node into sql where rows are sorted with ORDER BY."""
         # If we are ordering the query anyways, compiling the slice as a limit is probably a good idea.
-        new_node, limit = rewrites.pullup_limit_from_slice(node)
-        return self._compiler.compile_ordered_ir(new_node).to_sql(
-            col_id_overrides=col_id_overrides, ordered=True, limit=limit
-        )
+        output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
+        return self._compiler.compile_sql(node, ordered=True, output_ids=output_ids)

     def compile_raw(
         self,
@@ -68,13 +66,12 @@
         str, Sequence[bigquery.SchemaField], bigframes.core.ordering.RowOrdering
     ]:
         """Compile node into sql that exposes all columns, including hidden ordering-only columns."""
-        ir = self._compiler.compile_ordered_ir(node)
-        sql, schema = ir.raw_sql_and_schema()
-        return sql, schema, ir._ordering
+        return self._compiler.compile_raw(node)


 def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
     """Use only for unit testing paths - not fully featured. Will throw exception if fails."""
+    node = _STRICT_COMPILER._preprocess(node)
     ibis = _STRICT_COMPILER.compile_ordered_ir(node)._to_ibis_expr(
         ordering_mode="unordered"
     )
@@ -85,9 +82,10 @@ def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
     """Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
     import bigframes.core.schema

+    node = _STRICT_COMPILER._preprocess(node)
     compiled = _STRICT_COMPILER.compile_unordered_ir(node)
     items = tuple(
-        bigframes.core.schema.SchemaItem(id, compiled.get_column_type(id))
-        for id in compiled.column_ids
+        bigframes.core.schema.SchemaItem(name, compiled.get_column_type(ibis_id))
+        for name, ibis_id in zip(node.schema.names, compiled.column_ids)
     )
     return bigframes.core.schema.ArraySchema(items)
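Note the shared pattern above: both compile_unordered and compile_ordered collapse the col_id_overrides mapping into a positional output_ids list before delegating to Compiler.compile_sql, and the schema test pairs public names with compiled ids by position rather than by lookup. A self-contained illustration (column names invented for the example):

    # Any schema name without an override passes through unchanged, so the
    # resulting list always lines up with node.schema.names.
    schema_names = ["bfcol_0", "bfcol_1", "bfcol_2"]
    col_id_overrides = {"bfcol_1": "user_col"}
    output_ids = [col_id_overrides.get(name, name) for name in schema_names]
    assert output_ids == ["bfcol_0", "user_col", "bfcol_2"]

    # Schema reconciliation is positional too: the compiled ibis ids may be
    # densified internal names, so they are zipped against the public names.
    ibis_ids = ["col_0", "col_1", "col_2"]
    assert list(zip(schema_names, ibis_ids))[1] == ("bfcol_1", "col_1")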
bigframes/core/compile/compiled.py (17 additions, 34 deletions)
@@ -202,7 +202,12 @@ def _aggregate_base(
         )
         # Must have deterministic ordering, so order by the unique "by" column
         ordering = TotalOrdering(
-            tuple([OrderingExpression(column_id) for column_id in by_column_ids]),
+            tuple(
+                [
+                    OrderingExpression(ex.DerefOp(ref.id.local_normalized))
+                    for ref in by_column_ids
+                ]
+            ),
             total_ordering_columns=frozenset(
                 [ex.DerefOp(ref.id.local_normalized) for ref in by_column_ids]
             ),
@@ -266,31 +271,26 @@ def peek_sql(self, n: int):
     def to_sql(
         self,
         offset_column: typing.Optional[str] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
     ) -> str:
         if offset_column or ordered:
             raise ValueError("Cannot produce sorted sql in partial ordering mode")
-        sql = ibis_bigquery.Backend().compile(
-            self._to_ibis_expr(
-                col_id_overrides=col_id_overrides,
-            )
-        )
+        sql = ibis_bigquery.Backend().compile(self._to_ibis_expr())
         return typing.cast(str, sql)

-    def row_count(self) -> OrderedIR:
+    def row_count(self, name: str) -> OrderedIR:
         original_table = self._to_ibis_expr()
         ibis_table = original_table.agg(
             [
-                original_table.count().name("count"),
+                original_table.count().name(name),
             ]
         )
         return OrderedIR(
             ibis_table,
-            (ibis_table["count"],),
+            (ibis_table[name],),
             ordering=TotalOrdering(
-                ordering_value_columns=(ascending_over("count"),),
-                total_ordering_columns=frozenset([ex.deref("count")]),
+                ordering_value_columns=(ascending_over(name),),
+                total_ordering_columns=frozenset([ex.deref(name)]),
             ),
         )
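row_count previously hard-coded its output column as "count"; the caller (compile_rowcount, later in compiler.py) now supplies the canonical id, so the aggregate, the value column, and the total ordering must all agree on one caller-chosen name. A stripped-down sketch of that contract, using plain SQL text in place of ibis expressions:

    def row_count_sql(source_sql: str, name: str) -> str:
        # Everything that mentions the count column uses the supplied name;
        # no hard-coded "count" remains to collide with densified ids.
        return f"SELECT COUNT(*) AS `{name}` FROM ({source_sql}) ORDER BY `{name}`"


    print(row_count_sql("SELECT * FROM t", "bfcol_5"))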

Expand All @@ -299,7 +299,6 @@ def _to_ibis_expr(
*,
expose_hidden_cols: bool = False,
fraction: Optional[float] = None,
col_id_overrides: typing.Mapping[str, str] = {},
):
"""
Creates an Ibis table expression representing the DataFrame.
Expand All @@ -320,8 +319,6 @@ def _to_ibis_expr(
If True, include the hidden ordering columns in the results.
Only compatible with `order_by` and `unordered`
``ordering_mode``.
col_id_overrides:
overrides the column ids for the result
Returns:
An ibis expression representing the data help by the ArrayValue object.
"""
Expand All @@ -346,10 +343,6 @@ def _to_ibis_expr(
if self._reduced_predicate is not None:
table = table.filter(base_table[PREDICATE_COLUMN])
table = table.drop(*columns_to_drop)
if col_id_overrides:
table = table.rename(
{value: key for key, value in col_id_overrides.items()}
)
if fraction is not None:
table = table.filter(ibis.random() < ibis.literal(fraction))
return table
@@ -941,7 +934,6 @@ def _reproject_to_table(self) -> OrderedIR:

     def to_sql(
         self,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
         limit: Optional[int] = None,
     ) -> str:
@@ -951,17 +943,13 @@
             sql = ibis_bigquery.Backend().compile(
                 baked_ir._to_ibis_expr(
                     ordering_mode="unordered",
-                    col_id_overrides=col_id_overrides,
                     expose_hidden_cols=True,
                 )
             )
-            output_columns = [
-                col_id_overrides.get(col, col) for col in baked_ir.column_ids
-            ]
             sql = (
                 bigframes.core.compile.googlesql.Select()
                 .from_(sql)
-                .select(output_columns)
+                .select(self.column_ids)
                 .sql()
             )

@@ -979,24 +967,26 @@
             sql = ibis_bigquery.Backend().compile(
                 self._to_ibis_expr(
                     ordering_mode="unordered",
-                    col_id_overrides=col_id_overrides,
                     expose_hidden_cols=False,
                 )
             )
         return typing.cast(str, sql)

     def raw_sql_and_schema(
         self,
+        column_ids: typing.Sequence[str],
     ) -> typing.Tuple[str, typing.Sequence[google.cloud.bigquery.SchemaField]]:
         """Return sql with all hidden columns. Used to cache with ordering information.

         Also returns schema, as the extra ordering columns are determined compile-time.
         """
+        col_id_overrides = dict(zip(self.column_ids, column_ids))
         all_columns = (*self.column_ids, *self._hidden_ordering_column_names.keys())
         as_ibis = self._to_ibis_expr(
             ordering_mode="unordered",
             expose_hidden_cols=True,
-        ).select(all_columns)
+        )
+        as_ibis = as_ibis.select(all_columns).rename(col_id_overrides)

         # Ibis will produce non-nullable schema types, but bigframes should always be nullable
         fixed_ibis_schema = ibis_schema.Schema.from_tuples(
@@ -1013,7 +1003,6 @@ def _to_ibis_expr(
         *,
         expose_hidden_cols: bool = False,
         fraction: Optional[float] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordering_mode: Literal["string_encoded", "unordered"],
         order_col_name: Optional[str] = ORDER_ID_COLUMN,
     ):
@@ -1043,8 +1032,6 @@
             order_col_name:
                 If the ordering mode outputs a single ordering or offsets
                 column, use this as the column name.
-            col_id_overrides:
-                overrides the column ids for the result
         Returns:
             An ibis expression representing the data help by the ArrayValue object.
         """
@@ -1086,10 +1073,6 @@
         if self._reduced_predicate is not None:
             table = table.filter(base_table[PREDICATE_COLUMN])
         table = table.drop(*columns_to_drop)
-        if col_id_overrides:
-            table = table.rename(
-                {value: key for key, value in col_id_overrides.items()}
-            )
         if fraction is not None:
             table = table.filter(ibis.random() < ibis.literal(fraction))
         return table
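raw_sql_and_schema now accepts the requested public column ids and derives the rename mapping itself, instead of callers threading col_id_overrides through _to_ibis_expr. The mapping is a positional zip from internal ids to requested ids, with hidden ordering columns left untouched. A rough stand-alone illustration of the effect (names invented; the real code does the renaming through ibis):

    internal_ids = ["col_0", "col_1"]        # densified ids inside the IR
    requested_ids = ["bfcol_a", "bfcol_b"]   # names the caller wants exposed
    hidden_ordering = ["bfuid_order_0"]      # selected as-is, never renamed

    col_id_overrides = dict(zip(internal_ids, requested_ids))
    all_columns = [*internal_ids, *hidden_ordering]

    # Conceptual result of .select(all_columns).rename(col_id_overrides):
    output = [col_id_overrides.get(col, col) for col in all_columns]
    assert output == ["bfcol_a", "bfcol_b", "bfuid_order_0"]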
bigframes/core/compile/compiler.py (54 additions, 16 deletions)
@@ -18,6 +18,7 @@
 import io
 import typing

+import google.cloud.bigquery
 import ibis
 import ibis.backends
 import ibis.backends.bigquery
@@ -32,6 +33,7 @@
 import bigframes.core.compile.scalar_op_compiler as compile_scalar
 import bigframes.core.compile.schema_translator
 import bigframes.core.compile.single_column
+import bigframes.core.expression as ex
 import bigframes.core.guid as guids
 import bigframes.core.identifiers as ids
 import bigframes.core.nodes as nodes
@@ -50,31 +52,66 @@ class Compiler:
     strict: bool = True
     scalar_op_compiler = compile_scalar.ScalarOpCompiler()
     enable_pruning: bool = False
+    enable_densify_ids: bool = False
+
+    def compile_sql(
+        self, node: nodes.BigFrameNode, ordered: bool, output_ids: typing.Sequence[str]
+    ) -> str:
+        node = self.set_output_names(node, output_ids)
+        if ordered:
+            node, limit = rewrites.pullup_limit_from_slice(node)
+            return self.compile_ordered_ir(self._preprocess(node)).to_sql(
+                ordered=True, limit=limit
+            )
+        else:
+            return self.compile_unordered_ir(self._preprocess(node)).to_sql()
+
+    def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
+        return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
+
+    def compile_raw(
+        self,
+        node: bigframes.core.nodes.BigFrameNode,
+    ) -> typing.Tuple[
+        str, typing.Sequence[google.cloud.bigquery.SchemaField], bf_ordering.RowOrdering
+    ]:
+        ir = self.compile_ordered_ir(self._preprocess(node))
+        sql, schema = ir.raw_sql_and_schema(column_ids=node.schema.names)
+        return sql, schema, ir._ordering

     def _preprocess(self, node: nodes.BigFrameNode):
         if self.enable_pruning:
             used_fields = frozenset(field.id for field in node.fields)
             node = node.prune(used_fields)
         node = functools.cache(rewrites.replace_slice_ops)(node)
+        if self.enable_densify_ids:
+            original_names = [id.name for id in node.ids]
+            node, _ = rewrites.remap_variables(
+                node, id_generator=ids.anonymous_serial_ids()
+            )
+            node = self.set_output_names(node, original_names)
         return node

-    def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
-        ir = typing.cast(
-            compiled.OrderedIR, self.compile_node(self._preprocess(node), True)
+    def set_output_names(
+        self, node: bigframes.core.nodes.BigFrameNode, output_ids: typing.Sequence[str]
+    ):
+        # TODO: Create specialized output operators that will handle final names
+        return nodes.SelectionNode(
+            node,
+            tuple(
+                (ex.DerefOp(old_id), ids.ColumnId(out_id))
+                for old_id, out_id in zip(node.ids, output_ids)
+            ),
         )
+
+    def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
+        ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True))
         if self.strict:
             assert ir.has_total_order
         return ir

     def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR:
-        return typing.cast(
-            compiled.UnorderedIR, self.compile_node(self._preprocess(node), False)
-        )
-
-    def compile_peak_sql(
-        self, node: nodes.BigFrameNode, n_rows: int
-    ) -> typing.Optional[str]:
-        return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
+        return typing.cast(compiled.UnorderedIR, self.compile_node(node, False))

     # TODO: Remove cache when schema no longer requires compilation to derive schema (and therefor only compiles for execution)
     @functools.lru_cache(maxsize=5000)
@@ -144,11 +181,11 @@ def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True):

         labels_array_table = ibis.range(
             joined_table[start_column], joined_table[end_column] + node.step, node.step
-        ).name("labels")
+        ).name(node.output_id.sql)
         labels = (
             typing.cast(ibis.expr.types.ArrayValue, labels_array_table)
             .as_table()
-            .unnest(["labels"])
+            .unnest([node.output_id.sql])
         )
         if ordered:
             return compiled.OrderedIR(
@@ -307,18 +344,19 @@ def compile_projection(self, node: nodes.ProjectionNode, ordered: bool = True):

     @_compile_node.register
     def compile_concat(self, node: nodes.ConcatNode, ordered: bool = True):
+        output_ids = [id.sql for id in node.output_ids]
         if ordered:
             compiled_ordered = [self.compile_ordered_ir(node) for node in node.children]
-            return concat_impl.concat_ordered(compiled_ordered)
+            return concat_impl.concat_ordered(compiled_ordered, output_ids)
         else:
             compiled_unordered = [
                 self.compile_unordered_ir(node) for node in node.children
             ]
-            return concat_impl.concat_unordered(compiled_unordered)
+            return concat_impl.concat_unordered(compiled_unordered, output_ids)

     @_compile_node.register
     def compile_rowcount(self, node: nodes.RowCountNode, ordered: bool = True):
-        result = self.compile_unordered_ir(node.child).row_count()
+        result = self.compile_unordered_ir(node.child).row_count(name=node.col_id.sql)
         return result if ordered else result.to_unordered()

     @_compile_node.register
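Taken together, the compiler changes implement the behavior in the PR title: when enable_densify_ids is set, _preprocess remaps every variable in the tree to anonymous sequential ids (rewrites.remap_variables with ids.anonymous_serial_ids) and then restores the original names through a final SelectionNode via set_output_names. A toy model of that remap-then-restore shape over a flat list of column ids (the real rewrite walks a whole node tree; only the two-step structure is assumed here):

    import itertools


    def anonymous_serial_ids():
        # Collision-free, sequential ids: id_0, id_1, ...
        return (f"id_{i}" for i in itertools.count())


    def remap_variables(columns, id_generator):
        # Step 1: replace every id with the next serial id, keeping the mapping.
        mapping = {old: next(id_generator) for old in columns}
        return [mapping[old] for old in columns], mapping


    def set_output_names(columns, output_ids):
        # Step 2: pair each remapped column with the name callers should see,
        # as the final SelectionNode does.
        return list(zip(columns, output_ids))


    original = ["bfcol_7", "bfcol_8", "bfcol_9"]
    dense, _ = remap_variables(original, anonymous_serial_ids())
    assert dense == ["id_0", "id_1", "id_2"]  # unique and sequential
    assert set_output_names(dense, original)[0] == ("id_0", "bfcol_7")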