Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2d2820c

Browse filesBrowse files
Resolve all futures (GoogleCloudPlatform#2231)
1 parent b32f7df commit 2d2820c
Copy full SHA for 2d2820c

File tree

Expand file treeCollapse file tree

3 files changed

+42
-42
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+42
-42
lines changed

‎pubsub/cloud-client/publisher.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher.py
+37-35Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def publish_messages(project_id, topic_name):
9595
data = data.encode('utf-8')
9696
# When you publish a message, the client returns a future.
9797
future = publisher.publish(topic_path, data=data)
98-
print('Published {} of message ID {}.'.format(data, future.result()))
98+
print(future.result())
9999

100100
print('Published messages.')
101101
# [END pubsub_quickstart_publisher]
@@ -119,8 +119,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
119119
# Data must be a bytestring
120120
data = data.encode('utf-8')
121121
# Add two attributes, origin and username, to the message
122-
publisher.publish(
122+
future = publisher.publish(
123123
topic_path, data, origin='python-sample', username='gcp')
124+
print(future.result())
124125

125126
print('Published messages with custom attributes.')
126127
# [END pubsub_publish_custom_attributes]
@@ -138,21 +139,15 @@ def publish_messages_with_futures(project_id, topic_name):
138139
publisher = pubsub_v1.PublisherClient()
139140
topic_path = publisher.topic_path(project_id, topic_name)
140141

141-
# When you publish a message, the client returns a Future. This Future
142-
# can be used to track when the message is published.
143-
futures = []
144-
145142
for n in range(1, 10):
146143
data = u'Message number {}'.format(n)
147144
# Data must be a bytestring
148145
data = data.encode('utf-8')
149-
message_future = publisher.publish(topic_path, data=data)
150-
futures.append(message_future)
151-
152-
print('Published message IDs:')
153-
for future in futures:
154-
# result() blocks until the message is published.
146+
# When you publish a message, the client returns a future.
147+
future = publisher.publish(topic_path, data=data)
155148
print(future.result())
149+
150+
print("Published messages with futures.")
156151
# [END pubsub_publisher_concurrency_control]
157152

158153

@@ -169,28 +164,34 @@ def publish_messages_with_error_handler(project_id, topic_name):
169164
publisher = pubsub_v1.PublisherClient()
170165
topic_path = publisher.topic_path(project_id, topic_name)
171166

172-
def callback(message_future):
173-
if message_future.exception():
174-
print('{} needs handling.'.format(message_future.exception()))
175-
else:
176-
print(message_future.result())
167+
futures = dict()
177168

178-
for n in range(1, 10):
179-
data = u'Message number {}'.format(n)
180-
# Data must be a bytestring
181-
data = data.encode('utf-8')
182-
# When you publish a message, the client returns a Future.
183-
message_future = publisher.publish(topic_path, data=data)
184-
# If you wish to handle publish failures, do it in the callback.
185-
# Otherwise, it's okay to call `message_future.result()` directly.
186-
message_future.add_done_callback(callback)
187-
188-
print('Published message IDs:')
189-
190-
# We keep the main thread from exiting so message futures can be
191-
# resolved in the background.
192-
while True:
193-
time.sleep(60)
169+
def get_callback(f, data):
170+
def callback(f):
171+
try:
172+
print(f.result())
173+
futures.pop(data)
174+
except: # noqa
175+
print("Please handle {} for {}.".format(f.exception(), data))
176+
return callback
177+
178+
for i in range(10):
179+
data = str(i)
180+
futures.update({data: None})
181+
# When you publish a message, the client returns a future.
182+
future = publisher.publish(
183+
topic_path,
184+
data=data.encode("utf-8"), # data must be a bytestring.
185+
)
186+
futures[data] = future
187+
# Publish failures shall be handled in the callback function.
188+
future.add_done_callback(get_callback(future, data))
189+
190+
# Wait for all the publish futures to resolve before exiting.
191+
while futures:
192+
time.sleep(5)
193+
194+
print("Published message with error handler.")
194195
# [END pubsub_publish_messages_error_handler]
195196

196197

@@ -215,9 +216,10 @@ def publish_messages_with_batch_settings(project_id, topic_name):
215216
data = u'Message number {}'.format(n)
216217
# Data must be a bytestring
217218
data = data.encode('utf-8')
218-
publisher.publish(topic_path, data=data)
219+
future = publisher.publish(topic_path, data=data)
220+
print(future.result())
219221

220-
print('Published messages.')
222+
print('Published messages with batch settings.')
221223
# [END pubsub_publisher_batch_settings]
222224

223225

‎pubsub/cloud-client/publisher_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher_test.py
+1-5Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,7 @@ def test_publish_with_batch_settings(topic, capsys):
111111

112112

113113
def test_publish_with_error_handler(topic, capsys):
114-
115-
with _make_sleep_patch():
116-
with pytest.raises(RuntimeError, match='sigil'):
117-
publisher.publish_messages_with_error_handler(
118-
PROJECT, TOPIC)
114+
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
119115

120116
out, _ = capsys.readouterr()
121117
assert 'Published' in out

‎pubsub/cloud-client/subscriber_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber_test.py
+4-2Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,15 @@ def test_update(subscriber_client, subscription, capsys):
169169
def _publish_messages(publisher_client, topic):
170170
for n in range(5):
171171
data = u'Message {}'.format(n).encode('utf-8')
172-
publisher_client.publish(
172+
future = publisher_client.publish(
173173
topic, data=data)
174+
future.result()
174175

175176

176177
def _publish_messages_with_custom_attributes(publisher_client, topic):
177178
data = u'Test message'.encode('utf-8')
178-
publisher_client.publish(topic, data=data, origin='python-sample')
179+
future = publisher_client.publish(topic, data=data, origin='python-sample')
180+
future.result()
179181

180182

181183
def _make_sleep_patch():

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.