diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 3ec1b4b617..015a7642f8 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -21,6 +21,7 @@
 import typing
 from typing import (
     Callable,
+    Dict,
     Iterable,
     List,
     Literal,
@@ -2270,7 +2271,13 @@ def to_numpy(
 
     __array__ = to_numpy
 
-    def to_parquet(self, path: str, *, index: bool = True) -> None:
+    def to_parquet(
+        self,
+        path: str,
+        *,
+        compression: Optional[Literal["snappy", "gzip"]] = "snappy",
+        index: bool = True,
+    ) -> None:
         # TODO(swast): Can we support partition columns argument?
         # TODO(chelsealin): Support local file paths.
         # TODO(swast): Some warning that wildcard is recommended for large
@@ -2282,6 +2289,13 @@ def to_parquet(self, path: str, *, index: bool = True) -> None:
         if "*" not in path:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)
 
+        if compression not in {None, "snappy", "gzip"}:
+            raise ValueError("'{0}' is not valid for compression".format(compression))
+
+        export_options: Dict[str, Union[bool, str]] = {}
+        if compression:
+            export_options["compression"] = compression.upper()
+
         result_table = self._run_io_query(
             index=index, ordering_id=bigframes.core.io.IO_ORDERING_ID
         )
@@ -2289,7 +2303,7 @@ def to_parquet(self, path: str, *, index: bool = True) -> None:
             f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
             uri=path,
             format="PARQUET",
-            export_options={},
+            export_options=export_options,
         )
         _, query_job = self._block.expr._session._start_query(export_data_statement)
         self._set_internal_query_job(query_job)
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 7655325bfc..bfe9bc8d0f 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -793,7 +793,7 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     scalars_df, _ = scalars_dfs
     # Include wildcard so that multiple files can be written/read if > 1 GB.
     # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
-    path = gcs_folder + "test_read_parquet_gcs*.parquet"
+    path = gcs_folder + test_read_parquet_gcs.__name__ + "*.parquet"
     df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
     # GEOGRAPHY not supported in parquet export.
     df_in = df_in.drop(columns="geography_col")
@@ -823,6 +823,89 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     pd.testing.assert_frame_equal(pd_df_in, pd_df_out)
 
 
+@pytest.mark.parametrize(
+    "compression",
+    [
+        None,
+        "gzip",
+        "snappy",
+    ],
+)
+def test_read_parquet_gcs_compressed(
+    session: bigframes.Session, scalars_dfs, gcs_folder, compression
+):
+    scalars_df, _ = scalars_dfs
+    # Include wildcard so that multiple files can be written/read if > 1 GB.
+    # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
+    path = (
+        gcs_folder
+        + test_read_parquet_gcs_compressed.__name__
+        + (f"_{compression}" if compression else "")
+        + "*.parquet"
+    )
+    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
+    # GEOGRAPHY not supported in parquet export.
+    df_in = df_in.drop(columns="geography_col")
+    # Make sure we can also serialize the order.
+    df_write = df_in.reset_index(drop=False)
+    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
+    df_write.to_parquet(path, compression=compression, index=True)
+
+    df_out = (
+        session.read_parquet(path)
+        # Restore order.
+        .set_index(df_write.index.name).sort_index()
+        # Restore index.
+        .set_index(typing.cast(str, df_in.index.name))
+    )
+
+    # DATETIME gets loaded as TIMESTAMP in parquet. See:
+    # https://cloud.google.com/bigquery/docs/exporting-data#parquet_export_details
+    df_out = df_out.assign(
+        datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]")
+    )
+
+    # Make sure we actually have at least some values before comparing.
+    assert df_out.size != 0
+    pd_df_in = df_in.to_pandas()
+    pd_df_out = df_out.to_pandas()
+    pd.testing.assert_frame_equal(pd_df_in, pd_df_out)
+
+
+@pytest.mark.parametrize(
+    "compression",
+    [
+        "brotli",
+        "lz4",
+        "zstd",
+        "unknown",
+    ],
+)
+def test_read_parquet_gcs_compression_not_supported(
+    session: bigframes.Session, scalars_dfs, gcs_folder, compression
+):
+    scalars_df, _ = scalars_dfs
+    # Include wildcard so that multiple files can be written/read if > 1 GB.
+    # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
+    path = (
+        gcs_folder
+        + test_read_parquet_gcs_compression_not_supported.__name__
+        + (f"_{compression}" if compression else "")
+        + "*.parquet"
+    )
+    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
+    # GEOGRAPHY not supported in parquet export.
+    df_in = df_in.drop(columns="geography_col")
+    # Make sure we can also serialize the order.
+    df_write = df_in.reset_index(drop=False)
+    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
+
+    with pytest.raises(
+        ValueError, match=f"'{compression}' is not valid for compression"
+    ):
+        df_write.to_parquet(path, compression=compression, index=True)
+
+
 def test_read_json_gcs_bq_engine(session, scalars_dfs, gcs_folder):
     scalars_df, _ = scalars_dfs
     path = gcs_folder + "test_read_json_gcs_bq_engine_w_index*.json"
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index 80a5428b36..e54f984d59 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -129,6 +129,7 @@ def to_parquet(
         self,
         path: str,
         *,
+        compression: Optional[Literal["snappy", "gzip"]] = "snappy",
         index: bool = True,
     ) -> None:
         """Write a DataFrame to the binary Parquet format.
@@ -143,6 +144,10 @@ def to_parquet(
                 If the data size is more than 1GB, you must use a wildcard to
                 export the data into multiple files and the size of the files
                 varies.
+            compression (str, default 'snappy'):
+                Name of the compression to use. Use ``None`` for no compression.
+                Supported options: ``'gzip'``, ``'snappy'``.
+
             index (bool, default True):
                 If ``True``, include the dataframe's index(es) in the file output.
                 If ``False``, they will not be written to the file.
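
For reference, a minimal usage sketch of the new `compression` argument. The GCS bucket path and the public `penguins` source table below are placeholders chosen for illustration, not part of this change; the behavior shown (wildcard requirement, supported codecs, and the `ValueError` for unsupported ones) follows the code above.

```python
import bigframes.pandas as bpd

# Placeholder source table; any DataFrame works.
df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

# NOTE: gs://my-bucket/... is a placeholder destination; to_parquet requires a
# wildcard ("*") in the path so large results can be split across files.

# Default stays snappy-compressed Parquet.
df.to_parquet("gs://my-bucket/penguins/snappy-*.parquet")

# Request gzip explicitly, or pass None to export uncompressed files.
df.to_parquet("gs://my-bucket/penguins/gzip-*.parquet", compression="gzip")
df.to_parquet("gs://my-bucket/penguins/none-*.parquet", compression=None)

# Codecs outside {None, "snappy", "gzip"} are rejected before any export job runs.
try:
    df.to_parquet("gs://my-bucket/penguins/zstd-*.parquet", compression="zstd")
except ValueError as exc:
    print(exc)  # 'zstd' is not valid for compression
```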