From 796fc3ed59bc0bd79f566ce2bc604f4cf2079fb3 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Mon, 6 Jan 2025 11:23:09 -0800 Subject: [PATCH 01/22] chore: add experimental blob.image_blur function (#1256) * chore: add experimental blob.image_blur function * apply to obj_ref * docs * fix mypy --- bigframes/blob/_functions.py | 130 +++++++++++++++++++ bigframes/core/compile/scalar_op_compiler.py | 12 ++ bigframes/operations/__init__.py | 17 +++ bigframes/operations/blob.py | 51 ++++++++ 4 files changed, 210 insertions(+) create mode 100644 bigframes/blob/_functions.py diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py new file mode 100644 index 0000000000..4b3841252c --- /dev/null +++ b/bigframes/blob/_functions.py @@ -0,0 +1,130 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +import inspect +from typing import Callable, Iterable + +import google.cloud.bigquery as bigquery + +import bigframes +import bigframes.session._io.bigquery as bf_io_bigquery + +_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} + + +@dataclass(frozen=True) +class FunctionDef: + """Definition of a Python UDF.""" + + func: Callable # function body + requirements: Iterable[str] # required packages + + +# TODO(garrettwu): migrate to bigframes UDF when it is available +class TransformFunction: + """Simple transform function class to deal with Python UDF.""" + + def __init__( + self, func_def: FunctionDef, session: bigframes.Session, connection: str + ): + self._func = func_def.func + self._requirements = func_def.requirements + self._session = session + self._connection = connection + + def _input_bq_signature(self): + sig = inspect.signature(self._func) + inputs = [] + for k, v in sig.parameters.items(): + inputs.append(f"{k} {_PYTHON_TO_BQ_TYPES[v.annotation]}") + return ", ".join(inputs) + + def _output_bq_type(self): + sig = inspect.signature(self._func) + return _PYTHON_TO_BQ_TYPES[sig.return_annotation] + + def _create_udf(self): + """Create Python UDF in BQ. Return name of the UDF.""" + udf_name = str(self._session._loader._storage_manager._random_table()) + + func_body = inspect.getsource(self._func) + func_name = self._func.__name__ + packages = str(list(self._requirements)) + + sql = f""" +CREATE OR REPLACE FUNCTION `{udf_name}`({self._input_bq_signature()}) +RETURNS {self._output_bq_type()} LANGUAGE python +WITH CONNECTION `{self._connection}` +OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages}) +AS r\"\"\" + + +{func_body} + + +\"\"\" + """ + + bf_io_bigquery.start_query_with_client( + self._session.bqclient, + sql, + job_config=bigquery.QueryJobConfig(), + metrics=self._session._metrics, + ) + + return udf_name + + def udf(self): + """Create and return the UDF object.""" + udf_name = self._create_udf() + return self._session.read_gbq_function(udf_name) + + +# Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string. +def image_blur_func( + src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int +) -> str: + import json + + import cv2 as cv # type: ignore + import numpy as np + import requests + + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + + response = requests.get(src_url) + bts = response.content + + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + bts = cv.imencode(".jpeg", img_blurred)[1].tobytes() + + requests.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": "image/jpeg", + }, + ) + + return dst_obj_ref_rt + + +image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index d824009fec..2b85a97483 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -1210,6 +1210,11 @@ def json_extract_string_array_op_impl( return json_extract_string_array(json_obj=x, json_path=op.json_path) +@scalar_op_compiler.register_unary_op(ops.ToJSONString) +def to_json_string_op_impl(json_obj: ibis_types.Value): + return to_json_string(json_obj=json_obj) + + # Blob Ops @scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op) def obj_fetch_metadata_op_impl(obj_ref: ibis_types.Value): @@ -1909,6 +1914,13 @@ def json_extract_string_array( # type: ignore[empty-body] """Extracts a JSON array and converts it to a SQL ARRAY of STRINGs.""" +@ibis_udf.scalar.builtin(name="to_json_string") +def to_json_string( # type: ignore[empty-body] + json_obj: ibis_dtypes.JSON, +) -> ibis_dtypes.String: + """Convert JSON to STRING.""" + + @ibis_udf.scalar.builtin(name="ML.DISTANCE") def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64: # type: ignore[empty-body] """Computes the distance between two vectors using specified type ("EUCLIDEAN", "MANHATTAN", or "COSINE")""" diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 2884d56551..03d9d60d5f 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -740,6 +740,23 @@ def output_type(self, *input_types): ) +@dataclasses.dataclass(frozen=True) +class ToJSONString(UnaryOp): + name: typing.ClassVar[str] = "to_json_string" + + def output_type(self, *input_types): + input_type = input_types[0] + if not dtypes.is_json_like(input_type): + raise TypeError( + "Input type must be an valid JSON object or JSON-formatted string type." + + f" Received type: {input_type}" + ) + return dtypes.STRING_DTYPE + + +to_json_string_op = ToJSONString() + + ## Blob Ops @dataclasses.dataclass(frozen=True) class ObjGetAccessUrl(UnaryOp): diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index c074c72971..898d56ab83 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -14,9 +14,12 @@ from __future__ import annotations +from typing import Optional + import IPython.display as ipy_display import requests +from bigframes import clients from bigframes.operations import base import bigframes.operations as ops import bigframes.series @@ -66,3 +69,51 @@ def display(self, n: int = 3): read_url = str(read_url).strip('"') response = requests.get(read_url) ipy_display.display(ipy_display.Image(response.content)) + + def image_blur( + self, + ksize: tuple[int, int], + *, + dst: bigframes.series.Series, + connection: Optional[str] = None, + ) -> bigframes.series.Series: + """Blurs images. + + .. note:: + BigFrames Blob is still under experiments. It may not work and subject to change in the future. + + Args: + ksize (tuple(int, int)): Kernel size. + dst (bigframes.series.Series): Destination blob series. + connection (str or None, default None): BQ connection used for internet transactions. If None, uses default connection of the session. + + Returns: + JSON: Runtime info of the Blob. + """ + import bigframes.blob._functions as blob_func + + connection = connection or self._block.session._bq_connection + connection = clients.resolve_full_bq_connection_name( + connection, + default_project=self._block.session._project, + default_location=self._block.session._location, + ) + + image_blur_udf = blob_func.TransformFunction( + blob_func.image_blur_def, + session=self._block.session, + connection=connection, + ).udf() + + src_rt = bigframes.series.Series(self._block)._apply_unary_op( + ops.ObjGetAccessUrl(mode="R") + ) + dst_rt = dst._apply_unary_op(ops.ObjGetAccessUrl(mode="RW")) + + src_rt = src_rt._apply_unary_op(ops.to_json_string_op) + dst_rt = dst_rt._apply_unary_op(ops.to_json_string_op) + + df = src_rt.to_frame().join(dst_rt.to_frame(), how="outer") + df["ksize_x"], df["ksize_y"] = ksize + + return df.apply(image_blur_udf, axis=1) From 8077ff49426b103dc5a52eeb86a2c6a869c99825 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Mon, 6 Jan 2025 13:55:19 -0800 Subject: [PATCH 02/22] feat: add max_retries to TextEmbeddingGenerator and Claude3TextGenerator (#1259) * chore: fix wordings of Gemini max_retries * feat: add max_retries to TextEmbeddingGenerator and Claude3TextGenerator --------- Co-authored-by: Shuowei Li --- bigframes/ml/base.py | 64 ++++++- bigframes/ml/llm.py | 109 ++++++----- tests/system/small/ml/test_llm.py | 288 +++++++++++++++++++++++++++--- 3 files changed, 369 insertions(+), 92 deletions(-) diff --git a/bigframes/ml/base.py b/bigframes/ml/base.py index 4058647adb..a2c122f8c7 100644 --- a/bigframes/ml/base.py +++ b/bigframes/ml/base.py @@ -22,7 +22,8 @@ """ import abc -from typing import cast, Optional, TypeVar +from typing import Callable, cast, Mapping, Optional, TypeVar +import warnings import bigframes_vendored.sklearn.base @@ -77,6 +78,9 @@ def fit_transform(self, x_train: Union[DataFrame, Series], y_train: Union[DataFr ... """ + def __init__(self): + self._bqml_model: Optional[core.BqmlModel] = None + def __repr__(self): """Print the estimator's constructor with all non-default parameter values.""" @@ -95,9 +99,6 @@ def __repr__(self): class Predictor(BaseEstimator): """A BigQuery DataFrames ML Model base class that can be used to predict outputs.""" - def __init__(self): - self._bqml_model: Optional[core.BqmlModel] = None - @abc.abstractmethod def predict(self, X): pass @@ -213,12 +214,61 @@ def fit( return self._fit(X, y) +class RetriableRemotePredictor(BaseEstimator): + @property + @abc.abstractmethod + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + pass + + @property + @abc.abstractmethod + def _status_col(self) -> str: + pass + + def _predict_and_retry( + self, X: bpd.DataFrame, options: Mapping, max_retries: int + ) -> bpd.DataFrame: + assert self._bqml_model is not None + + df_result = bpd.DataFrame(session=self._bqml_model.session) # placeholder + df_fail = X + for _ in range(max_retries + 1): + df = self._predict_func(df_fail, options) + + success = df[self._status_col].str.len() == 0 + df_succ = df[success] + df_fail = df[~success] + + if df_succ.empty: + if max_retries > 0: + warnings.warn( + "Can't make any progress, stop retrying.", RuntimeWarning + ) + break + + df_result = ( + bpd.concat([df_result, df_succ]) if not df_result.empty else df_succ + ) + + if df_fail.empty: + break + + if not df_fail.empty: + warnings.warn( + f"Some predictions failed. Check column {self._status_col} for detailed status. You may want to filter the failed rows and retry.", + RuntimeWarning, + ) + + df_result = cast( + bpd.DataFrame, + bpd.concat([df_result, df_fail]) if not df_result.empty else df_fail, + ) + return df_result + + class BaseTransformer(BaseEstimator): """Transformer base class.""" - def __init__(self): - self._bqml_model: Optional[core.BqmlModel] = None - @abc.abstractmethod def _keys(self): pass diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index d42138b006..e6825f80bb 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -16,7 +16,7 @@ from __future__ import annotations -from typing import cast, Literal, Optional +from typing import Callable, cast, Literal, Mapping, Optional import warnings import bigframes_vendored.constants as constants @@ -616,7 +616,7 @@ def to_gbq( @log_adapter.class_logger -class TextEmbeddingGenerator(base.BaseEstimator): +class TextEmbeddingGenerator(base.RetriableRemotePredictor): """Text embedding generator LLM model. Args: @@ -715,18 +715,33 @@ def _from_bq( model._bqml_model = core.BqmlModel(session, bq_model) return model - def predict(self, X: utils.ArrayType) -> bpd.DataFrame: + @property + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + return self._bqml_model.generate_embedding + + @property + def _status_col(self) -> str: + return _ML_GENERATE_EMBEDDING_STATUS + + def predict(self, X: utils.ArrayType, *, max_retries: int = 0) -> bpd.DataFrame: """Predict the result from input DataFrame. Args: X (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): Input DataFrame or Series, can contain one or more columns. If multiple columns are in the DataFrame, it must contain a "content" column for prediction. + max_retries (int, default 0): + Max number of retries if the prediction for any rows failed. Each try needs to make progress (i.e. has successfully predicted rows) to continue the retry. + Each retry will append newly succeeded rows. When the max retries are reached, the remaining rows (the ones without successful predictions) will be appended to the end of the result. + Returns: bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values. """ + if max_retries < 0: + raise ValueError( + f"max_retries must be larger than or equal to 0, but is {max_retries}." + ) - # Params reference: https://cloud.google.com/vertex-ai/docs/generative-ai/learn/models (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session) if len(X.columns) == 1: @@ -738,15 +753,7 @@ def predict(self, X: utils.ArrayType) -> bpd.DataFrame: "flatten_json_output": True, } - df = self._bqml_model.generate_embedding(X, options) - - if (df[_ML_GENERATE_EMBEDDING_STATUS] != "").any(): - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_EMBEDDING_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, - ) - - return df + return self._predict_and_retry(X, options=options, max_retries=max_retries) def to_gbq(self, model_name: str, replace: bool = False) -> TextEmbeddingGenerator: """Save the model to BigQuery. @@ -765,7 +772,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> TextEmbeddingGenerat @log_adapter.class_logger -class GeminiTextGenerator(base.BaseEstimator): +class GeminiTextGenerator(base.RetriableRemotePredictor): """Gemini text generator LLM model. Args: @@ -891,6 +898,14 @@ def _bqml_options(self) -> dict: } return options + @property + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + return self._bqml_model.generate_text + + @property + def _status_col(self) -> str: + return _ML_GENERATE_TEXT_STATUS + def fit( self, X: utils.ArrayType, @@ -1028,41 +1043,7 @@ def predict( "ground_with_google_search": ground_with_google_search, } - df_result = bpd.DataFrame(session=self._bqml_model.session) # placeholder - df_fail = X - for _ in range(max_retries + 1): - df = self._bqml_model.generate_text(df_fail, options) - - success = df[_ML_GENERATE_TEXT_STATUS].str.len() == 0 - df_succ = df[success] - df_fail = df[~success] - - if df_succ.empty: - if max_retries > 0: - warnings.warn( - "Can't make any progress, stop retrying.", RuntimeWarning - ) - break - - df_result = ( - bpd.concat([df_result, df_succ]) if not df_result.empty else df_succ - ) - - if df_fail.empty: - break - - if not df_fail.empty: - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, - ) - - df_result = cast( - bpd.DataFrame, - bpd.concat([df_result, df_fail]) if not df_result.empty else df_fail, - ) - - return df_result + return self._predict_and_retry(X, options=options, max_retries=max_retries) def score( self, @@ -1144,7 +1125,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> GeminiTextGenerator: @log_adapter.class_logger -class Claude3TextGenerator(base.BaseEstimator): +class Claude3TextGenerator(base.RetriableRemotePredictor): """Claude3 text generator LLM model. Go to Google Cloud Console -> Vertex AI -> Model Garden page to enabe the models before use. Must have the Consumer Procurement Entitlement Manager Identity and Access Management (IAM) role to enable the models. @@ -1273,6 +1254,14 @@ def _bqml_options(self) -> dict: } return options + @property + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + return self._bqml_model.generate_text + + @property + def _status_col(self) -> str: + return _ML_GENERATE_TEXT_STATUS + def predict( self, X: utils.ArrayType, @@ -1280,6 +1269,7 @@ def predict( max_output_tokens: int = 128, top_k: int = 40, top_p: float = 0.95, + max_retries: int = 0, ) -> bpd.DataFrame: """Predict the result from input DataFrame. @@ -1307,6 +1297,10 @@ def predict( Specify a lower value for less random responses and a higher value for more random responses. Default 0.95. Possible values [0.0, 1.0]. + max_retries (int, default 0): + Max number of retries if the prediction for any rows failed. Each try needs to make progress (i.e. has successfully predicted rows) to continue the retry. + Each retry will append newly succeeded rows. When the max retries are reached, the remaining rows (the ones without successful predictions) will be appended to the end of the result. + Returns: bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values. @@ -1324,6 +1318,11 @@ def predict( if top_p < 0.0 or top_p > 1.0: raise ValueError(f"top_p must be [0.0, 1.0], but is {top_p}.") + if max_retries < 0: + raise ValueError( + f"max_retries must be larger than or equal to 0, but is {max_retries}." + ) + (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session) if len(X.columns) == 1: @@ -1338,15 +1337,7 @@ def predict( "flatten_json_output": True, } - df = self._bqml_model.generate_text(X, options) - - if (df[_ML_GENERATE_TEXT_STATUS] != "").any(): - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, - ) - - return df + return self._predict_and_retry(X, options=options, max_retries=max_retries) def to_gbq(self, model_name: str, replace: bool = False) -> Claude3TextGenerator: """Save the model to BigQuery. diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index 304204cc7b..29f504443a 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -381,7 +381,35 @@ def __eq__(self, other): return self.equals(other) -def test_gemini_text_generator_retry_success(session, bq_connection): +@pytest.mark.parametrize( + ( + "model_class", + "options", + ), + [ + ( + llm.GeminiTextGenerator, + { + "temperature": 0.9, + "max_output_tokens": 8192, + "top_k": 40, + "top_p": 1.0, + "flatten_json_output": True, + "ground_with_google_search": False, + }, + ), + ( + llm.Claude3TextGenerator, + { + "max_output_tokens": 128, + "top_k": 40, + "top_p": 0.95, + "flatten_json_output": True, + }, + ), + ], +) +def test_text_generator_retry_success(session, bq_connection, model_class, options): # Requests. df0 = EqCmpAllDataFrame( { @@ -455,22 +483,12 @@ def test_gemini_text_generator_retry_success(session, bq_connection): session=session, ), ] - options = { - "temperature": 0.9, - "max_output_tokens": 8192, - "top_k": 40, - "top_p": 1.0, - "flatten_json_output": True, - "ground_with_google_search": False, - } - gemini_text_generator_model = llm.GeminiTextGenerator( - connection_name=bq_connection, session=session - ) - gemini_text_generator_model._bqml_model = mock_bqml_model + text_generator_model = model_class(connection_name=bq_connection, session=session) + text_generator_model._bqml_model = mock_bqml_model # 3rd retry isn't triggered - result = gemini_text_generator_model.predict(df0, max_retries=3) + result = text_generator_model.predict(df0, max_retries=3) mock_bqml_model.generate_text.assert_has_calls( [ @@ -497,7 +515,35 @@ def test_gemini_text_generator_retry_success(session, bq_connection): ) -def test_gemini_text_generator_retry_no_progress(session, bq_connection): +@pytest.mark.parametrize( + ( + "model_class", + "options", + ), + [ + ( + llm.GeminiTextGenerator, + { + "temperature": 0.9, + "max_output_tokens": 8192, + "top_k": 40, + "top_p": 1.0, + "flatten_json_output": True, + "ground_with_google_search": False, + }, + ), + ( + llm.Claude3TextGenerator, + { + "max_output_tokens": 128, + "top_k": 40, + "top_p": 0.95, + "flatten_json_output": True, + }, + ), + ], +) +def test_text_generator_retry_no_progress(session, bq_connection, model_class, options): # Requests. df0 = EqCmpAllDataFrame( { @@ -550,24 +596,214 @@ def test_gemini_text_generator_retry_no_progress(session, bq_connection): session=session, ), ] + + text_generator_model = model_class(connection_name=bq_connection, session=session) + text_generator_model._bqml_model = mock_bqml_model + + # No progress, only conduct retry once + result = text_generator_model.predict(df0, max_retries=3) + + mock_bqml_model.generate_text.assert_has_calls( + [ + mock.call(df0, options), + mock.call(df1, options), + ] + ) + pd.testing.assert_frame_equal( + result.to_pandas(), + pd.DataFrame( + { + "ml_generate_text_status": ["", "error", "error"], + "prompt": [ + "What is BigQuery?", + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[0, 1, 2], + ), + check_dtype=False, + check_index_type=False, + ) + + +def test_text_embedding_generator_retry_success(session, bq_connection): + # Requests. + df0 = EqCmpAllDataFrame( + { + "content": [ + "What is BigQuery?", + "What is BQML?", + "What is BigQuery DataFrame?", + ] + }, + index=[0, 1, 2], + session=session, + ) + df1 = EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["error", "error"], + "content": [ + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[1, 2], + session=session, + ) + df2 = EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["error"], + "content": [ + "What is BQML?", + ], + }, + index=[1], + session=session, + ) + + mock_bqml_model = mock.create_autospec(spec=core.BqmlModel) + type(mock_bqml_model).session = mock.PropertyMock(return_value=session) + + # Responses. Retry twice then all succeeded. + mock_bqml_model.generate_embedding.side_effect = [ + EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["", "error", "error"], + "content": [ + "What is BigQuery?", + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[0, 1, 2], + session=session, + ), + EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["error", ""], + "content": [ + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[1, 2], + session=session, + ), + EqCmpAllDataFrame( + { + "ml_generate_embedding_status": [""], + "content": [ + "What is BQML?", + ], + }, + index=[1], + session=session, + ), + ] options = { - "temperature": 0.9, - "max_output_tokens": 8192, - "top_k": 40, - "top_p": 1.0, "flatten_json_output": True, - "ground_with_google_search": False, } - gemini_text_generator_model = llm.GeminiTextGenerator( + text_embedding_model = llm.TextEmbeddingGenerator( + connection_name=bq_connection, session=session + ) + text_embedding_model._bqml_model = mock_bqml_model + + # 3rd retry isn't triggered + result = text_embedding_model.predict(df0, max_retries=3) + + mock_bqml_model.generate_embedding.assert_has_calls( + [ + mock.call(df0, options), + mock.call(df1, options), + mock.call(df2, options), + ] + ) + pd.testing.assert_frame_equal( + result.to_pandas(), + pd.DataFrame( + { + "ml_generate_embedding_status": ["", "", ""], + "content": [ + "What is BigQuery?", + "What is BigQuery DataFrame?", + "What is BQML?", + ], + }, + index=[0, 2, 1], + ), + check_dtype=False, + check_index_type=False, + ) + + +def test_text_embedding_generator_retry_no_progress(session, bq_connection): + # Requests. + df0 = EqCmpAllDataFrame( + { + "content": [ + "What is BigQuery?", + "What is BQML?", + "What is BigQuery DataFrame?", + ] + }, + index=[0, 1, 2], + session=session, + ) + df1 = EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["error", "error"], + "content": [ + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[1, 2], + session=session, + ) + + mock_bqml_model = mock.create_autospec(spec=core.BqmlModel) + type(mock_bqml_model).session = mock.PropertyMock(return_value=session) + # Responses. Retry once, no progress, just stop. + mock_bqml_model.generate_embedding.side_effect = [ + EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["", "error", "error"], + "content": [ + "What is BigQuery?", + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[0, 1, 2], + session=session, + ), + EqCmpAllDataFrame( + { + "ml_generate_embedding_status": ["error", "error"], + "content": [ + "What is BQML?", + "What is BigQuery DataFrame?", + ], + }, + index=[1, 2], + session=session, + ), + ] + options = { + "flatten_json_output": True, + } + + text_embedding_model = llm.TextEmbeddingGenerator( connection_name=bq_connection, session=session ) - gemini_text_generator_model._bqml_model = mock_bqml_model + text_embedding_model._bqml_model = mock_bqml_model # No progress, only conduct retry once - result = gemini_text_generator_model.predict(df0, max_retries=3) + result = text_embedding_model.predict(df0, max_retries=3) - mock_bqml_model.generate_text.assert_has_calls( + mock_bqml_model.generate_embedding.assert_has_calls( [ mock.call(df0, options), mock.call(df1, options), @@ -577,8 +813,8 @@ def test_gemini_text_generator_retry_no_progress(session, bq_connection): result.to_pandas(), pd.DataFrame( { - "ml_generate_text_status": ["", "error", "error"], - "prompt": [ + "ml_generate_embedding_status": ["", "error", "error"], + "content": [ "What is BigQuery?", "What is BQML?", "What is BigQuery DataFrame?", From bf90741b8cd5bf238c03f86d081f3630f521ef60 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Mon, 6 Jan 2025 15:10:59 -0800 Subject: [PATCH 03/22] test: read_gbp works when table and column share a name (#1079) --- tests/system/small/test_session.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index eb63a9b72a..f722ccbe75 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -403,6 +403,17 @@ def test_read_gbq_on_linked_dataset_warns(session, source_table): assert warned[0].category == bigframes.exceptions.TimeTravelDisabledWarning +def test_read_gbq_w_ambigous_name( + session: bigframes.Session, +): + # Ensure read_gbq works when table and column share a name + df = session.read_gbq( + "bigframes-dev.bigframes_tests_sys.ambiguous_name" + ).to_pandas() + pd_df = pd.DataFrame({"x": [2, 1], "ambiguous_name": [20, 10]}) + pd.testing.assert_frame_equal(df, pd_df, check_dtype=False, check_index_type=False) + + def test_read_gbq_table_clustered_with_filter(session: bigframes.Session): df = session.read_gbq_table( "bigquery-public-data.cloud_storage_geo_index.landsat_index", From 58f13cb9ef8bac3222e5013d8ae77dd20f886e30 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 6 Jan 2025 19:28:59 -0800 Subject: [PATCH 04/22] docs: add bq studio links that allows users to generate Jupiter notebooks in bq studio with github contents (#1266) * add new bq studio links that allows users to self import .ipynb from github * revert semantic_operators * add bq studio link for semantic_operators.ipynb --------- Co-authored-by: Shuowei Li --- .../experimental/semantic_operators.ipynb | 936 +++++++++--------- .../bq_dataframes_llm_code_generation.ipynb | 6 + .../bq_dataframes_llm_kmeans.ipynb | 6 + ...q_dataframes_ml_drug_name_generation.ipynb | 6 + .../bq_dataframes_template.ipynb | 7 + .../getting_started_bq_dataframes.ipynb | 6 + .../ml_fundamentals_bq_dataframes.ipynb | 6 + .../bq_dataframes_ml_linear_regression.ipynb | 6 + .../remote_function_vertex_claude_model.ipynb | 6 + .../bq_dataframes_covid_line_graphs.ipynb | 6 + 10 files changed, 526 insertions(+), 465 deletions(-) diff --git a/notebooks/experimental/semantic_operators.ipynb b/notebooks/experimental/semantic_operators.ipynb index f9c7f67358..b3b989dd43 100644 --- a/notebooks/experimental/semantic_operators.ipynb +++ b/notebooks/experimental/semantic_operators.ipynb @@ -44,6 +44,12 @@ " View on GitHub\n", " \n", " \n", + " \n", + " \n", + " \"BQ\n", + " Open in BQ Studio\n", + " \n", + " \n", "" ] }, @@ -135,24 +141,24 @@ }, { "cell_type": "markdown", - "source": [ - "Specify your GCP project and location." - ], "metadata": { "id": "W8TPUvnsqxhv" - } + }, + "source": [ + "Specify your GCP project and location." + ] }, { "cell_type": "code", - "source": [ - "bpd.options.bigquery.project = 'YOUR_PROJECT_ID'\n", - "bpd.options.bigquery.location = 'US'" - ], + "execution_count": null, "metadata": { "id": "vCkraKOeqJFl" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "bpd.options.bigquery.project = 'YOUR_PROJECT_ID'\n", + "bpd.options.bigquery.location = 'US'" + ] }, { "cell_type": "markdown", @@ -293,16 +299,7 @@ }, "outputs": [ { - "output_type": "execute_result", "data": { - "text/plain": [ - " country city\n", - "0 USA Seattle\n", - "1 Germany Berlin\n", - "2 Japan Kyoto\n", - "\n", - "[3 rows x 2 columns]" - ], "text/html": [ "
\n", "