Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d42f54c

Browse filesBrowse files
anguillanneufbusunkim96
authored andcommitted
Pub/Sub: remove infinite while loops in subscriber examples (GoogleCloudPlatform#2604)
* use result() on streaming pull futures instead of infinite while * remove unused imports
1 parent 2329466 commit d42f54c
Copy full SHA for d42f54c

File tree

Expand file treeCollapse file tree

3 files changed

+179
-174
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+179
-174
lines changed

‎pubsub/cloud-client/publisher_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher_test.py
+41-34Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525

2626
UUID = uuid.uuid4().hex
2727
PROJECT = os.environ["GCLOUD_PROJECT"]
28-
TOPIC = "publisher-test-topic-" + UUID
28+
TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID
29+
TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID
2930

3031

3132
@pytest.fixture
@@ -34,15 +35,30 @@ def client():
3435

3536

3637
@pytest.fixture
37-
def topic(client):
38-
topic_path = client.topic_path(PROJECT, TOPIC)
38+
def topic_admin(client):
39+
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)
3940

4041
try:
41-
response = client.get_topic(topic_path)
42+
topic = client.get_topic(topic_path)
4243
except: # noqa
43-
response = client.create_topic(topic_path)
44+
topic = client.create_topic(topic_path)
4445

45-
yield response.name
46+
yield topic.name
47+
# Teardown of `topic_admin` is handled in `test_delete()`.
48+
49+
50+
@pytest.fixture
51+
def topic_publish(client):
52+
topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH)
53+
54+
try:
55+
topic = client.get_topic(topic_path)
56+
except: # noqa
57+
topic = client.create_topic(topic_path)
58+
59+
yield topic.name
60+
61+
client.delete_topic(topic.name)
4662

4763

4864
def _make_sleep_patch():
@@ -58,83 +74,74 @@ def new_sleep(period):
5874
return mock.patch("time.sleep", new=new_sleep)
5975

6076

61-
def _to_delete():
62-
publisher_client = pubsub_v1.PublisherClient()
63-
publisher_client.delete_topic(
64-
"projects/{}/topics/{}".format(PROJECT, TOPIC)
65-
)
66-
67-
68-
def test_list(client, topic, capsys):
77+
def test_list(client, topic_admin, capsys):
6978
@eventually_consistent.call
7079
def _():
7180
publisher.list_topics(PROJECT)
7281
out, _ = capsys.readouterr()
73-
assert topic in out
82+
assert topic_admin in out
7483

7584

7685
def test_create(client):
77-
topic_path = client.topic_path(PROJECT, TOPIC)
86+
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)
7887
try:
7988
client.delete_topic(topic_path)
8089
except Exception:
8190
pass
8291

83-
publisher.create_topic(PROJECT, TOPIC)
92+
publisher.create_topic(PROJECT, TOPIC_ADMIN)
8493

8594
@eventually_consistent.call
8695
def _():
8796
assert client.get_topic(topic_path)
8897

8998

90-
def test_delete(client, topic):
91-
publisher.delete_topic(PROJECT, TOPIC)
99+
def test_delete(client, topic_admin):
100+
publisher.delete_topic(PROJECT, TOPIC_ADMIN)
92101

93102
@eventually_consistent.call
94103
def _():
95104
with pytest.raises(Exception):
96-
client.get_topic(client.topic_path(PROJECT, TOPIC))
105+
client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN))
97106

98107

99-
def test_publish(topic, capsys):
100-
publisher.publish_messages(PROJECT, TOPIC)
108+
def test_publish(topic_publish, capsys):
109+
publisher.publish_messages(PROJECT, TOPIC_PUBLISH)
101110

102111
out, _ = capsys.readouterr()
103112
assert "Published" in out
104113

105114

106-
def test_publish_with_custom_attributes(topic, capsys):
107-
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)
115+
def test_publish_with_custom_attributes(topic_publish, capsys):
116+
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH)
108117

109118
out, _ = capsys.readouterr()
110119
assert "Published" in out
111120

112121

113-
def test_publish_with_batch_settings(topic, capsys):
114-
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)
122+
def test_publish_with_batch_settings(topic_publish, capsys):
123+
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH)
115124

116125
out, _ = capsys.readouterr()
117126
assert "Published" in out
118127

119128

120-
def test_publish_with_retry_settings(topic, capsys):
121-
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC)
129+
def test_publish_with_retry_settings(topic_publish, capsys):
130+
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH)
122131

123132
out, _ = capsys.readouterr()
124133
assert "Published" in out
125134

126135

127-
def test_publish_with_error_handler(topic, capsys):
128-
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
136+
def test_publish_with_error_handler(topic_publish, capsys):
137+
publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH)
129138

130139
out, _ = capsys.readouterr()
131140
assert "Published" in out
132141

133142

134-
def test_publish_with_futures(topic, capsys):
135-
publisher.publish_messages_with_futures(PROJECT, TOPIC)
143+
def test_publish_with_futures(topic_publish, capsys):
144+
publisher.publish_messages_with_futures(PROJECT, TOPIC_PUBLISH)
136145

137146
out, _ = capsys.readouterr()
138147
assert "Published" in out
139-
140-
_to_delete()

‎pubsub/cloud-client/subscriber.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber.py
+60-38Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,16 @@ def update_subscription(project_id, subscription_name, endpoint):
161161
# [END pubsub_update_push_configuration]
162162

163163

164-
def receive_messages(project_id, subscription_name):
164+
def receive_messages(project_id, subscription_name, timeout=None):
165165
"""Receives messages from a pull subscription."""
166166
# [START pubsub_subscriber_async_pull]
167167
# [START pubsub_quickstart_subscriber]
168-
import time
169-
170168
from google.cloud import pubsub_v1
171169

172170
# TODO project_id = "Your Google Cloud Project ID"
173171
# TODO subscription_name = "Your Pub/Sub subscription name"
172+
# TODO timeout = 5.0 # "How long the subscriber should listen for
173+
# messages in seconds"
174174

175175
subscriber = pubsub_v1.SubscriberClient()
176176
# The `subscription_path` method creates a fully qualified identifier
@@ -183,27 +183,33 @@ def callback(message):
183183
print("Received message: {}".format(message))
184184
message.ack()
185185

186-
subscriber.subscribe(subscription_path, callback=callback)
186+
streaming_pull_future = subscriber.subscribe(
187+
subscription_path, callback=callback
188+
)
189+
print("Listening for messages on {}..\n".format(subscription_path))
187190

188-
# The subscriber is non-blocking. We must keep the main thread from
189-
# exiting to allow it to process messages asynchronously in the background.
190-
print("Listening for messages on {}".format(subscription_path))
191-
while True:
192-
time.sleep(60)
191+
# result() in a future will block indefinitely if `timeout` is not set,
192+
# unless an exception is encountered first.
193+
try:
194+
streaming_pull_future.result(timeout=timeout)
195+
except: # noqa
196+
streaming_pull_future.cancel()
193197
# [END pubsub_subscriber_async_pull]
194198
# [END pubsub_quickstart_subscriber]
195199

196200

197-
def receive_messages_with_custom_attributes(project_id, subscription_name):
201+
def receive_messages_with_custom_attributes(
202+
project_id, subscription_name, timeout=None
203+
):
198204
"""Receives messages from a pull subscription."""
199205
# [START pubsub_subscriber_sync_pull_custom_attributes]
200206
# [START pubsub_subscriber_async_pull_custom_attributes]
201-
import time
202-
203207
from google.cloud import pubsub_v1
204208

205209
# TODO project_id = "Your Google Cloud Project ID"
206210
# TODO subscription_name = "Your Pub/Sub subscription name"
211+
# TODO timeout = 5.0 # "How long the subscriber should listen for
212+
# messages in seconds"
207213

208214
subscriber = pubsub_v1.SubscriberClient()
209215
subscription_path = subscriber.subscription_path(
@@ -219,26 +225,32 @@ def callback(message):
219225
print("{}: {}".format(key, value))
220226
message.ack()
221227

222-
subscriber.subscribe(subscription_path, callback=callback)
228+
streaming_pull_future = subscriber.subscribe(
229+
subscription_path, callback=callback
230+
)
231+
print("Listening for messages on {}..\n".format(subscription_path))
223232

224-
# The subscriber is non-blocking, so we must keep the main thread from
225-
# exiting to allow it to process messages in the background.
226-
print("Listening for messages on {}".format(subscription_path))
227-
while True:
228-
time.sleep(60)
233+
# result() in a future will block indefinitely if `timeout` is not set,
234+
# unless an exception is encountered first.
235+
try:
236+
streaming_pull_future.result(timeout=timeout)
237+
except: # noqa
238+
streaming_pull_future.cancel()
229239
# [END pubsub_subscriber_async_pull_custom_attributes]
230240
# [END pubsub_subscriber_sync_pull_custom_attributes]
231241

232242

233-
def receive_messages_with_flow_control(project_id, subscription_name):
243+
def receive_messages_with_flow_control(
244+
project_id, subscription_name, timeout=None
245+
):
234246
"""Receives messages from a pull subscription with flow control."""
235247
# [START pubsub_subscriber_flow_settings]
236-
import time
237-
238248
from google.cloud import pubsub_v1
239249

240250
# TODO project_id = "Your Google Cloud Project ID"
241251
# TODO subscription_name = "Your Pub/Sub subscription name"
252+
# TODO timeout = 5.0 # "How long the subscriber should listen for
253+
# messages in seconds"
242254

243255
subscriber = pubsub_v1.SubscriberClient()
244256
subscription_path = subscriber.subscription_path(
@@ -251,15 +263,18 @@ def callback(message):
251263

252264
# Limit the subscriber to only have ten outstanding messages at a time.
253265
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
254-
subscriber.subscribe(
266+
267+
streaming_pull_future = subscriber.subscribe(
255268
subscription_path, callback=callback, flow_control=flow_control
256269
)
270+
print("Listening for messages on {}..\n".format(subscription_path))
257271

258-
# The subscriber is non-blocking, so we must keep the main thread from
259-
# exiting to allow it to process messages in the background.
260-
print("Listening for messages on {}".format(subscription_path))
261-
while True:
262-
time.sleep(60)
272+
# result() in a future will block indefinitely if `timeout` is not set,
273+
# unless an exception is encountered first.
274+
try:
275+
streaming_pull_future.result(timeout=timeout)
276+
except: # noqa
277+
streaming_pull_future.cancel()
263278
# [END pubsub_subscriber_flow_settings]
264279

265280

@@ -386,13 +401,15 @@ def worker(msg):
386401
# [END pubsub_subscriber_sync_pull_with_lease]
387402

388403

389-
def listen_for_errors(project_id, subscription_name):
404+
def listen_for_errors(project_id, subscription_name, timeout=None):
390405
"""Receives messages and catches errors from a pull subscription."""
391406
# [START pubsub_subscriber_error_listener]
392407
from google.cloud import pubsub_v1
393408

394409
# TODO project_id = "Your Google Cloud Project ID"
395410
# TODO subscription_name = "Your Pubsub subscription name"
411+
# TODO timeout = 5.0 # "How long the subscriber should listen for
412+
# messages in seconds"
396413

397414
subscriber = pubsub_v1.SubscriberClient()
398415
subscription_path = subscriber.subscription_path(
@@ -403,16 +420,19 @@ def callback(message):
403420
print("Received message: {}".format(message))
404421
message.ack()
405422

406-
future = subscriber.subscribe(subscription_path, callback=callback)
423+
streaming_pull_future = subscriber.subscribe(
424+
subscription_path, callback=callback
425+
)
426+
print("Listening for messages on {}..\n".format(subscription_path))
407427

408-
# Blocks the thread while messages are coming in through the stream. Any
409-
# exceptions that crop up on the thread will be set on the future.
428+
# result() in a future will block indefinitely if `timeout` is not set,
429+
# unless an exception is encountered first.
410430
try:
411-
# When timeout is unspecified, the result method waits indefinitely.
412-
future.result(timeout=30)
431+
streaming_pull_future.result(timeout=timeout)
413432
except Exception as e:
433+
streaming_pull_future.cancel()
414434
print(
415-
"Listening for messages on {} threw an Exception: {}.".format(
435+
"Listening for messages on {} threw an exception: {}.".format(
416436
subscription_name, e
417437
)
418438
)
@@ -518,14 +538,14 @@ def callback(message):
518538
args.project_id, args.subscription_name, args.endpoint
519539
)
520540
elif args.command == "receive":
521-
receive_messages(args.project_id, args.subscription_name)
541+
receive_messages(args.project_id, args.subscription_name, args.timeout)
522542
elif args.command == "receive-custom-attributes":
523543
receive_messages_with_custom_attributes(
524-
args.project_id, args.subscription_name
544+
args.project_id, args.subscription_name, args.timeout
525545
)
526546
elif args.command == "receive-flow-control":
527547
receive_messages_with_flow_control(
528-
args.project_id, args.subscription_name
548+
args.project_id, args.subscription_name, args.timeout
529549
)
530550
elif args.command == "receive-synchronously":
531551
synchronous_pull(args.project_id, args.subscription_name)
@@ -534,4 +554,6 @@ def callback(message):
534554
args.project_id, args.subscription_name
535555
)
536556
elif args.command == "listen_for_errors":
537-
listen_for_errors(args.project_id, args.subscription_name)
557+
listen_for_errors(
558+
args.project_id, args.subscription_name, args.timeout
559+
)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.