博客 / 詳情

返回

Flask集成MCP的AI Agent

前言

近年來,大量新興的 AI 相關第三方庫都提供了異步接口,有些甚至出於性能考慮僅支持異步調用,例如 MCP SDK。伴隨着 Python 異步編程的發展,FastAPI 等框架迅速流行,許多新項目傾向於採用 FastAPI。但實際上,Flask 自 2.0 版本起也開始支持異步方法,因此我們也能借助 Flask 參與到 MCP 的 AI Agent 浪潮中。

PS:需要注意的是,Flask 的異步性能表現不佳。正如 Flask 官方文檔所指出的那樣,每當收到請求時,Flask 都會在一個線程內啓動一個新的事件循環來執行異步視圖函數,然後返回結果。這種方式增加了額外的性能開銷。經過個人實測,Flask 的異步視圖性能確實不如傳統的同步視圖。因此,如果有異步需求,建議還是選用像 FastAPI 這樣原生支持異步的框架。

Async functions require an event loop to run. Flask, as a WSGI application, uses one worker to handle one request/response cycle. When a request comes in to an async view, Flask will start an event loop in a thread, run the view function there, then return the result.

Each request still ties up one worker, even for async views. The upside is that you can run async code within a view, for example to make multiple concurrent database queries, HTTP requests to an external API, etc. However, the number of requests your application can handle at one time will remain the same.

Async is not inherently faster than sync code. Async is beneficial when performing concurrent IO-bound tasks, but will probably not improve CPU-bound tasks. Traditional Flask views will still be appropriate for most use cases, but Flask's async support enables writing and using code that wasn't possible natively before.

本文使用的 LLM 是阿里的通義千問,需要自行申請 API Key。其他兼容 OpenAI SDK 的模型理論上也可使用。

本文介紹的是如何在 Flask 中集成基於 MCP Client 和 MCP Server 的 AI Agent,並不僅僅是用 Flask 開發一個 MCP Server,所以只關注 Flask 實現 MCP Server 的看眾可以關閉本文了。

安裝 SDK

雖然 Flask 從 2.0 版本開始支持異步功能,但這部分功能需要額外安裝相關依賴。這裏我們將 MCP 和 LLM 相關的依賴一起安裝。

uv add 'flask[async]' fastmcp openai

此外還需要安裝 gunicorngevent,這兩個是在生產環境中常用的部署工具。雖然是開發演示項目,但我們也會安裝它們來驗證異步功能的支持情況。

uv add gunicorn gevent

代碼示例

代碼結構

由於這只是個演示項目,所以代碼結構相對簡單。

├── aiagent  # aiagent 模塊
│   ├── mcp_client.py  # 封裝 mcp 的 client
│   └── mcp_servers  # mcp 的 servers
│       ├── common.py  # 會被導入到 composition 的 mcp server 中
│       └── composition.py  # composition 的 mcp server, client 只會連這個 mcp server
├── config.py  # 配置模塊
├── log.py  # 日誌模塊
├── main.py  # 入口文件
├── pyproject.toml
├── README.md
└── uv.lock

配置模塊

簡單寫寫,能用就行,注意替換 API Keyconfig.py

class Config:
    @property
    def llm_base_url(self) -> str:
        return "https://dashscope.aliyuncs.com/compatible-mode/v1"
    
    @property
    def llm_model(self) -> str:
        return "qwen-plus"
    
    @property
    def llm_api_key(self) -> str:
        return "<your api key>"
    
cfg = Config()

日誌模塊

簡單寫寫,能用就行。log.py

import logging
import sys


def setup_logger() -> logging.Logger:
    level = logging.DEBUG
    logger = logging.getLogger("flask-mcp")
    logger.setLevel(level)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)

    fmt = "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"
    formatter = logging.Formatter(fmt, datefmt)
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    return logger

logger = setup_logger()

MCP Servers

common

我們可以將這個 common mcp server 視為子服務器,它將被導入到父級 composition server 中統一管理。

在 common mcp server 中,以下僅實現了一個用於獲取當前時間的工具函數。

from datetime import datetime

from fastmcp import FastMCP

mcp = FastMCP(name="common mcp server", instructions="Common MCP server for general tasks.")

@mcp.tool
async def get_current_datetime() -> str:
    """Get the current date and time as a string. Format: YYYY-MM-DDTHH:MM:SS±hhmm"""
    return datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")

if __name__ == "__main__":
    mcp.run(transport="stdio", show_banner=False)

composition

其他的 mcp server 都會組合到這個 composition mcp server 中。一方面 mcp client 只需要連這一個 composition mcp server 即可,另一方面可以按功能組織 mcp server 的代碼。

import asyncio
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).parents[2]))
from fastmcp import FastMCP

from aiagent.mcp_servers.common import mcp as common_mcp

composition_mcp = FastMCP(name="composition mcp server")

async def compose():
    await composition_mcp.import_server(common_mcp)

if __name__ == "__main__":
    asyncio.run(compose())
    composition_mcp.run(transport="stdio", show_banner=False)

MCP Client

aiagent/mcp_client.py 負責整合 LLM 與 MCP Server 的交互。示例中的對話記憶僅存儲在實例變量中,在實際應用中應考慮持久化存儲方案。

import json
from pathlib import Path
from typing import cast

from fastmcp.client import Client, StdioTransport
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessageFunctionToolCall

from config import cfg
from log import logger


class MCPClient:
    def __init__(self):
        self.client = Client(StdioTransport(
            command=str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),
            args=[str(Path(__file__).parent / "mcp_servers" / "composition.py")],
            cwd=str(Path(__file__).parent / "mcp_servers"),
        ))
        self._llm = AsyncOpenAI(
            base_url=cfg.llm_base_url,
            api_key=cfg.llm_api_key,
        )

        self._temp_memories = []

    async def close(self):
        if self.client:
            await self.client.close()

    async def process(self, prompt: str, system_prompt: str = "") -> str:
        if system_prompt:
            self._temp_memories.append({"role": "system", "content": system_prompt})
        
        self._temp_memories.append({"role": "user", "content": prompt})

        async with self.client:
            tools = await self.client.list_tools()
            available_tools = []

            for tool in tools:
                available_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    }
                })

            logger.info(f"Available mcp tools: {[tool.name for tool in tools]}")

            resp = await self._llm.chat.completions.create(
                model=cfg.llm_model,
                messages=self._temp_memories,
                tools=available_tools,
                temperature=0.3,
            )

            # 存儲最終響應文本
            final_text = []
            # 獲取 LLM 的首個響應消息
            message = resp.choices[0].message
            # 如果響應包含直接內容,則添加到結果中
            if hasattr(message, "content") and message.content:
                final_text.append(message.content)

            # 循環處理工具調用,直到沒有更多工具調用為止
            while message.tool_calls:
                # 遍歷所有工具調用
                for tool_call in message.tool_calls:
                    # 確保工具調用有函數信息
                    if not hasattr(tool_call, "function"):
                        continue

                    # 類型轉換以獲取函數調用詳情
                    function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)
                    function = function_call.function
                    tool_name = function.name
                    # 解析函數參數
                    tool_args = json.loads(function.arguments)

                    # 檢查 MCP 客户端是否已連接
                    if not self.client.is_connected():
                        raise RuntimeError("Session not initialized. Cannot call tool.")
                    
                    # 調用 MCP 服務器上的指定工具
                    logger.info(f"Calling tool: {tool_name} with args: {tool_args}")
                    result = await self.client.call_tool(tool_name, tool_args)

                    # 將助手的工具調用添加到消息歷史中
                    self._temp_memories.append({
                        "role": "assistant",
                        "tool_calls": [
                            {
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": function.name,
                                    "arguments": function.arguments
                                }
                            }
                        ]
                    })

                    # 將工具調用結果添加到消息歷史中
                    self._temp_memories.append({
                        "role": "tool",
                        "tool_call_id":tool_call.id,
                        "content": str(result.content) if result.content else ""
                    })
                
                # 基於工具調用結果再次調用 LLM
                final_resp = await self._llm.chat.completions.create(
                    model=cfg.llm_model,
                    messages=self._temp_memories,
                    tools=available_tools,
                    temperature=0.3,
                )
                # 更新消息為最新的 LLM 響應
                message = final_resp.choices[0].message
                # 如果響應包含內容,則添加到最終結果中
                if message.content:
                    final_text.append(message.content)

            # 返回連接後的完整響應
            return "\n".join(final_text)

main

main.py 是應用程序的入口文件,其中實現了健康檢查的同步視圖和處理聊天請求的異步視圖。此外還自定義了響應類,確保 Flask 能正確處理非英文字符。

import json
from http import HTTPStatus
from typing import Optional

from flask import Flask, Response, request

from aiagent.mcp_client import MCPClient

app = Flask(__name__)

class APIResponse(Response):
    def __init__(self, data: Optional[dict] = None, code: HTTPStatus = HTTPStatus.OK, msg: str = "success"):
        headers = dict({"Content-Type": "application/json; charset=utf-8"})
        response = json.dumps({
            "code": code.value,
            "msg": msg,
            "data": data,
        }, ensure_ascii=False, default=str)
        super().__init__(response=response, status=code.value, headers=headers)

@app.get("/health")
def health_check():
    return APIResponse(data={"status": "ok"})

@app.post("/chat")
async def post_chat():
    try:
        req_body: Optional[dict] = request.get_json()
        if not req_body or "prompt" not in req_body:
            return APIResponse(code=HTTPStatus.BAD_REQUEST, msg="Missing 'prompt' in request body")
        prompt = req_body.get("prompt")
        if not isinstance(prompt, str):
            return APIResponse(code=HTTPStatus.BAD_REQUEST, msg="'prompt' must be a string")
    except Exception as e:
        return APIResponse(code=HTTPStatus.BAD_REQUEST, msg=str(e))
    
    mcp_client = MCPClient()
    try:
        resp = await mcp_client.process(prompt=prompt)
        resp_body = {
            "content": resp
        }
        return APIResponse(data=resp_body)
    except Exception as e:
        return APIResponse(code=HTTPStatus.INTERNAL_SERVER_ERROR, msg=str(e))
    finally:
        await mcp_client.close()


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000)

運行測試

  1. 首先啓動服務端應用
python main.py
  1. 使用 curl 發起請求進行測試。可以看到接口成功返回了答案。通過服務端控制枱日誌可以看出 mcp client 成功調用了 get_current_datetime 工具。
$ curl --request POST \
--url http://127.0.0.1:8000/chat \
--header 'content-type: application/json' \
--data '{
"prompt": "今天的日期是什麼"
}'
{"code": 200, "msg": "success", "data": {"content": "今天的日期是 2025 年 12 月 8 日。"}}

# Server 端控制枱日誌
2025-12-08 21:55:50 | flask-mcp | INFO | mcp_client.py:51 | process | Available mcp tools: ['get_current_datetime']
2025-12-08 21:55:51 | flask-mcp | INFO | mcp_client.py:88 | process | Calling tool: get_current_datetime with args: {}
  1. 使用 gunicorn 啓動應用,測試 gunicorn 對異步方法的支持情況。
gunicorn main:app -n 127.0.0.1:8000 -w 4 -k gevent --worker-connections 1000
  1. 再次使用 curl 測試。測試依然正常。
$ curl --request POST \
  --url http://127.0.0.1:8000/chat \
  --header 'content-type: application/json' \
  --data '{
  "prompt": "現在是什麼時候?"
}'
{"code": 200, "msg": "success", "data": {"content": "現在是 2025 年 12 月 8 日 22 時 37 分 50 秒。"}}

小結

通過上述示例可以看出,Flask 仍然能夠勝任基於 MCP 的 AI Agent 應用開發任務。而且 Flask 2.0 之後的版本與之前版本保持良好的兼容性,因此可以考慮將舊項目升級到新版。不過需要再次強調的是,Flask 的異步視圖性能並不理想,對於新的 AI Agent 項目,建議優先選擇原生支持異步的框架。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.