diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 8d6b1e8d71ff47..8e57dc1550e8b2 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -38,7 +38,7 @@ Executor Objects future = executor.submit(pow, 323, 1235) print(future.result()) - .. method:: map(func, *iterables, timeout=None, chunksize=1) + .. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None) Similar to :func:`map(func, *iterables) ` except: @@ -64,9 +64,16 @@ Executor Objects performance compared to the default size of 1. With :class:`ThreadPoolExecutor`, *chunksize* has no effect. + By default, a reasonable number of tasks are + queued beyond the number of workers; an explicit *prefetch* count may be + provided to specify how many extra tasks should be queued. + .. versionchanged:: 3.5 Added the *chunksize* argument. + .. versionchanged:: 3.8 + Added the *prefetch* argument. + .. method:: shutdown(wait=True) Signal the executor that it should free any resources that it is using diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index ea16eef841c518..efb6ce2b45c8d3 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,6 +4,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections +import itertools import logging import threading import time @@ -568,7 +569,7 @@ def submit(*args, **kwargs): raise NotImplementedError() - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): """Returns an iterator equivalent to map(fn, iter). Args: @@ -580,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. + prefetch: The number of tasks to queue beyond the number of + workers on the executor. 
If None, a reasonable default is used. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -592,21 +595,41 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): """ if timeout is not None: end_time = timeout + time.monotonic() + if prefetch is None: + prefetch = self._max_workers + if prefetch < 0: + raise ValueError("prefetch count may not be negative") - fs = [self.submit(fn, *args) for args in zip(*iterables)] + argsiter = zip(*iterables) + initialargs = itertools.islice(argsiter, self._max_workers + prefetch) + + fs = collections.deque(self.submit(fn, *args) for args in initialargs) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): + nonlocal argsiter try: - # reverse to keep finishing order - fs.reverse() while fs: - # Careful not to keep a reference to the popped future if timeout is None: - yield fs.pop().result() + res = [fs[0].result()] else: - yield fs.pop().result(end_time - time.monotonic()) + res = [fs[0].result(end_time - time.monotonic())] + + # Got a result, future needn't be cancelled + del fs[0] + + # Dispatch next task before yielding to keep + # pipeline full + if argsiter: + try: + args = next(argsiter) + except StopIteration: + argsiter = None + else: + fs.append(self.submit(fn, *args)) + + yield res.pop() finally: for future in fs: future.cancel() diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index e6ce278b5d44c6..f5f633bee1f19c 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -632,7 +632,7 @@ def submit(*args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): """Returns an iterator equivalent to map(fn, iter). 
Args: @@ -643,6 +643,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. + prefetch: The number of chunks to queue beyond the number of + workers on the executor. If None, a reasonable default is used. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -658,7 +660,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), - timeout=timeout) + timeout=timeout, prefetch=prefetch) return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True): diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 903afbd2a4f68a..01e39d5d4b2ef1 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -746,7 +746,22 @@ def record_finished(n): self.executor.map(record_finished, range(10)) self.executor.shutdown(wait=True) - self.assertCountEqual(finished, range(10)) + # No guarantees on how many tasks dispatched, + # but at least one should have been dispatched + self.assertGreater(len(finished), 0) + + def test_infinite_map_input_completes_work(self): + import itertools + def identity(x): + return x + + mapobj = self.executor.map(identity, itertools.count(0)) + # Get one result, which shows we handle infinite inputs + # without waiting for all work to be dispatched + res = next(mapobj) + mapobj.close() # Make sure futures cancelled + + self.assertEqual(res, 0) def test_default_workers(self): executor = self.executor_type() diff --git a/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst new file mode 100644 index 00000000000000..9318ed1f8e9df7 --- /dev/null +++ 
b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst @@ -0,0 +1,4 @@ +Executor.map no longer creates all futures eagerly prior to yielding any +results. This allows it to work with huge or infinite iterables without +consuming excessive resources or crashing, making it more suitable as a drop-in +replacement for the built-in map. Patch by Josh Rosenberg.