# gh-113538: Allow client connections to be closed (#114432)
Changes from 9 commits
## `Lib/asyncio/base_events.py`
```diff
@@ -277,7 +277,8 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                  ssl_handshake_timeout, ssl_shutdown_timeout=None):
         self._loop = loop
         self._sockets = sockets
-        self._active_count = 0
+        # Weak references so abandoned transports can be detected
```
A reviewer attached a suggested change to the new comment line, which led to this exchange:

> **Author:** The wording here was intentional. Weak references are, to my knowledge, the only way to detect abandoned objects. But it's not this code that does that detection, so I can understand the confusion. How about: […]?
>
> **Reviewer:** Ah, you're thinking from the POV of the transport, whose […]. I'll make it your choice which wording to use.
```diff
+        self._clients = weakref.WeakSet()
         self._waiters = []
         self._protocol_factory = protocol_factory
         self._backlog = backlog
```
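To make the weak-reference point concrete, here is a minimal standalone sketch (not part of the PR) of the property the comment relies on: a `weakref.WeakSet` drops an entry on its own once the referenced object is garbage collected, so an abandoned transport cannot keep the client set from draining.

```python
import gc
import weakref

class Transport:
    """Hypothetical stand-in for an asyncio transport."""

clients = weakref.WeakSet()

t = Transport()
clients.add(t)
assert len(clients) == 1    # the live transport is tracked

# Drop the last strong reference; once the object is collected,
# its WeakSet entry disappears without any explicit _detach() call.
del t
gc.collect()
assert len(clients) == 0    # the abandoned transport is gone
```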
```diff
@@ -290,14 +291,15 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
     def __repr__(self):
         return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

-    def _attach(self):
+    def _attach(self, transport):
         assert self._sockets is not None
-        self._active_count += 1
+        self._clients.add(transport)

-    def _detach(self):
-        assert self._active_count > 0
-        self._active_count -= 1
-        if self._active_count == 0 and self._sockets is None:
+    def _detach(self, transport):
+        # Note that 'transport' may already be missing from
+        # self._clients if it has been garbage collected
+        self._clients.discard(transport)
+        if len(self._clients) == 0 and self._sockets is None:
             self._wakeup()

     def _wakeup(self):
```
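For orientation, here is a runnable sketch of the lifecycle these hooks support. The `MiniServer`/`MiniTransport` names and call sites are illustrative assumptions, not the PR's actual transport code:

```python
import weakref

class MiniServer:
    """Illustrative stand-in for asyncio's Server bookkeeping."""
    def __init__(self):
        self._sockets = [object()]         # pretend we are still serving
        self._clients = weakref.WeakSet()

    def _attach(self, transport):
        self._clients.add(transport)

    def _detach(self, transport):
        # 'transport' may already be absent if it was garbage collected
        self._clients.discard(transport)
        if len(self._clients) == 0 and self._sockets is None:
            print("last client gone after close(): wake up wait_closed()")

class MiniTransport:
    def __init__(self, server):
        self._server = server
        server._attach(self)        # register when the connection is made

    def connection_lost(self, exc):
        self._server._detach(self)  # deregister when the connection drops

server = MiniServer()
transport = MiniTransport(server)
assert len(server._clients) == 1
transport.connection_lost(None)
assert len(server._clients) == 0
```

Passing the transport itself, rather than bumping a counter, is what lets the server know exactly which clients to close or abort later.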
```diff
@@ -346,9 +348,17 @@ def close(self):
             self._serving_forever_fut.cancel()
             self._serving_forever_fut = None

-        if self._active_count == 0:
+        if len(self._clients) == 0:
             self._wakeup()

+    def close_clients(self):
+        for transport in self._clients.copy():
+            transport.close()
+
+    def abort_clients(self):
+        for transport in self._clients.copy():
+            transport.abort()
+
     async def start_serving(self):
         self._start_serving()
         # Skip one loop iteration so that all 'loop.add_reader'
```
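Taken together, the new methods enable a shutdown sequence like the following sketch. The handler and address are made up; `close_clients()` and `abort_clients()` are the methods added by this PR:

```python
import asyncio

async def handler(reader, writer):
    try:
        await reader.read()          # wait until the client sends EOF
    finally:
        writer.close()

async def main():
    server = await asyncio.start_server(handler, '127.0.0.1', 0)
    # ... accept and serve connections for a while ...
    server.close()                   # stop accepting new connections
    server.close_clients()           # close existing client transports
    # server.abort_clients()         # ...or drop them without flushing
    await server.wait_closed()       # returns once all clients are gone

asyncio.run(main())
```

The distinction mirrors the transport API: `close()` flushes buffered data before disconnecting, while `abort()` drops the connection immediately.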
## `Lib/test/test_asyncio/test_server.py`
```diff
@@ -125,8 +125,12 @@ async def main(srv):
 class TestServer2(unittest.IsolatedAsyncioTestCase):

     async def test_wait_closed_basic(self):
-        async def serve(*args):
-            pass
+        async def serve(rd, wr):
+            try:
+                await rd.read()
+            finally:
+                wr.close()
+                await wr.wait_closed()

         srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
         self.addCleanup(srv.close)
```
```diff
@@ -137,7 +141,8 @@ async def serve(*args):
         self.assertFalse(task1.done())

         # active count != 0, not closed: should block
-        srv._attach()
+        addr = srv.sockets[0].getsockname()
+        (rd, wr) = await asyncio.open_connection(addr[0], addr[1])

         task2 = asyncio.create_task(srv.wait_closed())
         await asyncio.sleep(0)
         self.assertFalse(task1.done())
```
```diff
@@ -152,7 +157,8 @@ async def serve(*args):
         self.assertFalse(task2.done())
         self.assertFalse(task3.done())

-        srv._detach()
+        wr.close()
+        await wr.wait_closed()
         # active count == 0, closed: should unblock
         await task1
         await task2
```
```diff
@@ -161,22 +167,85 @@ async def serve(*args):

     async def test_wait_closed_race(self):
         # Test a regression in 3.12.0, should be fixed in 3.12.1
-        async def serve(*args):
-            pass
+        async def serve(rd, wr):
+            try:
+                await rd.read()
+            finally:
+                wr.close()
+                await wr.wait_closed()

         srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
         self.addCleanup(srv.close)

         task = asyncio.create_task(srv.wait_closed())
         await asyncio.sleep(0)
         self.assertFalse(task.done())
-        srv._attach()
+        addr = srv.sockets[0].getsockname()
+        (rd, wr) = await asyncio.open_connection(addr[0], addr[1])
         loop = asyncio.get_running_loop()
         loop.call_soon(srv.close)
-        loop.call_soon(srv._detach)
+        loop.call_soon(wr.close)
         await srv.wait_closed()
```
||
async def test_close_clients(self): | ||
async def serve(rd, wr): | ||
try: | ||
await rd.read() | ||
finally: | ||
wr.close() | ||
await wr.wait_closed() | ||
|
||
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
self.addCleanup(srv.close) | ||
|
||
addr = srv.sockets[0].getsockname() | ||
(rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
self.addCleanup(wr.close) | ||
|
||
task = asyncio.create_task(srv.wait_closed()) | ||
await asyncio.sleep(0) | ||
self.assertFalse(task.done()) | ||
|
||
srv.close() | ||
srv.close_clients() | ||
await asyncio.sleep(0.1) # FIXME: flush call_soon()? | ||
> **Reviewer:** Yeah, short sleeps are a nuisance in asyncio tests. Usually they can be replaced by a small number of `sleep(0)` calls though -- usually 1, rarely 2 or 3. `sleep(0)` is special and guarantees we go through the event loop exactly once.
>
> **Author:** A bunch of `sleep(0)` calls […]. What's your suggestion here? Keep it as is? Or something else?
>
> **Reviewer:** A bunch of `sleep(0)` wastes less time (the asyncio tests are already too slow), and doesn't risk the test becoming flaky due to timing out on slow platforms (a problem we've struggled with). The right number of `sleep(0)` calls takes up no more time than needed, and lets the machinery go through its motions in a deterministic manner. So I recommend `sleep(0)`.
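As a concrete reading of that advice, the timed sleep could become a fixed number of loop iterations. A hypothetical helper, not code from the PR:

```python
import asyncio

async def yield_to_loop(times: int = 3) -> None:
    """Let pending call_soon() callbacks run deterministically.

    sleep(0) is special-cased to go through the event loop exactly
    once, so a small fixed count can replace a wall-clock sleep(0.1).
    """
    for _ in range(times):    # usually 1 is enough, rarely 2 or 3
        await asyncio.sleep(0)
```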
```diff
         self.assertTrue(task.done())

     async def test_abort_clients(self):
         async def serve(rd, wr):
             nonlocal s_rd, s_wr
             s_rd = rd
             s_wr = wr
             await wr.wait_closed()

         s_rd = s_wr = None
         srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
         self.addCleanup(srv.close)

         addr = srv.sockets[0].getsockname()
         (c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1])
         self.addCleanup(c_wr.close)

         # Make sure both sides are in a paused state
-        while (s_wr.transport.get_write_buffer_size() == 0 or
-               c_wr.transport.is_reading()):
+        while s_wr.transport.get_write_buffer_size() == 0:
             s_wr.write(b'a' * 65536)
             await asyncio.sleep(0)
+        await asyncio.sleep(0.1)  # FIXME: More socket buffer space magically appears?
```
> **Reviewer:** Eh, what's going on with this one? Is it the same issue that can be fixed with a small number of `sleep(0)` calls, or different?
>
> **Author:** I don't know, to be honest. Might be a kernel or libc issue where it shuffles buffers around and/or allocates more space. Without it, I cannot reliably get both the server and client into a state where their buffers are full, which is needed for the test to check the right thing.
>
> **Reviewer:** IMO this PR is not finished until you get to the bottom of that. If you really need them to reach a specific state and there's no deterministic way to get there, consider manipulating internal APIs.
>
> **Author:** This was a pain, but I think I got it fixed. The core problem is that the kernel dynamically increases the socket send buffer size, and it takes about 20-30 ms to do so. I think I've worked around that by specifying an explicit buffer size. That should turn off the dynamic resizing, if I remember things correctly.
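The workaround the author describes (pinning the socket buffer size so the kernel cannot grow it mid-test) might look roughly like this; the constants and placement are assumptions, not the PR's final code:

```python
import socket

# Hypothetical sketch: create the client socket with fixed buffer sizes.
# Setting SO_SNDBUF/SO_RCVBUF explicitly disables the kernel's dynamic
# buffer growth (which otherwise takes ~20-30 ms and makes extra space
# "magically" appear), so a filled buffer reliably stays full.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)

# The pre-configured socket can then be handed to asyncio, e.g. via
# asyncio.open_connection(sock=sock) once it is connected.
```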
```diff
         task = asyncio.create_task(srv.wait_closed())
         await asyncio.sleep(0)
         self.assertFalse(task.done())

         # Sanity check
         self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0)
         self.assertFalse(c_wr.transport.is_reading())

         srv.close()
         srv.abort_clients()
         await asyncio.sleep(0.1)  # FIXME: flush call_soon()?
```
> **Reviewer:** This one looks similar to the first sleep.
```diff
         self.assertTrue(task.done())


 # Test the various corner cases of Unix server socket removal
```
## NEWS entry (new file under `Misc/NEWS.d`)

```diff
@@ -0,0 +1,3 @@
+Add :meth:`asyncio.Server.close_clients` and
+:meth:`asyncio.Server.abort_clients` methods which allow to more forcefully
+close an asyncio server.
```