Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fbf6f4a

Browse filesBrowse files
Pub/Sub: update how subscriber client listens to StreamingPullFuture (GoogleCloudPlatform#2475)
* update sub.py & requirements.txt * fix flaky subscriber test with separate subscriptions
1 parent 14995eb commit fbf6f4a
Copy full SHA for fbf6f4a

File tree

Expand file treeCollapse file tree

9 files changed

+148
-164
lines changed
Filter options
Expand file treeCollapse file tree

9 files changed

+148
-164
lines changed

‎pubsub/cloud-client/iam_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/iam_test.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019 Google Inc. All Rights Reserved.
1+
# Copyright 2016 Google Inc. All Rights Reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.

‎pubsub/cloud-client/publisher.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google LLC. All Rights Reserved.
3+
# Copyright 2016 Google LLC. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

‎pubsub/cloud-client/publisher_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/publisher_test.py
+5-7Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019 Google Inc. All Rights Reserved.
1+
# Copyright 2016 Google Inc. All Rights Reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -36,13 +36,11 @@ def topic(client):
3636
topic_path = client.topic_path(PROJECT, TOPIC)
3737

3838
try:
39-
client.delete_topic(topic_path)
40-
except Exception:
41-
pass
42-
43-
client.create_topic(topic_path)
39+
response = client.get_topic(topic_path)
40+
except: # noqa
41+
response = client.create_topic(topic_path)
4442

45-
yield topic_path
43+
yield response.name
4644

4745

4846
def _make_sleep_patch():

‎pubsub/cloud-client/quickstart.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/quickstart.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google Inc. All Rights Reserved.
3+
# Copyright 2016 Google Inc. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

‎pubsub/cloud-client/quickstart/sub.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/quickstart/sub.py
+10-9Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
# [START pubsub_quickstart_sub_all]
1818
import argparse
19-
import time
2019
# [START pubsub_quickstart_sub_deps]
2120
from google.cloud import pubsub_v1
2221
# [END pubsub_quickstart_sub_deps]
@@ -34,20 +33,22 @@ def sub(project_id, subscription_name):
3433
project_id, subscription_name)
3534

3635
def callback(message):
37-
print('Received message {} of message ID {}'.format(
36+
print('Received message {} of message ID {}\n'.format(
3837
message, message.message_id))
3938
# Acknowledge the message. Unack'ed messages will be redelivered.
4039
message.ack()
41-
print('Acknowledged message of message ID {}\n'.format(
42-
message.message_id))
40+
print('Acknowledged message {}\n'.format(message.message_id))
4341

44-
client.subscribe(subscription_path, callback=callback)
42+
streaming_pull_future = client.subscribe(
43+
subscription_path, callback=callback)
4544
print('Listening for messages on {}..\n'.format(subscription_path))
4645

47-
# Keep the main thread from exiting so the subscriber can
48-
# process messages in the background.
49-
while True:
50-
time.sleep(60)
46+
# Calling result() on StreamingPullFuture keeps the main thread from
47+
# exiting while messages get processed in the callbacks.
48+
try:
49+
streaming_pull_future.result()
50+
except: # noqa
51+
streaming_pull_future.cancel()
5152

5253

5354
if __name__ == '__main__':

‎pubsub/cloud-client/quickstart/sub_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/quickstart/sub_test.py
+42-49Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
import mock
1817
import os
1918
import pytest
20-
import time
2119

2220
from google.api_core.exceptions import AlreadyExists
2321
from google.cloud import pubsub_v1
@@ -29,84 +27,79 @@
2927
TOPIC = 'quickstart-sub-test-topic'
3028
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'
3129

32-
33-
@pytest.fixture(scope='module')
34-
def publisher_client():
35-
yield pubsub_v1.PublisherClient()
30+
publisher_client = pubsub_v1.PublisherClient()
31+
subscriber_client = pubsub_v1.SubscriberClient()
3632

3733

3834
@pytest.fixture(scope='module')
39-
def topic_path(publisher_client):
35+
def topic_path():
4036
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
4137

4238
try:
43-
publisher_client.create_topic(topic_path)
39+
topic = publisher_client.create_topic(topic_path)
40+
return topic.name
4441
except AlreadyExists:
45-
pass
46-
47-
yield topic_path
48-
49-
50-
@pytest.fixture(scope='module')
51-
def subscriber_client():
52-
yield pubsub_v1.SubscriberClient()
42+
return topic_path
5343

5444

5545
@pytest.fixture(scope='module')
56-
def subscription(subscriber_client, topic_path):
46+
def subscription_path(topic_path):
5747
subscription_path = subscriber_client.subscription_path(
5848
PROJECT, SUBSCRIPTION)
5949

6050
try:
61-
subscriber_client.create_subscription(subscription_path, topic_path)
51+
subscription = subscriber_client.create_subscription(
52+
subscription_path, topic_path)
53+
return subscription.name
6254
except AlreadyExists:
63-
pass
64-
65-
yield SUBSCRIPTION
55+
return subscription_path
6656

6757

68-
@pytest.fixture
69-
def to_delete(publisher_client, subscriber_client):
70-
doomed = []
71-
yield doomed
72-
for client, item in doomed:
58+
def _to_delete(resource_paths):
59+
for item in resource_paths:
7360
if 'topics' in item:
7461
publisher_client.delete_topic(item)
7562
if 'subscriptions' in item:
7663
subscriber_client.delete_subscription(item)
7764

7865

79-
def _make_sleep_patch():
80-
real_sleep = time.sleep
66+
def _publish_messages(topic_path):
67+
publish_future = publisher_client.publish(topic_path, data=b'Hello World!')
68+
publish_future.result()
69+
8170

82-
def new_sleep(period):
83-
if period == 60:
84-
real_sleep(10)
85-
raise RuntimeError('sigil')
86-
else:
87-
real_sleep(period)
71+
def _sub_timeout(project_id, subscription_name):
72+
# This is an exactly copy of `sub.py` except
73+
# StreamingPullFuture.result() will time out after 10s.
74+
client = pubsub_v1.SubscriberClient()
75+
subscription_path = client.subscription_path(
76+
project_id, subscription_name)
8877

89-
return mock.patch('time.sleep', new=new_sleep)
78+
def callback(message):
79+
print('Received message {} of message ID {}\n'.format(
80+
message, message.message_id))
81+
message.ack()
82+
print('Acknowledged message {}\n'.format(message.message_id))
9083

84+
streaming_pull_future = client.subscribe(
85+
subscription_path, callback=callback)
86+
print('Listening for messages on {}..\n'.format(subscription_path))
87+
88+
try:
89+
streaming_pull_future.result(timeout=10)
90+
except: # noqa
91+
streaming_pull_future.cancel()
9192

92-
def test_sub(publisher_client,
93-
topic_path,
94-
subscriber_client,
95-
subscription,
96-
to_delete,
97-
capsys):
9893

99-
publisher_client.publish(topic_path, data=b'Hello, World!')
94+
def test_sub(monkeypatch, topic_path, subscription_path, capsys):
95+
monkeypatch.setattr(sub, 'sub', _sub_timeout)
10096

101-
to_delete.append((publisher_client, topic_path))
97+
_publish_messages(topic_path)
10298

103-
with _make_sleep_patch():
104-
with pytest.raises(RuntimeError, match='sigil'):
105-
sub.sub(PROJECT, subscription)
99+
sub.sub(PROJECT, SUBSCRIPTION)
106100

107-
to_delete.append((subscriber_client,
108-
'projects/{}/subscriptions/{}'.format(PROJECT,
109-
SUBSCRIPTION)))
101+
# Clean up resources.
102+
_to_delete([topic_path, subscription_path])
110103

111104
out, _ = capsys.readouterr()
112105
assert "Received message" in out

‎pubsub/cloud-client/quickstart_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/quickstart_test.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google Inc. All Rights Reserved.
3+
# Copyright 2016 Google Inc. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

‎pubsub/cloud-client/subscriber.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22

3-
# Copyright 2019 Google Inc. All Rights Reserved.
3+
# Copyright 2016 Google Inc. All Rights Reserved.
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.