博客 / 詳情

返回

Flink + Fluss 實戰: Delta Join 原理解析與操作指南

在使用 Flink SQL 進行實時數據處理的過程中,雙流 Join 是非常常見的操作之一。典型的場景包括分析廣告效果(曝光流訂單流實時關聯)、實時推薦(點擊流和商品信息)等等。然而,雙流 Join 需要在狀態中維護兩側全量的歷史數據,以確保計算結果的準確性。隨着作業的持續運行,雙流 Join 會逐漸帶來一些問題:

  • 運維層面

    • 狀態過大,開發者需要不斷加大作業的資源才能維持較高的吞吐。
    • Checkpoint 易超時,導致作業不穩定、持續 Failover。
    • 狀態是 Flink 內部產物,排查問題時,其內部數據難以探查。
  • 開發層面

    • Query 迭代修改後,狀態難以複用,且重啓回追代價高。

為了解決這些問題,Flink 社區在 2.1 引入了新的 Delta Join 算子,並在 2.2 對其進行了進一步的擴展。Delta Join 的核心思想是捨棄算子內本地狀態冗餘的數據存儲,利用雙向 Lookup Join 直接查詢源表中的數據,而非查詢狀態中的數據,從而複用源表數據。Delta Join 結合流存儲 Apache Fluss,在阿里巴巴淘寶天貓團隊成功落地,並且對比雙流 Join,擁有如下幾個優勢:

  • 消除了將近 50 TB 的 雙流 Join 狀態
  • 計算 CU 降低 10 倍
  • 作業恢復速度提升 87 %
  • Checkpoint 秒級完成

Flink Delta Join介紹請參考:《Delta Join:為超大規模流處理實現計算與歷史數據解耦》https://developer.aliyun.com/article/1690558

雙流 Join 實現原理

讓我們先簡單描述 Flink 雙流 Join 的工作原理。

image.png

我們以處理左側表來的 changelog 數據為例,流入的數據主要經過以下三個階段。

  1. 通過 Join Key 查詢對側(即右側)的狀態,獲取右側歷史上曾經流入該算子的全量數據。
  2. 使用 Join 條件過濾查詢得到的數據,並輸出。
  3. 將輸入的本條數據,存入本側(即左側)的狀態中,以供後續右側的數據來臨時,能正確的匹配數據。

之所以要把所有的數據用狀態記錄下來,是因為流計算是沒有邊界的,左側數據和右側數據匹配的時間點會存在時間差,即使一側的數據延遲到達,也需要保證可以關聯上另一側的數據,最終輸出。

雙流 Join 的算法確保了數據的正確性,但是其狀態會隨着時間的推移而無限制增大,成為影響作業資源消耗和穩定性的關鍵因素。雖然目前已有 Interval Join、Lookup Join、State TTL Hint 等手段來緩解或解決該問題,但是均面向了特定的業務場景,犧牲了一定的功能(如 Lookup Join 捨棄了維表側數據的更新追蹤,State TTL Hint 放棄匹配超過 TTL 期限的數據)。

Delta Join 技術原理

從雙流 Join 的原理上,我們可以觀察到,狀態裏記錄的全量數據,與源表中的數據基本相同,那麼一個直觀的想法是,可以複用源表的數據來取代原有的狀態。Delta Join 正是基於這個思路,它利用了外部存儲系統提供的索引能力,並不從狀態中查找數據,而是直接對外部存儲發出高效的、基於索引的數據查詢,以獲取匹配的記錄。通過這種方式,Delta Join 消除了雙流 Join 狀態與外部系統之間冗餘的數據存儲。

image.png

理論推導

我們以兩路輸入為例,增量更新 Join 結果的公式為:

Δ (A ⋈ B) =Δ A ⋈ B + A ⋈ Δ B + Δ A ⋈ Δ B = Δ A ⋈ (B + Δ B) + A ⋈ Δ B

其中,A 代表了左表的全量歷史數據, Δ A 代表了左表中的增量數據。B 和 Δ B 的定義與此類似。每當我們需要計算 Join 結果的增量部分時,我們只需要獲取源表中從上次計算到當前時間之間新生成的數據,並查詢對側源表中的歷史快照數據。因此我們需要:

  1. 感知源表的增量數據
  2. 訪問源表歷史快照數據

這對源表的物理存儲引擎提出了很高的要求,存儲引擎需要支持快照隔離,以確保強一致性語義。然而,目前存在以下幾個問題:

  1. 目前只有有限的存儲支持了快照的概念,例如 Paimon、Iceberg、Hudi 等等
  2. 快照生成的時間間隔為分鐘級別,無法滿足實時處理的要求
  3. 當指定快照查詢數據時,快照可能會在存儲系統中過期

考慮到上述這些問題,Flink 2.1 提出了一種滿足實時性要求的、最終一致性的 Delta Join 方案。

最終一致性語義的 Delta Join

最終一致性語義的 Delta Join 並不要求源表的存儲引擎支持快照。它總是去查詢源表當前最新的數據。其對應的變種公式如下:

Δ A ⋈ (B + Δ B) + (A + Δ A) ⋈ Δ B

和強一致性 Delta Join 相比,最終一致性 Delta Join 多出了一部分額外的中間結果 Δ A ⋈ Δ B,因此,這種方法只能確保最終的結果是一致的。

以下是雙流 Join 和兩種語義的 Delta Join 的對比。

雙流 Join 強一致性 Delta Join 最終一致性 Delta Join
延遲
狀態大小
狀態內數據詳情 兩側輸入全量明細數據 上一次觸發計算的源錶快照id 等待觸發計算的異步隊列
數據一致性 強一致性 強一致性 最終一致性

Delta Join 算子實現

為了提高算子的吞吐,在 Delta Join 算子中,分別引入了一個 TableAsyncExecutionController 組件和兩個雙側完全相同的 DeltaJoinRunner 組件。

image.png

TableAsyncExecutionController 原理

該組件由 FLIP-519 Introduce async lookup key ordered mode 引入,其嚴格限制相同 key 之間的數據必須串型執行,而允許不同 key 之間的數據並行處理,同時結合異步處理機制,大大提高了算子的吞吐能力。

該組件的運行原理如下:

image.png

TableAsyncExecutionController在接收到數據後,按照 key 放入 BlockingBuffer 內不同 key 的隊列裏,然後通過 KeyAccountingUnit 檢查該 key 是否被搶佔、有對應的數據正在執行。如果 key 被搶佔,直接返回;如果 key 未被搶佔,則搶佔該 key ,同時 poll 隊列數據,放入 ActiveBuffer,交給後續計算邏輯處理,同時註冊回調函數,在數據處理結束、輸出後,在 KeyAccountingUnit 內釋放該 key,去 BlockingBuffer 內拿下一條數據。

這套機制保證了相同 key 之間的數據是串行執行的,以避免出現分佈式亂序問題。該機制在某種程度上是 FLIP-425 Asynchronous Execution Model 的簡化版本,感興趣的可以另行研究。

在實際場景下,Delta Join 算子的吞吐會受到 BlockingBuffer 能允許的最大容量(各個 key 的隊列大小之和)影響,當 BlockingBuffer 最大容量過小時,即使收到的每個 key 都不一樣,也會由於無法充分利用異步並行的能力而導致吞吐較小。此時,可以適當調整下面的參數,來增大 BlockingBuffer 的最大容量。但如果設置的過大,BlockingBuffer 會佔用比較高的內存,同時也可能會給外部存儲帶來較大的查詢壓力。

// 默認 100
table.exec.async-lookup.buffer-capacity: 1000

我們可以通過監測 Delta Join 算子內以下幾個 metric,來判斷是否需要調整該參數。

  • aec_blocking_size:當前 BlockingBuffer 內被阻塞的所有 key 的隊列大小之和。

    該值越大,代表 join key 較為密集,考慮開啓或增大 delta join cache;該值越小,但吞吐不佳的情況下,考慮增大table.exec.async-lookup.buffer-capacity的值。

  • aec_inflight_size:當前 ActiveBuffer 內正在執行計算的數據數量。

該值越大,代表當前同時請求外部存儲集羣的數據較多,存在請求堆積的情況,需要進一步查看外部存儲系統是否存在異常,或查看是否有相關參數可以提高查詢效率;該值越小,代表 join key 較為密集,考慮開啓或增大 delta join cache。

image.png

注:當Fluss流存儲的表作為 Delta Join 的源表時,你可以通過 Flink Table Hint,在 Fluss 表上配置以下這些關鍵參數,來提高查詢效率。

  • client.lookup.queue-size
  • client.lookup.max-batch-size
  • client.lookup.max-inflight-requests
  • client.lookup.batch-timeout

具體請參考 Fluss Connector Options。

DeltaJoinRunner 原理

DeltaJoinRunner 是負責執行 Lookup 的組件。由於 Delta Join 算子會處理兩側的數據,因此對於不同側的數據,各有一個完全相同的 DeltaJoinRunner 負責 Lookup 對應表的數據。

想象一下,如果我們對每條數據都要去外部存儲進行查詢,對外部吞吐的壓力會非常大,算子的吞吐性能完全取決於請求外部系統的吞吐。但如果用普通的 cache 來對 Lookup 的數據進行緩存,Lookup 目標表的數據更新消息將無法訂閲。為此,我們引入了驅動側僅構建、Lookup 側僅更新的特殊 cache。

DeltaJoinRunner 組件的運行原理如下(圖例是用於左側輸入流查詢右側源表的 DeltaJoinRunner),分別由 LocalCache 和 LookupFetcher 組成。

image.png

當左側數據到達時,先去 LocalCache 查詢是否有 cache。當有 cache 時,直接輸出;當沒有 cache 時,藉助 LookupFetcher 通過右表的 index 查詢右表的數據,然後將查詢回來的數據在 LocalCache 中構建 cache,最後輸出。

同時,右表的數據到達時,將會查看此 DeltaJoinRunner 中的 LocalCache 是否有 cache。如果沒有cache,忽略更新;如果有 cache,更新 cache。

該 cache 機制一方面確保了在 join key 較為密集的場景,算子的吞吐能夠得到巨大的提升,同時對外部存儲也不會構成很大的查詢壓力;另一方面,確保了對側最新的數據能夠更新 cache,從而在後續的流程中能被正確地匹配上。

該 cache 是一個 LRU 的 cache,合理的設置該 cache 的大小是非常必要的。過小的 cache 大小將導致 cache 的命中率受到影響,過大的 cache 會佔用較多的內存。我們可以通過下面的參數來分別調節左右兩側 cache 的大小,甚至是在每條數據 join key 都不相同、cache 基本無用時關閉 cache。

// 是否啓用cache,默認為 true
table.exec.delta-join.cache-enabled: true

// 設置用於緩存左表數據的cache大小,默認為 10000
// 推薦在左表較小、或右流 join key 較為密集時,設置較大值
table.exec.delta-join.left.cache-size: 10000

// 設置用於緩存右表數據的cache大小,默認為 10000
// 推薦在右表較小、或左流 join key 較為密集時,設置較大值
table.exec.delta-join.right.cache-size: 10000

我們可以通過監測 Delta Join 算子上的 metric,來判斷是否需要適當增加 cache 的大小。

  • deltaJoin_leftCache_hitRate: 在右流查詢左表的場景下,緩存左表數據的 cache 的命中率百分比。該值越高越好。
  • deltaJoin_rightCache_hitRate:在左流查詢右表的場景下,緩存右表數據的 cache 的命中率百分比。該值越高越好。

image.png

注:該圖來自於“實戰”章節 Nexmark q20 變種 query。右表 Auction 表每次都產生不同的id,故而 deltaJoin_leftCache_hitRate的命中率始終為 0。

實戰

我們借用 nexmark 數據集 中 q20 的 query,略微修改後,作為本次實戰的樣例代碼。

-- 獲取包含相應拍賣信息的出價表
INSERT INTO nexmark_q20
SELECT
    auction, bidder, price, channel, url, B.`dateTime`, B.extra,
    itemName, description, initialBid, reserve, A.`dateTime`, expires, seller, category, A.extra
FROM
    bid AS B INNER JOIN auction AS A on B.auction = A.id;
-- WHERE A.category = 10;

方式一:使用 Docker 環境測試

環境準備

  1. 類 Unix 操作系統,如 Linux、Mac OS X
  2. 內存建議至少 4 GB,磁盤建議至少 4 GB

下載 Docker 鏡像

在命令行中,運行如下命令安裝 Docker 測試鏡像。

docker pull xuyangzzz/delta_join_example:1.0

運行如下命令運行該測試鏡像,進入測試 docker container 的命令行。

docker run -p 8081:8081 -p 9123:9123 --name delta_join_example -it xuyangzzz/delta_join_example:1.0 bash

運行任務 SQL

在測試 docker container 中執行下面的命令,運行 Flink 和 Fluss 集羣,創建相關表和 Delta Join 作業。

# 運行 flink 和 fluss 集羣
./start-flink-fluss.sh

# 創建相關表和 delta join 作業
./create-tables-and-run-delta-join.sh

此時,在宿主機 localhost:8081(或其他綁定的端口)即可查看 Flink UI 界面,可以看到此時 Delta Join 作業正在運行。

image.png

image.png

插入數據到源表

在測試 docker container 中執行下面的命令,為源表插入數據。

# 在源表插入數據
./insert-data.sh

觀察 Delta Join 作業

在宿主機localhost:8081(或其他綁定的端口)的 flink-ui 界面,就可以看到 Delta Join 作業在正常消費數據了。

image.png

方式二:手工搭建環境測試

環境準備

運行環境
  1. 類 Unix 操作系統,如 Linux、Mac OS X
  2. 內存建議至少 4 GB,磁盤建議至少 4 GB
  3. Java 11 及以上版本,且將環境變量 JAVA_HOME設置為 Java 的安裝目錄
準備 Apache Flink 計算引擎
  1. 下載

    在 Apache Flink 官方下載網站下載最新的 Flink 2.2.0 版本,並解壓。

  2. 修改相關配置

    修改 ./conf/config.yaml 文件,將 TaskManager numberOfTaskSlots 設置成 4 (默認為1)

image.png

準備 Apache Fluss 流存儲引擎

在 Apache Fluss 官方下載網站分別下載 Fluss 0.8 版本(並解壓)和適配 Apahce Flink 2.1 的連接器。

image.png

準備 Nexmark 源數據生成器

下載 Nexmark 項目 master 分支,在該項目根目錄下,用 maven-3.8.6 版本執行以下的 maven 命令

mvn clean install -DskipTests=true

在"./nexmark-flink/target/" 文件夾下,將會生成 nexmark-flink-0.3-SNAPSHOT.jar 文件

也可以直接使用下面準備好的 nexmark-flink-0.3-SNAPSHOT.jar 文件

請至釘釘文檔查看附件《nexmark-flink-0.3-SNAPSHOT.jar》

服務啓動

  1. 啓動 Flink 

    將 Fluss 適配 Flink 2.1 的連接器,以及 Nexmark 項目生成的 nexmark-flink-0.3-SNAPSHOT.jar 文件,放入 Flink 目錄的 ./lib 目錄下

    參考 Flink 本地模式安裝文檔,在 Flink 目錄中,執行下面的語句,啓動本地 Standalone 集羣。

    ## 請確保在 Flink 目錄下執行該語句
    ./bin/start-cluster.sh

    檢查 http://localhost:8081/#/overview 界面是否可正常訪問。

  2. 啓動 Fluss

    參考 Fluss 部署Local Cluster文檔,在 Fluss 目錄下,執行下面的語句,啓動本地集羣。

## 請確保在 Fluss 目錄下執行該語句
./bin/local-cluster.sh start

運行任務 SQL

3.1創建 Fluss 表

將下面的 SQL 代碼保存為“prepare\_table.sql”文件,其中定義了 2 張源表和 1 張結果表。

CREATE CATALOG fluss_catalog
WITH (
    'type' = 'fluss'
    ,'bootstrap.servers' = 'localhost:9123'
);

USE CATALOG fluss_catalog;

CREATE DATABASE IF NOT EXISTS my_db;

USE my_db;

-- 創建左側源表
CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.bid
(
    auction     BIGINT
    ,bidder     BIGINT
    ,price      BIGINT
    ,channel    VARCHAR
    ,url        VARCHAR
    ,`dateTime` TIMESTAMP(3)
    ,extra      VARCHAR
    ,PRIMARY KEY (auction, bidder) NOT ENFORCED
)
WITH (
    -- fluss prefix lookup key,可用於 index
    'bucket.key' = 'auction'
    -- Flink 2.2 中,delta join 僅支持消費不帶 delete 操作的 cdc 源表
    ,'table.delete.behavior' = 'IGNORE'
);

-- 創建右側源表
CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.auction
(
    id           BIGINT
    ,itemName    VARCHAR
    ,description VARCHAR
    ,initialBid  BIGINT
    ,reserve     BIGINT
    ,`dateTime`  TIMESTAMP(3)
    ,expires     TIMESTAMP(3)
    ,seller      BIGINT
    ,category    BIGINT
    ,extra       VARCHAR
    ,PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    -- Flink 2.2 中,delta join 僅支持消費不帶 delete 操作的 cdc 源表
    'table.delete.behavior' = 'IGNORE'
);

-- 創建 delta join 寫入的結果表
CREATE TABLE fluss_catalog.my_db.delta_join_sink
(
    auction           BIGINT
    ,bidder           BIGINT
    ,price            BIGINT
    ,channel          VARCHAR
    ,url              VARCHAR
    ,bid_dateTime     TIMESTAMP(3)
    ,bid_extra        VARCHAR
    ,itemName         VARCHAR
    ,description      VARCHAR
    ,initialBid       BIGINT
    ,reserve          BIGINT
    ,auction_dateTime TIMESTAMP(3)
    ,expires          TIMESTAMP(3)
    ,seller           BIGINT
    ,category         BIGINT
    ,auction_extra    VARCHAR
    ,PRIMARY KEY (auction, bidder) NOT ENFORCED
);

在 Flink 目錄下,執行下面的語句,創建持久化的表。

## 請確保在 Flink 目錄下執行該語句
## 注意:請將 ${your_path} 替換為 prepare_table.sql 實際所在的目錄
./bin/sql-client.sh -f ${your_path}/prepare_table.sql
3.2 啓動 Delta Join 作業

將下面的 SQL 代碼保存為“run\_delta\_join.sql”文件,其中包含了可轉化為 delta join 的 q20 變體查詢。

CREATE CATALOG fluss_catalog
WITH (
    'type' = 'fluss'
    ,'bootstrap.servers' = 'localhost:9123'
);

USE CATALOG fluss_catalog;

USE my_db;

INSERT INTO delta_join_sink
SELECT
    auction
    ,bidder
    ,price
    ,channel
    ,url
    ,B.`dateTime`
    ,B.extra
    ,itemName
    ,description
    ,initialBid
    ,reserve
    ,A.`dateTime`
    ,expires
    ,seller
    ,category
    ,A.extra
FROM bid AS B
    INNER JOIN auction AS A
        ON B.auction = A.id;

在 Flink 目錄下,執行下面的語句,啓動 delta join 作業。

## 請確保在 Flink 目錄下執行該語句
## 注意:請將 ${your_path} 替換為 run_delta_join.sql 實際所在的目錄
./bin/sql-client.sh -f ${your_path}/run_delta_join.sql

在 Flink UI 上,我們可以看到 Delta Join 作業正常跑起來了。

image.png

插入數據到源表

將下面的 SQL 代碼保存為“insert\_data.sql”文件,其中包含了向兩張源表灌入 Nexmark 數據源產生模擬數據的作業。

CREATE CATALOG fluss_catalog
WITH (
    'type' = 'fluss'
    ,'bootstrap.servers' = 'localhost:9123'
);

USE CATALOG fluss_catalog;

USE my_db;

-- nexmark 模擬數據源
CREATE TEMPORARY TABLE datagen
(
    event_type  int
    ,person ROW<
        id BIGINT
        ,name VARCHAR
        ,emailAddress VARCHAR
        ,creditCard VARCHAR
        ,city VARCHAR
        ,state VARCHAR
        ,`dateTime` TIMESTAMP(3)
        ,extra VARCHAR >
    ,auction ROW<
        id BIGINT
        ,itemName VARCHAR
        ,description VARCHAR
        ,initialBid BIGINT
        ,reserve BIGINT
        ,`dateTime` TIMESTAMP(3)
        ,expires TIMESTAMP(3)
        ,seller BIGINT
        ,category BIGINT
        ,extra VARCHAR >
    ,bid ROW<
        auction BIGINT
        ,bidder BIGINT
        ,price BIGINT
        ,channel VARCHAR
        ,url VARCHAR
        ,`dateTime` TIMESTAMP(3)
        ,extra VARCHAR >
    ,`dateTime` AS 
        CASE
          WHEN event_type = 0 THEN person.`dateTime`
          WHEN event_type = 1 THEN auction.`dateTime`
          ELSE bid.`dateTime`
        END
    ,WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND
)
WITH (
    'connector' = 'nexmark'
    -- 下面兩個參數為每秒數據生成速度
    ,'first-event.rate' = '1000'
    ,'next-event.rate' = '1000'
    -- 生成的數據總條數,過大可能導致 OOM
    ,'events.num' = '100000'
    -- 下面三個參數為 Bid/Auction/Persion 三個數據的生成佔比
    ,'person.proportion' = '2'
    ,'auction.proportion' = '24'
    ,'bid.proportion' = '24'
)
;

CREATE TEMPORARY VIEW auction_view
AS SELECT
    auction.id
    ,auction.itemName
    ,auction.description
    ,auction.initialBid
    ,auction.reserve
    ,`dateTime`
    ,auction.expires
    ,auction.seller
    ,auction.category
    ,auction.extra
FROM datagen
WHERE event_type = 1
;

CREATE TEMPORARY VIEW bid_view
AS SELECT
    bid.auction
    ,bid.bidder
    ,bid.price
    ,bid.channel
    ,bid.url
    ,`dateTime`
    ,bid.extra
FROM datagen
WHERE event_type = 2
;

INSERT INTO bid
SELECT
    *
FROM bid_view
;

INSERT INTO auction
SELECT
    *
FROM auction_view
;

在 Flink 目錄下,執行下面的語句,啓動兩個將 nexmark 模擬數據寫入源表的作業。

## 請確保在 Flink 目錄下執行該語句
## 注意:請將 ${your_path} 替換為 insert_data.sql 實際所在的目錄
./bin/sql-client.sh -f ${your_path}/insert_data.sql

觀察 Delta Join 作業

重新點擊 Flink UI 上的 Delta Join 作業,可以看到 Delta Join 作業正常在消費數據了。

image.png

現狀和未來工作

目前 Delta Join 仍然在持續演進中,Flink 2.2 已經支持了一些常用的 SQL pattern,具體可以參考文檔。

在未來,我們將會持續推進以下幾個方向:

  1. 持續完善最終一致性 Delta Join

    1. 支持 Left / Right Join
    2. 支持消費 Delete
    3. 支持級聯 Delta Join
  2. 結合 Paimon/Iceberg/Hudi 等支持快照的存儲,支持分鐘級的強一致性 Delta Join

參考

  1. Apache Flink 社區 Delta Join 用户文檔 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tuning/
  2. Apache Flink 社區 Delta Join FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin?src=contextnavpagetreemode
  3. Apache Fluss (Incubating) 社區 Delta Join 用户文檔  https://fluss.apache.org/docs/engine-flink/delta-joins/
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.