Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

gh-124694: Add concurrent.futures.InterpreterPoolExecutor #124548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5c69d38
Make ThreadPoolExecutor extensible.
ericsnowcurrently Sep 25, 2024
01789be
Add InterpreterPoolExecutor.
ericsnowcurrently Sep 25, 2024
6def4be
Clean up the interpreter if initialize() fails.
ericsnowcurrently Sep 27, 2024
84993a5
Add a missing import.
ericsnowcurrently Sep 27, 2024
c540cf0
Fix some typos.
ericsnowcurrently Sep 27, 2024
45d584d
Add more tests.
ericsnowcurrently Sep 27, 2024
c90c016
Add docs.
ericsnowcurrently Sep 27, 2024
1cb4657
Add a NEwS entry.
ericsnowcurrently Sep 27, 2024
4dc0989
Fix the last test.
ericsnowcurrently Sep 27, 2024
57b2db6
Add more tests.
ericsnowcurrently Sep 27, 2024
75e11d2
Simplify ExecutionFailed.
ericsnowcurrently Sep 30, 2024
69c2b8e
Fix the signature of resolve_task().
ericsnowcurrently Sep 30, 2024
f03c314
Capture any uncaught exception.
ericsnowcurrently Sep 30, 2024
4806d9f
Add TODO comments.
ericsnowcurrently Sep 30, 2024
efc0395
Docs fixes.
ericsnowcurrently Sep 30, 2024
a29aee3
Automatically apply textwrap.dedent() to scripts.
ericsnowcurrently Sep 30, 2024
8bab457
Fix the WASI build.
ericsnowcurrently Sep 30, 2024
cd29914
wasi
ericsnowcurrently Oct 1, 2024
0287f3b
Ignore race in test.
ericsnowcurrently Oct 1, 2024
80cd7b1
Add BrokenInterpreterPool.
ericsnowcurrently Oct 8, 2024
f8d4273
Tweak the docs.
ericsnowcurrently Oct 8, 2024
3a8bfce
Clarify the InterpreterPoolExecutor docs.
ericsnowcurrently Oct 8, 2024
af6c27a
Catch all exceptions.
ericsnowcurrently Oct 8, 2024
8c0a405
Factor out exception serialization helpers.
ericsnowcurrently Oct 8, 2024
1ae7ca2
Set the ExecutionFailed error as __cause__.
ericsnowcurrently Oct 8, 2024
d24e85d
Drop the exception serialization helpers.
ericsnowcurrently Oct 8, 2024
05a03ad
Always finalize if there is an error in initialize().
ericsnowcurrently Oct 8, 2024
f150931
Explicitly note the problem with functions defined in __main__.
ericsnowcurrently Oct 8, 2024
97d0292
Handle the case where interpreters.queues doesn't exist.
ericsnowcurrently Oct 8, 2024
baf0504
Merge branch 'main' into interpreter-pool-executor
ericsnowcurrently Oct 15, 2024
5c3a327
Add a What's New entry about InterpreterPoolExecutor.
ericsnowcurrently Oct 15, 2024
a2032a8
Fix a typo.
ericsnowcurrently Oct 15, 2024
54119b8
Fix the documented signature.
ericsnowcurrently Oct 15, 2024
744dca7
Test and document asyncio support.
ericsnowcurrently Oct 15, 2024
f61d62d
Apply suggestions from code review
ericsnowcurrently Oct 16, 2024
ee65bb2
Expand the docs.
ericsnowcurrently Oct 16, 2024
a7f5c50
For now, drop support for scripts.
ericsnowcurrently Oct 16, 2024
b148e09
Fix a TODO comment.
ericsnowcurrently Oct 16, 2024
e365ae7
Fix the docs.
ericsnowcurrently Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
For now, drop support for scripts.
  • Loading branch information
ericsnowcurrently committed Oct 16, 2024
commit a7f5c50e7c1032123d6816db614d34757d0472fc
12 changes: 0 additions & 12 deletions 12 Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,6 @@ the bytes over a shared :mod:`socket <socket>` or
The executor may replace uncaught exceptions from *initializer*
with :class:`~concurrent.futures.interpreter.ExecutionFailed`.

The *initializer* argument may also be a script (:class:`str`),
The script will be executed in the interpreter's ``__main__`` module.
The executor automatically applies :func:`textwrap.dedent` to the script.
*initargs* must not be passed in in this case.

The optional *shared* argument is a :class:`dict` of objects that all
interpreters in the pool share. The *shared* items are added to each
interpreter's ``__main__`` module. Not all objects are shareable.
Expand All @@ -328,13 +323,6 @@ likewise serializes the return value when sending it back.
Functions defined in the ``__main__`` module cannot be pickled
and thus cannot be used.

For :meth:`~Executor.submit`, but *not* :meth:`~Executor.map`,
you can also pass a script (:class:`str`) instead of a callable.
The script will be executed in the interpreter's ``__main__`` module.
The executor will automatically apply :func:`textwrap.dedent` to the
script, so you don't have to do so. With a script, arguments must
not be passed in. The return value for a script is always ``None``.

When a worker's current task raises an uncaught exception, the worker
always tries to preserve the exception as-is. If that is successful
then it also sets the ``__cause__`` to a corresponding
Expand Down
13 changes: 9 additions & 4 deletions 13 Lib/concurrent/futures/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class WorkerContext(_thread.WorkerContext):
def prepare(cls, initializer, initargs, shared):
def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
if args or kwargs:
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
data = textwrap.dedent(fn)
Expand All @@ -60,11 +62,13 @@ def resolve_task(fn, args, kwargs):
kind = 'function'
return (data, kind)

if isinstance(initializer, str):
if initargs:
raise ValueError(f'an initializer script does not take args, got {initargs!r}')
if initializer is not None:
initdata = resolve_task(initializer, initargs, {})
try:
initdata = resolve_task(initializer, initargs, {})
except ValueError:
if isinstance(initializer, str) and initargs:
raise ValueError(f'an initializer script does not take args, got {initargs!r}')
raise # re-raise
else:
initdata = None
def create_context():
Expand Down Expand Up @@ -160,6 +164,7 @@ def finalize(self):
def run(self, task):
data, kind = task
if kind == 'script':
raise NotImplementedError('script kind disabled')
script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
Expand Down
125 changes: 68 additions & 57 deletions 125 Lib/test/test_concurrent_futures/test_interpreter_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from .util import BaseTestCase, InterpreterPoolMixin, setup_module


def noop():
pass


def write_msg(fd, msg):
os.write(fd, msg + b'\0')

Expand Down Expand Up @@ -55,6 +59,7 @@ def pipe(self):
class InterpreterPoolExecutorTest(
InterpretersMixin, ExecutorTest, BaseTestCase):

@unittest.expectedFailure
def test_init_script(self):
msg1 = b'step: init'
msg2 = b'step: run'
Expand All @@ -80,6 +85,7 @@ def test_init_script(self):
self.assertEqual(after_init, msg1)
self.assertEqual(after_run, msg2)

@unittest.expectedFailure
def test_init_script_args(self):
with self.assertRaises(ValueError):
self.executor_type(initializer='pass', initargs=('spam',))
Expand Down Expand Up @@ -123,46 +129,50 @@ def initializer(self):
def test_init_shared(self):
msg = b'eggs'
r, w = self.pipe()
script = f"""
script = f"""if True:
import os
if __name__ != '__main__':
import __main__
spam = __main__.spam
os.write({w}, spam + b'\\0')
"""

executor = self.executor_type(shared={'spam': msg})
fut = executor.submit(script)
fut = executor.submit(exec, script)
fut.result()
after = read_msg(r)

self.assertEqual(after, msg)

def test_init_exception(self):
with self.subTest('script'):
executor = self.executor_type(initializer='raise Exception("spam")')
with executor:
with contextlib.redirect_stderr(io.StringIO()) as stderr:
fut = executor.submit('pass')
with self.assertRaises(BrokenInterpreterPool):
fut.result()
stderr = stderr.getvalue()
self.assertIn('ExecutionFailed: Exception: spam', stderr)
self.assertIn('Uncaught in the interpreter:', stderr)
self.assertIn('The above exception was the direct cause of the following exception:',
stderr)

with self.subTest('func'):
executor = self.executor_type(initializer=fail,
initargs=(Exception, 'spam'))
with executor:
with contextlib.redirect_stderr(io.StringIO()) as stderr:
fut = executor.submit('pass')
with self.assertRaises(BrokenInterpreterPool):
fut.result()
stderr = stderr.getvalue()
self.assertIn('ExecutionFailed: Exception: spam', stderr)
self.assertIn('Uncaught in the interpreter:', stderr)
self.assertIn('The above exception was the direct cause of the following exception:',
stderr)

@unittest.expectedFailure
def test_init_exception_in_script(self):
executor = self.executor_type(initializer='raise Exception("spam")')
with executor:
with contextlib.redirect_stderr(io.StringIO()) as stderr:
fut = executor.submit('pass')
with self.assertRaises(BrokenInterpreterPool):
fut.result()
stderr = stderr.getvalue()
self.assertIn('ExecutionFailed: Exception: spam', stderr)
self.assertIn('Uncaught in the interpreter:', stderr)
self.assertIn('The above exception was the direct cause of the following exception:',
stderr)

def test_init_exception_in_func(self):
executor = self.executor_type(initializer=fail,
initargs=(Exception, 'spam'))
with executor:
with contextlib.redirect_stderr(io.StringIO()) as stderr:
fut = executor.submit(noop)
with self.assertRaises(BrokenInterpreterPool):
fut.result()
stderr = stderr.getvalue()
self.assertIn('ExecutionFailed: Exception: spam', stderr)
self.assertIn('Uncaught in the interpreter:', stderr)
self.assertIn('The above exception was the direct cause of the following exception:',
stderr)

@unittest.expectedFailure
def test_submit_script(self):
msg = b'spam'
r, w = self.pipe()
Expand Down Expand Up @@ -221,39 +231,40 @@ def test_submit_func_globals(self):
self.assertEqual(name, __name__)
self.assertNotEqual(name, '__main__')

def test_submit_exception(self):
with self.subTest('script'):
fut = self.executor.submit('raise Exception("spam")')
with self.assertRaises(Exception) as captured:
fut.result()
self.assertIs(type(captured.exception), Exception)
self.assertEqual(str(captured.exception), 'spam')
cause = captured.exception.__cause__
self.assertIs(type(cause), ExecutionFailed)
for attr in ('__name__', '__qualname__', '__module__'):
self.assertEqual(getattr(cause.excinfo.type, attr),
getattr(Exception, attr))
self.assertEqual(cause.excinfo.msg, 'spam')

with self.subTest('func'):
fut = self.executor.submit(fail, Exception, 'spam')
with self.assertRaises(Exception) as captured:
fut.result()
self.assertIs(type(captured.exception), Exception)
self.assertEqual(str(captured.exception), 'spam')
cause = captured.exception.__cause__
self.assertIs(type(cause), ExecutionFailed)
for attr in ('__name__', '__qualname__', '__module__'):
self.assertEqual(getattr(cause.excinfo.type, attr),
getattr(Exception, attr))
self.assertEqual(cause.excinfo.msg, 'spam')
@unittest.expectedFailure
def test_submit_exception_in_script(self):
fut = self.executor.submit('raise Exception("spam")')
with self.assertRaises(Exception) as captured:
fut.result()
self.assertIs(type(captured.exception), Exception)
self.assertEqual(str(captured.exception), 'spam')
cause = captured.exception.__cause__
self.assertIs(type(cause), ExecutionFailed)
for attr in ('__name__', '__qualname__', '__module__'):
self.assertEqual(getattr(cause.excinfo.type, attr),
getattr(Exception, attr))
self.assertEqual(cause.excinfo.msg, 'spam')

def test_submit_exception_in_func(self):
fut = self.executor.submit(fail, Exception, 'spam')
with self.assertRaises(Exception) as captured:
fut.result()
self.assertIs(type(captured.exception), Exception)
self.assertEqual(str(captured.exception), 'spam')
cause = captured.exception.__cause__
self.assertIs(type(cause), ExecutionFailed)
for attr in ('__name__', '__qualname__', '__module__'):
self.assertEqual(getattr(cause.excinfo.type, attr),
getattr(Exception, attr))
self.assertEqual(cause.excinfo.msg, 'spam')

def test_saturation(self):
blocker = queues.create()
executor = self.executor_type(4, shared=dict(blocker=blocker))

for i in range(15 * executor._max_workers):
executor.submit('blocker.get()')
executor.submit(exec, 'import __main__; __main__.blocker.get()')
#executor.submit('blocker.get()')
self.assertEqual(len(executor._threads), executor._max_workers)
for i in range(15 * executor._max_workers):
blocker.put_nowait(None)
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.