博客 / 詳情

返回

AI 智能體高可靠設計模式:並行執行

本系列介紹增強現代智能體系統可靠性的設計模式,以直觀方式逐一介紹每個概念,拆解其目的,然後實現簡單可行的版本,演示其如何融入現實世界的智能體系統。本系列一共 14 篇文章,這是第 1 篇。原文:Building the 14 Key Pillars of Agentic AI

優化智能體解決方案需要軟件工程確保組件協調、並行運行並與系統高效交互。例如預測執行,會嘗試處理可預測查詢以降低時延,或者進行冗餘執行,即對同一智能體重複執行多次以防單點故障。其他增強現代智能體系統可靠性的模式包括:

  • 並行工具:智能體同時執行獨立 API 調用以隱藏 I/O 時延。
  • 層級智能體:管理者將任務拆分為由執行智能體處理的小步驟。
  • 競爭性智能體組合:多個智能體提出答案,系統選出最佳。
  • 冗餘執行:即兩個或多個智能體解決同一任務以檢測錯誤並提高可靠性。
  • 並行檢索和混合檢索:多種檢索策略協同運行以提升上下文質量。
  • 多跳檢索:智能體通過迭代檢索步驟收集更深入、更相關的信息。

還有很多其他模式。

本系列將實現最常用智能體模式背後的基礎概念,以直觀方式逐一介紹每個概念,拆解其目的,然後實現簡單可行的版本,演示其如何融入現實世界的智能體系統。

所有理論和代碼都在 GitHub 倉庫裏:🤖 Agentic Parallelism: A Practical Guide 🚀

代碼庫組織如下:

agentic-parallelism/
    ├── 01_parallel_tool_use.ipynb
    ├── 02_parallel_hypothesis.ipynb
    ...
    ├── 06_competitive_agent_ensembles.ipynb
    ├── 07_agent_assembly_line.ipynb
    ├── 08_decentralized_blackboard.ipynb
    ...
    ├── 13_parallel_context_preprocessing.ipynb
    └── 14_parallel_multi_hop_retrieval.ipynb

並行工具隱藏 I/O 時延

智能體系統中最主要且最常見的瓶頸(許多開發者已經知道,但我認為對初學者來説很重要)不是 LLM 思考時間,而是 I/O 時延……即等待網絡、數據庫和外部 API 響應的時間。

並行工具處理

當代理需要從多個來源收集信息時,例如查詢股價和搜索最新新聞,天真、順序的方法會依次執行調用,效率低下。如果都是獨立調用,沒有理由不同時執行。

我們現在構建一個智能體系統,學習該模式在哪種情況下以及如何使用最有效。該系統會接收用户查詢,識別需要調用兩個不同的實時 API,並並行執行。

首先需要創造一些真實的工具,利用 yfinance 庫獲取實時股票價格數據。

from langchain_core.tools import tool
import yfinance as yf

@tool
def get_stock_price(symbol: str) -> float:
    """Get the current stock price for a given stock symbol using Yahoo Finance."""
    # 添加一條 print 語句,以清楚指示何時執行此工具
    print(f"--- [Tool Call] Executing get_stock_price for symbol: {symbol} ---")
    
    # 實例化 yfinance Ticker 對象
    ticker = yf.Ticker(symbol)
    
    # 獲取股票信息,用 'regularMarketPrice' 增強可靠性,並帶有回退
    price = ticker.info.get('regularMarketPrice', ticker.info.get('currentPrice'))
    
    # 處理股票代碼無效或數據不可用的情況
    if price is None:
        return f"Could not find price for symbol {symbol}"
    return price

LangChain 的 @tool 將標準 Python 函數裝飾為工具提供給代理,從而獲取給定股票代碼的市價。

快速測試一下,確保正確連接到了實時 API。

get_stock_price.invoke({"symbol": "NVDA"})

#### 輸出 ####
--- [Tool Call] Executing get_stock_price for symbol: NVDA ---
121.79 ...

可以看到輸出確認工具連接正確,可以訪問外部 yfinance API。如果失敗,就需要檢查網絡連接或 yfinance 安裝情況。

接下來將創建第二個用於獲取最新公司新聞的工具,使用針對基於 LLM 的代理優化的 Tavily 搜索 API。

from langchain_community.tools.tavily_search import TavilySearchResults

# 首先,初始化基本 Tavily 搜索工具
# 'max_results=5' 將限制搜索前 5 個最相關文章
tavily_search = TavilySearchResults(max_results=5)
@tool
def get_recent_company_news(company_name: str) -> list:
    """Get recent news articles and summaries for a given company name using the Tavily search engine."""
    # 添加 print 語句,以便清楚記錄工具的執行情況
    print(f"--- [Tool Call] Executing get_recent_company_news for: {company_name} ---")
    
    # 為搜索引擎構造更具體的查詢
    query = f"latest news about {company_name}"
    
    # 調用底層 Tavily 工具
    return tavily_search.invoke(query)

這裏把基礎工具 TavilySearchResults 封裝在自定義 @tool 函數裏,目的是獲取用户查詢的最新新聞。

測試一下這個工具……

get_recent_company_news.invoke({"company_name": "NVIDIA"})

#### 輸出 ####
--- [Tool Call] Executing get_recent_company_news for: NVIDIA ---
[{'url': 'https://www.reuters.com/technology/nvidia-briefly-surpasses-microsoft-most-valuable-company-2024-06-18/', 'content': 'Nvidia briefly overtakes Microsoft as most valuable company...'}, ...]

輸出是一份近期新聞列表,證實第二個工具也正常工作,我們的代理現在具備兩種不同的真實世界數據收集能力。

為了正確衡量效率提升,需要整理工作流,更新圖狀態,加入用於記錄性能指標的字段。

from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    # 'messages' 將保存對話歷史
    messages: Annotated[List[BaseMessage], operator.add]
    # 'performance_log' 將累積詳細説明每個步驟執行時間的字符串
    # 'operator.add' 歸約函數告訴 LangGraph 追加列表而非替換
    performance_log: Annotated[List[str], operator.add]

AgentState 是智能體運行的黑匣子錄音機,通過添加帶有 Annotated operator.add 歸約函數的 performance_log 字段創建持久化日誌,圖中的每個節點都會更新該日誌,為我們提供分析總執行時間和各階段耗時所需的原始數據。

現在創建第一個儀表化節點,調用 LLM 的代理大腦。

import time

def call_model(state: AgentState):
    """The agent node: calls the LLM, measures its own execution time, and logs the result to the state."""
    print("--- AGENT: Invoking LLM --- ")
    start_time = time.time()
    
    # 從狀態中獲取當前消息歷史
    messages = state['messages']
    
    # 調用工具感知 LLM,LLM 將決定是否可以直接回答或需要調用工具
    response = llm_with_tools.invoke(messages)
    
    end_time = time.time()
    execution_time = end_time - start_time
    
    # 用性能數據創建日誌條目
    log_entry = f"[AGENT] LLM call took {execution_time:.2f} seconds."
    print(log_entry)
    
    # 返回 LLM 響應和要添加到狀態的新日誌條目
    return {
        "messages": [response],
        "performance_log": [log_entry]
    }

call_model 函數是我們第一個儀表化圖節點,用帶 time.time()llm_with_tools.invoke() 封裝調用,精確測量 LLM 的思考時間,並將測量數據格式化為人類可讀的字符串,作為狀態更新的一部分返回。

接下來創建用於執行工具的儀表化節點。

from langchain_core.messages import ToolMessage
from langgraph.prebuilt import ToolExecutor

# ToolExecutor 是一個 LangGraph 工具,可以獲取一組工具列表並執行
tool_executor = ToolExecutor(tools)
def call_tool(state: AgentState):
    """The tool node: executes the tool calls planned by the LLM, measures performance, and logs the results."""
    print("--- TOOLS: Executing tool calls --- ")
    start_time = time.time()
    
    # 來自代理的最後一條消息將包含工具調用
    last_message = state['messages'][-1]
    tool_invocations = last_message.tool_calls
    
    # ToolExecutor 可以批量執行工具調用,對於同步工具,底層仍然是順序的,
    # 但這是一種管理執行的乾淨方式
    responses = tool_executor.batch(tool_invocations)
    
    end_time = time.time()
    execution_time = end_time - start_time
    
    # 為工具執行階段創建日誌條目
    log_entry = f"[TOOLS] Executed {len(tool_invocations)} tools in {execution_time:.2f} seconds."
    print(log_entry)
    
    # 將工具響應格式化為 ToolMessages,這是 LangGraph 期望的標準格式
    tool_messages = [
        ToolMessage(content=str(response), tool_call_id=call['id'])
        for call, response in zip(tool_invocations, responses)
    ]
    
    # 返回工具消息和性能日誌
    return {
        "messages": tool_messages,
        "performance_log": [log_entry]
    }

類似於 call_model 節點,將核心邏輯 tool_executor.batch(tool_invocations) 封裝在計時儀表中,通過記錄執行 batch 的總時間,可以稍後和模擬順序執行比較,以量化並行的好處。

定義好儀表節點後,可以將它們接線成 StateGraph

from langgraph.graph import END, StateGraph

# 此函數作為條件邊,根據代理的最後一條消息路由工作流
def should_continue(state: AgentState) -> str:
    last_message = state['messages'][-1]
    # 如果最後一條消息包含工具調用,路由到 'tools' 節點
    if last_message.tool_calls:
        return "tools"
    # 否則,智能體已經完成推理,結束執行圖
    return END

# 定義圖工作流
workflow = StateGraph(AgentState)

# 添加儀表節點
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tool)

# 入口點是 'agent' 節點
workflow.set_entry_point("agent")

# 為路由添加條件邊
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})

# 添加從工具回到代理的邊
workflow.add_edge("tools", "agent")

# 編譯成可執行應用程序
app = workflow.compile()

並行工具調用

我們定義了一個簡單的循環:

  1. agent 思考
  2. should_continue 邊檢查是否需要行動,如果需要,tools 節點會行動,然後流返回 agent 節點處理其動作的結果。
  3. compile() 方法將該抽象定義轉化為具體、可執行的對象。

接下來給代理一個查詢,要求它同時使用兩個工具並進行流式執行,並在每一步檢查狀態。

from langchain_core.messages import HumanMessage
import json

# 圖的初始輸入,包括用户查詢
inputs = {
    "messages": [HumanMessage(content="What is the current stock price of NVIDIA (NVDA) and what is the latest news about the company?")],
    "performance_log": []
}
step_counter = 1
final_state = None

# 用 .Stream() 使用 stream_mode='values' 獲取每個節點運行後的完整狀態字典
for output in app.stream(inputs, stream_mode="values"):

    # 輸出字典的鍵是剛剛運行的節點名稱
    node_name = list(output.keys())[0]
    print(f"\n{'*' * 100}")
    print(f"**Step {step_counter}: {node_name.capitalize()} Node Execution**")
    print(f"{'*' * 100}")
    
    # 打印狀態,以便詳細檢查
    state_for_printing = output[node_name].copy()
    if 'messages' in state_for_printing:
        # 將消息對象轉換為更可讀的字符串表示形式
        state_for_pr...tty_repr() for msg in state_for_printing['messages']]
    print("\nCurrent State:")
    print(json.dumps(state_for_printing, indent=4))

    # 為每一步添加分析
    print(f"\n{'-' * 100}")
    print("State Analysis:")

    if node_name == "agent":
        # 檢查代理響應是否包含工具調用
        if "tool_calls" in state_for_printing['messages'][-1]:
            print("The agent has processed the input. The LLM correctly planned parallel tool calls. The execution time of the LLM call has been logged.")
        else:
            print("The agent has received the tool results and synthesized them into a coherent, final answer. The performance log now contains the full history.")
    elif node_name == "tools":
        print("The tool executor received the tool calls and executed them. The results are now in the state as ToolMessages. The performance log is accumulating.")
    print(f"{'-' * 100}")
    step_counter += 1
    final_state = output[node_name]

執行查詢,看看並行模擬是如何工作的……

#### 輸出 ####
****************************************************************************************************
**Step 1: Agent Node Execution**
****************************************************************************************************
--- AGENT: Invoking LLM --- 
[AGENT] LLM call took 4.12 seconds.

Current State:
{
    "messages": [
        "HumanMessage(content='What is the current stock price of NVIDIA (NVDA) and what is the latest news about the company?')",
        "AIMessage(content='', tool_calls=[{'name': 'get_stock_price', 'args': {'symbol': 'NVDA'}, 'id': '...'}, {'name': 'get_recent_company_news', 'args': {'company_name': 'NVIDIA'}, 'id': '...'}])"
    ],
    "performance_log": [ "[AGENT] LLM call took 4.12 seconds." ]
}
----------------------------------------------------------------------------------------------------
State Analysis:
The agent has processed the input. The LLM correctly planned parallel tool calls. The execution time of the LLM call has be...------------------------------

****************************************************************************************************
**Step 2: Tools Node Execution**
****************************************************************************************************
--- TOOLS: Executing tool calls --- 
--- [Tool Call] Executing get_stock_price for symbol: NVDA ---
--- [Tool Call] Executing get_recent_company_news for: NVIDIA ---
[TOOLS] Executed 2 tools in 2.31 seconds.
Current State:
{
    "messages": [ ... ],
    "performance_log": [ "[AGENT] LLM call took 4.12 seconds.", "[TOOLS] Executed 2 tools in 2.31 seconds." ]
}
----------------------------------------------------------------------------------------------------
State Analysis:
The tool executor received the tool calls and executed them. The results are now in the state as ToolMessages. The performance log is accumulating.
----------------------------------------------------------------------------------------------------
...

流輸出提供了代理週期的逐步視圖。

  • 步驟 1(代理):在 agent 節點初始運行中,通過 AIMessage 可以看到 Llama 3 模型正確識別需要調用兩個獨立工具,get_stock_priceget_recent_company_news,並且在一次回合內完成了規劃,從而實現了並行優化計劃。
  • 步驟 2(工具)tools 節點接收兩條計劃調用,日誌顯示兩條 [Tool Call] 打印語句,確認被 ToolExecutor 同時執行。性能日誌條目 [TOOLS] Executed 2 tools in 2.31 seconds 是關鍵數據。
  • 步驟 3(代理):最後一步,代理收到 ToolMessage 結果並綜合生成最終答案。

現在進行最終定量證明,分析完整性能日誌,計算節省的時間。

print("Run Log:")
total_time = 0
tool_time = 0
for log in final_state['performance_log']:
    print(f" - {log}")
    # 從日誌字符串中提取時間值
    time_val = float(log.split(' ')[-2])
    total_time += time_val
    if "[TOOLS]" in log:
        tool_time = time_val
print("\n" + "-"*60 + "\n")
print(f"Total Execution Time: {total_time:.2f} seconds\n")
print("Analysis:")

可以看到並行處理解決了時延問題……

#### 輸出 ####
============================================================
               FINAL PERFORMANCE REPORT
============================================================
Run Log:
 - [AGENT] LLM call took 4.12 seconds.
 - [TOOLS] Executed 2 tools in 2.31 seconds.
 - [AGENT] LLM call took 5.23 seconds.
------------------------------------------------------------

Total Execution Time: 11.66 seconds

工具執行總時間為 2.31s,假設每個網絡調用耗時約 1.5s,順序執行需時約 3.0s(1.5s + 1.5s)。

併發執行節省了約 0.7s,增益看起來很小,但在一個有 5-10 個獨立工具調用、每次需要 2-3s 的複雜系統中,差別會更大。順序過程需 10-30s,而並行過程仍只需 2-3s,這就是可用系統和不可用系統的區別。


Hi,我是俞凡,一名兼具技術深度與管理視野的技術管理者。曾就職於 Motorola,現任職於 Mavenir,多年帶領技術團隊,聚焦後端架構與雲原生,持續關注 AI 等前沿方向,也關注人的成長,篤信持續學習的力量。在這裏,我會分享技術實踐與思考。歡迎關注公眾號「DeepNoMind」,星標不迷路。也歡迎訪問獨立站 www.DeepNoMind.com,一起交流成長。

本文由mdnice多平台發佈

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.