成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

大數(shù)據(jù)計算平臺Spark內(nèi)核全面解讀

大數(shù)據(jù) Spark
Spark是起源于美國加州大學(xué)伯克利分校AMPLab的大數(shù)據(jù)計算平臺,在2010年開源,目前是Apache軟件基金會的頂級項目。隨著Spark在大數(shù)據(jù)計算領(lǐng)域的暫露頭角,越來越多的企業(yè)開始關(guān)注和使用。

1、Spark介紹

Spark是起源于美國加州大學(xué)伯克利分校AMPLab的大數(shù)據(jù)計算平臺,在2010年開源,目前是Apache軟件基金會的***項目。隨著Spark在大數(shù)據(jù)計算領(lǐng)域的暫露頭角,越來越多的企業(yè)開始關(guān)注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節(jié)點數(shù),把100TB數(shù)據(jù)的排序時間從72分鐘提高到了23分鐘

Spark在架構(gòu)上包括內(nèi)核部分和4個官方子模塊--Spark SQL、Spark Streaming、機(jī)器學(xué)習(xí)庫MLlib和圖計算庫GraphX。圖1所示為Spark在伯克利的數(shù)據(jù)分析軟件棧BDAS(Berkeley Data Analytics Stack)中的位置。可見Spark專注于數(shù)據(jù)的計算,而數(shù)據(jù)的存儲在生產(chǎn)環(huán)境中往往還是由Hadoop分布式文件系統(tǒng)HDFS承擔(dān)。

圖1 Spark在BDAS中的位置 

Spark被設(shè)計成支持多場景的通用大數(shù)據(jù)計算平臺,它可以解決大數(shù)據(jù)計算中的批處理,交互查詢及流式計算等核心問題。Spark可以從多數(shù)據(jù)源的讀取數(shù)據(jù),并且擁有不斷發(fā)展的機(jī)器學(xué)習(xí)庫和圖計算庫供開發(fā)者使用。數(shù)據(jù)和計算在Spark內(nèi)核及Spark的子模塊中是打通的,這就意味著Spark內(nèi)核和子模塊之間成為一個整體。Spark的各個子模塊以Spark內(nèi)核為基礎(chǔ),進(jìn)一步支持更多的計算場景,例如使用Spark SQL讀入的數(shù)據(jù)可以作為機(jī)器學(xué)習(xí)庫MLlib的輸入。表1列舉了一些在Spark平臺上的計算場景。

表1 Spark的應(yīng)用場景舉例

在本文寫作是,Spark的***版本為1.2.0,文中的示例代碼也來自于這個版本。

2、Spark內(nèi)核介紹 

相信大數(shù)據(jù)工程師都非常了解Hadoop MapReduce一個***的問題是在很多應(yīng)用場景中速度非常慢,只適合離線的計算任務(wù)。這是由于MapReduce需要將任務(wù)劃分成map和reduce兩個階段,map階段產(chǎn)生的中間結(jié)果要寫回磁盤,而在這兩個階段之間需要進(jìn)行shuffle操作。Shuffle操作需要從網(wǎng)絡(luò)中的各個節(jié)點進(jìn)行數(shù)據(jù)拷貝,使其往往成為最為耗時的步驟,這也是Hadoop MapReduce慢的根本原因之一,大量的時間耗費(fèi)在網(wǎng)絡(luò)磁盤IO中而不是用于計算。在一些特定的計算場景中,例如像邏輯回歸這樣的迭代式的計算,MapReduce的弊端會顯得更加明顯。

那Spark是如果設(shè)計分布式計算的呢?首先我們需要理解Spark中最重要的概念--彈性分布數(shù)據(jù)集(Resilient Distributed Dataset),也就是RDD。 

 

2.1 彈性分布數(shù)據(jù)集RDD

RDD是Spark中對數(shù)據(jù)和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的并能夠被并行操作的數(shù)據(jù)集合。對RDD的操作分為兩種transformation和action。Transformation操作是通過轉(zhuǎn)換從一個或多個RDD生成新的RDD。Action操作是從RDD生成***的計算結(jié)果。在Spark***的版本中,提供豐富的transformation和action操作,比起MapReduce計算模型中僅有的兩種操作,會大大簡化程序開發(fā)的難度。

RDD的生成方式只有兩種,一是從數(shù)據(jù)源讀入,另一種就是從其它RDD通過transformation操作轉(zhuǎn)換。一個典型的Spark程序就是通過Spark上下文環(huán)境(SparkContext)生成一個或多個RDD,在這些RDD上通過一系列的transformation操作生成最終的RDD,***通過調(diào)用最終RDD的action方法輸出結(jié)果。

每個RDD都可以用下面5個特性來表示,其中后兩個為可選的:

  • 分片列表(數(shù)據(jù)塊列表)
  • 計算每個分片的函數(shù)
  • 對父RDD的依賴列表
  • 對key-value類型的RDD的分片器(Partitioner)(可選)
  • 每個數(shù)據(jù)分片的預(yù)定義地址列表(如HDFS上的數(shù)據(jù)塊的地址)(可選)

雖然Spark是基于內(nèi)存的計算,但RDD不光可以存儲在內(nèi)存中,根據(jù)useDisk、useMemory、useOffHeap, deserialized、replication五個參數(shù)的組合Spark提供了12種存儲級別,在后面介紹RDD的容錯機(jī)制時,我們會進(jìn)一步理解。值得注意的是當(dāng)StorageLevel設(shè)置成OFF_HEAP時,RDD實際被保存到Tachyon中。Tachyon是一個基于內(nèi)存的分布式文件系統(tǒng),目前正在快速發(fā)展,本文不做詳細(xì)介紹,可以通過其官方網(wǎng)站進(jìn)一步了解。

  1. class StorageLevel private(
  2.     private var _useDisk: Boolean,
  3.     private var _useMemory: Boolean,
  4.     private var _useOffHeap: Boolean,
  5.     private var _deserialized: Boolean
  6.     private var _replication: Int = 1)
  7.   extends Externalizable { //… }
  8.  
  9. val NONE = new StorageLevel(false, false, false, false)
  10.   val DISK_ONLY = new StorageLevel(true, false, false, false)
  11.   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  12.   val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  13.   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  14.   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  15.   val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  16.   val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  17.   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  18.   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  19.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  20.   val OFF_HEAP = new StorageLevel(false, false, true, false)

 

2.2 DAG、Stage與任務(wù)的生成

Spark的計算發(fā)生在RDD的action操作,而對action之前的所有transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發(fā)真正的計算。

Spark內(nèi)核會在需要計算發(fā)生的時刻繪制一張關(guān)于計算路徑的有向無環(huán)圖,也就是DAG。舉個例子,在圖2中,從輸入中邏輯上生成A和C兩個RDD,經(jīng)過一系列transformation操作,邏輯上生成了F,注意,我們說的是邏輯上,因為這時候計算沒有發(fā)生,Spark內(nèi)核做的事情只是記錄了RDD的生成和依賴關(guān)系。當(dāng)F要進(jìn)行輸出時,也就是F進(jìn)行了action操作,Spark會根據(jù)RDD的依賴生成DAG,并從起點開始真正的計算。

圖2 邏輯上的計算過程:DAG 

有了計算的DAG圖,Spark內(nèi)核下一步的任務(wù)就是根據(jù)DAG圖將計算劃分成任務(wù)集,也就是Stage,這樣可以將任務(wù)提交到計算節(jié)點進(jìn)行真正的計算。Spark計算的中間結(jié)果默認(rèn)是保存在內(nèi)存中的,Spark在劃分Stage的時候會充分考慮在分布式計算中可流水線計算(pipeline)的部分來提高計算的效率,而在這個過程中,主要的根據(jù)就是RDD的依賴類型。根據(jù)不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個partition只依賴于父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個partition都依賴于父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有g(shù)roupByKey, sortByKey等。可以看到,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對于窄依賴,Spark會將其盡量劃分在同一個stage中,因為它們可以進(jìn)行流水線計算。

圖3 RDD的寬依賴和窄依賴

我們再通過圖4詳細(xì)解釋一下Spark中的Stage劃分。我們從HDFS中讀入數(shù)據(jù)生成3個不同的RDD,通過一系列transformation操作后再將計算結(jié)果保存回HDFS。可以看到這幅DAG中只有join操作是一個寬依賴,Spark內(nèi)核會以此為邊界將其前后劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結(jié)束,而是繼續(xù)進(jìn)行union操作,這樣大大提高了計算的效率。

圖4 Spark中的Stage劃分 

#p#

Spark在運(yùn)行時會把Stage包裝成任務(wù)提交,有父Stage的Spark會先提交父Stage。弄清楚了Spark劃分計算的原理,我們再結(jié)合源碼看一看這其中的過程。下面的代碼是DAGScheduler中的得到一個RDD父Stage的函數(shù),可以看到寬依賴為劃分Stage的邊界。

  1. /**
  2.    * Get or create the list of parent stages for a given RDD. The stages will be assigned the
  3.    * provided jobId if they haven't already been created with a lower jobId.
  4.    */
  5.  
  6.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
  7.     val parents = new HashSet[Stage]
  8.     val visited = new HashSet[RDD[_]]
  9.     // We are manually maintaining a stack here to prevent StackOverflowError
  10.     // caused by recursively visiting
  11.     val waitingForVisit = new Stack[RDD[_]]
  12.     def visit(r: RDD[_]) {
  13.       if (!visited(r)) {
  14.         visited += r
  15.         // Kind of ugly: need to register RDDs with the cache here since
  16.         // we can't do it in its constructor because # of partitions is unknown
  17.         for (dep <- r.dependencies) {
  18.           dep match {
  19.             case shufDep: ShuffleDependency[_, _, _] =>
  20.               parents += getShuffleMapStage(shufDep, jobId)
  21.             case _ =>
  22.               waitingForVisit.push(dep.rdd)
  23.           }
  24.         }
  25.       }
  26.     }
  27.  
  28.     waitingForVisit.push(rdd)
  29.     while (!waitingForVisit.isEmpty) {
  30.       visit(waitingForVisit.pop())
  31.     }
  32.     parents.toList
  33.   }

上面提到Spark的計算是從RDD調(diào)用action操作時候觸發(fā)的,我們來看一個action的代碼

RDD的collect方法是一個action操作,作用是將RDD中的數(shù)據(jù)返回到一個數(shù)組中。可以看到,在此action中,會觸發(fā)Spark上下文環(huán)境SparkContext中的runJob方法,這是一系列計算的起點。

  1. abstract class RDD[T: ClassTag](
  2.     @transient private var sc: SparkContext,
  3.     @transient private var deps: Seq[Dependency[_]]
  4.   ) extends Serializable with Logging {
  5.   //….
  6. /**
  7.    * Return an array that contains all of the elements in this RDD.
  8.    */
  9.   def collect(): Array[T] = {
  10.     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  11.     Array.concat(results: _*)
  12.   }
  13. }

SparkContext擁有DAGScheduler的實例,在runJob方法中會進(jìn)一步調(diào)用DAGScheduler的runJob方法。在此時,DAGScheduler會生成DAG和Stage,將Stage提交給TaskScheduler。TaskSchduler將Stage包裝成TaskSet,發(fā)送到Worker節(jié)點進(jìn)行真正的計算,同時還要監(jiān)測任務(wù)狀態(tài),重試失敗和長時間無返回的任務(wù)。整個過程如圖5所示。

 

圖5 Spark中任務(wù)的生成 

 

2.3 RDD的緩存與容錯

上文提到,Spark的計算是從action開始觸發(fā)的,如果在action操作之前邏輯上很多transformation操作,一旦中間發(fā)生計算失敗,Spark會重新提交任務(wù),這在很多場景中代價過大。還有一些場景,如有些迭代算法,計算的中間結(jié)果會被重復(fù)使用,重復(fù)計算同樣增加計算時間和造成資源浪費(fèi)。因此,在提高計算效率和更好支持容錯,Spark提供了基于RDDcache機(jī)制和checkpoint機(jī)制。

我們可以通過RDD的toDebugString來查看其遞歸的依賴信息,圖6展示了在spark shell中通過調(diào)用這個函數(shù)來查看wordCount RDD的依賴關(guān)系,也就是它的Lineage.

圖6 RDD wordCount的lineage 

如果發(fā)現(xiàn)Lineage過長或者里面有被多次重復(fù)使用的RDD,我們就可以考慮使用cache機(jī)制或checkpoint機(jī)制了。

我們可以通過在程序中直接調(diào)用RDD的cache方法將其保存在內(nèi)存中,這樣這個RDD就可以被多個任務(wù)共享,避免重復(fù)計算。另外,RDD還提供了更為靈活的persist方法,可以指定存儲級別。從源碼中可以看到RDD.cache就是簡單的調(diào)用了RDD.persist(StorageLevel.MEMORY_ONLY)。

  1. /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  2.   def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
  3.   def cache(): this.type = persist()

同樣,我們可以調(diào)用RDD的checkpoint方法將其保存到磁盤。我們需要在SparkContext中設(shè)置checkpoint的目錄,否則調(diào)用會拋出異常。值得注意的是,在調(diào)用checkpoint之前建議先調(diào)用cache方法將RDD放入內(nèi)存,否則將RDD保存到文件的時候需要重新計算。 

  1.   /**
  2.    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
  3.    * directory set with SparkContext.setCheckpointDir() and all references to its parent
  4.    * RDDs will be removed. This function must be called before any job has been
  5.    * executed on this RDD. It is strongly recommended that this RDD is persisted in
  6.    * memory, otherwise saving it on a file will require recomputation.
  7.    */
  8.   def checkpoint() {
  9.     if (context.checkpointDir.isEmpty) {
  10.       throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  11.     } else if (checkpointData.isEmpty) {
  12.       checkpointData = Some(new RDDCheckpointData(this))
  13.       checkpointData.get.markForCheckpoint()
  14.     }
  15.   }

Cache機(jī)制和checkpoint機(jī)制的差別在于cache將RDD保存到內(nèi)存,并保留Lineage,如果緩存失效RDD還可以通過Lineage重建。而checkpoint將RDD落地到磁盤并切斷Lineage,由文件系統(tǒng)保證其重建。

 

2.4 Spark任務(wù)的部署

Spark的集群部署分為Standalone、Mesos和Yarn三種模式,我們以Standalone模式為例,簡單介紹Spark程序的部署。如圖7示,集群中的Spark程序運(yùn)行時分為3種角色,driver, master和worker(slave)。在集群啟動前,首先要配置master和worker節(jié)點。啟動集群后,worker節(jié)點會向master節(jié)點注冊自己,master節(jié)點會維護(hù)worker節(jié)點的心跳。Spark程序都需要先創(chuàng)建Spark上下文環(huán)境,也就是SparkContext。創(chuàng)建SparkContext的進(jìn)程就成為了driver角色,上一節(jié)提到的DAGScheduler和TaskScheduler都在driver中運(yùn)行。Spark程序在提交時要指定master的地址,這樣可以在程序啟動時向master申請worker的計算資源。Driver,master和worker之間的通信由Akka支持。Akka 也使用 Scala 編寫,用于構(gòu)建可容錯的、高可伸縮性的Actor 模型應(yīng)用。關(guān)于Akka,可以訪問其官方網(wǎng)站進(jìn)行進(jìn)一步了解,本文不做詳細(xì)介紹。

圖7 Spark任務(wù)部署

 

3、更深一步了解Spark內(nèi)核

了解了Spark內(nèi)核的基本概念和實現(xiàn)后,更深一步理解其工作原理的***方法就是閱讀源碼。***的Spark源碼可以從Spark官方網(wǎng)站下載。源碼推薦使用IntelliJ IDEA閱讀,會自動安裝Scala插件。讀者可以從core工程,也就是Spark內(nèi)核工程開始閱讀,更可以設(shè)置斷點嘗試跟蹤一個任務(wù)的執(zhí)行。另外,讀者還可以通過分析Spark的日志來進(jìn)一步理解Spark的運(yùn)行機(jī)制,Spark使用log4j記錄日志,可以在啟動集群前修改log4j的配置文件來配置日志輸出和格式。

責(zé)任編輯:林師授 來源: 明略數(shù)據(jù)
相關(guān)推薦

2012-12-20 13:02:20

2019-06-27 11:18:00

Spark內(nèi)存大數(shù)據(jù)

2019-04-08 17:11:46

大數(shù)據(jù)框架Spark

2018-06-07 16:33:31

大數(shù)據(jù)冷熱數(shù)據(jù)存儲平臺

2014-06-25 13:57:50

云計算大數(shù)據(jù)Spark

2013-08-08 10:07:43

大數(shù)據(jù)存儲結(jié)構(gòu)化數(shù)據(jù)

2014-07-04 10:01:08

Spark集群

2022-07-20 15:10:38

Docker大數(shù)據(jù)平臺

2017-04-24 12:07:44

Spark大數(shù)據(jù)并行計算

2015-08-18 13:35:42

spark

2016-12-13 09:10:36

大數(shù)據(jù)人工智能計算工具

2009-12-15 15:35:56

Ruby symbol

2015-12-16 11:48:42

京東大數(shù)據(jù)

2015-08-04 09:22:37

2015-12-17 11:23:44

京東大數(shù)據(jù)

2012-09-25 09:37:23

大數(shù)據(jù)云計算迪士尼

2017-09-02 10:03:10

大數(shù)據(jù)分析大數(shù)據(jù)數(shù)據(jù)

2018-12-08 11:16:51

京東

2017-05-05 12:59:00

大數(shù)據(jù)物聯(lián)網(wǎng)安全
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 欧美综合一区二区 | 久久黄色 | 欧美一区二区在线观看 | 色综合99| 国产成人综合网 | 欧美一级免费看 | xxxcom在线观看 | 国产精品日韩欧美一区二区三区 | 香蕉视频久久久 | 久久久久成人精品 | 欧美一区二区三区 | 亚洲91av | 国产三级在线观看播放 | 国产激情视频 | 日韩在线免费播放 | 国产高清在线精品一区二区三区 | 成人三级视频 | 欧美天堂| 一区二区三区精品视频 | 黑人精品xxx一区一二区 | 日本精品网站 | 国产自产c区 | 黄色片免费在线观看 | 岛国av免费在线观看 | 欧美乱码精品一区二区三区 | 国产成人综合亚洲欧美94在线 | 日韩午夜在线观看 | a级在线免费视频 | 日本黄色短片 | 婷婷色在线 | 日韩1区 | 国产一区二区三区四区五区加勒比 | 热re99久久精品国99热观看 | www.日日夜夜 | 91综合在线观看 | 久久国产欧美日韩精品 | 97超级碰碰| 亚洲国产精品人人爽夜夜爽 | 欧美色综合网 | 亚洲一区二区三区四区五区中文 | 亚洲一区二区三区免费视频 |