概念

Redis Stream 主要用於消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發佈訂閲 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。

簡單來説發佈訂閲 (pub/sub) 可以分發消息,但無法記錄歷史消息

Redis Stream 5.x版本引入的一種數據結構,用於處理時間序列數據、消息隊列和日誌流。它提供了高吞吐量、持久性、有序、可擴展的消息傳遞解決方案,並提供了以下主要特性:

  • 多生產者和多消費者:多個生產者可以同時向 Stream 中寫入消息,而多個消費者可以獨立訂閲並消費消息。每個消費者可以有不同的消費速率。
  • 消費組:Redis Stream引入了消費者組的概念,多個消費者可以加入同一個消費者組並共同消費消息,這確保了消息在消費時不會被多次處理。
  • 消費者阻塞:消費者可以使用 XREADGROUP 命令以阻塞方式獲取消息,只有當有新消息到達時才會被推送給消費者。
  • 消費者自動確認:Redis Stream 支持自動確認消息,消費者可以告訴 Redis 何時確認已經成功處理了一條消息。
  • 多 Stream 支持:你可以創建多個 Stream 來存儲不同種類的數據,並分別處理它們。
  • 有序性:消息在 Stream 中按照消息的時間戳有序存儲,因此你可以按照消息的順序讀取數據。
  • 持久性存儲:Redis Stream 使用內存數據結構,但也支持將數據異步保存到磁盤,以確保數據不會丟失。

Redis 高級特性 Redis Stream使用_#redis

每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創建。

上圖解析:

  • Consumer Group :消費組,使用 XGROUP CREATE 命令創建,一個消費組有多個消費者(Consumer)。
  • last_delivered_id :遊標,每個消費組會有個遊標 last_delivered_id,任意一個消費者讀取了消息都會使遊標 last_delivered_id 往前移動。
  • pending_ids :消費者(Consumer)的狀態變量,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經被客户端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)。

常用命令

消息隊列相關命令:

  • XADD - 添加消息到末尾
  • XTRIM - 對流進行修剪,限制長度
  • XDEL - 刪除消息
  • XLEN - 獲取流包含的元素數量,即消息長度
  • XRANGE - 獲取消息列表,會自動過濾已經刪除的消息
  • XREVRANGE - 反向獲取消息列表,ID 從大到小
  • XREAD - 以阻塞或非阻塞方式獲取消息列表

消費者組相關命令:

  • XGROUP CREATE - 創建消費者組
  • XREADGROUP GROUP - 讀取消費者組中的消息
  • XACK - 將消息標記為"已處理"
  • XGROUP SETID - 為消費者組設置新的最後遞送消息ID
  • XGROUP DELCONSUMER - 刪除消費者
  • XGROUP DESTROY - 刪除消費者組
  • XPENDING - 顯示待處理消息的相關信息
  • XCLAIM - 轉移消息的歸屬權
  • XINFO - 查看流和消費者組的相關信息;
  • XINFO GROUPS - 打印消費者組的信息;
  • XINFO STREAM - 打印流信息

XADD

使用 XADD 向隊列添加消息,如果指定的隊列不存在,則創建一個隊列,XADD 語法格式:

XADD key ID field value [field value ...]

  • key :隊列名稱,如果不存在就創建
  • ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
  • field value : 記錄。

XTRIM

使用 XTRIM 對流進行修剪,限制長度, 語法格式:

XTRIM key MAXLEN [~] count

  • key :隊列名稱
  • MAXLEN :長度
  • count :數量

XDEL

使用 XDEL 刪除消息,語法格式:

XDEL key ID [ID ...]

  • key:隊列名稱
  • ID :消息 ID

XLEN

使用 XLEN 獲取流包含的元素數量,即消息長度,語法格式:

XLEN key

  • key:隊列名稱

XRANGE

使用 XRANGE 獲取消息列表,會自動過濾已經刪除的消息 ,語法格式:

XRANGE key start end [COUNT count]

  • key :隊列名
  • start :開始值, - 表示最小值
  • end :結束值, + 表示最大值
  • count :數量

XREVRANGE

使用 XREVRANGE 獲取消息列表,會自動過濾已經刪除的消息 ,語法格式:

XREVRANGE key end start [COUNT count]

  • key :隊列名
  • end :結束值, + 表示最大值
  • start :開始值, - 表示最小值
  • count :數量

XREAD

使用 XREAD 以阻塞或非阻塞方式獲取消息列表 ,語法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

  • count :數量
  • milliseconds :可選,阻塞毫秒數,沒有設置就是非阻塞模式
  • key :隊列名
  • id :消息 ID

XGROUP CREATE

使用 XGROUP CREATE 創建消費者組,語法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

  • key :隊列名稱,如果不存在就創建
  • groupname :組名。
  • $ : 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略。

從頭開始消費:

XGROUP CREATE mystream consumer-group-name 0-0

從尾部開始消費:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

使用 XREADGROUP GROUP 讀取消費組中的消息,語法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

  • group :消費組名
  • consumer :消費者名。
  • count : 讀取數量。
  • milliseconds : 阻塞毫秒數。
  • key : 隊列名。
  • ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >