feat: add DataFrame.eval, DataFrame.query #361
@@ -0,0 +1,71 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
from typing import Optional

import bigframes_vendored.pandas.core.computation.eval as vendored_pandas_eval
import bigframes_vendored.pandas.core.computation.parsing as vendored_pandas_eval_parsing

import bigframes.dataframe as dataframe
import bigframes.dtypes
import bigframes.series as series


def eval(df: dataframe.DataFrame, expr: str, target: Optional[dataframe.DataFrame]):
    """
    Evaluate the given Python expression.

    Args:
        df (DataFrame):
            Columns of this dataframe will be used to resolve variables in the expression.
        expr (str):
            One or more Python expressions to evaluate.
        target (DataFrame or None):
            The evaluation result will be written to the target if provided.

    Returns:
        Result of evaluation.
    """
    index_resolver = {
        vendored_pandas_eval_parsing.clean_column_name(str(name)): EvalSeries(
            df.index.get_level_values(level).to_series()
        )
        for level, name in enumerate(df.index.names)
    }
    column_resolver = {
        vendored_pandas_eval_parsing.clean_column_name(str(name)): EvalSeries(series)
        for name, series in df.items()
    }
    # 3 Levels: user -> logging wrapper -> dataframe -> eval helper (this)
    return vendored_pandas_eval.eval(
        expr=expr, level=3, target=target, resolvers=(index_resolver, column_resolver)  # type: ignore
    )


@dataclasses.dataclass
class FakeNumpyArray:
    dtype: bigframes.dtypes.Dtype


class EvalSeries(series.Series):
    """Slightly modified Series that works better with pandas.eval."""

    def __init__(self, underlying: series.Series):
        super().__init__(data=underlying._block)

    @property
    def values(self):
        """Returns a fake numpy array with only a dtype property, so that eval can
        determine the schema without actually downloading the data."""
        return FakeNumpyArray(self.dtype)
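The helper builds on two hooks in the pandas eval machinery: pd.eval accepts resolvers, mappings that are consulted when a bare name in the expression is looked up (this is how column and index names become visible), and EvalSeries overrides .values to hand back a dtype-only stand-in so that, per its docstring, the schema can be determined without downloading data. A minimal sketch of the resolvers hook with plain pandas; the column names prices and qty are made up for the example:

```python
import pandas as pd

frame = pd.DataFrame({"prices": [1.5, 2.0, 3.25], "qty": [10, 20, 30]})

# Each resolver is a dict-like mapping of names to objects; pandas checks
# the resolvers before falling back to local/global variables.
resolvers = ({"prices": frame["prices"], "qty": frame["qty"]},)

total = pd.eval("prices * qty", resolvers=resolvers)
print(total)  # a Series aligned on the frame's index: 15.0, 40.0, 97.5
```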
@@ -1493,6 +1493,17 @@ def sort_values(
        )
        return DataFrame(self._block.order_by(ordering))

    def eval(self, expr: str) -> DataFrame:
        import bigframes.core.eval as bf_eval

        return bf_eval.eval(self, expr, target=self)
Review comment: Shouldn't we only set … https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.eval.html
Reply: eval uses …
    def query(self, expr: str) -> DataFrame:
        import bigframes.core.eval as bf_eval

        eval_result = bf_eval.eval(self, expr, target=None)
        return self[eval_result]

    def value_counts(
        self,
        subset: typing.Union[blocks.Label, typing.Sequence[blocks.Label]] = None,
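Judging by the tests further down, the semantics mirror pandas: eval evaluates (possibly multi-line) assignment expressions and returns a DataFrame with the new columns, while query evaluates a boolean expression and keeps the matching rows. A plain-pandas sketch using the same column names as the tests:

```python
import pandas as pd

df = pd.DataFrame({"int64_col": [1, 2, 3, 4], "int64_too": [4, 3, 2, 1]})

# eval: assignment expressions add columns and return the augmented frame
with_sum = df.eval("new_col = int64_col + int64_too")

# query: a boolean expression selects matching rows
filtered = df.query("int64_col > int64_too")
```

In BigQuery DataFrames the calls look the same; the tests tack on .to_pandas() to materialize the result for comparison.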
@@ -3822,6 +3822,44 @@ def test_df_to_orc(scalars_df_index, scalars_pandas_df_index):
    assert bf_result == pd_result


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("expr",),
    [
        ("new_col = int64_col + int64_too",),
        ("new_col = (rowindex > 3) | bool_col",),
        ("int64_too = bool_col\nnew_col2 = rowindex",),
    ],
)
def test_df_eval(scalars_dfs, expr):
    scalars_df, scalars_pandas_df = scalars_dfs

    bf_result = scalars_df.eval(expr).to_pandas()
    pd_result = scalars_pandas_df.eval(expr)

    pd.testing.assert_frame_equal(bf_result, pd_result)


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("expr",),
    [
        ("int64_col > int64_too",),
        ("bool_col",),
        ("((int64_col - int64_too) % @local_var) == 0",),
    ],
)
def test_df_query(scalars_dfs, expr):
    # local_var is referenced in expressions
    local_var = 3  # NOQA
Review comment: Wow! Didn't realize it went as far as snatching up locals to pass through.
Reply: Yeah, it's kind of scary: you tell it how many stack frames to look up and it will bring all those variables into scope for the evaluation.
    scalars_df, scalars_pandas_df = scalars_dfs

    bf_result = scalars_df.query(expr).to_pandas()
    pd_result = scalars_pandas_df.query(expr)

    pd.testing.assert_frame_equal(bf_result, pd_result)
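The @local_var case works because the pandas expression parser treats @name as a lookup in the calling stack frame; the level argument (set to 3 in the eval helper above, to skip the wrapper layers) controls where that lookup starts. A pandas-only illustration; the name threshold is invented for the example:

```python
import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6]})
threshold = 3  # an ordinary Python local in the calling frame

# '@threshold' asks the parser to resolve the name from the caller's
# scope rather than from the DataFrame's columns.
multiples = df.query("x % @threshold == 0")
print(multiples)  # rows where x is 3 or 6
```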
@pytest.mark.parametrize(
    ("subset", "normalize", "ascending", "dropna"),
    [
@@ -3,6 +3,8 @@

from typing import Callable, TYPE_CHECKING

from bigframes_vendored.pandas.core.dtypes.inference import iterable_not_string

if TYPE_CHECKING:
    from bigframes_vendored.pandas.pandas._typing import T


@@ -40,3 +42,27 @@ def pipe(
        return func(*args, **kwargs)
    else:
        return func(obj, *args, **kwargs)


def flatten(line):
    """
    Flatten an arbitrarily nested sequence.

    Parameters
    ----------
    line : sequence
        The non-string sequence to flatten.

    Notes
    -----
    This doesn't treat strings as sequences.

    Returns
    -------
    flattened : generator
    """
    for element in line:
        if iterable_not_string(element):
            yield from flatten(element)
        else:
            yield element

Review comment (on the yield from line): TIL. Edit: added in Python 3.3, https://docs.python.org/3/whatsnew/3.3.html#pep-380. I had to be compatible with Python 2.x for far too long, so I missed a lot of these features.
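A quick illustration of the generator above, assuming the vendored module is importable as shown in this PR:

```python
from bigframes_vendored.pandas.core.common import flatten

nested = [1, [2, (3, 4)], "keep-me"]

# Nested lists/tuples are expanded recursively; strings are yielded whole
# because iterable_not_string() treats them as atoms.
print(list(flatten(nested)))  # [1, 2, 3, 4, 'keep-me']
```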
@@ -0,0 +1,226 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/computation/align.py
"""
Core eval alignment algorithms.
"""
from __future__ import annotations

from functools import partial, wraps
from typing import Callable, TYPE_CHECKING
import warnings

import bigframes_vendored.pandas.core.common as com
from bigframes_vendored.pandas.core.computation.common import result_type_many
from bigframes_vendored.pandas.util._exceptions import find_stack_level
import numpy as np
from pandas.errors import PerformanceWarning

if TYPE_CHECKING:
    from collections.abc import Sequence

    from bigframes_vendored.pandas.core.generic import NDFrame
    from bigframes_vendored.pandas.core.indexes.base import Index
    from pandas._typing import F


def _align_core_single_unary_op(
    term,
) -> tuple[partial | type[NDFrame], dict[str, Index] | None]:
    typ: partial | type[NDFrame]
    axes: dict[str, Index] | None = None

    if isinstance(term.value, np.ndarray):
        typ = partial(np.asanyarray, dtype=term.value.dtype)
    else:
        typ = type(term.value)
        if hasattr(term.value, "axes"):
            axes = _zip_axes_from_type(typ, term.value.axes)

    return typ, axes


def _zip_axes_from_type(
    typ: type[NDFrame], new_axes: Sequence[Index]
) -> dict[str, Index]:
    return {name: new_axes[i] for i, name in enumerate(typ._AXIS_ORDERS)}


def _any_pandas_objects(terms) -> bool:
    """
    Check a sequence of terms for instances of PandasObject.
    """
    return any(is_pandas_object(term.value) for term in terms)


def _filter_special_cases(f) -> Callable[[F], F]:
    @wraps(f)
    def wrapper(terms):
        # single unary operand
        if len(terms) == 1:
            return _align_core_single_unary_op(terms[0])

        term_values = (term.value for term in terms)

        # we don't have any pandas objects
        if not _any_pandas_objects(terms):
            return result_type_many(*term_values), None

        return f(terms)

    return wrapper


@_filter_special_cases
def _align_core(terms):
    term_index = [i for i, term in enumerate(terms) if hasattr(term.value, "axes")]
    term_dims = [terms[i].value.ndim for i in term_index]

    from pandas import Series

    ndims = Series(dict(zip(term_index, term_dims)))

    # initial axes are the axes of the largest-axis'd term
    biggest = terms[ndims.idxmax()].value
    typ = biggest._constructor
    axes = biggest.axes
    naxes = len(axes)
    gt_than_one_axis = naxes > 1

    for value in (terms[i].value for i in term_index):
        value_is_series = is_series(value)
        is_series_and_gt_one_axis = value_is_series and gt_than_one_axis

        for axis, items in enumerate(value.axes):
            if is_series_and_gt_one_axis:
                ax, itm = naxes - 1, value.index
            else:
                ax, itm = axis, items

            if not axes[ax].is_(itm):
                axes[ax] = axes[ax].join(itm, how="outer")

    for i, ndim in ndims.items():
        for axis, items in zip(range(ndim), axes):
            ti = terms[i].value

            if hasattr(ti, "reindex"):
                transpose = is_series(ti) and naxes > 1
                reindexer = axes[naxes - 1] if transpose else items

                term_axis_size = len(ti.axes[axis])
                reindexer_size = len(reindexer)

                ordm = np.log10(max(1, abs(reindexer_size - term_axis_size)))
                if ordm >= 1 and reindexer_size >= 10000:
                    w = (
                        f"Alignment difference on axis {axis} is larger "
                        f"than an order of magnitude on term {repr(terms[i].name)}, "
                        f"by more than {ordm:.4g}; performance may suffer."
                    )
                    warnings.warn(
                        w, category=PerformanceWarning, stacklevel=find_stack_level()
                    )

                obj = ti.reindex(reindexer, axis=axis, copy=False)
                terms[i].update(obj)

        terms[i].update(terms[i].value.values)

    return typ, _zip_axes_from_type(typ, axes)
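The key step above is widening each axis to the union of the term indexes via Index.join(how="outer"), after which each term is reindexed against that union. In isolation:

```python
import pandas as pd

left = pd.Index(["a", "b", "c"])
right = pd.Index(["b", "c", "d"])

# The alignment loop replaces each axis with the outer join of the
# indexes it has seen, i.e. their union.
print(left.join(right, how="outer"))  # Index(['a', 'b', 'c', 'd'], dtype='object')
```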
def align_terms(terms):
    """
    Align a set of terms.
    """
    try:
        # flatten the parse tree (a nested list, really)
        terms = list(com.flatten(terms))
    except TypeError:
        # can't iterate so it must just be a constant or single variable
        if is_series_or_dataframe(terms.value):
            typ = type(terms.value)
            return typ, _zip_axes_from_type(typ, terms.value.axes)
        return np.result_type(terms.type), None

    # if all resolved variables are numeric scalars
    if all(term.is_scalar for term in terms):
        return result_type_many(*(term.value for term in terms)).type, None

    # perform the main alignment
    typ, axes = _align_core(terms)
    return typ, axes


def reconstruct_object(typ, obj, axes, dtype):
    """
    Reconstruct an object given its type, raw value, and possibly empty
    (None) axes.

    Parameters
    ----------
    typ : object
        A type
    obj : object
        The value to use in the type constructor
    axes : dict
        The axes to use to construct the resulting pandas object

    Returns
    -------
    ret : typ
        An object of type ``typ`` with the value `obj` and possible axes
        `axes`.
    """
    try:
        typ = typ.type
    except AttributeError:
        pass

    res_t = np.result_type(obj.dtype, dtype)

    if not isinstance(typ, partial) and is_pandas_type(typ):
        return typ(obj, dtype=res_t, **axes)

    # special case for pathological things like ~True/~False
    if hasattr(res_t, "type") and typ == np.bool_ and res_t != np.bool_:
        ret_value = res_t.type(obj)
    else:
        ret_value = typ(obj).astype(res_t)
        # The condition is to distinguish 0-dim array (returned in case of
        # scalar) and 1 element array
        # e.g. np.array(0) and np.array([0])
        if (
            len(obj.shape) == 1
            and len(obj) == 1
            and not isinstance(ret_value, np.ndarray)
        ):
            ret_value = np.array([ret_value]).astype(res_t)

    return ret_value
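reconstruct_object leans on NumPy's promotion rules (np.result_type) to pick a dtype that can represent both the raw result and the dtype the parser inferred, for example:

```python
import numpy as np

# np.result_type applies NumPy's type-promotion rules, which is how the
# reconstructed object ends up with a dtype that can hold both inputs.
print(np.result_type(np.int64, np.float64))  # float64
print(np.result_type(np.bool_, np.int8))     # int8
```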
# Custom to recognize BigFrames types
def is_series(obj) -> bool:
    from bigframes_vendored.pandas.core.series import Series

    return isinstance(obj, Series)


def is_series_or_dataframe(obj) -> bool:
    from bigframes_vendored.pandas.core.frame import NDFrame

    return isinstance(obj, NDFrame)


def is_pandas_object(obj) -> bool:
    from bigframes_vendored.pandas.core.frame import NDFrame
    from bigframes_vendored.pandas.core.indexes.base import Index

    return isinstance(obj, NDFrame) or isinstance(obj, Index)


def is_pandas_type(type) -> bool:
    from bigframes_vendored.pandas.core.frame import NDFrame
    from bigframes_vendored.pandas.core.indexes.base import Index

    return issubclass(type, NDFrame) or issubclass(type, Index)
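Taken together, this module reproduces for the vendored BigFrames objects what pandas eval does natively: operands with different indexes are aligned on the union of those indexes before the expression is computed. A plain-pandas illustration of that behavior:

```python
import pandas as pd

s1 = pd.Series([1.0, 2.0, 3.0], index=["a", "b", "c"])
s2 = pd.Series([10.0, 20.0], index=["b", "c"])

# The terms are aligned (outer join of indexes) before the expression is
# evaluated, so positions missing from one operand come back as NaN.
print(pd.eval("s1 + s2"))
# a     NaN
# b    12.0
# c    23.0
```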