diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 3c9bb003cc..3120e96b1a 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -597,10 +597,13 @@ def read_pickle(
 read_pickle.__doc__ = inspect.getdoc(bigframes.session.Session.read_pickle)
 
 
-def read_parquet(path: str | IO["bytes"]) -> bigframes.dataframe.DataFrame:
+def read_parquet(
+    path: str | IO["bytes"], *, engine: str = "auto"
+) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
         bigframes.session.Session.read_parquet,
         path,
+        engine=engine,
     )
 
 
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index ef4a349244..4b30a3a9d1 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1130,19 +1130,32 @@ def read_pickle(
     def read_parquet(
         self,
         path: str | IO["bytes"],
+        *,
+        engine: str = "auto",
     ) -> dataframe.DataFrame:
-        # Note: "engine" is omitted because it is redundant. Loading a table
-        # from a pandas DataFrame will just create another parquet file + load
-        # job anyway.
         table = bigframes_io.random_table(self._anonymous_dataset)
 
-        job_config = bigquery.LoadJobConfig()
-        job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
-        job_config.source_format = bigquery.SourceFormat.PARQUET
-        job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
-        job_config.labels = {"bigframes-api": "read_parquet"}
+        if engine == "bigquery":
+            job_config = bigquery.LoadJobConfig()
+            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
+            job_config.source_format = bigquery.SourceFormat.PARQUET
+            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
+            job_config.labels = {"bigframes-api": "read_parquet"}
 
-        return self._read_bigquery_load_job(path, table, job_config=job_config)
+            return self._read_bigquery_load_job(path, table, job_config=job_config)
+        else:
+            read_parquet_kwargs: Dict[str, Any] = {}
+            if pandas.__version__.startswith("1."):
+                read_parquet_kwargs["use_nullable_dtypes"] = True
+            else:
+                read_parquet_kwargs["dtype_backend"] = "pyarrow"
+
+            pandas_obj = pandas.read_parquet(
+                path,
+                engine=engine,  # type: ignore
+                **read_parquet_kwargs,
+            )
+            return self._read_pandas(pandas_obj, "read_parquet")
 
     def read_json(
         self,
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 85573472b9..2e2252be06 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -856,11 +856,19 @@ def test_read_pickle_gcs(session, penguins_pandas_df_default_index, gcs_folder):
     pd.testing.assert_frame_equal(penguins_pandas_df_default_index, df.to_pandas())
 
 
-def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
+@pytest.mark.parametrize(
+    ("engine",),
+    (
+        ("auto",),
+        ("bigquery",),
+    ),
+)
+def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder, engine):
     scalars_df, _ = scalars_dfs
     # Include wildcard so that multiple files can be written/read if > 1 GB.
     # https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
     path = gcs_folder + test_read_parquet_gcs.__name__ + "*.parquet"
+
     df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
     # GEOGRAPHY not supported in parquet export.
     df_in = df_in.drop(columns="geography_col")
@@ -869,8 +877,12 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
     df_write.to_parquet(path, index=True)
 
+    # Only the bigquery engine supports wildcards in the read path.
+    if engine != "bigquery":
+        path = path.replace("*", "000000000000")
+
     df_out = (
-        session.read_parquet(path)
+        session.read_parquet(path, engine=engine)
         # Restore order.
         .set_index(df_write.index.name).sort_index()
         # Restore index.
@@ -880,7 +892,8 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
     # DATETIME gets loaded as TIMESTAMP in parquet. See:
     # https://cloud.google.com/bigquery/docs/exporting-data#parquet_export_details
     df_out = df_out.assign(
-        datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]")
+        datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]"),
+        timestamp_col=df_out["timestamp_col"].astype("timestamp[us, tz=UTC][pyarrow]"),
     )
 
     # Make sure we actually have at least some values before comparing.
@@ -919,7 +932,7 @@ def test_read_parquet_gcs_compressed(
     df_write.to_parquet(path, compression=compression, index=True)
 
     df_out = (
-        session.read_parquet(path)
+        session.read_parquet(path, engine="bigquery")
         # Restore order.
         .set_index(df_write.index.name).sort_index()
         # Restore index.
diff --git a/third_party/bigframes_vendored/pandas/io/parquet.py b/third_party/bigframes_vendored/pandas/io/parquet.py
index 0f664e70fc..877a384b6d 100644
--- a/third_party/bigframes_vendored/pandas/io/parquet.py
+++ b/third_party/bigframes_vendored/pandas/io/parquet.py
@@ -9,6 +9,8 @@ class ParquetIOMixin:
     def read_parquet(
         self,
         path: str,
+        *,
+        engine: str = "auto",
     ):
         r"""Load a Parquet object from the file path (local or Cloud Storage),
         returning a DataFrame.
@@ -23,11 +25,15 @@
             >>> bpd.options.display.progress_bar = None
 
             >>> gcs_path = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
-            >>> df = bpd.read_parquet(path=gcs_path)
+            >>> df = bpd.read_parquet(path=gcs_path, engine="bigquery")
 
         Args:
             path (str):
                 Local or Cloud Storage path to Parquet file.
+            engine (str):
+                One of ``'auto', 'pyarrow', 'fastparquet'``, or ``'bigquery'``.
+                Parquet library to parse the file. If set to ``'bigquery'``,
+                row order is not preserved. Defaults to ``'auto'``.
 
         Returns:
             bigframes.dataframe.DataFrame: A BigQuery DataFrames.
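
A minimal usage sketch of the `engine` parameter this patch introduces (not part of the patch itself; it assumes an authenticated environment with a BigQuery-enabled project and reuses the public sample path from the vendored docstring above):

```python
import bigframes.pandas as bpd

gcs_path = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

# engine="bigquery": the file is ingested server-side via a BigQuery load job;
# wildcard paths are supported, but row order is not preserved.
df_bq = bpd.read_parquet(gcs_path, engine="bigquery")

# engine="auto" (the default): the file is parsed locally by pandas/pyarrow and
# the resulting DataFrame is uploaded through the session's _read_pandas path.
df_local = bpd.read_parquet(gcs_path, engine="auto")

print(df_bq.to_pandas().head())
print(df_local.to_pandas().head())
```

For the non-bigquery engines, the session changes above keep nullable dtypes consistent across pandas versions by passing `use_nullable_dtypes=True` on pandas 1.x and `dtype_backend="pyarrow"` on pandas 2.x.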