Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6d46cba

Browse files
feat(bigframes): Support loading avro, orc data (#16555)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/google-cloud-python/issues) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
1 parent ef62df4 commit 6d46cba
Copy full SHA for 6d46cba

4 files changed

+247-3Lines changed: 247 additions & 3 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/bigframes/bigframes/pandas/__init__.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/pandas/__init__.py
+8-2Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
_read_gbq_colab,
102102
from_glob_path,
103103
read_arrow,
104+
read_avro,
104105
read_csv,
105106
read_gbq,
106107
read_gbq_function,
@@ -109,6 +110,7 @@
109110
read_gbq_query,
110111
read_gbq_table,
111112
read_json,
113+
read_orc,
112114
read_pandas,
113115
read_parquet,
114116
read_pickle,
@@ -446,8 +448,9 @@ def reset_session():
446448
get_dummies,
447449
merge,
448450
qcut,
449-
read_csv,
450451
read_arrow,
452+
read_avro,
453+
read_csv,
451454
read_gbq,
452455
_read_gbq_colab,
453456
read_gbq_function,
@@ -456,6 +459,7 @@ def reset_session():
456459
read_gbq_query,
457460
read_gbq_table,
458461
read_json,
462+
read_orc,
459463
read_pandas,
460464
read_parquet,
461465
read_pickle,
@@ -481,8 +485,9 @@ def reset_session():
481485
"get_dummies",
482486
"merge",
483487
"qcut",
484-
"read_csv",
485488
"read_arrow",
489+
"read_avro",
490+
"read_csv",
486491
"read_gbq",
487492
"_read_gbq_colab",
488493
"read_gbq_function",
@@ -491,6 +496,7 @@ def reset_session():
491496
"read_gbq_query",
492497
"read_gbq_table",
493498
"read_json",
499+
"read_orc",
494500
"read_pandas",
495501
"read_parquet",
496502
"read_pickle",
Collapse file

‎packages/bigframes/bigframes/pandas/io/api.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/pandas/io/api.py
+32Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ def read_arrow(pa_table: pa.Table) -> bigframes.dataframe.DataFrame:
9494
return session.read_arrow(pa_table=pa_table)
9595

9696

97+
def read_avro(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
) -> bigframes.dataframe.DataFrame:
    # Module-level convenience wrapper: hand Session.read_avro to
    # global_session.with_default_session together with the caller's arguments.
    session_method = bigframes.session.Session.read_avro
    return global_session.with_default_session(session_method, path, engine=engine)


# Expose the Session method's docstring on this wrapper so both entry points
# share a single source of documentation.
read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro)
110+
111+
97112
def read_csv(
98113
filepath_or_buffer: str | IO["bytes"],
99114
*,
@@ -514,6 +529,23 @@ def read_gbq_table(
514529
read_gbq_table.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_table)
515530

516531

532+
def read_orc(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    # Module-level convenience wrapper: hand Session.read_orc to
    # global_session.with_default_session together with the caller's arguments.
    session_method = bigframes.session.Session.read_orc
    forwarded_options = {"engine": engine, "write_engine": write_engine}
    return global_session.with_default_session(
        session_method, path, **forwarded_options
    )


# Expose the Session method's docstring on this wrapper so both entry points
# share a single source of documentation.
read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc)
547+
548+
517549
@typing.overload
518550
def read_pandas(
519551
pandas_dataframe: pandas.DataFrame,
Collapse file

‎packages/bigframes/bigframes/session/__init__.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/bigframes/session/__init__.py
+82-1Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ def read_parquet(
13441344
"The provided path contains a wildcard character (*), which is not "
13451345
"supported by the current engine. To read files from wildcard paths, "
13461346
"please use the 'bigquery' engine by setting `engine='bigquery'` in "
1347-
"your configuration."
1347+
"the function call."
13481348
)
13491349

13501350
read_parquet_kwargs: Dict[str, Any] = {}
@@ -1360,6 +1360,87 @@ def read_parquet(
13601360
)
13611361
return self._read_pandas(pandas_obj, write_engine=write_engine)
13621362

1363+
def read_orc(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
    """Load an ORC file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`.
            With `bigquery` the file is ingested through a BigQuery load job;
            with `auto` or `pyarrow` it is read with `pandas.read_orc` and the
            resulting pandas object is then loaded.
        write_engine (str, default "default"):
            The write engine used to persist the data to BigQuery if needed.
            Only consulted on the `auto` / `pyarrow` path.

    Returns:
        bigframes.pandas.DataFrame:
            A new DataFrame representing the data from the ORC file.

    Raises:
        ValueError:
            If ``engine`` is not one of the supported values, or if ``path``
            is a string containing a wildcard (``*``) while ``engine`` is
            ``auto`` or ``pyarrow``.
    """
    bigframes.session.validation.validate_engine_compatibility(
        engine=engine,
        write_engine=write_engine,
    )
    if engine == "bigquery":
        # Server-side ingestion: BigQuery reads the ORC data via a load job.
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.ORC
        job_config.labels = {"bigframes-api": "read_orc"}
        table_id = self._loader.load_file(path, job_config=job_config)
        return self._loader.read_gbq_table(table_id)
    elif engine in ("auto", "pyarrow"):
        if isinstance(path, str) and "*" in path:
            # `engine` is an argument of this call, not a session/config
            # option, so point the user at the function call (this matches the
            # wording used by read_parquet).
            raise ValueError(
                "The provided path contains a wildcard character (*), which is not "
                "supported by the current engine. To read files from wildcard paths, "
                "please use the 'bigquery' engine by setting `engine='bigquery'` in "
                "the function call."
            )

        read_orc_kwargs: Dict[str, Any] = {}
        # Only request the pyarrow dtype backend when not running pandas 1.x.
        if not pandas.__version__.startswith("1."):
            read_orc_kwargs["dtype_backend"] = "pyarrow"

        pandas_obj = pandas.read_orc(path, **read_orc_kwargs)
        return self._read_pandas(pandas_obj, write_engine=write_engine)
    else:
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery', 'pyarrow'."
        )
1413+
1414+
def read_avro(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
) -> dataframe.DataFrame:
    """Load an Avro file to a BigQuery DataFrames DataFrame.

    The file is always ingested through a BigQuery load job.

    Args:
        path (str or IO):
            The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Accepted values are `auto` and
            `bigquery`; both use the BigQuery load path.

    Returns:
        bigframes.pandas.DataFrame:
            A new DataFrame representing the data from the Avro file.

    Raises:
        ValueError:
            If ``engine`` is neither ``auto`` nor ``bigquery``.
    """
    supported_engines = ("auto", "bigquery")
    if engine not in supported_engines:
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery'."
        )

    # Configure the load job in one step; keyword arguments set the
    # same-named properties on the job config.
    load_config = bigquery.LoadJobConfig(
        use_avro_logical_types=True,
        source_format=bigquery.SourceFormat.AVRO,
        labels={"bigframes-api": "read_avro"},
    )
    loaded_table_id = self._loader.load_file(path, job_config=load_config)
    return self._loader.read_gbq_table(loaded_table_id)
1443+
13631444
def read_json(
13641445
self,
13651446
path_or_buf: str | IO["bytes"],
Collapse file

‎packages/bigframes/tests/system/small/test_session.py‎

Copy file name to clipboardExpand all lines: packages/bigframes/tests/system/small/test_session.py
+125Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,131 @@ def test_read_parquet_gcs(
19351935
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
19361936

19371937

1938+
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param("bigquery", "000000000000.orc", id="bigquery"),
        pytest.param("auto", "000000000000.orc", id="auto"),
        pytest.param("pyarrow", "000000000000.orc", id="pyarrow"),
        pytest.param("bigquery", "*.orc", id="bigquery_wildcard"),
        pytest.param(
            "auto",
            "*.orc",
            id="auto_wildcard",
            # Wildcard paths are expected to be rejected with a ValueError
            # when the engine is not "bigquery".
            marks=pytest.mark.xfail(
                raises=ValueError,
            ),
        ),
    ),
)
def test_read_orc_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    """Write the scalars frame as ORC, read it back, and compare the frames."""
    pytest.importorskip(
        "pandas",
        minversion="2.0.0",
        reason="pandas<2 does not handle nullable int columns well",
    )
    scalars_df, _ = scalars_dfs
    path_prefix = gcs_folder + test_read_orc_gcs.__name__
    write_path = path_prefix + "000000000000.orc"
    read_path = path_prefix + filename

    # These columns are excluded from the ORC round trip.
    excluded_columns = [
        "geography_col",
        "time_col",
        "datetime_col",
        "duration_col",
        "timestamp_col",
    ]
    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
    df_in = df_in.drop(columns=excluded_columns)

    # Move the original index into a column and give the new index a random
    # name, so the row order can be restored after reading.
    df_write = df_in.reset_index(drop=False)
    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
    df_write.to_orc(write_path)

    loaded = session.read_orc(read_path, engine=engine)
    ordered = loaded.set_index(df_write.index.name).sort_index()
    df_out = ordered.set_index(typing.cast(str, df_in.index.name))

    assert df_out.size != 0
    pd_df_in = df_in.to_pandas()
    pd_df_out = df_out.to_pandas()
    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
2008+
2009+
2010+
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param("bigquery", "000000000000.avro", id="bigquery"),
        pytest.param("bigquery", "*.avro", id="bigquery_wildcard"),
    ),
)
def test_read_avro_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    """Extract the scalars table to Avro, read it back, and compare the frames."""
    scalars_df, _ = scalars_dfs
    uri_prefix = gcs_folder + test_read_avro_gcs.__name__
    write_uri = uri_prefix + "*.avro"
    read_uri = uri_prefix + filename

    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
    # datetime round-trips back as str in avro
    df_in = df_in.drop(columns=["geography_col", "duration_col", "datetime_col"])

    # Move the original index into a column and give the new index a random
    # name, so the row order can be restored after reading.
    index_name = f"ordering_id_{random.randrange(1_000_000)}"
    df_write = df_in.reset_index(drop=False)
    df_write.index.name = index_name

    # Create a BigQuery table
    table_id = df_write.to_gbq()

    # Extract to GCS as Avro
    extract_job_config = bigquery.ExtractJobConfig(
        destination_format="AVRO",
        use_avro_logical_types=True,
    )
    extract_job = session.bqclient.extract_table(
        table_id, write_uri, job_config=extract_job_config
    )
    extract_job.result()

    loaded = session.read_avro(read_uri, engine=engine)
    ordered = loaded.set_index(index_name).sort_index()
    df_out = ordered.set_index(typing.cast(str, df_in.index.name))

    assert df_out.size != 0
    pd_df_in = df_in.to_pandas()
    pd_df_out = df_out.to_pandas()
    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
2061+
2062+
19382063
@pytest.mark.parametrize(
19392064
"compression",
19402065
[

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.