Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

GH-109564: add asyncio.Server state machine #131009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
Loading
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions 84 Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import collections
import collections.abc
import concurrent.futures
import enum
import errno
import heapq
import itertools
Expand Down Expand Up @@ -272,6 +273,23 @@ async def restore(self):
self._proto.resume_writing()


class _ServerState(enum.Enum):
"""This tracks the state of Server.

-[in]->NOT_STARTED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN

- in: Server.__init__()
- ss: Server._start_serving()
- cl: Server.close()
- wk: Server._wakeup() *only called if number of clients == 0
"""

NOT_STARTED = "not_started"
SERVING = "serving"
CLOSED = "closed"
SHUTDOWN = "shutdown"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the name SHUTDOWN here, but needed something more "definitive" than closed.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

I think that makes sense.

I'm more worried about INITIALIZED here; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something like NOT_SERVING or NOT_YET_STARTED.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 07129e5



class Server(events.AbstractServer):

def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
Expand All @@ -287,32 +305,47 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._ssl_shutdown_timeout = ssl_shutdown_timeout
self._serving = False
self._state = _ServerState.NOT_STARTED
self._serving_forever_fut = None

def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self, transport):
assert self._sockets is not None
if self._state != _ServerState.SERVING:
raise RuntimeError("server is not serving, cannot attach transport")
self._clients.add(transport)

def _detach(self, transport):
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
self._wakeup()
if self._state == _ServerState.CLOSED and len(self._clients) == 0:
self._shutdown()

def _shutdown(self):
if self._state == _ServerState.CLOSED:
self._state = _ServerState.SHUTDOWN
elif self._state == _ServerState.SHUTDOWN:
# gh109564: the wakeup method has two possible call-sites,
# through an explicit call Server.close(), or indirectly through
# Server._detach() by the last connected client.
return
else:
raise RuntimeError(f"server {self!r} must be closed before shutdown")

def _wakeup(self):
waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(None)

def _start_serving(self):
if self._serving:
if self._state == _ServerState.NOT_STARTED:
self._state = _ServerState.SERVING
elif self._state == _ServerState.SERVING:
return
self._serving = True
else:
raise RuntimeError(f'server {self!r} was already started and then closed')

for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
Expand All @@ -324,7 +357,7 @@ def get_loop(self):
return self._loop

def is_serving(self):
return self._serving
return self._state == _ServerState.SERVING

@property
def sockets(self):
Expand All @@ -333,23 +366,30 @@ def sockets(self):
return tuple(trsock.TransportSocket(s) for s in self._sockets)

def close(self):
sockets = self._sockets
if sockets is None:
if self._state in {_ServerState.CLOSED, _ServerState.SHUTDOWN}:
return
self._sockets = None

for sock in sockets:
self._loop._stop_serving(sock)
prev_state = self._state
try:
self._state = _ServerState.CLOSED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if something magically goes wrong after this has been set? Is the server still alive while thinking it's closed? It might be worth adding a try/except to undo this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah great point! Although I'm not sure how that would look from a user's point of view. Should we give them a warning to let them know they must try to close the server again? Alternatively, should we expose something like a force kwarg/flag?

I pushed an initial fix with a simple try/except to recover the previous state on fail in 48a3c0d


self._serving = False
sockets = self._sockets
if sockets is None:
return
self._sockets = None

if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None
for sock in sockets:
self._loop._stop_serving(sock)

if len(self._clients) == 0:
self._wakeup()
if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None

if len(self._clients) == 0:
self._shutdown()
except:
self._state = prev_state

def close_clients(self):
for transport in self._clients.copy():
Expand All @@ -369,8 +409,6 @@ async def serve_forever(self):
if self._serving_forever_fut is not None:
raise RuntimeError(
f'server {self!r} is already being awaited on serve_forever()')
if self._sockets is None:
raise RuntimeError(f'server {self!r} is closed')

self._start_serving()
self._serving_forever_fut = self._loop.create_future()
Expand Down Expand Up @@ -407,7 +445,7 @@ async def wait_closed(self):
# from two places: self.close() and self._detach(), but only
# when both conditions have become true. To signal that this
# has happened, self._wakeup() sets self._waiters to None.
if self._waiters is None:
if self._state == _ServerState.SHUTDOWN:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
Expand Down
2 changes: 1 addition & 1 deletion 2 Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._closing = False # Set when close() called.
self._called_connection_lost = False
self._eof_written = False
if self._server is not None:
if self._server is not None and self._server.is_serving():
self._server._attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
Expand Down
3 changes: 2 additions & 1 deletion 3 Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,8 +794,9 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._closing = False # Set when close() called.
self._paused = False # Set when pause_reading() called

if self._server is not None:
if self._server is not None and self._server.is_serving():
self._server._attach(self)

loop._transports[self._sock_fd] = self

def __repr__(self):
Expand Down
10 changes: 8 additions & 2 deletions 10 Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import threading
import unittest
from unittest.mock import Mock

from test.support import socket_helper
from test.test_asyncio import utils as test_utils
Expand Down Expand Up @@ -65,7 +66,7 @@ async def main(srv):
self.assertIsNone(srv._waiters)
self.assertFalse(srv.is_serving())

with self.assertRaisesRegex(RuntimeError, r'is closed'):
with self.assertRaisesRegex(RuntimeError, r'started and then closed'):
self.loop.run_until_complete(srv.serve_forever())


Expand Down Expand Up @@ -118,7 +119,7 @@ async def main(srv):
self.assertIsNone(srv._waiters)
self.assertFalse(srv.is_serving())

with self.assertRaisesRegex(RuntimeError, r'is closed'):
with self.assertRaisesRegex(RuntimeError, r'started and then closed'):
self.loop.run_until_complete(srv.serve_forever())


Expand Down Expand Up @@ -186,6 +187,8 @@ async def serve(rd, wr):
loop.call_soon(srv.close)
loop.call_soon(wr.close)
await srv.wait_closed()
self.assertTrue(task.done())
self.assertFalse(srv.is_serving())

async def test_close_clients(self):
async def serve(rd, wr):
Expand All @@ -212,6 +215,9 @@ async def serve(rd, wr):
await asyncio.sleep(0)
self.assertTrue(task.done())

with self.assertRaisesRegex(RuntimeError, r'started and then closed'):
await srv.start_serving()

async def test_abort_clients(self):
async def serve(rd, wr):
fut.set_result((rd, wr))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan.
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.