Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 93429e0

Browse filesBrowse files
docs(samples): Add Dataflow to Pub/Sub snippet (GoogleCloudPlatform#11104)
* docs(samples): Add Dataflow to Pub/Sub snippet Adds a snippet that shows how to write messages to Pub/Sub from Dataflow. * Fix region tag * Skip Python 3.12 test see apache/beam#29149 * Fix copyright date * Improve IT by reading messages from Pub/Sub * Incorporate review feedback * Fix test for 3.8 * Fix linter error
1 parent da40d4e commit 93429e0
Copy full SHA for 93429e0

File tree

Expand file treeCollapse file tree

5 files changed

+222
-4
lines changed
Filter options
Expand file treeCollapse file tree

5 files changed

+222
-4
lines changed

‎dataflow/snippets/batch_write_storage.py

Copy file name to clipboardExpand all lines: dataflow/snippets/batch_write_storage.py
+7-2Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,22 @@
1414
# limitations under the License.
1515

1616
# [START dataflow_batch_write_to_storage]
17+
import argparse
18+
from typing import List
19+
1720
import apache_beam as beam
1821
from apache_beam.io.textio import WriteToText
1922
from apache_beam.options.pipeline_options import PipelineOptions
2023

24+
from typing_extensions import Self
25+
2126

22-
def write_to_cloud_storage(argv=None):
27+
def write_to_cloud_storage(argv : List[str] = None) -> None:
2328
# Parse the pipeline options passed into the application.
2429
class MyOptions(PipelineOptions):
2530
@classmethod
2631
# Define a custom pipeline option that specifies the Cloud Storage bucket.
27-
def _add_argparse_args(cls, parser):
32+
def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
2833
parser.add_argument("--output", required=True)
2934

3035
wordsList = ["1", "2", "3", "4"]

‎dataflow/snippets/noxfile_config.py

Copy file name to clipboard
+42Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Default TEST_CONFIG_OVERRIDE for python repos.
16+
17+
# You can copy this file into your directory, then it will be imported from
18+
# the noxfile.py.
19+
20+
# The source of truth:
21+
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py
22+
23+
# Per-directory overrides applied on top of the repo-wide noxfile defaults.
TEST_CONFIG_OVERRIDE = {
    # Python versions for which the tests in this directory are skipped.
    # 3.12 is excluded pending Beam support (see apache/beam#29149).
    "ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12"],
    # These samples are new, so Python type hints are mandatory.
    "enforce_type_hints": True,
    # Environment variable consulted for the test project ID. Switch to
    # 'BUILD_SPECIFIC_GCLOUD_PROJECT' to opt in to a build-specific
    # Cloud project, or substitute your own project string.
    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
    # Pin pip by setting this to a version string such as "20.2.4";
    # None means "use whatever pip the session provides".
    "pip_version_override": None,
    # Extra environment variables injected into the test session.
    # No secrets here — these override predefined values.
    "envs": {},
}

‎dataflow/snippets/tests/test_batch_write_storage.py

Copy file name to clipboardExpand all lines: dataflow/snippets/tests/test_batch_write_storage.py
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626

2727

2828
@pytest.fixture(scope="function")
def setup_and_teardown() -> None:
    """Create a temporary Cloud Storage bucket for one test, then delete it.

    The bucket is created before entering the `try` block: in the original
    form, a failed `create_bucket` left `bucket` unbound and the `finally`
    clause raised UnboundLocalError, masking the real creation error.
    """
    bucket = storage_client.create_bucket(bucket_name)
    try:
        yield
    finally:
        # force=True also removes any objects the pipeline wrote.
        bucket.delete(force=True)
3535

3636

def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
    """Run the batch pipeline against the temporary test bucket."""
    # Local import: this file's import block is not in view here.
    from unittest.mock import patch

    # Patch sys.argv instead of assigning to it, so the mutation does not
    # leak into other tests in the same session (matches the approach in
    # test_write_pubsub.py from this commit).
    with patch("sys.argv", ['', f'--output=gs://{bucket_name}/output/out-']):
        write_to_cloud_storage()
4040

+96Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# !/usr/bin/env python
2+
# Copyright 2024 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import os
17+
import time
18+
from unittest.mock import patch
19+
import uuid
20+
21+
from google.cloud import pubsub_v1
22+
23+
import pytest
24+
25+
from ..write_pubsub import write_to_pubsub
26+
27+
28+
# Unique per-run resource names so concurrent test runs cannot collide.
topic_id = f'test-topic-{uuid.uuid4()}'
subscription_id = f'{topic_id}-sub'
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]

# Shared Pub/Sub clients used by the fixture and the verification helper.
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Number of messages the pipeline under test publishes.
NUM_MESSAGES = 4
# Upper bound, in seconds, on how long to poll Pub/Sub for results.
TIMEOUT = 60 * 5
37+
38+
39+
@pytest.fixture(scope="function")
def setup_and_teardown() -> None:
    """Create a Pub/Sub topic and subscription for one test, then delete both.

    Teardown deletes the two resources independently: previously, if the
    subscription was never created (or its deletion failed), the topic was
    leaked because `delete_topic` was never reached.
    """
    topic_path = publisher.topic_path(project_id, topic_id)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    try:
        publisher.create_topic(request={"name": topic_path})
        subscriber.create_subscription(
            request={"name": subscription_path, "topic": topic_path}
        )
        yield
    finally:
        try:
            subscriber.delete_subscription(
                request={"subscription": subscription_path})
        finally:
            # Runs even if the subscription deletion raised.
            publisher.delete_topic(request={"topic": topic_path})
54+
55+
56+
def read_messages() -> list:
    """Pull and acknowledge messages from the test subscription.

    Returns the list of ReceivedMessage objects pulled. Polls in batches
    until at least NUM_MESSAGES messages arrive or TIMEOUT seconds elapse;
    Pub/Sub delivery is at-least-once, so duplicates are possible.
    """
    received_messages = []
    # Invariant across iterations — compute once, outside the loop.
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    # It might be necessary to read multiple batches. Use a timeout value
    # to avoid potentially looping forever.
    start_time = time.time()
    while time.time() - start_time <= TIMEOUT:
        # Pull messages from Pub/Sub.
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
        )
        # Flatten the batch into the result list. (append would count
        # batches instead of messages, breaking the exit condition and
        # the caller's message count.)
        received_messages.extend(response.received_messages)

        # Acknowledge only this batch's ids; re-sending previously acked
        # ids on every pull is invalid/wasted work.
        ack_ids = [message.ack_id for message in response.received_messages]
        if ack_ids:
            # Acknowledge the received messages so they will not be sent again.
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": ack_ids}
            )

        if len(received_messages) >= NUM_MESSAGES:
            break

        time.sleep(5)

    return received_messages
85+
86+
87+
def test_write_to_pubsub(setup_and_teardown: None) -> None:
    """End-to-end check: run the pipeline, then confirm messages arrived."""
    pipeline_args = [
        "", '--streaming', f'--project={project_id}', f'--topic={topic_id}'
    ]
    with patch("sys.argv", pipeline_args):
        write_to_pubsub()

    # Read from Pub/Sub to verify the pipeline successfully wrote messages.
    # Duplicate reads are possible.
    messages = read_messages()
    assert (len(messages) >= NUM_MESSAGES)

‎dataflow/snippets/write_pubsub.py

Copy file name to clipboard
+75Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/usr/bin/env python
2+
# Copyright 2024 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# [START dataflow_pubsub_write_with_attributes]
17+
import argparse
18+
from typing import Any, Dict, List
19+
20+
import apache_beam as beam
21+
from apache_beam.io import PubsubMessage
22+
from apache_beam.io import WriteToPubSub
23+
from apache_beam.options.pipeline_options import PipelineOptions
24+
25+
from typing_extensions import Self
26+
27+
28+
def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    """Convert one purchase record into a Pub/Sub message.

    The product name becomes the message payload; the buyer name and the
    timestamp travel as message attributes.
    """
    payload = bytes(item['product'], 'utf-8')
    return PubsubMessage(
        data=payload,
        attributes={
            'buyer': item['name'],
            'timestamp': str(item['ts']),
        },
    )
36+
37+
38+
def write_to_pubsub(argv: List[str] = None) -> None:
    """Publish a fixed set of purchase records to a Pub/Sub topic.

    Each record is converted to a PubsubMessage (payload plus attributes)
    and written with WriteToPubSub. Example invocation:
    --project=$PROJECT_ID --topic=$TOPIC_NAME --streaming
    For more information, see
    https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    """

    # Custom pipeline options naming the target project ID and Pub/Sub topic.
    class MyOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--project", required=True)
            parser.add_argument("--topic", required=True)

    options = MyOptions()

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]

    # Fully-qualified topic path required by WriteToPubSub.
    topic_path = f'projects/{options.project}/topics/{options.topic}'

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=topic_path, with_attributes=True)
        )

    print('Pipeline ran successfully.')
# [END dataflow_pubsub_write_with_attributes]


if __name__ == "__main__":
    write_to_pubsub()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.