1
1
'''A multi-producer, multi-consumer queue.'''
2
2
3
+ import enum
3
4
import threading
4
5
import types
5
6
from collections import deque
@@ -29,9 +30,12 @@ class ShutDown(Exception):
29
30
'''Raised when put/get with shut-down queue.'''
30
31
31
32
32
- _queue_alive = "alive"
33
- _queue_shutdown = "shutdown"
34
- _queue_shutdown_immediate = "shutdown-immediate"
33
+ class _QueueState (enum .Enum ):
34
+ ALIVE = "alive"
35
+ SHUTDOWN = "shutdown"
36
+ SHUTDOWN_IMMEDIATE = "shutdown_immediate"
37
+
38
+ E
35
39
36
40
37
41
class Queue :
@@ -64,7 +68,7 @@ def __init__(self, maxsize=0):
64
68
self .unfinished_tasks = 0
65
69
66
70
# Queue shut-down state
67
- self .shutdown_state = _queue_alive
71
+ self .shutdown_state = _QueueState . ALIVE
68
72
69
73
def task_done (self ):
70
74
'''Indicate that a formerly enqueued task is complete.
@@ -99,7 +103,7 @@ def join(self):
99
103
'''
100
104
with self .all_tasks_done :
101
105
while self .unfinished_tasks :
102
- if self .shutdown_state == _queue_shutdown_immediate :
106
+ if self .shutdown_state is _QueueState . SHUTDOWN_IMMEDIATE :
103
107
return
104
108
self .all_tasks_done .wait ()
105
109
@@ -144,7 +148,7 @@ def put(self, item, block=True, timeout=None):
144
148
is immediately available, else raise the Full exception ('timeout'
145
149
is ignored in that case).
146
150
'''
147
- if self .shutdown_state != _queue_alive :
151
+ if self .shutdown_state is not _QueueState . ALIVE :
148
152
raise ShutDown
149
153
with self .not_full :
150
154
if self .maxsize > 0 :
@@ -154,7 +158,7 @@ def put(self, item, block=True, timeout=None):
154
158
elif timeout is None :
155
159
while self ._qsize () >= self .maxsize :
156
160
self .not_full .wait ()
157
- if self .shutdown_state != _queue_alive :
161
+ if self .shutdown_state is not _QueueState . ALIVE :
158
162
raise ShutDown
159
163
elif timeout < 0 :
160
164
raise ValueError ("'timeout' must be a non-negative number" )
@@ -165,7 +169,7 @@ def put(self, item, block=True, timeout=None):
165
169
if remaining <= 0.0 :
166
170
raise Full
167
171
self .not_full .wait (remaining )
168
- if self .shutdown_state != _queue_alive :
172
+ if self .shutdown_state is not _QueueState . ALIVE :
169
173
raise ShutDown
170
174
self ._put (item )
171
175
self .unfinished_tasks += 1
@@ -182,35 +186,35 @@ def get(self, block=True, timeout=None):
182
186
available, else raise the Empty exception ('timeout' is ignored
183
187
in that case).
184
188
'''
185
- if self .shutdown_state == _queue_shutdown_immediate :
189
+ if self .shutdown_state is _QueueState . SHUTDOWN_IMMEDIATE :
186
190
raise ShutDown
187
191
with self .not_empty :
188
192
if not block :
189
193
if not self ._qsize ():
190
- if self .shutdown_state != _queue_alive :
194
+ if self .shutdown_state is not _QueueState . ALIVE :
191
195
raise ShutDown
192
196
raise Empty
193
197
elif timeout is None :
194
198
while not self ._qsize ():
195
- if self .shutdown_state != _queue_alive :
199
+ if self .shutdown_state is not _QueueState . ALIVE :
196
200
raise ShutDown
197
201
self .not_empty .wait ()
198
- if self .shutdown_state != _queue_alive :
202
+ if self .shutdown_state is not _QueueState . ALIVE :
199
203
raise ShutDown
200
204
elif timeout < 0 :
201
205
raise ValueError ("'timeout' must be a non-negative number" )
202
206
else :
203
207
endtime = time () + timeout
204
208
while not self ._qsize ():
205
- if self .shutdown_state != _queue_alive :
209
+ if self .shutdown_state is not _QueueState . ALIVE :
206
210
raise ShutDown
207
211
remaining = endtime - time ()
208
212
if remaining <= 0.0 :
209
213
raise Empty
210
214
self .not_empty .wait (remaining )
211
- if self .shutdown_state != _queue_alive :
215
+ if self .shutdown_state is not _QueueState . ALIVE :
212
216
raise ShutDown
213
- if self .shutdown_state == _queue_shutdown_immediate :
217
+ if self .shutdown_state is _QueueState . SHUTDOWN_IMMEDIATE :
214
218
raise ShutDown
215
219
item = self ._get ()
216
220
self .not_full .notify ()
@@ -243,15 +247,15 @@ def shutdown(self, immediate=False):
243
247
'''
244
248
with self .mutex :
245
249
if immediate :
246
- self .shutdown_state = _queue_shutdown_immediate
250
+ self .shutdown_state = _QueueState . SHUTDOWN_IMMEDIATE
247
251
self .not_empty .notify_all ()
248
252
# set self.unfinished_tasks to 0
249
253
# to break the loop in 'self.join()'
250
254
# when quits from `wait()`
251
255
self .unfinished_tasks = 0
252
256
self .all_tasks_done .notify_all ()
253
257
else :
254
- self .shutdown_state = _queue_shutdown
258
+ self .shutdown_state = _QueueState . SHUTDOWN
255
259
self .not_full .notify_all ()
256
260
257
261
# Override these methods to implement other queue organizations
0 commit comments