Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

gh-117531: Unblock getters after non-immediate queue shutdown #117532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 10, 2024
6 changes: 4 additions & 2 deletions 6 Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ them down.
queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise
immediately instead.

All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate*
is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`.
All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be
unblocked. If *immediate* is true, a task will be marked as done for each
remaining item in the queue, which may unblock callers of
:meth:`~Queue.join`.

.. versionadded:: 3.13

Expand Down
8 changes: 5 additions & 3 deletions 8 Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ def shutdown(self, immediate=False):
By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'.
All blocked callers of put() and get() will be unblocked. If
'immediate', a task is marked as done for each item remaining in
the queue, which may unblock callers of join().
'''
with self.mutex:
self.is_shutdown = True
Expand All @@ -249,9 +250,10 @@ def shutdown(self, immediate=False):
self._get()
if self.unfinished_tasks > 0:
self.unfinished_tasks -= 1
self.not_empty.notify_all()
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
# All getters need to re-check queue-empty to raise ShutDown
EpicWink marked this conversation as resolved.
Show resolved Hide resolved
self.not_empty.notify_all()
self.not_full.notify_all()

# Override these methods to implement other queue organizations
Expand Down
17 changes: 17 additions & 0 deletions 17 Lib/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,23 @@ def test_shutdown_get_task_done_join(self):

self.assertEqual(results, [True]*len(thrds))

def test_shutdown_pending_get(self):
def get():
try:
results.append(q.get())
except Exception as e:
results.append(e)

q = self.type2test()
results = []
get_thread = threading.Thread(target=get)
get_thread.start()
q.shutdown(immediate=False)
get_thread.join(timeout=10.0)
self.assertFalse(get_thread.is_alive())
self.assertEqual(len(results), 1)
self.assertIsInstance(results[0], self.queue.ShutDown)


class QueueTest(BaseQueueTestMixin):

Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.