Spark 核心技術(shù)概要與大數(shù)據(jù)生態(tài)圈的演進(jìn)之路
從 MapReduce 的局限到 Spark 的誕生
我們知道,Google 的 MapReduce 框架是大數(shù)據(jù)處理的開(kāi)山鼻祖,它將復(fù)雜的分布式計(jì)算抽象成了簡(jiǎn)單的 map
和 reduce
兩個(gè)階段,讓工程師可以輕松地在商用硬件集群上處理海量數(shù)據(jù)。但它并非萬(wàn)能藥。
MapReduce 的核心問(wèn)題在于其 無(wú)狀態(tài) 和 基于磁盤(pán) 的設(shè)計(jì)。每?jī)蓚€(gè) MapReduce 作業(yè)之間的數(shù)據(jù)交換,都必須通過(guò)一個(gè)外部穩(wěn)定存儲(chǔ)系統(tǒng)(比如 HDFS)。這意味著,一個(gè)作業(yè)的 reduce
輸出被寫(xiě)到磁盤(pán),下一個(gè)作業(yè)的 map
再?gòu)拇疟P(pán)上把它讀出來(lái)。這個(gè)過(guò)程涉及到大量的磁盤(pán) I/O、數(shù)據(jù)復(fù)制和序列化開(kāi)銷(xiāo)。
對(duì)于只需要“掃一遍”數(shù)據(jù)的簡(jiǎn)單 ETL(提取、轉(zhuǎn)換、加載)任務(wù),這沒(méi)什么問(wèn)題。但對(duì)于那些需要復(fù)用中間結(jié)果的應(yīng)用——比如多次迭代的機(jī)器學(xué)習(xí)算法(邏輯回歸、K-均值聚類)和交互式數(shù)據(jù)挖掘——MapReduce 就顯得力不從心了。想象一下,一個(gè)需要迭代 10 次的 PageRank 算法,用 MapReduce 實(shí)現(xiàn)就意味著要執(zhí)行 10 個(gè)獨(dú)立的 MapReduce 作業(yè),中間結(jié)果來(lái)來(lái)回回在磁盤(pán)上讀寫(xiě) 9 次,性能之差可想而知。
為了解決這個(gè)問(wèn)題,學(xué)術(shù)界和工業(yè)界提出了各種專用框架,例如用于迭代圖計(jì)算的 Pregel 和用于迭代 MapReduce 的 HaLoop 。但這些系統(tǒng)往往只針對(duì)特定計(jì)算模式,缺乏通用性。
正是在這個(gè)背景下, Spark 應(yīng)運(yùn)而生。它的目標(biāo)是提供一個(gè) 通用 的、 高性能 的計(jì)算框架,既能優(yōu)雅地處理迭代和交互式任務(wù),又能兼容 MapReduce 擅長(zhǎng)的批處理場(chǎng)景。Spark 的核心武器,就是一種名為 彈性分布式數(shù)據(jù)集 (Resilient Distributed Datasets, RDD) 的抽象。
Spark 的誕生本身就是一個(gè)傳奇故事。它源于加州大學(xué)伯克利分校的一個(gè)研究項(xiàng)目,其主要貢獻(xiàn)者 Matei Zaharia 憑借這項(xiàng)工作贏得了計(jì)算機(jī)協(xié)會(huì) (ACM) 的博士論文獎(jiǎng)。對(duì)于一個(gè)博士生來(lái)說(shuō),創(chuàng)造出如此規(guī)模和影響力的系統(tǒng)是相當(dāng)了不起的成就。如今,Spark 已經(jīng)被全球各大公司廣泛應(yīng)用于生產(chǎn)環(huán)境,你可以從其商業(yè)化公司 Databricks 的客戶列表中一窺其影響力。
Spark 的核心:RDD (彈性分布式數(shù)據(jù)集)
那么,RDD 到底是什么?
從形式上看,一個(gè) RDD 是一個(gè) 只讀的 、 被分區(qū)的 記錄集合。你可以把它想象成一個(gè)分布在集群成百上千臺(tái)機(jī)器內(nèi)存中的一個(gè)巨大 List
或 Array
,但你不能像操作普通 Array
那樣去修改它的某個(gè)元素。
這聽(tīng)起來(lái)限制很大,但正是這些限制賦予了 Spark 強(qiáng)大的能力。讓我們來(lái)逐一拆解 RDD 的關(guān)鍵特性:
只讀 (Immutable) 與轉(zhuǎn)換 (Transformations)
有人可能會(huì)問(wèn):“如果 RDD 是只讀的,那我們?cè)趺催M(jìn)行計(jì)算呢?” 這就引出了 Spark 的核心編程模型: 轉(zhuǎn)換 (transformation) 。
你不能“修改”一個(gè) RDD,但你可以對(duì)一個(gè) RDD 應(yīng)用一個(gè)轉(zhuǎn)換操作(比如 map
、filter
、join
),然后生成一個(gè) 全新的 RDD 。這就像在函數(shù)式編程里,你不會(huì)去修改一個(gè)傳入的 List
,而是返回一個(gè)新的、經(jīng)過(guò)處理的 List
。
比如,我們有一個(gè)包含了日志文件所有文本行的 lines
RDD,我們可以這樣操作:
// errors RDD 是通過(guò)對(duì) lines RDD 進(jìn)行 filter 轉(zhuǎn)換得到的
val errors = lines.filter(line => line.startsWith("ERROR"))
在這里,lines
RDD 本身沒(méi)有任何變化,我們得到的是一個(gè)全新的、只包含錯(cuò)誤信息的 errors
RDD。這種“只讀”或稱為 不可變性 (immutability) 的設(shè)計(jì)是 Spark 實(shí)現(xiàn)廉價(jià)、高效容錯(cuò)機(jī)制的基石。
分區(qū) (Partitioned)
RDD 在物理上是分布式的,它由多個(gè) 分區(qū) (partition) 組成,每個(gè)分區(qū)是數(shù)據(jù)集的一部分。比如一個(gè) 1TB 的 HDFS 文件,在 Spark 中可以被表示為一個(gè) RDD,這個(gè) RDD 可能有 8000 個(gè)分區(qū)(例如,每個(gè) HDFS 塊對(duì)應(yīng)一個(gè)分區(qū))。
這些分區(qū)分布在集群的不同 工作節(jié)點(diǎn) (Worker) 上,使得計(jì)算可以并行進(jìn)行。Spark 的調(diào)度器會(huì)盡可能地將計(jì)算任務(wù)分配到存儲(chǔ)著對(duì)應(yīng)數(shù)據(jù)分區(qū)的節(jié)點(diǎn)上,這被稱為 數(shù)據(jù)本地性 (data locality) ,它可以極大減少網(wǎng)絡(luò)數(shù)據(jù)傳輸,提升性能。
應(yīng)用程序是如何知道一個(gè) RDD 的位置的呢?在驅(qū)動(dòng)程序 (Driver program) 中,我們用 Scala 的變量名來(lái)指代 RDD 。而每個(gè) RDD 的元數(shù)據(jù)中都包含了其分區(qū)的位置信息。調(diào)度器正是利用這些信息,來(lái)將計(jì)算任務(wù)(比如一個(gè)
map
函數(shù))發(fā)送到離數(shù)據(jù)最近的節(jié)點(diǎn)上執(zhí)行。
惰性計(jì)算 (Lazy Evaluation) 與行動(dòng) (Actions)
在 Spark 中,所有的轉(zhuǎn)換操作都是 惰性 (lazy) 的。什么意思呢?就是當(dāng)你調(diào)用一個(gè) transformation
時(shí)(如 filter
, map
),Spark 并不會(huì)立即執(zhí)行計(jì)算。它只是默默地記下你做了什么操作。
例如,下面的代碼:
val lines = spark.sparkContext.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val hdfsErrors = errors.filter(_.contains("HDFS"))
執(zhí)行完這三行,集群上什么計(jì)算都還沒(méi)發(fā)生。Spark 只是構(gòu)建了一個(gè)計(jì)算計(jì)劃。
那么,計(jì)算何時(shí)才會(huì)真正發(fā)生呢?答案是當(dāng)你調(diào)用一個(gè) 行動(dòng) (action) 操作時(shí)。行動(dòng)操作是那些會(huì)真正觸發(fā)計(jì)算并返回一個(gè)值給驅(qū)動(dòng)程序,或者將數(shù)據(jù)寫(xiě)入到外部存儲(chǔ)的命令。常見(jiàn)的 action
包括 count()
(返回 RDD 的元素個(gè)數(shù))、collect()
(將 RDD 的所有元素以數(shù)組形式返回到驅(qū)動(dòng)程序)、saveAsTextFile()
(將 RDD 內(nèi)容存為文本文件) 等。
當(dāng)你對(duì) hdfsErrors
RDD 調(diào)用 count()
時(shí),Spark 會(huì)審視整個(gè)計(jì)算計(jì)劃,然后說(shuō):“哦,原來(lái)用戶想要計(jì)算 hdfsErrors
的數(shù)量。要得到它,我得先執(zhí)行對(duì) errors
的 filter
,而要得到 errors
,我得先對(duì) lines
進(jìn)行 filter
,而 lines
來(lái)自于 HDFS 文件。” 于是,它將整個(gè)計(jì)算流程打包成任務(wù),分發(fā)到集群上執(zhí)行。
這種惰性計(jì)算的策略,讓 Spark 有機(jī)會(huì)在執(zhí)行前對(duì)整個(gè)計(jì)算流程進(jìn)行優(yōu)化,比如將多個(gè) filter
操作合并(串聯(lián))在一起執(zhí)行,避免產(chǎn)生不必要的中間數(shù)據(jù)。
血緣 (Lineage) 與容錯(cuò)
RDD 最精妙的設(shè)計(jì)在于其容錯(cuò)機(jī)制。前面提到,RDD 是只讀的,并且只能通過(guò)對(duì)其他 RDD 進(jìn)行確定的轉(zhuǎn)換操作來(lái)創(chuàng)建。Spark 會(huì)記錄下這一系列的轉(zhuǎn)換關(guān)系,形成一個(gè) **血緣關(guān)系圖 (lineage graph)**,也叫作 有向無(wú)環(huán)圖 (DAG) 。
這個(gè)血緣圖完整地記錄了任何一個(gè) RDD 是如何從最原始的輸入數(shù)據(jù)一步步計(jì)算得來(lái)的。
現(xiàn)在,假設(shè)集群中一臺(tái)機(jī)器宕機(jī)了,它內(nèi)存中保存的某個(gè) RDD 分區(qū)也隨之丟失。怎么辦?傳統(tǒng)的分布式系統(tǒng)可能需要依賴高成本的數(shù)據(jù)復(fù)制或檢查點(diǎn) (checkpointing) 機(jī)制來(lái)恢復(fù)。
而 Spark 的做法則非常優(yōu)雅:它根本不需要復(fù)制數(shù)據(jù)來(lái)實(shí)現(xiàn)容錯(cuò)。它只需要根據(jù)血緣圖,找到丟失的那個(gè)分區(qū)是如何計(jì)算出來(lái)的,然后在另外一個(gè)空閑的節(jié)點(diǎn)上, 重新執(zhí)行一遍 當(dāng)初的計(jì)算過(guò)程,就能把它恢復(fù)出來(lái)。因?yàn)檗D(zhuǎn)換操作是 確定性 (deterministic) 的,所以重新計(jì)算的結(jié)果和之前會(huì)完全一樣。
這種基于血緣的恢復(fù)方式,開(kāi)銷(xiāo)極小,而且恢復(fù)任務(wù)可以并行進(jìn)行,速度很快。這就是 RDD 中“彈性 (Resilient)”一詞的由來(lái)。
深入 Spark 執(zhí)行:窄依賴與寬依賴
為了優(yōu)化執(zhí)行,Spark 將 RDD 之間的依賴關(guān)系分為兩類:窄依賴 (narrow dependencies) 和 寬依賴 (wide dependencies) 。理解這個(gè)區(qū)別至關(guān)重要。
- 窄依賴 :子 RDD 的每個(gè)分區(qū) 只依賴于 父 RDD 的一個(gè)分區(qū)(或少數(shù)幾個(gè)固定的分區(qū))。典型的例子是
map
和filter
。這種依賴關(guān)系非常高效,因?yàn)橛?jì)算可以在一個(gè)節(jié)點(diǎn)上以流水線 (pipeline) 的方式進(jìn)行,不需要等待其他節(jié)點(diǎn)。 - 寬依賴 :子 RDD 的每個(gè)分區(qū) 可能依賴于 父 RDD 的所有分區(qū)。典型的例子是
groupByKey
和reduceByKey
。groupByKey
需要找到所有分區(qū)中具有相同key
的元素,并將它們聚集在一起。這個(gè)過(guò)程不可避免地需要在集群節(jié)點(diǎn)之間進(jìn)行大規(guī)模的數(shù)據(jù)交換,這個(gè)過(guò)程被稱為 洗牌 (shuffle) 。
你可以通過(guò)下面的示意圖來(lái)理解:
窄依賴 (Narrow Dependency)
父 RDD 子 RDD
[Partition 1] -> [Partition A]
[Partition 2] -> [Partition B]
[Partition 3] -> [Partition C]
(map, filter, union)
寬依賴 (Wide Dependency)
父 RDD 子 RDD
[Partition 1] --\
[Partition 2] -->-- [Partition X]
[Partition 3] --/
[Partition 1] --\
[Partition 2] -->-- [Partition Y]
[Partition 3] --/
(groupByKey, join, distinct)
寬依賴是 Spark 中代價(jià)高昂的操作,因?yàn)樗枰W(wǎng)絡(luò) I/O,并且是一個(gè) 屏障 (barrier) ,后續(xù)步驟必須等待 shuffle 完成才能開(kāi)始。Spark 的調(diào)度器會(huì)根據(jù)血緣圖中的寬依賴來(lái)劃分 階段 (Stage) 。在一個(gè) Stage 內(nèi)部,所有的計(jì)算都是窄依賴,可以高效地流水線執(zhí)行。而 Stage 之間的邊界就是 shuffle 。
一個(gè)完整的例子:用 Spark 實(shí)現(xiàn) PageRank
讓我們結(jié)合 PageRank 例子,看看這些概念是如何協(xié)同工作的。PageRank 是一種迭代算法,用于評(píng)估網(wǎng)頁(yè)的重要性,非常適合用 Spark 實(shí)現(xiàn)。
// 1. 讀取輸入文件,創(chuàng)建初始 RDD
val lines = spark.read.textFile("in").rdd
// 2. 解析鏈接關(guān)系 (from, to),這是一系列窄依賴轉(zhuǎn)換
val links = lines.map { s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache() // distinct 和 groupByKey 是寬依賴
// 3. 初始化所有頁(yè)面的 rank 為 1.0,這是一個(gè)窄依賴轉(zhuǎn)換
var ranks = links.mapValues(v => 1.0)
// 4. 進(jìn)行 10 次迭代
for (i <- 1 to 10) {
// 將鏈接關(guān)系和排名進(jìn)行 join (寬依賴)
val contribs = links.join(ranks).values.flatMap {
case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
// 按 URL 聚合貢獻(xiàn)值,并計(jì)算新排名 (寬依賴)
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
// 5. 觸發(fā)計(jì)算,并將結(jié)果收集回驅(qū)動(dòng)程序
val output = ranks.collect()
output.foreach(tup => println(s"${tup._1} has rank: ${tup._2} ."))
這個(gè)例子完美地展示了 Spark 的威力:
- 表達(dá)力 :相比 MapReduce,代碼更簡(jiǎn)潔、更符合邏輯。
- 迭代計(jì)算 :
for
循環(huán)中的links
和ranks
RDD 在每次迭代中都被復(fù)用。 - 持久化 :
links.cache()
是一個(gè)關(guān)鍵優(yōu)化。它告訴 Spark:“我將來(lái)會(huì)頻繁使用links
RDD,請(qǐng)把它緩存到內(nèi)存里吧!”。這樣,在 10 次迭代中,links
RDD 只需要從文件計(jì)算一次,后續(xù) 9 次直接從內(nèi)存讀取,極大地提升了性能。cache()
是persist(StorageLevel.MEMORY_ONLY)
的一個(gè)別名。 - 惰性求值 :在調(diào)用
collect()
之前,整個(gè)復(fù)雜的計(jì)算圖(包含 10 次迭代)只是被定義好了,并沒(méi)有執(zhí)行。 - 容錯(cuò) :如果在迭代的第 8 輪,某個(gè)節(jié)點(diǎn)掛了,導(dǎo)致
ranks
RDD 的某個(gè)分區(qū)丟失,Spark 會(huì)根據(jù)血緣圖自動(dòng)從上一個(gè) Stage 的可用數(shù)據(jù)開(kāi)始重算,恢復(fù)這個(gè)丟失的分區(qū)。
對(duì)于血緣關(guān)系特別長(zhǎng)的 RDD(比如迭代上百次的 PageRank),如果從頭開(kāi)始重算,代價(jià)可能會(huì)很高。為此,Spark 允許用戶手動(dòng)對(duì)某些關(guān)鍵 RDD 設(shè)置 檢查點(diǎn) (checkpoint) ,將它們物化到 HDFS 等可靠存儲(chǔ)上,從而截?cái)嘌夑P(guān)系,降低故障恢復(fù)的時(shí)間。
Spark vs. MapReduce:該用誰(shuí)?
既然 Spark 看起來(lái)全面優(yōu)于 MapReduce,那 MapReduce 是不是就該被淘汰了?
不完全是。雖然 Spark 更強(qiáng)大,但在某些特定場(chǎng)景下,MapReduce 依然有其一席之地。關(guān)鍵在于你的計(jì)算模式。
- 如果你的任務(wù)是 單次遍歷 一個(gè)巨大的數(shù)據(jù)集,進(jìn)行簡(jiǎn)單的映射和聚合(比如統(tǒng)計(jì)詞頻),那么這個(gè)任務(wù)的主要瓶頸是 I/O。Spark 的內(nèi)存緩存優(yōu)勢(shì)無(wú)法體現(xiàn),因?yàn)樗鼪](méi)有任何 RDD 可以被重用。在這種情況下,Spark 和 MapReduce 的性能可能不相上下,甚至 MapReduce 可能因?yàn)槠涑墒旆€(wěn)定而更受青睞。
- 但只要你的任務(wù)涉及 迭代 、 交互式查詢 ,或者包含多個(gè)需要共享中間數(shù)據(jù)的步驟,Spark 的優(yōu)勢(shì)就是壓倒性的。實(shí)驗(yàn)表明,在迭代式機(jī)器學(xué)習(xí)應(yīng)用上,Spark 的性能可以比 Hadoop MapReduce 高出 20 倍。
總的來(lái)說(shuō),Spark 可以看作是 MapReduce 的一種 泛化和超集 。它不僅能完成 MapReduce 的工作,還能高效處理 MapReduce 難以勝任的復(fù)雜計(jì)算模式。
好的,這是對(duì)大數(shù)據(jù)生態(tài)圈演進(jìn)之路的重寫(xiě)與擴(kuò)充版本,希望能解答你的疑惑,并提供一個(gè)更全面的視角。
大數(shù)據(jù)生態(tài)圈的演進(jìn)之路
為了更好地理解 Spark 的地位,我們有必要回顧一下大數(shù)據(jù)技術(shù)棧的演進(jìn)歷史。這個(gè)過(guò)程并非簡(jiǎn)單的技術(shù)迭代,而是一個(gè)不斷發(fā)現(xiàn)問(wèn)題、解決問(wèn)題,從而推動(dòng)整個(gè)領(lǐng)域向前發(fā)展的生動(dòng)故事。
HDFS + MapReduce (Hadoop 1.0):奠基時(shí)代
在 Hadoop 出現(xiàn)之前,處理超過(guò)單機(jī)容量的數(shù)據(jù)是一項(xiàng)極其昂貴且復(fù)雜的任務(wù),通常需要專用的、昂貴的硬件。Hadoop 的誕生,參考了谷歌發(fā)布的兩篇革命性論文(關(guān)于 GFS 和 MapReduce),徹底改變了這一局面。
HDFS (Hadoop Distributed File System) 如何解決存儲(chǔ)問(wèn)題?
HDFS 是谷歌文件系統(tǒng) (GFS) 的開(kāi)源實(shí)現(xiàn),其核心思想是“分而治之”和“容錯(cuò)于廉價(jià)硬件”。當(dāng)一個(gè)大文件(如 1TB 的日志)存入 HDFS 時(shí),它并不會(huì)被完整地存放在一臺(tái)機(jī)器上。相反,它會(huì)被切分成許多固定大小的 數(shù)據(jù)塊 (Block) ,通常為 128MB 或 256MB。這些數(shù)據(jù)塊被分散存儲(chǔ)在集群中成百上千臺(tái)廉價(jià)的服務(wù)器(稱為 DataNode )上。為了實(shí)現(xiàn)容錯(cuò),每個(gè)數(shù)據(jù)塊默認(rèn)還會(huì)有 2 個(gè)副本,存放在不同的 DataNode 上。
集群中還有一個(gè)名為 NameNode 的主節(jié)點(diǎn),它就像是整個(gè)文件系統(tǒng)的“目錄”,記錄著每個(gè)文件的元數(shù)據(jù),比如文件被分成了哪些塊,以及每個(gè)塊和它的副本分別存儲(chǔ)在哪臺(tái) DataNode 上。通過(guò)這種方式,HDFS 實(shí)現(xiàn)了用普通商用硬件存儲(chǔ)海量數(shù)據(jù)的能力,并且當(dāng)任何一臺(tái) DataNode 宕機(jī)時(shí),數(shù)據(jù)都能從副本中恢復(fù),保證了高可靠性。
MapReduce 如何解決計(jì)算問(wèn)題及其局限性?
MapReduce 框架則負(fù)責(zé)處理存儲(chǔ)在 HDFS 上的數(shù)據(jù)。它的主節(jié)點(diǎn) JobTracker 是整個(gè)計(jì)算的大腦。當(dāng)用戶提交一個(gè)計(jì)算任務(wù)時(shí),JobTracker 會(huì)做兩件核心事情:
- 資源管理 :它持續(xù)追蹤集群中所有從節(jié)點(diǎn)( TaskTracker )的心跳,了解每個(gè)節(jié)點(diǎn)上有多少可用的計(jì)算槽位(Map Slot 和 Reduce Slot)。
- 作業(yè)調(diào)度與監(jiān)控 :它接收用戶的 MapReduce 作業(yè),將其拆分成大量的 Map 任務(wù)和 Reduce 任務(wù),然后像一個(gè)調(diào)度中心一樣,將這些任務(wù)分配給有空閑槽位的 TaskTracker 去執(zhí)行。它還負(fù)責(zé)監(jiān)控任務(wù)的執(zhí)行進(jìn)度,一旦發(fā)現(xiàn)某個(gè)任務(wù)失?。ū热绻?jié)點(diǎn)宕機(jī)),就會(huì)在其他節(jié)點(diǎn)上重新調(diào)度該任務(wù)。
這種模式雖然強(qiáng)大,但其局限性也十分明顯。首先,JobTracker 將資源管理和 MapReduce 計(jì)算模型 緊密耦合 ,導(dǎo)致整個(gè)集群只能運(yùn)行 MapReduce 類型的作業(yè),無(wú)法支持像 Spark 這樣的新興計(jì)算框架。其次,JobTracker 本身是一個(gè) 單點(diǎn)故障 (Single Point of Failure) ,一旦它崩潰,整個(gè)集群就會(huì)癱瘓,所有正在運(yùn)行的任務(wù)都會(huì)失敗。最后,在超大規(guī)模集群中,JobTracker 需要管理所有任務(wù),其自身也成為了一個(gè)巨大的性能瓶頸。
YARN (Hadoop 2.0):資源管理的革命
為了解決 Hadoop 1.0 的核心缺陷,Hadoop 2.0 引入了 YARN (Yet Another Resource Negotiator),它將 JobTracker 的功能進(jìn)行了一次優(yōu)雅的“權(quán)責(zé)分離”。
YARN 如何分離職能?
YARN 將 JobTracker 的兩大職責(zé)拆分給了兩個(gè)獨(dú)立的組件:
- 全局的 ResourceManager (RM) :這是一個(gè)純粹的資源調(diào)度中心,是集群的唯一主宰。它只負(fù)責(zé)管理和分配整個(gè)集群的資源(如 CPU、內(nèi)存),但對(duì)應(yīng)用程序的具體內(nèi)容一無(wú)所知。
- 每個(gè)應(yīng)用專屬的 ApplicationMaster (AM) :當(dāng)一個(gè)計(jì)算任務(wù)(無(wú)論是 MapReduce 作業(yè)還是 Spark 作業(yè))被提交時(shí),YARN 的 RM 首先會(huì)啟動(dòng)一個(gè)專屬于該任務(wù)的“司令官”——ApplicationMaster。這個(gè) AM 負(fù)責(zé)向 RM “申請(qǐng)”計(jì)算資源(比如“我需要 100 個(gè)容器,每個(gè)容器 4G 內(nèi)存、2 個(gè)核”),在獲得資源后,再由它自己負(fù)責(zé)在其獲得的資源上啟動(dòng)、管理和監(jiān)控具體的計(jì)算任務(wù)。
- 帶來(lái)了什么?
YARN 本身是 Hadoop 生態(tài)中的一個(gè)核心框架服務(wù)。用戶通常不直接操作 YARN,而是通過(guò) spark-submit
或 mapred
等命令提交應(yīng)用。這些應(yīng)用框架會(huì)自動(dòng)與 YARN 的 RM 通信,啟動(dòng)自己的 AM,從而在集群上運(yùn)行。
這個(gè)解耦是革命性的。它將 Hadoop 集群從一個(gè)“只能跑 MapReduce 的專用平臺(tái)”升級(jí)為了一個(gè)通用的 “數(shù)據(jù)操作系統(tǒng)” 。從此,任何符合 YARN 規(guī)范的計(jì)算框架(如 Spark、Flink、Storm 等)都可以作為“應(yīng)用程序”運(yùn)行在同一個(gè)集群之上,共享硬件資源,極大地提升了集群的利用率和靈活性。
Spark:性能的飛躍
在 YARN 提供的通用資源管理平臺(tái)上,Spark 橫空出世,旨在解決 MapReduce 的性能瓶頸。當(dāng)一個(gè) Spark 應(yīng)用提交到 YARN 集群時(shí),YARN 的 RM 會(huì)先為其啟動(dòng) Spark 的 ApplicationMaster。隨后,這個(gè) AM 會(huì)向 RM 申請(qǐng)更多資源(在 YARN 中稱為容器 Container)來(lái)運(yùn)行 Spark 的 Executor 進(jìn)程,這些 Executor 才是真正執(zhí)行計(jì)算任務(wù)的工作單元。
Spark 的性能優(yōu)勢(shì)源于其核心抽象——RDD。通過(guò)將中間計(jì)算結(jié)果保存在內(nèi)存中,并利用惰性計(jì)算和有向無(wú)環(huán)圖 (DAG) 來(lái)優(yōu)化整個(gè)計(jì)算流程,Spark 避免了 MapReduce 在多步驟任務(wù)中頻繁的、昂貴的磁盤(pán)讀寫(xiě)。對(duì)于需要多次迭代的機(jī)器學(xué)習(xí)算法和需要快速響應(yīng)的交互式數(shù)據(jù)分析場(chǎng)景,Spark 提供了比 MapReduce 高出幾個(gè)數(shù)量級(jí)的性能提升。
Hive:降低大數(shù)據(jù)的門(mén)檻
雖然 MapReduce 和 Spark 提供了強(qiáng)大的計(jì)算能力,但直接用 Java 或 Scala 編寫(xiě)分布式程序?qū)υS多人來(lái)說(shuō)門(mén)檻太高。Hive 的出現(xiàn),就是為了讓更廣泛的用戶群體能夠利用大數(shù)據(jù)的能力。
Hive 是一套完整的 數(shù)據(jù)倉(cāng)庫(kù)基礎(chǔ)設(shè)施 ,它不僅僅是一種語(yǔ)法。其核心組件包括:
- HiveQL :一種與標(biāo)準(zhǔn) SQL 非常相似的查詢語(yǔ)言,讓數(shù)據(jù)分析師可以用熟悉的語(yǔ)法來(lái)查詢海量數(shù)據(jù)。
- 引擎 :Hive 的核心引擎負(fù)責(zé)將用戶提交的 HiveQL 查詢語(yǔ)句進(jìn)行解析、優(yōu)化,并最終 翻譯 成底層的分布式計(jì)算作業(yè)(早期是 MapReduce,現(xiàn)在更多地配置為 Spark 或 Tez)。
- Metastore :這是 Hive 的靈魂所在。它是一個(gè)獨(dú)立的元數(shù)據(jù)存儲(chǔ)服務(wù)(通常使用 MySQL 或 PostgreSQL 實(shí)現(xiàn)),記錄了 HDFS 上非結(jié)構(gòu)化數(shù)據(jù)文件的“結(jié)構(gòu)化”信息。它像一個(gè)戶口本,定義了“表”名、列名、數(shù)據(jù)類型,并指明了這些表對(duì)應(yīng)的數(shù)據(jù)實(shí)際存放在 HDFS 的哪個(gè)目錄下。正是因?yàn)橛辛?Metastore,Hive 才能讓用戶像查詢傳統(tǒng)數(shù)據(jù)庫(kù)表一樣查詢一堆分散的文本文件。
- 服務(wù)接口 (HiveServer2) :Hive 還可以作為一個(gè)常駐服務(wù)運(yùn)行,提供 JDBC/ODBC 接口,允許各種商業(yè)智能 (BI) 工具(如 Tableau)和應(yīng)用程序像連接普通數(shù)據(jù)庫(kù)一樣連接到 Hive,進(jìn)行數(shù)據(jù)查詢和分析。
HBase:賦予 Hadoop 實(shí)時(shí)讀寫(xiě)能力
HDFS 擅長(zhǎng)存儲(chǔ)大文件并支持高吞吐量的順序讀取,但它天生不支持對(duì)數(shù)據(jù)的隨機(jī)、實(shí)時(shí)讀寫(xiě)。你無(wú)法高效地執(zhí)行“查詢用戶 ID 為 123 的個(gè)人信息”這類操作。HBase 的出現(xiàn)正是為了彌補(bǔ)這一短板。
HBase 是一個(gè)構(gòu)建在 HDFS 之上的 NoSQL 數(shù)據(jù)庫(kù)。所謂 NoSQL(“Not Only SQL”),泛指所有非關(guān)系型的數(shù)據(jù)庫(kù)。相比傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)(如 MySQL),NoSQL 數(shù)據(jù)庫(kù)通常具備以下優(yōu)勢(shì):
- 靈活的數(shù)據(jù)模型 :它們不需要預(yù)先定義嚴(yán)格的表結(jié)構(gòu),可以輕松存儲(chǔ)半結(jié)構(gòu)化甚至非結(jié)構(gòu)化數(shù)據(jù)。
- 超強(qiáng)的水平擴(kuò)展能力 :它們被設(shè)計(jì)為可以輕松地?cái)U(kuò)展到成百上千臺(tái)服務(wù)器,以應(yīng)對(duì)數(shù)據(jù)量和訪問(wèn)量的增長(zhǎng)。
- 高可用性 :通常內(nèi)置了數(shù)據(jù)復(fù)制和自動(dòng)故障轉(zhuǎn)移機(jī)制。
HBase 本質(zhì)上是一個(gè)巨大的、稀疏的、分布式的、多維度的、已排序的哈希表。它允許你通過(guò)一個(gè)唯一的行鍵 (Row Key) 在毫秒級(jí)別內(nèi)從億萬(wàn)行數(shù)據(jù)中定位并讀寫(xiě)數(shù)據(jù),完美地滿足了需要對(duì)海量數(shù)據(jù)進(jìn)行實(shí)時(shí)隨機(jī)訪問(wèn)的在線應(yīng)用場(chǎng)景,例如實(shí)時(shí)推薦系統(tǒng)、用戶畫(huà)像查詢、風(fēng)控系統(tǒng)等。
Flink 及其他:擁抱真正的流處理
隨著物聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)的發(fā)展,數(shù)據(jù)不再僅僅是離線存儲(chǔ)的“批數(shù)據(jù)”,而是像水流一樣源源不斷產(chǎn)生的“流數(shù)據(jù)”。
Spark Streaming 的微批處理 (Micro-batching)
Spark 最早通過(guò) Spark Streaming 模塊來(lái)處理流數(shù)據(jù)。它的工作模式是“微批處理”:它將實(shí)時(shí)數(shù)據(jù)流按照一個(gè)極小的時(shí)間間隔(如 1 秒)切分成一個(gè)個(gè)微小的數(shù)據(jù)批次(mini-batch),然后用 Spark 引擎快速地處理這些小批次。這種方式巧妙地復(fù)用了 Spark 成熟的批處理引擎,可以實(shí)現(xiàn)很低的延遲(準(zhǔn)實(shí)時(shí)),并且吞吐量大。但它并非真正的“逐條處理”,因?yàn)閿?shù)據(jù)總要攢夠一個(gè)批次的間隔才能被處理,因此存在一個(gè)固有的、最小等于批次間隔的延遲。
Flink 的真正事件驅(qū)動(dòng)流處理
Apache Flink 則代表了另一條技術(shù)路線—— 真正的流處理 。它是一個(gè) 事件驅(qū)動(dòng) (Event-driven) 的框架,其核心理念是“數(shù)據(jù)流是第一公民”。在 Flink 中,每一條數(shù)據(jù)(一個(gè)事件)一旦抵達(dá),就會(huì)被立刻處理,而無(wú)需等待湊成一個(gè)批次。這種模式能夠?qū)崿F(xiàn)最低的毫秒級(jí)甚至亞毫秒級(jí)延遲。Flink 強(qiáng)大的 狀態(tài)管理 和 精確一次 (exactly-once) 處理語(yǔ)義保證,使其非常適合構(gòu)建復(fù)雜的、有狀態(tài)的實(shí)時(shí)應(yīng)用,如實(shí)時(shí)欺詐檢測(cè)、金融交易監(jiān)控和實(shí)時(shí)數(shù)據(jù)大屏等。在 Flink 的世界觀里,批處理只是流處理的一個(gè)特例——一個(gè)有限的數(shù)據(jù)流。
關(guān)于 Spark 的開(kāi)發(fā)語(yǔ)言,為什么選擇 Scala
Spark 主要使用 Scala 開(kāi)發(fā)。這部分是因?yàn)轫?xiàng)目啟動(dòng)時(shí) Scala 正是一門(mén)“新潮”的語(yǔ)言。但更重要的技術(shù)原因是,Scala 作為一門(mén)運(yùn)行在 JVM 上的函數(shù)式語(yǔ)言,它能非常簡(jiǎn)潔、高效地定義和傳遞用戶代碼(即 閉包 (closures) ),并且可以將其序列化后發(fā)送到工作節(jié)點(diǎn)上執(zhí)行,這是實(shí)現(xiàn)分布式計(jì)算的關(guān)鍵。
關(guān)于 RDD 概念的延伸 :雖然 RDD 的概念與 Spark 緊密相連,但其背后的核心思想——基于血緣的恢復(fù)和面向集合的 API——在許多其他系統(tǒng)中都有體現(xiàn),如 DryadLINQ 和 FlumeJava 。值得一提的是,Spark 自身也在不斷進(jìn)化。如今,更推薦使用 DataFrame 和 Dataset API。它們?cè)?RDD 的基礎(chǔ)上,引入了更優(yōu)化的列式存儲(chǔ)和執(zhí)行計(jì)劃,性能通常比直接操作 RDD 更高。
最后,關(guān)于 能源效率 ,雖然它是計(jì)算機(jī)科學(xué)的一個(gè)重要議題,但在分布式系統(tǒng)軟件設(shè)計(jì)層面,它通常不是首要的優(yōu)化目標(biāo)。主要的節(jié)能工作集中在數(shù)據(jù)中心設(shè)計(jì)、硬件(如 CPU 動(dòng)態(tài)調(diào)頻)和散熱等方面。因?yàn)樵谲浖用孢M(jìn)行優(yōu)化的節(jié)能效果,遠(yuǎn)不如在這些物理層面進(jìn)行改進(jìn)來(lái)得顯著。
總而言之, Spark 不僅僅是 MapReduce 的一個(gè)替代品,更是數(shù)據(jù)處理范式的一次重要飛躍。