LangChain是一個專為大語言模型設計的開放框架,圍繞任務鏈(Chain)與內存模塊(Memory)構建了核心架構。這兩大組件是LangChain高效構建複雜語言應用的關鍵所在,使模型在多任務環境中得以應對任務管理、上下文維護、記憶存儲等多種需求。

構建大模型智能應用的時候,往往需要對數據庫進行查詢,返回結果,如果自己寫MCP客户端和Tools工具比較麻煩。如果基於LangChain框架的話,可以簡化這種方法。本文就來演示一下,在LangChain框架下,如何使用MCP操作數據庫以及如何使用Tools來訪問數據庫。

我這裏假設你已經構建了MCP服務,或者調用的是公共的MCP服務。使用LangChain訪問MCP來調用工具,現階段主要是使用模型上下文協議(MCP)適配器來實現的。MCP適配器用於連接多個 MCP 服務器並加載 LangChain 兼容資源的客户端。此模塊提供 MultiServerMCPClient 類,用於管理與多個 MCP 服務器的連接,並從中加載工具、提示和資源。

要使用MultiServerMCPClient需要先安裝langchain_mcp_adapters模塊:

pip install langchain-mcp-adapters -i https://pypi.tuna.tsinghua.edu.cn/simple # 使用清華源會提升速度

另一個重要的模塊是langgraph.prebuilt。這個模塊隨langgraph模塊的安裝會自動安裝。

pip install langgraph -i https://pypi.tuna.tsinghua.edu.cn/simple

這個模塊是 LangGraph 庫的核心組成部分,提供了一系列預構建的組件和工具,旨在簡化複雜 AI 代理和工作流的開發過程。LangGraph 是 LangChain 生態的擴展框架,專注於構建有狀態、多步驟的 AI 系統,通過狀態圖(StateGraph)管理節點和邊,支持動態路由、循環和狀態管理。該模塊通過封裝常見的代理邏輯、工具執行和狀態管理功能,顯著降低了開發者的編碼負擔,適合快速原型化和生產級應用。這裏我們主要使用他的一個函數create_react_agent()。這個函數是用於構建基於 ReAct(思考-行動)模式 的智能代理(Agent)的核心函數,其作用是將大語言模型(LLM)與工具調用能力結合,實現動態任務處理。其中ReAct是一種結合推理和行動的代理架構。

使用示例代碼如下:

from langchain_mcp_adapters.client import MultiServerMCPClient
# 配置我們自己構建的MCP服務或者公共的MCP地址
client = MultiServerMCPClient(
    {
        "math": {
            "command": "python",
            # Make sure to update to the full absolute path to your math_server.py file
            "args": ["/path/to/math_server.py"],
            "transport": "stdio",
        },
        "weather": {
            # Make sure you start your weather server on port 8000
            "url": "https://:8000/mcp",
            "transport": "streamable_http",
        },
        "mcp-server-chart": {
            "command": "cmd",
            "args": [
                "/c",
                "npx",
                "-y",
                "@antv/mcp-server-chart"
            ],
            "transport": "stdio",
        }
    }
)
# 獲取所有的工具列表
tools = await client.get_tools()
# 真實調用的時候傳入大模型和工具列表構建一個agent
agent = create_react_agent(
    model,
    tools
)
# 通過invoke來實現工具的調用
result = await agent.ainvoke({
    "messages": [...]
})

整體流程就是:

LangChain下使用MCP和Tools工具訪問數據庫方法_數據庫

這裏我有一個Postgresql數據庫表,存儲是某市的供地信息,使用MCP查詢數據庫,並返回結果,代碼如下:

# 調用公開MCP工具----------------------
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_openai_tools_agent
from langchain_core.messages import HumanMessage
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.prompts import ChatPromptTemplate
model = ChatOpenAI(
    streaming=True,
    model='deepseek-chat', 
    openai_api_key=<你的API KEY>, 
    openai_api_base='https://api.deepseek.com',
    max_tokens=1024,
    temperature=0.7
)
async def mcp_main(query:str):
    # 加載 MCP 配置
    client = MultiServerMCPClient(
        {
            "postgres": {
                "command": "cmd",
                "args": [
                    "/c",
                    "npx",
                    "-y",
                    "@modelcontextprotocol/server-postgres",
                    "postgresql://postgres:123456@localhost:5432/gtyzt"
                ],
                "transport": "stdio",
            }
        }
    )
    tools = await client.get_tools()


    agent = create_react_agent(
        model,
        tools
    )


    system_prefix = """
    你是一個SQL語句生成專家。根據用户的問題和提供的數據庫表結構信息,生成正確的SQL查詢語句。
                
                數據庫元數據:
                {schema}
                
                生成SQL的規則:
                1. 只生成SQL語句,不要添加任何解釋或説明
                2. 使用正確的表名和列名,與提供的結構一致,字段要加上中文別名
                3. 確保SQL語法正確,適用於PostgreSQL數據庫,geometry類型的字段要用ST_AsText(字段名)來獲取
                4. 如果有聚合操作,確保使用正確的聚合函數
                5. 對於日期類型的條件,使用正確的日期格式
                6. 如果需要排序,添加適當的ORDER BY子句
                7. 查詢結果要去除重複結果
    """
    schema = """
        "table_name": "gd",
        "description": "供地數據表",
        "columns":[
            {"column_name": "ogc_fid","chinese_name": "標識碼","data_type": "int"},
            {"column_name": "wkb_geometry","chinese_name": "地理座標信息","data_type": "geometry"},
            {"column_name": "shape_len","chinese_name": "圖形長度","data_type": "double"},
            {"column_name": "shape_area","chinese_name": "圖形面積","data_type": "double"},
            {"column_name": "city","chinese_name": "大市","data_type": "varchar"},
            {"column_name": "town","chinese_name": "區縣(只到區縣)","data_type": "varchar"},
            {"column_name": "tdzl","chinese_name": "土地坐落(包括鄉鎮街道,道路)","data_type": "varchar"},
            {"column_name": "xzqdm","chinese_name": "行政區代碼","data_type": "varchar"},
            {"column_name": "tdyt","chinese_name": "土地用途","data_type": "varchar"},
            {"column_name": "crnx","chinese_name": "出讓年限","data_type": "varchar"},
            {"column_name": "gyfs","chinese_name": "供應方式","data_type": "varchar"},
       ]
    """
    # 執行任務:訪問網頁並總結內容
    result = await agent.ainvoke({
        "messages": [
            {"role":"system", "content":system_prefix.format(schema=schema)},
            {"role":"user", "content":query}
        ]
    })
    print(result["messages"][-1].content)
    return result["messages"][-1].content
if __name__ == "__main__":
    query = "告訴我某市土地用途類型"
    asyncio.run(mcp_main(query))

返回結果如下,可以看到結論清晰,並且如果數據庫比較大的話,我實測速度上比我們自己讓大模型去生成SQL,再執行是更快的,效率也更高。

LangChain下使用MCP和Tools工具訪問數據庫方法_數據庫_02

除了使用MCP,還有一種辦法就是使用Tools來訪問數據庫,我們自己構建一個查詢數據庫的工具來進行查詢並返回,本質上也和前面的MCP一樣,構建了一個工具,一次性的返回查詢結果。自己編寫工具的最大好處就是比較可控。這裏我使用langchain_community模塊的create_sql_agent()函數。

首先,我們需要創建一個數據庫連接;然後我們可以構建提示詞,我這裏使用了few-shot方法,給出了幾個示例,這樣的話,有利於提升大模型生成SQL的準確性並且速度更快,注意在使用few-shot的時候需要使用embedding模型,可以自己本地部署,也可以使用網上的模型。接下來根據提示詞和大模型,調用create_sql_agent函數,構建一個sql代理agent。通過agent的invoke方法執行查詢,並把這個流程封裝成一個函數,作為tool暴露出來,供程序調用。主函數中,現在大模型上綁定這個tools工具,獲取包含工具調用的初始響應,通過工具的invoke函數獲取數據庫查詢結果,最後把結果傳給大模型,得到最終的輸出。

# text2sql工具,並以tool格式封裝,查詢結果可以通過大模型返回
from langchain.chains import create_sql_query_chain
from langchain_experimental.sql import SQLDatabaseChain
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
db = SQLDatabase.from_uri("postgresql://postgres:123456@localhost:5432/gtyzt")
model = ChatOpenAI(
    streaming=True,
    model='deepseek-chat', 
    openai_api_key=<你的APK KEY>, 
    openai_api_base='https://api.deepseek.com',
    max_tokens=1024,
    temperature=0.7
)
from langchain_community.agent_toolkits import create_sql_agent
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
examples = [
    {"input": "某某區供地地塊數量是多少?", "query": "SELECT COUNT(*) FROM gd WHERE town like '某某區%';"},
    {"input": "查詢某某區供地信息","query":"""
            SELECT DISTINCT
                ogc_fid AS "標識碼",
                shape_len AS "圖形長度",
                shape_area AS "圖形面積",
                city AS "大市",
                town AS "區縣(只到區縣)",
                tdzl AS "土地坐落(包括鄉鎮街道,道路)",
                xzqdm AS "行政區代碼",
                tdyt AS "土地用途",
                crnx AS "出讓年限",
                gyfs AS "供應方式"
            FROM gd
            WHERE town like '某某區%'
     """}
]
embeddings = OllamaEmbeddings(model="bge-m3:567m")
example_selector = SemanticSimilarityExampleSelector.from_examples(
    examples,
    embeddings,
    FAISS,
    k=5,
    input_keys=["input"],
)
from langchain_core.prompts import (
    ChatPromptTemplate,
    FewShotPromptTemplate,
    MessagesPlaceholder,
    PromptTemplate,
    SystemMessagePromptTemplate,
)
system_prefix = """
你是一個SQL語句生成專家。根據用户的問題和提供的數據庫表結構信息,生成正確的SQL查詢語句。
            
            數據庫元數據:
            {schema}
            
            生成SQL的規則:
            1. 只生成SQL語句,不要添加任何解釋或説明
            2. 使用正確的表名和列名,與提供的結構一致,字段要加上中文別名
            3. 確保SQL語法正確,適用於PostgreSQL數據庫,geometry類型的字段要用ST_AsText(字段名)來獲取
            4. 如果有聚合操作,確保使用正確的聚合函數
            5. 對於日期類型的條件,使用正確的日期格式
            6. 如果需要排序,添加適當的ORDER BY子句
            7. 查詢結果要去除重複結果
"""
schema = """
    "table_name": "gd",
    "description": "供地數據表",
    "columns":[
        {"column_name": "ogc_fid","chinese_name": "標識碼","data_type": "int"},
        {"column_name": "wkb_geometry","chinese_name": "地理座標信息","data_type": "geometry"},
        {"column_name": "shape_len","chinese_name": "圖形長度","data_type": "double"},
        {"column_name": "shape_area","chinese_name": "圖形面積","data_type": "double"},
        {"column_name": "city","chinese_name": "大市","data_type": "varchar"},
        {"column_name": "town","chinese_name": "區縣(只到區縣)","data_type": "varchar"},
        {"column_name": "tdzl","chinese_name": "土地坐落(包括鄉鎮街道,道路)","data_type": "varchar"},
        {"column_name": "xzqdm","chinese_name": "行政區代碼","data_type": "varchar"},
        {"column_name": "tdyt","chinese_name": "土地用途","data_type": "varchar"},
        {"column_name": "crnx","chinese_name": "出讓年限","data_type": "varchar"},
        {"column_name": "gyfs","chinese_name": "供應方式","data_type": "varchar"} 
    ]
"""
few_shot_prompt = FewShotPromptTemplate(
    example_selector=example_selector,
    example_prompt=PromptTemplate.from_template(
        "User input: {input}\nSQL query: {query}"
    ),
    input_variables=["input", "schema"],
    prefix=system_prefix,
    suffix="",
)
full_prompt = ChatPromptTemplate.from_messages(
    [
        SystemMessagePromptTemplate(prompt=few_shot_prompt),
        ("human", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ]
)
agent = create_sql_agent(
    llm=model,
    db=db,
    prompt=full_prompt,
    verbose=True,
    agent_type="openai-tools",
)
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_core.messages import ToolMessage
from langchain_core.messages import AIMessage
import os
import json
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
@tool(descriptinotallow="查詢數據庫")
def query_db(query: str) -> str:
    """
    查詢數據庫
    Args:
        query: 用户問題
    """
    result = agent.invoke({"input": query, "schema": schema})
    return result
@tool(descriptinotallow="查詢地理位置信息")
def query_map(query: str) -> str:
    """
    查詢地理位置信息
    Args:
        query: 用户問題
    """
    result = agent.invoke({"input": query, "schema": schema})
    print(result)
    return result
    
if __name__ == "__main__":
    llm_with_tools = model.bind_tools([query_db])
    query = "告訴我某市土地用途類型"
    messages = [HumanMessage(query)]
    
    # 獲取包含工具調用的初始響應
    ai_msg = llm_with_tools.invoke(messages)
    messages.append(ai_msg)
    
    # 執行工具並獲取結果
    if ai_msg.tool_calls:
        tool_result = query_db.invoke(ai_msg.tool_calls[0])
        json_str = tool_result.content
        json_obj = json.loads(json_str)
        tool_msg = ToolMessage(
            cnotallow=json_obj['output'],
            tool_call_id=ai_msg.tool_calls[0]['id']
        )
        messages.append(tool_msg)
    
    # 流式輸出最終響應
    for chunk in llm_with_tools.stream(messages):
        if hasattr(chunk, 'content') and chunk.content:
            print(chunk.content, end="", flush=True)

執行結果如下:

LangChain下使用MCP和Tools工具訪問數據庫方法_sql_03

Finished chain.

根據數據庫查詢結果,某市的土地用途類型非常豐富,主要包括以下幾大類:

🏭 工業用地

- 一類工業用地

- 二類工業用地

- 三類工業用地

🏢 商業服務業用地

- 商務金融用地

- 零售商業用地

- 旅館用地

- 其他商服用地

- 其他商業服務業用地

... ...

還有一種方法就是通過大模型生成SQL,再執行SQL。這種方法其實也是比較好的,定製化比較好,而且對於需要直接返回數據結果而不希望經過大模型“提煉”與“總結”的場景來説就比較有用了,因為不管是MCP還是tools工具調用,查詢的結果都會經過大模型再過一邊,得到的是比較“人性化”的返回結果,但如果這個接口就是需要返回某種格式的原始結果,那麼用這種方式就會比較好了。

def generate_sql(query):
    # 使用 invoke 方法調用鏈
    input_data = {
        "input": query,
        "schema": schema
    }
    result = chain.invoke(input_data)
    result = result.replace("```sql", "")
    result = result.replace("```", "")
    print(result)
    return result
import psycopg2
import pandas as pd
def execute_query(sql):
    try:
        conn = psycopg2.connect(
            dbname="gtyzt",
            user="postgres",
            password="123456",
            host="localhost",  # 例如'localhost'或者你的數據庫服務器IP地址
            port="5432"   # PostgreSQL默認端口是5432
        )
        df = pd.read_sql(sql, conn)
        conn.close()
        return df
    except Exception as e:
        print(f"查詢出錯: {e}")
        return None
    
def query_map(query:str) -> str:
    '''
        查詢地圖
    '''
    sql = generate_sql(query)
    result = execute_query(sql)
    return result.to_json(force_ascii=False,orient="records")
    
if __name__ == "__main__":
    sql = generate_sql("查詢某區供地信息")
    result = execute_query(sql)
    print(result.head(10).to_json(force_ascii=False,orient="records"))

通過這三種方式,基本可以完成基於大模型對數據庫進行訪問和查詢了。