Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: Series binary ops compatible with more types #618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions 12 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,6 @@ def _compiled_schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

def validate_schema(self):
tree_derived = self.node.schema
ibis_derived = self._compiled_schema
if tree_derived.names != ibis_derived.names:
raise ValueError(
f"Unexpected names internal {tree_derived.names} vs compiled {ibis_derived.names}"
)
if tree_derived.dtypes != ibis_derived.dtypes:
raise ValueError(
f"Unexpected types internal {tree_derived.dtypes} vs compiled {ibis_derived.dtypes}"
)

def _try_evaluate_local(self):
"""Use only for unit testing paths - not fully featured. Will throw exception if fails."""
import ibis
Expand Down
2 changes: 1 addition & 1 deletion 2 bigframes/core/compile/scalar_op_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ def clip_op(


@scalar_op_compiler.register_nary_op(ops.case_when_op)
def switch_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
# ibis can handle most type coercions, but we need to force bool -> int
# TODO: dispatch coercion depending on bigframes dtype schema
result_values = cases_and_outputs[1::2]
Expand Down
29 changes: 24 additions & 5 deletions 29 bigframes/core/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,22 @@
import bigframes.series as series


def to_bf_series(obj, default_index: Optional[index.Index]) -> series.Series:
def is_series_convertible(obj) -> bool:
if isinstance(obj, series.Series):
return True
if isinstance(obj, pd.Series):
return True
if isinstance(obj, index.Index):
return True
if isinstance(obj, pd.Index):
return True
if pd.api.types.is_list_like(obj):
return True
else:
return False


def to_bf_series(obj, default_index: Optional[index.Index], session) -> series.Series:
"""
Convert a an object to a bigframes series

Expand All @@ -37,13 +52,15 @@ def to_bf_series(obj, default_index: Optional[index.Index]) -> series.Series:
if isinstance(obj, series.Series):
return obj
if isinstance(obj, pd.Series):
return series.Series(obj)
return series.Series(obj, session=session)
if isinstance(obj, index.Index):
return series.Series(obj, default_index)
return series.Series(obj, default_index, session=session)
if isinstance(obj, pd.Index):
return series.Series(obj, default_index)
return series.Series(obj, default_index, session=session)
if pd.api.types.is_dict_like(obj):
return series.Series(obj, session=session)
if pd.api.types.is_list_like(obj):
return series.Series(obj, default_index)
return series.Series(obj, default_index, session=session)
else:
raise TypeError(f"Cannot interpret {obj} as series.")

Expand All @@ -69,6 +86,8 @@ def to_pd_series(obj, default_index: pd.Index) -> pd.Series:
return pd.Series(obj.to_pandas(), default_index)
if isinstance(obj, pd.Index):
return pd.Series(obj, default_index)
if pd.api.types.is_dict_like(obj):
return pd.Series(obj)
if pd.api.types.is_list_like(obj):
return pd.Series(obj, default_index)
else:
Expand Down
7 changes: 6 additions & 1 deletion 7 bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ def __init__(
raise ValueError(
f"DataFrame constructor only supports copy=True. {constants.FEEDBACK_LINK}"
)
# just ignore object dtype if provided
if dtype in {numpy.dtypes.ObjectDType, "object"}:
dtype = None

# Check to see if constructing from BigQuery-backed objects before
# falling back to pandas constructor
Expand Down Expand Up @@ -668,7 +671,9 @@ def _apply_binop(
DataFrame(other), op, how=how, reverse=reverse
)
elif utils.get_axis_number(axis) == 0:
bf_series = bigframes.core.convert.to_bf_series(other, self.index)
bf_series = bigframes.core.convert.to_bf_series(
other, self.index, self._session
)
return self._apply_series_binop_axis_0(bf_series, op, how, reverse)
elif utils.get_axis_number(axis) == 1:
pd_series = bigframes.core.convert.to_pd_series(other, self.columns)
Expand Down
128 changes: 79 additions & 49 deletions 128 bigframes/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import typing

import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
import numpy
import pandas as pd

import bigframes.constants as constants
import bigframes.core.blocks as blocks
import bigframes.core.convert
import bigframes.core.expression as ex
import bigframes.core.indexes as indexes
import bigframes.core.scalar as scalars
Expand All @@ -44,7 +46,19 @@ def __init__(
*,
session: typing.Optional[bigframes.session.Session] = None,
):
block = None
import bigframes.pandas

# just ignore object dtype if provided
if dtype in {numpy.dtypes.ObjectDType, "object"}:
dtype = None

read_pandas_func = (
session.read_pandas
if (session is not None)
else (lambda x: bigframes.pandas.read_pandas(x))
)

block: typing.Optional[blocks.Block] = None
if copy is not None and not copy:
raise ValueError(
f"Series constructor only supports copy=True. {constants.FEEDBACK_LINK}"
Expand All @@ -55,58 +69,75 @@ def __init__(
assert index is None
block = data

elif isinstance(data, SeriesMethods):
block = data._block
# interpret these cases as both index and data
elif (
isinstance(data, SeriesMethods)
or isinstance(data, pd.Series)
or pd.api.types.is_dict_like(data)
):
if isinstance(data, pd.Series):
data = read_pandas_func(data)
elif pd.api.types.is_dict_like(data):
data = read_pandas_func(pd.Series(data, dtype=dtype)) # type: ignore
dtype = None
data_block = data._block
if index is not None:
# reindex
bf_index = indexes.Index(index)
bf_index = indexes.Index(index, session=session)
idx_block = bf_index._block
idx_cols = idx_block.value_columns
block_idx, _ = idx_block.join(block, how="left")
block = block_idx.with_index_labels(bf_index.names)

elif isinstance(data, indexes.Index):
block_idx, _ = idx_block.join(data_block, how="left")
data_block = block_idx.with_index_labels(bf_index.names)
block = data_block

# list-like data that will get default index
elif isinstance(data, indexes.Index) or pd.api.types.is_list_like(data):
data = indexes.Index(data, dtype=dtype, session=session)
dtype = (
None # set to none as it has already been applied, avoid re-cast later
)
if data.nlevels != 1:
raise NotImplementedError("Cannot interpret multi-index as Series.")
# Reset index to promote index columns to value columns, set default index
block = data._block.reset_index(drop=False)
data_block = data._block.reset_index(drop=False).with_column_labels(
data.names
)
if index is not None:
# Align by offset
bf_index = indexes.Index(index)
idx_block = bf_index._block.reset_index(drop=False)
bf_index = indexes.Index(index, session=session)
idx_block = bf_index._block.reset_index(
drop=False
) # reset to align by offsets, and then reset back
idx_cols = idx_block.value_columns
block, (l_mapping, _) = idx_block.join(block, how="left")
block = block.set_index([l_mapping[col] for col in idx_cols])
block = block.with_index_labels(bf_index.names)

if block:
if name:
if not isinstance(name, typing.Hashable):
raise ValueError(
f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}"
)
block = block.with_column_labels([name])
if dtype:
block = block.multi_apply_unary_op(
block.value_columns, ops.AsTypeOp(to_type=dtype)
)
else:
import bigframes.pandas
data_block, (l_mapping, _) = idx_block.join(data_block, how="left")
data_block = data_block.set_index([l_mapping[col] for col in idx_cols])
data_block = data_block.with_index_labels(bf_index.names)
block = data_block

pd_series = pd.Series(
data=data, index=index, dtype=dtype, name=name # type:ignore
)
pd_dataframe = pd_series.to_frame()
if pd_series.name is None:
# to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
if session:
block = session.read_pandas(pd_dataframe)._get_block()
else: # Scalar case
if index is not None:
bf_index = indexes.Index(index, session=session)
else:
# Uses default global session
block = bigframes.pandas.read_pandas(pd_dataframe)._get_block()
if pd_series.name is None:
block = block.with_column_labels([None])
bf_index = indexes.Index(
[] if (data is None) else [0],
session=session,
dtype=bigframes.dtypes.INT_DTYPE,
)
block, _ = bf_index._block.create_constant(data, dtype)
dtype = None
block = block.with_column_labels([name])

assert block is not None
if name:
if not isinstance(name, typing.Hashable):
raise ValueError(
f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}"
)
block = block.with_column_labels([name])
if dtype:
block = block.multi_apply_unary_op(
block.value_columns, ops.AsTypeOp(to_type=dtype)
)
self._block: blocks.Block = block

@property
Expand Down Expand Up @@ -145,17 +176,16 @@ def _apply_binary_op(
reverse: bool = False,
) -> series.Series:
"""Applies a binary operator to the series and other."""
if isinstance(other, pd.Series):
# TODO: Convert to BigQuery DataFrames series
raise NotImplementedError(
f"Pandas series not supported as operand. {constants.FEEDBACK_LINK}"
if bigframes.core.convert.is_series_convertible(other):
self_index = indexes.Index(self._block)
other_series = bigframes.core.convert.to_bf_series(
other, self_index, self._block.session
)
if isinstance(other, series.Series):
(self_col, other_col, block) = self._align(other, how=alignment)
(self_col, other_col, block) = self._align(other_series, how=alignment)

name = self._name
if (
isinstance(other, series.Series)
hasattr(other, "name")
and other.name != self._name
and alignment == "outer"
):
Expand All @@ -166,7 +196,7 @@ def _apply_binary_op(
block, result_id = block.project_expr(expr, name)
return series.Series(block.select_column(result_id))

else:
else: # Scalar binop
name = self._name
expr = op.as_expr(
ex.const(other) if reverse else self._value_column,
Expand Down
9 changes: 0 additions & 9 deletions 9 bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import inspect
import itertools
import numbers
import os
import textwrap
import typing
from typing import Any, cast, Literal, Mapping, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -73,11 +72,6 @@ def __init__(self, *args, **kwargs):
self._query_job: Optional[bigquery.QueryJob] = None
super().__init__(*args, **kwargs)

# Runs strict validations to ensure internal type predictions and ibis are completely in sync
# Do not execute these validations outside of testing suite.
if "PYTEST_CURRENT_TEST" in os.environ:
self._block.expr.validate_schema()

@property
def dt(self) -> dt.DatetimeMethods:
return dt.DatetimeMethods(self._block)
Expand Down Expand Up @@ -812,9 +806,6 @@ def combine_first(self, other: Series) -> Series:
return result

def update(self, other: Union[Series, Sequence, Mapping]) -> None:
import bigframes.core.convert

other = bigframes.core.convert.to_bf_series(other, default_index=None)
result = self._apply_binary_op(
other, ops.coalesce_op, reverse=True, alignment="left"
)
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.