問題出現
在做詞焙小程序詞庫更新的時候遇到一個問題:如果某一個單詞是一個非法的單詞,那就需要進行標記,之後再次遇到的時候可以直接跳過。
這個方案要實現的話,可能第一時間會想到用 Redis 的 Set;或者數據庫里加一張表,一行一個非法單詞。
但是詞焙本身是沒有用到 Redis 的,如果要用還得配置下內存淘汰策略;這麼簡單的需求放數據庫的話又有點殺雞用牛刀了。
所以我選擇了直接使用內存 + 定期持久化到文件,整個技術方案不難,加起來就一百行左右的代碼。
基礎功能
雖然整個技術方案不復雜,但我們也拆解需求,逐步完善。
業務側關心的方法:Add(新增)和 Exist(判斷是否存在),至於怎麼持久化的細節是業務側不太關心的,所以我們先實現最簡單的集合:
type PersistentList struct {
records map[string]struct{} // 內存中的數據集合
mu sync.RWMutex // 用於併發控制的讀寫鎖
}
// Add 將 value 添加到緩存中。
func (c *PersistentList) Add(value string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, found := c.records[value]; !found {
c.records[value] = struct{}{}
}
}
// Exist 檢查 value 是否存在於緩存中。
func (c *PersistentList) Exist(value string) bool {
c.mu.RLock()
_, found := c.records[value]
c.mu.RUnlock()
return found
}
到這裏,最簡單的內存集合就實現了,接下來我們增加持久化功能!
持久化
我們可以將集合逐行保存到文本文件,這麼做有兩個好處:一是進程啓動時加載集合的邏輯很簡單,二是持久化的時候即使每 Add 一個元素就直接寫文件,因為是追加寫,性能也還是可以的。
修改結構體定義,新增 pedding 和 file,以及再增加一個 chan 用於發送退出信號:
type PersistentList struct {
records map[string]struct{} // 內存中的數據集合
pending []string // 待持久化的新數據
mu sync.RWMutex // 用於併發控制的讀寫鎖
file *os.File // 持久化文件句柄
close chan struct{} // 用於關閉後台任務的信號
log contract.Logger
}
我們將讀取數據和寫入數據這兩個邏輯分開實現:
從文件加載數據
如前面所説,加載數據是逐行讀取然後放進 records 裏就行,我們把這個邏輯放在實例化時進行:
// contract.Logger 的定義見文末
func NewPersistentList(dataFile string, log contract.Logger) (*PersistentList, error) {
file, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
cache := &PersistentList{
records: make(map[string]struct{}),
file: file,
close: make(chan struct{}),
log: log,
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
word := scanner.Text()
if word != "" {
cache.records[word] = struct{}{}
}
}
if err := scanner.Err(); err != nil && err != io.EOF {
file.Close()
return nil, err
}
log.Infof("Loaded %d data into persistent cache from %s", len(cache.records), dataFile)
return cache, nil
}
持久化數據到文件
我們可以每 Add 一個數據就追加寫文件,但這裏做一個小小的過度設計,批量刷盤,具體做法就是每次 Add 時追加到 pending 數組,後台每分鐘寫一次磁盤:
// 修改 Add 方法:
func (c *PersistentList) Add(value string) {
c.mu.Lock()
defer c.mu.Unlock()
if _, found := c.records[value]; !found {
c.records[value] = struct{}{}
c.pending = append(c.pending, value) // 新增了這行
}
}
// 新增刷盤方法:
func (c *PersistentList) syncToFile() error {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.pending) == 0 {
return nil
}
count := len(c.pending)
writer := bufio.NewWriter(c.file)
for _, word := range c.pending {
if _, err := writer.WriteString(word + "\n"); err != nil {
return err
}
}
if err := writer.Flush(); err != nil {
return err
}
// 清空待刷盤數據,準備下一次寫入
c.pending = nil
c.log.Infof("Successfully synced %d new keys to file.", count)
return nil
}
// 新增後台任務方法(給進程啓動時以協程異步執行):
func (c *PersistentList) Serve(ctx context.Context) error {
const syncInterval = 1 * time.Minute
ticker := time.NewTicker(syncInterval)
defer ticker.Stop()
defer c.file.Close()
for {
select {
case <-ctx.Done():
c.log.Warn("Context canceled. Performing final sync...")
return c.syncToFile()
case <-ticker.C:
if err := c.syncToFile(); err != nil {
return err
}
case <-c.close:
c.log.Warn("Received close signal. Performing final sync...")
return c.syncToFile()
}
}
}
到這裏,一個帶有持久化功能的 Set 就已經實現了,可以這樣使用:
func main() {
myset, err := NewPersistentList("/path/to/setfile")
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.TODO())
go myset.Serve(ctx)
myset.Add("asdfgh")
fmt.Println(myset.Exist("asdfgh"))
fmt.Println(myset.Exist("qwerty"))
cancel() // 退出進程時通知刷盤
}
但是上面這個 context.WithCancel 的方法還是略麻煩了,還記得我們剛剛定義的 close 這個 chan 嗎,我們可以增加一個 Shutdown 方法:
func (c *PersistentList) Shutdown() {
c.close <- struct{}{}
}
這樣就可以直接使用 myset.Shutdown() 來關閉了
定義
contract.Logger 定義如下:
package contract
type Logger interface {
Debug(msg string)
Debugf(msg string, args ...interface{})
Info(msg string)
Infof(msg string, args ...interface{})
Warn(msg string)
Warnf(msg string, args ...interface{})
Error(msg string)
Errorf(msg string, args ...interface{})
Close() error
}
本文首發於本人博客:https://yian.me/blog/coding/implement-lightweight-persistent-set-in-100-lines.html