From 95f74b98ab9bdeb5f2d176e36c7a42f0def1a163 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 20 Mar 2017 19:43:03 +0100 Subject: [PATCH 1/2] bpo-29861: release references to multiprocessing Pool tasks Release references to tasks, their arguments and their results as soon as they are finished, instead of keeping them alive until another task arrives. --- Lib/multiprocessing/pool.py | 7 ++++++- Lib/test/_test_multiprocessing.py | 25 +++++++++++++++++++++++++ Misc/NEWS | 3 +++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index ffdf42614d59eba..ae8cec44796b211 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -128,6 +128,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, util.debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) + + task = job = result = func = args = kwds = None completed += 1 util.debug('worker exiting after %d tasks' % completed) @@ -402,10 +404,11 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache): if set_length: util.debug('doing set_length()') set_length(i+1) + finally: + task = taskseq = job = None else: util.debug('task handler got sentinel') - try: # tell result handler to finish when cache is empty util.debug('task handler sending sentinel to result handler') @@ -445,6 +448,7 @@ def _handle_results(outqueue, get, cache): cache[job]._set(i, obj) except KeyError: pass + task = job = obj = None while cache and thread._state != TERMINATE: try: @@ -461,6 +465,7 @@ def _handle_results(outqueue, get, cache): cache[job]._set(i, obj) except KeyError: pass + task = job = obj = None if hasattr(outqueue, '_reader'): util.debug('ensuring that outqueue is not full') diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b5f47825466de6c..82827dcc92e6fe5 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -18,6 +18,7 @@ import logging import struct import operator +import weakref import test.support import test.support.script_helper @@ -1738,6 +1739,19 @@ def raise_large_valuerror(wait): time.sleep(wait) raise ValueError("x" * 1024**2) +def identity(x): + return x + +class CountedObject(object): + n_instances = 0 + + def __new__(cls): + cls.n_instances += 1 + return object.__new__(cls) + + def __del__(self): + type(self).n_instances -= 1 + class SayWhenError(ValueError): pass def exception_throwing_generator(total, when): @@ -1746,6 +1760,7 @@ def exception_throwing_generator(total, when): raise SayWhenError("Somebody said when") yield i + class _TestPool(BaseTestCase): @classmethod @@ -2000,6 +2015,16 @@ def test_map_no_failfast(self): # check that we indeed waited for all jobs self.assertGreater(time.time() - t_start, 0.9) + def test_release_task_refs(self): + # Issue #29861: + objs = [CountedObject() for i in range(10)] + refs = [weakref.ref(o) for o in objs] + self.pool.map(identity, objs) + + del objs + self.assertEqual(set(wr() for wr in refs), {None}) + self.assertEqual(CountedObject.n_instances, 0) + def raising(): raise KeyError("key") diff --git a/Misc/NEWS b/Misc/NEWS index d8ea4c91e54e1ad..508895433de8a8b 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -282,6 +282,9 @@ Extension Modules Library ------- +- bpo-29861: Release references to tasks, their arguments and their results + as soon as they are finished in multiprocessing.Pool. + - bpo-25455: Fixed crashes in repr of recursive buffered file-like objects. - bpo-29800: Fix crashes in partial.__repr__ if the keys of partial.keywords From c73847c6d518a1f6e033064b20734aacd6194a22 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 20 Mar 2017 19:46:00 +0100 Subject: [PATCH 2/2] Comments in test --- Lib/test/_test_multiprocessing.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 82827dcc92e6fe5..1d3bb0f8bae7816 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2016,13 +2016,16 @@ def test_map_no_failfast(self): self.assertGreater(time.time() - t_start, 0.9) def test_release_task_refs(self): - # Issue #29861: + # Issue #29861: task arguments and results should not be kept + # alive after we are done with them. objs = [CountedObject() for i in range(10)] refs = [weakref.ref(o) for o in objs] self.pool.map(identity, objs) del objs self.assertEqual(set(wr() for wr in refs), {None}) + # With a process pool, copies of the objects are returned, check + # they were released too. self.assertEqual(CountedObject.n_instances, 0)