動態

詳情 返回 返回

告別 Redis/MySQL:用一百行 Go 代碼實現持久化 Set - 動態 詳情

問題出現

在做詞焙小程序詞庫更新的時候遇到一個問題:如果某一個單詞是一個非法的單詞,那就需要進行標記,之後再次遇到的時候可以直接跳過。

這個方案要實現的話,可能第一時間會想到用 Redis 的 Set;或者數據庫里加一張表,一行一個非法單詞。

但是詞焙本身是沒有用到 Redis 的,如果要用還得配置下內存淘汰策略;這麼簡單的需求放數據庫的話又有點殺雞用牛刀了。

所以我選擇了直接使用內存 + 定期持久化到文件,整個技術方案不難,加起來就一百行左右的代碼。

基礎功能

雖然整個技術方案不復雜,但我們也拆解需求,逐步完善。

業務側關心的方法:Add(新增)和 Exist(判斷是否存在),至於怎麼持久化的細節是業務側不太關心的,所以我們先實現最簡單的集合:

type PersistentList struct {
    records map[string]struct{} // 內存中的數據集合

    mu sync.RWMutex  // 用於併發控制的讀寫鎖
}

// Add 將 value 添加到緩存中。
func (c *PersistentList) Add(value string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if _, found := c.records[value]; !found {
        c.records[value] = struct{}{}
    }
}

// Exist 檢查 value 是否存在於緩存中。
func (c *PersistentList) Exist(value string) bool {
    c.mu.RLock()
    _, found := c.records[value]
    c.mu.RUnlock()
    return found
}

到這裏,最簡單的內存集合就實現了,接下來我們增加持久化功能!

持久化

我們可以將集合逐行保存到文本文件,這麼做有兩個好處:一是進程啓動時加載集合的邏輯很簡單,二是持久化的時候即使每 Add 一個元素就直接寫文件,因為是追加寫,性能也還是可以的。

修改結構體定義,新增 peddingfile,以及再增加一個 chan 用於發送退出信號:

type PersistentList struct {
    records map[string]struct{} // 內存中的數據集合
    pending []string            // 待持久化的新數據

    mu    sync.RWMutex  // 用於併發控制的讀寫鎖
    file  *os.File      // 持久化文件句柄
    close chan struct{} // 用於關閉後台任務的信號

    log contract.Logger
}

我們將讀取數據和寫入數據這兩個邏輯分開實現:

從文件加載數據

如前面所説,加載數據是逐行讀取然後放進 records 裏就行,我們把這個邏輯放在實例化時進行:

// contract.Logger 的定義見文末
func NewPersistentList(dataFile string, log contract.Logger) (*PersistentList, error) {
    file, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }

    cache := &PersistentList{
        records: make(map[string]struct{}),
        file:    file,
        close:   make(chan struct{}),
        log:     log,
    }

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        word := scanner.Text()
        if word != "" {
            cache.records[word] = struct{}{}
        }
    }

    if err := scanner.Err(); err != nil && err != io.EOF {
        file.Close()
        return nil, err
    }

    log.Infof("Loaded %d data into persistent cache from %s", len(cache.records), dataFile)
    return cache, nil
}

持久化數據到文件

我們可以每 Add 一個數據就追加寫文件,但這裏做一個小小的過度設計,批量刷盤,具體做法就是每次 Add 時追加到 pending 數組,後台每分鐘寫一次磁盤:

// 修改 Add 方法:
func (c *PersistentList) Add(value string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if _, found := c.records[value]; !found {
        c.records[value] = struct{}{}
        c.pending = append(c.pending, value) // 新增了這行
    }
}

// 新增刷盤方法:
func (c *PersistentList) syncToFile() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if len(c.pending) == 0 {
        return nil
    }

    count := len(c.pending)
    writer := bufio.NewWriter(c.file)
    for _, word := range c.pending {
        if _, err := writer.WriteString(word + "\n"); err != nil {
            return err
        }
    }

    if err := writer.Flush(); err != nil {
        return err
    }

    // 清空待刷盤數據,準備下一次寫入
    c.pending = nil

    c.log.Infof("Successfully synced %d new keys to file.", count)
    return nil
}

// 新增後台任務方法(給進程啓動時以協程異步執行):
func (c *PersistentList) Serve(ctx context.Context) error {
    const syncInterval = 1 * time.Minute
    ticker := time.NewTicker(syncInterval)
    defer ticker.Stop()
    defer c.file.Close()

    for {
        select {
        case <-ctx.Done():
            c.log.Warn("Context canceled. Performing final sync...")
            return c.syncToFile()
        case <-ticker.C:
            if err := c.syncToFile(); err != nil {
                return err
            }
        case <-c.close:
            c.log.Warn("Received close signal. Performing final sync...")
            return c.syncToFile()
        }
    }
}

到這裏,一個帶有持久化功能的 Set 就已經實現了,可以這樣使用:

func main() {
    myset, err := NewPersistentList("/path/to/setfile")
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithCancel(context.TODO())
    go myset.Serve(ctx)

    myset.Add("asdfgh")
    fmt.Println(myset.Exist("asdfgh"))
    fmt.Println(myset.Exist("qwerty"))

    cancel() // 退出進程時通知刷盤
}

但是上面這個 context.WithCancel 的方法還是略麻煩了,還記得我們剛剛定義的 close 這個 chan 嗎,我們可以增加一個 Shutdown 方法:

func (c *PersistentList) Shutdown() {
    c.close <- struct{}{}
}

這樣就可以直接使用 myset.Shutdown() 來關閉了

定義

contract.Logger 定義如下:

package contract

type Logger interface {
    Debug(msg string)
    Debugf(msg string, args ...interface{})

    Info(msg string)
    Infof(msg string, args ...interface{})

    Warn(msg string)
    Warnf(msg string, args ...interface{})

    Error(msg string)
    Errorf(msg string, args ...interface{})

    Close() error
}

本文首發於本人博客:https://yian.me/blog/coding/implement-lightweight-persistent-set-in-100-lines.html

Add a new 評論

Some HTML is okay.