Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9d99aee

Browse filesBrowse files
authored
Revert "Add message queue for SSE messages POST endpoint (#459)" (#649)
1 parent c8a14c9 commit 9d99aee
Copy full SHA for 9d99aee

26 files changed

+51
-1247
lines changed

‎README.md

Copy file name to clipboardExpand all lines: README.md
-24Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -449,30 +449,6 @@ if __name__ == "__main__":
449449

450450
For more information on mounting applications in Starlette, see the [Starlette documentation](https://www.starlette.io/routing/#submounting-routes).
451451

452-
#### Message Dispatch Options
453-
454-
By default, the SSE server uses an in-memory message dispatch system for incoming POST messages. For production deployments or distributed scenarios, you can use Redis or implement your own message dispatch system that conforms to the `MessageDispatch` protocol:
455-
456-
```python
457-
# Using the built-in Redis message dispatch
458-
from mcp.server.fastmcp import FastMCP
459-
from mcp.server.message_queue import RedisMessageDispatch
460-
461-
# Create a Redis message dispatch
462-
redis_dispatch = RedisMessageDispatch(
463-
redis_url="redis://localhost:6379/0", prefix="mcp:pubsub:"
464-
)
465-
466-
# Pass the message dispatch instance to the server
467-
mcp = FastMCP("My App", message_queue=redis_dispatch)
468-
```
469-
470-
To use Redis, add the Redis dependency:
471-
472-
```bash
473-
uv add "mcp[redis]"
474-
```
475-
476452
## Examples
477453

478454
### Echo Server

‎examples/servers/simple-prompt/mcp_simple_prompt/server.py

Copy file name to clipboardExpand all lines: examples/servers/simple-prompt/mcp_simple_prompt/server.py
+1-4Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,12 @@ async def get_prompt(
8888
)
8989

9090
if transport == "sse":
91-
from mcp.server.message_queue.redis import RedisMessageDispatch
9291
from mcp.server.sse import SseServerTransport
9392
from starlette.applications import Starlette
9493
from starlette.responses import Response
9594
from starlette.routing import Mount, Route
9695

97-
message_dispatch = RedisMessageDispatch("redis://localhost:6379/0")
98-
99-
sse = SseServerTransport("/messages/", message_dispatch=message_dispatch)
96+
sse = SseServerTransport("/messages/")
10097

10198
async def handle_sse(request):
10299
async with sse.connect_sse(

‎pyproject.toml

Copy file name to clipboardExpand all lines: pyproject.toml
-2Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ dependencies = [
3737
rich = ["rich>=13.9.4"]
3838
cli = ["typer>=0.12.4", "python-dotenv>=1.0.0"]
3939
ws = ["websockets>=15.0.1"]
40-
redis = ["redis>=5.2.1", "types-redis>=4.6.0.20241004"]
4140

4241
[project.scripts]
4342
mcp = "mcp.cli:app [cli]"
@@ -56,7 +55,6 @@ dev = [
5655
"pytest-xdist>=3.6.1",
5756
"pytest-examples>=0.0.14",
5857
"pytest-pretty>=1.2.0",
59-
"fakeredis==2.28.1",
6058
]
6159
docs = [
6260
"mkdocs>=1.6.1",

‎src/mcp/client/sse.py

Copy file name to clipboardExpand all lines: src/mcp/client/sse.py
+1-5Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,7 @@ async def sse_reader(
9898
await read_stream_writer.send(exc)
9999
continue
100100

101-
session_message = SessionMessage(
102-
message=message
103-
)
101+
session_message = SessionMessage(message)
104102
await read_stream_writer.send(session_message)
105103
case _:
106104
logger.warning(
@@ -150,5 +148,3 @@ async def post_writer(endpoint_url: str):
150148
finally:
151149
await read_stream_writer.aclose()
152150
await write_stream.aclose()
153-
await read_stream.aclose()
154-
await write_stream_reader.aclose()

‎src/mcp/client/stdio/__init__.py

Copy file name to clipboardExpand all lines: src/mcp/client/stdio/__init__.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ async def stdout_reader():
144144
await read_stream_writer.send(exc)
145145
continue
146146

147-
session_message = SessionMessage(message=message)
147+
session_message = SessionMessage(message)
148148
await read_stream_writer.send(session_message)
149149
except anyio.ClosedResourceError:
150150
await anyio.lowlevel.checkpoint()

‎src/mcp/client/streamable_http.py

Copy file name to clipboardExpand all lines: src/mcp/client/streamable_http.py
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ async def _handle_sse_event(
153153
):
154154
message.root.id = original_request_id
155155

156-
session_message = SessionMessage(message=message)
156+
session_message = SessionMessage(message)
157157
await read_stream_writer.send(session_message)
158158

159159
# Call resumption token callback if we have an ID
@@ -286,7 +286,7 @@ async def _handle_json_response(
286286
try:
287287
content = await response.aread()
288288
message = JSONRPCMessage.model_validate_json(content)
289-
session_message = SessionMessage(message=message)
289+
session_message = SessionMessage(message)
290290
await read_stream_writer.send(session_message)
291291
except Exception as exc:
292292
logger.error(f"Error parsing JSON response: {exc}")
@@ -333,7 +333,7 @@ async def _send_session_terminated_error(
333333
id=request_id,
334334
error=ErrorData(code=32600, message="Session terminated"),
335335
)
336-
session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error))
336+
session_message = SessionMessage(JSONRPCMessage(jsonrpc_error))
337337
await read_stream_writer.send(session_message)
338338

339339
async def post_writer(

‎src/mcp/client/websocket.py

Copy file name to clipboardExpand all lines: src/mcp/client/websocket.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def ws_reader():
6060
async for raw_text in ws:
6161
try:
6262
message = types.JSONRPCMessage.model_validate_json(raw_text)
63-
session_message = SessionMessage(message=message)
63+
session_message = SessionMessage(message)
6464
await read_stream_writer.send(session_message)
6565
except ValidationError as exc:
6666
# If JSON parse or model validation fails, send the exception

‎src/mcp/server/fastmcp/server.py

Copy file name to clipboardExpand all lines: src/mcp/server/fastmcp/server.py
+4-28Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
from mcp.server.lowlevel.server import LifespanResultT
4545
from mcp.server.lowlevel.server import Server as MCPServer
4646
from mcp.server.lowlevel.server import lifespan as default_lifespan
47-
from mcp.server.message_queue import MessageDispatch
4847
from mcp.server.session import ServerSession, ServerSessionT
4948
from mcp.server.sse import SseServerTransport
5049
from mcp.server.stdio import stdio_server
@@ -92,11 +91,6 @@ class Settings(BaseSettings, Generic[LifespanResultT]):
9291
sse_path: str = "/sse"
9392
message_path: str = "/messages/"
9493

95-
# SSE message queue settings
96-
message_dispatch: MessageDispatch | None = Field(
97-
None, description="Custom message dispatch instance"
98-
)
99-
10094
# resource settings
10195
warn_on_duplicate_resources: bool = True
10296

@@ -607,13 +601,6 @@ def _normalize_path(self, mount_path: str, endpoint: str) -> str:
607601

608602
def sse_app(self, mount_path: str | None = None) -> Starlette:
609603
"""Return an instance of the SSE server app."""
610-
message_dispatch = self.settings.message_dispatch
611-
if message_dispatch is None:
612-
from mcp.server.message_queue import InMemoryMessageDispatch
613-
614-
message_dispatch = InMemoryMessageDispatch()
615-
logger.info("Using default in-memory message dispatch")
616-
617604
from starlette.middleware import Middleware
618605
from starlette.routing import Mount, Route
619606

@@ -625,12 +612,11 @@ def sse_app(self, mount_path: str | None = None) -> Starlette:
625612
normalized_message_endpoint = self._normalize_path(
626613
self.settings.mount_path, self.settings.message_path
627614
)
628-
615+
629616
# Set up auth context and dependencies
630617

631618
sse = SseServerTransport(
632-
normalized_message_endpoint,
633-
message_dispatch=message_dispatch
619+
normalized_message_endpoint,
634620
)
635621

636622
async def handle_sse(scope: Scope, receive: Receive, send: Send):
@@ -646,14 +632,7 @@ async def handle_sse(scope: Scope, receive: Receive, send: Send):
646632
streams[1],
647633
self._mcp_server.create_initialization_options(),
648634
)
649-
return Response()
650-
651-
@asynccontextmanager
652-
async def lifespan(app: Starlette):
653-
try:
654-
yield
655-
finally:
656-
await message_dispatch.close()
635+
return Response()
657636

658637
# Create routes
659638
routes: list[Route | Mount] = []
@@ -730,10 +709,7 @@ async def sse_endpoint(request: Request) -> None:
730709

731710
# Create Starlette app with routes and middleware
732711
return Starlette(
733-
debug=self.settings.debug,
734-
routes=routes,
735-
middleware=middleware,
736-
lifespan=lifespan,
712+
debug=self.settings.debug, routes=routes, middleware=middleware
737713
)
738714

739715
async def list_prompts(self) -> list[MCPPrompt]:

‎src/mcp/server/message_queue/__init__.py

Copy file name to clipboardExpand all lines: src/mcp/server/message_queue/__init__.py
-16Lines changed: 0 additions & 16 deletions
This file was deleted.

‎src/mcp/server/message_queue/base.py

Copy file name to clipboardExpand all lines: src/mcp/server/message_queue/base.py
-116Lines changed: 0 additions & 116 deletions
This file was deleted.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.