From ccd3c03cf99951465162d4d4316e6c3868bdaa16 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Wed, 18 Sep 2024 15:07:35 -0700 Subject: [PATCH 01/14] test: move Claude3 tests to load test (#997) * test: move Claude3 tests to load test * add conftest --- tests/system/conftest.py | 34 +++++++++++----- tests/system/load/conftest.py | 39 ++++++++++++++++++ tests/system/load/test_llm.py | 66 ++++++++++++++++++++++++++++++ tests/system/small/ml/conftest.py | 24 ----------- tests/system/small/ml/test_llm.py | 67 ------------------------------- 5 files changed, 129 insertions(+), 101 deletions(-) create mode 100644 tests/system/load/conftest.py diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 9cfb9082af..5ee2dc6397 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -146,16 +146,6 @@ def session() -> Generator[bigframes.Session, None, None]: session.close() # close generated session at cleanup time -@pytest.fixture(scope="session") -def session_us_east5() -> Generator[bigframes.Session, None, None]: - context = bigframes.BigQueryOptions( - location="us-east5", - ) - session = bigframes.Session(context=context) - yield session - session.close() # close generated session at cleanup time - - @pytest.fixture(scope="session") def session_load() -> Generator[bigframes.Session, None, None]: context = bigframes.BigQueryOptions(location="US", project="bigframes-load-testing") @@ -188,6 +178,11 @@ def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, Non session.close() # close generated session at cleanup type +@pytest.fixture(scope="session") +def bq_connection(bigquery_client: bigquery.Client) -> str: + return f"{bigquery_client.project}.{bigquery_client.location}.bigframes-rf-conn" + + @pytest.fixture(scope="session", autouse=True) def cleanup_datasets(bigquery_client: bigquery.Client) -> None: """Cleanup any datasets that were created but not cleaned up.""" @@ -728,6 +723,25 @@ def new_penguins_df(session, new_penguins_pandas_df): return session.read_pandas(new_penguins_pandas_df) +@pytest.fixture(scope="session") +def llm_text_pandas_df(): + """Additional data matching the penguins dataset, with a new index""" + return pd.DataFrame( + { + "prompt": [ + "What is BigQuery?", + "What is BQML?", + "What is BigQuery DataFrame?", + ], + } + ) + + +@pytest.fixture(scope="session") +def llm_text_df(session, llm_text_pandas_df): + return session.read_pandas(llm_text_pandas_df) + + @pytest.fixture(scope="session") def penguins_linear_model_name( session: bigframes.Session, dataset_id_permanent, penguins_table_id diff --git a/tests/system/load/conftest.py b/tests/system/load/conftest.py new file mode 100644 index 0000000000..f15f50c7e7 --- /dev/null +++ b/tests/system/load/conftest.py @@ -0,0 +1,39 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from typing import Generator + +import pytest + +import bigframes + + +# Override the session to target at bigframes-load-testing at all load tests. That allows to run load tests locally with authentic env. +@pytest.fixture(scope="session") +def session() -> Generator[bigframes.Session, None, None]: + context = bigframes.BigQueryOptions(location="US", project="bigframes-load-testing") + session = bigframes.Session(context=context) + yield session + session.close() # close generated session at cleanup time + + +@pytest.fixture(scope="session") +def session_us_east5() -> Generator[bigframes.Session, None, None]: + context = bigframes.BigQueryOptions( + location="us-east5", project="bigframes-load-testing" + ) + session = bigframes.Session(context=context) + yield session + session.close() # close generated session at cleanup time diff --git a/tests/system/load/test_llm.py b/tests/system/load/test_llm.py index 6d22963a97..1d13300115 100644 --- a/tests/system/load/test_llm.py +++ b/tests/system/load/test_llm.py @@ -90,3 +90,69 @@ def test_llm_gemini_configure_fit(llm_fine_tune_df_default_index, llm_remote_tex index=3, ) # TODO(ashleyxu b/335492787): After bqml rolled out version control: save, load, check parameters to ensure configuration was kept + + +# (b/366290533): Claude models are of extremely low capacity. The tests should reside in small tests. Moving these here just to protect BQML's shared capacity(as load test only runs once per day.) and make sure we still have minimum coverage. +@pytest.mark.parametrize( + "model_name", + ("claude-3-sonnet", "claude-3-haiku", "claude-3-5-sonnet", "claude-3-opus"), +) +@pytest.mark.flaky(retries=3, delay=120) +def test_claude3_text_generator_create_load( + dataset_id, model_name, session, session_us_east5, bq_connection +): + if model_name in ("claude-3-5-sonnet", "claude-3-opus"): + session = session_us_east5 + claude3_text_generator_model = llm.Claude3TextGenerator( + model_name=model_name, connection_name=bq_connection, session=session + ) + assert claude3_text_generator_model is not None + assert claude3_text_generator_model._bqml_model is not None + + # save, load to ensure configuration was kept + reloaded_model = claude3_text_generator_model.to_gbq( + f"{dataset_id}.temp_text_model", replace=True + ) + assert f"{dataset_id}.temp_text_model" == reloaded_model._bqml_model.model_name + assert reloaded_model.connection_name == bq_connection + assert reloaded_model.model_name == model_name + + +@pytest.mark.parametrize( + "model_name", + ("claude-3-sonnet", "claude-3-haiku", "claude-3-5-sonnet", "claude-3-opus"), +) +@pytest.mark.flaky(retries=3, delay=120) +def test_claude3_text_generator_predict_default_params_success( + llm_text_df, model_name, session, session_us_east5, bq_connection +): + if model_name in ("claude-3-5-sonnet", "claude-3-opus"): + session = session_us_east5 + claude3_text_generator_model = llm.Claude3TextGenerator( + model_name=model_name, connection_name=bq_connection, session=session + ) + df = claude3_text_generator_model.predict(llm_text_df).to_pandas() + utils.check_pandas_df_schema_and_index( + df, columns=utils.ML_GENERATE_TEXT_OUTPUT, index=3, col_exact=False + ) + + +@pytest.mark.parametrize( + "model_name", + ("claude-3-sonnet", "claude-3-haiku", "claude-3-5-sonnet", "claude-3-opus"), +) +@pytest.mark.flaky(retries=3, delay=120) +def test_claude3_text_generator_predict_with_params_success( + llm_text_df, model_name, session, session_us_east5, bq_connection +): + if model_name in ("claude-3-5-sonnet", 
"claude-3-opus"): + session = session_us_east5 + claude3_text_generator_model = llm.Claude3TextGenerator( + model_name=model_name, connection_name=bq_connection, session=session + ) + df = claude3_text_generator_model.predict( + llm_text_df, max_output_tokens=100, top_k=20, top_p=0.5 + ).to_pandas() + utils.check_pandas_df_schema_and_index( + df, columns=utils.ML_GENERATE_TEXT_OUTPUT, index=3, col_exact=False + ) diff --git a/tests/system/small/ml/conftest.py b/tests/system/small/ml/conftest.py index ee96646687..c1643776a5 100644 --- a/tests/system/small/ml/conftest.py +++ b/tests/system/small/ml/conftest.py @@ -34,11 +34,6 @@ ) -@pytest.fixture(scope="session") -def bq_connection(bigquery_client) -> str: - return f"{bigquery_client.project}.us.bigframes-rf-conn" - - @pytest.fixture(scope="session") def penguins_bqml_linear_model(session, penguins_linear_model_name) -> core.BqmlModel: model = session.bqclient.get_model(penguins_linear_model_name) @@ -157,20 +152,6 @@ def penguins_pca_model( ) -@pytest.fixture(scope="session") -def llm_text_pandas_df(): - """Additional data matching the penguins dataset, with a new index""" - return pd.DataFrame( - { - "prompt": [ - "What is BigQuery?", - "What is BQML?", - "What is BigQuery DataFrame?", - ], - } - ) - - @pytest.fixture(scope="session") def onnx_iris_pandas_df(): """Data matching the iris dataset.""" @@ -212,11 +193,6 @@ def xgboost_iris_df(session, xgboost_iris_pandas_df): return session.read_pandas(xgboost_iris_pandas_df) -@pytest.fixture(scope="session") -def llm_text_df(session, llm_text_pandas_df): - return session.read_pandas(llm_text_pandas_df) - - @pytest.fixture(scope="session") def bqml_palm2_text_generator_model(session, bq_connection) -> core.BqmlModel: options = { diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index cd5d65c458..914548be58 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -414,73 +414,6 @@ def test_gemini_text_generator_predict_with_params_success( ) -@pytest.mark.parametrize( - "model_name", - ("claude-3-sonnet", "claude-3-haiku", "claude-3-5-sonnet", "claude-3-opus"), -) -@pytest.mark.flaky(retries=3, delay=120) -def test_claude3_text_generator_create_load( - dataset_id, model_name, session, session_us_east5, bq_connection -): - if model_name in ("claude-3-5-sonnet", "claude-3-opus"): - session = session_us_east5 - claude3_text_generator_model = llm.Claude3TextGenerator( - model_name=model_name, connection_name=bq_connection, session=session - ) - assert claude3_text_generator_model is not None - assert claude3_text_generator_model._bqml_model is not None - - # save, load to ensure configuration was kept - reloaded_model = claude3_text_generator_model.to_gbq( - f"{dataset_id}.temp_text_model", replace=True - ) - assert f"{dataset_id}.temp_text_model" == reloaded_model._bqml_model.model_name - assert reloaded_model.connection_name == bq_connection - assert reloaded_model.model_name == model_name - - -@pytest.mark.skip("b/366290533 too many requests are exhausting bqml capacity") -@pytest.mark.parametrize( - "model_name", - ("claude-3-sonnet", "claude-3-haiku", "claude-3-5-sonnet", "claude-3-opus"), -) -@pytest.mark.flaky(retries=3, delay=120) -def test_claude3_text_generator_predict_default_params_success( - llm_text_df, model_name, session, session_us_east5, bq_connection -): - if model_name in ("claude-3-5-sonnet", "claude-3-opus"): - session = session_us_east5 - claude3_text_generator_model = llm.Claude3TextGenerator( - 
model_name=model_name, connection_name=bq_connection, session=session - ) - df = claude3_text_generator_model.predict(llm_text_df).to_pandas() - utils.check_pandas_df_schema_and_index( - df, columns=utils.ML_GENERATE_TEXT_OUTPUT, index=3, col_exact=False - ) - - -@pytest.mark.skip("b/366290533 too many requests are exhausting bqml capacity") -@pytest.mark.parametrize( - "model_name", - ("claude-3-sonnet", "claude-3-haiku", "claude-3-5-sonnet", "claude-3-opus"), -) -@pytest.mark.flaky(retries=3, delay=120) -def test_claude3_text_generator_predict_with_params_success( - llm_text_df, model_name, session, session_us_east5, bq_connection -): - if model_name in ("claude-3-5-sonnet", "claude-3-opus"): - session = session_us_east5 - claude3_text_generator_model = llm.Claude3TextGenerator( - model_name=model_name, connection_name=bq_connection, session=session - ) - df = claude3_text_generator_model.predict( - llm_text_df, max_output_tokens=100, top_k=20, top_p=0.5 - ).to_pandas() - utils.check_pandas_df_schema_and_index( - df, columns=utils.ML_GENERATE_TEXT_OUTPUT, index=3, col_exact=False - ) - - @pytest.mark.flaky(retries=2) def test_llm_palm_score(llm_fine_tune_df_default_index): model = llm.PaLM2TextGenerator(model_name="text-bison") From 4221632e69f1e4866d5db3cc183021ae8f80b245 Mon Sep 17 00:00:00 2001 From: Chelsea Lin <124939984+chelsea-lin@users.noreply.github.com> Date: Wed, 18 Sep 2024 16:34:54 -0700 Subject: [PATCH 02/14] chore: include unittest workflow in owlbot (#990) --- owlbot.py | 1 - 1 file changed, 1 deletion(-) diff --git a/owlbot.py b/owlbot.py index ca3c8cbe14..b29384d462 100644 --- a/owlbot.py +++ b/owlbot.py @@ -49,7 +49,6 @@ "README.rst", "CONTRIBUTING.rst", ".github/release-trigger.yml", - ".github/workflows/unittest.yml", # BigQuery DataFrames manages its own Kokoro cluster for presubmit & continuous tests. 
".kokoro/build.sh", ".kokoro/continuous/common.cfg", From cc48f58cbd94f8110ee863eb57d3fe8dc5a17778 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 19 Sep 2024 16:42:15 -0700 Subject: [PATCH 03/14] feat: Support bool and bytes types in `describe(include='all')` (#994) * feat: Support bool and bytes types in describe(include='all') * update aggregation unit tests * fix typo and remove unnecessary helper * remove unnecessary dep * fix wording --- bigframes/dataframe.py | 48 ++++++++++++---------- bigframes/operations/aggregations.py | 2 +- tests/system/small/test_dataframe.py | 26 ++++++------ tests/unit/operations/test_aggregations.py | 39 +++++++----------- 4 files changed, 55 insertions(+), 60 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 862c8dc2c8..817a02f492 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2303,7 +2303,7 @@ def melt( self._block.melt(id_col_ids, val_col_ids, var_name, value_name) ) - _NUMERICAL_DISCRIBE_AGGS = ( + _NUMERIC_DESCRIBE_AGGS = ( "count", "mean", "std", @@ -2313,41 +2313,53 @@ def melt( "75%", "max", ) - _NON_NUMERICAL_DESCRIBE_AGGS = ("count", "nunique") + _NON_NUMERIC_DESCRIBE_AGGS = ("count", "nunique") def describe(self, include: None | Literal["all"] = None) -> DataFrame: + + allowed_non_numeric_types = { + bigframes.dtypes.STRING_DTYPE, + bigframes.dtypes.BOOL_DTYPE, + bigframes.dtypes.BYTES_DTYPE, + } + if include is None: numeric_df = self._drop_non_numeric(permissive=False) if len(numeric_df.columns) == 0: - # Describe eligible non-numerical columns - result = self._drop_non_string().agg(self._NON_NUMERICAL_DESCRIBE_AGGS) + # Describe eligible non-numeric columns + result = self.select_dtypes(include=allowed_non_numeric_types).agg( + self._NON_NUMERIC_DESCRIBE_AGGS + ) else: - # Otherwise, only describe numerical columns - result = numeric_df.agg(self._NUMERICAL_DISCRIBE_AGGS) + # Otherwise, only describe numeric columns + result = numeric_df.agg(self._NUMERIC_DESCRIBE_AGGS) return typing.cast(DataFrame, result) elif include == "all": numeric_result = typing.cast( DataFrame, self._drop_non_numeric(permissive=False).agg( - self._NUMERICAL_DISCRIBE_AGGS + self._NUMERIC_DESCRIBE_AGGS ), ) - string_result = typing.cast( + + non_numeric_result = typing.cast( DataFrame, - self._drop_non_string().agg(self._NON_NUMERICAL_DESCRIBE_AGGS), + self.select_dtypes(include=allowed_non_numeric_types).agg( + self._NON_NUMERIC_DESCRIBE_AGGS + ), ) if len(numeric_result.columns) == 0: - return string_result - elif len(string_result.columns) == 0: + return non_numeric_result + elif len(non_numeric_result.columns) == 0: return numeric_result else: import bigframes.core.reshape as rs # Use reindex after join to preserve the original column order. 
return rs.concat( - [numeric_result, string_result], axis=1 + [non_numeric_result, numeric_result], axis=1 )._reindex_columns(self.columns) else: @@ -2549,7 +2561,7 @@ def unstack(self, level: LevelsType = -1): return DataFrame(pivot_block) def _drop_non_numeric(self, permissive=True) -> DataFrame: - numerical_types = ( + numeric_types = ( set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) if permissive else set(bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_RESTRICTIVE) @@ -2557,18 +2569,10 @@ def _drop_non_numeric(self, permissive=True) -> DataFrame: non_numeric_cols = [ col_id for col_id, dtype in zip(self._block.value_columns, self._block.dtypes) - if dtype not in numerical_types + if dtype not in numeric_types ] return DataFrame(self._block.drop_columns(non_numeric_cols)) - def _drop_non_string(self) -> DataFrame: - string_cols = [ - col_id - for col_id, dtype in zip(self._block.value_columns, self._block.dtypes) - if dtype == bigframes.dtypes.STRING_DTYPE - ] - return DataFrame(self._block.select_columns(string_cols)) - def _drop_non_bool(self) -> DataFrame: non_bool_cols = [ col_id diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index f20429e449..d071889ac4 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -568,7 +568,7 @@ def is_agg_op_supported(dtype: dtypes.Dtype, op: AggregateOp) -> bool: if dtype in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: return True - if dtype == dtypes.STRING_DTYPE: + if dtype in (dtypes.STRING_DTYPE, dtypes.BOOL_DTYPE, dtypes.BYTES_DTYPE): return isinstance(op, (CountOp, NuniqueOp)) # For all other types, support no aggregation diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index b4c81bfbef..0a637e983f 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2619,15 +2619,15 @@ def test_df_describe(scalars_dfs): @skip_legacy_pandas @pytest.mark.parametrize("include", [None, "all"]) -def test_df_describe_non_numerical(scalars_dfs, include): +def test_df_describe_non_numeric(scalars_dfs, include): scalars_df, scalars_pandas_df = scalars_dfs - non_numerical_columns = ["string_col"] + non_numeric_columns = ["string_col", "bytes_col", "bool_col"] - modified_bf = scalars_df[non_numerical_columns] + modified_bf = scalars_df[non_numeric_columns] bf_result = modified_bf.describe(include=include).to_pandas() - modified_pd_df = scalars_pandas_df[non_numerical_columns] + modified_pd_df = scalars_pandas_df[non_numeric_columns] pd_result = modified_pd_df.describe(include=include) # Reindex results with the specified keys and their order, because @@ -2639,8 +2639,8 @@ def test_df_describe_non_numerical(scalars_dfs, include): ).rename(index={"unique": "nunique"}) pd.testing.assert_frame_equal( - pd_result[non_numerical_columns].astype("Int64"), - bf_result[non_numerical_columns], + pd_result[non_numeric_columns].astype("Int64"), + bf_result[non_numeric_columns], check_index_type=False, ) @@ -2649,12 +2649,12 @@ def test_df_describe_non_numerical(scalars_dfs, include): def test_df_describe_mixed_types_include_all(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs - numerical_columns = [ + numeric_columns = [ "int64_col", "float64_col", ] - non_numerical_columns = ["string_col"] - supported_columns = numerical_columns + non_numerical_columns + non_numeric_columns = ["string_col"] + supported_columns = numeric_columns + non_numeric_columns modified_bf = scalars_df[supported_columns] bf_result = 
modified_bf.describe(include="all").to_pandas() @@ -2678,14 +2678,14 @@ def test_df_describe_mixed_types_include_all(scalars_dfs): ).rename(index={"unique": "nunique"}) pd.testing.assert_frame_equal( - pd_result[numerical_columns].astype("Float64"), - bf_result[numerical_columns], + pd_result[numeric_columns].astype("Float64"), + bf_result[numeric_columns], check_index_type=False, ) pd.testing.assert_frame_equal( - pd_result[non_numerical_columns].astype("Int64"), - bf_result[non_numerical_columns], + pd_result[non_numeric_columns].astype("Int64"), + bf_result[non_numeric_columns], check_index_type=False, ) diff --git a/tests/unit/operations/test_aggregations.py b/tests/unit/operations/test_aggregations.py index 4cb6934c9d..68ad48ac29 100644 --- a/tests/unit/operations/test_aggregations.py +++ b/tests/unit/operations/test_aggregations.py @@ -55,38 +55,29 @@ first_op, ] ) -_STRING_SUPPORTED_OPS = set([count_op, nunique_op]) @pytest.mark.parametrize("dtype", dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @pytest.mark.parametrize("op", _ALL_OPS) -def test_is_agg_op_supported_numerical_support_all(dtype, op): +def test_is_agg_op_supported_numeric_support_all(dtype, op): assert is_agg_op_supported(dtype, op) is True -@pytest.mark.parametrize("dtype", [dtypes.STRING_DTYPE]) -@pytest.mark.parametrize("op", _STRING_SUPPORTED_OPS) -def test_is_agg_op_supported_string_support_ops(dtype, op): - assert is_agg_op_supported(dtype, op) is True - - -@pytest.mark.parametrize("dtype", [dtypes.STRING_DTYPE]) -@pytest.mark.parametrize("op", _ALL_OPS - _STRING_SUPPORTED_OPS) -def test_is_agg_op_supported_string_not_support_ops(dtype, op): - assert is_agg_op_supported(dtype, op) is False - - @pytest.mark.parametrize( - "dtype", + ("dtype", "supported_ops"), [ - dtypes.BYTES_DTYPE, - dtypes.DATE_DTYPE, - dtypes.TIME_DTYPE, - dtypes.DATETIME_DTYPE, - dtypes.TIMESTAMP_DTYPE, - dtypes.GEO_DTYPE, + (dtypes.STRING_DTYPE, {count_op, nunique_op}), + (dtypes.BYTES_DTYPE, {count_op, nunique_op}), + (dtypes.DATE_DTYPE, set()), + (dtypes.TIME_DTYPE, set()), + (dtypes.DATETIME_DTYPE, set()), + (dtypes.TIMESTAMP_DTYPE, set()), + (dtypes.GEO_DTYPE, set()), ], ) -@pytest.mark.parametrize("op", _ALL_OPS) -def test_is_agg_op_supported_non_numerical_no_support(dtype, op): - assert is_agg_op_supported(dtype, op) is False +def test_is_agg_op_supported_non_numeric(dtype, supported_ops): + for op in supported_ops: + assert is_agg_op_supported(dtype, op) is True + + for op in _ALL_OPS - supported_ops: + assert is_agg_op_supported(dtype, op) is False From 8520873a6e4ba8e572b196e610454793e0fce9af Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:02:58 -0700 Subject: [PATCH 04/14] test: disable deprecated embedding model tests (#1006) * test: disable deprecated embedding models * delete tests --- tests/system/small/ml/test_llm.py | 119 ------------------------------ 1 file changed, 119 deletions(-) diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index 914548be58..3093a36534 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -178,125 +178,6 @@ def test_text_generator_predict_with_params_success( ) -def test_create_embedding_generator_model( - palm2_embedding_generator_model, dataset_id, bq_connection -): - # Model creation doesn't return error - assert palm2_embedding_generator_model is not None - assert palm2_embedding_generator_model._bqml_model is not None - - # save, load to ensure configuration was kept 
- reloaded_model = palm2_embedding_generator_model.to_gbq( - f"{dataset_id}.temp_embedding_model", replace=True - ) - assert f"{dataset_id}.temp_embedding_model" == reloaded_model._bqml_model.model_name - assert reloaded_model.model_name == "textembedding-gecko" - assert reloaded_model.connection_name == bq_connection - - -def test_create_embedding_generator_model_002( - palm2_embedding_generator_model_002, dataset_id, bq_connection -): - # Model creation doesn't return error - assert palm2_embedding_generator_model_002 is not None - assert palm2_embedding_generator_model_002._bqml_model is not None - - # save, load to ensure configuration was kept - reloaded_model = palm2_embedding_generator_model_002.to_gbq( - f"{dataset_id}.temp_embedding_model", replace=True - ) - assert f"{dataset_id}.temp_embedding_model" == reloaded_model._bqml_model.model_name - assert reloaded_model.model_name == "textembedding-gecko" - assert reloaded_model.version == "002" - assert reloaded_model.connection_name == bq_connection - - -def test_create_embedding_generator_multilingual_model( - palm2_embedding_generator_multilingual_model, - dataset_id, - bq_connection, -): - # Model creation doesn't return error - assert palm2_embedding_generator_multilingual_model is not None - assert palm2_embedding_generator_multilingual_model._bqml_model is not None - - # save, load to ensure configuration was kept - reloaded_model = palm2_embedding_generator_multilingual_model.to_gbq( - f"{dataset_id}.temp_embedding_model", replace=True - ) - assert f"{dataset_id}.temp_embedding_model" == reloaded_model._bqml_model.model_name - assert reloaded_model.model_name == "textembedding-gecko-multilingual" - assert reloaded_model.connection_name == bq_connection - - -def test_create_text_embedding_generator_model_defaults(bq_connection): - import bigframes.pandas as bpd - - # Note: This starts a thread-local session. - with bpd.option_context( - "bigquery.bq_connection", - bq_connection, - "bigquery.location", - "US", - ): - model = llm.PaLM2TextEmbeddingGenerator() - assert model is not None - assert model._bqml_model is not None - - -def test_create_text_embedding_generator_multilingual_model_defaults(bq_connection): - import bigframes.pandas as bpd - - # Note: This starts a thread-local session. 
- with bpd.option_context( - "bigquery.bq_connection", - bq_connection, - "bigquery.location", - "US", - ): - model = llm.PaLM2TextEmbeddingGenerator( - model_name="textembedding-gecko-multilingual" - ) - assert model is not None - assert model._bqml_model is not None - - -@pytest.mark.flaky(retries=2) -def test_embedding_generator_predict_success( - palm2_embedding_generator_model, llm_text_df -): - df = palm2_embedding_generator_model.predict(llm_text_df).to_pandas() - assert df.shape == (3, 4) - assert "text_embedding" in df.columns - series = df["text_embedding"] - value = series[0] - assert len(value) == 768 - - -@pytest.mark.flaky(retries=2) -def test_embedding_generator_multilingual_predict_success( - palm2_embedding_generator_multilingual_model, llm_text_df -): - df = palm2_embedding_generator_multilingual_model.predict(llm_text_df).to_pandas() - assert df.shape == (3, 4) - assert "text_embedding" in df.columns - series = df["text_embedding"] - value = series[0] - assert len(value) == 768 - - -@pytest.mark.flaky(retries=2) -def test_embedding_generator_predict_series_success( - palm2_embedding_generator_model, llm_text_df -): - df = palm2_embedding_generator_model.predict(llm_text_df["prompt"]).to_pandas() - assert df.shape == (3, 4) - assert "text_embedding" in df.columns - series = df["text_embedding"] - value = series[0] - assert len(value) == 768 - - @pytest.mark.parametrize( "model_name", ("text-embedding-004", "text-multilingual-embedding-002"), From 038139dfa4fa89167c52c1cb559c2eb5fe2f0411 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Fri, 20 Sep 2024 16:52:49 -0700 Subject: [PATCH 05/14] fix: Fix miscasting issues with case_when (#1003) --- bigframes/core/expression.py | 9 +++- bigframes/operations/base.py | 74 ++++++++++++++++++++++++------- bigframes/series.py | 33 ++++---------- tests/system/small/test_series.py | 13 +++--- 4 files changed, 81 insertions(+), 48 deletions(-) diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index bbd23b689c..4779e92cde 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -25,7 +25,9 @@ import bigframes.operations.aggregations as agg_ops -def const(value: typing.Hashable, dtype: dtypes.ExpressionType = None) -> Expression: +def const( + value: typing.Hashable, dtype: dtypes.ExpressionType = None +) -> ScalarConstantExpression: return ScalarConstantExpression(value, dtype or dtypes.infer_literal_type(value)) @@ -141,6 +143,9 @@ class ScalarConstantExpression(Expression): def is_const(self) -> bool: return True + def rename(self, name_mapping: Mapping[str, str]) -> ScalarConstantExpression: + return self + def output_type( self, input_types: dict[str, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: @@ -167,7 +172,7 @@ class UnboundVariableExpression(Expression): def unbound_variables(self) -> typing.Tuple[str, ...]: return (self.id,) - def rename(self, name_mapping: Mapping[str, str]) -> Expression: + def rename(self, name_mapping: Mapping[str, str]) -> UnboundVariableExpression: if self.id in name_mapping: return UnboundVariableExpression(name_mapping[self.id]) else: diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index 68f46baded..f9a6a87b7a 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -15,7 +15,7 @@ from __future__ import annotations import typing -from typing import List, Sequence +from typing import List, Sequence, Union import bigframes_vendored.constants as constants import bigframes_vendored.pandas.pandas._typing as 
vendored_pandas_typing @@ -180,9 +180,10 @@ def _apply_binary_op( (self_col, other_col, block) = self._align(other_series, how=alignment) name = self._name + # Drop name if both objects have name attr, but they don't match if ( hasattr(other, "name") - and other.name != self._name + and other_series.name != self._name and alignment == "outer" ): name = None @@ -208,22 +209,41 @@ def _apply_nary_op( ignore_self=False, ): """Applies an n-ary operator to the series and others.""" - values, block = self._align_n(others, ignore_self=ignore_self) - block, result_id = block.apply_nary_op( - values, - op, - self._name, + values, block = self._align_n( + others, ignore_self=ignore_self, cast_scalars=False ) + block, result_id = block.project_expr(op.as_expr(*values)) return series.Series(block.select_column(result_id)) def _apply_binary_aggregation( self, other: series.Series, stat: agg_ops.BinaryAggregateOp ) -> float: (left, right, block) = self._align(other, how="outer") + assert isinstance(left, ex.UnboundVariableExpression) + assert isinstance(right, ex.UnboundVariableExpression) + return block.get_binary_stat(left.id, right.id, stat) + + AlignedExprT = Union[ex.ScalarConstantExpression, ex.UnboundVariableExpression] - return block.get_binary_stat(left, right, stat) + @typing.overload + def _align( + self, other: series.Series, how="outer" + ) -> tuple[ + ex.UnboundVariableExpression, + ex.UnboundVariableExpression, + blocks.Block, + ]: + ... - def _align(self, other: series.Series, how="outer") -> tuple[str, str, blocks.Block]: # type: ignore + @typing.overload + def _align( + self, other: typing.Union[series.Series, scalars.Scalar], how="outer" + ) -> tuple[ex.UnboundVariableExpression, AlignedExprT, blocks.Block,]: + ... + + def _align( + self, other: typing.Union[series.Series, scalars.Scalar], how="outer" + ) -> tuple[ex.UnboundVariableExpression, AlignedExprT, blocks.Block,]: """Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined tabled expression.""" values, block = self._align_n( [ @@ -231,18 +251,36 @@ def _align(self, other: series.Series, how="outer") -> tuple[str, str, blocks.Bl ], how, ) - return (values[0], values[1], block) + return (typing.cast(ex.UnboundVariableExpression, values[0]), values[1], block) + + def _align3(self, other1: series.Series | scalars.Scalar, other2: series.Series | scalars.Scalar, how="left") -> tuple[ex.UnboundVariableExpression, AlignedExprT, AlignedExprT, blocks.Block]: # type: ignore + """Aligns the series value with 2 other scalars or series objects. 
Returns new values and joined tabled expression.""" + values, index = self._align_n([other1, other2], how) + return ( + typing.cast(ex.UnboundVariableExpression, values[0]), + values[1], + values[2], + index, + ) def _align_n( self, others: typing.Sequence[typing.Union[series.Series, scalars.Scalar]], how="outer", ignore_self=False, - ) -> tuple[typing.Sequence[str], blocks.Block]: + cast_scalars: bool = True, + ) -> tuple[ + typing.Sequence[ + Union[ex.ScalarConstantExpression, ex.UnboundVariableExpression] + ], + blocks.Block, + ]: if ignore_self: - value_ids: List[str] = [] + value_ids: List[ + Union[ex.ScalarConstantExpression, ex.UnboundVariableExpression] + ] = [] else: - value_ids = [self._value_column] + value_ids = [ex.free_var(self._value_column)] block = self._block for other in others: @@ -252,14 +290,16 @@ def _align_n( get_column_right, ) = block.join(other._block, how=how) value_ids = [ - *[get_column_left[value] for value in value_ids], - get_column_right[other._value_column], + *[value.rename(get_column_left) for value in value_ids], + ex.free_var(get_column_right[other._value_column]), ] else: # Will throw if can't interpret as scalar. dtype = typing.cast(bigframes.dtypes.Dtype, self._dtype) - block, constant_col_id = block.create_constant(other, dtype=dtype) - value_ids = [*value_ids, constant_col_id] + value_ids = [ + *value_ids, + ex.const(other, dtype=dtype if cast_scalars else None), + ] return (value_ids, block) def _throw_if_null_index(self, opname: str): diff --git a/bigframes/series.py b/bigframes/series.py index 3a75ab9ccc..82fb6c5089 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -445,23 +445,13 @@ def between(self, left, right, inclusive="both"): ) def case_when(self, caselist) -> Series: + cases = list(itertools.chain(*caselist, (True, self))) return self._apply_nary_op( ops.case_when_op, - tuple( - itertools.chain( - itertools.chain(*caselist), - # Fallback to current value if no other matches. - ( - # We make a Series with a constant value to avoid casts to - # types other than boolean. - Series(True, index=self.index, dtype=pandas.BooleanDtype()), - self, - ), - ), - ), + cases, # Self is already included in "others". 
ignore_self=True, - ) + ).rename(self.name) @validations.requires_ordering() def cumsum(self) -> Series: @@ -1116,8 +1106,8 @@ def ne(self, other: object) -> Series: def where(self, cond, other=None): value_id, cond_id, other_id, block = self._align3(cond, other) - block, result_id = block.apply_ternary_op( - value_id, cond_id, other_id, ops.where_op + block, result_id = block.project_expr( + ops.where_op.as_expr(value_id, cond_id, other_id) ) return Series(block.select_column(result_id).with_column_labels([self.name])) @@ -1129,8 +1119,8 @@ def clip(self, lower, upper): if upper is None: return self._apply_binary_op(lower, ops.maximum_op, alignment="left") value_id, lower_id, upper_id, block = self._align3(lower, upper) - block, result_id = block.apply_ternary_op( - value_id, lower_id, upper_id, ops.clip_op + block, result_id = block.project_expr( + ops.clip_op.as_expr(value_id, lower_id, upper_id), ) return Series(block.select_column(result_id).with_column_labels([self.name])) @@ -1242,8 +1232,8 @@ def __getitem__(self, indexer): return self.iloc[indexer] if isinstance(indexer, Series): (left, right, block) = self._align(indexer, "left") - block = block.filter_by_id(right) - block = block.select_column(left) + block = block.filter(right) + block = block.select_column(left.id) return Series(block) return self.loc[indexer] @@ -1262,11 +1252,6 @@ def __getattr__(self, key: str): else: raise AttributeError(key) - def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scalar, how="left") -> tuple[str, str, str, blocks.Block]: # type: ignore - """Aligns the series value with 2 other scalars or series objects. Returns new values and joined tabled expression.""" - values, index = self._align_n([other1, other2], how) - return (values[0], values[1], values[2], index) - def _apply_aggregation( self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp ) -> Any: diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index b8f7926aec..793a4062c5 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2709,27 +2709,30 @@ def test_between(scalars_df_index, scalars_pandas_df_index, left, right, inclusi ) -def test_case_when(scalars_df_index, scalars_pandas_df_index): +def test_series_case_when(scalars_dfs_maybe_ordered): pytest.importorskip( "pandas", minversion="2.2.0", reason="case_when added in pandas 2.2.0", ) + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered - bf_series = scalars_df_index["int64_col"] - pd_series = scalars_pandas_df_index["int64_col"] + bf_series = scalars_df["int64_col"] + pd_series = scalars_pandas_df["int64_col"] # TODO(tswast): pandas case_when appears to assume True when a value is # null. I suspect this should be considered a bug in pandas. 
bf_result = bf_series.case_when( [ - ((bf_series > 100).fillna(True), 1000), + ((bf_series > 100).fillna(True), bf_series - 1), + ((bf_series > 0).fillna(True), pd.NA), ((bf_series < -100).fillna(True), -1000), ] ).to_pandas() pd_result = pd_series.case_when( [ - (pd_series > 100, 1000), + (pd_series > 100, pd_series - 1), + (pd_series > 0, pd.NA), (pd_series < -100, -1000), ] ) From bb04742500d445ae196698df87291dc7b1c94102 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:02:59 -0700 Subject: [PATCH 06/14] chore: exclude claude3 notebook in nox session (#1008) --- noxfile.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/noxfile.py b/noxfile.py index bf4c6f9641..f459efef10 100644 --- a/noxfile.py +++ b/noxfile.py @@ -719,6 +719,8 @@ def notebook(session: nox.Session): # bq_dataframes_llm_code_generation creates a bucket in the sample. "notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb", # Needs BUCKET_URI. "notebooks/generative_ai/sentiment_analysis.ipynb", # Too slow + # TODO(b/366290533): to protect BQML quota + "notebooks/generative_ai/bq_dataframes_llm_claude3_museum_art.ipynb", "notebooks/vertex_sdk/sdk2_bigframes_pytorch.ipynb", # Needs BUCKET_URI. "notebooks/vertex_sdk/sdk2_bigframes_sklearn.ipynb", # Needs BUCKET_URI. "notebooks/vertex_sdk/sdk2_bigframes_tensorflow.ipynb", # Needs BUCKET_URI. From f89785fcfc51c541253ca8c1e8baf80fbfaea3b6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sun, 22 Sep 2024 18:46:02 -0700 Subject: [PATCH 07/14] deps: update ibis version in prerelease tests (#1012) * deps: update ibis version in prerelease tests * exclude remote function tests from prerelease --- noxfile.py | 24 +++++++++++++++++++--- tests/system/large/test_remote_function.py | 3 +++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/noxfile.py b/noxfile.py index f459efef10..c704da00a5 100644 --- a/noxfile.py +++ b/noxfile.py @@ -543,7 +543,7 @@ def docfx(session): ) -def prerelease(session: nox.sessions.Session, tests_path): +def prerelease(session: nox.sessions.Session, tests_path, extra_pytest_options=()): constraints_path = str( CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" ) @@ -588,7 +588,7 @@ def prerelease(session: nox.sessions.Session, tests_path): session.install( "--upgrade", "--pre", - "ibis-framework>=8.0.0,<9.0.0dev", + "ibis-framework>=9.0.0,<=9.2.0", ) already_installed.add("ibis-framework") @@ -662,6 +662,7 @@ def prerelease(session: nox.sessions.Session, tests_path): "--cov-report=term-missing", "--cov-fail-under=0", tests_path, + *extra_pytest_options, *session.posargs, ) @@ -675,7 +676,24 @@ def unit_prerelease(session: nox.sessions.Session): @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS[-1]) def system_prerelease(session: nox.sessions.Session): """Run the system test suite with prerelease dependencies.""" - prerelease(session, os.path.join("tests", "system", "small")) + small_tests_dir = os.path.join("tests", "system", "small") + + # Let's exclude remote function tests from the prerelease tests, since the + # some of the package dependencies propagate to the cloud run functions' + # requirements.txt, and the prerelease package versions may not be available + # in the standard pip install. + # This would mean that we will only rely on the standard remote function + # tests. 
+ small_remote_function_tests = os.path.join( + small_tests_dir, "test_remote_function.py" + ) + assert os.path.exists(small_remote_function_tests) + + prerelease( + session, + os.path.join("tests", "system", "small"), + (f"--ignore={small_remote_function_tests}",), + ) @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index e224f65a01..d1e82dd415 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1690,6 +1690,9 @@ def analyze(row): ), ), id="multiindex", + marks=pytest.mark.skip( + reason="TODO(b/368639580) revert this skip after fix" + ), ), pytest.param( pandas.DataFrame( From 6b34244e9ebe747161da21af2f31ee537b653093 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 23 Sep 2024 03:40:13 -0700 Subject: [PATCH 08/14] test: ensure all `remote_function` APIs work in partial ordering mode (#1000) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * test: ensure all `remote_function` APIs work in partial ordering mode * remove force_reproject from more APIs * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * temporarily skip multiindex test for axis=1 --------- Co-authored-by: Owl Bot --- bigframes/dataframe.py | 14 +- bigframes/series.py | 6 +- tests/system/small/test_remote_function.py | 246 ++++++++++++++++++--- 3 files changed, 224 insertions(+), 42 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 817a02f492..49a668f008 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3473,11 +3473,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: raise ValueError(f"na_action={na_action} not supported") # TODO(shobs): Support **kwargs - # Reproject as workaround to applying filter too late. This forces the - # filter to be applied before passing data to remote function, - # protecting from bad inputs causing errors. - reprojected_df = DataFrame(self._block._force_reproject()) - return reprojected_df._apply_unary_op( + return self._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None)) ) @@ -3572,13 +3568,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) series_list = [self[col] for col in self.columns] - # Reproject as workaround to applying filter too late. This forces the - # filter to be applied before passing data to remote function, - # protecting from bad inputs causing errors. - reprojected_series = bigframes.series.Series( - series_list[0]._block._force_reproject() - ) - result_series = reprojected_series._apply_nary_op( + result_series = series_list[0]._apply_nary_op( ops.NaryRemoteFunctionOp(func=func), series_list[1:] ) result_series.name = None diff --git a/bigframes/series.py b/bigframes/series.py index 82fb6c5089..193eea7ee3 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1480,11 +1480,7 @@ def combine( ex.message += f"\n{_remote_function_recommendation_message}" raise - # Reproject as workaround to applying filter too late. This forces the - # filter to be applied before passing data to remote function, - # protecting from bad inputs causing errors. 
- reprojected_series = Series(self._block._force_reproject()) - result_series = reprojected_series._apply_binary_op( + result_series = self._apply_binary_op( other, ops.BinaryRemoteFunctionOp(func=func) ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 5ffda56f92..f68589f431 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -498,6 +498,37 @@ def add_one(x): assert_pandas_df_equal(bf_result, pd_result) +@pytest.mark.flaky(retries=2, delay=120) +def test_dataframe_applymap_explicit_filter( + session_with_bq_connection, scalars_dfs, dataset_id_permanent +): + def add_one(x): + return x + 1 + + remote_add_one = session_with_bq_connection.remote_function( + [int], int, dataset_id_permanent, name=get_rf_name(add_one) + )(add_one) + + scalars_df, scalars_pandas_df = scalars_dfs + int64_cols = ["int64_col", "int64_too"] + + bf_int64_df = scalars_df[int64_cols] + bf_int64_df_filtered = bf_int64_df[bf_int64_df["int64_col"].notnull()] + bf_result = bf_int64_df_filtered.applymap(remote_add_one).to_pandas() + + pd_int64_df = scalars_pandas_df[int64_cols] + pd_int64_df_filtered = pd_int64_df[pd_int64_df["int64_col"].notnull()] + pd_result = pd_int64_df_filtered.applymap(add_one) + # TODO(shobs): Figure why pandas .applymap() changes the dtype, i.e. + # pd_int64_df_filtered.dtype is Int64Dtype() + # pd_int64_df_filtered.applymap(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as input. + for col in pd_result: + pd_result[col] = pd_result[col].astype(pd_int64_df_filtered[col].dtype) + + assert_pandas_df_equal(bf_result, pd_result) + + @pytest.mark.flaky(retries=2, delay=120) def test_dataframe_applymap_na_ignore( session_with_bq_connection, scalars_dfs, dataset_id_permanent @@ -1024,12 +1055,21 @@ def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_inde repr(s.mask(should_mask, "REDACTED")) +@pytest.mark.parametrize( + ("method",), + [ + pytest.param("apply"), + pytest.param("map"), + pytest.param("mask"), + ], +) @pytest.mark.flaky(retries=2, delay=120) -def test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs): - +def test_remote_function_unary_applied_after_filter( + session, dataset_id_permanent, scalars_dfs, method +): # This function is deliberately written to not work with NA input - def plus_one(x: int) -> int: - return x + 1 + def is_odd(x: int) -> bool: + return x % 2 == 1 scalars_df, scalars_pandas_df = scalars_dfs int_col_name_with_nulls = "int64_col" @@ -1038,47 +1078,203 @@ def plus_one(x: int) -> int: assert any([pd.isna(val) for val in scalars_df[int_col_name_with_nulls]]) # create a remote function - plus_one_remote = session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(plus_one) - )(plus_one) + is_odd_remote = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(is_odd) + )(is_odd) # with nulls in the series the remote function application would fail with pytest.raises( google.api_core.exceptions.BadRequest, match="unsupported operand" ): - scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas() + bf_method = getattr(scalars_df[int_col_name_with_nulls], method) + bf_method(is_odd_remote).to_pandas() - # after filtering out nulls the remote function application should works + # after filtering out nulls the remote function application should work # similar to pandas - pd_result = 
scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][ - int_col_name_with_nulls - ].apply(plus_one) - bf_result = ( + pd_method = getattr( + scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][ + int_col_name_with_nulls + ], + method, + ) + pd_result = pd_method(is_odd) + bf_method = getattr( scalars_df[scalars_df[int_col_name_with_nulls].notnull()][ int_col_name_with_nulls - ] - .apply(plus_one_remote) + ], + method, + ) + bf_result = bf_method(is_odd_remote).to_pandas() + + # ignore any dtype difference + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_binary_applied_after_filter( + session, dataset_id_permanent, scalars_dfs +): + # This function is deliberately written to not work with NA input + def add(x: int, y: int) -> int: + return x + y + + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + int_col_name_no_nulls = "int64_too" + bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] + + # make sure there are NA values in the test column + assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]]) + + # create a remote function + add_remote = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(add) + )(add) + + # with nulls in the series the remote function application would fail + with pytest.raises( + google.api_core.exceptions.BadRequest, match="unsupported operand" + ): + bf_df[int_col_name_with_nulls].combine( + bf_df[int_col_name_no_nulls], add_remote + ).to_pandas() + + # after filtering out nulls the remote function application should work + # similar to pandas + pd_filter = pd_df[int_col_name_with_nulls].notnull() + pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( + pd_df[pd_filter][int_col_name_no_nulls], add + ) + bf_filter = bf_df[int_col_name_with_nulls].notnull() + bf_result = ( + bf_df[bf_filter][int_col_name_with_nulls] + .combine(bf_df[bf_filter][int_col_name_no_nulls], add_remote) .to_pandas() ) - # ignore pandas "int64" vs bigframes "Int64" dtype difference + # ignore any dtype difference pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) -def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent): - session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial")) +def test_remote_function_nary_applied_after_filter( + session, dataset_id_permanent, scalars_dfs +): + # This function is deliberately written to not work with NA input + def add(x: int, y: int, z: float) -> float: + return x + y + z - df = session.read_gbq("bigquery-public-data.baseball.schedules")[ + scalars_df, scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + int_col_name_no_nulls = "int64_too" + float_col_name_with_nulls = "float64_col" + bf_df = scalars_df[ + [int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls] + ] + pd_df = scalars_pandas_df[ + [int_col_name_with_nulls, int_col_name_no_nulls, float_col_name_with_nulls] + ] + + # make sure there are NA values in the test columns + assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]]) + assert any([pd.isna(val) for val in bf_df[float_col_name_with_nulls]]) + + # create a remote function + add_remote = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(add) + )(add) 
+ + # pandas does not support nary functions, so let's create a proxy function + # for testing purpose that takes a series and in turn calls the naray function + def add_pandas(s: pd.Series) -> float: + return add( + s[int_col_name_with_nulls], + s[int_col_name_no_nulls], + s[float_col_name_with_nulls], + ) + + # with nulls in the series the remote function application would fail + with pytest.raises( + google.api_core.exceptions.BadRequest, match="unsupported operand" + ): + bf_df.apply(add_remote, axis=1).to_pandas() + + # after filtering out nulls the remote function application should work + # similar to pandas + pd_filter = ( + pd_df[int_col_name_with_nulls].notnull() + & pd_df[float_col_name_with_nulls].notnull() + ) + pd_result = pd_df[pd_filter].apply(add_pandas, axis=1) + bf_filter = ( + bf_df[int_col_name_with_nulls].notnull() + & bf_df[float_col_name_with_nulls].notnull() + ) + bf_result = bf_df[bf_filter].apply(add_remote, axis=1).to_pandas() + + # ignore any dtype difference + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.parametrize( + ("method",), + [ + pytest.param("apply"), + pytest.param("map"), + pytest.param("mask"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_unary_partial_ordering_mode_assign( + unordered_session, dataset_id_permanent, method +): + df = unordered_session.read_gbq("bigquery-public-data.baseball.schedules")[ ["duration_minutes"] ] - def plus_one(x: int) -> int: - return x + 1 + def is_long_duration(minutes: int) -> bool: + return minutes >= 120 + + is_long_duration = unordered_session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(is_long_duration) + )(is_long_duration) + + method = getattr(df["duration_minutes"], method) + + df1 = df.assign(duration_meta=method(is_long_duration)) + repr(df1) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_binary_partial_ordering_mode_assign( + unordered_session, dataset_id_permanent, scalars_df_index +): + def combiner(x: int, y: int) -> int: + if x is None: + return y + return x + + combiner = unordered_session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(combiner) + )(combiner) + + df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]] + df1 = df.assign(int64_combined=df["int64_col"].combine(df["int64_too"], combiner)) + repr(df1) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_nary_partial_ordering_mode_assign( + unordered_session, dataset_id_permanent, scalars_df_index +): + def processor(x: int, y: int, z: float, w: str) -> str: + return f"I got x={x}, y={y}, z={z} and w={w}" - plus_one = session.remote_function( - dataset=dataset_id_permanent, name=get_rf_name(plus_one) - )(plus_one) + processor = unordered_session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(processor) + )(processor) - df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one)) + df = scalars_df_index[["int64_col", "int64_too", "float64_col", "string_col"]] + df1 = df.assign(combined=df.apply(processor, axis=1)) repr(df1) From 952cab92e548b70d077b20bf10f5307751d2ae76 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Mon, 23 Sep 2024 11:44:39 -0700 Subject: [PATCH 09/14] feat: add ml.model_selection.KFold class (#1001) --- bigframes/ml/model_selection.py | 74 +++++--- bigframes/ml/utils.py | 39 +++- tests/system/small/ml/test_model_selection.py | 173 ++++++++++++++++++ 
.../sklearn/model_selection/_split.py | 109 +++++++++++ 4 files changed, 367 insertions(+), 28 deletions(-) create mode 100644 third_party/bigframes_vendored/sklearn/model_selection/_split.py diff --git a/bigframes/ml/model_selection.py b/bigframes/ml/model_selection.py index a6553d13dc..e4c41b2a39 100644 --- a/bigframes/ml/model_selection.py +++ b/bigframes/ml/model_selection.py @@ -17,8 +17,12 @@ https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection.""" -from typing import cast, List, Union +import inspect +from typing import cast, Generator, List, Union +import bigframes_vendored.sklearn.model_selection._split as vendored_model_selection_split + +from bigframes.core import log_adapter from bigframes.ml import utils import bigframes.pandas as bpd @@ -30,30 +34,6 @@ def train_test_split( random_state: Union[int, None] = None, stratify: Union[bpd.Series, None] = None, ) -> List[Union[bpd.DataFrame, bpd.Series]]: - """Splits dataframes or series into random train and test subsets. - - Args: - *arrays (bigframes.dataframe.DataFrame or bigframes.series.Series): - A sequence of BigQuery DataFrames or Series that can be joined on - their indexes. - test_size (default None): - The proportion of the dataset to include in the test split. If - None, this will default to the complement of train_size. If both - are none, it will be set to 0.25. - train_size (default None): - The proportion of the dataset to include in the train split. If - None, this will default to the complement of test_size. - random_state (default None): - A seed to use for randomly choosing the rows of the split. If not - set, a random split will be generated each time. - stratify: (bigframes.series.Series or None, default None): - If not None, data is split in a stratified fashion, using this as the class labels. Each split has the same distribution of the class labels with the original dataset. - Default to None. - Note: By setting the stratify parameter, the memory consumption and generated SQL will be linear to the unique values in the Series. May return errors if the unique values size is too large. - - Returns: - List[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]]: A list of BigQuery DataFrames or Series. - """ # TODO(garrettwu): scikit-learn throws an error when the dataframes don't have the same # number of rows. We probably want to do something similar. Now the implementation is based @@ -123,3 +103,47 @@ def _stratify_split(df: bpd.DataFrame, stratify: bpd.Series) -> List[bpd.DataFra results.append(joined_df_test[columns]) return results + + +train_test_split.__doc__ = inspect.getdoc( + vendored_model_selection_split.train_test_split +) + + +@log_adapter.class_logger +class KFold(vendored_model_selection_split.KFold): + def __init__(self, n_splits: int = 5, *, random_state: Union[int, None] = None): + if n_splits < 2: + raise ValueError(f"n_splits must be at least 2. 
Got {n_splits}") + self._n_splits = n_splits + self._random_state = random_state + + def get_n_splits(self) -> int: + return self._n_splits + + def split( + self, + X: Union[bpd.DataFrame, bpd.Series], + y: Union[bpd.DataFrame, bpd.Series, None] = None, + ) -> Generator[tuple[Union[bpd.DataFrame, bpd.Series, None]], None, None]: + X_df = next(utils.convert_to_dataframe(X)) + y_df_or = next(utils.convert_to_dataframe(y)) if y is not None else None + joined_df = X_df.join(y_df_or, how="outer") if y_df_or is not None else X_df + + fracs = (1 / self._n_splits,) * self._n_splits + + dfs = joined_df._split(fracs=fracs, random_state=self._random_state) + + for i in range(len(dfs)): + train_df = bpd.concat(dfs[:i] + dfs[i + 1 :]) + test_df = dfs[i] + + X_train = train_df[X_df.columns] + y_train = train_df[y_df_or.columns] if y_df_or is not None else None + + X_test = test_df[X_df.columns] + y_test = test_df[y_df_or.columns] if y_df_or is not None else None + + yield utils.convert_to_types( + [X_train, X_test, y_train, y_test], [X, X, y, y] + ) diff --git a/bigframes/ml/utils.py b/bigframes/ml/utils.py index d754b1d002..96f0bc31e9 100644 --- a/bigframes/ml/utils.py +++ b/bigframes/ml/utils.py @@ -13,7 +13,7 @@ # limitations under the License. import typing -from typing import Any, Iterable, Literal, Mapping, Optional, Union +from typing import Any, Generator, Iterable, Literal, Mapping, Optional, Union import bigframes_vendored.constants as constants from google.cloud import bigquery @@ -25,7 +25,7 @@ ArrayType = Union[bpd.DataFrame, bpd.Series] -def convert_to_dataframe(*input: ArrayType) -> Iterable[bpd.DataFrame]: +def convert_to_dataframe(*input: ArrayType) -> Generator[bpd.DataFrame, None, None]: return (_convert_to_dataframe(frame) for frame in input) @@ -39,7 +39,7 @@ def _convert_to_dataframe(frame: ArrayType) -> bpd.DataFrame: ) -def convert_to_series(*input: ArrayType) -> Iterable[bpd.Series]: +def convert_to_series(*input: ArrayType) -> Generator[bpd.Series, None, None]: return (_convert_to_series(frame) for frame in input) @@ -60,6 +60,39 @@ def _convert_to_series(frame: ArrayType) -> bpd.Series: ) +def convert_to_types( + inputs: Iterable[Union[ArrayType, None]], + type_instances: Iterable[Union[ArrayType, None]], +) -> tuple[Union[ArrayType, None]]: + """Convert the DF, Series and None types of the input to corresponding type_instances types.""" + results = [] + for input, type_instance in zip(inputs, type_instances): + results.append(_convert_to_type(input, type_instance)) + return tuple(results) + + +def _convert_to_type( + input: Union[ArrayType, None], type_instance: Union[ArrayType, None] +): + if type_instance is None: + if input is not None: + raise ValueError( + f"Trying to convert not None type to None. {constants.FEEDBACK_LINK}" + ) + return None + if input is None: + raise ValueError( + f"Trying to convert None type to not None. {constants.FEEDBACK_LINK}" + ) + if isinstance(type_instance, bpd.DataFrame): + return _convert_to_dataframe(input) + if isinstance(type_instance, bpd.Series): + return _convert_to_series(input) + raise ValueError( + f"Unsupport converting to {type(type_instance)}. 
{constants.FEEDBACK_LINK}" + ) + + def parse_model_endpoint(model_endpoint: str) -> tuple[str, Optional[str]]: """Parse model endpoint string to model_name and version.""" model_name = model_endpoint diff --git a/tests/system/small/ml/test_model_selection.py b/tests/system/small/ml/test_model_selection.py index 47529565b7..e6b5f8cdc2 100644 --- a/tests/system/small/ml/test_model_selection.py +++ b/tests/system/small/ml/test_model_selection.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math + import pandas as pd import pytest @@ -302,3 +304,174 @@ def test_train_test_split_stratify(df_fixture, request): test_counts, check_index_type=False, ) + + +@pytest.mark.parametrize( + "n_splits", + (3, 5, 10), +) +def test_KFold_get_n_splits(n_splits): + kf = model_selection.KFold(n_splits) + assert kf.get_n_splits() == n_splits + + +@pytest.mark.parametrize( + "df_fixture", + ("penguins_df_default_index", "penguins_df_null_index"), +) +@pytest.mark.parametrize( + "n_splits", + (3, 5), +) +def test_KFold_split(df_fixture, n_splits, request): + df = request.getfixturevalue(df_fixture) + + kf = model_selection.KFold(n_splits=n_splits) + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + ] + ] + y = df["body_mass_g"] + + len_test_upper, len_test_lower = math.ceil(len(df) / n_splits), math.floor( + len(df) / n_splits + ) + len_train_upper, len_train_lower = ( + len(df) - len_test_lower, + len(df) - len_test_upper, + ) + + for X_train, X_test, y_train, y_test in kf.split(X, y): # type: ignore + assert isinstance(X_train, bpd.DataFrame) + assert isinstance(X_test, bpd.DataFrame) + assert isinstance(y_train, bpd.Series) + assert isinstance(y_test, bpd.Series) + + # Depend on the iteration, train/test can +-1 in size. + assert ( + X_train.shape == (len_train_upper, 3) + and y_train.shape == (len_train_upper,) + and X_test.shape == (len_test_lower, 3) + and y_test.shape == (len_test_lower,) + ) or ( + X_train.shape == (len_train_lower, 3) + and y_train.shape == (len_train_lower,) + and X_test.shape == (len_test_upper, 3) + and y_test.shape == (len_test_upper,) + ) + + +@pytest.mark.parametrize( + "df_fixture", + ("penguins_df_default_index", "penguins_df_null_index"), +) +@pytest.mark.parametrize( + "n_splits", + (3, 5), +) +def test_KFold_split_X_only(df_fixture, n_splits, request): + df = request.getfixturevalue(df_fixture) + + kf = model_selection.KFold(n_splits=n_splits) + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + ] + ] + + len_test_upper, len_test_lower = math.ceil(len(df) / n_splits), math.floor( + len(df) / n_splits + ) + len_train_upper, len_train_lower = ( + len(df) - len_test_lower, + len(df) - len_test_upper, + ) + + for X_train, X_test, y_train, y_test in kf.split(X, y=None): # type: ignore + assert isinstance(X_train, bpd.DataFrame) + assert isinstance(X_test, bpd.DataFrame) + assert y_train is None + assert y_test is None + + # Depend on the iteration, train/test can +-1 in size. 
+ assert ( + X_train.shape == (len_train_upper, 3) + and X_test.shape == (len_test_lower, 3) + ) or ( + X_train.shape == (len_train_lower, 3) + and X_test.shape == (len_test_upper, 3) + ) + + +def test_KFold_seeded_correct_rows(session, penguins_pandas_df_default_index): + kf = model_selection.KFold(random_state=42) + # Note that we're using `penguins_pandas_df_default_index` as this test depends + # on a stable row order being present end to end + # filter down to the chunkiest penguins, to keep our test code a reasonable size + all_data = penguins_pandas_df_default_index[ + penguins_pandas_df_default_index.body_mass_g > 5500 + ] + + # Note that bigframes loses the index if it doesn't have a name + all_data.index.name = "rowindex" + + df = session.read_pandas(all_data) + + X = df[ + [ + "species", + "island", + "culmen_length_mm", + ] + ] + y = df["body_mass_g"] + X_train, X_test, y_train, y_test = next(kf.split(X, y)) # type: ignore + + X_train_sorted = X_train.to_pandas().sort_index() + X_test_sorted = X_test.to_pandas().sort_index() + y_train_sorted = y_train.to_pandas().sort_index() + y_test_sorted = y_test.to_pandas().sort_index() + + train_index: pd.Index = pd.Index( + [ + 144, + 146, + 148, + 161, + 168, + 183, + 217, + 221, + 225, + 226, + 237, + 244, + 257, + 262, + 264, + 266, + 267, + 269, + 278, + 289, + 290, + 291, + ], + dtype="Int64", + name="rowindex", + ) + test_index: pd.Index = pd.Index( + [186, 240, 245, 260, 263, 268], dtype="Int64", name="rowindex" + ) + + pd.testing.assert_index_equal(X_train_sorted.index, train_index) + pd.testing.assert_index_equal(X_test_sorted.index, test_index) + pd.testing.assert_index_equal(y_train_sorted.index, train_index) + pd.testing.assert_index_equal(y_test_sorted.index, test_index) diff --git a/third_party/bigframes_vendored/sklearn/model_selection/_split.py b/third_party/bigframes_vendored/sklearn/model_selection/_split.py new file mode 100644 index 0000000000..280962473e --- /dev/null +++ b/third_party/bigframes_vendored/sklearn/model_selection/_split.py @@ -0,0 +1,109 @@ +""" +The :mod:`sklearn.model_selection._split` module includes classes and +functions to split the data based on a preset strategy. +""" + +# Author: Alexandre Gramfort +# Gael Varoquaux +# Olivier Grisel +# Raghav RV +# Leandro Hermida +# Rodion Martynov +# License: BSD 3 clause + + +from abc import ABCMeta + +from bigframes import constants + + +class _BaseKFold(metaclass=ABCMeta): + """Base class for K-Fold cross-validators.""" + + def split(self, X, y=None): + """Generate indices to split data into training and test set. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series): + BigFrames DataFrame or Series of shape (n_samples, n_features) + Training data, where `n_samples` is the number of samples + and `n_features` is the number of features. + + y (bigframes.dataframe.DataFrame, bigframes.series.Series or None): + BigFrames DataFrame, Series of shape (n_samples,) or None. + The target variable for supervised learning problems. Default to None. + + Yields: + X_train (bigframes.dataframe.DataFrame or bigframes.series.Series): + The training data for that split. + + X_test (bigframes.dataframe.DataFrame or bigframes.series.Series): + The testing data for that split. + + y_train (bigframes.dataframe.DataFrame, bigframes.series.Series or None): + The training label for that split. + + y_test (bigframes.dataframe.DataFrame, bigframes.series.Series or None): + The testing label for that split. 
+ """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def get_n_splits(self): + """Returns the number of splitting iterations in the cross-validator. + + Returns: + int: the number of splitting iterations in the cross-validator. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + +class KFold(_BaseKFold): + """K-Fold cross-validator. + + Split data in train/test sets. Split dataset into k consecutive folds. + + Each fold is then used once as a validation while the k - 1 remaining + folds form the training set. + + Args: + n_splits (int): + Number of folds. Must be at least 2. Default to 5. + + random_state (Optional[int]): + A seed to use for randomly choosing the rows of the split. If not + set, a random split will be generated each time. Default to None. + """ + + +def train_test_split( + *arrays, + test_size=None, + train_size=None, + random_state=None, + stratify=None, +): + """Splits dataframes or series into random train and test subsets. + + Args: + *arrays (bigframes.dataframe.DataFrame or bigframes.series.Series): + A sequence of BigQuery DataFrames or Series that can be joined on + their indexes. + test_size (default None): + The proportion of the dataset to include in the test split. If + None, this will default to the complement of train_size. If both + are none, it will be set to 0.25. + train_size (default None): + The proportion of the dataset to include in the train split. If + None, this will default to the complement of test_size. + random_state (default None): + A seed to use for randomly choosing the rows of the split. If not + set, a random split will be generated each time. + stratify: (bigframes.series.Series or None, default None): + If not None, data is split in a stratified fashion, using this as the class labels. Each split has the same distribution of the class labels with the original dataset. + Default to None. + Note: By setting the stratify parameter, the memory consumption and generated SQL will be linear to the unique values in the Series. May return errors if the unique values size is too large. + + Returns: + List[Union[bigframes.dataframe.DataFrame, bigframes.series.Series]]: A list of BigQuery DataFrames or Series. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 7bcec880b5443c8a95b9ce9db666d6146426389a Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:10:42 -0700 Subject: [PATCH 10/14] chore: update benchmark readme for usage reference. (#1009) * chore: update benchmark readme for usage reference. * add s * update format --- tests/benchmark/README.md | 62 ++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/tests/benchmark/README.md b/tests/benchmark/README.md index 9c5f4a99d6..a30c36065b 100644 --- a/tests/benchmark/README.md +++ b/tests/benchmark/README.md @@ -1,13 +1,61 @@ -# Benchmark Tests - +# BigFrames Benchmarking ## Overview - This directory contains scripts for performance benchmarking of various components of BigFrames. -## Execution Details +## Why Separate Processes? +Each benchmark is executed in a separate process to mitigate the effects of any residual caching or settings that may persist in BigFrames, ensuring that each test is conducted in a clean state. -Scripts in this directory can be executed as part of the benchmarking session or independently from the command line. 
This allows for quick, standalone runs for immediate debugging and validation without the overhead of initiating full benchmark sessions. +## Available Benchmarks +This section lists the benchmarks currently available, with descriptions and links to their sources: +- **DB Benchmark**: This benchmark is adapted from DuckDB Labs and is designed to assess database performance. More information can be found on the [official DB Benchmark GitHub page](https://github.com/duckdblabs/db-benchmark). +- **TPC-H Benchmark**: Based on the TPC-H standards, this benchmark evaluates transaction processing capabilities. It is adapted from code found in the Polars repository, specifically tailored to test and compare these capabilities. Details are available on the [Polars Benchmark GitHub repository](https://github.com/pola-rs/polars-benchmark). +- **Notebooks**: These Jupyter notebooks showcase BigFrames' key features and patterns, and also enable performance benchmarking. Explore them at the [BigFrames Notebooks repository](https://github.com/googleapis/python-bigquery-dataframes/tree/main/notebooks). -## Why Separate Processes? +## Usage Examples +Our benchmarking process runs internally on a daily basis to continuously monitor the performance of BigFrames. However, there are occasions when you might need to conduct benchmarking locally to test specific changes or new features. -Each benchmark is executed in a separate process to mitigate the effects of any residual caching or settings that may persist in BigFrames, ensuring that each test is conducted in a clean state. +Here's how you can run benchmarks locally: + +- **Running Notebook Benchmarks**: To execute all notebook benchmarks, use the following command: + ```bash + nox -r -s notebook + ``` + + This command runs all the Jupyter notebooks in the repository as benchmarks. +- **Running Pure Benchmarks**: For executing more traditional benchmarks that do not involve notebooks, use: + ```bash + nox -r -s benchmark + ``` + This will run all the non-notebook benchmarks specified in the repository. + +- **Saving Results**: By default, when run locally, each benchmark concludes by printing a summary of the results, which are not saved automatically. To save the results to a CSV file, you can use the --output-csv or -o option followed by a specific path. If no path is specified, the results will be saved to a temporary location, and the path to this location will be printed at the end of the benchmark. + ```bash + nox -r -s benchmark -- --output-csv path/to/your/results.csv + nox -r -s benchmark -- --output-csv + ``` + +- **Running Multiple Iterations**: To run a benchmark multiple times and obtain an average result, use the -i or --iterations option followed by the number of iterations: + ```bash + nox -r -s benchmark -- --iterations 5 + ``` + +- **Filtering Benchmarks**: If you want to run only specific benchmarks, such as TPC-H, or specific queries within a benchmark, like tpch/q1, you can use the --benchmark-filter or -b option followed by the folder, file name, or both: + ```bash + # Runs all benchmarks in the 'tpch' directory + nox -r -s benchmark -- --benchmark-filter tpch + + # Runs all benchmarks in 'db_benchmark' and specific queries q1 and q2 from TPC-H + nox -r -s benchmark -- --benchmark-filter db_benchmark tpch/q1.py tpch/q2.py + ``` +- **Uploading Results to BigQuery**: To upload benchmark results to BigQuery, set the environment variable GCLOUD_BENCH_PUBLISH_PROJECT to the Google Cloud project where you want to store the results. 
This enables automatic uploading of the benchmark data to your specified project in BigQuery: + ```bash + export GCLOUD_BENCH_PUBLISH_PROJECT='your-google-cloud-project-id' + + # Run all non-notebook benchmarks and uploads the results to + # your-google-cloud-project-id.benchmark_report.benchmark + nox -r -s benchmark + + # Run all notebook benchmarks and uploads the results to + # your-google-cloud-project-id.benchmark_report.notebook_benchmark + nox -r -s notebook + ``` From 1b5b0eea92631b7dd1b688cf1da617fc7ce862dc Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Mon, 23 Sep 2024 14:19:00 -0700 Subject: [PATCH 11/14] perf: Join op discards child ordering in unordered mode (#923) --- bigframes/core/compile/compiled.py | 4 ++++ bigframes/core/compile/compiler.py | 29 +++++++++++++++++++++-------- bigframes/core/nodes.py | 4 +++- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 693d93de8c..6973091296 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -441,6 +441,10 @@ def explode(self, offsets: typing.Sequence[int]) -> UnorderedIR: columns=columns, ) + def as_ordered_ir(self) -> OrderedIR: + """Convert to OrderedIr, but without any definite ordering.""" + return OrderedIR(self._table, self._columns, predicates=self._predicates) + ## Helpers def _set_or_replace_by_id( self, id: str, new_value: ibis_types.Value diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 23501f93c8..74fcaf5f2a 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -76,14 +76,27 @@ def _compile_node( @_compile_node.register def compile_join(self, node: nodes.JoinNode, ordered: bool = True): if ordered: - left_ordered = self.compile_ordered_ir(node.left_child) - right_ordered = self.compile_ordered_ir(node.right_child) - return bigframes.core.compile.single_column.join_by_column_ordered( - left=left_ordered, - right=right_ordered, - type=node.type, - conditions=node.conditions, - ) + # In general, joins are an ordering destroying operation. + # With ordering_mode = "partial", make this explicit. In + # this case, we don't need to provide a deterministic ordering. + if self.strict: + left_ordered = self.compile_ordered_ir(node.left_child) + right_ordered = self.compile_ordered_ir(node.right_child) + return bigframes.core.compile.single_column.join_by_column_ordered( + left=left_ordered, + right=right_ordered, + type=node.type, + conditions=node.conditions, + ) + else: + left_unordered = self.compile_unordered_ir(node.left_child) + right_unordered = self.compile_unordered_ir(node.right_child) + return bigframes.core.compile.single_column.join_by_column_unordered( + left=left_unordered, + right=right_unordered, + type=node.type, + conditions=node.conditions, + ).as_ordered_ir() else: left_unordered = self.compile_unordered_ir(node.left_child) right_unordered = self.compile_unordered_ir(node.right_child) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 2dc9623d89..93b59f75ee 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -249,6 +249,7 @@ def order_ambiguous(self) -> bool: @property def explicitly_ordered(self) -> bool: + # Do not consider user pre-join ordering intent - they need to re-order post-join in unordered mode. 
return False def __hash__(self): @@ -307,7 +308,8 @@ def order_ambiguous(self) -> bool: @property def explicitly_ordered(self) -> bool: - return all(child.explicitly_ordered for child in self.children) + # Consider concat as an ordered operations (even though input frames may not be ordered) + return True def __hash__(self): return self._node_hash From 35b458e94e5b5c96b97e55c80945cf3f3c0d163f Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 23 Sep 2024 16:27:23 -0700 Subject: [PATCH 12/14] chore: remove pandas version limit from test `test_getitem_w_struct_array` (#1014) * chore: remove version limit from test * remove redundant import --- tests/system/small/operations/test_strings.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/system/small/operations/test_strings.py b/tests/system/small/operations/test_strings.py index 98fecaa93b..bb328360ee 100644 --- a/tests/system/small/operations/test_strings.py +++ b/tests/system/small/operations/test_strings.py @@ -14,7 +14,6 @@ import re -import packaging.version import pandas as pd import pyarrow as pa import pytest @@ -641,9 +640,6 @@ def test_getitem_w_array(index, column_name, repeated_df, repeated_pandas_df): def test_getitem_w_struct_array(): - if packaging.version.Version(pd.__version__) <= packaging.version.Version("1.5.3"): - pytest.skip("https://github.com/googleapis/python-bigquery/issues/1992") - pa_struct = pa.struct( [ ("name", pa.string()), From 8e9919b53899b6951a10d02643d1d0e53e15665f Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 23 Sep 2024 17:22:23 -0700 Subject: [PATCH 13/14] feat: support ingress settings in `remote_function` (#1011) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: support ingress settings in `remote_function` * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * propagate new param to bigframes.pandas module * fix mypy error --------- Co-authored-by: Owl Bot --- .../functions/_remote_function_client.py | 23 +++++++ .../functions/_remote_function_session.py | 21 +++++- bigframes/pandas/__init__.py | 4 ++ bigframes/session/__init__.py | 9 +++ tests/system/large/test_remote_function.py | 66 +++++++++++++++++++ 5 files changed, 122 insertions(+), 1 deletion(-) diff --git a/bigframes/functions/_remote_function_client.py b/bigframes/functions/_remote_function_client.py index 75385f11a5..5acd31b425 100644 --- a/bigframes/functions/_remote_function_client.py +++ b/bigframes/functions/_remote_function_client.py @@ -23,6 +23,7 @@ import string import sys import tempfile +import types from typing import cast, Tuple, TYPE_CHECKING from bigframes_vendored import constants @@ -43,6 +44,15 @@ logger = logging.getLogger(__name__) +# https://cloud.google.com/sdk/gcloud/reference/functions/deploy#--ingress-settings +_INGRESS_SETTINGS_MAP = types.MappingProxyType( + { + "all": functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, + "internal-only": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, + "internal-and-gclb": functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, + } +) + class RemoteFunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation @@ -228,6 +238,7 @@ def create_cloud_function( is_row_processor=False, vpc_connector=None, memory_mib=1024, + ingress_settings="all", ): """Create a cloud function from the given user defined function. 
@@ -324,6 +335,16 @@ def create_cloud_function( function.service_config.service_account_email = ( self._cloud_function_service_account ) + if ingress_settings not in _INGRESS_SETTINGS_MAP: + raise ValueError( + "'{}' not one of the supported ingress settings values: {}".format( + ingress_settings, list(_INGRESS_SETTINGS_MAP) + ) + ) + function.service_config.ingress_settings = cast( + functions_v2.ServiceConfig.IngressSettings, + _INGRESS_SETTINGS_MAP[ingress_settings], + ) function.kms_key_name = self._cloud_function_kms_key_name create_function_request.function = function @@ -372,6 +393,7 @@ def provision_bq_remote_function( is_row_processor, cloud_function_vpc_connector, cloud_function_memory_mib, + cloud_function_ingress_settings, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package @@ -418,6 +440,7 @@ def provision_bq_remote_function( is_row_processor=is_row_processor, vpc_connector=cloud_function_vpc_connector, memory_mib=cloud_function_memory_mib, + ingress_settings=cloud_function_ingress_settings, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 6bc7a4b079..a924dbd9c5 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -19,7 +19,17 @@ import inspect import sys import threading -from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union +from typing import ( + Any, + cast, + Dict, + Literal, + Mapping, + Optional, + Sequence, + TYPE_CHECKING, + Union, +) import warnings import bigframes_vendored.constants as constants @@ -110,6 +120,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -280,6 +293,11 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_ingress_settings (str, Optional): + Ingress settings controls dictating what traffic can reach the + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details + https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. """ # Some defaults may be used from the session if not provided otherwise import bigframes.exceptions as bf_exceptions @@ -504,6 +522,7 @@ def try_delattr(attr): is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) # TODO(shobs): Find a better way to support udfs with param named "name". 
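As a quick illustration of the new parameter threaded through the files below, the sketch that follows shows how a caller might restrict the generated Cloud Function to internal traffic. It is a hedged usage sketch, not part of the patch: it assumes a session whose default remote-function connection and IAM permissions are already configured, and `add_one` plus the sample data are placeholder names.

```python
# Usage sketch (assumption: a default BigQuery connection and permissions exist).
import bigframes.pandas as bpd

# The new cloud_function_ingress_settings argument accepts "all" (default),
# "internal-only", or "internal-and-gclb".
@bpd.remote_function(reuse=False, cloud_function_ingress_settings="internal-only")
def add_one(x: int) -> int:  # placeholder UDF for illustration
    return x + 1

df = bpd.DataFrame({"n": [1, 2, 3]})
print(df["n"].apply(add_one).to_pandas())
```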
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 94ea6becab..1bdf49eaf5 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -669,6 +669,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -687,6 +690,7 @@ def remote_function( cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7d0cfaee5c..3a9cba442c 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1040,6 +1040,9 @@ def remote_function( cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, cloud_function_memory_mib: Optional[int] = 1024, + cloud_function_ingress_settings: Literal[ + "all", "internal-only", "internal-and-gclb" + ] = "all", ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1194,6 +1197,11 @@ def remote_function( default memory of cloud functions be allocated, pass `None`. See for more details https://cloud.google.com/functions/docs/configuring/memory. + cloud_function_ingress_settings (str, Optional): + Ingress settings controls dictating what traffic can reach the + function. By default `all` will be used. It must be one of: + `all`, `internal-only`, `internal-and-gclb`. See for more details + https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. 
The cloud assets can be @@ -1220,6 +1228,7 @@ def remote_function( cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_memory_mib=cloud_function_memory_mib, + cloud_function_ingress_settings=cloud_function_ingress_settings, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index d1e82dd415..18d2609347 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -2173,3 +2173,69 @@ def foo(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo ) + + +@pytest.mark.parametrize( + ("ingress_settings_args", "effective_ingress_settings"), + [ + pytest.param( + {}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, id="no-set" + ), + pytest.param( + {"cloud_function_ingress_settings": "all"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, + id="set-all", + ), + pytest.param( + {"cloud_function_ingress_settings": "internal-only"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, + id="set-internal-only", + ), + pytest.param( + {"cloud_function_ingress_settings": "internal-and-gclb"}, + functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, + id="set-internal-and-gclb", + ), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_ingress_settings( + session, scalars_dfs, ingress_settings_args, effective_ingress_settings +): + try: + + def square(x: int) -> int: + return x * x + + square_remote = session.remote_function(reuse=False, **ingress_settings_args)( + square + ) + + # Assert that the GCF is created with the intended maximum timeout + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.ingress_settings == effective_ingress_settings + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_ingress_settings_unsupported(session): + with pytest.raises( + ValueError, match="'unknown' not one of the supported ingress settings values" + ): + + @session.remote_function(reuse=False, cloud_function_ingress_settings="unknown") + def square(x: int) -> int: + return x * x From a95493df27208ad25b925d6a1c134a7d5c668182 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Tue, 24 Sep 2024 02:27:18 +0000 Subject: [PATCH 14/14] chore(main): release 1.19.0 (#1002) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 24 +++++++++++++++++++++++ bigframes/version.py | 2 +- third_party/bigframes_vendored/version.py | 2 +- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c210f5c991..a99e0ecd91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,30 @@ [1]: https://pypi.org/project/bigframes/#history +## [1.19.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.18.0...v1.19.0) (2024-09-24) + + +### Features + +* Add 
ml.model_selection.KFold class ([#1001](https://github.com/googleapis/python-bigquery-dataframes/issues/1001)) ([952cab9](https://github.com/googleapis/python-bigquery-dataframes/commit/952cab92e548b70d077b20bf10f5307751d2ae76)) +* Support bool and bytes types in `describe(include='all')` ([#994](https://github.com/googleapis/python-bigquery-dataframes/issues/994)) ([cc48f58](https://github.com/googleapis/python-bigquery-dataframes/commit/cc48f58cbd94f8110ee863eb57d3fe8dc5a17778)) +* Support ingress settings in `remote_function` ([#1011](https://github.com/googleapis/python-bigquery-dataframes/issues/1011)) ([8e9919b](https://github.com/googleapis/python-bigquery-dataframes/commit/8e9919b53899b6951a10d02643d1d0e53e15665f)) + + +### Bug Fixes + +* Fix miscasting issues with case_when ([#1003](https://github.com/googleapis/python-bigquery-dataframes/issues/1003)) ([038139d](https://github.com/googleapis/python-bigquery-dataframes/commit/038139dfa4fa89167c52c1cb559c2eb5fe2f0411)) + + +### Performance Improvements + +* Join op discards child ordering in unordered mode ([#923](https://github.com/googleapis/python-bigquery-dataframes/issues/923)) ([1b5b0ee](https://github.com/googleapis/python-bigquery-dataframes/commit/1b5b0eea92631b7dd1b688cf1da617fc7ce862dc)) + + +### Dependencies + +* Update ibis version in prerelease tests ([#1012](https://github.com/googleapis/python-bigquery-dataframes/issues/1012)) ([f89785f](https://github.com/googleapis/python-bigquery-dataframes/commit/f89785fcfc51c541253ca8c1e8baf80fbfaea3b6)) + ## [1.18.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.17.0...v1.18.0) (2024-09-18) diff --git a/bigframes/version.py b/bigframes/version.py index 745632616c..5dda345fcb 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.18.0" +__version__ = "1.19.0" diff --git a/third_party/bigframes_vendored/version.py b/third_party/bigframes_vendored/version.py index 745632616c..5dda345fcb 100644 --- a/third_party/bigframes_vendored/version.py +++ b/third_party/bigframes_vendored/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.18.0" +__version__ = "1.19.0"
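For completeness, here is a short, hedged sketch of how the `KFold` cross-validator added earlier in this series can be exercised end to end. The public penguins table and the column choices are assumptions for illustration (mirroring the system tests); any table with numeric features would do.

```python
# Illustrative only: the table and columns are assumptions, not mandated by the patch.
import bigframes.pandas as bpd
from bigframes.ml import model_selection

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
X = df[["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]]
y = df["body_mass_g"]

kf = model_selection.KFold(n_splits=5, random_state=42)
print(kf.get_n_splits())  # 5

for X_train, X_test, y_train, y_test in kf.split(X, y):
    # Each yield is an (X_train, X_test, y_train, y_test) tuple of BigQuery
    # DataFrames/Series; fold sizes differ by at most one row, as the
    # assertions in the new system tests expect.
    print(X_train.shape, X_test.shape)
```

Each fold comes from splitting the joined frame into equal fractions with an optional seed, consistent with the `_split(fracs=..., random_state=...)` call in the implementation, so no per-row index shuffle is materialized client-side.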