@@ -25,6 +25,15 @@ class Full(Exception):
25
25
pass
26
26
27
27
28
+ class ShutDown (Exception ):
29
+ '''Raised when put/get with shut-down queue.'''
30
+
31
+
32
+ _queue_alive = "alive"
33
+ _queue_shutdown = "shutdown"
34
+ _queue_shutdown_immediate = "shutdown-immediate"
35
+
36
+
28
37
class Queue :
29
38
'''Create a queue object with a given maximum size.
30
39
@@ -54,6 +63,9 @@ def __init__(self, maxsize=0):
54
63
self .all_tasks_done = threading .Condition (self .mutex )
55
64
self .unfinished_tasks = 0
56
65
66
+ # Queue shut-down state
67
+ self .shutdown_state = _queue_alive
68
+
57
69
def task_done (self ):
58
70
'''Indicate that a formerly enqueued task is complete.
59
71
@@ -87,6 +99,8 @@ def join(self):
87
99
'''
88
100
with self .all_tasks_done :
89
101
while self .unfinished_tasks :
102
+ if self .shutdown_state == _queue_shutdown_immediate :
103
+ return
90
104
self .all_tasks_done .wait ()
91
105
92
106
def qsize (self ):
@@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None):
130
144
is immediately available, else raise the Full exception ('timeout'
131
145
is ignored in that case).
132
146
'''
147
+ if self .shutdown_state != _queue_alive :
148
+ raise ShutDown
133
149
with self .not_full :
134
150
if self .maxsize > 0 :
135
151
if not block :
@@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None):
138
154
elif timeout is None :
139
155
while self ._qsize () >= self .maxsize :
140
156
self .not_full .wait ()
157
+ if self .shutdown_state != _queue_alive :
158
+ raise ShutDown
141
159
elif timeout < 0 :
142
160
raise ValueError ("'timeout' must be a non-negative number" )
143
161
else :
@@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None):
147
165
if remaining <= 0.0 :
148
166
raise Full
149
167
self .not_full .wait (remaining )
168
+ if self .shutdown_state != _queue_alive :
169
+ raise ShutDown
150
170
self ._put (item )
151
171
self .unfinished_tasks += 1
152
172
self .not_empty .notify ()
@@ -162,13 +182,17 @@ def get(self, block=True, timeout=None):
162
182
available, else raise the Empty exception ('timeout' is ignored
163
183
in that case).
164
184
'''
185
+ if self .shutdown_state == _queue_shutdown_immediate :
186
+ raise ShutDown
165
187
with self .not_empty :
166
188
if not block :
167
189
if not self ._qsize ():
168
190
raise Empty
169
191
elif timeout is None :
170
192
while not self ._qsize ():
171
193
self .not_empty .wait ()
194
+ if self .shutdown_state == _queue_shutdown_immediate :
195
+ raise ShutDown
172
196
elif timeout < 0 :
173
197
raise ValueError ("'timeout' must be a non-negative number" )
174
198
else :
@@ -178,6 +202,8 @@ def get(self, block=True, timeout=None):
178
202
if remaining <= 0.0 :
179
203
raise Empty
180
204
self .not_empty .wait (remaining )
205
+ if self .shutdown_state == _queue_shutdown_immediate :
206
+ raise ShutDown
181
207
item = self ._get ()
182
208
self .not_full .notify ()
183
209
return item
@@ -198,6 +224,26 @@ def get_nowait(self):
198
224
'''
199
225
return self .get (block = False )
200
226
227
+ def shutdown (self , immediate = False ):
228
+ '''Shut-down the queue, making queue gets and puts raise.
229
+
230
+ By default, gets will only raise once the queue is empty. Set
231
+ 'immediate' to True to make gets raise immediately instead.
232
+
233
+ All blocked callers of put(), get() and join() will be
234
+ unblocked. The ShutDown exception is raised.
235
+ '''
236
+ if immediate :
237
+ self .shutdown_state = _queue_shutdown_immediate
238
+ with self .not_empty :
239
+ self .not_empty .notify_all ()
240
+ with self .all_tasks_done :
241
+ self .all_tasks_done .notify_all ()
242
+ else :
243
+ self .shutdown_state = _queue_shutdown
244
+ with self .not_full :
245
+ self .not_full .notify_all ()
246
+
201
247
# Override these methods to implement other queue organizations
202
248
# (e.g. stack or priority queue).
203
249
# These will only be called with appropriate locks held
0 commit comments