Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 90279ad

Browse filesBrowse files
committed
WIP: consume queue on immediate shutdown
1 parent ee8d4df commit 90279ad
Copy full SHA for 90279ad

File tree

Expand file treeCollapse file tree

2 files changed

+197
-280
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+197
-280
lines changed

‎Lib/multiprocessing/queues.py

Copy file name to clipboardExpand all lines: Lib/multiprocessing/queues.py
+30-54Lines changed: 30 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626

2727
from .util import debug, info, Finalize, register_after_fork, is_exiting
2828

29-
_queue_alive = 0
30-
_queue_shutdown = 1
31-
_queue_shutdown_immediate = 2
32-
3329
#
3430
# Queue type using a pipe, buffer and thread
3531
#
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
5248
# For use by concurrent.futures
5349
self._ignore_epipe = False
5450
self._reset()
55-
self._shutdown_state = ctx.Value('i', _queue_alive)
51+
self._is_shutdown = ctx.Value('B', False, lock=self._rlock)
5652

5753
if sys.platform != 'win32':
5854
register_after_fork(self, Queue._after_fork)
@@ -61,12 +57,12 @@ def __getstate__(self):
6157
context.assert_spawning(self)
6258
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
6359
self._rlock, self._wlock, self._sem, self._opid,
64-
self._shutdown_state)
60+
self._is_shutdown)
6561

6662
def __setstate__(self, state):
6763
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
6864
self._rlock, self._wlock, self._sem, self._opid,
69-
self._shutdown_state) = state
65+
self._is_shutdown) = state
7066
self._reset()
7167

7268
def _after_fork(self):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
8884
self._recv_bytes = self._reader.recv_bytes
8985
self._poll = self._reader.poll
9086

91-
def _is_alive(self):
92-
return self._shutdown_state.value == _queue_alive
93-
94-
def _is_shutdown(self):
95-
return self._shutdown_state.value == _queue_shutdown
96-
97-
def _is_shutdown_immediate(self):
98-
return self._shutdown_state.value == _queue_shutdown_immediate
99-
100-
def _set_shutdown(self):
101-
self._shutdown_state.value = _queue_shutdown
102-
103-
def _set_shutdown_immediate(self):
104-
self._shutdown_state.value = _queue_shutdown_immediate
105-
10687
def put(self, obj, block=True, timeout=None):
10788
if self._closed:
10889
raise ValueError(f"Queue {self!r} is closed")
109-
if not self._is_alive():
90+
if self._is_shutdown.value:
11091
raise ShutDown
11192
if not self._sem.acquire(block, timeout):
112-
if not self._is_alive():
93+
if self._is_shutdown.value:
11394
raise ShutDown
11495
raise Full
11596

11697
with self._notempty:
98+
if self._is_shutdown.value:
99+
raise ShutDown
117100
if self._thread is None:
118101
self._start_thread()
119102
self._buffer.append(obj)
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124107
raise ValueError(f"Queue {self!r} is closed")
125108
if block and timeout is None:
126109
with self._rlock:
127-
# checks shutdown state
128-
if (self._is_shutdown_immediate()
129-
or (self._is_shutdown() and self.empty())):
110+
if self._is_shutdown.value and self.empty():
130111
raise ShutDown
131112
res = self._recv_bytes()
132113
self._sem.release()
133114
else:
134115
if block:
135116
deadline = time.monotonic() + timeout
136117
if not self._rlock.acquire(block, timeout):
137-
if (self._is_shutdown_immediate()
138-
or (self._is_shutdown() and self.empty())):
118+
if self._is_shutdown.value and self.empty():
139119
raise ShutDown
140120
raise Empty
141121
try:
142122
if block:
143123
timeout = deadline - time.monotonic()
144124
if not self._poll(timeout):
145-
if not self._is_alive():
125+
if self._is_shutdown.value:
146126
raise ShutDown
147127
raise Empty
148128
elif not self._poll():
149-
if not self._is_alive():
129+
if self._is_shutdown.value:
150130
raise ShutDown
151131
raise Empty
152132

153-
# here queue is not empty
154-
if self._is_shutdown_immediate():
155-
raise ShutDown
156-
# here shutdown state queue is alive or shutdown
157133
res = self._recv_bytes()
158134
self._sem.release()
159135
finally:
@@ -178,18 +154,21 @@ def get_nowait(self):
178154
def put_nowait(self, obj):
179155
return self.put(obj, False)
180156

157+
def _clear(self):
158+
with self._rlock:
159+
while self._poll():
160+
self._recv_bytes()
161+
181162
def shutdown(self, immediate=False):
182163
if self._closed:
183164
raise ValueError(f"Queue {self!r} is closed")
184-
with self._shutdown_state.get_lock():
185-
if self._is_shutdown_immediate():
186-
return
165+
with self._is_shutdown.get_lock():
166+
self._is_shutdown.value = True
187167
if immediate:
188-
self._set_shutdown_immediate()
168+
self._clear()
189169
with self._notempty:
190170
self._notempty.notify_all()
191-
else:
192-
self._set_shutdown()
171+
self._sem.release(self.qsize())
193172

194173
def close(self):
195174
self._closed = True
@@ -384,14 +363,16 @@ def __setstate__(self, state):
384363
def put(self, obj, block=True, timeout=None):
385364
if self._closed:
386365
raise ValueError(f"Queue {self!r} is closed")
387-
if not self._is_alive():
366+
if self._is_shutdown.value:
388367
raise ShutDown
389368
if not self._sem.acquire(block, timeout):
390-
if not self._is_alive():
369+
if self._is_shutdown.value:
391370
raise ShutDown
392371
raise Full
393372

394373
with self._notempty, self._cond:
374+
if self._is_shutdown.value:
375+
raise ShutDown
395376
if self._thread is None:
396377
self._start_thread()
397378
self._buffer.append(obj)
@@ -400,27 +381,22 @@ def put(self, obj, block=True, timeout=None):
400381

401382
def task_done(self):
402383
with self._cond:
403-
if self._is_shutdown_immediate():
404-
raise ShutDown
405384
if not self._unfinished_tasks.acquire(False):
406385
raise ValueError('task_done() called too many times')
407386
if self._unfinished_tasks._semlock._is_zero():
408387
self._cond.notify_all()
409388

410389
def join(self):
411390
with self._cond:
412-
if self._is_shutdown_immediate():
413-
raise ShutDown
414391
if not self._unfinished_tasks._semlock._is_zero():
415392
self._cond.wait()
416-
if self._is_shutdown_immediate():
417-
raise ShutDown
418393

419-
def shutdown(self, immediate=False):
420-
with self._cond:
421-
is_alive = self._is_alive()
422-
super().shutdown(immediate)
423-
if is_alive:
394+
def _clear(self):
395+
with self._rlock:
396+
while self._poll():
397+
self._recv_bytes()
398+
self._unfinished_tasks.acquire(block=False)
399+
with self._cond:
424400
self._cond.notify_all()
425401

426402
#

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.