diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9361543d5f..d89d35516e 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2542,7 +2542,8 @@ def _get_rows_as_json_values(self) -> Block: SELECT {select_columns_csv} FROM T1 """ # The only ways this code is used is through df.apply(axis=1) cope path - destination, query_job = self.session._query_to_destination( + # TODO: Stop using internal API + destination, query_job = self.session._loader._query_to_destination( json_sql, index_cols=[ordering_column_name], api_name="apply" ) if not destination: diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 17dde7021b..6b782b4692 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2956,17 +2956,20 @@ def to_csv( if "*" not in path_or_buf: raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD) - result_table = self._run_io_query( - index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID - ) - export_data_statement = bigframes.session._io.bigquery.create_export_csv_statement( - f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", - uri=path_or_buf, - field_delimiter=sep, - header=header, + export_array, id_overrides = self._prepare_export( + index=index and self._has_index, + ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - _, query_job = self._block.expr.session._start_query( - export_data_statement, api_name="dataframe-to_csv" + options = { + "field_delimiter": sep, + "header": header, + } + query_job = self._session._executor.export_gcs( + export_array, + id_overrides, + path_or_buf, + format="csv", + export_options=options, ) self._set_internal_query_job(query_job) return None @@ -3006,17 +3009,12 @@ def to_json( "'lines' keyword is only valid when 'orient' is 'records'." 
) - result_table = self._run_io_query( - index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID - ) - export_data_statement = bigframes.session._io.bigquery.create_export_data_statement( - f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", - uri=path_or_buf, - format="JSON", - export_options={}, + export_array, id_overrides = self._prepare_export( + index=index and self._has_index, + ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - _, query_job = self._block.expr.session._start_query( - export_data_statement, api_name="dataframe-to_json" + query_job = self._session._executor.export_gcs( + export_array, id_overrides, path_or_buf, format="json", export_options={} ) self._set_internal_query_job(query_job) return None @@ -3145,18 +3143,17 @@ def to_parquet( if compression: export_options["compression"] = compression.upper() - result_table = self._run_io_query( - index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID + export_array, id_overrides = self._prepare_export( + index=index and self._has_index, + ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - export_data_statement = bigframes.session._io.bigquery.create_export_data_statement( - f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", - uri=path, - format="PARQUET", + query_job = self._session._executor.export_gcs( + export_array, + id_overrides, + path, + format="parquet", export_options=export_options, ) - _, query_job = self._block.expr.session._start_query( - export_data_statement, api_name="dataframe-to_parquet" - ) self._set_internal_query_job(query_job) return None @@ -3386,30 +3383,6 @@ def _prepare_export( array_value = array_value.promote_offsets(ordering_id) return array_value, id_overrides - def _run_io_query( - self, - index: bool, - ordering_id: Optional[str] = None, - ) -> bigquery.TableReference: - """Executes a query job presenting this dataframe and returns the destination - table.""" - session = self._block.expr.session - export_array, id_overrides = self._prepare_export( - index=index and self._has_index, ordering_id=ordering_id - ) - - _, query_job = session._execute( - export_array, - ordered=False, - col_id_overrides=id_overrides, - ) - self._set_internal_query_job(query_job) - - # The query job should have finished, so there should be always be a result table. - result_table = query_job.destination - assert result_table is not None - return result_table - def map(self, func, na_action: Optional[str] = None) -> DataFrame: if not callable(func): raise TypeError("the first argument must be callable") diff --git a/bigframes/functions/_remote_function_client.py b/bigframes/functions/_remote_function_client.py index 6ef482ecda..3698bda28b 100644 --- a/bigframes/functions/_remote_function_client.py +++ b/bigframes/functions/_remote_function_client.py @@ -37,6 +37,8 @@ import google.api_core.retry from google.cloud import bigquery, functions_v2 +import bigframes.session._io.bigquery + from . import _utils logger = logging.getLogger(__name__) @@ -142,7 +144,12 @@ def create_bq_remote_function( self._bq_client.create_dataset(dataset, exists_ok=True) # TODO(swast): plumb through the original, user-facing api_name. 
- _, query_job = self._session._start_query(create_function_ddl) + _, query_job = bigframes.session._io.bigquery.start_query_with_client( + self._session.bqclient, + create_function_ddl, + job_config=bigquery.QueryJobConfig(), + ) + logger.info(f"Created remote function {query_job.ddl_target_routine}") def get_cloud_function_fully_qualified_parent(self): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 0868ef202a..c91266b875 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,9 +16,6 @@ from __future__ import annotations -import copy -import datetime -import itertools import logging import os import secrets @@ -27,10 +24,8 @@ Any, Callable, Dict, - Hashable, IO, Iterable, - List, Literal, Mapping, MutableSequence, @@ -47,21 +42,10 @@ import bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet import bigframes_vendored.pandas.io.parsers.readers as third_party_pandas_readers import bigframes_vendored.pandas.io.pickle as third_party_pandas_pickle -import google.api_core.client_info -import google.api_core.client_options -import google.api_core.exceptions -import google.api_core.gapic_v1.client_info -import google.auth.credentials import google.cloud.bigquery as bigquery -import google.cloud.bigquery.table -import google.cloud.bigquery_connection_v1 -import google.cloud.bigquery_storage_v1 -import google.cloud.functions_v2 -import google.cloud.resourcemanager_v3 import google.cloud.storage as storage # type: ignore import ibis import ibis.backends.bigquery as ibis_bigquery -import jellyfish import numpy as np import pandas from pandas._typing import ( @@ -80,8 +64,6 @@ import bigframes.core.compile import bigframes.core.guid import bigframes.core.pruning -import bigframes.core.schema as schemata -import bigframes.core.utils as utils # Even though the ibis.backends.bigquery import is unused, it's needed # to register new and replacement ops with the Ibis BigQuery backend. @@ -92,9 +74,9 @@ import bigframes.functions._remote_function_session as bigframes_rf_session import bigframes.functions.remote_function as bigframes_rf import bigframes.session._io.bigquery as bf_io_bigquery -import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table import bigframes.session.clients import bigframes.session.executor +import bigframes.session.loader import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temp_storage @@ -109,8 +91,6 @@ _BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection" -_MAX_CLUSTER_COLUMNS = 4 - # TODO(swast): Need to connect to regional endpoints when performing remote # functions operations (BQ Connection IAM, Cloud Run / Cloud Functions). # Also see if resource manager client library supports regional endpoints. 
@@ -145,20 +125,6 @@ ) -def _to_index_cols( - index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), -) -> List[str]: - """Convert index_col into a list of column names.""" - if isinstance(index_col, bigframes.enums.DefaultIndexKind): - index_cols: List[str] = [] - elif isinstance(index_col, str): - index_cols = [index_col] - else: - index_cols = list(index_col) - - return index_cols - - class Session( third_party_pandas_gbq.GBQIOMixin, third_party_pandas_parquet.ParquetIOMixin, @@ -225,14 +191,6 @@ def __init__( bq_kms_key_name=self._bq_kms_key_name, ) - self._anonymous_dataset = ( - bigframes.session._io.bigquery.create_bq_dataset_reference( - self.bqclient, - location=self._location, - api_name="session-__init__", - ) - ) - # TODO(shobs): Remove this logic after https://github.com/ibis-project/ibis/issues/8494 # has been fixed. The ibis client changes the default query job config # so we are going to remember the current config and restore it after @@ -259,9 +217,6 @@ def __init__( # Now that we're starting the session, don't allow the options to be # changed. context._session_started = True - self._df_snapshot: Dict[ - bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table] - ] = {} # unique session identifier, short enough to be human readable # only needs to be unique among sessions created by the same user @@ -287,20 +242,19 @@ def __init__( bigframes.exceptions.OrderingModePartialPreviewWarning, ) - # Sequential index needs total ordering to generate, so use null index with unstrict ordering. - self._default_index_type: bigframes.enums.DefaultIndexKind = ( + self._allow_ambiguity = not self._strictly_ordered + self._default_index_type = ( bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64 if self._strictly_ordered else bigframes.enums.DefaultIndexKind.NULL ) - self._allow_ambiguity = not self._strictly_ordered self._metrics = bigframes.session.metrics.ExecutionMetrics() self._remote_function_session = bigframes_rf_session.RemoteFunctionSession() self._temp_storage_manager = ( bigframes.session.temp_storage.TemporaryGbqStorageManager( self._clients_provider.bqclient, - dataset=self._anonymous_dataset, + location=self._location, session_id=self._session_id, kms_key=self._bq_kms_key_name, ) @@ -312,6 +266,15 @@ def __init__( metrics=self._metrics, ) + self._loader = bigframes.session.loader.GbqDataLoader( + session=self, + bqclient=self._clients_provider.bqclient, + storage_manager=self._temp_storage_manager, + default_index_type=self._default_index_type, + scan_index_uniqueness=self._strictly_ordered, + metrics=self._metrics, + ) + @property def bqclient(self): return self._clients_provider.bqclient @@ -377,9 +340,13 @@ def slot_millis_sum(self): def _allows_ambiguity(self) -> bool: return self._allow_ambiguity + @property + def _anonymous_dataset(self): + return self._temp_storage_manager.dataset + def __hash__(self): # Stable hash needed to use in expression tree - return hash(str(self._anonymous_dataset)) + return hash(str(self._session_id)) def close(self): """Delete resources that were created with this session's session_id. @@ -412,7 +379,7 @@ def read_gbq( columns = col_order if bf_io_bigquery.is_query(query_or_table): - return self._read_gbq_query( + return self._loader.read_gbq_query( query_or_table, index_col=index_col, columns=columns, @@ -430,7 +397,7 @@ def read_gbq( "'configuration' or use a query." 
) - return self._read_gbq_table( + return self._loader.read_gbq_table( query_or_table, index_col=index_col, columns=columns, @@ -448,71 +415,6 @@ def _register_object( ): self._objects.append(weakref.ref(object)) - def _query_to_destination( - self, - query: str, - index_cols: List[str], - api_name: str, - configuration: dict = {"query": {"useQueryCache": True}}, - do_clustering=True, - ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]: - # If a dry_run indicates this is not a query type job, then don't - # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. - dry_run_config = bigquery.QueryJobConfig() - dry_run_config.dry_run = True - _, dry_run_job = self._start_query( - query, job_config=dry_run_config, api_name=api_name - ) - if dry_run_job.statement_type != "SELECT": - _, query_job = self._start_query(query, api_name=api_name) - return query_job.destination, query_job - - # Create a table to workaround BigQuery 10 GB query results limit. See: - # internal issue 303057336. - # Since we have a `statement_type == 'SELECT'`, schema should be populated. - schema = dry_run_job.schema - assert schema is not None - if do_clustering: - cluster_cols = bf_io_bigquery.select_cluster_cols( - schema, cluster_candidates=index_cols - ) - else: - cluster_cols = [] - temp_table = self._temp_storage_manager.create_temp_table(schema, cluster_cols) - - timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get( - "timeoutMs" - ) - - # Convert timeout_ms to seconds, ensuring a minimum of 0.1 seconds to avoid - # the program getting stuck on too-short timeouts. - timeout = max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None - - job_config = typing.cast( - bigquery.QueryJobConfig, - bigquery.QueryJobConfig.from_api_repr(configuration), - ) - job_config.destination = temp_table - - try: - # Write to temp table to workaround BigQuery 10 GB query results - # limit. See: internal issue 303057336. - job_config.labels["error_caught"] = "true" - _, query_job = self._start_query( - query, - job_config=job_config, - timeout=timeout, - api_name=api_name, - ) - return query_job.destination, query_job - except google.api_core.exceptions.BadRequest: - # Some SELECT statements still aren't compatible with cluster - # tables as the destination. For example, if the query has a - # top-level ORDER BY, this conflicts with our ability to cluster - # the table by the index column(s). 
- _, query_job = self._start_query(query, timeout=timeout, api_name=api_name) - return query_job.destination, query_job - def read_gbq_query( self, query: str, @@ -582,7 +484,7 @@ def read_gbq_query( elif col_order: columns = col_order - return self._read_gbq_query( + return self._loader.read_gbq_query( query=query, index_col=index_col, columns=columns, @@ -593,95 +495,6 @@ def read_gbq_query( filters=filters, ) - def _read_gbq_query( - self, - query: str, - *, - index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), - columns: Iterable[str] = (), - configuration: Optional[Dict] = None, - max_results: Optional[int] = None, - api_name: str = "read_gbq_query", - use_cache: Optional[bool] = None, - filters: third_party_pandas_gbq.FiltersType = (), - ) -> dataframe.DataFrame: - import bigframes.dataframe as dataframe - - configuration = _transform_read_gbq_configuration(configuration) - - if "query" not in configuration: - configuration["query"] = {} - - if "query" in configuration["query"]: - raise ValueError( - "The query statement must not be included in the ", - "'configuration' because it is already provided as", - " a separate parameter.", - ) - - if "useQueryCache" in configuration["query"]: - if use_cache is not None: - raise ValueError( - "'useQueryCache' in 'configuration' conflicts with" - " 'use_cache' parameter. Please specify only one." - ) - else: - configuration["query"]["useQueryCache"] = ( - True if use_cache is None else use_cache - ) - - index_cols = _to_index_cols(index_col) - - filters_copy1, filters_copy2 = itertools.tee(filters) - has_filters = len(list(filters_copy1)) != 0 - filters = typing.cast(third_party_pandas_gbq.FiltersType, filters_copy2) - if has_filters or max_results is not None: - # TODO(b/338111344): If we are running a query anyway, we might as - # well generate ROW_NUMBER() at the same time. - all_columns = itertools.chain(index_cols, columns) if columns else () - query = bf_io_bigquery.to_query( - query, - all_columns, - bf_io_bigquery.compile_filters(filters) if has_filters else None, - max_results=max_results, - # We're executing the query, so we don't need time travel for - # determinism. - time_travel_timestamp=None, - ) - - destination, query_job = self._query_to_destination( - query, - index_cols, - api_name=api_name, - configuration=configuration, - ) - - # If there was no destination table, that means the query must have - # been DDL or DML. Return some job metadata, instead. - if not destination: - return dataframe.DataFrame( - data=pandas.DataFrame( - { - "statement_type": [ - query_job.statement_type if query_job else "unknown" - ], - "job_id": [query_job.job_id if query_job else "unknown"], - "location": [query_job.location if query_job else "unknown"], - } - ), - session=self, - ) - - return self._read_gbq_table( - f"{destination.project}.{destination.dataset_id}.{destination.table_id}", - index_col=index_col, - columns=columns, - use_cache=configuration["query"]["useQueryCache"], - api_name=api_name, - # max_results and filters are omitted because they are already - # handled by to_query(), above. 
- ) - def read_gbq_table( self, query: str, @@ -715,7 +528,7 @@ def read_gbq_table( elif col_order: columns = col_order - return self._read_gbq_table( + return self._loader.read_gbq_table( query=query, index_col=index_col, columns=columns, @@ -750,7 +563,7 @@ def read_gbq_table_streaming( import bigframes.streaming.dataframe as streaming_dataframe - df = self._read_gbq_table( + df = self._loader.read_gbq_table( table, api_name="read_gbq_table_steaming", enable_snapshot=False, @@ -759,249 +572,6 @@ def read_gbq_table_streaming( return streaming_dataframe.StreamingDataFrame._from_table_df(df) - def _read_gbq_table( - self, - query: str, - *, - index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), - columns: Iterable[str] = (), - max_results: Optional[int] = None, - api_name: str, - use_cache: bool = True, - filters: third_party_pandas_gbq.FiltersType = (), - enable_snapshot: bool = True, - ) -> dataframe.DataFrame: - import bigframes.dataframe as dataframe - - # --------------------------------- - # Validate and transform parameters - # --------------------------------- - - if max_results and max_results <= 0: - raise ValueError( - f"`max_results` should be a positive number, got {max_results}." - ) - - table_ref = bigquery.table.TableReference.from_string( - query, default_project=self.bqclient.project - ) - - columns = list(columns) - filters = typing.cast(list, list(filters)) - - # --------------------------------- - # Fetch table metadata and validate - # --------------------------------- - - time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata( - self.bqclient, - table_ref=table_ref, - api_name=api_name, - cache=self._df_snapshot, - use_cache=use_cache, - ) - table_column_names = {field.name for field in table.schema} - - if table.location.casefold() != self._location.casefold(): - raise ValueError( - f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" - ) - - for key in columns: - if key not in table_column_names: - possibility = min( - table_column_names, - key=lambda item: jellyfish.levenshtein_distance(key, item), - ) - raise ValueError( - f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?" - ) - - # Converting index_col into a list of column names requires - # the table metadata because we might use the primary keys - # when constructing the index. - index_cols = bf_read_gbq_table.get_index_cols( - table=table, - index_col=index_col, - ) - - for key in index_cols: - if key not in table_column_names: - possibility = min( - table_column_names, - key=lambda item: jellyfish.levenshtein_distance(key, item), - ) - raise ValueError( - f"Column '{key}' of `index_col` not found in this table. Did you mean '{possibility}'?" - ) - - # ----------------------------- - # Optionally, execute the query - # ----------------------------- - - # max_results introduces non-determinism and limits the cost on - # clustered tables, so fallback to a query. We do this here so that - # the index is consistent with tables that have primary keys, even - # when max_results is set. - # TODO(b/338419730): We don't need to fallback to a query for wildcard - # tables if we allow some non-determinism when time travel isn't supported. - if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix( - query - ): - # TODO(b/338111344): If we are running a query anyway, we might as - # well generate ROW_NUMBER() at the same time. 
- all_columns = itertools.chain(index_cols, columns) if columns else () - query = bf_io_bigquery.to_query( - query, - columns=all_columns, - sql_predicate=bf_io_bigquery.compile_filters(filters) - if filters - else None, - max_results=max_results, - # We're executing the query, so we don't need time travel for - # determinism. - time_travel_timestamp=None, - ) - - return self._read_gbq_query( - query, - index_col=index_cols, - columns=columns, - api_name="read_gbq_table", - use_cache=use_cache, - ) - - # ----------------------------------------- - # Validate table access and features - # ----------------------------------------- - - # Use a time travel to make sure the DataFrame is deterministic, even - # if the underlying table changes. - - # If a dry run query fails with time travel but - # succeeds without it, omit the time travel clause and raise a warning - # about potential non-determinism if the underlying tables are modified. - filter_str = bf_io_bigquery.compile_filters(filters) if filters else None - all_columns = ( - () - if len(columns) == 0 - else (*columns, *[col for col in index_cols if col not in columns]) - ) - - enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table( - self.bqclient, table_ref, all_columns, time_travel_timestamp, filter_str - ) - - # ---------------------------- - # Create ordering and validate - # ---------------------------- - - # TODO(b/337925142): Generate a new subquery with just the index_cols - # in the Ibis table expression so we don't have a "SELECT *" subquery - # in the query that checks for index uniqueness. - # TODO(b/338065601): Provide a way to assume uniqueness and avoid this - # check. - is_index_unique = bf_read_gbq_table.are_index_cols_unique( - bqclient=self.bqclient, - table=table, - index_cols=index_cols, - api_name=api_name, - # If non in strict ordering mode, don't go through overhead of scanning index column(s) to determine if unique - metadata_only=not self._strictly_ordered, - ) - schema = schemata.ArraySchema.from_bq_table(table) - if columns: - schema = schema.select(index_cols + columns) - array_value = core.ArrayValue.from_table( - table, - schema=schema, - predicate=filter_str, - at_time=time_travel_timestamp if enable_snapshot else None, - primary_key=index_cols if is_index_unique else (), - session=self, - ) - - # ---------------------------------------------------- - # Create Default Sequential Index if still have no index - # ---------------------------------------------------- - - # If no index columns provided or found, fall back to session default - if (index_col != bigframes.enums.DefaultIndexKind.NULL) and len( - index_cols - ) == 0: - index_col = self._default_index_type - - index_names: Sequence[Hashable] = index_cols - if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64: - sequential_index_col = bigframes.core.guid.generate_guid("index_") - array_value = array_value.promote_offsets(sequential_index_col) - index_cols = [sequential_index_col] - index_names = [None] - - value_columns = [col for col in array_value.column_ids if col not in index_cols] - block = blocks.Block( - array_value, - index_columns=index_cols, - column_labels=value_columns, - index_labels=index_names, - ) - if max_results: - block = block.slice(stop=max_results) - df = dataframe.DataFrame(block) - - # If user provided index columns, should sort over it - if len(index_cols) > 0: - df.sort_index() - return df - - def _read_bigquery_load_job( - self, - filepath_or_buffer: str | IO["bytes"], - table: 
Union[bigquery.Table, bigquery.TableReference], - *, - job_config: bigquery.LoadJobConfig, - index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), - columns: Iterable[str] = (), - ) -> dataframe.DataFrame: - index_cols = _to_index_cols(index_col) - - if not job_config.clustering_fields and index_cols: - job_config.clustering_fields = index_cols[:_MAX_CLUSTER_COLUMNS] - - if isinstance(filepath_or_buffer, str): - if filepath_or_buffer.startswith("gs://"): - load_job = self.bqclient.load_table_from_uri( - filepath_or_buffer, table, job_config=job_config - ) - else: - with open(filepath_or_buffer, "rb") as source_file: - load_job = self.bqclient.load_table_from_file( - source_file, table, job_config=job_config - ) - else: - load_job = self.bqclient.load_table_from_file( - filepath_or_buffer, table, job_config=job_config - ) - - self._start_generic_job(load_job) - table_id = f"{table.project}.{table.dataset_id}.{table.table_id}" - - # Update the table expiration so we aren't limited to the default 24 - # hours of the anonymous dataset. - table_expiration = bigquery.Table(table_id) - table_expiration.expires = ( - datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION - ) - self.bqclient.update_table(table_expiration, ["expires"]) - - # The BigQuery REST API for tables.get doesn't take a session ID, so we - # can't get the schema for a temp table that way. - return self.read_gbq_table( - table_id, - index_col=index_col, - columns=columns, - ) - def read_gbq_model(self, model_name: str): """Loads a BigQuery ML model from BigQuery. @@ -1117,7 +687,7 @@ def _read_pandas( if inline_df is not None: return inline_df try: - return self._read_pandas_load_job(pandas_dataframe, api_name) + return self._loader.read_pandas_load_job(pandas_dataframe, api_name) except pa.ArrowInvalid as e: raise pa.ArrowInvalid( f"Could not convert with a BigQuery type: `{e}`. " @@ -1149,78 +719,6 @@ def _read_pandas_inline( return inline_df return None - def _read_pandas_load_job( - self, pandas_dataframe: pandas.DataFrame, api_name: str - ) -> dataframe.DataFrame: - import bigframes.dataframe as dataframe - - col_index = pandas_dataframe.columns.copy() - col_labels, idx_labels = ( - col_index.to_list(), - pandas_dataframe.index.names, - ) - new_col_ids, new_idx_ids = utils.get_standardized_ids( - col_labels, - idx_labels, - # Loading parquet files into BigQuery with special column names - # is only supported under an allowlist. - strict=True, - ) - - # Add order column to pandas DataFrame to preserve order in BigQuery - ordering_col = "rowid" - columns = frozenset(col_labels + idx_labels) - suffix = 2 - while ordering_col in columns: - ordering_col = f"rowid_{suffix}" - suffix += 1 - - pandas_dataframe_copy = pandas_dataframe.copy() - pandas_dataframe_copy.index.names = new_idx_ids - pandas_dataframe_copy.columns = pandas.Index(new_col_ids) - pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) - - job_config = bigquery.LoadJobConfig() - # Specify the datetime dtypes, which is auto-detected as timestamp types. 
- schema: list[bigquery.SchemaField] = [] - for column, dtype in zip(new_col_ids, pandas_dataframe.dtypes): - if dtype == "timestamp[us][pyarrow]": - schema.append( - bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.DATETIME) - ) - job_config.schema = schema - - # Clustering probably not needed anyways as pandas tables are small - cluster_cols = [ordering_col] - job_config.clustering_fields = cluster_cols - - job_config.labels = {"bigframes-api": api_name} - - load_table_destination = self._temp_storage_manager._random_table() - load_job = self.bqclient.load_table_from_dataframe( - pandas_dataframe_copy, - load_table_destination, - job_config=job_config, - ) - self._start_generic_job(load_job) - - destination_table = self.bqclient.get_table(load_table_destination) - array_value = core.ArrayValue.from_table( - table=destination_table, - # TODO: Generate this directly from original pandas df. - schema=schemata.ArraySchema.from_bq_table(destination_table), - session=self, - offsets_col=ordering_col, - ).drop_columns([ordering_col]) - - block = blocks.Block( - array_value, - index_columns=new_idx_ids, - column_labels=col_index, - index_labels=idx_labels, - ) - return dataframe.DataFrame(block) - def read_csv( self, filepath_or_buffer: str | IO["bytes"], @@ -1334,7 +832,7 @@ def read_csv( elif header > 0: job_config.skip_leading_rows = header - return self._read_bigquery_load_job( + return self._loader._read_bigquery_load_job( filepath_or_buffer, table, job_config=job_config, @@ -1403,7 +901,9 @@ def read_parquet( job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY job_config.labels = {"bigframes-api": "read_parquet"} - return self._read_bigquery_load_job(path, table, job_config=job_config) + return self._loader._read_bigquery_load_job( + path, table, job_config=job_config + ) else: read_parquet_kwargs: Dict[str, Any] = {} if pandas.__version__.startswith("1."): @@ -1463,7 +963,7 @@ def read_json( job_config.encoding = encoding job_config.labels = {"bigframes-api": "read_json"} - return self._read_bigquery_load_job( + return self._loader._read_bigquery_load_job( path_or_buf, table, job_config=job_config, @@ -1521,20 +1021,6 @@ def _check_file_size(self, filepath: str): "for large files to avoid loading the file into local memory." ) - def _sql_to_temp_table( - self, - sql: str, - cluster_cols: Iterable[str], - api_name: str, - ) -> bigquery.TableReference: - destination, _ = self._query_to_destination( - sql, - index_cols=list(cluster_cols), - api_name=api_name, - ) - # There should always be a destination table for this query type. - return typing.cast(bigquery.TableReference, destination) - def remote_function( self, input_types: Union[None, type, Sequence[type]] = None, @@ -1811,34 +1297,6 @@ def _prepare_copy_job_config(self) -> bigquery.CopyJobConfig: return job_config - def _start_query( - self, - sql: str, - job_config: Optional[bigquery.job.QueryJobConfig] = None, - max_results: Optional[int] = None, - timeout: Optional[float] = None, - api_name: Optional[str] = None, - ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: - """ - Starts BigQuery query job and waits for results. - - Do not execute dataframe through this API, instead use the executor. 
- """ - job_config = bigquery.QueryJobConfig() if job_config is None else job_config - if bigframes.options.compute.maximum_bytes_billed is not None: - # Maybe this should be pushed down into start_query_with_client - job_config.maximum_bytes_billed = ( - bigframes.options.compute.maximum_bytes_billed - ) - return bf_io_bigquery.start_query_with_client( - self.bqclient, - sql, - job_config, - max_results, - timeout, - api_name=api_name, - ) - def _start_query_ml_ddl( self, sql: str, @@ -1883,7 +1341,7 @@ def _export( cluster_cols: Sequence[str], ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]: # Note: cluster_cols use pre-override column ids - return self._executor.export( + return self._executor.export_gbq( array_value, destination=destination, col_id_overrides=col_id_overrides, @@ -1937,25 +1395,3 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob): def connect(context: Optional[bigquery_options.BigQueryOptions] = None) -> Session: return Session(context) - - -def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict: - """ - For backwards-compatibility, convert any previously client-side only - parameters such as timeoutMs to the property name expected by the REST API. - - Makes a copy of configuration if changes are needed. - """ - - if configuration is None: - return {} - - timeout_ms = configuration.get("query", {}).get("timeoutMs") - if timeout_ms is not None: - # Transform timeoutMs to an actual server-side configuration. - # https://github.com/googleapis/python-bigquery-pandas/issues/479 - configuration = copy.deepcopy(configuration) - del configuration["query"]["timeoutMs"] - configuration["jobTimeoutMs"] = timeout_ms - - return configuration diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index a77729cef9..011c1f1bee 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -82,26 +82,12 @@ def create_job_configs_labels( return dict(zip(labels[:MAX_LABELS_COUNT], values[:MAX_LABELS_COUNT])) -def create_export_csv_statement( - table_id: str, uri: str, field_delimiter: str, header: bool -) -> str: - return create_export_data_statement( - table_id, - uri, - "CSV", - { - "field_delimiter": field_delimiter, - "header": header, - }, - ) - - def create_export_data_statement( table_id: str, uri: str, format: str, export_options: Dict[str, Union[bool, str]] ) -> str: all_options: Dict[str, Union[bool, str]] = { "uri": uri, - "format": format, + "format": format.upper(), # TODO(swast): Does pandas have an option not to overwrite files? 
"overwrite": True, } diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 539658a18c..24bcd02798 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -15,7 +15,7 @@ from __future__ import annotations import math -from typing import cast, Iterable, Literal, Mapping, Optional, Sequence, Tuple +from typing import cast, Iterable, Literal, Mapping, Optional, Sequence, Tuple, Union import warnings import weakref @@ -118,9 +118,9 @@ def execute( job_config=job_config, ) - def export( + def export_gbq( self, - array_value, + array_value: bigframes.core.ArrayValue, col_id_overrides: Mapping[str, str], destination: bigquery.TableReference, if_exists: Literal["fail", "replace", "append"] = "fail", @@ -147,6 +147,35 @@ def export( job_config=job_config, ) + def export_gcs( + self, + array_value: bigframes.core.ArrayValue, + col_id_overrides: Mapping[str, str], + uri: str, + format: Literal["json", "csv", "parquet"], + export_options: Mapping[str, Union[bool, str]], + ): + """ + Export the ArrayValue to gcs. + """ + _, query_job = self.execute( + array_value, + ordered=False, + col_id_overrides=col_id_overrides, + ) + result_table = query_job.destination + export_data_statement = bq_io.create_export_data_statement( + f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", + uri=uri, + format=format, + export_options=dict(export_options), + ) + job_config = bigquery.QueryJobConfig() + bq_io.add_labels(job_config, api_name=f"dataframe-to_{format.lower()}") + export_job = self.bqclient.query(export_data_statement, job_config=job_config) + self._wait_on_job(export_job) + return query_job + def dry_run(self, array_value: bigframes.core.ArrayValue, ordered: bool = True): """ Dry run executing the ArrayValue. @@ -198,17 +227,7 @@ def _run_execute_query( job_config.labels["bigframes-mode"] = "unordered" try: query_job = self.bqclient.query(sql, job_config=job_config) - opts = bigframes.options.display - if opts.progress_bar is not None and not query_job.configuration.dry_run: - results_iterator = formatting_helpers.wait_for_query_job( - query_job, progress_bar=opts.progress_bar - ) - else: - results_iterator = query_job.result() - - if self.metrics is not None: - self.metrics.count_job_stats(query_job) - return results_iterator, query_job + return self._wait_on_job(query_job), query_job except google.api_core.exceptions.BadRequest as e: # Unfortunately, this error type does not have a separate error code or exception type @@ -218,6 +237,19 @@ def _run_execute_query( else: raise + def _wait_on_job(self, query_job: bigquery.QueryJob) -> bigquery.table.RowIterator: + opts = bigframes.options.display + if opts.progress_bar is not None and not query_job.configuration.dry_run: + results_iterator = formatting_helpers.wait_for_query_job( + query_job, progress_bar=opts.progress_bar + ) + else: + results_iterator = query_job.result() + + if self.metrics is not None: + self.metrics.count_job_stats(query_job) + return results_iterator + def _with_cached_executions(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: return tree_properties.replace_nodes(node, (dict(self._cached_executions))) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py new file mode 100644 index 0000000000..edfd57b965 --- /dev/null +++ b/bigframes/session/loader.py @@ -0,0 +1,657 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import copy
+import dataclasses
+import datetime
+import itertools
+import typing
+from typing import Dict, Hashable, IO, Iterable, List, Optional, Sequence, Tuple, Union
+
+import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
+import google.api_core.exceptions
+import google.auth.credentials
+import google.cloud.bigquery as bigquery
+import google.cloud.bigquery.table
+import google.cloud.bigquery_connection_v1
+import google.cloud.bigquery_storage_v1
+import google.cloud.functions_v2
+import google.cloud.resourcemanager_v3
+import jellyfish
+import numpy as np
+import pandas
+
+import bigframes.clients
+import bigframes.constants as constants
+import bigframes.core as core
+import bigframes.core.blocks as blocks
+import bigframes.core.compile
+import bigframes.core.guid
+import bigframes.core.pruning
+import bigframes.core.schema as schemata
+import bigframes.core.utils as utils
+
+# Even though the ibis.backends.bigquery import is unused, it's needed
+# to register new and replacement ops with the Ibis BigQuery backend.
+import bigframes.dataframe
+import bigframes.dtypes
+import bigframes.exceptions
+import bigframes.formatting_helpers as formatting_helpers
+import bigframes.session._io.bigquery as bf_io_bigquery
+import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
+import bigframes.session.clients
+import bigframes.session.executor
+import bigframes.session.metrics
+import bigframes.session.planner
+import bigframes.session.temp_storage
+import bigframes.version
+
+# Avoid circular imports.
+if typing.TYPE_CHECKING:
+    import bigframes.core.indexes
+    import bigframes.dataframe as dataframe
+    import bigframes.series
+    import bigframes.session
+
+_MAX_CLUSTER_COLUMNS = 4
+
+
+def _to_index_cols(
+    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
+) -> List[str]:
+    """Convert index_col into a list of column names."""
+    if isinstance(index_col, bigframes.enums.DefaultIndexKind):
+        index_cols: List[str] = []
+    elif isinstance(index_col, str):
+        index_cols = [index_col]
+    else:
+        index_cols = list(index_col)
+
+    return index_cols
+
+
+@dataclasses.dataclass
+class GbqDataLoader:
+    """
+    Responsible for loading data into BigFrames using temporary BigQuery tables.
+
+    This loader is constrained to loading local data and queries against data sources in the same region as the storage manager.
+
+
+    Args:
+        session (bigframes.session.Session):
+            The session the data will be loaded into. Loaded objects are not compatible with other sessions.
+        bqclient (bigquery.Client):
+            The BigQuery client used to issue load jobs and queries.
+        storage_manager (bigframes.session.temp_storage.TemporaryGbqStorageManager):
+            Manages temporary storage used by the loader.
+        default_index_type (bigframes.enums.DefaultIndexKind):
+            Determines the index type created for data loaded from GCS or BigQuery.
+        scan_index_uniqueness (bool):
+            Whether the loader will scan index columns to determine whether the values are unique.
+            In total ordering mode this allows the index columns to be used as an ordering key.
+ metrics (bigframes.session.metrics.ExecutionMetrics or None): + Used to record query execution statistics. + """ + + def __init__( + self, + session: bigframes.session.Session, + bqclient: bigquery.Client, + storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager, + default_index_type: bigframes.enums.DefaultIndexKind, + scan_index_uniqueness: bool, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + ): + self._bqclient = bqclient + self._storage_manager = storage_manager + self._default_index_type = default_index_type + self._scan_index_uniqueness = scan_index_uniqueness + self._df_snapshot: Dict[ + bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table] + ] = {} + self._metrics = metrics + # Unfortunate circular reference, but need to pass reference when constructing objects + self._session = session + + def read_pandas_load_job( + self, pandas_dataframe: pandas.DataFrame, api_name: str + ) -> dataframe.DataFrame: + import bigframes.dataframe as dataframe + + col_index = pandas_dataframe.columns.copy() + col_labels, idx_labels = ( + col_index.to_list(), + pandas_dataframe.index.names, + ) + new_col_ids, new_idx_ids = utils.get_standardized_ids( + col_labels, + idx_labels, + # Loading parquet files into BigQuery with special column names + # is only supported under an allowlist. + strict=True, + ) + + # Add order column to pandas DataFrame to preserve order in BigQuery + ordering_col = "rowid" + columns = frozenset(col_labels + idx_labels) + suffix = 2 + while ordering_col in columns: + ordering_col = f"rowid_{suffix}" + suffix += 1 + + pandas_dataframe_copy = pandas_dataframe.copy() + pandas_dataframe_copy.index.names = new_idx_ids + pandas_dataframe_copy.columns = pandas.Index(new_col_ids) + pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) + + job_config = bigquery.LoadJobConfig() + # Specify the datetime dtypes, which is auto-detected as timestamp types. + schema: list[bigquery.SchemaField] = [] + for column, dtype in zip(new_col_ids, pandas_dataframe.dtypes): + if dtype == "timestamp[us][pyarrow]": + schema.append( + bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.DATETIME) + ) + job_config.schema = schema + + # Clustering probably not needed anyways as pandas tables are small + cluster_cols = [ordering_col] + job_config.clustering_fields = cluster_cols + + job_config.labels = {"bigframes-api": api_name} + + load_table_destination = self._storage_manager._random_table() + load_job = self._bqclient.load_table_from_dataframe( + pandas_dataframe_copy, + load_table_destination, + job_config=job_config, + ) + self._start_generic_job(load_job) + + destination_table = self._bqclient.get_table(load_table_destination) + array_value = core.ArrayValue.from_table( + table=destination_table, + # TODO: Generate this directly from original pandas df. 
+ schema=schemata.ArraySchema.from_bq_table(destination_table), + session=self._session, + offsets_col=ordering_col, + ).drop_columns([ordering_col]) + + block = blocks.Block( + array_value, + index_columns=new_idx_ids, + column_labels=col_index, + index_labels=idx_labels, + ) + return dataframe.DataFrame(block) + + def _start_generic_job(self, job: formatting_helpers.GenericJob): + if bigframes.options.display.progress_bar is not None: + formatting_helpers.wait_for_job( + job, bigframes.options.display.progress_bar + ) # Wait for the job to complete + else: + job.result() + + def read_gbq_table( + self, + query: str, + *, + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), + columns: Iterable[str] = (), + max_results: Optional[int] = None, + api_name: str, + use_cache: bool = True, + filters: third_party_pandas_gbq.FiltersType = (), + enable_snapshot: bool = True, + ) -> dataframe.DataFrame: + import bigframes.dataframe as dataframe + + # --------------------------------- + # Validate and transform parameters + # --------------------------------- + + if max_results and max_results <= 0: + raise ValueError( + f"`max_results` should be a positive number, got {max_results}." + ) + + table_ref = google.cloud.bigquery.table.TableReference.from_string( + query, default_project=self._bqclient.project + ) + + columns = list(columns) + filters = typing.cast(list, list(filters)) + + # --------------------------------- + # Fetch table metadata and validate + # --------------------------------- + + time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata( + self._bqclient, + table_ref=table_ref, + api_name=api_name, + cache=self._df_snapshot, + use_cache=use_cache, + ) + table_column_names = {field.name for field in table.schema} + + if table.location.casefold() != self._storage_manager.location.casefold(): + raise ValueError( + f"Current session is in {self._storage_manager.location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" + ) + + for key in columns: + if key not in table_column_names: + possibility = min( + table_column_names, + key=lambda item: jellyfish.levenshtein_distance(key, item), + ) + raise ValueError( + f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?" + ) + + # Converting index_col into a list of column names requires + # the table metadata because we might use the primary keys + # when constructing the index. + index_cols = bf_read_gbq_table.get_index_cols( + table=table, + index_col=index_col, + ) + + for key in index_cols: + if key not in table_column_names: + possibility = min( + table_column_names, + key=lambda item: jellyfish.levenshtein_distance(key, item), + ) + raise ValueError( + f"Column '{key}' of `index_col` not found in this table. Did you mean '{possibility}'?" + ) + + # ----------------------------- + # Optionally, execute the query + # ----------------------------- + + # max_results introduces non-determinism and limits the cost on + # clustered tables, so fallback to a query. We do this here so that + # the index is consistent with tables that have primary keys, even + # when max_results is set. + # TODO(b/338419730): We don't need to fallback to a query for wildcard + # tables if we allow some non-determinism when time travel isn't supported. + if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix( + query + ): + # TODO(b/338111344): If we are running a query anyway, we might as + # well generate ROW_NUMBER() at the same time. 
+ all_columns = itertools.chain(index_cols, columns) if columns else () + query = bf_io_bigquery.to_query( + query, + columns=all_columns, + sql_predicate=bf_io_bigquery.compile_filters(filters) + if filters + else None, + max_results=max_results, + # We're executing the query, so we don't need time travel for + # determinism. + time_travel_timestamp=None, + ) + + return self.read_gbq_query( + query, + index_col=index_cols, + columns=columns, + api_name="read_gbq_table", + use_cache=use_cache, + ) + + # ----------------------------------------- + # Validate table access and features + # ----------------------------------------- + + # Use a time travel to make sure the DataFrame is deterministic, even + # if the underlying table changes. + + # If a dry run query fails with time travel but + # succeeds without it, omit the time travel clause and raise a warning + # about potential non-determinism if the underlying tables are modified. + filter_str = bf_io_bigquery.compile_filters(filters) if filters else None + all_columns = ( + () + if len(columns) == 0 + else (*columns, *[col for col in index_cols if col not in columns]) + ) + + enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table( + self._bqclient, table_ref, all_columns, time_travel_timestamp, filter_str + ) + + # ---------------------------- + # Create ordering and validate + # ---------------------------- + + # TODO(b/337925142): Generate a new subquery with just the index_cols + # in the Ibis table expression so we don't have a "SELECT *" subquery + # in the query that checks for index uniqueness. + # TODO(b/338065601): Provide a way to assume uniqueness and avoid this + # check. + is_index_unique = bf_read_gbq_table.are_index_cols_unique( + bqclient=self._bqclient, + table=table, + index_cols=index_cols, + api_name=api_name, + # If non in strict ordering mode, don't go through overhead of scanning index column(s) to determine if unique + metadata_only=not self._scan_index_uniqueness, + ) + schema = schemata.ArraySchema.from_bq_table(table) + if columns: + schema = schema.select(index_cols + columns) + array_value = core.ArrayValue.from_table( + table, + schema=schema, + predicate=filter_str, + at_time=time_travel_timestamp if enable_snapshot else None, + primary_key=index_cols if is_index_unique else (), + session=self._session, + ) + + # ---------------------------------------------------- + # Create Default Sequential Index if still have no index + # ---------------------------------------------------- + + # If no index columns provided or found, fall back to session default + if (index_col != bigframes.enums.DefaultIndexKind.NULL) and len( + index_cols + ) == 0: + index_col = self._default_index_type + + index_names: Sequence[Hashable] = index_cols + if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64: + sequential_index_col = bigframes.core.guid.generate_guid("index_") + array_value = array_value.promote_offsets(sequential_index_col) + index_cols = [sequential_index_col] + index_names = [None] + + value_columns = [col for col in array_value.column_ids if col not in index_cols] + block = blocks.Block( + array_value, + index_columns=index_cols, + column_labels=value_columns, + index_labels=index_names, + ) + if max_results: + block = block.slice(stop=max_results) + df = dataframe.DataFrame(block) + + # If user provided index columns, should sort over it + if len(index_cols) > 0: + df.sort_index() + return df + + def _read_bigquery_load_job( + self, + filepath_or_buffer: str | IO["bytes"], + table: 
Union[bigquery.Table, bigquery.TableReference], + *, + job_config: bigquery.LoadJobConfig, + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), + columns: Iterable[str] = (), + ) -> dataframe.DataFrame: + index_cols = _to_index_cols(index_col) + + if not job_config.clustering_fields and index_cols: + job_config.clustering_fields = index_cols[:_MAX_CLUSTER_COLUMNS] + + if isinstance(filepath_or_buffer, str): + if filepath_or_buffer.startswith("gs://"): + load_job = self._bqclient.load_table_from_uri( + filepath_or_buffer, table, job_config=job_config + ) + else: + with open(filepath_or_buffer, "rb") as source_file: + load_job = self._bqclient.load_table_from_file( + source_file, table, job_config=job_config + ) + else: + load_job = self._bqclient.load_table_from_file( + filepath_or_buffer, table, job_config=job_config + ) + + self._start_generic_job(load_job) + table_id = f"{table.project}.{table.dataset_id}.{table.table_id}" + + # Update the table expiration so we aren't limited to the default 24 + # hours of the anonymous dataset. + table_expiration = bigquery.Table(table_id) + table_expiration.expires = ( + datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION + ) + self._bqclient.update_table(table_expiration, ["expires"]) + + # The BigQuery REST API for tables.get doesn't take a session ID, so we + # can't get the schema for a temp table that way. + + return self.read_gbq_table( + query=table_id, + index_col=index_col, + columns=columns, + api_name="read_gbq_table", + ) + + def read_gbq_query( + self, + query: str, + *, + index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (), + columns: Iterable[str] = (), + configuration: Optional[Dict] = None, + max_results: Optional[int] = None, + api_name: str = "read_gbq_query", + use_cache: Optional[bool] = None, + filters: third_party_pandas_gbq.FiltersType = (), + ) -> dataframe.DataFrame: + import bigframes.dataframe as dataframe + + configuration = _transform_read_gbq_configuration(configuration) + + if "query" not in configuration: + configuration["query"] = {} + + if "query" in configuration["query"]: + raise ValueError( + "The query statement must not be included in the ", + "'configuration' because it is already provided as", + " a separate parameter.", + ) + + if "useQueryCache" in configuration["query"]: + if use_cache is not None: + raise ValueError( + "'useQueryCache' in 'configuration' conflicts with" + " 'use_cache' parameter. Please specify only one." + ) + else: + configuration["query"]["useQueryCache"] = ( + True if use_cache is None else use_cache + ) + + index_cols = _to_index_cols(index_col) + + filters_copy1, filters_copy2 = itertools.tee(filters) + has_filters = len(list(filters_copy1)) != 0 + filters = typing.cast(third_party_pandas_gbq.FiltersType, filters_copy2) + if has_filters or max_results is not None: + # TODO(b/338111344): If we are running a query anyway, we might as + # well generate ROW_NUMBER() at the same time. + all_columns = itertools.chain(index_cols, columns) if columns else () + query = bf_io_bigquery.to_query( + query, + all_columns, + bf_io_bigquery.compile_filters(filters) if has_filters else None, + max_results=max_results, + # We're executing the query, so we don't need time travel for + # determinism. 
+ time_travel_timestamp=None, + ) + + destination, query_job = self._query_to_destination( + query, + index_cols, + api_name=api_name, + configuration=configuration, + ) + + # If there was no destination table, that means the query must have + # been DDL or DML. Return some job metadata, instead. + if not destination: + return dataframe.DataFrame( + data=pandas.DataFrame( + { + "statement_type": [ + query_job.statement_type if query_job else "unknown" + ], + "job_id": [query_job.job_id if query_job else "unknown"], + "location": [query_job.location if query_job else "unknown"], + } + ), + session=self._session, + ) + + return self.read_gbq_table( + f"{destination.project}.{destination.dataset_id}.{destination.table_id}", + index_col=index_col, + columns=columns, + use_cache=configuration["query"]["useQueryCache"], + api_name=api_name, + # max_results and filters are omitted because they are already + # handled by to_query(), above. + ) + + def _query_to_destination( + self, + query: str, + index_cols: List[str], + api_name: str, + configuration: dict = {"query": {"useQueryCache": True}}, + do_clustering=True, + ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]: + # If a dry_run indicates this is not a query type job, then don't + # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. + dry_run_config = bigquery.QueryJobConfig() + dry_run_config.dry_run = True + _, dry_run_job = self._start_query( + query, job_config=dry_run_config, api_name=api_name + ) + if dry_run_job.statement_type != "SELECT": + _, query_job = self._start_query(query, api_name=api_name) + return query_job.destination, query_job + + # Create a table to workaround BigQuery 10 GB query results limit. See: + # internal issue 303057336. + # Since we have a `statement_type == 'SELECT'`, schema should be populated. + schema = dry_run_job.schema + assert schema is not None + if do_clustering: + cluster_cols = bf_io_bigquery.select_cluster_cols( + schema, cluster_candidates=index_cols + ) + else: + cluster_cols = [] + temp_table = self._storage_manager.create_temp_table(schema, cluster_cols) + + timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get( + "timeoutMs" + ) + + # Convert timeout_ms to seconds, ensuring a minimum of 0.1 seconds to avoid + # the program getting stuck on too-short timeouts. + timeout = max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None + + job_config = typing.cast( + bigquery.QueryJobConfig, + bigquery.QueryJobConfig.from_api_repr(configuration), + ) + job_config.destination = temp_table + + try: + # Write to temp table to workaround BigQuery 10 GB query results + # limit. See: internal issue 303057336. + job_config.labels["error_caught"] = "true" + _, query_job = self._start_query( + query, + job_config=job_config, + timeout=timeout, + api_name=api_name, + ) + return query_job.destination, query_job + except google.api_core.exceptions.BadRequest: + # Some SELECT statements still aren't compatible with cluster + # tables as the destination. For example, if the query has a + # top-level ORDER BY, this conflicts with our ability to cluster + # the table by the index column(s). 
+ _, query_job = self._start_query(query, timeout=timeout, api_name=api_name) + return query_job.destination, query_job + + def _start_query( + self, + sql: str, + job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None, + max_results: Optional[int] = None, + timeout: Optional[float] = None, + api_name: Optional[str] = None, + ) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + """ + Starts BigQuery query job and waits for results. + + Do not execute dataframe through this API, instead use the executor. + """ + job_config = bigquery.QueryJobConfig() if job_config is None else job_config + if bigframes.options.compute.maximum_bytes_billed is not None: + # Maybe this should be pushed down into start_query_with_client + job_config.maximum_bytes_billed = ( + bigframes.options.compute.maximum_bytes_billed + ) + return bf_io_bigquery.start_query_with_client( + self._bqclient, + sql, + job_config, + max_results, + timeout, + api_name=api_name, + ) + + +def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict: + """ + For backwards-compatibility, convert any previously client-side only + parameters such as timeoutMs to the property name expected by the REST API. + + Makes a copy of configuration if changes are needed. + """ + + if configuration is None: + return {} + + timeout_ms = configuration.get("query", {}).get("timeoutMs") + if timeout_ms is not None: + # Transform timeoutMs to an actual server-side configuration. + # https://github.com/googleapis/python-bigquery-pandas/issues/479 + configuration = copy.deepcopy(configuration) + del configuration["query"]["timeoutMs"] + configuration["jobTimeoutMs"] = timeout_ms + + return configuration diff --git a/bigframes/session/temp_storage.py b/bigframes/session/temp_storage.py index fb8c4bac7a..de764e4535 100644 --- a/bigframes/session/temp_storage.py +++ b/bigframes/session/temp_storage.py @@ -32,13 +32,19 @@ class TemporaryGbqStorageManager: def __init__( self, bqclient: bigquery.Client, - dataset: bigquery.DatasetReference, + location: str, session_id: str, *, kms_key: Optional[str] = None ): self.bqclient = bqclient - self.dataset = dataset + self.location = location + self.dataset = bf_io_bigquery.create_bq_dataset_reference( + self.bqclient, + location=self.location, + api_name="session-__init__", + ) + self.session_id = session_id self._table_ids: List[str] = [] self._kms_key = kms_key diff --git a/tests/system/small/test_encryption.py b/tests/system/small/test_encryption.py index 960752a935..7d684e64b4 100644 --- a/tests/system/small/test_encryption.py +++ b/tests/system/small/test_encryption.py @@ -69,7 +69,7 @@ def test_session_query_job(bq_cmek, session_with_bq_cmek): if not bq_cmek: # pragma: NO COVER pytest.skip("no cmek set for testing") # pragma: NO COVER - _, query_job = session_with_bq_cmek._start_query( + _, query_job = session_with_bq_cmek._loader._start_query( "SELECT 123", job_config=bigquery.QueryJobConfig(use_query_cache=False) ) query_job.result() diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 2f7eaa567a..b76c74654c 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -181,7 +181,7 @@ def test_read_gbq_cached_table(): table._properties["location"] = session._location table._properties["numRows"] = "1000000000" table._properties["location"] = session._location - session._df_snapshot[table_ref] = ( + session._loader._df_snapshot[table_ref] = ( datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, 
tzinfo=datetime.timezone.utc), table, ) diff --git a/tests/unit/test_compute_options.py b/tests/unit/test_compute_options.py deleted file mode 100644 index 2de715a40e..0000000000 --- a/tests/unit/test_compute_options.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from unittest import mock - -import bigframes as bf - -from . import resources - - -def test_maximum_bytes_option(): - session = resources.create_bigquery_session() - session.bqclient.query = mock.MagicMock() - with bf.option_context("compute.maximum_bytes_billed", 10000): - session._start_query("query") - call = session.bqclient.query.call_args - assert call.kwargs["job_config"].maximum_bytes_billed == 10000 - session.bqclient.query.assert_called_once()
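Note for reviewers: the public API is unchanged by this refactor; reads are now served by `Session._loader` (the new `GbqDataLoader`) and GCS exports by the executor's `export_gcs` helper. A minimal end-to-end sketch (not part of this diff; the bucket path is a placeholder and the public table is only an example):

    import bigframes.pandas as bpd

    # read_gbq is now served by Session._loader.read_gbq_table / read_gbq_query.
    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

    # to_csv now builds the export plan with _prepare_export and hands it to the
    # executor's export_gcs, which runs an EXPORT DATA statement against the
    # materialized results. A wildcard in the destination URI is still required.
    df.to_csv("gs://your-bucket/usa_names_*.csv", index=False)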
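The new write path can also be followed at the internal level. This hedged sketch uses only names introduced elsewhere in this diff (`_prepare_export`, `_executor.export_gcs`, `IO_ORDERING_ID`), assumes an existing DataFrame `df` and a writable bucket, and mirrors what `DataFrame.to_csv` does after this change rather than being a supported API:

    import bigframes.session._io.bigquery as bf_io_bigquery

    # Build the ArrayValue plus column-id overrides for export, as to_csv does.
    export_array, id_overrides = df._prepare_export(
        index=df._has_index,
        ordering_id=bf_io_bigquery.IO_ORDERING_ID,
    )

    # The executor materializes the query to a destination table, then issues an
    # EXPORT DATA statement against it (see executor.export_gcs in this diff).
    query_job = df._session._executor.export_gcs(
        export_array,
        id_overrides,
        "gs://your-bucket/out-*.csv",
        format="csv",
        export_options={"field_delimiter": ",", "header": True},
    )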