Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 1847008

Browse filesBrowse files
Akshit97aagarwal25
and
aagarwal25
authored
feat: Streamable HTTP support (#643)
Co-authored-by: aagarwal25 <akshit_agarwal@intuit.com>
1 parent 02b6e70 commit 1847008
Copy full SHA for 1847008

File tree

Expand file treeCollapse file tree

7 files changed

+247
-14
lines changed
Filter options
Expand file treeCollapse file tree

7 files changed

+247
-14
lines changed
+13Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# MCP Streamable HTTP Example
2+
3+
This example uses a local Streamable HTTP server in [server.py](server.py).
4+
5+
Run the example via:
6+
7+
```
8+
uv run python examples/mcp/streamablehttp_example/main.py
9+
```
10+
11+
## Details
12+
13+
The example uses the `MCPServerStreamableHttp` class from `agents.mcp`. The server runs in a sub-process at `https://localhost:8000/mcp`.
+83Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import asyncio
2+
import os
3+
import shutil
4+
import subprocess
5+
import time
6+
from typing import Any
7+
8+
from agents import Agent, Runner, gen_trace_id, trace
9+
from agents.mcp import MCPServer, MCPServerStreamableHttp
10+
from agents.model_settings import ModelSettings
11+
12+
13+
async def run(mcp_server: MCPServer):
14+
agent = Agent(
15+
name="Assistant",
16+
instructions="Use the tools to answer the questions.",
17+
mcp_servers=[mcp_server],
18+
model_settings=ModelSettings(tool_choice="required"),
19+
)
20+
21+
# Use the `add` tool to add two numbers
22+
message = "Add these numbers: 7 and 22."
23+
print(f"Running: {message}")
24+
result = await Runner.run(starting_agent=agent, input=message)
25+
print(result.final_output)
26+
27+
# Run the `get_weather` tool
28+
message = "What's the weather in Tokyo?"
29+
print(f"\n\nRunning: {message}")
30+
result = await Runner.run(starting_agent=agent, input=message)
31+
print(result.final_output)
32+
33+
# Run the `get_secret_word` tool
34+
message = "What's the secret word?"
35+
print(f"\n\nRunning: {message}")
36+
result = await Runner.run(starting_agent=agent, input=message)
37+
print(result.final_output)
38+
39+
40+
async def main():
41+
async with MCPServerStreamableHttp(
42+
name="Streamable HTTP Python Server",
43+
params={
44+
"url": "http://localhost:8000/mcp",
45+
},
46+
) as server:
47+
trace_id = gen_trace_id()
48+
with trace(workflow_name="Streamable HTTP Example", trace_id=trace_id):
49+
print(f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}\n")
50+
await run(server)
51+
52+
53+
if __name__ == "__main__":
54+
# Let's make sure the user has uv installed
55+
if not shutil.which("uv"):
56+
raise RuntimeError(
57+
"uv is not installed. Please install it: https://docs.astral.sh/uv/getting-started/installation/"
58+
)
59+
60+
# We'll run the Streamable HTTP server in a subprocess. Usually this would be a remote server, but for this
61+
# demo, we'll run it locally at http://localhost:8000/mcp
62+
process: subprocess.Popen[Any] | None = None
63+
try:
64+
this_dir = os.path.dirname(os.path.abspath(__file__))
65+
server_file = os.path.join(this_dir, "server.py")
66+
67+
print("Starting Streamable HTTP server at http://localhost:8000/mcp ...")
68+
69+
# Run `uv run server.py` to start the Streamable HTTP server
70+
process = subprocess.Popen(["uv", "run", server_file])
71+
# Give it 3 seconds to start
72+
time.sleep(3)
73+
74+
print("Streamable HTTP server started. Running example...\n\n")
75+
except Exception as e:
76+
print(f"Error starting Streamable HTTP server: {e}")
77+
exit(1)
78+
79+
try:
80+
asyncio.run(main())
81+
finally:
82+
if process:
83+
process.terminate()
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import random
2+
3+
import requests
4+
from mcp.server.fastmcp import FastMCP
5+
6+
# Create server
7+
mcp = FastMCP("Echo Server")
8+
9+
10+
@mcp.tool()
11+
def add(a: int, b: int) -> int:
12+
"""Add two numbers"""
13+
print(f"[debug-server] add({a}, {b})")
14+
return a + b
15+
16+
17+
@mcp.tool()
18+
def get_secret_word() -> str:
19+
print("[debug-server] get_secret_word()")
20+
return random.choice(["apple", "banana", "cherry"])
21+
22+
23+
@mcp.tool()
24+
def get_current_weather(city: str) -> str:
25+
print(f"[debug-server] get_current_weather({city})")
26+
27+
endpoint = "https://wttr.in"
28+
response = requests.get(f"{endpoint}/{city}")
29+
return response.text
30+
31+
32+
if __name__ == "__main__":
33+
mcp.run(transport="streamable-http")

‎pyproject.toml

Copy file name to clipboardExpand all lines: pyproject.toml
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ dependencies = [
1313
"typing-extensions>=4.12.2, <5",
1414
"requests>=2.0, <3",
1515
"types-requests>=2.0, <3",
16-
"mcp>=1.6.0, <2; python_version >= '3.10'",
16+
"mcp>=1.8.0, <2; python_version >= '3.10'",
1717
]
1818
classifiers = [
1919
"Typing :: Typed",

‎src/agents/mcp/__init__.py

Copy file name to clipboardExpand all lines: src/agents/mcp/__init__.py
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
MCPServerSseParams,
66
MCPServerStdio,
77
MCPServerStdioParams,
8+
MCPServerStreamableHttp,
9+
MCPServerStreamableHttpParams,
810
)
911
except ImportError:
1012
pass
@@ -17,5 +19,7 @@
1719
"MCPServerSseParams",
1820
"MCPServerStdio",
1921
"MCPServerStdioParams",
22+
"MCPServerStreamableHttp",
23+
"MCPServerStreamableHttpParams",
2024
"MCPUtil",
2125
]

‎src/agents/mcp/server.py

Copy file name to clipboardExpand all lines: src/agents/mcp/server.py
+98-8Lines changed: 98 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1111
from mcp import ClientSession, StdioServerParameters, Tool as MCPTool, stdio_client
1212
from mcp.client.sse import sse_client
13-
from mcp.types import CallToolResult, JSONRPCMessage
13+
from mcp.client.streamable_http import GetSessionIdCallback, streamablehttp_client
14+
from mcp.shared.message import SessionMessage
15+
from mcp.types import CallToolResult
1416
from typing_extensions import NotRequired, TypedDict
1517

1618
from ..exceptions import UserError
@@ -83,8 +85,9 @@ def create_streams(
8385
self,
8486
) -> AbstractAsyncContextManager[
8587
tuple[
86-
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
87-
MemoryObjectSendStream[JSONRPCMessage],
88+
MemoryObjectReceiveStream[SessionMessage | Exception],
89+
MemoryObjectSendStream[SessionMessage],
90+
GetSessionIdCallback | None
8891
]
8992
]:
9093
"""Create the streams for the server."""
@@ -105,7 +108,11 @@ async def connect(self):
105108
"""Connect to the server."""
106109
try:
107110
transport = await self.exit_stack.enter_async_context(self.create_streams())
108-
read, write = transport
111+
# streamablehttp_client returns (read, write, get_session_id)
112+
# sse_client returns (read, write)
113+
114+
read, write, *_ = transport
115+
109116
session = await self.exit_stack.enter_async_context(
110117
ClientSession(
111118
read,
@@ -232,8 +239,9 @@ def create_streams(
232239
self,
233240
) -> AbstractAsyncContextManager[
234241
tuple[
235-
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
236-
MemoryObjectSendStream[JSONRPCMessage],
242+
MemoryObjectReceiveStream[SessionMessage | Exception],
243+
MemoryObjectSendStream[SessionMessage],
244+
GetSessionIdCallback | None
237245
]
238246
]:
239247
"""Create the streams for the server."""
@@ -302,8 +310,9 @@ def create_streams(
302310
self,
303311
) -> AbstractAsyncContextManager[
304312
tuple[
305-
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
306-
MemoryObjectSendStream[JSONRPCMessage],
313+
MemoryObjectReceiveStream[SessionMessage | Exception],
314+
MemoryObjectSendStream[SessionMessage],
315+
GetSessionIdCallback | None
307316
]
308317
]:
309318
"""Create the streams for the server."""
@@ -318,3 +327,84 @@ def create_streams(
318327
def name(self) -> str:
319328
"""A readable name for the server."""
320329
return self._name
330+
331+
332+
class MCPServerStreamableHttpParams(TypedDict):
333+
"""Mirrors the params in`mcp.client.streamable_http.streamablehttp_client`."""
334+
335+
url: str
336+
"""The URL of the server."""
337+
338+
headers: NotRequired[dict[str, str]]
339+
"""The headers to send to the server."""
340+
341+
timeout: NotRequired[timedelta]
342+
"""The timeout for the HTTP request. Defaults to 5 seconds."""
343+
344+
sse_read_timeout: NotRequired[timedelta]
345+
"""The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""
346+
347+
terminate_on_close: NotRequired[bool]
348+
"""Terminate on close"""
349+
350+
351+
class MCPServerStreamableHttp(_MCPServerWithClientSession):
352+
"""MCP server implementation that uses the Streamable HTTP transport. See the [spec]
353+
(https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)
354+
for details.
355+
"""
356+
357+
def __init__(
358+
self,
359+
params: MCPServerStreamableHttpParams,
360+
cache_tools_list: bool = False,
361+
name: str | None = None,
362+
client_session_timeout_seconds: float | None = 5,
363+
):
364+
"""Create a new MCP server based on the Streamable HTTP transport.
365+
366+
Args:
367+
params: The params that configure the server. This includes the URL of the server,
368+
the headers to send to the server, the timeout for the HTTP request, and the
369+
timeout for the Streamable HTTP connection and whether we need to
370+
terminate on close.
371+
372+
cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
373+
cached and only fetched from the server once. If `False`, the tools list will be
374+
fetched from the server on each call to `list_tools()`. The cache can be
375+
invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
376+
if you know the server will not change its tools list, because it can drastically
377+
improve latency (by avoiding a round-trip to the server every time).
378+
379+
name: A readable name for the server. If not provided, we'll create one from the
380+
URL.
381+
382+
client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
383+
"""
384+
super().__init__(cache_tools_list, client_session_timeout_seconds)
385+
386+
self.params = params
387+
self._name = name or f"streamable_http: {self.params['url']}"
388+
389+
def create_streams(
390+
self,
391+
) -> AbstractAsyncContextManager[
392+
tuple[
393+
MemoryObjectReceiveStream[SessionMessage | Exception],
394+
MemoryObjectSendStream[SessionMessage],
395+
GetSessionIdCallback | None
396+
]
397+
]:
398+
"""Create the streams for the server."""
399+
return streamablehttp_client(
400+
url=self.params["url"],
401+
headers=self.params.get("headers", None),
402+
timeout=self.params.get("timeout", timedelta(seconds=30)),
403+
sse_read_timeout=self.params.get("sse_read_timeout", timedelta(seconds=60 * 5)),
404+
terminate_on_close=self.params.get("terminate_on_close", True)
405+
)
406+
407+
@property
408+
def name(self) -> str:
409+
"""A readable name for the server."""
410+
return self._name

‎uv.lock

Copy file name to clipboardExpand all lines: uv.lock
+15-5Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.