Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: add DefaultIndexKind.NULL to use as index_col in read_gbq*, creating an indexless DataFrame/Series #662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
aaa545b
feat: Support indexless dataframe/series
TrevorBergeron May 6, 2024
7f11946
Merge remote-tracking branch 'github/main' into null_index
TrevorBergeron May 7, 2024
9a5b212
fixes for kurt, skew, median
TrevorBergeron May 8, 2024
0248150
fix unit tests
TrevorBergeron May 8, 2024
26e2d4f
Merge remote-tracking branch 'github/main' into null_index
TrevorBergeron May 8, 2024
16e292b
fix more issues
TrevorBergeron May 8, 2024
5611a86
fix defaulting to primary key logic
TrevorBergeron May 8, 2024
8caa068
Merge remote-tracking branch 'github/main' into null_index
TrevorBergeron May 9, 2024
ea9b120
fix tests
TrevorBergeron May 9, 2024
88fc037
Merge remote-tracking branch 'github/main' into null_index
TrevorBergeron May 15, 2024
27d6f47
many small changes
TrevorBergeron May 15, 2024
75b1fd1
fix accidental null indexes and raising warning
TrevorBergeron May 16, 2024
0b26bbb
Merge remote-tracking branch 'github/main' into null_index
TrevorBergeron May 16, 2024
7142078
fix df quantile index
TrevorBergeron May 16, 2024
7b5f4f6
Merge remote-tracking branch 'github/main' into null_index
TrevorBergeron May 17, 2024
bc28bd4
disable legacy pandas for some tests, add concat test
TrevorBergeron May 17, 2024
bd0aa12
fix series repr
TrevorBergeron May 17, 2024
5efcc27
Update bigframes/session/__init__.py
TrevorBergeron May 17, 2024
4b487e7
Update bigframes/core/rewrite.py
TrevorBergeron May 17, 2024
3892241
Update bigframes/core/rewrite.py
TrevorBergeron May 17, 2024
09af424
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 17, 2024
1164faf
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 17, 2024
8844f27
Merge branch 'null_index' of https://github.com/googleapis/python-big…
gcf-owl-bot[bot] May 17, 2024
600d500
pr comments addressed
TrevorBergeron May 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions 13 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,19 @@ def join(
return ArrayValue(bigframes.core.rewrite.maybe_rewrite_join(join_node))
return ArrayValue(join_node)

def try_align_as_projection(
    self,
    other: ArrayValue,
    join_type: join_def.JoinType,
    mappings: typing.Tuple[join_def.JoinColumnMapping, ...],
) -> typing.Optional[ArrayValue]:
    """Attempt to express the join of two expressions as a simple projection.

    Both sides are squashed to selections over their root nodes; if the
    rewriter can merge them (they share a compatible root), the merged
    projection is returned, otherwise None.
    """
    lhs = bigframes.core.rewrite.SquashedSelect.from_node(self.node)
    rhs = bigframes.core.rewrite.SquashedSelect.from_node(other.node)
    merged = lhs.maybe_merge(rhs, join_type, mappings)
    # A failed merge means a real relational join is required.
    return ArrayValue(merged.expand()) if merged is not None else None

def explode(self, column_ids: typing.Sequence[str]) -> ArrayValue:
assert len(column_ids) > 0
for column_id in column_ids:
Expand Down
19 changes: 12 additions & 7 deletions 19 bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,11 @@ def skew(

block = block.select_columns(skew_ids).with_column_labels(column_labels)
if not grouping_column_ids:
# When ungrouped, stack everything into single column so can be returned as series
block = block.stack()
block = block.drop_levels([block.index_columns[0]])
# When ungrouped, transpose result row into a series
# perform transpose last, so as to not invalidate cache
block, index_col = block.create_constant(None, None)
block = block.set_index([index_col])
return block.transpose(original_row_index=pd.Index([None]))
return block


Expand Down Expand Up @@ -637,9 +639,11 @@ def kurt(

block = block.select_columns(kurt_ids).with_column_labels(column_labels)
if not grouping_column_ids:
# When ungrouped, stack everything into single column so can be returned as series
block = block.stack()
block = block.drop_levels([block.index_columns[0]])
# When ungrouped, transpose result row into a series
# perform transpose last, so as to not invalidate cache
block, index_col = block.create_constant(None, None)
block = block.set_index([index_col])
return block.transpose(original_row_index=pd.Index([None]))
return block


Expand Down Expand Up @@ -820,7 +824,8 @@ def idxmax(block: blocks.Block) -> blocks.Block:
def _idx_extrema(
block: blocks.Block, min_or_max: typing.Literal["min", "max"]
) -> blocks.Block:
if len(block.index_columns) != 1:
block._throw_if_null_index("idx")
if len(block.index_columns) > 1:
# TODO: Need support for tuple dtype
raise NotImplementedError(
f"idxmin not support for multi-index. {constants.FEEDBACK_LINK}"
Expand Down
135 changes: 112 additions & 23 deletions 135 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import bigframes.core.utils as utils
import bigframes.core.window_spec as window_specs
import bigframes.dtypes
import bigframes.exceptions
import bigframes.features
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
Expand Down Expand Up @@ -120,19 +121,11 @@ def __init__(
f"'index_columns' (size {len(index_columns)}) and 'index_labels' (size {len(index_labels)}) must have equal length"
)

# If no index columns are set, create one.
#
# Note: get_index_cols in
# bigframes/session/_io/bigquery/read_gbq_table.py depends on this
# being as sequential integer index column. If this default behavior
# ever changes, please also update get_index_cols so
# that users who explicitly request a sequential integer index can
# still get one.
if len(index_columns) == 0:
tswast marked this conversation as resolved.
Show resolved Hide resolved
new_index_col_id = guid.generate_guid()
expr = expr.promote_offsets(new_index_col_id)
index_columns = [new_index_col_id]

warnings.warn(
"Creating object with Null Index. Null Index is a preview feature.",
category=bigframes.exceptions.PreviewWarning,
)
self._index_columns = tuple(index_columns)
# Index labels don't need complicated hierarchical access so can store as tuple
self._index_labels = (
Expand Down Expand Up @@ -517,7 +510,8 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):

Warning: This method modifies ``df`` inplace.
"""
if self.index_columns:
# Note: If BigQuery DataFrame has null index, a default one will be created for the local materialization.
if len(self.index_columns) > 0:
df.set_index(list(self.index_columns), inplace=True)
# Pandas names is annotated as list[str] rather than the more
# general Sequence[Label] that BigQuery DataFrames has.
Expand Down Expand Up @@ -1093,16 +1087,25 @@ def aggregate(
aggregate_labels = self._get_labels_for_columns(
[agg[0] for agg in aggregations]
)

names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
if len(by_column_ids) == 0:
label_id = guid.generate_guid()
result_expr = result_expr.assign_constant(label_id, 0, pd.Int64Dtype())
index_columns = (label_id,)
names = [None]
else:
index_columns = tuple(by_column_ids) # type: ignore
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])

return (
Block(
result_expr,
index_columns=by_column_ids,
index_columns=index_columns,
column_labels=aggregate_labels,
index_labels=names,
),
Expand Down Expand Up @@ -1256,11 +1259,12 @@ def explode(
expr = self.expr.explode(column_ids)

if ignore_index:
new_index_ids = guid.generate_guid()
return Block(
expr.drop_columns(self.index_columns),
expr.drop_columns(self.index_columns).promote_offsets(new_index_ids),
column_labels=self.column_labels,
# Initiates default index creation using the block constructor.
index_columns=[],
index_columns=[new_index_ids],
)
else:
return Block(
Expand Down Expand Up @@ -1423,7 +1427,8 @@ def retrieve_repr_request_results(
computed_df, query_job = head_block.to_pandas()
formatted_df = computed_df.set_axis(self.column_labels, axis=1)
# we reset the axis and substitute the bf index name(s) for the default
formatted_df.index.names = self.index.names # type: ignore
if len(self.index.names) > 0:
formatted_df.index.names = self.index.names # type: ignore
return formatted_df, count, query_job

def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
Expand Down Expand Up @@ -1907,9 +1912,26 @@ def join(
other: Block,
*,
how="left",
sort=False,
sort: bool = False,
block_identity_join: bool = False,
) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
"""
Join two blocks objects together, and provide mappings between source columns and output columns.

Args:
other (Block):
The right operand of the join operation
how (str):
Describes the join type. 'inner', 'outer', 'left', or 'right'
sort (bool):
if true will sort result by index
block_identity_join (bool):
If true, will not convert join to a projection (implicitly assuming unique indices)

Returns:
Block, (left_mapping, right_mapping): Result block and mappers from input column ids to result column ids.
"""

if not isinstance(other, Block):
# TODO(swast): We need to improve this error message to be more
# actionable for the user. For example, it's possible they
Expand All @@ -1923,6 +1945,16 @@ def join(
raise NotImplementedError(
f"Only how='outer','left','right','inner' currently supported. {constants.FEEDBACK_LINK}"
)
# Special case for null index,
if (
(self.index.nlevels == other.index.nlevels == 0)
and not sort
and not block_identity_join
):
return join_indexless(self, other, how=how)

self._throw_if_null_index("join")
other._throw_if_null_index("join")
if self.index.nlevels == other.index.nlevels == 1:
return join_mono_indexed(
self, other, how=how, sort=sort, block_identity_join=block_identity_join
Expand Down Expand Up @@ -2071,6 +2103,12 @@ def _is_monotonic(
self._stats_cache[column_name].update({op_name: result})
return result

def _throw_if_null_index(self, opname: str):
if len(self.index_columns) == 0:
raise bigframes.exceptions.NullIndexError(
f"Cannot do {opname} without an index. Set an index using set_index."
)

def _get_rows_as_json_values(self) -> Block:
# We want to preserve any ordering currently present before turning to
# direct SQL manipulation. We will restore the ordering when we rebuild
Expand Down Expand Up @@ -2211,6 +2249,10 @@ def __repr__(self) -> str:

def to_pandas(self) -> pd.Index:
"""Executes deferred operations and downloads the results."""
if len(self.column_ids) == 0:
raise bigframes.exceptions.NullIndexError(
"Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
)
# Project down to only the index column. So the query can be cached to visualize other data.
index_columns = list(self._block.index_columns)
dtypes = dict(zip(index_columns, self.dtypes))
Expand Down Expand Up @@ -2252,6 +2294,53 @@ def is_uniquely_named(self: BlockIndexProperties):
return len(set(self.names)) == len(self.names)


def join_indexless(
    left: Block,
    right: Block,
    *,
    how="left",
) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]:
    """Join two null-index blocks by aligning them as a projection.

    Returns the combined (still indexless) block plus mappers from each
    side's source column ids to the ids in the result.
    """

    def _fresh_mappings(side, expr):
        # Every source column gets a newly generated destination id.
        return [
            join_defs.JoinColumnMapping(
                source_table=side,
                source_id=col_id,
                destination_id=guid.generate_guid(),
            )
            for col_id in expr.column_ids
        ]

    left_mappings = _fresh_mappings(join_defs.JoinSide.LEFT, left.expr)
    right_mappings = _fresh_mappings(join_defs.JoinSide.RIGHT, right.expr)

    combined_expr = left.expr.try_align_as_projection(
        right.expr,
        join_type=how,
        mappings=(*left_mappings, *right_mappings),
    )
    # Without an index there is no join key to fall back on, so failure to
    # align is an error rather than a slower join path.
    if combined_expr is None:
        raise bigframes.exceptions.NullIndexError(
            "Cannot implicitly align objects. Set an explicit index using set_index."
        )

    result_block = Block(
        combined_expr,
        column_labels=[*left.column_labels, *right.column_labels],
        index_columns=(),
    )
    left_map = {m.source_id: m.destination_id for m in left_mappings}
    right_map = {m.source_id: m.destination_id for m in right_mappings}
    return (result_block, (left_map, right_map))


def join_mono_indexed(
left: Block,
right: Block,
Expand Down
5 changes: 5 additions & 0 deletions 5 bigframes/core/indexes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def __new__(
def from_frame(
    cls, frame: Union[bigframes.series.Series, bigframes.dataframe.DataFrame]
) -> Index:
    """Create an Index linked to the given DataFrame or Series.

    Args:
        frame: The object whose index this Index will wrap and stay
            linked to.

    Returns:
        Index: A new Index backed by ``frame``'s block.

    Raises:
        bigframes.exceptions.NullIndexError: If ``frame`` has a Null
            (absent) index.
    """
    # Single null-index guard; the previous version also called
    # frame._block._throw_if_null_index("from_frame") immediately after
    # this raise, which duplicated the same check with a worse message.
    if len(frame._block.index_columns) == 0:
        raise bigframes.exceptions.NullIndexError(
            "Cannot access index properties with Null Index. Set an index using set_index."
        )
    index = Index(frame._block)
    index._linked_frame = frame
    return index
Expand Down
38 changes: 26 additions & 12 deletions 38 bigframes/core/rewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ def order_with(self, by: Tuple[order.OrderingExpression, ...]):
self.root, self.columns, self.predicate, new_ordering, self.reverse_root
)

def maybe_join(
def can_join(
self, right: SquashedSelect, join_def: join_defs.JoinDefinition
) -> Optional[SquashedSelect]:
) -> bool:
if join_def.type == "cross":
# Cannot convert cross join to projection
return None
return False

r_exprs_by_id = {id: expr for expr, id in right.columns}
l_exprs_by_id = {id: expr for expr, id in self.columns}
Expand All @@ -113,10 +113,17 @@ def maybe_join(
if (self.root != right.root) or any(
l_expr != r_expr for l_expr, r_expr in zip(l_join_exprs, r_join_exprs)
):
return False
return True

def maybe_merge(
self,
right: SquashedSelect,
join_type: join_defs.JoinType,
mappings: Tuple[join_defs.JoinColumnMapping, ...],
) -> Optional[SquashedSelect]:
if self.root != right.root:
return None

join_type = join_def.type

# Mask columns and remap names to expected schema
lselection = self.columns
rselection = right.columns
Expand All @@ -136,7 +143,7 @@ def maybe_join(
lselection = tuple((apply_mask(expr, lmask), id) for expr, id in lselection)
if rmask is not None:
rselection = tuple((apply_mask(expr, rmask), id) for expr, id in rselection)
new_columns = remap_names(join_def, lselection, rselection)
new_columns = remap_names(mappings, lselection, rselection)

# Reconstruct ordering
reverse_root = self.reverse_root
Expand Down Expand Up @@ -201,20 +208,27 @@ def maybe_squash_projection(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
def maybe_rewrite_join(join_node: nodes.JoinNode) -> nodes.BigFrameNode:
    """Rewrite a join over a shared root table into a plain projection.

    If the two sides cannot be joined this way, the original join node is
    returned unchanged.
    """
    lhs = SquashedSelect.from_node(join_node.left_child)
    rhs = SquashedSelect.from_node(join_node.right_child)
    if not lhs.can_join(rhs, join_node.join):
        return join_node
    merged = lhs.maybe_merge(rhs, join_node.join.type, join_node.join.mappings)
    # can_join returning True guarantees the merge succeeds.
    assert (
        merged is not None
    ), "Couldn't merge nodes. This shouldn't happen. Please share full stacktrace with the BigQuery DataFrames team at bigframes-feedback@google.com."
    return merged.expand()


def remap_names(
join: join_defs.JoinDefinition, lselection: Selection, rselection: Selection
mappings: Tuple[join_defs.JoinColumnMapping, ...],
lselection: Selection,
rselection: Selection,
) -> Selection:
new_selection: Selection = tuple()
l_exprs_by_id = {id: expr for expr, id in lselection}
r_exprs_by_id = {id: expr for expr, id in rselection}
for mapping in join.mappings:
for mapping in mappings:
if mapping.source_table == join_defs.JoinSide.LEFT:
expr = l_exprs_by_id[mapping.source_id]
else: # Right
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.