From c4dd68314b6bce0a79893f2fc2dc0e6f3a231e1d Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 17:17:15 +0000 Subject: [PATCH 01/16] Add to_pubsub streaming method --- bigframes/streaming/__init__.py | 79 +++++++++++++++++++ .../simple_benchmark.py.bytesprocessed | 15 ++++ .../benchmark/simple_benchmark.py.slotmillis | 15 ++++ scripts/benchmark/simple_benchmark_2.py | 27 +++++++ .../simple_benchmark_2.py.bytesprocessed | 15 ++++ .../simple_benchmark_2.py.slotmillis | 15 ++++ scripts/create_bigtable.py | 3 - scripts/create_pubsub.py | 49 ++++++++++++ setup.py | 1 + testing/constraints-3.9.txt | 1 + tests/system/large/test_streaming.py | 25 ++++++ 11 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 scripts/benchmark/simple_benchmark.py.bytesprocessed create mode 100644 scripts/benchmark/simple_benchmark.py.slotmillis create mode 100644 scripts/benchmark/simple_benchmark_2.py create mode 100644 scripts/benchmark/simple_benchmark_2.py.bytesprocessed create mode 100644 scripts/benchmark/simple_benchmark_2.py.slotmillis create mode 100644 scripts/create_pubsub.py diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 16da677ef5..0a79981eb4 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -137,3 +137,82 @@ def to_bigtable( # return the query job to the user for lifetime management return query_job + + +def to_pubsub( + query: str, + topic: str, + bq_client: Optional[bigquery.Client] = None, + job_id: Optional[str] = None, + job_id_prefix: Optional[str] = None, +) -> bigquery.QueryJob: + """Launches a BigQuery continuous query and returns a + QueryJob object for some management functionality. + + This method requires an existing bigtable preconfigured to + accept the continuous query export statement. For instructions + on export to bigtable, see + https://cloud.google.com/bigquery/docs/export-to-bigtable. 
+ + Args: + query (str): + The sql statement to execute as a continuous function. + For example: "SELECT * FROM dataset.table" + This will be wrapped in an EXPORT DATA statement to + launch a continuous query writing to bigtable. + topic (str): + The name of the pubsub topic to export to. + For example: "taxi-rides" + bq_client (str, default None): + The Client object to use for the query. This determines + the project id and location of the query. If None, will + default to the bigframes global session default client. + job_id (str, default None): + If specified, replace the default job id for the query, + see job_id parameter of + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query + job_id_prefix (str, default None): + If specified, a job id prefix for the query, see + job_id_prefix parameter of + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query + + Returns: + google.cloud.bigquery.QueryJob: + See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob + The ongoing query job can be managed using this object. + For example, the job can be cancelled or its error status + can be examined. 
+ """ + # get default client if not passed + if bq_client is None: + bq_client = bigframes.get_global_session().bqclient + + # build export string from parameters + project = bq_client.project + + sql = ( + "EXPORT DATA\n" + "OPTIONS (\n" + "format = 'CLOUD_PUBSUB',\n" + f'uri = "https://pubsub.googleapis.com/projects/{project}/topics/{topic}"\n' + ")\n" + "AS (\n" + f"{query});" + ) + + # override continuous http parameter + job_config = bigquery.job.QueryJobConfig() + job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) + + # begin the query job + query_job = bq_client.query( + sql, + job_config=job_config_filled, # type:ignore + # typing error above is in bq client library + # (should accept abstract job_config, only takes concrete) + job_id=job_id, + job_id_prefix=job_id_prefix, + ) + + # return the query job to the user for lifetime management + return query_job diff --git a/scripts/benchmark/simple_benchmark.py.bytesprocessed b/scripts/benchmark/simple_benchmark.py.bytesprocessed new file mode 100644 index 0000000000..25c6624ccc --- /dev/null +++ b/scripts/benchmark/simple_benchmark.py.bytesprocessed @@ -0,0 +1,15 @@ +3200000 +0 +0 +3200000 +0 +0 +3200000 +0 +0 +3200000 +0 +0 +3200000 +0 +0 diff --git a/scripts/benchmark/simple_benchmark.py.slotmillis b/scripts/benchmark/simple_benchmark.py.slotmillis new file mode 100644 index 0000000000..5e34396672 --- /dev/null +++ b/scripts/benchmark/simple_benchmark.py.slotmillis @@ -0,0 +1,15 @@ +51800 +30 +287 +54920 +39 +119 +54791 +40 +118 +55302 +20 +134 +53327 +57 +245 diff --git a/scripts/benchmark/simple_benchmark_2.py b/scripts/benchmark/simple_benchmark_2.py new file mode 100644 index 0000000000..53b35c52ad --- /dev/null +++ b/scripts/benchmark/simple_benchmark_2.py @@ -0,0 +1,27 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bigframes.pandas as bpd + +# This is a placeholder benchmark. +# TODO(340278185): Add more data analysis tasks and benchmark files +# like this one. + +print("Performing simple benchmark.") +df = bpd.DataFrame() +df["column_1"] = bpd.Series([i for i in range(100000)]) +df["column_2"] = bpd.Series([i * 2 for i in range(100000)]) +df["column_3"] = df["column_1"] + df["column_2"] +df.__repr__() +bpd.reset_session() diff --git a/scripts/benchmark/simple_benchmark_2.py.bytesprocessed b/scripts/benchmark/simple_benchmark_2.py.bytesprocessed new file mode 100644 index 0000000000..25c6624ccc --- /dev/null +++ b/scripts/benchmark/simple_benchmark_2.py.bytesprocessed @@ -0,0 +1,15 @@ +3200000 +0 +0 +3200000 +0 +0 +3200000 +0 +0 +3200000 +0 +0 +3200000 +0 +0 diff --git a/scripts/benchmark/simple_benchmark_2.py.slotmillis b/scripts/benchmark/simple_benchmark_2.py.slotmillis new file mode 100644 index 0000000000..742b70b4a8 --- /dev/null +++ b/scripts/benchmark/simple_benchmark_2.py.slotmillis @@ -0,0 +1,15 @@ +55506 +72 +332 +61680 +26 +137 +46204 +64 +132 +65913 +59 +169 +49470 +20 +241 diff --git a/scripts/create_bigtable.py b/scripts/create_bigtable.py index 655e4b31ab..f81bb8a013 100644 --- a/scripts/create_bigtable.py +++ b/scripts/create_bigtable.py @@ -16,13 +16,10 @@ # bigframes.streaming testing if they don't already exist import os -import pathlib import sys import google.cloud.bigtable as bigtable -REPO_ROOT = pathlib.Path(__file__).parent.parent - PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") if not PROJECT_ID: diff --git 
a/scripts/create_pubsub.py b/scripts/create_pubsub.py new file mode 100644 index 0000000000..5d25398983 --- /dev/null +++ b/scripts/create_pubsub.py @@ -0,0 +1,49 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script creates the pubsub resources required for +# bigframes.streaming testing if they don't already exist + +import os +import sys + +from google.cloud import pubsub_v1 + +PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") + +if not PROJECT_ID: + print( + "Please set GOOGLE_CLOUD_PROJECT environment variable before running.", + file=sys.stderr, + ) + sys.exit(1) + + +def create_topic(topic_id): + # based on + # https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(PROJECT_ID, topic_id) + + topic = publisher.create_topic(request={"name": topic_path}) + print(f"Created topic: {topic.name}") + + +def main(): + create_topic("penguins") + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index dbd9ce5fc2..79baf1fb23 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0dev", "google-cloud-bigtable >=2.24.0", + "google-cloud-pubsub >=2.21.4", "google-cloud-bigquery[bqstorage,pandas] >=3.16.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 
bbd7bf0069..5a76698576 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,6 +5,7 @@ gcsfs==2023.3.0 geopandas==0.12.2 google-auth==2.15.0 google-cloud-bigtable==2.24.0 +google-cloud-pubsub==2.21.4 google-cloud-bigquery==3.16.0 google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 48db61e5bf..940f1a37f5 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -46,3 +46,28 @@ def test_streaming_to_bigtable(): assert str(query_job.job_id).startswith(job_id_prefix) finally: query_job.cancel() + + +def test_streaming_to_pubsub(): + # launch a continuous query + job_id_prefix = "test_streaming_pubsub_" + sql = """SELECT + island + FROM birds.penguins""" + query_job = bigframes.streaming.to_pubsub( + sql, + "penguins", + job_id=None, + job_id_prefix=job_id_prefix, + ) + + try: + # wait 100 seconds in order to ensure the query doesn't stop + # (i.e. it is continuous) + time.sleep(100) + assert query_job.error_result is None + assert query_job.errors is None + assert query_job.running() + assert str(query_job.job_id).startswith(job_id_prefix) + finally: + query_job.cancel() From a45050e2be696c79f2e68366561ca3307bc7e3ff Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 17:19:15 +0000 Subject: [PATCH 02/16] fix comment --- bigframes/streaming/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 0a79981eb4..8f8826a5b5 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -159,7 +159,7 @@ def to_pubsub( The sql statement to execute as a continuous function. For example: "SELECT * FROM dataset.table" This will be wrapped in an EXPORT DATA statement to - launch a continuous query writing to bigtable. + launch a continuous query writing to pubsub. 
topic (str): The name of the pubsub topic to export to. For example: "taxi-rides" From 5a149a8492ba1ea52e5220c2ed3bbcfa0585a77b Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 17:23:35 +0000 Subject: [PATCH 03/16] fix comment further --- bigframes/streaming/__init__.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 8f8826a5b5..df63ab72a1 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -149,10 +149,12 @@ def to_pubsub( """Launches a BigQuery continuous query and returns a QueryJob object for some management functionality. - This method requires an existing bigtable preconfigured to - accept the continuous query export statement. For instructions - on export to bigtable, see - https://cloud.google.com/bigquery/docs/export-to-bigtable. + This method requires an existing pubsub topic. For instructions + on creating a pubsub topic, see + https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en + + Note that a service account is a requirement for continuous queries + exporting to pubsub. 
Args: query (str): From 642485055a7cdc7f97aef72fe82c3a58b03c69aa Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 17:29:08 +0000 Subject: [PATCH 04/16] remove accidental files --- .../benchmark/simple_benchmark.py.bytesprocessed | 15 --------------- scripts/benchmark/simple_benchmark.py.slotmillis | 15 --------------- 2 files changed, 30 deletions(-) delete mode 100644 scripts/benchmark/simple_benchmark.py.bytesprocessed delete mode 100644 scripts/benchmark/simple_benchmark.py.slotmillis diff --git a/scripts/benchmark/simple_benchmark.py.bytesprocessed b/scripts/benchmark/simple_benchmark.py.bytesprocessed deleted file mode 100644 index 25c6624ccc..0000000000 --- a/scripts/benchmark/simple_benchmark.py.bytesprocessed +++ /dev/null @@ -1,15 +0,0 @@ -3200000 -0 -0 -3200000 -0 -0 -3200000 -0 -0 -3200000 -0 -0 -3200000 -0 -0 diff --git a/scripts/benchmark/simple_benchmark.py.slotmillis b/scripts/benchmark/simple_benchmark.py.slotmillis deleted file mode 100644 index 5e34396672..0000000000 --- a/scripts/benchmark/simple_benchmark.py.slotmillis +++ /dev/null @@ -1,15 +0,0 @@ -51800 -30 -287 -54920 -39 -119 -54791 -40 -118 -55302 -20 -134 -53327 -57 -245 From 5252456f9a7f52fe97ab810a7569fbf839e7df23 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 17:30:17 +0000 Subject: [PATCH 05/16] remove more accidental files --- .../simple_benchmark_2.py.bytesprocessed | 15 --------------- .../benchmark/simple_benchmark_2.py.slotmillis | 15 --------------- 2 files changed, 30 deletions(-) delete mode 100644 scripts/benchmark/simple_benchmark_2.py.bytesprocessed delete mode 100644 scripts/benchmark/simple_benchmark_2.py.slotmillis diff --git a/scripts/benchmark/simple_benchmark_2.py.bytesprocessed b/scripts/benchmark/simple_benchmark_2.py.bytesprocessed deleted file mode 100644 index 25c6624ccc..0000000000 --- a/scripts/benchmark/simple_benchmark_2.py.bytesprocessed +++ /dev/null @@ -1,15 +0,0 @@ -3200000 -0 -0 -3200000 -0 -0 -3200000 -0 -0 -3200000 
-0 -0 -3200000 -0 -0 diff --git a/scripts/benchmark/simple_benchmark_2.py.slotmillis b/scripts/benchmark/simple_benchmark_2.py.slotmillis deleted file mode 100644 index 742b70b4a8..0000000000 --- a/scripts/benchmark/simple_benchmark_2.py.slotmillis +++ /dev/null @@ -1,15 +0,0 @@ -55506 -72 -332 -61680 -26 -137 -46204 -64 -132 -65913 -59 -169 -49470 -20 -241 From 91e927f33d414780e1316829d9a21bbc78fc394c Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 17:33:02 +0000 Subject: [PATCH 06/16] remove another accidental file --- scripts/benchmark/simple_benchmark_2.py | 27 ------------------------- 1 file changed, 27 deletions(-) delete mode 100644 scripts/benchmark/simple_benchmark_2.py diff --git a/scripts/benchmark/simple_benchmark_2.py b/scripts/benchmark/simple_benchmark_2.py deleted file mode 100644 index 53b35c52ad..0000000000 --- a/scripts/benchmark/simple_benchmark_2.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import bigframes.pandas as bpd - -# This is a placeholder benchmark. -# TODO(340278185): Add more data analysis tasks and benchmark files -# like this one. 
- -print("Performing simple benchmark.") -df = bpd.DataFrame() -df["column_1"] = bpd.Series([i for i in range(100000)]) -df["column_2"] = bpd.Series([i * 2 for i in range(100000)]) -df["column_3"] = df["column_1"] + df["column_2"] -df.__repr__() -bpd.reset_session() From ea6c065e2739a7f2f521c1baf005b73d0af3a48d Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 21:43:26 +0000 Subject: [PATCH 07/16] use service account --- bigframes/streaming/__init__.py | 71 ++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 11 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index df63ab72a1..d9fa923b1c 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -26,7 +26,7 @@ def to_bigtable( query: str, instance: str, table: str, - bq_client: Optional[bigquery.Client] = None, + session=None, app_profile: Optional[str] = None, truncate: bool = False, overwrite: bool = False, @@ -91,8 +91,10 @@ def to_bigtable( can be examined. 
""" # get default client if not passed - if bq_client is None: - bq_client = bigframes.get_global_session().bqclient + if session is None: + session = bigframes.get_global_session() + bq_client = session.bqclient + service_account = create_service_account(session) # build export string from parameters project = bq_client.project @@ -123,7 +125,17 @@ def to_bigtable( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) + job_config_filled = job_config.from_api_repr( + { + "query": { + "continuous": True, + "connectionProperties": { + "key": "service_account", + "value": service_account, + }, + } + } + ) # begin the query job query_job = bq_client.query( @@ -142,7 +154,7 @@ def to_bigtable( def to_pubsub( query: str, topic: str, - bq_client: Optional[bigquery.Client] = None, + session=None, job_id: Optional[str] = None, job_id_prefix: Optional[str] = None, ) -> bigquery.QueryJob: @@ -186,17 +198,18 @@ def to_pubsub( can be examined. 
""" # get default client if not passed - if bq_client is None: - bq_client = bigframes.get_global_session().bqclient + if session is None: + session = bigframes.get_global_session() + bq_client = session.bqclient - # build export string from parameters - project = bq_client.project + service_account = create_service_account(session) + # build export string from parameters sql = ( "EXPORT DATA\n" "OPTIONS (\n" "format = 'CLOUD_PUBSUB',\n" - f'uri = "https://pubsub.googleapis.com/projects/{project}/topics/{topic}"\n' + f'uri = "https://pubsub.googleapis.com/projects/{bq_client.project}/topics/{topic}"\n' ")\n" "AS (\n" f"{query});" @@ -204,7 +217,17 @@ def to_pubsub( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) + job_config_filled = job_config.from_api_repr( + { + "query": { + "continuous": True, + "connectionProperties": { + "key": "service_account", + "value": service_account, + }, + } + } + ) # begin the query job query_job = bq_client.query( @@ -218,3 +241,29 @@ def to_pubsub( # return the query job to the user for lifetime management return query_job + + +def create_service_account(session): + # create service account + bq_connection_manager = session.bqconnectionmanager + connection_name = session._bq_connection + connection_name = bigframes.clients.resolve_full_bq_connection_name( + connection_name, + default_project=session._project, + default_location=session._location, + ) + connection_name_parts = connection_name.split(".") + if len(connection_name_parts) != 3: + raise ValueError( + f"connection_name must be of the format .., got {connection_name}." 
+ ) + service_account = bq_connection_manager._get_service_account_if_connection_exists( + connection_name_parts[0], + connection_name_parts[1], + connection_name_parts[2], + ) + project = session.bqclient.project + bq_connection_manager._ensure_iam_binding( + project, service_account, "pubsub.publisher" + ) + bq_connection_manager._ensure_iam_binding(project, service_account, "pubsub.viewer") From 583ae51ece2b53efe53c68aa991245d8399c36dc Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 21:48:35 +0000 Subject: [PATCH 08/16] fix return value --- bigframes/streaming/__init__.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index d9fa923b1c..a009c77210 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -26,7 +26,7 @@ def to_bigtable( query: str, instance: str, table: str, - session=None, + session: Optional[bigframes.Session] = None, app_profile: Optional[str] = None, truncate: bool = False, overwrite: bool = False, @@ -53,10 +53,10 @@ def to_bigtable( The name of the bigtable instance to export to. table (str): The name of the bigtable table to export to. - bq_client (str, default None): - The Client object to use for the query. This determines + session (bigframes.Session, default None): + The session object to use for the query. This determines the project id and location of the query. If None, will - default to the bigframes global session default client. + default to the bigframes global session. app_profile (str, default None): The bigtable app profile to export to. If None, no app profile will be used. 
@@ -154,7 +154,7 @@ def to_bigtable( def to_pubsub( query: str, topic: str, - session=None, + session: Optional[bigframes.Session] = None, job_id: Optional[str] = None, job_id_prefix: Optional[str] = None, ) -> bigquery.QueryJob: @@ -177,10 +177,10 @@ def to_pubsub( topic (str): The name of the pubsub topic to export to. For example: "taxi-rides" - bq_client (str, default None): - The Client object to use for the query. This determines + session (bigframes.Session, default None): + The session object to use for the query. This determines the project id and location of the query. If None, will - default to the bigframes global session default client. + default to the bigframes global session. job_id (str, default None): If specified, replace the default job id for the query, see job_id parameter of @@ -243,7 +243,7 @@ def to_pubsub( return query_job -def create_service_account(session): +def create_service_account(session: bigframes.Session) -> str: # create service account bq_connection_manager = session.bqconnectionmanager connection_name = session._bq_connection @@ -267,3 +267,4 @@ def create_service_account(session): project, service_account, "pubsub.publisher" ) bq_connection_manager._ensure_iam_binding(project, service_account, "pubsub.viewer") + return service_account From 2347f170e39a44deff502e824b5eb60166281dcd Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Thu, 20 Jun 2024 22:57:26 +0000 Subject: [PATCH 09/16] pass session --- bigframes/streaming/__init__.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index a009c77210..d948649a73 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -15,7 +15,7 @@ """Module for bigquery continuous queries""" import json -from typing import Optional +from typing import List, Optional from google.cloud import bigquery @@ -94,7 +94,7 @@ def to_bigtable( if session is None: session = 
bigframes.get_global_session() bq_client = session.bqclient - service_account = create_service_account(session) + service_account = create_service_account(session, ["bigtable.user"]) # build export string from parameters project = bq_client.project @@ -202,7 +202,9 @@ def to_pubsub( session = bigframes.get_global_session() bq_client = session.bqclient - service_account = create_service_account(session) + service_account = create_service_account( + session, ["pubsub.publisher", "pubsub.viewer"] + ) # build export string from parameters sql = ( @@ -243,7 +245,7 @@ def to_pubsub( return query_job -def create_service_account(session: bigframes.Session) -> str: +def create_service_account(session: bigframes.Session, roles: List[str]) -> str: # create service account bq_connection_manager = session.bqconnectionmanager connection_name = session._bq_connection @@ -263,8 +265,6 @@ def create_service_account(session: bigframes.Session) -> str: connection_name_parts[2], ) project = session.bqclient.project - bq_connection_manager._ensure_iam_binding( - project, service_account, "pubsub.publisher" - ) - bq_connection_manager._ensure_iam_binding(project, service_account, "pubsub.viewer") + for role in roles: + bq_connection_manager._ensure_iam_binding(project, service_account, role) return service_account From 58285db6b5d45df1ee82b843deb234438ccea81a Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 21 Jun 2024 01:08:35 +0000 Subject: [PATCH 10/16] have the user provide the service account --- bigframes/streaming/__init__.py | 51 +++++++++------------------- tests/system/large/test_streaming.py | 2 ++ 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index d948649a73..9433c10f93 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -15,7 +15,7 @@ """Module for bigquery continuous queries""" import json -from typing import List, Optional +from typing import 
Optional from google.cloud import bigquery @@ -26,6 +26,7 @@ def to_bigtable( query: str, instance: str, table: str, + service_account: Optional[str] = None, session: Optional[bigframes.Session] = None, app_profile: Optional[str] = None, truncate: bool = False, @@ -53,6 +54,11 @@ def to_bigtable( The name of the bigtable instance to export to. table (str): The name of the bigtable table to export to. + service_account (str): + Full name of the service account to run the continuous query. + Example: accountname@projectname.gserviceaccounts.com + If not provided, the user account will be used, but this + limits the lifetime of the continuous query. session (bigframes.Session, default None): The session object to use for the query. This determines the project id and location of the query. If None, will @@ -94,7 +100,6 @@ def to_bigtable( if session is None: session = bigframes.get_global_session() bq_client = session.bqclient - service_account = create_service_account(session, ["bigtable.user"]) # build export string from parameters project = bq_client.project @@ -129,13 +134,14 @@ def to_bigtable( { "query": { "continuous": True, - "connectionProperties": { - "key": "service_account", - "value": service_account, - }, } } ) + if service_account is not None: + job_config_filled["query"]["connectionProperties"] = { + "key": "service_account", + "value": service_account, + } # begin the query job query_job = bq_client.query( @@ -154,6 +160,7 @@ def to_bigtable( def to_pubsub( query: str, topic: str, + service_account: str, session: Optional[bigframes.Session] = None, job_id: Optional[str] = None, job_id_prefix: Optional[str] = None, @@ -177,6 +184,9 @@ def to_pubsub( topic (str): The name of the pubsub topic to export to. For example: "taxi-rides" + service_account (str): + Full name of the service account to run the continuous query. 
+ Example: accountname@projectname.gserviceaccounts.com session (bigframes.Session, default None): The session object to use for the query. This determines the project id and location of the query. If None, will @@ -202,10 +212,6 @@ def to_pubsub( session = bigframes.get_global_session() bq_client = session.bqclient - service_account = create_service_account( - session, ["pubsub.publisher", "pubsub.viewer"] - ) - # build export string from parameters sql = ( "EXPORT DATA\n" @@ -243,28 +249,3 @@ def to_pubsub( # return the query job to the user for lifetime management return query_job - - -def create_service_account(session: bigframes.Session, roles: List[str]) -> str: - # create service account - bq_connection_manager = session.bqconnectionmanager - connection_name = session._bq_connection - connection_name = bigframes.clients.resolve_full_bq_connection_name( - connection_name, - default_project=session._project, - default_location=session._location, - ) - connection_name_parts = connection_name.split(".") - if len(connection_name_parts) != 3: - raise ValueError( - f"connection_name must be of the format .., got {connection_name}." 
- ) - service_account = bq_connection_manager._get_service_account_if_connection_exists( - connection_name_parts[0], - connection_name_parts[1], - connection_name_parts[2], - ) - project = session.bqclient.project - for role in roles: - bq_connection_manager._ensure_iam_binding(project, service_account, role) - return service_account diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 940f1a37f5..0b753d7699 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -27,6 +27,7 @@ def test_streaming_to_bigtable(): sql, "streaming-testing-instance", "table-testing", + service_account="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", app_profile=None, truncate=True, overwrite=True, @@ -57,6 +58,7 @@ def test_streaming_to_pubsub(): query_job = bigframes.streaming.to_pubsub( sql, "penguins", + "streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", job_id=None, job_id_prefix=job_id_prefix, ) From 76ed8e26f1e2abb1b7dd174e770135f8369f3d46 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 21 Jun 2024 01:17:10 +0000 Subject: [PATCH 11/16] fix mypy error --- bigframes/streaming/__init__.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 9433c10f93..72bcaf6ca4 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -130,18 +130,14 @@ def to_bigtable( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config_filled = job_config.from_api_repr( - { - "query": { - "continuous": True, - } - } - ) + + job_config_dict: dict = {"query": {"continuous": True}} if service_account is not None: - job_config_filled["query"]["connectionProperties"] = { + job_config_dict["query"]["connectionProperties"] = { "key": "service_account", "value": service_account, } + job_config_filled = 
job_config.from_api_repr(job_config_dict) # begin the query job query_job = bq_client.query( From 4e1e5790f56bd590d7a5d1493529beab2de9e5aa Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 21 Jun 2024 19:33:24 +0000 Subject: [PATCH 12/16] address comments --- bigframes/streaming/__init__.py | 17 +++++++++++++++++ tests/system/large/test_streaming.py | 10 +++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 72bcaf6ca4..b3e8b71873 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -16,6 +16,7 @@ import json from typing import Optional +import warnings from google.cloud import bigquery @@ -24,6 +25,7 @@ def to_bigtable( query: str, + *, instance: str, table: str, service_account: Optional[str] = None, @@ -96,6 +98,12 @@ def to_bigtable( For example, the job can be cancelled or its error status can be examined. """ + warnings.warn( + "The bigframes.streaming module is a preview feature, and subject to change.", + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) + # get default client if not passed if session is None: session = bigframes.get_global_session() @@ -138,6 +146,7 @@ def to_bigtable( "value": service_account, } job_config_filled = job_config.from_api_repr(job_config_dict) + job_config_filled.labels = {"bigframes-api": "streaming.to_bigtable"} # begin the query job query_job = bq_client.query( @@ -155,6 +164,7 @@ def to_bigtable( def to_pubsub( query: str, + *, topic: str, service_account: str, session: Optional[bigframes.Session] = None, @@ -203,6 +213,12 @@ def to_pubsub( For example, the job can be cancelled or its error status can be examined. 
""" + warnings.warn( + "The bigframes.streaming module is a preview feature, and subject to change.", + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) + # get default client if not passed if session is None: session = bigframes.get_global_session() @@ -232,6 +248,7 @@ def to_pubsub( } } ) + job_config_filled.labels = {"bigframes-api": "streaming.to_pubsub"} # begin the query job query_job = bq_client.query( diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 0b753d7699..ae3dc879b6 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -25,9 +25,9 @@ def test_streaming_to_bigtable(): FROM birds.penguins""" query_job = bigframes.streaming.to_bigtable( sql, - "streaming-testing-instance", - "table-testing", - service_account="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", + instance="streaming-testing-instance", + table="table-testing", + service_account="streaming@henryjsolberg-prod.iam.gserviceaccount.com", app_profile=None, truncate=True, overwrite=True, @@ -57,8 +57,8 @@ def test_streaming_to_pubsub(): FROM birds.penguins""" query_job = bigframes.streaming.to_pubsub( sql, - "penguins", - "streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", + topic="penguins", + service_account="streaming@henryjsolberg-prod.iam.gserviceaccount.com", job_id=None, job_id_prefix=job_id_prefix, ) From 5292f05c5d6ffa85a31cf954e3db6cb89cc0c208 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 21 Jun 2024 20:14:56 +0000 Subject: [PATCH 13/16] update service account name --- tests/system/large/test_streaming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index ae3dc879b6..dc52f87a86 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -27,7 +27,7 @@ def test_streaming_to_bigtable(): sql, 
instance="streaming-testing-instance", table="table-testing", - service_account="streaming@henryjsolberg-prod.iam.gserviceaccount.com", + service_account="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", app_profile=None, truncate=True, overwrite=True, @@ -58,7 +58,7 @@ def test_streaming_to_pubsub(): query_job = bigframes.streaming.to_pubsub( sql, topic="penguins", - service_account="streaming@henryjsolberg-prod.iam.gserviceaccount.com", + service_account="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", job_id=None, job_id_prefix=job_id_prefix, ) From a3e5c1a18d545fcd0e760c2619a966d116061275 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Fri, 21 Jun 2024 20:32:57 +0000 Subject: [PATCH 14/16] fix invalid character in label --- bigframes/streaming/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index b3e8b71873..b9fbc20c92 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -146,7 +146,7 @@ def to_bigtable( "value": service_account, } job_config_filled = job_config.from_api_repr(job_config_dict) - job_config_filled.labels = {"bigframes-api": "streaming.to_bigtable"} + job_config_filled.labels = {"bigframes-api": "streaming_to_bigtable"} # begin the query job query_job = bq_client.query( @@ -248,7 +248,7 @@ def to_pubsub( } } ) - job_config_filled.labels = {"bigframes-api": "streaming.to_pubsub"} + job_config_filled.labels = {"bigframes-api": "streaming_to_pubsub"} # begin the query job query_job = bq_client.query( From f34d58093a7cc15bbd34d34e95a762904b00ddca Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Tue, 25 Jun 2024 01:27:35 +0000 Subject: [PATCH 15/16] rename service_account to service_account_email --- bigframes/streaming/__init__.py | 14 +++++++------- tests/system/large/test_streaming.py | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git 
a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index b9fbc20c92..0b6fd18561 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -28,7 +28,7 @@ def to_bigtable( *, instance: str, table: str, - service_account: Optional[str] = None, + service_account_email: Optional[str] = None, session: Optional[bigframes.Session] = None, app_profile: Optional[str] = None, truncate: bool = False, @@ -56,7 +56,7 @@ def to_bigtable( The name of the bigtable instance to export to. table (str): The name of the bigtable table to export to. - service_account (str): + service_account_email (str): Full name of the service account to run the continuous query. Example: accountname@projectname.gserviceaccounts.com If not provided, the user account will be used, but this @@ -140,10 +140,10 @@ def to_bigtable( job_config = bigquery.job.QueryJobConfig() job_config_dict: dict = {"query": {"continuous": True}} - if service_account is not None: + if service_account_email is not None: job_config_dict["query"]["connectionProperties"] = { "key": "service_account", - "value": service_account, + "value": service_account_email, } job_config_filled = job_config.from_api_repr(job_config_dict) job_config_filled.labels = {"bigframes-api": "streaming_to_bigtable"} @@ -166,7 +166,7 @@ def to_pubsub( query: str, *, topic: str, - service_account: str, + service_account_email: str, session: Optional[bigframes.Session] = None, job_id: Optional[str] = None, job_id_prefix: Optional[str] = None, @@ -190,7 +190,7 @@ def to_pubsub( topic (str): The name of the pubsub topic to export to. For example: "taxi-rides" - service_account (str): + service_account_email (str): Full name of the service account to run the continuous query. 
Example: accountname@projectname.gserviceaccounts.com session (bigframes.Session, default None): @@ -243,7 +243,7 @@ def to_pubsub( "continuous": True, "connectionProperties": { "key": "service_account", - "value": service_account, + "value": service_account_email, }, } } diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index dc52f87a86..fbac68f82e 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -27,7 +27,7 @@ def test_streaming_to_bigtable(): sql, instance="streaming-testing-instance", table="table-testing", - service_account="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", + service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", app_profile=None, truncate=True, overwrite=True, @@ -58,7 +58,7 @@ def test_streaming_to_pubsub(): query_job = bigframes.streaming.to_pubsub( sql, topic="penguins", - service_account="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", + service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", job_id=None, job_id_prefix=job_id_prefix, ) From ef5d7d7872261263e0fe51c63637c7266dbc3d52 Mon Sep 17 00:00:00 2001 From: milkshakeiii Date: Wed, 26 Jun 2024 17:54:50 +0000 Subject: [PATCH 16/16] move streaming tests to own tables --- tests/system/large/test_streaming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index fbac68f82e..c125fde15a 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -22,7 +22,7 @@ def test_streaming_to_bigtable(): job_id_prefix = "test_streaming_" sql = """SELECT body_mass_g, island as rowkey - FROM birds.penguins""" + FROM birds.penguins_bigtable_streaming""" query_job = bigframes.streaming.to_bigtable( sql, instance="streaming-testing-instance", @@ -54,7 +54,7 @@ def 
test_streaming_to_pubsub(): job_id_prefix = "test_streaming_pubsub_" sql = """SELECT island - FROM birds.penguins""" + FROM birds.penguins_pubsub_streaming""" query_job = bigframes.streaming.to_pubsub( sql, topic="penguins",