Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cce955f

Browse filesBrowse files
Pub/Sub end-to-end sample (GoogleCloudPlatform#1800)
* Created new end-to-end sample, moved old sample * Add space around operator
1 parent e0c7cca commit cce955f
Copy full SHA for cce955f

File tree

Expand file treeCollapse file tree

3 files changed

+123
-28
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+123
-28
lines changed

‎pubsub/cloud-client/publisher.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher.py
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def list_topics(project_id):
4141

4242
def create_topic(project_id, topic_name):
4343
"""Create a new Pub/Sub topic."""
44+
# [START pubsub_quickstart_create_topic]
4445
# [START pubsub_create_topic]
4546
from google.cloud import pubsub_v1
4647

@@ -53,6 +54,7 @@ def create_topic(project_id, topic_name):
5354
topic = publisher.create_topic(topic_path)
5455

5556
print('Topic created: {}'.format(topic))
57+
# [END pubsub_quickstart_create_topic]
5658
# [END pubsub_create_topic]
5759

5860

‎pubsub/cloud-client/quickstart.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/quickstart.py
+80-11Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,94 @@
1515
# limitations under the License.
1616

1717

18-
def run_quickstart():
19-
# [START pubsub_quickstart_create_topic]
20-
# Imports the Google Cloud client library
18+
import argparse
19+
20+
21+
def end_to_end(project_id, topic_name, subscription_name, num_messages):
22+
# [START pubsub_end_to_end]
23+
import time
24+
2125
from google.cloud import pubsub_v1
2226

23-
# Instantiates a client
27+
# TODO project_id = "Your Google Cloud Project ID"
28+
# TODO topic_name = "Your Pub/Sub topic name"
29+
# TODO num_messages = number of messages to test end-to-end
30+
31+
# Instantiates a publisher and subscriber client
2432
publisher = pubsub_v1.PublisherClient()
33+
subscriber = pubsub_v1.SubscriberClient()
34+
35+
# The `topic_path` method creates a fully qualified identifier
36+
# in the form `projects/{project_id}/topics/{topic_name}`
37+
topic_path = subscriber.topic_path(project_id, topic_name)
2538

26-
# The resource path for the new topic contains the project ID
27-
# and the topic name.
28-
topic_path = publisher.topic_path(
29-
'my-project', 'my-new-topic')
39+
# The `subscription_path` method creates a fully qualified identifier
40+
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
41+
subscription_path = subscriber.subscription_path(
42+
project_id, subscription_name)
3043

3144
# Create the topic.
3245
topic = publisher.create_topic(topic_path)
46+
print('\nTopic created: {}'.format(topic.name))
47+
48+
# Create a subscription.
49+
subscription = subscriber.create_subscription(
50+
subscription_path, topic_path)
51+
print('\nSubscription created: {}\n'.format(subscription.name))
52+
53+
publish_begin = time.time()
54+
55+
# Publish messages.
56+
for n in range(num_messages):
57+
data = u'Message number {}'.format(n)
58+
# Data must be a bytestring
59+
data = data.encode('utf-8')
60+
# When you publish a message, the client returns a future.
61+
future = publisher.publish(topic_path, data=data)
62+
print('Published {} of message ID {}.'.format(data, future.result()))
63+
64+
publish_time = time.time() - publish_begin
3365

34-
print('Topic created: {}'.format(topic))
35-
# [END pubsub_quickstart_create_topic]
66+
messages = set()
67+
68+
def callback(message):
69+
print('Received message: {}'.format(message))
70+
# Unacknowledged messages will be sent again.
71+
message.ack()
72+
messages.add(message)
73+
74+
subscribe_begin = time.time()
75+
76+
# Receive messages. The subscriber is nonblocking.
77+
subscriber.subscribe(subscription_path, callback=callback)
78+
79+
print('\nListening for messages on {}...\n'.format(subscription_path))
80+
81+
while True:
82+
if len(messages) == num_messages:
83+
subscribe_time = time.time() - subscribe_begin
84+
print("\nReceived all messages.")
85+
print("Publish time lapsed: {:.2f}s.".format(publish_time))
86+
print("Subscribe time lapsed: {:.2f}s.".format(subscribe_time))
87+
break
88+
else:
89+
# Sleeps the thread at 50Hz to save on resources.
90+
time.sleep(1. / 50)
91+
# [END pubsub_end_to_end]
3692

3793

3894
if __name__ == '__main__':
39-
run_quickstart()
95+
96+
parser = argparse.ArgumentParser(
97+
description=__doc__,
98+
formatter_class=argparse.RawDescriptionHelpFormatter
99+
)
100+
parser.add_argument('project_id', help='Your Google Cloud project ID')
101+
parser.add_argument('topic_name', help='Your topic name')
102+
parser.add_argument('subscription_name', help='Your subscription name')
103+
parser.add_argument('num_msgs', type=int, help='Number of test messages')
104+
105+
args = parser.parse_args()
106+
107+
end_to_end(args.project_id, args.topic_name, args.subscription_name,
108+
args.num_msgs)
+41-17Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
# Copyright 2016 Google Inc. All Rights Reserved.
1+
#!/usr/bin/env python
2+
3+
# Copyright 2018 Google Inc. All Rights Reserved.
24
#
35
# Licensed under the Apache License, Version 2.0 (the "License");
46
# you may not use this file except in compliance with the License.
@@ -15,33 +17,55 @@
1517
import os
1618

1719
from google.cloud import pubsub_v1
18-
import mock
1920
import pytest
20-
2121
import quickstart
2222

2323
PROJECT = os.environ['GCLOUD_PROJECT']
24-
# Must match the dataset listed in quickstart.py
25-
TOPIC_NAME = 'my-new-topic'
26-
TOPIC_PATH = 'projects/{}/topics/{}'.format(PROJECT, TOPIC_NAME)
24+
TOPIC = 'end-to-end-test-topic'
25+
SUBSCRIPTION = 'end-to-end-test-topic-sub'
26+
N = 10
27+
28+
29+
@pytest.fixture(scope='module')
30+
def publisher_client():
31+
yield pubsub_v1.PublisherClient()
2732

2833

29-
@pytest.fixture
30-
def temporary_topic():
31-
"""Fixture that ensures the test topic does not exist before the test."""
32-
publisher = pubsub_v1.PublisherClient()
34+
@pytest.fixture(scope='module')
35+
def topic(publisher_client):
36+
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
3337

3438
try:
35-
publisher.delete_topic(TOPIC_PATH)
39+
publisher_client.delete_topic(topic_path)
3640
except Exception:
3741
pass
3842

39-
yield
43+
yield TOPIC
4044

4145

42-
@mock.patch.object(
43-
pubsub_v1.PublisherClient, 'topic_path', return_value=TOPIC_PATH)
44-
def test_quickstart(unused_topic_path, temporary_topic, capsys):
45-
quickstart.run_quickstart()
46+
@pytest.fixture(scope='module')
47+
def subscriber_client():
48+
yield pubsub_v1.SubscriberClient()
49+
50+
51+
@pytest.fixture(scope='module')
52+
def subscription(subscriber_client, topic):
53+
subscription_path = subscriber_client.subscription_path(
54+
PROJECT, SUBSCRIPTION)
55+
56+
try:
57+
subscriber_client.delete_subscription(subscription_path)
58+
except Exception:
59+
pass
60+
61+
yield SUBSCRIPTION
62+
63+
64+
def test_end_to_end(topic, subscription, capsys):
65+
66+
quickstart.end_to_end(PROJECT, topic, subscription, N)
4667
out, _ = capsys.readouterr()
47-
assert TOPIC_NAME in out
68+
69+
assert "Received all messages" in out
70+
assert "Publish time lapsed" in out
71+
assert "Subscribe time lapsed" in out

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.