From 0c9eb40c179c4a9ea8324e4866531b8f4fa0ed63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 1 Apr 2024 07:19:12 -0500 Subject: [PATCH] Revert "feat: Support max_columns in repr and make repr more efficient (#515)" This reverts commit 54e49cff89bd329852a823cd5cf5c5b41b7f9e32. --- bigframes/core/blocks.py | 42 +++++++-------------- bigframes/core/indexes/index.py | 10 ++--- bigframes/dataframe.py | 66 ++++++++++++++++++++------------- bigframes/series.py | 9 +++-- bigframes/session/__init__.py | 8 +--- 5 files changed, 65 insertions(+), 70 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index aab8b1ad4d..11899eef11 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -467,23 +467,6 @@ def to_pandas_batches(self): self._copy_index_to_pandas(df) yield df - def download_pandas_preview( - self, max_rows: int - ) -> Tuple[pd.DataFrame, bigquery.QueryJob]: - """Download one page of results and return the query job.""" - dtypes = dict(zip(self.index_columns, self.index.dtypes)) - dtypes.update(zip(self.value_columns, self.dtypes)) - results_iterator, query_job = self.session._execute( - self.expr, sorted=True, max_results=max_rows - ) - arrow_results_iterator = results_iterator.to_arrow_iterable() - arrow_table = next(arrow_results_iterator) - downloaded_df = bigframes.session._io.pandas.arrow_to_pandas( - arrow_table, dtypes - ) - self._copy_index_to_pandas(downloaded_df) - return downloaded_df, query_job - def _copy_index_to_pandas(self, df: pd.DataFrame): """Set the index on pandas DataFrame to match this block. @@ -1314,25 +1297,26 @@ def _forward_slice(self, start: int = 0, stop=None, step: int = 1): # queries. @functools.cache def retrieve_repr_request_results( - self, max_results: int, max_columns: int - ) -> Tuple[pd.DataFrame, Tuple[int, int], bigquery.QueryJob]: + self, max_results: int + ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]: """ Retrieves a pandas dataframe containing only max_results many rows for use with printing methods. - Returns a tuple of the dataframe preview for printing and the overall number - of rows and columns of the table, as well as the query job used. + Returns a tuple of the dataframe and the overall number of rows of the query. """ - pandas_df, query_job = self.download_pandas_preview(max_results) - row_count = self.session._get_table_row_count(query_job.destination) - column_count = len(self.value_columns) - - formatted_df = pandas_df.set_axis(self.column_labels, axis=1) + # TODO(swast): Select a subset of columns if max_columns is less than the + # number of columns in the schema. + count = self.shape[0] + if count > max_results: + head_block = self.slice(0, max_results) + else: + head_block = self + computed_df, query_job = head_block.to_pandas() + formatted_df = computed_df.set_axis(self.column_labels, axis=1) # we reset the axis and substitute the bf index name for the default formatted_df.index.name = self.index.name - # limit column count - formatted_df = formatted_df.iloc[:, 0:max_columns] - return formatted_df, (row_count, column_count), query_job + return formatted_df, count, query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: result_id = guid.generate_guid() diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index 48988aaffe..c818b68711 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -205,17 +205,17 @@ def query_job(self) -> Optional[bigquery.QueryJob]: return self._query_job def __repr__(self) -> str: + # TODO(swast): Add a timeout here? If the query is taking a long time, + # maybe we just print the job metadata that we have so far? + # TODO(swast): Avoid downloading the whole series by using job + # metadata, like we do with DataFrame. opts = bigframes.options.display max_results = opts.max_rows - max_columns = opts.max_columns if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) - pandas_df, _, query_job = self._block.retrieve_repr_request_results( - max_results, max_columns - ) + pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._query_job = query_job - return repr(pandas_df.index) def copy(self, name: Optional[Hashable] = None): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 1df78dd4cd..066b082490 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -579,16 +579,28 @@ def __setattr__(self, key: str, value): object.__setattr__(self, key, value) def __repr__(self) -> str: - """Converts a DataFrame to a string using pandas dataframe __repr__. + """Converts a DataFrame to a string. Calls to_pandas. - Only represents the first `bigframes.options.display.max_rows` - and `bigframes.options.display.max_columns`. + Only represents the first `bigframes.options.display.max_rows`. """ - if bigframes.options.display.repr_mode == "deferred": + opts = bigframes.options.display + max_results = opts.max_rows + if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) - pandas_df, shape = self._perform_repr_request() - with display_options.pandas_repr(bigframes.options.display): + self._cached() + # TODO(swast): pass max_columns and get the true column count back. Maybe + # get 1 more column than we have requested so that pandas can add the + # ... for us? + pandas_df, row_count, query_job = self._block.retrieve_repr_request_results( + max_results + ) + + self._set_internal_query_job(query_job) + + column_count = len(pandas_df.columns) + + with display_options.pandas_repr(opts): repr_string = repr(pandas_df) # Modify the end of the string to reflect count. @@ -596,40 +608,42 @@ def __repr__(self) -> str: pattern = re.compile("\\[[0-9]+ rows x [0-9]+ columns\\]") if pattern.match(lines[-1]): lines = lines[:-2] - if shape[0] > len(lines) - 1: + + if row_count > len(lines) - 1: lines.append("...") + lines.append("") - lines.append(f"[{shape[0]} rows x {shape[1]} columns]") + lines.append(f"[{row_count} rows x {column_count} columns]") return "\n".join(lines) - def _perform_repr_request(self) -> Tuple[pandas.DataFrame, Tuple[int, int]]: - max_results = bigframes.options.display.max_rows - max_columns = bigframes.options.display.max_columns - self._cached() - pandas_df, shape, query_job = self._block.retrieve_repr_request_results( - max_results, max_columns - ) - self._set_internal_query_job(query_job) - return pandas_df, shape - def _repr_html_(self) -> str: """ Returns an html string primarily for use by notebooks for displaying - a representation of the DataFrame. Displays at most the number of rows - and columns given by `bigframes.options.display.max_rows` and - `bigframes.options.display.max_columns`. + a representation of the DataFrame. Displays 20 rows by default since + many notebooks are not configured for large tables. """ - - if bigframes.options.display.repr_mode == "deferred": + opts = bigframes.options.display + max_results = bigframes.options.display.max_rows + if opts.repr_mode == "deferred": return formatter.repr_query_job_html(self.query_job) - pandas_df, shape = self._perform_repr_request() + self._cached() + # TODO(swast): pass max_columns and get the true column count back. Maybe + # get 1 more column than we have requested so that pandas can add the + # ... for us? + pandas_df, row_count, query_job = self._block.retrieve_repr_request_results( + max_results + ) + + self._set_internal_query_job(query_job) + + column_count = len(pandas_df.columns) - with display_options.pandas_repr(bigframes.options.display): + with display_options.pandas_repr(opts): # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy. html_string = pandas_df._repr_html_() # type:ignore - html_string += f"[{shape[0]} rows x {shape[1]} columns in total]" + html_string += f"[{row_count} rows x {column_count} columns in total]" return html_string def __setitem__(self, key: str, value: SingleItemValue): diff --git a/bigframes/series.py b/bigframes/series.py index f1ac89f514..e7b358c2fe 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -272,16 +272,17 @@ def reset_index( return bigframes.dataframe.DataFrame(block) def __repr__(self) -> str: + # TODO(swast): Add a timeout here? If the query is taking a long time, + # maybe we just print the job metadata that we have so far? + # TODO(swast): Avoid downloading the whole series by using job + # metadata, like we do with DataFrame. opts = bigframes.options.display max_results = opts.max_rows - max_columns = opts.max_columns if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) self._cached() - pandas_df, _, query_job = self._block.retrieve_repr_request_results( - max_results, max_columns - ) + pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._set_internal_query_job(query_job) return repr(pandas_df.iloc[:, 0]) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 6573934f94..ac266da3bd 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1832,7 +1832,6 @@ def _execute( sorted: bool = True, dry_run=False, col_id_overrides: Mapping[str, str] = {}, - max_results: Optional[int] = None, ) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]: sql = self._to_sql( array_value, sorted=sorted, col_id_overrides=col_id_overrides @@ -1842,7 +1841,8 @@ def _execute( else: job_config.dry_run = dry_run return self._start_query( - sql=sql, job_config=job_config, max_results=max_results + sql=sql, + job_config=job_config, ) def _peek( @@ -1887,10 +1887,6 @@ def _get_table_size(self, destination_table): table = self.bqclient.get_table(destination_table) return table.num_bytes - def _get_table_row_count(self, destination_table) -> int: - table = self.bqclient.get_table(destination_table) - return table.num_rows - def _rows_to_dataframe( self, row_iterator: bigquery.table.RowIterator, dtypes: Dict ) -> pandas.DataFrame: