Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
15 changes: 13 additions & 2 deletions 15 examples/clients/simple-auth-client/mcp_simple_auth_client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ def get_state(self):
class SimpleAuthClient:
"""Simple MCP client with auth support."""

def __init__(self, server_url: str, transport_type: str = "streamable-http"):
def __init__(
self,
server_url: str,
transport_type: str = "streamable-http",
client_metadata_url: str | None = None,
):
self.server_url = server_url
self.transport_type = transport_type
self.client_metadata_url = client_metadata_url
self.session: ClientSession | None = None

async def connect(self):
Expand Down Expand Up @@ -185,12 +191,14 @@ async def _default_redirect_handler(authorization_url: str) -> None:
webbrowser.open(authorization_url)

# Create OAuth authentication handler using the new interface
# Use client_metadata_url to enable CIMD when the server supports it
oauth_auth = OAuthClientProvider(
server_url=self.server_url,
client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict),
storage=InMemoryTokenStorage(),
redirect_handler=_default_redirect_handler,
callback_handler=callback_handler,
client_metadata_url=self.client_metadata_url,
)

# Create transport with auth handler based on transport type
Expand Down Expand Up @@ -334,6 +342,7 @@ async def main():
# Most MCP streamable HTTP servers use /mcp as the endpoint
server_url = os.getenv("MCP_SERVER_PORT", 8000)
transport_type = os.getenv("MCP_TRANSPORT_TYPE", "streamable-http")
client_metadata_url = os.getenv("MCP_CLIENT_METADATA_URL")
server_url = (
f"http://localhost:{server_url}/mcp"
if transport_type == "streamable-http"
Expand All @@ -343,9 +352,11 @@ async def main():
print("🚀 Simple MCP Auth Client")
print(f"Connecting to: {server_url}")
print(f"Transport type: {transport_type}")
if client_metadata_url:
print(f"Client metadata URL: {client_metadata_url}")

# Start connection flow - OAuth will be handled automatically
client = SimpleAuthClient(server_url, transport_type)
client = SimpleAuthClient(server_url, transport_type, client_metadata_url)
await client.connect()


Expand Down
64 changes: 53 additions & 11 deletions 64 src/mcp/client/auth/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from mcp.client.auth.utils import (
build_oauth_authorization_server_metadata_discovery_urls,
build_protected_resource_metadata_discovery_urls,
create_client_info_from_metadata_url,
create_client_registration_request,
create_oauth_metadata_request,
extract_field_from_www_auth,
Expand All @@ -33,6 +34,8 @@
handle_protected_resource_response,
handle_registration_response,
handle_token_response_scopes,
is_valid_client_metadata_url,
should_use_client_metadata_url,
)
from mcp.client.streamable_http import MCP_PROTOCOL_VERSION
from mcp.shared.auth import (
Expand Down Expand Up @@ -96,6 +99,7 @@ class OAuthContext:
redirect_handler: Callable[[str], Awaitable[None]] | None
callback_handler: Callable[[], Awaitable[tuple[str, str | None]]] | None
timeout: float = 300.0
client_metadata_url: str | None = None

# Discovered metadata
protected_resource_metadata: ProtectedResourceMetadata | None = None
Expand Down Expand Up @@ -226,15 +230,40 @@ def __init__(
redirect_handler: Callable[[str], Awaitable[None]] | None = None,
callback_handler: Callable[[], Awaitable[tuple[str, str | None]]] | None = None,
timeout: float = 300.0,
client_metadata_url: str | None = None,
):
"""Initialize OAuth2 authentication."""
"""Initialize OAuth2 authentication.

Args:
server_url: The MCP server URL.
client_metadata: OAuth client metadata for registration.
storage: Token storage implementation.
redirect_handler: Handler for authorization redirects.
callback_handler: Handler for authorization callbacks.
timeout: Timeout for the OAuth flow.
client_metadata_url: URL-based client ID. When provided and the server
advertises client_id_metadata_document_supported=true, this URL will be
used as the client_id instead of performing dynamic client registration.
Must be a valid HTTPS URL with a non-root pathname.

Raises:
ValueError: If client_metadata_url is provided but not a valid HTTPS URL
with a non-root pathname.
"""
# Validate client_metadata_url if provided
if client_metadata_url is not None and not is_valid_client_metadata_url(client_metadata_url):
raise ValueError(
f"client_metadata_url must be a valid HTTPS URL with a non-root pathname, got: {client_metadata_url}"
)

self.context = OAuthContext(
server_url=server_url,
client_metadata=client_metadata,
storage=storage,
redirect_handler=redirect_handler,
callback_handler=callback_handler,
timeout=timeout,
client_metadata_url=client_metadata_url,
)
self._initialized = False

Expand Down Expand Up @@ -566,17 +595,30 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.
self.context.oauth_metadata,
)

# Step 4: Register client if needed
registration_request = create_client_registration_request(
self.context.oauth_metadata,
self.context.client_metadata,
self.context.get_authorization_base_url(self.context.server_url),
)
# Step 4: Register client or use URL-based client ID (CIMD)
if not self.context.client_info:
registration_response = yield registration_request
client_information = await handle_registration_response(registration_response)
self.context.client_info = client_information
await self.context.storage.set_client_info(client_information)
if should_use_client_metadata_url(
self.context.oauth_metadata, self.context.client_metadata_url
):
# Use URL-based client ID (CIMD)
logger.debug(f"Using URL-based client ID (CIMD): {self.context.client_metadata_url}")
client_information = create_client_info_from_metadata_url(
self.context.client_metadata_url, # type: ignore[arg-type]
redirect_uris=self.context.client_metadata.redirect_uris,
)
self.context.client_info = client_information
await self.context.storage.set_client_info(client_information)
else:
# Fallback to Dynamic Client Registration
registration_request = create_client_registration_request(
self.context.oauth_metadata,
self.context.client_metadata,
self.context.get_authorization_base_url(self.context.server_url),
)
registration_response = yield registration_request
client_information = await handle_registration_response(registration_response)
self.context.client_info = client_information
await self.context.storage.set_client_info(client_information)

# Step 5: Perform authorization and complete token exchange
token_response = yield await self._perform_authorization()
Expand Down
71 changes: 70 additions & 1 deletion 71 src/mcp/client/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from urllib.parse import urljoin, urlparse

from httpx import Request, Response
from pydantic import ValidationError
from pydantic import AnyUrl, ValidationError

from mcp.client.auth import OAuthRegistrationError, OAuthTokenError
from mcp.client.streamable_http import MCP_PROTOCOL_VERSION
Expand Down Expand Up @@ -243,6 +243,75 @@ async def handle_registration_response(response: Response) -> OAuthClientInforma
raise OAuthRegistrationError(f"Invalid registration response: {e}")


def is_valid_client_metadata_url(url: str | None) -> bool:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open to a different break down of helper functions here, this seemed like a reasonably flexible combo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems appropriate to me

"""Validate that a URL is suitable for use as a client_id (CIMD).

The URL must be HTTPS with a non-root pathname.

Args:
url: The URL to validate

Returns:
True if the URL is a valid HTTPS URL with a non-root pathname
"""
if not url:
return False
try:
parsed = urlparse(url)
return parsed.scheme == "https" and parsed.path not in ("", "/")
except Exception:
return False


def should_use_client_metadata_url(
oauth_metadata: OAuthMetadata | None,
client_metadata_url: str | None,
) -> bool:
"""Determine if URL-based client ID (CIMD) should be used instead of DCR.

URL-based client IDs should be used when:
1. The server advertises client_id_metadata_document_supported=true
2. The client has a valid client_metadata_url configured

Args:
oauth_metadata: OAuth authorization server metadata
client_metadata_url: URL-based client ID (already validated)

Returns:
True if CIMD should be used, False if DCR should be used
"""
if not client_metadata_url:
return False

if not oauth_metadata:
return False

return oauth_metadata.client_id_metadata_document_supported is True


def create_client_info_from_metadata_url(
client_metadata_url: str, redirect_uris: list[AnyUrl] | None = None
) -> OAuthClientInformationFull:
"""Create client information using a URL-based client ID (CIMD).

When using URL-based client IDs, the URL itself becomes the client_id
and no client_secret is used (token_endpoint_auth_method="none").

Args:
client_metadata_url: The URL to use as the client_id
redirect_uris: The redirect URIs from the client metadata (passed through for
compatibility with OAuthClientInformationFull which inherits from OAuthClientMetadata)

Returns:
OAuthClientInformationFull with the URL as client_id
"""
return OAuthClientInformationFull(
client_id=client_metadata_url,
token_endpoint_auth_method="none",
redirect_uris=redirect_uris,
)


async def handle_token_response_scopes(
response: Response,
) -> OAuthToken:
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.