From 377941e56c23c9c73551da1fd64f3d27e539f0f9 Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 16:46:32 +0000
Subject: [PATCH 1/7] feat: add slot_millis and add stats to session object

---
 bigframes/session/__init__.py              | 22 ++++++++++--
 bigframes/session/_io/bigquery/__init__.py | 41 +++++++++++++++++-----
 2 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 83481a3ae9..dcf8aa01c2 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -289,6 +289,10 @@ def __init__(
             nodes.BigFrameNode, nodes.BigFrameNode
         ] = weakref.WeakKeyDictionary()
 
+        # performance logging
+        self._bytes_processed_sum = 0
+        self._slot_millis_sum = 0
+
     @property
     def bqclient(self):
         return self._clients_provider.bqclient
@@ -338,6 +342,20 @@ def objects(
     def _project(self):
         return self.bqclient.project
 
+    @property
+    def bytes_processed_sum(self):
+        return self._bytes_processed_sum
+
+    @property
+    def slot_millis_sum(self):
+        return self._slot_millis_sum
+
+    def add_bytes_processed(self, amount: int):
+        self._bytes_processed_sum += amount
+
+    def add_slot_millis(self, amount: int):
+        self._slot_millis_sum += amount
+
     def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
@@ -1825,7 +1843,7 @@ def _start_query(
         """
         job_config = self._prepare_query_job_config(job_config)
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient,
+            self,
             sql,
             job_config,
             max_results,
@@ -1849,7 +1867,7 @@ def _start_query_ml_ddl(
         job_config.destination_encryption_configuration = None
 
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient, sql, job_config
+            self, sql, job_config
         )
 
     def _cache_with_cluster_cols(
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index cd6847c312..a00daa53ae 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -219,7 +219,7 @@ def add_labels(job_config, api_name: Optional[str] = None):
 
 
 def start_query_with_client(
-    bq_client: bigquery.Client,
+    session: bigquery.session.Session,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
@@ -229,6 +229,7 @@ def start_query_with_client(
     """
     Starts query job and waits for results.
     """
+    bq_client: bigquery.Client = session.bqclient
     add_labels(job_config, api_name=api_name)
 
     try:
@@ -246,14 +247,35 @@ def start_query_with_client(
     else:
         results_iterator = query_job.result(max_results=max_results)
 
+    bytes_processed, slot_millis = get_performance_stats(query_job)
+    session.add_bytes_processed(bytes_processed)
+    session.add_slot_millis(slot_millis)
     if LOGGING_NAME_ENV_VAR in os.environ:
         # when running notebooks via pytest nbmake
-        pytest_log_job(query_job)
+        write_stats_to_disk(bytes_processed, slot_millis)
 
     return results_iterator, query_job
 
 
-def pytest_log_job(query_job: bigquery.QueryJob):
+def get_performance_stats(query_job: bigquery.QueryJob):
+    bytes_processed = query_job.total_bytes_processed
+    if not isinstance(bytes_processed, int):
+        return  # filter out mocks
+    if query_job.configuration.dry_run:
+        # dry run stats are just predictions of the real run
+        bytes_processed = 0
+
+    slot_millis = query_job.slot_millis
+    if not isinstance(slot_millis, int):
+        return  # filter out mocks
+    if query_job.configuration.dry_run:
+        # dry run stats are just predictions of the real run
+        slot_millis = 0
+
+    return bytes_processed, slot_millis
+
+
+def write_stats_to_disk(bytes_processed: int, slot_millis: int):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
     """
@@ -265,16 +287,17 @@ def pytest_log_job(query_job: bigquery.QueryJob):
         )
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
-    bytes_processed = query_job.total_bytes_processed
-    if not isinstance(bytes_processed, int):
-        return  # filter out mocks
-    if query_job.configuration.dry_run:
-        # dry runs don't process their total_bytes_processed
-        bytes_processed = 0
+
+    # store bytes processed
     bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
     with open(bytes_file, "a") as f:
         f.write(str(bytes_processed) + "\n")
 
+    # store slot milliseconds
+    bytes_file = os.path.join(current_directory, test_name + ".slotmillis")
+    with open(bytes_file, "a") as f:
+        f.write(str(slot_millis) + "\n")
+
 
 def delete_tables_matching_session_id(
     client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str

From ade90d7f9279b7c2a050a778229901c8b27015fd Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 17:02:32 +0000
Subject: [PATCH 2/7] fix none handling

---
 bigframes/session/_io/bigquery/__init__.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index a00daa53ae..e0d286b9b0 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -247,9 +247,11 @@ def start_query_with_client(
     else:
         results_iterator = query_job.result(max_results=max_results)
 
-    bytes_processed, slot_millis = get_performance_stats(query_job)
-    session.add_bytes_processed(bytes_processed)
-    session.add_slot_millis(slot_millis)
+    stats = get_performance_stats(query_job)
+    if stats is not None:
+        bytes_processed, slot_millis = stats
+        session.add_bytes_processed(bytes_processed)
+        session.add_slot_millis(slot_millis)
     if LOGGING_NAME_ENV_VAR in os.environ:
         # when running notebooks via pytest nbmake
         write_stats_to_disk(bytes_processed, slot_millis)
@@ -257,17 +259,21 @@ def start_query_with_client(
     return results_iterator, query_job
 
 
-def get_performance_stats(query_job: bigquery.QueryJob):
+def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, int]]:
+    """Parse the query job for performance stats.
+
+    Return None if the stats do not reflect real work done in bigquery.
+    """
     bytes_processed = query_job.total_bytes_processed
     if not isinstance(bytes_processed, int):
-        return  # filter out mocks
+        return None  # filter out mocks
     if query_job.configuration.dry_run:
         # dry run stats are just predictions of the real run
         bytes_processed = 0
 
     slot_millis = query_job.slot_millis
     if not isinstance(slot_millis, int):
-        return  # filter out mocks
+        return None  # filter out mocks
     if query_job.configuration.dry_run:
         # dry run stats are just predictions of the real run
         slot_millis = 0

From 8984a13586accd847f2bf814fbd4f9cb934586db Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 17:07:37 +0000
Subject: [PATCH 3/7] fix indent

---
 bigframes/session/_io/bigquery/__init__.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index e0d286b9b0..1631797a79 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -252,9 +252,9 @@ def start_query_with_client(
         bytes_processed, slot_millis = stats
         session.add_bytes_processed(bytes_processed)
         session.add_slot_millis(slot_millis)
-    if LOGGING_NAME_ENV_VAR in os.environ:
-        # when running notebooks via pytest nbmake
-        write_stats_to_disk(bytes_processed, slot_millis)
+        if LOGGING_NAME_ENV_VAR in os.environ:
+            # when running notebooks via pytest nbmake
+            write_stats_to_disk(bytes_processed, slot_millis)
 
     return results_iterator, query_job
 

From e57fb79419d4e8def820d5a61d70af1360967c3c Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 17:12:49 +0000
Subject: [PATCH 4/7] make incrementor internal

---
 bigframes/session/__init__.py              | 6 ++++--
 bigframes/session/_io/bigquery/__init__.py | 4 ++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index dcf8aa01c2..d47edc7482 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -350,10 +350,12 @@ def bytes_processed_sum(self):
     def slot_millis_sum(self):
         return self._slot_millis_sum
 
-    def add_bytes_processed(self, amount: int):
+    def _add_bytes_processed(self, amount: int):
+        """Increment bytes_processed_sum by amount."""
         self._bytes_processed_sum += amount
 
-    def add_slot_millis(self, amount: int):
+    def _add_slot_millis(self, amount: int):
+        """Increment slot_millis_sum by amount."""
         self._slot_millis_sum += amount
 
     def __hash__(self):
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 1631797a79..61f8275c0a 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -250,8 +250,8 @@ def start_query_with_client(
     stats = get_performance_stats(query_job)
     if stats is not None:
         bytes_processed, slot_millis = stats
-        session.add_bytes_processed(bytes_processed)
-        session.add_slot_millis(slot_millis)
+        session._add_bytes_processed(bytes_processed)
+        session._add_slot_millis(slot_millis)
         if LOGGING_NAME_ENV_VAR in os.environ:
             # when running notebooks via pytest nbmake
             write_stats_to_disk(bytes_processed, slot_millis)

From 6ade4cf0d2837436090c967782f0591c642ac4b7 Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 17:45:51 +0000
Subject: [PATCH 5/7] update noxfile

---
Review notes (this section sits after the three-dash line, so it is not
part of the commit message):
- The summary loop in _print_performance_report() now iterates
  results_dict.items(), so each printed row is labelled with its own
  notebook name. The previous loop over .values() reused the stale
  `filename` left behind by the .slotmillis loop.
- A .slotmillis report with no matching .bytesprocessed report is now
  recorded through setdefault(), so it reaches the explicit IOError
  mismatch check instead of failing earlier with a bare KeyError.
- Both changes replace added lines one-for-one; hunk line counts and the
  diffstat are unchanged. The `index` blob ids for noxfile.py are left as
  originally generated and no longer match the modified post-image.

 bigframes/session/_io/bigquery/__init__.py |  2 +-
 noxfile.py                                 | 57 ++++++++++++++++------
 2 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index 61f8275c0a..6afa86aa2d 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -219,7 +219,7 @@ def add_labels(job_config, api_name: Optional[str] = None):
 
 
 def start_query_with_client(
-    session: bigquery.session.Session,
+    session: bigframes.session.Session,
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
diff --git a/noxfile.py b/noxfile.py
index c6e8da8c81..14c1b749e0 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -800,33 +800,58 @@ def notebook(session: nox.Session):
     for process in processes:
         process.join()
 
-    # when run via pytest, notebooks output a .bytesprocessed report
+    # when the environment variable is set as it is above,
+    # notebooks output a .bytesprocessed and .slotmillis report
     # collect those reports and print a summary
-    _print_bytes_processed_report()
+    _print_performance_report()
 
 
-def _print_bytes_processed_report():
+def _print_performance_report():
     """Add an informational report about http queries and bytes
     processed to the testlog output for purposes of measuring
     bigquery-related performance changes.
     """
     print("---BIGQUERY USAGE REPORT---")
-    cumulative_queries = 0
-    cumulative_bytes = 0
-    for report in Path("notebooks/").glob("*/*.bytesprocessed"):
-        with open(report, "r") as f:
-            filename = report.stem
-            lines = f.read().splitlines()
+    results_dict = {}
+    for bytes_report in Path("notebooks/").glob("*/*.bytesprocessed"):
+        with open(bytes_report, "r") as bytes_file:
+            filename = bytes_report.stem
+            lines = bytes_file.read().splitlines()
             query_count = len(lines)
             total_bytes = sum([int(line) for line in lines])
-            format_string = f"{filename} - query count: {query_count}, bytes processed sum: {total_bytes}"
-            print(format_string)
-            cumulative_bytes += total_bytes
-            cumulative_queries += query_count
-    print(
-        "---total queries: {total_queries}, total bytes: {total_bytes}---".format(
-            total_queries=cumulative_queries, total_bytes=cumulative_bytes
+            results_dict[filename] = [query_count, total_bytes]
+    for millis_report in Path("notebooks/").glob("*/*.slotmillis"):
+        with open(millis_report, "r") as millis_file:
+            filename = millis_report.stem
+            lines = millis_file.read().splitlines()
+            total_slot_millis = sum([int(line) for line in lines])
+            results_dict.setdefault(filename, []).append(total_slot_millis)
+
+    cumulative_queries = 0
+    cumulative_bytes = 0
+    cumulative_slot_millis = 0
+    for filename, results in results_dict.items():
+        if len(results) != 3:
+            raise IOError(
+                "Mismatch in performance logging output. "
+                "Expected one .bytesprocessed and one .slotmillis "
+                "file for each notebook."
+            )
+        query_count, total_bytes, total_slot_millis = results
+        cumulative_queries += query_count
+        cumulative_bytes += total_bytes
+        cumulative_slot_millis += total_slot_millis
+        format_string = (
+            f"{filename} - query count: {query_count},"
+            f" bytes processed sum: {total_bytes},"
+            f" slot millis sum: {total_slot_millis}"
         )
+        print(format_string)
+
+    print(
+        f"---total queries: {cumulative_queries}, "
+        f"total bytes: {cumulative_bytes}, "
+        f"total slot millis: {cumulative_slot_millis}---"
     )
 
 

From 5ff4cae53a98616f8d37b0ccc7b9ba684aad66d7 Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 17:47:28 +0000
Subject: [PATCH 6/7] update comment

---
 noxfile.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/noxfile.py b/noxfile.py
index 14c1b749e0..37cd8d0521 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -807,9 +807,9 @@ def notebook(session: nox.Session):
 
 
 def _print_performance_report():
-    """Add an informational report about http queries and bytes
-    processed to the testlog output for purposes of measuring
-    bigquery-related performance changes.
+    """Add an informational report about http queries, bytes
+    processed, and slot time to the testlog output for purposes
+    of measuring bigquery-related performance changes.
     """
     print("---BIGQUERY USAGE REPORT---")
     results_dict = {}

From 7c25abd98f0c0ad00a75b9bee0dda9ea34ccaf30 Mon Sep 17 00:00:00 2001
From: milkshakeiii
Date: Tue, 28 May 2024 18:01:14 +0000
Subject: [PATCH 7/7] add comment

---
 bigframes/session/__init__.py | 2 ++
 noxfile.py                    | 3 +--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index d47edc7482..07bb6ddce0 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -344,10 +344,12 @@ def _project(self):
 
     @property
     def bytes_processed_sum(self):
+        """The sum of all bytes processed by bigquery jobs using this session."""
         return self._bytes_processed_sum
 
     @property
     def slot_millis_sum(self):
+        """The sum of all slot time used by bigquery jobs in this session."""
         return self._slot_millis_sum
 
     def _add_bytes_processed(self, amount: int):
diff --git a/noxfile.py b/noxfile.py
index 37cd8d0521..52583bbf1a 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -841,12 +841,11 @@ def _print_performance_report():
         cumulative_queries += query_count
         cumulative_bytes += total_bytes
         cumulative_slot_millis += total_slot_millis
-        format_string = (
+        print(
             f"{filename} - query count: {query_count},"
             f" bytes processed sum: {total_bytes},"
             f" slot millis sum: {total_slot_millis}"
         )
-        print(format_string)
 
     print(
         f"---total queries: {cumulative_queries}, "