@@ -30,11 +30,9 @@ class ShutDown(Exception):
30
30
'''Raised when put/get with shut-down queue.'''
31
31
32
32
33
- class _QueueState (enum .Enum ):
34
- ALIVE = "alive"
35
- SHUTDOWN = "shutdown"
36
- SHUTDOWN_IMMEDIATE = "shutdown-immediate"
37
-
33
+ _queue_alive = "alive"
34
+ _queue_shutdown = "shutdown"
35
+ _queue_shutdown_immediate = "shutdown-immediate"
38
36
39
37
class Queue :
40
38
'''Create a queue object with a given maximum size.
@@ -66,7 +64,7 @@ def __init__(self, maxsize=0):
66
64
self .unfinished_tasks = 0
67
65
68
66
# Queue shut-down state
69
- self .shutdown_state = _QueueState . ALIVE
67
+ self .shutdown_state = _queue_alive
70
68
71
69
def task_done (self ):
72
70
'''Indicate that a formerly enqueued task is complete.
@@ -83,6 +81,9 @@ def task_done(self):
83
81
placed in the queue.
84
82
'''
85
83
with self .all_tasks_done :
84
+ # here `self.all_task_done` uses `self.mutex`
85
+ if self .shutdown_state == _queue_shutdown_immediate :
86
+ raise ShutDown
86
87
unfinished = self .unfinished_tasks - 1
87
88
if unfinished <= 0 :
88
89
if unfinished < 0 :
@@ -100,10 +101,13 @@ def join(self):
100
101
When the count of unfinished tasks drops to zero, join() unblocks.
101
102
'''
102
103
with self .all_tasks_done :
104
+ # here `self.all_task_done` uses `self.mutex`
105
+ if self .shutdown_state == _queue_shutdown_immediate :
106
+ raise ShutDown
103
107
while self .unfinished_tasks :
104
- if self .shutdown_state is _QueueState .SHUTDOWN_IMMEDIATE :
105
- return
106
108
self .all_tasks_done .wait ()
109
+ if self .shutdown_state == _queue_shutdown_immediate :
110
+ raise ShutDown
107
111
108
112
def qsize (self ):
109
113
'''Return the approximate size of the queue (not reliable!).'''
@@ -146,17 +150,18 @@ def put(self, item, block=True, timeout=None):
146
150
is immediately available, else raise the Full exception ('timeout'
147
151
is ignored in that case).
148
152
'''
149
- if self .shutdown_state is not _QueueState .ALIVE :
150
- raise ShutDown
151
153
with self .not_full :
154
+ # here `self.not_full` uses `self.mutex``
155
+ if self .shutdown_state != _queue_alive :
156
+ raise ShutDown
152
157
if self .maxsize > 0 :
153
158
if not block :
154
159
if self ._qsize () >= self .maxsize :
155
160
raise Full
156
161
elif timeout is None :
157
162
while self ._qsize () >= self .maxsize :
158
163
self .not_full .wait ()
159
- if self .shutdown_state is not _QueueState . ALIVE :
164
+ if self .shutdown_state != _queue_alive :
160
165
raise ShutDown
161
166
elif timeout < 0 :
162
167
raise ValueError ("'timeout' must be a non-negative number" )
@@ -167,7 +172,7 @@ def put(self, item, block=True, timeout=None):
167
172
if remaining <= 0.0 :
168
173
raise Full
169
174
self .not_full .wait (remaining )
170
- if self .shutdown_state is not _QueueState . ALIVE :
175
+ if self .shutdown_state != _queue_alive :
171
176
raise ShutDown
172
177
self ._put (item )
173
178
self .unfinished_tasks += 1
@@ -184,35 +189,36 @@ def get(self, block=True, timeout=None):
184
189
available, else raise the Empty exception ('timeout' is ignored
185
190
in that case).
186
191
'''
187
- if self .shutdown_state is _QueueState .SHUTDOWN_IMMEDIATE :
188
- raise ShutDown
189
192
with self .not_empty :
193
+ # here `self.not_empty` uses `self.mutex`
194
+ if self .shutdown_state == _queue_shutdown_immediate :
195
+ raise ShutDown
190
196
if not block :
191
197
if not self ._qsize ():
192
- if self .shutdown_state is not _QueueState . ALIVE :
198
+ if self .shutdown_state != _queue_alive :
193
199
raise ShutDown
194
200
raise Empty
195
201
elif timeout is None :
196
202
while not self ._qsize ():
197
- if self .shutdown_state is not _QueueState . ALIVE :
203
+ if self .shutdown_state != _queue_alive :
198
204
raise ShutDown
199
205
self .not_empty .wait ()
200
- if self .shutdown_state is not _QueueState . ALIVE :
206
+ if self .shutdown_state != _queue_alive :
201
207
raise ShutDown
202
208
elif timeout < 0 :
203
209
raise ValueError ("'timeout' must be a non-negative number" )
204
210
else :
205
211
endtime = time () + timeout
206
212
while not self ._qsize ():
207
- if self .shutdown_state is not _QueueState . ALIVE :
213
+ if self .shutdown_state != _queue_alive :
208
214
raise ShutDown
209
215
remaining = endtime - time ()
210
216
if remaining <= 0.0 :
211
217
raise Empty
212
218
self .not_empty .wait (remaining )
213
- if self .shutdown_state is not _QueueState . ALIVE :
219
+ if self .shutdown_state != _queue_alive :
214
220
raise ShutDown
215
- if self .shutdown_state is _QueueState . SHUTDOWN_IMMEDIATE :
221
+ if self .shutdown_state == _queue_shutdown_immediate :
216
222
raise ShutDown
217
223
item = self ._get ()
218
224
self .not_full .notify ()
@@ -244,19 +250,19 @@ def shutdown(self, immediate=False):
244
250
and join() if 'immediate'. The ShutDown exception is raised.
245
251
'''
246
252
with self .mutex :
247
- if self .shutdown_state is _QueueState . SHUTDOWN_IMMEDIATE :
253
+ if self .shutdown_state is _queue_shutdown_immediate :
248
254
return
249
255
250
256
if immediate :
251
- self .shutdown_state = _QueueState . SHUTDOWN_IMMEDIATE
257
+ self .shutdown_state = _queue_shutdown_immediate
252
258
self .not_empty .notify_all ()
253
259
# set self.unfinished_tasks to 0
254
260
# to break the loop in 'self.join()'
255
261
# when quits from `wait()`
256
262
self .unfinished_tasks = 0
257
263
self .all_tasks_done .notify_all ()
258
264
else :
259
- self .shutdown_state = _QueueState . SHUTDOWN
265
+ self .shutdown_state = _queue_shutdown
260
266
self .not_full .notify_all ()
261
267
262
268
# Override these methods to implement other queue organizations
0 commit comments