From b3781987990e133c19db27bc5729af8985696b20 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 22 Aug 2024 23:52:20 +0000 Subject: [PATCH 1/6] perf: Improve repr performance --- bigframes/core/__init__.py | 19 +----- bigframes/core/blocks.py | 26 +++----- bigframes/core/nodes.py | 75 +++++++++++++-------- bigframes/core/tree_properties.py | 35 +++++++++- bigframes/session/executor.py | 105 +++++++++++++++++++++++++----- tests/unit/core/test_blocks.py | 7 +- 6 files changed, 186 insertions(+), 81 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 2e9b5fa994..b18db6298c 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -70,23 +70,7 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session): iobytes.getvalue(), data_schema=schema, session=session, - ) - return cls(node) - - @classmethod - def from_cached( - cls, - original: ArrayValue, - table: google.cloud.bigquery.Table, - ordering: orderings.TotalOrdering, - ): - node = nodes.CachedTableNode( - original_node=original.node, - project_id=table.reference.project, - dataset_id=table.reference.dataset_id, - table_id=table.reference.table_id, - physical_schema=tuple(table.schema), - ordering=ordering, + n_rows=arrow_table.num_rows, ) return cls(node) @@ -159,6 +143,7 @@ def as_cached( table_id=cache_table.reference.table_id, physical_schema=tuple(cache_table.schema), ordering=ordering, + n_rows=cache_table.num_rows, ) return ArrayValue(node) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9361543d5f..d7e6818f76 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -200,6 +200,7 @@ def index(self) -> BlockIndexProperties: @functools.cached_property def shape(self) -> typing.Tuple[int, int]: """Returns dimensions as (length, width) tuple.""" + row_count_expr = self.expr.row_count() # Support in-memory engines for hermetic unit tests. @@ -210,8 +211,7 @@ def shape(self) -> typing.Tuple[int, int]: except Exception: pass - iter, _ = self.session._execute(row_count_expr, ordered=False) - row_count = next(iter)[0] + row_count = self.session._executor.get_row_count(self.expr) return (row_count, len(self.value_columns)) @property @@ -560,7 +560,7 @@ def to_pandas( def try_peek( self, n: int = 20, force: bool = False ) -> typing.Optional[pd.DataFrame]: - if force or tree_properties.peekable(self.expr.node): + if force or tree_properties.can_fast_peek(self.expr.node): iterator, _ = self.session._peek(self.expr, n) df = self._to_dataframe(iterator) self._copy_index_to_pandas(df) @@ -1587,19 +1587,13 @@ def retrieve_repr_request_results( Returns a tuple of the dataframe and the overall number of rows of the query. """ - # TODO(swast): Select a subset of columns if max_columns is less than the - # number of columns in the schema. 
- count = self.shape[0] - if count > max_results: - head_block = self.slice(0, max_results) - else: - head_block = self - computed_df, query_job = head_block.to_pandas() - formatted_df = computed_df.set_axis(self.column_labels, axis=1) - # we reset the axis and substitute the bf index name(s) for the default - if len(self.index.names) > 0: - formatted_df.index.names = self.index.names # type: ignore - return formatted_df, count, query_job + + results, query_job = self.session._executor.head(self.expr, max_results) + count = self.session._executor.get_row_count(self.expr) + + computed_df = self._to_dataframe(results) + self._copy_index_to_pandas(computed_df) + return computed_df, count, query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: result_id = guid.generate_guid() diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 30edc7740a..7a4295ba27 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -312,18 +312,36 @@ def transform_children( # Input Nodex @dataclass(frozen=True) -class ReadLocalNode(BigFrameNode): +class LeafNode(BigFrameNode): + @property + def roots(self) -> typing.Set[BigFrameNode]: + return {self} + + @property + def supports_fast_head(self) -> bool: + return False + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return self + + @property + def row_count(self) -> typing.Optional[int]: + """How many rows are in the data source. None means unknown.""" + return None + + +@dataclass(frozen=True) +class ReadLocalNode(LeafNode): feather_bytes: bytes data_schema: schemata.ArraySchema + n_rows: int session: typing.Optional[bigframes.session.Session] = None def __hash__(self): return self._node_hash - @property - def roots(self) -> typing.Set[BigFrameNode]: - return {self} - @functools.cached_property def schema(self) -> schemata.ArraySchema: return self.data_schema @@ -333,6 +351,10 @@ def variables_introduced(self) -> int: """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" return len(self.schema.items) + 1 + @property + def supports_fast_head(self) -> bool: + return True + @property def order_ambiguous(self) -> bool: return False @@ -341,15 +363,14 @@ def order_ambiguous(self) -> bool: def explicitly_ordered(self) -> bool: return True - def transform_children( - self, t: Callable[[BigFrameNode], BigFrameNode] - ) -> BigFrameNode: - return self + @property + def row_count(self) -> typing.Optional[int]: + return self.n_rows ## Put ordering in here or just add order_by node above? 
@dataclass(frozen=True) -class ReadTableNode(BigFrameNode): +class ReadTableNode(LeafNode): project_id: str = field() dataset_id: str = field() table_id: str = field() @@ -385,10 +406,6 @@ def session(self): def __hash__(self): return self._node_hash - @property - def roots(self) -> typing.Set[BigFrameNode]: - return {self} - @property def schema(self) -> schemata.ArraySchema: return self.columns @@ -398,6 +415,11 @@ def relation_ops_created(self) -> int: # Assume worst case, where readgbq actually has baked in analytic operation to generate index return 3 + @property + def supports_fast_head(self) -> bool: + # TODO: Be more lenient for small tables, or those clustered on non-sequential order key + return self.order_col_is_sequential + @property def order_ambiguous(self) -> bool: return len(self.total_order_cols) == 0 @@ -410,15 +432,10 @@ def explicitly_ordered(self) -> bool: def variables_introduced(self) -> int: return len(self.schema.items) + 1 - def transform_children( - self, t: Callable[[BigFrameNode], BigFrameNode] - ) -> BigFrameNode: - return self - # This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning @dataclass(frozen=True) -class CachedTableNode(BigFrameNode): +class CachedTableNode(LeafNode): # The original BFET subtree that was cached # note: this isn't a "child" node. original_node: BigFrameNode = field() @@ -429,6 +446,7 @@ class CachedTableNode(BigFrameNode): physical_schema: Tuple[bq.SchemaField, ...] = field() ordering: typing.Optional[orderings.RowOrdering] = field() + n_rows: int = field() def __post_init__(self): # enforce invariants @@ -450,10 +468,6 @@ def session(self): def __hash__(self): return self._node_hash - @property - def roots(self) -> typing.Set[BigFrameNode]: - return {self} - @property def schema(self) -> schemata.ArraySchema: return self.original_node.schema @@ -473,6 +487,12 @@ def hidden_columns(self) -> typing.Tuple[str, ...]: if col not in self.schema.names ) + @property + def supports_fast_head(self) -> bool: + # TODO: Be more lenient for small tables, or those clustered on non-sequential order key + # No ordering supports fast head as can just take n arbitrary rows + return (self.ordering is None) or self.ordering.is_sequential + @property def order_ambiguous(self) -> bool: return not isinstance(self.ordering, orderings.TotalOrdering) @@ -483,10 +503,9 @@ def explicitly_ordered(self) -> bool: self.ordering.all_ordering_columns ) > 0 - def transform_children( - self, t: Callable[[BigFrameNode], BigFrameNode] - ) -> BigFrameNode: - return self + @property + def row_count(self) -> typing.Optional[int]: + return self.n_rows # Unary nodes diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 846cf50d77..e782c5c874 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -32,14 +32,45 @@ def local_only(node: nodes.BigFrameNode) -> bool: return all(isinstance(node, nodes.ReadLocalNode) for node in node.roots) -def peekable(node: nodes.BigFrameNode) -> bool: +def can_fast_peek(node: nodes.BigFrameNode) -> bool: if local_only(node): return True - children_peekable = all(peekable(child) for child in node.child_nodes) + children_peekable = all(can_fast_peek(child) for child in node.child_nodes) self_peekable = not node.non_local return children_peekable and self_peekable +def can_fast_head(node: nodes.BigFrameNode) -> bool: + """Can get head fast if can push head operator down to leafs and operators preserve rows.""" + if 
isinstance(node, nodes.LeafNode):
+        return node.supports_fast_head
+    # TODO: In theory we can push head down through concat, but requires some dedicated logic
+    if isinstance(node, nodes.UnaryNode):
+        return node.row_preserving and can_fast_head(node.child)
+    return False
+
+
+def row_count(node: nodes.BigFrameNode) -> Optional[int]:
+    """Determine row count from local metadata, return None if unknown."""
+    if isinstance(node, nodes.LeafNode):
+        return node.row_count
+    if isinstance(node, nodes.AggregateNode):
+        if len(node.by_column_ids) == 0:
+            return 1
+        return None
+    if isinstance(node, nodes.ConcatNode):
+        sub_counts = list(map(row_count, node.child_nodes))
+        total = 0
+        for count in sub_counts:
+            if count is None:
+                return None
+            total += count
+        return total
+    if isinstance(node, nodes.UnaryNode) and node.row_preserving:
+        return row_count(node.child)
+    return None
+
+
 # Replace modified_cost(node) = cost(apply_cache(node))
 def select_cache_target(
     root: nodes.BigFrameNode,
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index 539658a18c..1394b9c4e2 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -25,11 +25,13 @@
 import bigframes.core
 import bigframes.core.compile
+import bigframes.core.expression as ex
 import bigframes.core.guid
 import bigframes.core.nodes as nodes
 import bigframes.core.ordering as order
 import bigframes.core.tree_properties as tree_properties
 import bigframes.formatting_helpers as formatting_helpers
+import bigframes.operations as ops
 import bigframes.session._io.bigquery as bq_io
 import bigframes.session.metrics
 import bigframes.session.planner
@@ -84,7 +86,7 @@ def to_sql(
         if offset_column:
             array_value = array_value.promote_offsets(offset_column)
         node = (
-            self._with_cached_executions(array_value.node)
+            self._get_optimized_plan(array_value.node)
             if enable_cache
             else array_value.node
         )
@@ -161,20 +163,74 @@ def dry_run(self, array_value: bigframes.core.ArrayValue, ordered: bool = True):
         return results_iterator, query_job
 
     def peek(
-        self, array_value: bigframes.core.ArrayValue, n_rows: int
+        self,
+        array_value: bigframes.core.ArrayValue,
+        n_rows: int,
     ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
-        """A 'peek' efficiently accesses a small number of rows in the dataframe."""
-        if not tree_properties.peekable(self._with_cached_executions(array_value.node)):
+        """
+        A 'peek' efficiently accesses a small number of rows in the dataframe.
+        """
+        plan = self._get_optimized_plan(array_value.node)
+        if not tree_properties.can_fast_peek(plan):
             warnings.warn("Peeking this value cannot be done efficiently.")
-        sql = self.compiler.compile_peek(
-            self._with_cached_executions(array_value.node), n_rows
-        )
+
+        sql = self.compiler.compile_peek(plan, n_rows)
         # TODO(swast): plumb through the api_name of the user-facing api that
         # caused this query.
-        return self._run_execute_query(
-            sql=sql,
-        )
+        return self._run_execute_query(sql=sql)
+
+    def head(
+        self, array_value: bigframes.core.ArrayValue, n_rows: int
+    ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+        """
+        A 'peek' efficiently accesses a small number of rows in the dataframe.
+        """
+        maybe_row_count = self._local_get_row_count(array_value)
+        if (maybe_row_count is not None) and (maybe_row_count <= n_rows):
+            return self.execute(array_value, ordered=True)
+
+        if not self.strictly_ordered and not array_value.node.explicitly_ordered:
+            # No user-provided ordering, so just get any N rows, it's faster!
+ return self.peek(array_value, n_rows) + + plan = self._get_optimized_plan(array_value.node) + if not tree_properties.can_fast_head(plan): + # If can't get head fast, we are going to need to execute the whole query + # Will want to do this in a way such that the result is reusable, but the first + # N values can be easily extracted. This means clustering on the order key. + + # This may be too much, might be sufficient to cluster on ordering key (if <= 4 parts) + self._cache_with_offsets(array_value) + # Get a new optimized plan after caching + plan = self._get_optimized_plan(array_value.node) + assert tree_properties.can_fast_head(plan) + + head_plan = generate_head_plan(plan, n_rows) + sql = self.compiler.compile_ordered(head_plan) + + # TODO(swast): plumb through the api_name of the user-facing api that + # caused this query. + return self._run_execute_query(sql=sql) + + def get_row_count(self, array_value: bigframes.core.ArrayValue) -> int: + # optimized plan less likely to have count-destroying operators like filter or join + count = self._local_get_row_count(array_value) + if count is not None: + return count + else: + row_count_plan = self._get_optimized_plan( + generate_row_count_plan(array_value.node) + ) + sql = self.compiler.compile_unordered(row_count_plan) + iter, _ = self._run_execute_query(sql) + return next(iter)[0] + + def _local_get_row_count( + self, array_value: bigframes.core.ArrayValue + ) -> Optional[int]: + plan = self._get_optimized_plan(array_value.node) + return tree_properties.row_count(plan) # Helpers def _run_execute_query( @@ -218,7 +274,7 @@ def _run_execute_query( else: raise - def _with_cached_executions(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: + def _get_optimized_plan(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: return tree_properties.replace_nodes(node, (dict(self._cached_executions))) def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): @@ -229,7 +285,7 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): # Once rewriting is available, will want to rewrite before # evaluating execution cost. 
return tree_properties.is_trivially_executable( - self._with_cached_executions(array_value.node) + self._get_optimized_plan(array_value.node) ) def _cache_with_cluster_cols( @@ -238,7 +294,7 @@ def _cache_with_cluster_cols( """Executes the query and uses the resulting table to rewrite future executions.""" sql, schema, ordering_info = self.compiler.compile_raw( - self._with_cached_executions(array_value.node) + self._get_optimized_plan(array_value.node) ) tmp_table = self._sql_as_cached_temp_table( sql, @@ -260,9 +316,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): ) offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") node_w_offsets = array_value.promote_offsets(offset_column).node - sql = self.compiler.compile_unordered( - self._with_cached_executions(node_w_offsets) - ) + sql = self.compiler.compile_unordered(self._get_optimized_plan(node_w_offsets)) tmp_table = self._sql_as_cached_temp_table( sql, @@ -297,7 +351,7 @@ def _simplify_with_caching(self, array_value: bigframes.core.ArrayValue): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first for _ in range(MAX_SUBTREE_FACTORINGS): - node_with_cache = self._with_cached_executions(array_value.node) + node_with_cache = self._get_optimized_plan(array_value.node) if node_with_cache.planning_complexity < QUERY_COMPLEXITY_LIMIT: return @@ -343,4 +397,21 @@ def _sql_as_cached_temp_table( job_config=job_config, api_name="cached", ) + query_job.destination + query_job.result() return query_job.destination + + +def generate_head_plan(node: nodes.BigFrameNode, n: int): + offsets_id = bigframes.core.guid.generate_guid("offsets_") + plan_w_offsets = nodes.PromoteOffsetsNode(node, offsets_id) + predicate = ops.lt_op.as_expr(ex.free_var(offsets_id), ex.const(n)) + plan_w_head = nodes.FilterNode(plan_w_offsets, predicate) + # Finally, drop the offsets column + return nodes.ProjectionNode( + plan_w_head, tuple((ex.free_var(i), i) for i in node.schema.names) + ) + + +def generate_row_count_plan(node: nodes.BigFrameNode): + return nodes.RowCountNode(node) diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index 8cde187cb3..8ed3acba0f 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -20,6 +20,7 @@ import bigframes import bigframes.core.blocks as blocks +import bigframes.session.executor @pytest.mark.parametrize( @@ -78,9 +79,13 @@ def test_block_from_local(data): expected = pandas.DataFrame(data) mock_session = mock.create_autospec(spec=bigframes.Session) + mock_executor = mock.create_autospec( + spec=bigframes.session.executor.BigQueryCachingExecutor + ) # hard-coded the returned dimension of the session for that each of the test case contains 3 rows. 
- mock_session._execute.return_value = (iter([[3]]), None) + mock_session._executor = mock_executor + mock_executor.get_row_count.return_value = 3 block = blocks.Block.from_local(pandas.DataFrame(data), mock_session) From 7d7451147d499e586a40b8be4baa798d04167413 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 28 Aug 2024 23:51:37 +0000 Subject: [PATCH 2/6] extract gbq metadata from nodes to common struct --- bigframes/core/__init__.py | 11 ++------ bigframes/core/compile/compiler.py | 12 +++++--- bigframes/core/nodes.py | 45 ++++++++++++++++++++---------- bigframes/session/executor.py | 8 +++++- 4 files changed, 47 insertions(+), 29 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index b18db6298c..f3c75f7143 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -94,10 +94,7 @@ def from_table( bigframes.exceptions.PreviewWarning, ) node = nodes.ReadTableNode( - project_id=table.reference.project, - dataset_id=table.reference.dataset_id, - table_id=table.reference.table_id, - physical_schema=tuple(table.schema), + table=nodes.GbqTable.from_table(table), total_order_cols=(offsets_col,) if offsets_col else tuple(primary_key), order_col_is_sequential=(offsets_col is not None), columns=schema, @@ -138,12 +135,8 @@ def as_cached( """ node = nodes.CachedTableNode( original_node=self.node, - project_id=cache_table.reference.project, - dataset_id=cache_table.reference.dataset_id, - table_id=cache_table.reference.table_id, - physical_schema=tuple(cache_table.schema), + table=nodes.GbqTable.from_table(cache_table), ordering=ordering, - n_rows=cache_table.num_rows, ) return ArrayValue(node) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 8fb1f7ab3a..3fedf5c0c8 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -103,14 +103,16 @@ def compile_readlocal(self, node: nodes.ReadLocalNode, ordered: bool = True): @_compile_node.register def compile_cached_table(self, node: nodes.CachedTableNode, ordered: bool = True): - full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}" + full_table_name = ( + f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}" + ) used_columns = ( *node.schema.names, *node.hidden_columns, ) # Physical schema might include unused columns, unsupported datatypes like JSON physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis( - list(i for i in node.physical_schema if i.name in used_columns) + list(i for i in node.table.physical_schema if i.name in used_columns) ) ibis_table = ibis.table(physical_schema, full_table_name) if ordered: @@ -156,14 +158,16 @@ def compile_readtable(self, node: nodes.ReadTableNode, ordered: bool = True): def read_table_as_unordered_ibis( self, node: nodes.ReadTableNode ) -> ibis.expr.types.Table: - full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}" + full_table_name = ( + f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}" + ) used_columns = ( *node.schema.names, *[i for i in node.total_order_cols if i not in node.schema.names], ) # Physical schema might include unused columns, unsupported datatypes like JSON physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis( - list(i for i in node.physical_schema if i.name in used_columns) + list(i for i in node.table.physical_schema if i.name in used_columns) ) if node.at_time is not None or node.sql_predicate is not None: import bigframes.session._io.bigquery diff --git 
a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 7a4295ba27..2bca8bcf28 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -368,14 +368,28 @@ def row_count(self) -> typing.Optional[int]:
         return self.n_rows
 
 
-## Put ordering in here or just add order_by node above?
 @dataclass(frozen=True)
-class ReadTableNode(LeafNode):
+class GbqTable:
     project_id: str = field()
     dataset_id: str = field()
     table_id: str = field()
-
     physical_schema: Tuple[bq.SchemaField, ...] = field()
+    n_rows: int = field()
+
+    def from_table(table: bq.Table) -> GbqTable:
+        return GbqTable(
+            project_id=table.project,
+            dataset_id=table.dataset_id,
+            table_id=table.table_id,
+            physical_schema=schemata.ArraySchema.from_bq_table(table),
+            n_rows=table.num_rows,
+        )
+
+
+## Put ordering in here or just add order_by node above?
+@dataclass(frozen=True)
+class ReadTableNode(LeafNode):
+    table: GbqTable
 
     # Subset of physical schema columns, with chosen BQ types
     columns: schemata.ArraySchema = field()
@@ -391,10 +405,10 @@ class ReadTableNode(LeafNode):
 
     def __post_init__(self):
         # enforce invariants
-        physical_names = set(map(lambda i: i.name, self.physical_schema))
+        physical_names = set(map(lambda i: i.name, self.table.physical_schema))
         if not set(self.columns.names).issubset(physical_names):
             raise ValueError(
-                f"Requested schema {self.columns} cannot be derived from table schemal {self.physical_schema}"
+                f"Requested schema {self.columns} cannot be derived from table schema {self.table.physical_schema}"
             )
         if self.order_col_is_sequential and len(self.total_order_cols) != 1:
             raise ValueError("Sequential primary key must have only one component")
@@ -432,6 +446,12 @@ def explicitly_ordered(self) -> bool:
     def variables_introduced(self) -> int:
         return len(self.schema.items) + 1
 
+    @property
+    def row_count(self) -> typing.Optional[int]:
+        if self.sql_predicate is None:
+            return self.table.n_rows
+        return None
+
 
 # This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
 @dataclass(frozen=True)
@@ -440,25 +460,20 @@ class CachedTableNode(LeafNode):
     # note: this isn't a "child" node.
     original_node: BigFrameNode = field()
     # reference to cached materialization of original_node
-    project_id: str = field()
-    dataset_id: str = field()
-    table_id: str = field()
-    physical_schema: Tuple[bq.SchemaField, ...] = field()
-
+    table: GbqTable
     ordering: typing.Optional[orderings.RowOrdering] = field()
-    n_rows: int = field()
 
     def __post_init__(self):
         # enforce invariants
-        physical_names = set(map(lambda i: i.name, self.physical_schema))
+        physical_names = set(map(lambda i: i.name, self.table.physical_schema))
         logical_names = self.original_node.schema.names
         if not set(logical_names).issubset(physical_names):
             raise ValueError(
-                f"Requested schema {logical_names} cannot be derived from table schema {self.physical_schema}"
+                f"Requested schema {logical_names} cannot be derived from table schema {self.table.physical_schema}"
             )
         if not set(self.hidden_columns).issubset(physical_names):
             raise ValueError(
-                f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.physical_schema}"
+                f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.table.physical_schema}"
             )
 
     @property
@@ -505,7 +520,7 @@ def explicitly_ordered(self) -> bool:
 
     @property
     def row_count(self) -> typing.Optional[int]:
-        return self.n_rows
+        return self.table.n_rows
 
 
 # Unary nodes
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index c10f54e1c9..dd99e2d059 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -243,7 +243,6 @@ def head(
         return self._run_execute_query(sql=sql)
 
     def get_row_count(self, array_value: bigframes.core.ArrayValue) -> int:
-        # optimized plan less likely to have count-destroying operators like filter or join
         count = self._local_get_row_count(array_value)
         if count is not None:
             return count
@@ -258,6 +257,8 @@ def get_row_count(self, array_value: bigframes.core.ArrayValue) -> int:
     def _local_get_row_count(
         self, array_value: bigframes.core.ArrayValue
     ) -> Optional[int]:
+        # optimized plan has cache materializations which will have row count metadata
+        # that is more likely to be usable than original leaf nodes.
        plan = self._get_optimized_plan(array_value.node)
         return tree_properties.row_count(plan)
 
@@ -307,6 +308,11 @@ def _wait_on_job(self, query_job: bigquery.QueryJob) -> bigquery.table.RowIterat
         return results_iterator
 
     def _get_optimized_plan(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode:
+        """
+        Takes the original expression tree and applies optimizations to accelerate execution.
+
+        At present, the only optimization is to replace subtrees with cached previous materializations.
+        """
         return tree_properties.replace_nodes(node, (dict(self._cached_executions)))
 
     def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):

From a54d3a6e5fa7b8a32d103f1b5066a1d5d093de72 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Thu, 29 Aug 2024 19:43:35 +0000
Subject: [PATCH 3/6] clarify fast head

---
 bigframes/core/nodes.py           | 13 ++++++++++---
 bigframes/core/tree_properties.py |  1 -
 bigframes/session/executor.py     | 13 ++++++++-----
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 2bca8bcf28..0084770445 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -375,6 +375,7 @@ class GbqTable:
     table_id: str = field()
     physical_schema: Tuple[bq.SchemaField, ...] = field()
     n_rows: int = field()
+    cluster_cols: typing.Optional[Tuple[str, ...]]
 
     def from_table(table: bq.Table) -> GbqTable:
         return GbqTable(
@@ -383,6 +384,9 @@ def from_table(table: bq.Table) -> GbqTable:
             table_id=table.table_id,
             physical_schema=schemata.ArraySchema.from_bq_table(table),
             n_rows=table.num_rows,
+            cluster_cols=None
+            if table.clustering_fields is None
+            else tuple(table.clustering_fields),
         )
@@ -431,7 +435,9 @@ def relation_ops_created(self) -> int:
 
     @property
     def supports_fast_head(self) -> bool:
-        # TODO: Be more lenient for small tables, or those clustered on non-sequential order key
+        # Fast head is only supported when row offsets are available.
+        # In the future, ORDER BY+LIMIT optimizations may allow fast head when
+        # clustered and/or partitioned on ordering key
         return self.order_col_is_sequential
 
     @property
@@ -504,8 +510,9 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
 
     @property
     def supports_fast_head(self) -> bool:
-        # TODO: Be more lenient for small tables, or those clustered on non-sequential order key
-        # No ordering supports fast head as can just take n arbitrary rows
+        # Fast head is only supported when row offsets are available.
+        # In the future, ORDER BY+LIMIT optimizations may allow fast head when
+        # clustered and/or partitioned on ordering key
         return (self.ordering is None) or self.ordering.is_sequential
 
     @property
diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py
index e782c5c874..4978e75e38 100644
--- a/bigframes/core/tree_properties.py
+++ b/bigframes/core/tree_properties.py
@@ -44,7 +44,6 @@ def can_fast_head(node: nodes.BigFrameNode) -> bool:
     """Can get head fast if can push head operator down to leafs and operators preserve rows."""
     if isinstance(node, nodes.LeafNode):
         return node.supports_fast_head
-    # TODO: In theory we can push head down through concat, but requires some dedicated logic
     if isinstance(node, nodes.UnaryNode):
         return node.row_preserving and can_fast_head(node.child)
     return False
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index dd99e2d059..b696b77eb1 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -213,7 +213,7 @@ def head(
         self, array_value: bigframes.core.ArrayValue, n_rows: int
     ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
         """
-        A 'peek' efficiently accesses a small number of rows in the dataframe.
+        Preview the first n rows of the dataframe. This is less efficient than the unordered peek preview op.
         """
         maybe_row_count = self._local_get_row_count(array_value)
         if (maybe_row_count is not None) and (maybe_row_count <= n_rows):
@@ -227,9 +227,8 @@ def head(
         if not tree_properties.can_fast_head(plan):
             # If can't get head fast, we are going to need to execute the whole query
             # Will want to do this in a way such that the result is reusable, but the first
-            # N values can be easily extracted. This means clustering on the order key.
-
-            # This may be too much, might be sufficient to cluster on ordering key (if <= 4 parts)
+            # N values can be easily extracted.
+            # This currently requires clustering on offsets.
             self._cache_with_offsets(array_value)
             # Get a new optimized plan after caching
             plan = self._get_optimized_plan(array_value.node)
@@ -313,7 +312,11 @@ def _get_optimized_plan(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode:
 
         At present, the only optimization is to replace subtrees with cached previous materializations.
""" - return tree_properties.replace_nodes(node, (dict(self._cached_executions))) + # Apply any rewrites *after* applying cache, as cache is sensitive to exact tree structure + optimized_plan = tree_properties.replace_nodes( + node, (dict(self._cached_executions)) + ) + return optimized_plan def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): """ From dd23425b5c219ec5a4dd6f1174640b75d71f6665 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 29 Aug 2024 20:12:07 +0000 Subject: [PATCH 4/6] fix physical_schema to be bq client types --- bigframes/core/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 0084770445..6e35f88147 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -382,7 +382,7 @@ def from_table(table: bq.Table) -> GbqTable: project_id=table.project, dataset_id=table.dataset_id, table_id=table.table_id, - physical_schema=schemata.ArraySchema.from_bq_table(table), + physical_schema=tuple(table.schema), n_rows=table.num_rows, cluster_cols=None if table.clustering_fields is None From 1bf4c3b34bcb5d770b5be13a6522dd5bf50598b5 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 29 Aug 2024 20:27:11 +0000 Subject: [PATCH 5/6] add classmethod annotation to GbqTable struct factory method --- bigframes/core/nodes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 6e35f88147..ab97fc9722 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -377,7 +377,8 @@ class GbqTable: n_rows: int = field() cluster_cols: typing.Optional[Tuple[str, ...]] - def from_table(table: bq.Table) -> GbqTable: + @classmethod + def from_table(cls, table: bq.Table) -> GbqTable: return GbqTable( project_id=table.project, dataset_id=table.dataset_id, From 0f351e296d29688541e52221464d9b88e68ef2be Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 29 Aug 2024 20:27:43 +0000 Subject: [PATCH 6/6] add classmethod annotation to GbqTable struct factory method --- bigframes/core/nodes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index ab97fc9722..73780719a9 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -377,8 +377,8 @@ class GbqTable: n_rows: int = field() cluster_cols: typing.Optional[Tuple[str, ...]] - @classmethod - def from_table(cls, table: bq.Table) -> GbqTable: + @staticmethod + def from_table(table: bq.Table) -> GbqTable: return GbqTable( project_id=table.project, dataset_id=table.dataset_id,