diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py
index 069ebb9cdf..b8f1d26db8 100644
--- a/bigframes/core/rewrite/__init__.py
+++ b/bigframes/core/rewrite/__init__.py
@@ -22,7 +22,7 @@
     try_reduce_to_local_scan,
     try_reduce_to_table_scan,
 )
-from bigframes.core.rewrite.slices import pull_up_limits, rewrite_slice
+from bigframes.core.rewrite.slices import pull_out_limit, pull_up_limits, rewrite_slice
 from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
 from bigframes.core.rewrite.windows import rewrite_range_rolling
 
@@ -32,6 +32,7 @@
     "rewrite_slice",
     "rewrite_timedelta_expressions",
     "pull_up_limits",
+    "pull_out_limit",
     "remap_variables",
     "defer_order",
     "column_pruning",
diff --git a/bigframes/core/rewrite/slices.py b/bigframes/core/rewrite/slices.py
index b8a003e061..92911310da 100644
--- a/bigframes/core/rewrite/slices.py
+++ b/bigframes/core/rewrite/slices.py
@@ -26,7 +26,7 @@
 
 
 def pull_up_limits(root: nodes.ResultNode) -> nodes.ResultNode:
-    new_child, pulled_limit = _pullup_slice_inner(root.child)
+    new_child, pulled_limit = pull_out_limit(root.child)
     if new_child == root.child:
         return root
     elif pulled_limit is None:
@@ -37,7 +37,7 @@
         return dataclasses.replace(root, child=new_child, limit=new_limit)
 
 
-def _pullup_slice_inner(
+def pull_out_limit(
     root: nodes.BigFrameNode,
 ) -> Tuple[nodes.BigFrameNode, Optional[int]]:
     """
@@ -53,7 +53,7 @@
         assert root.step == 1
         assert root.stop is not None
         limit = root.stop
-        new_root, prior_limit = _pullup_slice_inner(root.child)
+        new_root, prior_limit = pull_out_limit(root.child)
         if (prior_limit is not None) and (prior_limit < limit):
             limit = prior_limit
         return new_root, limit
@@ -61,7 +61,7 @@
         isinstance(root, (nodes.SelectionNode, nodes.ProjectionNode))
         and root.row_preserving
     ):
-        new_child, prior_limit = _pullup_slice_inner(root.child)
+        new_child, prior_limit = pull_out_limit(root.child)
         if prior_limit is not None:
             return root.transform_children(lambda _: new_child), prior_limit
     # Most ops don't support pulling up slice, like filter, agg, join, etc.
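Note: to make the renamed `pull_out_limit` concrete, here is a minimal sketch of the recursion using hypothetical stand-in node classes (`Node`, `Slice`, `Projection` are invented for illustration; the real `BigFrameNode` types carry far more state):

```python
from __future__ import annotations

import dataclasses
from typing import Optional, Tuple


@dataclasses.dataclass(frozen=True)
class Node:
    child: Optional[Node] = None


@dataclasses.dataclass(frozen=True)
class Slice(Node):
    stop: Optional[int] = None  # LIMIT-like when start is None/0 and step == 1


@dataclasses.dataclass(frozen=True)
class Projection(Node):
    row_preserving: bool = True


def pull_out_limit(root: Node) -> Tuple[Node, Optional[int]]:
    """Strip LIMIT-equivalent slices off the tree, returning the rewritten
    tree and the tightest (smallest) limit found, or None if there is none."""
    if isinstance(root, Slice) and root.stop is not None and root.child is not None:
        # Recurse first, then keep whichever limit is tighter.
        new_root, prior_limit = pull_out_limit(root.child)
        limit = root.stop if prior_limit is None else min(root.stop, prior_limit)
        return new_root, limit
    if isinstance(root, Projection) and root.row_preserving and root.child is not None:
        # Row-preserving ops are transparent to a limit pull-up.
        new_child, prior_limit = pull_out_limit(root.child)
        if prior_limit is not None:
            return dataclasses.replace(root, child=new_child), prior_limit
    # Filters, aggregations, joins, etc. block the pull-up.
    return root, None


# LIMIT 10 over a projection over LIMIT 5: the tighter limit (5) survives.
tree = Slice(child=Projection(child=Slice(child=Node(), stop=5)), stop=10)
print(pull_out_limit(tree))  # (Projection(child=Node()), 5)
```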
diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py
index 82df53af82..baf4b12566 100644
--- a/bigframes/core/tree_properties.py
+++ b/bigframes/core/tree_properties.py
@@ -45,26 +45,13 @@ def can_fast_head(node: nodes.BigFrameNode) -> bool:
     # To do fast head operation:
     # (1) the underlying data must be arranged/indexed according to the logical ordering
     # (2) transformations must support pushing down LIMIT or a filter on row numbers
-    return has_fast_offset_address(node) or has_fast_offset_address(node)
-
-
-def has_fast_orderby_limit(node: nodes.BigFrameNode) -> bool:
-    """True iff ORDER BY LIMIT can be performed without a large full table scan."""
-    # TODO: In theory compatible with some Slice nodes, potentially by adding OFFSET
-    if isinstance(node, nodes.LeafNode):
-        return node.fast_ordered_limit
-    if isinstance(node, (nodes.ProjectionNode, nodes.SelectionNode)):
-        return has_fast_orderby_limit(node.child)
-    return False
-
-
-def has_fast_offset_address(node: nodes.BigFrameNode) -> bool:
-    """True iff specific offsets can be scanned without a large full table scan."""
-    # TODO: In theory can push offset lookups through slice operators by translating indices
-    if isinstance(node, nodes.LeafNode):
-        return node.fast_offsets
+    if isinstance(node, nodes.ReadLocalNode):
+        # always cheap to push slice into local data
+        return True
+    if isinstance(node, nodes.ReadTableNode):
+        return (node.source.ordering is None) or (node.fast_ordered_limit)
     if isinstance(node, (nodes.ProjectionNode, nodes.SelectionNode)):
-        return has_fast_offset_address(node.child)
+        return can_fast_head(node.child)
     return False
 
 
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 8b0a1266ce..ba669a62bb 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -387,6 +387,7 @@ def read_gbq_table(  # type: ignore[overload-overlap]
         enable_snapshot: bool = ...,
         dry_run: Literal[False] = ...,
         force_total_order: Optional[bool] = ...,
+        n_rows: Optional[int] = None,
     ) -> dataframe.DataFrame:
         ...
 
@@ -408,6 +409,7 @@ def read_gbq_table(
         enable_snapshot: bool = ...,
         dry_run: Literal[True] = ...,
         force_total_order: Optional[bool] = ...,
+        n_rows: Optional[int] = None,
     ) -> pandas.Series:
         ...
 
@@ -428,6 +430,7 @@ def read_gbq_table(
         enable_snapshot: bool = True,
         dry_run: bool = False,
         force_total_order: Optional[bool] = None,
+        n_rows: Optional[int] = None,
     ) -> dataframe.DataFrame | pandas.Series:
         import bigframes._tools.strings
         import bigframes.dataframe as dataframe
@@ -618,6 +621,7 @@ def read_gbq_table(
             at_time=time_travel_timestamp if enable_snapshot else None,
             primary_key=primary_key,
             session=self._session,
+            n_rows=n_rows,
         )
         # if we don't have a unique index, we order by row hash if we are in strict mode
         if (
@@ -852,6 +856,7 @@ def read_gbq_query(
             columns=columns,
             use_cache=configuration["query"]["useQueryCache"],
             force_total_order=force_total_order,
+            n_rows=query_job.result().total_rows,
             # max_results and filters are omitted because they are already
             # handled by to_query(), above.
         )
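Note: besides fixing the tautological `has_fast_offset_address(node) or has_fast_offset_address(node)` check, this chunk threads an `n_rows` through the loader, sourced from the BigQuery client's `RowIterator.total_rows`. A minimal standalone illustration of where that number originates (query text and variable names are made up for the example):

```python
from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 300"
)
# RowIterator.total_rows is populated once the job completes; recording it
# at load time means later row-count questions need not re-run the query.
n_rows = query_job.result().total_rows
print(n_rows)  # 300
```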
diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py
index 9384a40fbe..d4bbf2783c 100644
--- a/bigframes/session/read_api_execution.py
+++ b/bigframes/session/read_api_execution.py
@@ -39,12 +39,17 @@ def execute(
         ordered: bool,
         peek: Optional[int] = None,
     ) -> Optional[executor.ExecuteResult]:
-        node = self._try_adapt_plan(plan, ordered)
-        if not node:
+        adapt_result = self._try_adapt_plan(plan, ordered)
+        if not adapt_result:
             return None
+        node, limit = adapt_result
         if node.explicitly_ordered and ordered:
             return None
 
+        if limit is not None:
+            if peek is None or limit < peek:
+                peek = limit
+
         import google.cloud.bigquery_storage_v1.types as bq_storage_types
         from google.protobuf import timestamp_pb2
 
@@ -117,11 +122,20 @@ def _try_adapt_plan(
         self,
         plan: bigframe_node.BigFrameNode,
         ordered: bool,
-    ) -> Optional[nodes.ReadTableNode]:
+    ) -> Optional[tuple[nodes.ReadTableNode, Optional[int]]]:
         """
-        Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None.
+        Tries to simplify the plan to an equivalent single ReadTableNode and a limit. Otherwise, returns None.
         """
+        plan, limit = rewrite.pull_out_limit(plan)
+        # bake_order does not allow slice ops
+        plan = plan.bottom_up(rewrite.rewrite_slice)
         if not ordered:
             # gets rid of order_by ops
             plan = rewrite.bake_order(plan)
-        return rewrite.try_reduce_to_table_scan(plan)
+        read_table_node = rewrite.try_reduce_to_table_scan(plan)
+        if read_table_node is None:
+            return None
+        if (limit is not None) and (read_table_node.source.ordering is not None):
+            # read api can only use physical ordering to limit, not a logical ordering
+            return None
+        return (read_table_node, limit)
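Note: the interplay between an explicit `peek` and a limit recovered by `pull_out_limit` reduces to taking the smaller of the two. A minimal restatement of the logic added to `execute` (the helper name is invented for illustration):

```python
from typing import Optional


def tighten_peek(peek: Optional[int], limit: Optional[int]) -> Optional[int]:
    """Return the effective row cap: the smaller of an explicit peek and a
    limit pulled out of the plan (None means unbounded)."""
    if limit is not None and (peek is None or limit < peek):
        return limit
    return peek


assert tighten_peek(None, 300) == 300  # a bare LIMIT 300 caps the scan
assert tighten_peek(100, 300) == 100   # peek(100) under LIMIT 300
assert tighten_peek(200, 50) == 50     # the tighter plan limit wins
assert tighten_peek(None, None) is None
```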
diff --git a/tests/system/small/session/test_read_gbq_colab.py b/tests/system/small/session/test_read_gbq_colab.py
index 946faffab2..a821901e4c 100644
--- a/tests/system/small/session/test_read_gbq_colab.py
+++ b/tests/system/small/session/test_read_gbq_colab.py
@@ -19,6 +19,7 @@
 
 
 def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_session):
+    executions_before_sql = maybe_ordered_session._metrics.execution_count
     df = maybe_ordered_session._read_gbq_colab(
         """
         SELECT
@@ -32,9 +33,11 @@ def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_sessi
         LIMIT 300
         """
     )
+    executions_before_python = maybe_ordered_session._metrics.execution_count
     batches = df.to_pandas_batches(
         page_size=100,
     )
+    executions_after = maybe_ordered_session._metrics.execution_count
 
     total_rows = 0
     for batch in batches:
@@ -42,6 +45,55 @@ def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_sessi
         total_rows += len(batch.index)
 
     assert total_rows > 0
+    assert executions_after == executions_before_python == executions_before_sql + 1
+
+
+def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session):
+    executions_before_sql = maybe_ordered_session._metrics.execution_count
+    df = maybe_ordered_session._read_gbq_colab(
+        """
+        SELECT
+            name,
+            SUM(number) AS total
+        FROM
+            `bigquery-public-data.usa_names.usa_1910_2013`
+        WHERE state LIKE 'W%'
+        GROUP BY name
+        ORDER BY total DESC
+        LIMIT 300
+        """
+    )
+    executions_before_python = maybe_ordered_session._metrics.execution_count
+    result = df.peek(100)
+    executions_after = maybe_ordered_session._metrics.execution_count
+
+    # Ordering isn't guaranteed by peek, but it should hold with the read API
+    # based implementation; if this starts failing, peek may no longer use it.
+    assert result["total"].is_monotonic_decreasing
+
+    assert len(result) == 100
+    assert executions_after == executions_before_python == executions_before_sql + 1
+
+
+def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session):
+    executions_before_sql = maybe_ordered_session._metrics.execution_count
+    df = maybe_ordered_session._read_gbq_colab(
+        """
+        SELECT
+            name,
+            SUM(number) AS total
+        FROM
+            `bigquery-public-data.usa_names.usa_1910_2013`
+        WHERE state LIKE 'W%'
+        GROUP BY name
+        ORDER BY total DESC
+        LIMIT 300
+        """
+    )
+    executions_before_python = maybe_ordered_session._metrics.execution_count
+    _ = repr(df)
+    executions_after = maybe_ordered_session._metrics.execution_count
+    assert executions_after == executions_before_python == executions_before_sql + 1
 
 
 def test_read_gbq_colab_includes_formatted_scalars(session):
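Note: the three tests share the same counting pattern around `session._metrics.execution_count`. If it spreads further, it could be factored into a helper along these lines (`assert_executes` is a hypothetical name, not part of the test suite):

```python
def assert_executes(session, fn, expected: int = 0):
    """Run fn and assert it triggered exactly `expected` new BigQuery
    executions, as counted by the session's execution_count metric."""
    before = session._metrics.execution_count
    result = fn()
    after = session._metrics.execution_count
    assert after - before == expected, (
        f"expected {expected} executions, observed {after - before}"
    )
    return result


# e.g. result = assert_executes(maybe_ordered_session, lambda: df.peek(100))
```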