Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

fix: infer narrowest numeric type when combining numeric columns #602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 17, 2024
Merged
88 changes: 75 additions & 13 deletions 88 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,7 @@ def unpivot(
*,
passthrough_columns: typing.Sequence[str] = (),
index_col_ids: typing.Sequence[str] = ["index"],
dtype: typing.Union[
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall this was added for matrix multiplication, is that right? Can you help me understand what this was for and why it's now safe to remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added so we could mix compatible numerics together in a single df.sum() or similar aggregation. You would manually pass the float dtype here and it would cast everything to that type. However, this isn't necessary as we can find the common supertype of the inputs and coerce to that automatically.

bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
] = pandas.Float64Dtype(),
how: typing.Literal["left", "right"] = "left",
join_side: typing.Literal["left", "right"] = "left",
) -> ArrayValue:
"""
Unpivot ArrayValue columns.
Expand All @@ -367,23 +364,88 @@ def unpivot(
unpivot_columns: Mapping of column id to list of input column ids. Lists of input columns may use None.
passthrough_columns: Columns that will not be unpivoted. Column id will be preserved.
index_col_id (str): The column id to be used for the row labels.
dtype (dtype or list of dtype): Dtype to use for the unpivot columns. If list, must be equal in number to unpivot_columns.

Returns:
ArrayValue: The unpivoted ArrayValue
"""
# There will be N labels, used to disambiguate which of N source columns produced each output row
explode_offsets_id = bigframes.core.guid.generate_guid("unpivot_offsets_")
labels_array = self._create_unpivot_labels_array(row_labels, index_col_ids)
labels_array = labels_array.promote_offsets(explode_offsets_id)

# Unpivot creates N output rows for each input row, labels disambiguate these N rows
joined_array = self._cross_join_w_labels(labels_array, join_side)

# Build the output rows as a case statement that selects between the N input columns
unpivot_exprs = []
# Supports producing multiple stacked output columns for stacking only part of hierarchical index
for col_id, input_ids in unpivot_columns:
# row explode offset used to choose the input column
# we use offset instead of label as labels are not necessarily unique
cases = tuple(
(
ops.eq_op.as_expr(explode_offsets_id, ex.const(i)),
ex.free_var(id_or_null)
if (id_or_null is not None)
else ex.const(None),
)
for i, id_or_null in enumerate(input_ids)
)
col_expr = ops.case_when_op.as_expr(*cases)
unpivot_exprs.append((col_expr, col_id))

label_exprs = ((ex.free_var(id), id) for id in index_col_ids)
# passthrough columns are unchanged, just repeated N times each
passthrough_exprs = ((ex.free_var(id), id) for id in passthrough_columns)
return ArrayValue(
nodes.UnpivotNode(
child=self.node,
row_labels=tuple(row_labels),
unpivot_columns=tuple(unpivot_columns),
passthrough_columns=tuple(passthrough_columns),
index_col_ids=tuple(index_col_ids),
dtype=dtype,
how=how,
nodes.ProjectionNode(
child=joined_array.node,
assignments=(*label_exprs, *unpivot_exprs, *passthrough_exprs),
)
)

def _cross_join_w_labels(
    self, labels_array: ArrayValue, join_side: typing.Literal["left", "right"]
) -> ArrayValue:
    """Expand self to N rows per input row, one row per label in labels_array.

    Implemented as an unconditional cross join. ``join_side`` controls which
    operand is treated as the left side of the join (and therefore which
    side's rows lead the output ordering).
    """
    if join_side == "left":
        table_side = join_def.JoinSide.LEFT
    else:
        table_side = join_def.JoinSide.RIGHT
    labels_side = table_side.inverse()

    # Pass every column of both inputs through unchanged; labels columns
    # are listed first, matching the original mapping order.
    mappings = tuple(
        join_def.JoinColumnMapping(labels_side, col, col)
        for col in labels_array.schema.names
    ) + tuple(
        join_def.JoinColumnMapping(table_side, col, col)
        for col in self.schema.names
    )
    cross_join = join_def.JoinDefinition(
        conditions=(), mappings=mappings, type="cross"
    )
    if join_side == "left":
        return self.join(labels_array, join_def=cross_join)
    return labels_array.join(self, join_def=cross_join)

def _create_unpivot_labels_array(
    self,
    former_column_labels: typing.Sequence[typing.Hashable],
    col_ids: typing.Sequence[str],
) -> ArrayValue:
    """Build an ArrayValue with one row per former column label.

    Each label (scalar or tuple) is spread across the ``col_ids`` columns,
    producing the label table that the unpivot cross-join repeats per row.
    """
    records = []
    for label in former_column_labels:
        # Normalize scalar labels to 1-tuples so positional indexing
        # below is uniform for flat and hierarchical column labels.
        label_tuple = label if isinstance(label, tuple) else (label,)
        records.append(
            {col_id: label_tuple[i] for i, col_id in enumerate(col_ids)}
        )

    return ArrayValue.from_pyarrow(pa.Table.from_pylist(records), session=self.session)

def join(
self,
other: ArrayValue,
Expand Down
2 changes: 1 addition & 1 deletion 2 bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,5 +857,5 @@ def _idx_extrema(
# Stack the entire column axis to produce single-column result
# Assumption: uniform dtype for stackability
return block.aggregate_all_and_stack(
agg_ops.AnyValueOp(), dtype=block.dtypes[0]
agg_ops.AnyValueOp(),
).with_column_labels([original_block.index.name])
16 changes: 2 additions & 14 deletions 16 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,9 +914,6 @@ def aggregate_all_and_stack(
axis: int | str = 0,
value_col_id: str = "values",
dropna: bool = True,
dtype: typing.Union[
bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
] = pd.Float64Dtype(),
) -> Block:
axis_n = utils.get_axis_number(axis)
if axis_n == 0:
Expand All @@ -931,7 +928,6 @@ def aggregate_all_and_stack(
row_labels=self.column_labels.to_list(),
index_col_ids=index_col_ids,
unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]),
dtype=dtype,
)
return Block(
result_expr,
Expand All @@ -949,7 +945,6 @@ def aggregate_all_and_stack(
index_col_ids=[guid.generate_guid()],
unpivot_columns=[(value_col_id, tuple(self.value_columns))],
passthrough_columns=[*self.index_columns, offset_col],
dtype=dtype,
)
index_aggregations = [
(ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id)
Expand Down Expand Up @@ -1512,22 +1507,18 @@ def stack(self, how="left", levels: int = 1):

# Get matching columns
unpivot_columns: List[Tuple[str, List[str]]] = []
dtypes = []
for val in result_col_labels:
col_id = guid.generate_guid("unpivot_")
input_columns, dtype = self._create_stack_column(val, row_label_tuples)
unpivot_columns.append((col_id, input_columns))
if dtype:
dtypes.append(dtype or pd.Float64Dtype())

added_index_columns = [guid.generate_guid() for _ in range(row_labels.nlevels)]
unpivot_expr = self._expr.unpivot(
row_labels=row_label_tuples,
passthrough_columns=self.index_columns,
unpivot_columns=unpivot_columns,
index_col_ids=added_index_columns,
dtype=tuple(dtypes),
how=how,
join_side=how,
)
new_index_level_names = self.column_labels.names[-levels:]
if how == "left":
Expand Down Expand Up @@ -1559,15 +1550,12 @@ def melt(
value_labels = [self.col_id_to_label[col_id] for col_id in value_vars]
id_labels = [self.col_id_to_label[col_id] for col_id in id_vars]

dtype = self._expr.get_column_type(value_vars[0])

unpivot_expr = self._expr.unpivot(
row_labels=value_labels,
passthrough_columns=id_vars,
unpivot_columns=(unpivot_col,),
index_col_ids=var_col_ids,
dtype=dtype,
how="right",
join_side="right",
)
index_id = guid.generate_guid()
unpivot_expr = unpivot_expr.promote_offsets(index_id)
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.