一、前情提要
今天我們來探討智能投資顧問系統的終極演進形態——混合式架構。如果説反應式架構是短跑選手,能夠在秒級內給出快速響應;深思式架構是馬拉松選手,能夠進行深度的分析和規劃;那麼混合式架構就是十項全能運動員,它集兩者之長,根據不同的場景需求智能切換工作模式,真正實現了因材施教的個性化投資顧問服務。
在前面幾篇文章的基礎上,今天我們將詳細介紹混合式智能投資顧問系統的架構設計與實現原理,更新技術棧改進系統流程和運行方式,通過智能路由機制,在反應式快速響應與深思熟慮式深度分析之間實現動態平衡,為不同複雜度的投資諮詢需求提供最優解決方案。
二、系統演進
從單一到融合的必然演進,在深入探討反應式和深思式架構後,我們發現兩者各有優劣:
- 反應式架構雖然響應迅速,但分析深度有限,僅僅只適合標準化問題,難以處理複雜場景,同時個性化程度相對較低;
- 深思式架構分析深度足夠,但響應時間較長;大模型接口資源消耗大,難以支撐高併發;同時對於簡單問題存在過度處理的問題
- 混合式架構順應而生,不是選擇用快刀還是利劍,而是根據不同的使用場景,智能選擇最合適的匹配方式,由此,必定會有混合式架構的誕生。
三、混合架構的設計原則
- 適應性:根據查詢特徵自動選擇處理路徑
- 效率性:合理分配計算資源,避免過度處理
- 用户體驗:在響應速度和分析深度間找到最佳平衡點
- 可擴展性:支持模塊化擴展和性能線性提升
四、混合式架構的核心思想
混合式架構的核心設計理念基於情境感知的智能路由和優勢資源的動態調配。不同於單一架構的侷限性,混合架構通過實時評估查詢複雜度、用户價值和系統資源,在反應式的速度優勢與深思熟慮式的深度優勢之間實現最優平衡。
1. 智能路由
混合式架構的智能路由機制如同架構的大腦,這就像一個經驗豐富的導診台護士,能夠快速判斷患者的病情輕重,然後分派到不同的診室:
- 急診室(反應式通道):處理簡單緊急的諮詢
- 專科門診(混合式通道):處理中等複雜度的規劃
- 專家會診(深思式通道):處理複雜的綜合性問題
2. 關聯與演進
讓我們通過一個具體的例子來説明三種架構的關聯,例如用户查詢:"我35歲,年收入50萬,有房有車無負債,現有100萬存款想做投資,能承受中等風險,希望5年內資產增長50%"
反應式處理:
# 快速提取關鍵信息後直接返回
"建議採用平衡型投資策略,配置50%權益類資產..."
深思式處理:
# 經過深度分析後返回
"""
經過全面分析,為您制定以下投資規劃:
1. 資產配置:股票40%、債券30%、基金20%、現金10%
2. 預期收益:年化8.4%
3. 風險評估:中等偏上
...(詳細分析報告)
"""
混合式處理:
# 快速響應 + 逐步深度分析
"""
快速建議:基於您的風險偏好,建議平衡型配置
深度分析(逐步加載):
• 用户畫像分析完成...
• 市場環境評估中...
• 投資策略生成...
• 風險評估優化...
最終為您生成個性化投資方案...
"""
3. 技術創新
動態負載均衡
混合式架構引入了動態負載均衡機制,能夠根據系統實時狀態智能調整路由策略:
class DynamicLoadBalancer:
def adjust_routing_strategy(self, system_status):
if system_status.reactive_load > 80%:
# 反應式負載高,適當提升混合式門檻
self.complexity_thresholds['hybrid'] = 0.4
elif system_status.deliberative_load > 60%:
# 深思式負載高,啓用快速模式
self.enable_quick_deliberative_mode()
漸進式響應機制
混合式架構創新性地提出了漸進式響應的概念:
用户查詢
↓
即時響應(1秒內)
↓ "正在為您深度分析..."
初步建議(3秒內)
↓ "基於快速分析,建議..."
深度分析(5-8秒)
↓ "深度分析完成,優化建議..."
完整報告(10-15秒)
↓ "個性化投資方案已生成"
這種機制既保證了用户體驗的流暢性,又提供了深度的分析價值。
五、系統架構設計
1. 整體架構概覽
混合式智能投資顧問系統架構
├── 接入層
│ ├── 用户請求接收
│ ├── 輸入驗證與預處理
│ └── 會話管理
├── 智能路由層
│ ├── 複雜度評估引擎
│ ├── 用户價值分析
│ └── 路由決策器
├── 處理引擎層
│ ├── 反應式處理通道
│ ├── 混合式處理通道
│ └── 深思式處理通道
├── 數據層
│ ├── 用户畫像庫
│ ├── 知識庫
│ └── 性能監控庫
└── 輸出層
├── 結果格式化
├── 風險控制
└── 質量驗證
2. 核心組件詳解
2.1 智能路由引擎
職責:
- 實時分析用户查詢的複雜度特徵
- 評估用户歷史價值和當前需求
- 選擇最優處理通道
- 動態調整路由策略
關鍵技術:
# 複雜度評估算法
複雜度分數 = (文本長度分數 + 金融術語分數 + 數字數量分數 + 規劃詞彙分數) / 4
# 路由決策邏輯
if 複雜度 ≤ 0.3: 選擇反應式通道
elif 複雜度 ≥ 0.7 and 用户價值 > 0.5: 選擇深思式通道
else: 選擇混合式通道
2.2 多通道處理引擎
- 反應式通道:基於規則引擎的快速匹配、標準化建議模板、極速響應(0.5-2秒)
- 混合式通道:雙線程並行處理、結果智能融合、平衡響應(2-8秒)
- 深思式通道:多階段深度分析、個性化策略生成、深度處理(8-15秒)
3. 狀態管理與性能優化
3.1 實時狀態監控系統
狀態數據模型:
狀態信息 = {
'current_step': '當前主步驟名稱',
'progress': 進度百分比,
'architecture': '當前使用架構',
'user_id': '用户標識',
'current_query': '當前查詢內容',
'sub_steps': ['子步驟1', '子步驟2', ...]
}
監控維度:
- 處理進度:實時顯示完成百分比
- 步驟跟蹤:清晰標識當前執行階段
- 資源使用:監控各通道負載情況
- 性能指標:響應時間和成功率統計
3.2 性能優化策略
負載均衡機制:
- 基於系統負載動態調整路由閾值
- 高負載時適當提升反應式處理比例
- 資源競爭時的優先級調度
彈性伸縮策略:
if 反應式QPS > 閾值:
提升混合式處理門檻
啓用緩存加速機制
if 深思式併發 > 限制:
啓用快速深思模式
調整超時時間設置
if 系統負載 > 容量:
臨時拒絕低優先級請求
啓用服務降級策略
多級緩存體系:
- 查詢結果緩存:相同查詢的直接返回
- 用户畫像緩存:活躍用户數據的快速訪問
- 策略模板緩存:常用投資策略的預生成
- 市場數據緩存:實時市場信息的本地緩存
六、系統運行流程
1. 流程圖
2. 智能路由分析
2.1 複雜度評估維度
文本特徵分析:
- 長度因子:查詢文本長度與信息密度
- 專業術語:金融投資領域專業詞彙出現頻率
- 數字特徵:金額、年限、比例等數值信息
- 規劃詞彙:目標、計劃、規劃等前瞻性詞彙
計算模型:
複雜度分數 = 文本長度分數 × 0.25 + 金融術語分數 × 0.35 + 數字數量分數 × 0.25 + 規劃詞彙分數 × 0.15
2.2 用户價值評估
評估指標:
- 交互頻率:歷史查詢次數和活躍度
- 查詢質量:平均複雜度和需求深度
- 架構偏好:歷史處理通道選擇模式
- 價值潛力:基於用户畫像的長期價值預估
3. 多通道處理詳細流程
3.1 反應式處理通道
執行序列:
- 1. 快速意圖識別
- 關鍵詞提取和分類
- 意圖優先級排序
- 緊急程度判斷
- 2. 規則引擎匹配
- 業務規則條件判斷
- 模板選擇和建議生成
- 風險提示自動添加
- 3. 快速建議生成
- 標準化建議填充
- 個性化參數調整
- 輸出格式化和美化
3.2 混合式處理通道
並行處理架構:
混合式處理流程:
├── 反應式線程(快速通道)
│ ├── 即時意圖分析
│ ├── 基礎建議生成
│ └── 快速結果返回
├── 深思式線程(快速深度)
│ ├── 簡化用户分析
│ ├── 關鍵因素評估
│ └── 優化建議生成
└── 結果融合引擎
├── 建議優先級排序
├── 衝突檢測和解決
└── 最終結果整合
架構優勢:
- 線程安全:確保並行處理的數據一致性
- 超時控制:防止單一線程阻塞整體流程
- 資源平衡:合理分配CPU和內存資源
- 優雅降級:在部分服務不可用時保持基本功能
3.3 深思式處理通道
五階段深度分析:
階段一:深度用户畫像
- 人口統計特徵分析
- 財務狀況全面評估
- 風險偏好精確測量
- 投資目標明確化
階段二:市場環境分析
- 宏觀經濟週期判斷
- 行業發展趨勢評估
- 市場估值水平分析
- 政策環境影響考量
階段三:多策略生成
- 風險收益特徵建模
- 資產配置方案設計
- 投資組合優化
- 備選策略比較
階段四:風險評估優化
- 風險承受能力匹配
- 壓力測試情景分析
- 風險控制措施制定
- 應急預案准備
階段五:個性化建議
- 完整投資建議書生成
- 實施計劃詳細制定
- 後續跟蹤方案設計
- 風險提示完整披露
七、示例:混合式智能投資顧問系統
1. 系統概述
本系統實現了智能投資顧問的混合式架構,通過智能路由機制在反應式快速響應,與深思式深度分析之間動態選擇最優處理路徑。
核心特性:
- 1. 智能路由決策 - 基於查詢複雜度自動選擇處理通道
- 2. 多通道並行處理 - 支持反應式、混合式、深思式三種處理模式
- 3. 實時狀態監控 - 可視化展示系統運行狀態
- 4. 用户畫像學習 - 基於歷史交互持續優化服務
2. 代碼分解
2.1 混合式智能投資顧問主類
class HybridInvestmentAdvisor:
"""
職責:協調整個系統的運行,包括路由決策、多通道處理、狀態管理和可視化
設計模式:Facade模式 + Strategy模式
- Facade:對外提供統一的process_query接口
- Strategy:不同的處理通道採用不同的策略
"""
2.2 初始化混合式顧問系統
def __init__(self):
# 初始化各處理通道
self.reactive_advisor = ReactiveAdvisor() # 反應式處理組件
self.deliberative_advisor = DeliberativeAdvisor() # 深思式處理組件
self.router = IntelligentRouter() # 智能路由引擎
# 數據存儲結構
self.user_profiles = {} # 用户畫像:{user_id: profile_data}
self.visualization_data = [] # 可視化性能數據
self.animation_frames = [] # 動畫幀序列
# 系統當前狀態(用於實時監控)
self.current_status = {
'step': '等待輸入', # 當前主步驟名稱
'progress': 0, # 進度百分比(0-100)
'current_architecture': None, # 當前使用的架構類型
'current_user': None, # 當前處理的用户ID
'current_query': None, # 當前處理的查詢內容
'sub_steps': [] # 當前步驟的子步驟列表
}
組件説明:
- - reactive_advisor: 處理簡單快速查詢
- - deliberative_advisor: 處理複雜深度分析
- - router: 智能路由決策引擎
- - user_profiles: 用户畫像存儲
- - visualization_data: 性能監控數據
- - current_status: 當前系統狀態(用於可視化)
- - animation_frames: 動畫幀序列(用於GIF生成)
2.3 處理用户查詢的主入口方法
def process_query(self, user_id: str, user_input: str) -> Dict[str, Any]:
# 階段1: 初始化狀態 - 設置開始處理狀態
self._update_status('開始處理', 0, user_id, user_input, [])
self._capture_frame() # 捕獲初始狀態幀
start_time = time.time() # 開始計時
# 階段2: 智能路由分析 - 決定使用哪種處理通道
self._update_status('路由分析', 20, user_id, user_input,
['接收用户輸入', '分析查詢複雜度'])
self._capture_frame() # 捕獲路由分析幀
print(f"\n 開始處理用户 {user_id} 的查詢: {user_input}")
# 調用路由引擎分析查詢複雜度並選擇處理架構
route_info = self.router.analyze_route(user_input, self.user_profiles.get(user_id, {}))
print(f" 路由決策: {route_info['architecture']} (複雜度: {route_info['complexity_score']:.2f})")
# 階段3: 基於路由決策執行相應處理
self._update_status('執行處理', 50, user_id, user_input,
['路由分析完成', f'選擇{route_info["architecture"]}模式', '開始處理'])
self._capture_frame() # 捕獲處理開始幀
# 根據路由結果選擇處理通道
if route_info['architecture'] == 'reactive':
result = self._reactive_processing(user_id, user_input)
elif route_info['architecture'] == 'deliberative':
result = self._deliberative_processing(user_id, user_input)
else: # hybrid
result = self._hybrid_processing(user_id, user_input)
# 階段4: 結果處理和輸出
self._update_status('生成結果', 80, user_id, user_input,
['處理完成', '生成最終建議', '格式化輸出'])
self._capture_frame() # 捕獲結果生成幀
# 計算處理時間並添加到結果中
processing_time = time.time() - start_time
result['processing_time'] = f"{processing_time:.2f}秒"
result['architecture_used'] = route_info['architecture']
# 更新用户畫像和性能數據
self._update_user_profile(user_id, user_input, result, route_info)
# 記錄本次處理的性能數據
self.visualization_data.append({
'timestamp': datetime.now(),
'user_id': user_id,
'architecture': route_info['architecture'],
'complexity': route_info['complexity_score'],
'processing_time': processing_time,
'input_length': len(user_input)
})
# 階段5: 完成處理
self._update_status('完成', 100, user_id, user_input, ['所有步驟完成', '準備下一個查詢'])
self._capture_frame() # 捕獲完成幀
return result
執行流程:
- 1. 狀態初始化 → 2. 路由分析 → 3. 通道處理 → 4. 結果生成 → 5. 數據更新
輸入參數:
- - user_id: 用户標識,用於個性化服務和畫像更新
- - user_input: 用户輸入的查詢文本
返回結果:
- - 包含建議內容、處理時間、使用架構的字典
2.4 反應式處理通道
def _reactive_processing(self, user_id: str, user_input: str) -> Dict[str, Any]:
# 設置當前架構類型(用於可視化高亮)
self.current_status['current_architecture'] = 'reactive'
# 定義反應式處理的步驟序列
steps = ['調用Qwen API', '快速意圖分析', '規則匹配', '生成快速建議']
# 逐步執行每個步驟並捕獲動畫幀
for i, step in enumerate(steps):
self._update_sub_steps(steps[:i+1]) # 更新當前完成的子步驟
self._capture_frame() # 捕獲每個步驟的幀
time.sleep(0.3) # 模擬處理時間(實際系統中為真實處理)
print(" 執行反應式處理...")
# 調用反應式顧問處理查詢
result = self.reactive_advisor.process(user_input)
# 標記處理完成
self._update_sub_steps(['反應式處理完成'])
self._capture_frame()
return result
- 特點:快速響應,適合簡單查詢
- 處理時間:0.5-2秒
- 適用場景:基金查詢、風險評估、簡單問答
2.5 混合式處理通道
def _hybrid_processing(self, user_id: str, user_input: str) -> Dict[str, Any]:
self.current_status['current_architecture'] = 'hybrid'
# 混合式處理的步驟序列
steps = ['啓動反應式線程', '啓動深思式線程', '並行處理中', '結果融合']
for i, step in enumerate(steps):
self._update_sub_steps(steps[:i+1])
self._capture_frame()
time.sleep(0.3)
print(" 執行混合式並行處理...")
def reactive_task():
"""反應式處理線程任務"""
self._update_sub_steps(['反應式線程運行中'])
self._capture_frame()
return self.reactive_advisor.process(user_input)
def deliberative_task():
"""深思式快速分析線程任務"""
self._update_sub_steps(['深思式線程運行中'])
self._capture_frame()
return self.deliberative_advisor.quick_analysis(user_input)
# 使用線程池並行執行兩個任務
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交任務到線程池
reactive_future = executor.submit(reactive_task)
deliberative_future = executor.submit(deliberative_task)
# 獲取處理結果(設置超時防止阻塞)
reactive_result = reactive_future.result(timeout=3.0)
deliberative_result = deliberative_future.result(timeout=8.0)
# 融合兩個通道的結果
self._update_sub_steps(['結果融合中'])
self._capture_frame()
result = self._fuse_results(reactive_result, deliberative_result)
self._update_sub_steps(['混合處理完成'])
self._capture_frame()
return result
- 特點:並行處理 + 結果融合,平衡速度與深度
- 處理時間:2-8秒
- 適用場景:中等複雜度諮詢、資產配置
2.6 融合反應式和深思式處理結果
def _fuse_results(self, reactive_result: Dict, deliberative_result: Dict) -> Dict[str, Any]:
return {
'type': 'hybrid_advice',
'quick_advice': reactive_result.get('advice', ''), # 反應式的快速建議
'detailed_analysis': deliberative_result.get('analysis', ''), # 深思式的深度分析
'recommended_strategy': deliberative_result.get('strategy', {}), # 投資策略
'risk_assessment': deliberative_result.get('risk', {}), # 風險評估
'fusion_note': '本建議融合了快速響應和深度分析的優點' # 融合説明
}
- 策略:取反應式的快速建議 + 深思式的深度分析
- 形成既有速度又有深度的綜合建議
2.7 更新用户畫像
def _update_user_profile(self, user_id: str, user_input: str, result: Dict, route_info: Dict):
# 初始化新用户畫像
if user_id not in self.user_profiles:
self.user_profiles[user_id] = {
'query_count': 0, # 查詢次數
'avg_complexity': 0, # 平均複雜度
'preferred_architecture': {}, # 偏好架構統計
'last_interaction': None # 最後交互時間
}
profile = self.user_profiles[user_id]
profile['query_count'] += 1
profile['last_interaction'] = datetime.now()
# 更新平均複雜度(滑動平均)
old_avg = profile['avg_complexity']
profile['avg_complexity'] = (
(old_avg * (profile['query_count'] - 1) + route_info['complexity_score'])
/ profile['query_count']
)
# 更新架構使用統計
arch = route_info['architecture']
if arch not in profile['preferred_architecture']:
profile['preferred_architecture'][arch] = 0
profile['preferred_architecture'][arch] += 1
- 基於本次交互更新用户特徵,用於後續個性化服務優化
2.8 更新系統當前狀態
def _update_status(self, step: str, progress: int, user_id: str, query: str, sub_steps: List[str]):
self.current_status = {
'step': step,
'progress': progress,
'current_architecture': self.current_status.get('current_architecture'),
'current_user': user_id,
'current_query': query,
'sub_steps': sub_steps
}
- 用於實時監控和可視化顯示
2.9 更新子步驟列表
def _update_sub_steps(self, sub_steps: List[str]):
self.current_status['sub_steps'] = sub_steps
- 用於顯示當前正在執行的具體操作
2.10 捕獲當前系統狀態為動畫幀
def _capture_frame(self):
# 創建當前狀態的可視化圖像
fig = self._create_visualization_frame()
# 將圖像保存到內存緩衝區
buf = io.BytesIO()
fig.savefig(buf, format='png', dpi=100, bbox_inches='tight')
buf.seek(0)
# 將圖像幀添加到序列中
self.animation_frames.append(Image.open(buf))
plt.close(fig) # 關閉圖像釋放內存
- 將當前可視化狀態保存為圖像幀,用於生成GIF動畫
2.11 創建系統狀態可視化幀
def _create_visualization_frame(self):
# 創建3x2的子圖佈局
fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6)) = plt.subplots(3, 2, figsize=(16, 12))
fig.suptitle('混合式智能投資顧問 - 實時運行監控', fontsize=16, fontweight='bold')
# 隱藏最後一個子圖(保持佈局對稱)
ax6.axis('off')
# 繪製各個監控面板
self._draw_architecture_overview(ax1) # 架構概覽面板
self._draw_processing_flow(ax2) # 處理流程面板
self._draw_current_status(ax3) # 當前狀態面板
self._draw_performance_metrics(ax4) # 性能指標面板
self._draw_qwen_usage(ax5) # Qwen使用情況面板
plt.tight_layout()
return fig
生成包含5個監控面板的完整可視化界面:
- 1. 架構概覽 2. 處理流程 3. 當前狀態 4. 性能指標 5. Qwen使用情況
2.12 繪製架構概覽面板
def _draw_architecture_overview(self, ax):
ax.set_title(' 混合架構概覽', fontweight='bold', fontsize=12)
# 定義三種處理通道的顯示信息
modules = [
('反應式\n0.5-2秒', 0.2, 0.7, '#4CAF50', 'reactive'),
('混合式\n2-8秒', 0.5, 0.7, '#2196F3', 'hybrid'),
('深思式\n8-15秒', 0.8, 0.7, '#FF9800', 'deliberative')
]
current_arch = self.current_status.get('current_architecture')
# 繪製每個架構模塊
for name, x, y, color, arch_type in modules:
# 當前使用的架構高亮顯示
if arch_type == current_arch:
edge_color = 'red'
edge_width = 3
alpha = 1.0
else:
edge_color = 'black'
edge_width = 1
alpha = 0.7
# 繪製架構圓圈
circle = Circle((x, y), 0.08, fill=True, color=color,
alpha=alpha, edgecolor=edge_color, linewidth=edge_width)
ax.add_patch(circle)
ax.text(x, y, name, ha='center', va='center',
fontsize=9, fontweight='bold', color='white')
# 繪製連接線
ax.plot([0.3, 0.45], [0.7, 0.7], 'k-', alpha=0.5, linewidth=2)
ax.plot([0.55, 0.7], [0.7, 0.7], 'k-', alpha=0.5, linewidth=2)
# 繪製路由引擎
router_circle = Circle((0.5, 0.3), 0.1, fill=True, color='lightgray',
alpha=0.8, edgecolor='blue', linewidth=2)
ax.add_patch(router_circle)
ax.text(0.5, 0.3, '智能路由\n引擎', ha='center', va='center',
fontsize=9, fontweight='bold')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
- 顯示三種處理通道和智能路由引擎的關係
- 當前使用的架構會高亮顯示
2.13 繪製處理流程面板
def _draw_processing_flow(self, ax):
ax.set_title(' 實時處理流程', fontweight='bold', fontsize=12)
# 定義處理流程的各個階段
stages = [
(0.1, 0.8, '用户輸入', '#E3F2FD', 0),
(0.3, 0.8, '路由分析', '#BBDEFB', 20),
(0.5, 0.8, '執行處理', '#90CAF9', 50),
(0.7, 0.8, '生成結果', '#64B5F6', 80),
(0.9, 0.8, '完成輸出', '#42A5F5', 100)
]
current_progress = self.current_status.get('progress', 0)
# 繪製每個處理階段
for i, (x, y, name, color, stage_progress) in enumerate(stages):
# 已完成或正在進行的階段高亮顯示
if current_progress >= stage_progress:
fill_color = color
text_color = 'black'
alpha = 1.0
else:
fill_color = 'lightgray'
text_color = 'gray'
alpha = 0.5
# 繪製階段方塊
rect = FancyBboxPatch((x-0.08, y-0.05), 0.16, 0.1,
boxstyle="round,pad=0.02",
facecolor=fill_color, edgecolor='black', alpha=alpha)
ax.add_patch(rect)
ax.text(x, y, name, ha='center', va='center',
fontsize=8, fontweight='bold', color=text_color)
# 繪製階段間的箭頭
if i < len(stages) - 1:
arrow_color = 'green' if current_progress > stage_progress else 'gray'
ax.annotate('', xy=(x+0.1, y), xytext=(x+0.18, y),
arrowprops=dict(arrowstyle='->', lw=2, color=arrow_color))
# 繪製進度條
ax.add_patch(Rectangle((0.1, 0.6), 0.8, 0.05, fill=False, edgecolor='black'))
ax.add_patch(Rectangle((0.1, 0.6), 0.8 * (current_progress/100), 0.05,
facecolor='green', alpha=0.7))
ax.text(0.5, 0.55, f'進度: {current_progress}%', ha='center', va='center',
fontsize=10, fontweight='bold')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
- 顯示當前處理的進度和階段狀態
2.14 繪製當前狀態面板
def _draw_current_status(self, ax):
ax.set_title(' 當前執行狀態', fontweight='bold', fontsize=12)
status = self.current_status
# 顯示當前主步驟
ax.text(0.05, 0.9, f'當前步驟: {status["step"]}', fontsize=11, fontweight='bold',
bbox=dict(boxstyle="round,pad=0.3", facecolor='lightyellow'))
# 顯示用户和架構信息
ax.text(0.05, 0.75, f'用户: {status["current_user"] or "等待中"}', fontsize=9)
ax.text(0.05, 0.70, f'架構: {status["current_architecture"] or "未選擇"}', fontsize=9)
# 顯示當前查詢(截斷長文本)
if status["current_query"]:
query_display = status["current_query"][:25] + "..." if len(status["current_query"]) > 25 else status["current_query"]
ax.text(0.05, 0.65, f'查詢: {query_display}', fontsize=8, style='italic')
# 顯示子步驟執行情況
ax.text(0.05, 0.55, '執行子步驟:', fontsize=10, fontweight='bold')
for i, sub_step in enumerate(status["sub_steps"]):
y_pos = 0.5 - i * 0.07
# 當前正在執行的子步驟用箭頭標識
if i == len(status["sub_steps"]) - 1 and status["progress"] < 100:
ax.text(0.08, y_pos, f' {sub_step}', fontsize=8, color='blue', fontweight='bold')
else:
ax.text(0.08, y_pos, f'✓ {sub_step}', fontsize=8, color='green')
# 顯示Qwen API狀態
ax.text(0.05, 0.2, ' Qwen API狀態: 運行中', fontsize=9,
bbox=dict(boxstyle="round,pad=0.2", facecolor='lightgreen'))
# 顯示更新時間
current_time = datetime.now().strftime('%H:%M:%S')
ax.text(0.7, 0.1, f'更新時間: {current_time}', fontsize=7, style='italic')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
- 顯示詳細的系統運行狀態信息
2.15 繪製性能指標面板
def _draw_performance_metrics(self, ax):
ax.set_title(' 性能指標監控', fontweight='bold', fontsize=11)
# 檢查是否有數據可顯示
if not self.visualization_data:
ax.text(0.5, 0.5, '等待數據...', ha='center', va='center', fontsize=10)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.axis('off')
return
# 提取架構使用數據
data = self.visualization_data
architectures = [d['architecture'] for d in data]
# 定義架構類型和顏色
arch_types = ['reactive', 'hybrid', 'deliberative']
arch_colors = ['#4CAF50', '#2196F3', '#FF9800']
# 統計各架構使用次數
counts = [architectures.count(arch) for arch in arch_types]
# 繪製柱狀圖
bars = ax.bar(arch_types, counts, color=arch_colors, alpha=0.7)
ax.set_ylabel('使用次數')
ax.set_xlabel('架構類型')
# 在柱子上顯示具體數值
for bar, count in zip(bars, counts):
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height + 0.1,
f'{count}', ha='center', va='bottom', fontweight='bold', fontsize=9)
- 顯示各架構使用頻率的統計信息
2.16 繪製Qwen使用情況面板
def _draw_qwen_usage(self, ax):
ax.set_title(' Qwen API 使用情況', fontweight='bold', fontsize=11)
if not self.visualization_data:
ax.text(0.5, 0.5, '等待數據...', ha='center', va='center', fontsize=9)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
return
# 模擬Qwen API調用統計(實際系統中從API監控獲取)
qwen_calls = len(self.visualization_data) * 2 # 假設每次查詢平均2次API調用
# 繪製使用情況餅圖
labels = ['API調用成功', '剩餘額度']
sizes = [min(qwen_calls, 80), max(100 - qwen_calls, 0)]
colors = ['#66BB6A', '#EF5350']
wedges, texts, autotexts = ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%',
startangle=90, textprops={'fontsize': 8})
# 設置百分比文本樣式
for autotext in autotexts:
autotext.set_color('white')
autotext.set_fontweight('bold')
ax.set_aspect('equal')
- 顯示API調用情況的餅圖
2.17 保存動畫為GIF文件
def save_animation_gif(self, filename: str = 'hybrid_advisor_animation.gif'):
if not self.animation_frames:
print(" 沒有可用的動畫幀")
return
print(f" 正在生成GIF動畫,共{len(self.animation_frames)}幀...")
# 使用PIL庫保存GIF動畫
self.animation_frames[0].save(
filename,
save_all=True,
append_images=self.animation_frames[1:],
duration=500, # 每幀顯示500毫秒
loop=0, # 無限循環
optimize=True # 優化文件大小
)
print(f" GIF動畫已保存為: {filename}")
return filename
- 將捕獲的動畫幀序列保存為GIF動畫,用於演示系統運行流程
2.18 智能路由引擎
class IntelligentRouter:
def __init__(self):
# 設置複雜度閾值
self.complexity_thresholds = {'reactive': 0.3, 'hybrid': 0.7, 'deliberative': 0.3}
def analyze_route(self, user_input: str, user_profile: Dict) -> Dict[str, Any]:
"""
分析查詢並選擇處理通道
====================
基於查詢複雜度和用户價值做出路由決策
"""
# 計算查詢複雜度分數
complexity_score = self._assess_complexity(user_input)
# 評估用户價值
user_value_score = self._assess_user_value(user_profile)
# 路由決策邏輯
if complexity_score <= self.complexity_thresholds['reactive']:
architecture = 'reactive' # 簡單查詢使用反應式
elif complexity_score >= self.complexity_thresholds['deliberative'] and user_value_score > 0.5:
architecture = 'deliberative' # 複雜查詢且高價值用户使用深思式
else:
architecture = 'hybrid' # 其他情況使用混合式
return {
'architecture': architecture,
'complexity_score': complexity_score,
'user_value_score': user_value_score
}
def _assess_complexity(self, user_input: str) -> float:
"""
評估查詢複雜度
============
基於多個維度計算查詢的複雜程度
"""
factors = {
'length': min(len(user_input) / 100, 1.0), # 文本長度
'financial_terms': min(self._count_financial_terms(user_input) / 5, 1.0), # 金融術語
'numbers': min(len(re.findall(r'\d+', user_input)) / 3, 1.0), # 數字數量
}
# 返回平均複雜度分數
return sum(factors.values()) / len(factors)
def _count_financial_terms(self, text: str) -> int:
"""統計文本中的金融術語數量"""
terms = ['基金', '股票', '債券', '投資', '收益', '風險', '配置', '組合', '資產', '理財']
return sum(1 for term in terms if term in text)
def _assess_user_value(self, user_profile: Dict) -> float:
"""評估用户價值(基於歷史交互)"""
if not user_profile:
return 0.5 # 新用户默認價值
query_count = user_profile.get('query_count', 0)
return min(query_count / 10, 1.0) # 基於查詢次數計算價值
職責:分析用户查詢的複雜度,智能選擇最優處理通道
路由策略:
- - 簡單查詢(≤0.3) → 反應式通道
- - 複雜查詢(≥0.7) + 高價值用户 → 深思式通道
- - 其他情況 → 混合式通道
2.19 反應式投資顧問
class ReactiveAdvisor:
def process(self, user_input: str) -> Dict[str, Any]:
"""處理簡單查詢,快速返回建議"""
time.sleep(0.5) # 模擬處理時間
advice = "基於快速分析,建議您根據個人風險承受能力進行多元化資產配置。"
return {
'type': 'reactive_advice',
'advice': advice,
'response_time': '快速響應'
}
- 特點:快速響應,適合簡單查詢
- 處理時間:0.5-2秒
2.20 深思式投資顧問
class DeliberativeAdvisor:
def process(self, user_input: str) -> Dict[str, Any]:
"""深度處理複雜查詢"""
time.sleep(1) # 模擬深度分析時間
analysis = "經過深度分析,我們建議採用平衡型投資策略,重點關注長期價值投資。"
return {
'type': 'deliberative_advice',
'analysis': analysis,
'strategy': {
'name': '個性化資產配置策略',
'allocation': {'股票': '40%', '債券': '35%', '基金': '20%', '現金': '5%'}
}
}
def quick_analysis(self, user_input: str) -> Dict[str, Any]:
"""快速深度分析(混合模式使用)"""
time.sleep(0.8) # 較快的深度分析
return {
'type': 'quick_deliberative',
'analysis': "快速深度分析結果:基於關鍵因素評估的優化建議",
'strategy': {
'name': '優化配置策略',
'allocation': {'股票': '45%', '債券': '30%', '基金': '20%', '現金': '5%'}
}
}
- 特點:深度分析,適合複雜規劃
- 處理時間:8-15秒
2.21 創建混合式顧問實例
def create_detailed_animation():
print("=== 混合式智能投資顧問 ===")
print("正在創建詳細的執行流程動畫...\n")
# 創建混合式顧問實例
advisor = HybridInvestmentAdvisor()
# 測試用例 - 覆蓋所有處理通道
test_cases = [
("基金風險大嗎?", "user_001"), # 簡單查詢 → 反應式
("有20萬怎麼投資?", "user_002"), # 中等查詢 → 混合式
("我35歲,年收入50萬,如何規劃退休投資?", "user_003"), # 複雜查詢 → 深思式
]
print(" 開始執行測試用例...")
# 執行所有測試用例
for i, (query, user_id) in enumerate(test_cases, 1):
print(f"\n 執行測試案例 {i}: {query}")
print("-" * 50)
# 處理查詢並捕獲動畫幀
result = advisor.process_query(user_id, query)
print(f" 處理完成!")
print(f" 使用架構: {result['architecture_used']}")
print(f" 處理時間: {result['processing_time']}")
# 添加結果展示幀
for _ in range(2):
advisor._capture_frame()
print(f"\n 收集到 {len(advisor.animation_frames)} 個動畫幀")
# 顯示執行統計信息
print("\n執行統計:")
print("=" * 40)
for user_id, profile in advisor.user_profiles.items():
print(f"用户 {user_id}:")
print(f" 查詢次數: {profile['query_count']}")
print(f" 平均複雜度: {profile['avg_complexity']:.2f}")
print(f" 架構偏好: {profile['preferred_architecture']}")
return gif_filename
2.22 主程序入口
if __name__ == "__main__":
create_detailed_animation()
3. 輸出結果
開始處理用户 user_001 的查詢: 簡單查詢:基金風險大嗎?
路由決策: reactive (複雜度: 0.13)
執行反應式處理...
處理完成!
使用架構: reactive
處理時間: 7.23秒
執行測試案例 2: 中等查詢:有20萬怎麼投資?
--------------------------------------------------
開始處理用户 user_002 的查詢: 中等查詢:有20萬怎麼投資?
路由決策: reactive (複雜度: 0.17)
執行反應式處理...
處理完成!
使用架構: reactive
處理時間: 7.58秒
執行測試案例 3: 複雜查詢:我35歲,年收入50萬,如何規劃退休投資?
--------------------------------------------------
開始處理用户 user_003 的查詢: 複雜查詢:我35歲,年收入50萬,如何規劃退休投資?
路由決策: hybrid (複雜度: 0.45)
執行混合式並行處理...
=== 混合式智能投資顧問生成 ===
正在創建詳細的執行流程動畫...
開始執行測試用例...
執行測試案例 1: 基金風險大嗎?
--------------------------------------------------
開始處理用户 user_001 的查詢: 基金風險大嗎?
路由決策: reactive (複雜度: 0.16)
執行反應式處理...
處理完成!
使用架構: reactive
處理時間: 5.95秒
執行測試案例 2: 有20萬怎麼投資?
--------------------------------------------------
開始處理用户 user_002 的查詢: 有20萬怎麼投資?
路由決策: reactive (複雜度: 0.21)
執行反應式處理...
處理完成!
使用架構: reactive
處理時間: 6.58秒
執行測試案例 3: 我35歲,年收入50萬,如何規劃退休投資?
--------------------------------------------------
開始處理用户 user_003 的查詢: 我35歲,年收入50萬,如何規劃退休投資?
路由決策: hybrid (複雜度: 0.36)
執行混合式並行處理...
處理完成!
使用架構: hybrid
處理時間: 8.09秒
執行統計:
========================================
用户 user_001:
查詢次數: 1
平均複雜度: 0.16
架構偏好: {'reactive': 1}
用户 user_002:
查詢次數: 1
平均複雜度: 0.21
架構偏好: {'reactive': 1}
用户 user_003:
查詢次數: 1
平均複雜度: 0.36
架構偏好: {'hybrid': 1}
生成完成!
八、總結
混合式架構不是簡單的技術堆砌,而是基於深刻業務理解的架構創新,開創性的智能路由機制,在速度與深度間找到平衡點,智能分配計算資源,避免不必要的深度處理,用户體驗升級,個性化提升,使用便捷。
混合式架構代表了智能投顧技術發展的成熟階段,它告訴我們:最好的技術方案不是非此即彼的選擇,而是恰到好處的融合。在這種架構下,我們既能夠給用户閃電般的響應速度,又能夠提供專家級的深度分析,真正實現了智能投資顧問的終極目標,讓專業的投資建議觸手可及。