From 6e54a4d5f7dfcc5072051f92932f14efdaff3dab Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 22 Sep 2023 00:40:28 +0000 Subject: [PATCH] perf: add ability to cache dataframe and series to session table --- bigframes/core/__init__.py | 23 +++++++++++++++++++++++ bigframes/core/blocks.py | 9 +++++++++ bigframes/dataframe.py | 3 +++ bigframes/series.py | 3 +++ tests/system/small/test_dataframe.py | 10 ++++++++++ 5 files changed, 48 insertions(+) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 3b3754642e..44af203e56 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -1128,6 +1128,29 @@ def slice( ) return sliced_expr if step > 0 else sliced_expr.reversed() + def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue: + """Write the ArrayValue to a session table and return a new ArrayValue that references it.""" + ibis_expr = self._to_ibis_expr( + ordering_mode="unordered", expose_hidden_cols=True + ) + destination = self._session._ibis_to_session_table( + ibis_expr, cluster_cols=cluster_cols, api_name="cache" + ) + table_expression = self._session.ibis_client.sql( + f"SELECT * FROM `_SESSION`.`{destination.table_id}`" + ) + new_columns = [table_expression[column] for column in self.column_names] + new_hidden_columns = [ + table_expression[column] for column in self._hidden_ordering_column_names + ] + return ArrayValue( + self._session, + table_expression, + columns=new_columns, + hidden_ordering_columns=new_hidden_columns, + ordering=self._ordering, + ) + class ArrayValueBuilder: """Mutable expression class. 
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index ad4f72070f..c4127c5fd5 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1467,6 +1467,15 @@ def to_sql_query( idx_labels, ) + def cached(self) -> Block: + """Write the block to a session table, passing the index columns as cluster columns, and return a new Block with the same index columns and labels that references it.""" + return Block( + self.expr.cached(cluster_cols=self.index_columns), + index_columns=self.index_columns, + column_labels=self.column_labels, + index_labels=self.index_labels, + ) + def _is_monotonic( self, column_ids: typing.Union[str, Sequence[str]], increasing: bool ) -> bool: diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 76377cd477..0d357e7c3d 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2480,3 +2480,6 @@ def _set_block(self, block: blocks.Block): def _get_block(self) -> blocks.Block: return self._block + + def _cached(self) -> DataFrame: + return DataFrame(self._block.cached()) diff --git a/bigframes/series.py b/bigframes/series.py index 8e47088c14..c1c0cb0537 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1389,6 +1389,9 @@ def _slice( ), ) + def _cached(self) -> Series: + return Series(self._block.cached()) + def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]: return pandas.api.types.is_list_like(obj) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 3eeb368ad2..b6ca958c03 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2717,3 +2717,13 @@ def test_query_job_setters(scalars_df_default_index: dataframe.DataFrame): job_ids.add(scalars_df_default_index.query_job.job_id) assert len(job_ids) == 2 + + +def test_df_cached(scalars_df_index): + df = scalars_df_index.set_index(["int64_too", "int64_col"]).sort_values( + "string_col" + ) + df = df[df["rowindex_2"] % 2 == 0] + + df_cached_copy = df._cached() + 
pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas())