Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 84c2cb2

Browse filesBrowse files
Added sample for Pub/Sub synchronous pull subscriber (GoogleCloudPlatform#1673)
* Added sample for synchronous pull
1 parent eedc6d2 commit 84c2cb2
Copy full SHA for 84c2cb2

File tree

Expand file treeCollapse file tree

2 files changed

+60
-0
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+60
-0
lines changed

‎pubsub/cloud-client/subscriber.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber.py
+45Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ def create_push_subscription(project,
9090
def delete_subscription(project, subscription_name):
9191
"""Deletes an existing Pub/Sub topic."""
9292
# [START pubsub_delete_subscription]
93+
# project = "Your Google Cloud Project ID"
94+
# subscription_name = "Your Pubsub subscription name"
9395
subscriber = pubsub_v1.SubscriberClient()
9496
subscription_path = subscriber.subscription_path(
9597
project, subscription_name)
@@ -138,6 +140,8 @@ def receive_messages(project, subscription_name):
138140
"""Receives messages from a pull subscription."""
139141
# [START pubsub_subscriber_async_pull]
140142
# [START pubsub_quickstart_subscriber]
143+
# project = "Your Google Cloud Project ID"
144+
# subscription_name = "Your Pubsub subscription name"
141145
subscriber = pubsub_v1.SubscriberClient()
142146
subscription_path = subscriber.subscription_path(
143147
project, subscription_name)
@@ -160,6 +164,8 @@ def callback(message):
160164
def receive_messages_with_custom_attributes(project, subscription_name):
161165
"""Receives messages from a pull subscription."""
162166
# [START pubsub_subscriber_sync_pull_custom_attributes]
167+
# project = "Your Google Cloud Project ID"
168+
# subscription_name = "Your Pubsub subscription name"
163169
subscriber = pubsub_v1.SubscriberClient()
164170
subscription_path = subscriber.subscription_path(
165171
project, subscription_name)
@@ -186,6 +192,8 @@ def callback(message):
186192
def receive_messages_with_flow_control(project, subscription_name):
187193
"""Receives messages from a pull subscription with flow control."""
188194
# [START pubsub_subscriber_flow_settings]
195+
# project = "Your Google Cloud Project ID"
196+
# subscription_name = "Your Pubsub subscription name"
189197
subscriber = pubsub_v1.SubscriberClient()
190198
subscription_path = subscriber.subscription_path(
191199
project, subscription_name)
@@ -207,9 +215,38 @@ def callback(message):
207215
# [END pubsub_subscriber_flow_settings]
208216

209217

218+
def receive_messages_synchronously(project, subscription_name):
219+
"""Pulling messages synchronously."""
220+
# [START pubsub_subscriber_sync_pull]
221+
# project = "Your Google Cloud Project ID"
222+
# subscription_name = "Your Pubsub subscription name"
223+
subscriber = pubsub_v1.SubscriberClient()
224+
subscription_path = subscriber.subscription_path(
225+
project, subscription_name)
226+
227+
# Builds a pull request with a specific number of messages to return.
228+
# `return_immediately` is set to False so that the system waits (for a
229+
# bounded amount of time) until at lease one message is available.
230+
response = subscriber.pull(
231+
subscription_path,
232+
max_messages=3,
233+
return_immediately=False)
234+
235+
ack_ids = []
236+
for received_message in response.received_messages:
237+
print("Received: {}".format(received_message.message.data))
238+
ack_ids.append(received_message.ack_id)
239+
240+
# Acknowledges the received messages so they will not be sent again.
241+
subscriber.acknowledge(subscription_path, ack_ids)
242+
# [END pubsub_subscriber_sync_pull]
243+
244+
210245
def listen_for_errors(project, subscription_name):
211246
"""Receives messages and catches errors from a pull subscription."""
212247
# [START pubsub_subscriber_error_listener]
248+
# project = "Your Google Cloud Project ID"
249+
# subscription_name = "Your Pubsub subscription name"
213250
subscriber = pubsub_v1.SubscriberClient()
214251
subscription_path = subscriber.subscription_path(
215252
project, subscription_name)
@@ -281,6 +318,11 @@ def callback(message):
281318
help=receive_messages_with_flow_control.__doc__)
282319
receive_with_flow_control_parser.add_argument('subscription_name')
283320

321+
receive_messages_synchronously_parser = subparsers.add_parser(
322+
'receive-synchronously',
323+
help=receive_messages_synchronously.__doc__)
324+
receive_messages_synchronously_parser.add_argument('subscription_name')
325+
284326
listen_for_errors_parser = subparsers.add_parser(
285327
'listen_for_errors', help=listen_for_errors.__doc__)
286328
listen_for_errors_parser.add_argument('subscription_name')
@@ -314,5 +356,8 @@ def callback(message):
314356
elif args.command == 'receive-flow-control':
315357
receive_messages_with_flow_control(
316358
args.project, args.subscription_name)
359+
elif args.command == 'receive-synchronously':
360+
receive_messages_synchronously(
361+
args.project, args.subscription_name)
317362
elif args.command == 'listen_for_errors':
318363
listen_for_errors(args.project, args.subscription_name)

‎pubsub/cloud-client/subscriber_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber_test.py
+15Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,18 @@ def test_receive_with_flow_control(
188188
assert 'Listening' in out
189189
assert subscription in out
190190
assert 'Message 1' in out
191+
192+
193+
def test_receive_synchronously(
194+
publisher_client, topic, subscription, capsys):
195+
_publish_messages(publisher_client, topic)
196+
197+
with _make_sleep_patch():
198+
with pytest.raises(RuntimeError, match='sigil'):
199+
subscriber.receive_messages_with_flow_control(
200+
PROJECT, SUBSCRIPTION)
201+
202+
out, _ = capsys.readouterr()
203+
assert 'Message 1' in out
204+
assert 'Message 2' in out
205+
assert 'Message 3' in out

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.