Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ca118d7

Browse filesBrowse files
committed
Shut-down immediate consumes queue
1 parent 089eb96 commit ca118d7
Copy full SHA for ca118d7

File tree

Expand file treeCollapse file tree

2 files changed

+31
-81
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+31
-81
lines changed

‎Lib/queue.py

Copy file name to clipboardExpand all lines: Lib/queue.py
+11-38Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ class ShutDown(Exception):
2929
'''Raised when put/get with shut-down queue.'''
3030

3131

32-
_queue_alive = "alive"
33-
_queue_shutdown = "shutdown"
34-
_queue_shutdown_immediate = "shutdown-immediate"
35-
3632
class Queue:
3733
'''Create a queue object with a given maximum size.
3834
@@ -63,7 +59,7 @@ def __init__(self, maxsize=0):
6359
self.unfinished_tasks = 0
6460

6561
# Queue shutdown state
66-
self.shutdown_state = _queue_alive
62+
self.is_shutdown = False
6763

6864
def task_done(self):
6965
'''Indicate that a formerly enqueued task is complete.
@@ -82,8 +78,6 @@ def task_done(self):
8278
Raises ShutDown if the queue has been shut down immediately.
8379
'''
8480
with self.all_tasks_done:
85-
if self._is_shutdown_immediate():
86-
raise ShutDown
8781
unfinished = self.unfinished_tasks - 1
8882
if unfinished <= 0:
8983
if unfinished < 0:
@@ -103,12 +97,8 @@ def join(self):
10397
Raises ShutDown if the queue has been shut down immediately.
10498
'''
10599
with self.all_tasks_done:
106-
if self._is_shutdown_immediate():
107-
raise ShutDown
108100
while self.unfinished_tasks:
109101
self.all_tasks_done.wait()
110-
if self._is_shutdown_immediate():
111-
raise ShutDown
112102

113103
def qsize(self):
114104
'''Return the approximate size of the queue (not reliable!).'''
@@ -154,7 +144,7 @@ def put(self, item, block=True, timeout=None):
154144
Raises ShutDown if the queue has been shut down.
155145
'''
156146
with self.not_full:
157-
if not self._is_alive():
147+
if self.is_shutdown:
158148
raise ShutDown
159149
if self.maxsize > 0:
160150
if not block:
@@ -163,7 +153,7 @@ def put(self, item, block=True, timeout=None):
163153
elif timeout is None:
164154
while self._qsize() >= self.maxsize:
165155
self.not_full.wait()
166-
if not self._is_alive():
156+
if self.is_shutdown:
167157
raise ShutDown
168158
elif timeout < 0:
169159
raise ValueError("'timeout' must be a non-negative number")
@@ -174,7 +164,7 @@ def put(self, item, block=True, timeout=None):
174164
if remaining <= 0.0:
175165
raise Full
176166
self.not_full.wait(remaining)
177-
if not self._is_alive():
167+
if self.is_shutdown:
178168
raise ShutDown
179169
self._put(item)
180170
self.unfinished_tasks += 1
@@ -195,16 +185,15 @@ def get(self, block=True, timeout=None):
195185
or if the queue has been shut down immediately.
196186
'''
197187
with self.not_empty:
198-
if self._is_shutdown_immediate() or\
199-
(self._is_shutdown() and not self._qsize()):
188+
if self.is_shutdown and not self._qsize():
200189
raise ShutDown
201190
if not block:
202191
if not self._qsize():
203192
raise Empty
204193
elif timeout is None:
205194
while not self._qsize():
206195
self.not_empty.wait()
207-
if self._is_shutdown_immediate():
196+
if self.is_shutdown and not self._qsize():
208197
raise ShutDown
209198
elif timeout < 0:
210199
raise ValueError("'timeout' must be a non-negative number")
@@ -215,7 +204,7 @@ def get(self, block=True, timeout=None):
215204
if remaining <= 0.0:
216205
raise Empty
217206
self.not_empty.wait(remaining)
218-
if self._is_shutdown_immediate():
207+
if self.is_shutdown and not self._qsize():
219208
raise ShutDown
220209
item = self._get()
221210
self.not_full.notify()
@@ -247,32 +236,16 @@ def shutdown(self, immediate=False):
247236
and join() if 'immediate'. The ShutDown exception is raised.
248237
'''
249238
with self.mutex:
250-
if self._is_shutdown_immediate():
251-
return
239+
self.is_shutdown = True
252240
if immediate:
253-
self._set_shutdown_immediate()
241+
while self._qsize():
242+
self._get()
243+
self.unfinished_tasks = 0
254244
self.not_empty.notify_all()
255245
# release all blocked threads in `join()`
256246
self.all_tasks_done.notify_all()
257-
else:
258-
self._set_shutdown()
259247
self.not_full.notify_all()
260248

261-
def _is_alive(self):
262-
return self.shutdown_state == _queue_alive
263-
264-
def _is_shutdown(self):
265-
return self.shutdown_state == _queue_shutdown
266-
267-
def _is_shutdown_immediate(self):
268-
return self.shutdown_state == _queue_shutdown_immediate
269-
270-
def _set_shutdown(self):
271-
self.shutdown_state = _queue_shutdown
272-
273-
def _set_shutdown_immediate(self):
274-
self.shutdown_state = _queue_shutdown_immediate
275-
276249

277250
# Override these methods to implement other queue organizations
278251
# (e.g. stack or priority queue).

‎Lib/test/test_queue.py

Copy file name to clipboardExpand all lines: Lib/test/test_queue.py
+20-43Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -267,16 +267,15 @@ def test_shutdown_immediate(self):
267267
def test_shutdown_allowed_transitions(self):
268268
# allowed transitions would be from alive via shutdown to immediate
269269
q = self.type2test()
270-
self.assertEqual("alive", q.shutdown_state)
270+
self.assertFalse(q.is_shutdown)
271271

272272
q.shutdown()
273-
self.assertEqual("shutdown", q.shutdown_state)
273+
self.assertTrue(q.is_shutdown)
274274

275275
q.shutdown(immediate=True)
276-
self.assertEqual("shutdown-immediate", q.shutdown_state)
276+
self.assertTrue(q.is_shutdown)
277277

278278
q.shutdown(immediate=False)
279-
self.assertNotEqual("shutdown", q.shutdown_state)
280279

281280
def _shutdown_all_methods_in_one_thread(self, immediate):
282281
q = self.type2test(2)
@@ -293,10 +292,9 @@ def _shutdown_all_methods_in_one_thread(self, immediate):
293292
q.get()
294293
with self.assertRaises(self.queue.ShutDown):
295294
q.get_nowait()
296-
with self.assertRaises(self.queue.ShutDown):
295+
with self.assertRaises(ValueError):
297296
q.task_done()
298-
with self.assertRaises(self.queue.ShutDown):
299-
q.join()
297+
q.join()
300298
else:
301299
self.assertIn(q.get(), "LO")
302300
q.task_done()
@@ -333,10 +331,7 @@ def _write_msg_thread(self, q, n, results, delay,
333331
event_end.set()
334332
time.sleep(delay)
335333
# end of all puts
336-
try:
337-
q.join()
338-
except self.queue.ShutDown:
339-
pass
334+
q.join()
340335

341336
def _read_msg_thread(self, q, nb, results, delay, event_start):
342337
event_start.wait()
@@ -355,26 +350,17 @@ def _read_msg_thread(self, q, nb, results, delay, event_start):
355350
nb -= 1
356351
except self.queue.Empty:
357352
pass
358-
try:
359-
q.join()
360-
except self.queue.ShutDown:
361-
pass
353+
q.join()
362354

363355
def _shutdown_thread(self, q, event_end, immediate):
364356
event_end.wait()
365357
q.shutdown(immediate)
366-
try:
367-
q.join()
368-
except self.queue.ShutDown:
369-
pass
358+
q.join()
370359

371360
def _join_thread(self, q, delay, event_start):
372361
event_start.wait()
373362
time.sleep(delay)
374-
try:
375-
q.join()
376-
except self.queue.ShutDown:
377-
pass
363+
q.join()
378364

379365
def _shutdown_all_methods_in_many_threads(self, immediate):
380366
q = self.type2test()
@@ -413,6 +399,9 @@ def _shutdown_all_methods_in_many_threads(self, immediate):
413399
assert(len(res_gets) <= len(res_puts))
414400
assert(res_gets.count(True) <= res_puts.count(True))
415401

402+
for thread in ps[1:]:
403+
thread.join()
404+
416405
def test_shutdown_all_methods_in_many_threads(self):
417406
return self._shutdown_all_methods_in_many_threads(False)
418407

@@ -544,15 +533,9 @@ def _shutdown_join(self, immediate):
544533
go = threading.Event()
545534
nb = q.qsize()
546535

547-
if immediate:
548-
thrds = (
549-
(self._join_shutdown, (q, results)),
550-
(self._join_shutdown, (q, results)),
551-
)
552-
else:
553-
thrds = (
554-
(self._join, (q, results)),
555-
(self._join, (q, results)),
536+
thrds = (
537+
(self._join, (q, results)),
538+
(self._join, (q, results)),
556539
)
557540
threads = []
558541
for func, params in thrds:
@@ -584,21 +567,15 @@ def _shutdown_put_join(self, immediate):
584567
nb = q.qsize()
585568
# queue not fulled
586569

587-
if immediate:
588-
thrds = (
589-
(self._put_shutdown, (q, "E", go, results)),
590-
(self._join_shutdown, (q, results)),
591-
)
592-
else:
593-
thrds = (
594-
(self._put_shutdown, (q, "E", go, results)),
595-
(self._join, (q, results)),
596-
)
570+
thrds = (
571+
(self._put_shutdown, (q, "E", go, results)),
572+
(self._join, (q, results)),
573+
)
597574
threads = []
598575
for func, params in thrds:
599576
threads.append(threading.Thread(target=func, args=params))
600577
threads[-1].start()
601-
if not immediate:
578+
if not immediate or immediate: # TODO: dedent (minimising Git diff)
602579
self.assertEqual(q.unfinished_tasks, nb)
603580
for i in range(nb):
604581
t = threading.Thread(target=q.task_done)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.