Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f552ac1

Browse filesBrowse files
committed
Implement for asyncio queues
1 parent d942c9e commit f552ac1
Copy full SHA for f552ac1

File tree

Expand file treeCollapse file tree

2 files changed

+113
-1
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+113
-1
lines changed

‎Lib/asyncio/queues.py

Copy file name to clipboardExpand all lines: Lib/asyncio/queues.py
+56-1Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
1+
__all__ = (
2+
'Queue',
3+
'PriorityQueue',
4+
'LifoQueue',
5+
'QueueFull',
6+
'QueueEmpty',
7+
'QueueShutDown',
8+
)
29

310
import collections
411
import heapq
@@ -18,6 +25,16 @@ class QueueFull(Exception):
1825
pass
1926

2027

28+
class QueueShutDown(Exception):
29+
"""Raised when putting on to or getting from a shut-down Queue."""
30+
pass
31+
32+
33+
_queue_alive = "alive"
34+
_queue_shutdown = "shutdown"
35+
_queue_shutdown_immediate = "shutdown-immediate"
36+
37+
2138
class Queue(mixins._LoopBoundMixin):
2239
"""A queue, useful for coordinating producer and consumer coroutines.
2340
@@ -41,6 +58,7 @@ def __init__(self, maxsize=0):
4158
self._finished = locks.Event()
4259
self._finished.set()
4360
self._init(maxsize)
61+
self.shutdown_state = _queue_alive
4462

4563
# These three are overridable in subclasses.
4664

@@ -113,6 +131,8 @@ async def put(self, item):
113131
Put an item into the queue. If the queue is full, wait until a free
114132
slot is available before adding item.
115133
"""
134+
if self.shutdown_state != _queue_alive:
135+
raise QueueShutDown
116136
while self.full():
117137
putter = self._get_loop().create_future()
118138
self._putters.append(putter)
@@ -132,13 +152,17 @@ async def put(self, item):
132152
# the call. Wake up the next in line.
133153
self._wakeup_next(self._putters)
134154
raise
155+
if self.shutdown_state != _queue_alive:
156+
raise QueueShutDown
135157
return self.put_nowait(item)
136158

137159
def put_nowait(self, item):
138160
"""Put an item into the queue without blocking.
139161
140162
If no free slot is immediately available, raise QueueFull.
141163
"""
164+
if self.shutdown_state != _queue_alive:
165+
raise QueueShutDown
142166
if self.full():
143167
raise QueueFull
144168
self._put(item)
@@ -151,7 +175,11 @@ async def get(self):
151175
152176
If queue is empty, wait until an item is available.
153177
"""
178+
if self.shutdown_state == _queue_shutdown_immediate:
179+
raise QueueShutDown
154180
while self.empty():
181+
if self.shutdown_state != _queue_alive:
182+
raise QueueShutDown
155183
getter = self._get_loop().create_future()
156184
self._getters.append(getter)
157185
try:
@@ -170,6 +198,8 @@ async def get(self):
170198
# the call. Wake up the next in line.
171199
self._wakeup_next(self._getters)
172200
raise
201+
if self.shutdown_state == _queue_shutdown_immediate:
202+
raise QueueShutDown
173203
return self.get_nowait()
174204

175205
def get_nowait(self):
@@ -178,7 +208,11 @@ def get_nowait(self):
178208
Return an item if one is immediately available, else raise QueueEmpty.
179209
"""
180210
if self.empty():
211+
if self.shutdown_state != _queue_alive:
212+
raise QueueShutDown
181213
raise QueueEmpty
214+
elif self.shutdown_state == _queue_shutdown_immediate:
215+
raise QueueShutDown
182216
item = self._get()
183217
self._wakeup_next(self._putters)
184218
return item
@@ -214,6 +248,27 @@ async def join(self):
214248
if self._unfinished_tasks > 0:
215249
await self._finished.wait()
216250

251+
def shutdown(self, immediate=False):
252+
"""Shut-down the queue, making queue gets and puts raise.
253+
254+
By default, gets will only raise once the queue is empty. Set
255+
'immediate' to True to make gets raise immediately instead.
256+
257+
All blocked callers of put() will be unblocked, and also get()
258+
and join() if 'immediate'. The QueueShutDown exception is raised.
259+
"""
260+
if immediate:
261+
self.shutdown_state = _queue_shutdown_immediate
262+
while self._getters:
263+
getter = self._getters.popleft()
264+
if not getter.done():
265+
getter.set_result(None)
266+
else:
267+
self.shutdown_state = _queue_shutdown
268+
while self._putters:
269+
putter = self._putters.popleft()
270+
if not putter.done():
271+
putter.set_result(None)
217272

218273
class PriorityQueue(Queue):
219274
"""A subclass of Queue; retrieves entries in priority order (lowest first).

‎Lib/test/test_asyncio/test_queues.py

Copy file name to clipboardExpand all lines: Lib/test/test_asyncio/test_queues.py
+57Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,5 +522,62 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa
522522
q_class = asyncio.PriorityQueue
523523

524524

525+
class _QueueShutdownTestMixin:
526+
q_class = None
527+
528+
async def test_empty(self):
529+
q = self.q_class()
530+
q.shutdown()
531+
try:
532+
await q.put("data")
533+
self.fail("Didn't appear to shut-down queue")
534+
except asyncio.QueueShutDown:
535+
pass
536+
try:
537+
await q.get()
538+
self.fail("Didn't appear to shut-down queue")
539+
except asyncio.QueueShutDown:
540+
pass
541+
542+
async def test_nonempty(self):
543+
q = self.q_class()
544+
q.put_nowait("data")
545+
q.shutdown()
546+
await q.get()
547+
try:
548+
await q.get()
549+
self.fail("Didn't appear to shut-down queue")
550+
except asyncio.QueueShutDown:
551+
pass
552+
553+
async def test_immediate(self):
554+
q = self.q_class()
555+
q.put_nowait("data")
556+
q.shutdown(immediate=True)
557+
try:
558+
await q.get()
559+
self.fail("Didn't appear to shut-down queue")
560+
except asyncio.QueueShutDown:
561+
pass
562+
563+
564+
class QueueShutdownTests(
565+
_QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase
566+
):
567+
q_class = asyncio.Queue
568+
569+
570+
class LifoQueueShutdownTests(
571+
_QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase
572+
):
573+
q_class = asyncio.LifoQueue
574+
575+
576+
class PriorityQueueShutdownTests(
577+
_QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase
578+
):
579+
q_class = asyncio.PriorityQueue
580+
581+
525582
if __name__ == '__main__':
526583
unittest.main()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.