當今互聯網行業中,對於分佈式一致性算法,個人覺得實用性最高並且應用最廣泛的就是 Raft 算法了。Raft 非常適合用於所有的節點均為可信節點時的必要數據同步場景中。Raft 的基本原理理解起來並不難,網上很多文字簡介,都不如一個很生動的動畫來得直觀。
etcd/raft
在 Kubenetes 中廣泛使用的分佈式 KV 存儲系統 etcd 使用的就是 Raft 算法。算法的實現就直接作為 etcd 的子 package(用 Go 編寫),路徑為:github.com/etcd-io/etcd/raft。
官方提供了一個 demo。這個 demo 其實已經非常完整了,它包含了網絡通信、快照壓縮、數據同步等完整的功能。而對於 etcd/raft 的初見者而言,還是稍微有點門檻了。本文的目的是儘量抽絲剝繭,首先從 raft 最基本的功能——選舉來入手,構建一個小的集羣 demo,一步一步説明 etcd/raft 的用法。
Demo 功能
這個小 demo 只實現一個功能:已知數量的集羣節點,能夠進行 leader 的選舉。更多的功能(比如數據的存儲)在以後的文章陸續解析。
為此,我們需要研究 etcd/raft 的相關函數的用法。
Raft 節點數據結構
包 raft 使用接口 Node 來描述一個 Raft 節點。該接口的函數中,本文(或者説本階段)涉及的有四個:
type Node interface {
Tick()
Step(ctx context.Context, msg raftpb.Message) error
Ready() <-chan Ready
Status() Status
}
啓動節點
Raft 節點數量建議是一個素數。這裏我採用 3 個。在節點數量已知的情況下,我們首先要告知 Raft node 節點的列表。每個節點應該有唯一的一個 uint64 類型的 ID:
peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
應用程序需要自己實現節點與節點之間的網絡通信。這裏我就在本地單進程運行三個協程,模擬三個節點,給三個節點分配三個 channel 用來通信:
var (
bcChans = []chan raftpb.Message{
make(chan raftpb.Message),
make(chan raftpb.Message),
make(chan raftpb.Message),
}
)
// ......
func main() {
peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
go startNode(0x01, peers)
go startNode(0x02, peers)
go startNode(0x03, peers)
time.Sleep(2 * time.Second)
return
}
節點程序中要完成的功能
在 etcd/raft 中對 Raft 算法的邏輯實現是儘量地輕量化,只實現算法的核心功能。但與此相對的,需要調用 Raft 的應用程序實現較多的額外邏輯來實現完整的節點功能。在本文中,我們只關心節點的選舉,該場景下我們需要實現的功能有以下兩個:
節點內部心跳機制
Raft 節點依賴定期的心跳來進行週期性的狀態機流轉,應用程序需要給 raft 節點提供。在 demo 中,我用了一個帶隨機抖動的 ticker 來實現——而這也是 Raft 算法中建議的方案,也就是帶有一點隨機因素。當每一次 tick 到來時,就可以調用 raft node 的 Tick() 方法,推動內部狀態機的更新:
func startNode(id uint64, peers []raft.Peer) {
// ......
for {
select {
case <-n.tick.Elapsed(): // 相當於 time 包 Ticker 的 tick.C
n.node.Tick() // n.node 是 raft.Node 對象,下同
// ......
}
// ......
}
轉發節點之間的 raft 通信
前文説到,Raft 節點之間的網絡通信需要應用程序來實現。應用程序通過 etcd/raft 節點的 Ready() 方法接收節點需要對其他發出的的信息。Ready() 函數返回 raft.Ready 結構體,在這一階段中,我們需要使用的是 Ready 結構體的 Messages 成員,這是一個 []raftpb.Message 類型。應用程序需要負責的,就是將這些 message 發送出去。
Message 的定義並不長,如下所示:
type Message struct {
Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
To uint64 `protobuf:"varint,2,opt,name=to" json:"to"`
From uint64 `protobuf:"varint,3,opt,name=from" json:"from"`
Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"`
LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"`
Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"`
Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
可以看到,這個結構體分別按照 protobuf 和 json 進行了定義,這就非常方便應用程序根據不同的通信模式對數據進行序列化和反序列化後在網絡中傳輸。而 To 則告訴了應用程序應該將這個消息發送給哪一個節點。在 demo 則是根據 To 發到對應的 channel 裏。
接收其他節點發來的 raft 通信
在 demo 中,節點從 channel 中獲取到 Message 對象之後,調用本節點的 Step() 函數:
func startNode(id uint64, peers []raft.Peer) {
// ......
for {
select {
// ......
case m := <-n.recv:
n.node.Step(context.TODO(), m)
}
// ......
}
完整 demo 代碼
完整代碼九十來行,可以直接運行之後觀察 shell 輸出,瞭解 raft 的選舉過程。
package main
import (
"context"
"log"
"strings"
"time"
"github.com/coreos/etcd/raft/raftpb"
"github.com/etcd-io/etcd/raft"
"github.com/influxdata/telegraf/agent"
)
func init() {
log.SetFlags(log.Lshortfile | log.LstdFlags)
}
var (
infof = log.Printf
errorf = log.Printf
bcChans = []chan raftpb.Message{
make(chan raftpb.Message),
make(chan raftpb.Message),
make(chan raftpb.Message),