@@ -525,35 +525,29 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa
525
525
class _QueueShutdownTestMixin :
526
526
q_class = None
527
527
528
- async def _get (self , q , go , results ):
529
- await go .wait ()
530
- try :
531
- msg = await q .get ()
532
- results .append (True )
533
- return msg
534
- except asyncio .QueueShutDown :
535
- results .append (False )
536
- return False
528
+ async def asyncSetUp (self ):
529
+ await super ().asyncSetUp ()
530
+ self .delay = 0.001
537
531
538
- async def _get_shutdown (self , q , go , results ):
532
+ async def _get (self , q , go , results , shutdown = False ):
539
533
await go .wait ()
540
534
try :
541
535
msg = await q .get ()
542
- results .append (False )
536
+ results .append (not shutdown )
543
537
return msg
544
538
except asyncio .QueueShutDown :
545
- results .append (True )
546
- return False
539
+ results .append (shutdown )
540
+ return shutdown
547
541
548
- async def _get_nowait (self , q , go , results ):
542
+ async def _get_nowait (self , q , go , results , shutdown = False ):
549
543
await go .wait ()
550
544
try :
551
545
msg = q .get_nowait ()
552
- results .append (True )
546
+ results .append (not shutdown )
553
547
return msg
554
548
except asyncio .QueueShutDown :
555
- results .append (False )
556
- return False
549
+ results .append (shutdown )
550
+ return shutdown
557
551
558
552
async def _get_task_done (self , q , go , results ):
559
553
await go .wait ()
@@ -566,64 +560,45 @@ async def _get_task_done(self, q, go, results):
566
560
results .append (False )
567
561
return False
568
562
569
- async def _get_nowait_shutdown (self , q , go , results ):
570
- await go .wait ()
571
- try :
572
- msg = q .get_nowait ()
573
- results .append (False )
574
- except asyncio .QueueShutDown :
575
- results .append (True )
576
- return True
577
-
578
- async def _put_shutdown (self , q , go , msg , results ):
563
+ async def _put (self , q , go , msg , results , shutdown = False ):
579
564
await go .wait ()
580
565
try :
581
566
await q .put (msg )
582
- results .append (False )
567
+ results .append (not shutdown )
568
+ return not shutdown
583
569
except asyncio .QueueShutDown :
584
- results .append (True )
585
- return msg
570
+ results .append (shutdown )
571
+ return shutdown
586
572
587
- async def _put_nowait_shutdown (self , q , go , msg , results ):
573
+ async def _put_nowait (self , q , go , msg , results , shutdown = False ):
588
574
await go .wait ()
589
575
try :
590
576
q .put_nowait (msg )
591
577
results .append (False )
578
+ return not shutdown
592
579
except asyncio .QueueShutDown :
593
580
results .append (True )
594
- return msg
581
+ return shutdown
595
582
596
583
async def _shutdown (self , q , go , immediate ):
597
- await asyncio .sleep (0.001 )
598
584
q .shutdown (immediate )
599
- await asyncio .sleep (0.001 )
585
+ await asyncio .sleep (self . delay )
600
586
go .set ()
601
- await asyncio .sleep (0.001 )
587
+ await asyncio .sleep (self . delay )
602
588
603
- async def _join (self , q , go , results ):
604
- await go .wait ()
589
+ async def _join (self , q , results , shutdown = False ):
605
590
try :
606
591
await q .join ()
607
- results .append (True )
592
+ results .append (not shutdown )
608
593
return True
609
594
except asyncio .QueueShutDown :
610
- results .append (False )
611
- return False
612
-
613
- async def _join_shutdown (self , q , go , results ):
614
- await go .wait ()
615
- try :
616
- await q .join ()
617
- results .append (False )
595
+ results .append (shutdown )
618
596
return False
619
- except asyncio .QueueShutDown :
620
- results .append (True )
621
- return True
622
597
except asyncio .CancelledError :
623
- results .append (True )
598
+ results .append (shutdown )
624
599
raise
625
600
626
- async def test_empty (self ):
601
+ async def test_shutdown_empty (self ):
627
602
q = self .q_class ()
628
603
q .shutdown ()
629
604
with self .assertRaises (
@@ -635,7 +610,7 @@ async def test_empty(self):
635
610
):
636
611
await q .get ()
637
612
638
- async def test_nonempty (self ):
613
+ async def test_shutdown_nonempty (self ):
639
614
q = self .q_class ()
640
615
q .put_nowait ("data" )
641
616
q .shutdown ()
@@ -645,7 +620,7 @@ async def test_nonempty(self):
645
620
):
646
621
await q .get ()
647
622
648
- async def test_immediate (self ):
623
+ async def test_shutdown_immediate (self ):
649
624
q = self .q_class ()
650
625
q .put_nowait ("data" )
651
626
q .shutdown (immediate = True )
@@ -679,8 +654,46 @@ async def test_shutdown_allowed_transitions(self):
679
654
q .shutdown (immediate = False )
680
655
self .assertNotEqual ("shutdown" , q ._shutdown_state .value )
681
656
657
+ async def _shutdown_all_methods (self , immediate ):
658
+ q = asyncio .Queue ()
659
+ await q .put ("L" )
660
+ q .put_nowait ("O" )
661
+ q .shutdown (immediate )
662
+ with self .assertRaises (asyncio .QueueShutDown ):
663
+ await q .put ("E" )
664
+ with self .assertRaises (asyncio .QueueShutDown ):
665
+ q .put_nowait ("W" )
666
+
667
+ if immediate :
668
+ with self .assertRaises (asyncio .QueueShutDown ):
669
+ await q .get ()
670
+ with self .assertRaises (asyncio .QueueShutDown ):
671
+ q .get_nowait ()
672
+ with self .assertRaises (asyncio .QueueShutDown ):
673
+ q .task_done ()
674
+ with self .assertRaises (asyncio .QueueShutDown ):
675
+ await q .join ()
676
+ else :
677
+ self .assertIn (await q .get (), "LO" )
678
+ q .task_done ()
679
+ self .assertIn (q .get_nowait (), "LO" )
680
+ q .task_done ()
681
+ await q .join ()
682
+ # on shutdown(immediate=False)
683
+ # when queue is empty, should raise ShutDown Exception
684
+ with self .assertRaises (asyncio .QueueShutDown ):
685
+ await q .get ()
686
+ with self .assertRaises (asyncio .QueueShutDown ):
687
+ q .get_nowait ()
688
+
689
+ async def test_shutdown_all_methods (self ):
690
+ return await self ._shutdown_all_methods (False )
691
+
692
+ async def test_shutdown_immediate_get (self ):
693
+ return await self ._shutdown_all_methods (True )
694
+
682
695
async def _shutdown_putters (self , immediate ):
683
- delay = 0.001
696
+ delay = self . delay
684
697
q = self .q_class (2 )
685
698
results = []
686
699
await q .put ("E" )
@@ -702,11 +715,13 @@ async def test_shutdown_immediate_putters_deque(self):
702
715
return await self ._shutdown_putters (True )
703
716
704
717
async def _shutdown_getters (self , immediate ):
705
- delay = 0.001
718
+ delay = self . delay
706
719
q = self .q_class (1 )
707
720
results = []
708
721
await q .put ("Y" )
722
+ nb = q .qsize ()
709
723
# queue full
724
+
710
725
asyncio .create_task (q .get ())
711
726
await asyncio .sleep (delay )
712
727
t = asyncio .create_task (q .get ())
@@ -723,14 +738,16 @@ async def _shutdown_getters(self, immediate):
723
738
q .shutdown (immediate )
724
739
await asyncio .sleep (delay )
725
740
self .assertTrue (q ._getters )
741
+ self .assertEqual (q ._unfinished_tasks , nb )
742
+
726
743
727
744
async def test_shutdown_getters_deque (self ):
728
745
return await self ._shutdown_getters (False )
729
746
730
747
async def test_shutdown_immediate_getters_deque (self ):
731
748
return await self ._shutdown_getters (True )
732
749
733
- async def _shutdown_get_nowait (self , immediate ):
750
+ async def _shutdown_get (self , immediate ):
734
751
q = self .q_class (2 )
735
752
results = []
736
753
go = asyncio .Event ()
@@ -741,11 +758,13 @@ async def _shutdown_get_nowait(self, immediate):
741
758
742
759
if immediate :
743
760
coros = (
744
- (self ._get_nowait_shutdown (q , go , results )),
745
- (self ._get_nowait_shutdown (q , go , results )),
761
+ (self ._get (q , go , results , shutdown = True )),
762
+ (self ._get_nowait (q , go , results , shutdown = True )),
746
763
)
747
764
else :
748
765
coros = (
766
+ # one of these tasks shoud raise Shutdown
767
+ (self ._get (q , go , results )),
749
768
(self ._get_nowait (q , go , results )),
750
769
(self ._get_nowait (q , go , results )),
751
770
)
@@ -754,20 +773,18 @@ async def _shutdown_get_nowait(self, immediate):
754
773
t .append (asyncio .create_task (coro ))
755
774
t .append (asyncio .create_task (self ._shutdown (q , go , immediate )))
756
775
res = await asyncio .gather (* t )
757
-
758
- self .assertEqual (results , [True ]* len (coros ))
759
- self .assertEqual (len (q ._putters ), 0 )
760
776
if immediate :
761
- self .assertEqual (len (q ._getters ), 0 )
762
- self .assertEqual (q ._unfinished_tasks , nb )
777
+ self .assertEqual (results , [True ]* len (coros ))
778
+ else :
779
+ self .assertListEqual (sorted (results ), [False ] + [True ]* (len (coros )- 1 ))
763
780
764
- async def test_shutdown_get_nowait (self ):
765
- return await self ._shutdown_get_nowait (False )
781
+ async def test_shutdown_get (self ):
782
+ return await self ._shutdown_get (False )
766
783
767
- async def test_shutdown_immediate_get_nowait (self ):
768
- return await self ._shutdown_get_nowait (True )
784
+ async def test_shutdown_immediate_get (self ):
785
+ return await self ._shutdown_get (True )
769
786
770
- async def test_shutdown_get_task_done_join (self , immediate = False ):
787
+ async def test_shutdown_get_task_done_join (self ):
771
788
q = self .q_class (2 )
772
789
results = []
773
790
go = asyncio .Event ()
@@ -780,8 +797,8 @@ async def test_shutdown_get_task_done_join(self, immediate=False):
780
797
coros = (
781
798
(self ._get_task_done (q , go , results )),
782
799
(self ._get_task_done (q , go , results )),
783
- (self ._join (q , go , results )),
784
- (self ._join (q , go , results )),
800
+ (self ._join (q , results )),
801
+ (self ._join (q , results )),
785
802
)
786
803
t = []
787
804
for coro in coros :
@@ -802,8 +819,8 @@ async def _shutdown_put(self, immediate):
802
819
# queue not empty
803
820
804
821
coros = (
805
- (self ._put_shutdown (q , go , "Y" , results )),
806
- (self ._put_nowait_shutdown (q , go , "D" , results )),
822
+ (self ._put (q , go , "Y" , results , shutdown = True )),
823
+ (self ._put_nowait (q , go , "D" , results , shutdown = True )),
807
824
)
808
825
t = []
809
826
for coro in coros :
@@ -835,9 +852,9 @@ async def _cancel_join_task(q, delay, t):
835
852
q ._finished .set ()
836
853
837
854
coros = (
838
- (self ._put_shutdown (q , go , "E" , results )),
839
- (self ._put_nowait_shutdown (q , go , "W" , results )),
840
- (self ._join_shutdown (q , go , results )),
855
+ (self ._put (q , go , "E" , results , shutdown = True )),
856
+ (self ._put_nowait (q , go , "W" , results , shutdown = True )),
857
+ (self ._join (q , results , shutdown = True )),
841
858
)
842
859
t = []
843
860
for coro in coros :
@@ -855,7 +872,7 @@ async def _cancel_join_task(q, delay, t):
855
872
self .assertEqual (results , [True ]* len (coros ))
856
873
self .assertTrue (q ._finished .is_set ())
857
874
858
- async def test_shutdown_put_and_join (self ):
875
+ async def test_shutdown_put_join (self ):
859
876
return await self ._shutdown_put_join (False )
860
877
861
878
async def test_shutdown_immediate_put_and_join (self ):
0 commit comments