成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

阿里云RemoteShuffleService新功能:AQE和流控

原創 精選
云計算 云原生
本文將介紹RSS最新的兩個重要功能:支持Adaptive Query Execution(AQE),以及流控。

作者 |  一錘、明濟

阿里云EMR自2020年推出Remote Shuffle Service(RSS)以來,幫助了諸多客戶解決Spark作業的性能、穩定性問題,并使得存算分離架構得以實施。為了更方便大家使用和擴展,RSS在2022年初開源,歡迎各路開發者共建。RSS的整體架構請參考[1],本文將介紹RSS最新的兩個重要功能:支持Adaptive Query Execution(AQE),以及流控。

一、RSS支持AQE

1.AQE簡介

自適應執行(Adaptive Query Execution, AQE)是Spark3的重要功能[2],通過收集運行時Stats,來動態調整后續的執行計劃,從而解決由于Optimizer無法準確預估Stats導致生成的執行計劃不夠好的問題。AQE主要有三個優化場景: Partition合并(Partition Coalescing), Join策略切換(Switch Join Strategy),以及傾斜Join優化(Optimize Skew Join)。這三個場景都對Shuffle框架的能力提出了新的需求。

Partition合并

Partition合并的目的是盡量讓reducer處理的數據量適中且均勻,做法是首先Mapper按較多的Partition數目進行Shuffle Write,AQE框架統計每個Partition的Size,若連續多個Partition的數據量都比較小,則將這些Partition合并成一個,交由一個Reducer去處理。過程如下所示。

由上圖可知,優化后的Reducer2需讀取原屬于Reducer2-4的數據,對Shuffle框架的需求是ShuffleReader需要支持范圍Partition:

def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]

Join策略切換

Join策略切換的目的是修正由于Stats預估不準導致Optimizer把本應做的Broadcast Join錯誤的選擇了SortMerge Join或ShuffleHash Join。具體而言,在Join的兩張表做完Shuffle Write之后,AQE框架統計了實際大小,若發現小表符合Broadcast Join的條件,則將小表Broadcast出去,跟大表的本地Shuffle數據做Join。流程如下:

Join策略切換有兩個優化:1. 改寫成Broadcast Join; 2. 大表的數據通過LocalShuffleReader直讀本地。其中第2點對Shuffle框架提的新需求是支持Local Read。

傾斜Join優化

傾斜Join優化的目的是讓傾斜的Partition由更多的Reducer去處理,從而避免長尾。具體而言,在Shuffle Write結束之后,AQE框架統計每個Partition的Size,接著根據特定規則判斷是否存在傾斜,若存在,則把該Partition分裂成多個Split,每個Split跟另外一張表的對應Partition做Join。如下所示。

Partiton分裂的做法是按照MapId的順序累加他們Shuffle Output的Size,累加值超過閾值時觸發分裂。對Shuffle框架的新需求是ShuffleReader要能支持范圍MapId。綜合Partition合并優化對范圍Partition的需求,ShuffleReader的接口演化為:

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

2.RSS架構回顧

RSS的核心設計是Push Shuffle + Partition數據聚合,即不同的Mapper把屬于同一個Partition的數據推給同一個Worker做聚合,Reducer直讀聚合后的文件。如下圖所示。

在核心設計之外,RSS還實現了多副本,全鏈路容錯,Master HA,磁盤容錯,自適應Pusher,滾動升級等特性,詳見[1]。

3.RSS支持Partition合并

Partition合并對Shuffle框架的需求是支持范圍Partition,在RSS中每個Partition對應著一個文件,因此天然支持,如下圖所示。

4.RSS支持Join策略切換

Join策略切換對Shuffle框架的需求是能夠支持LocalShuffleReader。由于RSS的Remote屬性,數據存放在RSS集群,僅當RSS和計算集群混部的場景下才會存在在本地,因此暫不支持Local Read(將來會優化混部場景并加以支持)。需要注意的是,盡管不支持Local Read,但并不影響Join的改寫,RSS支持Join改寫優化如下圖所示。

5.RSS支持Join傾斜優化

在AQE的三個場景中,RSS支持Join傾斜優化是最為困難的一點。RSS的核心設計是Partition數據聚合,目的是把Shuffle Read的隨機讀轉變為順序讀,從而提升性能和穩定性。多個Mapper同時推送給RSS Worker,RSS在內存聚合后刷盤,因此Partition文件中來自不同Mapper的數據是無序的,如下圖所示。

Join傾斜優化需要讀取范圍Map,例如讀Map1-2的數據,常規的做法有兩種:

  • 讀取完整文件,并丟棄范圍之外的數據。
  • 引入索引文件,記錄每個Block的位置及所屬MapId,僅讀取范圍內的數據。

這兩種做法的問題顯而易見。方法1會導致大量冗余的磁盤讀;方法2本質上回退成了隨機讀,喪失了RSS最核心的優勢,并且創建索引文件成為通用的Overhead,即使是針對非傾斜的數據(Shuffle Write過程中難以準確預測是否存在傾斜)。

為了解決以上兩個問題,我們提出了新的設計:主動Split + Sort On Read。

主動Split

傾斜的Partition大概率Size非常大,極端情況會直接打爆磁盤,即使在非傾斜場景出現大Partition的幾率依然不小。因此,從磁盤負載均衡的角度,監控Partition文件的Size并做主動Split(默認閾值256m)是非常必要的。

Split發生時,RSS會為當前Partition重新分配一對Worker(主副本),后續數據將推給新的Worker。為了避免Split對正在運行的Mapper產生影響,我們提出了Soft Split的方法,即當觸發Split時,RSS異步去準備新的Worker,Ready之后去熱更新Mapper的PartitionLocation信息,因此不會對Mapper的PushData產生任何干擾。整體流程如下圖所示。

Sort On Read

為了避免隨機讀的問題,RSS采用了Sort On Read的策略。具體而言,File Split的首次Range讀會觸發排序(非Range讀不會觸發),排好序的文件連同其位置索引寫回磁盤。后續的Range讀即可保證是順序讀取。如下圖所示。

為了避免多個Sub-Reducer等待同一個File Split的排序,我們打散了各個Sub-Reducer讀取Split的順序,如下圖所示。

Sort優化

Sort On Read可以有效避免冗余讀和隨機讀,但需要對Split File(256m)做排序,本節討論排序的實現及開銷。文件排序包括3個步驟:讀文件,對MapId做排序,寫文件。RSS的Block默認256k,Block的數量大概是1000,因此排序的過程非常快,主要開銷在文件讀寫。整個排序過程大致有三種方案:

  • 預先分配文件大小的內存,文件整體讀入,解析并排序MapId,按MapId順序把Block寫回磁盤。
  • 不分配內存,Seek到每個Block的位置,解析并排序MapId,按MapId順序把原文件的Block transferTo新文件。
  • 分配小塊內存(如256k),順序讀完整個文件并解析和排序MapId,按MapId順序把原文件的Block transferTo新文件。

從IO的視角,乍看之下,方案1通過使用足量內存,不存在順序讀寫;方案2存在隨機讀和隨機寫;方案3存在隨機寫;直觀上方案1性能更好。然而,由于PageCache的存在,方案3在寫文件時原文件大概率緩存在PageCache中,因此實測下來方案3的性能更好,如下圖所示。

同時方案3無需占用進程額外內存,故RSS采用方案3的算法。我們同時還測試了Sort On Read跟上述的不排序、僅做索引的隨機讀方法的對比,如下圖所示。

整體流程

RSS支持Join傾斜優化的整體流程如下圖所示。

二、RSS流控

流控的主要目的是防止RSS Worker內存被打爆。流控通常有兩種方式:

  • Client在每次PushData前先向Worker預留內存,預留成功才觸發Push。
  • Worker端反壓。

由于PushData是非常高頻且性能關鍵的操作,若每次推送都額外進行一次RPC交互,則開銷太大,因此我們采用了反壓的策略。以Worker的視角,流入數據有兩個源:

  • Client推送的數據
  • 主副本發送的數據

如下圖所示,Worker2既接收來自Mapper推送的Partition3的數據,也接收Worker1發送的Partition1的副本數據,同時會把Partition3的數據發給對應的從副本。

其中,來自Mapper推送的數據,當且僅當同時滿足以下條件時才會釋放內存:

  • Replication執行成功
  • 數據寫盤成功

來自主副本推送的數據,當且僅當滿足以下條件時才會釋放內存:

  • 數據寫盤成功

我們在設計流控策略時,不僅要考慮限流(降低流入的數據),更要考慮泄流(內存能及時釋放)。具體而言,高水位我們定義了兩檔內存閾值(分別對應85%和95%內存使用),低水位只有一檔(50%內存使用)。達到高水位一檔閾值時,觸發流控,暫停接收Mapper推送的數據,同時強制刷盤,從而達到泄流的目標。僅限制來自Mapper的流入并不能控制來自主副本的流量,因此我們定義了高水位第二檔,達到此閾值時將同時暫停接收主副本發送的數據。當水位低于低水位后,恢復正常狀態。整體流程如下圖所示。

三、性能測試

我們對比了RSS和原生的External Shufle Service(ESS)在Spark3.2.0開啟AQE的性能。RSS采用混部的方式,沒有額外占用任何機器資源。此外,RSS所使用的內存為8g,僅占機器內存的2.3%(機器內存352g)。具體環境如下。

1.測試環境

硬件:

header 機器組 1x ecs.g5.4xlargeworker 機器組 8x ecs.d2c.24xlarge,96 CPU,352 GB,12x 3700GB HDD。

Spark AQE相關配置:

spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled false

RSS相關配置:

RSS_MASTER_MEMORY=2g
RSS_WORKER_MEMORY=1g
RSS_WORKER_OFFHEAP_MEMORY=7g

2.TPCDS 10T測試集

我們測試了10T的TPCDS,E2E來看,ESS耗時11734s,RSS單副本/兩副本分別耗時8971s/10110s,分別比ESS快了23.5%/13.8%,如下圖所示。我們觀察到RSS開啟兩副本時網絡帶寬達到上限,這也是兩副本比單副本低的主要因素。

具體每個Query的時間對比如下:

相關鏈接

github地址:https://github.com/alibaba/RemoteShuffleService

Reference

[1]阿里云EMR Remote Shuffle Service在小米的實踐,以及開源. https://developer.aliyun.com/article/857757

[2]Adaptive Query Execution: Speeding Up Spark SQL at Runtime. https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

責任編輯:武曉燕 來源: 阿里開發者
相關推薦

2022-03-29 09:03:08

JavaScript數組語義

2011-07-28 14:06:52

XCode XCode 3.2

2010-11-12 10:32:53

微軟Azure云計算

2022-04-16 12:21:59

XubuntuLinux

2009-06-19 12:53:56

Spring 2.0

2009-12-17 10:21:15

2021-05-18 11:29:26

Oracle分析云

2023-04-17 07:32:01

軟件包OpenBSD

2014-12-19 09:53:25

Android 5.1

2022-04-17 18:55:44

KubuntuLinux 發行版Ubuntu

2023-04-05 19:27:05

Debian

2021-04-12 10:07:06

云計算邊緣云阿里云

2012-05-08 13:18:42

流控引擎流控

2022-04-25 10:34:16

UbuntuLinux 內核穩定版本

2022-03-01 09:08:35

Fedora 36GNOMEKDE 流派

2018-01-02 09:06:10

2021-11-26 15:27:23

MetaWorkplaceTeams

2009-09-17 09:39:28

Chrome 3.0谷歌瀏覽器

2012-09-13 11:08:53

IBMdw

2012-07-20 10:21:13

Ubuntu開源
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费h视频 | 日韩欧美三级 | 日韩中文字幕免费 | 超碰在线影院 | 日韩精品在线网站 | 国产一区二区三区视频 | 99re热这里只有精品视频 | 久久视频免费看 | 视频一区在线观看 | 国产一区二区欧美 | 国产精品成人一区二区三区 | 国产精品久久久久久久久免费桃花 | av性色全交蜜桃成熟时 | 日韩一区二区在线播放 | 欧美aⅴ | 一级做a爰片性色毛片视频停止 | 成人精品福利 | 久久精品视频免费看 | 丁香色婷婷 | 精品一区二区三区91 | 免费性视频 | 亚州精品成人 | 国产精品大片在线观看 | 国产欧美日韩精品一区二区三区 | 国产日韩在线观看一区 | 狠狠婷婷综合久久久久久妖精 | 日韩精品一区二区久久 | 国产精品美女一区二区三区 | 久久国产精品视频 | 欧美日韩电影一区二区 | 午夜三区 | 高清久久| 日韩av一区二区在线观看 | 久久久久网站 | 永久免费在线观看 | 美日韩免费视频 | 国产精品一区二区电影 | 99综合在线 | av黄色免费在线观看 | 香蕉国产在线视频 | 高清视频一区二区三区 |