系統的運行過程中,需要對外部請求進行限流。限流有本地限流與分佈式限流,本文現在對項目實踐過程中使用的分佈式限流中間件進行介紹。
該分佈式限流中間件實現的原理是每次從遠端(遠端的redis)消費掉固定數量的配額,待消費完後,再從遠端申請。

// rate.go
// 定義了限流的頻次
 type Rate struct {
 	Period int64  // 秒級別
 	Limit int64
 }

核心實現:

// ratelimit.go
func NewRateLimit(name string, rate Rate, quota int64, ops ...Option) (*RateLimit, error) {
	// quota 每次申請的配額
	// quota 不應該過大
	if rate.Limit <= quota*10 { .... 生成錯誤}
	rl := &RateLimit{
		name : name,
		quota : quota,
		rate : rate,
		// 消費剩下的
		remainder : 0,
		// 是否超出限定
		over : false
		// 記錄時間
		tick : time.Now().Unix()
	}
	// rl.spinlock : if more request call remote meanwhile, only one pass 
	rl.spinlock.store(false)
	if len(ops) > 0 {
		for _, opt := range opts {
			opt(rl)
		}
	}
	if rl.client == nil {
		... 生成一個redis client
	}
	return 	rl, nil
}

func (rl *RateLimit) Allow(token int64) bool {
	if toke <= 0 { return false}
	var span time.Duration
	for rl.spinlock.load().(bool) {
		// 	此時,已有請求在請求遠端
		time.Sleep(DefaultWaitInterval)
		span += DefaultWaitInterval
		if span >= MaxWaitInterval {
			// 降級策略,避免過長時間的等待
			logs.Warnf(...)
			return true
		}
	}
	now := time.Now().Unix() / rate.Period
	if now == rl.tick() {
	    // 仍然在同一個時間段內
		if rl.over() {
			return false
		}
		// 為了性能的需要,忍受了可能的超賣
		if rl.loadInt64(&rl.remainer) > token {
			rl.AddInt64(&rl.remainer, ^int64(token - 1))
			return true
		}
	}
	// 從遠端申請配額
	return rl.consumeRemote(now, token)
}

從遠端申請配額:

// redis.go
func (rl *RateLimte) consumeRemote( now int64, token int64) bool {
	// 標記狀態
	rl.spinlock.store(true)
	defer rl.spinlock.store(false)
	// here adjust quota to larget tokens
	quota := rl.quota
	if quota < tokens {
		quota = tokens
	} 
	key := rl.getKey(now)
	total, err := rl.client.IncrBy(key, quota).Result()
	if total == quota {
		// 第一次,設定過期時間
		go rl.clent.Expire(key, time.Duration(rl.rate.Period)* time.Second * 2).Result()
	}
	if err != nil {
		logs...
		// 降級策略
	 	return true
	}
	var more int64
	if total <= rl.rate.Limit {
		more = quota
	} else {
		more = quota + rl.rateLimit - total
	}
	if more < tokens {
		rl.over = true
		retur false
	}
	if rl.tick == now {
		rl.AddInt64(&rl.remainer, more - tokens)
	} else {
		rl.StoreInt64(&rl.remainer, more - tokens)
		rl.tick = now
		rl.over = false
	}
	return true
}

func (*RateLimit) getKey(now int64) string {
	return prefix + ":" + version + ":" + rl.name + ":" + strconv.FormatInt(now, 10)
}

在代碼實現的過程中,為了性能的需要, 做了很多降級策略,同時避免了鎖的使用,而是儘量使用原子操作。
因為公司的redis不支持lua, 所以在向遠端請求的時候,使用redis的IncrBy操作來申請配額,等到超過限定後,進行限流。