Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 06085c4

Browse filesBrowse files
authored
feat: add flow control for message publishing (#96)
* feat: add publish flow control settings * Add flow control logic to publisher client * Add flow control support for multiple add() threads * Raise publish flow control errors through futures * Include load info in debug log messages * Remove incorrect comment in a test * Remove comment about an error not directly raised * Remove redundant check for reservation existence * Change exception for publishing too large a message * Add internal sanity check for byte reservations * Reword the docstring on flow control limits error
1 parent cf9e87c commit 06085c4
Copy full SHA for 06085c4

File tree

Expand file treeCollapse file tree

6 files changed

+837
-23
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

6 files changed

+837
-23
lines changed
Open diff view settings
Collapse file

‎google/cloud/pubsub_v1/publisher/client.py‎

Copy file name to clipboardExpand all lines: google/cloud/pubsub_v1/publisher/client.py
+24-1Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@
3131
from google.cloud.pubsub_v1 import types
3232
from google.cloud.pubsub_v1.gapic import publisher_client
3333
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
34+
from google.cloud.pubsub_v1.publisher import exceptions
35+
from google.cloud.pubsub_v1.publisher import futures
3436
from google.cloud.pubsub_v1.publisher._batch import thread
3537
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
3638
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
39+
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
3740

3841
__version__ = pkg_resources.get_distribution("google-cloud-pubsub").version
3942

@@ -93,7 +96,11 @@ class Client(object):
9396
9497
# Optional
9598
publisher_options = pubsub_v1.types.PublisherOptions(
96-
enable_message_ordering=False
99+
enable_message_ordering=False,
100+
flow_control=pubsub_v1.types.PublishFlowControl(
101+
message_limit=2000,
102+
limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
103+
),
97104
),
98105
99106
# Optional
@@ -198,6 +205,9 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs):
198205
# Thread created to commit all sequencers after a timeout.
199206
self._commit_thread = None
200207

208+
# The object controlling the message publishing flow
209+
self._flow_controller = FlowController(self.publisher_options.flow_control)
210+
201211
@classmethod
202212
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
203213
"""Creates an instance of this client using the provided credentials
@@ -364,6 +374,18 @@ def publish(self, topic, data, ordering_key="", **attrs):
364374
data=data, ordering_key=ordering_key, attributes=attrs
365375
)
366376

377+
# Messages should go through flow control to prevent excessive
378+
# queuing on the client side (depending on the settings).
379+
try:
380+
self._flow_controller.add(message)
381+
except exceptions.FlowControlLimitError as exc:
382+
future = futures.Future()
383+
future.set_exception(exc)
384+
return future
385+
386+
def on_publish_done(future):
387+
self._flow_controller.release(message)
388+
367389
with self._batch_lock:
368390
if self._is_stopped:
369391
raise RuntimeError("Cannot publish on a stopped publisher.")
@@ -372,6 +394,7 @@ def publish(self, topic, data, ordering_key="", **attrs):
372394

373395
# Delegate the publishing to the sequencer.
374396
future = sequencer.publish(message)
397+
future.add_done_callback(on_publish_done)
375398

376399
# Create a timer thread if necessary to enforce the batching
377400
# timeout.
Collapse file

‎google/cloud/pubsub_v1/publisher/exceptions.py‎

Copy file name to clipboardExpand all lines: google/cloud/pubsub_v1/publisher/exceptions.py
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ def __init__(self, ordering_key):
3838
super(PublishToPausedOrderingKeyException, self).__init__()
3939

4040

41+
class FlowControlLimitError(Exception):
    """An action resulted in exceeding the flow control limits.

    The publisher's flow controller raises this error when a message is
    rejected, i.e. when accepting it would exceed the configured limits, or
    when the message could never fit into the limits and would thus block
    forever.
    """
43+
44+
4145
__all__ = (
46+
"FlowControlLimitError",
4247
"MessageTooLargeError",
4348
"PublishError",
4449
"TimeoutError",
Collapse file
+297Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
# Copyright 2020, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import deque
16+
import logging
17+
import threading
18+
import warnings
19+
20+
from google.cloud.pubsub_v1 import types
21+
from google.cloud.pubsub_v1.publisher import exceptions
22+
23+
24+
_LOGGER = logging.getLogger(__name__)
25+
26+
27+
class _QuantityReservation(object):
    """A (partial) reservation of a quantifiable resource.

    Args:
        reserved (int):
            The amount of the resource that has already been set aside.
        needed (int):
            The total amount of the resource that is required.
    """

    def __init__(self, reserved, needed):
        self.reserved = reserved
        self.needed = needed

    def __repr__(self):
        # Makes reservations readable when inspecting state while debugging.
        return "{}(reserved={!r}, needed={!r})".format(
            type(self).__name__, self.reserved, self.needed
        )
34+
35+
class FlowController(object):
    """A class used to control the flow of messages passing through it.

    The controller tracks the number (and the total size) of the messages
    that have been added to it, but not yet released, and compares that load
    to the configured limits whenever a new message is added.

    Args:
        settings (~google.cloud.pubsub_v1.types.PublishFlowControl):
            Desired flow control configuration.
    """

    def __init__(self, settings):
        self._settings = settings

        # Load statistics. They represent the number of messages added, but not
        # yet released (and their total size).
        self._message_count = 0
        self._total_bytes = 0

        # A FIFO queue of threads blocked on adding a message, from first to last.
        # Only relevant if the configured limit exceeded behavior is BLOCK.
        self._waiting = deque()

        # Reservations of available flow control bytes by the waiting threads.
        # Each value is a _QuantityReservation instance.
        self._byte_reservations = dict()
        self._reserved_bytes = 0

        # The lock is used to protect all internal state (message and byte count,
        # waiting threads to add, etc.).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message):
        """Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                Raised when the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
                the message would exceed flow control limits, or when the desired action
                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
                the message would block forever against the flow control limits.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        # The size is needed in several places below, compute it only once.
        message_size = message.ByteSize()

        with self._operational_lock:
            if not self._would_overflow(message):
                self._message_count += 1
                self._total_bytes += message_size
                return

            # Adding a message would overflow, react.
            if (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.ERROR
            ):
                # Raising an error means rejecting a message, thus we do not
                # add anything to the existing load, but we do report the would-be
                # load if we accepted the message.
                load_info = self._load_info(
                    message_count=self._message_count + 1,
                    total_bytes=self._total_bytes + message_size,
                )
                error_msg = "Flow control limits would be exceeded - {}.".format(
                    load_info
                )
                raise exceptions.FlowControlLimitError(error_msg)

            assert (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.BLOCK
            )

            # Sanity check - if a message exceeds total flow control limits all
            # by itself, it would block forever, thus raise error.
            if (
                message_size > self._settings.byte_limit
                or self._settings.message_limit < 1
            ):
                load_info = self._load_info(message_count=1, total_bytes=message_size)
                error_msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever - {}.".format(load_info)
                )
                raise exceptions.FlowControlLimitError(error_msg)

            current_thread = threading.current_thread()

            while self._would_overflow(message):
                # Join the FIFO queue (once) and register how many bytes this
                # thread needs, so that release() can reserve them for it.
                if current_thread not in self._byte_reservations:
                    self._waiting.append(current_thread)
                    self._byte_reservations[current_thread] = _QuantityReservation(
                        reserved=0, needed=message_size
                    )

                _LOGGER.debug(
                    "Blocking until there is enough free capacity in the flow - %s.",
                    self._load_info(),
                )

                # Waiting releases the lock, and re-acquires it before returning.
                self._has_capacity.wait()

                _LOGGER.debug(
                    "Woke up from waiting on free capacity in the flow - %s.",
                    self._load_info(),
                )

            # Message accepted, increase the load and remove thread stats.
            self._message_count += 1
            self._total_bytes += message_size
            self._reserved_bytes -= self._byte_reservations[current_thread].reserved
            del self._byte_reservations[current_thread]
            self._waiting.remove(current_thread)

    def release(self, message):
        """Release a message from flow control.

        Releasing a message decreases the load, and any capacity freed is handed
        out to the threads blocked in :meth:`add`, in FIFO order.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            # Releasing a message decreases the load.
            self._message_count -= 1
            self._total_bytes -= message.ByteSize()

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                # Do not let a bogus release push the load below zero.
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            self._distribute_available_bytes()

            # If at least one thread waiting to add() can be unblocked, wake them up.
            if self._ready_to_unblock():
                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                self._has_capacity.notify_all()

    def _distribute_available_bytes(self):
        """Distribute available free capacity among the waiting threads in FIFO order.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes

        for thread in self._waiting:
            if available <= 0:
                break

            reservation = self._byte_reservations[thread]
            still_needed = reservation.needed - reservation.reserved

            # Sanity check for any internal inconsistencies.
            if still_needed < 0:
                msg = "Too many bytes reserved: {} / {}".format(
                    reservation.reserved, reservation.needed
                )
                warnings.warn(msg, category=RuntimeWarning)
                still_needed = 0

            # A thread never receives more than it still needs; whatever is left
            # is offered to the next thread in the queue.
            can_give = min(still_needed, available)
            reservation.reserved += can_give
            self._reserved_bytes += can_give
            available -= can_give

    def _ready_to_unblock(self):
        """Determine if any of the threads waiting to add a message can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.

        Returns:
            bool
        """
        if self._waiting:
            # It's enough to only check the head of the queue, because FIFO
            # distribution of any free capacity.
            reservation = self._byte_reservations[self._waiting[0]]
            return (
                reservation.reserved >= reservation.needed
                and self._message_count < self._settings.message_limit
            )

        return False

    def _would_overflow(self, message):
        """Determine if accepting a message would exceed flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Returns:
            bool
        """
        reservation = self._byte_reservations.get(threading.current_thread())

        if reservation:
            enough_reserved = reservation.reserved >= reservation.needed
        else:
            enough_reserved = False

        # Bytes reserved by the waiting threads count as taken, unless the
        # calling thread itself already holds a complete reservation.
        bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize()
        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
        msg_count_overflow = self._message_count + 1 > self._settings.message_limit

        return size_overflow or msg_count_overflow

    def _load_info(self, message_count=None, total_bytes=None, reserved_bytes=None):
        """Return the current flow control load information.

        The caller can optionally adjust some of the values to fit its reporting
        needs.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_count (Optional[int]):
                The value to override the current message count with.
            total_bytes (Optional[int]):
                The value to override the current total bytes with.
            reserved_bytes (Optional[int]):
                The value to override the current number of reserved bytes with.

        Returns:
            str
        """
        msg = "messages: {} / {}, bytes: {} / {} (reserved: {})"

        if message_count is None:
            message_count = self._message_count

        if total_bytes is None:
            total_bytes = self._total_bytes

        if reserved_bytes is None:
            reserved_bytes = self._reserved_bytes

        return msg.format(
            message_count,
            self._settings.message_limit,
            total_bytes,
            self._settings.byte_limit,
            reserved_bytes,
        )

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.