Skip to content

Add FastAPI endpoints (POST) instead of mounting #28

@johschmidt42

Description

@johschmidt42

Is your feature request related to a problem? Please describe.

Mounting an existing ASGI application (handle_post_message form SseServerTransport) has some known issues/limitations (e.g. APIRouter).

Describe the solution you'd like

Instead of mounting, we can just convert the starlette code to fastapi code:

async def handle_post_message(request: Request):
    # this is the same as in mcp/server/sse.py but compatible with FastAPI

    logger.debug("Handling POST message")

    session_id_param = request.query_params.get("session_id")
    if session_id_param is None:
        logger.warning("Received request without session_id")
        raise HTTPException(status_code=400, detail="session_id is required")

    try:
        session_id = UUID(hex=session_id_param)
        logger.debug(f"Parsed session ID: {session_id}")
    except ValueError:
        logger.warning(f"Received invalid session ID: {session_id_param}")
        raise HTTPException(status_code=400, detail="Invalid session ID") from None

    writer = get_sse_transport()._read_stream_writers.get(session_id)
    if not writer:
        logger.warning(f"Could not find session for ID: {session_id}")
        raise HTTPException(status_code=404, detail="Could not find session")

    json = await request.json()
    logger.debug(f"Received JSON: {json}")

    try:
        message = types.JSONRPCMessage.model_validate(json)
        logger.debug(f"Validated client message: {message}")
    except ValidationError as err:
        logger.error(f"Failed to parse message: {err}")
        await writer.send(err)
        raise HTTPException(status_code=400, detail="Could not parse message") from None

    logger.debug(f"Sending message to writer: {message}")
    await writer.send(message)
    return JSONResponse(content={"message": "Accepted"}, status_code=202)

Then all we have to do is create the POST endpoint for /messages

def add_mcp_server(router: FastAPI | APIRouter, tools: List[Callable]) -> None:
    mount_path: str = "/mcp"

    # create FastMCP to be able to bind tools
    mcp_server: FastMCP = FastMCP("MCP", instructions="Instructions")

    # add tools
    for tool in tools:
        mcp_server.add_tool(tool)

    @router.get(f"{mount_path}/sse")
    async def fastapi_handle_mcp_connection(request: Request):
        async with get_sse_transport().connect_sse(
            request.scope, request.receive, request._send
        ) as (
            read_stream,
            write_stream,
        ):
            await mcp_server._mcp_server.run(
                read_stream,
                write_stream,
                mcp_server._mcp_server.create_initialization_options(),
            )

    @router.post("/mcp/messages/")
    async def fastapi_handle_post_message(request: Request):
        await handle_post_message(request)

Let me know what you think.

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions