Spark及Spark Streaming核心原理及實踐
導語 : Spark 已經成為廣告、報表以及推薦系統等大數據計算場景中***系統,因效率高,易用以及通用性越來越得到大家的青睞,我自己最近半年在接觸spark以及spark streaming之后,對spark技術的使用有一些自己的經驗積累以及心得體會,在此分享給大家。本文依次從spark生態,原理,基本概念,spark streaming原理及實踐,還有spark調優以及環境搭建等方面進行介紹,希望對大家有所幫助。
spark 生態及運行原理

Spark 特點
- 運行速度快 => Spark擁有DAG執行引擎,支持在內存中對數據進行迭代計算。官方提供的數據表明,如果數據由磁盤讀取,速度是Hadoop MapReduce的10倍以上,如果數據從內存中讀取,速度可以高達100多倍。
- 適用場景廣泛 => 大數據分析統計,實時數據處理,圖計算及機器學習
- 易用性 => 編寫簡單,支持80種以上的高級算子,支持多種語言,數據源豐富,可部署在多種集群中
容錯性高。Spark引進了彈性分布式數據集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節點中的只讀對象集合,這些集合是彈性的,如果數據集一部分丟失,則可以根據“血統”(即充許基于數據衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶可以控制采用哪種方式來實現容錯。
Spark的適用場景
目前大數據處理場景有以下幾個類型:
- 復雜的批量處理(Batch Data Processing),偏重點在于處理海量數據的能力,至于處理速度可忍受,通常的時間可能是在數十分鐘到數小時;
- 基于歷史數據的交互式查詢(Interactive Query),通常的時間在數十秒到數十分鐘之間
- 基于實時數據流的數據處理(Streaming Data Processing),通常在數百毫秒到數秒之間
Spark成功案例 目前大數據在互聯網公司主要應用在廣告、報表、推薦系統等業務上。在廣告業務方面需要大數據做應用分析、效果分析、定向優化等,在推薦系統方面則需要大數據優化相關排名、個性化推薦以及熱點點擊分析等。這些應用場景的普遍特點是計算量大、效率要求高。騰訊 / yahoo / 淘寶 / 優酷土豆
spark運行架構
spark基礎運行架構如下所示:

spark結合yarn集群背后的運行流程如下所示:

spark 運行流程:
Spark架構采用了分布式計算中的Master-Slave模型。Master是對應集群中的含有Master進程的節點,Slave是集群中含有Worker進程的節點。Master作為整個集群的控制器,負責整個集群的正常運行;Worker相當于計算節點,接收主節點命令與進行狀態匯報;Executor負責任務的執行;Client作為用戶的客戶端負責提交應用,Driver負責控制一個應用的執行。
Spark集群部署后,需要在主節點和從節點分別啟動Master進程和Worker進程,對整個集群進行控制。在一個Spark應用的執行過程中,Driver和Worker是兩個重要角色。Driver 程序是應用邏輯執行的起點,負責作業的調度,即Task任務的分發,而多個Worker用來管理計算節點和創建Executor并行處理任務。在執行階段,Driver會將Task和Task所依賴的file和jar序列化后傳遞給對應的Worker機器,同時Executor對相應數據分區的任務進行處理。
- Excecutor /Task 每個程序自有,不同程序互相隔離,task多線程并行,
- 集群對Spark透明,Spark只要能獲取相關節點和進程
- Driver 與Executor保持通信,協作處理
三種集群模式:
- Standalone 獨立集群
- Mesos, apache mesos
- Yarn, hadoop yarn
基本概念:
- Application =>Spark的應用程序,包含一個Driver program和若干Executor
- SparkContext => Spark應用程序的入口,負責調度各個運算資源,協調各個Worker Node上的Executor
- Driver Program => 運行Application的main()函數并且創建SparkContext
- Executor => 是為Application運行在Worker node上的一個進程,該進程負責運行Task,并且負責將數據存在內存或者磁盤上。每個Application都會申請各自的Executor來處理任務
- Cluster Manager =>在集群上獲取資源的外部服務 (例如:Standalone、Mesos、Yarn)
- Worker Node => 集群中任何可以運行Application代碼的節點,運行一個或多個Executor進程
- Task => 運行在Executor上的工作單元
- Job => SparkContext提交的具體Action操作,常和Action對應
- Stage => 每個Job會被拆分很多組task,每組任務被稱為Stage,也稱TaskSet
- RDD => 是Resilient distributed datasets的簡稱,中文為彈性分布式數據集;是Spark最核心的模塊和類
- DAGScheduler => 根據Job構建基于Stage的DAG,并提交Stage給TaskScheduler
- TaskScheduler => 將Taskset提交給Worker node集群運行并返回結果
- Transformations => 是Spark API的一種類型,Transformation返回值還是一個RDD,所有的Transformation采用的都是懶策略,如果只是將Transformation提交是不會執行計算的
- Action => 是Spark API的一種類型,Action返回值不是一個RDD,而是一個scala集合;計算只有在Action被提交的時候計算才被觸發。
Spark核心概念之RDD

Spark核心概念之Transformations / Actions

Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。 Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。
Action是返回值返回給driver或者存儲到文件,是RDD到result的變換,Transformation是RDD到RDD的變換。
只有action執行時,rdd才會被計算生成,這是rdd懶惰執行的根本所在。
Spark核心概念之Jobs / Stage
- Job => 包含多個task的并行計算,一個action觸發一個job
- stage => 一個job會被拆為多組task,每組任務稱為一個stage,以shuffle進行劃分

Spark核心概念之Shuffle
以reduceByKey為例解釋shuffle過程。

在沒有task的文件分片合并下的shuffle過程如下:(spark.shuffle.consolidateFiles=false)

fetch 來的數據存放到哪里?
剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區,經過處理后的數據放在內存 + 磁盤上。這里我們主要討論處理后的數據,可以靈活設置這些數據是“只用內存”還是“內存+磁盤”。如果spark.shuffle.spill = false就只用內存。由于不要求數據有序,shuffle write 的任務很簡單:將數據 partition 好,并持久化。之所以要持久化,一方面是要減少內存存儲空間壓力,另一方面也是為了 fault-tolerance。
shuffle之所以需要把中間結果放到磁盤文件中,是因為雖然上一批task結束了,下一批task還需要使用內存。如果全部放在內存中,內存會不夠。另外一方面為了容錯,防止任務掛掉。
存在問題如下:
- 產生的 FileSegment 過多。每個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數據文件。
- 緩沖區占用內存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 MR 個 bucket。雖然一個 ShuffleMapTask 結束后,對應的緩沖區可以被回收,但一個 worker node 上同時存在的 bucket 個數可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內存空間也就達到了cores R 32 KB。對于 8 核 1000 個 reducer 來說,占用內存就是 256MB。
為了解決上述問題,我們可以使用文件合并的功能。
在進行task的文件分片合并下的shuffle過程如下:(spark.shuffle.consolidateFiles=true)

可以明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 形成 ShuffleBlock i,后執行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數降為 cores * R。FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true來開啟。
Spark核心概念之Cache
- val rdd1 = ... // 讀取hdfs數據,加載成RDD
- rdd1.cache
- val rdd2 = rdd1.map(...)
- val rdd3 = rdd1.filter(...)
- rdd2.take(10).foreach(println)
- rdd3.take(10).foreach(println)
- rdd1.unpersist
cache和unpersisit兩個操作比較特殊,他們既不是action也不是transformation。cache會將標記需要緩存的rdd,真正緩存是在***次被相關action調用后才緩存;unpersisit是抹掉該標記,并且立刻釋放內存。只有action執行時,rdd1才會開始創建并進行后續的rdd變換計算。
cache其實也是調用的persist持久化函數,只是選擇的持久化級別為MEMORY_ONLY。
persist支持的RDD持久化級別如下:

需要注意的問題: Cache或shuffle場景序列化時, spark序列化不支持protobuf message,需要java 可以serializable的對象。一旦在序列化用到不支持java serializable的對象就會出現上述錯誤。 Spark只要寫磁盤,就會用到序列化。除了shuffle階段和persist會序列化,其他時候RDD處理都在內存中,不會用到序列化。
Spark Streaming運行原理
spark程序是使用一個spark應用實例一次性對一批歷史數據進行處理,spark streaming是將持續不斷輸入的數據流轉換成多個batch分片,使用一批spark應用實例進行處理。

從原理上看,把傳統的spark批處理程序變成streaming程序,spark需要構建什么?


需要構建4個東西:
- 一個靜態的 RDD DAG 的模板,來表示處理邏輯;
- 一個動態的工作控制器,將連續的 streaming data 切分數據片段,并按照模板復制出新的 RDD 3. DAG 的實例,對數據片段進行處理;
- Receiver進行原始數據的產生和導入;Receiver將接收到的數據合并為數據塊并存到內存或硬盤中,供后續batch RDD進行消費
- 對長時運行任務的保障,包括輸入數據的失效后的重構,處理任務的失敗后的重調。
具體streaming的詳細原理可以參考廣點通出品的源碼解析文章:
- https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/0.1%20Spark%20Streaming%20%E5%AE%9E%E7%8E%B0%E6%80%9D%E8%B7%AF%E4%B8%8E%E6%A8%A1%E5%9D%97%E6%A6%82%E8%BF%B0.md#24
對于spark streaming需要注意以下三點:
盡量保證每個work節點中的數據不要落盤,以提升執行效率。

保證每個batch的數據能夠在batch interval時間內處理完畢,以免造成數據堆積。

使用steven提供的框架進行數據接收時的預處理,減少不必要數據的存儲和傳輸。從tdbank中接收后轉儲前進行過濾,而不是在task具體處理時才進行過濾。


Spark 資源調優
內存管理:

Executor的內存主要分為三塊:
- ***塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;
- 第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;
- 第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
每個task以及每個executor占用的內存需要分析一下。每個task處理一個partiiton的數據,分片太少,會造成內存不夠。
其他資源配置:

具體調優可以參考美團點評出品的調優文章:
- http://tech.meituan.com/spark-tuning-basic.html
- http://tech.meituan.com/spark-tuning-pro.html
Spark 環境搭建
spark tdw以及tdbank api文檔:
- http://git.code.oa.com/tdw/tdw-spark-common/wikis/api