核心結論

  1. Checkpoint 對 Key State 的存儲:按Key Group劃分並持久化到外部存儲(如 HDFS);RocksDBStateBackend 會將本地 RocksDB 的狀態快照(而非完整內容)上傳到 Checkpoint 目錄。
  2. 故障恢復邏輯:作業重啓後,故障算子的 Task 會重新分配到其他 TaskManager,通過 Checkpoint 目錄下載對應 Key Group 的狀態快照,恢復到本地 StateBackend(Heap/RocksDB),最終基於恢復的狀態繼續處理數據。

Flink狀態存儲_Group

一、Checkpoint 對 Key State 的存儲機制(按 StateBackend 分類)

Flink 的 Key State 存儲和 Checkpoint 持久化邏輯,完全由StateBackend 決定,核心分為兩類:HeapStateBackend 和 RocksDBStateBackend(生產常用)。兩者的本地存儲和 Checkpoint 流程差異顯著。

1. 核心前提:Key State 按「Key Group」劃分

無論哪種 StateBackend,Key State 都會先按Key Group拆分 —— 這是 Flink 實現狀態分片和並行恢復的核心機制:

  • Key Group:將所有 Key 哈希映射到固定數量的分組(數量 = 作業最大並行度 maxParallelism),每個 Task 負責處理若干個 Key Group 的數據和狀態。
  • 作用:Checkpoint 時按 Key Group 持久化狀態,恢復時按 Key Group 重新分配,建立並行恢復。

2. 兩種 StateBackend 的 Checkpoint 存儲流程

(1)HeapStateBackend(內存型,適用於小狀態)
  • 本地存儲:Key State 直接存儲在 TaskManager 的 JVM 堆內存中(如 HashMap 結構)。
  • Checkpoint 持久化流程
  1. Checkpoint 觸發時,Flink 對本地堆內存中的 Key State(按 Key Group 劃分)進行序列化
  2. 將序列化後的狀態數據,通過網絡上傳到外部持久化存儲(如 HDFS、S3,由 state.checkpoints.dir 配置)。
  3. 最終在 Checkpoint 目錄生成每個 Key Group 的序列化狀態記錄,完成持久化。
(2)RocksDBStateBackend(磁盤型,適用於大狀態,生產主流)
  • 本地存儲:Key State 存儲在本地磁盤的 RocksDB 實例中(數據以 SST 記錄、MANIFEST 等格式持久化到本地磁盤)。
  • Checkpoint 持久化流程(核心:上傳快照,而非完整複製):
  1. Checkpoint 觸發時,RocksDB 先對當前狀態生成增量快照(Incremental Checkpoint) 或 全量快照(Full Checkpoint)
  • 全量:將所有 Key Group 的完整狀態快照寫入臨時文件。
  • 增量(默認推薦):僅上傳自上次 Checkpoint 以來變化的 SST 文件,大幅減少數據傳輸量。
  1. Flink 將 RocksDB 生成的快照文件(按 Key Group 劃分)上傳到外部持久化存儲(如 HDFS)。
  2. 在 Checkpoint 目錄生成對應算子、對應 Key Group 的狀態元素材和數據文件,完成持久化。

關鍵結論:RocksDB 的本地內容不會完整複製到 HDFS,而是通過「快照 + 增量上傳」的方式,將狀態快照持久化到 Checkpoint 目錄 —— 既保證數據安全,又避免大量冗餘傳輸。

二、Checkpoint 目錄結構(以 RocksDB 為例)

外部存儲(如 HDFS)的 Checkpoint 目錄按「作業 → Checkpoint ID → 算子 → Key Group」層級組織,清晰存儲每個 Key State 的快照,示例結構如下:

hdfs:///flink-checkpoints/  # 配置的 state.checkpoints.dir
├─ job-xxx/                 # 作業唯一標識
│  ├─ cp-123/               # Checkpoint ID(遞增)
│  │  ├─ operator-456/      # 算子唯一標識(如 KeyedProcessFunction)
│  │  │  ├─ keygroup-0/     # Key Group 0 的狀態
│  │  │  │  ├─ 0.sst        # RocksDB 的 SST 數據文件
│  │  │  │  └─ MANIFEST     # 狀態元數據文件
│  │  │  ├─ keygroup-1/     # Key Group 1 的狀態
│  │  │  └─ ...
│  │  ├─ operator-789/      # 其他算子的狀態
│  │  └─ _metadata          # Checkpoint 元數據文件(記錄所有算子、Key Group 的狀態位置)
  • _metadata 是核心:記錄本次 Checkpoint 包含的所有算子、Key Group 的狀態文件路徑、版本等信息,恢復時作為 “索引” 使用。

三、Operator 所在機器故障的恢復流程

當 Operator 所在的 TaskManager(物理機器)故障時,Flink 基於Checkpoint 快照 和 Key Group 重分配實現狀態恢復,核心流程如下

步驟序號

步驟名稱

核心內容

1

故障檢測

JobManager 憑藉心跳機制檢測 TaskManager 失聯,標記該 TaskManager 上所有 Task 為失敗狀態

2

作業重啓

按 restart-strategy 部署觸發作業重啓,將失敗 Operator 的 Task 重新分配至正常 TaskManager

3

Key Group 重分配

依據作業並行度,將故障 Task 負責的 Key Group 重分配給新 Task(單個接管或多 Task 分攤)

4

從 Checkpoint 下載狀態

新 Task 讀取最新 Checkpoint 的_metadata 文件,獲取自身 Key Group 狀態文件路徑並下載;HeapStateBackend 反序列化為堆內存結構,RocksDBStateBackend 恢復快照至本地實例

5

本地狀態驗證與一致性保障

驗證下載狀態完整性(如校驗文件哈希),確保與 Checkpoint 時的狀態一致

6

從 Checkpoint 位置恢復消費

狀態恢復後,從 Checkpoint 記錄的數據源消費位置(如 Kafka offset)重新消費數據

7

基於恢復狀態處理數據

結合恢復的 Key State(如累計計數、會話狀態)處理數據,保障輸出一致性(如 Exactly-Once)

1. 故障檢測與作業重啓

  • 步驟 1:故障檢測:JobManager 依據「心跳機制」檢測到 TaskManager 失聯(超時未發送心跳),標記該 TaskManager 上的所有 Task 為失敗狀態。
  • 步驟 2:作業重啓:JobManager 觸發作業重啓(根據 restart-strategy 配置,如固定延遲重啓),重新為失敗的 Operator 分配 Task 到其他正常的 TaskManager。

2. State 恢復(核心:Key Group 重分配 + 狀態下載)

  • 步驟 3:Key Group 重分配:JobManager 根據作業的並行度,將故障 Task 負責的Key Group重新分配給新的 Task(可能是單個 Task 接管,或分攤給多個 Task,取決於並行度是否調整)。
  • 例:原 Task A 負責 Key Group 0~9,故障後可能由新 Task B 接管 0~9,或由 Task B 接管 0~4、Task C 接管 5~9。
  • 步驟 4:從 Checkpoint 下載狀態:新 Task 啓動後,讀取最新完成的 Checkpoint 目錄中的 _metadata 文件,找到自身負責的 Key Group 對應的狀態文件路徑(如 HDFS 上的 keygroup-0/keygroup-1/ 等)。
  • 若使用 HeapStateBackend:下載序列化的狀態文件,反序列化為堆內存中的 Key State 結構(如 HashMap)。
  • 若使用 RocksDBStateBackend:下載 RocksDB 的快照文件(SST、MANIFEST 等),恢復到本地磁盤的 RocksDB 實例中,重建 Key State。
  • 步驟 5:本地狀態驗證與一致性保障:新 Task 驗證下載的狀態完整性(如校驗文件哈希),確保與 Checkpoint 時的狀態一致,避免數據損壞。

3. 作業繼續運行

  • 步驟 6:從 Checkpoint 位置恢復數據消費:狀態恢復完成後,Task 從 Checkpoint 記錄的「數據源消費位置」(如 Kafka 的 offset)開始,重新消費未處理或可能重複的數據。
  • 步驟 7:基於恢復的狀態處理數據:Task 結合恢復的 Key State(如累計計數、會話狀態等),繼續處理資料,確保後續輸出的一致性(如 Exactly-Once)。

四、關鍵總結

  1. Key State 存儲核心:按 Key Group 劃分,Checkpoint 時持久化到外部存儲,存儲方式由 StateBackend 決定。
  2. RocksDB 與 Checkpoint:不是完整複製本地內容,而是上傳快照(增量 / 全量)到外部存儲,兼顧效率與安全性。
  3. 故障恢復本質:Key Group 重分配 + 從 Checkpoint 下載對應狀態 + 恢復數據源消費位置,最終實現 “狀態一致、數據不丟不重”。