From 45c3ec5327117f5b682c2989054997cec9c7c49a Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sun, 13 Oct 2024 00:31:39 +0100 Subject: [PATCH 01/42] bpo-29842: concurrent.futures.Executor.map: add buffersize param for lazy behavior --- Doc/library/concurrent.futures.rst | 12 +++-- Lib/concurrent/futures/_base.py | 32 +++++++++-- Lib/concurrent/futures/process.py | 11 ++-- Lib/test/test_concurrent_futures/test_pool.py | 53 +++++++++++++++++++ 4 files changed, 99 insertions(+), 9 deletions(-) create mode 100644 Lib/test/test_concurrent_futures/test_pool.py diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 5a950081a1c98d..e60ef18d2a6130 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -40,11 +40,13 @@ Executor Objects future = executor.submit(pow, 323, 1235) print(future.result()) - .. method:: map(fn, *iterables, timeout=None, chunksize=1) + .. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None) Similar to :func:`map(fn, *iterables) ` except: - * the *iterables* are collected immediately rather than lazily; + * the *iterables* are collected immediately rather than lazily, unless a + *buffersize* is specified: If the buffer is full, then the iteration + over *iterables* is paused until a result is yielded from the buffer. * *fn* is executed asynchronously and several calls to *fn* may be made concurrently. @@ -53,7 +55,8 @@ Executor Objects if :meth:`~iterator.__next__` is called and the result isn't available after *timeout* seconds from the original call to :meth:`Executor.map`. *timeout* can be an int or a float. If *timeout* is not specified or - ``None``, there is no limit to the wait time. + ``None``, there is no limit to the wait time. Incompatible with + *buffersize*. If a *fn* call raises an exception, then that exception will be raised when its value is retrieved from the iterator. @@ -70,6 +73,9 @@ Executor Objects .. 
versionchanged:: 3.5 Added the *chunksize* argument. + .. versionchanged:: 3.15 + Added the *buffersize* argument. + .. method:: shutdown(wait=True, *, cancel_futures=False) Signal the executor that it should free any resources that it is using diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 707fcdfde79acd..8be40236b8f391 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,10 +4,12 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections +from itertools import islice import logging import threading import time import types +import weakref FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' @@ -572,18 +574,22 @@ def submit(self, fn, /, *args, **kwargs): """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): """Returns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. + is no limit on the wait time. Incompatible with buffersize. chunksize: The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. + buffersize: The maximum number of results that can be buffered + before being yielded. If the buffer is full, the iteration over + iterables is paused until an element is yielded from the + buffer. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -594,10 +600,24 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): before the given timeout. Exception: If fn(*args) raises for any values. 
""" + if buffersize is not None and buffersize < 1: + raise ValueError("buffersize must be None or >= 1.") + + if buffersize is not None and timeout is not None: + raise ValueError("cannot specify both buffersize and timeout.") + if timeout is not None: end_time = timeout + time.monotonic() - fs = [self.submit(fn, *args) for args in zip(*iterables)] + args_iter = iter(zip(*iterables)) + if buffersize: + fs = collections.deque( + self.submit(fn, *args) for args in islice(args_iter, buffersize) + ) + else: + fs = [self.submit(fn, *args) for args in args_iter] + + executor_weakref = weakref.ref(self) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. @@ -606,6 +626,12 @@ def result_iterator(): # reverse to keep finishing order fs.reverse() while fs: + if ( + buffersize + and (executor := executor_weakref()) + and (args := next(args_iter, None)) + ): + fs.appendleft(executor.submit(fn, *args)) # Careful not to keep a reference to the popped future if timeout is None: yield _result_or_cancel(fs.pop()) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 42eee72bc1457f..a4a2b3054e5df8 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -805,17 +805,21 @@ def submit(self, fn, /, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): """Returns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. + is no limit on the wait time. Incompatible with buffersize. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. 
If set to one, the items in the list will be sent one at a time. + buffersize: The maximum number of results that can be buffered + before being yielded. If the buffer is full, the iteration over + iterables is paused until an element is yielded from the + buffer. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -831,7 +835,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), itertools.batched(zip(*iterables), chunksize), - timeout=timeout) + timeout=timeout, + buffersize=buffersize) return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True, *, cancel_futures=False): diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py new file mode 100644 index 00000000000000..ffceccf7b13d32 --- /dev/null +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -0,0 +1,53 @@ +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from multiprocessing import Manager +import time +import unittest + +from .util import BaseTestCase, setup_module + + +class PoolExecutorTest(BaseTestCase): + def test_map_buffersize(self): + manager = Manager() + for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor): + with ExecutorType(max_workers=1) as pool: + with self.assertRaisesRegex( + ValueError, "buffersize must be None or >= 1." + ): + pool.map(bool, [], buffersize=0) + with ExecutorType(max_workers=1) as pool: + with self.assertRaisesRegex( + ValueError, "cannot specify both buffersize and timeout." 
+ ): + pool.map(bool, [], timeout=1, buffersize=1) + + for buffersize, iterable_size in [ + (1, 5), + (5, 5), + (10, 5), + ]: + iterable = range(iterable_size) + processed_elements = manager.list() + with ExecutorType(max_workers=1) as pool: + iterator = pool.map( + processed_elements.append, iterable, buffersize=buffersize + ) + time.sleep(0.2) # wait for buffered futures to finish + self.assertSetEqual( + set(processed_elements), + set(range(min(buffersize, iterable_size))), + ) + next(iterator) + time.sleep(0.2) # wait for the created future to finish + self.assertSetEqual( + set(processed_elements), + set(range(min(buffersize + 1, iterable_size))), + ) + + +def setUpModule(): + setup_module() + + +if __name__ == "__main__": + unittest.main() From bfb2c5cb8712aab7e9de88055dc2a4c9cde224d4 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 17 Oct 2024 23:23:17 +0100 Subject: [PATCH 02/42] test_map_buffersize: 1s sleep --- Lib/test/test_concurrent_futures/test_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py index ffceccf7b13d32..6135a62ca4960b 100644 --- a/Lib/test/test_concurrent_futures/test_pool.py +++ b/Lib/test/test_concurrent_futures/test_pool.py @@ -32,13 +32,13 @@ def test_map_buffersize(self): iterator = pool.map( processed_elements.append, iterable, buffersize=buffersize ) - time.sleep(0.2) # wait for buffered futures to finish + time.sleep(1) # wait for buffered futures to finish self.assertSetEqual( set(processed_elements), set(range(min(buffersize, iterable_size))), ) next(iterator) - time.sleep(0.2) # wait for the created future to finish + time.sleep(1) # wait for the created future to finish self.assertSetEqual( set(processed_elements), set(range(min(buffersize + 1, iterable_size))), From 85396636834250de930e8de0b696a91fd7f052db Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 10:50:22 +0100 Subject: [PATCH 03/42] 
mention chunksize in ProcessPoolExecutor's buffersize docstring --- Lib/concurrent/futures/_base.py | 2 +- Lib/concurrent/futures/process.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 8be40236b8f391..c3797eec1349ed 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -588,7 +588,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): ThreadPoolExecutor. buffersize: The maximum number of results that can be buffered before being yielded. If the buffer is full, the iteration over - iterables is paused until an element is yielded from the + iterables is paused until a result is yielded from the buffer. Returns: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index a4a2b3054e5df8..3eba5a7295956d 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -816,9 +816,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. - buffersize: The maximum number of results that can be buffered + buffersize: The maximum number of result chunks that can be buffered before being yielded. If the buffer is full, the iteration over - iterables is paused until an element is yielded from the + iterables is paused until a result chunk is yielded from the buffer. 
Returns: From 022b8c63dd1c7914315c985f82c3f62f813e6a32 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 10:51:23 +0100 Subject: [PATCH 04/42] merge unittest into ExecutorTest --- Lib/test/test_concurrent_futures/executor.py | 40 ++++++++++++++ Lib/test/test_concurrent_futures/test_pool.py | 53 ------------------- 2 files changed, 40 insertions(+), 53 deletions(-) delete mode 100644 Lib/test/test_concurrent_futures/test_pool.py diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index b97d9ffd94b1f8..969085c21ad419 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -1,3 +1,5 @@ +import itertools +from multiprocessing import Manager import threading import time import weakref @@ -71,6 +73,44 @@ def test_map_timeout(self): self.assertEqual([None, None], results) + def test_map_args(self): + with self.assertRaisesRegex(ValueError, "buffersize must be None or >= 1."): + self.executor.map(bool, [], buffersize=0) + with self.assertRaisesRegex( + ValueError, "cannot specify both buffersize and timeout." 
+ ): + self.executor.map(bool, [], timeout=1, buffersize=1) + + def test_map_infinite_iterable(self): + results = self.executor.map(str, itertools.count(1), buffersize=1) + self.assertEqual(next(iter(results)), "1") + + def test_map_buffersize(self): + manager = Manager() + + for buffersize, iterable_size in [ + (1, 5), + (5, 5), + (10, 5), + ]: + iterable = range(iterable_size) + processed_elements = manager.list() + + iterator = self.executor.map( + processed_elements.append, iterable, buffersize=buffersize + ) + time.sleep(1) # wait for buffered futures to finish + self.assertSetEqual( + set(processed_elements), + set(range(min(buffersize, iterable_size))), + ) + next(iterator) + time.sleep(1) # wait for the created future to finish + self.assertSetEqual( + set(processed_elements), + set(range(min(buffersize + 1, iterable_size))), + ) + def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a # sentinel in the call queue blocks (the queue is full while processes diff --git a/Lib/test/test_concurrent_futures/test_pool.py b/Lib/test/test_concurrent_futures/test_pool.py deleted file mode 100644 index 6135a62ca4960b..00000000000000 --- a/Lib/test/test_concurrent_futures/test_pool.py +++ /dev/null @@ -1,53 +0,0 @@ -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor -from multiprocessing import Manager -import time -import unittest - -from .util import BaseTestCase, setup_module - - -class PoolExecutorTest(BaseTestCase): - def test_map_buffersize(self): - manager = Manager() - for ExecutorType in (ThreadPoolExecutor, ProcessPoolExecutor): - with ExecutorType(max_workers=1) as pool: - with self.assertRaisesRegex( - ValueError, "buffersize must be None or >= 1." - ): - pool.map(bool, [], buffersize=0) - with ExecutorType(max_workers=1) as pool: - with self.assertRaisesRegex( - ValueError, "cannot specify both buffersize and timeout." 
- ): - pool.map(bool, [], timeout=1, buffersize=1) - - for buffersize, iterable_size in [ - (1, 5), - (5, 5), - (10, 5), - ]: - iterable = range(iterable_size) - processed_elements = manager.list() - with ExecutorType(max_workers=1) as pool: - iterator = pool.map( - processed_elements.append, iterable, buffersize=buffersize - ) - time.sleep(1) # wait for buffered futures to finish - self.assertSetEqual( - set(processed_elements), - set(range(min(buffersize, iterable_size))), - ) - next(iterator) - time.sleep(1) # wait for the created future to finish - self.assertSetEqual( - set(processed_elements), - set(range(min(buffersize + 1, iterable_size))), - ) - - -def setUpModule(): - setup_module() - - -if __name__ == "__main__": - unittest.main() From 7ced7877885438aff732e1d691de7b5c687caa00 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 11:18:24 +0100 Subject: [PATCH 05/42] fix versionchanged --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index e60ef18d2a6130..2166f68bf6236f 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -73,7 +73,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. versionchanged:: 3.15 + .. versionchanged:: 3.14 Added the *buffersize* argument. .. 
method:: shutdown(wait=True, *, cancel_futures=False) From cb5f26e211901aeccba4a46a7844ebe26e4778a8 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Fri, 18 Oct 2024 10:27:55 +0000 Subject: [PATCH 06/42] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst diff --git a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst new file mode 100644 index 00000000000000..67c54ae9ca90d8 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst @@ -0,0 +1 @@ +Add a ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`. 
From f46ebe6e396cd28ea9e805def29fa2366f43f264 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 14:16:07 +0100 Subject: [PATCH 07/42] fix tests determinism --- Lib/test/test_concurrent_futures/executor.py | 50 ++++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 969085c21ad419..a304d9ade7a5b6 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -73,7 +73,7 @@ def test_map_timeout(self): self.assertEqual([None, None], results) - def test_map_args(self): + def test_map_with_buffersize(self): with self.assertRaisesRegex(ValueError, "buffersize must be None or >= 1."): self.executor.map(bool, [], buffersize=0) with self.assertRaisesRegex( @@ -81,35 +81,33 @@ def test_map_args(self): ): self.executor.map(bool, [], timeout=1, buffersize=1) - def test_map_infinite_iterable(self): + it = range(4) + self.assertEqual( + list(self.executor.map(str, it, buffersize=1)), + list(map(str, it)), + ) + + def test_map_with_buffersize_on_infinite_iterable(self): results = self.executor.map(str, itertools.count(1), buffersize=1) self.assertEqual(next(iter(results)), "1") - def test_map_buffersize(self): - manager = Manager() + def test_map_with_buffersize_on_iterable_smaller_than_buffer(self): + it = range(2) + results = self.executor.map(str, it, buffersize=10) + self.assertListEqual(list(results), list(map(str, it))) - for buffersize, iterable_size in [ - (1, 5), - (5, 5), - (10, 5), - ]: - iterable = range(iterable_size) - processed_elements = manager.list() - - iterator = self.executor.map( - processed_elements.append, iterable, buffersize=buffersize - ) - time.sleep(1) # wait for buffered futures to finish - self.assertSetEqual( - set(processed_elements), - set(range(min(buffersize, iterable_size))), - ) - next(iterator) - time.sleep(1) # wait for the created future to finish - 
self.assertSetEqual( - set(processed_elements), - set(range(min(buffersize + 1, iterable_size))), - ) + def test_map_with_buffersize_when_buffer_becomes_full(self): + manager = Manager() + iterable = range(8) + buffersize = 4 + buffered_results = manager.list() + self.executor.map(buffered_results.append, iterable, buffersize=buffersize) + self.executor.shutdown(wait=True) + self.assertSetEqual( + set(buffered_results), + set(itertools.islice(iterable, buffersize)), + msg="only the first `buffersize` elements should be processed", + ) def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a From eb26e8608aefc935b5e61d4ad13bc96be124812e Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 14:45:14 +0100 Subject: [PATCH 08/42] add test_map_with_buffersize_on_empty_iterable --- Lib/test/test_concurrent_futures/executor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index a304d9ade7a5b6..88db0fb8e7749c 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -96,6 +96,11 @@ def test_map_with_buffersize_on_iterable_smaller_than_buffer(self): results = self.executor.map(str, it, buffersize=10) self.assertListEqual(list(results), list(map(str, it))) + def test_map_with_buffersize_on_empty_iterable(self): + it = iter([]) + results = self.executor.map(str, it, buffersize=10) + self.assertListEqual(list(results), []) + def test_map_with_buffersize_when_buffer_becomes_full(self): manager = Manager() iterable = range(8) From 0821f9552936ab68d3e658d03132d62cb9898165 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 14:45:50 +0100 Subject: [PATCH 09/42] allow timeout + buffersize --- Doc/library/concurrent.futures.rst | 3 +-- Lib/concurrent/futures/_base.py | 5 +---- Lib/concurrent/futures/process.py | 2 +- Lib/test/test_concurrent_futures/executor.py | 
10 ++++++---- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 2166f68bf6236f..fc93c19ce57d08 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -55,8 +55,7 @@ Executor Objects if :meth:`~iterator.__next__` is called and the result isn't available after *timeout* seconds from the original call to :meth:`Executor.map`. *timeout* can be an int or a float. If *timeout* is not specified or - ``None``, there is no limit to the wait time. Incompatible with - *buffersize*. + ``None``, there is no limit to the wait time. If a *fn* call raises an exception, then that exception will be raised when its value is retrieved from the iterator. diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index c3797eec1349ed..82dbc7644a1a37 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -581,7 +581,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. Incompatible with buffersize. + is no limit on the wait time. chunksize: The size of the chunks the iterable will be broken into before being passed to a child process. 
This argument is only used by ProcessPoolExecutor; it is ignored by @@ -603,9 +603,6 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if buffersize is not None and buffersize < 1: raise ValueError("buffersize must be None or >= 1.") - if buffersize is not None and timeout is not None: - raise ValueError("cannot specify both buffersize and timeout.") - if timeout is not None: end_time = timeout + time.monotonic() diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 3eba5a7295956d..f7df70b308a789 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -812,7 +812,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. Incompatible with buffersize. + is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 88db0fb8e7749c..baeffb8dc15d5f 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -76,10 +76,6 @@ def test_map_timeout(self): def test_map_with_buffersize(self): with self.assertRaisesRegex(ValueError, "buffersize must be None or >= 1."): self.executor.map(bool, [], buffersize=0) - with self.assertRaisesRegex( - ValueError, "cannot specify both buffersize and timeout." 
- ): - self.executor.map(bool, [], timeout=1, buffersize=1) it = range(4) self.assertEqual( @@ -87,6 +83,12 @@ def test_map_with_buffersize(self): list(map(str, it)), ) + def test_map_with_buffersize_and_timeout(self): + it = self.executor.map(time.sleep, (0, 1), timeout=0.5) + next(it) + with self.assertRaises(TimeoutError): + next(it) + def test_map_with_buffersize_on_infinite_iterable(self): results = self.executor.map(str, itertools.count(1), buffersize=1) self.assertEqual(next(iter(results)), "1") From d95c55bdb63b7b78665794cef33b08063f88609d Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 14:51:06 +0100 Subject: [PATCH 10/42] lint import --- Lib/test/test_concurrent_futures/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index baeffb8dc15d5f..463ddca5c4c4e9 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -1,5 +1,5 @@ import itertools -from multiprocessing import Manager +import multiprocessing import threading import time import weakref @@ -104,7 +104,7 @@ def test_map_with_buffersize_on_empty_iterable(self): self.assertListEqual(list(results), []) def test_map_with_buffersize_when_buffer_becomes_full(self): - manager = Manager() + manager = multiprocessing.Manager() iterable = range(8) buffersize = 4 buffered_results = manager.list() From c80f4663002f77671714ef8211bd183fb4752b45 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 18 Oct 2024 14:57:35 +0100 Subject: [PATCH 11/42] tests: polish --- Lib/test/test_concurrent_futures/executor.py | 31 ++++++++++---------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 463ddca5c4c4e9..4fdc72e50b8112 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ 
b/Lib/test/test_concurrent_futures/executor.py @@ -74,40 +74,41 @@ def test_map_timeout(self): self.assertEqual([None, None], results) def test_map_with_buffersize(self): + iterable = range(4) with self.assertRaisesRegex(ValueError, "buffersize must be None or >= 1."): - self.executor.map(bool, [], buffersize=0) - - it = range(4) + self.executor.map(bool, iterable, buffersize=0) + self.assertEqual( + list(self.executor.map(str, iterable, buffersize=1)), + list(map(str, iterable)), + ) self.assertEqual( - list(self.executor.map(str, it, buffersize=1)), - list(map(str, it)), + list(self.executor.map(str, iterable, buffersize=2)), + list(map(str, iterable)), ) def test_map_with_buffersize_and_timeout(self): - it = self.executor.map(time.sleep, (0, 1), timeout=0.5) - next(it) + results = self.executor.map(time.sleep, (0, 1), timeout=0.5) + next(results) with self.assertRaises(TimeoutError): - next(it) + next(results) def test_map_with_buffersize_on_infinite_iterable(self): results = self.executor.map(str, itertools.count(1), buffersize=1) self.assertEqual(next(iter(results)), "1") def test_map_with_buffersize_on_iterable_smaller_than_buffer(self): - it = range(2) - results = self.executor.map(str, it, buffersize=10) - self.assertListEqual(list(results), list(map(str, it))) + iterable = range(2) + results = self.executor.map(str, iterable, buffersize=8) + self.assertListEqual(list(results), list(map(str, iterable))) def test_map_with_buffersize_on_empty_iterable(self): - it = iter([]) - results = self.executor.map(str, it, buffersize=10) + results = self.executor.map(str, [], buffersize=8) self.assertListEqual(list(results), []) def test_map_with_buffersize_when_buffer_becomes_full(self): - manager = multiprocessing.Manager() iterable = range(8) buffersize = 4 - buffered_results = manager.list() + buffered_results = multiprocessing.Manager().list() self.executor.map(buffered_results.append, iterable, buffersize=buffersize) self.executor.shutdown(wait=True) 
self.assertSetEqual( From 90e6d7cfc64b0a73f293644f649d8f90a4eadb98 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Fri, 25 Oct 2024 13:21:36 +0100 Subject: [PATCH 12/42] rephrase docstring --- Lib/concurrent/futures/_base.py | 7 +++---- Lib/concurrent/futures/process.py | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 82dbc7644a1a37..73341194d96da8 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -586,10 +586,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. - buffersize: The maximum number of results that can be buffered - before being yielded. If the buffer is full, the iteration over - iterables is paused until a result is yielded from the - buffer. + buffersize: The number of results that can be buffered before being + yielded. If the buffer is full, the iteration over iterables + is paused until a result is yielded from the buffer. Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f7df70b308a789..1a21ad40fd57f9 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -816,10 +816,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. - buffersize: The maximum number of result chunks that can be buffered - before being yielded. If the buffer is full, the iteration over - iterables is paused until a result chunk is yielded from the - buffer. 
+ buffersize: The number of result chunks that can be buffered before + being yielded. If the buffer is full, the iteration over iterables + is paused until a result chunk is yielded from the buffer. Returns: An iterator equivalent to: map(func, *iterables) but the calls may From ab9169430d88109e39d286ec223d4b3144b56894 Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Tue, 3 Dec 2024 13:57:14 +0000 Subject: [PATCH 13/42] fix Doc/library/concurrent.futures.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index fc93c19ce57d08..1a0a441c109b5a 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -72,7 +72,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. versionchanged:: 3.14 + .. versionchanged:: next Added the *buffersize* argument. .. 
method:: shutdown(wait=True, *, cancel_futures=False) From 01b8adf74d7d50f4cb91fb52d7fb5bedf9e17adf Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Tue, 3 Dec 2024 13:58:26 +0000 Subject: [PATCH 14/42] reorder imports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 73341194d96da8..095d6f3ea1b34a 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,12 +4,12 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections -from itertools import islice import logging import threading import time import types import weakref +from itertools import islice FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' From 2f8a63f24a4d4190b03ca152825373171ae4cc3e Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 3 Dec 2024 15:10:04 +0000 Subject: [PATCH 15/42] rephrase buffersize's ValueError --- Lib/concurrent/futures/_base.py | 2 +- Lib/test/test_concurrent_futures/executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 095d6f3ea1b34a..0ff5ebad59179c 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -600,7 +600,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): Exception: If fn(*args) raises for any values. 
""" if buffersize is not None and buffersize < 1: - raise ValueError("buffersize must be None or >= 1.") + raise ValueError("buffersize must be None or > 0.") if timeout is not None: end_time = timeout + time.monotonic() diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 4fdc72e50b8112..708a5b6765cc1c 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -75,7 +75,7 @@ def test_map_timeout(self): def test_map_with_buffersize(self): iterable = range(4) - with self.assertRaisesRegex(ValueError, "buffersize must be None or >= 1."): + with self.assertRaisesRegex(ValueError, "buffersize must be None or > 0."): self.executor.map(bool, iterable, buffersize=0) self.assertEqual( list(self.executor.map(str, iterable, buffersize=1)), From 1fb53a5e129f11c657080cfe0096e9dfb34ba1ff Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 3 Dec 2024 16:23:48 +0000 Subject: [PATCH 16/42] update 3.14.rst --- Doc/whatsnew/3.14.rst | 4 ++++ .../Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 72abfebd46f2b9..bdb0655ccad0e2 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -324,6 +324,10 @@ concurrent.futures supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`. (Contributed by Gregory P. Smith in :gh:`84559`.) +* Introduced the optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`, + which specifies the number of results that can be buffered before being yielded. + (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.) 
+ ctypes ------ diff --git a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst index 67c54ae9ca90d8..1c001f6e6c57f3 100644 --- a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst +++ b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst @@ -1 +1 @@ -Add a ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`. +Add an optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`. \ No newline at end of file From 365c85dfd217e5c6be724e1841257af0185cfda6 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 3 Dec 2024 16:29:25 +0000 Subject: [PATCH 17/42] edit docstring --- Lib/concurrent/futures/_base.py | 5 +++-- Lib/concurrent/futures/process.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 0ff5ebad59179c..b35294eb60888f 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -587,8 +587,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. buffersize: The number of results that can be buffered before being - yielded. If the buffer is full, the iteration over iterables - is paused until a result is yielded from the buffer. + yielded. When the buffer is full, iteration over the input + iterables is paused until a result is yielded from the buffer. + If None, buffering is unlimited. 
Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 1a21ad40fd57f9..1cfdf2937f77d1 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -817,8 +817,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. buffersize: The number of result chunks that can be buffered before - being yielded. If the buffer is full, the iteration over iterables - is paused until a result chunk is yielded from the buffer. + being yielded. When the buffer is full, iteration over the input + iterables is paused until a result chunk is yielded from the + buffer. If None, buffering is unlimited. Returns: An iterator equivalent to: map(func, *iterables) but the calls may From bf5f838811ec11e6f7fb759eb0b165c279411ccf Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 3 Dec 2024 18:16:23 +0000 Subject: [PATCH 18/42] lint --- Lib/concurrent/futures/process.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 1cfdf2937f77d1..cf795a32e27b66 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -817,9 +817,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. buffersize: The number of result chunks that can be buffered before - being yielded. When the buffer is full, iteration over the input - iterables is paused until a result chunk is yielded from the - buffer. If None, buffering is unlimited. + being yielded. 
When the buffer is full, iteration over the + input iterables is paused until a result chunk is yielded from + the buffer. If None, buffering is unlimited. Returns: An iterator equivalent to: map(func, *iterables) but the calls may From a0057f1cdbf00164efc1b12c6585dbb074758824 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 3 Dec 2024 22:34:38 +0000 Subject: [PATCH 19/42] lint --- Doc/whatsnew/3.14.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index bdb0655ccad0e2..1bc7ecf5651290 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -324,8 +324,9 @@ concurrent.futures supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`. (Contributed by Gregory P. Smith in :gh:`84559`.) -* Introduced the optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`, - which specifies the number of results that can be buffered before being yielded. +* Introduced the optional ``buffersize`` parameter to + :meth:`concurrent.futures.Executor.map`, which specifies the number + of results that can be buffered before being yielded. (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.) ctypes From 1aa1275e41aee01dc4450008ad12b98ba678ab1d Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 4 Dec 2024 00:21:54 +0000 Subject: [PATCH 20/42] comment on weakref --- Lib/concurrent/futures/_base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index b35294eb60888f..547922830605e8 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -614,6 +614,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): else: fs = [self.submit(fn, *args) for args in args_iter] + # use a weak reference to ensure that the executor can be garbage + # collected independently of the result_iterator closure. 
executor_weakref = weakref.ref(self) # Yield must be hidden in closure so that the futures are submitted From e0a9a9e2380e1cfeebc81104f744aa94ea2cf1ec Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 4 Dec 2024 00:22:04 +0000 Subject: [PATCH 21/42] lint --- Doc/whatsnew/3.14.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 1bc7ecf5651290..4de27d9fad375f 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -317,6 +317,7 @@ concurrent.futures same process) to Python code. This is separate from the proposed API in :pep:`734`. (Contributed by Eric Snow in :gh:`124548`.) + * The default ``ProcessPoolExecutor`` start method (see :ref:`multiprocessing-start-methods`) changed from *fork* to *forkserver* on platforms other than macOS & Windows. If you require the threading From 8d6ea975636e36cf5395a6fbf204633e82d5e054 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 4 Dec 2024 01:45:13 +0000 Subject: [PATCH 22/42] test_map_with_buffersize_when_buffer_becomes_full: avoid using multiprocessing.Manager().list() to work for InterpreterPoolExecutor too --- Lib/test/test_concurrent_futures/executor.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 708a5b6765cc1c..1c26acdef3bc68 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -1,5 +1,4 @@ import itertools -import multiprocessing import threading import time import weakref @@ -106,14 +105,13 @@ def test_map_with_buffersize_on_empty_iterable(self): self.assertListEqual(list(results), []) def test_map_with_buffersize_when_buffer_becomes_full(self): - iterable = range(8) + iterator = iter(range(8)) buffersize = 4 - buffered_results = multiprocessing.Manager().list() - self.executor.map(buffered_results.append, iterable, buffersize=buffersize) + self.executor.map(str, 
iterator, buffersize=buffersize) self.executor.shutdown(wait=True) - self.assertSetEqual( - set(buffered_results), - set(itertools.islice(iterable, buffersize)), + self.assertEqual( + next(iterator), + buffersize, msg="only the first `buffersize` elements should be processed", ) From 612486842577fcd9a43670e55c0ab419f0982320 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 4 Dec 2024 02:02:17 +0000 Subject: [PATCH 23/42] lint --- .../next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst index 1c001f6e6c57f3..55c2eee77ace61 100644 --- a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst +++ b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst @@ -1 +1 @@ -Add an optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`. \ No newline at end of file +Add an optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`. 
From c11276fe60d0e28483dd2cb46bbe8d161c375ed6 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 4 Dec 2024 02:48:43 +0000 Subject: [PATCH 24/42] test_map_with_buffersize_and_timeout: avoid sleeping 0 seconds for win32 --- Lib/test/test_concurrent_futures/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 1c26acdef3bc68..6816109b668259 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -86,7 +86,7 @@ def test_map_with_buffersize(self): ) def test_map_with_buffersize_and_timeout(self): - results = self.executor.map(time.sleep, (0, 1), timeout=0.5) + results = self.executor.map(time.sleep, (0.1, 1), timeout=1) next(results) with self.assertRaises(TimeoutError): next(results) From ebb5337a7cc48936d5bf232a70dd49bda5b70b69 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 4 Dec 2024 02:51:33 +0000 Subject: [PATCH 25/42] remove test_map_with_buffersize_and_timeout that does not improve coverage in any way and is not deterministic on windows --- Lib/test/test_concurrent_futures/executor.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 6816109b668259..9016d7bb1f888c 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -85,12 +85,6 @@ def test_map_with_buffersize(self): list(map(str, iterable)), ) - def test_map_with_buffersize_and_timeout(self): - results = self.executor.map(time.sleep, (0.1, 1), timeout=1) - next(results) - with self.assertRaises(TimeoutError): - next(results) - def test_map_with_buffersize_on_infinite_iterable(self): results = self.executor.map(str, itertools.count(1), buffersize=1) self.assertEqual(next(iter(results)), "1") From 602968c4be80cbae6af6abf4aff023988efce46c Mon Sep 17 00:00:00 2001 From: ebonnal
Date: Thu, 5 Dec 2024 15:59:13 +0000 Subject: [PATCH 26/42] extend unittesting to no and multiple input iterables --- Lib/test/test_concurrent_futures/executor.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 9016d7bb1f888c..8093ec2337190d 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -78,13 +78,24 @@ def test_map_with_buffersize(self): self.executor.map(bool, iterable, buffersize=0) self.assertEqual( list(self.executor.map(str, iterable, buffersize=1)), - list(map(str, iterable)), + ["0", "1", "2", "3"], ) self.assertEqual( list(self.executor.map(str, iterable, buffersize=2)), - list(map(str, iterable)), + ["0", "1", "2", "3"], ) + # test with multiple input iterables + self.assertEqual( + list(self.executor.map(int.__add__, iterable, iterable, buffersize=2)), + [0, 2, 4, 6], + ) + + # test without input iterable + no_result = self.executor.map(bool, buffersize=2) + with self.assertRaises(StopIteration): + next(no_result) + def test_map_with_buffersize_on_infinite_iterable(self): results = self.executor.map(str, itertools.count(1), buffersize=1) self.assertEqual(next(iter(results)), "1") @@ -92,7 +103,7 @@ def test_map_with_buffersize_on_infinite_iterable(self): def test_map_with_buffersize_on_iterable_smaller_than_buffer(self): iterable = range(2) results = self.executor.map(str, iterable, buffersize=8) - self.assertListEqual(list(results), list(map(str, iterable))) + self.assertListEqual(list(results), ["0", "1"]) def test_map_with_buffersize_on_empty_iterable(self): results = self.executor.map(str, [], buffersize=8) From b14e3680e95d05f80f50b2908263a53d128d5445 Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Thu, 5 Dec 2024 19:07:35 +0000 Subject: [PATCH 27/42] Update Lib/concurrent/futures/_base.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 547922830605e8..51ed1f06c92b46 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -614,7 +614,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): else: fs = [self.submit(fn, *args) for args in args_iter] - # use a weak reference to ensure that the executor can be garbage + # Use a weak reference to ensure that the executor can be garbage # collected independently of the result_iterator closure. executor_weakref = weakref.ref(self) From d37ce09968b066c5991fec8e28e6091152ed90b9 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Thu, 5 Dec 2024 20:37:34 +0000 Subject: [PATCH 28/42] rename args_iter -> zipped_iterables --- Lib/concurrent/futures/_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 51ed1f06c92b46..230427a6790147 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -606,13 +606,13 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if timeout is not None: end_time = timeout + time.monotonic() - args_iter = iter(zip(*iterables)) + zipped_iterables = iter(zip(*iterables)) if buffersize: fs = collections.deque( - self.submit(fn, *args) for args in islice(args_iter, buffersize) + self.submit(fn, *args) for args in islice(zipped_iterables, buffersize) ) else: - fs = [self.submit(fn, *args) for args in args_iter] + fs = [self.submit(fn, *args) for args in zipped_iterables] # Use a weak reference to ensure that the executor can be garbage # collected independently of the result_iterator closure. 
@@ -628,7 +628,7 @@ def result_iterator(): if ( buffersize and (executor := executor_weakref()) - and (args := next(args_iter, None)) + and (args := next(zipped_iterables, None)) ): fs.appendleft(executor.submit(fn, *args)) # Careful not to keep a reference to the popped future From cdf239cd578930ae6402043d4184d31d6db8edba Mon Sep 17 00:00:00 2001 From: ebonnal Date: Tue, 10 Dec 2024 15:06:54 +0000 Subject: [PATCH 29/42] remove period at end of error message --- Lib/concurrent/futures/_base.py | 2 +- Lib/test/test_concurrent_futures/executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 230427a6790147..649c80cb978818 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -601,7 +601,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): Exception: If fn(*args) raises for any values. """ if buffersize is not None and buffersize < 1: - raise ValueError("buffersize must be None or > 0.") + raise ValueError("buffersize must be None or > 0") if timeout is not None: end_time = timeout + time.monotonic() diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 8093ec2337190d..2868f9679a14b5 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -74,7 +74,7 @@ def test_map_timeout(self): def test_map_with_buffersize(self): iterable = range(4) - with self.assertRaisesRegex(ValueError, "buffersize must be None or > 0."): + with self.assertRaisesRegex(ValueError, "buffersize must be None or > 0"): self.executor.map(bool, iterable, buffersize=0) self.assertEqual( list(self.executor.map(str, iterable, buffersize=1)), From 0a497845ce575855bd9716bb337a5f9828c361e5 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 16 Dec 2024 18:17:18 +0000 Subject: [PATCH 30/42] unit tests: merge into a single test method with test 
messages --- Lib/test/test_concurrent_futures/executor.py | 89 ++++++++++---------- 1 file changed, 45 insertions(+), 44 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 2868f9679a14b5..46d07aba4f8cbe 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -3,6 +3,7 @@ import time import weakref from concurrent import futures +from operator import add from test import support from test.support import Py_GIL_DISABLED @@ -72,52 +73,52 @@ def test_map_timeout(self): self.assertEqual([None, None], results) - def test_map_with_buffersize(self): - iterable = range(4) - with self.assertRaisesRegex(ValueError, "buffersize must be None or > 0"): - self.executor.map(bool, iterable, buffersize=0) + def test_map_buffersize(self): + integers = range(8) + + with self.assertRaisesRegex( + ValueError, + "buffersize must be None or > 0", + msg="`Executor.map` should raise if `buffersize` is not positive.", + ): + self.executor.map(str, integers, buffersize=0) + + for buffersize in (1, 2, len(integers), len(integers) * 2): + self.assertEqual( + list(self.executor.map(str, integers, buffersize=buffersize)), + list(map(str, integers)), + msg="`Executor.map` with `buffersize` should behave the same as `map`.", + ) + + self.assertEqual( + list(self.executor.map(add, integers, integers, buffersize=buffersize)), + list(map(sum, zip(integers, integers))), + msg="`Executor.map` with `buffersize` should work correctly on multiple input iterables", + ) + + self.assertEqual( + next(self.executor.map(str, itertools.count(), buffersize=buffersize)), + next(map(str, integers)), + msg="`Executor.map` with `buffersize` should work correctly on an infinite input iterator.", + ) + + self.assertFalse( + list(self.executor.map(str, [], buffersize=buffersize)), + msg="`Executor.map` with `buffersize` should return an empty iterator if the input iterable is empty.", + ) + + 
self.assertFalse( + list(self.executor.map(str, buffersize=buffersize)), + msg="`Executor.map` with `buffersize` should return an empty iterator if no input iterable is provided.", + ) + + integers_iter = iter(integers) + self.executor.map(str, integers_iter, buffersize=buffersize) + self.executor.shutdown(wait=True) # wait for pending tasks to complete self.assertEqual( - list(self.executor.map(str, iterable, buffersize=1)), - ["0", "1", "2", "3"], - ) - self.assertEqual( - list(self.executor.map(str, iterable, buffersize=2)), - ["0", "1", "2", "3"], - ) - - # test with multiple input iterables - self.assertEqual( - list(self.executor.map(int.__add__, iterable, iterable, buffersize=2)), - [0, 2, 4, 6], - ) - - # test without input iterable - no_result = self.executor.map(bool, buffersize=2) - with self.assertRaises(StopIteration): - next(no_result) - - def test_map_with_buffersize_on_infinite_iterable(self): - results = self.executor.map(str, itertools.count(1), buffersize=1) - self.assertEqual(next(iter(results)), "1") - - def test_map_with_buffersize_on_iterable_smaller_than_buffer(self): - iterable = range(2) - results = self.executor.map(str, iterable, buffersize=8) - self.assertListEqual(list(results), ["0", "1"]) - - def test_map_with_buffersize_on_empty_iterable(self): - results = self.executor.map(str, [], buffersize=8) - self.assertListEqual(list(results), []) - - def test_map_with_buffersize_when_buffer_becomes_full(self): - iterator = iter(range(8)) - buffersize = 4 - self.executor.map(str, iterator, buffersize=buffersize) - self.executor.shutdown(wait=True) - self.assertEqual( - next(iterator), + next(integers_iter), buffersize, - msg="only the first `buffersize` elements should be processed", + msg="`Executor.map` should pull only the first `buffersize` elements from the input iterable.", ) def test_shutdown_race_issue12456(self): From 178d6fe711cc357a6fd874e49a787bfe8e2b5353 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 16 Dec 2024 19:33:26 +0000 
Subject: [PATCH 31/42] apply review on tests format --- Lib/test/test_concurrent_futures/executor.py | 76 ++++++++++---------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 46d07aba4f8cbe..f4600ffb9a2988 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -73,52 +73,48 @@ def test_map_timeout(self): self.assertEqual([None, None], results) - def test_map_buffersize(self): - integers = range(8) - + def test_map_buffersize_validation(self): with self.assertRaisesRegex( ValueError, "buffersize must be None or > 0", - msg="`Executor.map` should raise if `buffersize` is not positive.", ): - self.executor.map(str, integers, buffersize=0) - - for buffersize in (1, 2, len(integers), len(integers) * 2): - self.assertEqual( - list(self.executor.map(str, integers, buffersize=buffersize)), - list(map(str, integers)), - msg="`Executor.map` with `buffersize` should behave the same as `map`.", - ) - - self.assertEqual( - list(self.executor.map(add, integers, integers, buffersize=buffersize)), - list(map(sum, zip(integers, integers))), - msg="`Executor.map` with `buffersize` should work correctly on multiple input iterables", - ) - - self.assertEqual( - next(self.executor.map(str, itertools.count(), buffersize=buffersize)), - next(map(str, integers)), - msg="`Executor.map` with `buffersize` should work correctly on an infinite input iterator.", - ) - - self.assertFalse( - list(self.executor.map(str, [], buffersize=buffersize)), - msg="`Executor.map` with `buffersize` should return an empty iterator if the input iterable is empty.", - ) - - self.assertFalse( - list(self.executor.map(str, buffersize=buffersize)), - msg="`Executor.map` with `buffersize` should return an empty iterator if no input iterable is provided.", - ) - - integers_iter = iter(integers) - self.executor.map(str, integers_iter, 
buffersize=buffersize) - self.executor.shutdown(wait=True) # wait for pending tasks to complete + self.executor.map(str, range(4), buffersize=0) + + def test_map_buffersize(self): + ints = range(4) + for buffersize in (1, 2, len(ints), len(ints) * 2): + with self.subTest(buffersize=buffersize): + res = self.executor.map(str, ints, buffersize=buffersize) + self.assertEqual(list(res), ["0", "1", "2", "3"]) + + def test_map_buffersize_on_multiple_iterables(self): + ints = range(4) + for buffersize in (1, 2, len(ints), len(ints) * 2): + with self.subTest(buffersize=buffersize): + res = self.executor.map(add, ints, ints, buffersize=buffersize) + self.assertEqual(list(res), [0, 2, 4, 6]) + + def test_map_buffersize_on_infinite_iterable(self): + res = self.executor.map(str, itertools.count(), buffersize=2) + self.assertEqual(next(res, None), "0") + + def test_map_buffersize_on_empty_iterable(self): + res = self.executor.map(str, [], buffersize=2) + self.assertIsNone(next(res, None)) + + def test_map_buffersize_without_iterable(self): + res = self.executor.map(str, buffersize=2) + self.assertIsNone(next(res, None)) + + def test_map_buffersize_when_buffer_is_full(self): + ints = iter(range(4)) + buffersize = 2 + self.executor.map(str, ints, buffersize=buffersize) + self.executor.shutdown(wait=True) # wait for tasks to complete self.assertEqual( - next(integers_iter), + next(ints), buffersize, - msg="`Executor.map` should pull only the first `buffersize` elements from the input iterable.", + msg="should have fetched only `buffersize` elements from `ints`.", ) def test_shutdown_race_issue12456(self): From 516a94b633d0d7f5069040027c6d73d67ae1bea8 Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Mon, 16 Dec 2024 20:01:32 +0000 Subject: [PATCH 32/42] Update Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran 
<10796600+picnixz@users.noreply.github.com> --- .../next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst index 55c2eee77ace61..9cfc55e649af1c 100644 --- a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst +++ b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst @@ -1 +1,2 @@ -Add an optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map`. +Add the optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map` +to specify the number of results that can be buffered before being yielded. From ba4ac8191a8e5da74598f5ff8faf2fef5bed101b Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Mon, 16 Dec 2024 20:01:53 +0000 Subject: [PATCH 33/42] Update Doc/whatsnew/3.14.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Doc/whatsnew/3.14.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 4de27d9fad375f..5d97f3403f3df7 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -325,8 +325,8 @@ concurrent.futures supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`. (Contributed by Gregory P. Smith in :gh:`84559`.) -* Introduced the optional ``buffersize`` parameter to - :meth:`concurrent.futures.Executor.map`, which specifies the number +* Add the optional ``buffersize`` parameter to + :meth:`concurrent.futures.Executor.map` to specify the number of results that can be buffered before being yielded. (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.) 
From 9588059d1ce5cb9d75897423f318d155799779a9 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 16 Dec 2024 20:02:43 +0000 Subject: [PATCH 34/42] use assertListEqual --- Lib/test/test_concurrent_futures/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index f4600ffb9a2988..61e940900513a3 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -85,14 +85,14 @@ def test_map_buffersize(self): for buffersize in (1, 2, len(ints), len(ints) * 2): with self.subTest(buffersize=buffersize): res = self.executor.map(str, ints, buffersize=buffersize) - self.assertEqual(list(res), ["0", "1", "2", "3"]) + self.assertListEqual(list(res), ["0", "1", "2", "3"]) def test_map_buffersize_on_multiple_iterables(self): ints = range(4) for buffersize in (1, 2, len(ints), len(ints) * 2): with self.subTest(buffersize=buffersize): res = self.executor.map(add, ints, ints, buffersize=buffersize) - self.assertEqual(list(res), [0, 2, 4, 6]) + self.assertListEqual(list(res), [0, 2, 4, 6]) def test_map_buffersize_on_infinite_iterable(self): res = self.executor.map(str, itertools.count(), buffersize=2) From 0427bf1e9a4befbc9c685eabcd0960daeb2fb5bd Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 16 Dec 2024 20:22:03 +0000 Subject: [PATCH 35/42] test_map_buffersize_validation: test negative buffersize --- Lib/test/test_concurrent_futures/executor.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 61e940900513a3..7b44cc6113625c 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -74,11 +74,13 @@ def test_map_timeout(self): self.assertEqual([None, None], results) def test_map_buffersize_validation(self): - with self.assertRaisesRegex( - 
ValueError, - "buffersize must be None or > 0", - ): - self.executor.map(str, range(4), buffersize=0) + for buffersize in (0, -1): + with self.subTest(buffersize=buffersize): + with self.assertRaisesRegex( + ValueError, + "buffersize must be None or > 0", + ): + self.executor.map(str, range(4), buffersize=buffersize) def test_map_buffersize(self): ints = range(4) From af88fdf43b2132dba1c4ee3b2b353579d187771a Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 16 Dec 2024 20:40:13 +0000 Subject: [PATCH 36/42] explicitly checks buffersize's type and add test_map_buffersize_type_validation --- Lib/concurrent/futures/_base.py | 2 ++ Lib/test/test_concurrent_futures/executor.py | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 649c80cb978818..0431d3d45be2c0 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -600,6 +600,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): before the given timeout. Exception: If fn(*args) raises for any values. 
""" + if buffersize is not None and not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") if buffersize is not None and buffersize < 1: raise ValueError("buffersize must be None or > 0") diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 7b44cc6113625c..864a34c23f15bd 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -73,7 +73,16 @@ def test_map_timeout(self): self.assertEqual([None, None], results) - def test_map_buffersize_validation(self): + def test_map_buffersize_type_validation(self): + for buffersize in ("foo", 2.0): + with self.subTest(buffersize=buffersize): + with self.assertRaisesRegex( + TypeError, + "buffersize must be an integer or None", + ): + self.executor.map(str, range(4), buffersize=buffersize) + + def test_map_buffersize_value_validation(self): for buffersize in (0, -1): with self.subTest(buffersize=buffersize): with self.assertRaisesRegex( From 1fcf3fe651f8dc5401c74dcebf7182deec59c886 Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Wed, 18 Dec 2024 00:07:38 +0000 Subject: [PATCH 37/42] test_map_buffersize_on_infinite_iterable: fetch the first 4 elements --- Lib/test/test_concurrent_futures/executor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 864a34c23f15bd..219c343b52d64a 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -108,6 +108,9 @@ def test_map_buffersize_on_multiple_iterables(self): def test_map_buffersize_on_infinite_iterable(self): res = self.executor.map(str, itertools.count(), buffersize=2) self.assertEqual(next(res, None), "0") + self.assertEqual(next(res, None), "1") + self.assertEqual(next(res, None), "2") + self.assertEqual(next(res, None), "3") def test_map_buffersize_on_empty_iterable(self): res = 
self.executor.map(str, [], buffersize=2) From 0892b2bc8dd3b59c7ccf94c42fdc4ce4abb777c2 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Wed, 25 Dec 2024 02:03:20 +0000 Subject: [PATCH 38/42] add `test_map_buffersize_on_multiple_infinite_iterables` --- Lib/test/test_concurrent_futures/executor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 219c343b52d64a..c384c7a511f663 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -110,7 +110,17 @@ def test_map_buffersize_on_infinite_iterable(self): self.assertEqual(next(res, None), "0") self.assertEqual(next(res, None), "1") self.assertEqual(next(res, None), "2") - self.assertEqual(next(res, None), "3") + + def test_map_buffersize_on_multiple_infinite_iterables(self): + res = self.executor.map( + add, + itertools.count(), + itertools.count(), + buffersize=2 + ) + self.assertEqual(next(res, None), 0) + self.assertEqual(next(res, None), 2) + self.assertEqual(next(res, None), 4) def test_map_buffersize_on_empty_iterable(self): res = self.executor.map(str, [], buffersize=2) From 579ba3123dc90259f357b0b9f28f054fe853254b Mon Sep 17 00:00:00 2001 From: ebonnal Date: Sat, 11 Jan 2025 00:46:29 +0100 Subject: [PATCH 39/42] doc: specify that it is the size of a buffer of tasks and not results --- Doc/library/concurrent.futures.rst | 11 ++++++----- Doc/whatsnew/3.14.rst | 6 ++++-- Lib/concurrent/futures/_base.py | 9 +++++---- Lib/concurrent/futures/process.py | 9 +++++---- .../2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst | 6 ++++-- 5 files changed, 24 insertions(+), 17 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 1a0a441c109b5a..8df0379fe0a6bc 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -44,9 +44,10 @@ Executor Objects Similar to :func:`map(fn, 
*iterables) <map>` except: - * the *iterables* are collected immediately rather than lazily, unless a - *buffersize* is specified: If the buffer is full, then the iteration - over *iterables* is paused until a result is yielded from the buffer. + * The *iterables* are collected immediately rather than lazily, unless a + *buffersize* is specified to limit the number of submitted tasks whose + results have not yet been yielded. If the buffer is full, iteration over + the *iterables* pauses until a result is yielded from the buffer. * *fn* is executed asynchronously and several calls to *fn* may be made concurrently. @@ -70,10 +71,10 @@ Executor Objects *chunksize* has no effect. .. versionchanged:: 3.5 - Added the *chunksize* argument. + Added the *chunksize* parameter. .. versionchanged:: next - Added the *buffersize* argument. + Added the *buffersize* parameter. .. method:: shutdown(wait=True, *, cancel_futures=False) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 5d97f3403f3df7..a77b4f339a5b0f 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -326,8 +326,10 @@ concurrent.futures (Contributed by Gregory P. Smith in :gh:`84559`.) * Add the optional ``buffersize`` parameter to - :meth:`concurrent.futures.Executor.map` to specify the number - of results that can be buffered before being yielded. + :meth:`concurrent.futures.Executor.map` to limit the number of submitted + tasks whose results have not yet been yielded. If the buffer is full, + iteration over the *iterables* pauses until a result is yielded from the + buffer. (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.) ctypes diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 0431d3d45be2c0..9cb119ecc75c0d 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -586,10 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): before being passed to a child process.
This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. - buffersize: The number of results that can be buffered before being - yielded. When the buffer is full, iteration over the input - iterables is paused until a result is yielded from the buffer. - If None, buffering is unlimited. + buffersize: The number of submitted tasks whose results have not + yet been yielded. If the buffer is full, iteration over the + iterables pauses until a result is yielded from the buffer. + If None, all input elements are eagerly collected, and a task is + submitted for each. Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index cf795a32e27b66..bdbfe3e7834316 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -816,10 +816,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. - buffersize: The number of result chunks that can be buffered before - being yielded. When the buffer is full, iteration over the - input iterables is paused until a result chunk is yielded from - the buffer. If None, buffering is unlimited. + buffersize: The number of submitted tasks whose results have not + yet been yielded. If the buffer is full, iteration over the + iterables pauses until a result is yielded from the buffer. + If None, all input elements are eagerly collected, and a task is + submitted for each. 
Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst index 9cfc55e649af1c..6760e2b935430c 100644 --- a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst +++ b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst @@ -1,2 +1,4 @@ -Add the optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map` -to specify the number of results that can be buffered before being yielded. +Add the optional ``buffersize`` parameter to +:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks +whose results have not yet been yielded. If the buffer is full, iteration over +the *iterables* pauses until a result is yielded from the buffer. From ef814e5793ca9e59d3d5c4cda701b3a754bf45da Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Mon, 24 Feb 2025 23:16:53 +0000 Subject: [PATCH 40/42] Update Doc/whatsnew/3.14.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Doc/whatsnew/3.14.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 2fedb926f9ce20..5b7ca7d0ac94d7 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -439,7 +439,6 @@ contextvars * Support context manager protocol by :class:`contextvars.Token`. (Contributed by Andrew Svetlov in :gh:`129889`.) - * Add the optional ``buffersize`` parameter to :meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks whose results have not yet been yielded. If the buffer is full, @@ -447,6 +446,7 @@ contextvars buffer. (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.) 
+ ctypes ------ From 7b1d5f60e072b2c783803d2fc7d2ce3c4f4f71a1 Mon Sep 17 00:00:00 2001 From: ebonnal Date: Mon, 3 Mar 2025 16:08:38 +0000 Subject: [PATCH 41/42] remove redundant `iter` --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 9cb119ecc75c0d..d5ba39e3d71774 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -609,7 +609,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): if timeout is not None: end_time = timeout + time.monotonic() - zipped_iterables = iter(zip(*iterables)) + zipped_iterables = zip(*iterables) if buffersize: fs = collections.deque( self.submit(fn, *args) for args in islice(zipped_iterables, buffersize) From bb756f465e2784f6d49e1bc89bc8320ceb92c614 Mon Sep 17 00:00:00 2001 From: Enzo Bonnal Date: Wed, 12 Mar 2025 14:54:05 +0000 Subject: [PATCH 42/42] add new line in 3.14.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Doc/whatsnew/3.14.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index f139b6c057e499..464e722e8e66ae 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -468,6 +468,7 @@ contextvars buffer. (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.) + ctypes ------