Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

gh-113538: Allow client connections to be closed #114432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions 25 Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,31 @@ Do not instantiate the :class:`Server` class directly.
coroutine to wait until the server is closed (and no more
connections are active).

.. method:: close_clients()

Close all existing incoming client connections.
CendioOssman marked this conversation as resolved.
Show resolved Hide resolved

Calls :meth:`~asyncio.BaseTransport.close` on all associated
transports.

:meth:`close` should be called before :meth:`close_clients` when
closing the server to avoid races with new clients connecting.

.. versionadded:: 3.13

.. method:: abort_clients()

Close all existing incoming client connections immediately,
without waiting for pending operations to complete.

Calls :meth:`~asyncio.WriteTransport.abort` on all associated
transports.

:meth:`close` should be called before :meth:`abort_clients` when
closing the server to avoid races with new clients connecting.

.. versionadded:: 3.13

.. method:: get_loop()

Return the event loop associated with the server object.
Expand Down
5 changes: 5 additions & 0 deletions 5 Doc/whatsnew/3.13.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ asyncio
the Unix socket when the server is closed.
(Contributed by Pierre Ossman in :gh:`111246`.)

* Add :meth:`asyncio.Server.close_clients` and
:meth:`asyncio.Server.abort_clients` methods which allow to more
forcefully close an asyncio server.
(Contributed by Pierre Ossman in :gh:`113538`.)

copy
----

Expand Down
26 changes: 18 additions & 8 deletions 26 Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout, ssl_shutdown_timeout=None):
self._loop = loop
self._sockets = sockets
self._active_count = 0
# Weak references so abandoned transports can be detected
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Weak references so abandoned transports can be detected
# Weak references so abandoned transports can be ignored

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wording here was intentional. Weak references is to my knowledge the only way to detect abandoned objects. But it's not this code that does that detection, so I can understand the confusion. How about:

Weak references so we don't break Transport's ability to detect abandoned transports

?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're thinking from the POV of the transport, whose __del__ must be called to "detect" (i.e., warn about) that it was abandoned. I was thinking from the POV of the loop in close_clients(), where we want to ignore (not encounter) transports that have been closed already.

I'll make it your choice which wording to use.

self._clients = weakref.WeakSet()
self._waiters = []
self._protocol_factory = protocol_factory
self._backlog = backlog
Expand All @@ -290,14 +291,15 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self):
def _attach(self, transport):
assert self._sockets is not None
self._active_count += 1
self._clients.add(transport)

def _detach(self):
assert self._active_count > 0
self._active_count -= 1
if self._active_count == 0 and self._sockets is None:
def _detach(self, transport):
# Note that 'transport' may already be missing from
# self._clients if it has been garbage collected
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
self._wakeup()

def _wakeup(self):
Expand Down Expand Up @@ -346,9 +348,17 @@ def close(self):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None

if self._active_count == 0:
if len(self._clients) == 0:
self._wakeup()

def close_clients(self):
for transport in self._clients.copy():
transport.close()

def abort_clients(self):
for transport in self._clients.copy():
transport.abort()

async def start_serving(self):
self._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
Expand Down
8 changes: 8 additions & 0 deletions 8 Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ def close(self):
"""Stop serving. This leaves existing connections open."""
raise NotImplementedError

def close_clients(self):
"""Close all active connections."""
raise NotImplementedError

def abort_clients(self):
"""Close all active connections immediately."""
raise NotImplementedError

def get_loop(self):
"""Get the event loop the Server object is attached to."""
raise NotImplementedError
Expand Down
4 changes: 2 additions & 2 deletions 4 Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._called_connection_lost = False
self._eof_written = False
if self._server is not None:
self._server._attach()
self._server._attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
Expand Down Expand Up @@ -167,7 +167,7 @@ def _call_connection_lost(self, exc):
self._sock = None
server = self._server
if server is not None:
server._detach()
server._detach(self)
self._server = None
self._called_connection_lost = True

Expand Down
6 changes: 4 additions & 2 deletions 6 Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._paused = False # Set when pause_reading() called

if self._server is not None:
self._server._attach()
self._server._attach(self)
loop._transports[self._sock_fd] = self

def __repr__(self):
Expand Down Expand Up @@ -864,6 +864,8 @@ def __del__(self, _warn=warnings.warn):
if self._sock is not None:
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self._sock.close()
if self._server is not None:
self._server._detach(self)
CendioOssman marked this conversation as resolved.
Show resolved Hide resolved

def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
Expand Down Expand Up @@ -902,7 +904,7 @@ def _call_connection_lost(self, exc):
self._loop = None
server = self._server
if server is not None:
server._detach()
server._detach(self)
self._server = None

def get_write_buffer_size(self):
Expand Down
85 changes: 77 additions & 8 deletions 85 Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,12 @@ async def main(srv):
class TestServer2(unittest.IsolatedAsyncioTestCase):

async def test_wait_closed_basic(self):
async def serve(*args):
pass
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()

srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)
Expand All @@ -137,7 +141,8 @@ async def serve(*args):
self.assertFalse(task1.done())

# active count != 0, not closed: should block
srv._attach()
addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
gvanrossum marked this conversation as resolved.
Show resolved Hide resolved
task2 = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task1.done())
Expand All @@ -152,7 +157,8 @@ async def serve(*args):
self.assertFalse(task2.done())
self.assertFalse(task3.done())

srv._detach()
wr.close()
await wr.wait_closed()
# active count == 0, closed: should unblock
await task1
await task2
Expand All @@ -161,22 +167,85 @@ async def serve(*args):

async def test_wait_closed_race(self):
# Test a regression in 3.12.0, should be fixed in 3.12.1
async def serve(*args):
pass
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()

srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)

task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())
srv._attach()
addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
loop = asyncio.get_running_loop()
loop.call_soon(srv.close)
loop.call_soon(srv._detach)
loop.call_soon(wr.close)
await srv.wait_closed()

async def test_close_clients(self):
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()

srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)

addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
self.addCleanup(wr.close)

task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())

srv.close()
srv.close_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, short sleeps are a nuisance in asyncio tests. Usually they can be replaced by a small number of sleep(0) calls though -- usually 1, rarely 2 or 3. sleep(0) is special and guarantees we go through the event loop exactly once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bunch of sleep(0) felt even more arbitrary. :/

What's your suggestion here? Keep it as is? Or something else?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bunch of sleep(0) wastes less time (the asyncio tests are already too slow), and doesn't risk the test becoming flaky due to timing out on slow platforms (a problem we've struggled with). The right number of sleep(0) call takes up no more time than needed, and lets the machinery go through its motions in a deterministic matter. So I recommend sleep(0).

self.assertTrue(task.done())

async def test_abort_clients(self):
async def serve(rd, wr):
nonlocal s_rd, s_wr
s_rd = rd
s_wr = wr
await wr.wait_closed()

s_rd = s_wr = None
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)

addr = srv.sockets[0].getsockname()
(c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1])
self.addCleanup(c_wr.close)

# Make sure both sides are in a paused state
while (s_wr.transport.get_write_buffer_size() == 0 or
c_wr.transport.is_reading()):
while s_wr.transport.get_write_buffer_size() == 0:
s_wr.write(b'a' * 65536)
await asyncio.sleep(0)
await asyncio.sleep(0.1) # FIXME: More socket buffer space magically appears?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, what's going on with this one? Is it the same issue that can be fixed with a small number of sleep(0) calls, or different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, to be honest. Might be a kernel or libc issue where it shuffles buffers around and/or allocates more space.

Without it, I cannot reliably get both the server and client to a state where buffers are full. Which is needed for the test to check the right thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this PR is not finished until you get to the bottom of that. If you really need then to reach a specific state and there's no deterministic way to get there, consider manipulating internal APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a pain, but I think I got it fixed. The core problem is that the kernel dynamically increases the socket send buffer size. And it takes about 20-30 ms to do so.

I think I've worked around that by specifying an explicit buffer size. That should turn off the dynamic resizing, if I remember things correctly.


task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())

# Sanity check
self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0)
self.assertFalse(c_wr.transport.is_reading())

srv.close()
srv.abort_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one looks similar to the first sleep.

self.assertTrue(task.done())


# Test the various corner cases of Unix server socket removal
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add :meth:`asyncio.Server.close_clients` and
:meth:`asyncio.Server.abort_clients` methods which allow to more forcefully
close an asyncio server.
Morty Proxy This is a proxified and sanitized view of the page, visit original site.