一、開篇導語
我們對A2A已經有了初步的瞭解,但對具體使用可能還充滿了很多疑問,今天我們結合具體的實例來加深對A2A實際應用的理解,想象這樣一個場景:我們要組織一場户外籃球賽,需要同時考慮天氣狀況、場地預約、參與人員時間安排等多個因素。在傳統模式下,這需要我們分別查看天氣預報、聯繫場地管理員、逐個確認參與者時間——一個典型的多系統、多步驟的繁瑣過程。
而現在,通過A2A智能代理協議,這一切變得相當簡單,只需要一個簡單的請求,就告訴我們是否可以如期舉行,這背後發生的,是一場智能代理的協同交響曲:天氣代理提供精準預報,場地代理檢查可用性,日曆代理協調參與者時間,最後通知代理髮送確認信息。每個代理各司其職,又完美配合,共同完成一個複雜決策。
二、A2A的簡單回顧
1. 什麼是A2A協議
- 定義闡述:Agent-to-Agent協議的標準化通信框架
- 核心特徵:鬆耦合、可發現、自描述、可組合的智能體交互模式
- 核心組件:
- 服務發現機制:通過標準端點實現代理能力的自動識別
- 任務描述語言:統一的JSON Schema定義輸入輸出規範
- 狀態管理:完整的任務生命週期跟蹤和管理
- 安全認證:基於令牌的身份驗證和授權機制
2. A2A與傳統API的對比優勢
- 動態性對比:
- 傳統API:靜態接口,需要預先知道端點地址和參數
- A2A協議:動態發現,代理可運行時識別彼此能力
- 語義化程度:
- 傳統API:通常缺乏機器可讀的語義描述
- A2A協議:完整的JSON Schema提供豐富的語義信息
- 自治能力:
- 傳統API:被動響應,無自主決策能力
- A2A協議:代理具有自主性和智能決策能力
3. 協議的核心技術特性
- 標準化服務發現
- 通過/.well-known/agent.json端點暴露代理能力
- 自動化的代理註冊和發現機制,動態的能力協商和版本管理
- 統一的任務模型
- 標準化的任務提交格式,統一的結果返回結構,完整的錯誤處理規範
- 安全通信機制
- 基於令牌的身份驗證,端到端的加密通信,細粒度的權限控制
4. A2A協議的生態系統價值
- 互操作性:不同廠商、不同技術的代理無縫協作
- 可擴展性:新代理服務的即插即用式集成
- 維護性:獨立部署、升級不影響整體系統運行
三、單智能體決策系統
這是一個基於天氣條件自動決定籃球會議是否舉行的智能代理系統,遵循A2A通信協議,首先要聲明一個提供指定日期天氣數據查詢的 RESTful API 服務,並且要遵循智能代理(Agent)的標準規範。
1. 天氣服務的Agent
1.1 Agent Card 聲明
WEATHER_AGENT_CARD = {
"name": "WeatherAgent",
"version": "1.0",
"description": "提供指定日期的天氣數據查詢",
"endpoints": {
"task_submit": "/api/tasks/weather",
"sse_subscribe": "/api/tasks/updates"
},
"input_schema": {
"type": "object",
"properties": {
"date": {"type": "string", "format": "date"},
"location": {"type": "string", "enum": ["北京"]}
},
"required": ["date"]
},
"authentication": {"methods": ["API_Key"]}
}
- 作用:定義代理的元數據,遵循 .well-known/agent.json 標準
- name/version/description: 代理的基本信息
- endpoints: 定義任務提交和更新訂閲的API端點
- input_schema: 定義輸入參數的JSON Schema
- authentication: 聲明認證方式
1.2 數據模型
class WeatherTaskRequest(BaseModel):
task_id: str
params: dict
- 作用:定義任務請求的數據結構
- task_id: 任務唯一標識符
- params: 包含查詢參數(日期、位置)
1.3 模擬數據存儲
weather_db = {
"2025-05-08": {"temperature": "25℃", "condition": "雷陣雨"},
"2025-05-09": {"temperature": "18℃", "condition": "小雨轉晴"},
"2025-05-10": {"temperature": "22℃", "condition": "多雲轉晴"}
}
- 作用:模擬數據庫,存儲預設日期的天氣數據
1.4 Agent Card 發現端點
@app.get("/.well-known/agent.json")
async def get_agent_card():
return WEATHER_AGENT_CARD
- 功能:提供代理的標準發現接口,讓其他系統能夠自動發現和了解這個代理的能力
1.5 天氣查詢任務處理端點
@app.post("/api/tasks/weather")
async def handle_weather_task(request: WeatherTaskRequest):
target_date = request.params.get("date")
if not target_date or target_date not in weather_db:
raise HTTPException(status_code=400, detail="無效日期參數")
return {
"task_id": request.task_id,
"status": "completed",
"artifact": {
"date": target_date,
"weather": weather_db[target_date]
}
}
- 接收天氣查詢任務
- 驗證日期參數
- 返回對應日期的天氣信息
- 返回標準化的任務響應格式
1.5 服務提供的完整功能
1.5.1 代理髮現
- 通過標準路徑 /.well-known/agent.json 提供代理能力描述
1.5.2 天氣查詢
- 輸入:指定日期(必需)和位置(可選,目前只支持北京)
- 輸出:温度信息和天氣狀況
- 示例查詢日期:2025-05-08、2025-05-09、2025-05-10
1.5.3 標準化響應
返回統一格式的任務響應:
{
"task_id": "任務ID",
"status": "completed",
"artifact": {
"date": "查詢日期",
"weather": {"temperature": "温度", "condition": "天氣狀況"}
}
}
1.5.4 錯誤處理
- 參數驗證:檢查日期是否存在且有效
- 錯誤響應:返回標準HTTP錯誤碼和描述
1.6. 使用示例
1.6.1 發現代理能力
GET http://localhost:8000/.well-known/agent.json
1.6.2 查詢天氣
POST http://localhost:8000/api/tasks/weather
Content-Type: application/json
{
"task_id": "task_123",
"params": {
"date": "2025-05-08"
}
}
1.6.3 相應結果
{
"task_id": "task_123",
"status": "completed",
"artifact": {
"date": "2025-05-08",
"weather": {
"temperature": "25℃",
"condition": "雷陣雨"
}
}
}
1.6.4 服務啓動示例
這個服務可以作為更大的AI系統或工作流中的一個組件,專門負責天氣信息查詢功能。
WeatherAgent完整參考:
from fastapi import FastAPI, HTTPException
from datetime import date
from pydantic import BaseModel
import uvicorn
app = FastAPI()
# Agent Card聲明(通過/.well-known/agent.json暴露)
WEATHER_AGENT_CARD = {
"name": "WeatherAgent",
"version": "1.0",
"description": "提供指定日期的天氣數據查詢",
"endpoints": {
"task_submit": "/api/tasks/weather",
"sse_subscribe": "/api/tasks/updates"
},
"input_schema": {
"type": "object",
"properties": {
"date": {"type": "string", "format": "date"},
"location": {"type": "string", "enum": ["北京"]}
},
"required": ["date"]
},
"authentication": {"methods": ["API_Key"]}
}
# 任務請求模型
class WeatherTaskRequest(BaseModel):
task_id: str
params: dict
# 模擬天氣數據存儲
weather_db = {
"2025-05-08": {"temperature": "25℃", "condition": "雷陣雨"},
"2025-05-09": {"temperature": "18℃", "condition": "小雨轉晴"},
"2025-05-10": {"temperature": "22℃", "condition": "多雲轉晴"}
}
@app.get("/.well-known/agent.json")
async def get_agent_card():
return WEATHER_AGENT_CARD
@app.post("/api/tasks/weather")
async def handle_weather_task(request: WeatherTaskRequest):
"""處理天氣查詢任務"""
target_date = request.params.get("date")
# 參數驗證
if not target_date or target_date not in weather_db:
raise HTTPException(status_code=400, detail="無效日期參數")
return {
"task_id": request.task_id,
"status": "completed",
"artifact": {
"date": target_date,
"weather": weather_db[target_date]
}
}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
2. 籃球會議安排代理
2.1 類初始化
class BasketBallAgent:
def __init__(self):
self.weather_agent_url = "http://localhost:8000"
self.api_key = "SECRET_KEY" # 實際應通過安全方式存儲
- 設置天氣代理服務的URL
- 配置API密鑰(實際應用中應從安全存儲獲取)
2.2 任務創建方法
def _create_task(self, target_date: str) -> dict:
"""創建A2A標準任務對象"""
return {
"task_id": str(uuid.uuid4()),
"params": {
"date": target_date,
"location": "北京"
}
}
- 生成符合A2A協議標準的任務對象
- 使用UUID確保任務ID的唯一性
- 封裝查詢參數(日期和位置)
2.3 天氣查詢核心方法
def check_weather(self, target_date: str) -> dict:
"""通過A2A協議查詢天氣"""
# 獲取天氣智能體能力描述
agent_card = requests.get(
f"{self.weather_agent_url}/.well-known/agent.json"
).json()
# 構造任務請求
task = self._create_task(target_date)
# 發送任務請求
response = requests.post(
f"{self.weather_agent_url}{agent_card['endpoints']['task_submit']}",
json=task,
headers={"Authorization": f"Bearer {self.api_key}"}
)
if response.status_code == 200:
return response.json()["artifact"]
else:
raise Exception(f"天氣查詢失敗: {response.text}")
- 作用:完整的A2A協議交互流程
- 服務發現:首先獲取天氣代理的能力描述
- 任務構造:創建標準格式的任務請求
- API調用:向任務提交端點發送請求
- 結果處理:提取並返回天氣數據
2.4 決策邏輯方法
def schedule_meeting(self, date: str):
"""綜合決策邏輯"""
try:
result = self.check_weather(date)
if "雨" not in result["weather"]["condition"] and "雪" not in result["weather"]["condition"]:
return {"status": "confirmed", "weather": result["weather"]}
else:
return {"status": "cancelled", "reason": "惡劣天氣"}
except Exception as e:
return {"status": "error", "detail": str(e)}
- 作用:基於天氣條件做出智能決策
- 確認舉行:天氣狀況不含"雨"或"雪"
- 取消會議:出現惡劣天氣條件
- 錯誤處理:處理查詢失敗的情況
2.5 系統協議流程
2.5.1 A2A協議實現
BasketBallAgent (客户端) → WeatherAgent (服務端)
↓ ↓
決策代理 能力代理
- 服務發現:動態獲取代理能力
- 標準通信:統一的任務格式和響應格式
- 鬆耦合:代理間獨立部署和升級
2.5.2 智能決策流程
輸入日期 → 查詢天氣 → 分析條件 → 輸出決策
↓ ↓ ↓ ↓
"2025-05-08" → 雷陣雨 → 包含"雨" → 取消會議
"2025-05-10" → 多雲轉晴 → 無惡劣天氣 → 確認舉行
2.6 使用示例
2.6.1 測試案例1:雨天取消
result = meeting_agent.schedule_meeting("2025-05-08")
# 輸出: 籃球安排結果: {'status': 'cancelled', 'reason': '惡劣天氣'}
2.6.2 測試案例2:晴天確認
result = meeting_agent.schedule_meeting("2025-05-10")
# 輸出: 籃球安排結果: {'status': 'confirmed', 'weather': {'temperature': '22℃', 'condition': '多雲轉晴'}}
2.6.3 運行結果示例
BasketBallAgent 參考:
import requests
import uuid
class BasketBallAgent:
def __init__(self):
self.weather_agent_url = "http://localhost:8000"
self.api_key = "SECRET_KEY" # 實際應通過安全方式存儲
def _create_task(self, target_date: str) -> dict:
"""創建A2A標準任務對象"""
return {
"task_id": str(uuid.uuid4()),
"params": {
"date": target_date,
"location": "北京"
}
}
def check_weather(self, target_date: str) -> dict:
"""通過A2A協議查詢天氣"""
# 獲取天氣智能體能力描述
agent_card = requests.get(
f"{self.weather_agent_url}/.well-known/agent.json"
).json()
# 構造任務請求
task = self._create_task(target_date)
# 發送任務請求
response = requests.post(
f"{self.weather_agent_url}{agent_card['endpoints']['task_submit']}",
json=task,
headers={"Authorization": f"Bearer {self.api_key}"}
)
if response.status_code == 200:
return response.json()["artifact"]
else:
raise Exception(f"天氣查詢失敗: {response.text}")
def schedule_meeting(self, date: str):
"""綜合決策邏輯"""
try:
result = self.check_weather(date)
# print('result=', result)
if "雨" not in result["weather"]["condition"] and "雪" not in result["weather"]["condition"]:
return {"status": "confirmed", "weather": result["weather"]}
else:
return {"status": "cancelled", "reason": "惡劣天氣"}
except Exception as e:
return {"status": "error", "detail": str(e)}
# 使用示例
if __name__ == "__main__":
meeting_agent = BasketBallAgent()
result = meeting_agent.schedule_meeting("2025-05-08")
# result = meeting_agent.schedule_meeting("2025-05-10")
print("籃球安排結果:", result)
四、多代理協作系統
基於以上示例擴展為多代理協作系統,新增了場地代理、日曆代理以及通知代理
1. 系統特點
1.1 多代理協作架構
BasketBallAgent (協調者)
├── WeatherAgent (天氣查詢)
├── VenueAgent (場地管理)
├── CalendarAgent (日程檢查)
└── NotificationAgent (通知發送)
1.2 新增代理功能
- 場地代理 (VenueAgent):檢查室內/室外場地可用性、處理場地維護狀態、支持不同運動類型
- 日曆代理 (CalendarAgent):檢查參與者時間衝突、計算可用率、提供詳細可用性信息
- 通知代理 (NotificationAgent):發送會議確認/取消通知、支持多種通知類型、跟蹤通知狀態
1.3 智能決策邏輯
- 決策流程:
- 天氣檢查 → 惡劣天氣則取消
- 場地檢查 → 場地不可用則取消
- 參與者檢查 → 可用率<60%則取消
- 發送通知 → 確認後自動通知
1.4 升級的架構設計
- 四層架構模型:
- 客户端層:用户交互界面
- 協調層:BasketBallAgent決策引擎
- 能力層:天氣、場地、日曆等專業代理
- 數據層:各代理的專有數據存儲
1.5 代理職責邊界劃分
- WeatherAgent:專注天氣數據查詢與預報
- VenueAgent:管理場地資源與可用性狀態
- CalendarAgent:處理參與者時間衝突檢測
- NotificationAgent:負責多通道消息推送
- BasketBallAgent:協調決策與業務流程編排
1.6 數據流與狀態管理
- 請求流程:客户端→協調器→能力代理的鏈式調用
- 響應聚合:多源數據在協調層的智能融合
- 狀態追蹤:基於task_id的全鏈路任務狀態管理
- 錯誤處理:分級故障隔離與優雅降級策略
1.7 決策機制
1.7.1 分佈式決策流程設計
- 並行檢查機制:天氣、場地、日程的併發驗證
- 決策樹邏輯:基於優先級的條件判斷序列
- 閾值配置:可調節的通過標準(如60%參與者可用率)
- 上下文傳遞:決策過程中狀態信息的完整傳遞
1.7.2 智能推薦引擎
- 多因素綜合分析:天氣狀況、場地類型、參與度權重計算
- 個性化建議生成:基於歷史數據和用户偏好的智能推薦
- 備選方案提供:自動建議替代時間或場地選項
- 風險評估:基於概率的會議成功可能性預測
1.7.3 衝突解決策略
- 優先級仲裁:關鍵條件(安全因素)的否決權設置
- 權重平衡:不同代理反饋結果的權重分配算法
- 協商機制:代理間的條件協商與妥協方案生成
- 人工干預:複雜情況的轉人工決策流程
2. 執行流程圖
3. 流程詳細説明
3.1 階段一:客户端請求接收與初始化
3.1.1 請求接收過程
客户端向高級籃球代理系統提交會議安排請求,請求中包含會議日期、參與者列表、場地類型等關鍵信息。系統接收到請求後,首先創建決策上下文對象,用於在整個處理流程中跟蹤和記錄所有檢查結果、決策狀態和原因説明。
3.1.2 上下文初始化
系統初始化一個結構化的決策上下文,該上下文將作為整個決策過程的數據載體。它包含原始請求信息、各個代理的檢查結果集合、最終決策狀態以及決策原因列表。初始狀態下,決策狀態設置為待處理,等待後續檢查結果的填充。
3.2 階段二:多代理並行檢查執行
3.2.1 並行調用策略
系統採用並行執行策略,同時向三個專業代理服務發起檢查請求。這種並行處理方式顯著減少了總體響應時間,提高了系統效率。每個代理服務都運行在獨立的端口上,具有專門的功能領域。
3.2.2 天氣代理檢查流程
3.2.2.1 服務發現與連接
系統首先通過標準化的服務發現機制定位天氣代理服務。通過訪問預定義的服務描述端點,獲取天氣代理的能力描述和接口規範,確保調用的準確性和兼容性。
3.2.2.2 天氣數據查詢
向天氣代理提交包含目標日期和位置的查詢請求。天氣代理接收到請求後,在其內部天氣數據庫中檢索指定日期的詳細天氣信息,包括温度、天氣狀況、濕度、風速、降水概率等多個維度的數據。
3.2.2.3 天氣適宜性分析
天氣代理不僅返回原始天氣數據,還進行專業的適宜性分析。通過分析温度範圍、降水情況、風速等因素,計算得出籃球活動的天氣適宜性評分,識別潛在的風險因素,並生成具體的活動建議。
3.2.3 場地代理檢查流程
3.2.3.1 場地狀態驗證
系統向場地代理查詢指定日期和場地類型的可用性狀態。場地代理檢查其管理的場地數據庫,驗證目標場地在請求時間段內是否可用,是否存在預定衝突或維護安排。
3.2.3.2 場地詳情獲取
除了基本的可用性狀態,場地代理還提供場地的詳細信息,包括場地名稱、容納人數、設施配置、收費標準等。這些信息有助於瞭解場地的具體條件。
3.2.3.3 衝突檢測處理
如果檢測到場地衝突,場地代理會明確説明衝突類型(已被佔用、維護中、已被預訂等),為後續決策提供充分的依據。
3.2.4 日曆代理檢查流程
3.2.4.1 參與者時間衝突檢測
日曆代理接收參與者列表和目標日期,逐個檢查每位參與者的個人日曆安排。通過比對請求時間段與參與者已有日程,識別是否存在時間衝突。
3.2.4.5 可用性統計分析
日曆代理統計可用參與者的數量,計算總體可用率,並分析關鍵參與者的可用情況。這些統計信息為會議安排決策提供重要參考。
3.2.4.6 衝突詳情報告
對於存在時間衝突的參與者,日曆代理提供詳細的衝突事件信息,包括衝突事件的標題、時間範圍和優先級,幫助理解衝突的性質和嚴重程度。
3.3 階段三:檢查結果聚合與智能決策
3.3.1 結果收集與整合
系統等待所有並行檢查完成,收集各個代理返回的檢查結果。將這些結果統一整合到決策上下文中,形成完整的檢查結果數據集。
3.3.2 分級決策邏輯執行
系統按照預設的優先級順序執行決策邏輯:
3.3.2.1 第一級:安全條件檢查
首先評估天氣條件的安全性。系統檢查天氣狀況中是否包含雨、雪、雷暴、大風等不利於户外籃球活動的因素。如果檢測到惡劣天氣條件,立即做出取消決策,並記錄具體的天氣原因。
3.3.2.2 第二級:資源條件檢查
如果天氣條件通過,接着檢查場地可用性。驗證請求的場地在目標時間段內是否處於可用狀態。如果場地不可用,做出取消決策,並説明具體的場地狀態原因。
3.3.2.3 第三級:人員條件檢查
如果前兩級檢查都通過,最後評估參與者可用性。檢查可用參與者比例是否達到預設閾值(如60%)。如果參與率不足,做出取消決策,並報告具體的參與率數據。
3.3.3 最終確認決策
只有當所有三級檢查都滿足條件時,系統才做出會議確認的最終決策。
3.4 階段四:通知處理與確認
3.4.1 條件性通知觸發
僅當會議獲得確認時,系統才會觸發通知流程。這種條件性執行避免了不必要的通知發送,提高了系統效率。
3.4.2 通知代理調用
系統向通知代理髮送會議確認通知請求,包含所有收件人列表、通知類型以及完整的會議決策數據。通知代理根據通知類型選擇相應的模板,填充具體的會議信息。
3.4.3 多通道通知發送
通知代理通過配置的通信通道(如郵件、短信等)向所有參與者發送格式化的會議確認通知。通知內容包含會議詳情、天氣信息、場地信息和參與建議等。
3.4.4 通知結果記錄
通知代理記錄每次通知發送的詳細結果,包括髮送時間、接收者列表、發送狀態等信息,這些信息被返回並記錄在決策上下文中。
3.5 階段五:最終響應生成與返回
3.5.1 結果格式化處理
系統將決策上下文中的所有信息整合生成標準化的響應格式。響應中包含系統生成的唯一會議標識符、最終決策狀態、決策時間戳、決策原因説明、詳細的檢查結果彙總以及針對性的活動建議。
3.5.2 智能建議生成
根據最終的決策狀態和具體的檢查結果,系統生成個性化的建議信息。對於確認的會議,提供天氣適應的活動建議;對於取消的會議,根據取消原因提供改期或調整的替代方案。
3.5.3 完整響應返回
將格式化後的最終響應返回給客户端,完成整個會議安排決策流程。客户端獲得包含所有決策依據和詳細信息的完整響應,便於用户理解系統決策並採取相應行動。
4. 啓動説明
要運行完整的系統,需要在不同的終端中分別啓動這四個服務:
# 終端1 - WeatherAgent
python WeatherAgent.py
# 終端2 - VenueAgent
python VenueAgent.py
# 終端3 - NotificationAgent
python NotificationAgent.py
# 終端4 - CalendarAgent
python CalendarAgent.py
# 終端5 - 運行測試客户端
python MultiAgent2.py
每個Agent都提供了完整的REST API接口,支持服務發現、任務處理和健康檢查,共同構成了一個完整的分佈式智能代理系統。
5. 示例參考
WeatherAgent示例:
from fastapi import FastAPI, HTTPException
from datetime import date
from pydantic import BaseModel
import uvicorn
from enum import Enum
from typing import Dict, Optional
app = FastAPI(title="WeatherAgent", version="1.0.0")
class WeatherCondition(str, Enum):
SUNNY = "晴"
CLOUDY = "多雲"
RAIN = "雨"
SNOW = "雪"
THUNDERSTORM = "雷陣雨"
LIGHT_RAIN = "小雨"
CLEAR = "晴轉多雲"
class WeatherTaskRequest(BaseModel):
task_id: str
agent_type: str
timestamp: str
params: dict
# 詳細的天氣數據庫
weather_db = {
"2025-05-08": {
"temperature": "25℃",
"condition": WeatherCondition.THUNDERSTORM,
"humidity": "85%",
"wind_speed": "15km/h",
"precipitation_probability": "90%",
"uv_index": "2",
"air_quality": "良"
},
"2025-05-09": {
"temperature": "18℃",
"condition": WeatherCondition.LIGHT_RAIN,
"humidity": "78%",
"wind_speed": "12km/h",
"precipitation_probability": "60%",
"uv_index": "1",
"air_quality": "優"
},
"2025-05-10": {
"temperature": "22℃",
"condition": WeatherCondition.CLEAR,
"humidity": "65%",
"wind_speed": "8km/h",
"precipitation_probability": "10%",
"uv_index": "5",
"air_quality": "優"
},
"2025-05-11": {
"temperature": "28℃",
"condition": WeatherCondition.SUNNY,
"humidity": "55%",
"wind_speed": "6km/h",
"precipitation_probability": "5%",
"uv_index": "8",
"air_quality": "良"
}
}
# 智能天氣分析
class WeatherAnalyzer:
@staticmethod
def analyze_weather_suitability(weather_data: dict, activity_type: str = "basketball") -> dict:
"""分析天氣對活動的適宜性"""
score = 100
# 温度適宜性 (15-30度最佳)
temp = int(weather_data["temperature"].replace("℃", ""))
if 15 <= temp <= 30:
temp_score = 100
elif 10 <= temp < 15 or 30 < temp <= 35:
temp_score = 70
else:
temp_score = 30
score = score * 0.3 + temp_score * 0.7
# 降水影響
if "雨" in weather_data["condition"] or "雪" in weather_data["condition"]:
score *= 0.3
elif weather_data["precipitation_probability"] > "50%":
score *= 0.7
# 風速影響
wind_speed = int(weather_data["wind_speed"].replace("km/h", ""))
if wind_speed > 20:
score *= 0.5
elif wind_speed > 15:
score *= 0.8
return {
"suitability_score": round(score),
"recommendation": "適宜進行" if score >= 70 else "不適宜進行",
"risk_factors": WeatherAnalyzer._identify_risk_factors(weather_data)
}
@staticmethod
def _identify_risk_factors(weather_data: dict) -> list:
"""識別天氣風險因素"""
risks = []
if "雨" in weather_data["condition"]:
risks.append("地面濕滑")
if "雷" in weather_data["condition"]:
risks.append("雷電危險")
if int(weather_data["temperature"].replace("℃", "")) > 30:
risks.append("高温中暑風險")
if int(weather_data["wind_speed"].replace("km/h", "")) > 15:
risks.append("大風影響")
return risks
WEATHER_AGENT_CARD = {
"name": "WeatherAgent",
"version": "2.0.0",
"description": "提供詳細的天氣數據查詢和分析服務",
"endpoints": {
"task_submit": "/api/tasks/weather",
"health_check": "/health"
},
"input_schema": {
"type": "object",
"properties": {
"date": {"type": "string", "format": "date", "description": "查詢日期"},
"location": {"type": "string", "description": "地理位置"},
"detailed": {"type": "boolean", "description": "是否返回詳細分析"}
},
"required": ["date"]
},
"capabilities": {
"weather_forecast": True,
"suitability_analysis": True,
"risk_assessment": True
},
"authentication": {"methods": ["API_Key"]}
}
@app.get("/.well-known/agent.json")
async def get_agent_card():
return WEATHER_AGENT_CARD
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "WeatherAgent"}
@app.post("/api/tasks/weather")
async def handle_weather_task(request: WeatherTaskRequest):
"""處理天氣查詢任務"""
target_date = request.params.get("date")
location = request.params.get("location", "北京")
detailed = request.params.get("detailed", False)
# 參數驗證
if not target_date:
raise HTTPException(status_code=400, detail="缺少日期參數")
if target_date not in weather_db:
# 模擬未來日期的天氣預測
simulated_weather = {
"temperature": "23℃",
"condition": WeatherCondition.CLOUDY,
"humidity": "70%",
"wind_speed": "10km/h",
"precipitation_probability": "30%",
"uv_index": "4",
"air_quality": "良"
}
weather_data = simulated_weather
else:
weather_data = weather_db[target_date]
# 構建響應
response = {
"task_id": request.task_id,
"status": "completed",
"artifact": {
"date": target_date,
"location": location,
"weather": weather_data
}
}
# 如果請求詳細分析,添加適宜性分析
if detailed:
analysis = WeatherAnalyzer.analyze_weather_suitability(weather_data)
response["artifact"]["analysis"] = analysis
return response
@app.get("/api/weather/history")
async def get_weather_history(days: int = 7):
"""獲取歷史天氣數據(模擬)"""
return {
"historical_data": weather_db,
"summary": {
"total_days": len(weather_db),
"rainy_days": len([w for w in weather_db.values() if "雨" in w["condition"]])
}
}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
2. VenueAgent示例:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List
import uuid
app = FastAPI(title="VenueAgent", version="1.0.0")
class VenueStatus(str, Enum):
AVAILABLE = "available"
OCCUPIED = "occupied"
MAINTENANCE = "maintenance"
RESERVED = "reserved"
class SportType(str, Enum):
BASKETBALL = "basketball"
FOOTBALL = "football"
TENNIS = "tennis"
SWIMMING = "swimming"
class VenueTaskRequest(BaseModel):
task_id: str
agent_type: str
timestamp: str
params: dict
# 場地數據庫
venue_db = {
"basketball": {
"outdoor": {
"2025-05-08": VenueStatus.AVAILABLE,
"2025-05-09": VenueStatus.MAINTENANCE,
"2025-05-10": VenueStatus.AVAILABLE,
"2025-05-11": VenueStatus.RESERVED
},
"indoor": {
"2025-05-08": VenueStatus.OCCUPIED,
"2025-05-09": VenueStatus.AVAILABLE,
"2025-05-10": VenueStatus.AVAILABLE,
"2025-05-11": VenueStatus.AVAILABLE
}
},
"football": {
"outdoor": {
"2025-05-08": VenueStatus.AVAILABLE,
"2025-05-09": VenueStatus.AVAILABLE
}
}
}
# 場地詳細信息
venue_details = {
"basketball_outdoor": {
"name": "室外籃球場A區",
"capacity": 20,
"facilities": ["燈光", "休息區", "飲水機"],
"hourly_rate": 50,
"location": "體育館東側"
},
"basketball_indoor": {
"name": "室內籃球館主場地",
"capacity": 50,
"facilities": ["空調", "更衣室", "淋浴間", "專業地板"],
"hourly_rate": 150,
"location": "體育館主館"
}
}
class VenueManager:
@staticmethod
def check_availability(sport_type: str, venue_type: str, date: str) -> Dict:
"""檢查場地可用性"""
if sport_type not in venue_db:
return {
"status": VenueStatus.OCCUPIED,
"message": "不支持的體育類型"
}
if venue_type not in venue_db[sport_type]:
return {
"status": VenueStatus.OCCUPIED,
"message": "不支持的場地類型"
}
date_availability = venue_db[sport_type][venue_type]
if date in date_availability:
status = date_availability[date]
message = VenueManager._get_status_message(status)
else:
status = VenueStatus.AVAILABLE
message = "場地可用"
return {
"status": status,
"message": message,
"details": venue_details.get(f"{sport_type}_{venue_type}", {})
}
@staticmethod
def _get_status_message(status: VenueStatus) -> str:
messages = {
VenueStatus.AVAILABLE: "場地可用",
VenueStatus.OCCUPIED: "場地已被佔用",
VenueStatus.MAINTENANCE: "場地維護中",
VenueStatus.RESERVED: "場地已被預訂"
}
return messages.get(status, "場地狀態未知")
@staticmethod
def reserve_venue(sport_type: str, venue_type: str, date: str, duration: int) -> Dict:
"""預訂場地"""
availability = VenueManager.check_availability(sport_type, venue_type, date)
if availability["status"] != VenueStatus.AVAILABLE:
return {
"success": False,
"message": f"無法預訂: {availability['message']}",
"reservation_id": None
}
# 模擬預訂過程
reservation_id = str(uuid.uuid4())[:8]
venue_db[sport_type][venue_type][date] = VenueStatus.RESERVED
return {
"success": True,
"message": "預訂成功",
"reservation_id": reservation_id,
"details": availability["details"]
}
VENUE_AGENT_CARD = {
"name": "VenueAgent",
"version": "1.0.0",
"description": "提供場地管理和預訂服務",
"endpoints": {
"task_submit": "/api/tasks/venue/check",
"reservation": "/api/tasks/venue/reserve",
"health_check": "/health"
},
"input_schema": {
"type": "object",
"properties": {
"date": {"type": "string", "format": "date"},
"venue_type": {"type": "string", "enum": ["outdoor", "indoor"]},
"sport_type": {"type": "string", "enum": ["basketball", "football", "tennis"]}
},
"required": ["date", "venue_type"]
},
"capabilities": {
"availability_check": True,
"reservation_management": True,
"venue_information": True
},
"authentication": {"methods": ["API_Key"]}
}
@app.get("/.well-known/agent.json")
async def get_agent_card():
return VENUE_AGENT_CARD
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "VenueAgent"}
@app.post("/api/tasks/venue/check")
async def check_venue_availability(request: VenueTaskRequest):
"""檢查場地可用性"""
date = request.params.get("date")
venue_type = request.params.get("venue_type")
sport_type = request.params.get("sport_type", "basketball")
if not date or not venue_type:
raise HTTPException(status_code=400, detail="缺少必要參數")
availability = VenueManager.check_availability(sport_type, venue_type, date)
return {
"task_id": request.task_id,
"status": "completed",
"artifact": {
"date": date,
"venue_type": venue_type,
"sport_type": sport_type,
"status": availability["status"],
"message": availability["message"],
"details": availability["details"]
}
}
@app.post("/api/tasks/venue/reserve")
async def reserve_venue(request: VenueTaskRequest):
"""預訂場地"""
date = request.params.get("date")
venue_type = request.params.get("venue_type")
sport_type = request.params.get("sport_type", "basketball")
duration = request.params.get("duration", 2)
if not date or not venue_type:
raise HTTPException(status_code=400, detail="缺少必要參數")
reservation_result = VenueManager.reserve_venue(sport_type, venue_type, date, duration)
return {
"task_id": request.task_id,
"status": "completed",
"artifact": reservation_result
}
@app.get("/api/venue/types")
async def get_venue_types(sport_type: str = "basketball"):
"""獲取可用的場地類型"""
if sport_type in venue_db:
return {
"sport_type": sport_type,
"available_venue_types": list(venue_db[sport_type].keys()),
"details": {f"{sport_type}_{vt}": venue_details.get(f"{sport_type}_{vt}")
for vt in venue_db[sport_type].keys()}
}
else:
raise HTTPException(status_code=404, detail="不支持的體育類型")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8001, log_level="info")
MultiAgent2示例:
import requests
import uuid
from enum import Enum
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from pydantic import BaseModel
# 枚舉定義
class MeetingStatus(str, Enum):
CONFIRMED = "confirmed"
CANCELLED = "cancelled"
PENDING = "pending"
ERROR = "error"
class VenueStatus(str, Enum):
AVAILABLE = "available"
OCCUPIED = "occupied"
MAINTENANCE = "maintenance"
# 數據模型
class Participant(BaseModel):
name: str
email: str
availability: bool = True
class MeetingRequest(BaseModel):
date: str
duration_hours: int = 2
participants: List[Participant] = []
venue_type: str = "outdoor" # outdoor or indoor
# 擴展的籃球代理系統
class AdvancedBasketBallAgent:
def __init__(self):
# 多個代理服務的配置
self.agents = {
"weather": {"url": "http://localhost:8000", "api_key": "WEATHER_KEY"},
"venue": {"url": "http://localhost:8001", "api_key": "VENUE_KEY"},
"notification": {"url": "http://localhost:8002", "api_key": "NOTIFY_KEY"},
"calendar": {"url": "http://localhost:8003", "api_key": "CALENDAR_KEY"}
}
def _create_task(self, agent_type: str, params: dict) -> dict:
"""創建標準任務對象"""
return {
"task_id": str(uuid.uuid4()),
"agent_type": agent_type,
"timestamp": datetime.now().isoformat(),
"params": params
}
def _call_agent(self, agent_name: str, endpoint: str, task: dict) -> dict:
"""通用代理調用方法"""
agent_config = self.agents[agent_name]
try:
# 動態發現代理能力
agent_card_url = f"{agent_config['url']}/.well-known/agent.json"
agent_card = requests.get(agent_card_url, timeout=5).json()
# 調用具體端點
response = requests.post(
f"{agent_config['url']}{endpoint}",
json=task,
headers={"Authorization": f"Bearer {agent_config['api_key']}"},
timeout=10
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Agent {agent_name} 調用失敗: {response.text}")
except requests.exceptions.RequestException as e:
raise Exception(f"Agent {agent_name} 連接失敗: {str(e)}")
# 天氣代理調用
def check_weather(self, date: str, location: str = "北京") -> dict:
"""查詢天氣情況"""
task = self._create_task("weather", {
"date": date,
"location": location,
"detailed": True # 請求詳細天氣信息
})
return self._call_agent("weather", "/api/tasks/weather", task)
# 場地代理調用
def check_venue_availability(self, date: str, venue_type: str = "outdoor") -> dict:
"""檢查場地可用性"""
task = self._create_task("venue", {
"date": date,
"venue_type": venue_type,
"sport_type": "basketball"
})
return self._call_agent("venue", "/api/tasks/venue/check", task)
# 日曆代理調用
def check_participant_availability(self, date: str, participants: List[Participant]) -> dict:
"""檢查參與者時間安排"""
task = self._create_task("calendar", {
"date": date,
"participants": [p.dict() for p in participants],
"event_type": "basketball_meeting"
})
return self._call_agent("calendar", "/api/tasks/calendar/check", task)
# 通知代理調用
def send_notification(self, meeting_data: dict, recipients: List[str]) -> dict:
"""發送會議通知"""
task = self._create_task("notification", {
"meeting_data": meeting_data,
"recipients": recipients,
"notification_type": "meeting_confirmation"
})
return self._call_agent("notification", "/api/tasks/notification/send", task)
# 綜合決策引擎
def schedule_advanced_meeting(self, meeting_request: MeetingRequest) -> dict:
"""高級會議安排決策"""
decision_context = {
"request": meeting_request.dict(),
"checks": {},
"final_decision": MeetingStatus.PENDING,
"reasons": []
}
try:
# 1. 檢查天氣條件
weather_result = self.check_weather(meeting_request.date)
decision_context["checks"]["weather"] = weather_result
weather_condition = weather_result["artifact"]["weather"]["condition"]
if any(bad_weather in weather_condition for bad_weather in ["雨", "雪", "雷暴", "大風"]):
decision_context["final_decision"] = MeetingStatus.CANCELLED
decision_context["reasons"].append(f"惡劣天氣: {weather_condition}")
return self._format_decision(decision_context)
# 2. 檢查場地可用性
venue_result = self.check_venue_availability(
meeting_request.date,
meeting_request.venue_type
)
decision_context["checks"]["venue"] = venue_result
if venue_result["artifact"]["status"] != VenueStatus.AVAILABLE:
decision_context["final_decision"] = MeetingStatus.CANCELLED
decision_context["reasons"].append(
f"場地不可用: {venue_result['artifact']['status']}"
)
return self._format_decision(decision_context)
# 3. 檢查參與者時間
if meeting_request.participants:
calendar_result = self.check_participant_availability(
meeting_request.date,
meeting_request.participants
)
decision_context["checks"]["calendar"] = calendar_result
available_count = calendar_result["artifact"]["available_count"]
total_count = calendar_result["artifact"]["total_count"]
if available_count < total_count * 0.6: # 至少60%參與者可用
decision_context["final_decision"] = MeetingStatus.CANCELLED
decision_context["reasons"].append(
f"參與者可用率不足: {available_count}/{total_count}"
)
return self._format_decision(decision_context)
# 4. 所有檢查通過,確認會議
decision_context["final_decision"] = MeetingStatus.CONFIRMED
decision_context["reasons"].append("所有條件滿足,會議確認")
# 5. 發送通知
if decision_context["final_decision"] == MeetingStatus.CONFIRMED:
recipient_emails = [p.email for p in meeting_request.participants]
notification_result = self.send_notification(
decision_context,
recipient_emails
)
decision_context["checks"]["notification"] = notification_result
return self._format_decision(decision_context)
except Exception as e:
decision_context["final_decision"] = MeetingStatus.ERROR
decision_context["reasons"].append(f"系統錯誤: {str(e)}")
return self._format_decision(decision_context)
def _format_decision(self, context: dict) -> dict:
"""格式化決策結果"""
return {
"meeting_id": str(uuid.uuid4()),
"status": context["final_decision"],
"decision_time": datetime.now().isoformat(),
"reasons": context["reasons"],
"detailed_checks": context["checks"],
"recommendation": self._generate_recommendation(context)
}
def _generate_recommendation(self, context: dict) -> str:
"""生成智能推薦"""
status = context["final_decision"]
if status == MeetingStatus.CONFIRMED:
weather = context["checks"]["weather"]["artifact"]["weather"]
return f"會議確認!預計天氣:{weather['condition']},温度:{weather['temperature']},建議穿着運動服裝。"
elif status == MeetingStatus.CANCELLED:
reasons = context["reasons"]
if "天氣" in str(reasons):
return "由於天氣原因取消,建議改為室內場地或改期舉行。"
elif "場地" in str(reasons):
return "場地不可用,建議選擇其他場地或時間。"
else:
return "會議取消,請重新安排時間。"
else:
return "無法確定會議狀態,請手動檢查。"
# 模擬的其他代理服務實現
class VenueAgent:
"""場地管理代理"""
def __init__(self):
self.venue_db = {
"2025-05-08": {"outdoor": VenueStatus.AVAILABLE, "indoor": VenueStatus.OCCUPIED},
"2025-05-09": {"outdoor": VenueStatus.MAINTENANCE, "indoor": VenueStatus.AVAILABLE},
"2025-05-10": {"outdoor": VenueStatus.AVAILABLE, "indoor": VenueStatus.AVAILABLE},
}
def check_availability(self, task_request: dict) -> dict:
date = task_request["params"]["date"]
venue_type = task_request["params"]["venue_type"]
if date in self.venue_db and venue_type in self.venue_db[date]:
return {
"task_id": task_request["task_id"],
"status": "completed",
"artifact": {
"date": date,
"venue_type": venue_type,
"status": self.venue_db[date][venue_type],
"message": f"{venue_type}場地狀態查詢完成"
}
}
else:
return {
"task_id": task_request["task_id"],
"status": "completed",
"artifact": {
"date": date,
"venue_type": venue_type,
"status": VenueStatus.OCCUPIED,
"message": "未找到場地信息,默認不可用"
}
}
class CalendarAgent:
"""日曆管理代理"""
def __init__(self):
self.user_calendars = {
"alice@example.com": ["2025-05-08", "2025-05-09"], # 這些日期忙碌
"bob@example.com": ["2025-05-09"],
"charlie@example.com": [], # 所有日期都可用
}
def check_availability(self, task_request: dict) -> dict:
date = task_request["params"]["date"]
participants = task_request["params"]["participants"]
available_count = 0
availability_details = {}
for participant in participants:
email = participant["email"]
is_available = date not in self.user_calendars.get(email, [])
availability_details[email] = {
"available": is_available,
"name": participant["name"]
}
if is_available:
available_count += 1
return {
"task_id": task_request["task_id"],
"status": "completed",
"artifact": {
"date": date,
"available_count": available_count,
"total_count": len(participants),
"availability_rate": available_count / len(participants) if participants else 0,
"details": availability_details
}
}
class NotificationAgent:
"""通知代理"""
def send_notification(self, task_request: dict) -> dict:
meeting_data = task_request["params"]["meeting_data"]
recipients = task_request["params"]["recipients"]
# 模擬發送通知
notification_result = {
"sent_to": recipients,
"meeting_status": meeting_data["final_decision"],
"notification_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat()
}
return {
"task_id": task_request["task_id"],
"status": "completed",
"artifact": notification_result
}
# 使用示例和測試
def demo_advanced_system():
"""演示高級系統功能"""
print("=== 高級籃球會議安排系統演示 ===\n")
# 創建代理實例
meeting_agent = AdvancedBasketBallAgent()
# 測試場景1: 理想情況 - 所有條件滿足
print("場景1: 理想情況")
ideal_request = MeetingRequest(
date="2025-05-10",
participants=[
Participant(name="Alice", email="alice@example.com"),
Participant(name="Bob", email="bob@example.com"),
Participant(name="Charlie", email="charlie@example.com")
],
venue_type="outdoor"
)
result1 = meeting_agent.schedule_advanced_meeting(ideal_request)
print(f"結果: {result1['status']}")
print(f"推薦: {result1['recommendation']}\n")
# 測試場景2: 惡劣天氣
print("場景2: 惡劣天氣")
bad_weather_request = MeetingRequest(
date="2025-05-08", # 雷陣雨
participants=[
Participant(name="Alice", email="alice@example.com"),
Participant(name="Bob", email="bob@example.com")
]
)
result2 = meeting_agent.schedule_advanced_meeting(bad_weather_request)
print(f"結果: {result2['status']}")
print(f"原因: {result2['reasons']}")
print(f"推薦: {result2['recommendation']}\n")
# 測試場景3: 場地維護
print("場景3: 場地維護")
venue_issue_request = MeetingRequest(
date="2025-05-09", # 户外場地維護
participants=[
Participant(name="Alice", email="alice@example.com"),
Participant(name="Charlie", email="charlie@example.com")
]
)
result3 = meeting_agent.schedule_advanced_meeting(venue_issue_request)
print(f"結果: {result3['status']}")
print(f"原因: {result3['reasons']}")
print(f"推薦: {result3['recommendation']}\n")
if __name__ == "__main__":
demo_advanced_system()
6. 系統特性體現
- 高效並行處理:通過同時執行多個代理檢查,顯著優化了系統響應時間。
- 分級決策機制:按照安全、資源、人員的優先級順序執行決策,確保關鍵因素優先考慮。
- 條件性操作執行:通知等後續操作僅在特定決策條件下觸發,避免不必要的資源消耗。
- 完整溯源能力:通過詳細的檢查結果記錄和決策原因説明,提供完整的決策過程溯源。
- 彈性容錯設計:單個代理服務的故障不會導致整個系統崩潰,具備優雅降級能力。
五、總結
從簡單的天氣查詢代理到複雜的多代理協同決策系統,我們見證了A2A智能代理協議如何重塑分佈式人工智能的架構範式。這個演進過程不僅僅是技術棧的升級,更是思維模式的根本轉變——從追求單體智能的極致性能轉向構建智能體間的協同生態。
通過籃球會議安排系統的完整示例,我們看到了A2A協議的核心價值:標準化的服務發現機制讓代理能夠自主識別彼此能力,語義化的任務描述確保跨系統的準確理解,自治性的決策邏輯使每個代理都能發揮專業優勢。這種架構帶來的不僅是技術上的鬆耦合和可擴展性,更是業務層面的敏捷響應和智能升級。
更重要的是,這個系統展現了分佈式智能的乘法效應:單個代理或許只能解決特定領域的問題,但當它們通過A2A協議協同工作時,產生的集體智能遠遠超出各部分能力的簡單疊加。天氣代理的專業預報、場地代理的資源管理、日曆代理的時間協調、通知代理的溝通觸達——每個代理專注本職,卻共同完成了一個需要多維度考量的複雜決策。