From 508ded892ac6599916b7562d6a667caa3d533c45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Thu, 29 May 2025 13:40:30 -0500 Subject: [PATCH 1/6] fix: include location in Session-based temporary storage manager DDL queries --- bigframes/session/bigquery_session.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py index ae8dc88d43..b9185bf206 100644 --- a/bigframes/session/bigquery_session.py +++ b/bigframes/session/bigquery_session.py @@ -84,7 +84,9 @@ def create_temp_table( ddl = f"CREATE TEMP TABLE `_SESSION`.{googlesql.identifier(table_ref.table_id)} ({fields_string}){cluster_string}" - job = self.bqclient.query(ddl, job_config=job_config) + job = self.bqclient.query( + ddl, job_config=job_config, location=self.location + ) job.result() # return the fully qualified table, so it can be used outside of the session return job.destination @@ -94,7 +96,10 @@ def close(self): self._sessiondaemon.stop() if self._session_id is not None and self.bqclient is not None: - self.bqclient.query_and_wait(f"CALL BQ.ABORT_SESSION('{self._session_id}')") + self.bqclient.query_and_wait( + f"CALL BQ.ABORT_SESSION('{self._session_id}')", + location=self.location, + ) def _get_session_id(self) -> str: if self._session_id: From 7a44bc31b0dfae7e1d1b6eaee49dd4fe5197a2ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 29 May 2025 13:43:43 -0500 Subject: [PATCH 2/6] fix indentation error --- bigframes/session/bigquery_session.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bigframes/session/bigquery_session.py b/bigframes/session/bigquery_session.py index b9185bf206..883087df07 100644 --- a/bigframes/session/bigquery_session.py +++ b/bigframes/session/bigquery_session.py @@ -96,10 +96,10 @@ def close(self): self._sessiondaemon.stop() if self._session_id is not None and self.bqclient is not None: - self.bqclient.query_and_wait( - f"CALL BQ.ABORT_SESSION('{self._session_id}')", - location=self.location, - ) + self.bqclient.query_and_wait( + f"CALL BQ.ABORT_SESSION('{self._session_id}')", + location=self.location, + ) def _get_session_id(self) -> str: if self._session_id: From 5ecd58b2bb15603f701a76673fc26db505d1a10c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 29 May 2025 14:27:23 -0500 Subject: [PATCH 3/6] set location in read session, too --- bigframes/session/bq_caching_executor.py | 1 + bigframes/session/read_api_execution.py | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 33d3314a1e..00c222dae3 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -159,6 +159,7 @@ def __init__( read_api_execution.ReadApiSemiExecutor( bqstoragereadclient=bqstoragereadclient, project=self.bqclient.project, + location=self.bqclient.location or "US", ), local_scan_executor.LocalScanExecutor(), ) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index 9384a40fbe..31c8ccd726 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -28,10 +28,15 @@ class ReadApiSemiExecutor(semi_executor.SemiExecutor): """ def __init__( - self, bqstoragereadclient: bigquery_storage_v1.BigQueryReadClient, project: str + self, + bqstoragereadclient: bigquery_storage_v1.BigQueryReadClient, + 
project: str, + *, + location: str, ): self.bqstoragereadclient = bqstoragereadclient self.project = project + self.location = location def execute( self, @@ -71,7 +76,7 @@ def execute( ) # Single stream to maintain ordering request = bq_storage_types.CreateReadSessionRequest( - parent=f"projects/{self.project}", + parent=f"projects/{self.project}/locations/{self.location}", read_session=requested_session, max_stream_count=1, ) From 43c94d31f9da6065499c95ced81f723641051ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 29 May 2025 14:54:24 -0500 Subject: [PATCH 4/6] add missing location parent --- bigframes/session/read_api_execution.py | 2 +- tests/system/large/test_location.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index 31c8ccd726..802d20d5e8 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -36,7 +36,7 @@ def __init__( ): self.bqstoragereadclient = bqstoragereadclient self.project = project - self.location = location + self.location = location.lower() # "US" is invalid, but "us" is valid def execute( self, diff --git a/tests/system/large/test_location.py b/tests/system/large/test_location.py index d4428c1f95..7cf73a2832 100644 --- a/tests/system/large/test_location.py +++ b/tests/system/large/test_location.py @@ -70,7 +70,7 @@ def _assert_bq_execution_location( data_format=bqstorage_types.DataFormat.ARROW, # type: ignore[attr-defined] ) read_session = session.bqstoragereadclient.create_read_session( - parent=f"projects/{table.project}", + parent=f"projects/{table.project}/locations/{expected_location.lower()}", read_session=requested_session, max_stream_count=1, ) From bcc2431f0336032f56d8efbf7fa097227562ae2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Thu, 29 May 2025 15:06:42 -0500 Subject: [PATCH 5/6] remove broken read session parent --- bigframes/session/bq_caching_executor.py | 1 - bigframes/session/read_api_execution.py | 9 ++------- tests/system/large/test_location.py | 23 ----------------------- 3 files changed, 2 insertions(+), 31 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 00c222dae3..33d3314a1e 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -159,7 +159,6 @@ def __init__( read_api_execution.ReadApiSemiExecutor( bqstoragereadclient=bqstoragereadclient, project=self.bqclient.project, - location=self.bqclient.location or "US", ), local_scan_executor.LocalScanExecutor(), ) diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index 802d20d5e8..9384a40fbe 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -28,15 +28,10 @@ class ReadApiSemiExecutor(semi_executor.SemiExecutor): """ def __init__( - self, - bqstoragereadclient: bigquery_storage_v1.BigQueryReadClient, - project: str, - *, - location: str, + self, bqstoragereadclient: bigquery_storage_v1.BigQueryReadClient, project: str ): self.bqstoragereadclient = bqstoragereadclient self.project = project - self.location = location.lower() # "US" is invalid, but "us" is valid def execute( self, @@ -76,7 +71,7 @@ def execute( ) # Single stream to maintain ordering request = bq_storage_types.CreateReadSessionRequest( - parent=f"projects/{self.project}/locations/{self.location}", + 
parent=f"projects/{self.project}", read_session=requested_session, max_stream_count=1, ) diff --git a/tests/system/large/test_location.py b/tests/system/large/test_location.py index 7cf73a2832..e30059dddc 100644 --- a/tests/system/large/test_location.py +++ b/tests/system/large/test_location.py @@ -15,7 +15,6 @@ import typing from google.cloud import bigquery -from google.cloud.bigquery_storage import types as bqstorage_types import pandas import pandas.testing import pytest @@ -63,28 +62,6 @@ def _assert_bq_execution_location( expected_result, result.to_pandas(), check_dtype=False, check_index_type=False ) - # Ensure BQ Storage Read client operation succceeds - table = result.query_job.destination - requested_session = bqstorage_types.ReadSession( # type: ignore[attr-defined] - table=f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}", - data_format=bqstorage_types.DataFormat.ARROW, # type: ignore[attr-defined] - ) - read_session = session.bqstoragereadclient.create_read_session( - parent=f"projects/{table.project}/locations/{expected_location.lower()}", - read_session=requested_session, - max_stream_count=1, - ) - reader = session.bqstoragereadclient.read_rows(read_session.streams[0].name) - frames = [] - for message in reader.rows().pages: - frames.append(message.to_dataframe()) - read_dataframe = pandas.concat(frames) - # normalize before comparing since we lost some of the bigframes column - # naming abtractions in the direct read of the destination table - read_dataframe = read_dataframe.set_index("name") - read_dataframe.columns = result.columns - pandas.testing.assert_frame_equal(expected_result, read_dataframe) - def test_bq_location_default(): session = bigframes.Session() From 15fa7289e13ce63ae04382c126ac167254d33549 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a?= Date: Fri, 30 May 2025 09:52:19 -0500 Subject: [PATCH 6/6] validate location of the destination table --- tests/system/large/test_location.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/tests/system/large/test_location.py b/tests/system/large/test_location.py index e30059dddc..3ebe2bb040 100644 --- a/tests/system/large/test_location.py +++ b/tests/system/large/test_location.py @@ -14,7 +14,6 @@ import typing -from google.cloud import bigquery import pandas import pandas.testing import pytest @@ -40,7 +39,15 @@ def _assert_bq_execution_location( if expected_location is None: expected_location = session._location - assert typing.cast(bigquery.QueryJob, df.query_job).location == expected_location + query_job = df.query_job + assert query_job is not None + assert query_job.location == expected_location + destination = query_job.destination + assert destination is not None + destination_dataset = session.bqclient.get_dataset( + f"{destination.project}.{destination.dataset_id}" + ) + assert destination_dataset.location == expected_location # Ensure operation involving BQ client suceeds result = ( @@ -51,15 +58,27 @@ def _assert_bq_execution_location( .head() ) - assert ( - typing.cast(bigquery.QueryJob, result.query_job).location == expected_location + # Use allow_large_results = True to force a job to be created. 
+    result_pd = result.to_pandas(allow_large_results=True)
+
+    query_job = result.query_job
+    assert query_job is not None
+    assert query_job.location == expected_location
+    destination = query_job.destination
+    assert destination is not None
+    destination_dataset = session.bqclient.get_dataset(
+        f"{destination.project}.{destination.dataset_id}"
+    )
+    assert destination_dataset.location == expected_location
 
     expected_result = pandas.DataFrame(
         {"number": [444, 222]}, index=pandas.Index(["aaa", "bbb"], name="name")
     )
     pandas.testing.assert_frame_equal(
-        expected_result, result.to_pandas(), check_dtype=False, check_index_type=False
+        expected_result,
+        result_pd,
+        check_dtype=False,
+        check_index_type=False,
     )
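
Taken together, this series threads the session location through the CREATE TEMP TABLE DDL and the CALL BQ.ABORT_SESSION(...) call (patches 1-2), briefly adds and then backs out a location-qualified read-session parent (patches 3-5), and tightens the system test to validate the destination dataset's location (patch 6). The sketch below is a minimal way to exercise that path end to end; it is not part of the patches. REGION and TABLE are placeholder assumptions, and it leans on bigframes.BigQueryOptions, Session.read_gbq, DataFrame.query_job, and Session.close() as they are used in tests/system/large/test_location.py.

    import bigframes

    # Placeholder assumptions (not taken from the patches): any non-US region
    # the project can use, and a table that lives in that region.
    REGION = "asia-southeast1"
    TABLE = "my-project.my_regional_dataset.my_table"

    session = bigframes.Session(context=bigframes.BigQueryOptions(location=REGION))
    df = session.read_gbq(TABLE)

    # Force a query job, as patch 6's test does with allow_large_results=True,
    # then confirm both the job and its destination dataset are in REGION.
    result = df.head()
    result.to_pandas(allow_large_results=True)

    query_job = result.query_job
    assert query_job is not None
    assert query_job.location == REGION

    destination = query_job.destination
    assert destination is not None
    destination_dataset = session.bqclient.get_dataset(
        f"{destination.project}.{destination.dataset_id}"
    )
    assert destination_dataset.location == REGION

    # When the Session-based temporary storage manager is active, closing the
    # session issues CALL BQ.ABORT_SESSION(...), which patch 1 now runs with
    # location=REGION instead of relying on the client default.
    session.close()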