From 39bba21d8697ba6f393ec4257f2c850a48be0910 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 6 Nov 2023 23:00:42 +0000 Subject: [PATCH 1/4] feat: add 'cross' join support --- bigframes/core/__init__.py | 1 + bigframes/core/blocks.py | 30 +++++++++++++++++++ bigframes/core/compile/single_column.py | 1 + bigframes/core/joins/merge.py | 1 + bigframes/core/nodes.py | 1 + bigframes/dataframe.py | 26 ++++++++++++++++ bigframes/pandas/__init__.py | 1 + tests/system/small/test_dataframe.py | 29 ++++++++++++++---- tests/system/small/test_pandas.py | 21 +++++++++++++ .../bigframes_vendored/pandas/core/frame.py | 5 ++++ .../pandas/core/reshape/merge.py | 2 ++ 11 files changed, 112 insertions(+), 6 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 4653f0ab6a..2cfe7b9e08 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -413,6 +413,7 @@ def join( "left", "outer", "right", + "cross", ], allow_row_identity_join: bool = True, ): diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 635e7db865..f9dcf80ee4 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -35,6 +35,7 @@ import bigframes.core as core import bigframes.core.guid as guid import bigframes.core.indexes as indexes +import bigframes.core.joins as joining import bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as ordering import bigframes.core.utils @@ -1531,6 +1532,7 @@ def merge( "left", "outer", "right", + "cross", ], left_join_ids: typing.Sequence[str], right_join_ids: typing.Sequence[str], @@ -1691,6 +1693,34 @@ def resolve_index_level(self, level: LevelsType) -> typing.Sequence[str]: raise ValueError(f"Unexpected level: {level_ref}") return resolved_level_ids + def cross_join( + self, other: Block, left_suffix: str = "", right_suffix: str = "" + ) -> Block: + # Separate api for cross join as it doesn't utilize index. 
+ left_expr = self._expr + right_expr = other._expr + get_column_left, get_column_right = joining.JOIN_NAME_REMAPPER( + left_expr.column_ids, right_expr.column_ids + ) + combined_expr = left_expr.join( + self_column_ids=(), + other=right_expr, + other_column_ids=(), + how="cross", + ) + left_cols = [get_column_left[col] for col in self.value_columns] + right_cols = [get_column_right[col] for col in other.value_columns] + offsets_id = guid.generate_guid() + expr_with_offsets = combined_expr.select_columns( + [*left_cols, *right_cols] + ).promote_offsets(offsets_id) + + return Block( + expr_with_offsets, + index_columns=(offsets_id,), + column_labels=[*self.column_labels, *other.column_labels], + ) + def _is_monotonic( self, column_ids: typing.Union[str, Sequence[str]], increasing: bool ) -> bool: diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py index b992aa1d1d..50c4db2ef3 100644 --- a/bigframes/core/compile/single_column.py +++ b/bigframes/core/compile/single_column.py @@ -40,6 +40,7 @@ def join_by_column( "left", "outer", "right", + "cross", ], allow_row_identity_join: bool = True, ) -> compiled.CompiledArrayValue: diff --git a/bigframes/core/joins/merge.py b/bigframes/core/joins/merge.py index fac16b3607..c65e1bdd54 100644 --- a/bigframes/core/joins/merge.py +++ b/bigframes/core/joins/merge.py @@ -32,6 +32,7 @@ def merge( "left", "outer", "right", + "cross", ] = "inner", on: Optional[str] = None, *, diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 7b252b164f..8f1e2e5e73 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -86,6 +86,7 @@ class JoinNode(BigFrameNode): "left", "outer", "right", + "cross", ] allow_row_identity_join: bool = True diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 40f12671ae..baf2f9ff83 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1922,6 +1922,7 @@ def merge( "left", "outer", "right", + "cross", ] = "inner", # 
TODO(garrettwu): Currently can take inner, outer, left and right. To support # cross joins @@ -1932,6 +1933,19 @@ def merge( sort: bool = False, suffixes: tuple[str, str] = ("_x", "_y"), ) -> DataFrame: + if how == "cross": + if on is not None: + raise ValueError("'on' is not supported for cross join.") + result_block = self._block.merge( + right._block, + left_join_ids=[], + right_join_ids=[], + suffixes=suffixes, + how=how, + sort=True, + ) + return DataFrame(result_block) + if on is None: if left_on is None or right_on is None: raise ValueError("Must specify `on` or `left_on` + `right_on`.") @@ -1985,6 +1999,18 @@ def join( raise NotImplementedError( f"Deduping column names is not implemented. {constants.FEEDBACK_LINK}" ) + if how == "cross": + if on is not None: + raise ValueError("'on' is not supported for cross join.") + result_block = left._block.merge( + right._block, + left_join_ids=[], + right_join_ids=[], + suffixes=("", ""), + how="cross", + sort=True, + ) + return DataFrame(result_block) # Join left columns with right index if on is not None: diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 1c52b103fb..d35f838366 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -332,6 +332,7 @@ def merge( "left", "outer", "right", + "cross", ] = "inner", on: Optional[str] = None, *, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index bd5930e508..8aa749d1ea 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -886,6 +886,26 @@ def test_df_isin_dict(scalars_dfs): pandas.testing.assert_frame_equal(bf_result, pd_result.astype("boolean")) +def test_df_cross_merge(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + left_columns = ["int64_col", "float64_col", "rowindex_2"] + right_columns = ["int64_col", "bool_col", "string_col", "rowindex_2"] + + left = scalars_df[left_columns] + # Offset the rows somewhat so that outer 
join can have an effect. + right = scalars_df[right_columns].assign(rowindex_2=scalars_df["rowindex_2"] + 2) + + bf_result = left.merge(right, "cross").to_pandas() + + pd_result = scalars_pandas_df[left_columns].merge( + scalars_pandas_df[right_columns].assign( + rowindex_2=scalars_pandas_df["rowindex_2"] + 2 + ), + "cross", + ) + pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) + + @pytest.mark.parametrize( ("merge_how",), [ @@ -1695,12 +1715,7 @@ def test_series_binop_add_different_table( all_joins = pytest.mark.parametrize( ("how",), - ( - ("outer",), - ("left",), - ("right",), - ("inner",), - ), + (("outer",), ("left",), ("right",), ("inner",), ("cross",)), ) @@ -1740,6 +1755,8 @@ def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): @all_joins def test_join_param_on(scalars_dfs, how): + if how == "cross": + pytest.skip("cross join not supported with 'on' param") bf_df, pd_df = scalars_dfs bf_df_a = bf_df[["string_col", "int64_col", "rowindex_2"]] diff --git a/tests/system/small/test_pandas.py b/tests/system/small/test_pandas.py index 0292ebd206..b88901f3bc 100644 --- a/tests/system/small/test_pandas.py +++ b/tests/system/small/test_pandas.py @@ -289,6 +289,27 @@ def test_merge_left_on_right_on(scalars_dfs, merge_how): assert_pandas_df_equal_ignore_ordering(bf_result, pd_result) +def test_pd_merge_cross(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + left_columns = ["int64_col", "float64_col", "int64_too"] + right_columns = ["int64_col", "bool_col", "string_col", "rowindex_2"] + + left = scalars_df[left_columns] + right = scalars_df[right_columns] + + df = bpd.merge(left, right, "cross", sort=True) + bf_result = df.to_pandas() + + pd_result = pd.merge( + scalars_pandas_df[left_columns], + scalars_pandas_df[right_columns], + "cross", + sort=True, + ) + + pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) + + @pytest.mark.parametrize( ("merge_how",), [ diff --git 
a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 6f4f6be35d..3e4cec284c 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -2194,6 +2194,8 @@ def join(self, other, *, on: Optional[str] = None, how: str) -> DataFrame: and sort it lexicographically. ``inner``: form intersection of calling frame's index (or column if on is specified) with `other`'s index, preserving the order of the calling's one. + ``cross``: creates the cartesian product from both frames, preserves + the order of the left keys. Returns: bigframes.dataframe.DataFrame: A dataframe containing columns from both the caller and `other`. @@ -2208,6 +2210,7 @@ def merge( "left", "outer", "right", + "cross", ] = "inner", on: Optional[str] = None, *, @@ -2243,6 +2246,8 @@ def merge( join; sort keys lexicographically. ``inner``: use intersection of keys from both frames, similar to a SQL inner join; preserve the order of the left keys. + ``cross``: creates the cartesian product from both frames, preserves the order + of the left keys. on (label or list of labels): Columns to join on. It must be found in both DataFrames. Either on or left_on + right_on diff --git a/third_party/bigframes_vendored/pandas/core/reshape/merge.py b/third_party/bigframes_vendored/pandas/core/reshape/merge.py index b03f366fca..704e50f516 100644 --- a/third_party/bigframes_vendored/pandas/core/reshape/merge.py +++ b/third_party/bigframes_vendored/pandas/core/reshape/merge.py @@ -49,6 +49,8 @@ def merge( join; sort keys lexicographically. ``inner``: use intersection of keys from both frames, similar to a SQL inner join; preserve the order of the left keys. + ``cross``: creates the cartesian product from both frames, preserves the order + of the left keys. on (label or list of labels): Columns to join on. It must be found in both DataFrames. 
Either on or left_on + right_on From 7916802d740b909b92b1330aba26e427959f3d62 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 6 Nov 2023 23:14:13 +0000 Subject: [PATCH 2/4] remove unused cross_join method --- bigframes/core/blocks.py | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index f9dcf80ee4..16f84ad357 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -35,7 +35,6 @@ import bigframes.core as core import bigframes.core.guid as guid import bigframes.core.indexes as indexes -import bigframes.core.joins as joining import bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as ordering import bigframes.core.utils @@ -1693,34 +1692,6 @@ def resolve_index_level(self, level: LevelsType) -> typing.Sequence[str]: raise ValueError(f"Unexpected level: {level_ref}") return resolved_level_ids - def cross_join( - self, other: Block, left_suffix: str = "", right_suffix: str = "" - ) -> Block: - # Separate api for cross join as it doesn't utilize index. 
- left_expr = self._expr - right_expr = other._expr - get_column_left, get_column_right = joining.JOIN_NAME_REMAPPER( - left_expr.column_ids, right_expr.column_ids - ) - combined_expr = left_expr.join( - self_column_ids=(), - other=right_expr, - other_column_ids=(), - how="cross", - ) - left_cols = [get_column_left[col] for col in self.value_columns] - right_cols = [get_column_right[col] for col in other.value_columns] - offsets_id = guid.generate_guid() - expr_with_offsets = combined_expr.select_columns( - [*left_cols, *right_cols] - ).promote_offsets(offsets_id) - - return Block( - expr_with_offsets, - index_columns=(offsets_id,), - column_labels=[*self.column_labels, *other.column_labels], - ) - def _is_monotonic( self, column_ids: typing.Union[str, Sequence[str]], increasing: bool ) -> bool: From 1db41902fa3dd4c6b6eb2e58a0f7b7febfe79cc1 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 7 Nov 2023 00:38:21 +0000 Subject: [PATCH 3/4] fix mypy issues --- bigframes/core/compile/single_column.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py index 50c4db2ef3..93ba3f16f1 100644 --- a/bigframes/core/compile/single_column.py +++ b/bigframes/core/compile/single_column.py @@ -108,7 +108,7 @@ def join_by_column( left_table, right_table, predicates=join_conditions, - how=how, + how=how, # type: ignore ) # Preserve ordering accross joins. 
From 44b0273ce59ffa6370a8906008b7a7e247747f5e Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 8 Nov 2023 19:03:48 +0000 Subject: [PATCH 4/4] make test confirm ValueError for cross join with 'on' provided --- tests/system/small/test_dataframe.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 5215dd0c98..605d4abc1d 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1805,20 +1805,23 @@ def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): @all_joins def test_join_param_on(scalars_dfs, how): - if how == "cross": - pytest.skip("cross join not supported with 'on' param") bf_df, pd_df = scalars_dfs bf_df_a = bf_df[["string_col", "int64_col", "rowindex_2"]] bf_df_a = bf_df_a.assign(rowindex_2=bf_df_a["rowindex_2"] + 2) bf_df_b = bf_df[["float64_col"]] - bf_result = bf_df_a.join(bf_df_b, on="rowindex_2", how=how).to_pandas() - pd_df_a = pd_df[["string_col", "int64_col", "rowindex_2"]] - pd_df_a = pd_df_a.assign(rowindex_2=pd_df_a["rowindex_2"] + 2) - pd_df_b = pd_df[["float64_col"]] - pd_result = pd_df_a.join(pd_df_b, on="rowindex_2", how=how) - assert_pandas_df_equal_ignore_ordering(bf_result, pd_result) + if how == "cross": + with pytest.raises(ValueError): + bf_df_a.join(bf_df_b, on="rowindex_2", how=how) + else: + bf_result = bf_df_a.join(bf_df_b, on="rowindex_2", how=how).to_pandas() + + pd_df_a = pd_df[["string_col", "int64_col", "rowindex_2"]] + pd_df_a = pd_df_a.assign(rowindex_2=pd_df_a["rowindex_2"] + 2) + pd_df_b = pd_df[["float64_col"]] + pd_result = pd_df_a.join(pd_df_b, on="rowindex_2", how=how) + assert_pandas_df_equal_ignore_ordering(bf_result, pd_result) @pytest.mark.parametrize(