Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8179bda

Browse filesBrowse files
anguillanneufchenyumic
authored andcommitted
Modified publisher with error handling (GoogleCloudPlatform#1568)
1 parent 7c53d7c commit 8179bda
Copy full SHA for 8179bda

File tree

Expand file treeCollapse file tree

3 files changed

+25
-18
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+25
-18
lines changed

‎pubsub/cloud-client/publisher.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher.py
+18-13Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"""
2323

2424
import argparse
25-
import concurrent.futures
25+
import time
2626

2727
from google.cloud import pubsub_v1
2828

@@ -130,29 +130,27 @@ def publish_messages_with_error_handler(project, topic_name):
130130
publisher = pubsub_v1.PublisherClient()
131131
topic_path = publisher.topic_path(project, topic_name)
132132

133-
# When you publish a message, the client returns a Future. This Future
134-
# can be used to track if an error has occurred.
135-
futures = []
136-
137-
def callback(f):
138-
exc = f.exception()
139-
if exc:
133+
def callback(message_future):
134+
if message_future.exception():
140135
print('Publishing message on {} threw an Exception {}.'.format(
141-
topic_name, exc))
136+
topic_name, message_future.exception()))
137+
else:
138+
print(message_future.result())
142139

143140
for n in range(1, 10):
144141
data = u'Message number {}'.format(n)
145142
# Data must be a bytestring
146143
data = data.encode('utf-8')
144+
# When you publish a message, the client returns a Future.
147145
message_future = publisher.publish(topic_path, data=data)
148146
message_future.add_done_callback(callback)
149-
futures.append(message_future)
147+
148+
print('Published message IDs:')
150149

151150
# We must keep the main thread from exiting to allow it to process
152151
# messages in the background.
153-
concurrent.futures.wait(futures)
154-
155-
print('Published messages.')
152+
while True:
153+
time.sleep(60)
156154
# [END pubsub_publish_messages_error_handler]
157155

158156

@@ -208,6 +206,11 @@ def publish_messages_with_batch_settings(project, topic_name):
208206
help=publish_messages_with_futures.__doc__)
209207
publish_with_futures_parser.add_argument('topic_name')
210208

209+
publish_with_error_handler_parser = subparsers.add_parser(
210+
'publish-with-error-handler',
211+
help=publish_messages_with_error_handler.__doc__)
212+
publish_with_error_handler_parser.add_argument('topic_name')
213+
211214
publish_with_batch_settings_parser = subparsers.add_parser(
212215
'publish-with-batch-settings',
213216
help=publish_messages_with_batch_settings.__doc__)
@@ -227,5 +230,7 @@ def publish_messages_with_batch_settings(project, topic_name):
227230
publish_messages_with_custom_attributes(args.project, args.topic_name)
228231
elif args.command == 'publish-with-futures':
229232
publish_messages_with_futures(args.project, args.topic_name)
233+
elif args.command == 'publish-with-error-handler':
234+
publish_messages_with_error_handler(args.project, args.topic_name)
230235
elif args.command == 'publish-with-batch-settings':
231236
publish_messages_with_batch_settings(args.project, args.topic_name)

‎pubsub/cloud-client/publisher_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher_test.py
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ def test_publish_with_batch_settings(topic, capsys):
9595
assert 'Published' in out
9696

9797

98+
def test_publish_with_error_handler(topic, capsys):
99+
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
100+
101+
out, _ = capsys.readouterr()
102+
assert 'Published' in out
103+
104+
98105
def test_publish_with_futures(topic, capsys):
99106
publisher.publish_messages_with_futures(PROJECT, TOPIC)
100107

‎pubsub/cloud-client/requirements.txt

Copy file name to clipboard
-5Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1 @@
1-
<<<<<<< HEAD
21
google-cloud-pubsub==0.33.0
3-
=======
4-
google-cloud-pubsub==0.32.1
5-
futures==3.1.1; python_version < '3'
6-
>>>>>>> master

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.