From bf1d89f224ff86d9b3e134c3e2f2a96079ecb488 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 19 Sep 2023 01:23:00 +0000 Subject: [PATCH 1/8] feat: Implement DataFrame.dot for matrix multiplication Change-Id: I8baa8c8ee0aeef587be25519a9df549e6db70067 --- bigframes/dataframe.py | 61 ++++++++++++++++++++++ tests/data/matrix_2by3.json | 22 ++++++++ tests/data/matrix_2by3.jsonl | 2 + tests/data/matrix_3by4.json | 27 ++++++++++ tests/data/matrix_3by4.jsonl | 3 ++ tests/system/conftest.py | 68 ++++++++++++++++++++++++ tests/system/small/test_dataframe.py | 77 ++++++++++++++++++++++++++++ 7 files changed, 260 insertions(+) create mode 100644 tests/data/matrix_2by3.json create mode 100644 tests/data/matrix_2by3.jsonl create mode 100644 tests/data/matrix_3by4.json create mode 100644 tests/data/matrix_3by4.jsonl diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index e4e22e0306..e21b5579dc 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2500,3 +2500,64 @@ def _get_block(self) -> blocks.Block: def _cached(self) -> DataFrame: return DataFrame(self._block.cached()) + + def dot(self, other: DataFrame) -> DataFrame: + if not isinstance(other, DataFrame): + raise NotImplementedError("Only DataFrame operand is supported") + + # if not self.columns.equals(other.index): + # raise ValueError("matrices are not aligned") + + # Convert the dataframes into cell-value-decomposed representation, i.e. + # each cell value is present in a separate row + row_id = "row" + col_id = "col" + val_id = "val" + left_suffix = "_left" + right_suffix = "_right" + cvd_columns = [row_id, col_id, val_id] + + def get_left_id(id): + return f"{id}{left_suffix}" + + def get_right_id(id): + return f"{id}{right_suffix}" + + left = self.stack().reset_index() + left.columns = cvd_columns + + right = other.stack().reset_index() + right.columns = cvd_columns + + merged = left.merge( + right, + left_on=col_id, + right_on=row_id, + suffixes=(left_suffix, right_suffix), + ) + + left_row_id = get_left_id(row_id) + right_col_id = get_right_id(col_id) + + aggregated = ( + merged.assign( + val=merged[get_left_id(val_id)] * merged[get_right_id(val_id)] + )[[left_row_id, right_col_id, val_id]] + .groupby([left_row_id, right_col_id]) + .sum(numeric_only=True) + ) + aggregated_noindex = aggregated.reset_index() + aggregated_noindex.columns = cvd_columns + result = aggregated_noindex.pivot(columns=col_id, index=row_id) + + # Set the index names to match the left side matrix + result.index.names = self.index.names + + # Pivot has the result columns ordered alphabetically. It should still + # match the columns in the right sided matrix. Let's reorder them as per + # the right side matrix + if not result.columns.difference(other.columns).empty: + raise RuntimeError("Could not construct all columns") + result = result[other.columns] + + return result diff --git a/tests/data/matrix_2by3.json b/tests/data/matrix_2by3.json new file mode 100644 index 0000000000..a0cf0c14da --- /dev/null +++ b/tests/data/matrix_2by3.json @@ -0,0 +1,22 @@ +[ + { + "mode": "REQUIRED", + "name": "rowindex", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "a", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "b", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "c", + "type": "INTEGER" + } + ] diff --git a/tests/data/matrix_2by3.jsonl b/tests/data/matrix_2by3.jsonl new file mode 100644 index 0000000000..c2c96a5423 --- /dev/null +++ b/tests/data/matrix_2by3.jsonl @@ -0,0 +1,2 @@ +{"rowindex": 0, "a": 1, "b": 2, "c": 3} +{"rowindex": 1, "a": 2, "b": 5, "c": 7} diff --git a/tests/data/matrix_3by4.json b/tests/data/matrix_3by4.json new file mode 100644 index 0000000000..6b272ee600 --- /dev/null +++ b/tests/data/matrix_3by4.json @@ -0,0 +1,27 @@ +[ + { + "mode": "REQUIRED", + "name": "rowindex", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "w", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "x", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "y", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "z", + "type": "INTEGER" + } + ] diff --git a/tests/data/matrix_3by4.jsonl b/tests/data/matrix_3by4.jsonl new file mode 100644 index 0000000000..5a081dafc3 --- /dev/null +++ b/tests/data/matrix_3by4.jsonl @@ -0,0 +1,3 @@ +{"rowindex": "a", "w": 2, "x": 4, "y": 8, "z": 21} +{"rowindex": "b", "w": 1, "x": 5, "y": 10, "z": -11} +{"rowindex": "c", "w": 3, "x": 6, "y": 9, "z": 0} diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 3153bd1559..3be2876f76 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -221,6 +221,8 @@ def load_test_data_tables( ("penguins", "penguins_schema.json", "penguins.jsonl"), ("time_series", "time_series_schema.json", "time_series.jsonl"), ("hockey_players", "hockey_players.json", "hockey_players.jsonl"), + ("matrix_2by3", "matrix_2by3.json", "matrix_2by3.jsonl"), + ("matrix_3by4", "matrix_3by4.json", "matrix_3by4.jsonl"), ]: test_data_hash = hashlib.md5() _hash_digest_file(test_data_hash, DATA_DIR / schema_filename) @@ -290,6 +292,16 @@ def time_series_table_id(test_data_tables) -> str: return test_data_tables["time_series"] +@pytest.fixture(scope="session") +def matrix_2by3_table_id(test_data_tables) -> str: + return test_data_tables["matrix_2by3"] + + +@pytest.fixture(scope="session") +def matrix_3by4_table_id(test_data_tables) -> str: + return test_data_tables["matrix_3by4"] + + @pytest.fixture(scope="session") def scalars_df_default_index( scalars_df_index: bigframes.dataframe.DataFrame, @@ -397,6 +409,62 @@ def hockey_pandas_df() -> pd.DataFrame: return df +@pytest.fixture(scope="session") +def matrix_2by3_df( + matrix_2by3_table_id: str, session: bigframes.Session +) -> bigframes.dataframe.DataFrame: + """DataFrame pointing at a test 2-by-3 matrix data.""" + df = session.read_gbq(matrix_2by3_table_id) + df = df.set_index("rowindex").sort_index() + return df + + +@pytest.fixture(scope="session") +def matrix_2by3_pandas_df() -> pd.DataFrame: + """pd.DataFrame pointing at a test 2-by-3 matrix data.""" + df = pd.read_json( + DATA_DIR / "matrix_2by3.jsonl", + lines=True, + dtype={ + "rowindex": pd.Int64Dtype(), + "a": pd.Int64Dtype(), + "b": pd.Int64Dtype(), + "c": pd.Int64Dtype(), + }, + ) + df = df.set_index("rowindex").sort_index() + df.index = df.index.astype("Int64") + return df + + +@pytest.fixture(scope="session") +def matrix_3by4_df( + matrix_3by4_table_id: str, session: bigframes.Session +) -> bigframes.dataframe.DataFrame: + """DataFrame pointing at a test 3-by-4 matrix data.""" + df = session.read_gbq(matrix_3by4_table_id) + df = df.set_index("rowindex").sort_index() + return df + + +@pytest.fixture(scope="session") +def matrix_3by4_pandas_df() -> pd.DataFrame: + """pd.DataFrame pointing at a test 3-by-4 matrix data.""" + df = pd.read_json( + DATA_DIR / "matrix_3by4.jsonl", + lines=True, + dtype={ + "rowindex": pd.StringDtype(storage="pyarrow"), + "w": pd.Int64Dtype(), + "x": pd.Int64Dtype(), + "y": pd.Int64Dtype(), + "z": pd.Int64Dtype(), + }, + ) + df = df.set_index("rowindex").sort_index() + return df + + @pytest.fixture(scope="session") def penguins_df_default_index( penguins_table_id: str, session: bigframes.Session diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index adf17848ee..2d8edc9ea2 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2755,3 +2755,80 @@ def test_df_cached(scalars_df_index): df_cached_copy = df._cached() pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) + + +def test_df_dot_inline(session): + df1 = pd.DataFrame([[1, 2, 3], [2, 5, 7]]) + df2 = pd.DataFrame([[2, 4, 8], [1, 5, 10], [3, 6, 9]]) + + bf1 = session.read_pandas(df1) + bf2 = session.read_pandas(df2) + bf_result = bf1.dot(bf2).to_pandas() + pd_result = df1.dot(df2) + + # Patch pandas dtypes for testing parity + # Pandas uses int64 instead of Int64 (nullable) dtype. + for name in pd_result.columns: + pd_result[name] = pd_result[name].astype(pd.Int64Dtype()) + pd_result.index = pd_result.index.astype(pd.Int64Dtype()) + + pd.testing.assert_frame_equal( + bf_result, + pd_result, + ) + + +def test_df_dot( + matrix_2by3_df, matrix_2by3_pandas_df, matrix_3by4_df, matrix_3by4_pandas_df +): + bf_result = matrix_2by3_df.dot(matrix_3by4_df).to_pandas() + pd_result = matrix_2by3_pandas_df.dot(matrix_3by4_pandas_df) + + # Patch pandas dtypes for testing parity + # Pandas result is object instead of Int64 (nullable) dtype. + for name in pd_result.columns: + pd_result[name] = pd_result[name].astype(pd.Int64Dtype()) + + pd.testing.assert_frame_equal( + bf_result, + pd_result, + ) + + +def test_df_dot_bq_table_data_small(session): + square_bq_table = "bigframes-dev.zzz_shobs_us.matrix_int_100_by_100" + + # In the first variant, convert BigFrames dataframe to Pandas dataframe + # and then test parity + df1 = session.read_gbq(square_bq_table) + df2 = session.read_gbq(square_bq_table) + df1.columns = df2.index + + bf_result = df1.dot(df2).to_pandas() + pd_result = df1.to_pandas().dot(df2.to_pandas()) + + # Ignore dtype parity, just compare the values + # TODO(shobs): See if we can achieve data type parity + pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) + + # In the second variant, read Pandas dataframe directly and then test parity + df1 = pd.read_gbq(square_bq_table) + df2 = pd.read_gbq(square_bq_table) + df1.columns = df2.index + + pd_result = df1.dot(df2) + + # Ignore data type and index parity, just compare the values + # TODO(shobs): See if we can achieve data type and index parity + pd.testing.assert_frame_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + +@pytest.mark.skipif(True, reason="https://paste.googleplex.com/6270958410137600") +def test_df_dot_bq_1kx1k_mul_1kx1k(session): + df1 = session.read_gbq("bigframes-dev.zzz_shobs_us.matrix_1k_by_1k") + df2 = session.read_gbq("bigframes-dev.zzz_shobs_us.matrix_1k_by_1k") + df1.columns = df2.index + bf_result = df1.dot(df2) + assert bf_result is not None From d9eeb0f4ca8345fa49a1d129652170bae5cd8e4b Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Fri, 29 Sep 2023 21:31:57 +0000 Subject: [PATCH 2/8] adjust tests for left column alignment with right rows --- tests/system/small/test_dataframe.py | 44 ++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 2d8edc9ea2..78ec23d5b6 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2802,7 +2802,7 @@ def test_df_dot_bq_table_data_small(session): # and then test parity df1 = session.read_gbq(square_bq_table) df2 = session.read_gbq(square_bq_table) - df1.columns = df2.index + df1.columns = df2.index.to_pandas() bf_result = df1.dot(df2).to_pandas() pd_result = df1.to_pandas().dot(df2.to_pandas()) @@ -2825,10 +2825,42 @@ def test_df_dot_bq_table_data_small(session): ) -@pytest.mark.skipif(True, reason="https://paste.googleplex.com/6270958410137600") -def test_df_dot_bq_1kx1k_mul_1kx1k(session): - df1 = session.read_gbq("bigframes-dev.zzz_shobs_us.matrix_1k_by_1k") - df2 = session.read_gbq("bigframes-dev.zzz_shobs_us.matrix_1k_by_1k") - df1.columns = df2.index +@pytest.mark.parametrize( + ("left_matrix", "right_matrix", "result_matrix"), + [ + ( + "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_1kx1k_mul_1kx1k_bf", + ), + ( + "bigframes-dev.zzz_shobs_us.matrix_100k_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_100kx1k_mul_1kx1k_bf", + ), + ( + "bigframes-dev.zzz_shobs_us.matrix_1m_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_1mx1k_mul_1kx1k_bf", + ), + ( + "bigframes-dev.zzz_shobs_us.matrix_10m_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", + "bigframes-dev.zzz_shobs_us.matrix_10mx1k_mul_1kx1k_bf", + ), + ], + ids=[ + "1k", + "100k", + "1m", + "10m", + ], +) +@pytest.mark.skipif(True, reason="Enable only for large scale local testing") +def test_df_dot_bq_1kx1k_mul_1kx1k(session, left_matrix, right_matrix, result_matrix): + df1 = session.read_gbq(left_matrix) + df2 = session.read_gbq(right_matrix) + df1.columns = df2.index.to_pandas() bf_result = df1.dot(df2) + bf_result.to_gbq(result_matrix, if_exists="replace") assert bf_result is not None From 2079c2d116406aed5095c0aff85c374c86cccae7 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 2 Oct 2023 20:25:06 +0000 Subject: [PATCH 3/8] refactor pivot to use known unique values --- bigframes/core/blocks.py | 12 +++++++++++- bigframes/dataframe.py | 23 +++++++++++++++++++++-- tests/system/small/test_dataframe.py | 1 + 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 863852c684..6a2a239ca6 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1192,13 +1192,23 @@ def pivot( *, columns: Sequence[str], values: Sequence[str], + columns_unique_values: typing.Optional[ + typing.Union[pd.Index, Sequence[object]] + ] = None, values_in_index: typing.Optional[bool] = None, ): # Columns+index should uniquely identify rows # Warning: This is not validated, breaking this constraint will result in silently non-deterministic behavior. # -1 to allow for ordering column in addition to pivot columns max_unique_value = (_BQ_MAX_COLUMNS - 1) // len(values) - columns_values = self._get_unique_values(columns, max_unique_value) + if columns_unique_values is None: + columns_values = self._get_unique_values(columns, max_unique_value) + else: + columns_values = ( + columns_unique_values + if isinstance(columns_unique_values, pd.Index) + else pd.Index(columns_unique_values) + ) column_index = columns_values column_ids: list[str] = [] diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index bbd63a0d71..910ae196e1 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1664,10 +1664,13 @@ def kurt(self, *, numeric_only: bool = False): kurtosis = kurt - def pivot( + def _pivot( self, *, columns: typing.Union[blocks.Label, Sequence[blocks.Label]], + columns_unique_values: typing.Optional[ + typing.Union[pandas.Index, Sequence[object]] + ] = None, index: typing.Optional[ typing.Union[blocks.Label, Sequence[blocks.Label]] ] = None, @@ -1691,10 +1694,24 @@ def pivot( pivot_block = block.pivot( columns=column_ids, values=value_col_ids, + columns_unique_values=columns_unique_values, values_in_index=utils.is_list_like(values), ) return DataFrame(pivot_block) + def pivot( + self, + *, + columns: typing.Union[blocks.Label, Sequence[blocks.Label]], + index: typing.Optional[ + typing.Union[blocks.Label, Sequence[blocks.Label]] + ] = None, + values: typing.Optional[ + typing.Union[blocks.Label, Sequence[blocks.Label]] + ] = None, + ) -> DataFrame: + return self._pivot(columns=columns, index=index, values=values) + def stack(self): # TODO: support 'level' param by simply reordering levels such that selected level is last before passing to Block.stack. # TODO: match impl to pandas future_stack as described in pandas 2.1 release notes @@ -2596,7 +2613,9 @@ def get_right_id(id): ) aggregated_noindex = aggregated.reset_index() aggregated_noindex.columns = cvd_columns - result = aggregated_noindex.pivot(columns=col_id, index=row_id) + result = aggregated_noindex._pivot( + columns=col_id, columns_unique_values=other.columns, index=row_id + ) # Set the index names to match the left side matrix result.index.names = self.index.names diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 5b41e67d81..15d9c3764d 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2881,6 +2881,7 @@ def test_df_dot( ) +@pytest.mark.skipif(True, reason="Enable only for large scale local testing") def test_df_dot_bq_table_data_small(session): square_bq_table = "bigframes-dev.zzz_shobs_us.matrix_int_100_by_100" From 2cc9a6e055ddeb133fae7de80420a2ab83b8a39a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Fri, 6 Oct 2023 19:58:18 +0000 Subject: [PATCH 4/8] Support DataFrame.dot(Series), remove local-only tests --- bigframes/dataframe.py | 22 +++--- tests/system/small/test_dataframe.py | 78 +++---------------- .../bigframes_vendored/pandas/core/frame.py | 30 +++++++ 3 files changed, 54 insertions(+), 76 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 910ae196e1..6c52bcdcf7 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2566,12 +2566,11 @@ def _get_block(self) -> blocks.Block: def _cached(self) -> DataFrame: return DataFrame(self._block.cached()) - def dot(self, other: DataFrame) -> DataFrame: - if not isinstance(other, DataFrame): - raise NotImplementedError("Only DataFrame operand is supported") + _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") - # if not self.columns.equals(other.index): - # raise ValueError("matrices are not aligned") + def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: + if not isinstance(other, (DataFrame, bf_series.Series)): + raise NotImplementedError("Only DataFrame or Series operand is supported") # Convert the dataframes into cell-value-decomposed representation, i.e. # each cell value is present in a separate row @@ -2588,10 +2587,12 @@ def get_left_id(id): def get_right_id(id): return f"{id}{right_suffix}" + other_frame = other if isinstance(other, DataFrame) else other.to_frame() + left = self.stack().reset_index() left.columns = cvd_columns - right = other.stack().reset_index() + right = other_frame.stack().reset_index() right.columns = cvd_columns merged = left.merge( @@ -2614,7 +2615,7 @@ def get_right_id(id): aggregated_noindex = aggregated.reset_index() aggregated_noindex.columns = cvd_columns result = aggregated_noindex._pivot( - columns=col_id, columns_unique_values=other.columns, index=row_id + columns=col_id, columns_unique_values=other_frame.columns, index=row_id ) # Set the index names to match the left side matrix @@ -2623,8 +2624,11 @@ def get_right_id(id): # Pivot has the result columns ordered alphabetically. It should still # match the columns in the right sided matrix. Let's reorder them as per # the right side matrix - if not result.columns.difference(other.columns).empty: + if not result.columns.difference(other_frame.columns).empty: raise RuntimeError("Could not construct all columns") - result = result[other.columns] + result = result[other_frame.columns] + + if isinstance(other, bf_series.Series): + result = result[other.name].rename() return result diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 15d9c3764d..5425c383b7 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2881,73 +2881,17 @@ def test_df_dot( ) -@pytest.mark.skipif(True, reason="Enable only for large scale local testing") -def test_df_dot_bq_table_data_small(session): - square_bq_table = "bigframes-dev.zzz_shobs_us.matrix_int_100_by_100" - - # In the first variant, convert BigFrames dataframe to Pandas dataframe - # and then test parity - df1 = session.read_gbq(square_bq_table) - df2 = session.read_gbq(square_bq_table) - df1.columns = df2.index.to_pandas() - - bf_result = df1.dot(df2).to_pandas() - pd_result = df1.to_pandas().dot(df2.to_pandas()) - - # Ignore dtype parity, just compare the values - # TODO(shobs): See if we can achieve data type parity - pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False) - - # In the second variant, read Pandas dataframe directly and then test parity - df1 = pd.read_gbq(square_bq_table) - df2 = pd.read_gbq(square_bq_table) - df1.columns = df2.index +def test_df_dot_series( + matrix_2by3_df, matrix_2by3_pandas_df, matrix_3by4_df, matrix_3by4_pandas_df +): + bf_result = matrix_2by3_df.dot(matrix_3by4_df["x"]).to_pandas() + pd_result = matrix_2by3_pandas_df.dot(matrix_3by4_pandas_df["x"]) - pd_result = df1.dot(df2) + # Patch pandas dtypes for testing parity + # Pandas result is object instead of Int64 (nullable) dtype. + pd_result = pd_result.astype(pd.Int64Dtype()) - # Ignore data type and index parity, just compare the values - # TODO(shobs): See if we can achieve data type and index parity - pd.testing.assert_frame_equal( - bf_result, pd_result, check_dtype=False, check_index_type=False + pd.testing.assert_series_equal( + bf_result, + pd_result, ) - - -@pytest.mark.parametrize( - ("left_matrix", "right_matrix", "result_matrix"), - [ - ( - "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_1kx1k_mul_1kx1k_bf", - ), - ( - "bigframes-dev.zzz_shobs_us.matrix_100k_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_100kx1k_mul_1kx1k_bf", - ), - ( - "bigframes-dev.zzz_shobs_us.matrix_1m_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_1mx1k_mul_1kx1k_bf", - ), - ( - "bigframes-dev.zzz_shobs_us.matrix_10m_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_1k_by_1k", - "bigframes-dev.zzz_shobs_us.matrix_10mx1k_mul_1kx1k_bf", - ), - ], - ids=[ - "1k", - "100k", - "1m", - "10m", - ], -) -@pytest.mark.skipif(True, reason="Enable only for large scale local testing") -def test_df_dot_bq_1kx1k_mul_1kx1k(session, left_matrix, right_matrix, result_matrix): - df1 = session.read_gbq(left_matrix) - df2 = session.read_gbq(right_matrix) - df1.columns = df2.index.to_pandas() - bf_result = df1.dot(df2) - bf_result.to_gbq(result_matrix, if_exists="replace") - assert bf_result is not None diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 348145a4d6..3f98fd735c 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -2053,3 +2053,33 @@ def fillna(self, value): DataFrame: Object with missing values filled """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def dot(self, other): + """ + Compute the matrix multiplication between the DataFrame and other. + + This method computes the matrix product between the DataFrame and the + values of an other Series or DataFrame. + + It can also be called using `self @ other`. + + .. note:: + The dimensions of DataFrame and other must be compatible in order to + compute the matrix multiplication. In addition, the column names of + DataFrame and the index of other must contain the same values, as they + will be aligned prior to the multiplication. + + The dot method for Series computes the inner product, instead of the + matrix product here. + + Args: + other (Series or DataFrame): + The other object to compute the matrix product with. + + Returns: + Series or DataFrame + If `other` is a Series, return the matrix product between self and + other as a Series. If other is a DataFrame, return + the matrix product of self and other in a DataFrame. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 464efbf4e9ac34e684a815b004cafe28243f1dc6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 7 Oct 2023 00:26:07 +0000 Subject: [PATCH 5/8] add explanation for `column_unique_values` param --- bigframes/core/blocks.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9862d7a0fd..fbf619b9bd 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1256,11 +1256,16 @@ def pivot( ] = None, values_in_index: typing.Optional[bool] = None, ): - # Columns+index should uniquely identify rows - # Warning: This is not validated, breaking this constraint will result in silently non-deterministic behavior. - # -1 to allow for ordering column in addition to pivot columns - max_unique_value = (_BQ_MAX_COLUMNS - 1) // len(values) + # We need the unique values from the pivot columns to turn them into + # column ids. It can be deteremined by running a SQL query on the + # underlying data. However, the caller can save that if they know the + # unique values upfront by providing them explicitly. if columns_unique_values is None: + # Columns+index should uniquely identify rows + # Warning: This is not validated, breaking this constraint will + # result in silently non-deterministic behavior. + # -1 to allow for ordering column in addition to pivot columns + max_unique_value = (_BQ_MAX_COLUMNS - 1) // len(values) columns_values = self._get_unique_values(columns, max_unique_value) else: columns_values = ( From 55b09925cdcf1d50c79424f3ea5a12131a4463f4 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 18 Oct 2023 19:50:28 +0000 Subject: [PATCH 6/8] NotImplementedError for multi-index matrix multiplication --- bigframes/dataframe.py | 6 ++++ tests/system/small/test_multiindex.py | 44 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2fdd0cc4df..506039db74 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2602,6 +2602,12 @@ def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError("Only DataFrame or Series operand is supported") + if len(self.index.names) > 1 or len(other.index.names) > 1: + raise NotImplementedError("Multi-index input is not supported") + + if len(self.columns.names) > 1 or len(other.columns.names) > 1: + raise NotImplementedError("Multi-level column input is not supported") + # Convert the dataframes into cell-value-decomposed representation, i.e. # each cell value is present in a separate row row_id = "row" diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index a132676770..b5c78de69c 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -934,3 +934,47 @@ def test_column_multi_index_swaplevel(scalars_df_index, scalars_pandas_df_index) pd_result = pd_df.swaplevel(-3, -1, axis=1) pandas.testing.assert_frame_equal(bf_result, pd_result) + + +def test_df_multi_index_dot_not_supported(): + left_matrix = [[1, 2, 3], [2, 5, 7]] + right_matrix = [[2, 4, 8], [1, 5, 10], [3, 6, 9]] + + # Left multi-index + left_index = pandas.MultiIndex.from_tuples([("a", "aa"), ("a", "ab")]) + bf1 = bpd.DataFrame(left_matrix, index=left_index) + bf2 = bpd.DataFrame(right_matrix) + with pytest.raises(NotImplementedError, match="Multi-index input is not supported"): + bf1.dot(bf2) + + # right multi-index + right_index = pandas.MultiIndex.from_tuples([("a", "aa"), ("a", "ab"), ("b", "bb")]) + bf1 = bpd.DataFrame(left_matrix) + bf2 = bpd.DataFrame(right_matrix, index=right_index) + with pytest.raises(NotImplementedError, match="Multi-index input is not supported"): + bf1.dot(bf2) + + +def test_column_multi_index_dot_not_supported(): + left_matrix = [[1, 2, 3], [2, 5, 7]] + right_matrix = [[2, 4, 8], [1, 5, 10], [3, 6, 9]] + + multi_level_columns = pandas.MultiIndex.from_arrays( + [["col0", "col0", "col1"], ["col00", "col01", "col11"]] + ) + + # Left multi-columns + bf1 = bpd.DataFrame(left_matrix, columns=multi_level_columns) + bf2 = bpd.DataFrame(right_matrix) + with pytest.raises( + NotImplementedError, match="Multi-level column input is not supported" + ): + bf1.dot(bf2) + + # right multi-columns + bf1 = bpd.DataFrame(left_matrix) + bf2 = bpd.DataFrame(right_matrix, columns=multi_level_columns) + with pytest.raises( + NotImplementedError, match="Multi-level column input is not supported" + ): + bf1.dot(bf2) From 42d888f66dc0c81c0096ee163c1dc37381f9925b Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 18 Oct 2023 20:21:58 +0000 Subject: [PATCH 7/8] avoid multi-column condition check on series --- bigframes/dataframe.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 506039db74..9a896823b7 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2605,7 +2605,9 @@ def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if len(self.index.names) > 1 or len(other.index.names) > 1: raise NotImplementedError("Multi-index input is not supported") - if len(self.columns.names) > 1 or len(other.columns.names) > 1: + if len(self.columns.names) > 1 or ( + isinstance(other, DataFrame) and len(other.columns.names) > 1 + ): raise NotImplementedError("Multi-level column input is not supported") # Convert the dataframes into cell-value-decomposed representation, i.e. From 3de16e0045da8f81e495a1e851ca3352a37b3ac9 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 18 Oct 2023 21:16:30 +0000 Subject: [PATCH 8/8] Add feedback link in the exception messages --- bigframes/dataframe.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9a896823b7..32a2908a42 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2600,15 +2600,21 @@ def _cached(self) -> DataFrame: def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): - raise NotImplementedError("Only DataFrame or Series operand is supported") + raise NotImplementedError( + f"Only DataFrame or Series operand is supported. {constants.FEEDBACK_LINK}" + ) if len(self.index.names) > 1 or len(other.index.names) > 1: - raise NotImplementedError("Multi-index input is not supported") + raise NotImplementedError( + f"Multi-index input is not supported. {constants.FEEDBACK_LINK}" + ) if len(self.columns.names) > 1 or ( isinstance(other, DataFrame) and len(other.columns.names) > 1 ): - raise NotImplementedError("Multi-level column input is not supported") + raise NotImplementedError( + f"Multi-level column input is not supported. {constants.FEEDBACK_LINK}" + ) # Convert the dataframes into cell-value-decomposed representation, i.e. # each cell value is present in a separate row @@ -2663,7 +2669,9 @@ def get_right_id(id): # match the columns in the right sided matrix. Let's reorder them as per # the right side matrix if not result.columns.difference(other_frame.columns).empty: - raise RuntimeError("Could not construct all columns") + raise RuntimeError( + f"Could not construct all columns. {constants.FEEDBACK_LINK}" + ) result = result[other_frame.columns] if isinstance(other, bf_series.Series):