Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

refactor: Create class for column ids and column refs #1022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 30 additions & 35 deletions 65 bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import bigframes.core.compile
import bigframes.core.expression as ex
import bigframes.core.guid
import bigframes.core.identifiers as ids
import bigframes.core.join_def as join_def
import bigframes.core.local_data as local_data
import bigframes.core.nodes as nodes
Expand Down Expand Up @@ -169,7 +170,7 @@ def row_count(self) -> ArrayValue:
# Operations
def filter_by_id(self, predicate_id: str, keep_null: bool = False) -> ArrayValue:
"""Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression."""
predicate: ex.Expression = ex.free_var(predicate_id)
predicate: ex.Expression = ex.deref(predicate_id)
if keep_null:
predicate = ops.fillna_op.as_expr(predicate, ex.const(True))
return self.filter(predicate)
Expand Down Expand Up @@ -200,7 +201,9 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
)

return (
ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id)),
ArrayValue(
nodes.PromoteOffsetsNode(child=self.node, col_id=ids.ColumnId(col_id))
),
col_id,
)

Expand All @@ -212,7 +215,9 @@ def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:

def compute_values(self, assignments: Sequence[ex.Expression]):
col_ids = self._gen_namespaced_uids(len(assignments))
ex_id_pairs = tuple((ex, id) for ex, id in zip(assignments, col_ids))
ex_id_pairs = tuple(
(ex, ids.ColumnId(id)) for ex, id in zip(assignments, col_ids)
)
return (
ArrayValue(nodes.ProjectionNode(child=self.node, assignments=ex_id_pairs)),
col_ids,
Expand All @@ -228,14 +233,19 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue:
if destination_id in self.column_ids: # Mutate case
exprs = [
(
(source_id if (col_id == destination_id) else col_id),
col_id,
ex.deref(source_id if (col_id == destination_id) else col_id),
ids.ColumnId(col_id),
)
for col_id in self.column_ids
]
else: # append case
self_projection = ((col_id, col_id) for col_id in self.column_ids)
exprs = [*self_projection, (source_id, destination_id)]
self_projection = (
(ex.deref(col_id), ids.ColumnId(col_id)) for col_id in self.column_ids
)
exprs = [
*self_projection,
(ex.deref(source_id), ids.ColumnId(destination_id)),
]
return ArrayValue(
nodes.SelectionNode(
child=self.node,
Expand All @@ -248,24 +258,15 @@ def create_constant(
value: typing.Any,
dtype: typing.Optional[bigframes.dtypes.Dtype],
) -> Tuple[ArrayValue, str]:
destination_id = self._gen_namespaced_uid()
if pandas.isna(value):
# Need to assign a data type when value is NaN.
dtype = dtype or bigframes.dtypes.DEFAULT_DTYPE

return (
ArrayValue(
nodes.ProjectionNode(
child=self.node,
assignments=((ex.const(value, dtype), destination_id),),
)
),
destination_id,
)
return self.project_to_id(ex.const(value, dtype))

def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
# This basically just drops and reorders columns - logically a no-op except as a final step
selections = ((col_id, col_id) for col_id in column_ids)
selections = ((ex.deref(col_id), ids.ColumnId(col_id)) for col_id in column_ids)
return ArrayValue(
nodes.SelectionNode(
child=self.node,
Expand All @@ -274,14 +275,8 @@ def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
)

def drop_columns(self, columns: Iterable[str]) -> ArrayValue:
new_projection = (
(col_id, col_id) for col_id in self.column_ids if col_id not in columns
)
return ArrayValue(
nodes.SelectionNode(
child=self.node,
input_output_pairs=tuple(new_projection),
)
return self.select_columns(
[col_id for col_id in self.column_ids if col_id not in columns]
)

def aggregate(
Expand All @@ -297,11 +292,12 @@ def aggregate(
by_column_id: column id of the aggregation key, this is preserved through the transform
dropna: whether null keys should be dropped
"""
agg_defs = tuple((agg, ids.ColumnId(name)) for agg, name in aggregations)
return ArrayValue(
nodes.AggregateNode(
child=self.node,
aggregations=tuple(aggregations),
by_column_ids=tuple(by_column_ids),
aggregations=agg_defs,
by_column_ids=tuple(map(ex.deref, by_column_ids)),
dropna=dropna,
)
)
Expand Down Expand Up @@ -342,10 +338,10 @@ def project_window_op(
ArrayValue(
nodes.WindowOpNode(
child=self.node,
column_name=column_name,
column_name=ex.deref(column_name),
op=op,
window_spec=window_spec,
output_name=output_name,
output_name=ids.ColumnId(output_name),
never_skip_nulls=never_skip_nulls,
skip_reproject_unsafe=skip_reproject_unsafe,
)
Expand Down Expand Up @@ -376,7 +372,9 @@ def relational_join(
join_node = nodes.JoinNode(
left_child=self.node,
right_child=other.node,
conditions=conditions,
conditions=tuple(
(ex.deref(l_col), ex.deref(r_col)) for l_col, r_col in conditions
),
type=type,
)
# Maps input ids to output ids for caller convenience
Expand Down Expand Up @@ -414,7 +412,7 @@ def explode(self, column_ids: typing.Sequence[str]) -> ArrayValue:
for column_id in column_ids:
assert bigframes.dtypes.is_array_like(self.get_column_type(column_id))

offsets = tuple(self.get_offset_for_name(id) for id in column_ids)
offsets = tuple(ex.deref(id) for id in column_ids)
return ArrayValue(nodes.ExplodeNode(child=self.node, column_ids=offsets))

def _uniform_sampling(self, fraction: float) -> ArrayValue:
Expand All @@ -425,9 +423,6 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue:
"""
return ArrayValue(nodes.RandomSampleNode(self.node, fraction))

def get_offset_for_name(self, name: str):
return self.schema.names.index(name)

# Deterministically generate namespaced ids for new variables
# These new ids are only unique within the current namespace.
# Many operations, such as joins, create new namespaces. See: BigFrameNode.defines_namespace
Expand Down
14 changes: 7 additions & 7 deletions 14 bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def _interpolate_column(
) -> typing.Tuple[blocks.Block, str]:
if interpolate_method not in ["linear", "nearest", "ffill"]:
raise ValueError("interpolate method not supported")
window_ordering = (ordering.OrderingExpression(ex.free_var(x_values)),)
window_ordering = (ordering.OrderingExpression(ex.deref(x_values)),)
backwards_window = windows.rows(following=0, ordering=window_ordering)
forwards_window = windows.rows(preceding=0, ordering=window_ordering)

Expand Down Expand Up @@ -373,7 +373,7 @@ def value_counts(
block = block.order_by(
[
ordering.OrderingExpression(
ex.free_var(count_id),
ex.deref(count_id),
direction=ordering.OrderingDirection.ASC
if ascending
else ordering.OrderingDirection.DESC,
Expand Down Expand Up @@ -430,7 +430,7 @@ def rank(
nullity_col_ids.append(nullity_col_id)
window_ordering = (
ordering.OrderingExpression(
ex.free_var(col),
ex.deref(col),
ordering.OrderingDirection.ASC
if ascending
else ordering.OrderingDirection.DESC,
Expand Down Expand Up @@ -522,7 +522,7 @@ def nsmallest(
block = block.reversed()
order_refs = [
ordering.OrderingExpression(
ex.free_var(col_id), direction=ordering.OrderingDirection.ASC
ex.deref(col_id), direction=ordering.OrderingDirection.ASC
)
for col_id in column_ids
]
Expand Down Expand Up @@ -552,7 +552,7 @@ def nlargest(
block = block.reversed()
order_refs = [
ordering.OrderingExpression(
ex.free_var(col_id), direction=ordering.OrderingDirection.DESC
ex.deref(col_id), direction=ordering.OrderingDirection.DESC
)
for col_id in column_ids
]
Expand Down Expand Up @@ -849,9 +849,9 @@ def _idx_extrema(
)
# Have to find the min for each
order_refs = [
ordering.OrderingExpression(ex.free_var(value_col), direction),
ordering.OrderingExpression(ex.deref(value_col), direction),
*[
ordering.OrderingExpression(ex.free_var(idx_col))
ordering.OrderingExpression(ex.deref(idx_col))
for idx_col in original_block.index_columns
],
]
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.