@@ -71,43 +71,41 @@ def test_map_timeout(self):
71
71
72
72
self .assertEqual ([None , None ], results )
73
73
74
- def test_map_args (self ):
74
+ def test_map_with_buffersize (self ):
75
75
with self .assertRaisesRegex (ValueError , "buffersize must be None or >= 1." ):
76
76
self .executor .map (bool , [], buffersize = 0 )
77
77
with self .assertRaisesRegex (
78
78
ValueError , "cannot specify both buffersize and timeout."
79
79
):
80
80
self .executor .map (bool , [], timeout = 1 , buffersize = 1 )
81
81
82
- def test_map_infinite_iterable (self ):
82
+ it = range (4 )
83
+ self .assertEqual (
84
+ list (self .executor .map (str , it , buffersize = 1 )),
85
+ list (map (str , it )),
86
+ )
87
+
88
+ def test_map_with_buffersize_on_infinite_iterable (self ):
83
89
results = self .executor .map (str , itertools .count (1 ), buffersize = 1 )
84
90
self .assertEqual (next (iter (results )), "1" )
85
91
86
- def test_map_buffersize (self ):
87
- manager = Manager ()
92
+ def test_map_with_buffersize_on_iterable_smaller_than_buffer (self ):
93
+ it = range (2 )
94
+ results = self .executor .map (str , it , buffersize = 10 )
95
+ self .assertListEqual (list (results ), list (map (str , it )))
88
96
89
- for buffersize , iterable_size in [
90
- (1 , 5 ),
91
- (5 , 5 ),
92
- (10 , 5 ),
93
- ]:
94
- iterable = range (iterable_size )
95
- processed_elements = manager .list ()
96
-
97
- iterator = self .executor .map (
98
- processed_elements .append , iterable , buffersize = buffersize
99
- )
100
- time .sleep (1 ) # wait for buffered futures to finish
101
- self .assertSetEqual (
102
- set (processed_elements ),
103
- set (range (min (buffersize , iterable_size ))),
104
- )
105
- next (iterator )
106
- time .sleep (1 ) # wait for the created future to finish
107
- self .assertSetEqual (
108
- set (processed_elements ),
109
- set (range (min (buffersize + 1 , iterable_size ))),
110
- )
97
+ def test_map_with_buffersize_when_buffer_becomes_full (self ):
98
+ manager = Manager ()
99
+ iterable = range (8 )
100
+ buffersize = 4
101
+ buffered_results = manager .list ()
102
+ self .executor .map (buffered_results .append , iterable , buffersize = buffersize )
103
+ self .executor .shutdown (wait = True )
104
+ self .assertSetEqual (
105
+ set (buffered_results ),
106
+ set (itertools .islice (iterable , buffersize )),
107
+ msg = "only the first `buffersize` elements should be processed" ,
108
+ )
111
109
112
110
def test_shutdown_race_issue12456 (self ):
113
111
# Issue #12456: race condition at shutdown where trying to post a
0 commit comments