Building an Agentic RAG Pipeline with Deep Thinking to Solve Complex Queries

Original article: https://mp.weixin.qq.com/s/JnRcU-6wg6g9RjdVXe3fQA

Many RAG systems fail not because the LLM isn't smart enough, but because their architecture is too simple. They try to solve an inherently cyclical, multi-step problem with a linear, one-shot process.

Many complex queries call for reasoning, reflection, and smart decisions about when to act, much like how we ourselves retrieve information when facing a problem. This is exactly where "agent-driven behavior" earns its place in a RAG pipeline. Here is what a typical deep thinking RAG pipeline looks like…

Deep Thinking RAG Pipeline

  1. Plan: First, the agent decomposes the complex user query into a structured, multi-step research plan and decides which tool to use for each step (internal document search or web search).
  2. Retrieve: For each step, it runs an adaptive, multi-stage retrieval funnel in which a supervisor dynamically selects the best search strategy (vector, keyword, or hybrid).
  3. Refine: A high-precision Cross-Encoder reranks the initial results, and a distiller agent compresses the best evidence into a concise context.
  4. Reflect: After each step, the agent summarizes the current findings and updates its research history, gradually building a cumulative understanding of the problem.
  5. Critique: A policy agent then inspects this history and makes a strategic decision: continue to the next step, revise the plan when it hits a dead end, or finish.
  6. Synthesize: Once the research is complete, a final agent synthesizes the evidence from all sources into a single, comprehensive, citable answer.

In this article, we will implement the complete deep thinking RAG pipeline, compare it against a baseline RAG pipeline, and show how it solves complex multi-hop queries.

All the code and theory are available in my GitHub repository:

GitHub - FareedKhan-dev/deep-thinking-rag: A Deep Thinking RAG Pipeline to Solve Complex Queries

📘 Table of Contents

  • 📘 Table of Contents
  • Environment Setup
  • Sourcing the Knowledge Base
  • Understanding the Multi-Source, Multi-Hop Query
  • Building a Shallow RAG Pipeline That Fails
  • Defining the Central RAG State for the Agentic System
  • Strategic Planning and Query Formulation
  • Decomposing the Problem with a Tool-Aware Planner
  • Optimizing Retrieval with a Query Rewriting Agent
  • Improving Precision with Metadata-Aware Chunking
  • Creating the Multi-Stage Retrieval Funnel
  • Dynamically Selecting a Strategy with a Supervisor
  • Broad Recall with Hybrid, Keyword, and Semantic Search
  • High Precision with a Cross-Encoder Reranker
  • Synthesis via Contextual Distillation
  • Augmenting Knowledge with Web Search
  • Self-Critique and Control-Flow Policy
  • Updating and Reflecting on the Cumulative Research History
  • Building a Policy Agent for Control Flow
  • Defining the Graph Nodes
  • Defining the Conditional Edges
  • Wiring Up the Deep Thinking RAG Machine
  • Compiling and Visualizing the Iterative Workflow
  • Running the Deep Thinking Pipeline
  • Analyzing the Final High-Quality Answer
  • Side-by-Side Comparison
  • Evaluation Framework and Results Analysis
  • Summarizing the Entire Pipeline
  • Learning the Policy with a Markov Decision Process (MDP)

Environment Setup

Before we start writing the Deep RAG pipeline, we need to lay the groundwork: a production-grade AI system is more than its final algorithm; it is also the deliberate choices made while building it.

Every step we implement directly affects how effective and reliable the final system will be.

While developing the pipeline and iterating on it, it is best to define the configuration as a simple dictionary. Once the workflow grows more complex, you can come back to this dictionary, tweak a setting, and observe its impact on overall performance.

# Central Configuration Dictionary to manage all system parameters
config = {
    "data_dir": "./data",                           # Directory to store raw and cleaned data
    "vector_store_dir": "./vector_store",           # Directory to persist our vector store
    "llm_provider": "openai",                       # The LLM provider we are using
    "reasoning_llm": "gpt-4o",                      # The powerful model for planning and synthesis
    "fast_llm": "gpt-4o-mini",                      # A faster, cheaper model for simpler tasks like the baseline RAG
    "embedding_model": "text-embedding-3-small",    # The model for creating document embeddings
    "reranker_model": "cross-encoder/ms-marco-MiniLM-L-6-v2", # The model for precision reranking
    "max_reasoning_iterations": 7,                  # A safeguard to prevent the agent from getting into an infinite loop
    "top_k_retrieval": 10,                          # Number of documents for initial broad recall
    "top_n_rerank": 3,                              # Number of documents to keep after precision reranking
}

Most of these keys are self-explanatory, but three are worth highlighting:

  • llm_provider: the LLM provider we are using, OpenAI here. We pick OpenAI because LangChain makes it easy to swap models and providers; you can choose whatever suits you, such as Ollama.
  • reasoning_llm: this must be the strongest model in the whole system, since it handles planning and synthesis.
  • fast_llm: used for simpler tasks (such as the baseline RAG); it should be faster and cheaper.

Next, import the libraries the pipeline will use and set the API keys as environment variables so they are not exposed in the code.

import os
import re
import json
from getpass import getpass
from pprint import pprint
import uuid
from typing import List, Dict, TypedDict, Literal, Optional

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass(f"Enter your {var}: ")

_set_env("OPENAI_API_KEY")
_set_env("LANGSMITH_API_KEY")
_set_env("TAVILY_API_KEY")

os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "Advanced-Deep-Thinking-RAG"

We also enable LangSmith tracing. In an agentic system the workflow is complex and cyclical, so tracing is not a nice-to-have; it genuinely matters. It lets you visualize the internal process and makes it much easier to debug the agent's reasoning path.

Sourcing the Knowledge Base

A production-grade RAG system needs a knowledge base that is both complex and challenging to really demonstrate its effectiveness. We will use NVIDIA's 2023 10-K filing, a document of well over a hundred pages detailing the company's business operations, financial performance, and risk factor disclosures.

Sourcing the Knowledge Base

First, we implement a custom function that downloads the 10-K directly from the SEC EDGAR database, parses the raw HTML, and converts it into clean, structured text for our RAG pipeline to ingest.

import requests
from bs4 import BeautifulSoup
from langchain.docstore.document import Document

def download_and_parse_10k(url, doc_path_raw, doc_path_clean):
    if os.path.exists(doc_path_clean):
        print(f"Cleaned 10-K file already exists at: {doc_path_clean}")
        return

    print(f"Downloading 10-K filing from {url}...")
    headers = {'User-Agent': 'Mozilla/5.0'}
    response = requests.get(url, headers=headers)
    response.raise_for_status()

    with open(doc_path_raw, 'w', encoding='utf-8') as f:
        f.write(response.text)
    print(f"Raw document saved to {doc_path_raw}")

    soup = BeautifulSoup(response.content, 'html.parser')

    text = ''
    for p in soup.find_all(['p', 'div', 'span']):
        text += p.get_text(strip=True) + '\n\n'

    clean_text = re.sub(r'\n{3,}', '\n\n', text).strip()
    clean_text = re.sub(r'\s{2,}', ' ', clean_text).strip()

    with open(doc_path_clean, 'w', encoding='utf-8') as f:
        f.write(clean_text)
    print(f"Cleaned text content extracted and saved to {doc_path_clean}")

This code is straightforward: it uses beautifulsoup4 to parse the HTML and extract text, which makes it easy to navigate the HTML structure, pull out the useful information, and ignore irrelevant elements such as scripts or styles.

Now let's run it and see the result.
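
The call below uses url_10k, doc_path_raw, and doc_path_clean, which the walkthrough never defines explicitly; judging from the output that follows, the setup presumably looks something like this (an assumed sketch, not part of the original code):

# Assumed setup (values inferred from the output below), not shown in the original article
os.makedirs(config["data_dir"], exist_ok=True)
url_10k = "https://www.sec.gov/Archives/edgar/data/1045810/000104581023000017/nvda-20230129.htm"
doc_path_raw = os.path.join(config["data_dir"], "nvda_10k_2023_raw.html")
doc_path_clean = os.path.join(config["data_dir"], "nvda_10k_2023_clean.txt")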

print("Downloading and parsing NVIDIA's 2023 10-K filing...")
download\_and\_parse\_10k(url\_10k, doc\_path\_raw, doc\_path\_clean)

with open(doc\_path\_clean, 'r', encoding='utf-8') as f:
    print("\\n--- Sample content from cleaned 10-K ---")
    print(f.read(1000) + "...")
#### OUTPUT ####
Downloading and parsing NVIDIA 2023 10-K filing...
Successfully downloaded 10-K filing from https://www.sec.gov/Archives/edgar/data/1045810/000104581023000017/nvda-20230129.htm
Raw document saved to ./data/nvda\_10k\_2023\_raw.html
Cleaned text content extracted and saved to ./data/nvda\_10k\_2023\_clean.txt

# --- Sample content from cleaned 10-K ---
Item 1. Business. 
 OVERVIEW 
 NVIDIA is the pioneer of accelerated computing. We are a full-stack computing company with a platform strategy that brings together hardware, systems, software, algorithms, libraries, and services to create unique value for the markets we serve. Our work in accelerated computing and AI is reshaping the worlds largest industries and profoundly impacting society. 
 Founded in 1993, we started as a PC graphics chip company, inventing the graphics processing unit, or GPU. The GPU was essential for the growth of the PC gaming market and has since been repurposed to revolutionize computer graphics, high performance computing, or HPC, and AI. 
 The programmability of our GPUs made them ...

We call the function and save the content to a txt file, which will serve as context for the RAG steps that follow. Running the code above downloads the filing and prints a sample of the cleaned text.

Understanding the Multi-Source, Multi-Hop Query

To test the pipeline we are building and compare it against the baseline RAG, we need a genuinely complex query that touches different aspects of the document we are using.

Our Complex Query:

"Based on NVIDIA's 2023 10-K filing, identify their key risks related to
competition. Then, find recent news (post-filing, from 2024) about AMD's
AI chip strategy and explain how this new strategy directly addresses or
exacerbates one of NVIDIA's stated risks."

Why does this query break a standard RAG pipeline?

  1. Multi-hop reasoning: it cannot be answered in one step. The system must first identify the risks, then find the AMD news, and finally synthesize the two.
  2. Multi-source knowledge: the required information lives in completely different places. The risks are in an internal, static document (the 10-K), while the AMD news is external and requires live web access.
  3. Synthesis and analysis: it is not about listing facts; it must explain how the latter addresses or exacerbates the former, which requires genuine synthetic reasoning.

In the next section we implement a baseline RAG pipeline and watch how it fails.

Building a Shallow RAG Pipeline That Fails

With the environment and the challenging knowledge base in place, the next step is to build a standard "vanilla" RAG pipeline. This matters because…

we want to start with the simplest workable approach, run the complex query against it, and observe exactly how and why it fails.

Here is what we will do:

Shallow RAG Pipeline

  • Load and chunk the document: read the cleaned 10-K and split it into fixed-size chunks, the common but semantically naive approach.
  • Create the vector store: embed those chunks and index them with ChromaDB to support basic semantic search.
  • Assemble the RAG chain: use the LangChain Expression Language (LCEL) to string the retriever, prompt template, and LLM into a linear flow.
  • Demonstrate the key failure: run our multi-hop, multi-source query against it and analyze the inadequate answer.

First, load the cleaned document and split it. We use LangChain's RecursiveCharacterTextSplitter.

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

print("Loading and chunking the document...")
loader = TextLoader(doc_path_clean, encoding='utf-8')
documents = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
doc_chunks = text_splitter.split_documents(documents)

print(f"Document loaded and split into {len(doc_chunks)} chunks.")
#### OUTPUT ####
Loading and chunking the document...
Document loaded and split into 378 chunks.

With 378 chunks in hand, the next step is to make them searchable by creating embeddings and storing them in a database. We use ChromaDB as the vector store and OpenAI's text-embedding-3-small as the embedding model (already defined in the config).

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

print("Creating baseline vector store...")
embedding_function = OpenAIEmbeddings(model=config['embedding_model'])

baseline_vector_store = Chroma.from_documents(
    documents=doc_chunks,
    embedding=embedding_function
)
baseline_retriever = baseline_vector_store.as_retriever(search_kwargs={"k": 3})

print(f"Vector store created with {baseline_vector_store._collection.count()} embeddings.")
#### OUTPUT ####
Creating baseline vector store...
Vector store created with 378 embeddings.

Chroma.from_documents orchestrates this process and stores the vectors in a searchable index. Finally, we assemble everything into a single runnable RAG chain with LCEL. The data flow is: user question -> retriever -> prompt -> LLM.

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

template = """You are an AI financial analyst. Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model=config["fast_llm"], temperature=0)

def format_docs(docs):
    return "\n\n---\n\n".join(doc.page_content for doc in docs)

baseline_rag_chain = (
    {"context": baseline_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

Note that the first element is a dictionary. The context value is produced by a sub-chain: the input question -> baseline_retriever -> format_docs, while the question is passed through unchanged (RunnablePassthrough).

Let's run it and see where it falls apart.

from rich.console import Console
from rich.markdown import Markdown

console = Console()
complex_query_adv = "Based on NVIDIA's 2023 10-K filing, identify their key risks related to competition. Then, find recent news (post-filing, from 2024) about AMD's AI chip strategy and explain how this new strategy directly addresses or exacerbates one of NVIDIA's stated risks."

print("Executing complex query on the baseline RAG chain...")
baseline_result = baseline_rag_chain.invoke(complex_query_adv)

console.print("\n--- BASELINE RAG FAILED OUTPUT ---")
console.print(Markdown(baseline_result))
#### OUTPUT ####
Executing complex query on the baseline RAG chain...

--- BASELINE RAG FAILED OUTPUT ---
Based on the provided context, NVIDIA operates in an intensely competitive semiconductor
industry and faces competition from companies like AMD. The context mentions
that the industry is characterized by rapid technological change. However, the provided documents do not contain any specific information about AMD's recent AI chip strategy from 2024 or how it might impact NVIDIA's stated risks.

Three problems stand out:

  • Irrelevant context: the retriever fetched generic passages about NVIDIA, competition, and AMD, but nothing specific about AMD's 2024 strategy.
  • Missing information: 2023 data cannot cover 2024 events, and the system has no awareness that it is missing a critical piece.
  • No planning or tool use: it treats the complex question as a simple one-shot Q&A; it cannot break the problem into steps, nor use web search to fill the gap.

The system fails not because the LLM is dumb, but because the architecture is too simple: it attacks a cyclical, multi-step problem with a linear, one-shot flow.

Having seen where the baseline RAG breaks down, we can start implementing the deep thinking methodology and see how it handles the complex query.

Defining the Central RAG State for the Agentic System

To build a reasoning agent, we first need to manage its "state". Every step of a simple RAG chain is stateless, but…

an intelligent agent needs memory. It must remember the original question, the plan it has made, and the evidence it has gathered so far.

RAG State

RAGState will serve as the central memory that is passed between nodes in our LangGraph workflow. We start by defining the data structures, beginning with the most basic building block: a single step in the research plan.

We want to define the atomic unit of the agent's plan. Each Step must contain not just a sub-question to answer, but also the justification behind it and, crucially, the tool to use. This forces the agent's planning process to be explicit and structured.

from langchain_core.documents import Document
from langchain_core.pydantic_v1 import BaseModel, Field

class Step(BaseModel):
    sub_question: str = Field(description="A specific, answerable question for this step.")
    justification: str = Field(description="A brief explanation of why this step is necessary to answer the main query.")
    tool: Literal["search_10k", "search_web"] = Field(description="The tool to use for this step.")
    keywords: List[str] = Field(description="A list of critical keywords for searching relevant document sections.")
    document_section: Optional[str] = Field(description="A likely document section title (e.g., 'Item 1A. Risk Factors') to search within. Only for 'search_10k' tool.")

The Step class (a Pydantic BaseModel) provides a strict contract for the Planner Agent's output. tool: Literal[...] forces the LLM to make an explicit choice between internal knowledge (search_10k) and external knowledge (search_web).

This structured output is far more reliable than parsing a natural-language plan.

With a single Step defined, we need a container for the whole sequence of steps. The Plan class is simply a list of Step objects, representing the agent's end-to-end research strategy.

class Plan(BaseModel):
    steps: List[Step] = Field(description="A detailed, multi-step plan to answer the user's query.")

The Plan class gives the whole research process structure. When we call the Planner Agent, we ask it to return a JSON object that conforms to this schema, so the agent's strategy is clear, ordered, and machine-readable before any retrieval happens.

While executing the plan, the agent needs to remember what it has learned. We define a PastStep dictionary that stores the result of each completed step, forming the agent's "research history" or lab notebook.

class PastStep(TypedDict):
    step_index: int
    sub_question: str
    retrieved_docs: List[Document]
    summary: str

This structure is essential for the agent's self-critique loop. After each step, we populate this dictionary and append it to the state. By reviewing this growing list of summaries, the agent understands what it already knows and what it still lacks, and can decide whether it has enough information to finish the task.

Finally, we assemble everything into the main RAGState dictionary. It flows through the entire graph and contains the original question, the full plan, the history of past steps, and the intermediate data of the step currently being executed.

class RAGState(TypedDict):
    original_question: str
    plan: Plan
    past_steps: List[PastStep]
    current_step_index: int
    retrieved_docs: List[Document]
    reranked_docs: List[Document]
    synthesized_context: str
    final_answer: str

RAGState is the agent's "mind". Every node in the graph takes this dictionary as input and returns an updated version.

For example, plan_node populates the plan field, retrieval_node populates retrieved_docs, and so on. This shared, persistent state is what makes complex, iterative reasoning possible, and it is exactly what a simple RAG chain lacks.

With the blueprint of the agent's memory in place, we can build its first cognitive component: the Planner Agent.

Strategic Planning and Query Formulation

Strategic Planning

This section is engineered in three steps:

  • Tool-Aware Planner: build an LLM-driven agent that decomposes the user query into a structured Plan and picks a tool for each step.
  • Query Rewriter: create a specialized agent that rewrites the planner's plain sub-questions into queries optimized for retrieval.
  • Metadata-Aware Chunking: reprocess the source document to add section-level metadata, the key to high-precision filtered retrieval.

Decomposing the Problem with a Tool-Aware Planner

Decomposing Step

We cannot just throw the full question at the database and hope for the best. We have to teach the agent to break the problem into smaller, more manageable parts.

To do that, we create a dedicated Planner Agent and give it very clear instructions (the prompt) on how to operate.

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from rich.pretty import pprint as rprint

planner_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an expert research planner. Your task is to create a clear, multi-step plan to answer a complex user query by retrieving information from multiple sources.
You have two tools available:
1. `search_10k`: Use this to search for information within NVIDIA's 2023 10-K financial filing. This is best for historical facts, financial data, and stated company policies or risks from that specific time period.
2. `search_web`: Use this to search the public internet for recent news, competitor information, or any topic that is not specific to NVIDIA's 2023 10-K.
Decompose the user's query into a series of simple, sequential sub-questions. For each step, decide which tool is more appropriate.
For `search_10k` steps, also identify the most likely section of the 10-K (e.g., 'Item 1A. Risk Factors', 'Item 7. Management's Discussion and Analysis...').
It is critical to use the exact section titles found in a 10-K filing where possible."""),
    ("human", "User Query: {question}")
])

Here we give the LLM a new persona: an expert research planner. We tell it explicitly that it has two tools (search_10k and search_web) and when each applies; that is the "tool-aware" part.

We ask it to output a plan that maps directly onto the system's capabilities, rather than a vague description.

Next, we initialize the reasoning model and chain it to the prompt. The key is telling the LLM that its final output must conform to the Pydantic Plan class, which keeps it structured and predictable.

reasoning_llm = ChatOpenAI(model=config["reasoning_llm"], temperature=0)

planner_agent = planner_prompt | reasoning_llm.with_structured_output(Plan)
print("Tool-Aware Planner Agent created successfully.")

print("\n--- Testing Planner Agent ---")
test_plan = planner_agent.invoke({"question": complex_query_adv})

rprint(test_plan)

We pipe planner_prompt into reasoning_llm and then apply .with_structured_output(Plan), so LangChain uses the model's function-calling ability to return a JSON object that exactly matches the Plan schema, which is far more reliable than parsing free text.

The test output looks like this:

#### OUTPUT ####
Tool-Aware Planner Agent created successfully.

--- Testing Planner Agent ---
Plan(
│   steps=[
│   │   Step(
│   │   │   sub_question="What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?",
│   │   │   justification="This step is necessary to extract the foundational information about competitive risks directly from the source document as requested by the user.",
│   │   │   tool='search_10k',
│   │   │   keywords=['competition', 'risk factors', 'semiconductor industry', 'competitors'],
│   │   │   document_section='Item 1A. Risk Factors'
│   │   ),
│   │   Step(
│   │   │   sub_question="What are the recent news and developments in AMD's AI chip strategy in 2024?",
│   │   │   justification="This step requires finding up-to-date, external information that is not available in the 2023 10-K filing. A web search is necessary to get the latest details on AMD's strategy.",
│   │   │   tool='search_web',
│   │   │   keywords=['AMD', 'AI chip strategy', '2024', 'MI300X', 'Instinct accelerator'],
│   │   │   document_section=None
│   │   )
│   ]
)

Notice that the agent not only produced a clear Plan, it also correctly recognized that the question has two parts:

  1. The first part is answered in the 10-K, so it chose search_10k and correctly guessed the likely section.
  2. The second part is "2024 news" that cannot be in the 10-K, so it correctly chose search_web. At the "thinking" level, our pipeline already looks promising.

Optimizing Retrieval with a Query Rewriting Agent

At this point we have a plan full of good sub-questions.

But a phrasing like "what are the risks?" is not a good retrieval query; it is far too generic. Whether the target is a vector database or a web search engine, specific, keyword-rich queries work better.

Query Rewriting Agent

So we build a small, focused agent: the Query Rewriter. Its only job is to take the current step's sub-question, combine it with what we already know, and rewrite it into a query better suited for retrieval.

First, the prompt:

from langchain_core.output_parsers import StrOutputParser

query_rewriter_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a search query optimization expert. Your task is to rewrite a given sub-question into a highly effective search query for a vector database or web search engine, using keywords and context from the research plan.
The rewritten query should be specific, use terminology likely to be found in the target source (a financial 10-K or news articles), and be structured to retrieve the most relevant text snippets."""),
    ("human", "Current sub-question: {sub_question}\n\nRelevant keywords from plan: {keywords}\n\nContext from past steps:\n{past_context}")
])

We cast this agent as a "search query optimization expert". It receives three inputs, sub_question, keywords, and past_context, and uses them to construct a stronger query.

Initialize the agent:

query_rewriter_agent = query_rewriter_prompt | reasoning_llm | StrOutputParser()
print("Query Rewriter Agent created successfully.")

print("\n--- Testing Query Rewriter Agent ---")

test_sub_q = "How does AMD's 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA's 10-K?"
test_keywords = ['impact', 'threaten', 'competitive pressure', 'market share', 'technological change']
test_past_context = "Step 1 Summary: NVIDIA's 10-K lists intense competition and rapid technological change as key risks. Step 2 Summary: AMD launched its MI300X AI accelerator in 2024 to directly compete with NVIDIA's H100."

rewritten_q = query_rewriter_agent.invoke({
    "sub_question": test_sub_q,
    "keywords": test_keywords,
    "past_context": test_past_context
})

print(f"Original sub-question: {test_sub_q}")
print(f"Rewritten Search Query: {rewritten_q}")

The result:

#### OUTPUT ####
Query Rewriter Agent created successfully.

--- Testing Query Rewriter Agent ---
Original sub-question: How does AMD 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA 10-K?
Rewritten Search Query: analysis of how AMD 2024 AI chip strategy, including products like the MI300X, exacerbates NVIDIA's stated competitive risks such as rapid technological change and market share erosion in the data center and AI semiconductor industry

The original question is written for an analyst; the rewritten query is written for a search engine. It contains more specific terms such as "MI300X", "market share erosion", and "data center", all derived from the keywords and past context. Such a query is far more likely to retrieve exactly the right documents, improving both accuracy and efficiency.

Improving Precision with Metadata-Aware Chunking

The Planner Agent gives us an extra clue: it does not just say "find the risks", it also hints "look in the Item 1A. Risk Factors section".

But our current retriever cannot use that hint. The vector store is just a flat list of 378 chunks with no notion of a "section".

Meta aware chunking

We need to rebuild the chunks. This time, every chunk gets a metadata tag recording which section of the 10-K it came from, so the agent can perform much more precise filtered retrieval.

First, we need to programmatically locate the start of each section in the raw text. Looking at the document format, every major section begins with "ITEM" plus a number (e.g., "ITEM 1A", "ITEM 7"), which is a perfect fit for a regular expression.

section_pattern = r"(ITEM\s+\d[A-Z]?\.\s*.*?)(?=\nITEM\s+\d[A-Z]?\.|$)"

This pattern detects section headings; it has to be flexible enough to handle formatting variations yet specific enough to avoid false matches.

We apply it to split the document into two lists: the section titles and the corresponding contents.

raw_text = documents[0].page_content

section_titles = re.findall(section_pattern, raw_text, re.IGNORECASE | re.DOTALL)
section_titles = [title.strip().replace('\n', ' ') for title in section_titles]

sections_content = re.split(section_pattern, raw_text, flags=re.IGNORECASE | re.DOTALL)
sections_content = [content.strip() for content in sections_content if content.strip() and not content.strip().lower().startswith('item ')]
print(f"Identified {len(section_titles)} document sections.")
assert len(section_titles) == len(sections_content), "Mismatch between titles and content sections"

This is an efficient way to parse a semi-structured document: one findall collects the section titles, one split cuts the full text at those titles. The assert is a sanity check that the parsing lined up correctly.

Next, pair each title with its content and produce the final metadata-tagged chunks.

import uuid

doc_chunks_with_metadata = []

for i, content in enumerate(sections_content):
    section_title = section_titles[i]
    section_chunks = text_splitter.split_text(content)
    
    for chunk in section_chunks:
        chunk_id = str(uuid.uuid4())
        doc_chunks_with_metadata.append(
            Document(
                page_content=chunk,
                metadata={
                    "section": section_title,
                    "source_doc": doc_path_clean,
                    "id": chunk_id
                }
            )
        )

print(f"Created {len(doc_chunks_with_metadata)} chunks with section metadata.")
print("\n--- Sample Chunk with Metadata ---")

sample_chunk = next(c for c in doc_chunks_with_metadata if "Risk Factors" in c.metadata.get("section", ""))
print(sample_chunk)

The key point is that every chunk now carries metadata with its section_title as a tag. The output looks like this:

#### OUTPUT ####
Processing document and adding metadata...
Identified 22 document sections.
Created 381 chunks with section metadata.

--- Sample Chunk with Metadata ---
Document(
│   page_content='Our industry is intensely competitive. We operate in the semiconductor\nindustry, which is intensely competitive and characterized by rapid\ntechnological change and evolving industry standards. ...
│   metadata={
│   │   'section': 'Item 1A. Risk Factors.',
│   │   'source_doc': './data/nvda_10k_2023_clean.txt',
│   │   'id': '...'
│   }
)

See the 'section': 'Item 1A. Risk Factors.' entry in the metadata? Now, when the agent needs to find "risks", it can tell the retriever: "only search chunks where section='Item 1A. Risk Factors'". One simple change turns retrieval from a blunt instrument into a scalpel, and that is a key principle of production-grade RAG.
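
For instance (an illustrative snippet, not from the original article), Chroma accepts that metadata as a query-time filter; the next section builds exactly this kind of store as advanced_vector_store:

# Illustrative only: semantic search restricted to the Risk Factors section
section_store = Chroma.from_documents(doc_chunks_with_metadata, embedding_function)
risk_hits = section_store.similarity_search(
    "risks related to competition",
    k=5,
    filter={"section": "Item 1A. Risk Factors."},  # only consider chunks tagged with this section
)
print(risk_hits[0].metadata["section"])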

Creating the Multi-Stage Retrieval Funnel

So far we have intelligent planning and a metadata-tagged document. Now we build the heart of the system: a sophisticated retrieval pipeline.

A single one-shot semantic search is no longer enough. A production-grade agent needs an adaptive, staged retrieval process.

Multi Stage Funnel (Created by Fareed Khan)

  • Retrieval Supervisor: build a supervisor agent that acts as a dynamic router, analyzing each sub-question and choosing the best retrieval strategy (vector, keyword, or hybrid).
  • Stage 1 (broad Recall): implement the different strategies the supervisor can pick from, capturing as many potentially relevant documents as possible.
  • Stage 2 (high Precision): rerank the initial results with a Cross-Encoder model, cutting the noise and pushing the most relevant documents to the top.
  • Stage 3 (Synthesis): create a Distiller Agent that compresses the top documents into a single, concise context.

Dynamically Selecting a Strategy with a Supervisor

Not all queries are alike. A question such as "what was the revenue growth of the Compute & Networking segment in fiscal year 2023?" contains very specific terms, so keyword search fits better; "what is the company's overall stance toward market competition?" is conceptual, so semantic search fits better.

Supervisor Agent

Rather than hard-coding a strategy, we build a small, smart agent, the Retrieval Supervisor, whose sole job is to analyze the query and decide which retrieval method to use.

First, define the structure of its output:

class RetrievalDecision(BaseModel):
    strategy: Literal["vector_search", "keyword_search", "hybrid_search"]
    justification: str

Then the prompt:

retrieval_supervisor_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a retrieval strategy expert. Based on the user's query, you must decide the best retrieval strategy.
You have three options:
1. `vector_search`: Best for conceptual, semantic, or similarity-based queries.
2. `keyword_search`: Best for queries with specific, exact terms, names, or codes (e.g., 'Item 1A', 'Hopper architecture').
3. `hybrid_search`: A good default that combines both, but may be less precise than a targeted strategy."""),
    ("human", "User Query: {sub_question}")
])

Assemble the agent and test it:

retrieval_supervisor_agent = retrieval_supervisor_prompt | reasoning_llm.with_structured_output(RetrievalDecision)
print("Retrieval Supervisor Agent created.")

print("\n--- Testing Retrieval Supervisor Agent ---")
query1 = "revenue growth for the Compute & Networking segment in fiscal year 2023"
decision1 = retrieval_supervisor_agent.invoke({"sub_question": query1})

print(f"Query: '{query1}'")
print(f"Decision: {decision1.strategy}, Justification: {decision1.justification}")

query2 = "general sentiment about market competition and technological innovation"
decision2 = retrieval_supervisor_agent.invoke({"sub_question": query2})
print(f"\nQuery: '{query2}'")
print(f"Decision: {decision2.strategy}, Justification: {decision2.justification}")
#### OUTPUT ####
Retrieval Supervisor Agent created.


# --- Testing Retrieval Supervisor Agent ---
Query: 'revenue growth for the Compute & Networking segment in fiscal year 2023'
Decision: keyword_search, Justification: The query contains specific keywords like 'revenue growth', 'Compute & Networking', and 'fiscal year 2023' which are ideal for a keyword-based search to find exact financial figures.

Query: 'general sentiment about market competition and technological innovation'
Decision: vector_search, Justification: This query is conceptual and seeks to understand sentiment and broader themes. Vector search is better suited to capture the semantic meaning of 'market competition' and 'technological innovation' rather than relying on exact keywords.

It correctly picks keyword_search for specific terms and vector_search for conceptual questions. Dynamic decisions beat a one-size-fits-all strategy.

Broad Recall with Hybrid, Keyword, and Semantic Search

Now that the supervisor can choose a strategy, we need to implement those strategies. The goal of the first stage is Recall (broad coverage): capture as many potentially relevant documents as possible, even at the cost of some noise.

Broad Recall

We implement three search functions:

  1. Vector Search: standard semantic search, upgraded to support metadata filters.
  2. Keyword Search (BM25): a classic, powerful algorithm that excels at matching specific terms.
  3. Hybrid Search: combines the two and fuses them with RRF (Reciprocal Rank Fusion).
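
For reference, Reciprocal Rank Fusion scores each document as score(d) = Σ 1/(k + rank(d)) over the ranked lists it appears in, usually with k = 60; the hybrid_search implementation below uses 1/(i + 61) with a 0-indexed rank i, which is the same formula.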

First, build an advanced vector store from the metadata-tagged chunks.

import numpy as np
from rank_bm25 import BM25Okapi

print("Creating advanced vector store with metadata...")

advanced_vector_store = Chroma.from_documents(
    documents=doc_chunks_with_metadata,
    embedding=embedding_function
)
print(f"Advanced vector store created with {advanced_vector_store._collection.count()} embeddings.")

Next, build the BM25 index for keyword search:

print("\\nBuilding BM25 index for keyword search...")

tokenized\_corpus = [doc.page\_content.split(" ") for doc in doc\_chunks\_with\_metadata]
doc\_ids = [doc.metadata["id"] for doc in doc\_chunks\_with\_metadata]
doc\_map = {doc.metadata["id"]: doc for doc in doc\_chunks\_with\_metadata}
bm25 = BM25Okapi(tokenized\_corpus)

Define the three retrieval functions:

def vector_search_only(query: str, section_filter: str = None, k: int = 10):
    # Apply a metadata filter only when the planner supplied a usable section title
    filter_dict = {"section": section_filter} if section_filter and "Unknown" not in section_filter else None
    return advanced_vector_store.similarity_search(query, k=k, filter=filter_dict)

def bm25_search_only(query: str, k: int = 10):
    tokenized_query = query.split(" ")
    bm25_scores = bm25.get_scores(tokenized_query)
    top_k_indices = np.argsort(bm25_scores)[::-1][:k]
    return [doc_map[doc_ids[i]] for i in top_k_indices]

def hybrid_search(query: str, section_filter: str = None, k: int = 10):
    bm25_docs = bm25_search_only(query, k=k)
    semantic_docs = vector_search_only(query, section_filter=section_filter, k=k)
    all_docs = {doc.metadata["id"]: doc for doc in bm25_docs + semantic_docs}.values()
    ranked_lists = [[doc.metadata["id"] for doc in bm25_docs], [doc.metadata["id"] for doc in semantic_docs]]
    
    # Reciprocal Rank Fusion: each list contributes 1 / (i + 61) for a 0-indexed rank i,
    # i.e. the standard 1 / (k + rank) with k = 60 and 1-indexed ranks
    rrf_scores = {}
    for doc_list in ranked_lists:
        for i, doc_id in enumerate(doc_list):
            if doc_id not in rrf_scores:
                rrf_scores[doc_id] = 0
            rrf_scores[doc_id] += 1 / (i + 61)
    sorted_doc_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
    final_docs = [doc_map[doc_id] for doc_id in sorted_doc_ids[:k]]
    return final_docs

print("\nAll retrieval strategy functions ready.")

A quick test that keyword search can pinpoint the target section:

print("\\n--- Testing Keyword Search ---")
test\_query = "Item 1A. Risk Factors"
test\_results = bm25\_search\_only(test\_query)
print(f"Query: {test\_query}")
print(f"Found {len(test\_results)} documents. Top result section: {test\_results[0].metadata['section']}")
#### OUTPUT ####
Creating advanced vector store with metadata...
Advanced vector store created with 381 embeddings.

Building BM25 index for keyword search...
All retrieval strategy functions ready.

# --- Testing Keyword Search ---
Query: Item 1A. Risk Factors
Found 10 documents. Top result section: Item 1A. Risk Factors.

As expected, BM25 retrieves the "Item 1A. Risk Factors" documents precisely and quickly. When a query contains specific keywords (such as a section title), the supervisor can reach for this precise tool.

Next comes the high-precision stage: reranking.

High Precision with a Cross-Encoder Reranker

The recall stage hands us 10 "potentially relevant" documents.

But "potentially relevant" is not good enough: feeding all 10 chunks straight into the main reasoning LLM would be inefficient and risky, both costly and prone to distraction by noise.

High Precision

We need a Precision stage that uses a reranker to pick the most relevant few out of those 10 candidates. The difference lies in how the models work:

  1. Initial retrieval uses a bi-encoder (the embedding model), which encodes the query and the documents separately; it is fast and suited to searching at scale.
  2. A Cross-Encoder takes the query and a single document together as one pair and performs a much deeper comparison. It is slower but more accurate.

We will write a function that scores the 10 documents with the Cross-Encoder and keeps only the top 3 defined in the config.

Initialize the Cross-Encoder model:

from sentence_transformers import CrossEncoder

print("Initializing CrossEncoder reranker...")

reranker = CrossEncoder(config["reranker_model"])

Define the reranking function:

def rerank_documents_function(query: str, documents: List[Document]) -> List[Document]:
    if not documents: 
        return []
        
    pairs = [(query, doc.page_content) for doc in documents]
    scores = reranker.predict(pairs)
    doc_scores = list(zip(documents, scores))
    doc_scores.sort(key=lambda x: x[1], reverse=True)
    reranked_docs = [doc for doc, score in doc_scores[:config["top_n_rerank"]]]
    return reranked_docs

The function scores every (query, document) pair with the cross-encoder, sorts by score, and keeps the top 3, producing a short, highly relevant list of documents as perfect context for the downstream agents. This funnel shape, broad recall first and high precision second, is key to production-grade RAG.

Synthesis via Contextual Distillation

We have now gone from 10 documents down to 3 highly relevant ones, but they are still three separate blocks. To refine further, we add one last compression step, Contextual Distillation: distill the top 3 documents into one concise, clean paragraph, removing redundancy and producing an extremely information-dense context.

Synthesization

This step only processes text; it does not answer the question. We create the Distiller Agent:

distiller_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant. Your task is to synthesize the following retrieved document snippets into a single, concise paragraph.
The goal is to provide a clear and coherent context that directly answers the question: '{question}'.
Focus on removing redundant information and organizing the content logically. Answer only with the synthesized context."""),
    ("human", "Retrieved Documents:\n{context}")
])

distiller_agent = distiller_prompt | reasoning_llm | StrOutputParser()
print("Contextual Distiller Agent created.")

In the main loop, each research step will therefore run through the following stages (a sketch follows below):

  1. Supervisor: choose the retrieval strategy (vector/keyword/hybrid).
  2. Recall: execute the chosen strategy and take the top 10 documents.
  3. Precision: keep the top 3 via rerank_documents_function.
  4. Distillation: compress them into a single dense paragraph with distiller_agent.
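
As a quick illustration of this per-step funnel (a minimal sketch reusing the components defined above; in the actual system this wiring lives inside the LangGraph nodes built later), one research step might look like this:

# Minimal sketch of one research step through the funnel (illustrative only)
sub_q = "What are NVIDIA's key competitive risks in the 2023 10-K?"

# 1. Supervisor picks a strategy
decision = retrieval_supervisor_agent.invoke({"sub_question": sub_q})

# 2. Broad recall with the chosen strategy (top 10 documents)
if decision.strategy == "vector_search":
    docs = vector_search_only(sub_q, section_filter="Item 1A. Risk Factors.", k=config["top_k_retrieval"])
elif decision.strategy == "keyword_search":
    docs = bm25_search_only(sub_q, k=config["top_k_retrieval"])
else:
    docs = hybrid_search(sub_q, section_filter="Item 1A. Risk Factors.", k=config["top_k_retrieval"])

# 3. Precision: rerank down to the top 3
top_docs = rerank_documents_function(sub_q, docs)

# 4. Distillation: compress the survivors into one dense paragraph
distilled = distiller_agent.invoke({
    "question": sub_q,
    "context": "\n\n---\n\n".join(d.page_content for d in top_docs),
})
print(distilled[:200])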

This gets our evidence quality as high as it can be. Next, we give the agent a window onto the outside world: web search.

Augmenting Knowledge with Web Search

Our retrieval funnel is strong, but it has one fatal blind spot:

it can only see what is inside the 2023 10-K. Our challenge, however, requires recent news about "AMD's 2024 AI chip strategy", which simply does not exist in the static knowledge base.

A truly "deep thinking" agent must recognize the limits of its own knowledge and be able to look elsewhere for answers. It needs a window.

Augmentation using Web

In this step we add a new tool to the system, Web Search, turning it from a document-specific Q&A bot into a genuine multi-source research assistant.

We use the Tavily Search API, a search engine built for LLMs that returns clean, ad-free, relevant results, which makes it a great fit for RAG, and it integrates smoothly with LangChain.

Initialize the Tavily search tool:

from langchain_community.tools.tavily_search import TavilySearchResults

web_search_tool = TavilySearchResults(k=3)

The raw API responses need to be wrapped as a standard list of Document objects so they plug straight into our reranker and distiller. A small wrapper function:

def web_search_function(query: str) -> List[Document]:
    results = web_search_tool.invoke({"query": query})
    return [
        Document(
            page_content=res["content"],
            metadata={"source": res["url"]}
        ) for res in results
    ]

Test it:

print("\n--- Testing Web Search Tool ---")
test_query_web = "AMD AI chip strategy 2024"
test_results_web = web_search_function(test\_query\_web)
print(f"Found {len(test_results_web)} results for query: '{test_query_web}'")
if test_results\_web:
    print(f"Top result snippet: {test_results_web[0].page_content[:250]}...")
#### OUTPUT ####
Web search tool (Tavily) initialized.

--- Testing Web Search Tool ---
Found 3 results for query: 'AMD AI chip strategy 2024'
Top result snippet: AMD has intensified its battle with Nvidia in the AI chip market with the release of the Instinct MI300X accelerator, a powerful GPU designed to challenge Nvidia's H100 in training and inference for large language models. Major cloud providers like Microsoft Azure and Oracle Cloud are adopting the MI300X, indicating strong market interest...

Just what we hoped for: three relevant web pages. The snippet mentions AMD's "Instinct MI300X" going up against NVIDIA's "H100", exactly the evidence needed for the second half of the question. The agent now has its window to the outside world, and the planner can decide intelligently when to use it. The next step is to let the agent reflect and decide when its research is done.

Self-Critique and Control-Flow Policy

So far the agent can make a plan, choose tools, and run a sophisticated retrieval funnel. But it still lacks one key ability: thinking about its own progress. An agent that blindly works through a plan step by step is not truly intelligent; it needs a self-critique mechanism.

Self Critique and Policy Making

After every research step, the agent pauses to reflect, compares the new information with what it already knows, and then makes a strategic decision: is the research complete, or should it continue?

This self-critique loop is what elevates the system from a scripted workflow to an autonomous agent. It can judge whether it has gathered enough evidence to answer the user's question with confidence.

We will implement two specialized agents:

  1. Reflection Agent: reads the distilled context of the current step and writes a concise one-sentence summary into the "research history".
  2. Policy Agent: acts as the chief strategist; after each reflection it reviews the entire history and the original plan and makes the crucial decision: CONTINUE_PLAN or FINISH.

Updating and Reflecting on the Cumulative Research History

After each step completes (for example, retrieving and distilling NVIDIA's risks), we should not jump straight to the next step. The new knowledge first has to be folded into the agent's memory.

Reflective Cumulative

We build the Reflection Agent: its task is to read the distilled context of the current step, write a factual one-sentence summary, and append it to past_steps in the RAGState.

reflection_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a research assistant. Based on the retrieved context for the current sub-question, write a concise, one-sentence summary of the key findings.
This summary will be added to our research history. Be factual and to the point."""),
    ("human", "Current sub-question: {sub_question}\n\nDistilled context:\n{context}")
])

reflection_agent = reflection_prompt | reasoning_llm | StrOutputParser()
print("Reflection Agent created.")

This is an important part of the cognitive loop: these concise summaries build a clean, readable research history that feeds the next and most important agent, the policy decision-maker.

Building a Policy Agent for Control Flow

This is the brain of the agent's autonomy. After reflection_agent updates the history, the Policy Agent takes over as the master controller. It looks at the original question, the initial plan, and the full history of completed-step summaries, and makes the high-level strategic decision.

Policy Agent

Define the structure of its decision:

class Decision(BaseModel):
    next_action: Literal["CONTINUE_PLAN", "FINISH"]
    justification: str

Design the prompt:

policy_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a master strategist. Your role is to analyze the research progress and decide the next action.
You have the original question, the initial plan, and a log of completed steps with their summaries.
- If the collected information in the Research History is sufficient to comprehensively answer the Original Question, decide to FINISH.
- Otherwise, if the plan is not yet complete, decide to CONTINUE_PLAN."""),
    ("human", "Original Question: {question}\n\nInitial Plan:\n{plan}\n\nResearch History (Completed Steps):\n{history}")
])

policy_agent = policy_prompt | reasoning_llm.with_structured_output(Decision)
print("Policy Agent created.")

Test it in two states:

print("\\n--- Testing Policy Agent (Incomplete State) ---")
plan\_str = json.dumps([s.dict() for s in test\_plan.steps])
incomplete\_history = "Step 1 Summary: NVIDIA's 10-K states that the semiconductor industry is intensely competitive and subject to rapid technological change."
decision1 = policy\_agent.invoke({"question": complex\_query\_adv, "plan": plan\_str, "history": incomplete\_history})
print(f"Decision: {decision1.next\_action}, Justification: {decision1.justification}")

print("\\n--- Testing Policy Agent (Complete State) ---")
complete\_history = incomplete\_history + "\\nStep 2 Summary: In 2024, AMD launched its MI300X accelerator to directly compete with NVIDIA in the AI chip market, gaining adoption from major cloud providers."
decision2 = policy\_agent.invoke({"question": complex\_query\_adv, "plan": plan\_str, "history": complete\_history})
print(f"Decision: {decision2.next\_action}, Justification: {decision2.justification}")
#### OUTPUT ####
Policy Agent created.

--- Testing Policy Agent (Incomplete State) ---
Decision: CONTINUE\_PLAN, Justification: The research has only identified NVIDIA's competitive risks from the 10-K. It has not yet gathered the required external information about AMD's 2024 strategy, which is the next step in the plan.

--- Testing Policy Agent (Complete State) ---
Decision: FINISH, Justification: The research history now contains comprehensive summaries of both NVIDIA's stated competitive risks and AMD's recent AI chip strategy. All necessary information has been gathered to perform the final synthesis and answer the user's question.

With the research incomplete, it correctly chooses CONTINUE_PLAN; once complete, it correctly chooses FINISH. With policy_agent we have the brain of an autonomous system. Next, we wire all the components together with LangGraph.

Defining the Graph Nodes

We have designed all of these specialized agents. Now we turn them into the building blocks of the workflow. That is exactly what a "node" is in LangGraph: each node is a Python function that does one specific job, takes the RAGState as input, and returns an updated dictionary.

Graph Nodes

First, a helper that formats the past_steps research history as a readable string for the prompts:

def get_past_context_str(past_steps: List[PastStep]) -> str:
    return "\n\n".join([f"Step {s['step_index']}: {s['sub_question']}\nSummary: {s['summary']}" for s in past_steps])

The first node, plan_node, calls planner_agent and populates the plan field.

def plan_node(state: RAGState) -> Dict:
    console.print("--- 🧠: Generating Plan ---")
    plan = planner_agent.invoke({"question": state["original_question"]})
    rprint(plan)
    return {"plan": plan, "current_step_index": 0, "past_steps": []}

Next come the two retrieval nodes: internal documents and the web.

def retrieval_node(state: RAGState) -> Dict:
    current_step_index = state["current_step_index"]
    current_step = state["plan"].steps[current_step_index]
    console.print(f"--- 🔍 : Retrieving from 10-K (Step {current_step_index + 1}: {current_step.sub_question}) ---")
    
    past_context = get_past_context_str(state['past_steps'])
    rewritten_query = query_rewriter_agent.invoke({
        "sub_question": current_step.sub_question,
        "keywords": current_step.keywords,
        "past_context": past_context
    })
    console.print(f"  Rewritten Query: {rewritten_query}")
    
    retrieval_decision = retrieval_supervisor_agent.invoke({"sub_question": rewritten_query})
    console.print(f"  Supervisor Decision: Use `{retrieval_decision.strategy}`. Justification: {retrieval_decision.justification}")

    if retrieval_decision.strategy == 'vector_search':
        retrieved_docs = vector_search_only(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])
    elif retrieval_decision.strategy == 'keyword_search':
        retrieved_docs = bm25_search_only(rewritten_query, k=config['top_k_retrieval'])
    else:
        retrieved_docs = hybrid_search(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])
    
    return {"retrieved_docs": retrieved_docs}

def web_search_node(state: RAGState) -> Dict:
    current_step_index = state["current_step_index"]
    current_step = state["plan"].steps[current_step_index]
    console.print(f"--- 🌐 : Searching Web (Step {current_step_index + 1}: {current_step.sub_question}) ---")
    
    past_context = get_past_context_str(state['past_steps'])
    rewritten_query = query_rewriter_agent.invoke({
        "sub_question": current_step.sub_question,
        "keywords": current_step.keywords,
        "past_context": past_context
    })
    console.print(f"  Rewritten Query: {rewritten_query}")
    retrieved_docs = web_search_function(rewritten_query)
    return {"retrieved_docs": retrieved_docs}

Next are the precision (reranking) and distillation nodes:
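
The graph wiring further below registers a rerank node, but its definition does not appear in the original text; a minimal sketch consistent with the rerank_documents_function defined earlier (and with the log lines in the run output) might look like this:

def rerank_node(state: RAGState) -> Dict:
    # Precision stage: cut the broad-recall results down to the top N documents
    console.print("--- 🎯 : Reranking Documents ---")
    current_step = state["plan"].steps[state["current_step_index"]]
    reranked_docs = rerank_documents_function(current_step.sub_question, state["retrieved_docs"])
    console.print(f"  Reranked to top {len(reranked_docs)} documents.")
    return {"reranked_docs": reranked_docs}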

def compression_node(state: RAGState) -> Dict:
    console.print("--- ✂️: Distilling Context ---")
    current_step_index = state["current_step_index"]
    current_step = state["plan"].steps[current_step_index]
    context = format_docs(state["reranked_docs"])
    synthesized_context = distiller_agent.invoke({"question": current_step.sub_question, "context": context})
    console.print(f"  Distilled Context Snippet: {synthesized_context[:200]}...")
    return {"synthesized_context": synthesized_context}

Then the reflection node, which updates the research history:

def reflection_node(state: RAGState) -> Dict:
    console.print("--- 🤔: Reflecting on Findings ---")
    current_step_index = state["current_step_index"]
    current_step = state["plan"].steps[current_step_index]
    summary = reflection_agent.invoke({"sub_question": current_step.sub_question, "context": state['synthesized_context']})
    console.print(f"  Summary: {summary}")
    
    new_past_step = {
        "step_index": current_step_index + 1,
        "sub_question": current_step.sub_question,
        "retrieved_docs": state['reranked_docs'],
        "summary": summary
    }
    return {"past_steps": state["past_steps"] + [new_past_step], "current_step_index": current_step_index + 1}

Finally, the node that generates the answer:

def final_answer_node(state: RAGState) -> Dict:
    console.print("--- ✅: Generating Final Answer with Citations ---")
    final_context = ""
    for i, step in enumerate(state['past_steps']):
        final_context += f"\n--- Findings from Research Step {i+1} ---\n"
        for doc in step['retrieved_docs']:
            source = doc.metadata.get('section') or doc.metadata.get('source')
            final_context += f"Source: {source}\nContent: {doc.page_content}\n\n"
    
    final_answer_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert financial analyst. Synthesize the research findings from internal documents and web searches into a comprehensive, multi-paragraph answer for the user's original question.
Your answer must be grounded in the provided context. At the end of any sentence that relies on specific information, you MUST add a citation. For 10-K documents, use [Source: section title]. For web results, use [Source: URL]."""),
        ("human", "Original Question: {question}\n\nResearch History and Context:\n{context}")
    ])
    
    final_answer_agent = final_answer_prompt | reasoning_llm | StrOutputParser()
    final_answer = final_answer_agent.invoke({"question": state['original_question'], "context": final_context})
    return {"final_answer": final_answer}

With the nodes in place, we next define the edges, which determine how they connect and how control flows between them.

Defining the Conditional Edges

We need two key conditional edges:

  1. The tool router (route_by_tool): after plan, look at which tool the current step requires and route to retrieve_10k or retrieve_web.
  2. The main control loop (should_continue_node): after each reflection, call policy_agent to decide whether to continue with the next step or finish and generate the answer.

The tool router:

def route_by_tool(state: RAGState) -> str:
    current_step_index = state["current_step_index"]
    current_step = state["plan"].steps[current_step_index]
    return current_step.tool

The main control loop:

def should_continue_node(state: RAGState) -> str:
    console.print("--- 🚦 : Evaluating Policy ---")
    current_step_index = state["current_step_index"]
    
    if current_step_index >= len(state["plan"].steps):
        console.print("  -> Plan complete. Finishing.")
        return "finish"
    
    if current_step_index >= config["max_reasoning_iterations"]:
        console.print("  -> Max iterations reached. Finishing.")
        return "finish"

    if state.get("reranked_docs") is not None and not state["reranked_docs"]:
        console.print("  -> Retrieval failed for the last step. Continuing with next step in plan.")
        return "continue"

    history = get_past_context_str(state['past_steps'])
    plan_str = json.dumps([s.dict() for s in state['plan'].steps])

    decision = policy_agent.invoke({"question": state["original_question"], "plan": plan_str, "history": history})
    console.print(f"  -> Decision: {decision.next_action} | Justification: {decision.justification}")
    
    if decision.next_action == "FINISH":
        return "finish"
    else:
        return "continue"

With the nodes (the experts) and the conditional edges (the rules of conversation) ready, we can build the complete StateGraph.

Wiring Up the Deep Thinking RAG Machine

We now use LangGraph's StateGraph to define the full cognitive architecture, the blueprint of the agent's thought process.

from langgraph.graph import StateGraph, END

graph = StateGraph(RAGState)

Add the nodes:

graph.add\_node("plan", plan\_node)
graph.add\_node("retrieve\_10k", retrieval\_node)
graph.add\_node("retrieve\_web", web\_search\_node)
graph.add\_node("rerank", rerank\_node)
graph.add\_node("compress", compression\_node)
graph.add\_node("reflect", reflection\_node)
graph.add\_node("generate\_final\_answer", final\_answer\_node)

Connect the edges and conditional edges:

graph.set_entry_point("plan")

graph.add_conditional_edges(
    "plan",
    route_by_tool,
    {
        "search_10k": "retrieve_10k",
        "search_web": "retrieve_web",
    },
)

graph.add_edge("retrieve_10k", "rerank")
graph.add_edge("retrieve_web", "rerank")
graph.add_edge("rerank", "compress")
graph.add_edge("compress", "reflect")

graph.add_conditional_edges(
    "reflect",
    should_continue_node,
    {
        "continue": "plan",
        "finish": "generate_final_answer",
    },
)

graph.add_edge("generate_final_answer", END)
print("StateGraph constructed successfully.")

A recap of the flow:

  1. Start at plan;
  2. route_by_tool decides between retrieve_10k and retrieve_web;
  3. every step then follows rerank -> compress -> reflect;
  4. after reflect, should_continue_node decides:
  5. on CONTINUE_PLAN, return to plan and route the next step;
  6. on FINISH, move to generate_final_answer;
  7. after the final answer is generated, the graph ends.

With that, the complex, cyclical architecture of our deep thinking agent is complete. The next step is compiling and visualizing it.

Compiling and Visualizing the Iterative Workflow

Compiling (.compile()) turns the abstract node and edge definitions into an executable application. We can also use the built-in tooling to render a diagram, which helps with understanding and debugging.

deep_thinking_rag_graph = graph.compile()
print("Graph compiled successfully.")

try:
    from IPython.display import Image, display
    png_image = deep_thinking_rag_graph.get_graph().draw_png()
    display(Image(png_image))
except Exception as e:
    print(f"Graph visualization failed: {e}. Please ensure pygraphviz is installed.")

Deep Thinking Simpler Pipeline Flow

In the diagram you can see:

  • the branch where route_by_tool chooses internal or external retrieval;
  • the linear processing of each research step (rerank -> compress -> reflect);
  • the crucial feedback loop where should_continue sends the flow back to plan for the next round;
  • the exit into generate_final_answer once the research is complete.

This is a system that "thinks". Now let's actually run it.

Running the Deep Thinking Pipeline

We test the system with the same multi-hop, multi-source query and see whether it succeeds.

Here we call .stream() so we can watch the state after each node update and follow the agent's "thought process" in real time.

final_state = None
graph_input = {"original_question": complex_query_adv}

print("--- Invoking Deep Thinking RAG Graph ---")
for chunk in deep_thinking_rag_graph.stream(graph_input, stream_mode="values"):
    final_state = chunk
print("\n--- Graph Stream Finished ---")
#### OUTPUT ####

--- Invoking Deep Thinking RAG Graph ---

--- 🧠: Generating Plan ---
plan:
  steps:
  - sub_question: What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?
    tool: search_10k
    ...
  - sub_question: What are the recent news and developments in AMD's AI chip strategy in 2024?
    tool: search_web
    ...

--- 🔍 : Retrieving from 10-K (Step 1: ...) ---
  Rewritten Query: key competitive risks for NVIDIA in the semiconductor industry...
  Supervisor Decision: Use `hybrid_search`. ...

--- 🎯  : Reranking Documents ---
  Reranked to top 3 documents.

--- ✂️: Distilling Context ---
  Distilled Context Snippet: NVIDIA operates in the intensely competitive semiconductor industry...

--- 🤔: Reflecting on Findings ---
  Summary: According to its 2023 10-K, NVIDIA operates in an intensely competitive semiconductor industry...

--- 🚦  : Evaluating Policy ---
  -> Decision: CONTINUE_PLAN | Justification: The first step...has been completed. The next step...is still pending...

--- 🌐 : Searching Web (Step 2: ...) ---
  Rewritten Query: AMD AI chip strategy news and developments 2024...

--- 🎯  : Reranking Documents ---
  Reranked to top 3 documents.

--- ✂️: Distilling Context ---
  Distilled Context Snippet: AMD has ramped up its challenge to Nvidia in the AI accelerator market with its Instinct MI300 series...

--- 🤔: Reflecting on Findings ---
  Summary: In 2024, AMD is aggressively competing with NVIDIA in the AI chip market through its Instinct MI300X accelerator...

--- 🚦  : Evaluating Policy ---
  -> Decision: FINISH | Justification: The research history now contains comprehensive summaries of both NVIDIA's stated risks and AMD's recent strategy...

--- ✅: Generating Final Answer with Citations ---

--- Graph Stream Finished ---

The system executed exactly the flow we designed: plan -> step 1 -> self-critique (continue) -> step 2 -> self-critique (finish) -> final synthesis.

Analyzing the Final High-Quality Answer

Print the final answer:

console.print("--- DEEP THINKING RAG FINAL ANSWER ---")
console.print(Markdown(final\_state['final\_answer']))
#### OUTPUT ####
--- DEEP THINKING RAG FINAL ANSWER ---
Based on an analysis of NVIDIA's 2023 10-K filing and recent news from 2024 regarding AMD's AI chip strategy, the following synthesis can be made:

**NVIDIA's Stated Competitive Risks:**
In its 2023 10-K filing, NVIDIA identifies its operating environment as the "intensely competitive" semiconductor industry, which is characterized by rapid technological change. A primary risk is that competitors, including AMD, could introduce new products with better performance or lower costs that gain significant market acceptance, which could materially and adversely affect its business [Source: Item 1A. Risk Factors.].

**AMD's 2024 AI Chip Strategy:**
In 2024, AMD has moved aggressively to challenge NVIDIA's dominance in the AI hardware market with its Instinct MI300 series of accelerators, particularly the MI300X. This product is designed to compete directly with NVIDIA's H100 GPU. AMD's strategy has gained significant traction, with major cloud providers such as Microsoft Azure and Oracle announcing plans to use the new chips [Source: https://www.reuters.com/technology/amd-forecasts-35-billion-ai-chip-revenue-2024-2024-01-30/].

**Synthesis and Impact:**
AMD's 2024 AI chip strategy directly exacerbates the competitive risks outlined in NVIDIA's 10-K. The successful launch and adoption of the MI300X is a materialization of the specific risk that a competitor could introduce a product with comparable performance. The adoption of AMD's chips by major cloud providers signifies a direct challenge to NVIDIA's market share in the lucrative data center segment, validating NVIDIA's stated concerns about rapid technological change [Source: Item 1A. Risk Factors. and https://www.cnbc.com/2023/12/06/amd-launches-new-mi300x-ai-chip-to-compete-with-nvidias-h100.html].

This is a fully successful, comprehensive answer:

  • it correctly summarizes the risks from the 10-K;
  • it correctly summarizes AMD's 2024 moves;
  • the key is the "Synthesis and Impact" section, which completes the multi-hop reasoning and explains how the latter exacerbates the former;
  • and it provides source attribution (internal sections and external URLs).

Side-by-Side Comparison

Let's put the two results next to each other.

The comparison makes it plain: a cyclical, tool-aware, self-critiquing agent architecture delivers a significant, quantifiable performance improvement on complex real-world queries.

Evaluation Framework and Results Analysis

Succeeding on one hard question is encouraging, but in production we need objective, quantitative, automated validation.

Evaluation Framework

We use the RAGAs (RAG Assessment) library and focus on four key metrics:

  • Context Precision & Recall: measure retrieval quality. Precision asks: of the documents retrieved, how many are actually relevant? Recall asks: of all the relevant documents, how many did we find?
  • Answer Faithfulness: is the answer grounded in the provided context? This is the main check against LLM hallucination.
  • Answer Correctness: the ultimate quality metric, comparing against a human-written ground-truth answer for factual accuracy and completeness.

Prepare the evaluation dataset (the question, answers and contexts from both systems, and the ground truth) and run the evaluation:

from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_correctness,
)
import pandas as pd

print("Preparing evaluation dataset...")

ground_truth_answer_adv = "NVIDIA's 2023 10-K lists intense competition and rapid technological change as key risks. This risk is exacerbated by AMD's 2024 strategy, specifically the launch of the MI300X AI accelerator, which directly competes with NVIDIA's H100 and has been adopted by major cloud providers, threatening NVIDIA's market share in the data center segment."

retrieved_docs_for_baseline_adv = baseline_retriever.invoke(complex_query_adv)
baseline_contexts = [[doc.page_content for doc in retrieved_docs_for_baseline_adv]]

advanced_contexts_flat = []
for step in final_state['past_steps']:
    advanced_contexts_flat.extend([doc.page_content for doc in step['retrieved_docs']])

advanced_contexts = [list(set(advanced_contexts_flat))]

eval_data = {
    'question': [complex_query_adv, complex_query_adv],
    'answer': [baseline_result, final_state['final_answer']],
    'contexts': baseline_contexts + advanced_contexts,
    'ground_truth': [ground_truth_answer_adv, ground_truth_answer_adv]
}

eval_dataset = Dataset.from_dict(eval_data)

metrics = [
    context_precision,
    context_recall,
    faithfulness,
    answer_correctness,
]
print("Running RAGAs evaluation...")

result = evaluate(eval_dataset, metrics=metrics, is_async=False)
print("Evaluation complete.")

results_df = result.to_pandas()
results_df.index = ['baseline_rag', 'deep_thinking_rag']

print("\n--- RAGAs Evaluation Results ---")
print(results_df[['context_precision', 'context_recall', 'faithfulness', 'answer_correctness']].T)

Sample output:

#### OUTPUT ####
Preparing evaluation dataset...
Running RAGAs evaluation...
Evaluation complete.


--- RAGAs Evaluation Results ---
                     baseline_rag  deep_thinking_rag
context_precision        0.500000           0.890000
context_recall           0.333333           1.000000
faithfulness             1.000000           1.000000
answer_correctness       0.395112           0.991458

The quantitative results give the deep thinking architecture a clear, objective edge:

  • Context Precision (0.50 vs 0.89): the baseline's context is only half relevant because it can only retrieve generic passages about "competition"; the advanced agent's multi-step, multi-tool retrieval lifts precision substantially.
  • Context Recall (0.33 vs 1.00): the baseline completely misses the web-only information, so its recall is low; the advanced agent's planning and tool use gather all the necessary information for a perfect score.
  • Faithfulness (1.00 vs 1.00): both are faithful. The baseline correctly states that it lacks the information; the advanced agent correctly uses the information it found. A faithful but incorrect answer is still not useful.
  • Answer Correctness (0.40 vs 0.99): the final quality metric. The baseline scores below 40% because it misses the second half of the analysis; the advanced agent is nearly perfect.

Summarizing the Entire Pipeline

In this article we went from a simple, brittle RAG pipeline to a sophisticated, autonomous reasoning agent:

  • we first built a vanilla RAG and demonstrated its inevitable failure on a complex multi-source query;
  • we then systematically built the deep thinking agent, giving it planning, multi-tool use, and adaptive retrieval strategies;
  • we built a multi-stage retrieval funnel: broad recall (hybrid search), then high precision (cross-encoder reranker), then synthesis (distiller agent);
  • we orchestrated the whole cognitive architecture with LangGraph, creating a cyclical, stateful workflow capable of genuine multi-step reasoning;
  • we added a self-critique loop so the agent can recognize failure, revise its plan, and exit gracefully when no answer can be found;
  • and finally we ran a production-style evaluation with RAGAs to objectively quantify the advanced agent's superior performance.

Learning the Policy with a Markov Decision Process (MDP)

Right now our Policy Agent (which decides CONTINUE or FINISH) relies on a general-purpose LLM such as GPT-4o and has to be called every time. It works, but in production it can be slow and expensive. The research frontier points to a better path.

  • Model RAG as a decision process: treat the agent's reasoning loop as a Markov Decision Process (MDP). In this model, each RAGState is a "state", and each action (CONTINUE, REVISE, FINISH) moves the system to a new state and earns some "reward" (for instance, arriving at a correct answer). A rough formalization follows below.
  • Learn from experience: the thousands of successful and failed reasoning traces we log in LangSmith are valuable training data; each trace is an example trajectory of the agent in this MDP.
  • Train a policy model: with this data, we can use Reinforcement Learning to train a smaller, specialized policy model.
  • The goal: speed and efficiency. Distill the decision-making ability of a heavyweight model like GPT-4o into a much smaller one (say, 7B), making the CONTINUE/FINISH decision faster and cheaper while staying highly tuned to our domain. This is the core idea behind research such as DeepRAG, and the next stage of optimization for autonomous RAG systems.
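
As a rough formalization (our notation, not spelled out in the article): model the loop as an MDP (S, A, P, R, γ), where the state s_t is the RAGState after step t, the action a_t ∈ {CONTINUE_PLAN, REVISE_PLAN, FINISH}, and a small policy π_θ(a_t | s_t) is trained to maximize the expected return E[Σ_t γ^t r_t], with the reward r_t derived, for example, from final answer correctness minus a small per-step cost that penalizes unnecessary iterations.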