Spark學習筆記:核心概念可視化
對于在分布式系統上背景知識較少的人來說,學習Spark并非易事。 即使我已經使用Spark已有一段時間了,但我發現全面了解Spark中的所有核心概念仍然很耗時。 Spark的官方文檔提供了非常詳細的解釋,但更多地側重于實際編程方面。 同樣,大量的在線教程可能會讓您不知所措。 因此,在本文中,我想以更直觀的方式記下那些Spark核心概念。 希望您也會發現它有用!
注意:也許您已經對Hadoop有一定的了解,所以我將跳過對瑣碎事物(例如節點和集群)的解釋。
Spark架構和部署模式
簡而言之,Spark在主工作架構上運行,這是并行任務計算模型的典型類型。 運行Spark時,我們可以選擇幾種模式,即本地(主,執行器,驅動程序都在同一臺JVM機器中),獨立,YARN和Mesos。 在這里,我們僅談論YARN上的Spark以及YARN客戶端和YARN群集之間的區別,因為兩者都是最常用的,但非常令人困惑。
下兩張圖片說明了兩種模式的設置。 它們看起來很相似,不是嗎? 但是,通過查看橙色突出顯示的部分,您可能會注意到細微的差別,這就是Spark驅動程序的位置。 這基本上是兩種模式之間的唯一區別。


> Fig 1. Spark deployment mode YARN-client (left) and YARN-cluster (right)
假設您編寫了一個名為spark_hello_world.py的Spark應用程序。 在客戶端模式下,使用spark-submit執行python文件時,驅動程序直接在spark-submit進程內啟動,因此它將與spark_hello_world.py駐留在同一臺計算機上。 初始化Spark上下文時,本地計算機中的驅動程序將連接到群集中的應用程序主機。 從主機開始,Spark啟動了更多執行器。
在群集模式下,spark_hello_world.py代碼位于客戶端計算機中,而客戶端計算機不在群集中。 執行應用程序python代碼時,它將在集群中的一個節點中啟動驅動程序。 與Spark應用程序主文件一起,它可以啟動執行程序并發布應用程序命令。
鑒于設置差別不大,您一定想知道為什么我們需要兩種不同的模式。 在實踐中,這與客戶端計算機與工作計算機在物理上位于同一位置有關。 如果客戶端計算機離工作節點"遙遠",例如 您在筆記本電腦上編寫了spark_hello_world.py,但是工作程序是AWS EC2實例,那么使用群集模式是有意義的,以便最大程度地減少驅動程序和執行程序之間的網絡延遲。 在另一種情況下,如果您的python文件位于與工作節點"非常近"的網關計算機中,則客戶端模式可能是一個不錯的選擇。

執行者
現在,我們了解了Spark集群的設置,讓我們放大到Spark中最重要的元素之一-執行器。 執行器是運行任務并將數據跨任務存儲在內存或磁盤中的過程。
瀏覽Spark文檔時,您可能會對與執行程序相關的可配置參數數量感到驚訝。 讓我們從視覺上看一下它,而不是一次又一次地嘗試弄清楚一個人的多個參數之間的關系。

> Fig 2. Spark executor internals
如圖2所示,在每個執行器中都有一個執行器JVM,用于存儲RDD分區,緩存的RDD分區,運行內部線程和任務。 如果內核數量超出任務要求,則JVM中還將有可用的內核。 這個執行器JVM的綠色塊將成為我們研究執行器中的內存管理的起點。
執行程序內存管理
在執行程序容器中,主要分配了兩個內存塊:內存開銷和執行程序內存。
內存開銷是為虛擬機開銷,內部字符串,其他本機開銷等內容預留的堆外內存。通過將數據緩存在主要Java堆空間之外但仍在RAM中的方式,堆外內存可使高速緩存克服冗長的時間 使用大堆大小時,JVM垃圾收集會暫停。
執行器的內存包括以下三個部分。
- 預留內存
- 用戶內存:用于在Spark中存儲用戶數據結構和內部元數據等內容。
- 存儲和執行內存:用于存儲所有RDD分區并為任務分配運行時內存。
圖3顯示了每個存儲塊的相關參數。 假設我們將spark.executor.memory設置為4 GB,那么Spark將向資源管理器請求總共4.4 GB的內存。 在4 GB的執行程序內存中,我們實際上獲得了3.7 GB,因為其余部分已保留。 默認情況下,我們獲得2.2 GB(0.6 * 3.7)作為執行+存儲內存。 其中1.1 GB用于存儲RDD等存儲空間,其余空間用于執行內存。

> Fig 3. Spark executor memory decomposition
RDD,工作,階段和任務
如果您已經開始使用Spark UI調試Spark應用程序,那么可能很熟悉諸如作業,階段和任務之類的關鍵字。 那么它們與RDD有何關系?
我們知道在RDD上有兩種操作,即轉換(例如,過濾,并集,非重復,交集),這些操作實際上是在沒有實際執行的情況下從現有的RDD中生成一個新的RDD,以及要執行的操作(例如,采取,顯示,收集,foreach) 觸發執行。 轉換RDD時,基于父RDD和轉換后的RDD之間的關系,相關性可以窄或寬。 依賴關系較窄,在父RDD中,一個或多個分區將映射到新RDD中的一個分區。 盡管具有廣泛的依賴性,例如在執行join或sortBy時,但我們需要對分區進行混洗以計算新的RDD。

> Fig 4–1. narrow dependency in RDD transformation

> Fig 4–2. Wide dependency in RDD transformation
因此,作業,階段和任務由操作類型和轉換類型確定。 在RDD上執行操作時,將創建一個作業。 在工作中,可能有多個階段,具體取決于我們是否需要執行廣泛的轉換(即洗牌)。 在每個階段中,可以將一個或多個轉換映射到每個執行程序中的任務。

> Fig 5. Illustration of one Spark job
為了真正理解它,我們來看以下簡單的代碼片段。
- val RDD1 = sc.parallelize(Array('1', '2', '3', '4', '5')).map{ x => val xi = x.toInt; (xi, xi+1) }
- val RDD2 = sc.parallelize(Array('1', '2', '3', '4', '5')).map{ x => val xi = x.toInt; (xi, xi*10) }
- val joinedData = RDD2.join(RDD1)
- val filteredRDD = joinedData.filter{case (k, v) => k % 2 == 0}
- val resultRDD = filteredRDD.mapPartitions{ iter => iter.map{ case (k, (v1, v2) ) => (k, v1+v2) } }
- resultRDD.take(2)
此代碼中包含一些操作,即map,join,filter,mapPartitions和take。 創建RDD時,Spark將分別為RDD1和RDD2生成兩個階段,如階段0和1所示。由于map函數包含一個狹窄的依賴性,因此映射的RDD也將分別包含在階段0和1中。 然后,我們將RDD1和RDD2連接起來,因為連接是包含混洗的廣泛轉換,因此Spark為該操作創建了另一個階段。 之后,filter和mapPartition仍然是第2階段的狹窄轉換,通過調用take(這是一個動作),我們觸發了Spark的執行。

> Fig 6. DAG visualization
因此,這就是Spark的所有基本內容。 希望在閱讀本文之后,這些概念對您來說更加清楚。 學習愉快!