為何Spark在編程界越來越吃香?Spark將成為數(shù)據(jù)科學家的統(tǒng)一平臺
前言
統(tǒng)計科學家使用交互式的統(tǒng)計工具(比如R)來回答數(shù)據(jù)中的問題,獲得全景的認識。與之相比,數(shù)據(jù)工程師則更像一名程序員,他們在服務器上編寫代碼,創(chuàng)建和應用機器學習模型,熟悉C++和Java等系統(tǒng)級語言,經(jīng)常需要和企業(yè)級數(shù)據(jù)中心的某些組件打交道,比如Hadoop。
而有的數(shù)據(jù)科學家專注于更細的領域,就像精通R但從未聽說過Python或者scikit-learn(反之亦然),即便兩者都提供了豐富的統(tǒng)計庫。(文末附有驚喜!)
Spark相比其他工具
如果可以提供一種統(tǒng)一的工具,運行在統(tǒng)一的架構,用統(tǒng)一的語言編程,并可以同時滿足統(tǒng)計科學家和數(shù)據(jù)工程師的需求,那該多好啊。難道為了研究數(shù)據(jù),我就必須去學一種像Python或R的語言?我一直使用傳統(tǒng)的數(shù)據(jù)分析工具,難道為了應對大規(guī)模計算,就必須去懂MapReduce?正是統(tǒng)計工具的不完美造就了這種局面:
- R提供了一個豐富的統(tǒng)計分析和機器學習的解釋器。但R難以在分布式條件下執(zhí)行數(shù)據(jù)的分析和清洗,以便開展其所擅長的數(shù)據(jù)分析,也不以一種主流的開發(fā)語言為人所知。
- Python是-種通用的編程語言,也不乏出色的第三方數(shù)據(jù)分析庫(像Pandas和scikit-learn),但Python也有和R一樣的缺陷:只能局限在處理單機能負載的數(shù)據(jù)量。
- 在經(jīng)典的MapReduce計算框架上開發(fā)分布式的機器學習算法是可行的(參考Mahout),但程序員需要從零開始,更別說移植復雜計算的難度。
- 為降低復雜計算移植到MapReduce的難度,Crunch 提供一個簡單的、傻瓜式的Java API,但MapReduce天生決定了它在迭代計算方面是低效的,盡管大多數(shù)機器學習算法都需要迭代計算。

Spark的優(yōu)勢
Spark是一個超有潛力的通用數(shù)據(jù)計算平臺,無論是對統(tǒng)計科學家還是數(shù)據(jù)工程師。大部分人討論到Spark時,總是注意到將數(shù)據(jù)駐留內存以提高計算效率的方面(相對MapReduce),Spark 擁有許多的特征,使之真正成為一個融合統(tǒng)計科學和數(shù)據(jù)工程的交叉點:
- Spark附帶了一個機器學習庫MLib,雖然只是在初始階段。
- Spark是用Scala語言編寫的,運行在Java虛擬機上,同時也提供像R和Python的命令行解釋器。
- 對Java程序員,Scala 的學習曲線是比較陡峭的,但所幸Scala可以兼容一切的Java庫。
- Spark的RDD(彈性分布式數(shù)據(jù)集),是Crunch開發(fā)者熟知的--種數(shù)據(jù)結構。
- Spark模仿了Scala 的集合計算API,對Java和Scala開發(fā)者來說耳熟能詳,而Python開發(fā)者也不難上手,而Scala對統(tǒng)計計算的支持也不錯。
- Spark和其底層的Scala語言,并不只是為機器學習而誕生的,除此之外,像數(shù)據(jù)訪問、日志ETL和整合都可以通過API輕松搞定。就像Python,你可以把整個數(shù)據(jù)計算流程搬到Spark平臺.上來,而不僅僅是模型擬合和分析。
在命令行解釋器中執(zhí)行的代碼,和編譯后運行的效果相同。而且,命令行的輸入可以得到實時反饋,你將看到數(shù)據(jù)透明地在集群間傳遞與計算。
Spark和MLib還有待完善整個項目有不少bug,效率也還有提升的空間,和YARN的整合也存在問題。Spark還沒辦法提供像R那樣豐富的數(shù)據(jù)分析函數(shù)。但Spark已然是世界上最好的數(shù)據(jù)平臺,足以讓來自任何背景的數(shù)據(jù)科學家側目。

Stack Overflow問題的自動標注
Stack Overflow是一個著名的軟件技術問答平臺,在上面提的每個問題有可能被打上若干個短文本的標簽,比如java或者sql,我們的目標在于建立一.套系統(tǒng),使用ALS推薦算法,為新問題的標簽提供預測和建議。從推薦系統(tǒng)的角度,你可以把問題想象成user,把標簽想象成item。
首先,從Stack Overflow下載官方提供的截至20140120的問答數(shù)據(jù)
- sta ckoverflow. com-Posts. 7z
這是一個能夠直接用于分布式計算的bzip格式文件,但在我們的場景下,必須先解壓并拷
貝到HDFS
- bzcat stackover flow. com-Posts.7z| hdfs dfs -put一/user /srowen/ Posts. xml
解壓后的文件大約是24.4GB,包含210萬個問題,1800 萬個回答,總共標注了930萬個標簽,這些標簽排重之后大概是34000個。
確認機器安裝了Spark 之后,輸入spark-shell即可打開Scala的REPL環(huán)境。首先,我們讀取一個存儲在HDFS的Posts. xm文件:
- val postsXML = sC. textFile("hdfs:/ //user /srowen/Posts. xml")
這時命令行工具會返回:
- postsXML: org. apache. spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12
顯示文本文件已轉化為一個String型的RDD,你可以通過調用RDD的函數(shù),實現(xiàn)任意的查詢運算。比如統(tǒng)計文件的行數(shù):
- postsXML. count
這條指令生成大量的輸出,顯示Spark正在利用分布式的環(huán)境計數(shù),最終打印出18066983。
下一步,將XML文件的每- -行都存入形如(questionID, tag)的元組。得益于Scala的函數(shù)式編程的風格,RDD和Scala集合-樣可以使用map等方法:
- val postIDTags = postsXML. flatMap { line =>
- // Matches Id=".. ."
- Tags="..." in line
- val idTagRegex = "Id=\"(\\d+)\". +Tags=\"([^\"]+)\"".r
- // // Finds tags like <TAG> value from above
- val tagRegex = "<([^&]+)>".r
- // Yields 0 or 1 matches:
- idTagRegex. findFirstMatchIn(line) match {
- // No match -- not a line
- case None => None
- // Match, and can extract ID and tags from m
- case Some(m) => {
- val postID = m. group(1) . toInt
- val tagsString = m. group(2)
- // Pick out just TAG matching group
- val tags = tagRegex. findAllMatchIn(tagsString)。map(_ . group
- (1)) . toList
- // Keep only question with at least 4 tags, and map to (pos
- t,tag) tuples
- if (tags.size >= 4) tags . map( (postID,_)) else None
- }
- }
- // Because of flatMap,individual lists will concatenate
- // into one collection of tuples
- }
你會發(fā)現(xiàn)這條指令的執(zhí)行是立即返回的,而不像count一樣需要等待,因為到目前為止,Spark并未啟動任何主機間的數(shù)據(jù)變換。
ALS的MLib實現(xiàn)必須使用數(shù)值ID而非字符串作為惟一標識,而問題的標簽數(shù)據(jù)是字符串格式的,所以需要把字符串哈希成一個非負整數(shù),同時保留非負整數(shù)到字符串的映射。這里我們先定義一個哈希函數(shù)以便復用。
- def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
- var tagHashes = postIDTags .map(_._2) .distinct. map(tag => (nnHash
- (tag) , tag))
現(xiàn)在把元組轉換為ALS計算所需的輸入:
- import org. apache. spark. mllib. recommendation._
- // Convert to Rating(Int ,Int,Double) objects
- val alsInput = postIDTags.map(t => Rating(t. _1, nnHash(t._2), 1.
- 0) )
- // Train model with 40 features, 10 iterations of ALS
- val model = ALS. trainImplicit(alsInput, 40,10)
這一步生成特征矩陣,可以被用來預測問題與標簽之間的關聯(lián)。由于目前MLib還處于不完善的狀態(tài),沒有提供一個recommend的接口來獲取建議的標簽,我們可以簡單定義一個:
- def
- recommend (questionID: Int, howMany: Int = 5): Array[(String ,
- Double)] = {
- // Build list of one question and all items and predict value f
- or all of them
- val predictions = model. predict(tagHashes.map(t => (questionI
- D,t._1)))
- // Get top howMany recommendations ordered by prediction value
- val topN = predictions. top ( howMany) (Ordering . by [Rating,Doub1
- e](_.rating))
- // Translate back to tags from IDs
- topN . map(r => (tagHashes. lookup(r . product)(0),r .rating))
通過上述函數(shù),我們可以獲得任意一個問題比如ID為7122697的How to make substring-matching query work fast on a large table?的至少4個標簽:
- recommend ( 7122697)。foreach(println)
推薦結果如下所示:
- (sqL,0.17745152481166354)
- (database,0.13526622226672633)
- (oracle , 0.1079428707621154)
- (ruby-on-rails, 0.06067207312463499)
- (postgresql,0.050933613169706474)
注意:
- -每次運行得到的結果不盡相同,是因為ALS是從隨機解開始迭代的
- -如果你希望獲得實時性更高的結果,可以在recommend前輸入tagHashes=tagHas hes. cache
真實的問題標簽是postgresql、query-optimi zation、substring 和text-search。不過,預測結果也有一定的合理性(postgresq 經(jīng)常和ruby-on-rails一起出現(xiàn))
當然,以上的示例還不夠優(yōu)雅和高效,但是,我希望所有來自R的分析師、鼓搗Python 的黑客和熟悉Hadoop的開發(fā)者,都能從中找到你們熟悉的部分,從而找到一條適合你們的路徑去探索Spark,并從中獲益。