Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ef856b0

Browse files
perf(bigframes): Improve write api upload throughput (#16641)
1 parent 3cc859c commit ef856b0
Copy full SHA for ef856b0

1 file changed

+37 -21 · Lines changed: 37 additions & 21 deletions

File tree

Expand file tree / Collapse file tree
Open diff view settings
Filter options
  • packages/bigframes/bigframes/session
Expand file tree / Collapse file tree
Open diff view settings
Collapse file

‎packages/bigframes/bigframes/session/loader.py‎

Copy file name to clipboard · Expand all lines: packages/bigframes/bigframes/session/loader.py
+37 -21 · Lines changed: 37 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,10 @@
5252
import pandas
5353
import pyarrow as pa
5454
from google.cloud import bigquery_storage_v1
55-
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
55+
from google.cloud.bigquery_storage_v1 import (
56+
types as bq_storage_types,
57+
writer as bq_storage_writer,
58+
)
5659

5760
import bigframes._tools
5861
import bigframes._tools.strings
@@ -520,38 +523,51 @@ def write_data(
520523
)
521524
serialized_schema = schema.serialize().to_pybytes()
522525

523-
def stream_worker(work: Iterator[pa.RecordBatch]) -> str:
526+
def stream_worker(
527+
work: Iterator[pa.RecordBatch], max_outstanding: int = 5
528+
) -> str:
524529
requested_stream = bq_storage_types.WriteStream(
525530
type_=bq_storage_types.WriteStream.Type.PENDING
526531
)
527532
stream = self._write_client.create_write_stream(
528533
parent=parent, write_stream=requested_stream
529534
)
530-
stream_name = stream.name
535+
base_request = bq_storage_types.AppendRowsRequest(
536+
write_stream=stream.name,
537+
)
538+
base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema
531539

532-
def request_generator():
533-
current_offset = 0
534-
for batch in work:
535-
request = bq_storage_types.AppendRowsRequest(
536-
write_stream=stream.name, offset=current_offset
537-
)
540+
stream_manager = bq_storage_writer.AppendRowsStream(
541+
client=self._write_client, initial_request_template=base_request
542+
)
543+
stream_name = stream.name
544+
current_offset = 0
545+
futures: list[bq_storage_writer.AppendRowsFuture] = []
546+
547+
for batch in work:
548+
if len(futures) >= max_outstanding:
549+
row_errors = futures.pop(0).result().row_errors
550+
if row_errors:
551+
raise ValueError(
552+
f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
553+
)
538554

539-
request.arrow_rows.writer_schema.serialized_schema = (
540-
serialized_schema
541-
)
542-
request.arrow_rows.rows.serialized_record_batch = (
543-
batch.serialize().to_pybytes()
544-
)
555+
request = bq_storage_types.AppendRowsRequest(offset=current_offset)
556+
request.arrow_rows.rows.serialized_record_batch = (
557+
batch.serialize().to_pybytes()
558+
)
545559

546-
yield request
547-
current_offset += batch.num_rows
560+
futures.append(stream_manager.send(request))
561+
current_offset += batch.num_rows
548562

549-
responses = self._write_client.append_rows(requests=request_generator())
550-
for resp in responses:
551-
if resp.row_errors:
563+
for future in futures:
564+
row_errors = future.result().row_errors
565+
if row_errors:
552566
raise ValueError(
553-
f"Errors in stream {stream_name}: {resp.row_errors}"
567+
f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
554568
)
569+
570+
stream_manager.close()
555571
self._write_client.finalize_write_stream(name=stream_name)
556572
return stream_name
557573

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.