diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index 97c5ef03e5..43c05c6c83 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -23,6 +23,10 @@
 UNNAMED_INDEX_ID = "bigframes_unnamed_index"
 
 
+def is_gcs_path(value) -> typing_extensions.TypeGuard[str]:
+    return isinstance(value, str) and value.startswith("gs://")
+
+
 def get_axis_number(axis: typing.Union[str, int]) -> typing.Literal[0, 1]:
     if axis in {0, "index", "rows"}:
         return 0
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 2a3aead80a..1cfa2e0155 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -2950,15 +2950,21 @@ def from_records(
         )
 
     def to_csv(
-        self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True
-    ) -> None:
+        self,
+        path_or_buf=None,
+        sep=",",
+        *,
+        header: bool = True,
+        index: bool = True,
+    ) -> Optional[str]:
         # TODO(swast): Can we support partition columns argument?
         # TODO(chelsealin): Support local file paths.
         # TODO(swast): Some warning that wildcard is recommended for large
         # query results? See:
         # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size
-        if not path_or_buf.startswith("gs://"):
-            raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS)
+        if not utils.is_gcs_path(path_or_buf):
+            pd_df = self.to_pandas()
+            return pd_df.to_csv(path_or_buf, sep=sep, header=header, index=index)
 
         if "*" not in path_or_buf:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)
@@ -2975,22 +2981,28 @@ def to_csv(
             export_data_statement, api_name="dataframe-to_csv"
         )
         self._set_internal_query_job(query_job)
+        return None
 
     def to_json(
         self,
-        path_or_buf: str,
-        orient: Literal[
-            "split", "records", "index", "columns", "values", "table"
-        ] = "columns",
+        path_or_buf=None,
+        orient: Optional[
+            Literal["split", "records", "index", "columns", "values", "table"]
+        ] = None,
         *,
         lines: bool = False,
         index: bool = True,
-    ) -> None:
+    ) -> Optional[str]:
         # TODO(swast): Can we support partition columns argument?
-        # TODO(chelsealin): Support local file paths.
-        if not path_or_buf.startswith("gs://"):
-            raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS)
-
+        if not utils.is_gcs_path(path_or_buf):
+            pd_df = self.to_pandas()
+            return pd_df.to_json(
+                path_or_buf,
+                orient=orient,
+                lines=lines,
+                index=index,
+                default_handler=str,
+            )
         if "*" not in path_or_buf:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)
 
@@ -3019,6 +3031,7 @@ def to_json(
             export_data_statement, api_name="dataframe-to_json"
         )
         self._set_internal_query_job(query_job)
+        return None
 
     def to_gbq(
         self,
@@ -3117,19 +3130,19 @@ def __array__(self, dtype=None) -> numpy.ndarray:
 
     def to_parquet(
         self,
-        path: str,
+        path=None,
         *,
         compression: Optional[Literal["snappy", "gzip"]] = "snappy",
         index: bool = True,
-    ) -> None:
+    ) -> Optional[bytes]:
         # TODO(swast): Can we support partition columns argument?
         # TODO(chelsealin): Support local file paths.
         # TODO(swast): Some warning that wildcard is recommended for large
         # query results? See:
         # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size
-        if not path.startswith("gs://"):
-            raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS)
-
+        if not utils.is_gcs_path(path):
+            pd_df = self.to_pandas()
+            return pd_df.to_parquet(path, compression=compression, index=index)
         if "*" not in path:
             raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)
 
@@ -3153,6 +3166,7 @@ def to_parquet(
             export_data_statement, api_name="dataframe-to_parquet"
         )
         self._set_internal_query_job(query_job)
+        return None
 
     def to_dict(
         self,
diff --git a/bigframes/series.py b/bigframes/series.py
index 8fdafe25e7..3f78a6a9cc 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1652,9 +1652,22 @@ def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame:
         return bigframes.dataframe.DataFrame(block)
 
     def to_csv(
-        self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True
-    ) -> None:
-        return self.to_frame().to_csv(path_or_buf, sep=sep, header=header, index=index)
+        self,
+        path_or_buf=None,
+        sep=",",
+        *,
+        header: bool = True,
+        index: bool = True,
+    ) -> Optional[str]:
+        if utils.is_gcs_path(path_or_buf):
+            return self.to_frame().to_csv(
+                path_or_buf, sep=sep, header=header, index=index
+            )
+        else:
+            pd_series = self.to_pandas()
+            return pd_series.to_csv(
+                path_or_buf=path_or_buf, sep=sep, header=header, index=index
+            )
 
     def to_dict(self, into: type[dict] = dict) -> typing.Mapping:
         return typing.cast(dict, self.to_pandas().to_dict(into))  # type: ignore
@@ -1664,17 +1677,23 @@ def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None:
 
     def to_json(
         self,
-        path_or_buf: str,
-        orient: typing.Literal[
-            "split", "records", "index", "columns", "values", "table"
-        ] = "columns",
+        path_or_buf=None,
+        orient: Optional[
+            typing.Literal["split", "records", "index", "columns", "values", "table"]
+        ] = None,
         *,
         lines: bool = False,
         index: bool = True,
-    ) -> None:
-        return self.to_frame().to_json(
-            path_or_buf=path_or_buf, orient=orient, lines=lines, index=index
-        )
+    ) -> Optional[str]:
+        if utils.is_gcs_path(path_or_buf):
+            return self.to_frame().to_json(
+                path_or_buf=path_or_buf, orient=orient, lines=lines, index=index
+            )
+        else:
+            pd_series = self.to_pandas()
+            return pd_series.to_json(
+                path_or_buf=path_or_buf, orient=orient, lines=lines, index=index  # type: ignore
+            )
 
     def to_latex(
         self, buf=None, columns=None, header=True, index=True, **kwargs
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 67792b3a1d..3a7eff621f 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -4125,6 +4125,72 @@ def test_df_to_latex(scalars_df_index, scalars_pandas_df_index):
     assert bf_result == pd_result
 
 
+def test_df_to_json_local_str(scalars_df_index, scalars_pandas_df_index):
+    bf_result = scalars_df_index.to_json()
+    # default_handler for arrow types that have no default conversion
+    pd_result = scalars_pandas_df_index.to_json(default_handler=str)
+
+    assert bf_result == pd_result
+
+
+@skip_legacy_pandas
+def test_df_to_json_local_file(scalars_df_index, scalars_pandas_df_index):
+    with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
+        scalars_df_index.to_json(bf_result_file, orient="table")
+        # default_handler for arrow types that have no default conversion
+        scalars_pandas_df_index.to_json(
+            pd_result_file, orient="table", default_handler=str
+        )
+
+        bf_result = bf_result_file.read()
+        pd_result = pd_result_file.read()
+
+    assert bf_result == pd_result
+
+
+def test_df_to_csv_local_str(scalars_df_index, scalars_pandas_df_index):
+    bf_result = scalars_df_index.to_csv()
+    # default_handler for arrow types that have no default conversion
+    pd_result = scalars_pandas_df_index.to_csv()
+
+    assert bf_result == pd_result
+
+
+def test_df_to_csv_local_file(scalars_df_index, scalars_pandas_df_index):
+    with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
+        scalars_df_index.to_csv(bf_result_file)
+        scalars_pandas_df_index.to_csv(pd_result_file)
+
+        bf_result = bf_result_file.read()
+        pd_result = pd_result_file.read()
+
+    assert bf_result == pd_result
+
+
+def test_df_to_parquet_local_bytes(scalars_df_index, scalars_pandas_df_index):
+    # GEOGRAPHY not supported in parquet export.
+    unsupported = ["geography_col"]
+
+    bf_result = scalars_df_index.drop(columns=unsupported).to_parquet()
+    # default_handler for arrow types that have no default conversion
+    pd_result = scalars_pandas_df_index.drop(columns=unsupported).to_parquet()
+
+    assert bf_result == pd_result
+
+
+def test_df_to_parquet_local_file(scalars_df_index, scalars_pandas_df_index):
+    # GEOGRAPHY not supported in parquet export.
+    unsupported = ["geography_col"]
+    with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
+        scalars_df_index.drop(columns=unsupported).to_parquet(bf_result_file)
+        scalars_pandas_df_index.drop(columns=unsupported).to_parquet(pd_result_file)
+
+        bf_result = bf_result_file.read()
+        pd_result = pd_result_file.read()
+
+    assert bf_result == pd_result
+
+
 def test_df_to_records(scalars_df_index, scalars_pandas_df_index):
     unsupported = ["numeric_col"]
     bf_result = scalars_df_index.drop(columns=unsupported).to_records()
@@ -4166,7 +4232,7 @@ def test_df_to_pickle(scalars_df_index, scalars_pandas_df_index):
         scalars_df_index.to_pickle(bf_result_file)
         scalars_pandas_df_index.to_pickle(pd_result_file)
         bf_result = bf_result_file.read()
-        pd_result = bf_result_file.read()
+        pd_result = pd_result_file.read()
 
     assert bf_result == pd_result
 
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index 10fcec63ce..fe6e001797 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -2753,6 +2753,44 @@ def test_to_latex(scalars_df_index, scalars_pandas_df_index):
     assert bf_result == pd_result
 
 
+def test_series_to_json_local_str(scalars_df_index, scalars_pandas_df_index):
+    bf_result = scalars_df_index.int64_col.to_json()
+    pd_result = scalars_pandas_df_index.int64_col.to_json()
+
+    assert bf_result == pd_result
+
+
+@skip_legacy_pandas
+def test_series_to_json_local_file(scalars_df_index, scalars_pandas_df_index):
+    with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
+        scalars_df_index.int64_col.to_json(bf_result_file)
+        scalars_pandas_df_index.int64_col.to_json(pd_result_file)
+
+        bf_result = bf_result_file.read()
+        pd_result = pd_result_file.read()
+
+    assert bf_result == pd_result
+
+
+def test_series_to_csv_local_str(scalars_df_index, scalars_pandas_df_index):
+    bf_result = scalars_df_index.int64_col.to_csv()
+    # default_handler for arrow types that have no default conversion
+    pd_result = scalars_pandas_df_index.int64_col.to_csv()
+
+    assert bf_result == pd_result
+
+
+def test_series_to_csv_local_file(scalars_df_index, scalars_pandas_df_index):
+    with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
+        scalars_df_index.int64_col.to_csv(bf_result_file)
+        scalars_pandas_df_index.int64_col.to_csv(pd_result_file)
+
+        bf_result = bf_result_file.read()
+        pd_result = pd_result_file.read()
+
+    assert bf_result == pd_result
+
+
 def test_to_dict(scalars_df_index, scalars_pandas_df_index):
     bf_result = scalars_df_index["int64_too"].to_dict()
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index f8088f8060..7048d9c6dd 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -476,11 +476,11 @@ def to_gbq(
 
     def to_parquet(
         self,
-        path: str,
+        path: Optional[str],
         *,
         compression: Optional[Literal["snappy", "gzip"]] = "snappy",
         index: bool = True,
-    ) -> None:
+    ) -> Optional[bytes]:
         """Write a DataFrame to the binary Parquet format.
 
         This function writes the dataframe as a `parquet file
@@ -496,9 +496,13 @@ def to_parquet(
             >>> df.to_parquet(path=gcs_bucket)
 
         Args:
-            path (str):
+            path (str, path object, file-like object, or None, default None):
+                String, path object (implementing ``os.PathLike[str]``), or file-like
+                object implementing a binary ``write()`` function. If None, the result is
+                returned as bytes. If a string or path, it will be used as Root Directory
+                path when writing a partitioned dataset.
                 Destination URI(s) of Cloud Storage files(s) to store the extracted dataframe
-                in format of ``gs:///``.
+                should be formatted ``gs:///``.
                 If the data size is more than 1GB, you must use a wildcard to
                 export the data into multiple files and the size of the files
                 varies.
@@ -511,7 +515,7 @@ def to_parquet(
                 If ``False``, they will not be written to the file.
 
         Returns:
-            None.
+            bytes if no path argument is provided else None
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
 
diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py
index 95302e51b2..6734fb6aa9 100644
--- a/third_party/bigframes_vendored/pandas/core/generic.py
+++ b/third_party/bigframes_vendored/pandas/core/generic.py
@@ -210,14 +210,14 @@ def empty(self) -> bool:
 
     def to_json(
         self,
-        path_or_buf: str,
-        orient: Literal[
-            "split", "records", "index", "columns", "values", "table"
-        ] = "columns",
+        path_or_buf,
+        orient: Optional[
+            Literal["split", "records", "index", "columns", "values", "table"]
+        ] = None,
         *,
         index: bool = True,
         lines: bool = False,
-    ) -> None:
+    ) -> Optional[str]:
         """Convert the object to a JSON string, written to Cloud Storage.
 
         Note NaN's and None will be converted to null and datetime objects
         will be converted to UNIX timestamps.
 
         .. note::
             Only ``orient='records'`` and ``lines=True`` is supported so far.
 
         Args:
-            path_or_buf (str):
-                A destination URI of Cloud Storage files(s) to store the extracted
+            path_or_buf (str, path object, file-like object, or None, default None):
+                String, path object (implementing os.PathLike[str]), or file-like
+                object implementing a write() function. If None, the result is
+                returned as a string.
+
+                Can be a destination URI of Cloud Storage files(s) to store the extracted
                 dataframe in format of ``gs:///``. Must contain a wildcard `*` character.
                 If the data size is more than 1GB, you must use a wildcard to export
                 the data into multiple files and the size of the files varies.
-
-                None, file-like objects or local file paths not yet supported.
             orient ({`split`, `records`, `index`, `columns`, `values`, `table`}, default 'columns):
                 Indication of expected JSON string format.
@@ -271,17 +273,25 @@ def to_json(
             list-like.
 
         Returns:
-            None: String output not yet supported.
+            None or str: If path_or_buf is None, returns the resulting json format as a
+            string. Otherwise returns None.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
 
-    def to_csv(self, path_or_buf: str, *, index: bool = True) -> None:
+    def to_csv(self, path_or_buf, *, index: bool = True) -> Optional[str]:
         """Write object to a comma-separated values (csv) file on Cloud Storage.
 
         Args:
-            path_or_buf (str):
-                A destination URI of Cloud Storage files(s) to store the extracted dataframe
-                in format of ``gs:///``.
+            path_or_buf (str, path object, file-like object, or None, default None):
+                String, path object (implementing os.PathLike[str]), or file-like
+                object implementing a write() function. If None, the result is
+                returned as a string. If a non-binary file object is passed, it should
+                be opened with `newline=''`, disabling universal newlines. If a binary
+                file object is passed, `mode` might need to contain a `'b'`.
+
+                Alternatively, a destination URI of Cloud Storage files(s) to store the
+                extracted dataframe in format of
+                ``gs:///``.
                 If the data size is more than 1GB, you must use a wildcard to
                 export the data into multiple files and the size of the files
                 varies.
@@ -293,7 +303,8 @@ def to_csv(self, path_or_buf: str, *, index: bool = True) -> None:
                 If True, write row names (index).
 
         Returns:
-            None: String output not yet supported.
+            None or str: If path_or_buf is None, returns the resulting csv format as a
+            string. Otherwise returns None.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
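
A minimal usage sketch of the behavior this diff enables (not part of the patch; the table name, bucket, and local path below are placeholders): when the destination is `None`, a local path, or a file object, the exporters fall back to `to_pandas()` and behave like their pandas counterparts, while `gs://` destinations keep the BigQuery export path and still require a wildcard.

```python
import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")  # placeholder table

# No destination: falls back to pandas and returns the serialized result.
csv_text = df.to_csv()           # str
json_text = df.to_json()         # str; arrow-only dtypes serialized via default_handler=str
parquet_bytes = df.to_parquet()  # bytes

# Local file object: also handled by the pandas fallback.
with open("/tmp/penguins.csv", "w") as f:  # placeholder path
    df.to_csv(f)

# Cloud Storage destination: still exported by BigQuery and still needs a wildcard.
df.to_csv("gs://my-bucket/penguins-*.csv")  # placeholder bucket
```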