bpo-29842: Make Executor.map less eager so it handles large/unbounded input iterables appropriately #707
@@ -467,7 +467,22 @@ def record_finished(n):

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))
        # No guarantees on how many tasks dispatched,
        # but at least one should have been dispatched
        self.assertGreater(len(finished), 0)
I think this change breaks compatibility. The doc for

So all futures should have executed, instead of being cancelled.

At the time I wrote it, it didn't conflict with the documentation precisely; the original documentation said that map was "Equivalent to map(func, *iterables) except func is executed asynchronously and several calls to func may be made concurrently.", but it doesn't guarantee that any actual futures exist (it's implemented in terms of submit and futures, but doesn't actually require such a design).

That said, it looks like you updated the documentation to add "the iterables are collected immediately rather than lazily;", which, if considered a guarantee rather than a warning, would make this a breaking change even ignoring the "cancel vs. wait" issue. Do you have any suggestions? If strict adherence to your newly (as of late 2017) documented behavior is needed, I suppose I could change the default behavior from "reasonable prefetch" to "exhaustive prefetch", so that when prefetch isn't passed, every task is submitted, but it would be annoying to lose the "good by default" behavior of limited prefetching.

The reason I cancelled rather than waiting on the result is that I was trying to follow the normal use pattern for map; since the results are yielded lazily, if the iterator goes away or is closed explicitly (or you explicitly shut down the executor), you're done; having the outstanding futures complete when you're not able to see the results means you're either:

Actually, I think there are two problems to discuss:

I think 2) might easily be worked around by introducing a separate method (

It seems it would be good to discuss those questions on the mailing-list.

Yeah, the problem with using the "lazy_map" name is that it feels like recreating the same annoying distinctions between map and imap from the Py2 era, and it would actually make Executor.map (which is supposed to match map, which lazily consumes its input(s)) less similar to map than Executor.lazy_map is. If it's necessary to gain acceptance, I could change the default behavior to use prefetch=sys.maxsize - self._max_workers. It would match the pre-existing behavior for just about anything that conceivably worked before (modulo the tiny differences in memory usage of deque vs. list for storing the futures) since:

If you passed a reasonable prefetch, you wouldn't have these behaviors (and we should document that interaction), but at least existing code would continue to work identically.

I don't have a strong opinion. I think discussing those alternatives on the ML, to gather more opinions and arguments, would be useful.
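The prefetch-limited behavior under discussion can be sketched as follows. This is not the patch's actual code; the lazy_map name, the prefetch default, and the one-task-per-result top-up strategy are illustrative assumptions:

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor

def lazy_map(executor, fn, iterable, prefetch=None):
    """Sketch of a prefetch-limited map: only a bounded number of tasks
    are submitted ahead of the results consumed, so unbounded inputs
    are safe to iterate lazily."""
    if prefetch is None:
        prefetch = executor._max_workers  # illustrative default
    argsiter = iter(iterable)
    # Seed the queue with up to `prefetch` submitted tasks.
    fs = collections.deque(
        executor.submit(fn, arg)
        for arg in itertools.islice(argsiter, prefetch)
    )

    def result_iterator():
        try:
            while fs:
                res = fs.popleft().result()
                # Top up the queue with one new task per result consumed.
                for arg in itertools.islice(argsiter, 1):
                    fs.append(executor.submit(fn, arg))
                yield res
        finally:
            # Mirror the "cancel on early exit" behavior discussed above.
            for f in fs:
                f.cancel()

    return result_iterator()

with ThreadPoolExecutor(max_workers=2) as ex:
    squares = lazy_map(ex, lambda x: x * x, itertools.count(), prefetch=4)
    first_three = [next(squares) for _ in range(3)]
```

Despite the infinite itertools.count() input, only a handful of tasks are ever in flight; the first three results come back in order as 0, 1, 4.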
    def test_infinite_map_input_completes_work(self):
        import itertools
        def identity(x):
            return x

        mapobj = self.executor.map(identity, itertools.count(0))
        # Get one result, which shows we handle infinite inputs
        # without waiting for all work to be dispatched
        res = next(mapobj)
        mapobj.close()  # Make sure futures cancelled

        self.assertEqual(res, 0)

    def test_default_workers(self):
        executor = self.executor_type()
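For contrast, a small experiment (the instrumented generator here is illustrative, not from the patch) shows why this test would hang without the change: the stock Executor.map consumes its entire input up front, before the first result is ever requested:

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []

def instrumented(n):
    # Generator that records each element as Executor.map pulls it.
    for i in range(n):
        consumed.append(i)
        yield i

with ThreadPoolExecutor(max_workers=2) as ex:
    results = ex.map(lambda x: x, instrumented(100))
    # All 100 inputs were already consumed (and submitted as tasks)
    # by the map() call itself, before we asked for a single result.
    n_consumed_before_first_result = len(consumed)
    first = next(results)

# With an unbounded input like itertools.count(), the map() call
# above would never return at all.
```

Running this on an unpatched executor, n_consumed_before_first_result is already 100 when the first result (0) is fetched.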
Using "chunks" here would be more precise than "tasks".

The documentation for chunksize uses the phrasing "this method chops iterables into a number of chunks which it submits to the pool as separate tasks", and since not all executors even use chunks (ThreadPoolExecutor ignores the argument), I figured I'd stick with "tasks". It does leave us without a term to describe a single work item; the docs use chunks and tasks as synonyms, with no term for a single work item.
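That point about ThreadPoolExecutor is easy to check: chunksize is part of the shared Executor.map signature, but only ProcessPoolExecutor acts on it (batching elements to amortize pickling/IPC overhead), while a thread pool accepts and silently ignores it:

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=2) as ex:
    # chunksize is accepted here only for signature compatibility
    # with ProcessPoolExecutor.map; it has no effect on thread pools.
    batched = list(ex.map(square, range(10), chunksize=4))
    unbatched = list(ex.map(square, range(10)))
```

Both calls produce identical results; threads share memory, so there is no serialization cost for chunking to amortize.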