成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming

大數(shù)據(jù) Spark
Spark是基于內存的大數(shù)據(jù)綜合處理引擎,具有優(yōu)秀的作業(yè)調度機制和快速的分布式計算能力,使其能夠更加高效地進行迭代計算,因此Spark能夠在一定程度上實現(xiàn)大數(shù)據(jù)的流式處理。

隨著信息技術的迅猛發(fā)展,數(shù)據(jù)量呈現(xiàn)出爆炸式增長趨勢,數(shù)據(jù)的種類與變化速度也遠遠超出人們的想象,因此人們對大數(shù)據(jù)處理提出了更高的要求,越來越多的領域迫切需要大數(shù)據(jù)技術來解決領域內的關鍵問題。在一些特定的領域中(例如金融、災害預警等),時間就是金錢、時間可能就是生命!然而傳統(tǒng)的批處理框架卻一直難以滿足這些領域中的實時性需求。為此,涌現(xiàn)出了一批如S4、Storm的流式計算框架。Spark是基于內存的大數(shù)據(jù)綜合處理引擎,具有優(yōu)秀的作業(yè)調度機制和快速的分布式計算能力,使其能夠更加高效地進行迭代計算,因此Spark能夠在一定程度上實現(xiàn)大數(shù)據(jù)的流式處理。

[[205927]]

Spark Streaming是Spark上的一個流式處理框架,可以面向海量數(shù)據(jù)實現(xiàn)高吞吐量、高容錯的實時計算。Spark Streaming支持多種類型數(shù)據(jù)源,包括Kafka、Flume、trwitter、zeroMQ、Kinesis以及TCP sockets等,如圖1所示。Spark Streaming實時接收數(shù)據(jù)流,并按照一定的時間間隔將連續(xù)的數(shù)據(jù)流拆分成一批批離散的數(shù)據(jù)集;然后應用諸如map、reducluce、join和window等豐富的API進行復雜的數(shù)據(jù)處理;最后提交給Spark引擎進行運算,得到批量結果數(shù)據(jù),因此其也被稱為準實時處理系統(tǒng)。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖1 Spark Streaming支持多種類型數(shù)據(jù)源

目前應用最廣泛的大數(shù)據(jù)流式處理框架是Storm。Spark Streaming 最低0.5~2s做一次處理(而Storm最快可達0.1s),在實時性和容錯方面不如Storm。然而Spark Streaming的集成性非常好,通過RDD不僅能夠與Spark上的所有組件無縫銜接共享數(shù)據(jù),還能非常容易地與Kafka、Flume等分布式日志收集框架進行集成;同時Spark Streaming的吞吐量非常高,遠遠優(yōu)于Storm的吞吐量,如圖2所示。所以雖然Spark Streaming的處理延遲高于Storm,但是在集成性與吞吐量方面的優(yōu)勢使其更適用于大數(shù)據(jù)背景。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖2 Spark Streaming與Storm吞吐量比較圖

Spark Streaming基礎概念

批處理時間間隔

在Spark Streaming中,對數(shù)據(jù)的采集是實時、逐條進行的,但是對數(shù)據(jù)的處理卻是分批進行的。因此,Spark Streaming需要設定一個時間間隔,將該時間間隔內采集到的數(shù)據(jù)統(tǒng)一進行處理,這個間隔稱為批處理時間間隔。

也就是說對于源源不斷的數(shù)據(jù),Spark Streaming是通過切分的方式,先將連續(xù)的數(shù)據(jù)流進行離散化處理。數(shù)據(jù)流每被切分一次,對應生成一個RDD,每個RDD都包含了一個時間間隔內所獲取到的所有數(shù)據(jù),因此數(shù)據(jù)流被轉換為由若干個RDD構成的有序集合,而批處理時間間隔決定了Spark Streaming需要多久對數(shù)據(jù)流切分一次。Spark Streaming是Spark上的組件,其獲取的數(shù)據(jù)和數(shù)據(jù)上的操作最終仍以Spark作業(yè)的形式在底層的Spark內核中進行計算,因此批處理時間間隔不僅影響數(shù)據(jù)處理的吞吐量,同時也決定了Spark Streaming向Spark提交作業(yè)的頻率和數(shù)據(jù)處理的延遲。需要注意的是,批處理時間間隔的設置會伴隨Spark Streaming應用程序的整個生命周期,無法在程序運行期間動態(tài)修改,所以需要綜合考慮實際應用場景中的數(shù)據(jù)流特點和集群的處理性能等多種因素進行設定。

窗口時間間隔

窗口時間間隔又稱為窗口長度,它是一個抽象的時間概念,決定了Spark Streaming對RDD序列進行處理的范圍與粒度,即用戶可以通過設置窗口長度來對一定時間范圍內的數(shù)據(jù)進行統(tǒng)計和分析。如果設批處理時間設為1s,窗口時間間隔為3s,如3圖所示,其中每個實心矩形表示Spark Streaming每1秒鐘切分出的一個RDD,若干個實心矩形塊表示一個以時間為序的RDD序列,而透明矩形框表示窗口時間間隔。易知窗口內RDD的數(shù)量最多為3個,即Spark Streming 每次最多對3個RDD中的數(shù)據(jù)進行統(tǒng)計和分析。對于窗口時間間隔還需要注意以下幾點:

  • 以圖3為例,在系統(tǒng)啟動后的前3s內,因進入窗口的RDD不足3個,但是隨著時間的推移,最終窗口將被填滿。
  • 不同窗口內所包含的RDD可能會有重疊,即當前窗口內的數(shù)據(jù)可能被其后續(xù)若干個窗口所包含,因此在一些應用場景中,對于已經處理過的數(shù)據(jù)不能立即刪除,以備后續(xù)計算使用。
  • 窗口時間間隔必須是批處理時間間隔的整數(shù)倍。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖3 窗口時間間隔示意圖

滑動時間間隔

滑動時間間隔決定了Spark Streaming對數(shù)據(jù)進行統(tǒng)計與分析的頻率,多出現(xiàn)在與窗口相關的操作中。滑動時間間隔是基于批處理時間間隔提出的,其必須是批處理時間間隔的整數(shù)倍。在默認的情況下滑動時間間隔設置為與批處理時間間隔相同的值。如果批處理時間間隔為1s,窗口間隔為3s,滑動時間間隔為2s,如圖4所示,其含義是每隔2s對過去3s內產生的3個RDD進行統(tǒng)計分析。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖4 滑動時間間隔、窗口時間間隔、批處理時間間隔綜合示意圖

DStream基本概念

DStream是Spark Streaming的一個基本抽象,它以離散化的RDD序列的形式近似描述了連續(xù)的數(shù)據(jù)流。DStream本質上是一個以時間為鍵,RDD為值的哈希表,保存了按時間順序產生的RDD,而每個RDD封裝了批處理時間間隔內獲取到的數(shù)據(jù)。Spark Streaming每次將新產生的RDD添加到哈希表中,而對于已經不再需要的RDD則會從這個哈希表中刪除,所以DStream也可以簡單地理解為以時間為鍵的RDD的動態(tài)序列。設批處理時間間隔為1s,圖5為4s內產生的DStream示意圖。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖5 DStream示意圖

Spark Streaming編程模式與案例分析

Spark Streaming編程模式

下面以Spark Streaming官方提供的WordCount代碼為例來介紹Spark Streaming的使用方式。

示例1:

 

  1. import org.apache.spark._ 
  2. import org.apache.spark.streaming._ 
  3. import org.apache.spark.streaming.StreamingContext._ 
  4. /*創(chuàng)建一個本地模式的StreamingContext,并設定master節(jié)點工作線程數(shù)為2,并以1秒作為批處理時間間隔。*/ 
  5. val conf = new SparkConf().setMaster("local[2]"). 
  6. setAppName("NetworkWordCount"
  7. val ssc = new StreamingContext(conf, Seconds(1)) 
  8. /*通過獲取”localhost”節(jié)點9999端口中的實時數(shù)據(jù)流創(chuàng)建DStream。*/ 
  9. val lines = ssc.socketTextStream("localhost", 9999) 
  10. /*以空格作為分割DStream中數(shù)據(jù)的依據(jù),使得每一行文本轉換為若干個單詞。*/ 
  11. val words = lines.flatMap(_.split(" ")) 
  12. import org.apache.spark.streaming.StreamingContext._ 
  13. /*對于words中的每個單詞word,轉換為相應的二元組形式(word,1),在此基礎上統(tǒng)計每個單詞出現(xiàn)的次數(shù)。*/ 
  14. val pairs = words.map(word => (word, 1)) 
  15. val wordCounts = pairs.reduceByKey(_ + _) 
  16. //輸出DStream中每個RDD中前10個元素。 
  17. wordCounts.print() 
  18. //啟動Spark Streaming應用程序。 
  19. ssc.start() 
  20. //等待計算完成。 
  21. ssc.awaitTermination() 

Spark Streaming應用程序在功能結構上通常包含以下五部分,如上述示例1所示。

  • 導入Spark Streaming相關包:Spark Streaming作為Spark框架上的一個組件,具有很好的集成性。在開發(fā)Spark Streaming應用程序時,只需導入Spark Streaming相關包,無需額外的參數(shù)配置。
  • 創(chuàng)建StreamingContext對象:同Spark應用程序中的SparkContext對象一樣, StreamingContext對象是Spark Streaming應用程序與集群進行交互的唯一通道,其中封裝了Spark集群的環(huán)境信息和應用程序的一些屬性信息。在該對象中通常需要指明應用程序的運行模式(示例1中設為local[2])、設定應用程序名稱(示例1中設為NetworkWordCount)、設定批處理時間間隔(示例1中設為Seconds(1)即1秒鐘),其中批處理時間間隔需要根據(jù)用戶的需求和集群的處理能力進行適當?shù)卦O置。
  • 創(chuàng)建InputDStream:Spark Streaming需要根據(jù)數(shù)據(jù)源類型選擇相應的創(chuàng)建DStream的方法。示例1中Spark Streaming通過StreamingContext對象調用socketTextStream方法處理以socket連接類型數(shù)據(jù)源,創(chuàng)建出DStream即lines。Spark Streaming同時支持多種不同的數(shù)據(jù)源類型,其中包括Kafka、Flume、HDFS/S3、Kinesis和Twitter等數(shù)據(jù)源。
  • 操作DStream:對于從數(shù)據(jù)源得到的DStream,用戶可以調用豐富的操作對其進行處理。示例1中針對lines的一系列操作就是一個典型的WordCount執(zhí)行流程:對于當前批處理時間間隔內的文本數(shù)據(jù)以空格進行切分,進而得到words;再將words中每個單詞轉換為二元組,進而得到pairs;最后利用reduceByKey方法進行統(tǒng)計。
  • 啟動與停止Spark Streaming應用程序:在啟動Spark Streaming應用程序之前,DStream上所有的操作僅僅是定義了數(shù)據(jù)的處理流程,程序并沒有真正連接上數(shù)據(jù)源,也沒有對數(shù)據(jù)進行任何操作,當ssc.start()啟動后程序中定義的操作才會真正開始執(zhí)行。

文本文件數(shù)據(jù)處理案例

功能需求

實時監(jiān)聽并獲取本地home/dong/Streamingtext目錄中新生成的文件(文件均為英文文本文件,單詞之間使用空格進行間隔),并對文件中各單詞出現(xiàn)的次數(shù)進行統(tǒng)計。

代碼實現(xiàn)

 

  1. package dong.spark 
  2. import org.apache.spark.SparkConf 
  3. import org.apache.spark.streaming.{Seconds,StreamingContext} 
  4. import org.apache.spark.streaming.StreamingContext._ 
  5. object StreamingFileWordCount { 
  6. def main(args: Array[String]): Unit ={ 
  7. //以local模式運行,并設定master節(jié)點工作線程數(shù)為2。 
  8. val sparkConf = new SparkConf(). 
  9. setAppName("StreamingFileWordCount"). 
  10. setMaster("local[2]"
  11. /*創(chuàng)建StreamingContext實例,設定批處理時間間隔為20秒。*/ 
  12. val ssc = new StreamingContext(sparkConf,Seconds(20)) 
  13. /*指定數(shù)據(jù)源來自本地home/dong/Streamingtext。*/ 
  14. val lines = ssc.textFileStream("/home/dong/Streamingtext"
  15. /*在每個批處理時間間隔內,對指定文件夾中變化的數(shù)據(jù)進行單詞統(tǒng)計并打印。*/ 
  16. val words= lines.flatMap(_.split(" ")) 
  17. val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_) 
  18. wordcounts.print() 
  19. ssc.start() 
  20. ssc.awaitTermination() 

運行演示

第1步,啟動Hadoop與Spark。

 

  1. $ start-all.sh  
  2. $ cd spark-1.4.0-bin-hadoop2.4  
  3. $ sbin/start-all.sh 

第2步,創(chuàng)建Streaming監(jiān)控目錄。

  1. $ mkdir /home/dong/Streamingtext 

在dong用戶主目錄下創(chuàng)建Streamingtext為Spark Streaming監(jiān)控的目錄,創(chuàng)建后如圖6所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖6 dong用戶主目錄下創(chuàng)建Streamingtext文件夾

第3步,在IntelliJ IDEA中編輯運行Streaming程序。在IntelliJ IDEA中創(chuàng)建工程StreamingFileWordCount,編輯對象StreamingFileWordCount,如圖7所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖7 IntelliJ IDEA中StreamingFileWordCount示意圖

由于該示例沒有輸入參數(shù),因此不需要配置參數(shù),可直接單擊右鍵->單擊"Run‘StreamingFileWordCount’ "。

第4步,在監(jiān)聽目錄下創(chuàng)建文本文件。在master節(jié)點上的/home/dong/Streamingtext中分別創(chuàng)建file1.txt與file2.txt。

file1.txt內容如下:

  • aa
  • bb

file2.txt內容如下:

  • ee
  • dd
  • cc

創(chuàng)建后,/home/dong/Streamingtext中內容如圖8所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖8 Streamingtext文件夾內容示意圖

查看結果

終端窗口輸出了每個批處理時間間隔(20秒)內,/home/dong/Streamingtext中新生成文件所包含的各單詞個數(shù),如圖9所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖9 StreamingFileWordCount運行結果示意圖

網絡數(shù)據(jù)處理案例

功能需求

監(jiān)聽本地節(jié)點指定端口傳輸?shù)臄?shù)據(jù)流(本案例為master節(jié)點9999端口的英文文本數(shù)據(jù),以逗號間隔單詞),每5秒統(tǒng)計一次該時間間隔內收集到的各單詞的個數(shù)。

代碼實現(xiàn)

本案例涉及數(shù)據(jù)流模擬器和分析器兩部分。為了更接近真實的網絡環(huán)境,首先定義數(shù)據(jù)流模擬器,該模擬器以Socket方式監(jiān)聽網絡中指定節(jié)點上的指定端口號(master節(jié)點9999端口),當外部程序通過該端口連接并請求數(shù)據(jù)時,數(shù)據(jù)流模擬器將定時地從指定文本文件中隨機選取數(shù)據(jù)發(fā)送至指定端口(每間隔1秒鐘數(shù)據(jù)流模擬器從master節(jié)點上的/home/dong/Streamingtext/file1.txt中隨機截取一行文本發(fā)送給master節(jié)點的9999端口),通過這種方式模擬網絡環(huán)境下源源不斷的數(shù)據(jù)流。針對獲取到的實時數(shù)據(jù),再定義分析器(Spark Streaming應用程序),用以統(tǒng)計時間間隔(5秒)內收集到的單詞個數(shù)。

數(shù)據(jù)流模擬器代碼實現(xiàn)如下:

 

  1. package dong.spark 
  2. import java.io.{PrintWriter} 
  3. import java.net.ServerSocket 
  4. import scala.io.Source 
  5. objectSocketSimulation { 
  6. //定義隨機獲取整數(shù)的方法。 
  7. def index(length: Int)={ 
  8. import java.util.Random 
  9. val rdm = new Random 
  10. rdm.nextInt(length) 
  11. def main(args:Array[String]): Unit ={ 
  12. if(args.length!=3){ 
  13. /*調用數(shù)據(jù)流模擬器需要三個參數(shù):文件路徑、端口號和批處理時間間隔時間(單位:毫秒)。*/ 
  14. System.err.println("Usage:<filename><port><millisecond>"
  15. System.exit(1) 
  16. //獲取指定文件總的行數(shù)。 
  17. val filename = args(0) 
  18. val lines = Source.fromFile(filename).getLines().toList 
  19. val filerow=lines.length 
  20. //指定監(jiān)聽參數(shù)args(1)指定的端口,當外部程序請求時建立連接。 
  21. val listener =new ServerSocket(args(1).toInt) 
  22. while(true){ 
  23. val socket = listener.accept() 
  24. new Thread(){ 
  25. override def run={ 
  26. println("Got client connected from:"+socket.getInetAddress) 
  27. val out = new PrintWriter(socket.getOutputStream(),true
  28. while(true){ 
  29. Thread.sleep(args(2).toLong) 
  30. //當該端口接受請求時,隨機獲取某行數(shù)據(jù)發(fā)送給對方。 
  31. val content= lines(index(filerow)) 
  32. println(content) 
  33. out.write(content+'\n'
  34. out.flush() 
  35. socket.close() 
  36. }.start() 

分析器代碼如下:

 

  1. package dong.spark 
  2. import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext} 
  3. import org.apache.spark.streaming.StreamingContext._ 
  4. import org.apache.spark.storage.StorageLevel 
  5. object NetworkWordCount { 
  6. def main (args:Array[String]) ={ 
  7. //以local模式運行,并設定master節(jié)點工作線程數(shù)為2。 
  8. val conf=new SparkConf().setAppName("NetworkWordCount"). 
  9. setMaster("local[2]"
  10. val sc=new SparkContext(conf) 
  11. val ssc=new StreamingContext(sc, Seconds(5)) 
  12. /*通過socketTextStream獲取指定節(jié)點指定端口的數(shù)據(jù)創(chuàng)建DStream,并保存在內存和硬盤中,其中節(jié)點與端口分別對應參數(shù)args(0)和args(1)。*/ 
  13.  
  14. val lines=ssc.socketTextStream(args(0), 
  15. args(1).toInt, 
  16. StorageLevel.MEMORY_AND_DISK_SER) 
  17. //在每個批處理時間間隔內對獲取到的數(shù)據(jù)進行單詞統(tǒng)計并且打印。 
  18. val words= lines.flatMap(_.split(",")) 
  19. val wordcounts = words.map(x=>(x,1)).reduceByKey(_+_) 
  20. wordcounts.print() 
  21. ssc.start() 
  22. ssc.awaitTermination() 

運行演示

第1步,在IntelliJ IDEA中編輯運行Streaming程序。master節(jié)點啟動IntelliJ IDEA,創(chuàng)建工程NetworkWordCount,編輯模擬器與分析器。模擬器如圖10所示,分析器如圖11所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖10 IntelliJ IDEA中數(shù)據(jù)流模擬器示意圖

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖11 IntelliJ IDEA中分析器示意圖

第2步,創(chuàng)建模擬器數(shù)據(jù)源文件。在master節(jié)點創(chuàng)建/home/dong/Streamingtext目錄,在其中創(chuàng)建文本文件file1.txt。

file1.txt內容如下:

  • spark,
  • hello,
  • hbase,
  • world,

第3步,打包數(shù)據(jù)流模擬器。打包過程詳見本書4.3.3節(jié)。在Artifacts打包配置界面中,根據(jù)用戶實際scala安裝目錄,在Class Path中添加下述scala依賴包,如圖12所示。

 

  1. /usr/scala-2.10.4/lib/scala-swing.jar 
  2. /usr/scala-2.10.4/lib/scala-library.jar 
  3. /usr/scala-2.10.4/lib/scala-actors.jar 

 

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖12 在Class Path中添加scala依賴包

打包后在主目錄下生成NetworkWordCount.jar,如圖13所示。

圖13 在dong用戶主目錄下生成NetworkWordCount.jar示意圖

第4步,啟動數(shù)據(jù)流模擬器。在master節(jié)點開啟控制終端,通過下面代碼啟動數(shù)據(jù)流模擬器。

  1. $ java -cp /home/dong/NetworkWordCount.jar dong.spark.SocketSimulation/ home/dong/Streamingtest/file1.txt 9999 1000 

數(shù)據(jù)流模擬器每間隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發(fā)送給master節(jié)點的9999端口。在分析器未連接時,數(shù)據(jù)流模擬器處于阻塞狀態(tài),終端不會顯示輸出的文本。

第5步,運行分析器。在master上啟動IntelliJ IDEA編寫分析器代碼,然后單擊菜單"Build->"Build Artifacts",通過Application選項配置分析器運行所需的參數(shù),其中Socket主機名為master、端口號為9999,參數(shù)之間用空格間隔,如圖13所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖13 分析器參數(shù)配置示意圖

配置好參數(shù)后返回IntelliJ IDEA菜單欄,單擊"Run"->"Build Artifacts"運行分析器。

查看結果

第1步,在master上查看數(shù)據(jù)流模擬器運行情況。IntelliJ IDEA運行分析器從而與數(shù)據(jù)流模擬器建立連接。當檢測到外部連接時,數(shù)據(jù)流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發(fā)送給master節(jié)點上的9999端口。為方便講解和說明,file1.txt中每一行只包含一個單詞,因此數(shù)據(jù)流模擬器每次僅發(fā)送一個單詞給端口,如圖14所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖14 在master上模擬器運行結果

第2步,在master的IntelliJ IDEA中查看分析器運行情況。在IntelliJ IDEA的運行日志窗口中,可以觀察到統(tǒng)計結果。通過分析可知Spark Streaming每個批處理時間間隔內獲取的單詞數(shù)為5,剛好是5秒內發(fā)送單詞的總數(shù),并對各單詞進行了統(tǒng)計,如圖15所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖15 IntelliJ IDEA中分析器運行結果

stateful應用案例

在很多數(shù)據(jù)流相關的實際應用場景中,對當前數(shù)據(jù)的統(tǒng)計分析需要借助于先前的數(shù)據(jù)處理結果完成。例如電商每間隔10分鐘統(tǒng)計某一商品當前累計銷售總額、車站每隔3小時統(tǒng)計當前客流總量,等等。此類應用需求可借助于Spark Streaming的有狀態(tài)轉換操作實現(xiàn)。

功能需求

監(jiān)聽網絡中某節(jié)點上指定端口傳輸?shù)臄?shù)據(jù)流(slave1節(jié)點9999端口的英文文本數(shù)據(jù),以逗號間隔單詞),每5秒分別統(tǒng)計各單詞的累計出現(xiàn)次數(shù)。

代碼實現(xiàn)

本案例功能的實現(xiàn)涉及數(shù)據(jù)流模擬器和分析器兩部分。

分析器代碼:

 

  1. package dong.spark 
  2. import org.apache.spark.{SparkContext, SparkConf} 
  3. import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext} 
  4. import org.apache.spark.streaming.StreamingContext._ 
  5. object StatefulWordCount { 
  6. def main(args:Array[String]): Unit ={ 
  7. /*定義更新狀態(tài)方法,參數(shù)values為當前批處理時間間隔內各單詞出現(xiàn)的次數(shù),state為以往所有批次各單詞累計出現(xiàn)次數(shù)。*/ 
  8. val updateFunc=(values: Seq[Int],state:Option[Int])=>{ 
  9. val currentCount=values.foldLeft(0)(_+_) 
  10. val previousCount=state.getOrElse(0) 
  11. Some(currentCount+previousCount) 
  12. val conf=new SparkConf(). 
  13. setAppName("StatefulWordCount"). 
  14.  
  15. setMaster("spark://192.168.149.132:7077"
  16. val sc=new SparkContext(conf) 
  17. //創(chuàng)建StreamingContext,Spark Steaming運行時間間隔為5秒。 
  18. val ssc=new StreamingContext(sc, Seconds(5)) 
  19. /*使用updateStateByKey時需要checkpoint持久化接收到的數(shù)據(jù)。在集群模式下運行時,需要將持久化目錄設為HDFS上的目錄。*/ 
  20. ssc.checkpoint("hdfs://master:9000/user/dong/input/StatefulWordCountlog"
  21. /*通過Socket獲取指定節(jié)點指定端口的數(shù)據(jù)創(chuàng)建DStream,其中節(jié)點與端口分別由參數(shù)args(0)和args(1)給出。*/ 
  22. val lines=ssc.socketTextStream(args(0),args(1).toInt) 
  23. val words=lines.flatMap(_.split(",")) 
  24. val wordcounts=words.map(x=>(x,1)) 
  25. //使用updateStateByKey來更新狀態(tài),統(tǒng)計從運行開始以來單詞總的次數(shù)。 
  26. val stateDstream=wordcounts.updateStateByKey[Int](updateFunc) 
  27. stateDstream.print() 
  28. ssc.start() 
  29. ssc.awaitTermination() 

運行演示

第1步,slave1節(jié)點啟動數(shù)據(jù)流模擬器。

第2步,打包分析器。master節(jié)點啟動IntelliJ IDEA創(chuàng)建工程StatefulWordCount編輯分析器,如圖16所示,并將分析器直接打包至master節(jié)點dong用戶的主目錄下,如圖17所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖16 IntelliJ IDEA中StatefulWordCount示意圖

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖17 master上的StatefulWordCount.jar示意圖

第3步,運行分析器。在master節(jié)點開啟終端,通過下面代碼向Spark集群提交應用程序。

  1. $ bin/spark-submit ~/StatefulWordCount.jar slave1 9999 

查看結果

第1步,查看slave1上數(shù)據(jù)流模擬器運行情況。分析器在集群上提交運行后與slave1上運行的數(shù)據(jù)流模擬器建立連接。當檢測到外部連接時,數(shù)據(jù)流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發(fā)送給slave1節(jié)點上的9999端口。由于該文本文件中每一行只包含一個單詞,因此每秒僅發(fā)送一個單詞給端口。如圖18所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖18 slave1上數(shù)據(jù)流模擬器運行示意圖

第2步,查看master上分析器運行情況。在master節(jié)點的提交窗口中可以查看到統(tǒng)計結果,如圖19所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖19 master上分析器運行示意圖

圖中表明截至147920770500ms分析器共接收到14個單詞,其中"spark"累計出現(xiàn)3次,"hbase"累計出現(xiàn)5次,"hello"累計出現(xiàn)3次,"world"累計出現(xiàn)3次。由于批處理時間間隔是5s,模擬器每1秒發(fā)送1個單詞,使得分析器在5s內共接收到5個單詞,因此截止至147920771000ms,分析器共收到19個單詞,其中"spark"累計出現(xiàn)5次,"hbase"累計出現(xiàn)7次,"hello"累計出現(xiàn)4次,"world"累計出現(xiàn)3次。

第3步,查看HDFS中持久化目錄。運行后查看HDFS上的持久化目錄/user/dong/input/StatefulWordCountlog,如圖20所示。Streaming應用程序將接收到的網絡數(shù)據(jù)持久化至該目錄下,便于容錯處理。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖20 HDFS上持久化目錄示意圖

window應用案例

在實際生產環(huán)境中,與窗口相關的應用場景很常見,例如電商每間隔10分鐘小時統(tǒng)計某一商品前30分鐘內累計銷售總額、車站每隔1小時統(tǒng)計前3個小時內的客流量等,此類需求可借助Spark Streaming中的window相關操作實現(xiàn)。window應用案例同時涉及批處理時間間隔、窗口時間間隔與滑動時間間隔。

功能需求

監(jiān)聽網絡中某節(jié)點上指定端口傳輸?shù)臄?shù)據(jù)流(slave1節(jié)點上9999端口的英文文本數(shù)據(jù),以逗號間隔單詞),每10秒統(tǒng)計前30秒各單詞累計出現(xiàn)的次數(shù)。

代碼實現(xiàn)

本例功能的實現(xiàn)涉及數(shù)據(jù)流模擬器和分析器兩部分。

分析器代碼:

 

  1. package dong.spark 
  2. import org.apache.spark.{SparkContext, SparkConf} 
  3. import org.apache.spark.streaming.StreamingContext._ 
  4. import org.apache.spark.streaming._ 
  5. import org.apache.spark.storage.StorageLevel 
  6. object WindowWordCount { 
  7. def main(args:Array[String]) ={ 
  8. val conf=new SparkConf().setAppName("WindowWordCount"). 
  9. setMaster("spark://192.168.149.132:7077"
  10. val sc=new SparkContext(conf) 
  11. val ssc=new StreamingContext(sc, Seconds(5)) 
  12. ssc.checkpoint("hdfs://master:9000/user/dong/WindowWordCountlog"
  13. val lines=ssc.socketTextStream( args(0), 
  14. args(1).toInt, 
  15. StorageLevel.MEMORY_ONLY_SER) 
  16. val words= lines.flatMap(_.split(",")) 
  17. /*采用reduceByKeyAndWindow操作進行疊加處理,窗口時間間隔與滑動時間間隔分別由參數(shù)args(2)和args(3)給出。*/ 
  18. val wordcounts=words.map(x=>(x,1)). 
  19. reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(ar 
  20. gs(2).toInt),Seconds(args(3).toInt)) 
  21. wordcounts.print() 
  22. ssc.start() 
  23. ssc.awaitTermination() 

運行演示

第1步,slave1節(jié)點啟動數(shù)據(jù)流模擬器。

第2步,打包分析器。在master節(jié)點啟動IntelliJ IDEA創(chuàng)建工程WindowWordCount編輯分析器,如圖21,并將分析器直接打包至master節(jié)點dong用戶的主目錄下,如圖22所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖21 IntelliJ IDEA中WindowWordCount示意圖

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖22 master上WindowWordCount.jar示意圖

第3步,運行分析器。在master節(jié)點開啟終端,通過下面代碼向Spark集群提交應用程序。

  1. $ bin/spark-submit ~/WindowWordCount.jar slave1 9999 30 10 

查看結果

第1步 在slave1上查看數(shù)據(jù)流模擬器運行情況。分析器在集群上提交運行后與slave1上運行的數(shù)據(jù)流模擬器建立連接。當檢測到外部連接時,數(shù)據(jù)流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發(fā)送給slave1節(jié)點的9999端口。由于該文本文件中每一行只包含一個單詞和一個逗號,因此每秒僅發(fā)送一個單詞和一個逗號給端口,如圖23所示。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖23 在slave1上數(shù)據(jù)流模擬器運行示意圖

第2步,在master上查看分析器運行情況。在master節(jié)點的提交窗口中可以查看到統(tǒng)計結果。在WindowWordCount應用程序啟動初期,窗口并沒有被接收到的單詞填滿,但隨著時間的推進,每個窗口中的單詞數(shù)目最終固定為30個。圖7.35只是截取了運行結果中的三個批次。由于設定了窗口時間間隔是30s,滑動時間間隔是10s,且數(shù)據(jù)流模擬器每間隔1s發(fā)送一個單詞,因此WindowWordCount每間隔10s對過去30s內收到的各單詞個數(shù)進行統(tǒng)計。圖24中截至1479276925000ms分析器對過去30s內收到的30個單詞進行統(tǒng)計,其中"spark"累計出現(xiàn)5次,"hbase"累計出現(xiàn)8次,"hello"累計出現(xiàn)9次,"world"累計出現(xiàn)8次。再間隔10s,截至1479276935000ms,分析器對過去30s內收到的30個單詞進行統(tǒng)計,其中"spark"累計出現(xiàn)8次,"hbase"累計出現(xiàn)9次,"hello"累計出現(xiàn)7次,"world"累計出現(xiàn)6次。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖24 在master上分析器運行示意圖

第3步,查看持久化數(shù)據(jù)。運行后查看HDFS上的持久化目錄/user/dong/input/WindowWordCountlog,如圖25所示。Streaming應用程序將接收到的網絡數(shù)據(jù)持久化至該目錄下,便于容錯處理。

大數(shù)據(jù)分析技術與實戰(zhàn)之Spark Streaming
圖25 HDFS上持久化目錄示意圖

性能考量

在開發(fā)Spark Streaming應用程序時,要結合集群中各節(jié)點的配置情況盡可能地提高數(shù)據(jù)處理的實時性。在調優(yōu)的過程中,一方面要盡可能利用集群資源來減少每個批處理的時間;另一方面要確保接收到的數(shù)據(jù)能及時處理掉。

運行時間優(yōu)化

設置合理的批處理時間和窗口大小

Spark Streaming中作業(yè)之間通常存在依賴關系,后面的作業(yè)必須確保前面的作業(yè)執(zhí)行結束后才能提交,若前面的作業(yè)的執(zhí)行時間超過了設置的批處理時間間隔,那么后續(xù)的作業(yè)將無法按時提交執(zhí)行,造成作業(yè)的堵塞。也就是說若想Spark Streaming應用程序穩(wěn)定地在集群中運行,對于接收到的數(shù)據(jù)必須盡快處理掉。例如若設定批處理時間為1秒鐘,那么系統(tǒng)每1秒鐘生成一個RDD,如果系統(tǒng)計算一個RDD的時間大于1秒,那么當前的RDD還沒來得及處理,后續(xù)的RDD已經提交上來在等待處理了,這就產生了堵塞。因此需要設置一個合理的批處理時間間隔以確保作業(yè)能夠在這個批處理時間間隔時間內結束。許多實驗數(shù)據(jù)表明,500毫秒對大多Spark Streaming應用而言是較好的批處理時間間隔。

類似地,對于窗口操作,滑動時間間隔對于性能也有很大的影響。當單批次數(shù)據(jù)計算代價過高時,可以考慮適當增大滑動時間間隔。

對于批處理時間和窗口大小的設定,并沒有統(tǒng)一的標準。通常是先從一個比較大的批處理時間(10秒左右)開始,然后不斷地使用更小的值進行對比測試。如果Spark Streaming用戶界面中顯示的處理時間保持不變,則可以進一步設定更小的值;如果處理時間開始增加,則可能已經達到了應用的極限,再減小該值則可能會影響系統(tǒng)的性能。

提高并行度

提高并行度也是一種減少批處理所消耗時間的常見方法。有以下三種方式可以提高并行度。一種方法是增加接收器數(shù)目。如果獲取的數(shù)據(jù)太多,則可能導致單個節(jié)點來不及對數(shù)據(jù)進行讀入與分發(fā),使得接收器成為系統(tǒng)瓶頸。這時可以通過創(chuàng)建多個輸入DStream來增加接收器數(shù)目,然后再使用union來把數(shù)據(jù)合并為一個數(shù)據(jù)源。第二種方法是將收到的數(shù)據(jù)顯式地重新分區(qū)。如果接收器數(shù)目無法再增加,可以通過使用DStream.repartition、spark.streaming.blocklnterval等參數(shù)顯式地對Dstream進行重新分區(qū)。第三種方法是提高聚合計算的并行度。對于會導致shuffle的操作,例如reduceByKey、reduceByKeyAndWindow等操作,可通過顯示設置更高的行度參數(shù)確保更為充分地使用集群資源。

內存使用與垃圾回收

控制批處理時間間隔內的數(shù)據(jù)量

Spark Streaming會把批處理時間間隔內獲取到的所有數(shù)據(jù)存放在Spark內部可用的內存中。因此必須確保在當前節(jié)點上SparkStreaming可用的內存容量至少能容下一個批處理時間間隔內所有的數(shù)據(jù)。比如一個批處理時間間隔是1秒,但是1秒產生了1GB的數(shù)據(jù),那么要確保當前的節(jié)點上至少有可供SparkStreaming使用的1GB內存。

及時清理不再使用的數(shù)據(jù)

對于內存中處理過的、不再需要的數(shù)據(jù)應及時清理,以確保Spark Streaming能夠擁有足夠的內存空間可以使用。一種方法是可以通過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數(shù)據(jù),但該方法應慎重使用,以免后續(xù)數(shù)據(jù)在需要時被錯誤清理。另一種方法是將spark.streaming.unpersist設置為true,系統(tǒng)將自動清理已經不需要的RDD。該方法能顯著減少RDD對內存的需要,同時潛在地提高GC的性能。此外用戶還可以通過配置參數(shù)streamingContext.remember為數(shù)據(jù)設置更長的保留時間。

減少序列化與反序列化的負擔

SparkStreaming默認將接收到的數(shù)據(jù)序列化后放入內存,以減少內存使用。序列化和反序列化需要更多的CPU資源,因此使用適當?shù)男蛄谢ぞ?例如Kryo)和自定義的序列化接口可以更高效地使用CPU。除了使用更好的序列化工具外還可以結合壓縮機制,通過配置spark.rdd.compress,以CPU的時間開銷來換取內存資源,降低GC開銷。

責任編輯:未麗燕 來源: 網絡大數(shù)據(jù)
相關推薦

2017-04-28 08:13:08

大數(shù)據(jù)框架HDFS

2015-08-14 10:28:09

大數(shù)據(jù)

2017-09-18 17:59:23

Hadoop數(shù)據(jù)分析

2015-08-25 10:32:07

健康大數(shù)據(jù)

2012-11-30 14:49:58

IBMGartnerHadoop

2021-01-27 09:18:50

大數(shù)據(jù)數(shù)據(jù)收集大數(shù)據(jù)分析

2021-04-08 10:45:37

大數(shù)據(jù)技術安全

2017-01-23 13:34:44

2012-11-27 09:46:36

大數(shù)據(jù)運算云計算

2022-08-03 14:30:52

大數(shù)據(jù)數(shù)據(jù)分析數(shù)據(jù)收集

2020-09-17 20:36:46

大數(shù)據(jù)架構技術

2015-06-17 14:39:23

大數(shù)據(jù)大數(shù)據(jù)分析

2015-08-19 13:50:19

數(shù)據(jù)分析

2014-03-19 13:50:53

大數(shù)據(jù)分析云技術

2017-10-13 10:36:33

SparkSpark-Strea關系

2014-03-27 09:36:36

Spark

2015-04-03 11:19:21

大數(shù)據(jù)大數(shù)據(jù)分析師

2013-03-11 17:37:36

大數(shù)據(jù)

2023-02-26 00:12:10

Hadoop數(shù)據(jù)湖存儲

2015-08-11 15:52:52

大數(shù)據(jù)數(shù)據(jù)分析
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 性色网站| 亚洲最大看片网站 | 亚洲久久一区 | 亚洲黄色一级 | 一级在线| 日韩欧美精品一区 | 日韩精品一区二区三区中文字幕 | a精品视频| 91成人精品视频 | 成人在线免费观看 | 成人网在线观看 | 在线观看免费观看在线91 | 视频一区二区在线观看 | 精品一区二区免费视频 | 欧美视频三区 | www.一级片| 成人午夜在线 | www.日韩av.com | 97人人澡人人爽91综合色 | 自拍偷拍亚洲一区 | 亚洲福利在线观看 | 在线亚洲欧美 | 日韩在线三级 | 国产一二三区精品视频 | 亚州一区二区三区 | wwwww在线观看 | 国色天香综合网 | 亚洲视频在线免费观看 | 国产欧美一区二区三区在线播放 | 日韩一区二区在线视频 | 在线观看的av| 国产精品视频免费播放 | 日韩av成人| 97在线观看| 亚洲人精品午夜 | 精品综合在线 | 久久成 | www.成人免费视频 | 久久久蜜臀国产一区二区 | 午夜电影网站 | 在线观看av免费 |