Raft Leader 選舉實現文檔
目錄
- 1. 概述
- 2. 核心概念
- 3. 涉及的類及其職責
- 4. 實現細節
- 4.1 節點狀態與轉換
- 4.2 選舉超時機制
- 4.3 投票請求處理
- 4.4 選舉發起流程
- 4.5 投票響應處理
- 4.6 心跳機制
- 4.7 安全性保證
- 5. 測試指南
- 6. 使用示例
- 7. 常見問題
1. 概述
1.1 目的
本文檔詳細説明了 LingRaft-Lite 模塊中 Raft Leader 選舉功能的實現,包括涉及的類、實現細節、測試方法等,便於開發者理解和復現。
1.2 功能範圍
- 節點狀態管理(Follower、Candidate、Leader)
- 選舉超時檢測
- 投票請求與響應處理
- 多數派選舉機制
- 心跳維護 Leader 地位
- 網絡分區處理
1.3 Raft 算法參考
本實現基於 Raft 論文(Diego Ongaro 和 John Ousterhout, 2014)的 Leader 選舉部分,具體參考:
- Section 5.1: Leader Election
- Section 5.2: Leader Election - RequestVote RPC
- Section 5.4.1: Election Safety Property
2. 核心概念
2.1 節點狀態
Raft 節點有三種狀態:
| 狀態 | 説明 | 職責 |
|---|---|---|
| FOLLOWER | 從節點 | 響應 Leader 的 RPC 請求(AppendEntries、RequestVote) |
| CANDIDATE | 候選節點 | 發起選舉,向其他節點請求投票 |
| LEADER | 主節點 | 處理客户端請求,向 Follower 複製日誌,發送心跳 |
2.2 任期 (Term)
定義:
- 時間被分成多個任期,每個任期以選舉開始
- 任期號是單調遞增的整數
- 每次選舉都進入新任期
用途:
- 識別過時的信息(舊任期的投票、心跳等)
- 防止腦裂(分裂投票)
實現:
private volatile long currentTerm = 0; // 當前任期號
2.3 選舉超時 (Election Timeout)
定義:
- Follower 在收到有效心跳或投票請求之前等待的時間
- 超時後轉為 Candidate 併發起選舉
隨機化:
- 為了避免多個節點同時超時導致平票選舉,超時時間隨機化
- 通常在 150ms ~ 300ms 之間
實現:
// 配置隨機範圍
config.setElectionTimeoutRandomRange(Range.of(150, 300));
// 計算隨機超時時間
int randomTimeout = raftConfig.getElectionTimeoutMs();
2.4 多數派 (Majority)
定義:
- 超過半數的節點數:
N/2 + 1 - 3 節點集羣需要 2 票
- 5 節點集羣需要 3 票
重要性:
- 保證選舉結果的唯一性
- 兩個多數派必然有交集,確保只有一個 Leader
實現:
public VoteCounter(long term, int totalNodes) {
this.majorityCount = totalNodes / 2 + 1;
}
2.5 投票規則
節點投票給候選人的條件:
- 候選人的任期 >= 當前任期
- 如果任期相同,candidate 的日誌至少和當前節點一樣新
日誌比較規則:
- 如果
candidateLastLogTerm > lastLogTerm,投票 - 如果
candidateLastLogTerm == lastLogTerm且candidateLastLogIndex >= lastLogIndex,投票 - 否則,拒絕投票
3. 涉及的類及其職責
3.1 核心類
| 類名 | 路徑 | 職責 |
|---|---|---|
| RaftNodeImpl | com.ling.raft.core.RaftNodeImpl |
節點狀態管理、選舉發起、投票處理、心跳發送 |
| ConsensusModuleImpl | com.ling.raft.core.ConsensusModuleImpl |
投票請求和響應的具體實現邏輯 |
| VoteCounter | com.ling.raft.core.VoteCounter |
投票計數器,統計和判斷多數派 |
| ElectionTask | com.ling.raft.core.task.ElectionTask |
選舉超時檢測任務 |
| HeartbeatTask | com.ling.raft.core.task.HeartbeatTask |
Leader 心跳發送任務 |
| ServerStatusEnum | com.ling.raft.enums.ServerStatusEnum |
節點狀態枚舉 |
| VoteRequest | com.ling.raft.model.dto.VoteRequest |
投票請求 RPC |
| VoteResponse | com.ling.raft.model.dto.VoteResponse |
投票響應 RPC |
| ThreeNodeElectionTest | com.ling.raft.example.leader.ThreeNodeElectionTest |
完整測試程序 |
3.2 類關係圖
┌─────────────────────┐
│ RaftNodeImpl │
│ (節點主類) │
└──────────┬──────────┘
│ 持有引用
├─────────────────┐
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ ConsensusModuleImpl │ │ VoteCounter │
│ (投票邏輯) │ │ (投票計數) │
└─────────────────────┘ └─────────────────────┘
│ │
├─────────────────────┤
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ ElectionTask │ │ HeartbeatTask │
│ (選舉超時檢測) │ │ (心跳任務) │
└─────────────────────┘ └─────────────────────┘
3.3 關鍵字段説明
RaftNodeImpl
// 節點狀態
private volatile ServerStatusEnum nodeStatus = ServerStatusEnum.FOLLOWER;
// 持久化狀態
private volatile long currentTerm = 0; // 當前任期
private volatile String votedFor = null; // 本輪任期投票給的候選人
// 選舉相關
private ScheduledExecutorService electionExecutor;
private ScheduledFuture<?> electionFuture;
private VoteCounter currentVoteCounter;
private final Random random = new Random();
// 心跳相關
private ScheduledExecutorService heartbeatExecutor;
private ScheduledFuture<?> heartbeatFuture;
// 時間記錄
private volatile long prevElectionTime = 0; // 上次選舉時間
private volatile long preHeartBeatTime = 0; // 上次收到心跳時間
ConsensusModuleImpl
public final RaftNodeImpl node; // 持有 RaftNodeImpl 的引用
public final ReentrantLock voteLock = new ReentrantLock(); // 投票鎖
public final ReentrantLock appendEntriesLock = new ReentrantLock(); // 追加條目鎖
VoteCounter
private final long term; // 當前選舉任期
private final Set<String> votesReceived; // 已投票的節點ID集合
private final int majorityCount; // 需要獲得的多數派票數
private volatile boolean votedForSelf; // 是否已投票給自己
4. 實現細節
4.1 節點狀態與轉換
4.1.1 狀態枚舉
類名:ServerStatusEnum
定義:
public enum ServerStatusEnum {
LEADER("LEADER", "主節點"),
CANDIDATE("CANDIDATE", "候選節點"),
FOLLOWER("FOLLOWER", "從節點");
}
4.1.2 狀態轉換圖
+-------------------------+
| 初始化 |
+-------------------------+
|
▼
+-------------------------+
| FOLLOWER | <------------+
| (等待心跳或投票) | |
+-------------------------+ |
| |
| 選舉超時 | 收到更高任期的
| | AppendEntries 或
▼ | RequestVote
+-------------------------+ |
| CANDIDATE | |
| (發起選舉) | |
+-------------------------+ |
| |
| 獲得多數派 |
| |
▼ |
+-------------------------+ |
| LEADER | --------------+
| (處理客户端請求) | 發現更高任期
+-------------------------+
4.1.3 轉為 Follower
方法:becomeFollower(newTerm)
實現位置:RaftNodeImpl.java:175-196
public void becomeFollower(long newTerm) {
// 檢查任期
if (newTerm < currentTerm) {
log.warn("Cannot become Follower with smaller term: {} < {}",
newTerm, currentTerm);
return;
}
ServerStatusEnum oldStatus = nodeStatus;
// 更新狀態
nodeStatus = ServerStatusEnum.FOLLOWER;
currentTerm = newTerm;
votedFor = null; // 重置投票記錄
currentVoteCounter = null; // 清空投票計數器
// 停止心跳(如果之前是 Leader)
cancelHeartbeatTimer();
// 重置選舉定時器
resetElectionTimer();
log.info("State changed: {} -> FOLLOWER, term: {}", oldStatus, currentTerm);
}
調用場景:
- 節點初始化
- 收到更高任期的 AppendEntries
- 收到更高任期的 RequestVote
- Candidate 收到有效 AppendEntries
4.1.4 轉為 Candidate
方法:becomeCandidate()
實現位置:RaftNodeImpl.java:201-216
public void becomeCandidate() {
ServerStatusEnum oldStatus = nodeStatus;
// 增加任期號(重要!)
currentTerm++;
nodeStatus = ServerStatusEnum.CANDIDATE;
votedFor = currentNodeConfig.getServerId(); // 投票給自己
log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);
// 重置選舉定時器
resetElectionTimer();
// 發起投票請求
startElection();
}
調用場景:
- 選舉超時
- 作為 Candidate 重新發起選舉(平票後)
4.1.5 轉為 Leader
方法:becomeLeader()
實現位置:RaftNodeImpl.java:221-243
public void becomeLeader() {
// 只有 Candidate 才能成為 Leader
if (nodeStatus != ServerStatusEnum.CANDIDATE) {
log.warn("Only CANDIDATE can become LEADER, current: {}", nodeStatus);
return;
}
ServerStatusEnum oldStatus = nodeStatus;
nodeStatus = ServerStatusEnum.LEADER;
// 初始化 Leader 狀態(nextIndex、matchIndex)
initializeLeaderState();
// 取消選舉定時器(Leader 不需要選舉)
cancelElectionTimer();
log.info("========================================");
log.info("State changed: {} -> LEADER, term: {}", oldStatus, currentTerm);
log.info("========================================");
// 立即發送心跳並開始心跳定時器
sendHeartbeats();
startHeartbeatTimer();
}
調用場景:
- Candidate 獲得多數派投票
- 單機模式直接成為 Leader
4.2 選舉超時機制
4.2.1 選舉超時檢測
類名:ElectionTask
實現位置:com.ling.raft.core.task.ElectionTask.java
核心邏輯:
@Override
public void run() {
try {
// Leader 不需要選舉
if (node.getNodeStatus() == ServerStatusEnum.LEADER) {
log.debug("Current node is LEADER, skip election");
return;
}
// 檢查是否超時
long currentTime = System.currentTimeMillis();
int electionTimeoutMs = node.getRaftConfig().getElectionTimeoutMs();
long timeElapsed = currentTime - node.getPrevElectionTime();
if (timeElapsed < electionTimeoutMs) {
// 未超時,重新設置定時器
node.resetElectionTimer();
return;
}
// 選舉超時,開始新一輪選舉
log.info("========================================");
log.info("ELECTION TIMEOUT DETECTED!");
log.info("Time elapsed: {}ms, Timeout: {}ms", timeElapsed, electionTimeoutMs);
log.info("Current term: {}, Status: {}", node.getCurrentTerm(), node.getNodeStatus());
log.info("Converting to CANDIDATE and starting new election...");
log.info("========================================");
node.becomeCandidate();
} catch (Exception e) {
log.error("Error in election task", e);
if (node.getIsRunning().get()) {
node.resetElectionTimer();
}
}
}
特點:
- 跳過 Leader:Leader 不需要選舉
- 嚴格超時檢查:確保真的超時才發起選舉
- 日誌詳細:記錄選舉超時的關鍵信息
4.2.2 選舉定時器管理
方法:resetElectionTimer()
實現位置:RaftNodeImpl.java:455-475
public void resetElectionTimer() {
if (!isRunning.get()) {
return;
}
// 取消舊的定時任務
cancelElectionTimer();
// 計算隨機超時時間
int randomTimeout = raftConfig.getElectionTimeoutMs();
// 更新超時時間戳
prevElectionTime = System.currentTimeMillis();
// 設置新的定時任務
electionFuture = electionExecutor.schedule(
new ElectionTask(this),
randomTimeout,
TimeUnit.MILLISECONDS
);
log.debug("Election timer reset, timeout: {}ms", randomTimeout);
}
調用時機:
- 節點初始化為 Follower
- 收到有效心跳
- 收到投票請求(即使拒絕)
- 轉為 Follower(從任何狀態)
- 轉為 Candidate(重新開始計時)
4.2.3 超時時間隨機化
配置方式:
RaftConfig config = new RaftConfig(currentNode, allNodes);
config.setElectionTimeout(2); // 基礎倍數
config.setElectionTimeoutRandomRange(Range.of(150, 300)); // 隨機範圍
實現原理:
// RaftConfig 內部實現
public int getElectionTimeoutMs() {
if (electionTimeoutRandomRange == null) {
return electionTimeout * 1000;
}
// 在隨機範圍內選擇一個值
int min = electionTimeoutRandomRange.getMin();
int max = electionTimeoutRandomRange.getMax();
Random random = new Random();
return min + random.nextInt(max - min + 1);
}
避免平票的原理:
- 3 個節點超時時間分別為:170ms、220ms、280ms
- node1 先超時發起選舉
- node2 和 node3 收到投票請求後重置超時時間
- node1 獲得多數派(自己的票),成為 Leader
4.3 投票請求處理
4.3.1 RequestVote RPC
請求格式:VoteRequest
字段説明:
public class VoteRequest {
private long term; // candidate 的任期號
private String candidateId; // candidate 的節點 ID
private long lastLogIndex; // candidate 最後一條日誌的索引
private long lastLogTerm; // candidate 最後一條日誌的任期號
}
響應格式:VoteResponse
字段説明:
public class VoteResponse {
private long term; // 當前任期(用於更新 candidate 的任期)
private boolean voteGranted; // 是否投票
}
4.3.2 投票邏輯
方法:requestVote(VoteRequest voteRequest)
實現位置:ConsensusModuleImpl.java:45-90
@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
voteLock.lock();
try {
long currentTerm = node.getCurrentTerm();
String votedFor = node.getVotedFor();
String candidateId = voteRequest.getCandidateId();
log.info("Received vote request from candidate: {}, Term: {}, CurrentTerm: {}, VotedFor: {}",
candidateId, voteRequest.getTerm(), currentTerm, votedFor);
// 1. 任期檢查
if (voteRequest.getTerm() < currentTerm) {
log.info("Rejected: candidate term {} < current term {}",
voteRequest.getTerm(), currentTerm);
return new VoteResponse(currentTerm, false);
}
// 2. 任期更大,更新並轉為 Follower
if (voteRequest.getTerm() > currentTerm) {
log.info("Higher term received: {} -> {}, becoming FOLLOWER",
currentTerm, voteRequest.getTerm());
node.becomeFollower(voteRequest.getTerm());
currentTerm = node.getCurrentTerm();
votedFor = node.getVotedFor();
}
// 3. 檢查是否已投票給其他人
if (votedFor != null && !votedFor.equals(candidateId)) {
log.info("Already voted for {}, rejecting {}", votedFor, candidateId);
return new VoteResponse(currentTerm, false);
}
// 4. 檢查日誌是否至少一樣新
if (isLogUpToDate(voteRequest.getLastLogIndex(), voteRequest.getLastLogTerm())) {
log.info("Voting for candidate: {}", candidateId);
node.setVotedFor(candidateId);
node.setPrevElectionTime(System.currentTimeMillis()); // 重置超時
return new VoteResponse(currentTerm, true);
} else {
log.info("Candidate log not up to date");
return new VoteResponse(currentTerm, false);
}
} finally {
voteLock.unlock();
}
}
投票規則詳解:
-
任期檢查
- candidate 的任期 < 當前任期 → 拒絕
-
任期更新
- candidate 的任期 > 當前任期 → 更新任期,轉為 Follower
-
唯一投票
- 本輪任期已投票給其他人 → 拒絕
- 已投票給該 candidate → 接受(冪等性)
-
日誌完整性
- candidate 的日誌 >= 自己的日誌 → 接受
- 否則 → 拒絕
4.3.3 日誌比較邏輯
方法:isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
實現位置:ConsensusModuleImpl.java:337-350
private boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm) {
long lastLogTerm = getLastLogTerm();
long lastLogIndex = getLastLogIndex();
// 優先比較任期:candidate 的任期更大 → 更新
if (candidateLastLogTerm > lastLogTerm) {
return true;
}
// 任期相同,比較索引:candidate 的索引 >= 自己的索引 → 更新
if (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex) {
return true;
}
// 其他情況 → 不更新
return false;
}
示例:
情況 1: candidate 任期更大
candidate: term=3, index=5
current: term=2, index=5
→ 投票 (任期更大)
情況 2: 任期相同,索引更大或相等
candidate: term=2, index=5
current: term=2, index=4
→ 投票 (索引更大)
情況 3: 任期相同,索引更小
candidate: term=2, index=4
current: term=2, index=5
→ 不投票 (日誌落後)
情況 4: 任期更小
candidate: term=1, index=10
current: term=2, index=5
→ 不投票 (任期更小)
4.3.4 投票鎖
目的:防止併發投票請求導致狀態不一致
實現:
public final ReentrantLock voteLock = new ReentrantLock();
@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
voteLock.lock();
try {
// 投票邏輯
...
} finally {
voteLock.unlock();
}
}
保護的資源:
currentTermvotedFornodeStatus
4.4 選舉發起流程
4.4.1 開始選舉
方法:startElection()
實現位置:RaftNodeImpl.java:266-289
private void startElection() {
int totalNodes = raftConfig.getRaftNodeConfigList().size();
currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
// 投票給自己
currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());
log.info("Starting election for term: {}, voted for self, votes: {}/{}",
currentTerm, currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
// 單機模式直接成為 Leader
if (totalNodes == 1) {
log.info("Single node mode, becoming leader immediately");
becomeLeader();
return;
}
// 發送投票請求給所有其他節點
List<RaftNodeConfig> otherNodes = getOtherNodes();
for (RaftNodeConfig nodeConfig : otherNodes) {
electionExecutor.execute(() -> sendVoteRequest(nodeConfig));
}
// 檢查是否已獲得多數派(可能只有自己一票的情況)
checkElectionResult();
}
流程:
- 創建投票計數器
- 投票給自己
- 單機模式直接成為 Leader
- 多機模式併發發送投票請求
- 檢查選舉結果
4.4.2 發送投票請求
方法:sendVoteRequest(targetNode)
實現位置:RaftNodeImpl.java:294-316
private void sendVoteRequest(RaftNodeConfig targetNode) {
try {
// 構建 VoteRequest
VoteRequest request = VoteRequest.builder()
.term(currentTerm)
.candidateId(currentNodeConfig.getServerId())
.lastLogIndex(getLastLogIndex())
.lastLogTerm(getLastLogTerm())
.build();
request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
request.setCmd(Request.REQUEST_VOTE);
log.debug("Sending VoteRequest to {} for term {}", targetNode.getServerId(), currentTerm);
// 發送 RPC 請求
VoteResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
// 處理響應
if (response != null) {
handleVoteResponse(response, targetNode.getServerId());
}
} catch (Exception e) {
log.debug("Failed to send vote request to {}: {}", targetNode.getServerId(), e.getMessage());
}
}
特點:
- 併發發送到所有其他節點
- 使用線程池異步發送
- 超時設置為 3000ms
- 失敗不重試(等待下一次選舉)
4.4.3 投票計數器
類名:VoteCounter
實現位置:com.ling.raft.core.VoteCounter.java
核心方法:
// 記錄投票
public synchronized boolean recordVote(String nodeId) {
return votesReceived.add(nodeId);
}
// 投票給自己
public synchronized void voteForSelf(String selfId) {
if (!votedForSelf) {
votesReceived.add(selfId);
votedForSelf = true;
}
}
// 檢查是否獲得多數派
public boolean hasMajority() {
return votesReceived.size() >= majorityCount;
}
// 獲取當前票數
public int getVoteCount() {
return votesReceived.size();
}
數據結構:
- 使用
ConcurrentHashMap.newKeySet()存儲投票節點 ID - 保證線程安全
- 自動去重(不會重複計票)
4.5 投票響應處理
4.5.1 處理投票響應
方法:handleVoteResponse(response, voterId)
實現位置:RaftNodeImpl.java:322-361
private void handleVoteResponse(VoteResponse response, String voterId) {
// 使用同步塊確保原子性
synchronized (this) {
// 如果不是 Candidate,忽略
if (nodeStatus != ServerStatusEnum.CANDIDATE) {
log.debug("Not a candidate anymore (status: {}), ignoring vote from {}",
nodeStatus, voterId);
return;
}
// 如果收到更高任期,轉為 Follower
if (response.getTerm() > currentTerm) {
log.info("Received higher term {} from {}, stepping down",
response.getTerm(), voterId);
becomeFollower(response.getTerm());
return;
}
// 忽略舊任期的響應
if (response.getTerm() < currentTerm) {
log.debug("Received stale vote response from {} for old term {}",
voterId, response.getTerm());
return;
}
// 統計投票
if (response.isVoteGranted()) {
boolean isNewVote = currentVoteCounter.recordVote(voterId);
if (isNewVote) {
log.info("Received vote from {} for term {}, total votes: {}/{}",
voterId, currentTerm, currentVoteCounter.getVoteCount(),
currentVoteCounter.getMajorityCount());
// 檢查選舉結果
checkElectionResult();
}
} else {
log.debug("Vote denied by {} for term {}", voterId, currentTerm);
}
}
}
處理邏輯:
-
狀態檢查
- 不再是 Candidate → 忽略
-
任期檢查
- 響應任期 > 當前任期 → 發現更高任期,轉為 Follower
- 響應任期 < 當前任期 → 忽略舊響應
-
投票統計
- 投票成功 → 記錄投票,檢查是否獲得多數派
- 投票失敗 → 記錄日誌
4.5.2 檢查選舉結果
方法:checkElectionResult()
實現位置:RaftNodeImpl.java:367-373
private void checkElectionResult() {
if (currentVoteCounter != null && currentVoteCounter.hasMajority()) {
log.info("Majority votes received ({}/{}), becoming LEADER",
currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
becomeLeader();
}
}
調用時機:
- 投票給自己後(單機模式)
- 收到每個投票響應後
- 所有投票請求發送後(初始檢查)
4.6 心跳機制
4.6.1 心跳任務
類名:HeartbeatTask
實現位置:com.ling.raft.core.task.HeartbeatTask.java
@Override
public void run() {
try {
// 只有 Leader 才發送心跳
if (node.getNodeStatus() != ServerStatusEnum.LEADER) {
log.debug("Current node is not LEADER, skip heartbeat");
return;
}
log.debug("Sending heartbeats to all nodes, term: {}", node.getCurrentTerm());
// 發送心跳給所有節點
node.sendHeartbeats();
} catch (Exception e) {
log.error("Error in heartbeat task", e);
}
}
4.6.2 發送心跳
方法:sendHeartbeats()
實現位置:RaftNodeImpl.java:407-413
public void sendHeartbeats() {
List<RaftNodeConfig> otherNodes = getOtherNodes();
for (RaftNodeConfig nodeConfig : otherNodes) {
heartbeatExecutor.execute(() -> sendHeartbeat(nodeConfig));
}
}
4.6.3 單次心跳發送
方法:sendHeartbeat(targetNode)
實現位置:RaftNodeImpl.java:418-436
private void sendHeartbeat(RaftNodeConfig targetNode) {
try {
// 構建心跳請求(entries 為空)
AppendEntriesRequest request = AppendEntriesRequest.builder()
.term(currentTerm)
.leaderId(currentNodeConfig.getServerId())
.entries(new ArrayList<>()) // 空列表表示心跳
.build();
request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
request.setCmd(Request.APPEND_ENTRIES);
// 發送請求
AppendEntriesResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
// 處理響應
if (response != null) {
handleHeartbeatResponse(response, targetNode.getServerId());
}
} catch (Exception e) {
log.debug("Failed to send heartbeat to {}: {}", targetNode.getServerId(), e.getMessage());
}
}
心跳特點:
entries為空列表- 只包含
term、leaderId等元數據 - 用於維護 Leader 地位,防止 Follower 發起新選舉
4.6.4 心跳定時器
方法:startHeartbeatTimer()
實現位置:RaftNodeImpl.java:380-391
private void startHeartbeatTimer() {
int heartbeatInterval = raftConfig.getHeartbeatIntervalMs();
heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(
new HeartbeatTask(this),
0, // 立即開始
heartbeatInterval, // 間隔
TimeUnit.MILLISECONDS
);
log.debug("Heartbeat timer started, interval: {}ms", heartbeatInterval);
}
配置示例:
config.setHeartbeatInterval(1); // 每 1 秒發送一次心跳
4.6.5 心跳響應處理
方法:handleHeartbeatResponse(response, nodeId)
實現位置:RaftNodeImpl.java:441-448
private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
// 如果響應的任期更大,轉為 Follower
if (response.getTerm() > currentTerm) {
log.info("Received higher term {} from {} in heartbeat response, stepping down",
response.getTerm(), nodeId);
becomeFollower(response.getTerm());
}
}
處理邏輯:
- 檢查響應中的任期
- 發現更高任期 → 立即轉為 Follower
- 避免網絡分區導致的腦裂
4.7 安全性保證
4.7.1 選舉安全性
目標:任期內最多一個 Leader
實現:
-
任期單調遞增
public void becomeCandidate() { currentTerm++; // 每次選舉增加任期 } -
只投一次票
// ConsensusModuleImpl.requestVote() if (votedFor != null && !votedFor.equals(candidateId)) { return new VoteResponse(currentTerm, false); } -
多數派約束
// VoteCounter public boolean hasMajority() { return votesReceived.size() >= majorityCount; // N/2 + 1 }
4.7.2 任期更新規則
規則:發現更高任期 → 更新任期,轉為 Follower
實現位置:
ConsensusModuleImpl.requestVote()第 63-68 行ConsensusModuleImpl.appendEntries()第 128-134 行RaftNodeImpl.handleVoteResponse()第 333-337 行RaftNodeImpl.handleHeartbeatResponse()第 443-447 行
示例:
// 在 requestVote 中
if (voteRequest.getTerm() > currentTerm) {
node.becomeFollower(voteRequest.getTerm());
currentTerm = node.getCurrentTerm();
}
4.7.3 日誌完整性檢查
目的:只投票給日誌至少和自己一樣新的候選人
實現:isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
規則:
- candidate 任期 > 自己任期 → 投票
- 任期相同,candidate 索引 >= 自己索引 → 投票
- 否則 → 拒絕
重要性:
- 保證新 Leader 包含所有已提交的日誌
- 防止日誌丟失或覆蓋
4.7.4 腦裂預防
場景:網絡分區,兩個 Leader 同時存在
預防機制:
-
多數派約束
- Leader 需要多數派支持
- 分區後的少數派無法獲得足夠票數
-
心跳超時
- 少數派 Follower 收不到心跳
- 選舉超時後發起選舉
- 多數派選出新 Leader
-
任期遞增
- 新 Leader 使用更高任期
- 舊 Leader 的心跳被拒絕
示例:
初始狀態:5 節點(node1-5),Leader=node1
網絡分區:
- 分區 A: node1, node2 (2 節點)
- 分區 B: node3, node4, node5 (3 節點)
分區 A:
- node1 仍是 Leader
- node2 收不到心跳,超時後轉為 Candidate
- 只有 1 票(自己),無法獲得多數派(需要 3 票)
- 無法選出新 Leader
分區 B:
- node3 超時後發起選舉
- 獲得自己 + node4 + node5 的票(3 票)
- 成為新 Leader(term=2)
網絡恢復後:
- node1 發送心跳(term=1)
- 其他節點拒絕(term=2 > term=1)
- node1 收到更高任期,轉為 Follower
5. 測試指南
5.1 測試程序
文件位置:
LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader/ThreeNodeElectionTest.java
運行方式:
# 直接運行 main 方法
java -cp <classpath> com.ling.raft.example.leader.ThreeNodeElectionTest
腳本運行:
cd LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader
start-cluster.bat
5.2 測試功能
5.2.1 基本測試場景
場景 1:正常選舉
[STEP 1] Starting 3 nodes...
✓ node1 started on port 8081
✓ node2 started on port 8082
✓ node3 started on port 8083
✓ All nodes started!
[STEP 2] Waiting for leader election...
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
✓ Leader elected!
場景 2:Leader 故障
raft> kill node1
✓ node1 stopped
! Leader killed, waiting for new election...
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
場景 3:節點恢復
raft> revive node1
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
5.2.2 交互式命令
| 命令 | 説明 | 示例 |
|---|---|---|
status |
查看所有節點狀態 | status |
leader |
顯示當前 Leader 信息 | leader |
kill <node> |
模擬節點故障 | kill node1 |
revive <node> |
恢復節點 | revive node1 |
log <level> |
控制日誌級別 | log debug |
stop |
停止所有節點並退出 | stop |
5.2.3 日誌級別控制
控制方式:
raft> log silent
✓ Log level set to ERROR (silent mode)
raft> log info
✓ Log level set to INFO
raft> log debug
✓ Log level set to DEBUG (verbose mode)
raft> log election
✓ Showing election logs only
raft> log heartbeat
✓ Showing heartbeat logs only
日誌級別説明:
silent/error- 僅錯誤信息warn- 警告及以上info- 信息及以上(默認)debug- 調試信息(全部日誌)election- 僅選舉相關日誌heartbeat- 僅心跳相關日誌
5.3 預期輸出
正常選舉
╔════════════════════════════════════════════════════════════╗
║ Raft Leader Election Test - 3 Nodes ║
╚════════════════════════════════════════════════════════════╝
[STEP 1] Starting 3 nodes...
✓ node1 started on port 8081
✓ node2 started on port 8082
✓ node3 started on port 8083
✓ All nodes started!
[STEP 2] Waiting for leader election...
[Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
✓ Leader elected!
┌────────────────────────────────────────────────────────────┐
│ Cluster Status │
├────────────┬──────────────┬─────────┬─────────┬────────────┤
│ Node │ Status │ Term │ Log │ Voted For │
├────────────┼──────────────┼─────────┼─────────┼────────────┤
│ node1 │ LEADER │ 1 │ 0 │ - │
│ node2 │ FOLLOWER │ 1 │ 0 │ node1 │
│ node3 │ FOLLOWER │ 1 │ 0 │ node1 │
└────────────┴──────────────┴─────────┴─────────┴────────────┘
Leader 故障恢復
raft> kill node1
Killing node1...
✓ node1 stopped
! Leader killed, waiting for new election...
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
raft> leader
┌────────────────────────────────────────────────────────────┐
│ Leader Info │
├────────────────────────────────────────────────────────────┤
│ Node ID: node2 │
│ Address: 127.0.0.1:8082 │
│ Term: 2 │
└────────────────────────────────────────────────────────────┘
raft> revive node1
Reviving node1...
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
5.4 完整測試流程
步驟 1:啓動並驗證選舉
# 運行測試程序
java com.ling.raft.example.leader.ThreeNodeElectionTest
# 觀察選舉過程
[Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
# 查看當前狀態
raft> status
步驟 2:驗證心跳
# 等待幾秒,觀察心跳
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
# 啓用心跳日誌觀察
raft> log heartbeat
✓ Showing heartbeat logs only
步驟 3:模擬 Leader 故障
# 殺死 Leader
raft> kill node1
✓ node1 stopped
! Leader killed, waiting for new election...
# 觀察新選舉
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
步驟 4:恢復舊 Leader
# 恢復節點
raft> revive node1
✓ node1 revived
# 觀察恢復過程
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
步驟 5:多次故障測試
# 持續故障恢復
raft> kill node2
raft> kill node3
raft> revive node2
raft> revive node3
6. 使用示例
6.1 基本使用
// 1. 創建節點配置
RaftNodeConfig node1 = new RaftNodeConfig("node1", "127.0.0.1", 8081);
RaftNodeConfig node2 = new RaftNodeConfig("node2", "127.0.0.1", 8082);
RaftNodeConfig node3 = new RaftNodeConfig("node3", "127.0.0.1", 8083);
List<RaftNodeConfig> allNodes = Arrays.asList(node1, node2, node3);
// 2. 創建 Raft 配置
RaftConfig config1 = new RaftConfig(node1, allNodes);
config1.setElectionTimeout(2); // 基礎超時倍數
config1.setElectionTimeoutRandomRange(Range.of(150, 300)); // 隨機範圍
config1.setHeartbeatInterval(1); // 心跳間隔 1 秒
// 3. 創建 RPC 組件
DefaultRpcServer rpcServer1 = new DefaultRpcServer(node1.getPort(), null);
DefaultRpcClient rpcClient1 = new DefaultRpcClient();
// 4. 創建並初始化 Raft 節點
RaftNodeImpl raftNode1 = new RaftNodeImpl(config1, rpcServer1, rpcClient1);
rpcServer1.setRaftNode(raftNode1);
raftNode1.init();
// 5. 等待選舉
Thread.sleep(2000);
// 6. 檢查節點狀態
if (raftNode1.getNodeStatus() == ServerStatusEnum.LEADER) {
System.out.println("Node1 is Leader, term: " + raftNode1.getCurrentTerm());
}
6.2 監控選舉狀態
// 創建監控線程
Thread monitor = new Thread(() -> {
while (true) {
System.out.printf("Node1: %s(t%d) ",
raftNode1.getNodeStatus(),
raftNode1.getCurrentTerm());
System.out.printf("Node2: %s(t%d) ",
raftNode2.getNodeStatus(),
raftNode2.getCurrentTerm());
System.out.printf("Node3: %s(t%d)\n",
raftNode3.getNodeStatus(),
raftNode3.getCurrentTerm());
Thread.sleep(3000);
}
});
monitor.setDaemon(true);
monitor.start();
6.3 手動觸發選舉
// 停止 Leader 的心跳定時器
raftNode1.cancelHeartbeatTimer();
// 模擬 Follower 超時
raftNode2.resetElectionTimer(); // 重置超時
// 等待超時後,node2 會自動發起選舉
6.4 查詢投票信息
// 獲取當前投票信息
String votedFor = raftNode1.getVotedFor();
long currentTerm = raftNode1.getCurrentTerm();
ServerStatusEnum status = raftNode1.getNodeStatus();
System.out.println("Node1 - Status: " + status + ", Term: " + currentTerm + ", VotedFor: " + votedFor);
// 如果是 Candidate,查看投票計數器
if (status == ServerStatusEnum.CANDIDATE) {
VoteCounter counter = raftNode1.getCurrentVoteCounter();
System.out.println("Votes: " + counter.getVoteCount() + "/" + counter.getMajorityCount());
}
7. 常見問題
7.1 為什麼選舉超時需要隨機化?
原因:
- 如果所有節點使用固定的超時時間,可能同時超時
- 同時超時的節點會同時發起選舉
- 導致平票(split vote),需要重新選舉
- 隨機化可以避免多個節點同時超時
示例:
不隨機化(3 個節點都使用 200ms):
t=0ms: 所有節點啓動
t=200ms: 3 個節點同時超時,都轉為 Candidate
t=201ms: 3 個節點都發送投票請求
t=210ms: 每個節點只收到自己的票(1 票)
t=220ms: 選舉超時,重新選舉(平票)
隨機化(3 個節點使用 150-300ms 隨機):
t=0ms: 所有節點啓動
t=170ms: node1 超時,發起選舉
t=171ms: node2 和 node3 收到投票請求,重置超時
t=220ms: node2 超時(新時間)
t=221ms: node1 已經是 Leader,node2 收到心跳,重置超時
t=280ms: node3 超時
t=281ms: node3 收到心跳,重置超時
t=1000ms: 心跳繼續,node1 保持 Leader
代碼實現:
// RaftConfig.java
public int getElectionTimeoutMs() {
if (electionTimeoutRandomRange == null) {
return electionTimeout * 1000;
}
int min = electionTimeoutRandomRange.getMin();
int max = electionTimeoutRandomRange.getMax();
Random random = new Random();
return min + random.nextInt(max - min + 1);
}
7.2 為什麼收到投票請求後要重置超時?
原因:
- 收到投票請求表示至少有一個其他節點是活躍的
- 重置超時可以減少不必要的選舉
- 避免頻繁切換狀態
代碼實現:
// RaftNodeImpl.java:498-500
@Override
public VoteResponse handleVoteRequest(VoteRequest voteRequest) {
VoteResponse response = consensus.requestVote(voteRequest);
// 如果投票給了對方,重置選舉定時器
if (response.isVoteGranted()) {
resetElectionTimer();
}
return response;
}
7.3 為什麼 Candidate 要增加任期?
原因:
- 避免使用舊任期發起新的選舉
- 區分不同輪的選舉
- 保證任期單調遞增
代碼實現:
// RaftNodeImpl.java:201-216
public void becomeCandidate() {
ServerStatusEnum oldStatus = nodeStatus;
// 增加任期號(重要!)
currentTerm++;
nodeStatus = ServerStatusEnum.CANDIDATE;
votedFor = currentNodeConfig.getServerId();
log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);
resetElectionTimer();
startElection();
}
7.4 如何處理網絡分區?
Raft 的保證:
- 舊 Leader 無法獲得多數派,無法提交新日誌
- 新 Leader 會在多數派分區選舉產生
- 網絡恢復後,舊 Leader 會轉為 Follower
代碼體現:
// 舊 Leader 的心跳被拒絕
private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
if (response.getTerm() > currentTerm) {
log.info("Received higher term {}, stepping down", response.getTerm());
becomeFollower(response.getTerm());
}
}
// 舊 Leader 無法獲得多數派
public boolean hasMajority() {
return votesReceived.size() >= majorityCount; // N/2 + 1
}
7.5 為什麼心跳間隔通常遠小於選舉超時?
原因:
- 心跳間隔短(如 100ms),選舉超時長(如 200-300ms)
- 確保 Follower 在超時前收到心跳
- 避免不必要的選舉
配置示例:
config.setHeartbeatInterval(1); // 1 秒(1000ms)
config.setElectionTimeoutRandomRange(Range.of(150, 300)); // 150-300ms
// 注意:這裏心跳間隔是秒,超時是毫秒
// 實際使用時,心跳間隔應該 < 選舉超時
建議配置:
心跳間隔:50ms - 100ms
選舉超時:150ms - 300ms
7.6 如何避免平票(split vote)?
平票場景:
3 個節點:
- node1: term=2, votes=[node1]
- node2: term=2, votes=[node2]
- node3: term=2, votes=[node3]
每個節點只有 1 票,無法獲得多數派(需要 2 票)
選舉超時後重新選舉
避免方法:
-
隨機化超時(已實現)
- 減少多個節點同時超時的概率
-
預投票(Pre-vote)(未實現)
- 先詢問其他節點是否願意投票
- 如果多數派同意,再真正發起選舉
-
快速重試(未實現)
- 平票後快速重新選舉
- 立即開始,不等超時
當前實現:
- 僅依賴超時隨機化
- 平票後等待超時重試
7.7 為什麼單機模式直接成為 Leader?
原因:
- 單機集羣不需要選舉
- 只有一個節點,自己就是多數派
- 提高啓動速度
代碼實現:
// RaftNodeImpl.java:274-279
private void startElection() {
int totalNodes = raftConfig.getRaftNodeConfigList().size();
currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());
// 單機模式直接成為 Leader
if (totalNodes == 1) {
log.info("Single node mode, becoming leader immediately");
becomeLeader();
return;
}
// 多機模式發送投票請求
...
}
7.8 如何調優選舉參數?
參數建議:
| 參數 | 推薦值 | 説明 |
|---|---|---|
| 心跳間隔 | 50ms - 100ms | 越短越快,但網絡開銷大 |
| 選舉超時最小 | 150ms - 200ms | 應該 > 心跳間隔 |
| 選舉超時最大 | 300ms - 400ms | 應該是心跳間隔的 3-5 倍 |
| RPC 超時 | 2000ms - 3000ms | 應該 > 選舉超時 |
調優示例:
// 低延遲場景(數據中心內)
config.setHeartbeatInterval(1); // 1ms
config.setElectionTimeoutRandomRange(Range.of(10, 20)); // 10-20ms
// 高穩定性場景(廣域網)
config.setHeartbeatInterval(100); // 100ms
config.setElectionTimeoutRandomRange(Range.of(500, 1000)); // 500-1000ms
// 開發調試場景
config.setHeartbeatInterval(1); // 1秒
config.setElectionTimeoutRandomRange(Range.of(2000, 4000)); // 2-4秒
附錄
A. 術語表
| 術語 | 説明 |
|---|---|
| Term | 任期號,單調遞增,用於識別 Leader |
| Election Timeout | 選舉超時時間,隨機化避免平票 |
| Heartbeat | 心跳,Leader 定期發送維持地位 |
| Majority | 多數派,超過半數的節點(N/2 + 1) |
| Split Vote | 平票選舉,沒有節點獲得多數派 |
| Candidate | 候選節點,發起選舉的節點 |
| Leader | 主節點,處理客户端請求 |
| Follower | 從節點,響應 Leader 的請求 |
B. 參考資料
- Raft 論文:Diego Ongaro, John Ousterhout. "In Search of an Understandable Consensus Algorithm." 2014
- Raft GitHub:https://github.com/ongardie/raft.github.io
- 可視化 Raft:http://thesecretlivesofdata.com/raft/
- Raft Scope:https://raft.github.io/raftscope/index.html
C. 相關文件
| 文件 | 路徑 |
|---|---|
| RaftNodeImpl | com.ling.raft.core.RaftNodeImpl |
| ConsensusModuleImpl | com.ling.raft.core.ConsensusModuleImpl |
| VoteCounter | com.ling.raft.core.VoteCounter |
| ElectionTask | com.ling.raft.core.task.ElectionTask |
| HeartbeatTask | com.ling.raft.core.task.HeartbeatTask |
| ServerStatusEnum | com.ling.raft.enums.ServerStatusEnum |
| VoteRequest | com.ling.raft.model.dto.VoteRequest |
| VoteResponse | com.ling.raft.model.dto.VoteResponse |
| ThreeNodeElectionTest | com.ling.raft.example.leader.ThreeNodeElectionTest |