背景

火山引擎增長分析DataFinder基於ClickHouse來進行行為日誌的分析,ClickHouse的主要版本是基於社區版改進開發的字節內部版本。主要的表結構:

火山雲服務器開放所有端口_數據

事件表:存儲用户行為數據,以用户ID分shard存儲。

--列出了主要的字段信息
CREATE TABLE tob_apps_all
(
    `tea_app_id`                UInt32,  --應用ID
    `device_id`             String DEFAULT '', --設備ID
    `time`                  UInt64,--事件日誌接受時間
    `event`                 String,--事件名稱
    `user_unique_id`        String,--用户ID
    `event_date`            Date,--事件日誌日期,由time轉換而來
    `hash_uid`              UInt64 --用户ID hash過後的id,用來join降低內存消耗
)│

 

用户表:存儲用户的屬性數據,以用户ID分shard存儲。

--列出了主要的字段信息
CREATE TABLE users_unique_all
(
    `tea_app_id`            UInt32,            --應用ID
    `user_unique_id`        String DEFAULT '', -- 用户ID
    `device_id`             String DEFAULT '', -- 用户最近的設備ID
    `hash_uid`              UInt64,--用户ID hash過後的id,用來join降低內存消耗
    `update_time`           UInt64,--最近一次更新時間
    `last_active_date`      Date   --用户最後活躍日期
)

 

設備表:存儲設備相關的數據,以設備ID分shard存儲。

--列出了主要的字段信息
CREATE TABLE devices_all
(
    `tea_app_id`            UInt32,            --應用ID
    `device_id`             String DEFAULT '', --設備ID    
    `update_time`           UInt64,            --最近一次更新時間
    `last_active_date`      Date               --用户最後活躍日期
)

 

業務對象表:存儲業務對象相關的數據,每個shard存儲全量的數據

--列出了主要的字段信息
CREATE TABLE rangers.items_all
(
    `tea_app_id`            UInt32,
    `hash_item_id`          Int64,
    `item_name`             String, --業務對象名稱。比如商品
    `item_id`               String, --業務對象ID。比如商品id 1000001
    `last_active_date`      Date
)

 

業務挑戰

火山雲服務器開放所有端口_火山雲服務器開放所有端口_02


隨着接入應用以及應用的DAU日益增加,ClickHouse表的事件量增長迅速;並且基於行為數據需要分析的業務指標越來越複雜,需要JOIN的表增多;我們遇到有一些涉及到JOIN的複雜SQL執行效率低,內存和CPU資源佔用高,導致分析接口響應時延和錯誤率增加。

關於Clickhouse的JOIN

在介紹優化之前,先介紹一下基本的ClickHouse JOIN的類型和實現方式

1. 分佈式JOIN

火山雲服務器開放所有端口_子查詢_03

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

火山雲服務器開放所有端口_子查詢_04

基本執行過程:

  1. 一個Clickhouse節點作為Coordinator節點,給每個節點分發子查詢,子查詢sql(tob_apps_all替換成本地表,users_unique_all保持不變依然是分佈式表)
  2. 每個節點執行Coordinator分發的sql時,發現users_unique_all是分佈式表,就會去所有節點上去查詢以下SQL(一共有N*N。N為shard數量)
  • SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
  1. 每個節點從其他N-1個節點拉取2中子查詢的全部數據,全量存儲(內存or文件),進行本地JOIN
  2. Coordinator節點從每個節點拉取3中的結果集,然後做處理返回給client
    存在的問題:
  3. 子查詢數量放大
  4. 每個節點都全量存儲全量的數據

2. 分佈式Global JOIN

火山雲服務器開放所有端口_火山雲服務器開放所有端口_05

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

火山雲服務器開放所有端口_數據_06

 

基本執行過程:

  1. 一個Clickhouse節點作為Coordinator節點,分發查詢。在每個節點上執行sql(tob_apps_all替換成本地表,右查詢替換成別名ut)
  2. Coordinator節點去其他節點拉取users_unique_all的全部數據,然後分發到全部節點(作為1中別名表ut的數據)
  3. 每個節點都會存儲全量的2中分發的數據(內存or文件),進行本地local join
  4. Coordinator節點從每個節點拉取3中的結果集,然後做處理返回給client
    存在的問題:
  5. 每個節點都全量存儲數據
  6. 如果右表較大,分發的數據較大,會佔用網絡帶寬資源

3. 本地JOIN

SQL裏面只有本地表的JOIN,只會在當前節點執行

火山雲服務器開放所有端口_火山雲服務器開放所有端口_07

SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
    (SELECT device_id,
         hash_uid
    FROM rangers.users_unique
    WHERE tea_app_id = 268411
            AND last_active_date>='2022-08-06') ut
    ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
        AND event='app_launch'
        AND event_date='2022-08-06'

火山雲服務器開放所有端口_數據_08

 

3.1 Hash join

  • 右表全部數據加載到內存,再在內存構建hash table。key為joinkey
  • 從左表分批讀取數據,從右表hash table匹配數據
  • 優點是:速度快 缺點是:右表數據量大的情況下佔用內存
    3.2 Merge join
  • 對右表排序,內部 block 切分,超出內存部分 flush 到磁盤上,內存大小通過參數設定
  • 左表基於 block 排序,按照每個 block 依次與右表 merge
  • 優點是:能有效控制內存 缺點是:大數據情況下速度會慢
    優先使用hash join當內存達到一定閾值後再使用merge join,優先滿足性能要求

解決方案

火山雲服務器開放所有端口_子查詢_09

1. 避免JOIN

1.1 數據預生成

數據預生成(由Spark/Flink或者Clickhouse物化視圖產出數據),形成大寬表,基於單表的查詢是ClickHouse最為擅長的場景
我們有個指標,實現的SQL比較複雜(如下),每次實時查詢很耗時,我們單獨建了一個表table,由Spark每日構建出這個指標,查詢時直接基於table查詢

火山雲服務器開放所有端口_火山雲服務器開放所有端口_10

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
    (SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple
    FROM tob_apps_all et GLOBAL ANY LEFT JOIN
        (SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time
        FROM users_unique_all
        WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
            ON et.hash_uid=upt.join_key
        WHERE (查詢條件)
        GROUP BY  uc1,event_date)
GROUP BY event_date;

火山雲服務器開放所有端口_火山雲服務器開放所有端口_11

 

數據量2300W,查詢時間由7秒->0.008秒。當然這種方式,需要維護額外的數據構建任務。總的思路就是不要讓ClickHouse實時去JOIN

火山雲服務器開放所有端口_SQL_12

1.2 使用IN代替JOIN

JOIN需要基於內存構建hash table且需要存儲右表全部的數據,然後再去匹配左表的數據。而IN查詢會對右表的全部數據構建hash set,但是不需要匹配左表的數據,且不需要回寫數據到block
比如

火山雲服務器開放所有端口_SQL_13

SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
    (SELECT hash_uid AS join_key
    FROM users_unique_all
    WHERE app_id = 10000000
            AND last_active_date >= '2022-01-01') upt
    ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
GROUP BY  event_date

火山雲服務器開放所有端口_火山雲服務器開放所有端口_14

 

可以改成如下形式:

火山雲服務器開放所有端口_SQL_15

SELECT event_date,
         count()
FROM tob_apps_all
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
        AND hash_uid global IN 
    (SELECT hash_uid
    FROM users_unique_all
    WHERE (tea_app_id = 10000000)
            AND (last_active_date >= '2022-01-01') )
 GROUP BY event_date

火山雲服務器開放所有端口_火山雲服務器開放所有端口_16

 

如果需要從右表提取出屬性到外層進行計算,則不能使用IN來代替JOIN相同的條件下,上面的測試SQL,由JOIN時的16秒優化到了IN查詢時的11秒

火山雲服務器開放所有端口_數據_17

2. 更快的JOIN

2.1 優先本地JOIN

2.1.1 數據預先相同規則分區
也就是Colocate JOIN。優先將需要關聯的表按照相同的規則進行分佈,查詢時就不需要分佈式的JOIN

火山雲服務器開放所有端口_數據_18

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

火山雲服務器開放所有端口_子查詢_19

 

比如事件表tob_apps_all和用户表users_unique_all都是按照用户ID來分shard存儲的,相同的用户的兩個表的數據都在同一個shard上,因此這兩個表的JOIN就不需要分佈式JOIN了
distributed_perfect_shard這個settings key是字節內部ClickHouse支持的,設置過這個參數,指定執行計劃時就不會再執行分佈式JOIN了
基本執行過程:

  1. 一個ClickHouse節點作為Coordinator節點,分發查詢。在每個節點上執行sql(tob_apps_all、users_unique_all替換成本地表)
  2. 每個節點都執行1中分發的本地表join的SQL(這一步不再分發右表全量的數據)
  3. 數據再回傳到coordinator節點,然後返回給client

2.1.2 數據冗餘存儲
如果一個表的數據量比較小,可以不分shard存儲,每個shard都存儲全量的數據,例如我們的業務對象表。查詢時,不需要分佈式JOIN,直接在本地進行JOIN即可

火山雲服務器開放所有端口_SQL_20

SELECT count()
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT item_id
    FROM items_all 
    WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

火山雲服務器開放所有端口_火山雲服務器開放所有端口_21

 

例如這個SQL,items_all表每個shard都存儲同樣的數據,這樣也可以避免分佈式JOIN帶來的查詢放大和全表數據分發問題

2.2 更少的數據

不論是分佈式JOIN還是本地JOIN,都需要儘量讓少的數據參與JOIN,既能提升查詢速度也能減少資源消耗

2.2.1 SQL下推

ClickHouse對SQL的下推做的不太好,有些複雜的SQL下推會失效。因此,我們手動對SQL做了下推,目前正在測試基於查詢優化器來幫助實現下推優化,以便讓SQL更加簡潔
下推的SQL:

火山雲服務器開放所有端口_數據_22

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06'
        AND 用户屬性條件1  OR 用户屬性條件2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

火山雲服務器開放所有端口_子查詢_23

 

對應的不下推的SQL:

火山雲服務器開放所有端口_SQL_24

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM rangers.users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
AND (ut.用户屬性條件1  OR ut.用户屬性條件2)
settings distributed_perfect_shard=1

火山雲服務器開放所有端口_火山雲服務器開放所有端口_25

 

可以看到,不下推的SQL更加簡潔,直接基於JOIN過後的寬表進行過濾。但是ClickHouse可能會將不滿足條件的users_unique_all數據也進行JOIN我們使用中有一個複雜的case,用户表過濾條件不下推有1千萬+,SQL執行了3000秒依然執行超時,而做了下推之後60秒內就執行成功了

火山雲服務器開放所有端口_子查詢_26

2.3Clickhouse引擎層優化

一個SQL實際在Clickhouse如何執行,對SQL的執行時間和資源消耗至關重要。社區版的Clickhouse在執行模型和SQL優化器上還要改進的空間,尤其是複雜SQL以及多JOIN的場景下

執行模型優化社區版的Clickhouse

目前還是一個兩階段執行的執行模型。第一階段,Coordinator在收到查詢後,將請求發送給對應的Worker節點。第二階段,Worker節點完成計算,Coordinator在收到各Worker節點的數據後進行匯聚和處理,並將處理後的結果返回。

火山雲服務器開放所有端口_數據_27


有以下幾個問題:

  1. 第二階段的計算比較複雜時,Coordinator的節點計算壓力大,容易成為瓶頸
  2. 不支持shuffle join,hash join時右表為大表時構建慢,容易OOM
  3. 對複雜查詢的支持不友好

字節跳動ClickHouse團隊為了解決上述問題,改進了執行模型,參考其他的分佈式數據庫引擎(例如Presto等),將一個複雜的Query按數據交換情況切分成多個 Stage,各Stage之間則通過Exchange完成數據交換。根據Stage依賴關係定義拓撲結構,產生DAG圖,並根據DAG圖調度Stage。例如兩表Join,會先調度左右表讀取Stage,之後再調度Join這個Stage,Join的Stage依賴於左右表的Stage。

火山雲服務器開放所有端口_子查詢_28


舉個例子

火山雲服務器開放所有端口_子查詢_29

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id, 
    dt.hash_did AS device_hashid
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_did
    FROM devices_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10

火山雲服務器開放所有端口_SQL_30

 

Stage執行模型基本過程(可能的):

  • 讀取tob_apps_all數據,按照join key(hash_uid)進行shuffle,數據分發到每個節點。這是一個Stage
  • 讀取users_unique_all數據,按照join key(hash_uid)進行shuffle,數據分發到每個節點。這是一個Stage
  • 上述兩個表的數據,在每個節點上的數據進行本地join,然後再按照join key(device_id)進行shuffle。這是一個Stage
  • 讀取devices_all數據,按照join key(device_id)進行shuffle,這是一個Stage
  • 第3步、第4步的數據,相同join key(device_id)的數據都在同一個節點上,然後進行本地JOIN,這是一個Stage
  • 彙總數據,返回limit 10的數據。這是一個Stage
    統計效果如下:

    查詢優化器
    有了上面的stage的執行模型,可以靈活調整SQL的執行順序,字節跳動Clickhouse團隊自研了查詢優化器,根據優化規則(基於規則和代價預估)對SQL的執行計劃進行轉換,一個執行計劃經過優化規則後會變成另外一個執行計劃,能夠準確的選擇出一條效率最高的執行路徑,然後構建Stage的DAG圖,大幅度降低查詢時間
    下圖描述了整個查詢的執行流程,從 SQL parse 到執行期間所有內容全部進行了重新實現(其中紫色模塊),構建了一套完整的且規範的查詢優化器。

    還是上面的三表JOIN的例子,可能的一個執行過程是:
  • 查詢優化器發現users_unique_all表與tob_apps_all表的分shard規則一樣(基於用户ID),所以就不會先對錶按 join key 進行 shuffle,users_unique與tob_apps直接基於本地表JOIN,然後再按照join key(device_id)進行shuffle。這是一個Stage
  • 查詢優化器根據規則或者代價預估決定設備表devices_all是需要broadcast join還是shuffle join 如果broadcast join:在一個節點查到全部的device數據,然後分發到其他節點。這是一個Stage 如果shuffle join:在每個節點對device數據按照join key(device_id)進行shuffle。這是一個Stage
  • 彙總數據,返回limit 10的數據。這是一個Stage

效果:可以看到,查詢優化器能優化典型的複雜的SQL的執行效率,縮短執行時間

火山雲服務器開放所有端口_數據_31

總結

ClickHouse最為擅長的領域是一個大寬表來進行查詢,多表JOIN時Clickhouse性能表現不佳。作為業內領先的用户分析與運營平台,火山引擎增長分析DataFinder基於海量數據做到了複雜指標能夠秒級查詢。本文介紹了我們是如何優化Clickhouse JOIN查詢的。
主要有以下幾個方面:

  1. 減少參與JOIN的表以及數據量
  2. 優先使用本地JOIN,避免分佈式JOIN帶來的性能損耗
  3. 優化本地JOIN,優先使用內存進行JOIN
  4. 優化分佈式JOIN的執行邏輯,依託於字節跳動對ClickHouse的深度定製化