From d7e7b4604f60699f41a1fc1bd8fcf1da5653e442 Mon Sep 17 00:00:00 2001 From: Marshall Roch Date: Thu, 20 Mar 2025 09:53:29 -0400 Subject: [PATCH 1/2] send errors to pending requests if server closes --- src/mcp/shared/session.py | 8 +++++ src/mcp/types.py | 4 +++ tests/shared/test_session.py | 59 +++++++++++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 31c04df33..154508ea6 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -14,6 +14,7 @@ from mcp.shared.exceptions import McpError from mcp.types import ( + CONNECTION_CLOSED, CancelledNotification, ClientNotification, ClientRequest, @@ -374,6 +375,13 @@ async def _receive_loop(self) -> None: ) ) + # after the read stream is closed, we need to send errors + # to any pending requests + for id, stream in self._response_streams.items(): + error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed") + await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error)) + await stream.aclose() + async def _received_request( self, responder: RequestResponder[ReceiveRequestT, SendResultT] ) -> None: diff --git a/src/mcp/types.py b/src/mcp/types.py index f043fb10a..7f80dc0d9 100644 --- a/src/mcp/types.py +++ b/src/mcp/types.py @@ -137,6 +137,10 @@ class JSONRPCResponse(BaseModel): model_config = ConfigDict(extra="allow") +# SDK error codes +CONNECTION_CLOSED = -32000 +# REQUEST_TIMEOUT = -32001 # the typescript sdk uses this + # Standard JSON-RPC error codes PARSE_ERROR = -32700 INVALID_REQUEST = -32600 diff --git a/tests/shared/test_session.py b/tests/shared/test_session.py index 59cb30c86..eb4e004ae 100644 --- a/tests/shared/test_session.py +++ b/tests/shared/test_session.py @@ -7,7 +7,10 @@ from mcp.client.session import ClientSession from mcp.server.lowlevel.server import Server from mcp.shared.exceptions import McpError -from mcp.shared.memory import create_connected_server_and_client_session +from mcp.shared.memory import ( + create_client_server_memory_streams, + create_connected_server_and_client_session, +) from mcp.types import ( CancelledNotification, CancelledNotificationParams, @@ -124,3 +127,57 @@ async def make_request(client_session): # Give cancellation time to process with anyio.fail_after(1): await ev_cancelled.wait() + + +@pytest.mark.anyio +async def test_connection_closed(): + """ + Test that pending requests are cancelled when the connection is closed remotely. + """ + + ev_closed = anyio.Event() + ev_response = anyio.Event() + + async with create_client_server_memory_streams() as ( + client_streams, + server_streams, + ): + client_read, client_write = client_streams + server_read, server_write = server_streams + + async def make_request(client_session): + """Send a request in a separate task""" + nonlocal ev_response + try: + # any request will do + await client_session.initialize() + pytest.fail("Request should have errored") + except McpError as e: + # Expected - request errored + assert "Connection closed" in str(e) + ev_response.set() + + async def mock_server(): + """Wait for a request, then close the connection""" + nonlocal ev_closed + # Wait for a request + await server_read.receive() + # Close the connection, as if the server exited + server_write.close() + server_read.close() + ev_closed.set() + + async with ( + anyio.create_task_group() as tg, + ClientSession( + read_stream=client_read, + write_stream=client_write, + ) as client_session, + ): + tg.start_soon(make_request, client_session) + tg.start_soon(mock_server) + + with anyio.fail_after(1): + await ev_closed.wait() + with anyio.fail_after(1): + await ev_response.wait() From d65d1502469ebc667eb0077cc3f5503a280f246c Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 27 May 2025 22:52:38 +0100 Subject: [PATCH 2/2] close _response_streams after completion --- src/mcp/shared/session.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index b12bba09d..f5547ea0e 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -424,6 +424,7 @@ async def _receive_loop(self) -> None: error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed") await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error)) await stream.aclose() + self._response_streams.clear() async def _received_request( self, responder: RequestResponder[ReceiveRequestT, SendResultT]