字節跳動 Spark Shuffle 大規模云原生化演進實踐
在字節跳動內部,Spark 計算引擎被廣泛應用于大規模數據處理,機器學習等場景,天任務數超過 150W。線上集群磁盤類型多樣,包括 SSD、HDD 及混合等。每天會產生超過 100PB 以上的 Shuffle 數據,同時單個任務的 Shuffle 數據量可能達到數百 TB。巨量的 Shuffle 數據和復雜的計算資源環境也給 Spark 運行過程中的 Shuffle 性能帶來了很多挑戰。本文將從背景介紹、穩定性資源場景和混部資源場景分享字節跳動在 Spark Shuffle 云原生化方面的大規模演進實踐。
一、背景介紹
Spark 是字節跳動內使用廣泛的計算引擎,已廣泛應用于各種大規模數據處理、機器學習和大數據場景。目前中國區域內每天的任務數已經超過 150 萬,每天的 Shuffle 讀寫數據量超過 500 PB。同時某些單個任務的 Shuffle 數據能夠達到數百 TB 級別。
與此同時作業量與 Shuffle 的數據量還在增長,相比去年,今年的天任務數增加了 50 萬,總體數據量的增長超過了 200 PB,達到了 50% 的增長。Shuffle 是用戶作業中會經常觸發的功能,各種 ReduceByKey、groupByKey、join、sortByKey 和 repartition 的操作都會使用到 Shuffle。所以在大規模的 Spark 集群內,Spark Shuffle 經常會成為性能及穩定性的瓶頸。Shuffle 的計算涉及到頻繁的磁盤和網絡 IO 操作,主要是需要把所有節點的數據進行重新分區并組合。
1、原理
在社區版 ESS 模式下默認使用的 Shuffle 模式的基本原理中,剛才提到 Shuffle 的計算會把數據進行重新分區,這里就是把 Map 的數據重新組合到所有的 Reducers 上。如果有 M 個 Mappers,和 R 個 Reducers,就會把 M 個 Mappers 的 Partition 數據分區成后面 R 個 Reducers 的 Partition。
Shuffle 的過程可以分為兩個階段——Shuffle Write 和 Shuffle Read。
Shuffle Write 的時候,mapper 會把當前的 Partition 按照 Reduce 的 Partition 分成 R 個新的 Partition,并排序后寫到本地磁盤上。生成的 map output 包含兩個文件:索引文件和按 Partition 排序后的數據文件。
當所有的 Mappers 寫完 map output 后,就會開始第二個階段,Shuffle Read 階段。這個時候每個 Reducer 會向包含它的 Reducer Partition 的所有 ESS 訪問,并讀取對應 Reduce Partition 的數據。這里有可能會請求到所有 Partition 所在的 ESS,直到這個 Reducer 獲取到所有對應的 Reduce Partition 的數據。
在 Shuffle Fetch 階段,每個 ESS 會收到所有 Reducer 的請求并返回相應的數據。這將產生 M 乘 R 級別的網絡連接和隨機的磁盤讀寫 IO,涉及到大量的磁盤讀寫和網絡傳輸。這就是為什么 Shuffle 會對磁盤以及網絡 IO 的請求都特別頻繁的原因。
由于 Shuffle 對資源的需求和消耗都非常高,所以 CPU、磁盤和網絡開銷都很有可能是造成 Fetch failure 的原因或 Shuffle 速度較慢的瓶頸。在字節跳動大規模的 Shuffle 場景中,同一個 ESS 節點可能需要同時服務多個商戶,而這些集群沒有進行 IO 的隔離,就可能會導致 Shuffle 成為用戶作業失敗的主要原因和痛點問題。
字節跳動從 2021 年初開始了 Spark Shuffle 的云原生化相關工作,Spark 作業與其他大數據生態開始了從 Yarn Godel 的遷移。Godel 是字節跳動基于 Kubernetes 自研的調度器,遷移時也提供了 Hadoop 上云的遷移方案——Yodel(Yarn on Godel),是一個完全兼容 Hadoop Yarn 的協議,目標是將所有大數據應用平滑地遷移到 Kubernetes 體系上。
在這套遷移工作中,ESS 也做了定制化的相關工作,完成了從之前 Yarn Node Manager 模式下的 Yarn Auxiliary Service 遷移至 Kubernetes DaemonSet 部署模方式的適配工作,并開始 Shuffle 作業的遷移工作。歷時兩年,在 2023 年,順利將所有大數據應用包括 Spark 應用都遷移到了如今的云原生生態上。
2、挑戰
在云原生化的遷移過程中,也遇到了很多挑戰:
- 首先,從 NM 遷移到 DaemonSet 的過程中,DaemonSet 上 ESS 的 CPU 有非常嚴格的限制,而在之前的 NM 模式下,ESS 基本上可以使用所有的 CPU 資源。所以在這個遷移實踐中,往往最開始設置的 ESS 的 CPU 資源是不夠的,需要經過持續不斷的調整。后續,某些高優集群甚至直接放開對 ESS 的 CPU 使用。
- 同時,DaemonSet 和 Pod 對 Spark 作業的 CPU 有更嚴格的限制。這也導致不少用戶的作業遷移到了新的架構后變得更加緩慢了。這是因為在之前的模式下,CPU 是有一定的超發的,因此需要對這個情況進行調整。我們在 Kubernetes 和 Godel 架構下開啟了 CPU Shares 模式,使用戶在遷移過程中感知不到性能上的差異。
- 另外,Pod 對內存的限制也非常嚴格,這導致 Shuffle Read 時無法使用空閑的 page cache 資源,從而導致 Shuffle Read 時 page cache 的命中率非常低。這個過程會帶來更多的磁盤 IO 開銷,導致整體性能變差。對此我們采取了相應的措施,通過適當開放 Pod 對 page cache 的使用,降低 Shuffle 在遷移后對性能的影響。
3、收益
完成遷移工作之后,我們成功地將所有的離線資源池完成統一,在調度層面能夠更友好地實施一些優化和調度策略,從而提高整體的資源使用率。ESS Daemonset 相比于 Yarn Auxilary Service 也獲得了不少的收益。首先,ESS DaemonSet 被獨立出來成為一個服務,脫離與 NM 的緊耦合,減少了運維成本。另外,Kubernetes 和 Pod 對 ESS 資源的隔離也增加了 ESS 的穩定性,這意味著 ESS 不會再受到其他作業或者節點上其它服務的影響。
云原生化后的 Spark 作業目前有兩個主要的運行環境:
- 穩定資源集群環境。這些穩定資源的集群主要以服務高優和 SLA 的任務為主。部署的磁盤是性能比較好的 SSD 磁盤。對于這些穩定資源集群,主要使用基于社區、深度定制化后的 ESS 服務。使用 SSD 磁盤,ESS 讀寫,也可以使用到本地的高性能 SSD 磁盤。部署在 Daemonset 模式,Godel 架構下。
- 混部資源集群環境。這些集群主要服務于中低游的作業,以一些臨時查詢、調試或者測試任務為主。這些集群的資源主要都部署在 HDD 磁盤上,有些是通過線上資源出讓或與其他服務共用的或者其他線上的服務共同部署的一些資源。這就導致集群的資源都不是獨占的,整體的磁盤性能以及儲存環境也都不是特別優異。
二、穩定資源場景
在穩定集群環境中,存在較多的高優作業,首要任務是提高這些作業 Shuffle 的穩定性,以及運行時的作業時長,以確保這些作業的 SLA。為了解決 Shuffle 的問題,對 ESS 深度定制了以下三方面能力:增強 ESS 的監控/治理能力、增加 ESS Shuffle 的限流功能、增加 Shuffle 溢寫分裂功能。
1、ESS 深度定制
(1)增強 ESS 的監控及治理能力
在監控方面,我們使用開源版本的過程中發現現有的監控不足以深度排查遇到的 Shuffle 問題和當前的 ESS 狀況。這就導致沒有辦法快速定位是哪些節點造成的 Shuffle 問題,也沒有辦法感知到有問題的節點,因此,我們對監控能力進行了一些增強。
首先,我們增加了監控 Shuffle 慢和 Fetch Rate 能力的一些關鍵指標,包括 Queued Chunks 和 Chunk Fetch Rate。Queued Chunks 用于監控當前請求 ESS 節點上請求的堆積,而 Chunk Fetch Rate 用于監控這些節點上請求的流量。同時,我們還將 ESS 的 Metrics 指標接入了字節跳動的 Metrics 系統,使我們能夠通過系統提供的 Application 維度的指標快速定位 ESS 節點的堆積情況。在用戶界面 (UI) 方面,我們在 Stage 詳情頁加入了兩個新功能,用于展示當前 Stage 里每個 Task Shuffle 遇到最慢的幾個節點,以及經過 Stage 統計后所有 Task 遇到 Shuffle 次數最多的 top 節點。這不僅方便用戶查詢,也可以利用這些指標進行相關大盤的搭建。
- 收益
有了這些監控與 UI 改善后,當用戶在 UI 上看到 Shuffle 慢的時候可以通過 UI 打開對應的 Shuffle 監控。方便用戶和我們團隊快速定位到導致 Shuffle 問題的 ESS 節點,看到這些節點上的實際情況,并快速定位這些堆積請求量是來自于哪些 Application。
新增的監控也會在運行排查 Shuffle 問題時感知到 ESS 節點上實際的 Chunk 堆積、latency 等關鍵指標。這在遇到 Shuffle 慢的情況下有助于更有效地實時采取措施。一旦定位到 Shuffle 問題,我們可以分析情況并提供治理方向和優化。
治理工作主要是通過 BatchBrain 系統來實施。BatchBrain 是專門為 Spark 作業設計的一套智能作業調優系統,它主要對作業數據進行采集,并進行離線與實時分析。采集的數據包括 Spark 本身的 Event Log、內部打入更詳細的 Timeline event 以及各種 Metrics 指標,包括對 ESS 加上的定制化 Shuffle 指標等。
在離線分析中主要需要治理周期性作業,根據每個作業的歷史特征,結合采集的數據,對這些作業的 Shuffle Stage 性能進行分析,并經過多次迭代調整,最終提供一套適合的Shuffle 參數,使這些作業在重新運行時可以對優化后的Shuffle 參數進行運行,從而獲得更好的性能和效果。
BatchBrain 在實時分析部分也可以利用之前添加的 Shuffle 指標進行自動掃描。用戶還可以通過 BatchBrain API 查詢他們集群內作業的 Shuffle 狀況,以及有效定位遇到 Shuffle 堆積的節點和作業,并通過報警通知相關人員。如果發現 Shuffle 慢是由于其他的作業或者異常作業導致的,用戶也可以直接采取治理動作,例如停止或者驅逐這些作業,以便為更高優先級的作業騰出更多資源進行 Shuffle。
(2)Shuffle 限流功能
通過 Shuffle 的監控和治理,我們發現在 ESS 節點上遇到 Shuffle 慢的情況,通常是因為某些任務的數據量過于龐大或者設置了不妥的參數,導致這些 Shuffle Stage 的 Mapper 和 Reducer 數量都異常地大。異常大量的 Mapper 和 Reducer 數量可能會導致 ESS 節點上出現大量的請求堆積,而這些請求的 chunk size 也可能非常小。有些異常作業的平均 Chunk size 可能連 20 KB 都沒達到。這些作業對 ESS 發送很大的請求量,這種情況可能會導致 ESS 無法及時處理所有的請求,從而引發請求堆積,甚至導致作業的延遲或直接失敗。
針對這些現象,我們采取的解決方案是對 ESS 節點上每個 Application 的總請求量進行限制。當某個 Application 的 Fetch 請求達到了上限,ESS 將拒絕該 Application 發送的新 Fetch 請求,直到該 Application 等待現有請求的部分結束后才能繼續發送新的請求。這樣可以防止出現單個 Application 占用節點上過大的資源而導致 ESS 沒有辦法正常為其他作業請求提供服務的情況,也可以避免其他作業失敗或 Shuffle 速度變慢。這個方案可以緩解異常或大規模的 Shuffle 作業對集群 Shuffle 的負面影響。
Shuffle 限流功能的特征
- 在作業運行正常的時候,即使開啟了限流功能,也不會對作業有任何影響。節點如果可以正常服務,是不需要觸發任何限流的。
- 只有當節點的負載超過可以承受的范圍,且 Shuffle IO 超過設置的閾值后,才會啟動限流機制,減少異常任務可以向 ESS 發送的請求數量,減低這個 ESS 服務當前的壓力。由于這時候 ESS 服務的負載能力已經超過了可承受的范圍,即使它收到這些請求,也無法正常返回這些請求,因此,限制異常任務過多的請求反而可能更好地提高這些任務本身的性能。
- 在限流的情況下,也會考慮作業的優先級。對于高優的任務,會允許更大的流量。
- 當限流生效后,如果發現 ESS 的流量已經恢復正常了將迅速解除限流。受限流的 Application 很快就可以恢復到之前的流量水平。
限流的詳細流程
限流功能主要在 ESS 服務端進行,每隔 5 秒在節點上進行 latency 指標的掃描,當這個 latency 指標超過設置的閾值時,會判定該節點的負載已經超出能夠承受的負載了。接著會對 ESS 節點當前所有正在進行 Shuffle 的 Application 進行評估,判斷是否要開啟限流。利用之前加上的指標,可以統計近 5 分鐘這個節點上 Fetch 的總流量和 IO,根據總流量的上限,對每個 ESS 節點當前正在運行 Shuffle 的 Application 合理地分配每個 Application 的流量并進行限制。流量分配也會根據 Application 的優先級進行調整。如果有任何 Application 的 Shuffle 或者當前堆積的 Chunk Fetch Rate 已經超過了其分配的流量,它們將受到限流,新發送的請求也會被拒絕,直到堆積的請求已經部分解除為止。
對于限流的分配,也有一個分級系統。首先,根據當前節點上運行 Shuffle 的 Application 的數量進行分配,Application 的數量越多,每個 Application 可以分配到的流量就越少。當節點上 Application 數量比較少的時候,每個 Application 可以分配更多的流量。限流級別也會根據節點上的實際情況每 30 秒進行調整。
在限流的情況下,如果節點上的 latency 沒有改善,且 Shuffle 的總流量也沒有恢復,就會升級限流,對所有 Application 進行更嚴格的流量限制。相反,如果 latency 有好轉或者節點流量已經在恢復,就會降級限流甚至直接解除掉。最后,限流也會根據所有作業的優先級進行適當調整。
上圖中有個例子,在作業較少的情況下,對一個高優作業進行限流,作業分配的流量可能會更高,然而,如果節點的負載一直沒有緩解,限流也會升級。同等的情況下,一個中低優的作業,會給它分配更少的流量。開通限流功能之后,線上許多高優集群都觀察到了性能的顯著提升。
首先,Chunk 的堆積問題得到了明顯的減輕。由于受到限流的限制,異常任務引發的 Chunk 堆積情況有效的減少了,大大降低了集群中某些節點上出現大量請求堆積的情況。
另外,Latency 的狀況也得到了改善。在開啟限流前,我們經常會看到集群中的節點出現高延遲的情況。而在啟用限流功能后,整體的 Latency 狀況得到了明顯緩解。通過減少無必要和無效的請求,以及對各種大型或異常任務對 ESS 節點發起的請求量進行限制,我們避免了這些異常大型任務對 ESS 服務負載的負面影響,減少了對其他高優任務運行的影響。
(3)Shuffle 溢寫分裂的功能
在分析一些慢 Shuffle 的作業時,我們也發現了另一個現象,一個作業中每個 Executor 寫 Shuffle 數據的數量可能非常不均衡。由于 ESS 使用了 Dynamic Allocation 機制,每個 Executor 的運行時長和分配的 Map Task 數量可能不同。這導致在作業運行期間,大量的 Shuffle 數據可能集中在少數的 Executor 上,導致 Shuffle 數據實際上都集中在少數節點上。
例如下圖中,我們發現有 5 個 Executor 的 Shuffle 寫入量超過了其他 Executor 的 10 倍以上。在這種情況下,Shuffle 的請求可能會集中在這幾個節點上,導致這幾個 ESS 節點的負載非常高,這也間接增加了 Fetch Failure 的可能性。
針對這種情況,我們提供的解決方案是控制每個容器或每個節點寫入磁盤的 Shuffle 數據總量。這個功能可以從兩個角度實現。首先,通過 Spark 本身來控制 Executor 的 Shuffle Write Size,也就是每個 Executor 在執行 Shuffle 時寫入的最大數據量。每個 Executor 會計算其當前寫入的 Shuffle 數據量,并將這信息匯報給 Spark Driver。Spark Driver 可以使用 Exclude on Failure 機制主動將那些寫入數據已經超出閾值的 Executor 排除在調度范圍之外,并回收這些 Executor。此外,我們還通過 Godel 調度器改善調度策略,盡量將新的 Executor 調度到其他節點,避免單個容器的 Shuffle 寫入數據過多,從而導致該節點的磁盤被填滿,或者在 Shuffle Fetch 階段數據集中在這幾個 ESS 節點上。
2、云原生優化
同時,在云原生優化方面,我們也進行了一些 Executor 的調度和功能優化,通過 Godel 調度器的策略,提升 Shuffle 能力。Godel 調度器提供的調度策略,可以在調度 Executor 時盡量避免負載高的 Shuffle 節點,從而降低這些節點后續遇到 Shuffle 問題的可能性。此外,調度器還可以為 Executor 的 Shuffle Write 提供更多的功能以實現打散。例如,它可以在磁盤壓力特別大的節點上驅逐 Executor,或者在磁盤剩余空間不足時,驅逐那些已經寫入大量 Shuffle 數據的容器。
Spark Driver 控制 Executor 的 Shuffle 與云原生調度功能結合可以將整體的 Shuffle 數據分散到更多的節點上,使 Shuffle Fetch 階段的數據和請求更加均衡分布。
- 效果
在線上開啟了上述深度定制的 Shuffle 優化后,我們觀察到了顯著的效果。以下是來自三個高優集群的一些運行數據,每天在這三個高優集群中的任務總數可能超過 30 萬,但平均每天因為 Shuffle Fetch 失敗而最終失敗的作業總數平均在 20 到 30 左右,可以說達到了低于 1/10000 的失敗率。如下圖可以觀察到這三個高優集群在優化后的穩定性都有了顯著的提升,也大幅度減少了用戶在 Shuffle 上遇到的問題。
三、混部資源場景
接下來介紹在混部場景中進行的優化。首先值得注意的是,在混部集群場景下,Fetch Failure 的情況通常比在穩定資源環境中嚴重得多。每天平均的 Fetch Failure 次數非常高,主要原因是這些資源大多來自于線上資源空閑的出讓,它們的磁盤 IO 能力和磁盤空間都比較有限。此外,由于磁盤 IOPS 和磁盤空間可能非常有限,與 HDFS 或其他服務混合部署的資源對集群的 Shuffle 性能影響較大,因此發生失敗的概率也較高。混部資源治理以降低作業的失敗率,確保作業的穩定性為主要目標,同時需要提高整個集群的 Shuffle 性能,減少資源浪費。
對于混部資源的集群,主要的方案是自研的 Cloud Shuffle Service(CSS),通過提供一個遠端的 Shuffle 服務來減少這些作業對本地磁盤的依賴。
1、CSS 功能介紹
首先,CSS 提供了一個 Push Based Shuffle 模式,與剛才介紹的 ESS 模式不同,在 Push Based Shuffle 模式下,不同 Mapper 的同一個 Reducer Partition 數據都會發送到一個共同的遠程服務上,在這個服務上進行合并,最后在某個 Worker 上寫上一個或者多個文件,使得 Reduce 階段可以通過 Sequential Read 模式讀取這些 Partition 數據,減少隨機 IO 的開銷。
CSS 也支持 Partition Group 功能,它的作用是將多個分區數據分配到一個 Reducer Partition Group。這樣,在 Map 階段 Mapper 可以通過 Batch Push 方式傳送數據,將批量數據直接傳輸到對應分區組的工作節點上,從而降低了批量模式下 IO 的開銷,提高了批量模式的性能。
CSS 也提供了一個快速雙寫備份的功能。由于使用的是 push based Shuffle 和聚合模式,所有的數據其實都聚集在一個 Worker 上,如果這個 Worker 數據丟失的話,等于所有的 Mapper 都要重新計算所對應的數據,因此對于 push 聚合的功能,使用一個雙寫備份是比較重要的。CSS 提高寫入的速度的方式是采用雙寫 In-memory 副本模式并進行異步刷盤,這樣 Mapper 無需等待刷盤結束就可以繼續推送后續的數據。
CSS 本身也具有一個負載均衡功能。CSS 通過一個 Cluster Manager 去管理所有服務上的節點。Cluster Manager 會定期去采集和收取 CSS Worker 節點匯報的負載信息,當有新的 Application 提交的時候,它會進行資源的均衡分配,以確保 Shuffle Write 和 Shuffle Read 會優先分配到集群上使用率較低的節點,從而實現集群中更好的 Shuffle 負載均衡。
2、CSS 整體架構
- Cluster Manager 負責集群的資源分配,并維護集群 Worker 和 Application 狀態,它可以通過 Zookeeper 或者本地磁盤保存這些信息,達到具有 High Availability 的服務。
- Worker 支持兩種寫入模式,分別是磁盤模式和 HDFS 模式。目前常用的是磁盤模式,每個分區的數據會寫入兩個不同的 Worker 節點,以實現數據冗余。
- CSS Master 位于 Spark driver 端,主要負責與 Cluster Manager 的心跳聯系以及 Application Lifecycle。作業啟動時,也會向 Cluster Manager 申請 Worker。Shuffle Stage 的過程也會統計 Shuffle Stage 的元數據以及的進展。
- Shuffle Client 是一個接入了 Spark Shuffle API 的組件,允許任何 Spark 作業直接使用 CSS 而無需額外配置。每個 Executor 會使用 ShuffleClient 進行讀寫。Shuffle Client 在寫入時進行雙寫,在讀的時候,它可以向任何一個存有數據的 Worker 讀取這些數據,如果其中一個 Worker 讀取失敗的話,也會自動切換到另一個 Worker 上,并對多讀的數據進行去重。
CSS 在寫入時 Worker 會直接發送數據,Mapper 會同時將數據發送到兩個 Worker,Worker 不會等到刷磁盤之后返回給 Mapper,而是異步返回給 Mapper 結果,如果遇到失敗,會在下一個請求再通知 Mapper。這時 Mapper 會重新跟節點申請兩個新的 Worker,重新推送傳送失敗的數據。讀的時候可以從任何一個節點讀取數據,通過 Map ID,Attempt ID 和 Batch ID 進行去重。
3、CSS 性能與未來演進
在 1TB 的 TPC-DS Benchmark 性能測試下,CSS 在 30% 以上的 Query 中得到了提升。
CSS 作為一個遠端 Shuffle 服務,特別適合云原生化,支持彈性部署和更多的遠程儲蓄服務。目前 CSS 已經完成了開源,有興趣的朋友可以去 CSS 開源網站了解更多信息,也希望把后面的一些迭代和優化同步到社區上。在未來云原生化的演進中需要支持彈性部署、支持遠程存儲服務等相關能力。
以上就是本次分享的內容,謝謝大家。