騰訊面試:詳細介紹 Spark 的 Shuffle 階段數據從輸入到輸出經歷了哪些步驟?
一、Shuffle概述
1. Shuffle的定義與作用
Shuffle,中文可理解為“洗牌操作”,在分布式計算框架如Spark中,它是一個關鍵且復雜的機制,用于在某些操作期間對集群中的數據進行重新分配和組織。在Spark里,數據以分布式方式存儲在集群的多個節點上,每個節點處理數據的一個子集,即分區(Partition)。
Spark的操作可分為窄變換(Narrow Transformations)和寬變換(Wide Transformations)。窄變換如map、filter等,這些操作在單個分區上執行,無需數據在節點之間移動;而寬變換如groupBy、join、reduceByKey等,需要跨分區重新分配數據,因為一個分區的輸出可能依賴于其他分區的數據。Shuffle就是在寬變換期間重新分配數據的過程,它確保相關數據(例如,groupBy中具有相同鍵的所有記錄)被分組到同一節點上,以便進一步處理。
2. Shuffle的重要性
Shuffle在Spark中扮演著至關重要的角色,大多數需要分區鍵(partition key)重新組織數據的場景,如統計、匯總、分組、連接等操作都離不開它。然而,Shuffle也是Spark中最昂貴的操作之一,它不僅會帶來網絡IO,還會引發磁盤IO、數據序列化/反序列化、JVM垃圾回收等操作,因此其性能往往決定著Spark作業的整體性能。
3. Shuffle的演進
Spark的Shuffle機制經歷了不斷的演進和優化,以提高性能和處理大規模數據的能力。
(1) Hash Shuffle(早期):
基于哈希分區,容易在大數據量下產生大量小文件,后續逐步棄用。在Spark 1.2之前,HashShuffleManager是默認的Shuffle管理模式。在未經優化的情況下,每個執行Shuffle Write的任務,會為下一個階段的每個任務創建一個磁盤文件,導致Shuffle Write階段產生大量的磁盤文件,增加了磁盤IO開銷和文件系統壓力。例如,若下一個階段有100個任務,當前階段的每個任務就要創建100份磁盤文件。為了優化這一問題,引入了合并機制,即通過設置spark.shuffle.consolidateFiles參數為true,可以復用磁盤文件,減少文件數量。
(2) Sort Shuffle(Spark 1.2+ 默認):
以排序和聚合的方式減少小文件數,通過排序將相同分區數據集中到一起,寫到更少的文件中。SortShuffleManager的運行機制主要分為普通運行機制和bypass運行機制。當shuffle read task的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值(默認為200),且不是聚合類的shuffle算子時,會啟用bypass機制。普通運行機制下,數據會先寫入內存數據結構,根據不同的shuffle算子選用不同的數據結構,如reduceByKey會選用Map數據結構進行局部聚合,join會選用Array數據結構直接寫入內存。當達到臨界閾值時,會將內存數據結構中的數據溢寫到磁盤,并在溢寫前對數據進行排序,分批寫入磁盤文件,最后將所有臨時文件合并成一個磁盤文件,并創建一個單獨的索引文件。bypass機制下,任務會為每個下游任務創建一個臨時磁盤文件,將數據按key進行hash后寫入對應的磁盤文件,最后將所有臨時磁盤文件合并成一個磁盤文件和一個索引文件,該機制不會進行排序操作,節省了排序的性能開銷。
(3) Tungsten Sort Shuffle:
利用Tungsten引擎的內存管理和二進制處理進一步優化Shuffle。Tungsten Sort Shuffle是Sort Shuffle的優化版,它利用Tungsten的內存管理技術以及二進制處理加速Shuffle,減少內存占用和GC壓力。一般在Spark 2.0+中默認已使用Tungsten - level優化(無需額外配置)。
(4) Push - Based Shuffle(Spark 3.0+ 新特性):
在Map端預先將部分shuffle數據推給Reduce端,提升大規模Shuffle的性能并減少數據傾斜。該特性通過在Map階段就將部分shuffle數據推送到下游節點或第三方存儲,加速數據可用性并減少reduce端的拉取壓力,適合大規模、長tail任務場景,減少數據傾斜問題。
二、Shuffle的觸發
1. 觸發Shuffle的操作
在Spark中,以下幾類操作通常會觸發Shuffle:
- 重新調整分區操作:如repartition、coalesce(當shuffle=true時)。repartition會增加或減少RDD的分區數,必然會涉及數據的重新分配;coalesce在shuffle=true時,也會進行數據的重新分區。
- 基于Key的操作:如groupByKey、reduceByKey、aggregateByKey、foldByKey等。這些操作需要將具有相同鍵的數據聚集到一起進行處理,因此會觸發Shuffle。例如,groupByKey會將鍵相同的所有值分組到一個迭代器中,reduceByKey會在每個分區內先進行局部聚合,然后再將相同鍵的數據聚集到一起進行最終的聚合。
- 關聯操作:如join、cogroup等。join操作會將兩個RDD中具有相同鍵的元素進行連接,cogroup會將兩個RDD中具有相同鍵的元素分組到一起,這些操作都需要跨分區重新組織數據,從而觸發Shuffle。
2. 源碼分析觸發Shuffle的過程
在Spark的DAGScheduler(類org.apache.spark.scheduler.DAGScheduler)中,submitJob方法會分析RDD血緣關系并識別Shuffle依賴。當檢測到Shuffle依賴(通過ShuffleDependency)時,會創建一個新的ShuffleMapStage。例如,以下是submitJob方法的部分偽代碼:
def submitJob[T](
rdd: RDD[T],
func:(TaskContext, Iterator[T])=> _,
partitions: Seq[Int],
callSite: CallSite,
resultHandler:(Int, U)=>Unit,
properties: Properties
): JobId ={
// 檢測Shuffle依賴并在需要時創建新階段
if(hasShuffleDependency){
createNewShuffleMapStage()
}
}
三、Shuffle的詳細步驟
1. 數據輸入與轉換操作
(1) 數據輸入
首先,需要創建一個Spark上下文并輸入數據。以下是一個示例代碼:
from pyspark import SparkConf, SparkContext
# 創建Spark配置和上下文
conf = SparkConf().setAppName("Shuffle Example").setMaster("local")
sc = SparkContext(conf=conf)
# 讀入數據
data =[("Alice",1),("Bob",2),("Alice",3),("Bob",4)]
rdd = sc.parallelize(data)
# 輸出初始RDD
print("初始RDD:", rdd.collect())
在上述代碼中,SparkConf().setAppName("Shuffle Example").setMaster("local")設置了Spark應用的名稱和運行模式,sc.parallelize(data)將數據轉換為RDD(彈性分布式數據集),rdd.collect()收集并打印初始RDD的內容。
(2) 執行轉換操作
接下來,可以對RDD進行基本的轉換操作,例如map,對數據進行處理。示例代碼如下:
# 使用map轉換
mapped_rdd = rdd.map(lambda x:(x[0], x[1]*2))
# 輸出轉換后的RDD
print("轉換后的RDD:", mapped_rdd.collect())
在這個例子中,rdd.map(lambda x: (x[0], x[1] * 2))將每個值乘以2,形成新的RDD。
2. Shuffle Write階段
(1) 整體流程
Shuffle Write階段是由當前階段的ShuffleMapTask執行的,主要任務是將每個任務處理的數據按照下游分區的規則進行劃分,并將結果寫入磁盤。上游任務(Map端)按照下游分區的規則將數據切分成多個bucket,每一個bucket對應下游需要讀取的一個分區。默認使用Sort Shuffle時,數據會先寫入到內存中的緩存區(sorter),再通過內存溢寫(spill)到磁盤,同時進行排序和聚合壓縮。如果數據足夠大,可能會多次溢寫到磁盤。最后,Map端會寫出若干個文件,文件中按照分區將數據分塊組織。當Map端任務完成后,會向Driver匯報該map任務輸出的Shuffle數據位置信息(即ShuffleMapStatus)。
(2) HashShuffleManager的Shuffle Write
① 未經優化的HashShuffleManager
假設每個Executor只有1個CPU core,在Shuffle Write階段,將每個task處理的數據按照key進行hash算法,從而將相同key都寫入同一個磁盤文件,而每一個磁盤文件都只屬于下游stage的一個task。在數據寫入磁盤之前,會先將數據寫入內存緩沖區中,當內存緩沖區填滿以后,才會溢寫到磁盤文件中去。下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。例如,當前stage有20個task,總共有4個Executor,每個Executor執行5個task,下一個stage總共有40個task,那么每個Executor上就要創建200個磁盤文件,所有Executor會創建800個磁盤文件。這種方式會產生大量的磁盤文件,增加磁盤IO開銷和文件系統壓力。
② 優化后的HashShuffleManager
為了優化HashShuffleManager,可以啟用參數spark.shuffle.consolidateFiles,該參數的默認值為false,將其設置為true即可開啟優化機制。開啟優化機制后,在shuffle write過程中,task不是為下游stage的每個task創建一個磁盤文件,而是會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量相同。一個Executor上有多少個CPU core,就可以并行執行多少個task。第一批并行執行的每個task都會創建一個shuffleFileGroup,并將數據寫入對應的磁盤文件內。當執行下一批task時,下一批task會復用之前已有的shuffleFileGroup,包括其中的磁盤文件。這樣可以有效將多個task的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。例如,上述例子中,優化后每個Executor上只需創建40個磁盤文件,所有Executor會創建160個磁盤文件。
(3) SortShuffleManager的Shuffle Write
① 普通運行機制
在普通運行機制下,數據會先寫入一個內存數據結構中。如果是reduceByKey這種聚合類的shuffle算子,會選用Map數據結構,一邊通過Map進行局部聚合,一邊寫入內存;如果是join這種普通的shuffle算子,會選用Array數據結構,直接寫入內存。接著,每寫一條數據進入內存數據結構之后,就會判斷是否達到某個臨界閾值。如果達到臨界閾值,就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件,默認的batch數量是10000條。一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,產生多個臨時文件。最后,會將之前所有的臨時磁盤文件都進行合并,同時單獨寫一份索引文件,標識下游各個task的數據在文件中的起始偏移量和長度。
② bypass運行機制
bypass運行機制的觸發條件為:shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值,且不是聚合類的shuffle算子(比如reduceByKey)。在這種機制下,任務會為每個下游task都創建一個臨時磁盤文件,將數據按key進行hash,將key的hash值寫入對應的磁盤文件之中,寫入磁盤也是先寫入內存緩沖,再溢寫到磁盤的方式。最后,會將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。與普通SortShuffleManager運行機制不同的是,bypass機制不會進行排序操作,節省了排序的性能開銷。
3. Shuffle數據傳輸
在Shuffle Write階段完成后,數據會被存儲在各個節點的磁盤上。當下游階段的任務需要這些數據時,就會觸發Shuffle數據傳輸。下游任務(Reduce端)會根據ShuffleMapStatus中的文件位置,從相應的節點拉取(Fetch)自己需要的分區數據。每個Reduce任務只會取自己相應的分區數據,然后進行后續的處理(如聚合、join等)。如果開啟了External Shuffle Service,則數據是從External Shuffle Service進程中取,這樣可以在Executor退出時保留Shuffle文件,避免因為Executor重啟導致無法讀取Shuffle文件的情況。
4. Shuffle Read階段
(1) 數據拉取
下游階段的ReduceTask會根據Driver中的MapOutputTrackerMaster提供的ShuffleMapStatus信息,知道每個ShuffleMapTask的輸出數據所在的位置。然后,ReduceTask會從各個節點拉取自己需要的數據。在拉取數據時,會使用BlockStoreShuffleFetcher等工具,通過網絡將數據從遠程節點傳輸到本地。拉取過程是一邊拉取一邊進行聚合的,每個shuffle read task都有一個自己的buffer緩沖,每次只能拉取與buffer緩沖相同大小的數據,然后在內存中進行聚合等操作,聚合完一批數據,再拉取下一批,以此類推,直到所有數據拉取完,并得到最終結果。
(2) 數據處理
拉取過來的數據會組成一個內部的ShuffleRDD,優先放入內存,內存不夠用則放入磁盤。在數據處理過程中,如果在Shuffle Write階段開啟了mapSideCombine(如reduceByKey默認開啟),在Shuffle Read階段也會進行合并操作。例如,在shuffle write階段,某個分區的數據為[(hello, 1), (hello, 1)],經過mapSideCombine后變為[(hello, 2)]。在shuffle read階段,會將多個分區中相同鍵的數據進行合并,最終得到聚合結果。排序操作通常發生在shuffle read階段,在進行完mapSideCombine之后,就開始進行排序了。
5. 執行Shuffle后的操作與數據輸出
(1) 執行Shuffle后的操作
在完成Shuffle Read階段后,可以繼續對Shuffle結果進行操作,例如進一步的轉換或過濾。以下是一個示例代碼:
# 使用filter操作
filtered_rdd = result_rdd.filter(lambda x: x[1]>2)
# 輸出過濾后的RDD
print("過濾后的RDD:", filtered_rdd.collect())
在這個例子中,result_rdd.filter(lambda x: x[1] > 2)過濾出聚合值大于2的數據。
(2) 數據輸出
最后,可以將結果保存到文件中或者進行其他操作。示例代碼如下:
# 將結果保存為文本文件
filtered_rdd.saveAsTextFile("output/result.txt")
# 停止Spark上下文
sc.stop()
在上述代碼中,filtered_rdd.saveAsTextFile("output/result.txt")將結果保存到指定的文本文件,sc.stop()停止Spark上下文,釋放資源。
四、Shuffle的性能問題與優化策略
1. Shuffle的性能問題
(1) 增加網絡I/O
Shuffle操作涉及跨網絡的數據交換和傳輸,導致較高的網絡輸入/輸出(I/O)開銷。shuffle數據量的增加會使網絡資源緊張,從而導致執行時間變慢并降低總體吞吐量。例如,在大規模數據處理時,大量的數據需要在節點之間傳輸,會占用大量的網絡帶寬,影響作業的執行效率。
(2) 資源密集型
Shuffle需要額外的計算資源,包括CPU、內存和磁盤I/O。shuffle期間資源利用率的增加會導致資源爭用、作業執行時間延長和效率降低。例如,在Shuffle過程中,如果內存不足,可能會導致GC(垃圾回收)頻率提高,從而影響性能;同時,大量的磁盤讀寫操作也會增加磁盤I/O的壓力。
(3) 產生大量小文件
在早期的HashShuffleManager中,會產生大量的小文件,這會增加磁盤IO開銷和文件系統壓力。例如,每個MapTask為每個下游分區都生成一個文件,如果下游有很多分區,就會生成大量小文件,導致文件系統崩潰的風險增加。
2. Shuffle的優化策略
(1) 減少Shuffle的次數
在某些情況下,可以通過調整計算邏輯,減少Shuffle的次數。例如,使用reduceByKey替代groupByKey可以減少數據的Shuffle,因為前者在Map階段就進行數據合并。示例代碼如下:
val result = rdd.reduceByKey((a, b)=> a + b)
(2) 增加并行度
通過增加分區數來提高并行處理能力,可以使用repartition方法來增加分區數。示例代碼如下:
val repartitionedRDD = rdd.repartition(numPartitions)
(3) 使用Tungsten執行引擎
Spark的Tungsten項目通過物理計劃優化、代碼生成和內存管理等技術,顯著提高了Shuffle的性能。開啟Tungsten,通常在使用DataFrame和Dataset時自動生效。
(4) 避免長鏈Shuffle
將復雜的操作鏈簡化為較少的Shuffle任務。例如,避免多次groupBy操作,可以先合并數據,再進行分組。
(5) 優化內存管理
調節Spark配置,如spark.memory.fraction和spark.memory.storageFraction,以有效利用內存。
(6) 其他優化策略
- 減少列并過濾行:減少混洗的列數并在混洗之前過濾掉不必要的行可以顯著減少傳輸的數據量。通過在管道中盡早消除不相關的數據,最大限度地減少shuffle的影響并提高整體性能。
- 使用廣播哈希連接:廣播哈希連接是一種將連接操作的較小數據集廣播到所有工作節點的技術,從而減少shuffle的需要。這種方法利用內存復制并消除與shuffle相關的網絡開銷,從而提高連接性能。
- 使用分桶技術:Bucketing是一種基于哈希函數將數據組織到桶中的技術。通過預先分區并將數據存儲在桶中,Spark可以避免在連接和聚合等操作期間進行shuffle。這種優化技術減少了跨分區的數據移動,從而縮短了執行時間。
Spark的Shuffle階段是一個復雜且關鍵的過程,它在分布式計算中起著重要的作用,確保了數據的重新分配和組織,使得各種復雜的數據處理操作得以實現。然而,Shuffle也是Spark作業性能的瓶頸之一,涉及大量的網絡IO、磁盤IO和資源消耗。通過了解Shuffle的觸發條件、詳細步驟以及性能優化策略,可以更好地優化Spark作業,提高數據處理效率。在實際應用中,需要根據具體的業務場景和數據特點,選擇合適的Shuffle管理模式和優化策略,以充分發揮Spark的性能優勢。