diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 0f9cacd83d..c2250dfe1c 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -513,9 +513,14 @@ def _materialize_local(
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
-        results_iterator, query_job = self.session._execute(
-            self.expr, sorted=materialize_options.ordered
+        _, query_job = self.session._query_to_destination(
+            self.session._to_sql(self.expr, sorted=True),
+            list(self.index_columns),
+            api_name="cached",
+            do_clustering=False,
         )
+        results_iterator = query_job.result()
+
         table_size = (
             self.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
         )
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index b6d56006be..47f4bc98b5 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -428,7 +428,8 @@ def _query_to_destination(
         index_cols: List[str],
         api_name: str,
         configuration: dict = {"query": {"useQueryCache": True}},
-    ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
+        do_clustering=True,
+    ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]:
         # If a dry_run indicates this is not a query type job, then don't
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
         dry_run_config = bigquery.QueryJobConfig()
@@ -442,11 +443,14 @@ def _query_to_destination(
         # internal issue 303057336.
         # Since we have a `statement_type == 'SELECT'`, schema should be populated.
         schema = typing.cast(Iterable[bigquery.SchemaField], dry_run_job.schema)
-        cluster_cols = [
-            item.name
-            for item in schema
-            if (item.name in index_cols) and _can_cluster_bq(item)
-        ][:_MAX_CLUSTER_COLUMNS]
+        if do_clustering:
+            cluster_cols = [
+                item.name
+                for item in schema
+                if (item.name in index_cols) and _can_cluster_bq(item)
+            ][:_MAX_CLUSTER_COLUMNS]
+        else:
+            cluster_cols = []
         temp_table = self._create_empty_temp_table(schema, cluster_cols)
 
         timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py
index 22baa2268f..871c846c79 100644
--- a/tests/system/load/test_large_tables.py
+++ b/tests/system/load/test_large_tables.py
@@ -90,3 +90,14 @@ def test_to_pandas_batches_large_table():
         del df
 
     assert row_count == expected_row_count
+
+
+def test_to_pandas_large_table():
+    df = bpd.read_gbq("load_testing.scalars_10gb")
+    # df will be downloaded locally
+    expected_row_count, expected_column_count = df.shape
+
+    df = df.to_pandas()
+    row_count, column_count = df.shape
+    assert column_count == expected_column_count
+    assert row_count == expected_row_count
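For reference, the core behavioral change above is the new do_clustering gate in _query_to_destination: when False, no cluster columns are requested for the temp table at all. Below is a minimal standalone sketch of that selection logic. SchemaField, _can_cluster, the type allow-list, and the value of _MAX_CLUSTER_COLUMNS here are hypothetical stand-ins (the real code uses google.cloud.bigquery.SchemaField and bigframes' _can_cluster_bq); only the if do_clustering / else [] shape mirrors the diff.

from dataclasses import dataclass
from typing import Iterable, List

_MAX_CLUSTER_COLUMNS = 4  # assumption: stand-in for the constant in bigframes.session


@dataclass
class SchemaField:
    """Hypothetical stand-in for google.cloud.bigquery.SchemaField."""

    name: str
    field_type: str


def _can_cluster(field: SchemaField) -> bool:
    # Assumption: clustering is limited to a few scalar BigQuery types,
    # mirroring what a _can_cluster_bq-style check would allow.
    return field.field_type in {"INTEGER", "STRING", "DATE", "TIMESTAMP", "BOOLEAN"}


def choose_cluster_cols(
    schema: Iterable[SchemaField],
    index_cols: List[str],
    do_clustering: bool = True,
) -> List[str]:
    """Mirror the patched selection: skip clustering entirely when
    do_clustering=False, otherwise cluster on clusterable index columns."""
    if not do_clustering:
        return []
    return [
        field.name
        for field in schema
        if field.name in index_cols and _can_cluster(field)
    ][:_MAX_CLUSTER_COLUMNS]


# With do_clustering=False (the new _materialize_local path), the temp
# table is created without any clustering columns.
schema = [SchemaField("idx", "INTEGER"), SchemaField("value", "FLOAT")]
assert choose_cluster_cols(schema, ["idx"], do_clustering=False) == []
assert choose_cluster_cols(schema, ["idx"], do_clustering=True) == ["idx"]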