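RAG (Retrieval-Augmented Generation) is by now a well-established pattern in language-model applications, but typical implementations stop at a simple "retrieve, then generate" pipeline. Real conversations are messier: users ask vague questions, follow up repeatedly, and often bring up things that have nothing to do with the system's domain.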
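This article shows how to use LangGraph to build a more practical RAG system: one that handles follow-up questions, filters out off-topic requests, grades the quality of retrieved documents, and keeps the full conversation history.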
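Traditional RAG retrieves the top-k documents and feeds them to the model to generate an answer. In a conversational setting this runs into several problems: the user's question is often vague or is a follow-up that depends on earlier turns; the k retrieved documents may not be relevant at all; and some questions are simply outside what the system can answer.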
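The approach taken here combines chat history, document relevance grading, iterative query refinement, and routing, so that the system can understand context and correct itself.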
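The finished system does the following:
It accepts conversational questions, including follow-ups; rewrites each question into a standalone form; decides whether the question is in scope; retrieves documents and grades their relevance; refines the query and retrieves again when no suitable document is found; once the retry limit is reached, either answers or reports that it cannot answer; and keeps the full conversation history throughout.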
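The flow described above can be sketched in plain Python before any LangGraph code is written. This is a minimal sketch rather than the article's implementation: `run_turn` and the four callables passed to it (`classify`, `retrieve`, `grade`, `refine`) are hypothetical stand-ins for the LLM and vector-store calls introduced later, question rewriting is left out, and the limit of two refinements is an assumption that mirrors the value used further down.

```python
from typing import Callable, List

MAX_REFINEMENTS = 2  # assumption: same limit as the router shown later


def run_turn(
    question: str,
    classify: Callable[[str], bool],
    retrieve: Callable[[str], List[str]],
    grade: Callable[[str, str], bool],
    refine: Callable[[str], str],
) -> str:
    """Route one standalone question through classify -> retrieve -> grade -> refine."""
    if not classify(question):
        return "off_topic"
    query = question
    attempts = 0
    while True:
        # Keep only the documents the grader accepts
        relevant = [d for d in retrieve(query) if grade(query, d)]
        if relevant:
            return "answer"
        if attempts >= MAX_REFINEMENTS:
            return "fallback"
        query = refine(query)
        attempts += 1


# Toy stand-ins so the sketch runs without an LLM or a vector store.
toy_docs = ["Bella Vista is open from Monday to Sunday."]
outcome = run_turn(
    "When is Bella Vista open?",
    classify=lambda q: "bella vista" in q.lower(),
    retrieve=lambda q: toy_docs,
    grade=lambda q, d: "open" in q.lower() and "open" in d.lower(),
    refine=lambda q: q + " (rephrased)",
)
print(outcome)  # answer
```

With two refinements allowed, an unanswerable question triggers three retrievals in total before the fallback is returned.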
Environment setup
First check your Python version (3.10 or later is required), then create the project directory:
mkdir -p complex-agentic-rag
cd complex-agentic-rag
touch complex_agentic_rag.ipynb
Initialize the project and a virtual environment:
uv init .
uv venv
source .venv/bin/activate
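Install the dependencies (python-dotenv and langchain-community are included because the code below imports them):
uv add langchain langchain-community langgraph langchain-google-genai python-dotenv mypy pillow chromadb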
Get a Gemini API key from Google AI Studio and save it in a .env file:
touch .env
Add the key:
GOOGLE_API_KEY=<your_gemini_api_key>
Don't forget to add the file to .gitignore:
echo ".env" >> .gitignore
Core implementation
Load the configuration:
from dotenv import load_dotenv
load_dotenv()
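`load_dotenv()` does not raise when the file or the key is missing, so a quick check at this point can save a confusing authentication error later. A small sketch using only the standard library; `require_env` is a hypothetical helper name, not part of python-dotenv.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear message."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; add it to .env or export it in the shell")
    return value


# Usage, after load_dotenv() has run:
# require_env("GOOGLE_API_KEY")
```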
Prepare some test data. A few documents about a restaurant are used for the demo:
from langchain_core.documents import Document

docs = [
    Document(
        page_content=(
            "Bella Vista is owned by Antonio Rossi, a renowned chef with over 20 years of experience "
            "in the culinary industry. He started Bella Vista to bring authentic Italian flavors to the community."
        ),
        metadata={"source": "owner.txt"},
    ),
    Document(
        page_content=(
            "Bella Vista offers a range of dishes with prices that cater to various budgets. "
            "Appetizers start at $8, main courses range from $15 to $35, and desserts are priced between $6 and $12."
        ),
        metadata={"source": "dishes.txt"},
    ),
    Document(
        page_content=(
            "Bella Vista is open from Monday to Sunday. Weekday hours are 11:00 AM to 10:00 PM, "
            "while weekend hours are extended from 11:00 AM to 11:00 PM."
        ),
        metadata={"source": "restaurant_info.txt"},
    ),
    Document(
        page_content=(
            "Bella Vista offers a variety of menus including a lunch menu, dinner menu, and a special weekend brunch menu. "
            "The lunch menu features light Italian fare, the dinner menu offers a more extensive selection of traditional and contemporary dishes, "
            "and the brunch menu includes both classic breakfast items and Italian specialties."
        ),
        metadata={"source": "restaurant_info.txt"},
    ),
]
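Embed the documents and store them in a Chroma vector store, using Google's embedding model, and configure the retriever to return the 2 most similar documents: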
from langchain_community.vectorstores import Chroma
from langchain_google_genai import GoogleGenerativeAIEmbeddings
embedding_function = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
db = Chroma.from_documents(docs, embedding_function)
retriever = db.as_retriever(search_kwargs={"k": 2})
Test the retriever:
retriever.invoke("who is the owner of bella vista?")
The output should include the document about the owner, Antonio Rossi:
[Document(metadata={'source': 'owner.txt'}, page_content='Bella Vista is owned by Antonio Rossi, a renowned chef with over 20 years of experience in the culinary industry. He started Bella Vista to bring authentic Italian flavors to the community.'),
Document(metadata={'source': 'restaurant_info.txt'}, page_content='Bella Vista is open from Monday to Sunday. Weekday hours are 11:00 AM to 10:00 PM, while weekend hours are extended from 11:00 AM to 11:00 PM.')]
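Note that with k=2 the retriever always returns two documents, so the second hit above (opening hours) comes back even though it says nothing about the owner. The toy sketch below shows why: top-k search ranks by similarity and cuts at k, with no notion of "relevant enough". It uses a bag-of-words vector and cosine similarity purely for illustration; that is an assumption made for the demo, not how the Google embedding model or Chroma work internally, and the corpus is a paraphrase of the article's documents.

```python
import math
import re
from collections import Counter
from typing import List, Tuple


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (a stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def top_k(query: str, corpus: List[str], k: int = 2) -> List[Tuple[float, str]]:
    """Return the k highest-scoring documents, however low their scores are."""
    q = embed(query)
    scored = [(cosine(q, embed(doc)), doc) for doc in corpus]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:k]


corpus = [
    "The owner of Bella Vista is Antonio Rossi, a renowned chef.",
    "Appetizers start at $8 and main courses range from $15 to $35.",
    "Bella Vista is open from Monday to Sunday.",
]
results = top_k("who is the owner of bella vista?", corpus)
for score, doc in results:
    print(f"{score:.2f}  {doc}")
```

The second result only shares generic words with the query, yet it is returned anyway. This is the gap the relevance grading step later in the article is meant to close.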
Prompt and RAG chain
Define a prompt template that includes the chat history so the model can take the conversation context into account:
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
template = """
Answer the question based on the following context and the chat history:
Chat history: {history}
Context: {context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
rag_chain = prompt | llm
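Because the template uses plain `{history}` and `{context}` placeholders, whatever is passed in is converted to a string. Passing lists of message and document objects therefore puts their repr into the prompt. One option is to render them as text first. The sketch below uses a small stand-in `Turn` dataclass instead of the LangChain message classes so that it runs on its own; `render_history` and `render_context` are hypothetical helper names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Turn:
    role: str      # "user" or "assistant"
    content: str


def render_history(turns: List[Turn]) -> str:
    """One line per turn, e.g. 'user: When does Bella Vista open?'."""
    return "\n".join(f"{t.role}: {t.content}" for t in turns)


def render_context(passages: List[str]) -> str:
    """Number the passages so the model can tell them apart."""
    return "\n".join(f"[{i}] {p}" for i, p in enumerate(passages, start=1))


TEMPLATE = (
    "Answer the question based on the following context and the chat history:\n"
    "Chat history: {history}\n"
    "Context: {context}\n"
    "Question: {question}\n"
)

prompt_text = TEMPLATE.format(
    history=render_history([Turn("user", "When does Bella Vista open?")]),
    context=render_context(["Bella Vista is open from Monday to Sunday."]),
    question="Is Bella Vista open on Sundays?",
)
print(prompt_text)
```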
State definition
A DialogState tracks the information needed throughout the conversation:
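from typing import List, TypedDict
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

class DialogState(TypedDict):
    turns: List[BaseMessage]         # full chat history (user and AI messages)
    retrieved_docs: List[Document]   # documents kept after grading
    topic_flag: str                  # "Yes" / "No" from the topic classifier
    refined_query: str               # standalone or refined version of the question
    ready_for_response: bool         # True once at least one relevant document is found
    refinement_attempts: int         # number of query refinements so far
    question: HumanMessage           # latest user message; this is the key to pass as graph input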
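Only `turns` needs to survive from one user question to the next; the other fields describe the current question and are reset at the start of every turn. A minimal standalone sketch of that split, using plain strings in place of message objects; `TurnState` and `start_turn` are illustrative names and not part of the article's code.

```python
from typing import List, Optional, TypedDict


class TurnState(TypedDict):
    turns: List[str]
    retrieved_docs: List[str]
    topic_flag: str
    refined_query: str
    ready_for_response: bool
    refinement_attempts: int
    question: str


def start_turn(previous: Optional[TurnState], question: str) -> TurnState:
    """Carry the history forward and reset every per-question field."""
    history = list(previous["turns"]) if previous else []
    return TurnState(
        turns=history + [question],
        retrieved_docs=[],
        topic_flag="",
        refined_query="",
        ready_for_response=False,
        refinement_attempts=0,
        question=question,
    )


first = start_turn(None, "When does Bella Vista open?")
first["turns"].append("Bella Vista opens at 11:00 AM every day.")
second = start_turn(first, "Also on Sunday?")
print(len(second["turns"]))  # 3
```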
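The system is made up of several nodes, each responsible for one task:
Question rewriter: turns a follow-up into a standalone question.
Classifier: decides whether the question is in scope.
Router: picks the path based on the classification result.
Retriever: fetches candidate documents.
Grader: uses the LLM to judge whether each document is actually relevant.
Refiner: rewrites the question and retries when no suitable document was found.
Generator: produces the answer from the retrieved documents and the chat history.
Fallback nodes: handle out-of-scope or unanswerable questions.
The full code: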
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.documents import Document
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END

# Structured output schema for the topic classifier
class TopicGrade(BaseModel):
    score: str = Field(
        description="Is the question about the target topics? If yes -> 'Yes'; if not -> 'No'"
    )
def rephrase_query(state: DialogState):
    print(f"Entering rephrase_query with state: {state}")
    # Reset derived fields left over from the previous turn
    state["retrieved_docs"] = []
    state["topic_flag"] = ""
    state["refined_query"] = ""
    state["ready_for_response"] = False
    state["refinement_attempts"] = 0
    # First call on a thread: there is no history yet
    if "turns" not in state or state["turns"] is None:
        state["turns"] = []
    if state["question"] not in state["turns"]:
        state["turns"].append(state["question"])
    if len(state["turns"]) > 1:
        # Earlier turns exist, so ask the LLM for a standalone question
        chat_history = state["turns"][:-1]
        question_text = state["question"].content
        prompt_msgs = [
            SystemMessage(
                content="You are a helpful assistant that rephrases the user's question to be a standalone question optimized for retrieval."
            )
        ]
        prompt_msgs.extend(chat_history)
        prompt_msgs.append(HumanMessage(content=question_text))
        # .format() flattens the message list into a single string before it is sent to the LLM
        prompt = ChatPromptTemplate.from_messages(prompt_msgs).format()
        response = llm.invoke(prompt)
        refined = response.content.strip()
        print(f"rephrase_query: Rephrased to: {refined}")
        state["refined_query"] = refined
    else:
        # First question of the conversation: use it as is
        state["refined_query"] = state["question"].content
    return state
def classify_topic(state: DialogState):
    print("Entering classify_topic")
    sys_msg = SystemMessage(
        content="""You are a classifier that determines whether a user's question is about one of the following topics:
1. Information about the owner of Bella Vista, which is Antonio Rossi.
2. Prices of dishes at Bella Vista (restaurant).
3. Opening hours of Bella Vista (restaurant).
If the question IS about any of these topics, respond with 'Yes'. Otherwise, respond with 'No'."""
    )
    user_msg = HumanMessage(content=f"User question: {state['refined_query']}")
    prompt = ChatPromptTemplate.from_messages([sys_msg, user_msg])
    # Constrain the LLM output to the TopicGrade schema
    structured_llm = llm.with_structured_output(TopicGrade)
    grader = prompt | structured_llm
    result = grader.invoke({})
    state["topic_flag"] = result.score.strip()
    print(f"classify_topic: topic_flag = {state['topic_flag']}")
    return state

def topic_router(state: DialogState):
    # Conditional edge: returns the name of the next node
    print("Entering topic_router")
    if state.get("topic_flag", "").strip().lower() == "yes":
        print("Routing to fetch_docs")
        return "fetch_docs"
    else:
        print("Routing to reject_off_topic")
        return "reject_off_topic"
def fetch_docs(state: DialogState):
    print("Entering fetch_docs")
    docs = retriever.invoke(state["refined_query"])
    print(f"fetch_docs: Retrieved {len(docs)} documents")
    state["retrieved_docs"] = docs
    return state

# Structured output schema for the document relevance grader
class RelevanceGrade(BaseModel):
    score: str = Field(
        description="Is the document relevant to the user's question? If yes -> 'Yes'; if not -> 'No'"
    )

def evaluate_docs(state: DialogState):
    print("Entering evaluate_docs")
    sys_msg = SystemMessage(
        content="""You are a grader assessing the relevance of a retrieved document to a user question.
Only answer with 'Yes' or 'No'.
If the document contains information relevant to the user's question, respond with 'Yes'.
Otherwise, respond with 'No'."""
    )
    structured_llm = llm.with_structured_output(RelevanceGrade)
    relevant = []
    # One LLM call per retrieved document
    for doc in state["retrieved_docs"]:
        user_msg = HumanMessage(
            content=f"User question: {state['refined_query']}\n\nRetrieved document:\n{doc.page_content}"
        )
        prompt = ChatPromptTemplate.from_messages([sys_msg, user_msg])
        grader = prompt | structured_llm
        result = grader.invoke({})
        print(f"Evaluating doc: {doc.page_content[:30]}... Result: {result.score.strip()}")
        if result.score.strip().lower() == "yes":
            relevant.append(doc)
    # Keep only the documents graded as relevant
    state["retrieved_docs"] = relevant
    state["ready_for_response"] = len(relevant) > 0
    print(f"evaluate_docs: ready_for_response = {state['ready_for_response']}")
    return state
def decision_router(state: DialogState):
    # Conditional edge: answer, retry with a refined query, or give up
    print("Entering decision_router")
    attempts = state.get("refinement_attempts", 0)
    if state.get("ready_for_response", False):
        print("Routing to create_response")
        return "create_response"
    elif attempts >= 2:
        print("Routing to fallback_response")
        return "fallback_response"
    else:
        print("Routing to tweak_question")
        return "tweak_question"

def tweak_question(state: DialogState):
    print("Entering tweak_question")
    attempts = state.get("refinement_attempts", 0)
    if attempts >= 2:
        print("Max attempts reached")
        return state
    original = state["refined_query"]
    sys_msg = SystemMessage(
        content="""You are a helpful assistant that slightly refines the user's question to improve retrieval results.
Provide a slightly adjusted version of the question."""
    )
    user_msg = HumanMessage(content=f"Original question: {original}")
    prompt = ChatPromptTemplate.from_messages([sys_msg, user_msg]).format()
    response = llm.invoke(prompt)
    refined = response.content.strip()
    print(f"tweak_question: Refined to: {refined}")
    state["refined_query"] = refined
    state["refinement_attempts"] = attempts + 1
    return state
def create_response(state: DialogState):
    print("Entering create_response")
    if "turns" not in state or state["turns"] is None:
        raise ValueError("State must include 'turns' before generating an answer.")
    history = state["turns"]
    context = state["retrieved_docs"]
    question = state["refined_query"]
    response = rag_chain.invoke({
        "history": history,
        "context": context,
        "question": question
    })
    result = response.content.strip()
    # Store the answer in the history so follow-up questions can refer to it
    state["turns"].append(AIMessage(content=result))
    print(f"create_response: Answer generated: {result}")
    return state

def fallback_response(state: DialogState):
    # In scope, but no relevant document was found within the retry limit
    print("Entering fallback_response")
    if "turns" not in state or state["turns"] is None:
        state["turns"] = []
    state["turns"].append(
        AIMessage(content="I'm sorry, but I couldn't find the information you're looking for.")
    )
    return state

def reject_off_topic(state: DialogState):
    # Out of scope according to the topic classifier
    print("Entering reject_off_topic")
    if "turns" not in state or state["turns"] is None:
        state["turns"] = []
    state["turns"].append(
        AIMessage(content="I can't respond to that!")
    )
    return state
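Both routers depend on the model returning exactly 'Yes' or 'No'. The code above already strips and lowercases the value; a slightly more defensive variant also tolerates quotes, punctuation, or a short explanation after the verdict, and treats anything unrecognized as 'No'. `parse_verdict` is a hypothetical helper, shown here as a sketch.

```python
import re


def parse_verdict(raw: str) -> bool:
    """Map a model verdict to a boolean; anything other than a leading 'yes' is False."""
    match = re.match(r"\s*['\"]?(yes|no)\b", raw, flags=re.IGNORECASE)
    return bool(match) and match.group(1).lower() == "yes"


print(parse_verdict("Yes"))                                   # True
print(parse_verdict(" 'No'. The document is about prices."))  # False
print(parse_verdict("maybe"))                                 # False
```

Defaulting to 'No' is a design choice: an unparseable verdict then leads to a retry or a refusal rather than to an answer built on an ungraded document.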
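Finally, wire the nodes together with LangGraph's StateGraph and attach a MemorySaver checkpointer. MemorySaver stores the graph state, including the chat history, per thread_id in process memory, so it is kept across calls but not across restarts: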
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
# Initialize memory checkpointer to persist chat history
checkpointer = MemorySaver()
workflow = StateGraph(DialogState)
workflow.add_node("rephrase_query", rephrase_query)
workflow.add_node("classify_topic", classify_topic)
workflow.add_node("reject_off_topic", reject_off_topic)
workflow.add_node("fetch_docs", fetch_docs)
workflow.add_node("evaluate_docs", evaluate_docs)
workflow.add_node("create_response", create_response)
workflow.add_node("tweak_question", tweak_question)
workflow.add_node("fallback_response", fallback_response)
workflow.add_edge("rephrase_query", "classify_topic")
workflow.add_conditional_edges(
    "classify_topic",
    topic_router,
    {
        "fetch_docs": "fetch_docs",
        "reject_off_topic": "reject_off_topic",
    },
)
workflow.add_edge("fetch_docs", "evaluate_docs")
workflow.add_conditional_edges(
    "evaluate_docs",
    decision_router,
    {
        "create_response": "create_response",
        "tweak_question": "tweak_question",
        "fallback_response": "fallback_response",
    },
)
workflow.add_edge("tweak_question", "fetch_docs")
workflow.add_edge("create_response", END)
workflow.add_edge("fallback_response", END)
workflow.add_edge("reject_off_topic", END)
workflow.set_entry_point("rephrase_query")
graph = workflow.compile(checkpointer=checkpointer)
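The role of `thread_id` is easier to see with a toy model of a checkpointer: state is stored under the thread ID, and a later call with the same ID starts from the stored state instead of an empty one. `ToyCheckpointer` below is a simplified illustration written for this explanation, not LangGraph's `MemorySaver` implementation.

```python
from typing import Dict, List


class ToyCheckpointer:
    """Keeps one history list per thread_id, in process memory only."""

    def __init__(self) -> None:
        self._store: Dict[str, List[str]] = {}

    def load(self, thread_id: str) -> List[str]:
        # Return a copy so callers cannot mutate the stored history by accident
        return list(self._store.get(thread_id, []))

    def save(self, thread_id: str, turns: List[str]) -> None:
        self._store[thread_id] = list(turns)


def chat(cp: ToyCheckpointer, thread_id: str, question: str) -> List[str]:
    """Load the thread's history, add one exchange, and save it back."""
    turns = cp.load(thread_id)
    turns.append(f"user: {question}")
    turns.append(f"assistant: (answer to turn {len(turns) // 2 + 1})")
    cp.save(thread_id, turns)
    return turns


cp = ToyCheckpointer()
chat(cp, "3", "When does Bella Vista open?")
history = chat(cp, "3", "Also on Sunday?")
print(len(history))           # 4: both exchanges share thread "3"
print(len(cp.load("other")))  # 0: a new thread starts empty
```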
You can also render a diagram to see the structure of the whole graph:
from IPython.display import Image, display
from langchain_core.runnables.graph import MermaidDrawMethod
display(
    Image(
        graph.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
    )
)
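Rendering with `MermaidDrawMethod.API` sends the diagram to a remote rendering service, so it needs network access. For a quick offline look at the topology, the same edges can be written out as Mermaid text by hand. The sketch below is independent of LangGraph: the edge list is copied from the `add_edge` and `add_conditional_edges` calls above, `to_mermaid` is a hypothetical helper, and the `__start__` / `__end__` node names are labels chosen for this example.

```python
from typing import List, Tuple

# (source, target, is_conditional), taken from the workflow definition above
EDGES: List[Tuple[str, str, bool]] = [
    ("__start__", "rephrase_query", False),
    ("rephrase_query", "classify_topic", False),
    ("classify_topic", "fetch_docs", True),
    ("classify_topic", "reject_off_topic", True),
    ("fetch_docs", "evaluate_docs", False),
    ("evaluate_docs", "create_response", True),
    ("evaluate_docs", "tweak_question", True),
    ("evaluate_docs", "fallback_response", True),
    ("tweak_question", "fetch_docs", False),
    ("create_response", "__end__", False),
    ("fallback_response", "__end__", False),
    ("reject_off_topic", "__end__", False),
]


def to_mermaid(edges: List[Tuple[str, str, bool]]) -> str:
    """Emit a Mermaid flowchart; dotted arrows mark conditional edges."""
    lines = ["graph TD"]
    for source, target, conditional in edges:
        arrow = "-.->" if conditional else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)


mermaid_text = to_mermaid(EDGES)
print(mermaid_text)
```

The printed text can be pasted into any Mermaid renderer. The only cycle is `tweak_question -> fetch_docs -> evaluate_docs`, which the retry limit in `decision_router` bounds.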
Demo
Scenario 1: an out-of-scope question
Ask something unrelated to the restaurant:
input_data = {"question": HumanMessage(content="How is the weather?")}
graph.invoke(input=input_data, config={"configurable": {"thread_id": 1}})
Output:
Entering rephrase_query with state: {'question': HumanMessage(content='How is the weather?', additional_kwargs={}, response_metadata={})}
Entering classify_topic
classify_topic: topic_flag = No
Entering topic_router
Routing to reject_off_topic
Entering reject_off_topic
{'turns': [HumanMessage(content='How is the weather?', additional_kwargs={}, response_metadata={}),
AIMessage(content="I can't respond to that!", additional_kwargs={}, response_metadata={})],
'retrieved_docs': [],
'topic_flag': 'No',
'refined_query': 'How is the weather?',
'ready_for_response': False,
'refinement_attempts': 0,
'question': HumanMessage(content='How is the weather?', additional_kwargs={}, response_metadata={})}
The system recognizes the question as out of scope and rejects it right away, without retrieving anything.
Scenario 2: a question with no answer in the documents
Ask for something the documents don't contain:
input_data = {"question": HumanMessage(content="How old is the owner of the restaurant Bella Vista?")}
graph.invoke(input=input_data, config={"configurable": {"thread_id": 2}})
Output:
Entering rephrase_query with state: {'question': HumanMessage(content='How old is the owner of the restaurant Bella Vista?', additional_kwargs={}, response_metadata={})}
Entering classify_topic
classify_topic: topic_flag = Yes
Entering topic_router
Routing to fetch_docs
Entering fetch_docs
fetch_docs: Retrieved 2 documents
Entering evaluate_docs
Evaluating doc: Bella Vista is owned by Antoni... Result: No
Evaluating doc: Bella Vista is open from Monda... Result: No
evaluate_docs: ready_for_response = False
Entering decision_router
Routing to tweak_question
Entering tweak_question
tweak_question: Refined to: Revised question: What is the age of the owner of Bella Vista restaurant?
Entering fetch_docs
fetch_docs: Retrieved 2 documents
Entering evaluate_docs
Evaluating doc: Bella Vista is owned by Antoni... Result: No
Evaluating doc: Bella Vista is open from Monda... Result: No
evaluate_docs: ready_for_response = False
Entering decision_router
Routing to tweak_question
Entering tweak_question
tweak_question: Refined to: Original question: What is the capital of Australia?
Revised question: What is the capital city of Australia?
Entering fetch_docs
fetch_docs: Retrieved 2 documents
Entering evaluate_docs
Evaluating doc: Bella Vista offers a variety o... Result: No
Evaluating doc: Bella Vista is open from Monda... Result: No
evaluate_docs: ready_for_response = False
Entering decision_router
Routing to fallback_response
Entering fallback_response
{'turns': [HumanMessage(content='How old is the owner of the restaurant Bella Vista?', additional_kwargs={}, response_metadata={}),
AIMessage(content="I'm sorry, but I couldn't find the information you're looking for.", additional_kwargs={}, response_metadata={})],
'retrieved_docs': [],
'topic_flag': 'Yes',
'refined_query': 'Original question: What is the capital of Australia?\nRevised question: What is the capital city of Australia?',
'ready_for_response': False,
'refinement_attempts': 2,
'question': HumanMessage(content='How old is the owner of the restaurant Bella Vista?', additional_kwargs={}, response_metadata={})}
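The question itself is in scope, but none of the retrieved documents is relevant. The system rewrites the question and retrieves again twice, then admits that it cannot find the answer instead of making one up.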
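The log above shows two weaknesses of free-form rewriting: the model echoes labels such as "Revised question:" into the query, and the second rewrite is about a different subject altogether (the capital of Australia). One possible guard for the refinement step, sketched below, keeps only the text after the last label and falls back to the previous query when the rewrite shares no content words with it. `clean_refinement`, the label pattern, and the stop-word list are assumptions made for this example, not part of the article's code.

```python
import re
from typing import Set

STOP_WORDS: Set[str] = {"a", "an", "the", "is", "of", "what", "how", "who", "when", "does", "do", "on"}
LABEL = re.compile(r"^(?:original question|revised question|ai|human)\s*:\s*", re.IGNORECASE)


def content_words(text: str) -> Set[str]:
    """Lowercased words of the text, minus a few common function words."""
    return set(re.findall(r"[a-z]+", text.lower())) - STOP_WORDS


def clean_refinement(raw: str, previous: str) -> str:
    """Strip echoed labels; keep the previous query if the rewrite drifted away from it."""
    lines = [line.strip() for line in raw.splitlines() if line.strip()]
    candidate = LABEL.sub("", lines[-1]) if lines else ""
    if not candidate or not (content_words(candidate) & content_words(previous)):
        return previous
    return candidate


prev = "How old is the owner of the restaurant Bella Vista?"
first = clean_refinement(
    "Revised question: What is the age of the owner of Bella Vista restaurant?", prev
)
second = clean_refinement(
    "Original question: What is the capital of Australia?\n"
    "Revised question: What is the capital city of Australia?",
    prev,
)
print(first)   # What is the age of the owner of Bella Vista restaurant?
print(second)  # How old is the owner of the restaurant Bella Vista?
```

A word-overlap check is crude and would reject legitimate paraphrases that use entirely different vocabulary, so it is best read as a starting point rather than a complete fix.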
Scenario 3: a normal conversation with a follow-up
First ask about the opening hours, then follow up about Sunday:
graph.invoke(input={"question": HumanMessage(content="When does Bella Vista open?")}, config={"configurable": {"thread_id": 3}})
graph.invoke(input={"question": HumanMessage(content="Also on Sunday?")}, config={"configurable": {"thread_id": 3}})
Output:
Entering rephrase_query with state: {'question': HumanMessage(content='When does Bella Vista open?', additional_kwargs={}, response_metadata={})}
Entering classify_topic
classify_topic: topic_flag = Yes
Entering topic_router
Routing to fetch_docs
Entering fetch_docs
fetch_docs: Retrieved 2 documents
Entering evaluate_docs
Evaluating doc: Bella Vista is open from Monda... Result: Yes
Evaluating doc: Bella Vista offers a range of ... Result: No
evaluate_docs: ready_for_response = True
Entering decision_router
Routing to create_response
Entering create_response
create_response: Answer generated: Bella Vista opens at 11:00 AM every day.
Entering rephrase_query with state: {'turns': [HumanMessage(content='When does Bella Vista open?', additional_kwargs={}, response_metadata={}), AIMessage(content='Bella Vista opens at 11:00 AM every day.', additional_kwargs={}, response_metadata={})], 'retrieved_docs': [Document(metadata={'source': 'restaurant_info.txt'}, page_content='Bella Vista is open from Monday to Sunday. Weekday hours are 11:00 AM to 10:00 PM, while weekend hours are extended from 11:00 AM to 11:00 PM.')], 'topic_flag': 'Yes', 'refined_query': 'When does Bella Vista open?', 'ready_for_response': True, 'refinement_attempts': 0, 'question': HumanMessage(content='Also on Sunday?', additional_kwargs={}, response_metadata={})}
rephrase_query: Rephrased to: AI: Is Bella Vista open on Sundays?
Entering classify_topic
classify_topic: topic_flag = Yes
Entering topic_router
Routing to fetch_docs
Entering fetch_docs
fetch_docs: Retrieved 2 documents
Entering evaluate_docs
Evaluating doc: Bella Vista is open from Monda... Result: Yes
Evaluating doc: Bella Vista offers a variety o... Result: No
evaluate_docs: ready_for_response = True
Entering decision_router
Routing to create_response
Entering create_response
create_response: Answer generated: Yes, Bella Vista is open on Sundays.
{'turns': [HumanMessage(content='When does Bella Vista open?', additional_kwargs={}, response_metadata={}),
AIMessage(content='Bella Vista opens at 11:00 AM every day.', additional_kwargs={}, response_metadata={}),
HumanMessage(content='Also on Sunday?', additional_kwargs={}, response_metadata={}),
AIMessage(content='Yes, Bella Vista is open on Sundays.', additional_kwargs={}, response_metadata={})],
'retrieved_docs': [Document(metadata={'source': 'restaurant_info.txt'}, page_content='Bella Vista is open from Monday to Sunday. Weekday hours are 11:00 AM to 10:00 PM, while weekend hours are extended from 11:00 AM to 11:00 PM.')],
'topic_flag': 'Yes',
'refined_query': 'AI: Is Bella Vista open on Sundays?',
'ready_for_response': True,
'refinement_attempts': 0,
'question': HumanMessage(content='Also on Sunday?', additional_kwargs={}, response_metadata={})}
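Using the chat history, the system correctly interprets the follow-up "Also on Sunday?", rewrites it as a standalone question, and then retrieves the right document and answers.
That completes a RAG system that adapts to the conversation instead of just running a similarity search.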
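Summary
LangGraph's graph architecture makes this kind of system relatively easy to develop and iterate on: new nodes can be added, external APIs plugged in, or other agents connected to form more complex workflows.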
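A few caveats are worth keeping in mind. Retrieval quality is still bounded by the quality of the embeddings; query rewriting can mitigate this but does not solve the underlying problem. Grading every document with an LLM adds latency and cost to each request, and the overhead becomes noticeable as the number of documents grows. The retry limit is currently hard-coded; in practice it may need to be adjusted per use case.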
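The retry limit could be made configurable by building the router from a parameter instead of hard-coding `2`. A minimal sketch, assuming the same state keys and route names as the `decision_router` shown earlier; `make_decision_router` is a hypothetical name.

```python
from typing import Any, Callable, Dict


def make_decision_router(max_attempts: int = 2) -> Callable[[Dict[str, Any]], str]:
    """Build a router that allows up to `max_attempts` query refinements."""
    if max_attempts < 0:
        raise ValueError("max_attempts must be non-negative")

    def route(state: Dict[str, Any]) -> str:
        if state.get("ready_for_response", False):
            return "create_response"
        if state.get("refinement_attempts", 0) >= max_attempts:
            return "fallback_response"
        return "tweak_question"

    return route


router = make_decision_router(max_attempts=1)
print(router({"ready_for_response": False, "refinement_attempts": 0}))  # tweak_question
print(router({"ready_for_response": False, "refinement_attempts": 1}))  # fallback_response
```

The resulting function could be passed to `add_conditional_edges` in place of `decision_router`; the early-return guard inside `tweak_question` would then need the same limit.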
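Memory is currently limited to the session; true long-term memory (for example, combining a vector store with a relational database) needs additional design. The classifier's decision logic is hard-coded in the prompt, so extending it to a new domain means changing the prompt or training a dedicated classifier.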
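One low-effort way to make the classifier less hard-coded is to generate its system prompt from a list of topics, so that covering a new domain means editing data rather than prompt text. The sketch below only builds the prompt string; `build_classifier_prompt` is a hypothetical helper, and the wording mirrors the prompt used in `classify_topic`.

```python
from typing import Sequence


def build_classifier_prompt(topics: Sequence[str]) -> str:
    """Render a numbered topic list into the classifier's system prompt."""
    if not topics:
        raise ValueError("at least one topic is required")
    numbered = "\n".join(f"{i}. {topic}" for i, topic in enumerate(topics, start=1))
    return (
        "You are a classifier that determines whether a user's question is about "
        "one of the following topics:\n"
        f"{numbered}\n"
        "If the question IS about any of these topics, respond with 'Yes'. "
        "Otherwise, respond with 'No'."
    )


classifier_prompt = build_classifier_prompt([
    "Information about the owner of Bella Vista, which is Antonio Rossi.",
    "Prices of dishes at Bella Vista (restaurant).",
    "Opening hours of Bella Vista (restaurant).",
])
print(classifier_prompt)
```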
The full code is available here:
https://avoid.overfit.cn/post/6a7eb71328fa41569a7200fbee608a1d
Author: Pankaj Chandravanshi