-
-
Notifications
You must be signed in to change notification settings - Fork 32k
GH-109564: add asyncio.Server state machine #131009
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0fec860
f3b96bf
8e409b7
a92158a
44d24fb
07129e5
48a3c0d
5832655
7f3481b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import collections | ||
import collections.abc | ||
import concurrent.futures | ||
import enum | ||
import errno | ||
import heapq | ||
import itertools | ||
|
@@ -272,6 +273,23 @@ async def restore(self): | |
self._proto.resume_writing() | ||
|
||
|
||
class _ServerState(enum.Enum): | ||
"""This tracks the state of Server. | ||
|
||
-[in]->NOT_STARTED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN | ||
|
||
- in: Server.__init__() | ||
- ss: Server._start_serving() | ||
- cl: Server.close() | ||
- wk: Server._wakeup() *only called if number of clients == 0 | ||
""" | ||
|
||
NOT_STARTED = "not_started" | ||
SERVING = "serving" | ||
CLOSED = "closed" | ||
SHUTDOWN = "shutdown" | ||
|
||
|
||
class Server(events.AbstractServer): | ||
|
||
def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | ||
|
@@ -287,32 +305,47 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | |
self._ssl_context = ssl_context | ||
self._ssl_handshake_timeout = ssl_handshake_timeout | ||
self._ssl_shutdown_timeout = ssl_shutdown_timeout | ||
self._serving = False | ||
self._state = _ServerState.NOT_STARTED | ||
self._serving_forever_fut = None | ||
|
||
def __repr__(self): | ||
return f'<{self.__class__.__name__} sockets={self.sockets!r}>' | ||
|
||
def _attach(self, transport): | ||
assert self._sockets is not None | ||
if self._state != _ServerState.SERVING: | ||
raise RuntimeError("server is not serving, cannot attach transport") | ||
self._clients.add(transport) | ||
|
||
def _detach(self, transport): | ||
self._clients.discard(transport) | ||
if len(self._clients) == 0 and self._sockets is None: | ||
self._wakeup() | ||
if self._state == _ServerState.CLOSED and len(self._clients) == 0: | ||
self._shutdown() | ||
|
||
def _shutdown(self): | ||
if self._state == _ServerState.CLOSED: | ||
self._state = _ServerState.SHUTDOWN | ||
elif self._state == _ServerState.SHUTDOWN: | ||
# gh109564: the wakeup method has two possible call-sites, | ||
# through an explicit call Server.close(), or indirectly through | ||
# Server._detach() by the last connected client. | ||
return | ||
else: | ||
raise RuntimeError(f"server {self!r} must be closed before shutdown") | ||
|
||
def _wakeup(self): | ||
waiters = self._waiters | ||
self._waiters = None | ||
for waiter in waiters: | ||
if not waiter.done(): | ||
waiter.set_result(None) | ||
|
||
def _start_serving(self): | ||
if self._serving: | ||
if self._state == _ServerState.NOT_STARTED: | ||
self._state = _ServerState.SERVING | ||
elif self._state == _ServerState.SERVING: | ||
return | ||
self._serving = True | ||
else: | ||
raise RuntimeError(f'server {self!r} was already started and then closed') | ||
|
||
for sock in self._sockets: | ||
sock.listen(self._backlog) | ||
self._loop._start_serving( | ||
|
@@ -324,7 +357,7 @@ def get_loop(self): | |
return self._loop | ||
|
||
def is_serving(self): | ||
return self._serving | ||
return self._state == _ServerState.SERVING | ||
|
||
@property | ||
def sockets(self): | ||
|
@@ -333,23 +366,30 @@ def sockets(self): | |
return tuple(trsock.TransportSocket(s) for s in self._sockets) | ||
|
||
def close(self): | ||
sockets = self._sockets | ||
if sockets is None: | ||
if self._state in {_ServerState.CLOSED, _ServerState.SHUTDOWN}: | ||
return | ||
self._sockets = None | ||
|
||
for sock in sockets: | ||
self._loop._stop_serving(sock) | ||
prev_state = self._state | ||
try: | ||
self._state = _ServerState.CLOSED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if something magically goes wrong after this has been set? Is the server still alive while thinking it's closed? It might be worth adding a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah great point! Although I'm not sure how that would look from a user's point of view. Should we give them a warning to let them know they must try to close the server again? Alternatively, should we expose something like a I pushed an initial fix with a simple |
||
|
||
self._serving = False | ||
sockets = self._sockets | ||
if sockets is None: | ||
return | ||
self._sockets = None | ||
|
||
if (self._serving_forever_fut is not None and | ||
not self._serving_forever_fut.done()): | ||
self._serving_forever_fut.cancel() | ||
self._serving_forever_fut = None | ||
for sock in sockets: | ||
self._loop._stop_serving(sock) | ||
|
||
if len(self._clients) == 0: | ||
self._wakeup() | ||
if (self._serving_forever_fut is not None and | ||
not self._serving_forever_fut.done()): | ||
self._serving_forever_fut.cancel() | ||
self._serving_forever_fut = None | ||
|
||
if len(self._clients) == 0: | ||
self._shutdown() | ||
except: | ||
self._state = prev_state | ||
|
||
def close_clients(self): | ||
for transport in self._clients.copy(): | ||
|
@@ -369,8 +409,6 @@ async def serve_forever(self): | |
if self._serving_forever_fut is not None: | ||
raise RuntimeError( | ||
f'server {self!r} is already being awaited on serve_forever()') | ||
if self._sockets is None: | ||
raise RuntimeError(f'server {self!r} is closed') | ||
|
||
self._start_serving() | ||
self._serving_forever_fut = self._loop.create_future() | ||
|
@@ -407,7 +445,7 @@ async def wait_closed(self): | |
# from two places: self.close() and self._detach(), but only | ||
# when both conditions have become true. To signal that this | ||
# has happened, self._wakeup() sets self._waiters to None. | ||
if self._waiters is None: | ||
if self._state == _ServerState.SHUTDOWN: | ||
return | ||
waiter = self._loop.create_future() | ||
self._waiters.append(waiter) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about the name
SHUTDOWN
here, but needed something more "definitive" thanclosed
.If we do use
SHUTDOWN
should I also renameServer._wakeup
->Server._shutdown
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes sense.
I'm more worried about
INITIALIZED
here; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something likeNOT_SERVING
orNOT_YET_STARTED
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 07129e5