Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 50119c5

Browse filesBrowse files
author
Paweł Guz
committed
Add queue overflow handler in asyncsender.
1 parent a37f313 commit 50119c5
Copy full SHA for 50119c5

File tree

1 file changed

+12
-1
lines changed
Filter options

1 file changed

+12
-1
lines changed

‎fluent/asyncsender.py

Copy file name to clipboardExpand all lines: fluent/asyncsender.py
+12-1Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self,
5555
msgpack_kwargs=None,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
5757
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
queue_overflow_handler=None,
5859
**kwargs):
5960
"""
6061
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
@@ -66,6 +67,7 @@ def __init__(self,
6667
**kwargs)
6768
self._queue_maxsize = queue_maxsize
6869
self._queue_circular = queue_circular
70+
self._queue_overflow_handler = queue_overflow_handler
6971

7072
self._thread_guard = threading.Event() # This ensures visibility across all variables
7173
self._closed = False
@@ -109,7 +111,8 @@ def _send(self, bytes_):
109111
if self._queue_circular and self._queue.full():
110112
# discard oldest
111113
try:
112-
self._queue.get(block=False)
114+
discarded_bytes = self._queue.get(block=False)
115+
self._call_queue_overflow_handler(discarded_bytes)
113116
except Empty: # pragma: no cover
114117
pass
115118
try:
@@ -132,5 +135,13 @@ def _send_loop(self):
132135
finally:
133136
self._close()
134137

138+
def _call_queue_overflow_handler(self, discarded_bytes):
139+
try:
140+
if self._queue_overflow_handler:
141+
self._queue_overflow_handler(discarded_bytes)
142+
except Exception as e:
143+
# User should care any exception in handler
144+
pass
145+
135146
def __exit__(self, exc_type, exc_val, exc_tb):
136147
self.close()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.