Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4fd0640

Browse filesBrowse files
committed
Add threading implementation of queue shutdown
1 parent 0cd33e1 commit 4fd0640
Copy full SHA for 4fd0640

File tree

Expand file treeCollapse file tree

1 file changed

+46
-0
lines changed
Filter options
Expand file treeCollapse file tree

1 file changed

+46
-0
lines changed

‎Lib/queue.py

Copy file name to clipboardExpand all lines: Lib/queue.py
+46Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ class Full(Exception):
2525
pass
2626

2727

28+
class ShutDown(Exception):
29+
'''Raised when put/get with shut-down queue.'''
30+
31+
32+
_queue_alive = "alive"
33+
_queue_shutdown = "shutdown"
34+
_queue_shutdown_immediate = "shutdown-immediate"
35+
36+
2837
class Queue:
2938
'''Create a queue object with a given maximum size.
3039
@@ -54,6 +63,9 @@ def __init__(self, maxsize=0):
5463
self.all_tasks_done = threading.Condition(self.mutex)
5564
self.unfinished_tasks = 0
5665

66+
# Queue shut-down state
67+
self.shutdown_state = _queue_alive
68+
5769
def task_done(self):
5870
'''Indicate that a formerly enqueued task is complete.
5971
@@ -87,6 +99,8 @@ def join(self):
8799
'''
88100
with self.all_tasks_done:
89101
while self.unfinished_tasks:
102+
if self.shutdown_state == _queue_shutdown_immediate:
103+
return
90104
self.all_tasks_done.wait()
91105

92106
def qsize(self):
@@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None):
130144
is immediately available, else raise the Full exception ('timeout'
131145
is ignored in that case).
132146
'''
147+
if self.shutdown_state != _queue_alive:
148+
raise ShutDown
133149
with self.not_full:
134150
if self.maxsize > 0:
135151
if not block:
@@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None):
138154
elif timeout is None:
139155
while self._qsize() >= self.maxsize:
140156
self.not_full.wait()
157+
if self.shutdown_state != _queue_alive:
158+
raise ShutDown
141159
elif timeout < 0:
142160
raise ValueError("'timeout' must be a non-negative number")
143161
else:
@@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None):
147165
if remaining <= 0.0:
148166
raise Full
149167
self.not_full.wait(remaining)
168+
if self.shutdown_state != _queue_alive:
169+
raise ShutDown
150170
self._put(item)
151171
self.unfinished_tasks += 1
152172
self.not_empty.notify()
@@ -162,13 +182,17 @@ def get(self, block=True, timeout=None):
162182
available, else raise the Empty exception ('timeout' is ignored
163183
in that case).
164184
'''
185+
if self.shutdown_state == _queue_shutdown_immediate:
186+
raise ShutDown
165187
with self.not_empty:
166188
if not block:
167189
if not self._qsize():
168190
raise Empty
169191
elif timeout is None:
170192
while not self._qsize():
171193
self.not_empty.wait()
194+
if self.shutdown_state == _queue_shutdown_immediate:
195+
raise ShutDown
172196
elif timeout < 0:
173197
raise ValueError("'timeout' must be a non-negative number")
174198
else:
@@ -178,6 +202,8 @@ def get(self, block=True, timeout=None):
178202
if remaining <= 0.0:
179203
raise Empty
180204
self.not_empty.wait(remaining)
205+
if self.shutdown_state == _queue_shutdown_immediate:
206+
raise ShutDown
181207
item = self._get()
182208
self.not_full.notify()
183209
return item
@@ -198,6 +224,26 @@ def get_nowait(self):
198224
'''
199225
return self.get(block=False)
200226

227+
def shutdown(self, immediate=False):
228+
'''Shut-down the queue, making queue gets and puts raise.
229+
230+
By default, gets will only raise once the queue is empty. Set
231+
'immediate' to True to make gets raise immediately instead.
232+
233+
All blocked callers of put(), get() and join() will be
234+
unblocked. The ShutDown exception is raised.
235+
'''
236+
if immediate:
237+
self.shutdown_state = _queue_shutdown_immediate
238+
with self.not_empty:
239+
self.not_empty.notify_all()
240+
with self.all_tasks_done:
241+
self.all_tasks_done.notify_all()
242+
else:
243+
self.shutdown_state = _queue_shutdown
244+
with self.not_full:
245+
self.not_full.notify_all()
246+
201247
# Override these methods to implement other queue organizations
202248
# (e.g. stack or priority queue).
203249
# These will only be called with appropriate locks held

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.