feat: Support compression in to_parquet #91

Merged (9 commits, Oct 12, 2023)
bigframes/dataframe.py (18 changes: 16 additions & 2 deletions)

@@ -21,6 +21,7 @@
 import typing
 from typing import (
     Callable,
+    Dict,
     Iterable,
     List,
     Literal,
@@ -2270,7 +2271,13 @@ def to_numpy(

     __array__ = to_numpy

-    def to_parquet(self, path: str, *, index: bool = True) -> None:
+    def to_parquet(
+        self,
+        path: str,
+        *,
+        compression: Optional[Literal["snappy", "gzip"]] = "snappy",
+        index: bool = True,
+    ) -> None:
         # TODO(swast): Can we support partition columns argument?
         # TODO(chelsealin): Support local file paths.
         # TODO(swast): Some warning that wildcard is recommended for large
@@ -2282,14 +2289,21 @@ def to_parquet(self, path: str, *, index: bool = True) -> None:
         if "*" not in path:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

+        if compression not in {None, "snappy", "gzip"}:
+            raise ValueError("'{0}' is not valid for compression".format(compression))
+
+        export_options: Dict[str, Union[bool, str]] = {}
+        if compression:
+            export_options["compression"] = compression.upper()
+
         result_table = self._run_io_query(
             index=index, ordering_id=bigframes.core.io.IO_ORDERING_ID
         )
         export_data_statement = bigframes.core.io.create_export_data_statement(
             f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
             uri=path,
             format="PARQUET",
-            export_options={},
+            export_options=export_options,
         )
         _, query_job = self._block.expr._session._start_query(export_data_statement)
         self._set_internal_query_job(query_job)
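The hunk above both validates the new argument and translates it into BigQuery export options. Below is a minimal sketch of that mapping in isolation, assuming a hypothetical helper name (build_export_options) that is not part of the PR; the real code performs the same check inline in DataFrame.to_parquet and passes the resulting dict to create_export_data_statement.

from typing import Dict, Literal, Optional, Union


def build_export_options(
    compression: Optional[Literal["snappy", "gzip"]] = "snappy",
) -> Dict[str, Union[bool, str]]:
    # Hypothetical helper for illustration only; mirrors the inline logic above.
    if compression not in {None, "snappy", "gzip"}:
        raise ValueError("'{0}' is not valid for compression".format(compression))

    export_options: Dict[str, Union[bool, str]] = {}
    if compression:
        # The option value is upper-cased before being handed to EXPORT DATA.
        export_options["compression"] = compression.upper()
    return export_options


print(build_export_options("gzip"))  # {'compression': 'GZIP'}
print(build_export_options(None))    # {} (no compression option is emitted)

Passing anything else, for example "zstd", raises ValueError, which is exactly what the negative test added below asserts on.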
tests/system/small/test_session.py (85 changes: 84 additions & 1 deletion)

@@ -793,7 +793,7 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     scalars_df, _ = scalars_dfs
     # Include wildcard so that multiple files can be written/read if > 1 GB.
     # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
-    path = gcs_folder + "test_read_parquet_gcs*.parquet"
+    path = gcs_folder + test_read_parquet_gcs.__name__ + "*.parquet"
     df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
     # GEOGRAPHY not supported in parquet export.
     df_in = df_in.drop(columns="geography_col")
@@ -823,6 +823,89 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     pd.testing.assert_frame_equal(pd_df_in, pd_df_out)


+@pytest.mark.parametrize(
+    "compression",
+    [
+        None,
+        "gzip",
+        "snappy",
+    ],
+)
+def test_read_parquet_gcs_compressed(
+    session: bigframes.Session, scalars_dfs, gcs_folder, compression
+):
+    scalars_df, _ = scalars_dfs
+    # Include wildcard so that multiple files can be written/read if > 1 GB.
+    # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
+    path = (
+        gcs_folder
+        + test_read_parquet_gcs_compressed.__name__
+        + (f"_{compression}" if compression else "")
+        + "*.parquet"
+    )
+    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
+    # GEOGRAPHY not supported in parquet export.
+    df_in = df_in.drop(columns="geography_col")
+    # Make sure we can also serialize the order.
+    df_write = df_in.reset_index(drop=False)
+    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
+    df_write.to_parquet(path, compression=compression, index=True)
+
+    df_out = (
+        session.read_parquet(path)
+        # Restore order.
+        .set_index(df_write.index.name).sort_index()
+        # Restore index.
+        .set_index(typing.cast(str, df_in.index.name))
+    )
+
+    # DATETIME gets loaded as TIMESTAMP in parquet. See:
+    # https://cloud.google.com/bigquery/docs/exporting-data#parquet_export_details
+    df_out = df_out.assign(
+        datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]")
+    )
+
+    # Make sure we actually have at least some values before comparing.
+    assert df_out.size != 0
+    pd_df_in = df_in.to_pandas()
+    pd_df_out = df_out.to_pandas()
+    pd.testing.assert_frame_equal(pd_df_in, pd_df_out)
+
+
+@pytest.mark.parametrize(
+    "compression",
+    [
+        "brotli",
+        "lz4",
+        "zstd",
+        "unknown",
+    ],
+)
+def test_read_parquet_gcs_compression_not_supported(
+    session: bigframes.Session, scalars_dfs, gcs_folder, compression
+):
+    scalars_df, _ = scalars_dfs
+    # Include wildcard so that multiple files can be written/read if > 1 GB.
+    # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
+    path = (
+        gcs_folder
+        + test_read_parquet_gcs_compression_not_supported.__name__
+        + (f"_{compression}" if compression else "")
+        + "*.parquet"
+    )
+    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
+    # GEOGRAPHY not supported in parquet export.
+    df_in = df_in.drop(columns="geography_col")
+    # Make sure we can also serialize the order.
+    df_write = df_in.reset_index(drop=False)
+    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
+
+    with pytest.raises(
+        ValueError, match=f"'{compression}' is not valid for compression"
+    ):
+        df_write.to_parquet(path, compression=compression, index=True)
+
+
 def test_read_json_gcs_bq_engine(session, scalars_dfs, gcs_folder):
     scalars_df, _ = scalars_dfs
     path = gcs_folder + "test_read_json_gcs_bq_engine_w_index*.json"
third_party/bigframes_vendored/pandas/core/frame.py (5 changes: 5 additions & 0 deletions)

@@ -129,6 +129,7 @@ def to_parquet(
         self,
         path: str,
         *,
+        compression: Optional[Literal["snappy", "gzip"]] = "snappy",
         index: bool = True,
     ) -> None:
         """Write a DataFrame to the binary Parquet format.
@@ -143,6 +144,10 @@ def to_parquet(
                 If the data size is more than 1GB, you must use a wildcard to export
                 the data into multiple files and the size of the files varies.

+            compression (str, default 'snappy'):
+                Name of the compression to use. Use ``None`` for no compression.
+                Supported options: ``'gzip'``, ``'snappy'``.
+
             index (bool, default True):
                 If ``True``, include the dataframe's index(es) in the file output.
                 If ``False``, they will not be written to the file.
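As a usage sketch of the parameter documented above: the bucket path and public table below are placeholders chosen for illustration, not taken from the PR.

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# Default is Snappy; the wildcard is required so large results can be split
# across multiple files.
df.to_parquet("gs://my-bucket/usa_names_*.parquet")

# Explicit gzip, or no compression at all.
df.to_parquet("gs://my-bucket/usa_names_gzip_*.parquet", compression="gzip")
df.to_parquet("gs://my-bucket/usa_names_raw_*.parquet", compression=None)

Any other value raises ValueError: '<value>' is not valid for compression.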