4
4
import sys
5
5
import unittest
6
6
7
+ try :
8
+ from unittest import mock
9
+ except ImportError :
10
+ import mock
11
+ try :
12
+ from unittest .mock import patch
13
+ except ImportError :
14
+ from mock import patch
15
+
16
+
17
+
7
18
import fluent .asynchandler
8
19
import fluent .handler
9
20
from tests import mockserver
@@ -309,3 +320,68 @@ def test_simple(self):
309
320
eq ('userB' , el [2 ]['to' ])
310
321
self .assertTrue (el [1 ])
311
322
self .assertTrue (isinstance (el [1 ], int ))
323
+
324
+
325
+ class QueueOverflowException (BaseException ):
326
+ pass
327
+
328
+
329
+ def queue_overflow_handler (discarded_bytes ):
330
+ raise QueueOverflowException (discarded_bytes )
331
+
332
+
333
+ class TestHandlerWithCircularQueueHandler (unittest .TestCase ):
334
+ Q_SIZE = 1
335
+
336
+ def setUp (self ):
337
+ super (TestHandlerWithCircularQueueHandler , self ).setUp ()
338
+ self ._server = mockserver .MockRecvServer ('localhost' )
339
+ self ._port = self ._server .port
340
+
341
+ def tearDown (self ):
342
+ self ._server .close ()
343
+
344
+ def get_handler_class (self ):
345
+ # return fluent.handler.FluentHandler
346
+ return fluent .asynchandler .FluentHandler
347
+
348
+ def test_simple (self ):
349
+ handler = self .get_handler_class ()('app.follow' , port = self ._port ,
350
+ queue_maxsize = self .Q_SIZE ,
351
+ queue_circular = True ,
352
+ queue_overflow_handler = queue_overflow_handler )
353
+ with handler :
354
+ def custom_full_queue ():
355
+ handler .sender ._queue .put (b'Mock' , block = True )
356
+ return True
357
+
358
+ with patch .object (fluent .asynchandler .asyncsender .Queue , 'full' , mock .Mock (side_effect = custom_full_queue )):
359
+ self .assertEqual (handler .sender .queue_circular , True )
360
+ self .assertEqual (handler .sender .queue_maxsize , self .Q_SIZE )
361
+
362
+ logging .basicConfig (level = logging .INFO )
363
+ log = logging .getLogger ('fluent.test' )
364
+ handler .setFormatter (fluent .handler .FluentRecordFormatter ())
365
+ log .addHandler (handler )
366
+
367
+ exc_counter = 0
368
+
369
+ try :
370
+ log .info ({'cnt' : 1 , 'from' : 'userA' , 'to' : 'userB' })
371
+ except QueueOverflowException :
372
+ exc_counter += 1
373
+
374
+ try :
375
+ log .info ({'cnt' : 2 , 'from' : 'userA' , 'to' : 'userB' })
376
+ except QueueOverflowException :
377
+ exc_counter += 1
378
+
379
+ try :
380
+ log .info ({'cnt' : 3 , 'from' : 'userA' , 'to' : 'userB' })
381
+ except QueueOverflowException :
382
+ exc_counter += 1
383
+
384
+ # we can't be sure to have exception in every case due to multithreading,
385
+ # so we can test only for a cautelative condition here
386
+ print ('Exception raised: {} (expected 3)' .format (exc_counter ))
387
+ assert exc_counter >= 0
0 commit comments