refactor: move reader functions from __init__.py to a separate file under the pandas package #1023

Merged (11 commits, Sep 27, 2024)
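
Note on the public surface: the reader functions now live in bigframes/pandas/io/api.py but are re-exported from the top-level bigframes.pandas package, so existing user code is unaffected. A minimal sketch (the bpd alias and the public table name are illustrative, not taken from this PR):

    import bigframes.pandas as bpd

    # read_gbq is now defined in bigframes/pandas/io/api.py, but it is still
    # importable from the top-level package, so this call works as before.
    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")
    print(df.shape)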
bigframes/pandas/__init__.py (335 changes: 19 additions & 316 deletions)
@@ -21,37 +21,15 @@
import inspect
import sys
import typing
from typing import (
Any,
Callable,
Dict,
IO,
Iterable,
List,
Literal,
MutableSequence,
Optional,
Sequence,
Tuple,
Union,
)
from typing import Any, Iterable, List, Literal, Optional, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.reshape.concat as vendored_pandas_concat
import bigframes_vendored.pandas.core.reshape.encoding as vendored_pandas_encoding
import bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge
import bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile
import bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes
import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq
from google.cloud import bigquery
import numpy
import pandas
from pandas._typing import (
CompressionOptions,
FilePath,
ReadPickleBuffer,
StorageOptions,
)

import bigframes._config as config
import bigframes.core.blocks
@@ -65,6 +43,18 @@
import bigframes.enums
import bigframes.functions._utils as functions_utils
import bigframes.operations as ops
from bigframes.pandas.io.api import (
read_csv,
read_gbq,
read_gbq_function,
read_gbq_model,
read_gbq_query,
read_gbq_table,
read_json,
read_pandas,
read_parquet,
read_pickle,
)
import bigframes.series
import bigframes.session
import bigframes.session._io.bigquery
@@ -373,286 +363,6 @@ def merge(
merge.__doc__ = vendored_pandas_merge.merge.__doc__


def _set_default_session_location_if_possible(query):
# Set the location as per the query if this is the first query the user is
# running and:
# (1) Default session has not started yet, and
# (2) Location is not set yet, and
# (3) Use of regional endpoints is not set.
# If query is a table name, then it would be the location of the table.
# If query is a SQL with a table, then it would be table's location.
# If query is a SQL with no table, then it would be the BQ default location.
if (
options.bigquery._session_started
or options.bigquery.location
or options.bigquery.use_regional_endpoints
):
return

clients_provider = bigframes.session.clients.ClientsProvider(
project=options.bigquery.project,
location=options.bigquery.location,
use_regional_endpoints=options.bigquery.use_regional_endpoints,
credentials=options.bigquery.credentials,
application_name=options.bigquery.application_name,
bq_kms_key_name=options.bigquery.kms_key_name,
)

bqclient = clients_provider.bqclient

if bigframes.session._io.bigquery.is_query(query):
# Intentionally run outside of the session so that we can detect the
# location before creating the session. Since it's a dry_run, labels
# aren't necessary.
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
options.bigquery.location = job.location
else:
table = bqclient.get_table(query)
options.bigquery.location = table.location
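
The helper above detects a query's location via a BigQuery dry run, which validates and routes the job without executing it or incurring cost. A standalone sketch of the same trick using the google-cloud-bigquery client directly (the query string is illustrative):

    from google.cloud import bigquery

    client = bigquery.Client()  # uses application default credentials
    # dry_run=True means the job is only validated, so nothing is billed,
    # but job metadata such as the location is still populated.
    job = client.query("SELECT 1", job_config=bigquery.QueryJobConfig(dry_run=True))
    print(job.location)  # e.g. "US"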


# Note: the following methods are duplicated from Session. This duplication
# enables the following:
#
# 1. Static type checking knows the argument and return types, which is
# difficult to do with decorators. Aside: When we require Python 3.10, we
# can use Concatenate for generic typing in decorators. See:
# https://stackoverflow.com/a/68290080/101923
# 2. docstrings get processed by static processing tools, such as VS Code's
# autocomplete.
# 3. Positional arguments function as expected. If we were to pull in the
# methods directly from Session, a Session object would need to be the first
# argument, even if we allow a default value.
# 4. Allows to set BigQuery options for the BigFrames session based on the
# method and its arguments.
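
As a side illustration of point 1, on Python 3.10+ a session-injecting decorator can be typed with Concatenate and ParamSpec. This is a rough sketch only; the Session type and get_global_session helper are stand-ins for illustration, not the actual bigframes API:

    from typing import Callable, Concatenate, ParamSpec, TypeVar

    P = ParamSpec("P")
    R = TypeVar("R")

    def with_default_session(meth: Callable[Concatenate["Session", P], R]) -> Callable[P, R]:
        # Wrap a Session method so callers can omit the session argument while
        # type checkers still see the remaining parameter and return types.
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            return meth(get_global_session(), *args, **kwargs)
        return wrapper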


def read_csv(
filepath_or_buffer: str | IO["bytes"],
*,
sep: Optional[str] = ",",
header: Optional[int] = 0,
names: Optional[
Union[MutableSequence[Any], numpy.ndarray[Any, Any], Tuple[Any, ...], range]
] = None,
index_col: Optional[
Union[
int,
str,
Sequence[Union[str, int]],
bigframes.enums.DefaultIndexKind,
Literal[False],
]
] = None,
usecols: Optional[
Union[
MutableSequence[str],
Tuple[str, ...],
Sequence[int],
pandas.Series,
pandas.Index,
numpy.ndarray[Any, Any],
Callable[[Any], bool],
]
] = None,
dtype: Optional[Dict] = None,
engine: Optional[
Literal["c", "python", "pyarrow", "python-fwf", "bigquery"]
] = None,
encoding: Optional[str] = None,
**kwargs,
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_csv,
filepath_or_buffer=filepath_or_buffer,
sep=sep,
header=header,
names=names,
index_col=index_col,
usecols=usecols,
dtype=dtype,
engine=engine,
encoding=encoding,
**kwargs,
)


read_csv.__doc__ = inspect.getdoc(bigframes.session.Session.read_csv)


def read_json(
path_or_buf: str | IO["bytes"],
*,
orient: Literal[
"split", "records", "index", "columns", "values", "table"
] = "columns",
dtype: Optional[Dict] = None,
encoding: Optional[str] = None,
lines: bool = False,
engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson",
**kwargs,
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_json,
path_or_buf=path_or_buf,
orient=orient,
dtype=dtype,
encoding=encoding,
lines=lines,
engine=engine,
**kwargs,
)


read_json.__doc__ = inspect.getdoc(bigframes.session.Session.read_json)


def read_gbq(
query_or_table: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query_or_table)
return global_session.with_default_session(
bigframes.session.Session.read_gbq,
query_or_table,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
filters=filters,
use_cache=use_cache,
col_order=col_order,
)


read_gbq.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq)


def read_gbq_model(model_name: str):
return global_session.with_default_session(
bigframes.session.Session.read_gbq_model,
model_name,
)


read_gbq_model.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_model)


def read_gbq_query(
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
filters: vendored_pandas_gbq.FiltersType = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query)
return global_session.with_default_session(
bigframes.session.Session.read_gbq_query,
query,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
use_cache=use_cache,
col_order=col_order,
filters=filters,
)


read_gbq_query.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_query)


def read_gbq_table(
query: str,
*,
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query)
return global_session.with_default_session(
bigframes.session.Session.read_gbq_table,
query,
index_col=index_col,
columns=columns,
max_results=max_results,
filters=filters,
use_cache=use_cache,
col_order=col_order,
)


read_gbq_table.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_table)


@typing.overload
def read_pandas(pandas_dataframe: pandas.DataFrame) -> bigframes.dataframe.DataFrame:
...


@typing.overload
def read_pandas(pandas_dataframe: pandas.Series) -> bigframes.series.Series:
...


@typing.overload
def read_pandas(pandas_dataframe: pandas.Index) -> bigframes.core.indexes.Index:
...


def read_pandas(pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index]):
return global_session.with_default_session(
bigframes.session.Session.read_pandas,
pandas_dataframe,
)


read_pandas.__doc__ = inspect.getdoc(bigframes.session.Session.read_pandas)


def read_pickle(
filepath_or_buffer: FilePath | ReadPickleBuffer,
compression: CompressionOptions = "infer",
storage_options: StorageOptions = None,
):
return global_session.with_default_session(
bigframes.session.Session.read_pickle,
filepath_or_buffer=filepath_or_buffer,
compression=compression,
storage_options=storage_options,
)


read_pickle.__doc__ = inspect.getdoc(bigframes.session.Session.read_pickle)


def read_parquet(
path: str | IO["bytes"], *, engine: str = "auto"
) -> bigframes.dataframe.DataFrame:
return global_session.with_default_session(
bigframes.session.Session.read_parquet,
path,
engine=engine,
)


read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet)


def remote_function(
input_types: Union[None, type, Sequence[type]] = None,
output_type: Optional[type] = None,
@@ -697,17 +407,6 @@ def remote_function(
remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function)


def read_gbq_function(function_name: str, is_row_processor: bool = False):
return global_session.with_default_session(
bigframes.session.Session.read_gbq_function,
function_name=function_name,
is_row_processor=is_row_processor,
)


read_gbq_function.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_function)


@typing.overload
def to_datetime(
arg: Union[
@@ -893,15 +592,19 @@ def reset_session():
pass

# Use __all__ to let type checkers know what is part of the public API.
__all___ = [
__all__ = [
# Functions
"concat",
"merge",
"read_csv",
"read_gbq",
"read_gbq_function",
"read_gbq_model",
"read_gbq_query",
"read_gbq_table",
"read_json",
"read_pandas",
"read_parquet",
"read_pickle",
"remote_function",
"to_datetime",
@@ -911,7 +614,7 @@ def reset_session():
"Float64Dtype",
"Int64Dtype",
"StringDtype",
"ArrowDtype"
"ArrowDtype",
# Class aliases
"DataFrame",
"Index",