Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d49312e

Browse filesBrowse files
Cloud Pub/Sub Quickstart V2 (GoogleCloudPlatform#2004)
* Quickstart V2 * Adopts Kir's suggestions * Adopted Tim's suggestions * proper resource deletion during teardown
1 parent 18f766a commit d49312e
Copy full SHA for d49312e

File tree

Expand file treeCollapse file tree

4 files changed

+311
-0
lines changed
Filter options
Expand file treeCollapse file tree

4 files changed

+311
-0
lines changed

‎pubsub/cloud-client/quickstart/pub.py

Copy file name to clipboard
+73Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
# [START pubsub_quickstart_pub_all]
18+
import argparse
19+
import time
20+
# [START pubsub_quickstart_pub_deps]
21+
from google.cloud import pubsub_v1
22+
# [END pubsub_quickstart_pub_deps]
23+
24+
25+
def get_callback(api_future, data):
26+
"""Wrap message data in the context of the callback function."""
27+
28+
def callback(api_future):
29+
try:
30+
print("Published message {} now has message ID {}".format(
31+
data, api_future.result()))
32+
except Exception:
33+
print("A problem occurred when publishing {}: {}\n".format(
34+
data, api_future.exception()))
35+
raise
36+
return callback
37+
38+
39+
def pub(project_id, topic_name):
40+
"""Publishes a message to a Pub/Sub topic."""
41+
# [START pubsub_quickstart_pub_client]
42+
# Initialize a Publisher client
43+
client = pubsub_v1.PublisherClient()
44+
# [END pubsub_quickstart_pub_client]
45+
# Create a fully qualified identifier in the form of
46+
# `projects/{project_id}/topics/{topic_name}`
47+
topic_path = client.topic_path(project_id, topic_name)
48+
49+
# Data sent to Cloud Pub/Sub must be a bytestring
50+
data = b"Hello, World!"
51+
52+
# When you publish a message, the client returns a future.
53+
api_future = client.publish(topic_path, data=data)
54+
api_future.add_done_callback(get_callback(api_future, data))
55+
56+
# Keep the main thread from exiting until background message
57+
# is processed.
58+
while api_future.running():
59+
time.sleep(0.1)
60+
61+
62+
if __name__ == '__main__':
63+
parser = argparse.ArgumentParser(
64+
description=__doc__,
65+
formatter_class=argparse.RawDescriptionHelpFormatter
66+
)
67+
parser.add_argument('project_id', help='Google Cloud project ID')
68+
parser.add_argument('topic_name', help='Pub/Sub topic name')
69+
70+
args = parser.parse_args()
71+
72+
pub(args.project_id, args.topic_name)
73+
# [END pubsub_quickstart_pub_all]
+61Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import os
18+
import pytest
19+
20+
from google.api_core.exceptions import AlreadyExists
21+
from google.cloud import pubsub_v1
22+
23+
import pub
24+
25+
PROJECT = os.environ['GCLOUD_PROJECT']
26+
TOPIC = 'quickstart-pub-test-topic'
27+
28+
29+
@pytest.fixture(scope='module')
30+
def publisher_client():
31+
yield pubsub_v1.PublisherClient()
32+
33+
34+
@pytest.fixture(scope='module')
35+
def topic(publisher_client):
36+
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
37+
38+
try:
39+
publisher_client.create_topic(topic_path)
40+
except AlreadyExists:
41+
pass
42+
43+
yield TOPIC
44+
45+
46+
@pytest.fixture
47+
def to_delete(publisher_client):
48+
doomed = []
49+
yield doomed
50+
for item in doomed:
51+
publisher_client.delete_topic(item)
52+
53+
54+
def test_pub(publisher_client, topic, to_delete, capsys):
55+
pub.pub(PROJECT, topic)
56+
57+
to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC))
58+
59+
out, _ = capsys.readouterr()
60+
61+
assert "Published message b'Hello, World!'" in out

‎pubsub/cloud-client/quickstart/sub.py

Copy file name to clipboard
+64Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
# [START pubsub_quickstart_sub_all]
18+
import argparse
19+
import time
20+
# [START pubsub_quickstart_sub_deps]
21+
from google.cloud import pubsub_v1
22+
# [END pubsub_quickstart_sub_deps]
23+
24+
25+
def sub(project_id, subscription_name):
26+
"""Receives messages from a Pub/Sub subscription."""
27+
# [START pubsub_quickstart_sub_client]
28+
# Initialize a Subscriber client
29+
client = pubsub_v1.SubscriberClient()
30+
# [END pubsub_quickstart_sub_client]
31+
# Create a fully qualified identifier in the form of
32+
# `projects/{project_id}/subscriptions/{subscription_name}`
33+
subscription_path = client.subscription_path(
34+
project_id, subscription_name)
35+
36+
def callback(message):
37+
print('Received message {} of message ID {}'.format(
38+
message, message.message_id))
39+
# Acknowledge the message. Unack'ed messages will be redelivered.
40+
message.ack()
41+
print('Acknowledged message of message ID {}\n'.format(
42+
message.message_id))
43+
44+
client.subscribe(subscription_path, callback=callback)
45+
print('Listening for messages on {}..\n'.format(subscription_path))
46+
47+
# Keep the main thread from exiting so the subscriber can
48+
# process messages in the background.
49+
while True:
50+
time.sleep(60)
51+
52+
53+
if __name__ == '__main__':
54+
parser = argparse.ArgumentParser(
55+
description=__doc__,
56+
formatter_class=argparse.RawDescriptionHelpFormatter
57+
)
58+
parser.add_argument('project_id', help='Google Cloud project ID')
59+
parser.add_argument('subscription_name', help='Pub/Sub subscription name')
60+
61+
args = parser.parse_args()
62+
63+
sub(args.project_id, args.subscription_name)
64+
# [END pubsub_quickstart_sub_all]
+113Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2019 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import mock
18+
import os
19+
import pytest
20+
import time
21+
22+
from google.api_core.exceptions import AlreadyExists
23+
from google.cloud import pubsub_v1
24+
25+
import sub
26+
27+
28+
PROJECT = os.environ['GCLOUD_PROJECT']
29+
TOPIC = 'quickstart-sub-test-topic'
30+
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'
31+
32+
33+
@pytest.fixture(scope='module')
34+
def publisher_client():
35+
yield pubsub_v1.PublisherClient()
36+
37+
38+
@pytest.fixture(scope='module')
39+
def topic_path(publisher_client):
40+
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
41+
42+
try:
43+
publisher_client.create_topic(topic_path)
44+
except AlreadyExists:
45+
pass
46+
47+
yield topic_path
48+
49+
50+
@pytest.fixture(scope='module')
51+
def subscriber_client():
52+
yield pubsub_v1.SubscriberClient()
53+
54+
55+
@pytest.fixture(scope='module')
56+
def subscription(subscriber_client, topic_path):
57+
subscription_path = subscriber_client.subscription_path(
58+
PROJECT, SUBSCRIPTION)
59+
60+
try:
61+
subscriber_client.create_subscription(subscription_path, topic_path)
62+
except AlreadyExists:
63+
pass
64+
65+
yield SUBSCRIPTION
66+
67+
68+
@pytest.fixture
69+
def to_delete(publisher_client, subscriber_client):
70+
doomed = []
71+
yield doomed
72+
for client, item in doomed:
73+
if 'topics' in item:
74+
publisher_client.delete_topic(item)
75+
if 'subscriptions' in item:
76+
subscriber_client.delete_subscription(item)
77+
78+
79+
def _make_sleep_patch():
80+
real_sleep = time.sleep
81+
82+
def new_sleep(period):
83+
if period == 60:
84+
real_sleep(10)
85+
raise RuntimeError('sigil')
86+
else:
87+
real_sleep(period)
88+
89+
return mock.patch('time.sleep', new=new_sleep)
90+
91+
92+
def test_sub(publisher_client,
93+
topic_path,
94+
subscriber_client,
95+
subscription,
96+
to_delete,
97+
capsys):
98+
99+
publisher_client.publish(topic_path, data=b'Hello, World!')
100+
101+
to_delete.append((publisher_client, topic_path))
102+
103+
with _make_sleep_patch():
104+
with pytest.raises(RuntimeError, match='sigil'):
105+
sub.sub(PROJECT, subscription)
106+
107+
to_delete.append((subscriber_client,
108+
'projects/{}/subscriptions/{}'.format(PROJECT,
109+
SUBSCRIPTION)))
110+
111+
out, _ = capsys.readouterr()
112+
assert "Received message" in out
113+
assert "Acknowledged message" in out

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.