在使用 Flink SQL 進行實時數據處理的過程中,雙流 Join 是非常常見的操作之一。典型的場景包括分析廣告效果(曝光流訂單流實時關聯)、實時推薦(點擊流和商品信息)等等。然而,雙流 Join 需要在狀態中維護兩側全量的歷史數據,以確保計算結果的準確性。隨着作業的持續運行,雙流 Join 會逐漸帶來一些問題:
-
運維層面
- 狀態過大,開發者需要不斷加大作業的資源才能維持較高的吞吐。
- Checkpoint 易超時,導致作業不穩定、持續 Failover。
- 狀態是 Flink 內部產物,排查問題時,其內部數據難以探查。
-
開發層面
- Query 迭代修改後,狀態難以複用,且重啓回追代價高。
為了解決這些問題,Flink 社區在 2.1 引入了新的 Delta Join 算子,並在 2.2 對其進行了進一步的擴展。Delta Join 的核心思想是捨棄算子內本地狀態冗餘的數據存儲,利用雙向 Lookup Join 直接查詢源表中的數據,而非查詢狀態中的數據,從而複用源表數據。Delta Join 結合流存儲 Apache Fluss,在阿里巴巴淘寶天貓團隊成功落地,並且對比雙流 Join,擁有如下幾個優勢:
- 消除了將近 50 TB 的 雙流 Join 狀態
- 計算 CU 降低 10 倍
- 作業恢復速度提升 87 %
- Checkpoint 秒級完成
Flink Delta Join介紹請參考:《Delta Join:為超大規模流處理實現計算與歷史數據解耦》https://developer.aliyun.com/article/1690558
雙流 Join 實現原理
讓我們先簡單描述 Flink 雙流 Join 的工作原理。
我們以處理左側表來的 changelog 數據為例,流入的數據主要經過以下三個階段。
- 通過 Join Key 查詢對側(即右側)的狀態,獲取右側歷史上曾經流入該算子的全量數據。
- 使用 Join 條件過濾查詢得到的數據,並輸出。
- 將輸入的本條數據,存入本側(即左側)的狀態中,以供後續右側的數據來臨時,能正確的匹配數據。
之所以要把所有的數據用狀態記錄下來,是因為流計算是沒有邊界的,左側數據和右側數據匹配的時間點會存在時間差,即使一側的數據延遲到達,也需要保證可以關聯上另一側的數據,最終輸出。
雙流 Join 的算法確保了數據的正確性,但是其狀態會隨着時間的推移而無限制增大,成為影響作業資源消耗和穩定性的關鍵因素。雖然目前已有 Interval Join、Lookup Join、State TTL Hint 等手段來緩解或解決該問題,但是均面向了特定的業務場景,犧牲了一定的功能(如 Lookup Join 捨棄了維表側數據的更新追蹤,State TTL Hint 放棄匹配超過 TTL 期限的數據)。
Delta Join 技術原理
從雙流 Join 的原理上,我們可以觀察到,狀態裏記錄的全量數據,與源表中的數據基本相同,那麼一個直觀的想法是,可以複用源表的數據來取代原有的狀態。Delta Join 正是基於這個思路,它利用了外部存儲系統提供的索引能力,並不從狀態中查找數據,而是直接對外部存儲發出高效的、基於索引的數據查詢,以獲取匹配的記錄。通過這種方式,Delta Join 消除了雙流 Join 狀態與外部系統之間冗餘的數據存儲。
理論推導
我們以兩路輸入為例,增量更新 Join 結果的公式為:
Δ (A ⋈ B) =Δ A ⋈ B + A ⋈ Δ B + Δ A ⋈ Δ B = Δ A ⋈ (B + Δ B) + A ⋈ Δ B
其中,A 代表了左表的全量歷史數據, Δ A 代表了左表中的增量數據。B 和 Δ B 的定義與此類似。每當我們需要計算 Join 結果的增量部分時,我們只需要獲取源表中從上次計算到當前時間之間新生成的數據,並查詢對側源表中的歷史快照數據。因此我們需要:
- 感知源表的增量數據
- 訪問源表歷史快照數據
這對源表的物理存儲引擎提出了很高的要求,存儲引擎需要支持快照隔離,以確保強一致性語義。然而,目前存在以下幾個問題:
- 目前只有有限的存儲支持了快照的概念,例如 Paimon、Iceberg、Hudi 等等
- 快照生成的時間間隔為分鐘級別,無法滿足實時處理的要求
- 當指定快照查詢數據時,快照可能會在存儲系統中過期
考慮到上述這些問題,Flink 2.1 提出了一種滿足實時性要求的、最終一致性的 Delta Join 方案。
最終一致性語義的 Delta Join
最終一致性語義的 Delta Join 並不要求源表的存儲引擎支持快照。它總是去查詢源表當前最新的數據。其對應的變種公式如下:
Δ A ⋈ (B + Δ B) + (A + Δ A) ⋈ Δ B
和強一致性 Delta Join 相比,最終一致性 Delta Join 多出了一部分額外的中間結果 Δ A ⋈ Δ B,因此,這種方法只能確保最終的結果是一致的。
以下是雙流 Join 和兩種語義的 Delta Join 的對比。
| 雙流 Join | 強一致性 Delta Join | 最終一致性 Delta Join | |
|---|---|---|---|
| 延遲 | 低 | 高 | 低 |
| 狀態大小 | 大 | 小 | 小 |
| 狀態內數據詳情 | 兩側輸入全量明細數據 | 上一次觸發計算的源錶快照id | 等待觸發計算的異步隊列 |
| 數據一致性 | 強一致性 | 強一致性 | 最終一致性 |
Delta Join 算子實現
為了提高算子的吞吐,在 Delta Join 算子中,分別引入了一個 TableAsyncExecutionController 組件和兩個雙側完全相同的 DeltaJoinRunner 組件。
TableAsyncExecutionController 原理
該組件由 FLIP-519 Introduce async lookup key ordered mode 引入,其嚴格限制相同 key 之間的數據必須串型執行,而允許不同 key 之間的數據並行處理,同時結合異步處理機制,大大提高了算子的吞吐能力。
該組件的運行原理如下:
TableAsyncExecutionController在接收到數據後,按照 key 放入 BlockingBuffer 內不同 key 的隊列裏,然後通過 KeyAccountingUnit 檢查該 key 是否被搶佔、有對應的數據正在執行。如果 key 被搶佔,直接返回;如果 key 未被搶佔,則搶佔該 key ,同時 poll 隊列數據,放入 ActiveBuffer,交給後續計算邏輯處理,同時註冊回調函數,在數據處理結束、輸出後,在 KeyAccountingUnit 內釋放該 key,去 BlockingBuffer 內拿下一條數據。
這套機制保證了相同 key 之間的數據是串行執行的,以避免出現分佈式亂序問題。該機制在某種程度上是 FLIP-425 Asynchronous Execution Model 的簡化版本,感興趣的可以另行研究。
在實際場景下,Delta Join 算子的吞吐會受到 BlockingBuffer 能允許的最大容量(各個 key 的隊列大小之和)影響,當 BlockingBuffer 最大容量過小時,即使收到的每個 key 都不一樣,也會由於無法充分利用異步並行的能力而導致吞吐較小。此時,可以適當調整下面的參數,來增大 BlockingBuffer 的最大容量。但如果設置的過大,BlockingBuffer 會佔用比較高的內存,同時也可能會給外部存儲帶來較大的查詢壓力。
// 默認 100
table.exec.async-lookup.buffer-capacity: 1000
我們可以通過監測 Delta Join 算子內以下幾個 metric,來判斷是否需要調整該參數。
-
aec_blocking_size:當前 BlockingBuffer 內被阻塞的所有 key 的隊列大小之和。該值越大,代表 join key 較為密集,考慮開啓或增大 delta join cache;該值越小,但吞吐不佳的情況下,考慮增大
table.exec.async-lookup.buffer-capacity的值。 aec_inflight_size:當前 ActiveBuffer 內正在執行計算的數據數量。
該值越大,代表當前同時請求外部存儲集羣的數據較多,存在請求堆積的情況,需要進一步查看外部存儲系統是否存在異常,或查看是否有相關參數可以提高查詢效率;該值越小,代表 join key 較為密集,考慮開啓或增大 delta join cache。
注:當Fluss流存儲的表作為 Delta Join 的源表時,你可以通過 Flink Table Hint,在 Fluss 表上配置以下這些關鍵參數,來提高查詢效率。
client.lookup.queue-sizeclient.lookup.max-batch-sizeclient.lookup.max-inflight-requestsclient.lookup.batch-timeout
具體請參考 Fluss Connector Options。
DeltaJoinRunner 原理
DeltaJoinRunner 是負責執行 Lookup 的組件。由於 Delta Join 算子會處理兩側的數據,因此對於不同側的數據,各有一個完全相同的 DeltaJoinRunner 負責 Lookup 對應表的數據。
想象一下,如果我們對每條數據都要去外部存儲進行查詢,對外部吞吐的壓力會非常大,算子的吞吐性能完全取決於請求外部系統的吞吐。但如果用普通的 cache 來對 Lookup 的數據進行緩存,Lookup 目標表的數據更新消息將無法訂閲。為此,我們引入了驅動側僅構建、Lookup 側僅更新的特殊 cache。
DeltaJoinRunner 組件的運行原理如下(圖例是用於左側輸入流查詢右側源表的 DeltaJoinRunner),分別由 LocalCache 和 LookupFetcher 組成。
當左側數據到達時,先去 LocalCache 查詢是否有 cache。當有 cache 時,直接輸出;當沒有 cache 時,藉助 LookupFetcher 通過右表的 index 查詢右表的數據,然後將查詢回來的數據在 LocalCache 中構建 cache,最後輸出。
同時,右表的數據到達時,將會查看此 DeltaJoinRunner 中的 LocalCache 是否有 cache。如果沒有cache,忽略更新;如果有 cache,更新 cache。
該 cache 機制一方面確保了在 join key 較為密集的場景,算子的吞吐能夠得到巨大的提升,同時對外部存儲也不會構成很大的查詢壓力;另一方面,確保了對側最新的數據能夠更新 cache,從而在後續的流程中能被正確地匹配上。
該 cache 是一個 LRU 的 cache,合理的設置該 cache 的大小是非常必要的。過小的 cache 大小將導致 cache 的命中率受到影響,過大的 cache 會佔用較多的內存。我們可以通過下面的參數來分別調節左右兩側 cache 的大小,甚至是在每條數據 join key 都不相同、cache 基本無用時關閉 cache。
// 是否啓用cache,默認為 true
table.exec.delta-join.cache-enabled: true
// 設置用於緩存左表數據的cache大小,默認為 10000
// 推薦在左表較小、或右流 join key 較為密集時,設置較大值
table.exec.delta-join.left.cache-size: 10000
// 設置用於緩存右表數據的cache大小,默認為 10000
// 推薦在右表較小、或左流 join key 較為密集時,設置較大值
table.exec.delta-join.right.cache-size: 10000
我們可以通過監測 Delta Join 算子上的 metric,來判斷是否需要適當增加 cache 的大小。
deltaJoin_leftCache_hitRate: 在右流查詢左表的場景下,緩存左表數據的 cache 的命中率百分比。該值越高越好。deltaJoin_rightCache_hitRate:在左流查詢右表的場景下,緩存右表數據的 cache 的命中率百分比。該值越高越好。
注:該圖來自於“實戰”章節 Nexmark q20 變種 query。右表 Auction 表每次都產生不同的id,故而 deltaJoin_leftCache_hitRate的命中率始終為 0。
實戰
我們借用 nexmark 數據集 中 q20 的 query,略微修改後,作為本次實戰的樣例代碼。
-- 獲取包含相應拍賣信息的出價表
INSERT INTO nexmark_q20
SELECT
auction, bidder, price, channel, url, B.`dateTime`, B.extra,
itemName, description, initialBid, reserve, A.`dateTime`, expires, seller, category, A.extra
FROM
bid AS B INNER JOIN auction AS A on B.auction = A.id;
-- WHERE A.category = 10;
方式一:使用 Docker 環境測試
環境準備
- 類 Unix 操作系統,如 Linux、Mac OS X
- 內存建議至少 4 GB,磁盤建議至少 4 GB
下載 Docker 鏡像
在命令行中,運行如下命令安裝 Docker 測試鏡像。
docker pull xuyangzzz/delta_join_example:1.0
運行如下命令運行該測試鏡像,進入測試 docker container 的命令行。
docker run -p 8081:8081 -p 9123:9123 --name delta_join_example -it xuyangzzz/delta_join_example:1.0 bash
運行任務 SQL
在測試 docker container 中執行下面的命令,運行 Flink 和 Fluss 集羣,創建相關表和 Delta Join 作業。
# 運行 flink 和 fluss 集羣
./start-flink-fluss.sh
# 創建相關表和 delta join 作業
./create-tables-and-run-delta-join.sh
此時,在宿主機 localhost:8081(或其他綁定的端口)即可查看 Flink UI 界面,可以看到此時 Delta Join 作業正在運行。
插入數據到源表
在測試 docker container 中執行下面的命令,為源表插入數據。
# 在源表插入數據
./insert-data.sh
觀察 Delta Join 作業
在宿主機localhost:8081(或其他綁定的端口)的 flink-ui 界面,就可以看到 Delta Join 作業在正常消費數據了。
方式二:手工搭建環境測試
環境準備
運行環境
- 類 Unix 操作系統,如 Linux、Mac OS X
- 內存建議至少 4 GB,磁盤建議至少 4 GB
- Java 11 及以上版本,且將環境變量
JAVA_HOME設置為 Java 的安裝目錄
準備 Apache Flink 計算引擎
-
下載
在 Apache Flink 官方下載網站下載最新的 Flink 2.2.0 版本,並解壓。
-
修改相關配置
修改 ./conf/config.yaml 文件,將 TaskManager numberOfTaskSlots 設置成 4 (默認為1)
準備 Apache Fluss 流存儲引擎
在 Apache Fluss 官方下載網站分別下載 Fluss 0.8 版本(並解壓)和適配 Apahce Flink 2.1 的連接器。
準備 Nexmark 源數據生成器
下載 Nexmark 項目 master 分支,在該項目根目錄下,用 maven-3.8.6 版本執行以下的 maven 命令
mvn clean install -DskipTests=true
在"./nexmark-flink/target/" 文件夾下,將會生成 nexmark-flink-0.3-SNAPSHOT.jar 文件
也可以直接使用下面準備好的 nexmark-flink-0.3-SNAPSHOT.jar 文件
請至釘釘文檔查看附件《nexmark-flink-0.3-SNAPSHOT.jar》
服務啓動
-
啓動 Flink
將 Fluss 適配 Flink 2.1 的連接器,以及 Nexmark 項目生成的 nexmark-flink-0.3-SNAPSHOT.jar 文件,放入 Flink 目錄的 ./lib 目錄下。
參考 Flink 本地模式安裝文檔,在 Flink 目錄中,執行下面的語句,啓動本地 Standalone 集羣。
## 請確保在 Flink 目錄下執行該語句 ./bin/start-cluster.sh檢查 http://localhost:8081/#/overview 界面是否可正常訪問。
-
啓動 Fluss
參考 Fluss 部署Local Cluster文檔,在 Fluss 目錄下,執行下面的語句,啓動本地集羣。
## 請確保在 Fluss 目錄下執行該語句
./bin/local-cluster.sh start
運行任務 SQL
3.1創建 Fluss 表
將下面的 SQL 代碼保存為“prepare\_table.sql”文件,其中定義了 2 張源表和 1 張結果表。
CREATE CATALOG fluss_catalog
WITH (
'type' = 'fluss'
,'bootstrap.servers' = 'localhost:9123'
);
USE CATALOG fluss_catalog;
CREATE DATABASE IF NOT EXISTS my_db;
USE my_db;
-- 創建左側源表
CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.bid
(
auction BIGINT
,bidder BIGINT
,price BIGINT
,channel VARCHAR
,url VARCHAR
,`dateTime` TIMESTAMP(3)
,extra VARCHAR
,PRIMARY KEY (auction, bidder) NOT ENFORCED
)
WITH (
-- fluss prefix lookup key,可用於 index
'bucket.key' = 'auction'
-- Flink 2.2 中,delta join 僅支持消費不帶 delete 操作的 cdc 源表
,'table.delete.behavior' = 'IGNORE'
);
-- 創建右側源表
CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.auction
(
id BIGINT
,itemName VARCHAR
,description VARCHAR
,initialBid BIGINT
,reserve BIGINT
,`dateTime` TIMESTAMP(3)
,expires TIMESTAMP(3)
,seller BIGINT
,category BIGINT
,extra VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
)
WITH (
-- Flink 2.2 中,delta join 僅支持消費不帶 delete 操作的 cdc 源表
'table.delete.behavior' = 'IGNORE'
);
-- 創建 delta join 寫入的結果表
CREATE TABLE fluss_catalog.my_db.delta_join_sink
(
auction BIGINT
,bidder BIGINT
,price BIGINT
,channel VARCHAR
,url VARCHAR
,bid_dateTime TIMESTAMP(3)
,bid_extra VARCHAR
,itemName VARCHAR
,description VARCHAR
,initialBid BIGINT
,reserve BIGINT
,auction_dateTime TIMESTAMP(3)
,expires TIMESTAMP(3)
,seller BIGINT
,category BIGINT
,auction_extra VARCHAR
,PRIMARY KEY (auction, bidder) NOT ENFORCED
);
在 Flink 目錄下,執行下面的語句,創建持久化的表。
## 請確保在 Flink 目錄下執行該語句
## 注意:請將 ${your_path} 替換為 prepare_table.sql 實際所在的目錄
./bin/sql-client.sh -f ${your_path}/prepare_table.sql
3.2 啓動 Delta Join 作業
將下面的 SQL 代碼保存為“run\_delta\_join.sql”文件,其中包含了可轉化為 delta join 的 q20 變體查詢。
CREATE CATALOG fluss_catalog
WITH (
'type' = 'fluss'
,'bootstrap.servers' = 'localhost:9123'
);
USE CATALOG fluss_catalog;
USE my_db;
INSERT INTO delta_join_sink
SELECT
auction
,bidder
,price
,channel
,url
,B.`dateTime`
,B.extra
,itemName
,description
,initialBid
,reserve
,A.`dateTime`
,expires
,seller
,category
,A.extra
FROM bid AS B
INNER JOIN auction AS A
ON B.auction = A.id;
在 Flink 目錄下,執行下面的語句,啓動 delta join 作業。
## 請確保在 Flink 目錄下執行該語句
## 注意:請將 ${your_path} 替換為 run_delta_join.sql 實際所在的目錄
./bin/sql-client.sh -f ${your_path}/run_delta_join.sql
在 Flink UI 上,我們可以看到 Delta Join 作業正常跑起來了。
插入數據到源表
將下面的 SQL 代碼保存為“insert\_data.sql”文件,其中包含了向兩張源表灌入 Nexmark 數據源產生模擬數據的作業。
CREATE CATALOG fluss_catalog
WITH (
'type' = 'fluss'
,'bootstrap.servers' = 'localhost:9123'
);
USE CATALOG fluss_catalog;
USE my_db;
-- nexmark 模擬數據源
CREATE TEMPORARY TABLE datagen
(
event_type int
,person ROW<
id BIGINT
,name VARCHAR
,emailAddress VARCHAR
,creditCard VARCHAR
,city VARCHAR
,state VARCHAR
,`dateTime` TIMESTAMP(3)
,extra VARCHAR >
,auction ROW<
id BIGINT
,itemName VARCHAR
,description VARCHAR
,initialBid BIGINT
,reserve BIGINT
,`dateTime` TIMESTAMP(3)
,expires TIMESTAMP(3)
,seller BIGINT
,category BIGINT
,extra VARCHAR >
,bid ROW<
auction BIGINT
,bidder BIGINT
,price BIGINT
,channel VARCHAR
,url VARCHAR
,`dateTime` TIMESTAMP(3)
,extra VARCHAR >
,`dateTime` AS
CASE
WHEN event_type = 0 THEN person.`dateTime`
WHEN event_type = 1 THEN auction.`dateTime`
ELSE bid.`dateTime`
END
,WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND
)
WITH (
'connector' = 'nexmark'
-- 下面兩個參數為每秒數據生成速度
,'first-event.rate' = '1000'
,'next-event.rate' = '1000'
-- 生成的數據總條數,過大可能導致 OOM
,'events.num' = '100000'
-- 下面三個參數為 Bid/Auction/Persion 三個數據的生成佔比
,'person.proportion' = '2'
,'auction.proportion' = '24'
,'bid.proportion' = '24'
)
;
CREATE TEMPORARY VIEW auction_view
AS SELECT
auction.id
,auction.itemName
,auction.description
,auction.initialBid
,auction.reserve
,`dateTime`
,auction.expires
,auction.seller
,auction.category
,auction.extra
FROM datagen
WHERE event_type = 1
;
CREATE TEMPORARY VIEW bid_view
AS SELECT
bid.auction
,bid.bidder
,bid.price
,bid.channel
,bid.url
,`dateTime`
,bid.extra
FROM datagen
WHERE event_type = 2
;
INSERT INTO bid
SELECT
*
FROM bid_view
;
INSERT INTO auction
SELECT
*
FROM auction_view
;
在 Flink 目錄下,執行下面的語句,啓動兩個將 nexmark 模擬數據寫入源表的作業。
## 請確保在 Flink 目錄下執行該語句
## 注意:請將 ${your_path} 替換為 insert_data.sql 實際所在的目錄
./bin/sql-client.sh -f ${your_path}/insert_data.sql
觀察 Delta Join 作業
重新點擊 Flink UI 上的 Delta Join 作業,可以看到 Delta Join 作業正常在消費數據了。
現狀和未來工作
目前 Delta Join 仍然在持續演進中,Flink 2.2 已經支持了一些常用的 SQL pattern,具體可以參考文檔。
在未來,我們將會持續推進以下幾個方向:
-
持續完善最終一致性 Delta Join
- 支持 Left / Right Join
- 支持消費 Delete
- 支持級聯 Delta Join
- 結合 Paimon/Iceberg/Hudi 等支持快照的存儲,支持分鐘級的強一致性 Delta Join
參考
- Apache Flink 社區 Delta Join 用户文檔 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tuning/
- Apache Flink 社區 Delta Join FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin?src=contextnavpagetreemode
- Apache Fluss (Incubating) 社區 Delta Join 用户文檔 https://fluss.apache.org/docs/engine-flink/delta-joins/