动态

详情 返回 返回

Saga分佈式事務框架執行邏輯

Saga分佈式事務框架執行邏輯

📋 目錄

  • 框架概述
  • 核心組件架構
  • 數據庫表設計
  • 完整執行流程
  • 節點發現與調用機制
  • 精簡補償策略設計
  • 總結

框架概述

這是一個基於數據庫驅動的Saga分佈式事務框架,專門用於解決跨服務間數據同步的一致性問題。框架採用了混合編排模式,結合了集中式任務分解和分佈式執行的優勢。

核心設計理念

  • 🎯 分層解耦: 任務分解與任務執行完全分離
  • 🌐 節點自治: 消費端節點獨立執行和管理任務
  • 📊 狀態透明: 完整的執行日誌和狀態追蹤
  • 🔄 容錯恢復: 失敗重試與自動補償機制
  • ⚖️ 負載均衡: 基於節點負載的智能調度

業務場景

  • 空間同步開啓: 跨服務複製空間、頁面、權限等數據
  • 增量數據同步: 已開啓同步的項目進行增量更新
  • 同步關閉清理: 關閉同步時清理相關數據

核心組件架構

``mermaid

graph TB
    subgraph "業務觸發層"
        A1[空間同步開啓] --> B1[業務端拆解步驟]
        A2[增量數據更新] --> B1
        A3[同步關閉清理] --> B1
    end
    
    subgraph "任務分解層"
        B1 --> C1[存儲distribute_event]
        C1 --> C2[存儲distribute_event_step]
        C2 --> C3[HTTP發送步驟數據]
    end
    
    subgraph "消費端接收層"
        C3 --> D1[消費端接收HTTP請求]
        D1 --> D2[存儲distribute_event_step_log]
        D2 --> D3[返回接收確認]
        D3 --> D4[業務端更新狀態為待消費]
    end
    
    subgraph "定時執行層"
        D4 --> E1[定時任務掃描待執行記錄]
        E1 --> E2[2線程併發控制]
        E2 --> E3[執行具體業務邏輯]
        E3 --> E4[HTTP回調通知結果]
        E4 --> E5[業務端更新狀態]
    end

數據庫表設計

📋 核心表結構

1. distribute_event (主事務表)

記錄頂層業務事務的基本信息和整體狀態。

2. distribute_event_step (步驟表)

記錄事務分解後的各個原子步驟信息。

3. distribute_event_step_log (執行日誌表) ✨ 完整設計

記錄消費端節點的執行日誌,實現簡潔而強大的冪等性保證、重試機制和通知狀態管理。

CREATE TABLE distribute_event_step_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
    step_code VARCHAR(64) NOT NULL COMMENT '關聯業務端distribute_event_step.code',
    job_code VARCHAR(64) NOT NULL COMMENT '主事務編碼,關聯業務端distribute_event.code',
    consumer_node VARCHAR(50) NOT NULL COMMENT '消費者節點地址',
    
    -- 冪等性保證字段
    execution_key VARCHAR(128) NOT NULL COMMENT '執行唯一鍵: {step_code}_{consumer_node}_{yyyyMMdd}',
    business_key VARCHAR(64) COMMENT '業務唯一鍵,基於業務數據哈希值',
    
    -- 執行狀態管理
    exec_status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '執行狀態: PENDING, EXECUTING, SUCCESS, FAILED, RETRYING',
    retry_count INT DEFAULT 0 COMMENT '當前重試次數',
    max_retry INT DEFAULT 3 COMMENT '最大重試次數',
    
    -- 通知狀態管理(新增)
    notify_status VARCHAR(20) DEFAULT 'NOT_REQUIRED' COMMENT '通知狀態: NOT_REQUIRED, PENDING, SUCCESS, FAILED',
    notify_retry_count INT DEFAULT 0 COMMENT '通知重試次數',
    max_notify_retry INT DEFAULT 3 COMMENT '最大通知重試次數',
    next_notify_time TIMESTAMP NULL COMMENT '下次通知時間',
    notify_url VARCHAR(255) COMMENT '通知回調地址',
    
    -- 執行信息
    payload TEXT COMMENT '執行數據載荷',
    result_data TEXT COMMENT '執行結果數據',
    error_message TEXT COMMENT '執行錯誤信息',
    notify_error_message TEXT COMMENT '通知錯誤信息',
    start_time TIMESTAMP NULL COMMENT '開始執行時間',
    end_time TIMESTAMP NULL COMMENT '結束執行時間',
    next_retry_time TIMESTAMP NULL COMMENT '下次執行重試時間',
    
    -- 回滾支持
    rollback_data TEXT COMMENT '回滾數據快照,JSON格式',
    is_rollback TINYINT(1) DEFAULT 0 COMMENT '是否已回滾',
    
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
    
    -- 冪等性保證索引
    UNIQUE KEY uk_execution_key (execution_key),
    UNIQUE KEY uk_business_key (step_code, business_key),
    
    -- 查詢優化索引
    INDEX idx_exec_status (exec_status),
    INDEX idx_notify_status (notify_status, next_notify_time),
    INDEX idx_step_code (step_code),
    INDEX idx_consumer_node (consumer_node),
    INDEX idx_retry_time (next_retry_time),
    INDEX idx_job_code (job_code)
);

📊 字段設計詳解與使用説明

1. 冪等性保證字段
/**
 * execution_key: 執行唯一鍵
 * 用途: 防止同一步驟在同一節點同一天重複執行
 * 格式: SPACE_SYNC_001_192.168.1.10:8080_20240316
 * 使用場景: 消費端接收HTTP請求時檢查是否已存在相同的execution_key
 */
public String generateExecutionKey(String stepCode, String consumerNode) {
    String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    return String.format("%s_%s_%s", stepCode, consumerNode, dateStr);
}

/**
 * business_key: 業務唯一鍵
 * 用途: 基於業務數據內容的去重,防止相同業務數據重複處理
 * 生成: 對payload業務數據進行MD5哈希
 * 使用場景: 當業務數據完全相同時避免重複執行
 */
public String generateBusinessKey(Object payload) {
    String payloadJson = JSON.toJSONString(payload);
    return DigestUtils.md5DigestAsHex(payloadJson.getBytes()).substring(0, 8);
}
2. 狀態管理字段
/**
 * exec_status: 執行狀態
 * PENDING: 待執行 - 剛接收到任務,等待定時器掃描
 * EXECUTING: 執行中 - 正在執行業務邏輯
 * SUCCESS: 執行成功 - 業務邏輯執行完成
 * FAILED: 執行失敗 - 重試次數耗盡後的最終失敗狀態
 * RETRYING: 重試中 - 執行失敗後等待重試的狀態
 */

/**
 * notify_status: 通知狀態
 * NOT_REQUIRED: 無需通知 - 執行中或失敗時的默認狀態
 * PENDING: 待通知 - 執行成功後需要通知業務端
 * SUCCESS: 通知成功 - 業務端已收到通知並確認
 * FAILED: 通知失敗 - 通知重試次數耗盡後的狀態
 */
3. 重試機制字段
/**
 * retry_count / max_retry: 執行重試控制
 * 用途: 控制業務邏輯執行的重試次數,避免無限重試
 * 邏輯: 失敗時retry_count+1,超過max_retry則標記為FAILED
 */

/**
 * notify_retry_count / max_notify_retry: 通知重試控制
 * 用途: 控制通知業務端的重試次數,確保通知到位
 * 邏輯: 通知失敗時notify_retry_count+1,採用指數退避策略
 */

/**
 * next_retry_time / next_notify_time: 重試時間控制
 * 用途: 指數退避算法的時間調度
 * 計算: delay = Math.pow(2, retryCount) * baseDelaySeconds
 */
4. 數據存儲字段
/**
 * payload: 執行數據載荷
 * 用途: 存儲從業務端接收的步驟執行數據
 * 格式: JSON字符串,包含業務邏輯執行所需的所有參數
 */

/**
 * result_data: 執行結果數據
 * 用途: 存儲業務邏輯執行後的結果,用於通知業務端
 * 格式: JSON字符串,包含執行結果、影響的數據ID等
 */

/**
 * rollback_data: 回滾數據快照
 * 用途: 存儲執行前的原始數據狀態,用於失敗回滾
 * 格式: JSON字符串,包含需要刪除的數據ID列表等
 */

完整執行流程

🎆 整體流程時序圖(數據庫分離)

sequenceDiagram
    participant BIZ as 業務服務A
    participant BIZ_DB as 業務端數據庫
    participant BIZ2 as 業務服務B(消費端)
    participant CONSUMER_DB as 消費端數據庫
    participant TASK as 定時任務
    
    Note over BIZ,TASK: 階段一: 業務觸發與任務分解
    BIZ->>BIZ: 1. 業務觸發(空間同步/增量更新等)
    BIZ->>BIZ: 2. 執行步驟拆解邏輯
    BIZ->>BIZ_DB: 3. 存儲主事務 distribute_event
    BIZ->>BIZ_DB: 4. 存儲子步驟 distribute_event_step
    
    Note over BIZ,TASK: 階段二: HTTP任務分發
    BIZ->>BIZ2: 5. HTTP請求發送步驟數據
    BIZ2->>CONSUMER_DB: 6. 存儲執行日誌 distribute_event_step_log
    BIZ2->>BIZ: 7. 返回接收確認
    BIZ->>BIZ_DB: 8. 更新步驟狀態為'待消費'
    
    Note over BIZ,TASK: 階段三: 定時消費執行(限流2線程)
    TASK->>CONSUMER_DB: 9. 掃描待執行狀態記錄
    TASK->>TASK: 10. 併發控制(最多2線程)
    TASK->>CONSUMER_DB: 11. 更新exec_status為'EXECUTING'
    TASK->>TASK: 12. 執行具體業務邏輯
    
    alt 執行成功
        TASK->>CONSUMER_DB: 13a. 更新exec_status為'SUCCESS'
        TASK->>CONSUMER_DB: 14a. 設置notify_status為'PENDING'
        
        Note over BIZ,TASK: 階段四: 通知業務端(指數退避重試)
        TASK->>BIZ: 15a. HTTP回調通知執行結果
        alt 通知成功
            BIZ->>TASK: 16a. 返回200狀態
            TASK->>CONSUMER_DB: 17a. 更新notify_status為'SUCCESS'
            BIZ->>BIZ_DB: 18a. 更新distribute_event_step狀態
            BIZ->>BIZ_DB: 19a. 更新distribute_event主事務狀態
        else 通知失敗
            BIZ->>TASK: 16b. 返回非200狀態或網絡異常
            TASK->>CONSUMER_DB: 17b. notify_retry_count+1
            alt 通知重試次數 < 3
                TASK->>CONSUMER_DB: 18b. 計算next_notify_time(指數退避)
                Note right of TASK: 等待指數退避時間後重新通知
                TASK->>BIZ: 19b. 重新發送通知(循環至16a)
            else 通知重試次數 >= 3
                TASK->>CONSUMER_DB: 20b. 更新notify_status為'FAILED'
                Note right of TASK: 通知失敗,需人工介入
            end
        end
        
    else 執行失敗
        TASK->>CONSUMER_DB: 13c. 更新retry_count+1
        alt 執行重試次數 < 3
            TASK->>CONSUMER_DB: 14c. 更新exec_status為'RETRYING'
            TASK->>CONSUMER_DB: 15c. 刪除當前記錄
            TASK->>CONSUMER_DB: 16c. 重新插入新記錄(計算next_retry_time)
            Note right of TASK: 等待指數退避時間後重新執行
        else 執行重試次數 >= 3
            TASK->>CONSUMER_DB: 17c. 更新exec_status為'FAILED'
            TASK->>BIZ: 18c. 通知執行最終失敗
            BIZ->>BIZ: 19c. 觸發回滾邏輯(刪除重新開始)
        end
    end

整體執行流程圖

flowchart TD
    A[業務觸發] --> B[拆解步驟存儲]
    B --> C[HTTP發送步驟數據]
    C --> D[消費端存儲日誌]
    D --> E[更新狀態為待消費]
    E --> F[定時任務掃描]
    F --> G[2線程併發執行]
    G --> H{執行結果}
    H -->|成功| I[HTTP回調通知]
    H -->|失敗| J{重試次數}
    J -->|<3次| K[指數退避重試]
    J -->|>=3次| L[觸發回滾刪除]
    I --> M[更新主事務狀態]
    L --> N[通知失敗完成]

節點發現與調用機制

服務節點通過心跳註冊,基於負載權重選擇最優節點執行任務。

@Service
public class DistributeNodeRegistry {
    
    @Scheduled(fixedRate = 30000) // 每30秒心跳
    public void heartbeat() {
        String nodeAddress = getLocalNodeAddress();
        NodeMetadata metadata = collectNodeMetadata(); // 收集CPU、內存、任務數量等
        
        nodeRegistryMapper.upsertNode(NodeRegistryRecord.builder()
            .serviceName(getServiceName())
            .nodeAddress(nodeAddress)
            .status(1) // 在線
            .lastHeartbeatTime(new Date())
            .metadata(JSON.toJSONString(metadata))
            .build());
    }
    
    // 選擇最優節點(基於負載權重)
    public String selectOptimalNode(String serviceName) {
        List<NodeRegistryRecord> nodes = nodeRegistryMapper.selectAvailableNodes(serviceName);
        return nodes.stream()
            .min(Comparator.comparing(this::calculateNodeLoad))
            .map(NodeRegistryRecord::getNodeAddress)
            .orElseThrow(() -> new NoAvailableNodeException("無可用節點"));
    }
}

🔄 精簡補償策略設計

執行重試與通知重試機制

@Component
public class RetryAndNotificationService {
    
    /**
     * 處理執行失敗 - 重試機制(刪除重新插入)
     */
    public void handleExecutionFailure(DistributeEventStepLog stepLog, String errorMessage) {
        int currentRetry = stepLog.getRetryCount();
        
        if (currentRetry < stepLog.getMaxRetry()) {
            // 還有重試機會 - 刪除重新插入
            executeRetryByReinsert(stepLog, errorMessage);
        } else {
            // 重試次數耗盡,觸發回滾
            triggerRollback(stepLog, errorMessage);
        }
    }
    
    /**
     * 執行重試邏輯:刪除重新插入
     */
    private void executeRetryByReinsert(DistributeEventStepLog stepLog, String errorMessage) {
        try {
            // 1. 更新狀態為 RETRYING
            stepLogMapper.updateStatus(stepLog.getId(), "RETRYING", errorMessage);
            
            // 2. 計算下次重試時間(指數退避)
            long delaySeconds = (long) Math.pow(2, stepLog.getRetryCount() + 1) * 30;
            Timestamp nextRetryTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);
            
            // 3. 刪除當前記錄
            stepLogMapper.deleteById(stepLog.getId());
            
            // 4. 重新插入新記錄
            DistributeEventStepLog newStepLog = DistributeEventStepLog.builder()
                .stepCode(stepLog.getStepCode())
                .jobCode(stepLog.getJobCode())
                .consumerNode(stepLog.getConsumerNode())
                .executionKey(stepLog.getExecutionKey())
                .businessKey(stepLog.getBusinessKey())
                .execStatus("PENDING")
                .retryCount(stepLog.getRetryCount() + 1)
                .maxRetry(stepLog.getMaxRetry())
                .nextRetryTime(nextRetryTime)
                .payload(stepLog.getPayload())
                .rollbackData(stepLog.getRollbackData())
                .build();
                
            stepLogMapper.insert(newStepLog);
            
            log.info("執行重試安排成功,第{}次重試,下次執行時間: {}", 
                newStepLog.getRetryCount(), nextRetryTime);
                
        } catch (Exception e) {
            log.error("執行重試安排失敗: {}", stepLog.getStepCode(), e);
        }
    }
    
    /**
     * 處理通知失敗 - 指數退避重試
     */
    public void handleNotificationFailure(DistributeEventStepLog stepLog, String notifyErrorMessage) {
        int currentNotifyRetry = stepLog.getNotifyRetryCount();
        
        if (currentNotifyRetry < stepLog.getMaxNotifyRetry()) {
            // 還有通知重試機會 - 指數退避
            scheduleNotificationRetry(stepLog, notifyErrorMessage);
        } else {
            // 通知重試次數耗盡
            markNotificationFailed(stepLog, notifyErrorMessage);
        }
    }
    
    /**
     * 安排通知重試(指數退避)
     */
    private void scheduleNotificationRetry(DistributeEventStepLog stepLog, String notifyErrorMessage) {
        try {
            int nextNotifyRetryCount = stepLog.getNotifyRetryCount() + 1;
            
            // 指數退避算法: 2^n * 60秒
            long delaySeconds = (long) Math.pow(2, nextNotifyRetryCount) * 60;
            Timestamp nextNotifyTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);
            
            stepLogMapper.updateNotificationForRetry(
                stepLog.getId(),
                nextNotifyRetryCount,
                nextNotifyTime,
                notifyErrorMessage
            );
            
            log.info("通知重試安排成功,第{}次重試,下次通知時間: {}", 
                nextNotifyRetryCount, nextNotifyTime);
                
        } catch (Exception e) {
            log.error("通知重試安排失敗: {}", stepLog.getStepCode(), e);
        }
    }
    
    /**
     * 回滾機制:刪除重新開始
     */
    private void triggerRollback(DistributeEventStepLog stepLog, String errorMessage) {
        try {
            // 1. 標記為執行失敗
            stepLogMapper.updateStatus(stepLog.getId(), "FAILED", errorMessage);
            
            // 2. 執行回滾操作(刪除相關數據)
            executeRollbackAction(stepLog);
            
            // 3. 標記回滾完成
            stepLogMapper.markAsRollback(stepLog.getId());
            
            // 4. 通知業務端失敗
            notifyBusinessFailure(stepLog);
            
        } catch (Exception e) {
            log.error("回滾執行失敗: {}", stepLog.getStepCode(), e);
            alertService.sendRollbackFailureAlert(stepLog, e);
        }
    }
    
    // 執行具體的回滾操作
    private void executeRollbackAction(DistributeEventStepLog stepLog) {
        String rollbackData = stepLog.getRollbackData();
        if (StringUtils.isBlank(rollbackData)) return;
        
        RollbackSnapshot snapshot = JSON.parseObject(rollbackData, RollbackSnapshot.class);
        
        switch (snapshot.getStepType()) {
            case "PAGE_CREATE":
                pageService.deleteById(snapshot.getEntityId());
                break;
            case "DATA_COPY":
                dataService.deleteBatch(snapshot.getDataIds());
                break;
            case "PERMISSION_GRANT":
                permissionService.revoke(snapshot.getPermissionIds());
                break;
        }
    }
}

總結

主要特點
流程清晰: 業務拆解 → HTTP分發 → 定時消費 → 狀態同步
冪等性簡單: 一天一次執行保證,避免重複處理
重試機制: 最多3次,指數退避,失敗後智能回滾
回滾策略: 刪除重新開始,簡單有效
併發控制: 2線程限流,避免資源爭搶
狀態追蹤: 完整的執行鏈路監控

核心優勢
🎯 設計精簡: 去除複雜的多重冪等性策略,採用基於日期的簡單方案
💡 實用性強: 回滾即刪除,符合業務實際需求
🔧 易於維護: 清晰的代碼結構和執行流程
性能優化: 合理的併發控制和索引設計
🛡️ 可靠性高: 完善的重試和回滾機制

user avatar
0 用户, 点赞了这篇动态!

发布 评论

Some HTML is okay.