Stories

Detail Return Return

redigo連接池的源碼分析 - Stories Detail

redigo連接池的源碼分析

今天我們來看一看redigo(https://github.com/gomodule/redigo)是如何實現連接池的。

概述

連接池部分的代碼在redis/pool.go中,相關結構體和接口的UML圖如下圖所示

redigo連接池的UML類圖

Pool結構體定義了連接池的屬性和行為,包括以下主要參數:

  • Dial func() (Conn, error):指向用於新建連接的函數,由redigo的用户指定
  • MaxIdle int:最大空閒連接數
  • MaxActive int:連接池的容量,即連接池中最多可以包含多少個連接,包括正在使用的連接和空閒連接
  • IdleTimeout time.Duration:空閒連接的最大空閒時間
  • Get() Conn:從連接池獲取連接

另外,idleList是一個由空閒連接(類型為*poolConn)構成的雙向鏈表。pushFront()popFront()popBack()這3個函數分別用於,通過將剛剛使用過的連接插入到鏈表頭部來將其放回連接池;從鏈表頭部取出空閒連接;從鏈表尾部刪除長時間沒有使用的空閒連接。

type idleList struct {
    count       int
    front, back *poolConn
}

實現連接池時,需要考慮以下幾個問題

  • 何時新建連接?

    • 若新建連接時發現已創建的連接數到達連接池的容量上限,該如何處理?
  • 如何回收空閒時間過長的連接?
  • 如何確保連接池中的連接依然存活?

下面就帶着這幾個問題,重點梳理一下從連接池中獲取連接的func (p *Pool) Get() Conn方法和將連接放回連接池的func (ac *activeConn) Close()方法。

問題1:如何回收空閒時間過長的連接?

先來梳理func (p *Pool) Get() Conn方法的邏輯。

func (p *Pool) Get() Conn {
    // GetContext returns errorConn in the first argument when an error occurs.
    c, _ := p.GetContext(context.Background())
    return c
}

func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
    // Wait until there is a vacant connection in the pool.
    waited, err := p.waitVacantConn(ctx)
    if err != nil {
        return errorConn{err}, err
    }
  // ...

Get()會返回兩種類型的連接,activeConnerrorConn,這兩種類型都實現了Conn接口。

這裏採用了稱為Null ObjectSpecial Case的設計模式,即使獲取連接時發生錯誤,也不會產生nil,而是返回一個異常的連接。只不過在異常連接上的絕大多數操作都會返回錯誤。這樣設計的好處一是避免了空指針異常,二是延後了錯誤處理的時機,或者説減少了一處需要檢查錯誤的位置,redigo的用户可以認為Get()總會返回“有效的”連接,而在錯誤檢查時,只需重點檢查Do()等方法的返回值。

Get()調用了GetContext(),而後者又調用了waitVacantConn()waitVacantConn()有兩條執行路徑,我們先來看最簡單的一條——若沒有開啓等待模式p.Wait == false或者沒有設置最大連接數(連接池的容量),就直接返回。p.Wait == true時的邏輯將在後面介紹。

func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
    if !p.Wait || p.MaxActive <= 0 {
        // No wait or no connection limit.
        return 0, nil
    }
  // ...

現在,關注點又回到GetContext()方法裏了,

func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
    // ...
    p.mu.Lock()

    if waited > 0 {
        // ...
    }

    // Prune stale connections at the back of the idle list.
    if p.IdleTimeout > 0 {
        n := p.idle.count
        for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
            pc := p.idle.back    // ①
            p.idle.popBack()
            p.mu.Unlock()
            pc.c.Close()
            p.mu.Lock()
            p.active--
        }
    }

這部分代碼回答了有關連接池的一個問題——如何回收空閒時間過長的連接

redigo的實現方法是獲取連接時順帶回收空閒時間過長的連接。①p.idle.back(類型為*poolConn)是指向空閒連接的雙向鏈表尾部的指針,所指向的空閒連接的t字段記錄了該連接最後一次使用的時間。如果t加上連接池參數p.IdleTimeout(最大空閒時間)在當前時間nowFunc()之前(類比食品的保質期在當前時間之前),就從雙向鏈表p.idle中刪除該連接後關閉。

由於這部分代碼可能會被多個goroutine併發執行,所以在回收(=從鏈表中刪除)空閒連接時,以及p.active計數器--時,都需要通過p.mu.Lock()加鎖。redigo在這裏還儘可能縮小了鎖的範圍:

p.mu.Lock()
// for ...
  p.mu.Unlock()
  pc.c.Close()
  p.mu.Lock()
    // ...
// }

問題2:如何確保連接池中的連接依然存活?

回收完空閒時間過長的連接後,就可以遍歷空閒連接的鏈表,從中獲取可用的空閒連接了。這部分代碼同樣可能會被多個goroutine併發執行,所以依然需要互斥鎖p.mu的保護。

p.mu.Lock()
for p.idle.front != nil {
    pc := p.idle.front
    p.idle.popFront()
    p.mu.Unlock()
  // return an `activeConn` or check next idle connection
     // ...
}

activeConn的結構如下

type activeConn struct {
    p     *Pool
    pc    *poolConn
    state int
}

之所以要確保空閒連接依然存活,是因為空閒連接雖然存在,但可能已經是失效的連接了。那麼什麼時候會出現這種情況呢?

在Redis的配置中,有一項叫做timeout,默認為0

# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0

如果該選項的值不為0,且小於redigo連接池的配置項MaxIdle的值會發生什麼呢?我們不妨測試一下

$ fgrep timeout -B2 /usr/local/etc/redis.conf

# Close the connection after a client is idle for N seconds (0 to disable)
timeout 5
--

$ # 重啓redis 
$ # brew services restart redis
func main() {
    pool := &redis.Pool{
        MaxActive: 1,
        MaxIdle:   1,
        Dial: func() (redis.Conn, error) {
            return redis.Dial("tcp", "127.0.0.1:6379")
        },
    }

    c := pool.Get()
    reply, err := c.Do("PING")
    if err != nil {
        fmt.Println(reply, err)
    }
    c.Close() // return to pool

    time.Sleep(20 * time.Second)
    c = pool.Get()
    reply, err = c.Do("PING")
    if err != nil {
        fmt.Println(reply, err)    // <nil> EOF
    }
}

通過Wireshark抓包,就很容易解釋為什麼第二次c.Do("PING")報錯了,

redis-timeout

可以看到在9.40秒時,Redis關閉了與客户端之間的TCP連接。而在23.54秒左右(相對於第一次PING時的3.53秒,經歷了20秒,就是time.Sleep(20 * time.Second)睡眠的時間),redigo在已關閉的空閒連接上發送PING,Redis直接通過RST標誌斷開了連接。

這就是空閒連接雖然存在,但已經失效的情況。

為了避免這種情況,我們不但可以根據Redis的timeout的配置,調整連接池IdleTimeout time.Duration的值,還可以在創建連接池時指定TestOnBorrow函數,例如

//  pool := &redis.Pool{
//    // Other pool configuration not shown in this example.
//    TestOnBorrow: func(c redis.Conn, t time.Time) error {
//      if time.Since(t) < time.Minute {
//        return nil
//      }
//      _, err := c.Do("PING")
//      return err
//    },
//  }

if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
            // ...
            return &activeConn{p: p, pc: pc}, nil
        }
        pc.c.Close() // ①
        p.mu.Lock()
        p.active--

可以看到,當p.TestOnBorrow檢測失敗時,①空閒連接就會因無效而被關閉,避免了後續在已被Redis關閉的TCP連接上發送請求的問題。

問題3:新建連接的問題

如果空閒連接的鏈表為空,或者鏈表中沒有存活着的可用連接,就不得不新建連接了。

新建連接很簡單,只需要調用dial()函數,

p.mu.Lock()
// ...
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// ...
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil

dial()的實現如下,僅僅是調用了創建連接池時指定的新建連接的(Dial成員指向的)函數

func (p *Pool) dial(ctx context.Context) (Conn, error) {
    // ...
    if p.Dial != nil {
        return p.Dial()
    }
    // ...
}

但新建時需要考慮,當已創建的連接數已達到連接池的容量上限時要如何處理。

我們先來看redigo中最簡單的一種處理方法,

// Handle limit for p.Wait == false.
    if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
        p.mu.Unlock()
        return errorConn{ErrPoolExhausted}, ErrPoolExhausted
    }

此時,p.Wait == false,且已創建的連接數達到了連接池的容量上限(p.active >= p.MaxActive),於是redigo直接返回了表示錯誤的連接return errorConn{}

p.Wait == true時的處理方式稍微複雜一些,簡單來説就是,當已創建的連接數達到了連接池的容量上限時,通過Pool結構體上的ch

type Pool struct {
  // ...
    ch           chan struct{} // limits open connections when p.Wait is true

讓獲取連接的goroutine進入等待狀態。

    select {
    case <-p.ch:
        // ...
    case <-ctx.Done():
        return 0, ctx.Err()
    }

p.ch有點類似令牌桶,只要桶裏還有令牌,就不會阻塞。初始化是在lazyInit()函數中完成的,桶中初始有p.MaxActive個令牌。

func (p *Pool) lazyInit() {
    p.initOnce.Do(func() {
        p.ch = make(chan struct{}, p.MaxActive)
        // ...
            for i := 0; i < p.MaxActive; i++ {
                p.ch <- struct{}{}
            }
        }
    })
}

將連接放回連接池

最後再來看一看將連接放回連接池的過程。

釋放連接是通過用户調用func (ac *activeConn) Close() (err error) {實現的。該方法最終會調用

func (p *Pool) put(pc *poolConn, forceClose bool) error {
    p.mu.Lock()
    if !p.closed && !forceClose {
        pc.t = nowFunc()            // ①
        p.idle.pushFront(pc)            // ②
        if p.idle.count > p.MaxIdle {    // ┐ 
            pc = p.idle.back             // │- ③
            p.idle.popBack()             // ┘    
        } else {
            pc = nil
        }
    }

    if pc != nil {                // ┐
        p.mu.Unlock()             // │
        pc.c.Close()              // │- ③
        p.mu.Lock()               // │
        p.active--                // │
    }                             // ┘

    // ...
    p.mu.Unlock()
    return nil
}

put()的主流程很簡單

  • ①更新連接的最後一次使用時間為當前時間
  • ②將連接插入到空閒連接鏈表的頭部
  • ③如果當前的空閒連接數(已算上剛剛插入到鏈表頭部的空閒連接)已超過了MaxIdle,則將空閒鏈表尾部的連接從鏈表刪除後關閉

與從連接池中獲取連接一樣,這部分代碼同樣可能會被多個goroutine併發執行,所以依然需要互斥鎖p.mu的保護。

至此,我們就梳理完成了redigo中連接池部分的源代碼了。

描述redigo的UML類圖的代碼

@startuml

interface Conn {
}

struct Pool {
  Dial        func
  MaxIdle     int
  MaxActive   int
  **idle        idleList**
  IdleTimeout time.Duration
  **Get()   Conn**
}

struct idleList {
    count       int
    front       *poolConn
    back        *poolConn
    
    pushFront(pc *poolConn)
    popFront()
    popBack()
}

struct poolConn {}

struct activeConn {
  p     *Pool
  pc    *poolConn
  
  **Close() error**
  Do(cmd string, args ...any) (reply any, err error)

}

idleList "1" *-- "many" poolConn : contains

idleList --* Pool

Pool --* activeConn
poolConn --* activeConn
activeConn ..|> Conn

poolConn ..|> Conn
@enduml
user avatar u_15641375 Avatar kinra Avatar haijun_5e7e16c909f52 Avatar sy_records Avatar writers Avatar headofhouchang Avatar huifeideniao Avatar apachekylin Avatar
Favorites 8 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.