Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit 96597f0

Browse filesBrowse files
perf: Make executor data uploads async internally (#2529)
1 parent 915cce5 commit 96597f0
Copy full SHA for 96597f0

2 files changed

+30-14Lines changed: 30 additions & 14 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎bigframes/session/bq_caching_executor.py‎

Copy file name to clipboardExpand all lines: bigframes/session/bq_caching_executor.py
+18-13Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import concurrent.futures
1718
import math
1819
import threading
1920
from typing import Literal, Mapping, Optional, Sequence, Tuple
@@ -514,13 +515,29 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode):
514515
Replace large local sources with the uploaded version of those datasources.
515516
"""
516517
# Step 1: Upload all previously un-uploaded data
518+
needs_upload = []
517519
for leaf in original_root.unique_nodes():
518520
if isinstance(leaf, nodes.ReadLocalNode):
519521
if (
520522
leaf.local_data_source.metadata.total_bytes
521523
> bigframes.constants.MAX_INLINE_BYTES
522524
):
523-
self._upload_local_data(leaf.local_data_source)
525+
needs_upload.append(leaf.local_data_source)
526+
527+
futures: dict[concurrent.futures.Future, local_data.ManagedArrowTable] = dict()
528+
for local_source in needs_upload:
529+
future = self.loader.read_data_async(
530+
local_source, bigframes.core.guid.generate_guid()
531+
)
532+
futures[future] = local_source
533+
try:
534+
for future in concurrent.futures.as_completed(futures.keys()):
535+
self.cache.cache_remote_replacement(futures[future], future.result())
536+
except Exception as e:
537+
# cancel all futures
538+
for future in futures:
539+
future.cancel()
540+
raise e
524541

525542
# Step 2: Replace local scans with remote scans
526543
def map_local_scans(node: nodes.BigFrameNode):
@@ -550,18 +567,6 @@ def map_local_scans(node: nodes.BigFrameNode):
550567

551568
return original_root.bottom_up(map_local_scans)
552569

553-
def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
554-
if self.cache.get_uploaded_local_data(local_table) is not None:
555-
return
556-
# Lock prevents concurrent repeated work, but slows things down.
557-
# Might be better as a queue and a worker thread
558-
with self._upload_lock:
559-
if self.cache.get_uploaded_local_data(local_table) is None:
560-
uploaded = self.loader.load_data_or_write_data(
561-
local_table, bigframes.core.guid.generate_guid()
562-
)
563-
self.cache.cache_remote_replacement(local_table, uploaded)
564-
565570
def _execute_plan_gbq(
566571
self,
567572
plan: nodes.BigFrameNode,
Collapse file

‎bigframes/session/loader.py‎

Copy file name to clipboardExpand all lines: bigframes/session/loader.py
+12-1Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,17 @@ def __init__(
300300
self._session = session
301301
self._clock = session_time.BigQuerySyncedClock(bqclient)
302302
self._clock.sync()
303+
self._threadpool = concurrent.futures.ThreadPoolExecutor(
304+
max_workers=1, thread_name_prefix="bigframes-loader"
305+
)
306+
307+
def read_data_async(
308+
self, local_data: local_data.ManagedArrowTable, offsets_col: str
309+
) -> concurrent.futures.Future[bq_data.BigqueryDataSource]:
310+
future = self._threadpool.submit(
311+
self._load_data_or_write_data, local_data, offsets_col
312+
)
313+
return future
303314

304315
def read_pandas(
305316
self,
@@ -350,7 +361,7 @@ def read_managed_data(
350361
session=self._session,
351362
)
352363

353-
def load_data_or_write_data(
364+
def _load_data_or_write_data(
354365
self,
355366
data: local_data.ManagedArrowTable,
356367
offsets_col: str,

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.