diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 92708a7f93..ab09230c99 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1170,7 +1170,11 @@ def _read_csv_w_bigquery_engine( table_id = self._loader.load_file(filepath_or_buffer, job_config=job_config) df = self._loader.read_gbq_table( - table_id, index_col=index_col, columns=columns, names=names + table_id, + index_col=index_col, + columns=columns, + names=names, + index_col_in_columns=True, ) if dtype is not None: diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index cf02393fd8..814d44292e 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -96,7 +96,31 @@ def _to_index_cols( return index_cols -def _check_column_duplicates(index_cols: Iterable[str], columns: Iterable[str]): +def _check_column_duplicates( + index_cols: Iterable[str], columns: Iterable[str], index_col_in_columns: bool +) -> Iterable[str]: + """Validates and processes index and data columns for duplicates and overlap. + + This function performs two main tasks: + 1. Ensures there are no duplicate column names within the `index_cols` list + or within the `columns` list. + 2. Based on the `index_col_in_columns` flag, it validates the relationship + between `index_cols` and `columns`. + + Args: + index_cols (Iterable[str]): + An iterable of column names designated as the index. + columns (Iterable[str]): + An iterable of column names designated as the data columns. + index_col_in_columns (bool): + A flag indicating how to handle overlap between `index_cols` and + `columns`. + - If `False`, the two lists must be disjoint (contain no common + elements). An error is raised if any overlap is found. + - If `True`, `index_cols` is expected to be a subset of + `columns`. An error is raised if an index column is not found + in the `columns` list. 
+    """
     index_cols_list = list(index_cols) if index_cols is not None else []
     columns_list = list(columns) if columns is not None else []
     set_index = set(index_cols_list)
@@ -108,17 +132,29 @@ def _check_column_duplicates(index_cols: Iterable[str], columns: Iterable[str]):
             "All column names specified in 'index_col' must be unique."
         )
 
+    if len(columns_list) == 0:
+        return columns_list  # not `columns`: it may be None or an exhausted iterator
+
     if len(columns_list) > len(set_columns):
         raise ValueError(
             "The 'columns' argument contains duplicate names. "
             "All column names specified in 'columns' must be unique."
         )
 
-    if not set_index.isdisjoint(set_columns):
-        raise ValueError(
-            "Found column names that exist in both 'index_col' and 'columns' arguments. "
-            "These arguments must specify distinct sets of columns."
-        )
+    if index_col_in_columns:
+        if not set_index.issubset(set_columns):
+            raise ValueError(
+                f"The specified index column(s) were not found: {set_index - set_columns}. "
+                f"Available columns are: {set_columns}"
+            )
+        return [col for col in columns_list if col not in set_index]
+    else:
+        if not set_index.isdisjoint(set_columns):
+            raise ValueError(
+                "Found column names that exist in both 'index_col' and 'columns' arguments. "
+                "These arguments must specify distinct sets of columns."
+            )
+        return columns_list
 
 
 @dataclasses.dataclass
@@ -391,6 +427,7 @@ def read_gbq_table(  # type: ignore[overload-overlap]
         dry_run: Literal[False] = ...,
         force_total_order: Optional[bool] = ...,
         n_rows: Optional[int] = None,
+        index_col_in_columns: bool = False,
     ) -> dataframe.DataFrame:
         ...
 
@@ -413,6 +450,7 @@ def read_gbq_table(
         dry_run: Literal[True] = ...,
         force_total_order: Optional[bool] = ...,
         n_rows: Optional[int] = None,
+        index_col_in_columns: bool = False,
     ) -> pandas.Series:
         ...
 
@@ -434,7 +472,67 @@ def read_gbq_table( dry_run: bool = False, force_total_order: Optional[bool] = None, n_rows: Optional[int] = None, + index_col_in_columns: bool = False, ) -> dataframe.DataFrame | pandas.Series: + """Read a BigQuery table into a BigQuery DataFrames DataFrame. + + This method allows you to create a DataFrame from a BigQuery table. + You can specify the columns to load, an index column, and apply + filters. + + Args: + table_id (str): + The identifier of the BigQuery table to read. + index_col (Iterable[str] | str | Iterable[int] | int | bigframes.enums.DefaultIndexKind, optional): + The column(s) to use as the index for the DataFrame. This can be + a single column name or a list of column names. If not provided, + a default index will be used based on the session's + ``default_index_type``. + columns (Iterable[str], optional): + The columns to read from the table. If not specified, all + columns will be read. + names (Optional[Iterable[str]], optional): + A list of column names to use for the resulting DataFrame. This + is useful if you want to rename the columns as you read the + data. + max_results (Optional[int], optional): + The maximum number of rows to retrieve from the table. If not + specified, all rows will be loaded. + use_cache (bool, optional): + Whether to use cached results for the query. Defaults to True. + Setting this to False will force a re-execution of the query. + filters (third_party_pandas_gbq.FiltersType, optional): + A list of filters to apply to the data. Filters are specified + as a list of tuples, where each tuple contains a column name, + an operator (e.g., '==', '!='), and a value. + enable_snapshot (bool, optional): + If True, a snapshot of the table is used to ensure that the + DataFrame is deterministic, even if the underlying table + changes. Defaults to True. + dry_run (bool, optional): + If True, the function will not actually execute the query but + will instead return statistics about the table. 
Defaults to False. + force_total_order (Optional[bool], optional): + If True, a total ordering is enforced on the DataFrame, which + can be useful for operations that require a stable row order. + If None, the session's default behavior is used. + n_rows (Optional[int], optional): + The number of rows to consider for type inference and other + metadata operations. This does not limit the number of rows + in the final DataFrame. + index_col_in_columns (bool, optional): + Specifies if the ``index_col`` is also present in the ``columns`` + list. Defaults to ``False``. + + * If ``False``, ``index_col`` and ``columns`` must specify + distinct sets of columns. An error will be raised if any + column is found in both. + * If ``True``, the column(s) in ``index_col`` are expected to + also be present in the ``columns`` list. This is useful + when the index is selected from the data columns (e.g., in a + ``read_csv`` scenario). The column will be used as the + DataFrame's index and removed from the list of value columns. 
+ """ import bigframes._tools.strings import bigframes.dataframe as dataframe @@ -516,7 +614,9 @@ def read_gbq_table( index_col=index_col, names=names, ) - _check_column_duplicates(index_cols, columns) + columns = list( + _check_column_duplicates(index_cols, columns, index_col_in_columns) + ) for key in index_cols: if key not in table_column_names: @@ -798,7 +898,9 @@ def read_gbq_query( ) index_cols = _to_index_cols(index_col) - _check_column_duplicates(index_cols, columns) + columns = _check_column_duplicates( + index_cols, columns, index_col_in_columns=False + ) filters_copy1, filters_copy2 = itertools.tee(filters) has_filters = len(list(filters_copy1)) != 0 diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 6e68a759b4..9febb0da42 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -1320,10 +1320,6 @@ def test_read_csv_for_names_less_than_columns(session, df_and_gcs_csv_for_two_co assert bf_df.shape == pd_df.shape assert bf_df.columns.tolist() == pd_df.columns.tolist() - # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs - # (b/280889935) or guarantee row ordering. - bf_df = bf_df.sort_index() - # Pandas's index name is None, while BigFrames's index name is "rowindex". pd_df.index.name = "rowindex" pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas()) @@ -1479,41 +1475,70 @@ def test_read_csv_for_gcs_file_w_header(session, df_and_gcs_csv, header): def test_read_csv_w_usecols(session, df_and_local_csv): # Compares results for pandas and bigframes engines scalars_df, path = df_and_local_csv + usecols = ["rowindex", "bool_col"] with open(path, "rb") as buffer: bf_df = session.read_csv( buffer, engine="bigquery", - usecols=["bool_col"], + usecols=usecols, ) with open(path, "rb") as buffer: # Convert default pandas dtypes to match BigQuery DataFrames dtypes. 
pd_df = session.read_csv( buffer, - usecols=["bool_col"], + usecols=usecols, dtype=scalars_df[["bool_col"]].dtypes.to_dict(), ) - # Cannot compare two dataframe due to b/408499371. - assert len(bf_df.columns) == 1 - assert len(pd_df.columns) == 1 + assert bf_df.shape == pd_df.shape + assert bf_df.columns.tolist() == pd_df.columns.tolist() + # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs + # (b/280889935) or guarantee row ordering. + bf_df = bf_df.set_index("rowindex").sort_index() + pd_df = pd_df.set_index("rowindex") + pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas()) -@pytest.mark.parametrize( - "engine", - [ - pytest.param("bigquery", id="bq_engine"), - pytest.param(None, id="default_engine"), - ], -) -def test_read_csv_local_w_usecols(session, scalars_pandas_df_index, engine): - with tempfile.TemporaryDirectory() as dir: - path = dir + "/test_read_csv_local_w_usecols.csv" - # Using the pandas to_csv method because the BQ one does not support local write. - scalars_pandas_df_index.to_csv(path, index=False) - # df should only have 1 column which is bool_col. - df = session.read_csv(path, usecols=["bool_col"], engine=engine) - assert len(df.columns) == 1 +def test_read_csv_w_usecols_and_indexcol(session, df_and_local_csv): + # Compares results for pandas and bigframes engines + scalars_df, path = df_and_local_csv + usecols = ["rowindex", "bool_col"] + with open(path, "rb") as buffer: + bf_df = session.read_csv( + buffer, + engine="bigquery", + usecols=usecols, + index_col="rowindex", + ) + with open(path, "rb") as buffer: + # Convert default pandas dtypes to match BigQuery DataFrames dtypes. 
+ pd_df = session.read_csv( + buffer, + usecols=usecols, + index_col="rowindex", + dtype=scalars_df[["bool_col"]].dtypes.to_dict(), + ) + + assert bf_df.shape == pd_df.shape + assert bf_df.columns.tolist() == pd_df.columns.tolist() + + pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas()) + + +def test_read_csv_w_indexcol_not_in_usecols(session, df_and_local_csv): + _, path = df_and_local_csv + with open(path, "rb") as buffer: + with pytest.raises( + ValueError, + match=re.escape("The specified index column(s) were not found"), + ): + session.read_csv( + buffer, + engine="bigquery", + usecols=["bool_col"], + index_col="rowindex", + ) @pytest.mark.parametrize( @@ -1553,9 +1578,6 @@ def test_read_csv_local_w_encoding(session, penguins_pandas_df_default_index): bf_df = session.read_csv( path, engine="bigquery", index_col="rowindex", encoding="ISO-8859-1" ) - # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs - # (b/280889935) or guarantee row ordering. - bf_df = bf_df.sort_index() pd.testing.assert_frame_equal( bf_df.to_pandas(), penguins_pandas_df_default_index )