Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

gh-96471: Add multiprocessing queue shutdown #104230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
Loading
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions 25 Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,8 @@
free slot was available within that time. Otherwise (*block* is
``False``), put an item on the queue if a free slot is immediately
available, else raise the :exc:`queue.Full` exception (*timeout* is
ignored in that case).
ignored in that case). Raises the :exc:`queue.ShutDown` if the queue has
been shut down.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -890,7 +891,9 @@
it blocks at most *timeout* seconds and raises the :exc:`queue.Empty`
exception if no item was available within that time. Otherwise (block is
``False``), return an item if one is immediately available, else raise the
:exc:`queue.Empty` exception (*timeout* is ignored in that case).
:exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises
the :exc:`queue.ShutDown` exception if the queue has been shut down and
is empty, or if the queue has been shut down immediately.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -900,6 +903,21 @@

Equivalent to ``get(False)``.

.. method:: shutdown(immediate=False)

Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put`
raise :exc:`queue.ShutDown`.

By default, :meth:`~Queue.get` on a shut down queue will only raise once
the queue is empty. Set *immediate* to true to make :meth:`~Queue.get`
raise immediately instead.

All blocked callers of :meth:`~Queue.put` will be unblocked. If

Check warning on line 915 in Doc/library/multiprocessing.rst

View workflow job for this annotation

GitHub Actions / Docs / Docs

py:meth reference target not found: Queue.join [ref.meth]
*immediate* is true, also unblock callers of :meth:`~Queue.get` and
:meth:`~Queue.join`.

.. versionadded:: 3.13

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
code:
Expand Down Expand Up @@ -988,6 +1006,9 @@
items have been processed (meaning that a :meth:`task_done` call was
received for every item that had been :meth:`~Queue.put` into the queue).

``shutdown(immediate=True)`` calls :meth:`task_done` for each remaining
item in the queue.

Raises a :exc:`ValueError` if called more times than there were items
placed in the queue.

Expand Down
2 changes: 2 additions & 0 deletions 2 Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ multiprocessing
``d |= {'b': 2}`` for proxies of :class:`dict`.

(Contributed by Roy Hyunjin Han for :gh:`103134`.)
* Add :meth:`multiprocessing.Queue.shutdown` for queue termination.
(Contributed by Laurie Opperman in :gh:`104230`.)


operator
Expand Down
58 changes: 55 additions & 3 deletions 58 Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import weakref
import errno

from queue import Empty, Full
from queue import Empty, Full, ShutDown
EpicWink marked this conversation as resolved.
Show resolved Hide resolved

from . import connection
from . import context
Expand Down Expand Up @@ -48,18 +48,21 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._is_shutdown = ctx.Value('B', False)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._is_shutdown)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._is_shutdown) = state
self._reset()

def _after_fork(self):
Expand All @@ -84,10 +87,16 @@ def _reset(self, after_fork=False):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._is_shutdown.value:
raise ShutDown
if not self._sem.acquire(block, timeout):
if self._is_shutdown.value:
raise ShutDown
raise Full

with self._notempty:
if self._is_shutdown.value:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
Expand All @@ -98,24 +107,34 @@ def get(self, block=True, timeout=None):
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
if self._is_shutdown.value and self.empty():
raise ShutDown
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
if self._is_shutdown.value and self.empty():
raise ShutDown
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if self._is_shutdown.value:
raise ShutDown
raise Empty
elif not self._poll():
if self._is_shutdown.value:
raise ShutDown
raise Empty

res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()

# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand All @@ -135,6 +154,25 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
with self._is_shutdown.get_lock():
self._is_shutdown.value = True
if immediate:
self._clear()
# TODO: unblock all getters to check empty (then shutdown)
for _ in range(self._maxsize):
try:
self._sem.release()
except ValueError:
break

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -328,10 +366,16 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._is_shutdown.value:
raise ShutDown
if not self._sem.acquire(block, timeout):
if self._is_shutdown.value:
raise ShutDown
raise Full

with self._notempty, self._cond:
if self._is_shutdown.value:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
Expand All @@ -350,6 +394,14 @@ def join(self):
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()
self._unfinished_tasks.acquire(block=False)
with self._cond:
self._cond.notify_all()

#
# Simplified Queue type -- really just a locked pipe
#
Expand Down
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.