-
Notifications
You must be signed in to change notification settings - Fork 813
Closed
Labels
enhancementNew feature or requestNew feature or request
Description
Is your feature request related to a problem? Please describe.
Mounting an existing ASGI application (`handle_post_message` from `SseServerTransport`) has some known issues/limitations (e.g. with `APIRouter`).
Describe the solution you'd like
Instead of mounting, we can just convert the Starlette code to FastAPI code:
async def handle_post_message(request: Request):
    """Handle a client POST carrying one JSON-RPC message for an SSE session.

    This is the same logic as in mcp/server/sse.py, but compatible with
    FastAPI: failures are reported by raising ``HTTPException`` and success
    is reported by returning a ``JSONResponse``.

    Args:
        request: Incoming request. The ``session_id`` query parameter selects
            the session; the body is the JSON-encoded message.

    Returns:
        A 202 ``JSONResponse`` with ``{"message": "Accepted"}`` once the
        message has been handed to the session's writer.

    Raises:
        HTTPException: 400 if ``session_id`` is missing or is not a valid
            UUID hex string, or if the body is not valid JSON or does not
            validate as a ``JSONRPCMessage``; 404 if no writer is registered
            for the session.
    """
    logger.debug("Handling POST message")

    session_id_param = request.query_params.get("session_id")
    if session_id_param is None:
        logger.warning("Received request without session_id")
        raise HTTPException(status_code=400, detail="session_id is required")

    try:
        session_id = UUID(hex=session_id_param)
    except ValueError:
        logger.warning(f"Received invalid session ID: {session_id_param}")
        raise HTTPException(status_code=400, detail="Invalid session ID") from None
    logger.debug(f"Parsed session ID: {session_id}")

    # NOTE(review): relies on a private attribute of the SSE transport.
    writer = get_sse_transport()._read_stream_writers.get(session_id)
    if not writer:
        logger.warning(f"Could not find session for ID: {session_id}")
        raise HTTPException(status_code=404, detail="Could not find session")

    # A body that is not valid JSON must be a client error (400), not an
    # unhandled exception; json.JSONDecodeError and UnicodeDecodeError are
    # both ValueError subclasses.
    try:
        payload = await request.json()
    except ValueError as err:
        logger.error(f"Failed to parse message: {err}")
        # Forward the error to the session, as the validation path below does.
        await writer.send(err)
        raise HTTPException(status_code=400, detail="Could not parse message") from None
    logger.debug(f"Received JSON: {payload}")

    try:
        message = types.JSONRPCMessage.model_validate(payload)
    except ValidationError as err:
        logger.error(f"Failed to parse message: {err}")
        await writer.send(err)
        raise HTTPException(status_code=400, detail="Could not parse message") from None
    logger.debug(f"Validated client message: {message}")

    logger.debug(f"Sending message to writer: {message}")
    await writer.send(message)
    return JSONResponse(content={"message": "Accepted"}, status_code=202)
Then all we have to do is register the SSE endpoint and the POST endpoint for /messages:
def add_mcp_server(router: FastAPI | APIRouter, tools: List[Callable]) -> None:
    """Register MCP SSE endpoints directly on a FastAPI app or APIRouter.

    Creates a ``FastMCP`` server, binds every callable in ``tools`` to it,
    and adds two routes under ``/mcp`` on ``router``:

    * ``GET /mcp/sse`` opens the SSE connection and runs the MCP server on
      the resulting read/write streams.
    * ``POST /mcp/messages/`` forwards a client message to
      ``handle_post_message``.

    Args:
        router: The FastAPI application or APIRouter to add the routes to.
        tools: Callables to register as MCP tools.
    """
    mount_path: str = "/mcp"

    # Create FastMCP to be able to bind tools.
    mcp_server: FastMCP = FastMCP("MCP", instructions="Instructions")
    for tool in tools:
        mcp_server.add_tool(tool)

    @router.get(f"{mount_path}/sse")
    async def fastapi_handle_mcp_connection(request: Request):
        # NOTE(review): request._send and mcp_server._mcp_server are private
        # attributes; confirm they are stable across library upgrades.
        async with get_sse_transport().connect_sse(
            request.scope, request.receive, request._send
        ) as (read_stream, write_stream):
            await mcp_server._mcp_server.run(
                read_stream,
                write_stream,
                mcp_server._mcp_server.create_initialization_options(),
            )

    # Build the path from mount_path so both routes share one prefix.
    @router.post(f"{mount_path}/messages/")
    async def fastapi_handle_post_message(request: Request):
        # Return the handler's response; without it the 202 is dropped and
        # FastAPI answers 200 with a null body.
        return await handle_post_message(request)
Let me know what you think.
OR13
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request