perf: inline read_pandas for small data #383

Merged 11 commits on Feb 21, 2024
Changes from all commits
bigframes/dataframe.py (1 addition, 15 deletions)
@@ -69,10 +69,6 @@
import bigframes.session


# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
MAX_INLINE_DF_SIZE = 5000

LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
SingleItemValue = Union[bigframes.series.Series, int, float, Callable]
@@ -170,17 +166,7 @@ def __init__(
columns=columns, # type:ignore
dtype=dtype, # type:ignore
)
if (
pd_dataframe.size < MAX_INLINE_DF_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pandas.ArrowDtype)
)
):
self._block = blocks.Block.from_local(pd_dataframe)
elif session:
if session:
self._block = session.read_pandas(pd_dataframe)._get_block()
else:
self._block = bigframes.pandas.read_pandas(pd_dataframe)._get_block()
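Note: with the local-inline shortcut removed, DataFrame.__init__ always defers to read_pandas (the supplied session's, or the global one), and the size/dtype decision is made there. A minimal usage sketch of the resulting behaviour, assuming the public bigframes.pandas entry point and an already configured session:

import pandas

import bigframes.pandas as bpd

pd_df = pandas.DataFrame({"species": ["Adelie", "Gentoo"], "mass_g": [3700, 5000]})

# __init__ no longer calls blocks.Block.from_local itself; read_pandas decides
# whether this small frame can be inlined or must go through a load job.
bf_df = bpd.read_pandas(pd_df)
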
bigframes/operations/base.py (1 addition, 15 deletions)
@@ -30,10 +30,6 @@
import bigframes.session
import third_party.bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing

# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
MAX_INLINE_SERIES_SIZE = 5000


class SeriesMethods:
def __init__(
@@ -104,17 +100,7 @@ def __init__(
if pd_series.name is None:
# to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
if (
pd_dataframe.size < MAX_INLINE_SERIES_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pd.ArrowDtype)
)
):
block = blocks.Block.from_local(pd_dataframe)
elif session:
if session:
block = session.read_pandas(pd_dataframe)._get_block()
else:
# Uses default global session
bigframes/session/__init__.py (30 additions, 3 deletions)
@@ -108,6 +108,10 @@
"UTF-32LE",
}

# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
MAX_INLINE_DF_SIZE = 5000

logger = logging.getLogger(__name__)


@@ -882,6 +886,29 @@ def read_pandas(self, pandas_dataframe: pandas.DataFrame) -> dataframe.DataFrame

def _read_pandas(
self, pandas_dataframe: pandas.DataFrame, api_name: str
) -> dataframe.DataFrame:
if (
pandas_dataframe.size < MAX_INLINE_DF_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
(
isinstance(s.dtype, pandas.ArrowDtype)
or (len(s) > 0 and pandas.api.types.is_list_like(s.iloc[0]))
or pandas.api.types.is_datetime64_any_dtype(s)
)
for _, s in pandas_dataframe.items()
)
):
return self._read_pandas_inline(pandas_dataframe)
return self._read_pandas_load_job(pandas_dataframe, api_name)

def _read_pandas_inline(
self, pandas_dataframe: pandas.DataFrame
) -> dataframe.DataFrame:
return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe))

def _read_pandas_load_job(
self, pandas_dataframe: pandas.DataFrame, api_name: str
) -> dataframe.DataFrame:
col_labels, idx_labels = (
pandas_dataframe.columns.to_list(),
@@ -1079,7 +1106,7 @@ def read_csv(
encoding=encoding,
**kwargs,
)
return self.read_pandas(pandas_df) # type: ignore
return self._read_pandas(pandas_df, "read_csv") # type: ignore

def read_pickle(
self,
@@ -1096,7 +1123,7 @@
if isinstance(pandas_obj, pandas.Series):
if pandas_obj.name is None:
pandas_obj.name = "0"
bigframes_df = self.read_pandas(pandas_obj.to_frame())
bigframes_df = self._read_pandas(pandas_obj.to_frame(), "read_pickle")
return bigframes_df[bigframes_df.columns[0]]
return self._read_pandas(pandas_obj, "read_pickle")

@@ -1196,7 +1223,7 @@ def read_json(
engine=engine,
**kwargs,
)
return self.read_pandas(pandas_df)
return self._read_pandas(pandas_df, "read_json")

def _check_file_size(self, filepath: str):
max_size = 1024 * 1024 * 1024 # 1 GB in bytes
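Note: _read_pandas is now the single place where the inline-versus-load-job choice is made, and read_csv, read_pickle, and read_json call it directly so the api_name label reflects the originating API. The sketch below is not the library's code, just a standalone restatement of the checks in the hunk above, with the rough arithmetic behind the 5000-item threshold in comments (the ~20 bytes per literal figure is an assumption):

import pandas

# 5000 literals at roughly 20 bytes of SQL each is about 100 KB, i.e. around
# 10% of BigQuery's 1 MB query size limit.
MAX_INLINE_DF_SIZE = 5000


def can_inline(df: pandas.DataFrame) -> bool:
    """Mirror the checks _read_pandas applies before inlining local data."""
    if df.size >= MAX_INLINE_DF_SIZE:
        return False
    for _, s in df.items():
        if isinstance(s.dtype, pandas.ArrowDtype):
            return False  # Arrow extension dtypes are not supported inline yet
        if len(s) > 0 and pandas.api.types.is_list_like(s.iloc[0]):
            return False  # nested / list-like values require a load job
        if pandas.api.types.is_datetime64_any_dtype(s):
            return False  # datetime columns also fall back to a load job
    return True
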
tests/system/small/test_progress_bar.py (8 additions, 1 deletion)
@@ -15,10 +15,12 @@
import re
import tempfile

import numpy as np
import pandas as pd

import bigframes as bf
import bigframes.formatting_helpers as formatting_helpers
from bigframes.session import MAX_INLINE_DF_SIZE

job_load_message_regex = r"\w+ job [\w-]+ is \w+\."

@@ -66,10 +68,15 @@ def test_progress_bar_extract_jobs(
def test_progress_bar_load_jobs(
session: bf.Session, penguins_pandas_df_default_index: pd.DataFrame, capsys
):
# repeat the DF to be big enough to trigger the load job.
df = penguins_pandas_df_default_index
while len(df) < MAX_INLINE_DF_SIZE:
df = pd.DataFrame(np.repeat(df.values, 2, axis=0))

bf.options.display.progress_bar = "terminal"
with tempfile.TemporaryDirectory() as dir:
path = dir + "/test_read_csv_progress_bar*.csv"
penguins_pandas_df_default_index.to_csv(path, index=False)
df.to_csv(path, index=False)
capsys.readouterr() # clear output
session.read_csv(path)

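Note: because small frames are now inlined, the progress-bar test has to grow the penguins fixture past the threshold so that read_csv still issues a load job. A quick sketch of what the doubling loop does (the ~344-row starting size matches the usual penguins dataset and is an assumption here):

import numpy as np
import pandas as pd

df = pd.DataFrame({"body_mass_g": range(344)})  # stand-in for the penguins fixture
while len(df) < 5000:  # MAX_INLINE_DF_SIZE
    df = pd.DataFrame(np.repeat(df.values, 2, axis=0))

# 344 -> 688 -> 1376 -> 2752 -> 5504 rows, comfortably past the threshold.
print(len(df))
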
tests/unit/session/test_io_bigquery.py (11 additions, 5 deletions)
@@ -23,6 +23,7 @@
from bigframes.core import log_adapter
import bigframes.pandas as bpd
import bigframes.session._io.bigquery as io_bq
from tests.unit import resources


def test_create_job_configs_labels_is_none():
@@ -64,7 +65,9 @@ def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
"bigframes-api": "read_pandas",
"source": "bigquery-dataframes-temp",
}
df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
df = bpd.DataFrame(
{"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
)
# Test running two methods
df.head()
df.max()
@@ -81,15 +84,16 @@
"recent-bigframes-api-2": "dataframe-__init__",
"recent-bigframes-api-3": "dataframe-head",
"recent-bigframes-api-4": "dataframe-__init__",
"recent-bigframes-api-5": "dataframe-__init__",
}
assert labels is not None
assert len(labels) == 7
assert labels == expected_dict


def test_create_job_configs_labels_length_limit_met_and_labels_is_none():
log_adapter.get_and_reset_api_methods()
df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
df = bpd.DataFrame(
{"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
)
# Test running methods more than the labels' length limit
for i in range(66):
df.head()
@@ -114,7 +118,9 @@ def test_create_job_configs_labels_length_limit_met():
value = f"test{i}"
cur_labels[key] = value
# If cur_labels length is 62, we can only add one label from api_methods
df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
df = bpd.DataFrame(
{"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
)
# Test running two methods
df.head()
df.max()
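Note: since DataFrame.__init__ now always goes through a session's read_pandas, these unit tests can no longer rely on the removed local-inline shortcut and instead construct frames against a test session. A condensed sketch of the pattern (resources.create_bigquery_session is the test helper imported in the first hunk; whatever mocks it wires up are assumed, not shown):

import bigframes.pandas as bpd
from tests.unit import resources

# Without an explicit session, __init__ would reach for the global session,
# which a unit test cannot stand up; the fixture session keeps it local.
session = resources.create_bigquery_session()
df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]}, session=session)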