@@ -55,6 +55,7 @@ def __init__(self,
55
55
msgpack_kwargs = None ,
56
56
queue_maxsize = DEFAULT_QUEUE_MAXSIZE ,
57
57
queue_circular = DEFAULT_QUEUE_CIRCULAR ,
58
+ queue_overflow_handler = None ,
58
59
** kwargs ):
59
60
"""
60
61
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
@@ -66,6 +67,7 @@ def __init__(self,
66
67
** kwargs )
67
68
self ._queue_maxsize = queue_maxsize
68
69
self ._queue_circular = queue_circular
70
+ self ._queue_overflow_handler = queue_overflow_handler
69
71
70
72
self ._thread_guard = threading .Event () # This ensures visibility across all variables
71
73
self ._closed = False
@@ -109,7 +111,8 @@ def _send(self, bytes_):
109
111
if self ._queue_circular and self ._queue .full ():
110
112
# discard oldest
111
113
try :
112
- self ._queue .get (block = False )
114
+ discarded_bytes = self ._queue .get (block = False )
115
+ self ._call_queue_overflow_handler (discarded_bytes )
113
116
except Empty : # pragma: no cover
114
117
pass
115
118
try :
@@ -132,5 +135,13 @@ def _send_loop(self):
132
135
finally :
133
136
self ._close ()
134
137
138
+ def _call_queue_overflow_handler (self , discarded_bytes ):
139
+ try :
140
+ if self ._queue_overflow_handler :
141
+ self ._queue_overflow_handler (discarded_bytes )
142
+ except Exception as e :
143
+ # User should care any exception in handler
144
+ pass
145
+
135
146
def __exit__ (self , exc_type , exc_val , exc_tb ):
136
147
self .close ()
0 commit comments