網易面試:Flink 優化可以從哪幾個方面入手?分別詳細介紹一下?
1. MiniBatch聚合優化
MiniBatch聚合優化是Table API中最重要的性能優化技術之一,它通過批量處理輸入記錄來減少狀態訪問次數。
傳統模式下,每條記錄都需要進行三次狀態操作:讀取累加器、更新累加器、寫回累加器。這種模式在高頻數據更新場景下會產生嚴重的性能瓶頸。
(1) MiniBatch優化機制
MiniBatch通過內部緩沖區將多條記錄聚合后再進行狀態操作,顯著減少了狀態訪問次數。觸發條件包括:
- 緩沖區大小達到配置的閾值
- 延遲時間達到配置的上限
配置參數:
- table.exec.mini-batch.enabled: 啟用MiniBatch優化
- table.exec.mini-batch.allow-latency: 控制最大延遲時間
- table.exec.mini-batch.size: 控制緩沖區大小
configuration.set("table.exec.mini-batch.enabled", "true");
configuration.set("table.exec.mini-batch.allow-latency", "5 s");
configuration.set("table.exec.mini-batch.size", "5000");
2. Local-Global聚合優化
Local-Global聚合是解決數據傾斜問題的核心技術,采用兩階段聚合策略。
在傳統聚合中,某些key的數據量遠大于其他key,導致處理這些熱點key的任務成為瓶頸。
兩階段聚合策略Local階段:
- 在數據源附近進行預聚合,將相同key的多條記錄合并Global階段
- 對預聚合的結果進行最終聚合
參數配置:
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
Local-Global聚合依賴于MiniBatch優化,必須同時啟用兩個特性。
3. 拆分Distinct聚合優化
拆分Distinct聚合專門解決高基數distinct操作的性能問題。
對于高基數的distinct key,即使使用Local-Global優化也無法有效減少數據量,因為累加器仍然包含幾乎所有原始記錄。
(1) 分桶策略
通過引入bucket key將distinct聚合分解為兩層:
- 第一層按(group_key, bucket_key)聚合
- 第二層按原始group_key聚合
bucket key的計算公式:HASH_CODE(distinct_key) % BUCKET_NUM。
(2) 等價性保證
由于相同的distinct key總是映射到同一個bucket,這種轉換在語義上是等價的。
-- 原始查詢
SELECT day, COUNT(DISTINCT user_id)
FROM T GROUP BY day
-- 優化后自動轉換為
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024) )
GROUP BY day
// 完整的拆分Distinct聚合配置
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// 啟用拆分Distinct聚合
tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled", "true");
// 可選:調整bucket數量(默認1024)
tEnv.getConfig().set("table.optimizer.distinct-agg.split.bucket-num", "2048");
// 執行包含高基數distinct的查詢
tEnv.executeSql("SELECT day, COUNT(DISTINCT user_id) FROM user_events GROUP BY day");
4. 檢查點優化決策
檢查點優化需要根據不同的背壓情況采用不同的策略。
背壓會顯著影響檢查點屏障的傳播速度,從而延長檢查點完成時間。
(1) 緩沖區Debloating機制
緩沖區 Debloating 機制可以通過將屬性`taskmanager.network.memory.buffer-debloat.enabled`設置為`true`來啟用。
此特性對對齊和非對齊 Checkpoint 都生效,并且在這兩種情況下都能縮短 Checkpointing 的時間,不過 Debloating 的效果對于對齊 Checkpoint 最明顯。
當在非對齊 Checkpoint 情況下使用緩沖區 Debloating 時,額外的好處是 Checkpoint 大小會更小,并且恢復時間更快 (需要保存和恢復的 In-flight 數據更少)。
(2) 非對齊檢查點
允許檢查點屏障跨越緩沖區,將in-flight數據作為檢查點狀態的一部分。
// 啟用非對齊 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
5. RocksDB狀態后端內存優化
RocksDB內存優化是大狀態應用性能調優的關鍵環節。
RocksDB的性能很大程度上取決于可用內存量,增加內存通常能帶來顯著的性能提升。
6. 自適應批處理優化決策
AdaptiveBatchScheduler能夠根據運行時信息動態調整執行計劃。
傳統的靜態優化在面對動態數據特征時往往力不從心,AdaptiveBatchScheduler能夠根據運行時的實際情況動態調整執行計劃。
自動并行度推導:
Adaptive Batch Scheduler 默認啟用了自動并行度推導,可以通過配置 [`execution.batch.adaptive.auto-parallelism.enabled`]來開關此功能。
除此之外,也可以根據作業的情況調整以下配置:
[`execution.batch.adaptive.auto-parallelism.min-parallelism`]: 允許自動設置的并行度最小值。
[`execution.batch.adaptive.auto-parallelism.max-parallelism`]: 允許自動設置的并行度最大值,如果該配置項沒有配置將使用通過 [`parallelism.default`] 或者 `StreamExecutionEnvironment#setParallelism()` 設置的默認并行度作為允許自動設置的并行度最大值。
[`execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`]: 期望每個任務平均處理的數據量大小。請注意,當出現數據傾斜,或者確定的并行度達到最大并行度(由于數據過多)時,一些任務實際處理的數據可能會遠遠超過這個值。
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]: source 算子可動態推導的最大并行度,若該配置項沒有配置將優先使用 [`execution-batch-adaptive-auto-parallelism-max-parallelism`]作為允許動態推導的并行度最大值,若該配置項也沒有配置,將使用 [`parallelism.default`] 或者 `StreamExecutionEnvironment#setParallelism()` 設置的默認并行度。
系統能夠根據數據量自動確定最優的并行度,避免了手動調優的復雜性。