diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 779d11b371..e88326795c 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -1506,8 +1506,10 @@ def concat(
         blocks: typing.List[Block] = [self, *other]
         if ignore_index:
             blocks = [block.reset_index() for block in blocks]
-
-        result_labels = _align_indices(blocks)
+            level_names = None
+        else:
+            level_names, level_types = _align_indices(blocks)
+            blocks = [_cast_index(block, level_types) for block in blocks]
 
         index_nlevels = blocks[0].index.nlevels
 
@@ -1522,7 +1524,7 @@ def concat(
             result_expr,
             index_columns=list(result_expr.column_ids)[:index_nlevels],
             column_labels=aligned_blocks[0].column_labels,
-            index_labels=result_labels,
+            index_labels=level_names,
         )
         if ignore_index:
             result_block = result_block.reset_index()
@@ -1783,16 +1785,40 @@ def block_from_local(data) -> Block:
     )
 
 
+def _cast_index(block: Block, dtypes: typing.Sequence[bigframes.dtypes.Dtype]):
+    original_block = block
+    result_ids = []
+    for idx_id, idx_dtype, target_dtype in zip(
+        block.index_columns, block.index_dtypes, dtypes
+    ):
+        if idx_dtype != target_dtype:
+            block, result_id = block.apply_unary_op(idx_id, ops.AsTypeOp(target_dtype))
+            result_ids.append(result_id)
+        else:
+            result_ids.append(idx_id)
+
+    expr = block.expr.select_columns((*result_ids, *original_block.value_columns))
+    return Block(
+        expr,
+        index_columns=result_ids,
+        column_labels=original_block.column_labels,
+        index_labels=original_block.index_labels,
+    )
+
+
 def _align_block_to_schema(
     block: Block, schema: dict[Label, bigframes.dtypes.Dtype]
 ) -> Block:
-    """For a given schema, remap block to schema by reordering columns and inserting nulls."""
+    """For a given schema, remap block to schema by reordering columns, and inserting nulls."""
     col_ids: typing.Tuple[str, ...] = ()
     for label, dtype in schema.items():
-        # TODO: Support casting to lcd type - requires mixed type support
         matching_ids: typing.Sequence[str] = block.label_to_col_id.get(label, ())
         if len(matching_ids) > 0:
             col_id = matching_ids[-1]
+            col_dtype = block.expr.get_column_type(col_id)
+            if dtype != col_dtype:
+                # If _align_schema worked properly, this should always be an upcast
+                block, col_id = block.apply_unary_op(col_id, ops.AsTypeOp(dtype))
             col_ids = (*col_ids, col_id)
         else:
             block, null_column = block.create_constant(None, dtype=dtype)
@@ -1810,24 +1836,28 @@ def _align_schema(
     return functools.reduce(reduction, schemas)
 
 
-def _align_indices(blocks: typing.Sequence[Block]) -> typing.Sequence[Label]:
-    """Validates that the blocks have compatible indices and returns the resulting label names."""
+def _align_indices(
+    blocks: typing.Sequence[Block],
+) -> typing.Tuple[typing.Sequence[Label], typing.Sequence[bigframes.dtypes.Dtype]]:
+    """Validates that the blocks have compatible indices and returns the resulting label names and dtypes."""
     names = blocks[0].index.names
     types = blocks[0].index.dtypes
+
     for block in blocks[1:]:
         if len(names) != block.index.nlevels:
             raise NotImplementedError(
                 f"Cannot combine indices with different number of levels. Use 'ignore_index'=True. {constants.FEEDBACK_LINK}"
             )
-        if block.index.dtypes != types:
-            raise NotImplementedError(
-                f"Cannot combine different index dtypes. Use 'ignore_index'=True. {constants.FEEDBACK_LINK}"
-            )
         names = [
             lname if lname == rname else None
             for lname, rname in zip(names, block.index.names)
         ]
-    return names
+        types = [
+            bigframes.dtypes.lcd_type_or_throw(ltype, rtype)
+            for ltype, rtype in zip(types, block.index.dtypes)
+        ]
+    types = typing.cast(typing.Sequence[bigframes.dtypes.Dtype], types)
+    return names, types
 
 
 def _combine_schema_inner(
@@ -1835,13 +1865,15 @@ def _combine_schema_inner(
     right: typing.Dict[Label, bigframes.dtypes.Dtype],
 ) -> typing.Dict[Label, bigframes.dtypes.Dtype]:
     result = dict()
-    for label, type in left.items():
+    for label, left_type in left.items():
         if label in right:
-            if type != right[label]:
+            right_type = right[label]
+            output_type = bigframes.dtypes.lcd_type(left_type, right_type)
+            if output_type is None:
                 raise ValueError(
                     f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
                 )
-            result[label] = type
+            result[label] = output_type
     return result
 
 
@@ -1850,15 +1882,20 @@ def _combine_schema_outer(
     right: typing.Dict[Label, bigframes.dtypes.Dtype],
 ) -> typing.Dict[Label, bigframes.dtypes.Dtype]:
     result = dict()
-    for label, type in left.items():
-        if (label in right) and (type != right[label]):
-            raise ValueError(
-                f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
-            )
-        result[label] = type
-    for label, type in right.items():
+    for label, left_type in left.items():
+        if label not in right:
+            result[label] = left_type
+        else:
+            right_type = right[label]
+            output_type = bigframes.dtypes.lcd_type(left_type, right_type)
+            if output_type is None:
+                raise NotImplementedError(
+                    f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
+                )
+            result[label] = output_type
+    for label, right_type in right.items():
         if label not in left:
-            result[label] = type
+            result[label] = right_type
     return result
 
 
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index b754acea2e..608885dec4 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -40,6 +40,7 @@
     pd.Int64Dtype,
     pd.StringDtype,
     pd.ArrowDtype,
+    gpd.array.GeometryDtype,
 ]
 
 # On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
@@ -139,7 +140,7 @@
 
 ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}
 
-IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
+IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Dtype] = {
     ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
 }
 # Allow REQUIRED fields to map correctly.
@@ -179,7 +180,7 @@
 
 def ibis_dtype_to_bigframes_dtype(
     ibis_dtype: ibis_dtypes.DataType,
-) -> Union[Dtype, np.dtype[Any]]:
+) -> Dtype:
     """Converts an Ibis dtype to a BigQuery DataFrames dtype
 
     Args:
@@ -340,6 +341,11 @@ def literal_to_ibis_scalar(
         ValueError: if passed literal cannot be coerced to a
         BigQuery DataFrames compatible scalar
     """
+    # Special case: Can create nulls for non-bidirectional types
+    if (force_dtype == gpd.array.GeometryDtype()) and pd.isna(literal):
+        # Ibis has bug for casting nulltype to geospatial, so we perform intermediate cast first
+        geotype = ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True)
+        return ibis.literal(None, geotype)
     ibis_dtype = BIGFRAMES_TO_IBIS[force_dtype] if force_dtype else None
 
     if pd.api.types.is_list_like(literal):
@@ -538,6 +544,8 @@ def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:
 
 
 def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
+    if dtype1 == dtype2:
+        return dtype1
     # Implicit conversion currently only supported for numeric types
     hierarchy: list[Dtype] = [
         pd.BooleanDtype(),
@@ -550,3 +558,12 @@ def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
         return None
     lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2))
     return hierarchy[lcd_index]
+
+
+def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
+    result = lcd_type(dtype1, dtype2)
+    if result is None:
+        raise NotImplementedError(
+            f"BigFrames cannot upcast {dtype1} and {dtype2} to common type. {constants.FEEDBACK_LINK}"
+        )
+    return result
diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py
index 8178ebfaea..452abf047c 100644
--- a/bigframes/operations/aggregations.py
+++ b/bigframes/operations/aggregations.py
@@ -396,6 +396,11 @@ def _as_ibis(self, column: ibis_types.Column, window=None) -> ibis_types.Value:
         )
 
 
+class LastOp(WindowOp):
+    def _as_ibis(self, column: ibis_types.Column, window=None) -> ibis_types.Value:
+        return _apply_window_if_present(column.last(), window)
+
+
 class LastNonNullOp(WindowOp):
     @property
     def skips_nulls(self):
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index cb2e4f94fa..fa3d5148a8 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -3141,9 +3141,9 @@ def test_df___array__(scalars_df_index, scalars_pandas_df_index):
 
 
 def test_getattr_attribute_error_when_pandas_has(scalars_df_index):
-    # asof is implemented in pandas but not in bigframes
+    # swapaxes is implemented in pandas but not in bigframes
     with pytest.raises(AttributeError):
-        scalars_df_index.asof()
+        scalars_df_index.swapaxes()
 
 
 def test_getattr_attribute_error(scalars_df_index):
diff --git a/tests/system/small/test_pandas.py b/tests/system/small/test_pandas.py
index 282c0d68eb..a79ddb64cd 100644
--- a/tests/system/small/test_pandas.py
+++ b/tests/system/small/test_pandas.py
@@ -185,6 +185,38 @@ def test_concat_dataframe_mismatched_columns(scalars_dfs, how):
     pd.testing.assert_frame_equal(bf_result, pd_result)
 
 
+def test_concat_dataframe_upcasting(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    bf_input1 = scalars_df[["int64_col", "float64_col", "int64_too"]].set_index(
+        "int64_col", drop=True
+    )
+    bf_input1.columns = ["a", "b"]
+    bf_input2 = scalars_df[["int64_too", "int64_col", "float64_col"]].set_index(
+        "float64_col", drop=True
+    )
+    bf_input2.columns = ["a", "b"]
+    bf_result = bpd.concat([bf_input1, bf_input2], join="outer")
+    bf_result = bf_result.to_pandas()
+
+    bf_input1 = (
+        scalars_pandas_df[["int64_col", "float64_col", "int64_too"]]
+        .set_index("int64_col", drop=True)
+        .set_axis(["a", "b"], axis=1)
+    )
+    bf_input2 = (
+        scalars_pandas_df[["int64_too", "int64_col", "float64_col"]]
+        .set_index("float64_col", drop=True)
+        .set_axis(["a", "b"], axis=1)
+    )
+    pd_result = pd.concat(
+        [bf_input1, bf_input2],
+        join="outer",
+    )
+
+    pd.testing.assert_frame_equal(bf_result, pd_result)
+
+
 @pytest.mark.parametrize(
     ("how",),
     [