LangChain是一個專為大語言模型設計的開放框架,圍繞任務鏈(Chain)與內存模塊(Memory)構建了核心架構。這兩大組件是LangChain高效構建複雜語言應用的關鍵所在,使模型在多任務環境中得以應對任務管理、上下文維護、記憶存儲等多種需求。
構建大模型智能應用的時候,往往需要對數據庫進行查詢,返回結果,如果自己寫MCP客户端和Tools工具比較麻煩。如果基於LangChain框架的話,可以簡化這種方法。本文就來演示一下,在LangChain框架下,如何使用MCP操作數據庫以及如何使用Tools來訪問數據庫。
我這裏假設你已經構建了MCP服務,或者調用的是公共的MCP服務。使用LangChain訪問MCP來調用工具,現階段主要是使用模型上下文協議(MCP)適配器來實現的。MCP適配器用於連接多個 MCP 服務器並加載 LangChain 兼容資源的客户端。此模塊提供 MultiServerMCPClient 類,用於管理與多個 MCP 服務器的連接,並從中加載工具、提示和資源。
要使用MultiServerMCPClient需要先安裝langchain_mcp_adapters模塊:
pip install langchain-mcp-adapters -i https://pypi.tuna.tsinghua.edu.cn/simple # 使用清華源會提升速度
另一個重要的模塊是langgraph.prebuilt。這個模塊隨langgraph模塊的安裝會自動安裝。
pip install langgraph -i https://pypi.tuna.tsinghua.edu.cn/simple
這個模塊是 LangGraph 庫的核心組成部分,提供了一系列預構建的組件和工具,旨在簡化複雜 AI 代理和工作流的開發過程。LangGraph 是 LangChain 生態的擴展框架,專注於構建有狀態、多步驟的 AI 系統,通過狀態圖(StateGraph)管理節點和邊,支持動態路由、循環和狀態管理。該模塊通過封裝常見的代理邏輯、工具執行和狀態管理功能,顯著降低了開發者的編碼負擔,適合快速原型化和生產級應用。這裏我們主要使用他的一個函數create_react_agent()。這個函數是用於構建基於 ReAct(思考-行動)模式 的智能代理(Agent)的核心函數,其作用是將大語言模型(LLM)與工具調用能力結合,實現動態任務處理。其中ReAct是一種結合推理和行動的代理架構。
使用示例代碼如下:
from langchain_mcp_adapters.client import MultiServerMCPClient
# 配置我們自己構建的MCP服務或者公共的MCP地址
client = MultiServerMCPClient(
{
"math": {
"command": "python",
# Make sure to update to the full absolute path to your math_server.py file
"args": ["/path/to/math_server.py"],
"transport": "stdio",
},
"weather": {
# Make sure you start your weather server on port 8000
"url": "https://:8000/mcp",
"transport": "streamable_http",
},
"mcp-server-chart": {
"command": "cmd",
"args": [
"/c",
"npx",
"-y",
"@antv/mcp-server-chart"
],
"transport": "stdio",
}
}
)
# 獲取所有的工具列表
tools = await client.get_tools()
# 真實調用的時候傳入大模型和工具列表構建一個agent
agent = create_react_agent(
model,
tools
)
# 通過invoke來實現工具的調用
result = await agent.ainvoke({
"messages": [...]
})
整體流程就是:
這裏我有一個Postgresql數據庫表,存儲是某市的供地信息,使用MCP查詢數據庫,並返回結果,代碼如下:
# 調用公開MCP工具----------------------
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_openai_tools_agent
from langchain_core.messages import HumanMessage
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_core.prompts import ChatPromptTemplate
model = ChatOpenAI(
streaming=True,
model='deepseek-chat',
openai_api_key=<你的API KEY>,
openai_api_base='https://api.deepseek.com',
max_tokens=1024,
temperature=0.7
)
async def mcp_main(query:str):
# 加載 MCP 配置
client = MultiServerMCPClient(
{
"postgres": {
"command": "cmd",
"args": [
"/c",
"npx",
"-y",
"@modelcontextprotocol/server-postgres",
"postgresql://postgres:123456@localhost:5432/gtyzt"
],
"transport": "stdio",
}
}
)
tools = await client.get_tools()
agent = create_react_agent(
model,
tools
)
system_prefix = """
你是一個SQL語句生成專家。根據用户的問題和提供的數據庫表結構信息,生成正確的SQL查詢語句。
數據庫元數據:
{schema}
生成SQL的規則:
1. 只生成SQL語句,不要添加任何解釋或説明
2. 使用正確的表名和列名,與提供的結構一致,字段要加上中文別名
3. 確保SQL語法正確,適用於PostgreSQL數據庫,geometry類型的字段要用ST_AsText(字段名)來獲取
4. 如果有聚合操作,確保使用正確的聚合函數
5. 對於日期類型的條件,使用正確的日期格式
6. 如果需要排序,添加適當的ORDER BY子句
7. 查詢結果要去除重複結果
"""
schema = """
"table_name": "gd",
"description": "供地數據表",
"columns":[
{"column_name": "ogc_fid","chinese_name": "標識碼","data_type": "int"},
{"column_name": "wkb_geometry","chinese_name": "地理座標信息","data_type": "geometry"},
{"column_name": "shape_len","chinese_name": "圖形長度","data_type": "double"},
{"column_name": "shape_area","chinese_name": "圖形面積","data_type": "double"},
{"column_name": "city","chinese_name": "大市","data_type": "varchar"},
{"column_name": "town","chinese_name": "區縣(只到區縣)","data_type": "varchar"},
{"column_name": "tdzl","chinese_name": "土地坐落(包括鄉鎮街道,道路)","data_type": "varchar"},
{"column_name": "xzqdm","chinese_name": "行政區代碼","data_type": "varchar"},
{"column_name": "tdyt","chinese_name": "土地用途","data_type": "varchar"},
{"column_name": "crnx","chinese_name": "出讓年限","data_type": "varchar"},
{"column_name": "gyfs","chinese_name": "供應方式","data_type": "varchar"},
]
"""
# 執行任務:訪問網頁並總結內容
result = await agent.ainvoke({
"messages": [
{"role":"system", "content":system_prefix.format(schema=schema)},
{"role":"user", "content":query}
]
})
print(result["messages"][-1].content)
return result["messages"][-1].content
if __name__ == "__main__":
query = "告訴我某市土地用途類型"
asyncio.run(mcp_main(query))
返回結果如下,可以看到結論清晰,並且如果數據庫比較大的話,我實測速度上比我們自己讓大模型去生成SQL,再執行是更快的,效率也更高。
除了使用MCP,還有一種辦法就是使用Tools來訪問數據庫,我們自己構建一個查詢數據庫的工具來進行查詢並返回,本質上也和前面的MCP一樣,構建了一個工具,一次性的返回查詢結果。自己編寫工具的最大好處就是比較可控。這裏我使用langchain_community模塊的create_sql_agent()函數。
首先,我們需要創建一個數據庫連接;然後我們可以構建提示詞,我這裏使用了few-shot方法,給出了幾個示例,這樣的話,有利於提升大模型生成SQL的準確性並且速度更快,注意在使用few-shot的時候需要使用embedding模型,可以自己本地部署,也可以使用網上的模型。接下來根據提示詞和大模型,調用create_sql_agent函數,構建一個sql代理agent。通過agent的invoke方法執行查詢,並把這個流程封裝成一個函數,作為tool暴露出來,供程序調用。主函數中,現在大模型上綁定這個tools工具,獲取包含工具調用的初始響應,通過工具的invoke函數獲取數據庫查詢結果,最後把結果傳給大模型,得到最終的輸出。
# text2sql工具,並以tool格式封裝,查詢結果可以通過大模型返回
from langchain.chains import create_sql_query_chain
from langchain_experimental.sql import SQLDatabaseChain
from langchain_community.utilities import SQLDatabase
from langchain_openai import ChatOpenAI
db = SQLDatabase.from_uri("postgresql://postgres:123456@localhost:5432/gtyzt")
model = ChatOpenAI(
streaming=True,
model='deepseek-chat',
openai_api_key=<你的APK KEY>,
openai_api_base='https://api.deepseek.com',
max_tokens=1024,
temperature=0.7
)
from langchain_community.agent_toolkits import create_sql_agent
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
examples = [
{"input": "某某區供地地塊數量是多少?", "query": "SELECT COUNT(*) FROM gd WHERE town like '某某區%';"},
{"input": "查詢某某區供地信息","query":"""
SELECT DISTINCT
ogc_fid AS "標識碼",
shape_len AS "圖形長度",
shape_area AS "圖形面積",
city AS "大市",
town AS "區縣(只到區縣)",
tdzl AS "土地坐落(包括鄉鎮街道,道路)",
xzqdm AS "行政區代碼",
tdyt AS "土地用途",
crnx AS "出讓年限",
gyfs AS "供應方式"
FROM gd
WHERE town like '某某區%'
"""}
]
embeddings = OllamaEmbeddings(model="bge-m3:567m")
example_selector = SemanticSimilarityExampleSelector.from_examples(
examples,
embeddings,
FAISS,
k=5,
input_keys=["input"],
)
from langchain_core.prompts import (
ChatPromptTemplate,
FewShotPromptTemplate,
MessagesPlaceholder,
PromptTemplate,
SystemMessagePromptTemplate,
)
system_prefix = """
你是一個SQL語句生成專家。根據用户的問題和提供的數據庫表結構信息,生成正確的SQL查詢語句。
數據庫元數據:
{schema}
生成SQL的規則:
1. 只生成SQL語句,不要添加任何解釋或説明
2. 使用正確的表名和列名,與提供的結構一致,字段要加上中文別名
3. 確保SQL語法正確,適用於PostgreSQL數據庫,geometry類型的字段要用ST_AsText(字段名)來獲取
4. 如果有聚合操作,確保使用正確的聚合函數
5. 對於日期類型的條件,使用正確的日期格式
6. 如果需要排序,添加適當的ORDER BY子句
7. 查詢結果要去除重複結果
"""
schema = """
"table_name": "gd",
"description": "供地數據表",
"columns":[
{"column_name": "ogc_fid","chinese_name": "標識碼","data_type": "int"},
{"column_name": "wkb_geometry","chinese_name": "地理座標信息","data_type": "geometry"},
{"column_name": "shape_len","chinese_name": "圖形長度","data_type": "double"},
{"column_name": "shape_area","chinese_name": "圖形面積","data_type": "double"},
{"column_name": "city","chinese_name": "大市","data_type": "varchar"},
{"column_name": "town","chinese_name": "區縣(只到區縣)","data_type": "varchar"},
{"column_name": "tdzl","chinese_name": "土地坐落(包括鄉鎮街道,道路)","data_type": "varchar"},
{"column_name": "xzqdm","chinese_name": "行政區代碼","data_type": "varchar"},
{"column_name": "tdyt","chinese_name": "土地用途","data_type": "varchar"},
{"column_name": "crnx","chinese_name": "出讓年限","data_type": "varchar"},
{"column_name": "gyfs","chinese_name": "供應方式","data_type": "varchar"}
]
"""
few_shot_prompt = FewShotPromptTemplate(
example_selector=example_selector,
example_prompt=PromptTemplate.from_template(
"User input: {input}\nSQL query: {query}"
),
input_variables=["input", "schema"],
prefix=system_prefix,
suffix="",
)
full_prompt = ChatPromptTemplate.from_messages(
[
SystemMessagePromptTemplate(prompt=few_shot_prompt),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad"),
]
)
agent = create_sql_agent(
llm=model,
db=db,
prompt=full_prompt,
verbose=True,
agent_type="openai-tools",
)
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_core.messages import ToolMessage
from langchain_core.messages import AIMessage
import os
import json
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
@tool(descriptinotallow="查詢數據庫")
def query_db(query: str) -> str:
"""
查詢數據庫
Args:
query: 用户問題
"""
result = agent.invoke({"input": query, "schema": schema})
return result
@tool(descriptinotallow="查詢地理位置信息")
def query_map(query: str) -> str:
"""
查詢地理位置信息
Args:
query: 用户問題
"""
result = agent.invoke({"input": query, "schema": schema})
print(result)
return result
if __name__ == "__main__":
llm_with_tools = model.bind_tools([query_db])
query = "告訴我某市土地用途類型"
messages = [HumanMessage(query)]
# 獲取包含工具調用的初始響應
ai_msg = llm_with_tools.invoke(messages)
messages.append(ai_msg)
# 執行工具並獲取結果
if ai_msg.tool_calls:
tool_result = query_db.invoke(ai_msg.tool_calls[0])
json_str = tool_result.content
json_obj = json.loads(json_str)
tool_msg = ToolMessage(
cnotallow=json_obj['output'],
tool_call_id=ai_msg.tool_calls[0]['id']
)
messages.append(tool_msg)
# 流式輸出最終響應
for chunk in llm_with_tools.stream(messages):
if hasattr(chunk, 'content') and chunk.content:
print(chunk.content, end="", flush=True)
執行結果如下:
Finished chain.
根據數據庫查詢結果,某市的土地用途類型非常豐富,主要包括以下幾大類:
🏭 工業用地
- 一類工業用地
- 二類工業用地
- 三類工業用地
🏢 商業服務業用地
- 商務金融用地
- 零售商業用地
- 旅館用地
- 其他商服用地
- 其他商業服務業用地
... ...
還有一種方法就是通過大模型生成SQL,再執行SQL。這種方法其實也是比較好的,定製化比較好,而且對於需要直接返回數據結果而不希望經過大模型“提煉”與“總結”的場景來説就比較有用了,因為不管是MCP還是tools工具調用,查詢的結果都會經過大模型再過一邊,得到的是比較“人性化”的返回結果,但如果這個接口就是需要返回某種格式的原始結果,那麼用這種方式就會比較好了。
def generate_sql(query):
# 使用 invoke 方法調用鏈
input_data = {
"input": query,
"schema": schema
}
result = chain.invoke(input_data)
result = result.replace("```sql", "")
result = result.replace("```", "")
print(result)
return result
import psycopg2
import pandas as pd
def execute_query(sql):
try:
conn = psycopg2.connect(
dbname="gtyzt",
user="postgres",
password="123456",
host="localhost", # 例如'localhost'或者你的數據庫服務器IP地址
port="5432" # PostgreSQL默認端口是5432
)
df = pd.read_sql(sql, conn)
conn.close()
return df
except Exception as e:
print(f"查詢出錯: {e}")
return None
def query_map(query:str) -> str:
'''
查詢地圖
'''
sql = generate_sql(query)
result = execute_query(sql)
return result.to_json(force_ascii=False,orient="records")
if __name__ == "__main__":
sql = generate_sql("查詢某區供地信息")
result = execute_query(sql)
print(result.head(10).to_json(force_ascii=False,orient="records"))
通過這三種方式,基本可以完成基於大模型對數據庫進行訪問和查詢了。