作者:琛琪、稚柳
引言
Agentic AI 時代已至,在智能客服、代碼生成、流程自動化等場景中,多智能體(Multi-Agent)協作正從構想走向落地。然而,當多個 Agent 需要像一個團隊那樣高效協作時,脆弱的通信機制可能因網絡抖動或服務宕機,就讓整個系統瞬間癱瘓,導致昂貴的計算任務失敗、會話狀態丟失。
如何為這些聰明的“數字員工”們構建一個真正可靠、高效的通信基座?
本文將為您介紹 Apache RocketMQ 全新推出的輕量級通信模型 LiteTopic,如何在 AI 應用場景中有效簡化系統架構、提升穩定性與可靠性,並結合 A2A(Agent-to-Agent)協議與阿里巴巴 AgentScope 框架的生產實踐案例,深入剖析面向智能體通信的落地實踐與技術實現。
RocketMQ for AI:重新定義 AI 應用通信範式
1.1 傳統應用:單向、無反饋的事件驅動模式
在傳統應用的事件驅動場景中,業務邏輯編排通常由人工預先約定,消息生產方成功發送消息後,便無需關注後續的處理邏輯。
下圖以註冊系統為例:用户發起賬户註冊請求後,註冊系統向 RocketMQ 發送“新用户註冊”的消息後便立即返回,無需關心下游的郵件或短信通知系統如何處理。郵件或短信通知系統再分別從 RocketMQ 拉取消息,驅動各自的發送流程。整條業務鏈路為單向、無反饋的事件驅動模式。
1.2 從單向事件到雙向交互:AI 應用對通信提出新挑戰
在 AI 應用場景中,業務邏輯編排通常由大模型動態生成,消息生產方需等待並處理響應結果,才能驅動後續的邏輯執行。
下圖以典型的 AI 會話場景為例:用户所連接的 Gateway 不僅需要發送請求,還需要處理推理響應結果,並將結果推送給瀏覽器,形成完整的交互閉環。
結合真實 AI 應用場景的深度調研,我們發現 AI 場景具有四個顯著特徵,對底層通信模式提出了全新且嚴苛的挑戰:
- 更長的響應時間:傳統互聯網應用追求毫秒級響應延時,而 AI 應用的響應時長普遍達到分鐘級以上。更關鍵的是,AI 應用單次業務的運行時間具有高度不可預測性。
- 更復雜的交互:AI 應用的多輪對話持續時間長,對話歷史可達數十輪甚至更多。單次上下文傳輸可能達到幾十甚至上百 MB,上下文管理難度高。多 Agent 之間的協同編排邏輯更加複雜,需要精確的狀態同步。
- 更昂貴的計算資源:AI 推理依賴昂貴的 GPU 資源,瞬時高併發流量可能衝擊推理服務穩定性,導致算力資源浪費,並且任務失敗重試的成本極高。
- 更精細化的事件驅動:由於計算能力有限,異步事件驅動需要更精準的消費速度控制。同時,必須實現分級的事件驅動策略,以確保高優先級任務優先獲得寶貴的計算資源。
1.3 RocketMQ LiteTopic:專為 AI 場景設計的通信模型
為了應對上述挑戰,Apache RocketMQ 推出了以輕量級通信模型 LiteTopic 為核心的一系列新特性:
-
輕量級通信模型 —— 為海量會話而生
其核心是百萬級輕量資源管理能力。基於極低的資源動態創建開銷,可輕鬆支持海量會話(Session)場景,並提供更細粒度的訂閲管理,適用於長時 Session、AI 工作流和 Agent-to-Agent 交互等場景。
-
企業級上下文管理 —— 讓會話狀態可靠持久
以連續的消息流完整保存 Session 上下文,通過順序保障、排他消費等機制嚴格確保上下文的完整性與一致性。同時原生支持大消息體(數十 MB 甚至更大),輕鬆滿足 AI 場景下龐大數據負載的傳輸需求。
1.4 LiteTopic 技術解析:百萬隊列支撐海量併發會話
LiteTopic 基於 RocketMQ 業界領先的百萬隊列核心技術構建,其底層本質是獨立的 Queue。
- 它為每個獨立會話(Session)創建一個專屬的、低成本的“私有通道”——即輕量主題(LiteTopic),從而能夠以極低的資源開銷支撐海量併發會話的需求。
- 輕量級的 LiteTopic 在消息分配與發送行為上與順序 Topic 一致(其所屬 Queue 由單一 Broker 獨佔,消息始終路由至該 Broker,而非在多個 Broker 間輪詢發送),這種設計天然確保了消息的嚴格順序性,並極大降低了資源管理和路由的複雜度。
1.4.1 LiteConsumer 支持單節點粒度的訂閲關係管理
與傳統消息隊列中“同一 Consumer Group ID(CID)必須全局一致訂閲相同 Topic”的強約束不同,LiteConsumer 創新性地支持 CID 內各節點按需進行差異化訂閲。每個節點可根據實際負載、業務場景或運行時需求,獨立訂閲不同的 LiteTopic,從而構建更加靈活、彈性的消費拓撲。
這一機制從根本上規避了因訂閲關係不一致所引發的消費異常、重複消費或 Rebalance 風暴等問題,顯著提升了系統的靈活性、可擴展性與穩定性。同時,它更契合 AI 時代輕量、動態、點對點的交互模式,為構建輕量級請求-響應式消息收發模型提供了原生支持。
1.4.2 LiteConsumer 的核心能力
- 多節點差異化訂閲:同一 CID 下的不同節點可獨立訂閲各自的 LiteTopic,實現細粒度、個性化的訂閲策略。
- 動態訂閲擴展:支持在運行時實時為單個節點新增 LiteTopic 訂閲,無需重啓服務或影響其他節點的正常消費。
- 動態退訂能力:支持在運行時實時取消單個節點對特定 LiteTopic 的訂閲,實現精準的資源釋放與流量治理。
1.5 生產案例:RocketMQ LiteTopic 如何重塑 AI 應用架構?
以下案例基於某客户真實的 AI 應用場景,通過架構對比直觀展示採用傳統 RocketMQ 通信模型與引入 LiteTopic 輕量級通信模型前後的顯著差異。
採用 RocketMQ LiteTopic 輕量級通信模型後,客户架構實現了質的提升:不僅徹底移除了對 Redis 的依賴,還避免了廣播推送帶來的帶寬與計算資源浪費。整體架構更輕量,系統穩定性與可靠性也得到顯著提升。
1.5.1 改造前:依賴 Redis + 廣播的臃腫架構
整體的業務流程步驟如下:
- 任務提交:用户請求到達後,應用接入層節點將推理任務寫入 Redis。
- 任務處理:Worker 集羣掃描 Redis 並處理推理任務,將推理過程中的中間結果以多條順序消息的形式發送至 RocketMQ。
- 結果持久化與通知:Consumer 集羣順序消費 RocketMQ 消息,將最終推理結果存入 Redis,並基於 RocketMQ 廣播通知所有應用接入層節點。
- 結果推送:應用接入層節點收到廣播消息後,僅當結果歸屬於自身連接時,才從 Redis 獲取完整結果並推送給客户端;否則直接忽略該消息。
傳統架構採用“先存儲、再廣播、後過濾”的模式,在高併發 AI 場景下效率低下且成本高昂:
- 架構臃腫且脆弱:強依賴外部組件 Redis,增加了系統的複雜度和潛在故障點,運維成本高,可用性受限。
- 資源浪費嚴重:無效的廣播機制導致大量帶寬被佔用,且每個應用接入層節點都需進行計算密集型的過濾操作。
- 鏈路冗長低效:數據流轉需多次讀寫 Redis,通信鏈路長、延遲高,應用接入層節點宕機後會話狀態將全部丟失,嚴重影響用户體驗。
1.5.2 改造後:基於 RocketMQ LiteTopic 的極簡可靠架構
引入 LiteTopic 後,業務流程被大幅簡化,實現了端到端的可靠、高效通信:
- 會話綁定與動態訂閲:應用接入層節點在發起推理請求時攜帶唯一身份標識(如 Session ID),並立即訂閲該標識對應的 LiteTopic(無需預創建 consumer group、topic)。
- 結果持久化發送:智能應用(Worker)根據請求中的身份標識,將推理結果直接發送至對應的 LiteTopic(同樣無需預創建)。
- 精準接收消費:應用接入層節點各自精準接收屬於自己的 response 消息,無需過濾,無任何冗餘消費。
1.5.3 核心價值:為 AI 會話注入“記憶”,實現斷點續傳與恢復
客户接入 LiteTopic 輕量級通信模型後,通過將 LiteTopic 與 Session 維度進行細粒度綁定,以極低成本實現了生產級的會話續傳與恢復能力。
在按照上一小節的流程實現端到端的可靠通信後,在網關機器下線/宕機時:
- 自動重連:客户端檢測到連接斷開後,自動發起重連請求。
- 動態訂閲:新接管的應用接入層節點實例根據 Session ID,動態訂閲原 session 對應的 LiteTopic(無需預創建)。
- 斷點續傳:新應用接入層節點從上次成功消費的 Offset 位點開始拉取消息,精準恢復到故障前的狀態(不會丟消息,也不會重複消費已處理的消息)。
- 恢復會話:自動恢復 Session 的完整上下文,用户完全無感知,業務流程無縫銜接。
基於 RocketMQ LiteTopic 打造企業級 Session 管理
2.1 AI 場景下 Session 的四大核心要求
在 AI 應用場景下,業界對 Session 的特性提出了以下四項核心要求:
- 低延遲:面向實時交互場景,要求快速響應。
- 時序性:必須嚴格按對話時間順序組織內容,確保上下文的連續性與邏輯一致性。
- 單會話隔離:保障不同用户/會話間的數據隔離,避免消息串話或狀態混淆。
- 上下文壓縮:支持通過截斷或摘要控制上下文長度,避免超出模型窗口限制導致溢出。
2.2 RocketMQ LiteTopic 實現 Session 的四大優勢
基於 RocketMQ LiteTopic 實現 Session 的核心價值,在於將“Session”從內存易失狀態轉化為可持久、可追溯、可恢復的事件流,為多智能體系統提供企業級會話韌性,徹底解決傳統架構中會話狀態丟失、無法恢復等痛點。
1. 會話狀態持久化 —— 進程重啓不丟會話
消息天然持久化存儲於 CommitLog,即使應用宕機或網絡中斷,也能通過消息重放完整重建會話上下文(如對話歷史、任務狀態、中間結果)。
如下圖所示,應用 A 將響應輸出的 TaskEvent / TaskUpdateEvent 轉換為 RocketMQ LiteTopic 中存儲的消息(Message)。當應用 A 重啓後,可從 CommitLog 中重放所有消息,完整恢復會話狀態。
2. 消息回溯與重放 —— 斷點精準恢復
支持按時間 / Offset 回溯消費,應用重啓後可從斷點精確恢復會話,實現無縫續聊與任務接力,避免重複推理帶來的算力浪費。
當應用宕機後重新啓動,可以指定某個 Session(LiteTopic)中的具體位點開始繼續消費,或從上次消費成功的位點開始消費。
3. Session 隔離與路由 —— 多會話並行無干擾
通過輕量級 LiteTopic 實現會話級隔離(如 Session ID 作為 LiteTopic 的唯一標識),確保多用户/多會話並行運行時互不干擾。
多用户多 Session 的消息存儲於不同的 LiteTopic,在數據存儲維度實現天然隔離,無需應用層手動過濾。
4. 流量削峯與緩衝 —— 保護下游應用穩定性
高併發會話請求被緩衝至 Broker,避免下游 Agent 瞬時過載崩潰,提升系統整體穩定性。下游應用根據自身處理能力按需消費消息,實現“削峯填谷”。
如下圖所示,應用 A 發出的任務請求可在 Broker 中持久化堆積,下游應用 B 根據自身消費能力按需拉取並處理,有效保障系統穩定性。
基於 RocketMQ 構建高可靠 A2A 通信通道
在上一章,我們解決了單個會話的持久化與恢復問題。現在,讓我們將視野放大:當成百上千個功能各異的 Agent 需要協作時,它們之間如何建立標準化的通信?這正是 A2A 協議誕生的意義所在。
3.1 A2A 協議
Agent-to-Agent(簡稱 A2A)是一項由 Google 於 2025 年發起,並貢獻至 Linux 基金會的開源通信協議。其核心目標是建立跨廠商、跨框架的標準化互操作機制,使異構 AI 智能體(Agents)能夠自動發現、可靠通信並高效協作,從而構建開放、可組合、可擴展的多智能體系統生態。
3.2 單智能體 vs. 多智能體架構:能力邊界與協同範式的演進
在深入探討如何構建 A2A 通信之前,我們首先需要理解,為什麼多智能體協同是必然趨勢。我們從六個維度對比單智能體與多智能體的能力差異:
3.3 同步 RPC 與 RocketMQ 異步通信的對比
明確了多智能體架構的優勢後,下一個關鍵問題是:如何實現 Agent 之間的通信?
A2A 協議原生支持的同步 RPC 協議包括 JSON-RPC、gRPC 和 REST。然而,在企業級的複雜場景下,這些同步協議面臨諸多挑戰。下表從多個維度對比同步 RPC 與 RocketMQ 異步通信模型的差異:
3.4 開箱即用:基於 RocketMQ 的 A2A 協議實現
為加速 A2A 協議在異步通信場景的落地,我們基於 RocketMQ SDK 實現了 A2A 協議的 ClientTransport 接口。該實現旨在幫助用户在搭建多智能體應用時,能夠專注於自身業務邏輯,快速構建高可靠、開箱即用的 A2A 通信方案。
發送普通同步請求:
EventKind sendMessage(MessageSendParams request, @Nullable ClientCallContext context)
發送Stream請求:
void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEventKind> eventConsumer…)
重訂訂閲任務數據:
void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer, Consumer<Throwable> errorConsumer
查詢任務完成狀態:
Task getTask(TaskQueryParams request, @Nullable ClientCallContext context)
取消任務執行
Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context)
以及其他方法
開源項目地址
基於 RocketMQ 實現的 A2A 通信 RocketMQTransport 部分代碼現已開源,歡迎關注!
項目地址:https://github.com/apache/rocketmq-a2a
3.5 架構解析:如何通過 RocketMQ 實現 Agent 間通信?
在一個典型的多智能體協作架構中,通信流程如下:
- 應用 A 扮演 Supervisor 角色,負責對用户輸入的需求進行任務分解,並將拆分後的子任務分別發送至應用 B 的業務 Topic(Normal Topic1)和應用 C 的業務 Topic(Normal Topic2)。
- 應用 B 集羣從 Normal Topic1 拉取消息並執行相應邏輯處理,隨後將結果發佈到應用 A 訂閲的 LiteTopic。
- 應用 C 集羣則從 Normal Topic2 拉取消息進行處理,並同樣將結果寫入該 LiteTopic。
- 應用 A 集羣通過拉取 LiteTopic 中的消息,匯聚各子任務的響應結果,進而驅動後續的業務邏輯編排。
AgentScope × RocketMQ:構建多智能體應用的最佳組合
理論與架構已經鋪墊完畢,接下來,讓我們結合一個完整的實戰案例,看看如何將這套強大的通信基座,與頂尖的智能體開發框架 AgentScope 相結合,構建一個真正可用的多智能體應用。
4.1 AgentScope:面向多智能體的開發者友好框架
AgentScope 是阿里巴巴繼 AI 模型社區 ModelScope 後,在 Agent 領域的又一戰略級開源力作。它以“開發者為中心”,專注於提供智能體開發的開源框架,為構建複雜的多智能體應用提供了從設計、開發到調試的全套解決方案。它具備以下核心優勢:
- 對開發者透明:拒絕隱式魔法,所有環節(提示、API、智能體、工作流)可見、可控。
- 實時可介入:原生支持運行時中斷與自定義處理。
- 更智能:內置工具管理、長期記憶、智能 RAG 等能力。
- 模型無關:一次編寫,無縫適配各類大模型。
- 樂高式構建:模塊化設計,組件解耦、自由組合。
- 面向多智能體:顯式消息傳遞與工作流編排,專為協作場景打造。
- 高度可定製:全面開放工具、提示、智能體、工作流及可視化擴展,鼓勵深度定製。
4.2 AgentScope x RocketMQ 的集成架構與合作展望
在明確了 AgentScope 的功能定位與應用價值之後,我們將進一步探討其通信層與 RocketMQ 的現有集成機制,並展望雙方在技術協同與生態共建方面的未來合作方向。
4.2.1 AgentScope 與 RocketMQ 集成架構
當 AgentScope 作為 Agent 應用服務提供者時,其內部支持符合 A2A(Agent-to-Agent)協議的多種通信方式,包括基於 JSONRPC 的 WebService 和 RocketMQ Service,用於接收並處理來自其他 Agent 的 A2A 協議請求。同時,AgentScope 通過 well-known 服務接口向外標準化地透出其所承載 Agent 的核心能力信息,包括但不限於:
- name(名稱)
- description(描述)
- capabilities(能力列表)
- additionalInterfaces(額外支持的接口或協議)
這些元數據使調用方能夠清晰識別該 Agent 提供的主要功能、所支持的通信協議及其對應的接入方式。
當 AgentScope 作為 Agent 應用服務的調用者時,它首先通過訪問目標 Agent 暴露的 well-known 服務,動態獲取其詳細的能力描述、支持的協議類型及對應的服務接入點(如 JSONRPC 端點或 RocketMQ Topic 信息)。隨後,在通信層,AgentScope 利用 A2A 協議定義的傳輸客户端(如 JSONRPCTransport 或 RocketMQTransport)發起請求,並對返回的響應結果進行統一解析與處理,從而實現跨 Agent 的標準化、可互操作的協同調用。
1. 基於 RocketMQ 協議通信架構圖
2. 基於 JSONRPC 協議通信架構圖
4.2.2 合作展望
隨着人工智能與分佈式系統技術的深度融合,消息中間件與智能體(Agent)平台的協同正成為構建下一代智能分佈式應用的關鍵路徑。作為 Apache 軟件基金會頂級項目,RocketMQ 憑藉高吞吐、低延遲和高可靠等核心特性,已成為全球廣泛採用的分佈式消息隊列,在金融、電商、物聯網等關鍵領域積累了深厚的技術積澱,並於近期推出了輕量級通信模型 LiteTopic,進一步拓展了其在 AI 應用場景中的適用性。與此同時,AgentScope 作為新興的智能體編排與運行平台,專注於為多智能體系統提供統一的調度、通信與治理能力。二者在技術理念與應用場景上高度契合,展現出廣闊的合作前景與協同創新潛力。
1. 技術互補:構建“消息驅動 + 智能決策”的新型架構
RocketMQ 提供了強大的異步通信、事件驅動和流式處理能力。AgentScope 則聚焦於智能體生命週期管理、任務分解、上下文感知與自主協作。未來,二者可深度融合,形成“消息即事件、事件觸發智能體行為”的新型架構:
- 智能體間通信的標準化通道:利用 RocketMQ 作為 AgentScope 內部或跨集羣智能體之間的可靠通信總線,保障高併發、有序、可追溯的消息傳遞,提升多智能體系統的魯棒性與擴展性。
2. 生態共建:推動開源與標準協同發展
雙方可基於開源社區開展深度合作:
- 集成適配器開發:共同開發 RocketMQ 與 AgentScope 的官方集成插件,簡化開發者接入流程。
- 聯合參考架構發佈:推出面向典型場景(如智能客服等場景)的聯合解決方案模板,加速行業落地。
- 參與標準制定:在事件驅動架構(EDA)、智能體通信協議等領域協同推進開放標準,促進生態互操作性。
4.3 場景案例:用 AgentScope 與 RocketMQ 打造“智能旅行助手”
本案例以 AgentScope 作為 AI 智能體應用開發框架,構建了三個智能體:
- SupervisorAgent(總控):負責與用户交互,任務分解與邏輯編排。
- WeatherAgent(天氣專家):負責查詢天氣信息。
- TravelAgent(旅行專家):負責依據天氣進行用户的行程規劃。
SupervisorAgent 應用具有如下邏輯:
- 如果用户只查詢天氣情況,則直接請求 WeatherAgent 進行天氣信息查詢;
- 如果用户希望做出行程規劃,則先向 WeatherAgent 發出天氣查詢請求,獲取對應天氣信息後,再帶着天氣信息向 TravelAgent 發出行程規劃請求,TravelAgent 對行程結果進行規劃後將響應結果發送至 SupervisorAgent 訂閲的 LiteTopic,SupervisorAgent 應用將結果發送至用户側。
實戰演練:三步構建高可靠多智能體應用
阿里雲官網現已提供免費試用、一鍵部署的《RocketMQ for AI:企業級 AI 應用集成的異步通信方案》,帶您親手搭建並運行一個多智能體應用,並基於 RocketMQ LiteTopic 實現多智能體異步通信能力——具備持久化、高可靠、可追溯等特性,顯著提升 AI 應用交互的穩定性與可觀測性。
5.1 方案概覽:技術架構與雲資源
本方案將帶領您搭建一個多智能體(Multi-Agent)系統,能夠根據用户的需求查詢天氣信息並制定行程規劃。
為簡化部署過程,我們將在 1 台雲服務器 ECS 上部署 3 個獨立的 Agent(SupervisorAgent,WeatherAgent 和 TravelAgent,具體功能可參考 4.3),並且通過 RocketMQ 消息服務實現 Agent 之間的異步通信。
本方案的技術架構包含構建一個完整多智能體應用所需的所有云資源:
5.2 三步體驗:從創建資源到部署 Agent
1. 免費一鍵部署資源
訪問體驗方案頁面,點擊“免費試用”,進入實驗操作界面後,點擊“立即試用”即可領取免費試用點,自動開始創建資源。
2. 創建 Topic 和 Group
共創建 3 個 Topic,配置參數見下表,其餘參數保持默認。
共創建 3 個 Group,配置參數見下表,其餘參數保持默認。
3. 創建部署智能體應用
在阿里雲百鍊的應用管理頁面,根據示例文檔中提供的模型參數和提示詞,分別創建併發布兩個智能體應用(天氣助手 Agent、行程助手 Agent)。
遠程連接雲服務器 ECS 根據提供的執行腳本部署示例應用程序。等待應用啓動完畢,大約需要 3~5 分鐘,直到終端顯示 You > 提示符,便可直接在終端中輸入信息與智能體交互。
5.3 結果驗證:任務執行與消息軌跡追蹤
- 在
You >提示符後,輸入幫我做一個下週三到下週日杭州周邊自駕遊方案並回車。 - 等待智能體執行任務,最終會返回結合天氣信息的行程規劃內容,過程如下:
a. SupervisorAgent 接收用户輸入,向消息隊列發送一條消息 杭州下週三到週日的天氣情況怎麼樣?。
b. WeatherAgent 監聽到上述消息,執行天氣查詢,並將結果發往消息隊列。
c. SupervisorAgent 監聽到上述消息,獲取了天氣查詢結果,然後向消息隊列發送一條消息 杭州下週三至週日天氣已知,天氣為***,請基於此制定一份從杭州出發的周邊2人3天4晚自駕遊行程規劃(下週三出發,週日返回),包含住宿、餐飲與景點推薦。
d. TravelAgent 監聽到上述消息,執行行程規劃,並將結果發往消息隊列。
- 查看消息軌跡:在雲消息隊列 RocketMQ 版實例詳情頁,可以按 Topic 或按 LiteTopic 查詢到相關的消息軌跡。
目前,該解決方案已在阿里雲官網上線,歡迎點擊此處即可部署體驗~
邀請您釘釘搜索羣號:110085036316,加入 RocketMQ for AI 用户交流羣,探索更多產品功能與應用場景,與我們共建 AI MQ 的未來!