diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 8008c1189a..9b05bd2ab2 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -94,6 +94,7 @@ def __init__(
         hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None,
         ordering: ExpressionOrdering = ExpressionOrdering(),
         predicates: Optional[Collection[ibis_types.BooleanValue]] = None,
+        api_methods: Sequence[str] = (),
     ):
         self._session = session
         self._table = table
@@ -120,6 +121,9 @@ def __init__(
         self._hidden_ordering_column_names = {
             column.get_name(): column for column in self._hidden_ordering_columns
         }
+
+        self._api_methods = api_methods
+
         ### Validation
         value_col_ids = self._column_names.keys()
         hidden_col_ids = self._hidden_ordering_column_names.keys()
@@ -234,6 +238,10 @@ def _ibis_order(self) -> Sequence[ibis_types.Value]:
             self._ordering.all_ordering_columns,
         )
 
+    @property
+    def api_methods(self) -> Sequence[str]:
+        return self._api_methods
+
     def builder(self) -> ArrayValueBuilder:
         """Creates a mutable builder for expressions."""
         # Since ArrayValue is intended to be immutable (immutability offers
@@ -456,11 +464,13 @@ def projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue:
         new_expr = builder.build()
         return new_expr
 
-    def shape(self) -> typing.Tuple[int, int]:
+    def shape(self, api_method: str) -> typing.Tuple[int, int]:
         """Returns dimensions as (length, width) tuple."""
         width = len(self.columns)
         count_expr = self._to_ibis_expr(ordering_mode="unordered").count()
         sql = self._session.ibis_client.compile(count_expr)
+        add_api_methods = list(self.api_methods)
+        add_api_methods.append(api_method)
 
         # Support in-memory engines for hermetic unit tests.
         if not isinstance(sql, str):
@@ -469,6 +479,7 @@ def shape(self) -> typing.Tuple[int, int]:
         row_iterator, _ = self._session._start_query(
             sql=sql,
             max_results=1,
+            api_methods=add_api_methods,
         )
         length = next(row_iterator)[0]
         return (length, width)
@@ -566,6 +577,7 @@ def aggregate(
         aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp, str]],
         by_column_ids: typing.Sequence[str] = (),
         dropna: bool = True,
+        api_method: str = "",
     ) -> ArrayValue:
         """
         Apply aggregations to the expression.
@@ -579,6 +591,8 @@ def aggregate( col_out: agg_op._as_ibis(table[col_in]) for col_in, agg_op, col_out in aggregations } + add_api_methods = list(self.api_methods) + add_api_methods.append(api_method) if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) # Must have deterministic ordering, so order by the unique "by" column @@ -590,7 +604,13 @@ def aggregate( total_ordering_columns=frozenset(by_column_ids), ) columns = tuple(result[key] for key in result.columns) - expr = ArrayValue(self._session, result, columns=columns, ordering=ordering) + expr = ArrayValue( + self._session, + result, + columns=columns, + ordering=ordering, + api_methods=add_api_methods, + ) if dropna: for column_id in by_column_ids: expr = expr._filter( @@ -613,6 +633,7 @@ def aggregate( columns=[result[col_id] for col_id in [*stats.keys()]], hidden_ordering_columns=[result[ORDER_ID_COLUMN]], ordering=ordering, + api_methods=add_api_methods, ) def corr_aggregate( @@ -871,6 +892,7 @@ def start_query( job_config: Optional[bigquery.job.QueryJobConfig] = None, max_results: Optional[int] = None, expose_extra_columns: bool = False, + api_name: str = "", ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: """Execute a query and return metadata about the results.""" # TODO(swast): Cache the job ID so we can look it up again if they ask @@ -885,10 +907,14 @@ def start_query( # maybe we just print the job metadata that we have so far? table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns) sql = self._session.ibis_client.compile(table) # type:ignore + + add_api_methods = list(self.api_methods) + add_api_methods.append(api_name) return self._session._start_query( sql=sql, job_config=job_config, max_results=max_results, + api_methods=add_api_methods, ) def _get_table_size(self, destination_table): diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 0161d17361..c189902191 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -134,7 +134,7 @@ def index(self) -> indexes.IndexValue: @functools.cached_property def shape(self) -> typing.Tuple[int, int]: """Returns dimensions as (length, width) tuple.""" - impl_length, _ = self._expr.shape() + impl_length, _ = self._expr.shape("shape") return (impl_length, len(self.value_columns)) @property @@ -429,6 +429,7 @@ def to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, random_state=random_state, + api_name="to_pandas", ) return df, query_job @@ -439,13 +440,16 @@ def _compute_and_count( max_download_size: Optional[int] = None, sampling_method: Optional[str] = None, random_state: Optional[int] = None, + api_name: str = "", ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]: """Run query and download results as a pandas DataFrame. Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. 
expr = self._apply_value_keys_to_expr(value_keys=value_keys) results_iterator, query_job = expr.start_query( - max_results=max_results, expose_extra_columns=True + max_results=max_results, + expose_extra_columns=True, + api_name=api_name, ) table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES @@ -602,11 +606,13 @@ def _split( return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks] def _compute_dry_run( - self, value_keys: Optional[Iterable[str]] = None + self, + value_keys: Optional[Iterable[str]] = None, + api_name: str = "", ) -> bigquery.QueryJob: expr = self._apply_value_keys_to_expr(value_keys=value_keys) job_config = bigquery.QueryJobConfig(dry_run=True) - _, query_job = expr.start_query(job_config=job_config) + _, query_job = expr.start_query(job_config=job_config, api_name=api_name) return query_job def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None): @@ -924,6 +930,7 @@ def aggregate( *, as_index: bool = True, dropna: bool = True, + api_method: str = "", ) -> typing.Tuple[Block, typing.Sequence[str]]: """ Apply aggregations to the block. Callers responsible for setting index column(s) after. @@ -938,7 +945,9 @@ def aggregate( for input_id, operation in aggregations ] output_col_ids = [agg_spec[2] for agg_spec in agg_specs] - result_expr = self.expr.aggregate(agg_specs, by_column_ids, dropna=dropna) + result_expr = self.expr.aggregate( + agg_specs, by_column_ids, dropna=dropna, api_method=api_method + ) aggregate_labels = self._get_labels_for_columns( [agg[0] for agg in aggregations] @@ -1026,6 +1035,7 @@ def summarize( self, column_ids: typing.Sequence[str], stats: typing.Sequence[agg_ops.AggregateOp], + api_method: str, ): """Get a list of stats as a deferred block object.""" label_col_id = guid.generate_guid() @@ -1039,7 +1049,9 @@ def summarize( (col_id, [f"{col_id}-{stat.name}" for stat in stats]) for col_id in column_ids ] - expr = self.expr.aggregate(aggregations).unpivot( + expr = self.expr.aggregate( + aggregations=aggregations, api_method=api_method + ).unpivot( labels, unpivot_columns=columns, index_col_ids=[label_col_id], diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 9be7f22a71..7db44f81db 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -104,12 +104,16 @@ def __getitem__( def sum(self, numeric_only: bool = False, *args) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("sum") - return self._aggregate_all(agg_ops.sum_op, numeric_only=True) + return self._aggregate_all( + agg_ops.sum_op, numeric_only=True, api_method="dfgroupby-sum" + ) def mean(self, numeric_only: bool = False, *args) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("mean") - return self._aggregate_all(agg_ops.mean_op, numeric_only=True) + return self._aggregate_all( + agg_ops.mean_op, numeric_only=True, api_method="dfgroupby-mean" + ) def median( self, numeric_only: bool = False, *, exact: bool = False @@ -120,13 +124,19 @@ def median( ) if not numeric_only: self._raise_on_non_numeric("median") - return self._aggregate_all(agg_ops.median_op, numeric_only=True) + return self._aggregate_all( + agg_ops.median_op, numeric_only=True, api_method="dfgroupby-median" + ) def min(self, numeric_only: bool = False, *args) -> df.DataFrame: - return self._aggregate_all(agg_ops.min_op, numeric_only=numeric_only) + return self._aggregate_all( + agg_ops.min_op, numeric_only=numeric_only, api_method="dfgroupby-min" + ) def 
max(self, numeric_only: bool = False, *args) -> df.DataFrame: - return self._aggregate_all(agg_ops.max_op, numeric_only=numeric_only) + return self._aggregate_all( + agg_ops.max_op, numeric_only=numeric_only, api_method="dfgroupby-max" + ) def std( self, @@ -135,7 +145,9 @@ def std( ) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("std") - return self._aggregate_all(agg_ops.std_op, numeric_only=True) + return self._aggregate_all( + agg_ops.std_op, numeric_only=True, api_method="dfgroupby-std" + ) def var( self, @@ -144,7 +156,9 @@ def var( ) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("var") - return self._aggregate_all(agg_ops.var_op, numeric_only=True) + return self._aggregate_all( + agg_ops.var_op, numeric_only=True, api_method="dfgroupby-var" + ) def skew( self, @@ -169,13 +183,13 @@ def kurt( kurtosis = kurt def all(self) -> df.DataFrame: - return self._aggregate_all(agg_ops.all_op) + return self._aggregate_all(agg_ops.all_op, api_method="dfgroupby-all") def any(self) -> df.DataFrame: - return self._aggregate_all(agg_ops.any_op) + return self._aggregate_all(agg_ops.any_op, api_method="dfgroupby-any") def count(self) -> df.DataFrame: - return self._aggregate_all(agg_ops.count_op) + return self._aggregate_all(agg_ops.count_op, api_method="dfgroupby-count") def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: @@ -240,19 +254,19 @@ def expanding(self, min_periods: int = 1) -> windows.Window: def agg(self, func=None, **kwargs) -> df.DataFrame: if func: if isinstance(func, str): - return self._agg_string(func) + return self._agg_string(func, "dfgroupby-agg") elif utils.is_dict_like(func): - return self._agg_dict(func) + return self._agg_dict(func, "dfgroupby-agg") elif utils.is_list_like(func): - return self._agg_list(func) + return self._agg_list(func, "dfgroupby-agg") else: raise NotImplementedError( f"Aggregate with {func} not supported. 
{constants.FEEDBACK_LINK}" ) else: - return self._agg_named(**kwargs) + return self._agg_named("dfgroupby-agg", **kwargs) - def _agg_string(self, func: str) -> df.DataFrame: + def _agg_string(self, func: str, api_method: str) -> df.DataFrame: aggregations = [ (col_id, agg_ops.lookup_agg_func(func)) for col_id in self._aggregated_columns() @@ -262,10 +276,11 @@ def _agg_string(self, func: str) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) return df.DataFrame(agg_block) - def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: + def _agg_dict(self, func: typing.Mapping, api_method: str) -> df.DataFrame: aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = [] column_labels = [] @@ -284,6 +299,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) if want_aggfunc_level: agg_block = agg_block.with_column_labels( @@ -296,7 +312,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: agg_block = agg_block.with_column_labels(pd.Index(column_labels)) return df.DataFrame(agg_block) - def _agg_list(self, func: typing.Sequence) -> df.DataFrame: + def _agg_list(self, func: typing.Sequence, api_method: str) -> df.DataFrame: aggregations = [ (col_id, agg_ops.lookup_agg_func(f)) for col_id in self._aggregated_columns() @@ -310,6 +326,7 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) agg_block = agg_block.with_column_labels( pd.MultiIndex.from_tuples( @@ -318,7 +335,11 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: ) return df.DataFrame(agg_block) - def _agg_named(self, **kwargs) -> df.DataFrame: + def _agg_named( + self, + api_method: str, + **kwargs, + ) -> df.DataFrame: aggregations = [] column_labels = [] for k, v in kwargs.items(): @@ -338,6 +359,7 @@ def _agg_named(self, **kwargs) -> df.DataFrame: aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) agg_block = agg_block.with_column_labels(column_labels) return df.DataFrame(agg_block) @@ -369,7 +391,10 @@ def _column_type(self, col_id: str) -> dtypes.Dtype: return dtype def _aggregate_all( - self, aggregate_op: agg_ops.AggregateOp, numeric_only: bool = False + self, + aggregate_op: agg_ops.AggregateOp, + numeric_only: bool = False, + api_method: str = "", ) -> df.DataFrame: aggregated_col_ids = self._aggregated_columns(numeric_only=numeric_only) aggregations = [(col_id, aggregate_op) for col_id in aggregated_col_ids] @@ -378,6 +403,7 @@ def _aggregate_all( aggregations=aggregations, as_index=self._as_index, dropna=self._dropna, + api_method=api_method, ) return df.DataFrame(result_block) @@ -471,7 +497,7 @@ def kurt(self, *args, **kwargs) -> series.Series: kurtosis = kurt def prod(self, *args) -> series.Series: - return self._aggregate(agg_ops.product_op) + return self._aggregate(agg_ops.product_op, "seriesgroupby-prod") def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: column_names: list[str] = [] @@ -492,6 +518,7 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: by_column_ids=self._by_col_ids, aggregations=aggregations, dropna=self._dropna, + api_method="seriesgroupby-agg", ) if column_names: @@ -584,11 +611,14 @@ def expanding(self, min_periods: int = 1) -> windows.Window: is_series=True, ) - def _aggregate(self, aggregate_op: 
agg_ops.AggregateOp) -> series.Series:
+    def _aggregate(
+        self, aggregate_op: agg_ops.AggregateOp, api_method: str = ""
+    ) -> series.Series:
         result_block, _ = self._block.aggregate(
             self._by_col_ids,
             ((self._value_column, aggregate_op),),
             dropna=self._dropna,
+            api_method=api_method,
         )
 
         return series.Series(result_block.with_column_labels([self._value_name]))
diff --git a/bigframes/core/io.py b/bigframes/core/io.py
index d47efbdddc..511ec292a1 100644
--- a/bigframes/core/io.py
+++ b/bigframes/core/io.py
@@ -17,11 +17,41 @@
 import datetime
 import textwrap
 import types
-from typing import Dict, Iterable, Union
+from typing import Dict, Iterable, Optional, Sequence, Union
 
 import google.cloud.bigquery as bigquery
 
 IO_ORDERING_ID = "bqdf_row_nums"
+MAX_LABELS_COUNT = 64
+
+
+def create_job_configs_labels(
+    job_configs_labels: Optional[Dict[str, str]],
+    api_methods: Sequence[str],
+) -> Dict[str, str]:
+    # No labels are set yet, so every API method can be added from index 0.
+    if job_configs_labels is None:
+        labels = {}
+        label_values = list(api_methods)
+    else:
+        labels = job_configs_labels.copy()
+        cur_labels_len = len(job_configs_labels)
+        api_methods_len = len(api_methods)
+        # The combined label count fits within the BigQuery limit.
+        if cur_labels_len + api_methods_len <= MAX_LABELS_COUNT:
+            label_values = list(api_methods)
+        # Otherwise keep only the most recent API methods that still fit.
+        else:
+            fitting_api_count = MAX_LABELS_COUNT - cur_labels_len
+            label_values = list(api_methods)[api_methods_len - fitting_api_count :]
+
+    for i, label_value in enumerate(label_values):
+        if job_configs_labels is not None:
+            label_key = "bigframes-api-" + str(i + len(job_configs_labels))
+        else:
+            label_key = "bigframes-api-" + str(i)
+        labels[label_key] = label_value
+    return labels
 
 
 def create_export_csv_statement(
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index eea8beb130..86fe5e168a 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -885,7 +885,7 @@ def to_pandas(
         return df.set_axis(self._block.column_labels, axis=1, copy=False)
 
     def _compute_dry_run(self) -> bigquery.QueryJob:
-        return self._block._compute_dry_run()
+        return self._block._compute_dry_run(api_name="dataframe_query_job")
 
     def copy(self) -> DataFrame:
         return DataFrame(self._block)
@@ -1629,8 +1629,9 @@ def agg(
             aggregations = [agg_ops.lookup_agg_func(f) for f in func]
             return DataFrame(
                 self._block.summarize(
-                    self._block.value_columns,
-                    aggregations,
+                    column_ids=self._block.value_columns,
+                    stats=aggregations,
+                    api_method="df-agg",
                 )
             )
         else:
diff --git a/bigframes/series.py b/bigframes/series.py
index 8815a6abde..4cb346c4a4 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -274,7 +274,9 @@ def to_pandas(
         return series
 
     def _compute_dry_run(self) -> bigquery.QueryJob:
-        return self._block._compute_dry_run((self._value_column,))
+        return self._block._compute_dry_run(
+            value_keys=(self._value_column,), api_name="series_query_job"
+        )
 
     def drop(
         self,
@@ -733,8 +735,9 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series:
             aggregations = [agg_ops.lookup_agg_func(f) for f in func]
             return Series(
                 self._block.summarize(
-                    [self._value_column],
-                    aggregations,
+                    column_ids=[self._value_column],
+                    stats=aggregations,
+                    api_method="series-agg",
                 )
             )
         else:
@@ -784,6 +787,7 @@ def mode(self) -> Series:
             by_column_ids=[self._value_column],
             aggregations=((self._value_column, agg_ops.count_op),),
             as_index=False,
+            api_method="series-mode",
         )
         value_count_col_id = agg_ids[0]
         block, max_value_count_col_id = 
block.apply_window_op(
diff --git a/bigframes/session.py b/bigframes/session.py
index ac48c977cb..afdc10c8f4 100644
--- a/bigframes/session.py
+++ b/bigframes/session.py
@@ -1459,14 +1459,21 @@ def _start_query(
         sql: str,
         job_config: Optional[bigquery.job.QueryJobConfig] = None,
         max_results: Optional[int] = None,
+        api_methods: Sequence[str] = (),
     ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
         """
         Starts query job and waits for results
         """
         if job_config is not None:
-            query_job = self.bqclient.query(sql, job_config=job_config)
+            job_config.labels = bigframes_io.create_job_configs_labels(
+                job_configs_labels=job_config.labels, api_methods=api_methods
+            )
         else:
-            query_job = self.bqclient.query(sql)
+            job_config = bigquery.QueryJobConfig()
+            job_config.labels = bigframes_io.create_job_configs_labels(
+                job_configs_labels=None, api_methods=api_methods
+            )
+        query_job = self.bqclient.query(sql, job_config=job_config)
 
         opts = bigframes.options.display
         if opts.progress_bar is not None and not query_job.configuration.dry_run:
diff --git a/tests/unit/core/test_io.py b/tests/unit/core/test_io.py
index afb38a5f75..b7550fcd89 100644
--- a/tests/unit/core/test_io.py
+++ b/tests/unit/core/test_io.py
@@ -21,6 +21,58 @@
 import bigframes.core.io
 
 
+def test_create_job_configs_labels_is_none():
+    api_methods = ["df-agg", "series-mode"]
+    labels = bigframes.core.io.create_job_configs_labels(
+        job_configs_labels=None, api_methods=api_methods
+    )
+    expected_dict = {"bigframes-api-0": "df-agg", "bigframes-api-1": "series-mode"}
+    assert labels is not None
+    assert labels == expected_dict
+
+
+def test_create_job_configs_labels_length_limit_not_met():
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    api_methods = ["df-agg", "series-mode"]
+    labels = bigframes.core.io.create_job_configs_labels(
+        job_configs_labels=cur_labels, api_methods=api_methods
+    )
+    expected_dict = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+        "bigframes-api-2": "df-agg",
+        "bigframes-api-3": "series-mode",
+    }
+    assert labels is not None
+    assert len(labels) == 4
+    assert labels == expected_dict
+
+
+def test_create_job_configs_labels_length_limit_met():
+    cur_labels = {
+        "bigframes-api": "read_pandas",
+        "source": "bigquery-dataframes-temp",
+    }
+    for i in range(61):
+        key = f"bigframes-api-{i}"
+        value = f"test{i}"
+        cur_labels[key] = value
+    # If cur_labels length is 63, we can only add one label from api_methods
+    api_methods = ["df-agg", "series-mode"]
+    labels = bigframes.core.io.create_job_configs_labels(
+        job_configs_labels=cur_labels, api_methods=api_methods
+    )
+    assert labels is not None
+    assert len(labels) == 64
+    assert "df-agg" not in labels.values()
+    assert "series-mode" in labels.values()
+    assert "bigframes-api" in labels.keys()
+    assert "source" in labels.keys()
+
+
 def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
     table_ref = bigquery.TableReference.from_string(
         "my-test-project._e8166e0cdb.anonbb92cd"
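
For reference, a minimal usage sketch of the create_job_configs_labels helper added above (assuming the patched bigframes.core.io module is importable; the label values simply reuse API-method names that appear elsewhere in this change):

    # Merge two API-method labels into an existing label dict. New keys are
    # offset by the number of pre-existing labels, mirroring the unit tests.
    from bigframes.core.io import create_job_configs_labels

    labels = create_job_configs_labels(
        job_configs_labels={"bigframes-api": "read_pandas"},
        api_methods=["df-agg", "to_pandas"],
    )
    print(labels)
    # {'bigframes-api': 'read_pandas',
    #  'bigframes-api-1': 'df-agg',
    #  'bigframes-api-2': 'to_pandas'}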