1. 數據存儲與格式優化
- 選擇合適的文件格式:推薦使用 Parquet、ORC 等列式存儲格式,壓縮率高,查詢快。
- 分區存儲:根據業務字段(如日期、地區等)合理分區,減少掃描數據量。
- 避免小文件:合併小文件,減少文件系統開銷,提高並行度。
2. SQL 查詢優化
- 謂詞下推(Predicate Pushdown):儘量將過濾條件(WHERE)寫在 SQL 最外層,減少讀取無用數據。
- 列裁剪(Column Pruning):只讀取需要的字段,避免 SELECT *。
- JOIN 優化:
- 優先使用廣播(Broadcast Join)處理小表與大表的關聯。
- 調整 join 順序,小表放前面。
- 使用合適的 join 類型(如 broadcast、sort-merge、shuffle hash)。
- 避免數據傾斜:
- 對 key 分佈不均的 join/group by,考慮加鹽(salting)、拆分大 key。
- 監控任務 stage 的數據分佈和時間消耗。
3. 內存與資源配置
- 合理設置 spark.sql.shuffle.partitions:默認200,建議根據數據量和集羣規模調整。
- 內存分配:調整 executor 內存(spark.executor.memory)、core 數(spark.executor.cores)。
- 動態資源分配:開啓 dynamic allocation(spark.dynamicAllocation.enabled)。
4. 緩存與持久化
- 緩存熱點數據:使用
cache()或persist()緩存頻繁訪問的數據集。 - 合理選擇緩存級別:如 MEMORY_ONLY、MEMORY_AND_DISK 等。
- 及時釋放緩存:用完後
unpersist(),避免佔用內存。
5. 代碼與函數優化
- 避免 UDF 濫用:優先使用 Spark 內置函數,UDF 會降低優化器能力。
- 拆分複雜 SQL:分步處理,簡化邏輯,便於調優和排查。
- 避免笛卡爾積:join 時加好關聯條件。
6. 監控與調試
- 使用 Spark UI:監控 DAG、Stage、Task,分析瓶頸。
- 開啓 SQL 物理計劃分析:
EXPLAINSQL,查看執行計劃,發現潛在問題。 - 日誌分析:關注 executor、driver 日誌,定位異常。
7. 其他建議
- 合理使用窗口函數:避免大數據量全局排序。
- 批量處理:數據量大時分批處理,防止 OOM。
- 升級 Spark 版本:新版本優化器更智能,性能提升明顯。
總結表格
|
優化方向
|
具體建議
|
説明/工具
|
|
存儲格式
|
Parquet/ORC
|
列式存儲,壓縮快
|
|
分區策略
|
按業務字段分區
|
減少掃描數據
|
|
文件合併
|
合併小文件
|
提高並行度
|
|
謂詞下推
|
WHERE條件前置
|
減少無用數據讀取
|
|
列裁剪
|
只選必要字段
|
SELECT * 慎用
|
|
JOIN優化
|
廣播小表,調整順序
|
Broadcast Join
|
|
數據傾斜
|
加鹽、拆分大key
|
監控Stage分佈
|
|
資源配置
|
調整分區數、內存、core
|
shuffle.partitions
|
|
緩存持久化
|
cache/persist/unpersist
|
內存管理
|
|
UDF優化
|
優先內置函數
|
減少UDF使用
|
|
監控調試
|
Spark UI、EXPLAIN
|
物理計劃分析
|
8. Shuffle 優化
- 減少 Shuffle 操作:儘量避免多次 shuffle,比如頻繁的 group by、join、repartition 操作。
- 合併 Shuffle 文件:通過參數
spark.shuffle.file.buffer、spark.reducer.maxSizeInFlight優化 shuffle 文件大小和網絡傳輸。 - 合理設置分區數:
spark.sql.shuffle.partitions不是越大越好,數據量小可適當減少,數據量大可適當增加,避免單個 task 過慢或資源浪費。
9. 廣播 Join 細節
- 自動廣播閾值:
spark.sql.autoBroadcastJoinThreshold默認10MB,小表可自動廣播。 - 強制廣播:可用
broadcast(table)或 SQL hint/*+ BROADCAST(table) */強制廣播小表。 - 廣播數據大小控制:如果小表稍大,可提升閾值,但要注意 driver 內存壓力。
10. 分區與分桶(Bucketing)
- 分桶表優化 Join:對 join 的 key 做分桶(bucket),可減少 shuffle,提升 join 性能。
- 分區表優化過濾:按查詢高頻字段分區,提升 where 條件過濾效率。
- 分桶數和分區數要合理:過多會導致小文件,過少會導致數據傾斜。
11. 動態分區裁剪(DPP)
- 開啓 DPP:
spark.sql.optimizer.dynamicPartitionPruning.enabled=true,在 join 操作中自動裁剪分區,減少掃描數據量。 - 適用場景:大表 join 小表且分區字段一致時效果顯著。
12. Skew Join(數據傾斜 Join)處理
- 自動傾斜處理:
spark.sql.adaptive.skewJoin.enabled=true,Spark 3.x 支持自動拆分大 key,優化傾斜 join。 - 手動加鹽:對傾斜 key 增加隨機前綴,join 後再去前綴合並結果。
- 拆分大 key:將大 key 單獨處理,其餘 key 正常 join。
13. Adaptive Query Execution(AQE,自適應查詢執行)
- 開啓 AQE:
spark.sql.adaptive.enabled=true,自動優化分區數、join 策略、傾斜處理。 - 自動合併小分區:
spark.sql.adaptive.coalescePartitions.enabled=true。 - 自動選擇 Join 類型:根據實際數據量動態調整 join 策略。
14. 性能監控與瓶頸定位
- SQL Metrics:在 Spark UI 的 SQL tab 查看每一步執行時間、數據量、spill 情況。
- 物理計劃分析:
EXPLAIN FORMATTED或EXPLAIN COST查看詳細計劃、代價估算。 - Stage DAG 分析:關注 shuffle read/write、task duration,定位慢點。
15. 生產環境建議
- 資源隔離:不同業務分配獨立隊列和資源,避免互相影響。
- 參數調優:根據業務場景調節
spark.sql.*、spark.executor.*等參數。 - 定期合併小文件:使用 compaction 腳本,減輕 HDFS 壓力。
- 自動重試和容錯:設置合理的重試次數,避免單點故障。
16. 典型參數推薦(參考)
# 文件格式
spark.sql.parquet.compression.codec=snappy
# 自動廣播 join 閾值
spark.sql.autoBroadcastJoinThreshold=20MB
# shuffle 分區數
spark.sql.shuffle.partitions=400
# AQE
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
# 動態分區裁剪
spark.sql.optimizer.dynamicPartitionPruning.enabled=true
# 資源配置
spark.executor.memory=8G
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
# 文件格式
spark.sql.parquet.compression.codec=snappy
# 自動廣播 join 閾值
spark.sql.autoBroadcastJoinThreshold=20MB
# shuffle 分區數
spark.sql.shuffle.partitions=400
# AQE
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
# 動態分區裁剪
spark.sql.optimizer.dynamicPartitionPruning.enabled=true
# 資源配置
spark.executor.memory=8G
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
17. 優化流程建議
- 分析 SQL 物理計劃:確定瓶頸點(如 shuffle、scan)。
- 調整分區/分桶策略:減少不必要的數據掃描和 shuffle。
- 合理使用緩存:熱點表或中間結果緩存。
- 參數調優:根據實際數據量和集羣資源調整參數。
- 監控和回溯:持續監控 SQL 性能,定期回溯優化效果。