博客 / 詳情

返回

Dragonboat統一存儲LogDB實現分析|得物技術

一、項目概覽

Dragonboat 是純 Go 實現的(multi-group)Raft 庫。

為應用屏蔽 Raft 複雜性,提供易於使用的 NodeHost 和狀態機接口。該庫(自稱)有如下特點:

  • 高吞吐、流水線化、批處理;
  • 提供了內存/磁盤狀態機多種實現;
  • 提供了 ReadIndex、成員變更、Leader轉移等管理端API;
  • 默認使用 Pebble 作為 存儲後端。

本次代碼串講以V3的穩定版本為基礎,不包括GitHub上v4版本內容。

二、整體架構

三、LogDB 統一存儲

LogDB 模塊是 Dragonboat 的核心持久化存儲層,雖然模塊名字有Log,但是它囊括了所有和存儲相關的API,負責管理 Raft 協議的所有持久化數據,包括:

Raft狀態 (RaftState)

Raft內部狀態變更的集合結構

包括但不限於:

  • ClusterID/NodeID: 節點ID
  • RaftState: Raft任期、投票情況、commit進度
  • EntriesToSave:Raft提案日誌數據
  • Snapshot:快照元數據(包括快照文件路徑,快照大小,快照對應的提案Index,快照對應的Raft任期等信息)
  • Messages:發給其他節點的Raft消息
  • ReadyToReads:ReadIndex就緒的請求

引導信息 (Bootstrap)

type Bootstrap struct {
    Addresses map[uint64]string // 初始集羣成員
    Join      bool
    Type      StateMachineType
}

ILogDB的API如下:

type ILogDB interface {


    BinaryFormat() uint32 // 返回支持的二進制格式版本號


    ListNodeInfo() ([]NodeInfo, error) // 列出 LogDB 中所有可用的節點信息


    // 存儲集羣節點的初始化配置信息,包括是否加入集羣、狀態機類型等
    SaveBootstrapInfo(clusterID uint64, nodeID uint64, bootstrap pb.Bootstrap) error


    // 獲取保存的引導信息
    GetBootstrapInfo(clusterID uint64, nodeID uint64) (pb.Bootstrap, error)


    // 原子性保存 Raft 狀態、日誌條目和快照元數據
    SaveRaftState(updates []pb.Update, shardID uint64) error


    // 迭代讀取指定範圍內的連續日誌條目
    IterateEntries(ents []pb.Entry, size uint64, clusterID uint64, nodeID uint64, 
                   low uint64, high uint64, maxSize uint64) ([]pb.Entry, uint64, error)


    // 讀取持久化的 Raft 狀態
    ReadRaftState(clusterID uint64, nodeID uint64, lastIndex uint64) (RaftState, error)


    // 刪除指定索引之前的所有條目, 日誌壓縮、快照後清理舊日誌
    RemoveEntriesTo(clusterID uint64, nodeID uint64, index uint64) error


    // 回收指定索引之前條目佔用的存儲空間
    CompactEntriesTo(clusterID uint64, nodeID uint64, index uint64) (<-chan struct{}, error)


    // 保存所有快照元數據
    SaveSnapshots([]pb.Update) error


    // 刪除指定的快照元數據 清理過時或無效的快照
    DeleteSnapshot(clusterID uint64, nodeID uint64, index uint64) error


    // 列出指定索引範圍內的可用快照
    ListSnapshots(clusterID uint64, nodeID uint64, index uint64) ([]pb.Snapshot, error)


    // 刪除節點的所有相關數據
    RemoveNodeData(clusterID uint64, nodeID uint64) error


    // 導入快照並創建所有必需的元數據
    ImportSnapshot(snapshot pb.Snapshot, nodeID uint64) error
}

3.1索引鍵

存儲的底層本質是一個KVDB (pebble or rocksdb),由於業務的複雜性,要統一各類業務key的設計方法,而且要降低空間使用,所以有了如下的key設計方案。

龍舟中key分為3類:


其中,2字節的header用於區分各類不同業務的key空間。

entryKeyHeader       = [2]byte{0x1, 0x1}  // 普通日誌條目
persistentStateKey   = [2]byte{0x2, 0x2}  // Raft狀態
maxIndexKey          = [2]byte{0x3, 0x3}  // 最大索引記錄
nodeInfoKey          = [2]byte{0x4, 0x4}  // 節點元數據
bootstrapKey         = [2]byte{0x5, 0x5}  // 啓動配置
snapshotKey          = [2]byte{0x6, 0x6}  // 快照索引
entryBatchKey        = [2]byte{0x7, 0x7}  // 批量日誌

在key的生成中,採用了useAsXXXKey和SetXXXKey的方式,複用了data這個二進制變量,減少GC。

type Key struct {
    data []byte  // 底層字節數組複用池
    key  []byte  // 有效數據切片
    pool *sync.Pool // 似乎並沒有什麼用
}




func (k *Key) useAsEntryKey() {
    k.key = k.data
}


type IReusableKey interface {
    SetEntryBatchKey(clusterID uint64, nodeID uint64, index uint64)
    // SetEntryKey sets the key to be an entry key for the specified Raft node
    // with the specified entry index.
    SetEntryKey(clusterID uint64, nodeID uint64, index uint64)
    // SetStateKey sets the key to be an persistent state key suitable
    // for the specified Raft cluster node.
    SetStateKey(clusterID uint64, nodeID uint64)
    // SetMaxIndexKey sets the key to be the max possible index key for the
    // specified Raft cluster node.
    SetMaxIndexKey(clusterID uint64, nodeID uint64)
    // Key returns the underlying byte slice of the key.
    Key() []byte
    // Release releases the key instance so it can be reused in the future.
    Release()
}


func (k *Key) useAsEntryKey() {
    k.key = k.data
}


// SetEntryKey sets the key value to the specified entry key.
func (k *Key) SetEntryKey(clusterID uint64, nodeID uint64, index uint64) {
    k.useAsEntryKey()
    k.key[0] = entryKeyHeader[0]
    k.key[1] = entryKeyHeader[1]
    k.key[2] = 0
    k.key[3] = 0
    binary.BigEndian.PutUint64(k.key[4:], clusterID)
    // the 8 bytes node ID is actually not required in the key. it is stored as
    // an extra safenet - we don't know what we don't know, it is used as extra
    // protection between different node instances when things get ugly.
    // the wasted 8 bytes per entry is not a big deal - storing the index is
    // wasteful as well.
    binary.BigEndian.PutUint64(k.key[12:], nodeID)
    binary.BigEndian.PutUint64(k.key[20:], index)
}

3.2變量複用IContext

IContext的核心設計目的是實現併發安全的內存複用機制。在高併發場景下,頻繁的內存分配和釋放會造成較大的GC壓力,通過IContext可以實現:

  • 鍵對象複用:通過GetKey()獲取可重用的IReusableKey
  • 緩衝區複用:通過GetValueBuffer()獲取可重用的字節緩衝區
  • 批量操作對象複用:EntryBatch和WriteBatch的複用
// IContext is the per thread context used in the logdb module.
// IContext is expected to contain a list of reusable keys and byte
// slices that are owned per thread so they can be safely reused by the
// same thread when accessing ILogDB.
type IContext interface {
    // Destroy destroys the IContext instance.
    Destroy()
    // Reset resets the IContext instance, all previous returned keys and
    // buffers will be put back to the IContext instance and be ready to
    // be used for the next iteration.
    Reset()
    // GetKey returns a reusable key.
    GetKey() IReusableKey // 這就是上文中的key接口
    // GetValueBuffer returns a byte buffer with at least sz bytes in length.
    GetValueBuffer(sz uint64) []byte
    // GetWriteBatch returns a write batch or transaction instance.
    GetWriteBatch() interface{}
    // SetWriteBatch adds the write batch to the IContext instance.
    SetWriteBatch(wb interface{})
    // GetEntryBatch returns an entry batch instance.
    GetEntryBatch() pb.EntryBatch
    // GetLastEntryBatch returns an entry batch instance.
    GetLastEntryBatch() pb.EntryBatch
}








type context struct {
    size    uint64
    maxSize uint64
    eb      pb.EntryBatch
    lb      pb.EntryBatch
    key     *Key
    val     []byte
    wb      kv.IWriteBatch
}


func (c *context) GetKey() IReusableKey {
    return c.key
}


func (c *context) GetValueBuffer(sz uint64) []byte {
    if sz <= c.size {
        return c.val
    }
    val := make([]byte, sz)
    if sz < c.maxSize {
        c.size = sz
        c.val = val
    }
    return val
}


func (c *context) GetEntryBatch() pb.EntryBatch {
    return c.eb
}


func (c *context) GetLastEntryBatch() pb.EntryBatch {
    return c.lb
}


func (c *context) GetWriteBatch() interface{} {
    return c.wb
}


func (c *context) SetWriteBatch(wb interface{}) {
    c.wb = wb.(kv.IWriteBatch)
}

3.3存儲引擎封裝IKVStore

IKVStore 是 Dragonboat 日誌存儲系統的抽象接口,它定義了底層鍵值存儲引擎需要實現的所有基本操作。這個接口讓 Dragonboat 能夠支持不同的存儲後端(如 Pebble、RocksDB 等),實現了存儲引擎的可插拔性。

type IKVStore interface {
    // Name is the IKVStore name.
    Name() string
    // Close closes the underlying Key-Value store.
    Close() error


    // 範圍掃描 - 支持前綴遍歷的迭代器
    IterateValue(fk []byte,
            lk []byte, inc bool, op func(key []byte, data []byte) (bool, error)) error
    
    // 查詢操作 - 基於回調的內存高效查詢模式
    GetValue(key []byte, op func([]byte) error) error
    
    // 寫入操作 - 單條記錄的原子寫入
    SaveValue(key []byte, value []byte) error


    // 刪除操作 - 單條記錄的精確刪除
    DeleteValue(key []byte) error
    
    // 獲取批量寫入器
    GetWriteBatch() IWriteBatch
    
    // 原子提交批量操作
    CommitWriteBatch(wb IWriteBatch) error
    
    // 批量刪除一個範圍的鍵值對
    BulkRemoveEntries(firstKey []byte, lastKey []byte) error
    
    // 壓縮指定範圍的存儲空間
    CompactEntries(firstKey []byte, lastKey []byte) error
    
    // 全量壓縮整個數據庫
    FullCompaction() error
}


type IWriteBatch interface {
    Destroy()                 // 清理資源,防止內存泄漏
    Put(key, value []byte)    // 添加寫入操作
    Delete(key []byte)        // 添加刪除操作
    Clear()                   // 清空批處理中的所有操作
    Count() int               // 獲取當前批處理中的操作數量
}

openPebbleDB是Dragonboat 中 Pebble 存儲引擎的初始化入口,負責根據配置創建一個完整可用的鍵值存儲實例。

// KV is a pebble based IKVStore type.
type KV struct {
    db       *pebble.DB
    dbSet    chan struct{}
    opts     *pebble.Options
    ro       *pebble.IterOptions
    wo       *pebble.WriteOptions
    event    *eventListener
    callback kv.LogDBCallback
    config   config.LogDBConfig
}


var _ kv.IKVStore = (*KV)(nil)




// openPebbleDB
// =============
// 將 Dragonboat 的 LogDBConfig → Pebble 引擎實例
func openPebbleDB(
        cfg  config.LogDBConfig,
        cb   kv.LogDBCallback,   // => busy通知:busy(true/false)
        dir  string,             // 主數據目錄
        wal  string,             // WAL 獨立目錄(可空)
        fs   vfs.IFS,            // 文件系統抽象(磁盤/memfs)
) (kv.IKVStore, error) {
    
    //--------------------------------------------------
    // 2️⃣ << 核心調優參數讀入
    //--------------------------------------------------
    blockSz      := int(cfg.KVBlockSize)                    // 數據塊(4K/8K…)
    writeBufSz   := int(cfg.KVWriteBufferSize)              // 寫緩衝
    bufCnt       := int(cfg.KVMaxWriteBufferNumber)         // MemTable數量
    l0Compact    := int(cfg.KVLevel0FileNumCompactionTrigger) // L0 層文件數量觸發壓縮的閾值
    l0StopWrites := int(cfg.KVLevel0StopWritesTrigger)
    baseBytes    := int64(cfg.KVMaxBytesForLevelBase)
    fileBaseSz   := int64(cfg.KVTargetFileSizeBase)
    cacheSz      := int64(cfg.KVLRUCacheSize)
    levelMult    := int64(cfg.KVTargetFileSizeMultiplier)  // 每層文件大小倍數
    numLevels    := int64(cfg.KVNumOfLevels)
    
    
    //--------------------------------------------------
    // 4️⃣ 構建 LSM-tree 層級選項 (每層無壓縮)
    //--------------------------------------------------
    levelOpts := []pebble.LevelOptions{}
    sz := fileBaseSz
    for lvl := 0; lvl < int(numLevels); lvl++ {
        levelOpts = append(levelOpts, pebble.LevelOptions{
            Compression:    pebble.NoCompression, // 寫性能優先
            BlockSize:      blockSz,
            TargetFileSize: sz,                 // L0 < L1 < … 呈指數增長
        })
        sz *= levelMult
    }
    
    //--------------------------------------------------
    // 5️⃣ 初始化依賴:LRU Cache + 讀寫選項
    //--------------------------------------------------
    cache := pebble.NewCache(cacheSz)    // block緩存
    ro    := &pebble.IterOptions{}       // 迭代器默認配置
    wo    := &pebble.WriteOptions{Sync: true} // ❗fsync強制刷盤
    
    opts := &pebble.Options{
        Levels:                      levelOpts,
        Cache:                       cache,
        MemTableSize:                writeBufSz,
        MemTableStopWritesThreshold: bufCnt,
        LBaseMaxBytes:               baseBytes,
        L0CompactionThreshold:       l0Compact,
        L0StopWritesThreshold:       l0StopWrites,
        Logger:                      PebbleLogger,
        FS:                          vfs.NewPebbleFS(fs),
        MaxManifestFileSize:         128 * 1024 * 1024,
        // WAL 目錄稍後條件注入
    }
    
    kv := &KV{
        dbSet:    make(chan struct{}),          // 關閉->初始化完成信號
        callback: cb,                           // 上層 raft engine 回調
        config:   cfg,
        opts:     opts,
        ro:       ro,
        wo:       wo,
    }
    
    event := &eventListener{
        kv:      kv,
        stopper: syncutil.NewStopper(),
    }
    
    // => 關鍵事件觸發
    opts.EventListener = pebble.EventListener{
        WALCreated:    event.onWALCreated,
        FlushEnd:      event.onFlushEnd,
        CompactionEnd: event.onCompactionEnd,
    }
    
    //--------------------------------------------------
    // 7️⃣ 目錄準備
    //--------------------------------------------------
    if wal != "" {
        fs.MkdirAll(wal)        // 📁 為 WAL 單獨磁盤預留
        opts.WALDir = wal
    }
    fs.MkdirAll(dir)            // 📁 主數據目錄
    
    //--------------------------------------------------
    // 8️⃣ 真正的數據庫實例化
    //--------------------------------------------------
    pdb, err := pebble.Open(dir, opts)
    if err != nil { return nil, err }
    
    //--------------------------------------------------
    // 9️⃣ 🧹 資源整理 & 啓動事件
    //--------------------------------------------------
    cache.Unref()               // 去除多餘引用,防止泄露
    kv.db = pdb
    
    // 🔔 手動觸發一次 WALCreated 確保反壓邏輯進入首次輪詢
    kv.setEventListener(event)  // 內部 close(kv.dbSet)
    
    return kv, nil
}

其中eventListener是對pebble 內存繁忙的回調,繁忙判斷的條件有兩個:

  • 內存表大小超過閾值(95%)
  • L0 層文件數量超過閾值(L0寫入最大文件數量-1)


func (l *eventListener) notify() {
    l.stopper.RunWorker(func() {
        select {
        case <-l.kv.dbSet:
            if l.kv.callback != nil {
                memSizeThreshold := l.kv.config.KVWriteBufferSize *
                    l.kv.config.KVMaxWriteBufferNumber * 19 / 20
                l0FileNumThreshold := l.kv.config.KVLevel0StopWritesTrigger - 1
                m := l.kv.db.Metrics()
                busy := m.MemTable.Size >= memSizeThreshold ||
                    uint64(m.Levels[0].NumFiles) >= l0FileNumThreshold
                l.kv.callback(busy)
            }
        default:
        }
    })
}

3.4日誌條目存儲DB

db結構體是Dragonboat日誌數據庫的核心管理器,提供Raft日誌、快照、狀態等數據的持久化存儲接口。是橋接了業務和pebble存儲的中間層。

// db is the struct used to manage log DB.
type db struct {
    cs      *cache       // 節點信息、Raft狀態信息緩存
    keys    *keyPool     // Raft日誌索引鍵變量池
    kvs     kv.IKVStore  // pebble的封裝
    entries entryManager // 日誌條目讀寫封裝
}


// 這裏面的信息不會過期,叫寄存更合適
type cache struct {
    nodeInfo       map[raftio.NodeInfo]struct{}
    ps             map[raftio.NodeInfo]pb.State
    lastEntryBatch map[raftio.NodeInfo]pb.EntryBatch
    maxIndex       map[raftio.NodeInfo]uint64
    mu             sync.Mutex
}
  • 獲取一個批量寫容器

實現:

func (r *db) getWriteBatch(ctx IContext) kv.IWriteBatch {
    if ctx != nil {
        wb := ctx.GetWriteBatch()
        if wb == nil {
            wb = r.kvs.GetWriteBatch()
            ctx.SetWriteBatch(wb)
        }
        return wb.(kv.IWriteBatch)
    }
    return r.kvs.GetWriteBatch()
}

降低GC壓力

  • 獲取所有節點信息

實現:

func (r *db) listNodeInfo() ([]raftio.NodeInfo, error) {
    fk := newKey(bootstrapKeySize, nil)
    lk := newKey(bootstrapKeySize, nil)
    fk.setBootstrapKey(0, 0)
    lk.setBootstrapKey(math.MaxUint64, math.MaxUint64)
    ni := make([]raftio.NodeInfo, 0)
    op := func(key []byte, data []byte) (bool, error) {
        cid, nid := parseNodeInfoKey(key)
        ni = append(ni, raftio.GetNodeInfo(cid, nid))
        return true, nil
    }
    if err := r.kvs.IterateValue(fk.Key(), lk.Key(), true, op); err != nil {
        return []raftio.NodeInfo{}, err
    }
    return ni, nil
}
  • 保存集羣狀態

實現:

type Update struct {
    ClusterID uint64  // 集羣ID,標識節點所屬的Raft集羣
    NodeID    uint64  // 節點ID,標識集羣中的具體節點


    State  // 包含當前任期(Term)、投票節點(Vote)、提交索引(Commit)三個關鍵持久化狀態


    EntriesToSave []Entry    // 需要持久化到穩定存儲的日誌條目
    CommittedEntries []Entry // 已提交位apply的日誌條目
    MoreCommittedEntries bool  // 指示是否還有更多已提交條目等待處理


    Snapshot Snapshot  // 快照元數據,當需要應用快照時設置


    ReadyToReads []ReadyToRead  // ReadIndex機制實現的線性一致讀


    Messages []Message  // 需要發送給其他節點的Raft消息


    UpdateCommit struct {
        Processed         uint64  // 已推送給RSM處理的最後索引
        LastApplied       uint64  // RSM確認已執行的最後索引
        StableLogTo       uint64  // 已穩定存儲的日誌到哪個索引
        StableLogTerm     uint64  // 已穩定存儲的日誌任期
        StableSnapshotTo  uint64  // 已穩定存儲的快照到哪個索引
        ReadyToRead       uint64  // 已準備好讀的ReadIndex請求索引
    }
}




func (r *db) saveRaftState(updates []pb.Update, ctx IContext) error {
      // 步驟1:獲取寫入批次對象,用於批量操作提高性能
      // 優先從上下文中獲取已存在的批次,避免重複創建
      wb := r.getWriteBatch(ctx)
      
      // 步驟2:遍歷所有更新,處理每個節點的狀態和快照
      for _, ud := range updates {
          // 保存 Raft 的硬狀態(Term、Vote、Commit)
          // 使用緩存機制避免重複保存相同狀態
          r.saveState(ud.ClusterID, ud.NodeID, ud.State, wb, ctx)
          
          // 檢查是否有快照需要保存
          if !pb.IsEmptySnapshot(ud.Snapshot) {
              // 快照索引一致性檢查:確保快照索引不超過最後一個日誌條目的索引
              // 這是 Raft 協議的重要約束,防止狀態不一致
              if len(ud.EntriesToSave) > 0 {
                  lastIndex := ud.EntriesToSave[len(ud.EntriesToSave)-1].Index
                  if ud.Snapshot.Index > lastIndex {
                      plog.Panicf("max index not handled, %d, %d",
                          ud.Snapshot.Index, lastIndex)
                  }
              }
              
              // 保存快照元數據到數據庫
              r.saveSnapshot(wb, ud)
              
              // 更新節點的最大日誌索引為快照索引
              r.setMaxIndex(wb, ud, ud.Snapshot.Index, ctx)
          }
      }
      
      // 步驟3:批量保存所有日誌條目
      // 這裏會調用 entryManager 接口的 record 方法,根據配置選擇批量或單獨存儲策略
      r.saveEntries(updates, wb, ctx)
      
      // 步驟4:提交寫入批次到磁盤
      // 只有在批次中有實際操作時才提交,避免不必要的磁盤 I/O
      if wb.Count() > 0 {
          return r.kvs.CommitWriteBatch(wb)
      }
      return nil
  }
  
  
  • 保存引導信息

實現:

func (r *db) saveBootstrapInfo(clusterID uint64,
    nodeID uint64, bs pb.Bootstrap) error {
    wb := r.getWriteBatch(nil)
    r.saveBootstrap(wb, clusterID, nodeID, bs)
    return r.kvs.CommitWriteBatch(wb) // 提交至Pebble
}


func (r *db) saveBootstrap(wb kv.IWriteBatch,
    clusterID uint64, nodeID uint64, bs pb.Bootstrap) {
    k := newKey(maxKeySize, nil)
    k.setBootstrapKey(clusterID, nodeID) // 序列化集羣節點信息
    data, err := bs.Marshal()
    if err != nil {
        panic(err)
    }
    wb.Put(k.Key(), data)
}
  • 獲取Raft狀態

實現:

func (r *db) getState(clusterID uint64, nodeID uint64) (pb.State, error) {
    k := r.keys.get()
    defer k.Release()
    k.SetStateKey(clusterID, nodeID)
    hs := pb.State{}
    if err := r.kvs.GetValue(k.Key(), func(data []byte) error {
        if len(data) == 0 {
            return raftio.ErrNoSavedLog
        }
        if err := hs.Unmarshal(data); err != nil {
            panic(err)
        }
        return nil
    }); err != nil {
            return pb.State{}, err
    }
    return hs, nil
}

3.5對外存儲API實現

龍舟對ILogDB提供了實現:ShardedDB,一個管理了多個pebble bucket的存儲單元。

var _ raftio.ILogDB = (*ShardedDB)(nil)
// ShardedDB is a LogDB implementation using sharded pebble instances.
type ShardedDB struct {
    completedCompactions uint64             // 原子計數器:已完成壓縮操作數
    config               config.LogDBConfig // 日誌存儲配置
    ctxs                 []IContext         // 分片上下文池,減少GC壓力
    shards               []*db              // 核心:Pebble實例數組
    partitioner          server.IPartitioner // 智能分片策略器
    compactionCh         chan struct{}      // 壓縮任務信號通道
    compactions          *compactions       // 壓縮任務管理器
    stopper              *syncutil.Stopper  // 優雅關閉管理器
}
  • 初始化過程

實現:

// 入口函數:創建並初始化分片日誌數據庫
OpenShardedDB(config, cb, dirs, lldirs, batched, check, fs, kvf):


    // ===階段1:安全驗證===
    if 配置為空 then panic
    if check和batched同時為true then panic


    // ===階段2:預分配資源管理器===
    shards := 空數組
    closeAll := func(all []*db) { //出錯清理工具
        for s in all {
            s.close()
        }
    }


    // ===階段3:逐個創建分片===
    loop i := 0 → 分片總數:
        datadir := pathJoin(dirs[i], "logdb-"+i)  //數據目錄
        snapdir := ""                           //快照目錄(可選)
        if lldirs非空 {
            snapdir = pathJoin(lldirs[i], "logdb-"+i)
        }


        shardCb := {shard:i, callback:cb}      //監控回調
        db, err := openRDB(...)                //創建實際數據庫實例
        if err != nil {                        //創建失敗
            closeAll(shards)                   //清理已創建的
            return nil, err
        }
        shards = append(shards, db)


    // ===階段5:核心組件初始化===
    partitioner := 新建分區器(execShards數量, logdbShards數量)
    instance := &ShardedDB{
        shards:      shards,
        partitioner: partitioner,
        compactions: 新建壓縮管理器(),
        compactionCh: 通道緩衝1,
        ctxs:       make([]IContext, 執行分片數),
        stopper:    新建停止器()
    }


    // ===階段6:預分配上下文&啓動後台===
    for j := 0 → 執行分片數:
        instance.ctxs[j] = 新建Context(saveBufferSize)


    instance.stopper.RunWorker(func() {        //後台壓縮協程
        instance.compactionWorkerMain()
    })


    return instance, nil                      //構造完成
    

  • 保存集羣狀態

實現:

func (s *ShardedDB) SaveRaftState(updates []pb.Update, shardID uint64) error {
    if shardID-1 >= uint64(len(s.ctxs)) {
        plog.Panicf("invalid shardID %d, len(s.ctxs): %d", shardID, len(s.ctxs))
    }
    ctx := s.ctxs[shardID-1]
    ctx.Reset()
    return s.SaveRaftStateCtx(updates, ctx)
}


func (s *ShardedDB) SaveRaftStateCtx(updates []pb.Update, ctx IContext) error {
    if len(updates) == 0 {
        return nil
    }
    pid := s.getParititionID(updates)
    return s.shards[pid].saveRaftState(updates, ctx)
}

以sylas為例子,我們每個分片都是單一cluster,所以logdb只使用了一個分片,龍舟設計初衷是為了解放多cluster的吞吐,我們暫時用不上,tindb可以考慮

四、總結

LogDB是Dragonboat重要的存儲層實現,作者將Pebble引擎包裝為一組通用簡潔的API,極大方便了上層應用與存儲引擎的交互成本。

其中包含了很多Go語言的技巧,例如大量的內存變量複用設計,展示了這個庫對高性能的極致追求,是一個十分值得學習的優秀工程案例。

往期回顧

1. 從數字到版面:得物數據產品裏數字格式化的那些事

2. 一文解析得物自建 Redis 最新技術演進

3. Golang HTTP請求超時與重試:構建高可靠網絡請求|得物技術

4. RN與hawk碰撞的火花之C++異常捕獲|得物技術

5. 得物TiDB升級實踐

文 /酒米

關注得物技術,每週更新技術乾貨

要是覺得文章對你有幫助的話,歡迎評論轉發點贊~

未經得物技術許可嚴禁轉載,否則依法追究法律責任。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.