Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fa20a74

Browse filesBrowse files
feat: Add bigframes.execution_history API to track BigQuery jobs (#16588)
This PR promotes execution_history() to the top-level bigframes namespace and upgrades it to track rich metadata for every BigQuery job executed during your session.

Key User Benefits:
- Easier Access: Call bigframes.execution_history() directly instead of digging into sub-namespaces.
- Rich Metadata Tracking: Captures structured statistics for both Query Jobs and Load Jobs, including:
  - job_id and a direct Google Cloud Console URL for easy debugging.
  - Performance metrics: total_bytes_processed, duration_seconds, and total_slot_ms.
  - Query details (a truncated preview of the SQL that was run).
- Clean, Focused Logs: Automatically filters out internal library overhead (like schema validations and index uniqueness checks) so your history only shows the data processing steps you actually care about.

Usage Example:
```
import bigframes.pandas as bpd
import pandas as pd
import bigframes

# ... run some bigframes operations ...
df = bpd.read_gbq("SELECT 1")

# Upload some local data (triggers a Load Job)
bpd.read_pandas(pd.DataFrame({'a': [1, 2, 3]}))

# Get a pandas DataFrame of all BQ jobs run in this session
history = bigframes.execution_history().to_dataframe()

# Inspect recent queries, their costs, and durations
print(history[['job_id', 'job_type', 'total_bytes_processed', 'duration_seconds', 'query']])
```

Verified in:
1. VS Code notebook: screen/8u2yhaRV9iHbDbF
2. Colab notebook: screen/9L8VrP5y9DXhnZz

More test cases and notebook updates will be checked in using separate PRs for easier review.

Fixes #<481840739> 🦕

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent dec9813 commit fa20a74
Copy full SHA for fa20a74

13 files changed

+420-68Lines changed: 420 additions & 68 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/bigframes/bigframes/__init__.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/__init__.py
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from bigframes._config.bigquery_options import BigQueryOptions # noqa: E402
4646
from bigframes.core.global_session import ( # noqa: E402
4747
close_session,
48+
execution_history,
4849
get_global_session,
4950
)
5051
from bigframes.session import Session, connect # noqa: E402
@@ -69,6 +70,7 @@ def load_ipython_extension(ipython):
6970
"BigQueryOptions",
7071
"get_global_session",
7172
"close_session",
73+
"execution_history",
7274
"enums",
7375
"exceptions",
7476
"connect",
Collapse file

‎packages/bigframes/bigframes/core/global_session.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/core/global_session.py
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
124124
return func_(get_global_session(), *args, **kwargs)
125125

126126

127+
def execution_history() -> "bigframes.session._ExecutionHistory":
    """Return the execution history of the global (default) session.

    Delegates to ``Session.execution_history`` through
    ``with_default_session``, which supplies the session obtained from
    ``get_global_session()`` as the first argument.

    Returns:
        bigframes.session._ExecutionHistory: Whatever
        ``Session.execution_history`` returns for the global session.
    """
    # Imported inside the function rather than at module import time
    # (presumably to avoid a circular import with bigframes.session --
    # TODO confirm). The previous unused ``import pandas`` was dropped.
    import bigframes.session

    return with_default_session(bigframes.session.Session.execution_history)
133+
134+
127135
class _GlobalSessionContext:
128136
"""
129137
Context manager for testing that sets global session.
Collapse file

‎packages/bigframes/bigframes/core/sql/__init__.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/core/sql/__init__.py
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
from __future__ import annotations
1514

1615
"""
1716
Utility functions for SQL construction.
1817
"""
1918

19+
from __future__ import annotations
20+
2021
import json
2122
from typing import (
2223
TYPE_CHECKING,
Collapse file

‎packages/bigframes/bigframes/session/__init__.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/session/__init__.py
+48Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,46 @@
109109
logger = logging.getLogger(__name__)
110110

111111

112+
class _ExecutionHistory:
113+
def __init__(self, jobs: list[dict]):
114+
self._df = pandas.DataFrame(jobs)
115+
116+
def to_dataframe(self) -> pandas.DataFrame:
117+
"""Returns the execution history as a pandas DataFrame."""
118+
return self._df
119+
120+
def _repr_html_(self) -> str | None:
121+
import bigframes.formatting_helpers as formatter
122+
123+
if self._df.empty:
124+
return "<div>No executions found.</div>"
125+
126+
cols = ["job_type", "job_id", "status", "total_bytes_processed", "job_url"]
127+
128+
# Filter columns to only those that exist in the dataframe
129+
available_cols = [c for c in cols if c in self._df.columns]
130+
131+
def format_url(url):
132+
return f'<a target="_blank" href="{url}">Open Job</a>' if url else ""
133+
134+
try:
135+
df_display = self._df[available_cols].copy()
136+
if "total_bytes_processed" in df_display.columns:
137+
df_display["total_bytes_processed"] = df_display[
138+
"total_bytes_processed"
139+
].apply(formatter.get_formatted_bytes)
140+
if "job_url" in df_display.columns:
141+
df_display["job_url"] = df_display["job_url"].apply(format_url)
142+
143+
# Rename job_id to query_id to match user expectations
144+
if "job_id" in df_display.columns:
145+
df_display = df_display.rename(columns={"job_id": "query_id"})
146+
147+
return df_display.to_html(escape=False, index=False)
148+
except Exception:
149+
return self._df.to_html()
150+
151+
112152
@log_adapter.class_logger
113153
class Session(
114154
third_party_pandas_gbq.GBQIOMixin,
@@ -233,6 +273,7 @@ def __init__(
233273
)
234274

235275
self._metrics = metrics.ExecutionMetrics()
276+
self._publisher.subscribe(self._metrics.on_event)
236277
self._function_session = bff_session.FunctionSession()
237278
self._anon_dataset_manager = anonymous_dataset.AnonymousDatasetManager(
238279
self._clients_provider.bqclient,
@@ -371,6 +412,13 @@ def slot_millis_sum(self):
371412
"""The sum of all slot time used by bigquery jobs in this session."""
372413
return self._metrics.slot_millis
373414

415+
def execution_history(self) -> _ExecutionHistory:
    """Return the executions recorded by this session's metrics.

    Each recorded job is passed along as its attribute mapping. Call
    `.to_dataframe()` on the result to get a pandas DataFrame.
    """
    job_records = [vars(job) for job in self._metrics.jobs]
    return _ExecutionHistory(job_records)
421+
374422
@property
375423
def _allows_ambiguity(self) -> bool:
376424
return self._allow_ambiguity
Collapse file

‎packages/bigframes/bigframes/session/loader.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/session/loader.py
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import pandas
5353
import pyarrow as pa
5454
from google.cloud import bigquery_storage_v1
55+
from google.cloud.bigquery.job.load import LoadJob
56+
from google.cloud.bigquery.job.query import QueryJob
5557
from google.cloud.bigquery_storage_v1 import (
5658
types as bq_storage_types,
5759
)
@@ -623,6 +625,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
623625
else:
624626
job.result()
625627

628+
if self._metrics is not None and isinstance(job, (QueryJob, LoadJob)):
629+
self._metrics.count_job_stats(query_job=job)
630+
626631
@overload
627632
def read_gbq_table( # type: ignore[overload-overlap]
628633
self,
Collapse file

‎packages/bigframes/bigframes/session/metrics.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/session/metrics.py
+186-23Lines changed: 186 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,143 @@
1515
from __future__ import annotations
1616

1717
import dataclasses
18+
import datetime
1819
import os
19-
from typing import Optional, Tuple
20+
from typing import Any, Mapping, Optional, Tuple, Union
2021

2122
import google.cloud.bigquery as bigquery
22-
import google.cloud.bigquery.job as bq_job
2323
import google.cloud.bigquery.table as bq_table
24+
from google.cloud.bigquery.job.load import LoadJob
25+
from google.cloud.bigquery.job.query import QueryJob
2426

2527
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2628

2729

30+
@dataclasses.dataclass
class JobMetadata:
    """Flat record describing one tracked execution.

    Instances are built from a BigQuery job (``from_job``) or from a query
    result iterator (``from_row_iterator``). Every field defaults to ``None``
    so that sources lacking a given statistic can simply leave it unset.
    """

    job_id: Optional[str] = None
    query_id: Optional[str] = None
    location: Optional[str] = None
    project: Optional[str] = None
    creation_time: Optional[datetime.datetime] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    duration_seconds: Optional[float] = None
    status: Optional[str] = None
    total_bytes_processed: Optional[int] = None
    total_slot_ms: Optional[int] = None
    job_type: Optional[str] = None
    error_result: Optional[Mapping[str, Any]] = None
    cached: Optional[bool] = None
    job_url: Optional[str] = None
    query: Optional[str] = None
    destination_table: Optional[str] = None
    source_uris: Optional[list[str]] = None
    input_files: Optional[int] = None
    input_bytes: Optional[int] = None
    output_rows: Optional[int] = None
    source_format: Optional[str] = None

    # Longest query preview that is stored; longer text is cut so that the
    # result, including the trailing "...", is exactly this many characters.
    # Unannotated on purpose so it is a plain class attribute, not a field.
    _MAX_QUERY_CHARS = 1024

    @classmethod
    def _truncate_query(cls, query_text: Optional[str]) -> Optional[str]:
        """Return ``query_text`` shortened to the preview limit, if needed."""
        if query_text and len(query_text) > cls._MAX_QUERY_CHARS:
            return query_text[: cls._MAX_QUERY_CHARS - 3] + "..."
        return query_text

    @staticmethod
    def _build_job_url(
        project: Optional[str], location: Optional[str], job_id: Optional[str]
    ) -> Optional[str]:
        """Return the Cloud Console URL for a job, or ``None`` if not derivable.

        A URL is only produced when both the job id and the project are known;
        otherwise the literal text "None" would end up inside the link.
        """
        if not job_id or not project:
            return None
        return (
            "https://console.cloud.google.com/bigquery"
            f"?project={project}&j=bq:{location or ''}:{job_id}&page=queryresults"
        )

    @classmethod
    def from_job(
        cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Build metadata from a finished query or load job.

        Args:
            query_job: The job to describe. Despite the name, load jobs are
                accepted as well.
            exec_seconds: Duration to record as ``duration_seconds``.

        Returns:
            A record with the common job attributes filled in, plus the
            query-specific or load-specific statistics for the matching type.
        """
        job_id = query_job.job_id

        metadata = cls(
            job_id=job_id,
            location=query_job.location,
            project=query_job.project,
            creation_time=query_job.created,
            start_time=query_job.started,
            end_time=query_job.ended,
            duration_seconds=exec_seconds,
            status=query_job.state,
            job_type=query_job.job_type,
            error_result=query_job.error_result,
            # Load jobs have no query text, hence the getattr default.
            query=cls._truncate_query(getattr(query_job, "query", None)),
            job_url=cls._build_job_url(
                query_job.project, query_job.location, job_id
            ),
        )
        if isinstance(query_job, QueryJob):
            metadata.cached = getattr(query_job, "cache_hit", None)
            metadata.destination_table = (
                str(query_job.destination) if query_job.destination else None
            )
            metadata.total_bytes_processed = getattr(
                query_job, "total_bytes_processed", None
            )
            metadata.total_slot_ms = getattr(query_job, "slot_millis", None)
        elif isinstance(query_job, LoadJob):
            metadata.output_rows = getattr(query_job, "output_rows", None)
            metadata.input_files = getattr(query_job, "input_files", None)
            metadata.input_bytes = getattr(query_job, "input_bytes", None)
            metadata.destination_table = (
                str(query_job.destination)
                if getattr(query_job, "destination", None)
                else None
            )
            if getattr(query_job, "source_uris", None):
                metadata.source_uris = list(query_job.source_uris)
            if query_job.configuration and hasattr(
                query_job.configuration, "source_format"
            ):
                metadata.source_format = query_job.configuration.source_format

        return metadata

    @classmethod
    def from_row_iterator(
        cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None
    ) -> "JobMetadata":
        """Build metadata from a query result iterator.

        Every attribute is read with ``getattr(..., None)`` so an iterator that
        lacks a statistic simply leaves the matching field unset. The record is
        always marked as a finished query (``status="DONE"``,
        ``job_type="query"``).

        Args:
            row_iterator: The result iterator to describe.
            exec_seconds: Duration to record as ``duration_seconds``.

        Returns:
            The populated record.
        """
        job_id = getattr(row_iterator, "job_id", None)
        project = getattr(row_iterator, "project", None)
        location = getattr(row_iterator, "location", None)

        return cls(
            job_id=job_id,
            query_id=getattr(row_iterator, "query_id", None),
            location=location,
            project=project,
            creation_time=getattr(row_iterator, "created", None),
            start_time=getattr(row_iterator, "started", None),
            end_time=getattr(row_iterator, "ended", None),
            duration_seconds=exec_seconds,
            status="DONE",
            total_bytes_processed=getattr(row_iterator, "total_bytes_processed", None),
            total_slot_ms=getattr(row_iterator, "slot_millis", None),
            job_type="query",
            cached=getattr(row_iterator, "cache_hit", None),
            query=cls._truncate_query(getattr(row_iterator, "query", None)),
            job_url=cls._build_job_url(project, location, job_id),
        )
141+
142+
28143
@dataclasses.dataclass
29144
class ExecutionMetrics:
30145
execution_count: int = 0
31146
slot_millis: int = 0
32147
bytes_processed: int = 0
33148
execution_secs: float = 0
34149
query_char_count: int = 0
150+
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)
35151

36152
def count_job_stats(
37153
self,
38-
query_job: Optional[bq_job.QueryJob] = None,
154+
query_job: Optional[Union[QueryJob, LoadJob]] = None,
39155
row_iterator: Optional[bq_table.RowIterator] = None,
40156
):
41157
if query_job is None:
@@ -57,41 +173,88 @@ def count_job_stats(
57173
self.slot_millis += slot_millis
58174
self.execution_secs += exec_seconds
59175

60-
elif query_job.configuration.dry_run:
61-
query_char_count = len(query_job.query)
176+
self.jobs.append(
177+
JobMetadata.from_row_iterator(row_iterator, exec_seconds=exec_seconds)
178+
)
179+
180+
elif isinstance(query_job, QueryJob) and query_job.configuration.dry_run:
181+
query_char_count = len(getattr(query_job, "query", ""))
62182

63183
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
64184
bytes_processed = 0
65185
slot_millis = 0
66186
exec_seconds = 0.0
67187

68-
elif (stats := get_performance_stats(query_job)) is not None:
69-
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
188+
elif isinstance(query_job, bigquery.QueryJob):
189+
if (stats := get_performance_stats(query_job)) is not None:
190+
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
191+
self.execution_count += 1
192+
self.query_char_count += query_char_count or 0
193+
self.bytes_processed += bytes_processed or 0
194+
self.slot_millis += slot_millis or 0
195+
self.execution_secs += exec_seconds or 0
196+
197+
metadata = JobMetadata.from_job(query_job, exec_seconds=exec_seconds)
198+
self.jobs.append(metadata)
199+
200+
else:
70201
self.execution_count += 1
71-
self.query_char_count += query_char_count or 0
72-
self.bytes_processed += bytes_processed or 0
73-
self.slot_millis += slot_millis or 0
74-
self.execution_secs += exec_seconds or 0
202+
duration = (
203+
(query_job.ended - query_job.created).total_seconds()
204+
if query_job.ended and query_job.created
205+
else None
206+
)
207+
self.jobs.append(JobMetadata.from_job(query_job, exec_seconds=duration))
208+
209+
# For pytest runs only, log information about the query job
210+
# to a file in order to create a performance report.
211+
if (
212+
isinstance(query_job, bigquery.QueryJob)
213+
and not query_job.configuration.dry_run
214+
):
215+
stats = get_performance_stats(query_job)
216+
if stats:
217+
write_stats_to_disk(
218+
query_char_count=stats[0],
219+
bytes_processed=stats[1],
220+
slot_millis=stats[2],
221+
exec_seconds=stats[3],
222+
)
223+
elif row_iterator is not None:
224+
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
225+
query_char_count = len(getattr(row_iterator, "query", "") or "")
226+
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
227+
created = getattr(row_iterator, "created", None)
228+
ended = getattr(row_iterator, "ended", None)
229+
exec_seconds = (
230+
(ended - created).total_seconds() if created and ended else 0.0
231+
)
75232
write_stats_to_disk(
76233
query_char_count=query_char_count,
77234
bytes_processed=bytes_processed,
78235
slot_millis=slot_millis,
79236
exec_seconds=exec_seconds,
80237
)
81238

82-
else:
83-
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
84-
bytes_processed = 0
85-
query_char_count = 0
86-
slot_millis = 0
87-
exec_seconds = 0
239+
def on_event(self, event: Any):
240+
try:
241+
import bigframes.core.events
242+
from bigframes.session.executor import LocalExecuteResult
243+
except ImportError:
244+
return
88245

89-
write_stats_to_disk(
90-
query_char_count=query_char_count,
91-
bytes_processed=bytes_processed,
92-
slot_millis=slot_millis,
93-
exec_seconds=exec_seconds,
94-
)
246+
if isinstance(event, bigframes.core.events.ExecutionFinished):
247+
if event.result and isinstance(event.result, LocalExecuteResult):
248+
self.execution_count += 1
249+
bytes_processed = event.result.total_bytes_processed or 0
250+
self.bytes_processed += bytes_processed
251+
252+
metadata = JobMetadata(
253+
job_type="polars",
254+
status="DONE",
255+
total_bytes_processed=bytes_processed,
256+
)
257+
self.jobs.append(metadata)
95258

96259

97260
def get_performance_stats(

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.