Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: track slot time and add performance stats accessors to session object #725

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions 26 bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ def __init__(
nodes.BigFrameNode, nodes.BigFrameNode
] = weakref.WeakKeyDictionary()

# performance logging
self._bytes_processed_sum = 0
self._slot_millis_sum = 0

@property
def bqclient(self):
return self._clients_provider.bqclient
Expand Down Expand Up @@ -338,6 +342,24 @@ def objects(
def _project(self):
return self.bqclient.project

@property
def bytes_processed_sum(self):
"""The sum of all bytes processed by bigquery jobs using this session."""
return self._bytes_processed_sum
milkshakeiii marked this conversation as resolved.
Show resolved Hide resolved

@property
def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._slot_millis_sum

def _add_bytes_processed(self, amount: int):
"""Increment bytes_processed_sum by amount."""
self._bytes_processed_sum += amount

def _add_slot_millis(self, amount: int):
"""Increment slot_millis_sum by amount."""
self._slot_millis_sum += amount

def __hash__(self):
# Stable hash needed to use in expression tree
return hash(str(self._anonymous_dataset))
Expand Down Expand Up @@ -1825,7 +1847,7 @@ def _start_query(
"""
job_config = self._prepare_query_job_config(job_config)
return bigframes.session._io.bigquery.start_query_with_client(
self.bqclient,
self,
sql,
job_config,
max_results,
Expand All @@ -1849,7 +1871,7 @@ def _start_query_ml_ddl(
job_config.destination_encryption_configuration = None

return bigframes.session._io.bigquery.start_query_with_client(
self.bqclient, sql, job_config
self, sql, job_config
)

def _cache_with_cluster_cols(
Expand Down
51 changes: 40 additions & 11 deletions 51 bigframes/session/_io/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def add_labels(job_config, api_name: Optional[str] = None):


def start_query_with_client(
bq_client: bigquery.Client,
session: bigframes.session.Session,
sql: str,
job_config: bigquery.job.QueryJobConfig,
max_results: Optional[int] = None,
Expand All @@ -229,6 +229,7 @@ def start_query_with_client(
"""
Starts query job and waits for results.
"""
bq_client: bigquery.Client = session.bqclient
add_labels(job_config, api_name=api_name)

try:
Expand All @@ -246,14 +247,41 @@ def start_query_with_client(
else:
results_iterator = query_job.result(max_results=max_results)

if LOGGING_NAME_ENV_VAR in os.environ:
# when running notebooks via pytest nbmake
pytest_log_job(query_job)
stats = get_performance_stats(query_job)
if stats is not None:
bytes_processed, slot_millis = stats
session._add_bytes_processed(bytes_processed)
session._add_slot_millis(slot_millis)
if LOGGING_NAME_ENV_VAR in os.environ:
# when running notebooks via pytest nbmake
write_stats_to_disk(bytes_processed, slot_millis)

return results_iterator, query_job


def pytest_log_job(query_job: bigquery.QueryJob):
def get_performance_stats(query_job: bigquery.QueryJob) -> Optional[Tuple[int, int]]:
"""Parse the query job for performance stats.

Return None if the stats do not reflect real work done in bigquery.
"""
bytes_processed = query_job.total_bytes_processed
if not isinstance(bytes_processed, int):
return None # filter out mocks
if query_job.configuration.dry_run:
# dry run stats are just predictions of the real run
bytes_processed = 0

slot_millis = query_job.slot_millis
if not isinstance(slot_millis, int):
return None # filter out mocks
if query_job.configuration.dry_run:
# dry run stats are just predictions of the real run
slot_millis = 0

return bytes_processed, slot_millis


def write_stats_to_disk(bytes_processed: int, slot_millis: int):
"""For pytest runs only, log information about the query job
to a file in order to create a performance report.
"""
Expand All @@ -265,16 +293,17 @@ def pytest_log_job(query_job: bigquery.QueryJob):
)
test_name = os.environ[LOGGING_NAME_ENV_VAR]
current_directory = os.getcwd()
bytes_processed = query_job.total_bytes_processed
if not isinstance(bytes_processed, int):
return # filter out mocks
if query_job.configuration.dry_run:
# dry runs don't process their total_bytes_processed
bytes_processed = 0

# store bytes processed
bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
with open(bytes_file, "a") as f:
f.write(str(bytes_processed) + "\n")

# store slot milliseconds
bytes_file = os.path.join(current_directory, test_name + ".slotmillis")
with open(bytes_file, "a") as f:
f.write(str(slot_millis) + "\n")


def delete_tables_matching_session_id(
client: bigquery.Client, dataset: bigquery.DatasetReference, session_id: str
Expand Down
62 changes: 43 additions & 19 deletions 62 noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,33 +800,57 @@ def notebook(session: nox.Session):
for process in processes:
process.join()

# when run via pytest, notebooks output a .bytesprocessed report
# when the environment variable is set as it is above,
# notebooks output a .bytesprocessed and .slotmillis report
# collect those reports and print a summary
_print_bytes_processed_report()
_print_performance_report()


def _print_bytes_processed_report():
"""Add an informational report about http queries and bytes
processed to the testlog output for purposes of measuring
bigquery-related performance changes.
def _print_performance_report():
"""Add an informational report about http queries, bytes
processed, and slot time to the testlog output for purposes
of measuring bigquery-related performance changes.
"""
print("---BIGQUERY USAGE REPORT---")
cumulative_queries = 0
cumulative_bytes = 0
for report in Path("notebooks/").glob("*/*.bytesprocessed"):
with open(report, "r") as f:
filename = report.stem
lines = f.read().splitlines()
results_dict = {}
for bytes_report in Path("notebooks/").glob("*/*.bytesprocessed"):
with open(bytes_report, "r") as bytes_file:
filename = bytes_report.stem
lines = bytes_file.read().splitlines()
query_count = len(lines)
total_bytes = sum([int(line) for line in lines])
format_string = f"{filename} - query count: {query_count}, bytes processed sum: {total_bytes}"
print(format_string)
cumulative_bytes += total_bytes
cumulative_queries += query_count
print(
"---total queries: {total_queries}, total bytes: {total_bytes}---".format(
total_queries=cumulative_queries, total_bytes=cumulative_bytes
results_dict[filename] = [query_count, total_bytes]
for millis_report in Path("notebooks/").glob("*/*.slotmillis"):
with open(millis_report, "r") as millis_file:
filename = millis_report.stem
lines = millis_file.read().splitlines()
total_slot_millis = sum([int(line) for line in lines])
results_dict[filename] += [total_slot_millis]

cumulative_queries = 0
cumulative_bytes = 0
cumulative_slot_millis = 0
for results in results_dict.values():
if len(results) != 3:
raise IOError(
"Mismatch in performance logging output. "
"Expected one .bytesprocessed and one .slotmillis "
"file for each notebook."
)
query_count, total_bytes, total_slot_millis = results
cumulative_queries += query_count
cumulative_bytes += total_bytes
cumulative_slot_millis += total_slot_millis
print(
f"{filename} - query count: {query_count},"
f" bytes processed sum: {total_bytes},"
f" slot millis sum: {total_slot_millis}"
)

print(
f"---total queries: {cumulative_queries}, "
f"total bytes: {cumulative_bytes}, "
f"total slot millis: {cumulative_slot_millis}---"
)


Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.