數據傾斜是指,map /reduce程序執行時,reduce節點大部分執行完畢,但是有一個或者幾個reduce節點運行很慢,導致整個程序的處理時間很長,這是因為某一個key的條數比其他key多很多(有時是百倍或者千倍之多),這條key所在的reduce節點所處理的數據量比其他節點就大很多,從而導致某幾個節點遲遲運行不完。
阿里的這篇比較實用,通俗易懂:數據傾斜總結
有篇分析比較詳細,如果需要使用可以細讀: 淺析 Hadoop 中的數據傾斜
· 症狀和原因:
· 操作:join,group by,count distinct
· 原因:key分佈不均勻,人為的建表疏忽,業務數據特點。
· 症狀:任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成;查看未完成的子任務,可以看到本地讀寫數據量積累非常大,通常超過10GB可以認定為發生數據傾斜。
· 傾斜度:平均記錄數超過50w且最大記錄數是超過平均記錄數的4倍;最長時長比平均時長超過4分鐘,且最大時長超過平均時長的2倍。
我遇到的問題:
select * from a join b;
1. a表1000多萬,b表不到2億,用mapjoin顯然不行;
2. 設置參數 set hive.groupby.skewindata=true,不起作用;
3. 由於關連鍵為手機號,自認為業務數據上不存在數據傾斜;
後來通過查看每個表裏面關聯鍵的分佈,才發現兩個表裏面都存在空串'',而且嚴重傾斜,大表裏面的空串數量有400多萬。
將兩個表的空串過濾後再進行關聯,job時間由原來的40多分鐘減少到2分鐘。
總結:
1. 數據傾斜的原因就那麼幾種,逐一排查;
2. 細心,動手,不能光憑感覺來判定;
3. 判定某一個表的key是否存在數據傾斜,就是group by key,取top N來看;
附:數據傾斜常用解決方法:
1. 萬能膏藥:hive.groupby.skewindata=true
2. 大小表關聯:將key相對分散,並且數據量小的表放在join的左邊,這樣有效減少內存溢出錯誤發生機率;再進一步,可用map join讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce.
3. 大表和大表關聯:把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理後並不影響最終結果。
4. count distinct大量相同特殊值:(空值單獨處理)count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最後結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
用hadoop程序進行數據關聯時,常碰到數據傾斜的情況,這裏提供一種解決方法。
(1)設置一個hash份數N,用來對條數眾多的key進行打散。
(2)對有多條重複key的那份數據進行處理:從1到N將數字加在key後面作為新key,如果需要和另一份數據關聯的話,則要重寫比較類和分發類(方法如上篇《hadoop job解決大數據量關聯的一種方法》)。如此實現多條key的平均分發。
int iNum = iNum % iHashNum;
String strKey = key + CTRLC + String.valueOf(iNum) + CTRLB + “B”;
(3)上一步之後,key被平均分散到很多不同的reduce節點。如果需要和其他數據關聯,為了保證每個reduce節點上都有關聯的key,對另一份單一key的數據進行處理:循環的從1到N將數字加在key後面作為新key
for(int i = 0; i < iHashNum; ++i){
String strKey =key + CTRLC + String.valueOf(i) ;
output.collect(new Text(strKey), new Text(strValues));}
以此解決數據傾斜的問題,經試驗大大減少了程序的運行時間。但此方法會成倍的增加其中一份數據的數據量,以增加shuffle數據量為代價,所以使用此方法時,要多次試驗,取一個最佳的hash份數值。
======================================
用上述的方法雖然可以解決數據傾斜,但是當關聯的數據量巨大時,如果成倍的增長某份數據,會導致reduce shuffle的數據量變的巨大,得不償失,從而無法解決運行時間慢的問題。
有一個新的辦法可以解決成倍增長數據的缺陷:
在兩份數據中找共同點。用重複較少的某個屬性作為分佈給reduce的依據。比如兩份數據裏除了關聯的字段以外,還有另外相同含義的字段,如果這個字段在所有log中的重複率比較小,則用這個字段計算hash值,如果是數字,直接用來模hash的份數,如果是字符可以用hashcode來模hash的份數(當然數字為了避免落到同一個reduce上的數據過多,也可以用hashcode),這樣如果字段值分佈足夠平均就可以解決上述問題。
我到過的處理的方式
1.mapjoin方式
/*+ MAPJOIN(c,d,e,f) */
其中c,d,e,f是你小表,也就是説可能會傾斜數據的表;
但是對於join,在判斷小表不大於1G的情況下,使用map join,也就是要考慮c,d,e,f等表的大小,不能超過內存限制,否則會出現OOM錯誤;
2.控制空值分佈
Java代碼
1. select
2. '${date}' as thedate,
3. a.search_type,
4. a.query,
5. a.category,
6. a.cat_name,
7. a.brand_id,
8. a.brand_name,
9. a.dir_type,
10. a.rewcatid,
11. a.new_cat_name,
12. a.new_brand_id,
13. f.brand_name as new_brand_name,
14. a.pv,
15. a.uv,
16. a.ipv,
17. a.ipvuv,
18. a.trans_amt,
19. a.trans_num,
20. a.alipay_uv
21. from fdi_search_query_cat_qp_temp a
22. left outer join brand f
23. on
24. f.pt='${date}000000'
25. and case when a.new_brand_id is null then concat('hive',rand() ) else a.new_brand_id end = f.brand_id;
1. select
2. '${date}' as thedate,
3. a.search_type,
4. a.query,
5. a.category,
6. a.cat_name,
7. a.brand_id,
8. a.brand_name,
9. a.dir_type,
10. a.rewcatid,
11. a.new_cat_name,
12. a.new_brand_id,
13. f.brand_name as new_brand_name,
14. a.pv,
15. a.uv,
16. a.ipv,
17. a.ipvuv,
18. a.trans_amt,
19. a.trans_num,
20. a.alipay_uv
21. from fdi_search_query_cat_qp_temp a
22. left outer join brand f
23. on
24. f.pt='${date}000000'
25. and case when a.new_brand_id is null then concat('hive',rand() ) else a.new_brand_id end = f.brand_id;
這樣的寫法把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
如果上述的方法還不能解決,比如當有多個JOIN的時候,建議建立臨時表,然後拆分HIVE SQL語句; 關於數據傾斜,阿里集團數據平台上的博客文章有很好的幾個方法,敢興趣的人也可以去看一下:
3.關於nonstrict
join同樣一張表多次的時候,會出現這樣的錯誤信息:
FAILED: Error in semantic analysis: In strict mode, cartesian product is not allowed. If you really want to perform the operation, set hive.mapred.mode=nonstrict
解決方式是在SQL前面加上如下:
set hive.mapred.mode=nonstrict;
strict模式在下面三種情況下有限制:
(1) partition表需要加上分區裁剪
(2) order by 只有一個reduce,需要加上limit
(3) join時,如果只有一個reduce,笛卡爾積不支持。
HIVE小技巧:
1.hive sql中:
sum(t.shop_gmvcount + t.GMVCOUNT_NEW + t.auc_shop_gmvcount + t.spu_gmv_cnt) gmv_cnt,
這樣的統計結果,當t.t.shop_gmvcount為NULL時,即使後面的t.GMVCOUNT_NEW 不為null,那麼總計的結果這個計算仍然是NULL;
修改的方法是:採用sum(coalesce(t.shop_gmvcount,cast(0 as bigint)) + coalesce(t.GMVCOUNT_NEW,cast(0 as bigint))
這樣的方式,coalesce函數類似於ORACLE數據庫裏面的nvl
2。join中where的過濾,on裏面才能起到表的過濾,放在where裏面起不到提前過濾的情況;
3.left semi jioin的使用
LEFT SEMI JOIN 是 IN/EXISTS 子查詢的一種更高效的實現。Hive 當前沒有實現 IN/EXISTS 子查詢,所以你可以用 LEFT SEMI JOIN 重寫你的子查詢語句。LEFT SEMI JOIN 的限制是, JOIN 子句中右邊的表只能在 ON 子句中設置過濾條件,在 WHERE 子句、SELECT 子句或其他地方過濾都不行。