feat: Add bigframes.streaming.to_pubsub method to create continuous query that writes to Pub/Sub #801

Merged · 21 commits · Jun 26, 2024
bigframes/streaming/__init__.py (132 additions, 7 deletions)
@@ -16,6 +16,7 @@
 
 import json
 from typing import Optional
+import warnings
 
 from google.cloud import bigquery
 
@@ -24,9 +25,11 @@
 
 def to_bigtable(
     query: str,
+    *,
     instance: str,
     table: str,
-    bq_client: Optional[bigquery.Client] = None,
+    service_account_email: Optional[str] = None,
+    session: Optional[bigframes.Session] = None,
     app_profile: Optional[str] = None,
     truncate: bool = False,
     overwrite: bool = False,
Expand All @@ -53,10 +56,15 @@ def to_bigtable(
The name of the bigtable instance to export to.
table (str):
The name of the bigtable table to export to.
bq_client (str, default None):
The Client object to use for the query. This determines
service_account_email (str):
Full name of the service account to run the continuous query.
Example: accountname@projectname.gserviceaccounts.com
If not provided, the user account will be used, but this
limits the lifetime of the continuous query.
session (bigframes.Session, default None):
The session object to use for the query. This determines
the project id and location of the query. If None, will
default to the bigframes global session default client.
default to the bigframes global session.
app_profile (str, default None):
The bigtable app profile to export to. If None, no app
profile will be used.
@@ -90,9 +98,16 @@
         For example, the job can be cancelled or its error status
         can be examined.
     """
+    warnings.warn(
+        "The bigframes.streaming module is a preview feature, and subject to change.",
+        stacklevel=1,
+        category=bigframes.exceptions.PreviewWarning,
+    )
+
     # get default client if not passed
-    if bq_client is None:
-        bq_client = bigframes.get_global_session().bqclient
+    if session is None:
+        session = bigframes.get_global_session()
+    bq_client = session.bqclient
 
     # build export string from parameters
     project = bq_client.project
@@ -123,7 +138,117 @@
 
     # override continuous http parameter
     job_config = bigquery.job.QueryJobConfig()
-    job_config_filled = job_config.from_api_repr({"query": {"continuous": True}})
+
+    job_config_dict: dict = {"query": {"continuous": True}}
+    if service_account_email is not None:
+        job_config_dict["query"]["connectionProperties"] = {
+            "key": "service_account",
+            "value": service_account_email,
+        }
+    job_config_filled = job_config.from_api_repr(job_config_dict)
     job_config_filled.labels = {"bigframes-api": "streaming_to_bigtable"}
 
     # begin the query job
+    query_job = bq_client.query(
+        sql,
+        job_config=job_config_filled,  # type:ignore
+        # typing error above is in bq client library
+        # (should accept abstract job_config, only takes concrete)
+        job_id=job_id,
+        job_id_prefix=job_id_prefix,
+    )
+
+    # return the query job to the user for lifetime management
+    return query_job
+
+
+def to_pubsub(
+    query: str,
+    *,
+    topic: str,
+    service_account_email: str,
+    session: Optional[bigframes.Session] = None,
+    job_id: Optional[str] = None,
+    job_id_prefix: Optional[str] = None,
+) -> bigquery.QueryJob:
+    """Launches a BigQuery continuous query and returns a
+    QueryJob object for some management functionality.
+
+    This method requires an existing Pub/Sub topic. For instructions
+    on creating a Pub/Sub topic, see
+    https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
+
+    Note that a service account is a requirement for continuous queries
+    exporting to Pub/Sub.
+
+    Args:
+        query (str):
+            The SQL statement to execute as a continuous function.
+            For example: "SELECT * FROM dataset.table"
+            This will be wrapped in an EXPORT DATA statement to
+            launch a continuous query writing to Pub/Sub.
+        topic (str):
+            The name of the Pub/Sub topic to export to.
+            For example: "taxi-rides"
+        service_account_email (str):
+            Full name of the service account to run the continuous query.
+            Example: accountname@projectname.iam.gserviceaccount.com
+        session (bigframes.Session, default None):
+            The session object to use for the query. This determines
+            the project id and location of the query. If None, will
+            default to the bigframes global session.
+        job_id (str, default None):
+            If specified, replace the default job id for the query,
+            see job_id parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+        job_id_prefix (str, default None):
+            If specified, a job id prefix for the query, see
+            job_id_prefix parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+
+    Returns:
+        google.cloud.bigquery.QueryJob:
+            See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
+            The ongoing query job can be managed using this object.
+            For example, the job can be cancelled or its error status
+            can be examined.
+    """
+    warnings.warn(
+        "The bigframes.streaming module is a preview feature, and subject to change.",
+        stacklevel=1,
+        category=bigframes.exceptions.PreviewWarning,
+    )
+
+    # get default client if not passed
+    if session is None:
+        session = bigframes.get_global_session()
+    bq_client = session.bqclient
+
+    # build export string from parameters
+    sql = (
+        "EXPORT DATA\n"
+        "OPTIONS (\n"
+        "format = 'CLOUD_PUBSUB',\n"
+        f'uri = "https://pubsub.googleapis.com/projects/{bq_client.project}/topics/{topic}"\n'
+        ")\n"
+        "AS (\n"
+        f"{query});"
+    )
+
+    # override continuous http parameter
+    job_config = bigquery.job.QueryJobConfig()
+    job_config_filled = job_config.from_api_repr(
+        {
+            "query": {
+                "continuous": True,
+                "connectionProperties": {
+                    "key": "service_account",
+                    "value": service_account_email,
+                },
+            }
+        }
+    )
+    job_config_filled.labels = {"bigframes-api": "streaming_to_pubsub"}
+
+    # begin the query job
     query_job = bq_client.query(
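For reviewers who want to exercise the new surface end to end, here is a minimal usage sketch. It assumes the collapsed remainder of to_bigtable keeps the defaults shown in its docstring; the dataset, instance, table, topic, and service-account names are placeholders, not values from this PR:

import bigframes.streaming

# Continuous query that writes rows to a Bigtable table
# (instance/table/service-account names are placeholders).
bigtable_job = bigframes.streaming.to_bigtable(
    "SELECT body_mass_g, island as rowkey FROM birds.penguins",
    instance="my-instance",
    table="my-table",
    service_account_email="my-sa@my-project.iam.gserviceaccount.com",
    truncate=True,
    overwrite=True,
)

# Continuous query that publishes rows to an existing Pub/Sub topic.
pubsub_job = bigframes.streaming.to_pubsub(
    "SELECT island FROM birds.penguins",
    topic="my-topic",
    service_account_email="my-sa@my-project.iam.gserviceaccount.com",
)

# Both calls return google.cloud.bigquery.QueryJob objects; the caller
# owns the job lifetime and should cancel the continuous queries when done.
bigtable_job.cancel()
pubsub_job.cancel()

Both jobs start immediately, and the PreviewWarning fired by each call is expected while the module is in preview.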
scripts/create_bigtable.py (0 additions, 3 deletions)
@@ -16,13 +16,10 @@
 # bigframes.streaming testing if they don't already exist
 
 import os
-import pathlib
 import sys
 
 import google.cloud.bigtable as bigtable
 
-REPO_ROOT = pathlib.Path(__file__).parent.parent
-
 PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
 
 if not PROJECT_ID:
scripts/create_pubsub.py (49 additions, 0 deletions)
@@ -0,0 +1,49 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script creates the Pub/Sub resources required for
+# bigframes.streaming testing if they don't already exist
+
+import os
+import sys
+
+from google.cloud import pubsub_v1
+
+PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
+
+if not PROJECT_ID:
+    print(
+        "Please set GOOGLE_CLOUD_PROJECT environment variable before running.",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+
+def create_topic(topic_id):
+    # based on
+    # https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
+
+    publisher = pubsub_v1.PublisherClient()
+    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
+
+    topic = publisher.create_topic(request={"name": topic_path})
+    print(f"Created topic: {topic.name}")
+
+
+def main():
+    create_topic("penguins")
+
+
+if __name__ == "__main__":
+    main()
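One behavioral note: create_topic raises AlreadyExists when the topic is already present, so the script does not yet live up to its "if they don't already exist" comment. A hedged sketch of an idempotent variant (ensure_topic is a hypothetical helper, not part of this PR):

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1


def ensure_topic(project_id: str, topic_id: str) -> None:
    # Create the topic only if it is not already present.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    try:
        topic = publisher.create_topic(request={"name": topic_path})
        print(f"Created topic: {topic.name}")
    except AlreadyExists:
        print(f"Topic already exists: {topic_path}")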
setup.py (1 addition, 0 deletions)
@@ -40,6 +40,7 @@
"geopandas >=0.12.2",
"google-auth >=2.15.0,<3.0dev",
"google-cloud-bigtable >=2.24.0",
"google-cloud-pubsub >=2.21.4",
"google-cloud-bigquery[bqstorage,pandas] >=3.16.0",
"google-cloud-functions >=1.12.0",
"google-cloud-bigquery-connection >=1.12.0",
Expand Down
testing/constraints-3.9.txt (1 addition, 0 deletions)
@@ -5,6 +5,7 @@ gcsfs==2023.3.0
 geopandas==0.12.2
 google-auth==2.15.0
 google-cloud-bigtable==2.24.0
+google-cloud-pubsub==2.21.4
 google-cloud-bigquery==3.16.0
 google-cloud-functions==1.12.0
 google-cloud-bigquery-connection==1.12.0
tests/system/large/test_streaming.py (30 additions, 3 deletions)
@@ -22,11 +22,12 @@ def test_streaming_to_bigtable():
job_id_prefix = "test_streaming_"
sql = """SELECT
body_mass_g, island as rowkey
FROM birds.penguins"""
FROM birds.penguins_bigtable_streaming"""
query_job = bigframes.streaming.to_bigtable(
sql,
"streaming-testing-instance",
"table-testing",
instance="streaming-testing-instance",
table="table-testing",
service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",
app_profile=None,
truncate=True,
overwrite=True,
@@ -46,3 +47,29 @@
         assert str(query_job.job_id).startswith(job_id_prefix)
     finally:
         query_job.cancel()
+
+
+def test_streaming_to_pubsub():
+    # launch a continuous query
+    job_id_prefix = "test_streaming_pubsub_"
+    sql = """SELECT
+    island
+    FROM birds.penguins_pubsub_streaming"""
+    query_job = bigframes.streaming.to_pubsub(
+        sql,
+        topic="penguins",
+        service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com",
+        job_id=None,
+        job_id_prefix=job_id_prefix,
+    )
+
+    try:
+        # wait 100 seconds in order to ensure the query doesn't stop
+        # (i.e. it is continuous)
+        time.sleep(100)
+        assert query_job.error_result is None
+        assert query_job.errors is None
+        assert query_job.running()
+        assert str(query_job.job_id).startswith(job_id_prefix)
+    finally:
+        query_job.cancel()
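Both tests sleep a fixed 100 seconds and assert the job is still running before cancelling it in the finally block. A hedged sketch of the same pattern as a reusable watchdog (run_for is a hypothetical helper, not part of this PR); it accepts any QueryJob returned by to_bigtable or to_pubsub:

import time

from google.cloud import bigquery


def run_for(query_job: bigquery.QueryJob, seconds: float) -> None:
    # Poll the continuous query, fail fast on errors, then stop it.
    deadline = time.monotonic() + seconds
    try:
        while time.monotonic() < deadline:
            query_job.reload()  # refresh job state from the BigQuery API
            if query_job.error_result is not None:
                raise RuntimeError(f"Continuous query failed: {query_job.error_result}")
            time.sleep(5)
    finally:
        query_job.cancel()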