From b5091908e1e9b3636cd9a8a58e8265c3a181fbf0 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Sat, 16 Sep 2023 00:00:45 +0000 Subject: [PATCH 01/11] feat: add more api_name in job config Change-Id: I2a201ff6f35e4c945177b2f504b60a3744e86a0d --- bigframes/core/__init__.py | 61 ++++++++++++++++++++++++++++++++++++-- bigframes/core/blocks.py | 12 ++++++-- bigframes/dataframe.py | 2 +- bigframes/series.py | 4 ++- bigframes/session.py | 29 ++++++++++++++---- 5 files changed, 96 insertions(+), 12 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 8008c1189a..1016dd360e 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -45,6 +45,7 @@ ORDER_ID_COLUMN = "bigframes_ordering_id" PREDICATE_COLUMN = "bigframes_predicate" +MAX_LABELS_COUNT = 64 @dataclass(frozen=True) @@ -94,6 +95,7 @@ def __init__( hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None, ordering: ExpressionOrdering = ExpressionOrdering(), predicates: Optional[Collection[ibis_types.BooleanValue]] = None, + api_methods: Dict[str, str] = {}, ): self._session = session self._table = table @@ -120,6 +122,9 @@ def __init__( self._hidden_ordering_column_names = { column.get_name(): column for column in self._hidden_ordering_columns } + + self._api_methods = api_methods + ### Validation value_col_ids = self._column_names.keys() hidden_col_ids = self._hidden_ordering_column_names.keys() @@ -234,6 +239,10 @@ def _ibis_order(self) -> Sequence[ibis_types.Value]: self._ordering.all_ordering_columns, ) + @property + def api_method(self) -> Dict[str, str]: + return self._api_methods + def builder(self) -> ArrayValueBuilder: """Creates a mutable builder for expressions.""" # Since ArrayValue is intended to be immutable (immutability offers @@ -461,7 +470,19 @@ def shape(self) -> typing.Tuple[int, int]: width = len(self.columns) count_expr = self._to_ibis_expr(ordering_mode="unordered").count() sql = self._session.ibis_client.compile(count_expr) - + api_methods_len = len(self._api_methods) + + # Initialize methods to add as an empty dictionary + add_api_methods: Dict[str, str] = {} + if api_methods_len >= MAX_LABELS_COUNT: + add_api_methods = self.api_method + elif api_methods_len == 0: + add_api_key = "bigframes-api-0" + add_api_methods = {add_api_key: "shape"} + else: + add_api_key = "bigframes-api-" + str(api_methods_len) + self.api_method[add_api_key] = "shape" + add_api_methods = self.api_method # Support in-memory engines for hermetic unit tests. if not isinstance(sql, str): length = self._session.ibis_client.execute(count_expr) @@ -469,6 +490,7 @@ def shape(self) -> typing.Tuple[int, int]: row_iterator, _ = self._session._start_query( sql=sql, max_results=1, + api_methods=add_api_methods, ) length = next(row_iterator)[0] return (length, width) @@ -579,6 +601,20 @@ def aggregate( col_out: agg_op._as_ibis(table[col_in]) for col_in, agg_op, col_out in aggregations } + + # Initialize methods to add as an empty dictionary + add_api_methods: Dict[str, str] = {} + api_methods_len = len(self._api_methods) + if api_methods_len >= MAX_LABELS_COUNT: + add_api_methods = self.api_method + elif api_methods_len == 0: + add_api_key = "bigframes-api-0" + add_api_methods = {add_api_key: "aggregate"} + else: + add_api_key = "bigframes-api-" + str(api_methods_len) + self.api_method[add_api_key] = "aggregate" + add_api_methods = self.api_method + if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) # Must have deterministic ordering, so order by the unique "by" column @@ -590,7 +626,13 @@ def aggregate( total_ordering_columns=frozenset(by_column_ids), ) columns = tuple(result[key] for key in result.columns) - expr = ArrayValue(self._session, result, columns=columns, ordering=ordering) + expr = ArrayValue( + self._session, + result, + columns=columns, + ordering=ordering, + api_methods=add_api_methods, + ) if dropna: for column_id in by_column_ids: expr = expr._filter( @@ -613,6 +655,7 @@ def aggregate( columns=[result[col_id] for col_id in [*stats.keys()]], hidden_ordering_columns=[result[ORDER_ID_COLUMN]], ordering=ordering, + api_methods=add_api_methods, ) def corr_aggregate( @@ -871,6 +914,7 @@ def start_query( job_config: Optional[bigquery.job.QueryJobConfig] = None, max_results: Optional[int] = None, expose_extra_columns: bool = False, + api_name: str = "", ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """Execute a query and return metadata about the results.""" # TODO(swast): Cache the job ID so we can look it up again if they ask @@ -885,10 +929,23 @@ def start_query( # maybe we just print the job metadata that we have so far? table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns) sql = self._session.ibis_client.compile(table) # type:ignore + + # Initialize methods to add as an empty dictionary + add_api_methods: Dict[str, str] = {} + api_methods_len = len(self._api_methods) + if api_methods_len >= MAX_LABELS_COUNT: + add_api_methods = self.api_method + elif api_methods_len == 0: + add_api_key = "bigframes-api-0" + add_api_methods = {add_api_key: api_name} + else: + add_api_key = "bigframes-api-" + str(api_methods_len) + add_api_methods[add_api_key] = api_name return self._session._start_query( sql=sql, job_config=job_config, max_results=max_results, + api_methods=add_api_methods, ) def _get_table_size(self, destination_table): diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 0161d17361..d7f33c0c93 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -429,6 +429,7 @@ def to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, + api_name="to_pandas", ) return df, query_job @@ -439,13 +440,16 @@ def _compute_and_count( max_download_size: Optional[int] = None, sampling_method: Optional[str] = None, random_state: Optional[int] = None, + api_name: str = "", ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]: """Run query and download results as a pandas DataFrame. Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. expr = self._apply_value_keys_to_expr(value_keys=value_keys) results_iterator, query_job = expr.start_query( - max_results=max_results, expose_extra_columns=True + max_results=max_results, + expose_extra_columns=True, + api_name=api_name, ) table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES @@ -602,11 +606,13 @@ def _split( return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks] def _compute_dry_run( - self, value_keys: Optional[Iterable[str]] = None + self, + value_keys: Optional[Iterable[str]] = None, + api_name: str = "", ) -> bigquery.QueryJob: expr = self._apply_value_keys_to_expr(value_keys=value_keys) job_config = bigquery.QueryJobConfig(dry_run=True) - _, query_job = expr.start_query(job_config=job_config) + _, query_job = expr.start_query(job_config=job_config, api_name=api_name) return query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index eea8beb130..0815f497bc 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -885,7 +885,7 @@ def to_pandas( return df.set_axis(self._block.column_labels, axis=1, copy=False) def _compute_dry_run(self) -> bigquery.QueryJob: - return self._block._compute_dry_run() + return self._block._compute_dry_run(api_name="dataframe_query_job") def copy(self) -> DataFrame: return DataFrame(self._block) diff --git a/bigframes/series.py b/bigframes/series.py index 8815a6abde..aa1da2f2ba 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -274,7 +274,9 @@ def to_pandas( return series def _compute_dry_run(self) -> bigquery.QueryJob: - return self._block._compute_dry_run((self._value_column,)) + return self._block._compute_dry_run( + value_keys=(self._value_column,), api_name="series_query_job" + ) def drop( self, diff --git a/bigframes/session.py b/bigframes/session.py index ac48c977cb..e655d4ba4e 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -98,6 +98,7 @@ _BIGQUERYSTORAGE_REGIONAL_ENDPOINT = "{location}-bigquerystorage.googleapis.com" _MAX_CLUSTER_COLUMNS = 4 +_MAX_LABELS_COUNT = 64 # TODO(swast): Need to connect to regional endpoints when performing remote # functions operations (BQ Connection IAM, Cloud Run / Cloud Functions). @@ -932,7 +933,7 @@ def _read_pandas( job_config = bigquery.LoadJobConfig(schema=schema) job_config.clustering_fields = cluster_cols - job_config.labels = {"bigframes-api": api_name} + job_config.labels = {"bigframes-io-api": api_name} load_table_destination = self._create_session_table() load_job = self.bqclient.load_table_from_dataframe( @@ -1047,7 +1048,7 @@ def read_csv( job_config.autodetect = True job_config.field_delimiter = sep job_config.encoding = encoding - job_config.labels = {"bigframes-api": "read_csv"} + job_config.labels = {"bigframes-io-api": "read_csv"} # We want to match pandas behavior. If header is 0, no rows should be skipped, so we # do not need to set `skip_leading_rows`. If header is None, then there is no header. @@ -1119,7 +1120,7 @@ def read_parquet( job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED job_config.source_format = bigquery.SourceFormat.PARQUET job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY - job_config.labels = {"bigframes-api": "read_parquet"} + job_config.labels = {"bigframes-io-api": "read_parquet"} return self._read_bigquery_load_job(path, table, job_config=job_config) @@ -1166,7 +1167,7 @@ def read_json( job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY job_config.autodetect = True job_config.encoding = encoding - job_config.labels = {"bigframes-api": "read_json"} + job_config.labels = {"bigframes-io-api": "read_json"} return self._read_bigquery_load_job( path_or_buf, @@ -1459,14 +1460,32 @@ def _start_query( sql: str, job_config: Optional[bigquery.job.QueryJobConfig] = None, max_results: Optional[int] = None, + api_methods: Dict[str, str] = {}, ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """ Starts query job and waits for results """ if job_config is not None: + # If there is no lable set + if job_config.labels is None: + job_config.labels = api_methods + # If the total number of lables is under the limit of labels count + elif len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: + job_config.labels = {**api_methods, **job_config.labels} + # We truncate the label if it is out of the length limit of labels count + else: + job_config_labels_len = len(job_config.labels) + added_lables_len = _MAX_LABELS_COUNT - job_config_labels_len + for key, value in api_methods.items(): + _, _, num = key.split("-") + # Add the label in order before reaching limit + if int(num) < added_lables_len: + job_config.labels[key] = value query_job = self.bqclient.query(sql, job_config=job_config) else: - query_job = self.bqclient.query(sql) + job_config = bigquery.QueryJobConfig() + job_config.labels = api_methods + query_job = self.bqclient.query(sql, job_config=job_config) opts = bigframes.options.display if opts.progress_bar is not None and not query_job.configuration.dry_run: From 34c735ee883252faab50a7faaf6d5a7f169126f4 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Fri, 22 Sep 2023 22:42:47 +0000 Subject: [PATCH 02/11] Capture the latest label if labels length is reached --- bigframes/core/__init__.py | 23 +++++++++++++++++++++-- bigframes/session.py | 20 +++++++++++++------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 1016dd360e..f49fe06cf0 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -474,8 +474,18 @@ def shape(self) -> typing.Tuple[int, int]: # Initialize methods to add as an empty dictionary add_api_methods: Dict[str, str] = {} + # We capture the latest label if it is out of the length limit of labels count if api_methods_len >= MAX_LABELS_COUNT: - add_api_methods = self.api_method + # Pop out the first item + sorted_dict = dict(sorted(self._api_methods.items())) + sorted_dict.popitem() + # Get the new key number + sorted_items = sorted(self._api_methods.items()) + adeded_number = int(sorted_items[-1][1]) + 1 + add_api_key = "bigframes-api-" + str(adeded_number) + # Add the latest api to + sorted_dict[add_api_key] = "shape" + add_api_methods = sorted_dict elif api_methods_len == 0: add_api_key = "bigframes-api-0" add_api_methods = {add_api_key: "shape"} @@ -606,7 +616,16 @@ def aggregate( add_api_methods: Dict[str, str] = {} api_methods_len = len(self._api_methods) if api_methods_len >= MAX_LABELS_COUNT: - add_api_methods = self.api_method + # Pop out the first item + sorted_dict = dict(sorted(self._api_methods.items())) + sorted_dict.popitem() + # Get the new key number + sorted_items = sorted(self._api_methods.items()) + adeded_number = int(sorted_items[-1][1]) + 1 + add_api_key = "bigframes-api-" + str(adeded_number) + # Add the latest api to + sorted_dict[add_api_key] = "shape" + add_api_methods = sorted_dict elif api_methods_len == 0: add_api_key = "bigframes-api-0" add_api_methods = {add_api_key: "aggregate"} diff --git a/bigframes/session.py b/bigframes/session.py index e655d4ba4e..e353aab9ce 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -1472,15 +1472,21 @@ def _start_query( # If the total number of lables is under the limit of labels count elif len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: job_config.labels = {**api_methods, **job_config.labels} - # We truncate the label if it is out of the length limit of labels count + # We capture the latest label if it is out of the length limit of labels count else: job_config_labels_len = len(job_config.labels) - added_lables_len = _MAX_LABELS_COUNT - job_config_labels_len - for key, value in api_methods.items(): - _, _, num = key.split("-") - # Add the label in order before reaching limit - if int(num) < added_lables_len: - job_config.labels[key] = value + # The last n labels to add + added_lables_len = ( + job_config_labels_len + len(api_methods) - _MAX_LABELS_COUNT + ) + + # Convert the dictionary items into a list + label_list = sorted(api_methods.items()) + + # Process the last n items using slicing + for key, value in label_list[-added_lables_len:]: + job_config.labels[key] = value + query_job = self.bqclient.query(sql, job_config=job_config) else: job_config = bigquery.QueryJobConfig() From 61d29fd9f432741c5cf9aa3c9699197b83d4ddf6 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Fri, 22 Sep 2023 23:18:37 +0000 Subject: [PATCH 03/11] fix the type and rewording --- bigframes/core/__init__.py | 8 ++++---- bigframes/session.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index f49fe06cf0..cea37aacad 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -472,7 +472,7 @@ def shape(self) -> typing.Tuple[int, int]: sql = self._session.ibis_client.compile(count_expr) api_methods_len = len(self._api_methods) - # Initialize methods to add as an empty dictionary + # Initialize api-methods as an empty dictionary add_api_methods: Dict[str, str] = {} # We capture the latest label if it is out of the length limit of labels count if api_methods_len >= MAX_LABELS_COUNT: @@ -483,7 +483,7 @@ def shape(self) -> typing.Tuple[int, int]: sorted_items = sorted(self._api_methods.items()) adeded_number = int(sorted_items[-1][1]) + 1 add_api_key = "bigframes-api-" + str(adeded_number) - # Add the latest api to + # Add the latest api sorted_dict[add_api_key] = "shape" add_api_methods = sorted_dict elif api_methods_len == 0: @@ -612,7 +612,7 @@ def aggregate( for col_in, agg_op, col_out in aggregations } - # Initialize methods to add as an empty dictionary + # Initialize api-methods as an empty dictionary add_api_methods: Dict[str, str] = {} api_methods_len = len(self._api_methods) if api_methods_len >= MAX_LABELS_COUNT: @@ -623,7 +623,7 @@ def aggregate( sorted_items = sorted(self._api_methods.items()) adeded_number = int(sorted_items[-1][1]) + 1 add_api_key = "bigframes-api-" + str(adeded_number) - # Add the latest api to + # Add the latest api sorted_dict[add_api_key] = "shape" add_api_methods = sorted_dict elif api_methods_len == 0: diff --git a/bigframes/session.py b/bigframes/session.py index e353aab9ce..394bfa4511 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -1466,10 +1466,10 @@ def _start_query( Starts query job and waits for results """ if job_config is not None: - # If there is no lable set + # If there is no label set if job_config.labels is None: job_config.labels = api_methods - # If the total number of lables is under the limit of labels count + # If the total number of labels is under the limit of labels count elif len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: job_config.labels = {**api_methods, **job_config.labels} # We capture the latest label if it is out of the length limit of labels count From a6b968d21ad2b7fed1cd9f56eb7056f2be5057cf Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Wed, 27 Sep 2023 23:24:07 +0000 Subject: [PATCH 04/11] B feat: change the dict to list for ArrayValue.api_methods --- bigframes/core/__init__.py | 66 ++++++++++++-------------------------- bigframes/session.py | 47 ++++++++++++++------------- 2 files changed, 44 insertions(+), 69 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index cea37aacad..f01d42d9d1 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -17,7 +17,7 @@ import functools import math import typing -from typing import Collection, Dict, Iterable, Literal, Optional, Sequence, Tuple +from typing import Collection, Dict, Iterable, List, Literal, Optional, Sequence, Tuple from google.cloud import bigquery import ibis @@ -95,7 +95,7 @@ def __init__( hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None, ordering: ExpressionOrdering = ExpressionOrdering(), predicates: Optional[Collection[ibis_types.BooleanValue]] = None, - api_methods: Dict[str, str] = {}, + api_methods: List[str] = [], ): self._session = session self._table = table @@ -240,7 +240,7 @@ def _ibis_order(self) -> Sequence[ibis_types.Value]: ) @property - def api_method(self) -> Dict[str, str]: + def api_method(self) -> List[str]: return self._api_methods def builder(self) -> ArrayValueBuilder: @@ -472,27 +472,15 @@ def shape(self) -> typing.Tuple[int, int]: sql = self._session.ibis_client.compile(count_expr) api_methods_len = len(self._api_methods) - # Initialize api-methods as an empty dictionary - add_api_methods: Dict[str, str] = {} - # We capture the latest label if it is out of the length limit of labels count + # Initialize methods to add as an empty list + add_api_methods = [] + api_methods_len = len(self._api_methods) if api_methods_len >= MAX_LABELS_COUNT: - # Pop out the first item - sorted_dict = dict(sorted(self._api_methods.items())) - sorted_dict.popitem() - # Get the new key number - sorted_items = sorted(self._api_methods.items()) - adeded_number = int(sorted_items[-1][1]) + 1 - add_api_key = "bigframes-api-" + str(adeded_number) - # Add the latest api - sorted_dict[add_api_key] = "shape" - add_api_methods = sorted_dict - elif api_methods_len == 0: - add_api_key = "bigframes-api-0" - add_api_methods = {add_api_key: "shape"} + add_api_methods = self.api_method[1:] else: - add_api_key = "bigframes-api-" + str(api_methods_len) - self.api_method[add_api_key] = "shape" add_api_methods = self.api_method + add_api_methods.append("shape") + # Support in-memory engines for hermetic unit tests. if not isinstance(sql, str): length = self._session.ibis_client.execute(count_expr) @@ -612,27 +600,14 @@ def aggregate( for col_in, agg_op, col_out in aggregations } - # Initialize api-methods as an empty dictionary - add_api_methods: Dict[str, str] = {} + # Initialize methods to add as an empty list + add_api_methods = [] api_methods_len = len(self._api_methods) if api_methods_len >= MAX_LABELS_COUNT: - # Pop out the first item - sorted_dict = dict(sorted(self._api_methods.items())) - sorted_dict.popitem() - # Get the new key number - sorted_items = sorted(self._api_methods.items()) - adeded_number = int(sorted_items[-1][1]) + 1 - add_api_key = "bigframes-api-" + str(adeded_number) - # Add the latest api - sorted_dict[add_api_key] = "shape" - add_api_methods = sorted_dict - elif api_methods_len == 0: - add_api_key = "bigframes-api-0" - add_api_methods = {add_api_key: "aggregate"} + add_api_methods = self.api_method[1:] else: - add_api_key = "bigframes-api-" + str(api_methods_len) - self.api_method[add_api_key] = "aggregate" add_api_methods = self.api_method + add_api_methods.append("aggregate") if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) @@ -949,17 +924,16 @@ def start_query( table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns) sql = self._session.ibis_client.compile(table) # type:ignore - # Initialize methods to add as an empty dictionary - add_api_methods: Dict[str, str] = {} + # Initialize methods to add as an empty list + add_api_methods = [] api_methods_len = len(self._api_methods) if api_methods_len >= MAX_LABELS_COUNT: - add_api_methods = self.api_method - elif api_methods_len == 0: - add_api_key = "bigframes-api-0" - add_api_methods = {add_api_key: api_name} + add_api_methods = self.api_method[1:] else: - add_api_key = "bigframes-api-" + str(api_methods_len) - add_api_methods[add_api_key] = api_name + add_api_methods = self.api_method + if len(api_name) > 1: + add_api_methods.append(api_name) + return self._session._start_query( sql=sql, job_config=job_config, diff --git a/bigframes/session.py b/bigframes/session.py index 394bfa4511..4e5b2c068e 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -933,7 +933,7 @@ def _read_pandas( job_config = bigquery.LoadJobConfig(schema=schema) job_config.clustering_fields = cluster_cols - job_config.labels = {"bigframes-io-api": api_name} + job_config.labels = {"bigframes-api": api_name} load_table_destination = self._create_session_table() load_job = self.bqclient.load_table_from_dataframe( @@ -1048,7 +1048,7 @@ def read_csv( job_config.autodetect = True job_config.field_delimiter = sep job_config.encoding = encoding - job_config.labels = {"bigframes-io-api": "read_csv"} + job_config.labels = {"bigframes-api": "read_csv"} # We want to match pandas behavior. If header is 0, no rows should be skipped, so we # do not need to set `skip_leading_rows`. If header is None, then there is no header. @@ -1120,7 +1120,7 @@ def read_parquet( job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED job_config.source_format = bigquery.SourceFormat.PARQUET job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY - job_config.labels = {"bigframes-io-api": "read_parquet"} + job_config.labels = {"bigframes-api": "read_parquet"} return self._read_bigquery_load_job(path, table, job_config=job_config) @@ -1167,7 +1167,7 @@ def read_json( job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY job_config.autodetect = True job_config.encoding = encoding - job_config.labels = {"bigframes-io-api": "read_json"} + job_config.labels = {"bigframes-api": "read_json"} return self._read_bigquery_load_job( path_or_buf, @@ -1460,7 +1460,7 @@ def _start_query( sql: str, job_config: Optional[bigquery.job.QueryJobConfig] = None, max_results: Optional[int] = None, - api_methods: Dict[str, str] = {}, + api_methods: List[str] = [], ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """ Starts query job and waits for results @@ -1468,29 +1468,30 @@ def _start_query( if job_config is not None: # If there is no label set if job_config.labels is None: - job_config.labels = api_methods - # If the total number of labels is under the limit of labels count - elif len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: - job_config.labels = {**api_methods, **job_config.labels} - # We capture the latest label if it is out of the length limit of labels count + label_values = api_methods else: - job_config_labels_len = len(job_config.labels) - # The last n labels to add - added_lables_len = ( - job_config_labels_len + len(api_methods) - _MAX_LABELS_COUNT - ) - - # Convert the dictionary items into a list - label_list = sorted(api_methods.items()) - - # Process the last n items using slicing - for key, value in label_list[-added_lables_len:]: - job_config.labels[key] = value + cur_labels: List[str] = [*job_config.labels.values()] + total_label_values = cur_labels + api_methods + # If the total number of labels is under the limit of labels count + if len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: + label_values = total_label_values + # We capture the latest label if it is out of the length limit of labels count + else: + label_values = total_label_values[-_MAX_LABELS_COUNT:] + labels = {} + for i, label_value in enumerate(label_values): + label_key = "bigframes-api" + str(i) + labels[label_key] = label_value + job_config.labels = labels query_job = self.bqclient.query(sql, job_config=job_config) else: job_config = bigquery.QueryJobConfig() - job_config.labels = api_methods + labels = {} + for i, label_value in enumerate(api_methods): + label_key = "bigframes-api" + str(i) + labels[label_key] = label_value + job_config.labels = labels query_job = self.bqclient.query(sql, job_config=job_config) opts = bigframes.options.display From db9df9d6c651a4782dd6bb4a0faeb5e89e7d64cd Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Wed, 27 Sep 2023 23:33:12 +0000 Subject: [PATCH 05/11] refactor: refactor the logic --- bigframes/session.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/bigframes/session.py b/bigframes/session.py index 4e5b2c068e..ae4fb90ec6 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -1478,21 +1478,16 @@ def _start_query( # We capture the latest label if it is out of the length limit of labels count else: label_values = total_label_values[-_MAX_LABELS_COUNT:] - - labels = {} - for i, label_value in enumerate(label_values): - label_key = "bigframes-api" + str(i) - labels[label_key] = label_value - job_config.labels = labels - query_job = self.bqclient.query(sql, job_config=job_config) else: job_config = bigquery.QueryJobConfig() - labels = {} - for i, label_value in enumerate(api_methods): - label_key = "bigframes-api" + str(i) - labels[label_key] = label_value - job_config.labels = labels - query_job = self.bqclient.query(sql, job_config=job_config) + label_values = api_methods + + labels = {} + for i, label_value in enumerate(label_values): + label_key = "bigframes-api" + str(i) + labels[label_key] = label_value + job_config.labels = labels + query_job = self.bqclient.query(sql, job_config=job_config) opts = bigframes.options.display if opts.progress_bar is not None and not query_job.configuration.dry_run: From c39adea67ab6f27ea9c0f63f2ffe45efda150a38 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Wed, 27 Sep 2023 21:00:26 -0700 Subject: [PATCH 06/11] fix: generate unique ids on join to avoid id collisions (#65) * fix: generate unique ids on join to avoid id collisions --- bigframes/core/joins/single_column.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/bigframes/core/joins/single_column.py b/bigframes/core/joins/single_column.py index 2d616fc3f0..e067bc0519 100644 --- a/bigframes/core/joins/single_column.py +++ b/bigframes/core/joins/single_column.py @@ -16,6 +16,7 @@ from __future__ import annotations +import itertools import itertools import typing from typing import Callable, Literal, Tuple @@ -129,6 +130,25 @@ def join_by_column( ) } + def get_column_left(col_id): + return lmapping[col_id] + + def get_column_right(col_id): + return rmapping[col_id] + + lmapping = { + col_id: guid.generate_guid() + for col_id in itertools.chain( + left.column_names, left._hidden_ordering_column_names + ) + } + rmapping = { + col_id: guid.generate_guid() + for col_id in itertools.chain( + right.column_names, right._hidden_ordering_column_names + ) + } + def get_column_left(col_id): return lmapping[col_id] @@ -146,6 +166,8 @@ def get_column_right(col_id): col_id_overrides=rmapping, ) join_conditions = [ + value_to_join_key(left_table[lmapping[left_index]]) + == value_to_join_key(right_table[rmapping[right_index]]) value_to_join_key(left_table[lmapping[left_index]]) == value_to_join_key(right_table[rmapping[right_index]]) for left_index, right_index in zip(left_column_ids, right_column_ids) From 121e68bf2cd3e62be468557e767b695c3d33ae0e Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Thu, 28 Sep 2023 20:31:23 +0000 Subject: [PATCH 07/11] fix: move the label to the DataFrame/Series user-facing layer and move the length check logic --- bigframes/core/__init__.py | 45 +++++-------------- bigframes/core/blocks.py | 12 +++-- bigframes/core/groupby/__init__.py | 72 +++++++++++++++++++++--------- bigframes/dataframe.py | 5 ++- bigframes/series.py | 6 ++- bigframes/session.py | 6 +-- 6 files changed, 81 insertions(+), 65 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index f01d42d9d1..364de48dee 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -17,7 +17,7 @@ import functools import math import typing -from typing import Collection, Dict, Iterable, List, Literal, Optional, Sequence, Tuple +from typing import Collection, Dict, Iterable, Literal, Optional, Sequence, Tuple from google.cloud import bigquery import ibis @@ -95,7 +95,7 @@ def __init__( hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None, ordering: ExpressionOrdering = ExpressionOrdering(), predicates: Optional[Collection[ibis_types.BooleanValue]] = None, - api_methods: List[str] = [], + api_methods: Sequence[str] = [], ): self._session = session self._table = table @@ -240,7 +240,7 @@ def _ibis_order(self) -> Sequence[ibis_types.Value]: ) @property - def api_method(self) -> List[str]: + def api_methods(self) -> Sequence[str]: return self._api_methods def builder(self) -> ArrayValueBuilder: @@ -465,21 +465,13 @@ def projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue: new_expr = builder.build() return new_expr - def shape(self) -> typing.Tuple[int, int]: + def shape(self, api_method: str) -> typing.Tuple[int, int]: """Returns dimensions as (length, width) tuple.""" width = len(self.columns) count_expr = self._to_ibis_expr(ordering_mode="unordered").count() sql = self._session.ibis_client.compile(count_expr) - api_methods_len = len(self._api_methods) - - # Initialize methods to add as an empty list - add_api_methods = [] - api_methods_len = len(self._api_methods) - if api_methods_len >= MAX_LABELS_COUNT: - add_api_methods = self.api_method[1:] - else: - add_api_methods = self.api_method - add_api_methods.append("shape") + add_api_methods = list(self.api_methods) + add_api_methods.append(api_method) # Support in-memory engines for hermetic unit tests. if not isinstance(sql, str): @@ -586,6 +578,7 @@ def aggregate( aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp, str]], by_column_ids: typing.Sequence[str] = (), dropna: bool = True, + api_method: str = "", ) -> ArrayValue: """ Apply aggregations to the expression. @@ -599,16 +592,8 @@ def aggregate( col_out: agg_op._as_ibis(table[col_in]) for col_in, agg_op, col_out in aggregations } - - # Initialize methods to add as an empty list - add_api_methods = [] - api_methods_len = len(self._api_methods) - if api_methods_len >= MAX_LABELS_COUNT: - add_api_methods = self.api_method[1:] - else: - add_api_methods = self.api_method - add_api_methods.append("aggregate") - + add_api_methods = list(self.api_methods) + add_api_methods.append(api_method) if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) # Must have deterministic ordering, so order by the unique "by" column @@ -924,16 +909,8 @@ def start_query( table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns) sql = self._session.ibis_client.compile(table) # type:ignore - # Initialize methods to add as an empty list - add_api_methods = [] - api_methods_len = len(self._api_methods) - if api_methods_len >= MAX_LABELS_COUNT: - add_api_methods = self.api_method[1:] - else: - add_api_methods = self.api_method - if len(api_name) > 1: - add_api_methods.append(api_name) - + add_api_methods = list(self.api_methods) + add_api_methods.append(api_name) return self._session._start_query( sql=sql, job_config=job_config, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index d7f33c0c93..c189902191 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -134,7 +134,7 @@ def index(self) -> indexes.IndexValue: @functools.cached_property def shape(self) -> typing.Tuple[int, int]: """Returns dimensions as (length, width) tuple.""" - impl_length, _ = self._expr.shape() + impl_length, _ = self._expr.shape("shape") return (impl_length, len(self.value_columns)) @property @@ -930,6 +930,7 @@ def aggregate( *, as_index: bool = True, dropna: bool = True, + api_method: str = "", ) -> typing.Tuple[Block, typing.Sequence[str]]: """ Apply aggregations to the block. Callers responsible for setting index column(s) after. @@ -944,7 +945,9 @@ def aggregate( for input_id, operation in aggregations ] output_col_ids = [agg_spec[2] for agg_spec in agg_specs] - result_expr = self.expr.aggregate(agg_specs, by_column_ids, dropna=dropna) + result_expr = self.expr.aggregate( + agg_specs, by_column_ids, dropna=dropna, api_method=api_method + ) aggregate_labels = self._get_labels_for_columns( [agg[0] for agg in aggregations] @@ -1032,6 +1035,7 @@ def summarize( self, column_ids: typing.Sequence[str], stats: typing.Sequence[agg_ops.AggregateOp], + api_method: str, ): """Get a list of stats as a deferred block object.""" label_col_id = guid.generate_guid() @@ -1045,7 +1049,9 @@ def summarize( (col_id, [f"{col_id}-{stat.name}" for stat in stats]) for col_id in column_ids ] - expr = self.expr.aggregate(aggregations).unpivot( + expr = self.expr.aggregate( + aggregations=aggregations, api_method=api_method + ).unpivot( labels, unpivot_columns=columns, index_col_ids=[label_col_id], diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 9be7f22a71..7db44f81db 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -104,12 +104,16 @@ def __getitem__( def sum(self, numeric_only: bool = False, *args) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("sum") - return self._aggregate_all(agg_ops.sum_op, numeric_only=True) + return self._aggregate_all( + agg_ops.sum_op, numeric_only=True, api_method="dfgroupby-sum" + ) def mean(self, numeric_only: bool = False, *args) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("mean") - return self._aggregate_all(agg_ops.mean_op, numeric_only=True) + return self._aggregate_all( + agg_ops.mean_op, numeric_only=True, api_method="dfgroupby-mean" + ) def median( self, numeric_only: bool = False, *, exact: bool = False @@ -120,13 +124,19 @@ def median( ) if not numeric_only: self._raise_on_non_numeric("median") - return self._aggregate_all(agg_ops.median_op, numeric_only=True) + return self._aggregate_all( + agg_ops.median_op, numeric_only=True, api_method="dfgroupby-median" + ) def min(self, numeric_only: bool = False, *args) -> df.DataFrame: - return self._aggregate_all(agg_ops.min_op, numeric_only=numeric_only) + return self._aggregate_all( + agg_ops.min_op, numeric_only=numeric_only, api_method="dfgroupby-min" + ) def max(self, numeric_only: bool = False, *args) -> df.DataFrame: - return self._aggregate_all(agg_ops.max_op, numeric_only=numeric_only) + return self._aggregate_all( + agg_ops.max_op, numeric_only=numeric_only, api_method="dfgroupby-max" + ) def std( self, @@ -135,7 +145,9 @@ def std( ) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("std") - return self._aggregate_all(agg_ops.std_op, numeric_only=True) + return self._aggregate_all( + agg_ops.std_op, numeric_only=True, api_method="dfgroupby-std" + ) def var( self, @@ -144,7 +156,9 @@ def var( ) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("var") - return self._aggregate_all(agg_ops.var_op, numeric_only=True) + return self._aggregate_all( + agg_ops.var_op, numeric_only=True, api_method="dfgroupby-var" + ) def skew( self, @@ -169,13 +183,13 @@ def kurt( kurtosis = kurt def all(self) -> df.DataFrame: - return self._aggregate_all(agg_ops.all_op) + return self._aggregate_all(agg_ops.all_op, api_method="dfgroupby-all") def any(self) -> df.DataFrame: - return self._aggregate_all(agg_ops.any_op) + return self._aggregate_all(agg_ops.any_op, api_method="dfgroupby-any") def count(self) -> df.DataFrame: - return self._aggregate_all(agg_ops.count_op) + return self._aggregate_all(agg_ops.count_op, api_method="dfgroupby-count") def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: @@ -240,19 +254,19 @@ def expanding(self, min_periods: int = 1) -> windows.Window: def agg(self, func=None, **kwargs) -> df.DataFrame: if func: if isinstance(func, str): - return self._agg_string(func) + return self._agg_string(func, "dfgroupby-agg") elif utils.is_dict_like(func): - return self._agg_dict(func) + return self._agg_dict(func, "dfgroupby-agg") elif utils.is_list_like(func): - return self._agg_list(func) + return self._agg_list(func, "dfgroupby-agg") else: raise NotImplementedError( f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}" ) else: - return self._agg_named(**kwargs) + return self._agg_named("dfgroupby-agg", **kwargs) - def _agg_string(self, func: str) -> df.DataFrame: + def _agg_string(self, func: str, api_method: str) -> df.DataFrame: aggregations = [ (col_id, agg_ops.lookup_agg_func(func)) for col_id in self._aggregated_columns() @@ -262,10 +276,11 @@ def _agg_string(self, func: str) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) return df.DataFrame(agg_block) - def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: + def _agg_dict(self, func: typing.Mapping, api_method: str) -> df.DataFrame: aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = [] column_labels = [] @@ -284,6 +299,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) if want_aggfunc_level: agg_block = agg_block.with_column_labels( @@ -296,7 +312,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: agg_block = agg_block.with_column_labels(pd.Index(column_labels)) return df.DataFrame(agg_block) - def _agg_list(self, func: typing.Sequence) -> df.DataFrame: + def _agg_list(self, func: typing.Sequence, api_method: str) -> df.DataFrame: aggregations = [ (col_id, agg_ops.lookup_agg_func(f)) for col_id in self._aggregated_columns() @@ -310,6 +326,7 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) agg_block = agg_block.with_column_labels( pd.MultiIndex.from_tuples( @@ -318,7 +335,11 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: ) return df.DataFrame(agg_block) - def _agg_named(self, **kwargs) -> df.DataFrame: + def _agg_named( + self, + api_method: str, + **kwargs, + ) -> df.DataFrame: aggregations = [] column_labels = [] for k, v in kwargs.items(): @@ -338,6 +359,7 @@ def _agg_named(self, **kwargs) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) agg_block = agg_block.with_column_labels(column_labels) return df.DataFrame(agg_block) @@ -369,7 +391,10 @@ def _column_type(self, col_id: str) -> dtypes.Dtype: return dtype def _aggregate_all( - self, aggregate_op: agg_ops.AggregateOp, numeric_only: bool = False + self, + aggregate_op: agg_ops.AggregateOp, + numeric_only: bool = False, + api_method: str = "", ) -> df.DataFrame: aggregated_col_ids = self._aggregated_columns(numeric_only=numeric_only) aggregations = [(col_id, aggregate_op) for col_id in aggregated_col_ids] @@ -378,6 +403,7 @@ def _aggregate_all( aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) return df.DataFrame(result_block) @@ -471,7 +497,7 @@ def kurt(self, *args, **kwargs) -> series.Series: kurtosis = kurt def prod(self, *args) -> series.Series: - return self._aggregate(agg_ops.product_op) + return self._aggregate(agg_ops.product_op, "seriesgroupby-prod") def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: column_names: list[str] = [] @@ -492,6 +518,7 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: by_column_ids=self._by_col_ids, aggregations=aggregations, dropna=self._dropna, + api_method="seriesgroupby-agg", ) if column_names: @@ -584,11 +611,14 @@ def expanding(self, min_periods: int = 1) -> windows.Window: is_series=True, ) - def _aggregate(self, aggregate_op: agg_ops.AggregateOp) -> series.Series: + def _aggregate( + self, aggregate_op: agg_ops.AggregateOp, api_method: str = "" + ) -> series.Series: result_block, _ = self._block.aggregate( self._by_col_ids, ((self._value_column, aggregate_op),), dropna=self._dropna, + api_method=api_method, ) return series.Series(result_block.with_column_labels([self._value_name])) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 0815f497bc..86fe5e168a 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1629,8 +1629,9 @@ def agg( aggregations = [agg_ops.lookup_agg_func(f) for f in func] return DataFrame( self._block.summarize( - self._block.value_columns, - aggregations, + column_ids=self._block.value_columns, + stats=aggregations, + api_method="df-agg", ) ) else: diff --git a/bigframes/series.py b/bigframes/series.py index aa1da2f2ba..4cb346c4a4 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -735,8 +735,9 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: aggregations = [agg_ops.lookup_agg_func(f) for f in func] return Series( self._block.summarize( - [self._value_column], - aggregations, + column_ids=[self._value_column], + stats=aggregations, + api_method="series-agg", ) ) else: @@ -786,6 +787,7 @@ def mode(self) -> Series: by_column_ids=[self._value_column], aggregations=((self._value_column, agg_ops.count_op),), as_index=False, + api_method="series-mode", ) value_count_col_id = agg_ids[0] block, max_value_count_col_id = block.apply_window_op( diff --git a/bigframes/session.py b/bigframes/session.py index ae4fb90ec6..65fe9697fa 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -1460,7 +1460,7 @@ def _start_query( sql: str, job_config: Optional[bigquery.job.QueryJobConfig] = None, max_results: Optional[int] = None, - api_methods: List[str] = [], + api_methods: Sequence[str] = [], ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """ Starts query job and waits for results @@ -1470,8 +1470,8 @@ def _start_query( if job_config.labels is None: label_values = api_methods else: - cur_labels: List[str] = [*job_config.labels.values()] - total_label_values = cur_labels + api_methods + cur_labels: Sequence[str] = [*job_config.labels.values()] + total_label_values = list(cur_labels) + list(api_methods) # If the total number of labels is under the limit of labels count if len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: label_values = total_label_values From b255cb2d0ae344fc41bdfeabae1dc868ee3c75ac Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 3 Oct 2023 02:19:23 +0000 Subject: [PATCH 08/11] refactor: move the labels logic in io --- bigframes/core/io.py | 34 +++++++++++++++++++- bigframes/session.py | 27 +++------------- tests/unit/core/test_io.py | 65 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 24 deletions(-) diff --git a/bigframes/core/io.py b/bigframes/core/io.py index d47efbdddc..03c70351f5 100644 --- a/bigframes/core/io.py +++ b/bigframes/core/io.py @@ -17,11 +17,43 @@ import datetime import textwrap import types -from typing import Dict, Iterable, Union +from typing import Dict, Optional, Sequence, Tuple, Union, Iterable import google.cloud.bigquery as bigquery IO_ORDERING_ID = "bqdf_row_nums" +MAX_LABELS_COUNT = 64 + + +def create_job_configs_labels( + job_config: Optional[bigquery.job.QueryJobConfig], + api_methods: Sequence[str], +) -> Tuple[Dict[str, str], bigquery.job.QueryJobConfig]: + if job_config is not None: + # If there is no label set + if job_config.labels is None: + label_values = api_methods + else: + cur_labels: Sequence[str] = [*job_config.labels.values()] + cur_labels_len = len(job_config.labels) + api_methods_len = len(api_methods) + # If the total number of labels is under the limit of labels count + if cur_labels_len + api_methods_len <= MAX_LABELS_COUNT: + label_values = list(cur_labels) + list(api_methods) + # We capture the latest label if it is out of the length limit of labels count + else: + added_api_len = cur_labels_len + api_methods_len - MAX_LABELS_COUNT + label_values = list(cur_labels) + list(api_methods)[-added_api_len:] + else: + job_config = bigquery.QueryJobConfig() + label_values = api_methods + + labels = {} + for i, label_value in enumerate(label_values): + label_key = "bigframes-api-" + str(i) + labels[label_key] = label_value + job_config.labels = labels + return labels, job_config def create_export_csv_statement( diff --git a/bigframes/session.py b/bigframes/session.py index 65fe9697fa..359a5849f8 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -1465,29 +1465,10 @@ def _start_query( """ Starts query job and waits for results """ - if job_config is not None: - # If there is no label set - if job_config.labels is None: - label_values = api_methods - else: - cur_labels: Sequence[str] = [*job_config.labels.values()] - total_label_values = list(cur_labels) + list(api_methods) - # If the total number of labels is under the limit of labels count - if len(job_config.labels) + len(api_methods) <= _MAX_LABELS_COUNT: - label_values = total_label_values - # We capture the latest label if it is out of the length limit of labels count - else: - label_values = total_label_values[-_MAX_LABELS_COUNT:] - else: - job_config = bigquery.QueryJobConfig() - label_values = api_methods - - labels = {} - for i, label_value in enumerate(label_values): - label_key = "bigframes-api" + str(i) - labels[label_key] = label_value - job_config.labels = labels - query_job = self.bqclient.query(sql, job_config=job_config) + _, job_config = bigframes_io.create_job_configs_labels( + job_config=job_config, api_methods=api_methods + ) + query_job = self.bqclient.query(sql=sql, job_config=job_config) opts = bigframes.options.display if opts.progress_bar is not None and not query_job.configuration.dry_run: diff --git a/tests/unit/core/test_io.py b/tests/unit/core/test_io.py index afb38a5f75..1db3fd0a45 100644 --- a/tests/unit/core/test_io.py +++ b/tests/unit/core/test_io.py @@ -14,6 +14,7 @@ import datetime from typing import Iterable +from unittest import mock import google.cloud.bigquery as bigquery import pytest @@ -21,6 +22,70 @@ import bigframes.core.io +def test_create_job_configs_is_none(): + mock_job_config = mock.create_autospec(None) + api_methods = ["df-agg", "series-mode"] + labels, _ = bigframes.core.io.create_job_configs_labels( + job_config=mock_job_config, api_methods=api_methods + ) + expected_dict = {"bigframes-api-0": "df-agg", "bigframes-api-1": "series-mode"} + assert labels is not None + assert labels == expected_dict + + +def test_create_job_configs_labels_is_none(): + mock_job_config = mock.create_autospec(bigquery.QueryJobConfig()) + api_methods = ["df-agg", "series-mode"] + labels, _ = bigframes.core.io.create_job_configs_labels( + job_config=mock_job_config, api_methods=api_methods + ) + expected_dict = {"bigframes-api-0": "df-agg", "bigframes-api-1": "series-mode"} + assert labels is not None + assert labels == expected_dict + + +def test_create_job_configs_labels_length_limit_not_met(): + mock_job_config = mock.create_autospec(bigquery.QueryJobConfig()) + mock_job_config.labels = { + "bigframes-api-0": "test0", + "bigframes-api-1": "test1", + "bigframes-api-2": "test2", + } + api_methods = ["df-agg", "series-mode"] + labels, _ = bigframes.core.io.create_job_configs_labels( + job_config=mock_job_config, api_methods=api_methods + ) + expected_dict = { + "bigframes-api-0": "test0", + "bigframes-api-1": "test1", + "bigframes-api-2": "test2", + "bigframes-api-3": "df-agg", + "bigframes-api-4": "series-mode", + } + assert labels is not None + assert len(labels) == 5 + assert labels == expected_dict + + +def test_create_job_configs_labels_length_limit_met(): + mock_job_config = mock.create_autospec(bigquery.QueryJobConfig()) + cur_labels = {} + for i in range(63): + key = f"bigframes-api{i}" + value = f"test{i}" + cur_labels[key] = value + # If cur_labels length is 63, we can only add one label from api_methods + mock_job_config.labels = cur_labels + api_methods = ["df-agg", "series-mode"] + labels, _ = bigframes.core.io.create_job_configs_labels( + job_config=mock_job_config, api_methods=api_methods + ) + assert labels is not None + assert len(labels) == 64 + assert "df-agg" not in labels.values() + assert "series-mode" in labels.values() + + def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): table_ref = bigquery.TableReference.from_string( "my-test-project._e8166e0cdb.anonbb92cd" From 54f047242e608d8873aa65873597662769415cac Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 3 Oct 2023 18:04:24 +0000 Subject: [PATCH 09/11] fix: fix nit failure --- bigframes/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session.py b/bigframes/session.py index 359a5849f8..e3b62203a7 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -1468,7 +1468,7 @@ def _start_query( _, job_config = bigframes_io.create_job_configs_labels( job_config=job_config, api_methods=api_methods ) - query_job = self.bqclient.query(sql=sql, job_config=job_config) + query_job = self.bqclient.query(sql, job_config=job_config) opts = bigframes.options.display if opts.progress_bar is not None and not query_job.configuration.dry_run: From ca17ccf41ec9367a6913ff5e31f8d92b87bc2f8c Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 3 Oct 2023 23:34:31 +0000 Subject: [PATCH 10/11] fix: preserve the existing labels --- bigframes/core/__init__.py | 1 - bigframes/core/io.py | 46 +++++++++++---------- bigframes/core/joins/single_column.py | 22 ----------- bigframes/session.py | 13 ++++-- tests/unit/core/test_io.py | 57 +++++++++++---------------- 5 files changed, 53 insertions(+), 86 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 364de48dee..9b05bd2ab2 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -45,7 +45,6 @@ ORDER_ID_COLUMN = "bigframes_ordering_id" PREDICATE_COLUMN = "bigframes_predicate" -MAX_LABELS_COUNT = 64 @dataclass(frozen=True) diff --git a/bigframes/core/io.py b/bigframes/core/io.py index 03c70351f5..9b25f3e5d2 100644 --- a/bigframes/core/io.py +++ b/bigframes/core/io.py @@ -17,7 +17,7 @@ import datetime import textwrap import types -from typing import Dict, Optional, Sequence, Tuple, Union, Iterable +from typing import Dict, Optional, Sequence, Union, Iterable import google.cloud.bigquery as bigquery @@ -26,34 +26,32 @@ def create_job_configs_labels( - job_config: Optional[bigquery.job.QueryJobConfig], + job_configs_labels: Optional[Dict[str, str]], api_methods: Sequence[str], -) -> Tuple[Dict[str, str], bigquery.job.QueryJobConfig]: - if job_config is not None: - # If there is no label set - if job_config.labels is None: - label_values = api_methods - else: - cur_labels: Sequence[str] = [*job_config.labels.values()] - cur_labels_len = len(job_config.labels) - api_methods_len = len(api_methods) - # If the total number of labels is under the limit of labels count - if cur_labels_len + api_methods_len <= MAX_LABELS_COUNT: - label_values = list(cur_labels) + list(api_methods) - # We capture the latest label if it is out of the length limit of labels count - else: - added_api_len = cur_labels_len + api_methods_len - MAX_LABELS_COUNT - label_values = list(cur_labels) + list(api_methods)[-added_api_len:] +) -> Dict[str, str]: + # If there is no label set + if job_configs_labels is None: + labels = {} + label_values = list(api_methods) else: - job_config = bigquery.QueryJobConfig() - label_values = api_methods + labels = job_configs_labels.copy() + cur_labels_len = len(job_configs_labels) + api_methods_len = len(api_methods) + # If the total number of labels is under the limit of labels count + if cur_labels_len + api_methods_len <= MAX_LABELS_COUNT: + label_values = list(api_methods) + # We capture the latest label if it is out of the length limit of labels count + else: + added_api_len = cur_labels_len + api_methods_len - MAX_LABELS_COUNT + label_values = list(api_methods)[-added_api_len:] - labels = {} for i, label_value in enumerate(label_values): - label_key = "bigframes-api-" + str(i) + if job_configs_labels is not None: + label_key = "bigframes-api-" + str(i + len(job_configs_labels)) + else: + label_key = "bigframes-api-" + str(i) labels[label_key] = label_value - job_config.labels = labels - return labels, job_config + return labels def create_export_csv_statement( diff --git a/bigframes/core/joins/single_column.py b/bigframes/core/joins/single_column.py index e067bc0519..2d616fc3f0 100644 --- a/bigframes/core/joins/single_column.py +++ b/bigframes/core/joins/single_column.py @@ -16,7 +16,6 @@ from __future__ import annotations -import itertools import itertools import typing from typing import Callable, Literal, Tuple @@ -130,25 +129,6 @@ def join_by_column( ) } - def get_column_left(col_id): - return lmapping[col_id] - - def get_column_right(col_id): - return rmapping[col_id] - - lmapping = { - col_id: guid.generate_guid() - for col_id in itertools.chain( - left.column_names, left._hidden_ordering_column_names - ) - } - rmapping = { - col_id: guid.generate_guid() - for col_id in itertools.chain( - right.column_names, right._hidden_ordering_column_names - ) - } - def get_column_left(col_id): return lmapping[col_id] @@ -166,8 +146,6 @@ def get_column_right(col_id): col_id_overrides=rmapping, ) join_conditions = [ - value_to_join_key(left_table[lmapping[left_index]]) - == value_to_join_key(right_table[rmapping[right_index]]) value_to_join_key(left_table[lmapping[left_index]]) == value_to_join_key(right_table[rmapping[right_index]]) for left_index, right_index in zip(left_column_ids, right_column_ids) diff --git a/bigframes/session.py b/bigframes/session.py index e3b62203a7..afdc10c8f4 100644 --- a/bigframes/session.py +++ b/bigframes/session.py @@ -98,7 +98,6 @@ _BIGQUERYSTORAGE_REGIONAL_ENDPOINT = "{location}-bigquerystorage.googleapis.com" _MAX_CLUSTER_COLUMNS = 4 -_MAX_LABELS_COUNT = 64 # TODO(swast): Need to connect to regional endpoints when performing remote # functions operations (BQ Connection IAM, Cloud Run / Cloud Functions). @@ -1465,9 +1464,15 @@ def _start_query( """ Starts query job and waits for results """ - _, job_config = bigframes_io.create_job_configs_labels( - job_config=job_config, api_methods=api_methods - ) + if job_config is not None: + job_config.labels = bigframes_io.create_job_configs_labels( + job_configs_labels=job_config.labels, api_methods=api_methods + ) + else: + job_config = bigquery.QueryJobConfig() + job_config.labels = bigframes_io.create_job_configs_labels( + job_configs_labels=None, api_methods=api_methods + ) query_job = self.bqclient.query(sql, job_config=job_config) opts = bigframes.options.display diff --git a/tests/unit/core/test_io.py b/tests/unit/core/test_io.py index 1db3fd0a45..b7550fcd89 100644 --- a/tests/unit/core/test_io.py +++ b/tests/unit/core/test_io.py @@ -14,7 +14,6 @@ import datetime from typing import Iterable -from unittest import mock import google.cloud.bigquery as bigquery import pytest @@ -22,22 +21,10 @@ import bigframes.core.io -def test_create_job_configs_is_none(): - mock_job_config = mock.create_autospec(None) - api_methods = ["df-agg", "series-mode"] - labels, _ = bigframes.core.io.create_job_configs_labels( - job_config=mock_job_config, api_methods=api_methods - ) - expected_dict = {"bigframes-api-0": "df-agg", "bigframes-api-1": "series-mode"} - assert labels is not None - assert labels == expected_dict - - def test_create_job_configs_labels_is_none(): - mock_job_config = mock.create_autospec(bigquery.QueryJobConfig()) api_methods = ["df-agg", "series-mode"] - labels, _ = bigframes.core.io.create_job_configs_labels( - job_config=mock_job_config, api_methods=api_methods + labels = bigframes.core.io.create_job_configs_labels( + job_configs_labels=None, api_methods=api_methods ) expected_dict = {"bigframes-api-0": "df-agg", "bigframes-api-1": "series-mode"} assert labels is not None @@ -45,45 +32,45 @@ def test_create_job_configs_labels_is_none(): def test_create_job_configs_labels_length_limit_not_met(): - mock_job_config = mock.create_autospec(bigquery.QueryJobConfig()) - mock_job_config.labels = { - "bigframes-api-0": "test0", - "bigframes-api-1": "test1", - "bigframes-api-2": "test2", + cur_labels = { + "bigframes-api": "read_pandas", + "source": "bigquery-dataframes-temp", } api_methods = ["df-agg", "series-mode"] - labels, _ = bigframes.core.io.create_job_configs_labels( - job_config=mock_job_config, api_methods=api_methods + labels = bigframes.core.io.create_job_configs_labels( + job_configs_labels=cur_labels, api_methods=api_methods ) expected_dict = { - "bigframes-api-0": "test0", - "bigframes-api-1": "test1", - "bigframes-api-2": "test2", - "bigframes-api-3": "df-agg", - "bigframes-api-4": "series-mode", + "bigframes-api": "read_pandas", + "source": "bigquery-dataframes-temp", + "bigframes-api-2": "df-agg", + "bigframes-api-3": "series-mode", } assert labels is not None - assert len(labels) == 5 + assert len(labels) == 4 assert labels == expected_dict def test_create_job_configs_labels_length_limit_met(): - mock_job_config = mock.create_autospec(bigquery.QueryJobConfig()) - cur_labels = {} - for i in range(63): - key = f"bigframes-api{i}" + cur_labels = { + "bigframes-api": "read_pandas", + "source": "bigquery-dataframes-temp", + } + for i in range(61): + key = f"bigframes-api-{i}" value = f"test{i}" cur_labels[key] = value # If cur_labels length is 63, we can only add one label from api_methods - mock_job_config.labels = cur_labels api_methods = ["df-agg", "series-mode"] - labels, _ = bigframes.core.io.create_job_configs_labels( - job_config=mock_job_config, api_methods=api_methods + labels = bigframes.core.io.create_job_configs_labels( + job_configs_labels=cur_labels, api_methods=api_methods ) assert labels is not None assert len(labels) == 64 assert "df-agg" not in labels.values() assert "series-mode" in labels.values() + assert "bigframes-api" in labels.keys() + assert "source" in labels.keys() def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets(): From 17ee92d507db1c8cdac2eb098f937ffa2f32e9eb Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 4 Oct 2023 03:21:09 +0000 Subject: [PATCH 11/11] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- bigframes/core/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/io.py b/bigframes/core/io.py index 9b25f3e5d2..511ec292a1 100644 --- a/bigframes/core/io.py +++ b/bigframes/core/io.py @@ -17,7 +17,7 @@ import datetime import textwrap import types -from typing import Dict, Optional, Sequence, Union, Iterable +from typing import Dict, Iterable, Optional, Sequence, Union import google.cloud.bigquery as bigquery