@@ -570,41 +570,49 @@ async def test_shutdown_repr(self):
570
570
self .assertIn ("shutdown-immediate" , repr (q ))
571
571
572
572
async def test_get_shutdown_immediate (self ):
573
+ q = self .q_class ()
573
574
results = []
574
- maxsize = 2
575
- delay = 1e-3
575
+ go = asyncio .Event ()
576
576
577
- async def get_once (q ):
577
+ async def get_once (q , go ):
578
+ await go .wait ()
578
579
try :
579
580
msg = await q .get ()
580
581
results .append (False )
581
582
except asyncio .QueueShutDown :
582
583
results .append (True )
583
584
return True
584
585
585
- async def shutdown (q , delay , immediate ):
586
- await asyncio .sleep (delay )
586
+ async def shutdown (q , go , immediate ):
587
587
q .shutdown (immediate )
588
+ go .set ()
588
589
return True
589
590
590
- q = self .q_class (maxsize )
591
- t = [asyncio .create_task (get_once (q )) for _ in range (maxsize )]
592
- t += [asyncio .create_task (shutdown (q , delay , True ))]
591
+ tasks = (
592
+ (get_once , (q , go )),
593
+ (get_once , (q , go )),
594
+ )
595
+ t = []
596
+ for coro , params in tasks :
597
+ t .append (asyncio .create_task (coro (* params )))
598
+ t .append (asyncio .create_task (shutdown (q , go , True )))
593
599
res = await asyncio .gather (* t )
594
600
595
- self .assertEqual (results , [True ]* maxsize )
601
+ self .assertEqual (results , [True ]* len ( tasks ) )
596
602
597
603
598
- async def test_put_shutdown (self ):
599
- maxsize = 2
604
+ async def _put_shutdown (self , immediate ):
605
+ q = self . q_class ( 2 )
600
606
results = []
601
607
go = asyncio .Event ()
608
+ await q .put ("Y" )
609
+ await q .put ("D" )
610
+ # queue fulled
602
611
603
- async def put_twice (q , go , msg ):
604
- await q .put (msg )
612
+ async def put_once (q , go , msg ):
605
613
await go .wait ()
606
614
try :
607
- await q .put (msg + maxsize )
615
+ await q .put (msg )
608
616
results .append (False )
609
617
except asyncio .QueueShutDown :
610
618
results .append (True )
@@ -614,24 +622,37 @@ async def shutdown(q, go, immediate):
614
622
q .shutdown (immediate )
615
623
go .set ()
616
624
617
- q = self .q_class (maxsize )
618
- t = [asyncio .create_task (put_twice (q , go , i + 1 )) for i in range (maxsize )]
619
- t += [asyncio .create_task (shutdown (q , go , False ))]
625
+ tasks = (
626
+ (put_once , (q , go , 100 )),
627
+ (put_once , (q , go , 200 )),
628
+ )
629
+ t = []
630
+ for coro , params in tasks :
631
+ t .append (asyncio .create_task (coro (* params )))
632
+ t .append (asyncio .create_task (shutdown (q , go , immediate )))
620
633
res = await asyncio .gather (* t )
621
634
622
- self .assertEqual (results , [True ]* maxsize )
635
+ self .assertEqual (results , [True ]* len ( tasks ) )
623
636
637
+ async def test_put_shutdown (self ):
638
+ return await self ._put_shutdown (False )
624
639
625
- async def test_put_and_join_shutdown (self ):
626
- maxsize = 2
640
+ async def test_put_shutdown_immediate (self ):
641
+ return await self ._put_shutdown (True )
642
+
643
+
644
+ async def _put_and_join_shutdown (self , immediate ):
645
+ q = self .q_class (2 )
627
646
results = []
628
647
go = asyncio .Event ()
648
+ await q .put ("Y" )
649
+ await q .put ("D" )
650
+ # queue fulled
629
651
630
- async def put_twice (q , go , msg ):
631
- await q .put (msg )
652
+ async def put_once (q , go , msg ):
632
653
await go .wait ()
633
654
try :
634
- await q .put (msg + 100 )
655
+ await q .put (msg )
635
656
results .append (False )
636
657
except asyncio .QueueShutDown :
637
658
results .append (True )
@@ -641,19 +662,31 @@ async def shutdown(q, go, immediate):
641
662
q .shutdown (immediate )
642
663
go .set ()
643
664
644
- async def join (q , delay ):
665
+ async def join (q , go ):
645
666
await go .wait ()
646
667
await q .join ()
647
668
results .append (True )
648
669
return True
649
670
650
- q = self .q_class (maxsize )
651
- t = [asyncio .create_task (put_twice (q , go , i + 1 )) for i in range (maxsize )]
652
- t += [asyncio .create_task (shutdown (q , go , True )),
653
- asyncio .create_task (join (q , go ))]
671
+ tasks = (
672
+ (put_once , (q , go , 'E' )),
673
+ (put_once , (q , go , 'W' )),
674
+ (join , (q , go )),
675
+ (join , (q , go )),
676
+ )
677
+ t = []
678
+ for coro , params in tasks :
679
+ t .append (asyncio .create_task (coro (* params )))
680
+ t .append (asyncio .create_task (shutdown (q , go , immediate )))
654
681
res = await asyncio .gather (* t )
655
682
656
- self .assertEqual (results , [True ]* (maxsize + 1 ))
683
+ self .assertEqual (results , [True ]* len (tasks ))
684
+
685
+ async def test_put_and_join_shutdown (self ):
686
+ return await self ._put_and_join_shutdown (False )
687
+
688
+ async def test_put_and_join_shutdown_immediate (self ):
689
+ return await self ._put_and_join_shutdown (True )
657
690
658
691
659
692
class QueueShutdownTests (
0 commit comments