Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit 6cdf64b

Browse filesBrowse files
fix: Fix issue with stream upload batch size upload limit (#2290)
1 parent c4cb39d commit 6cdf64b
Copy full SHA for 6cdf64b

3 files changed

+86-12Lines changed: 86 additions & 12 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎bigframes/core/local_data.py‎

Copy file name to clipboardExpand all lines: bigframes/core/local_data.py
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,13 @@ def to_arrow(
124124
geo_format: Literal["wkb", "wkt"] = "wkt",
125125
duration_type: Literal["int", "duration"] = "duration",
126126
json_type: Literal["string"] = "string",
127+
max_chunksize: Optional[int] = None,
127128
) -> tuple[pa.Schema, Iterable[pa.RecordBatch]]:
128129
if geo_format != "wkt":
129130
raise NotImplementedError(f"geo format {geo_format} not yet implemented")
130131
assert json_type == "string"
131132

132-
batches = self.data.to_batches()
133+
batches = self.data.to_batches(max_chunksize=max_chunksize)
133134
schema = self.data.schema
134135
if duration_type == "int":
135136
schema = _schema_durations_to_ints(schema)
Collapse file

‎bigframes/session/loader.py‎

Copy file name to clipboardExpand all lines: bigframes/session/loader.py
+47-11Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import datetime
2020
import io
2121
import itertools
22+
import math
2223
import os
2324
import typing
2425
from typing import (
@@ -397,6 +398,15 @@ def stream_data(
397398
offsets_col: str,
398399
) -> bq_data.BigqueryDataSource:
399400
"""Load managed data into bigquery"""
401+
MAX_BYTES = 10000000 # streaming api has 10MB limit
402+
SAFETY_MARGIN = (
403+
40 # Perf seems bad for large chunks, so do 40x smaller than max
404+
)
405+
batch_count = math.ceil(
406+
data.metadata.total_bytes / (MAX_BYTES // SAFETY_MARGIN)
407+
)
408+
rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
409+
400410
schema_w_offsets = data.schema.append(
401411
schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
402412
)
@@ -410,16 +420,24 @@ def stream_data(
410420
)
411421
rows_w_offsets = ((*row, offset) for offset, row in enumerate(rows))
412422

413-
for errors in self._bqclient.insert_rows(
414-
load_table_destination,
415-
rows_w_offsets,
416-
selected_fields=bq_schema,
417-
row_ids=map(str, itertools.count()), # used to ensure only-once insertion
418-
):
419-
if errors:
420-
raise ValueError(
421-
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
422-
)
423+
# TODO: don't use batched
424+
batches = _batched(rows_w_offsets, rows_per_batch)
425+
ids_iter = map(str, itertools.count())
426+
427+
for batch in batches:
428+
batch_rows = list(batch)
429+
row_ids = itertools.islice(ids_iter, len(batch_rows))
430+
431+
for errors in self._bqclient.insert_rows(
432+
load_table_destination,
433+
batch_rows,
434+
selected_fields=bq_schema,
435+
row_ids=row_ids, # used to ensure only-once insertion
436+
):
437+
if errors:
438+
raise ValueError(
439+
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
440+
)
423441
destination_table = self._bqclient.get_table(load_table_destination)
424442
return bq_data.BigqueryDataSource(
425443
bq_data.GbqTable.from_table(destination_table),
@@ -434,6 +452,15 @@ def write_data(
434452
offsets_col: str,
435453
) -> bq_data.BigqueryDataSource:
436454
"""Load managed data into bigquery"""
455+
MAX_BYTES = 10000000 # streaming api has 10MB limit
456+
SAFETY_MARGIN = (
457+
4 # aim for 2.5mb to account for row variance, format differences, etc.
458+
)
459+
batch_count = math.ceil(
460+
data.metadata.total_bytes / (MAX_BYTES // SAFETY_MARGIN)
461+
)
462+
rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
463+
437464
schema_w_offsets = data.schema.append(
438465
schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
439466
)
@@ -450,7 +477,9 @@ def write_data(
450477

451478
def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
452479
schema, batches = data.to_arrow(
453-
offsets_col=offsets_col, duration_type="int"
480+
offsets_col=offsets_col,
481+
duration_type="int",
482+
max_chunksize=rows_per_batch,
454483
)
455484
offset = 0
456485
for batch in batches:
@@ -1332,3 +1361,10 @@ def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
13321361
f"Nested JSON types, found in column `{name}`: `{column_type}`', "
13331362
f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
13341363
)
1364+
1365+
1366+
# itertools.batched not available in python <3.12, so we use this instead
1367+
def _batched(iterator: Iterable, n: int) -> Iterable:
1368+
assert n > 0
1369+
while batch := tuple(itertools.islice(iterator, n)):
1370+
yield batch
Collapse file

‎tests/system/large/test_session.py‎

Copy file name to clipboardExpand all lines: tests/system/large/test_session.py
+37Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,50 @@
1717

1818
import google.cloud.bigquery as bigquery
1919
import google.cloud.exceptions
20+
import numpy as np
21+
import pandas as pd
2022
import pytest
2123

2224
import bigframes
2325
import bigframes.pandas as bpd
2426
import bigframes.session._io.bigquery
2527

2628

29+
@pytest.fixture
30+
def large_pd_df():
31+
nrows = 1000000
32+
33+
np_int1 = np.random.randint(0, 1000, size=nrows, dtype=np.int32)
34+
np_int2 = np.random.randint(10000, 20000, size=nrows, dtype=np.int64)
35+
np_bool = np.random.choice([True, False], size=nrows)
36+
np_float1 = np.random.rand(nrows).astype(np.float32)
37+
np_float2 = np.random.normal(loc=50.0, scale=10.0, size=nrows).astype(np.float64)
38+
39+
return pd.DataFrame(
40+
{
41+
"int_col_1": np_int1,
42+
"int_col_2": np_int2,
43+
"bool_col": np_bool,
44+
"float_col_1": np_float1,
45+
"float_col_2": np_float2,
46+
}
47+
)
48+
49+
50+
@pytest.mark.parametrize(
51+
("write_engine"),
52+
[
53+
("bigquery_load"),
54+
("bigquery_streaming"),
55+
("bigquery_write"),
56+
],
57+
)
58+
def test_read_pandas_large_df(session, large_pd_df, write_engine: str):
59+
df = session.read_pandas(large_pd_df, write_engine=write_engine)
60+
assert len(df.peek(5)) == 5
61+
assert len(large_pd_df) == 1000000
62+
63+
2764
def test_close(session: bigframes.Session):
2865
# we will create two tables and confirm that they are deleted
2966
# when the session is closed

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.