bpo-29842: Make Executor.map less eager so it handles large/unbounded… #707


Status: Closed. Wants to merge 6 commits.
Changes from 1 commit
bpo-29842: Make Executor.map less eager so it handles large/unbounded input iterables appropriately
MojoVampire committed Mar 18, 2017
commit fbdb56cda18634fce0882e043ac26acd3d2e17c6
Doc/library/concurrent.futures.rst (9 changes: 7 additions & 2 deletions)
@@ -38,7 +38,7 @@ Executor Objects
        future = executor.submit(pow, 323, 1235)
        print(future.result())

-   .. method:: map(func, *iterables, timeout=None, chunksize=1)
+   .. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None)

Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
asynchronously and several calls to *func* may be made concurrently. The
Expand All @@ -54,11 +54,16 @@ Executor Objects
specified by setting *chunksize* to a positive integer. For very long
iterables, using a large value for *chunksize* can significantly improve
performance compared to the default size of 1. With :class:`ThreadPoolExecutor`,
-   *chunksize* has no effect.
+   *chunksize* has no effect. By default, a reasonable number of tasks are
+   queued beyond the number of workers; an explicit *prefetch* count may be
+   provided to specify how many extra tasks should be queued.
Member:

Using "chunks" here would be more precise than "tasks".

Contributor Author:

The documentation for chunksize uses the phrasing "this method chops iterables into a number of chunks which it submits to the pool as separate tasks", and since not all executors even use chunks (ThreadPoolExecutor ignores the argument), I figured I'd stick with "tasks". It does leave the docs without a term for a single work item; they use "chunks" and "tasks" as synonyms.


.. versionchanged:: 3.5
Added the *chunksize* argument.

.. versionchanged:: 3.7
Added the *prefetch* argument.

.. method:: shutdown(wait=True)

Signal the executor that it should free any resources that it is using
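For context, the documented behavior that this patch leaves unchanged: `map` mirrors the built-in `map` but runs calls concurrently, and *timeout* is a deadline for the whole iteration. A minimal stdlib-only example (note that *prefetch* exists only on this branch, so it is not passed here):

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as ex:
    # Same results as map(pow, [2, 3, 4], [5, 2, 3]), but the calls
    # may run concurrently; timeout bounds the whole iteration.
    results = list(ex.map(pow, [2, 3, 4], [5, 2, 3], timeout=10))

print(results)  # [32, 9, 64]
```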
Lib/concurrent/futures/_base.py (33 changes: 28 additions & 5 deletions)
@@ -4,6 +4,7 @@
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'

 import collections
+import itertools
 import logging
 import threading
 import time
@@ -520,7 +521,7 @@ def submit(self, fn, *args, **kwargs):
"""
raise NotImplementedError()

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -544,18 +545,40 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         """
         if timeout is not None:
             end_time = timeout + time.time()
+        if prefetch is None:
+            prefetch = self._max_workers
+        if prefetch < 0:
+            raise ValueError("prefetch count may not be negative")

-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        argsiter = zip(*iterables)
+
+        fs = collections.deque(self.submit(fn, *args) for args in itertools.islice(argsiter, self._max_workers + prefetch))

         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
         def result_iterator():
+            nonlocal argsiter
             try:
-                for future in fs:
+                while fs:
                     if timeout is None:
-                        yield future.result()
+                        res = fs[0].result()
                     else:
-                        yield future.result(end_time - time.time())
+                        res = fs[0].result(end_time - time.time())
+
+                    # Got a result, future needn't be cancelled
+                    del fs[0]
+
+                    # Dispatch next task before yielding to keep
+                    # pipeline full
+                    if argsiter:
+                        try:
+                            args = next(argsiter)
+                        except StopIteration:
+                            argsiter = None
+                        else:
+                            fs.append(self.submit(fn, *args))
+
+                    yield res
             finally:
                 for future in fs:
                     future.cancel()
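The bounded-prefetch technique in the hunk above can be replicated today on top of the public Executor API without this patch. A standalone sketch: `lazy_map` is a hypothetical helper name, and reading `executor._max_workers` copies the patch's reliance on a private attribute.

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor

def lazy_map(executor, fn, *iterables, prefetch=None):
    """Like Executor.map, but keeps only a bounded window of tasks queued."""
    if prefetch is None:
        prefetch = executor._max_workers  # private attr, mirroring the patch
    if prefetch < 0:
        raise ValueError("prefetch count may not be negative")
    argsiter = zip(*iterables)
    # Seed the pipeline: one task per worker plus `prefetch` spares.
    fs = collections.deque(
        executor.submit(fn, *args)
        for args in itertools.islice(argsiter, executor._max_workers + prefetch)
    )

    def result_iterator():
        try:
            while fs:
                res = fs[0].result()
                del fs[0]  # got a result, so this future needn't be cancelled
                # Top up the pipeline before yielding, keeping workers busy.
                for args in itertools.islice(argsiter, 1):
                    fs.append(executor.submit(fn, *args))
                yield res
        finally:
            for f in fs:  # drop whatever is still queued
                f.cancel()

    return result_iterator()

with ThreadPoolExecutor(max_workers=2) as ex:
    squares = lazy_map(ex, lambda x: x * x, itertools.count())
    first_five = [next(squares) for _ in range(5)]
    squares.close()  # cancels the outstanding futures

print(first_five)  # [0, 1, 4, 9, 16]
```

Closing the generator (or letting it be garbage-collected) cancels the pending futures, which is exactly the behavior debated in the review thread below.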
Lib/concurrent/futures/process.py (4 changes: 2 additions & 2 deletions)
@@ -455,7 +455,7 @@ def submit(self, fn, *args, **kwargs):
         return f
     submit.__doc__ = _base.Executor.submit.__doc__

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -481,7 +481,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):

         results = super().map(partial(_process_chunk, fn),
                               _get_chunks(*iterables, chunksize=chunksize),
-                              timeout=timeout)
+                              timeout=timeout, prefetch=prefetch)
         return itertools.chain.from_iterable(results)

     def shutdown(self, wait=True):
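ProcessPoolExecutor.map batches its inputs before delegating to the superclass, so one "task" there is a whole chunk of calls. `_get_chunks` and `_process_chunk` are private helpers, but the batching they perform can be sketched with public itertools only:

```python
import itertools

def get_chunks(iterable, chunksize):
    # Yield successive tuples of up to `chunksize` items, as
    # ProcessPoolExecutor does before submitting each tuple as one task.
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

chunks = list(get_chunks(range(10), 4))
print(chunks)  # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
```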
Lib/test/test_concurrent_futures.py (17 changes: 16 additions & 1 deletion)
@@ -467,7 +467,22 @@ def record_finished(n):

         self.executor.map(record_finished, range(10))
         self.executor.shutdown(wait=True)
-        self.assertCountEqual(finished, range(10))
+        # No guarantees on how many tasks dispatched,
+        # but at least one should have been dispatched
+        self.assertGreater(len(finished), 0)
Member:
I think this change breaks compatibility. The doc for shutdown says:

If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed.

So all futures should have executed, instead of being cancelled.

Contributor Author:

At the time I wrote it, it didn't conflict with the documentation precisely; the original documentation said that map was "Equivalent to map(func, *iterables) except func is executed asynchronously and several calls to func may be made concurrently.", but doesn't guarantee that any actual futures exist (it's implemented in terms of submit and futures, but doesn't actually require such a design).

That said, it looks like you updated the documentation to add "the iterables are collected immediately rather than lazily;", which, if considered a guarantee, rather than a warning, would make this a breaking change even ignoring the "cancel vs. wait" issue.

Do you have any suggestions? If strict adherence to your newly (as of late 2017) documented behavior is needed, I suppose I could change the default behavior from "reasonable prefetch" to "exhaustive prefetch", so when prefetch isn't passed, every task is submitted, but it would be kind of annoying to lose the "good by default" behavior of limited prefetching.

The reason I cancelled rather than waiting on the result is that I was trying to follow the normal use pattern for map; since the results are yielded lazily, if the iterator goes away or is closed explicitly (or you explicitly shut down the executor), you're done; having the outstanding futures complete when you're not able to see the results means you're either:

  1. Expecting the tasks to complete without running out the Executor.map (which doesn't work with Py3's map at all, so the analogy to map should allow it; if you don't run it out, you have no guarantees anything was done)
  2. Not planning to use any further results (in which case running any submitted but unscheduled futures means doing work no one can see the results of)

Member:

Actually, I think there are two problems to discuss:

  1. What happens when shutdown(wait=True) is called. Currently it waits for all outstanding tasks. I don't think we can change that (the explicit wait flag exists for a reason).
  2. Whether map() can be silently switched to a lazy mode of operation. There's a (perhaps minor) problem with that. Currently, if one of iterables raises an error, map() propagates the exception. With your proposal, the exception may be raised later inside the result iterator.

I think 2) might easily be worked around by introducing a separate method (lazy_map?).

It seems it would be good to discuss those questions on the mailing-list.
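The second point is easy to demonstrate with the current eager implementation: an error raised by the input iterable surfaces at the `map()` call itself, not later from the result iterator. A small stdlib-only demo (not part of the patch):

```python
from concurrent.futures import ThreadPoolExecutor

def bad_input():
    yield 1
    raise RuntimeError("boom")

with ThreadPoolExecutor(max_workers=1) as ex:
    try:
        ex.map(str, bad_input())  # input is consumed eagerly, right here
    except RuntimeError:
        raised_at = "call time"
    else:
        raised_at = "result iteration"

print(raised_at)  # "call time" under the eager implementation
```

Under the lazy proposal, only the first window of inputs is consumed up front, so the same error could instead surface during result iteration.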

Contributor Author (MojoVampire, May 6, 2019):

Yeah, the problem with using the "lazy_map" name is that it feels like recreating the same annoying distinctions between map and imap from the Py2 era, and it would actually have Executor.map (which is supposed to match map, which lazily consumes the input(s)) be less similar to map than Executor.lazy_map.

If it's necessary to gain acceptance, I could change the default behavior to use prefetch=sys.maxsize - self._max_workers. It would match the pre-existing behavior for just about anything that conceivably worked before (modulo the tiny differences in memory usage of deque vs. list for storing the futures) since:

  1. All tasks would be submitted fully up front, so shutdown(wait=True) would in fact wait on them (and no further calls to submit would occur in the generator, so nothing would submit post-shutdown, which would otherwise raise a RuntimeError and cause the cancellation)
  2. It wouldn't be lazy for anything by default (it would either work eagerly or crash, in the same manner it currently behaves)

If you passed a reasonable prefetch, you wouldn't have these behaviors (and we should document that interaction), but at least existing code would continue to work identically.

Member:

I don't have a strong opinion. I think discussing those alternatives on the ML, to gather more opinions and arguments, would be useful.


+    def test_infinite_map_input_completes_work(self):
+        import itertools
+        def identity(x):
+            return x
+
+        mapobj = self.executor.map(identity, itertools.count(0))
+        # Get one result, which shows we handle infinite inputs
+        # without waiting for all work to be dispatched
+        res = next(mapobj)
+        mapobj.close()  # Make sure futures cancelled
+
+        self.assertEqual(res, 0)

     def test_default_workers(self):
         executor = self.executor_type()
Misc/NEWS (6 changes: 6 additions & 0 deletions)
@@ -281,6 +281,12 @@ Extension Modules
 Library
 -------

+- bpo-29842: Executor.map no longer creates all futures eagerly prior to
+  yielding any results. This allows it to work with huge or infinite iterables
+  without consuming excessive resources or crashing, making it more suitable
+  as a drop-in replacement for the built-in map.
+  Patch by Josh Rosenberg.
+
 - bpo-29800: Fix crashes in partial.__repr__ if the keys of partial.keywords
   are not strings. Patch by Michael Seifert.