26
26
27
27
from .util import debug , info , Finalize , register_after_fork , is_exiting
28
28
29
- _queue_alive = 0
30
- _queue_shutdown = 1
31
- _queue_shutdown_immediate = 2
32
-
33
29
#
34
30
# Queue type using a pipe, buffer and thread
35
31
#
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
52
48
# For use by concurrent.futures
53
49
self ._ignore_epipe = False
54
50
self ._reset ()
55
- self ._shutdown_state = ctx .Value ('i ' , _queue_alive )
51
+ self ._is_shutdown = ctx .Value ('B ' , False , lock = self . _rlock )
56
52
57
53
if sys .platform != 'win32' :
58
54
register_after_fork (self , Queue ._after_fork )
@@ -61,12 +57,12 @@ def __getstate__(self):
61
57
context .assert_spawning (self )
62
58
return (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
63
59
self ._rlock , self ._wlock , self ._sem , self ._opid ,
64
- self ._shutdown_state )
60
+ self ._is_shutdown )
65
61
66
62
def __setstate__ (self , state ):
67
63
(self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
68
64
self ._rlock , self ._wlock , self ._sem , self ._opid ,
69
- self ._shutdown_state ) = state
65
+ self ._is_shutdown ) = state
70
66
self ._reset ()
71
67
72
68
def _after_fork (self ):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
88
84
self ._recv_bytes = self ._reader .recv_bytes
89
85
self ._poll = self ._reader .poll
90
86
91
- def _is_alive (self ):
92
- return self ._shutdown_state .value == _queue_alive
93
-
94
- def _is_shutdown (self ):
95
- return self ._shutdown_state .value == _queue_shutdown
96
-
97
- def _is_shutdown_immediate (self ):
98
- return self ._shutdown_state .value == _queue_shutdown_immediate
99
-
100
- def _set_shutdown (self ):
101
- self ._shutdown_state .value = _queue_shutdown
102
-
103
- def _set_shutdown_immediate (self ):
104
- self ._shutdown_state .value = _queue_shutdown_immediate
105
-
106
87
def put (self , obj , block = True , timeout = None ):
107
88
if self ._closed :
108
89
raise ValueError (f"Queue { self !r} is closed" )
109
- if not self ._is_alive () :
90
+ if self ._is_shutdown . value :
110
91
raise ShutDown
111
92
if not self ._sem .acquire (block , timeout ):
112
- if not self ._is_alive () :
93
+ if self ._is_shutdown . value :
113
94
raise ShutDown
114
95
raise Full
115
96
116
97
with self ._notempty :
98
+ if self ._is_shutdown .value :
99
+ raise ShutDown
117
100
if self ._thread is None :
118
101
self ._start_thread ()
119
102
self ._buffer .append (obj )
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124
107
raise ValueError (f"Queue { self !r} is closed" )
125
108
if block and timeout is None :
126
109
with self ._rlock :
127
- # checks shutdown state
128
- if (self ._is_shutdown_immediate ()
129
- or (self ._is_shutdown () and self .empty ())):
110
+ if self ._is_shutdown .value and self .empty ():
130
111
raise ShutDown
131
112
res = self ._recv_bytes ()
132
113
self ._sem .release ()
133
114
else :
134
115
if block :
135
116
deadline = time .monotonic () + timeout
136
117
if not self ._rlock .acquire (block , timeout ):
137
- if (self ._is_shutdown_immediate ()
138
- or (self ._is_shutdown () and self .empty ())):
118
+ if self ._is_shutdown .value and self .empty ():
139
119
raise ShutDown
140
120
raise Empty
141
121
try :
142
122
if block :
143
123
timeout = deadline - time .monotonic ()
144
124
if not self ._poll (timeout ):
145
- if not self ._is_alive () :
125
+ if self ._is_shutdown . value :
146
126
raise ShutDown
147
127
raise Empty
148
128
elif not self ._poll ():
149
- if not self ._is_alive () :
129
+ if self ._is_shutdown . value :
150
130
raise ShutDown
151
131
raise Empty
152
132
153
- # here queue is not empty
154
- if self ._is_shutdown_immediate ():
155
- raise ShutDown
156
- # here shutdown state queue is alive or shutdown
157
133
res = self ._recv_bytes ()
158
134
self ._sem .release ()
159
135
finally :
@@ -178,18 +154,21 @@ def get_nowait(self):
178
154
def put_nowait (self , obj ):
179
155
return self .put (obj , False )
180
156
157
+ def _clear (self ):
158
+ with self ._rlock :
159
+ while self ._poll ():
160
+ self ._recv_bytes ()
161
+
181
162
def shutdown (self , immediate = False ):
182
163
if self ._closed :
183
164
raise ValueError (f"Queue { self !r} is closed" )
184
- with self ._shutdown_state .get_lock ():
185
- if self ._is_shutdown_immediate ():
186
- return
165
+ with self ._is_shutdown .get_lock ():
166
+ self ._is_shutdown .value = True
187
167
if immediate :
188
- self ._set_shutdown_immediate ()
168
+ self ._clear ()
189
169
with self ._notempty :
190
170
self ._notempty .notify_all ()
191
- else :
192
- self ._set_shutdown ()
171
+ self ._sem .release (self .qsize ())
193
172
194
173
def close (self ):
195
174
self ._closed = True
@@ -384,14 +363,16 @@ def __setstate__(self, state):
384
363
def put (self , obj , block = True , timeout = None ):
385
364
if self ._closed :
386
365
raise ValueError (f"Queue { self !r} is closed" )
387
- if not self ._is_alive () :
366
+ if self ._is_shutdown . value :
388
367
raise ShutDown
389
368
if not self ._sem .acquire (block , timeout ):
390
- if not self ._is_alive () :
369
+ if self ._is_shutdown . value :
391
370
raise ShutDown
392
371
raise Full
393
372
394
373
with self ._notempty , self ._cond :
374
+ if self ._is_shutdown .value :
375
+ raise ShutDown
395
376
if self ._thread is None :
396
377
self ._start_thread ()
397
378
self ._buffer .append (obj )
@@ -400,27 +381,22 @@ def put(self, obj, block=True, timeout=None):
400
381
401
382
def task_done (self ):
402
383
with self ._cond :
403
- if self ._is_shutdown_immediate ():
404
- raise ShutDown
405
384
if not self ._unfinished_tasks .acquire (False ):
406
385
raise ValueError ('task_done() called too many times' )
407
386
if self ._unfinished_tasks ._semlock ._is_zero ():
408
387
self ._cond .notify_all ()
409
388
410
389
def join (self ):
411
390
with self ._cond :
412
- if self ._is_shutdown_immediate ():
413
- raise ShutDown
414
391
if not self ._unfinished_tasks ._semlock ._is_zero ():
415
392
self ._cond .wait ()
416
- if self ._is_shutdown_immediate ():
417
- raise ShutDown
418
393
419
- def shutdown (self , immediate = False ):
420
- with self ._cond :
421
- is_alive = self ._is_alive ()
422
- super ().shutdown (immediate )
423
- if is_alive :
394
+ def _clear (self ):
395
+ with self ._rlock :
396
+ while self ._poll ():
397
+ self ._recv_bytes ()
398
+ self ._unfinished_tasks .acquire (block = False )
399
+ with self ._cond :
424
400
self ._cond .notify_all ()
425
401
426
402
#
0 commit comments