Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: support upcasting numeric columns in concat #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 60 additions & 23 deletions 83 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,8 +1506,10 @@ def concat(
blocks: typing.List[Block] = [self, *other]
if ignore_index:
blocks = [block.reset_index() for block in blocks]

result_labels = _align_indices(blocks)
level_names = None
else:
level_names, level_types = _align_indices(blocks)
blocks = [_cast_index(block, level_types) for block in blocks]

index_nlevels = blocks[0].index.nlevels

Expand All @@ -1522,7 +1524,7 @@ def concat(
result_expr,
index_columns=list(result_expr.column_ids)[:index_nlevels],
column_labels=aligned_blocks[0].column_labels,
index_labels=result_labels,
index_labels=level_names,
)
if ignore_index:
result_block = result_block.reset_index()
Expand Down Expand Up @@ -1783,16 +1785,40 @@ def block_from_local(data) -> Block:
)


def _cast_index(block: Block, dtypes: typing.Sequence[bigframes.dtypes.Dtype]):
    """Return a copy of ``block`` whose index levels are cast to ``dtypes``.

    Levels already at the requested dtype are reused unchanged; the others are
    cast with ``AsTypeOp``. Value columns, column labels, and index labels are
    preserved from the input block.
    """
    source = block
    new_index_ids: typing.List[str] = []
    for col_id, current_dtype, wanted_dtype in zip(
        block.index_columns, block.index_dtypes, dtypes
    ):
        if current_dtype == wanted_dtype:
            new_index_ids.append(col_id)
            continue
        # Cast produces a fresh column id; later iterations operate on the
        # updated block so all casts accumulate.
        block, cast_id = block.apply_unary_op(col_id, ops.AsTypeOp(wanted_dtype))
        new_index_ids.append(cast_id)

    projected = block.expr.select_columns((*new_index_ids, *source.value_columns))
    return Block(
        projected,
        index_columns=new_index_ids,
        column_labels=source.column_labels,
        index_labels=source.index_labels,
    )


def _align_block_to_schema(
block: Block, schema: dict[Label, bigframes.dtypes.Dtype]
) -> Block:
"""For a given schema, remap block to schema by reordering columns and inserting nulls."""
"""For a given schema, remap block to schema by reordering columns, and inserting nulls."""
col_ids: typing.Tuple[str, ...] = ()
for label, dtype in schema.items():
# TODO: Support casting to lcd type - requires mixed type support
matching_ids: typing.Sequence[str] = block.label_to_col_id.get(label, ())
if len(matching_ids) > 0:
col_id = matching_ids[-1]
col_dtype = block.expr.get_column_type(col_id)
if dtype != col_dtype:
# If _align_schema worked properly, this should always be an upcast
block, col_id = block.apply_unary_op(col_id, ops.AsTypeOp(dtype))
col_ids = (*col_ids, col_id)
else:
block, null_column = block.create_constant(None, dtype=dtype)
Expand All @@ -1810,38 +1836,44 @@ def _align_schema(
return functools.reduce(reduction, schemas)


def _align_indices(blocks: typing.Sequence[Block]) -> typing.Sequence[Label]:
"""Validates that the blocks have compatible indices and returns the resulting label names."""
def _align_indices(
    blocks: typing.Sequence[Block],
) -> typing.Tuple[typing.Sequence[Label], typing.Sequence[bigframes.dtypes.Dtype]]:
    """Validates that the blocks have compatible indices and returns the resulting label names and dtypes.

    Args:
        blocks: Blocks being concatenated; all must share the same number of
            index levels.

    Returns:
        A tuple of (level names, level dtypes). A level name is kept only if
        all blocks agree on it (otherwise ``None``); level dtypes are widened
        to the least-common-denominator type across blocks.

    Raises:
        NotImplementedError: if index level counts differ, or a pair of level
            dtypes has no common upcast type (via ``lcd_type_or_throw``).
    """
    # NOTE(review): the scraped diff interleaved removed lines here (a strict
    # dtype-equality raise and an early `return names` that left the dtype
    # computation unreachable); this is the coherent post-merge version.
    names = blocks[0].index.names
    types = blocks[0].index.dtypes

    for block in blocks[1:]:
        if len(names) != block.index.nlevels:
            raise NotImplementedError(
                f"Cannot combine indices with different number of levels. Use 'ignore_index'=True. {constants.FEEDBACK_LINK}"
            )
        # Keep a level name only when both sides agree on it.
        names = [
            lname if lname == rname else None
            for lname, rname in zip(names, block.index.names)
        ]
        # Widen each level to a common dtype, raising if none exists.
        types = [
            bigframes.dtypes.lcd_type_or_throw(ltype, rtype)
            for ltype, rtype in zip(types, block.index.dtypes)
        ]
    types = typing.cast(typing.Sequence[bigframes.dtypes.Dtype], types)
    return names, types


def _combine_schema_inner(
    left: typing.Dict[Label, bigframes.dtypes.Dtype],
    right: typing.Dict[Label, bigframes.dtypes.Dtype],
) -> typing.Dict[Label, bigframes.dtypes.Dtype]:
    """Inner-join two label->dtype schemas.

    Only labels present in both schemas are kept; each kept label's dtype is
    the least-common-denominator type of the two sides.

    Raises:
        ValueError: if a shared label's dtypes have no common upcast type.
    """
    # NOTE(review): the scraped diff interleaved the pre-change loop (which
    # raised on any dtype mismatch and referenced an undefined `type` var)
    # with the new upcasting loop; this is the coherent post-merge version.
    result = dict()
    for label, left_type in left.items():
        if label in right:
            right_type = right[label]
            output_type = bigframes.dtypes.lcd_type(left_type, right_type)
            if output_type is None:
                raise ValueError(
                    f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
                )
            result[label] = output_type
    return result


def _combine_schema_outer(
    left: typing.Dict[Label, bigframes.dtypes.Dtype],
    right: typing.Dict[Label, bigframes.dtypes.Dtype],
) -> typing.Dict[Label, bigframes.dtypes.Dtype]:
    """Outer-join two label->dtype schemas.

    All labels from both schemas are kept, preserving left-then-right order.
    Labels present on both sides get the least-common-denominator dtype.

    Raises:
        NotImplementedError: if a shared label's dtypes have no common
            upcast type.
    """
    # NOTE(review): reconstructed from a garbled diff scrape (def line fused
    # into the hunk header; old raise-on-mismatch lines interleaved with the
    # new lcd-type lines). Behavior matches the added upcasting logic.
    result = dict()
    for label, left_type in left.items():
        if label not in right:
            result[label] = left_type
        else:
            right_type = right[label]
            output_type = bigframes.dtypes.lcd_type(left_type, right_type)
            if output_type is None:
                raise NotImplementedError(
                    f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
                )
            result[label] = output_type
    for label, right_type in right.items():
        if label not in left:
            result[label] = right_type
    return result


Expand Down
21 changes: 19 additions & 2 deletions 21 bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
pd.Int64Dtype,
pd.StringDtype,
pd.ArrowDtype,
gpd.array.GeometryDtype,
]

# On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
Expand Down Expand Up @@ -139,7 +140,7 @@

ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}

IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Dtype] = {
ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}
# Allow REQUIRED fields to map correctly.
Expand Down Expand Up @@ -179,7 +180,7 @@

def ibis_dtype_to_bigframes_dtype(
ibis_dtype: ibis_dtypes.DataType,
) -> Union[Dtype, np.dtype[Any]]:
) -> Dtype:
"""Converts an Ibis dtype to a BigQuery DataFrames dtype

Args:
Expand Down Expand Up @@ -340,6 +341,11 @@ def literal_to_ibis_scalar(
ValueError: if passed literal cannot be coerced to a
BigQuery DataFrames compatible scalar
"""
# Special case: Can create nulls for non-bidirectional types
if (force_dtype == gpd.array.GeometryDtype()) and pd.isna(literal):
# Ibis has bug for casting nulltype to geospatial, so we perform intermediate cast first
geotype = ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True)
return ibis.literal(None, geotype)
ibis_dtype = BIGFRAMES_TO_IBIS[force_dtype] if force_dtype else None

if pd.api.types.is_list_like(literal):
Expand Down Expand Up @@ -538,6 +544,8 @@ def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:


def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
if dtype1 == dtype2:
return dtype1
# Implicit conversion currently only supported for numeric types
hierarchy: list[Dtype] = [
pd.BooleanDtype(),
Expand All @@ -550,3 +558,12 @@ def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
return None
lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2))
return hierarchy[lcd_index]


def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
    """Like ``lcd_type``, but raises instead of returning ``None``.

    Raises:
        NotImplementedError: if the two dtypes share no common upcast type.
    """
    common = lcd_type(dtype1, dtype2)
    if common is not None:
        return common
    raise NotImplementedError(
        f"BigFrames cannot upcast {dtype1} and {dtype2} to common type. {constants.FEEDBACK_LINK}"
    )
5 changes: 5 additions & 0 deletions 5 bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@ def _as_ibis(self, column: ibis_types.Column, window=None) -> ibis_types.Value:
)


class LastOp(WindowOp):
    """Window aggregation that yields the last value of the column."""

    def _as_ibis(self, column: ibis_types.Column, window=None) -> ibis_types.Value:
        last_value = column.last()
        return _apply_window_if_present(last_value, window)


class LastNonNullOp(WindowOp):
@property
def skips_nulls(self):
Expand Down
4 changes: 2 additions & 2 deletions 4 tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3141,9 +3141,9 @@ def test_df___array__(scalars_df_index, scalars_pandas_df_index):


def test_getattr_attribute_error_when_pandas_has(scalars_df_index):
    # swapaxes exists on pandas DataFrame but is not implemented in bigframes,
    # so attribute access must raise AttributeError.
    with pytest.raises(AttributeError):
        getattr(scalars_df_index, "swapaxes")()


def test_getattr_attribute_error(scalars_df_index):
Expand Down
32 changes: 32 additions & 0 deletions 32 tests/system/small/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,38 @@ def test_concat_dataframe_mismatched_columns(scalars_dfs, how):
pd.testing.assert_frame_equal(bf_result, pd_result)


def test_concat_dataframe_upcasting(scalars_dfs):
    """Concat of frames whose matching labels carry different numeric dtypes."""
    scalars_df, scalars_pandas_df = scalars_dfs

    # BigFrames inputs: same labels ("a", "b") but differing column/index dtypes.
    bf_left = scalars_df[["int64_col", "float64_col", "int64_too"]].set_index(
        "int64_col", drop=True
    )
    bf_left.columns = ["a", "b"]
    bf_right = scalars_df[["int64_too", "int64_col", "float64_col"]].set_index(
        "float64_col", drop=True
    )
    bf_right.columns = ["a", "b"]
    bf_result = bpd.concat([bf_left, bf_right], join="outer").to_pandas()

    # Equivalent pandas construction for the expected result.
    pd_left = (
        scalars_pandas_df[["int64_col", "float64_col", "int64_too"]]
        .set_index("int64_col", drop=True)
        .set_axis(["a", "b"], axis=1)
    )
    pd_right = (
        scalars_pandas_df[["int64_too", "int64_col", "float64_col"]]
        .set_index("float64_col", drop=True)
        .set_axis(["a", "b"], axis=1)
    )
    pd_result = pd.concat([pd_left, pd_right], join="outer")

    pd.testing.assert_frame_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("how",),
[
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.