Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6c7677e

Browse filesBrowse files
author
Gurov Ilya
authored
refactor: incorporate will_accept() checks into publish() (#108)
1 parent 0132a46 commit 6c7677e
Copy full SHA for 6c7677e

File tree

Expand file treeCollapse file tree

5 files changed

+47
-69
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+47
-69
lines changed
Open diff view settings
Collapse file

‎google/cloud/pubsub_v1/publisher/_batch/base.py‎

Copy file name to clipboardExpand all lines: google/cloud/pubsub_v1/publisher/_batch/base.py
-26Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -109,32 +109,6 @@ def status(self):
109109
"""
110110
raise NotImplementedError
111111

112-
def will_accept(self, message):
113-
"""Return True if the batch is able to accept the message.
114-
115-
In concurrent implementations, the attributes on the current batch
116-
may be modified by other workers. With this in mind, the caller will
117-
likely want to hold a lock that will make sure the state remains
118-
the same after the "will accept?" question is answered.
119-
120-
Args:
121-
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.
122-
123-
Returns:
124-
bool: Whether this batch can accept the message.
125-
"""
126-
# If this batch is not accepting messages generally, return False.
127-
if self.status != BatchStatus.ACCEPTING_MESSAGES:
128-
return False
129-
130-
# If this message will make the batch exceed the ``max_messages``
131-
# setting, return False.
132-
if len(self.messages) >= self.settings.max_messages:
133-
return False
134-
135-
# Okay, everything is good.
136-
return True
137-
138112
def cancel(self, cancellation_reason):
139113
"""Complete pending futures with an exception.
140114
Collapse file

‎google/cloud/pubsub_v1/publisher/_batch/thread.py‎

Copy file name to clipboardExpand all lines: google/cloud/pubsub_v1/publisher/_batch/thread.py
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ def publish(self, message):
333333
self._status != base.BatchStatus.ERROR
334334
), "Publish after stop() or publish error."
335335

336-
if not self.will_accept(message):
337-
return future
336+
if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
337+
return
338338

339339
size_increase = types.PublishRequest(messages=[message]).ByteSize()
340340

Collapse file

‎tests/unit/pubsub_v1/publisher/batch/test_base.py‎

Copy file name to clipboardExpand all lines: tests/unit/pubsub_v1/publisher/batch/test_base.py
-30Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,3 @@ def test_len():
4646
assert len(batch) == 0
4747
batch.publish(types.PubsubMessage(data=b"foo"))
4848
assert len(batch) == 1
49-
50-
51-
def test_will_accept():
52-
batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES)
53-
message = types.PubsubMessage()
54-
assert batch.will_accept(message) is True
55-
56-
57-
def test_will_accept_oversize():
58-
batch = create_batch(
59-
settings=types.BatchSettings(max_bytes=10),
60-
status=BatchStatus.ACCEPTING_MESSAGES,
61-
)
62-
message = types.PubsubMessage(data=b"abcdefghijklmnopqrstuvwxyz")
63-
assert batch.will_accept(message) is True
64-
65-
66-
def test_will_not_accept_status():
67-
batch = create_batch(status="talk to the hand")
68-
message = types.PubsubMessage()
69-
assert batch.will_accept(message) is False
70-
71-
72-
def test_will_not_accept_number():
73-
batch = create_batch(
74-
settings=types.BatchSettings(max_messages=-1),
75-
status=BatchStatus.ACCEPTING_MESSAGES,
76-
)
77-
message = types.PubsubMessage(data=b"abc")
78-
assert batch.will_accept(message) is False
Collapse file

‎tests/unit/pubsub_v1/publisher/batch/test_thread.py‎

Copy file name to clipboardExpand all lines: tests/unit/pubsub_v1/publisher/batch/test_thread.py
+45-7Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,18 +287,56 @@ def test_publish_updating_batch_size():
287287
assert batch.size > 0 # I do not always trust protobuf.
288288

289289

290-
def test_publish_not_will_accept():
290+
def test_publish():
291+
batch = create_batch()
292+
message = types.PubsubMessage()
293+
future = batch.publish(message)
294+
295+
assert len(batch.messages) == 1
296+
assert batch._futures == [future]
297+
298+
299+
def test_publish_max_messages_zero():
291300
batch = create_batch(topic="topic_foo", max_messages=0)
292-
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
293301

294-
# Publish the message.
295302
message = types.PubsubMessage(data=b"foobarbaz")
303+
with mock.patch.object(batch, "commit") as commit:
304+
future = batch.publish(message)
305+
306+
assert future is not None
307+
assert len(batch.messages) == 1
308+
assert batch._futures == [future]
309+
commit.assert_called_once()
310+
311+
312+
def test_publish_max_messages_enforced():
313+
batch = create_batch(topic="topic_foo", max_messages=1)
314+
315+
message = types.PubsubMessage(data=b"foobarbaz")
316+
message2 = types.PubsubMessage(data=b"foobarbaz2")
317+
318+
future = batch.publish(message)
319+
future2 = batch.publish(message2)
320+
321+
assert future is not None
322+
assert future2 is None
323+
assert len(batch.messages) == 1
324+
assert len(batch._futures) == 1
325+
326+
327+
def test_publish_max_bytes_enforced():
328+
batch = create_batch(topic="topic_foo", max_bytes=15)
329+
330+
message = types.PubsubMessage(data=b"foobarbaz")
331+
message2 = types.PubsubMessage(data=b"foobarbaz2")
332+
296333
future = batch.publish(message)
334+
future2 = batch.publish(message2)
297335

298-
assert future is None
299-
assert batch.size == base_request_size
300-
assert batch.messages == []
301-
assert batch._futures == []
336+
assert future is not None
337+
assert future2 is None
338+
assert len(batch.messages) == 1
339+
assert len(batch._futures) == 1
302340

303341

304342
def test_publish_exceed_max_messages():
Collapse file

‎tests/unit/pubsub_v1/publisher/test_publisher_client.py‎

Copy file name to clipboardExpand all lines: tests/unit/pubsub_v1/publisher/test_publisher_client.py
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def test_publish():
135135
batch = mock.Mock(spec=client._batch_class)
136136

137137
# Set the mock up to claim indiscriminately that it accepts all messages.
138-
batch.will_accept.return_value = True
139138
batch.publish.side_effect = (future1, future2)
140139

141140
topic = "topic/path"
@@ -169,7 +168,6 @@ def test_publish_error_exceeding_flow_control_limits():
169168
client = publisher.Client(credentials=creds, publisher_options=publisher_options)
170169

171170
mock_batch = mock.Mock(spec=client._batch_class)
172-
mock_batch.will_accept.return_value = True
173171
topic = "topic/path"
174172
client._set_batch(topic, mock_batch)
175173

@@ -216,7 +214,6 @@ def test_publish_attrs_bytestring():
216214
# Use a mock in lieu of the actual batch class.
217215
batch = mock.Mock(spec=client._batch_class)
218216
# Set the mock up to claim indiscriminately that it accepts all messages.
219-
batch.will_accept.return_value = True
220217

221218
topic = "topic/path"
222219
client._set_batch(topic, batch)
@@ -431,7 +428,6 @@ def test_publish_with_ordering_key():
431428
future1.add_done_callback = mock.Mock(spec=["__call__"])
432429
future2.add_done_callback = mock.Mock(spec=["__call__"])
433430

434-
batch.will_accept.return_value = True
435431
batch.publish.side_effect = (future1, future2)
436432

437433
topic = "topic/path"

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.