Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4a9e649

Browse files
encukou, bityob, pitrou
authored
gh-104090: Add exit code to multiprocessing ResourceTracker (GH-115410)
This builds on #106807, which adds a return code to ResourceTracker, to make future debugging easier.

Testing this “in situ” proved difficult, since the global ResourceTracker is involved in test infrastructure. So, the tests here create a new instance and feed it fake data.

---------

Co-authored-by: Yonatan Bitton <yonatan.bitton@perception-point.io>
Co-authored-by: Yonatan Bitton <bityob@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
1 parent b052fa3 commit 4a9e649
Copy full SHA for 4a9e649

File tree

Expand file tree / Collapse file tree

4 files changed

+94
-7
lines changed
Filter options
Expand file tree / Collapse file tree

4 files changed

+94
-7
lines changed

‎Lib/multiprocessing/resource_tracker.py

Copy file name to clipboardExpand all lines: Lib/multiprocessing/resource_tracker.py
+32-6Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
3030
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
3131

32+
def cleanup_noop(name):
33+
raise RuntimeError('noop should never be registered or cleaned up')
34+
3235
_CLEANUP_FUNCS = {
33-
'noop': lambda: None,
36+
'noop': cleanup_noop,
37+
'dummy': lambda name: None, # Dummy resource used in tests
3438
}
3539

3640
if os.name == 'posix':
@@ -61,6 +65,7 @@ def __init__(self):
6165
self._lock = threading.RLock()
6266
self._fd = None
6367
self._pid = None
68+
self._exitcode = None
6469

6570
def _reentrant_call_error(self):
6671
# gh-109629: this happens if an explicit call to the ResourceTracker
@@ -84,9 +89,16 @@ def _stop(self):
8489
os.close(self._fd)
8590
self._fd = None
8691

87-
os.waitpid(self._pid, 0)
92+
_, status = os.waitpid(self._pid, 0)
93+
8894
self._pid = None
8995

96+
try:
97+
self._exitcode = os.waitstatus_to_exitcode(status)
98+
except ValueError:
99+
# os.waitstatus_to_exitcode may raise an exception for invalid values
100+
self._exitcode = None
101+
90102
def getfd(self):
91103
self.ensure_running()
92104
return self._fd
@@ -119,6 +131,7 @@ def ensure_running(self):
119131
pass
120132
self._fd = None
121133
self._pid = None
134+
self._exitcode = None
122135

123136
warnings.warn('resource_tracker: process died unexpectedly, '
124137
'relaunching. Some resources might leak.')
@@ -221,6 +234,8 @@ def main(fd):
221234
pass
222235

223236
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
237+
exit_code = 0
238+
224239
try:
225240
# keep track of registered/unregistered resources
226241
with open(fd, 'rb') as f:
@@ -242,6 +257,7 @@ def main(fd):
242257
else:
243258
raise RuntimeError('unrecognized command %r' % cmd)
244259
except Exception:
260+
exit_code = 3
245261
try:
246262
sys.excepthook(*sys.exc_info())
247263
except:
@@ -251,10 +267,17 @@ def main(fd):
251267
for rtype, rtype_cache in cache.items():
252268
if rtype_cache:
253269
try:
254-
warnings.warn(
255-
f'resource_tracker: There appear to be {len(rtype_cache)} '
256-
f'leaked {rtype} objects to clean up at shutdown: {rtype_cache}'
257-
)
270+
exit_code = 1
271+
if rtype == 'dummy':
272+
# The test 'dummy' resource is expected to leak.
273+
# We skip the warning (and *only* the warning) for it.
274+
pass
275+
else:
276+
warnings.warn(
277+
f'resource_tracker: There appear to be '
278+
f'{len(rtype_cache)} leaked {rtype} objects to '
279+
f'clean up at shutdown: {rtype_cache}'
280+
)
258281
except Exception:
259282
pass
260283
for name in rtype_cache:
@@ -265,6 +288,9 @@ def main(fd):
265288
try:
266289
_CLEANUP_FUNCS[rtype](name)
267290
except Exception as e:
291+
exit_code = 2
268292
warnings.warn('resource_tracker: %r: %s' % (name, e))
269293
finally:
270294
pass
295+
296+
sys.exit(exit_code)

‎Lib/test/_test_multiprocessing.py

Copy file name to clipboardExpand all lines: Lib/test/_test_multiprocessing.py
+34-1Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5609,8 +5609,9 @@ def create_and_register_resource(rtype):
56095609
'''
56105610
for rtype in resource_tracker._CLEANUP_FUNCS:
56115611
with self.subTest(rtype=rtype):
5612-
if rtype == "noop":
5612+
if rtype in ("noop", "dummy"):
56135613
# Artefact resource type used by the resource_tracker
5614+
# or tests
56145615
continue
56155616
r, w = os.pipe()
56165617
p = subprocess.Popen([sys.executable,
@@ -5730,6 +5731,38 @@ def test_too_long_name_resource(self):
57305731
with self.assertRaises(ValueError):
57315732
resource_tracker.register(too_long_name_resource, rtype)
57325733

5734+
def _test_resource_tracker_leak_resources(self, cleanup):
5735+
# We use a separate instance for testing, since the main global
5736+
# _resource_tracker may be used to watch test infrastructure.
5737+
from multiprocessing.resource_tracker import ResourceTracker
5738+
tracker = ResourceTracker()
5739+
tracker.ensure_running()
5740+
self.assertTrue(tracker._check_alive())
5741+
5742+
self.assertIsNone(tracker._exitcode)
5743+
tracker.register('somename', 'dummy')
5744+
if cleanup:
5745+
tracker.unregister('somename', 'dummy')
5746+
expected_exit_code = 0
5747+
else:
5748+
expected_exit_code = 1
5749+
5750+
self.assertTrue(tracker._check_alive())
5751+
self.assertIsNone(tracker._exitcode)
5752+
tracker._stop()
5753+
self.assertEqual(tracker._exitcode, expected_exit_code)
5754+
5755+
def test_resource_tracker_exit_code(self):
5756+
"""
5757+
Test the exit code of the resource tracker.
5758+
5759+
If no leaked resources were found, exit code should be 0, otherwise 1
5760+
"""
5761+
for cleanup in [True, False]:
5762+
with self.subTest(cleanup=cleanup):
5763+
self._test_resource_tracker_leak_resources(
5764+
cleanup=cleanup,
5765+
)
57335766

57345767
class TestSimpleQueue(unittest.TestCase):
57355768

‎Lib/test/test_concurrent_futures/test_init.py

Copy file name to clipboardExpand all lines: Lib/test/test_concurrent_futures/test_init.py
+26Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import queue
44
import time
55
import unittest
6+
import sys
67
from concurrent.futures._base import BrokenExecutor
78
from logging.handlers import QueueHandler
89

@@ -109,6 +110,31 @@ def _assert_logged(self, msg):
109110
create_executor_tests(globals(), FailingInitializerMixin)
110111

111112

113+
@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
114+
class FailingInitializerResourcesTest(unittest.TestCase):
115+
"""
116+
Source: https://github.com/python/cpython/issues/104090
117+
"""
118+
119+
def _test(self, test_class):
120+
runner = unittest.TextTestRunner()
121+
runner.run(test_class('test_initializer'))
122+
123+
# GH-104090:
124+
# Stop resource tracker manually now, so we can verify there are no leaked resources by checking
125+
# the process exit code
126+
from multiprocessing.resource_tracker import _resource_tracker
127+
_resource_tracker._stop()
128+
129+
self.assertEqual(_resource_tracker._exitcode, 0)
130+
131+
def test_spawn(self):
132+
self._test(ProcessPoolSpawnFailingInitializerTest)
133+
134+
def test_forkserver(self):
135+
self._test(ProcessPoolForkserverFailingInitializerTest)
136+
137+
112138
def setUpModule():
113139
setup_module()
114140

+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
The multiprocessing resource tracker now exits with a non-zero status code if a resource
2+
leak was detected. It still exits with status code 0 otherwise.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.