5252import pandas
5353import pyarrow as pa
5454from google .cloud import bigquery_storage_v1
55- from google .cloud .bigquery_storage_v1 import types as bq_storage_types
55+ from google .cloud .bigquery_storage_v1 import (
56+ types as bq_storage_types ,
57+ writer as bq_storage_writer ,
58+ )
5659
5760import bigframes ._tools
5861import bigframes ._tools .strings
@@ -520,38 +523,51 @@ def write_data(
520523 )
521524 serialized_schema = schema .serialize ().to_pybytes ()
522525
def stream_worker(
    work: Iterator[pa.RecordBatch], max_outstanding: int = 5
) -> str:
    """Upload record batches to a new PENDING write stream and finalize it.

    This is a closure: it reads ``self._write_client``, ``parent`` and
    ``serialized_schema`` from the enclosing scope.

    Args:
        work: Record batches to append, in order. Each batch is sent at
            the row offset equal to the number of rows sent before it.
        max_outstanding: Maximum number of append requests that may be
            awaiting a response before the next batch is sent. Must be
            at least 1.

    Returns:
        The name of the finalized write stream.

    Raises:
        ValueError: If ``max_outstanding`` is less than 1, or if any
            append response reports row errors.
    """
    # Function-scope import keeps this block self-contained.
    import collections

    # Validate before creating any server-side resources. Previously a
    # non-positive value surfaced as an IndexError from popping an empty
    # queue on the first batch.
    if max_outstanding < 1:
        raise ValueError(
            f"max_outstanding must be at least 1, got {max_outstanding}."
        )

    def raise_on_row_errors(future: bq_storage_writer.AppendRowsFuture) -> None:
        # Blocks until the append response arrives; result() also re-raises
        # any transport-level failure recorded on the future.
        row_errors = future.result().row_errors
        if row_errors:
            raise ValueError(
                f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
            )

    requested_stream = bq_storage_types.WriteStream(
        type_=bq_storage_types.WriteStream.Type.PENDING
    )
    stream = self._write_client.create_write_stream(
        parent=parent, write_stream=requested_stream
    )
    stream_name = stream.name

    # The template carries the stream name and the Arrow schema so the
    # per-batch requests below only need an offset and the serialized rows.
    base_request = bq_storage_types.AppendRowsRequest(
        write_stream=stream_name,
    )
    base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema

    stream_manager = bq_storage_writer.AppendRowsStream(
        client=self._write_client, initial_request_template=base_request
    )
    try:
        current_offset = 0
        # FIFO of in-flight appends; deque gives O(1) removal of the oldest.
        pending: collections.deque[
            bq_storage_writer.AppendRowsFuture
        ] = collections.deque()

        for batch in work:
            # Apply back-pressure: wait for the oldest append to complete
            # before sending more once the window is full.
            if len(pending) >= max_outstanding:
                raise_on_row_errors(pending.popleft())

            request = bq_storage_types.AppendRowsRequest(offset=current_offset)
            request.arrow_rows.rows.serialized_record_batch = (
                batch.serialize().to_pybytes()
            )

            pending.append(stream_manager.send(request))
            current_offset += batch.num_rows

        # Drain whatever is still in flight, oldest first.
        for future in pending:
            raise_on_row_errors(future)
    finally:
        # Always release the append connection, including when a row error
        # or a failed future aborts the upload part-way through.
        stream_manager.close()

    self._write_client.finalize_write_stream(name=stream_name)
    return stream_name
557573
0 commit comments