1
- __all__ = ('Queue' , 'PriorityQueue' , 'LifoQueue' , 'QueueFull' , 'QueueEmpty' )
1
+ __all__ = (
2
+ 'Queue' ,
3
+ 'PriorityQueue' ,
4
+ 'LifoQueue' ,
5
+ 'QueueFull' ,
6
+ 'QueueEmpty' ,
7
+ 'QueueShutDown' ,
8
+ )
2
9
3
10
import collections
4
11
import heapq
@@ -18,6 +25,16 @@ class QueueFull(Exception):
18
25
pass
19
26
20
27
28
+ class QueueShutDown (Exception ):
29
+ """Raised when putting on to or getting from a shut-down Queue."""
30
+ pass
31
+
32
+
33
+ _queue_alive = "alive"
34
+ _queue_shutdown = "shutdown"
35
+ _queue_shutdown_immediate = "shutdown-immediate"
36
+
37
+
21
38
class Queue (mixins ._LoopBoundMixin ):
22
39
"""A queue, useful for coordinating producer and consumer coroutines.
23
40
@@ -41,6 +58,7 @@ def __init__(self, maxsize=0):
41
58
self ._finished = locks .Event ()
42
59
self ._finished .set ()
43
60
self ._init (maxsize )
61
+ self .shutdown_state = _queue_alive
44
62
45
63
# These three are overridable in subclasses.
46
64
@@ -113,6 +131,8 @@ async def put(self, item):
113
131
Put an item into the queue. If the queue is full, wait until a free
114
132
slot is available before adding item.
115
133
"""
134
+ if self .shutdown_state != _queue_alive :
135
+ raise QueueShutDown
116
136
while self .full ():
117
137
putter = self ._get_loop ().create_future ()
118
138
self ._putters .append (putter )
@@ -132,13 +152,17 @@ async def put(self, item):
132
152
# the call. Wake up the next in line.
133
153
self ._wakeup_next (self ._putters )
134
154
raise
155
+ if self .shutdown_state != _queue_alive :
156
+ raise QueueShutDown
135
157
return self .put_nowait (item )
136
158
137
159
def put_nowait (self , item ):
138
160
"""Put an item into the queue without blocking.
139
161
140
162
If no free slot is immediately available, raise QueueFull.
141
163
"""
164
+ if self .shutdown_state != _queue_alive :
165
+ raise QueueShutDown
142
166
if self .full ():
143
167
raise QueueFull
144
168
self ._put (item )
@@ -151,7 +175,11 @@ async def get(self):
151
175
152
176
If queue is empty, wait until an item is available.
153
177
"""
178
+ if self .shutdown_state == _queue_shutdown_immediate :
179
+ raise QueueShutDown
154
180
while self .empty ():
181
+ if self .shutdown_state != _queue_alive :
182
+ raise QueueShutDown
155
183
getter = self ._get_loop ().create_future ()
156
184
self ._getters .append (getter )
157
185
try :
@@ -170,6 +198,8 @@ async def get(self):
170
198
# the call. Wake up the next in line.
171
199
self ._wakeup_next (self ._getters )
172
200
raise
201
+ if self .shutdown_state == _queue_shutdown_immediate :
202
+ raise QueueShutDown
173
203
return self .get_nowait ()
174
204
175
205
def get_nowait (self ):
@@ -178,7 +208,11 @@ def get_nowait(self):
178
208
Return an item if one is immediately available, else raise QueueEmpty.
179
209
"""
180
210
if self .empty ():
211
+ if self .shutdown_state != _queue_alive :
212
+ raise QueueShutDown
181
213
raise QueueEmpty
214
+ elif self .shutdown_state == _queue_shutdown_immediate :
215
+ raise QueueShutDown
182
216
item = self ._get ()
183
217
self ._wakeup_next (self ._putters )
184
218
return item
@@ -214,6 +248,27 @@ async def join(self):
214
248
if self ._unfinished_tasks > 0 :
215
249
await self ._finished .wait ()
216
250
251
+ def shutdown (self , immediate = False ):
252
+ """Shut-down the queue, making queue gets and puts raise.
253
+
254
+ By default, gets will only raise once the queue is empty. Set
255
+ 'immediate' to True to make gets raise immediately instead.
256
+
257
+ All blocked callers of put() will be unblocked, and also get()
258
+ and join() if 'immediate'. The QueueShutDown exception is raised.
259
+ """
260
+ if immediate :
261
+ self .shutdown_state = _queue_shutdown_immediate
262
+ while self ._getters :
263
+ getter = self ._getters .popleft ()
264
+ if not getter .done ():
265
+ getter .set_result (None )
266
+ else :
267
+ self .shutdown_state = _queue_shutdown
268
+ while self ._putters :
269
+ putter = self ._putters .popleft ()
270
+ if not putter .done ():
271
+ putter .set_result (None )
217
272
218
273
class PriorityQueue (Queue ):
219
274
"""A subclass of Queue; retrieves entries in priority order (lowest first).
0 commit comments