成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark AQE SkewedJoin 在字節跳動的實踐和優化

精選
運維
本文將首先介紹 Spark AQE SkewedJoin 的基本原理以及字節跳動在使用 AQE SkewedJoin 的實踐中遇到的一些問題;其次介紹針對遇到的問題所做的相關優化和功能增強,以及相關優化在字節跳動的收益。

1. 概述

本文將首先介紹 Spark AQE SkewedJoin 的基本原理以及字節跳動在使用 AQE SkewedJoin 的實踐中遇到的一些問題;其次介紹針對遇到的問題所做的相關優化和功能增強,以及相關優化在字節跳動的收益;此外,我們還將分享 SkewedJoin 的使用經驗。

2. 背景

首先對 Spark AQE SkewedJoin 做一個簡單的介紹。Spark Adaptive Query Execution, 簡稱 Spark AQE,總體思想是動態優化和修改 stage 的物理執行計劃。利用執行結束的上游 stage 的統計信息(主要是數據量和記錄數),來優化下游 stage 的物理執行計劃。

圖片

Spark AQE 能夠在 stage 提交執行之前,根據上游 stage 的所有 MapTask 的統計信息,計算得到下游每個 ReduceTask 的 shuffle 輸入,因此 Spark AQE 能夠自動發現發生數據傾斜的 Join,并且做出優化處理,該功能就是 Spark AQE SkewedJoin。

圖片

例如 A 表 inner join B 表,并且 A 表中第 0 個 partition(A0)是一個傾斜的 partition,正常情況下,A0 會和 B 表的第 0 個 partition(B0)發生 join,由于此時 A0 傾斜,task 0 就會成為長尾 task。

SkewedJoin 在執行 A Join B 之前,通過上游 stage 的統計信息,發現 partition A0 明顯超過平均值的數倍,即判斷 A Join B 發生了數據傾斜,且傾斜分區為 partition A0。Spark AQE 會將 A0 的數據拆成 N 份,使用 N 個 task 去處理該 partition,每個 task 只讀取若干個 MapTask 的 shuffle 輸出文件,如下圖所示,A0-0 只會讀取 Stage0#MapTask0 中屬于 A0 的數據。這 N 個 Task 然后都讀取 B 表 partition 0 的數據做 join。這 N 個 task 執行的結果和 A 表的 A0 join B0 的結果是等價的。

圖片

不難看出,在這樣的處理中,B 表的 partition 0 會被讀取 N 次,雖然這增加了一定的額外成本,但是通過 N 個任務處理傾斜數據帶來的收益仍然大于這樣的成本。

Spark 從3.0 版本開始支持了 AQE SkewedJoin 功能,但是我們在實踐中發現了一些問題。

  • 不準確的統計數據可能導致 Spark 無法識別數據傾斜。
  • 切分不均勻導致優化處理效果不理想。
  • 不支持復雜場景例如同一個字段發生連續 join。

我將在【優化增強】中詳述這些問題以及我們的優化和解決方案。

3. 優化增強

3.1 提高數據傾斜的識別能力

由 Spark AQE 處理數據傾斜的原理不難發現,Spark AQE 識別傾斜以及切分數據傾斜的功能依賴于上游 Stage 的統計數據,統計數據越準確,傾斜的識別能力和處理能力就越高,直觀表現就是傾斜數據被拆分的非常平均,拆分后的數據大小幾乎和中位數一致,將長尾Task的影響降到最低。

MapStage 執行結束之后,每一個 MapTask 會生成統計結果 MapStatus,并將其發送給 Driver。MapStatus維護了一個 Array[Long],記錄了該 MapTask 中屬于下游每一個 ReduceTask 的數據大小。當 Driver 收集到了所有的 MapTask 的MapStatu之后,就能夠計算得到每一個 ReduceTask 的輸入數據量,以及分屬于每一個上游 MapTask 的數據大小。根據每一個 ReduceTask 的數據大小,Spark AQE 能夠判斷出數據傾斜,并根據上游 MapTask 的統計信息,合理切分 Reducetask,盡可能保證切分的均勻性。

圖片

如下圖描述,ReduceTask0 的 ShuffleRead(shuffle 過程中讀取的數據量) 為 200,明顯大于 ReduceTask1 和 ReduceTask2 的 100,發生了數據傾斜。我們可以將 ReduceTask0 拆成 2 份,ReduceTask0-0 讀取 MapTask0 和 MapTask1 的數據,ReduceTask0-1 讀取 MapTask2 和 MapTask3 的數據,拆分后的兩個 task 的 ShuffleRead 均為 100。

我們可以看出,統計信息的大小的空間復雜度是 O(M*R),對于大任務而言,會占據大量的 Driver 內存,所以 Spark 原生做了限制,對于 MapTask,當下游 ReduceTask 個數大于某一閾值(spark.shuffle.minNumPartitionsToHighlyCompress?,默認 2000),就會將MapStatus進行壓縮,所有小于 spark.shuffle.accurateBlockThreshold(默認100M)的值都會被一個平均值所代替填充。

舉個例子,下圖是我們遇到的一個 SkewedJoin 沒有生效的作業,從運行 metrics 來看,ShuffleRead 發生了很嚴重的傾斜,符合 SkewedJoin 生效的場景,但實際運行時并沒有生效。

圖片

通過閱讀日志,可以看到,Spark AQE 在運行時,獲取的 join 兩側的 shuffle partitions 的中位數和最大值都是一樣的,所以沒有識別到任何的傾斜。這就是由于壓縮后 MapStatus 的統計數據的不準確造成的。

圖片

我們在實踐中,遇到很多大作業由于統計數據不準確,無法識別傾斜。而當我們嘗試提高這一閾值之后,部分大作業由于 Driver 內存使用上漲而失敗,為了解決這一問題,我們做了以下優化:

  1. Driver 收到詳細的 MapStatus之后,先將數據用于更新每個 ReduceTask 的累計輸入數據,然后將 MapStatus壓縮,這樣就不會占用太多內存。此時,雖然壓縮后的 MapStatus無法讓我們獲得 ReduceTask 準確的上游分布,但是能夠獲得準確的 ReduceTask 的輸入數據總大小,這樣我們就能夠準確的識別發生傾斜的 ReduceTask。
  2. 上述優化增加了一次 MapStatus 的解壓操作,而 MapStatus 的解壓是一個比較耗CPU的操作,對于大作業可能出現 Driver CPU 被打滿,無法處理 Executor 心跳導致作業失敗的情況。對此,我們使用緩存保證Driver端在消費 MapStatus 時,每個 MapStatus 只會被解壓一次,大大降低了優化帶來的 Overhead。

通過上述優化,我們成功在線上將默認閾值從 2000 調整為 5000,保證了線上 96.6% 的 Spark 作業能夠準確的識別數據傾斜(如果存在)。

3.2 提高傾斜數據的切分均勻程度

由于 HighlyCompressMapStatus 用平均值填充所有低于 spark.shuffle.accurateBlockThreshold 的值,每個 ReduceTask 通過壓縮后的 MapStatus 累加計算得到的總數據大小和數據分布,就和實際差距很大。

舉個簡單的例子:我們得到 ReduceTask0 的實際總數據是 1G,而中位數是 100M,因此我們的期望是將 ReduceTask0 拆成 10 份,每一份是 100M。此時上游的 MapStage 一共有 100 個 MapTask,除了 MapTask0 中屬于 ReduceTask0 的數據是 100M,其他 99 個 MapStak 的數據都是 10M。當我們將所有的 MapStatus 壓縮之后,AQE 獲取的 ReduceTask0 的上游分布,就是 MapTask0 有 100M (因為大塊數據所以被保留),其他 99 個 MapTask 的數據都是 1M(在壓縮時使用平均值填充)。這時,Spark AQE 按照 100M 的期望值來切分,只會切分成兩個 ReduceTask:ReduceTask0-0(讀取MapTask0)和 ReduceTask0-1(讀取剩下99個MapTask)。

基于此,我們改進后的方法是利用精確的 ReduceTask 數據量來反推每個 MapperTask 對應的數據量,得到盡可能準確的數據分布。同樣是剛才的例子,我們已知 ReduceTask0 的實際總數據是 1G,MapStatus 壓縮的閾值是 100M,那么可以確定的是,MapTask0 關于 ReduceTask0 的數據 100M 是準確被保留的(因為大于等于閾值),而其他 99 個 MapTask 的數據都是不準確的。此時 AQE 就不會使用被壓縮的數據,而是通過 1G 的總數據反推得到其他 99 個 MapTask 中屬于 ReduceTask0 的數據是 10M,雖然同樣是存在誤差的平均值,但是相比壓縮數據,通過準確的總量反推得到的平均值會更加準確。這個時候 Spark 按照 100M 的期望值來切分,就會切成 10 個 ReduceTask,符合我們的預期。

而在實際應用中,利用新方案,AQE SkewedJoin 切分傾斜數據更加平均,優化效果有明顯的提升。

下圖是某個傾斜處理效果不理想的作業,SkewedJoin 生效后,該 Stage ShuffleReadSize 的中位數和最大值分別為 4M 和 9.9G。

圖片

經過我們的優化后,該 Stage 的 ShuffleReadSize 的中位數和最大值分別為 149M 和 1427M,傾斜分區的切分更加均勻,該 Stage 的運行時間也由原來的 2h 降為 20m。

圖片

3.3 支持更多的場景

場景1:JoinWithAggOrWin

以下圖為例,Stage10 雖然只有一個 SortMergeJoin,但是 join 的一邊并不是 Sort+Exchange 的組合,而是存在 Aggregate 算子或者 Window 算子,因此不屬于社區實現的范圍內。

圖片

場景2:MultipleSkewedJoin

在用戶的業務邏輯中,經常出現這樣一種場景:一張表的主鍵需要連續的 join 多張表,這種場景體現在 Spark 的具體執行上,就是連續的 join 存在于同一個 Stage 當中。如下圖所示 Stage21 中存在連續的多個 SortMergeJoin,而這種場景也是社區的實現無法優化的。

圖片

場景3:JoinWithUnion

Stage 中有 Union 算子,且 Union 的 children 中有 SMJ。

圖片

此外,我們還支持了 ShuffleHashJoin、 BucketJoin、MultipleJoinWithAggOrWin 等更多場景。

4. 字節的實踐

上面介紹的 LAS 對 Spark AQE SkewedJoin 的優化功能在字節跳動內部已使用 1 年左右,截止 2022年8月,優化日均覆蓋1.8萬+ Spark 作業,優化命中作業平均性能提升 35% 左右,其中 30% 被優化的 Spark 作業所屬于的場景是 LAS 自研支持的,大家可以通過火山引擎開通 LAS 服務并體驗這些優化功能。

5. 用戶指南

5.1 哪些場景 AQE SkewedJoin 不支持

AQE SkewedJoin 功能并不能處理所有發生數據傾斜的 Join,這是由它的實現邏輯所決定的。

第一,如果傾斜的分區的大部分數據來自于上游的同一個 Mapper,AQE SkewedJoin 無法處理,原因是 Spark 不支持 Reduce Task 只讀取上游 Mapper 的一個 block 的部分數據。

第二,如果 Join 的發生傾斜的一側存在 Agg 或者 Window 這類有指定 requiredChildDistribution 的算子,那么 SkewedJoin 優化無法處理,因為將分區切分會破壞 RDD 的 outputPartitioning,導致不再滿足 requiredChildDistribution。

第三,對于 Outer/Semi Join,AQE SkewedJoin 是無法處理非 Outer/Semi 側的數據傾斜。比如,對于 LeftOuter Join,SkewedJoin 無法處理右側的數據傾斜。

第四,AQE 無法處理傾斜的 BroadcastHashJoin。

5.2 AQE SkewedJoin 優化效果不明顯時的措施

如果遇到了符合應用場景但是 SkewedJoin 沒有生效或者傾斜處理效果不理想的情況,有以下調優手段:

  • 提高spark.shuffle.minNumPartitionsToHighlyCompress,保證值大于等于 shuffle 并發(當開啟 AQE 時,即為spark.sql.adaptive.coalescePartitions.initialPartitionNum)。
  • 調小spark.shuffle.accurateBlockThreshold,比如 4M。但是需要注意的是,這會增加 Driver 的內存消耗,需要同步增加 Driver 的 cpu 和內存。
  • 降低spark.sql.adaptive.skewJoin.skewedPartitionFactor,降低定義發生傾斜的閾值。

6. 總結

本文首先簡單介紹了 Spark AQE 的基本思想以及 SkewedJoin 功能的原理,接著提出了我們在應用 SkewedJoin的過程中遇到的一些問題。針對這些問題,我們介紹了對 AQE SkewedJoin 做的優化和增強——提高統計的準確度;提高傾斜數據的切分均勻程度;支持了更多的場景。接著,本文介紹了 AQE SkewedJoin 在字節跳動的使用情況,包括日均優化覆蓋作業和優化效果,其中30%被優化的 Spark 作業所屬于的場景是字節自研支持的。最后分享了我們關于 AQE SkewedJoin 的用戶指南:哪些場景 AQE SkewedJoin 不支持;當 AQE SkewedJoin 效果不明顯時,可以采取哪些措施。

7. 附錄A :本文涉及的關于 AQE SkewedJoin 優化的相關參數配置

參數配置名

默認值

參數意義

spark.shuffle.minNumPartitionsToHighlyCompress

2000

決定 Mapstatus 使用 HighlyCompressedMapStatus還是 CompressedMapStatus 的閾值,如果 huffle partition 大于該值,則使用 HighlyCompressedMapStatus。

spark.shuffle.accurateBlockThreshold

100M

HighlyCompressedMapStatus 中記錄 shuffle blcok 準確大小的閾值,當 block 小于該值則用平均值代替。

spark.sql.adaptive.skewJoin.skewedPartitionFactor

10

如果一個 partition 大于該因子乘以分區大小的中位數,那么它就是傾斜的 partition。

責任編輯:未麗燕 來源: 字節跳動技術團隊
相關推薦

2022-07-12 16:54:54

字節跳動Flink狀態查詢

2022-05-23 13:30:48

數據胡實踐

2024-11-01 17:00:03

2022-04-07 16:35:59

PGO 優化profile 數據編譯優化

2024-09-25 15:57:56

2024-08-22 14:53:24

PromptAI大模型

2022-08-21 21:28:32

數據庫實踐

2023-11-20 07:27:00

云原生Spark

2024-01-03 16:29:01

Agent性能優化

2024-04-23 10:16:29

云原生

2023-01-10 09:08:53

埋點數據數據處理

2022-12-23 08:58:35

字節跳動YARN架構

2022-07-18 16:02:10

數據庫實踐

2022-06-22 06:49:39

Hertz開源HTTP 框架

2022-11-24 08:50:07

數據中臺Data Catal

2021-09-06 11:15:05

數據治理字節跳動埋點

2025-01-22 14:00:12

2022-09-05 17:26:27

技術

2023-06-09 14:14:45

大數據容器化
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 午夜天堂精品久久久久 | 91精品久久久久久久久 | 欧美一区二区三区小说 | 国产一区二区三区四区 | 99久久精品国产一区二区三区 | 中文字幕一区在线 | 日本在线视频一区二区 | 在线国产视频 | 亚洲国产精品第一区二区 | 99久久视频| 四虎影院久久 | 蜜臀久久99精品久久久久野外 | 精品一区国产 | 亚洲综合电影 | 亚洲成人高清 | 亚洲精品一| 国产高清在线精品一区二区三区 | 精品亚洲国产成av人片传媒 | 免费在线看a| 二区成人 | 青青久久久 | 精品国产一区二区三区免费 | 日韩一区二区三区四区五区 | 中文字幕一级毛片视频 | 超碰男人天堂 | 涩涩鲁亚洲精品一区二区 | 国产精品无码专区在线观看 | 国产一级视屏 | 日韩精品中文字幕一区二区三区 | 欧美日本在线观看 | 国内毛片毛片毛片毛片 | 欧美一级片在线 | 九九九久久国产免费 | 欧美精品一区二区在线观看 | 久久久国产精品视频 | 日韩免费av | 亚洲电影一级片 | 国产一区二区激情视频 | 精品少妇一区二区三区日产乱码 | 热久色 | 九九热在线视频观看这里只有精品 |