Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 68a4622

Browse filesBrowse files
Pub/Sub: synchronous pull with lease management (GoogleCloudPlatform#1701)
* Synchronous pull with lease management * Updated library version
1 parent 9fee707 commit 68a4622
Copy full SHA for 68a4622

File tree

Expand file treeCollapse file tree

3 files changed

+137
-28
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+137
-28
lines changed

‎pubsub/cloud-client/requirements.txt

Copy file name to clipboard
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
google-cloud-pubsub==0.37.2
1+
google-cloud-pubsub==0.38.0

‎pubsub/cloud-client/subscriber.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber.py
+83-12Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323

2424
import argparse
2525
import time
26+
import logging
27+
import random
28+
import multiprocessing
2629

2730
from google.cloud import pubsub_v1
2831

@@ -215,7 +218,7 @@ def callback(message):
215218
# [END pubsub_subscriber_flow_settings]
216219

217220

218-
def receive_messages_synchronously(project, subscription_name):
221+
def synchronous_pull(project, subscription_name):
219222
"""Pulling messages synchronously."""
220223
# [START pubsub_subscriber_sync_pull]
221224
# project = "Your Google Cloud Project ID"
@@ -224,13 +227,10 @@ def receive_messages_synchronously(project, subscription_name):
224227
subscription_path = subscriber.subscription_path(
225228
project, subscription_name)
226229

227-
# Builds a pull request with a specific number of messages to return.
228-
# `return_immediately` is set to False so that the system waits (for a
229-
# bounded amount of time) until at lease one message is available.
230-
response = subscriber.pull(
231-
subscription_path,
232-
max_messages=3,
233-
return_immediately=False)
230+
NUM_MESSAGES=3
231+
232+
# The subscriber pulls a specific number of messages.
233+
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)
234234

235235
ack_ids = []
236236
for received_message in response.received_messages:
@@ -239,9 +239,72 @@ def receive_messages_synchronously(project, subscription_name):
239239

240240
# Acknowledges the received messages so they will not be sent again.
241241
subscriber.acknowledge(subscription_path, ack_ids)
242+
243+
print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
242244
# [END pubsub_subscriber_sync_pull]
243245

244246

247+
def synchronous_pull_with_lease_management(project, subscription_name):
248+
"""Pulling messages synchronously with lease management"""
249+
# [START pubsub_subscriber_sync_pull_with_lease]
250+
# project = "Your Google Cloud Project ID"
251+
# subscription_name = "Your Pubsub subscription name"
252+
subscriber = pubsub_v1.SubscriberClient()
253+
subscription_path = subscriber.subscription_path(
254+
project, subscription_name)
255+
256+
NUM_MESSAGES=2
257+
ACK_DEADLINE=30
258+
SLEEP_TIME=10
259+
260+
# The subscriber pulls a specific number of messages.
261+
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)
262+
263+
multiprocessing.log_to_stderr()
264+
logger = multiprocessing.get_logger()
265+
logger.setLevel(logging.INFO)
266+
267+
def worker(msg):
268+
"""Simulates a long-running process."""
269+
RUN_TIME = random.randint(1,60)
270+
logger.info('{}: Running {} for {}s'.format(
271+
time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
272+
time.sleep(RUN_TIME)
273+
274+
# `processes` stores process as key and ack id and message as values.
275+
processes = dict()
276+
for message in response.received_messages:
277+
process = multiprocessing.Process(target=worker, args=(message,))
278+
processes[process] = (message.ack_id, message.message.data)
279+
process.start()
280+
281+
while processes:
282+
for process, (ack_id, msg_data) in processes.items():
283+
# If the process is still running, reset the ack deadline as
284+
# specified by ACK_DEADLINE once every while as specified
285+
# by SLEEP_TIME.
286+
if process.is_alive():
287+
# `ack_deadline_seconds` must be between 10 to 600.
288+
subscriber.modify_ack_deadline(subscription_path,
289+
[ack_id], ack_deadline_seconds=ACK_DEADLINE)
290+
logger.info('{}: Reset ack deadline for {} for {}s'.format(
291+
time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))
292+
293+
# If the processs is finished, acknowledges using `ack_id`.
294+
else:
295+
subscriber.acknowledge(subscription_path, [ack_id])
296+
logger.info("{}: Acknowledged {}".format(
297+
time.strftime("%X", time.gmtime()), msg_data))
298+
processes.pop(process)
299+
300+
# If there are still processes running, sleeps the thread.
301+
if processes:
302+
time.sleep(SLEEP_TIME)
303+
304+
print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))
305+
# [END pubsub_subscriber_sync_pull_with_lease]
306+
307+
245308
def listen_for_errors(project, subscription_name):
246309
"""Receives messages and catches errors from a pull subscription."""
247310
# [START pubsub_subscriber_error_listener]
@@ -318,10 +381,15 @@ def callback(message):
318381
help=receive_messages_with_flow_control.__doc__)
319382
receive_with_flow_control_parser.add_argument('subscription_name')
320383

321-
receive_messages_synchronously_parser = subparsers.add_parser(
384+
synchronous_pull_parser = subparsers.add_parser(
322385
'receive-synchronously',
323-
help=receive_messages_synchronously.__doc__)
324-
receive_messages_synchronously_parser.add_argument('subscription_name')
386+
help=synchronous_pull.__doc__)
387+
synchronous_pull_parser.add_argument('subscription_name')
388+
389+
synchronous_pull_with_lease_management_parser = subparsers.add_parser(
390+
'receive-synchronously-with-lease',
391+
help=synchronous_pull_with_lease_management.__doc__)
392+
synchronous_pull_with_lease_management_parser.add_argument('subscription_name')
325393

326394
listen_for_errors_parser = subparsers.add_parser(
327395
'listen_for_errors', help=listen_for_errors.__doc__)
@@ -357,7 +425,10 @@ def callback(message):
357425
receive_messages_with_flow_control(
358426
args.project, args.subscription_name)
359427
elif args.command == 'receive-synchronously':
360-
receive_messages_synchronously(
428+
synchronous_pull(
429+
args.project, args.subscription_name)
430+
elif args.command == 'receive-synchronously-with-lease':
431+
synchronous_pull_with_lease_management(
361432
args.project, args.subscription_name)
362433
elif args.command == 'listen_for_errors':
363434
listen_for_errors(args.project, args.subscription_name)

‎pubsub/cloud-client/subscriber_test.py

Copy file name to clipboardExpand all lines: pubsub/cloud-client/subscriber_test.py
+53-15Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
PROJECT = os.environ['GCLOUD_PROJECT']
2626
TOPIC = 'subscription-test-topic'
2727
SUBSCRIPTION = 'subscription-test-subscription'
28+
SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1'
29+
SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2'
2830
ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT)
2931

3032

@@ -67,6 +69,36 @@ def subscription(subscriber_client, topic):
6769
yield subscription_path
6870

6971

72+
@pytest.fixture
73+
def subscription_sync1(subscriber_client, topic):
74+
subscription_sync_path = subscriber_client.subscription_path(
75+
PROJECT, SUBSCRIPTION_SYNC1)
76+
77+
try:
78+
subscriber_client.delete_subscription(subscription_sync_path)
79+
except Exception:
80+
pass
81+
82+
subscriber_client.create_subscription(subscription_sync_path, topic=topic)
83+
84+
yield subscription_sync_path
85+
86+
87+
@pytest.fixture
88+
def subscription_sync2(subscriber_client, topic):
89+
subscription_sync_path = subscriber_client.subscription_path(
90+
PROJECT, SUBSCRIPTION_SYNC2)
91+
92+
try:
93+
subscriber_client.delete_subscription(subscription_sync_path)
94+
except Exception:
95+
pass
96+
97+
subscriber_client.create_subscription(subscription_sync_path, topic=topic)
98+
99+
yield subscription_sync_path
100+
101+
70102
def test_list_in_topic(subscription, capsys):
71103
@eventually_consistent.call
72104
def _():
@@ -160,6 +192,27 @@ def test_receive(publisher_client, topic, subscription, capsys):
160192
assert 'Message 1' in out
161193

162194

195+
def test_receive_synchronously(
196+
publisher_client, topic, subscription_sync1, capsys):
197+
_publish_messages(publisher_client, topic)
198+
199+
subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1)
200+
201+
out, _ = capsys.readouterr()
202+
assert 'Done.' in out
203+
204+
205+
def test_receive_synchronously_with_lease(
206+
publisher_client, topic, subscription_sync2, capsys):
207+
_publish_messages(publisher_client, topic)
208+
209+
subscriber.synchronous_pull_with_lease_management(
210+
PROJECT, SUBSCRIPTION_SYNC2)
211+
212+
out, _ = capsys.readouterr()
213+
assert 'Done.' in out
214+
215+
163216
def test_receive_with_custom_attributes(
164217
publisher_client, topic, subscription, capsys):
165218
_publish_messages_with_custom_attributes(publisher_client, topic)
@@ -188,18 +241,3 @@ def test_receive_with_flow_control(
188241
assert 'Listening' in out
189242
assert subscription in out
190243
assert 'Message 1' in out
191-
192-
193-
def test_receive_synchronously(
194-
publisher_client, topic, subscription, capsys):
195-
_publish_messages(publisher_client, topic)
196-
197-
with _make_sleep_patch():
198-
with pytest.raises(RuntimeError, match='sigil'):
199-
subscriber.receive_messages_with_flow_control(
200-
PROJECT, SUBSCRIPTION)
201-
202-
out, _ = capsys.readouterr()
203-
assert 'Message 1' in out
204-
assert 'Message 2' in out
205-
assert 'Message 3' in out

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.