From fbdb56cda18634fce0882e043ac26acd3d2e17c6 Mon Sep 17 00:00:00 2001
From: Josh Rosenberg
Date: Fri, 17 Mar 2017 22:04:33 -0400
Subject: [PATCH 1/5] bpo-29842: Make Executor.map less eager so it handles
 large/unbounded input iterables appropriately

---
 Doc/library/concurrent.futures.rst  |  9 ++++++--
 Lib/concurrent/futures/_base.py     | 33 ++++++++++++++++++++++++-----
 Lib/concurrent/futures/process.py   |  4 ++--
 Lib/test/test_concurrent_futures.py | 17 ++++++++++++++-
 Misc/NEWS                           |  6 ++++++
 5 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index d85576b8bedd8e..42b433ca07a1ac 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -38,7 +38,7 @@ Executor Objects
           future = executor.submit(pow, 323, 1235)
           print(future.result())
 
-   .. method:: map(func, *iterables, timeout=None, chunksize=1)
+   .. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None)
 
      Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
      asynchronously and several calls to *func* may be made concurrently. The
@@ -54,11 +54,16 @@ Executor Objects
      specified by setting *chunksize* to a positive integer. For very long
      iterables, using a large value for *chunksize* can significantly improve
      performance compared to the default size of 1. With :class:`ThreadPoolExecutor`,
-     *chunksize* has no effect.
+     *chunksize* has no effect. By default, a reasonable number of tasks are
+     queued beyond the number of workers; an explicit *prefetch* count may be
+     provided to specify how many extra tasks should be queued.
 
      .. versionchanged:: 3.5
         Added the *chunksize* argument.
 
+     .. versionchanged:: 3.7
+        Added the *prefetch* argument.
+
   .. method:: shutdown(wait=True)
 
      Signal the executor that it should free any resources that it is using
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 295489c93e56d8..57782fdba87a0c 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -4,6 +4,7 @@
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
 import collections
+import itertools
 import logging
 import threading
 import time
@@ -520,7 +521,7 @@ def submit(self, fn, *args, **kwargs):
         """
         raise NotImplementedError()
 
-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
@@ -544,18 +545,40 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         """
         if timeout is not None:
             end_time = timeout + time.time()
+        if prefetch is None:
+            prefetch = self._max_workers
+        if prefetch < 0:
+            raise ValueError("prefetch count may not be negative")
 
-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        argsiter = zip(*iterables)
+
+        fs = collections.deque(self.submit(fn, *args) for args in itertools.islice(argsiter, self._max_workers + prefetch))
 
         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
         def result_iterator():
+            nonlocal argsiter
             try:
-                for future in fs:
+                while fs:
                     if timeout is None:
-                        yield future.result()
+                        res = fs[0].result()
                     else:
-                        yield future.result(end_time - time.time())
+                        res = fs[0].result(end_time - time.time())
+
+                    # Got a result, future needn't be cancelled
+                    del fs[0]
+
+                    # Dispatch next task before yielding to keep
+                    # pipeline full
+                    if argsiter:
+                        try:
+                            args = next(argsiter)
+                        except StopIteration:
+                            argsiter = None
+                        else:
+                            fs.append(self.submit(fn, *args))
+
+                    yield res
             finally:
                 for future in fs:
                     future.cancel()
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 8f1d714193ab79..f8b02cb5cb880f 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -455,7 +455,7 @@ def submit(self, fn, *args, **kwargs):
         return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
@@ -481,7 +481,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         results = super().map(partial(_process_chunk, fn),
                               _get_chunks(*iterables, chunksize=chunksize),
-                              timeout=timeout)
+                              timeout=timeout, prefetch=prefetch)
         return itertools.chain.from_iterable(results)
 
     def shutdown(self, wait=True):
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 23e95b212447c8..464bc858b080b8 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -467,7 +467,22 @@ def record_finished(n):
         self.executor.map(record_finished, range(10))
         self.executor.shutdown(wait=True)
-        self.assertCountEqual(finished, range(10))
+        # No guarantees on how many tasks were dispatched,
+        # but at least one should have been dispatched
+        self.assertGreater(len(finished), 0)
+
+    def test_infinite_map_input_completes_work(self):
+        import itertools
+        def identity(x):
+            return x
+
+        mapobj = self.executor.map(identity, itertools.count(0))
+        # Get one result, which shows we handle infinite inputs
+        # without waiting for all work to be dispatched
+        res = next(mapobj)
+        mapobj.close()  # Make sure futures are cancelled
+
+        self.assertEqual(res, 0)
 
     def test_default_workers(self):
         executor = self.executor_type()
diff --git a/Misc/NEWS b/Misc/NEWS
index b7990c62e4f744..4ffeb0f391b6c0 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -281,6 +281,12 @@ Extension Modules
 Library
 -------
 
+- bpo-29842: Executor.map no longer creates all futures eagerly prior to
+  yielding any results. This allows it to work with huge or infinite iterables
+  without consuming excessive resources or crashing, making it more suitable
+  as a drop-in replacement for the built-in map.
+  Patch by Josh Rosenberg.
+
 - bpo-29800: Fix crashes in partial.__repr__ if the keys of partial.keywords
   are not strings. Patch by Michael Seifert.
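For illustration only (not part of the patch): with the lazy dispatch introduced above, an unbounded input to Executor.map becomes safe, because only max_workers + prefetch tasks are ever in flight. A minimal usage sketch, assuming the patched ThreadPoolExecutor.map with the new prefetch keyword; the square function is a made-up workload:

    import itertools
    from concurrent.futures import ThreadPoolExecutor

    def square(n):
        # Stand-in for real per-item work.
        return n * n

    with ThreadPoolExecutor(max_workers=4) as executor:
        # Only max_workers + prefetch (here 4 + 8) calls are pending at
        # any time, so the infinite itertools.count() input is safe.
        results = executor.map(square, itertools.count(), prefetch=8)
        for value in itertools.islice(results, 10):
            print(value)
        # Closing the generator cancels any still-queued futures, as the
        # new test does with mapobj.close().
        results.close()

Consuming the iterator drives the pipeline: each yielded result triggers the submission of one more task, so memory use is bounded by the prefetch window rather than by the length of the input.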
From 8394e3480eb9fbb5200836174024d75581a03b22 Mon Sep 17 00:00:00 2001
From: Josh Rosenberg
Date: Mon, 6 May 2019 12:22:17 -0400
Subject: [PATCH 2/5] Add prefetch info to docstrings

---
 Lib/concurrent/futures/_base.py   | 2 ++
 Lib/concurrent/futures/process.py | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index b9888ce0d7083e..ff17bb7539b7b1 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -581,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
                 before being passed to a child process. This argument is only
                 used by ProcessPoolExecutor; it is ignored by
                 ThreadPoolExecutor.
+            prefetch: The number of tasks to queue beyond the number of
+                workers on the executor. If None, a reasonable default is used.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 1280f900c45d99..f5f633bee1f19c 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -643,6 +643,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
             chunksize: If greater than one, the iterables will be chopped into
                 chunks of size chunksize and submitted to the process pool.
                 If set to one, the items in the list will be sent one at a time.
+            prefetch: The number of chunks to queue beyond the number of
+                workers on the executor. If None, a reasonable default is used.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may

From 634a3de6838a14017d4bddbf482d112a8719d7d1 Mon Sep 17 00:00:00 2001
From: Josh Rosenberg
Date: Mon, 6 May 2019 12:49:44 -0400
Subject: [PATCH 3/5] Add Misc/NEWS entry

---
 .../next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst

diff --git a/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst
new file mode 100644
index 00000000000000..9318ed1f8e9df7
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst
@@ -0,0 +1,4 @@
+Executor.map no longer creates all futures eagerly prior to yielding any
+results. This allows it to work with huge or infinite iterables without
+consuming excessive resources or crashing, making it more suitable as a
+drop-in replacement for the built-in map. Patch by Josh Rosenberg.

From 015283128476101da1f66067d4fda22b90cc70ff Mon Sep 17 00:00:00 2001
From: Josh Rosenberg
Date: Mon, 6 May 2019 15:18:12 -0400
Subject: [PATCH 4/5] Remove trailing whitespace in docs

---
 Doc/library/concurrent.futures.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index 02474cbf7f67d7..8e57dc1550e8b2 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -63,7 +63,7 @@ Executor Objects
      using a large value for *chunksize* can significantly improve performance
      compared to the default size of 1. With :class:`ThreadPoolExecutor`,
      *chunksize* has no effect.
-     
+
      By default, a reasonable number of tasks are queued beyond the number of
      workers; an explicit *prefetch* count may be provided to specify how many
      extra tasks should be queued.
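To make the docstring wording concrete: for ProcessPoolExecutor the prefetch count is measured in chunks, not individual items, so the amount of buffered input scales with chunksize as well. A rough sketch, again assuming this patch series is applied; double is an invented workload and the numbers are arbitrary:

    import itertools
    from concurrent.futures import ProcessPoolExecutor

    def double(n):
        return 2 * n

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2) as executor:
            # Each submitted task covers chunksize items, so roughly
            # (max_workers + prefetch) * chunksize = (2 + 4) * 16 items
            # are buffered instead of the whole input.
            results = executor.map(double, range(1_000_000),
                                   chunksize=16, prefetch=4)
            print(sum(itertools.islice(results, 100)))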
From 3c38fabdfe9d625064625b4fb84aa41d59bf250d Mon Sep 17 00:00:00 2001
From: Josh Rosenberg
Date: Mon, 6 May 2019 16:06:19 -0400
Subject: [PATCH 5/5] Fix behavior to follow test_free_reference requirements
 (generator holds no reference to result at the moment it yields).
 Reduce line lengths to PEP 8 limits.

---
 Lib/concurrent/futures/_base.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index ff17bb7539b7b1..efb6ce2b45c8d3 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -601,8 +601,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
             raise ValueError("prefetch count may not be negative")
 
         argsiter = zip(*iterables)
+        initialargs = itertools.islice(argsiter, self._max_workers + prefetch)
 
-        fs = collections.deque(self.submit(fn, *args) for args in itertools.islice(argsiter, self._max_workers + prefetch))
+        fs = collections.deque(self.submit(fn, *args) for args in initialargs)
 
         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
@@ -611,9 +612,9 @@ def result_iterator():
             try:
                 while fs:
                     if timeout is None:
-                        res = fs[0].result()
+                        res = [fs[0].result()]
                     else:
-                        res = fs[0].result(end_time - time.monotonic())
+                        res = [fs[0].result(end_time - time.monotonic())]
 
                     # Got a result, future needn't be cancelled
                     del fs[0]
@@ -628,7 +629,7 @@ def result_iterator():
                     else:
                         fs.append(self.submit(fn, *args))
 
-                    yield res
+                    yield res.pop()
             finally:
                 for future in fs:
                     future.cancel()
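The single-element list plus res.pop() in this last patch exists so that the suspended result_iterator frame does not keep the yielded result alive, which is what the test_free_reference requirement mentioned in the commit message demands. The following standalone sketch (invented names, not taken from the test suite) shows the difference the pattern makes in CPython, using a weak reference to observe whether the yielded object can be collected:

    import gc
    import weakref

    class Result:
        pass

    def holds_reference():
        res = Result()
        yield res          # local 'res' keeps the object alive while suspended

    def drops_reference():
        res = [Result()]
        yield res.pop()    # the frame only keeps an empty list while suspended

    for gen in (holds_reference(), drops_reference()):
        ref = weakref.ref(next(gen))
        gc.collect()
        print(ref())       # a live Result for the first generator, None for the second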