博客 / 詳情

返回

收入寫RAFT算法(一)Leader選舉

Raft Leader 選舉實現文檔

目錄

  • 1. 概述
  • 2. 核心概念
  • 3. 涉及的類及其職責
  • 4. 實現細節
    • 4.1 節點狀態與轉換
    • 4.2 選舉超時機制
    • 4.3 投票請求處理
    • 4.4 選舉發起流程
    • 4.5 投票響應處理
    • 4.6 心跳機制
    • 4.7 安全性保證
  • 5. 測試指南
  • 6. 使用示例
  • 7. 常見問題

1. 概述

1.1 目的

本文檔詳細説明了 LingRaft-Lite 模塊中 Raft Leader 選舉功能的實現,包括涉及的類、實現細節、測試方法等,便於開發者理解和復現。

1.2 功能範圍

  • 節點狀態管理(Follower、Candidate、Leader)
  • 選舉超時檢測
  • 投票請求與響應處理
  • 多數派選舉機制
  • 心跳維護 Leader 地位
  • 網絡分區處理

1.3 Raft 算法參考

本實現基於 Raft 論文(Diego Ongaro 和 John Ousterhout, 2014)的 Leader 選舉部分,具體參考:

  • Section 5.1: Leader Election
  • Section 5.2: Leader Election - RequestVote RPC
  • Section 5.4.1: Election Safety Property

2. 核心概念

2.1 節點狀態

Raft 節點有三種狀態:

狀態 説明 職責
FOLLOWER 從節點 響應 Leader 的 RPC 請求(AppendEntries、RequestVote)
CANDIDATE 候選節點 發起選舉,向其他節點請求投票
LEADER 主節點 處理客户端請求,向 Follower 複製日誌,發送心跳

2.2 任期 (Term)

定義

  • 時間被分成多個任期,每個任期以選舉開始
  • 任期號是單調遞增的整數
  • 每次選舉都進入新任期

用途

  • 識別過時的信息(舊任期的投票、心跳等)
  • 防止腦裂(分裂投票)

實現

private volatile long currentTerm = 0;  // 當前任期號

2.3 選舉超時 (Election Timeout)

定義

  • Follower 在收到有效心跳或投票請求之前等待的時間
  • 超時後轉為 Candidate 併發起選舉

隨機化

  • 為了避免多個節點同時超時導致平票選舉,超時時間隨機化
  • 通常在 150ms ~ 300ms 之間

實現

// 配置隨機範圍
config.setElectionTimeoutRandomRange(Range.of(150, 300));

// 計算隨機超時時間
int randomTimeout = raftConfig.getElectionTimeoutMs();

2.4 多數派 (Majority)

定義

  • 超過半數的節點數:N/2 + 1
  • 3 節點集羣需要 2 票
  • 5 節點集羣需要 3 票

重要性

  • 保證選舉結果的唯一性
  • 兩個多數派必然有交集,確保只有一個 Leader

實現

public VoteCounter(long term, int totalNodes) {
    this.majorityCount = totalNodes / 2 + 1;
}

2.5 投票規則

節點投票給候選人的條件

  1. 候選人的任期 >= 當前任期
  2. 如果任期相同,candidate 的日誌至少和當前節點一樣新

日誌比較規則

  • 如果 candidateLastLogTerm > lastLogTerm,投票
  • 如果 candidateLastLogTerm == lastLogTermcandidateLastLogIndex >= lastLogIndex,投票
  • 否則,拒絕投票

3. 涉及的類及其職責

3.1 核心類

類名 路徑 職責
RaftNodeImpl com.ling.raft.core.RaftNodeImpl 節點狀態管理、選舉發起、投票處理、心跳發送
ConsensusModuleImpl com.ling.raft.core.ConsensusModuleImpl 投票請求和響應的具體實現邏輯
VoteCounter com.ling.raft.core.VoteCounter 投票計數器,統計和判斷多數派
ElectionTask com.ling.raft.core.task.ElectionTask 選舉超時檢測任務
HeartbeatTask com.ling.raft.core.task.HeartbeatTask Leader 心跳發送任務
ServerStatusEnum com.ling.raft.enums.ServerStatusEnum 節點狀態枚舉
VoteRequest com.ling.raft.model.dto.VoteRequest 投票請求 RPC
VoteResponse com.ling.raft.model.dto.VoteResponse 投票響應 RPC
ThreeNodeElectionTest com.ling.raft.example.leader.ThreeNodeElectionTest 完整測試程序

3.2 類關係圖

┌─────────────────────┐
│   RaftNodeImpl      │
│   (節點主類)         │
└──────────┬──────────┘
           │ 持有引用
           ├─────────────────┐
           ▼                 ▼
┌─────────────────────┐  ┌─────────────────────┐
│ ConsensusModuleImpl │  │   VoteCounter       │
│ (投票邏輯)          │  │   (投票計數)         │
└─────────────────────┘  └─────────────────────┘
           │                     │
           ├─────────────────────┤
           ▼                     ▼
┌─────────────────────┐  ┌─────────────────────┐
│  ElectionTask       │  │  HeartbeatTask      │
│  (選舉超時檢測)      │  │  (心跳任務)         │
└─────────────────────┘  └─────────────────────┘

3.3 關鍵字段説明

RaftNodeImpl

// 節點狀態
private volatile ServerStatusEnum nodeStatus = ServerStatusEnum.FOLLOWER;

// 持久化狀態
private volatile long currentTerm = 0;           // 當前任期
private volatile String votedFor = null;         // 本輪任期投票給的候選人

// 選舉相關
private ScheduledExecutorService electionExecutor;
private ScheduledFuture<?> electionFuture;
private VoteCounter currentVoteCounter;
private final Random random = new Random();

// 心跳相關
private ScheduledExecutorService heartbeatExecutor;
private ScheduledFuture<?> heartbeatFuture;

// 時間記錄
private volatile long prevElectionTime = 0;      // 上次選舉時間
private volatile long preHeartBeatTime = 0;      // 上次收到心跳時間

ConsensusModuleImpl

public final RaftNodeImpl node;  // 持有 RaftNodeImpl 的引用
public final ReentrantLock voteLock = new ReentrantLock();  // 投票鎖
public final ReentrantLock appendEntriesLock = new ReentrantLock();  // 追加條目鎖

VoteCounter

private final long term;                      // 當前選舉任期
private final Set<String> votesReceived;      // 已投票的節點ID集合
private final int majorityCount;              // 需要獲得的多數派票數
private volatile boolean votedForSelf;        // 是否已投票給自己

4. 實現細節

4.1 節點狀態與轉換

4.1.1 狀態枚舉

類名ServerStatusEnum

定義

public enum ServerStatusEnum {
    LEADER("LEADER", "主節點"),
    CANDIDATE("CANDIDATE", "候選節點"),
    FOLLOWER("FOLLOWER", "從節點");
}

4.1.2 狀態轉換圖

         +-------------------------+
         |         初始化          |
         +-------------------------+
                    |
                    ▼
         +-------------------------+
         |      FOLLOWER          | <------------+
         |  (等待心跳或投票)        |              |
         +-------------------------+              |
                    |                             |
                    | 選舉超時                     | 收到更高任期的
                    |                             | AppendEntries 或
                    ▼                             | RequestVote
         +-------------------------+              |
         |     CANDIDATE          |              |
         |  (發起選舉)             |              |
         +-------------------------+              |
                    |                             |
                    | 獲得多數派                  |
                    |                             |
                    ▼                             |
         +-------------------------+              |
         |      LEADER            | --------------+
         |  (處理客户端請求)        |  發現更高任期
         +-------------------------+

4.1.3 轉為 Follower

方法becomeFollower(newTerm)

實現位置RaftNodeImpl.java:175-196

public void becomeFollower(long newTerm) {
    // 檢查任期
    if (newTerm < currentTerm) {
        log.warn("Cannot become Follower with smaller term: {} < {}",
                 newTerm, currentTerm);
        return;
    }

    ServerStatusEnum oldStatus = nodeStatus;

    // 更新狀態
    nodeStatus = ServerStatusEnum.FOLLOWER;
    currentTerm = newTerm;
    votedFor = null;  // 重置投票記錄
    currentVoteCounter = null;  // 清空投票計數器

    // 停止心跳(如果之前是 Leader)
    cancelHeartbeatTimer();

    // 重置選舉定時器
    resetElectionTimer();

    log.info("State changed: {} -> FOLLOWER, term: {}", oldStatus, currentTerm);
}

調用場景

  1. 節點初始化
  2. 收到更高任期的 AppendEntries
  3. 收到更高任期的 RequestVote
  4. Candidate 收到有效 AppendEntries

4.1.4 轉為 Candidate

方法becomeCandidate()

實現位置RaftNodeImpl.java:201-216

public void becomeCandidate() {
    ServerStatusEnum oldStatus = nodeStatus;

    // 增加任期號(重要!)
    currentTerm++;
    nodeStatus = ServerStatusEnum.CANDIDATE;
    votedFor = currentNodeConfig.getServerId();  // 投票給自己

    log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);

    // 重置選舉定時器
    resetElectionTimer();

    // 發起投票請求
    startElection();
}

調用場景

  1. 選舉超時
  2. 作為 Candidate 重新發起選舉(平票後)

4.1.5 轉為 Leader

方法becomeLeader()

實現位置RaftNodeImpl.java:221-243

public void becomeLeader() {
    // 只有 Candidate 才能成為 Leader
    if (nodeStatus != ServerStatusEnum.CANDIDATE) {
        log.warn("Only CANDIDATE can become LEADER, current: {}", nodeStatus);
        return;
    }

    ServerStatusEnum oldStatus = nodeStatus;
    nodeStatus = ServerStatusEnum.LEADER;

    // 初始化 Leader 狀態(nextIndex、matchIndex)
    initializeLeaderState();

    // 取消選舉定時器(Leader 不需要選舉)
    cancelElectionTimer();

    log.info("========================================");
    log.info("State changed: {} -> LEADER, term: {}", oldStatus, currentTerm);
    log.info("========================================");

    // 立即發送心跳並開始心跳定時器
    sendHeartbeats();
    startHeartbeatTimer();
}

調用場景

  1. Candidate 獲得多數派投票
  2. 單機模式直接成為 Leader

4.2 選舉超時機制

4.2.1 選舉超時檢測

類名ElectionTask

實現位置com.ling.raft.core.task.ElectionTask.java

核心邏輯

@Override
public void run() {
    try {
        // Leader 不需要選舉
        if (node.getNodeStatus() == ServerStatusEnum.LEADER) {
            log.debug("Current node is LEADER, skip election");
            return;
        }

        // 檢查是否超時
        long currentTime = System.currentTimeMillis();
        int electionTimeoutMs = node.getRaftConfig().getElectionTimeoutMs();
        long timeElapsed = currentTime - node.getPrevElectionTime();

        if (timeElapsed < electionTimeoutMs) {
            // 未超時,重新設置定時器
            node.resetElectionTimer();
            return;
        }

        // 選舉超時,開始新一輪選舉
        log.info("========================================");
        log.info("ELECTION TIMEOUT DETECTED!");
        log.info("Time elapsed: {}ms, Timeout: {}ms", timeElapsed, electionTimeoutMs);
        log.info("Current term: {}, Status: {}", node.getCurrentTerm(), node.getNodeStatus());
        log.info("Converting to CANDIDATE and starting new election...");
        log.info("========================================");

        node.becomeCandidate();

    } catch (Exception e) {
        log.error("Error in election task", e);
        if (node.getIsRunning().get()) {
            node.resetElectionTimer();
        }
    }
}

特點

  1. 跳過 Leader:Leader 不需要選舉
  2. 嚴格超時檢查:確保真的超時才發起選舉
  3. 日誌詳細:記錄選舉超時的關鍵信息

4.2.2 選舉定時器管理

方法resetElectionTimer()

實現位置RaftNodeImpl.java:455-475

public void resetElectionTimer() {
    if (!isRunning.get()) {
        return;
    }

    // 取消舊的定時任務
    cancelElectionTimer();

    // 計算隨機超時時間
    int randomTimeout = raftConfig.getElectionTimeoutMs();

    // 更新超時時間戳
    prevElectionTime = System.currentTimeMillis();

    // 設置新的定時任務
    electionFuture = electionExecutor.schedule(
        new ElectionTask(this),
        randomTimeout,
        TimeUnit.MILLISECONDS
    );

    log.debug("Election timer reset, timeout: {}ms", randomTimeout);
}

調用時機

  1. 節點初始化為 Follower
  2. 收到有效心跳
  3. 收到投票請求(即使拒絕)
  4. 轉為 Follower(從任何狀態)
  5. 轉為 Candidate(重新開始計時)

4.2.3 超時時間隨機化

配置方式

RaftConfig config = new RaftConfig(currentNode, allNodes);
config.setElectionTimeout(2);  // 基礎倍數
config.setElectionTimeoutRandomRange(Range.of(150, 300));  // 隨機範圍

實現原理

// RaftConfig 內部實現
public int getElectionTimeoutMs() {
    if (electionTimeoutRandomRange == null) {
        return electionTimeout * 1000;
    }

    // 在隨機範圍內選擇一個值
    int min = electionTimeoutRandomRange.getMin();
    int max = electionTimeoutRandomRange.getMax();
    Random random = new Random();
    return min + random.nextInt(max - min + 1);
}

避免平票的原理

  • 3 個節點超時時間分別為:170ms、220ms、280ms
  • node1 先超時發起選舉
  • node2 和 node3 收到投票請求後重置超時時間
  • node1 獲得多數派(自己的票),成為 Leader

4.3 投票請求處理

4.3.1 RequestVote RPC

請求格式VoteRequest

字段説明

public class VoteRequest {
    private long term;          // candidate 的任期號
    private String candidateId; // candidate 的節點 ID
    private long lastLogIndex;  // candidate 最後一條日誌的索引
    private long lastLogTerm;   // candidate 最後一條日誌的任期號
}

響應格式VoteResponse

字段説明

public class VoteResponse {
    private long term;          // 當前任期(用於更新 candidate 的任期)
    private boolean voteGranted; // 是否投票
}

4.3.2 投票邏輯

方法requestVote(VoteRequest voteRequest)

實現位置ConsensusModuleImpl.java:45-90

@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
    voteLock.lock();
    try {
        long currentTerm = node.getCurrentTerm();
        String votedFor = node.getVotedFor();
        String candidateId = voteRequest.getCandidateId();

        log.info("Received vote request from candidate: {}, Term: {}, CurrentTerm: {}, VotedFor: {}",
                candidateId, voteRequest.getTerm(), currentTerm, votedFor);

        // 1. 任期檢查
        if (voteRequest.getTerm() < currentTerm) {
            log.info("Rejected: candidate term {} < current term {}",
                    voteRequest.getTerm(), currentTerm);
            return new VoteResponse(currentTerm, false);
        }

        // 2. 任期更大,更新並轉為 Follower
        if (voteRequest.getTerm() > currentTerm) {
            log.info("Higher term received: {} -> {}, becoming FOLLOWER",
                    currentTerm, voteRequest.getTerm());
            node.becomeFollower(voteRequest.getTerm());
            currentTerm = node.getCurrentTerm();
            votedFor = node.getVotedFor();
        }

        // 3. 檢查是否已投票給其他人
        if (votedFor != null && !votedFor.equals(candidateId)) {
            log.info("Already voted for {}, rejecting {}", votedFor, candidateId);
            return new VoteResponse(currentTerm, false);
        }

        // 4. 檢查日誌是否至少一樣新
        if (isLogUpToDate(voteRequest.getLastLogIndex(), voteRequest.getLastLogTerm())) {
            log.info("Voting for candidate: {}", candidateId);
            node.setVotedFor(candidateId);
            node.setPrevElectionTime(System.currentTimeMillis());  // 重置超時
            return new VoteResponse(currentTerm, true);
        } else {
            log.info("Candidate log not up to date");
            return new VoteResponse(currentTerm, false);
        }
    } finally {
        voteLock.unlock();
    }
}

投票規則詳解

  1. 任期檢查

    • candidate 的任期 < 當前任期 → 拒絕
  2. 任期更新

    • candidate 的任期 > 當前任期 → 更新任期,轉為 Follower
  3. 唯一投票

    • 本輪任期已投票給其他人 → 拒絕
    • 已投票給該 candidate → 接受(冪等性)
  4. 日誌完整性

    • candidate 的日誌 >= 自己的日誌 → 接受
    • 否則 → 拒絕

4.3.3 日誌比較邏輯

方法isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)

實現位置ConsensusModuleImpl.java:337-350

private boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm) {
    long lastLogTerm = getLastLogTerm();
    long lastLogIndex = getLastLogIndex();

    // 優先比較任期:candidate 的任期更大 → 更新
    if (candidateLastLogTerm > lastLogTerm) {
        return true;
    }

    // 任期相同,比較索引:candidate 的索引 >= 自己的索引 → 更新
    if (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex) {
        return true;
    }

    // 其他情況 → 不更新
    return false;
}

示例

情況 1: candidate 任期更大
candidate: term=3, index=5
current:  term=2, index=5
→ 投票 (任期更大)

情況 2: 任期相同,索引更大或相等
candidate: term=2, index=5
current:  term=2, index=4
→ 投票 (索引更大)

情況 3: 任期相同,索引更小
candidate: term=2, index=4
current:  term=2, index=5
→ 不投票 (日誌落後)

情況 4: 任期更小
candidate: term=1, index=10
current:  term=2, index=5
→ 不投票 (任期更小)

4.3.4 投票鎖

目的:防止併發投票請求導致狀態不一致

實現

public final ReentrantLock voteLock = new ReentrantLock();

@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
    voteLock.lock();
    try {
        // 投票邏輯
        ...
    } finally {
        voteLock.unlock();
    }
}

保護的資源

  • currentTerm
  • votedFor
  • nodeStatus

4.4 選舉發起流程

4.4.1 開始選舉

方法startElection()

實現位置RaftNodeImpl.java:266-289

private void startElection() {
    int totalNodes = raftConfig.getRaftNodeConfigList().size();
    currentVoteCounter = new VoteCounter(currentTerm, totalNodes);

    // 投票給自己
    currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());

    log.info("Starting election for term: {}, voted for self, votes: {}/{}",
            currentTerm, currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());

    // 單機模式直接成為 Leader
    if (totalNodes == 1) {
        log.info("Single node mode, becoming leader immediately");
        becomeLeader();
        return;
    }

    // 發送投票請求給所有其他節點
    List<RaftNodeConfig> otherNodes = getOtherNodes();
    for (RaftNodeConfig nodeConfig : otherNodes) {
        electionExecutor.execute(() -> sendVoteRequest(nodeConfig));
    }

    // 檢查是否已獲得多數派(可能只有自己一票的情況)
    checkElectionResult();
}

流程

  1. 創建投票計數器
  2. 投票給自己
  3. 單機模式直接成為 Leader
  4. 多機模式併發發送投票請求
  5. 檢查選舉結果

4.4.2 發送投票請求

方法sendVoteRequest(targetNode)

實現位置RaftNodeImpl.java:294-316

private void sendVoteRequest(RaftNodeConfig targetNode) {
    try {
        // 構建 VoteRequest
        VoteRequest request = VoteRequest.builder()
                .term(currentTerm)
                .candidateId(currentNodeConfig.getServerId())
                .lastLogIndex(getLastLogIndex())
                .lastLogTerm(getLastLogTerm())
                .build();
        request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
        request.setCmd(Request.REQUEST_VOTE);

        log.debug("Sending VoteRequest to {} for term {}", targetNode.getServerId(), currentTerm);

        // 發送 RPC 請求
        VoteResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);

        // 處理響應
        if (response != null) {
            handleVoteResponse(response, targetNode.getServerId());
        }
    } catch (Exception e) {
        log.debug("Failed to send vote request to {}: {}", targetNode.getServerId(), e.getMessage());
    }
}

特點

  1. 併發發送到所有其他節點
  2. 使用線程池異步發送
  3. 超時設置為 3000ms
  4. 失敗不重試(等待下一次選舉)

4.4.3 投票計數器

類名VoteCounter

實現位置com.ling.raft.core.VoteCounter.java

核心方法

// 記錄投票
public synchronized boolean recordVote(String nodeId) {
    return votesReceived.add(nodeId);
}

// 投票給自己
public synchronized void voteForSelf(String selfId) {
    if (!votedForSelf) {
        votesReceived.add(selfId);
        votedForSelf = true;
    }
}

// 檢查是否獲得多數派
public boolean hasMajority() {
    return votesReceived.size() >= majorityCount;
}

// 獲取當前票數
public int getVoteCount() {
    return votesReceived.size();
}

數據結構

  • 使用 ConcurrentHashMap.newKeySet() 存儲投票節點 ID
  • 保證線程安全
  • 自動去重(不會重複計票)

4.5 投票響應處理

4.5.1 處理投票響應

方法handleVoteResponse(response, voterId)

實現位置RaftNodeImpl.java:322-361

private void handleVoteResponse(VoteResponse response, String voterId) {
    // 使用同步塊確保原子性
    synchronized (this) {
        // 如果不是 Candidate,忽略
        if (nodeStatus != ServerStatusEnum.CANDIDATE) {
            log.debug("Not a candidate anymore (status: {}), ignoring vote from {}",
                    nodeStatus, voterId);
            return;
        }

        // 如果收到更高任期,轉為 Follower
        if (response.getTerm() > currentTerm) {
            log.info("Received higher term {} from {}, stepping down",
                    response.getTerm(), voterId);
            becomeFollower(response.getTerm());
            return;
        }

        // 忽略舊任期的響應
        if (response.getTerm() < currentTerm) {
            log.debug("Received stale vote response from {} for old term {}",
                    voterId, response.getTerm());
            return;
        }

        // 統計投票
        if (response.isVoteGranted()) {
            boolean isNewVote = currentVoteCounter.recordVote(voterId);
            if (isNewVote) {
                log.info("Received vote from {} for term {}, total votes: {}/{}",
                        voterId, currentTerm, currentVoteCounter.getVoteCount(),
                        currentVoteCounter.getMajorityCount());

                // 檢查選舉結果
                checkElectionResult();
            }
        } else {
            log.debug("Vote denied by {} for term {}", voterId, currentTerm);
        }
    }
}

處理邏輯

  1. 狀態檢查

    • 不再是 Candidate → 忽略
  2. 任期檢查

    • 響應任期 > 當前任期 → 發現更高任期,轉為 Follower
    • 響應任期 < 當前任期 → 忽略舊響應
  3. 投票統計

    • 投票成功 → 記錄投票,檢查是否獲得多數派
    • 投票失敗 → 記錄日誌

4.5.2 檢查選舉結果

方法checkElectionResult()

實現位置RaftNodeImpl.java:367-373

private void checkElectionResult() {
    if (currentVoteCounter != null && currentVoteCounter.hasMajority()) {
        log.info("Majority votes received ({}/{}), becoming LEADER",
                currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
        becomeLeader();
    }
}

調用時機

  1. 投票給自己後(單機模式)
  2. 收到每個投票響應後
  3. 所有投票請求發送後(初始檢查)

4.6 心跳機制

4.6.1 心跳任務

類名HeartbeatTask

實現位置com.ling.raft.core.task.HeartbeatTask.java

@Override
public void run() {
    try {
        // 只有 Leader 才發送心跳
        if (node.getNodeStatus() != ServerStatusEnum.LEADER) {
            log.debug("Current node is not LEADER, skip heartbeat");
            return;
        }

        log.debug("Sending heartbeats to all nodes, term: {}", node.getCurrentTerm());

        // 發送心跳給所有節點
        node.sendHeartbeats();

    } catch (Exception e) {
        log.error("Error in heartbeat task", e);
    }
}

4.6.2 發送心跳

方法sendHeartbeats()

實現位置RaftNodeImpl.java:407-413

public void sendHeartbeats() {
    List<RaftNodeConfig> otherNodes = getOtherNodes();

    for (RaftNodeConfig nodeConfig : otherNodes) {
        heartbeatExecutor.execute(() -> sendHeartbeat(nodeConfig));
    }
}

4.6.3 單次心跳發送

方法sendHeartbeat(targetNode)

實現位置RaftNodeImpl.java:418-436

private void sendHeartbeat(RaftNodeConfig targetNode) {
    try {
        // 構建心跳請求(entries 為空)
        AppendEntriesRequest request = AppendEntriesRequest.builder()
                .term(currentTerm)
                .leaderId(currentNodeConfig.getServerId())
                .entries(new ArrayList<>())  // 空列表表示心跳
                .build();
        request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
        request.setCmd(Request.APPEND_ENTRIES);

        // 發送請求
        AppendEntriesResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);

        // 處理響應
        if (response != null) {
            handleHeartbeatResponse(response, targetNode.getServerId());
        }
    } catch (Exception e) {
        log.debug("Failed to send heartbeat to {}: {}", targetNode.getServerId(), e.getMessage());
    }
}

心跳特點

  • entries 為空列表
  • 只包含 termleaderId 等元數據
  • 用於維護 Leader 地位,防止 Follower 發起新選舉

4.6.4 心跳定時器

方法startHeartbeatTimer()

實現位置RaftNodeImpl.java:380-391

private void startHeartbeatTimer() {
    int heartbeatInterval = raftConfig.getHeartbeatIntervalMs();

    heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(
            new HeartbeatTask(this),
            0,  // 立即開始
            heartbeatInterval,  // 間隔
            TimeUnit.MILLISECONDS
    );

    log.debug("Heartbeat timer started, interval: {}ms", heartbeatInterval);
}

配置示例

config.setHeartbeatInterval(1);  // 每 1 秒發送一次心跳

4.6.5 心跳響應處理

方法handleHeartbeatResponse(response, nodeId)

實現位置RaftNodeImpl.java:441-448

private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
    // 如果響應的任期更大,轉為 Follower
    if (response.getTerm() > currentTerm) {
        log.info("Received higher term {} from {} in heartbeat response, stepping down",
                response.getTerm(), nodeId);
        becomeFollower(response.getTerm());
    }
}

處理邏輯

  • 檢查響應中的任期
  • 發現更高任期 → 立即轉為 Follower
  • 避免網絡分區導致的腦裂

4.7 安全性保證

4.7.1 選舉安全性

目標:任期內最多一個 Leader

實現

  1. 任期單調遞增

    public void becomeCandidate() {
        currentTerm++;  // 每次選舉增加任期
    }
    
  2. 只投一次票

    // ConsensusModuleImpl.requestVote()
    if (votedFor != null && !votedFor.equals(candidateId)) {
        return new VoteResponse(currentTerm, false);
    }
    
  3. 多數派約束

    // VoteCounter
    public boolean hasMajority() {
        return votesReceived.size() >= majorityCount;  // N/2 + 1
    }
    

4.7.2 任期更新規則

規則:發現更高任期 → 更新任期,轉為 Follower

實現位置

  • ConsensusModuleImpl.requestVote() 第 63-68 行
  • ConsensusModuleImpl.appendEntries() 第 128-134 行
  • RaftNodeImpl.handleVoteResponse() 第 333-337 行
  • RaftNodeImpl.handleHeartbeatResponse() 第 443-447 行

示例

// 在 requestVote 中
if (voteRequest.getTerm() > currentTerm) {
    node.becomeFollower(voteRequest.getTerm());
    currentTerm = node.getCurrentTerm();
}

4.7.3 日誌完整性檢查

目的:只投票給日誌至少和自己一樣新的候選人

實現isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)

規則

  1. candidate 任期 > 自己任期 → 投票
  2. 任期相同,candidate 索引 >= 自己索引 → 投票
  3. 否則 → 拒絕

重要性

  • 保證新 Leader 包含所有已提交的日誌
  • 防止日誌丟失或覆蓋

4.7.4 腦裂預防

場景:網絡分區,兩個 Leader 同時存在

預防機制

  1. 多數派約束

    • Leader 需要多數派支持
    • 分區後的少數派無法獲得足夠票數
  2. 心跳超時

    • 少數派 Follower 收不到心跳
    • 選舉超時後發起選舉
    • 多數派選出新 Leader
  3. 任期遞增

    • 新 Leader 使用更高任期
    • 舊 Leader 的心跳被拒絕

示例

初始狀態:5 節點(node1-5),Leader=node1

網絡分區:
- 分區 A: node1, node2 (2 節點)
- 分區 B: node3, node4, node5 (3 節點)

分區 A:
- node1 仍是 Leader
- node2 收不到心跳,超時後轉為 Candidate
- 只有 1 票(自己),無法獲得多數派(需要 3 票)
- 無法選出新 Leader

分區 B:
- node3 超時後發起選舉
- 獲得自己 + node4 + node5 的票(3 票)
- 成為新 Leader(term=2)

網絡恢復後:
- node1 發送心跳(term=1)
- 其他節點拒絕(term=2 > term=1)
- node1 收到更高任期,轉為 Follower

5. 測試指南

5.1 測試程序

文件位置

LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader/ThreeNodeElectionTest.java

運行方式

# 直接運行 main 方法
java -cp <classpath> com.ling.raft.example.leader.ThreeNodeElectionTest

腳本運行

cd LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader
start-cluster.bat

5.2 測試功能

5.2.1 基本測試場景

場景 1:正常選舉

[STEP 1] Starting 3 nodes...
  ✓ node1 started on port 8081
  ✓ node2 started on port 8082
  ✓ node3 started on port 8083
  ✓ All nodes started!

[STEP 2] Waiting for leader election...
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
  ✓ Leader elected!

場景 2:Leader 故障

raft> kill node1
✓ node1 stopped
! Leader killed, waiting for new election...
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)

場景 3:節點恢復

raft> revive node1
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)

5.2.2 交互式命令

命令 説明 示例
status 查看所有節點狀態 status
leader 顯示當前 Leader 信息 leader
kill <node> 模擬節點故障 kill node1
revive <node> 恢復節點 revive node1
log <level> 控制日誌級別 log debug
stop 停止所有節點並退出 stop

5.2.3 日誌級別控制

控制方式

raft> log silent
✓ Log level set to ERROR (silent mode)

raft> log info
✓ Log level set to INFO

raft> log debug
✓ Log level set to DEBUG (verbose mode)

raft> log election
✓ Showing election logs only

raft> log heartbeat
✓ Showing heartbeat logs only

日誌級別説明

  • silent/error - 僅錯誤信息
  • warn - 警告及以上
  • info - 信息及以上(默認)
  • debug - 調試信息(全部日誌)
  • election - 僅選舉相關日誌
  • heartbeat - 僅心跳相關日誌

5.3 預期輸出

正常選舉

╔════════════════════════════════════════════════════════════╗
║          Raft Leader Election Test - 3 Nodes               ║
╚════════════════════════════════════════════════════════════╝

[STEP 1] Starting 3 nodes...
  ✓ node1 started on port 8081
  ✓ node2 started on port 8082
  ✓ node3 started on port 8083
  ✓ All nodes started!

[STEP 2] Waiting for leader election...
[Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
  ✓ Leader elected!

┌────────────────────────────────────────────────────────────┐
│                     Cluster Status                         │
├────────────┬──────────────┬─────────┬─────────┬────────────┤
│ Node       │ Status       │ Term    │ Log     │ Voted For  │
├────────────┼──────────────┼─────────┼─────────┼────────────┤
│ node1      │ LEADER       │ 1       │ 0       │ -          │
│ node2      │ FOLLOWER     │ 1       │ 0       │ node1      │
│ node3      │ FOLLOWER     │ 1       │ 0       │ node1      │
└────────────┴──────────────┴─────────┴─────────┴────────────┘

Leader 故障恢復

raft> kill node1
Killing node1...
✓ node1 stopped
! Leader killed, waiting for new election...
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)

raft> leader
┌────────────────────────────────────────────────────────────┐
│                      Leader Info                           │
├────────────────────────────────────────────────────────────┤
│  Node ID:   node2                                         │
│  Address:   127.0.0.1:8082                                 │
│  Term:      2                                              │
└────────────────────────────────────────────────────────────┘

raft> revive node1
Reviving node1...
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)

5.4 完整測試流程

步驟 1:啓動並驗證選舉

# 運行測試程序
java com.ling.raft.example.leader.ThreeNodeElectionTest

# 觀察選舉過程
[Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)

# 查看當前狀態
raft> status

步驟 2:驗證心跳

# 等待幾秒,觀察心跳
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)

# 啓用心跳日誌觀察
raft> log heartbeat
✓ Showing heartbeat logs only

步驟 3:模擬 Leader 故障

# 殺死 Leader
raft> kill node1
✓ node1 stopped
! Leader killed, waiting for new election...

# 觀察新選舉
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)

步驟 4:恢復舊 Leader

# 恢復節點
raft> revive node1
✓ node1 revived

# 觀察恢復過程
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)

步驟 5:多次故障測試

# 持續故障恢復
raft> kill node2
raft> kill node3
raft> revive node2
raft> revive node3

6. 使用示例

6.1 基本使用

// 1. 創建節點配置
RaftNodeConfig node1 = new RaftNodeConfig("node1", "127.0.0.1", 8081);
RaftNodeConfig node2 = new RaftNodeConfig("node2", "127.0.0.1", 8082);
RaftNodeConfig node3 = new RaftNodeConfig("node3", "127.0.0.1", 8083);
List<RaftNodeConfig> allNodes = Arrays.asList(node1, node2, node3);

// 2. 創建 Raft 配置
RaftConfig config1 = new RaftConfig(node1, allNodes);
config1.setElectionTimeout(2);  // 基礎超時倍數
config1.setElectionTimeoutRandomRange(Range.of(150, 300));  // 隨機範圍
config1.setHeartbeatInterval(1);  // 心跳間隔 1 秒

// 3. 創建 RPC 組件
DefaultRpcServer rpcServer1 = new DefaultRpcServer(node1.getPort(), null);
DefaultRpcClient rpcClient1 = new DefaultRpcClient();

// 4. 創建並初始化 Raft 節點
RaftNodeImpl raftNode1 = new RaftNodeImpl(config1, rpcServer1, rpcClient1);
rpcServer1.setRaftNode(raftNode1);
raftNode1.init();

// 5. 等待選舉
Thread.sleep(2000);

// 6. 檢查節點狀態
if (raftNode1.getNodeStatus() == ServerStatusEnum.LEADER) {
    System.out.println("Node1 is Leader, term: " + raftNode1.getCurrentTerm());
}

6.2 監控選舉狀態

// 創建監控線程
Thread monitor = new Thread(() -> {
    while (true) {
        System.out.printf("Node1: %s(t%d) ",
            raftNode1.getNodeStatus(),
            raftNode1.getCurrentTerm());

        System.out.printf("Node2: %s(t%d) ",
            raftNode2.getNodeStatus(),
            raftNode2.getCurrentTerm());

        System.out.printf("Node3: %s(t%d)\n",
            raftNode3.getNodeStatus(),
            raftNode3.getCurrentTerm());

        Thread.sleep(3000);
    }
});
monitor.setDaemon(true);
monitor.start();

6.3 手動觸發選舉

// 停止 Leader 的心跳定時器
raftNode1.cancelHeartbeatTimer();

// 模擬 Follower 超時
raftNode2.resetElectionTimer();  // 重置超時
// 等待超時後,node2 會自動發起選舉

6.4 查詢投票信息

// 獲取當前投票信息
String votedFor = raftNode1.getVotedFor();
long currentTerm = raftNode1.getCurrentTerm();
ServerStatusEnum status = raftNode1.getNodeStatus();

System.out.println("Node1 - Status: " + status + ", Term: " + currentTerm + ", VotedFor: " + votedFor);

// 如果是 Candidate,查看投票計數器
if (status == ServerStatusEnum.CANDIDATE) {
    VoteCounter counter = raftNode1.getCurrentVoteCounter();
    System.out.println("Votes: " + counter.getVoteCount() + "/" + counter.getMajorityCount());
}

7. 常見問題

7.1 為什麼選舉超時需要隨機化?

原因

  • 如果所有節點使用固定的超時時間,可能同時超時
  • 同時超時的節點會同時發起選舉
  • 導致平票(split vote),需要重新選舉
  • 隨機化可以避免多個節點同時超時

示例

不隨機化(3 個節點都使用 200ms):
t=0ms: 所有節點啓動
t=200ms: 3 個節點同時超時,都轉為 Candidate
t=201ms: 3 個節點都發送投票請求
t=210ms: 每個節點只收到自己的票(1 票)
t=220ms: 選舉超時,重新選舉(平票)

隨機化(3 個節點使用 150-300ms 隨機):
t=0ms: 所有節點啓動
t=170ms: node1 超時,發起選舉
t=171ms: node2 和 node3 收到投票請求,重置超時
t=220ms: node2 超時(新時間)
t=221ms: node1 已經是 Leader,node2 收到心跳,重置超時
t=280ms: node3 超時
t=281ms: node3 收到心跳,重置超時
t=1000ms: 心跳繼續,node1 保持 Leader

代碼實現

// RaftConfig.java
public int getElectionTimeoutMs() {
    if (electionTimeoutRandomRange == null) {
        return electionTimeout * 1000;
    }

    int min = electionTimeoutRandomRange.getMin();
    int max = electionTimeoutRandomRange.getMax();
    Random random = new Random();
    return min + random.nextInt(max - min + 1);
}

7.2 為什麼收到投票請求後要重置超時?

原因

  • 收到投票請求表示至少有一個其他節點是活躍的
  • 重置超時可以減少不必要的選舉
  • 避免頻繁切換狀態

代碼實現

// RaftNodeImpl.java:498-500
@Override
public VoteResponse handleVoteRequest(VoteRequest voteRequest) {
    VoteResponse response = consensus.requestVote(voteRequest);

    // 如果投票給了對方,重置選舉定時器
    if (response.isVoteGranted()) {
        resetElectionTimer();
    }

    return response;
}

7.3 為什麼 Candidate 要增加任期?

原因

  • 避免使用舊任期發起新的選舉
  • 區分不同輪的選舉
  • 保證任期單調遞增

代碼實現

// RaftNodeImpl.java:201-216
public void becomeCandidate() {
    ServerStatusEnum oldStatus = nodeStatus;

    // 增加任期號(重要!)
    currentTerm++;
    nodeStatus = ServerStatusEnum.CANDIDATE;
    votedFor = currentNodeConfig.getServerId();

    log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);

    resetElectionTimer();
    startElection();
}

7.4 如何處理網絡分區?

Raft 的保證

  • 舊 Leader 無法獲得多數派,無法提交新日誌
  • 新 Leader 會在多數派分區選舉產生
  • 網絡恢復後,舊 Leader 會轉為 Follower

代碼體現

// 舊 Leader 的心跳被拒絕
private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
    if (response.getTerm() > currentTerm) {
        log.info("Received higher term {}, stepping down", response.getTerm());
        becomeFollower(response.getTerm());
    }
}

// 舊 Leader 無法獲得多數派
public boolean hasMajority() {
    return votesReceived.size() >= majorityCount;  // N/2 + 1
}

7.5 為什麼心跳間隔通常遠小於選舉超時?

原因

  • 心跳間隔短(如 100ms),選舉超時長(如 200-300ms)
  • 確保 Follower 在超時前收到心跳
  • 避免不必要的選舉

配置示例

config.setHeartbeatInterval(1);  // 1 秒(1000ms)
config.setElectionTimeoutRandomRange(Range.of(150, 300));  // 150-300ms

// 注意:這裏心跳間隔是秒,超時是毫秒
// 實際使用時,心跳間隔應該 < 選舉超時

建議配置

心跳間隔:50ms - 100ms
選舉超時:150ms - 300ms

7.6 如何避免平票(split vote)?

平票場景

3 個節點:
- node1: term=2, votes=[node1]
- node2: term=2, votes=[node2]
- node3: term=2, votes=[node3]

每個節點只有 1 票,無法獲得多數派(需要 2 票)
選舉超時後重新選舉

避免方法

  1. 隨機化超時(已實現)

    • 減少多個節點同時超時的概率
  2. 預投票(Pre-vote)(未實現)

    • 先詢問其他節點是否願意投票
    • 如果多數派同意,再真正發起選舉
  3. 快速重試(未實現)

    • 平票後快速重新選舉
    • 立即開始,不等超時

當前實現

  • 僅依賴超時隨機化
  • 平票後等待超時重試

7.7 為什麼單機模式直接成為 Leader?

原因

  • 單機集羣不需要選舉
  • 只有一個節點,自己就是多數派
  • 提高啓動速度

代碼實現

// RaftNodeImpl.java:274-279
private void startElection() {
    int totalNodes = raftConfig.getRaftNodeConfigList().size();
    currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
    currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());

    // 單機模式直接成為 Leader
    if (totalNodes == 1) {
        log.info("Single node mode, becoming leader immediately");
        becomeLeader();
        return;
    }

    // 多機模式發送投票請求
    ...
}

7.8 如何調優選舉參數?

參數建議

參數 推薦值 説明
心跳間隔 50ms - 100ms 越短越快,但網絡開銷大
選舉超時最小 150ms - 200ms 應該 > 心跳間隔
選舉超時最大 300ms - 400ms 應該是心跳間隔的 3-5 倍
RPC 超時 2000ms - 3000ms 應該 > 選舉超時

調優示例

// 低延遲場景(數據中心內)
config.setHeartbeatInterval(1);      // 1ms
config.setElectionTimeoutRandomRange(Range.of(10, 20));  // 10-20ms

// 高穩定性場景(廣域網)
config.setHeartbeatInterval(100);    // 100ms
config.setElectionTimeoutRandomRange(Range.of(500, 1000));  // 500-1000ms

// 開發調試場景
config.setHeartbeatInterval(1);      // 1秒
config.setElectionTimeoutRandomRange(Range.of(2000, 4000));  // 2-4秒

附錄

A. 術語表

術語 説明
Term 任期號,單調遞增,用於識別 Leader
Election Timeout 選舉超時時間,隨機化避免平票
Heartbeat 心跳,Leader 定期發送維持地位
Majority 多數派,超過半數的節點(N/2 + 1)
Split Vote 平票選舉,沒有節點獲得多數派
Candidate 候選節點,發起選舉的節點
Leader 主節點,處理客户端請求
Follower 從節點,響應 Leader 的請求

B. 參考資料

  1. Raft 論文:Diego Ongaro, John Ousterhout. "In Search of an Understandable Consensus Algorithm." 2014
  2. Raft GitHub:https://github.com/ongardie/raft.github.io
  3. 可視化 Raft:http://thesecretlivesofdata.com/raft/
  4. Raft Scope:https://raft.github.io/raftscope/index.html

C. 相關文件

文件 路徑
RaftNodeImpl com.ling.raft.core.RaftNodeImpl
ConsensusModuleImpl com.ling.raft.core.ConsensusModuleImpl
VoteCounter com.ling.raft.core.VoteCounter
ElectionTask com.ling.raft.core.task.ElectionTask
HeartbeatTask com.ling.raft.core.task.HeartbeatTask
ServerStatusEnum com.ling.raft.enums.ServerStatusEnum
VoteRequest com.ling.raft.model.dto.VoteRequest
VoteResponse com.ling.raft.model.dto.VoteResponse
ThreeNodeElectionTest com.ling.raft.example.leader.ThreeNodeElectionTest

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.