Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

perf: Cache transpose to allow performant retranspose #635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 58 additions & 22 deletions 80 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import os
import random
import typing
from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple
from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union
import warnings

import google.cloud.bigquery as bigquery
Expand Down Expand Up @@ -105,6 +105,8 @@ def __init__(
index_columns: Iterable[str],
column_labels: typing.Union[pd.Index, typing.Iterable[Label]],
index_labels: typing.Union[pd.Index, typing.Iterable[Label], None] = None,
*,
transpose_cache: Optional[Block] = None,
):
"""Construct a block object, will create default index if no index columns specified."""
index_columns = list(index_columns)
Expand Down Expand Up @@ -144,6 +146,7 @@ def __init__(
# TODO(kemppeterson) Add a cache for corr to parallel the single-column stats.

self._stats_cache[" ".join(self.index_columns)] = {}
self._transpose_cache: Optional[Block] = transpose_cache

@classmethod
def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
Expand Down Expand Up @@ -716,6 +719,15 @@ def with_column_labels(
index_labels=self.index.names,
)

def with_transpose_cache(self, transposed: Block):
return Block(
self._expr,
index_columns=self.index_columns,
column_labels=self._column_labels,
index_labels=self.index.names,
transpose_cache=transposed,
)

def with_index_labels(self, value: typing.Sequence[Label]) -> Block:
if len(value) != len(self.index_columns):
raise ValueError(
Expand Down Expand Up @@ -804,18 +816,35 @@ def multi_apply_window_op(
def multi_apply_unary_op(
self,
columns: typing.Sequence[str],
op: ops.UnaryOp,
op: Union[ops.UnaryOp, ex.Expression],
) -> Block:
if isinstance(op, ops.UnaryOp):
input_varname = guid.generate_guid()
expr = op.as_expr(input_varname)
else:
input_varnames = op.unbound_variables
assert len(input_varnames) == 1
expr = op
input_varname = input_varnames[0]

block = self
for i, col_id in enumerate(columns):
for col_id in columns:
label = self.col_id_to_label[col_id]
block, result_id = block.apply_unary_op(
col_id,
op,
result_label=label,
block, result_id = block.project_expr(
expr.bind_all_variables({input_varname: ex.free_var(col_id)}),
label=label,
)
block = block.copy_values(result_id, col_id)
block = block.drop_columns([result_id])
# Special case, we can preserve transpose cache for full-frame unary ops
if (self._transpose_cache is not None) and set(self.value_columns) == set(
columns
):
transpose_columns = self._transpose_cache.value_columns
new_transpose_cache = self._transpose_cache.multi_apply_unary_op(
transpose_columns, op
)
block = block.with_transpose_cache(new_transpose_cache)
return block

def apply_window_op(
Expand Down Expand Up @@ -922,20 +951,17 @@ def aggregate_all_and_stack(
(ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id)
for col_id in self.value_columns
]
index_col_ids = [
guid.generate_guid() for i in range(self.column_labels.nlevels)
]
result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
row_labels=self.column_labels.to_list(),
index_col_ids=index_col_ids,
unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]),
)
index_id = guid.generate_guid()
result_expr = self.expr.aggregate(
aggregations, dropna=dropna
).assign_constant(index_id, None, None)
# Transpose as last operation so that final block has valid transpose cache
return Block(
result_expr,
index_columns=index_col_ids,
column_labels=[None],
index_labels=self.column_labels.names,
)
index_columns=[index_id],
column_labels=self.column_labels,
index_labels=[None],
).transpose(original_row_index=pd.Index([None]))
else: # axis_n == 1
# using offsets as identity to group on.
# TODO: Allow to promote identity/total_order columns instead for better perf
Expand Down Expand Up @@ -1575,10 +1601,19 @@ def melt(
index_columns=[index_id],
)

def transpose(self) -> Block:
"""Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows"""
def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block:
"""Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows.
Can provide the original_row_index directly if it is already known, otherwise a query is needed.
"""
if self._transpose_cache is not None:
return self._transpose_cache.with_transpose_cache(self)

original_col_index = self.column_labels
original_row_index = self.index.to_pandas()
original_row_index = (
original_row_index
if original_row_index is not None
else self.index.to_pandas()
)
original_row_count = len(original_row_index)
if original_row_count > bigframes.constants.MAX_COLUMNS:
raise NotImplementedError(
Expand Down Expand Up @@ -1619,6 +1654,7 @@ def transpose(self) -> Block:
result.with_column_labels(original_row_index)
.order_by([ordering.ascending_over(result.index_columns[-1])])
.drop_levels([result.index_columns[-1]])
.with_transpose_cache(self)
)

def _create_stack_column(
Expand Down
17 changes: 7 additions & 10 deletions 17 bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,17 +823,14 @@ def to_sql(
for col in baked_ir.column_ids
]
selection = ", ".join(map(lambda col_id: f"`{col_id}`", output_columns))
order_by_clause = baked_ir._ordering_clause(
baked_ir._ordering.all_ordering_columns
)

sql = textwrap.dedent(
f"SELECT {selection}\n"
"FROM (\n"
f"{sql}\n"
")\n"
f"{order_by_clause}\n"
)
sql = textwrap.dedent(f"SELECT {selection}\n" "FROM (\n" f"{sql}\n" ")\n")
# Single row frames may not have any ordering columns
if len(baked_ir._ordering.all_ordering_columns) > 0:
order_by_clause = baked_ir._ordering_clause(
baked_ir._ordering.all_ordering_columns
)
sql += f"{order_by_clause}\n"
else:
sql = ibis_bigquery.Backend().compile(
self._to_ibis_expr(
Expand Down
2 changes: 2 additions & 0 deletions 2 bigframes/core/ordering.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _truncate_ordering(
truncated_refs.append(order_part)
if columns_seen.issuperset(must_see):
return tuple(truncated_refs)
if len(must_see) == 0:
return ()
raise ValueError("Ordering did not contain all total_order_cols")

def with_reverse(self):
Expand Down
47 changes: 24 additions & 23 deletions 47 bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,18 +693,19 @@ def _apply_binop(
def _apply_scalar_binop(
self, other: float | int, op: ops.BinaryOp, reverse: bool = False
) -> DataFrame:
block = self._block
for column_id, label in zip(
self._block.value_columns, self._block.column_labels
):
expr = (
op.as_expr(ex.const(other), column_id)
if reverse
else op.as_expr(column_id, ex.const(other))
if reverse:
expr = op.as_expr(
left_input=ex.const(other),
right_input=bigframes.core.guid.generate_guid(),
)
block, _ = block.project_expr(expr, label)
block = block.drop_columns([column_id])
return DataFrame(block)
else:
expr = op.as_expr(
left_input=bigframes.core.guid.generate_guid(),
right_input=ex.const(other),
)
return DataFrame(
self._block.multi_apply_unary_op(self._block.value_columns, expr)
)

def _apply_series_binop_axis_0(
self,
Expand Down Expand Up @@ -1974,7 +1975,7 @@ def any(
else:
frame = self._drop_non_bool()
block = frame._block.aggregate_all_and_stack(agg_ops.any_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def all(
self, axis: typing.Union[str, int] = 0, *, bool_only: bool = False
Expand All @@ -1984,7 +1985,7 @@ def all(
else:
frame = self._drop_non_bool()
block = frame._block.aggregate_all_and_stack(agg_ops.all_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def sum(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand All @@ -1994,7 +1995,7 @@ def sum(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.sum_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def mean(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand All @@ -2004,7 +2005,7 @@ def mean(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.mean_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def median(
self, *, numeric_only: bool = False, exact: bool = True
Expand All @@ -2019,7 +2020,7 @@ def median(
return result
else:
block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def quantile(
self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
Expand Down Expand Up @@ -2052,7 +2053,7 @@ def std(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.std_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def var(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand All @@ -2062,7 +2063,7 @@ def var(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.var_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def min(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand All @@ -2072,7 +2073,7 @@ def min(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.min_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def max(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand All @@ -2082,7 +2083,7 @@ def max(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.max_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def prod(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand All @@ -2092,7 +2093,7 @@ def prod(
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.product_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

product = prod
product.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.prod)
Expand All @@ -2103,11 +2104,11 @@ def count(self, *, numeric_only: bool = False) -> bigframes.series.Series:
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.count_op)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def nunique(self) -> bigframes.series.Series:
block = self._block.aggregate_all_and_stack(agg_ops.nunique_op)
return bigframes.series.Series(block.select_column("values"))
return bigframes.series.Series(block)

def agg(
self, func: str | typing.Sequence[str]
Expand Down
14 changes: 14 additions & 0 deletions 14 tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2488,6 +2488,20 @@ def test_df_transpose_error():
dataframe.DataFrame([[1, "hello"], [2, "world"]]).transpose()


def test_df_transpose_repeated_uses_cache():
bf_df = dataframe.DataFrame([[1, 2.5], [2, 3.5]])
pd_df = pandas.DataFrame([[1, 2.5], [2, 3.5]])
# Transposing many times so that operation will fail from complexity if not using cache
for i in range(10):
# Cache still works even with simple scalar binop
bf_df = bf_df.transpose() + i
pd_df = pd_df.transpose() + i

pd.testing.assert_frame_equal(
pd_df, bf_df.to_pandas(), check_dtype=False, check_index_type=False
)


@pytest.mark.parametrize(
("ordered"),
[
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.