Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d1b81ba

Browse filesBrowse files
authored
Merge pull request #156 from pguz/queue_overflow_handler
Add queue overflow handler in asyncsender.
2 parents eb68481 + 478bd02 commit d1b81ba
Copy full SHA for d1b81ba

File tree

2 files changed

+87
-1
lines changed
Filter options

2 files changed

+87
-1
lines changed

‎fluent/asyncsender.py

Copy file name to clipboardExpand all lines: fluent/asyncsender.py
+11-1Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self,
5555
msgpack_kwargs=None,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
5757
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
queue_overflow_handler=None,
5859
**kwargs):
5960
"""
6061
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
@@ -66,6 +67,10 @@ def __init__(self,
6667
**kwargs)
6768
self._queue_maxsize = queue_maxsize
6869
self._queue_circular = queue_circular
70+
if queue_circular and queue_overflow_handler:
71+
self._queue_overflow_handler = queue_overflow_handler
72+
else:
73+
self._queue_overflow_handler = self._queue_overflow_handler_default
6974

7075
self._thread_guard = threading.Event() # This ensures visibility across all variables
7176
self._closed = False
@@ -109,9 +114,11 @@ def _send(self, bytes_):
109114
if self._queue_circular and self._queue.full():
110115
# discard oldest
111116
try:
112-
self._queue.get(block=False)
117+
discarded_bytes = self._queue.get(block=False)
113118
except Empty: # pragma: no cover
114119
pass
120+
else:
121+
self._queue_overflow_handler(discarded_bytes)
115122
try:
116123
self._queue.put(bytes_, block=(not self._queue_circular))
117124
except Full: # pragma: no cover
@@ -132,5 +139,8 @@ def _send_loop(self):
132139
finally:
133140
self._close()
134141

142+
def _queue_overflow_handler_default(self, discarded_bytes):
143+
pass
144+
135145
def __exit__(self, exc_type, exc_val, exc_tb):
136146
self.close()

‎tests/test_asynchandler.py

Copy file name to clipboardExpand all lines: tests/test_asynchandler.py
+76Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@
44
import sys
55
import unittest
66

7+
try:
8+
from unittest import mock
9+
except ImportError:
10+
import mock
11+
try:
12+
from unittest.mock import patch
13+
except ImportError:
14+
from mock import patch
15+
16+
17+
718
import fluent.asynchandler
819
import fluent.handler
920
from tests import mockserver
@@ -309,3 +320,68 @@ def test_simple(self):
309320
eq('userB', el[2]['to'])
310321
self.assertTrue(el[1])
311322
self.assertTrue(isinstance(el[1], int))
323+
324+
325+
class QueueOverflowException(BaseException):
326+
pass
327+
328+
329+
def queue_overflow_handler(discarded_bytes):
330+
raise QueueOverflowException(discarded_bytes)
331+
332+
333+
class TestHandlerWithCircularQueueHandler(unittest.TestCase):
334+
Q_SIZE = 1
335+
336+
def setUp(self):
337+
super(TestHandlerWithCircularQueueHandler, self).setUp()
338+
self._server = mockserver.MockRecvServer('localhost')
339+
self._port = self._server.port
340+
341+
def tearDown(self):
342+
self._server.close()
343+
344+
def get_handler_class(self):
345+
# return fluent.handler.FluentHandler
346+
return fluent.asynchandler.FluentHandler
347+
348+
def test_simple(self):
349+
handler = self.get_handler_class()('app.follow', port=self._port,
350+
queue_maxsize=self.Q_SIZE,
351+
queue_circular=True,
352+
queue_overflow_handler=queue_overflow_handler)
353+
with handler:
354+
def custom_full_queue():
355+
handler.sender._queue.put(b'Mock', block=True)
356+
return True
357+
358+
with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)):
359+
self.assertEqual(handler.sender.queue_circular, True)
360+
self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE)
361+
362+
logging.basicConfig(level=logging.INFO)
363+
log = logging.getLogger('fluent.test')
364+
handler.setFormatter(fluent.handler.FluentRecordFormatter())
365+
log.addHandler(handler)
366+
367+
exc_counter = 0
368+
369+
try:
370+
log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
371+
except QueueOverflowException:
372+
exc_counter += 1
373+
374+
try:
375+
log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
376+
except QueueOverflowException:
377+
exc_counter += 1
378+
379+
try:
380+
log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
381+
except QueueOverflowException:
382+
exc_counter += 1
383+
384+
# we can't be sure to have exception in every case due to multithreading,
385+
# so we can test only for a cautelative condition here
386+
print('Exception raised: {} (expected 3)'.format(exc_counter))
387+
assert exc_counter >= 0

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.