Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f1208ff

Browse filesBrowse files
Use _interpreters.call().
1 parent fe95f3f commit f1208ff
Copy full SHA for f1208ff

File tree

Expand file treeCollapse file tree

2 files changed

+106
-89
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+106
-89
lines changed

‎Lib/concurrent/futures/interpreter.py

Copy file name to clipboardExpand all lines: Lib/concurrent/futures/interpreter.py
+56-68Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545
# XXX Circle back to this later.
4646
raise TypeError('scripts not supported')
4747
else:
48-
# Functions defined in the __main__ module can't be pickled,
49-
# so they can't be used here. In the future, we could possibly
50-
# borrow from multiprocessing to work around this.
5148
task = (fn, args, kwargs)
52-
data = pickle.dumps(task)
53-
return data
49+
return task
5450

5551
if initializer is not None:
5652
try:
@@ -65,35 +61,6 @@ def create_context():
6561
return cls(initdata, shared)
6662
return create_context, resolve_task
6763

68-
@classmethod
69-
@contextlib.contextmanager
70-
def _capture_exc(cls, resultsid):
71-
try:
72-
yield
73-
except BaseException as exc:
74-
# Send the captured exception out on the results queue,
75-
# but still leave it unhandled for the interpreter to handle.
76-
_interpqueues.put(resultsid, (None, exc))
77-
raise # re-raise
78-
79-
@classmethod
80-
def _send_script_result(cls, resultsid):
81-
_interpqueues.put(resultsid, (None, None))
82-
83-
@classmethod
84-
def _call(cls, func, args, kwargs, resultsid):
85-
with cls._capture_exc(resultsid):
86-
res = func(*args or (), **kwargs or {})
87-
# Send the result back.
88-
with cls._capture_exc(resultsid):
89-
_interpqueues.put(resultsid, (res, None))
90-
91-
@classmethod
92-
def _call_pickled(cls, pickled, resultsid):
93-
with cls._capture_exc(resultsid):
94-
fn, args, kwargs = pickle.loads(pickled)
95-
cls._call(fn, args, kwargs, resultsid)
96-
9764
def __init__(self, initdata, shared=None):
9865
self.initdata = initdata
9966
self.shared = dict(shared) if shared else None
@@ -104,11 +71,56 @@ def __del__(self):
10471
if self.interpid is not None:
10572
self.finalize()
10673

107-
def _exec(self, script):
108-
assert self.interpid is not None
109-
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
74+
def _call(self, fn, args, kwargs):
75+
def do_call(resultsid, func, *args, **kwargs):
76+
try:
77+
return func(*args, **kwargs)
78+
except BaseException as exc:
79+
# Avoid relying on globals.
80+
import _interpreters
81+
import _interpqueues
82+
# Send the captured exception out on the results queue,
83+
# but still leave it unhandled for the interpreter to handle.
84+
try:
85+
_interpqueues.put(resultsid, exc)
86+
except _interpreters.NotShareableError:
87+
# The exception is not shareable.
88+
import sys
89+
import traceback
90+
print('exception is not shareable:', file=sys.stderr)
91+
traceback.print_exception(exc)
92+
_interpqueues.put(resultsid, None)
93+
raise # re-raise
94+
95+
args = (self.resultsid, fn, *args)
96+
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
11097
if excinfo is not None:
11198
raise ExecutionFailed(excinfo)
99+
return res
100+
101+
def _get_exception(self):
102+
# Wait for the exception data to show up.
103+
while True:
104+
try:
105+
excdata = _interpqueues.get(self.resultsid)
106+
except _interpqueues.QueueNotFoundError:
107+
raise # re-raise
108+
except _interpqueues.QueueError as exc:
109+
if exc.__cause__ is not None or exc.__context__ is not None:
110+
raise # re-raise
111+
if str(exc).endswith(' is empty'):
112+
continue
113+
else:
114+
raise # re-raise
115+
except ModuleNotFoundError:
116+
# interpreters.queues doesn't exist, which means
117+
# QueueEmpty doesn't. Act as though it does.
118+
continue
119+
else:
120+
break
121+
exc, unboundop = excdata
122+
assert unboundop is None, unboundop
123+
return exc
112124

113125
def initialize(self):
114126
assert self.interpid is None, self.interpid
@@ -119,8 +131,6 @@ def initialize(self):
119131
maxsize = 0
120132
self.resultsid = _interpqueues.create(maxsize)
121133

122-
self._exec(f'from {__name__} import WorkerContext')
123-
124134
if self.shared:
125135
_interpreters.set___main___attrs(
126136
self.interpid, self.shared, restrict=True)
@@ -148,37 +158,15 @@ def finalize(self):
148158
pass
149159

150160
def run(self, task):
151-
data = task
152-
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
153-
161+
fn, args, kwargs = task
154162
try:
155-
self._exec(script)
156-
except ExecutionFailed as exc:
157-
exc_wrapper = exc
158-
else:
159-
exc_wrapper = None
160-
161-
# Return the result, or raise the exception.
162-
while True:
163-
try:
164-
obj = _interpqueues.get(self.resultsid)
165-
except _interpqueues.QueueNotFoundError:
163+
return self._call(fn, args, kwargs)
164+
except ExecutionFailed as wrapper:
165+
exc = self._get_exception()
166+
if exc is None:
167+
# The exception must have been not shareable.
166168
raise # re-raise
167-
except _interpqueues.QueueError:
168-
continue
169-
except ModuleNotFoundError:
170-
# interpreters.queues doesn't exist, which means
171-
# QueueEmpty doesn't. Act as though it does.
172-
continue
173-
else:
174-
break
175-
(res, exc), unboundop = obj
176-
assert unboundop is None, unboundop
177-
if exc is not None:
178-
assert res is None, res
179-
assert exc_wrapper is not None
180-
raise exc from exc_wrapper
181-
return res
169+
raise exc from wrapper
182170

183171

184172
class BrokenInterpreterPool(_thread.BrokenThreadPool):

‎Lib/test/test_concurrent_futures/test_interpreter_pool.py

Copy file name to clipboardExpand all lines: Lib/test/test_concurrent_futures/test_interpreter_pool.py
+50-21Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import contextlib
33
import io
44
import os
5-
import pickle
5+
import select
66
import time
77
import unittest
88
from concurrent.futures.interpreter import (
@@ -22,10 +22,14 @@ def noop():
2222

2323

2424
def write_msg(fd, msg):
25+
import os
2526
os.write(fd, msg + b'\0')
2627

2728

28-
def read_msg(fd):
29+
def read_msg(fd, timeout=10.0):
30+
r, _, _ = select.select([fd], [], [], timeout)
31+
if fd not in r:
32+
raise TimeoutError('nothing to read')
2933
msg = b''
3034
while ch := os.read(fd, 1):
3135
if ch == b'\0':
@@ -121,19 +125,32 @@ def init2():
121125
nonlocal count
122126
count += 1
123127

124-
with self.assertRaises(pickle.PicklingError):
125-
self.executor_type(initializer=init1)
126-
with self.assertRaises(pickle.PicklingError):
127-
self.executor_type(initializer=init2)
128+
with contextlib.redirect_stderr(io.StringIO()) as stderr:
129+
with self.executor_type(initializer=init1) as executor:
130+
fut = executor.submit(lambda: None)
131+
self.assertIn('NotShareableError', stderr.getvalue())
132+
with self.assertRaises(BrokenInterpreterPool):
133+
fut.result()
134+
135+
with contextlib.redirect_stderr(io.StringIO()) as stderr:
136+
with self.executor_type(initializer=init2) as executor:
137+
fut = executor.submit(lambda: None)
138+
self.assertIn('NotShareableError', stderr.getvalue())
139+
with self.assertRaises(BrokenInterpreterPool):
140+
fut.result()
128141

129142
def test_init_instance_method(self):
130143
class Spam:
131144
def initializer(self):
132145
raise NotImplementedError
133146
spam = Spam()
134147

135-
with self.assertRaises(pickle.PicklingError):
136-
self.executor_type(initializer=spam.initializer)
148+
with contextlib.redirect_stderr(io.StringIO()) as stderr:
149+
with self.executor_type(initializer=spam.initializer) as executor:
150+
fut = executor.submit(lambda: None)
151+
self.assertIn('NotShareableError', stderr.getvalue())
152+
with self.assertRaises(BrokenInterpreterPool):
153+
fut.result()
137154

138155
def test_init_shared(self):
139156
msg = b'eggs'
@@ -178,8 +195,6 @@ def test_init_exception_in_func(self):
178195
stderr = stderr.getvalue()
179196
self.assertIn('ExecutionFailed: Exception: spam', stderr)
180197
self.assertIn('Uncaught in the interpreter:', stderr)
181-
self.assertIn('The above exception was the direct cause of the following exception:',
182-
stderr)
183198

184199
@unittest.expectedFailure
185200
def test_submit_script(self):
@@ -208,19 +223,24 @@ def task2():
208223
return spam
209224

210225
executor = self.executor_type()
211-
with self.assertRaises(pickle.PicklingError):
212-
executor.submit(task1)
213-
with self.assertRaises(pickle.PicklingError):
214-
executor.submit(task2)
226+
227+
fut = executor.submit(task1)
228+
with self.assertRaises(_interpreters.NotShareableError):
229+
fut.result()
230+
231+
fut = executor.submit(task2)
232+
with self.assertRaises(_interpreters.NotShareableError):
233+
fut.result()
215234

216235
def test_submit_local_instance(self):
217236
class Spam:
218237
def __init__(self):
219238
self.value = True
220239

221240
executor = self.executor_type()
222-
with self.assertRaises(pickle.PicklingError):
223-
executor.submit(Spam)
241+
fut = executor.submit(Spam)
242+
with self.assertRaises(_interpreters.NotShareableError):
243+
fut.result()
224244

225245
def test_submit_instance_method(self):
226246
class Spam:
@@ -229,8 +249,9 @@ def run(self):
229249
spam = Spam()
230250

231251
executor = self.executor_type()
232-
with self.assertRaises(pickle.PicklingError):
233-
executor.submit(spam.run)
252+
fut = executor.submit(spam.run)
253+
with self.assertRaises(_interpreters.NotShareableError):
254+
fut.result()
234255

235256
def test_submit_func_globals(self):
236257
executor = self.executor_type()
@@ -242,6 +263,7 @@ def test_submit_func_globals(self):
242263

243264
@unittest.expectedFailure
244265
def test_submit_exception_in_script(self):
266+
# Scripts are not supported currently.
245267
fut = self.executor.submit('raise Exception("spam")')
246268
with self.assertRaises(Exception) as captured:
247269
fut.result()
@@ -289,13 +311,20 @@ def test_idle_thread_reuse(self):
289311
executor.shutdown(wait=True)
290312

291313
def test_pickle_errors_propagate(self):
292-
# GH-125864: Pickle errors happen before the script tries to execute, so the
293-
# queue used to wait infinitely.
294-
314+
# GH-125864: Pickle errors happen before the script tries to execute,
315+
# so the queue used to wait infinitely.
295316
fut = self.executor.submit(PickleShenanigans(0))
296317
with self.assertRaisesRegex(RuntimeError, "gotcha"):
297318
fut.result()
298319

320+
def test_no_stale_references(self):
321+
# Weak references don't cross between interpreters.
322+
raise unittest.SkipTest('not applicable')
323+
324+
def test_free_reference(self):
325+
# Weak references don't cross between interpreters.
326+
raise unittest.SkipTest('not applicable')
327+
299328

300329
class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
301330

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.