成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark Streaming原理剖析

企業動態 Spark
在“1.初始化與集群上分布接收器”中介紹了,receiver集合轉換為RDD在集群上分布式地接收數據流。那么每個receiver是怎樣接收并處理數據流的呢?Spark Streaming數據接收與轉化的示意圖如圖8-14所示。

 1.初始化與集群上分布接收器 圖8-12所示為Spark Streaming執行模型從中可看到數據接收及組件間的通信。  

 

 

初始化的過程主要可以概括為以下兩點。

1)調度器的初始化。

2)將輸入流的接收器轉化為RDD在集群打散,然后啟動接收器集合中的每個接收器。

下面通過具體的代碼更深入地理解這個過程。

(1)NetworkWordCount示例 本例以NetworkWordCount作為研究Spark Streaming的入口程序。

  1. object NetworkWordCount {    
  2.     def main(args: Array[String]) {      
  3.         if (args.length < 2) {        
  4.             System.err.println("Usage: NetworkWordCount <hostname> <port>"))       
  5.              System.exit(1)     
  6.         }      
  7.         StreamingExamples.setStreamingLogLevels()  
  8.         val sparkConf = new SparkConf().setAppName("NetworkWordCount")  
  9.         /*創建StreamingContext對象,形成整個程序的上下文*/ 
  10.         val ssc = new StreamingContext(sparkConf, Seconds(1)) 
  11.         /*通過socketTextStream接收源源不斷地socket文本流*/ 
  12.         val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)   
  13.         val words = lines.flatMap(_.split(" "))      
  14.         val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)      
  15.         wordCounts.print()    
  16.         ssc.start()    
  17.         ssc.awaitTermination() 
  18.     }  
  19.  

(2)進入scoketTextStream

  1. def socketTextStream(hostname:String,port:Int,storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2):ReceiverInputDStream[String] = {  
  2. /*內部實際調用的socketStream方法 */ 
  3. socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) 
  4. }     
  5. /*進入socketStream方法 */   
  6. def socketStream[T: ClassTag](hostname:String, port:Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel  ): ReceiverInputDStream[T] = {  
  7. /*此處初始化SocketInputDStream對象 */     
  8. new SocketInputDStream[T](this, hostname, port, converter, storageLevel)    
  9.  

(3)初始化SocketInputDStream 在之前的Spark Streaming介紹中,讀者已經了解到整個Spark Streaming的調度靈魂就是DStream的DAG,可以將這個DStream DAG類比Spark中的RDD DAG,而DStream類比RDD,DStream可以理解為包含各個時間段的一個RDD集合。SocketInputDStream就是一個DStream。

  1. private[streaming] class SocketInputDStream[T: ClassTag](     
  2. @transient ssc_ : StreamingContext,host:String,port:Int, bytesToObjects:InputStream => Iterator[T],storageLevel:StorageLevel)extends ReceiverInputDStream[T](ssc_) {    
  3.     def getReceiver(): Receiver[T] = {     
  4.         new SocketReceiver(host,port,bytesToObjects,storageLevel)    
  5.     }  
  6.  

(4)觸發StreamingContext中的Start()方法上面的步驟基本完成了Spark Streaming的初始化工作。類似于Spark機制,Spark Streaming也是延遲(Lazy)觸發的,只有調用了start()方法,才真正地執行了。

  1. private[streaming] val scheduler = new JobScheduler(this)    
  2. /*StreamingContext中維持著一個調度器*/   
  3. def start(): Unit = synchronized { 
  4.     ……  
  5.     /*啟動調度器*/     
  6.     scheduler.start()    
  7.     ……    
  8.  

(5)JobScheduler.start()啟動調度器在start方法中初始化了很多重要的組件。

  1. def start(): Unit = synchronized {     
  2.     ……  
  3.     /*初始化事件處理Actor,當有消息傳遞給Actor時,調用processEvent進行事件處理*/      
  4.     eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {         
  5.         def receive = {           
  6.             case event: JobSchedulerEvent => processEvent(event)        
  7.         }   
  8.     }), "JobScheduler")   
  9.     /*啟動監聽總線*/  
  10.     listenerBus.start()      
  11.     receiverTracker = new ReceiverTracker(ssc)   
  12.     /*啟動接收器的監聽器receiverTracker*/     
  13.     receiverTracker.start()   
  14.     /*啟動job生成器*/     
  15.     jobGenerator.start()    
  16.      ……      
  17.  

(6)ReceiverTracker類

  1. /*進入ReceiverTracker查看*/ 
  2. private[streaming] class ReceiverTracker(ssc: StreamingContext) extends Logging {   
  3.     val receiverInputStreams = ssc.graph.getReceiverInputStreams()    
  4.     def start() = synchronized {  
  5.         ……  
  6.         val receiverExecutor = new ReceiverLauncher()    
  7.         ……  
  8.         if (!receiverInputStreams.isEmpty) {  
  9.             /*初始化ReceiverTrackerActor */       
  10.             actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker"
  11.             /*啟動ReceiverLauncher()實例,(7)中進行介紹*/       
  12.             receiverExecutor.start()    
  13.             ……      
  14.         }    
  15.     }  
  16. /*讀者可以先參考ReceiverTrackerActor的代碼查看實現注冊Receiver和注冊Block元數據信息的功能。 */   
  17. private class ReceiverTrackerActor extends Actor {  
  18.     def receive = {  
  19.         /*接收注冊receiver的消息,每個receiver就是一個輸入流接收器,Receiver分布在Worker節點,一個Receiver接收一個輸入流,一個Spark Streaming集群可以有多個輸入流 */      
  20.         case RegisterReceiver(streamId, typ, host, receiverActor) => registerReceiver(streamId, typ, host, receiverActor, sender)          
  21.         sender ! true case AddBlock(receivedBlockInfo) => addBlocks(receivedBlockInfo)        
  22.         ……      
  23.     }    
  24.  

(7)receivelauncher類,在集群上分布式啟動接收器

  1. class ReceiverLauncher {     
  2.     ……      
  3.     @transient val thread  = new Thread() {        
  4.         override def run() {        
  5.         ……  
  6.         /*啟動ReceiverTrackerActor已經注冊的Receiver*/         
  7.         startReceivers()        
  8.         ……     
  9.         }  
  10.     } 
  11.  

下面進入startReceivers方法,方法中將Receiver集合轉變為RDD,從而在集群上打散,分布式分布。如圖8-13所示,一個集群可以分布式地在不同的Worker節點接收輸入數據流。   

 

  1. private def startReceivers() {  
  2.     /*獲取之前配置的接收器 */      
  3.     val receivers = receiverInputStreams.map(nis => {          
  4.         val rcvr = nis.getReceiver()          
  5.         rcvr.setReceiverId(nis.id)          
  6.         cvr       
  7.     })        
  8.     ……        
  9.     /* 創建并行的在不同Worker節點分布的receiver集合 */       
  10.     val tempRDD = if (hasLocationPreferences) {           
  11.     val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))           
  12.     ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)          
  13.         } else {  
  14.             /*在這里創造RDD相當于進入SparkContext.makeRDD,此經典之處在于將receivers集合作為一個RDD [Receiver]進行分區。即使只有一個輸入流,按照分布式分區方式,也是將輸入分布在Worker端,而不在Master*/         
  15.             ssc.sc.makeRDD(receivers, receivers.size)  
  16.             /*調用Sparkcontext中的makeRDD方法,本質是調用將數據分布式化的方法parallelize*/ 
  17.             /* def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): //RDD[T] = { parallelize(seq, numSlices) */ 
  18.            /*在RDD[Receiver[_]]每個分區的每個Receiver 上都同時啟動,這樣其實Spark Streaming可以構建大量的分布式輸入流 */       
  19.            val startReceiver = (iterator: Iterator[Receiver[_]]) => {         
  20.                if (!iterator.hasNext) { 
  21.                    throw new SparkException( "Could not start receiver as object not found.")          
  22.            }          
  23.            val receiver = iterator.next()  
  24.            /*此處的supervisorImpl是一個監督者的角色,在下面的內容中將會剖析這個對象的作用 */        
  25.            val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)         
  26.            executor.start()         
  27.            executor.awaitTermination()       
  28.        }   
  29.        /*將receivers的集合打散,然后啟動它們 */ 
  30.        ……        
  31.        ssc.sparkContext.runJob(tempRDD, startReceiver)  
  32.        ……      
  33.     }  

2.數據接收與轉化

在“1.初始化與集群上分布接收器”中介紹了,receiver集合轉換為RDD在集群上分布式地接收數據流。那么每個receiver是怎樣接收并處理數據流的呢?Spark Streaming數據接收與轉化的示意圖如圖8-14所示。圖8-14的主要流程如下。

1)數據緩沖:在Receiver的receive函數中接收流數據,將接收到的數據源源不斷地放入BlockGenerator.currentBuffer。

2)緩沖數據轉化為數據塊:在BlockGenerator中有一個定時器(recurring timer),將當前緩沖區中的數據以用戶定義的時間間隔封裝為一個數據塊Block,放入BlockGenerator的blocksForPush隊列中。

3)數據塊轉化為Spark數據塊:在BlockGenerator中有一個BlockPushingThread線程,不斷地將blocksForPush隊列中的塊傳遞給Blockmanager,讓BlockManager將數據存儲為塊,讀者可以在本書的Spark IO章節了解Spark的底層存儲機制。BlockManager負責Spark中的塊管理。

4)元數據存儲:在pushArrayBuffer方法中還會將已經由BlockManager存儲的元數據信息(如Block的ID號)傳遞給ReceiverTracker,ReceiverTracker將存儲的blockId放到對應StreamId的隊列中。 上面過程中涉及最多的類就是BlockGenerator,在數據轉化的過程中,其扮演著不可或缺的角色。

  1. private[streaming] class BlockGenerator( listener: BlockGeneratorListener, receiverId: Int, conf: SparkConf ) extends Logging   

  


感興趣的讀者可以參照圖8-14中的類和方法更加具體地了解機制。由于篇幅所限,這個數據生成過程的代碼不再具體剖析。

【本文為51CTO專欄作者“王森豐”的原創稿件,轉載請注明出處】

責任編輯:龐桂玉 來源: 神算子
相關推薦

2018-04-09 12:25:11

2017-08-14 10:30:13

SparkSpark Strea擴容

2017-06-06 08:31:10

Spark Strea計算模型監控

2017-10-13 10:36:33

SparkSpark-Strea關系

2016-05-11 10:29:54

Spark Strea數據清理Spark

2016-01-28 10:11:30

Spark StreaSpark大數據平臺

2019-10-17 09:25:56

Spark StreaPVUV

2021-08-20 16:37:42

SparkSpark Strea

2017-09-26 09:35:22

2019-12-13 08:25:26

FlinkSpark Strea流數據

2023-10-24 20:32:40

大數據

2023-03-30 09:06:20

HiveSpark大數據

2009-09-14 10:35:15

Linq內部執行原理

2017-06-27 15:08:05

大數據Apache SparKafka Strea

2020-09-16 10:31:58

SMTP網絡電子郵件

2021-07-09 10:27:12

SparkStreaming系統

2017-10-11 11:10:02

Spark Strea大數據流式處理

2018-03-21 11:05:26

Spark大數據應用程序

2016-03-03 15:11:42

Spark Strea工作流調度器

2018-04-18 08:54:28

RDD內存Spark
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91精品久久久久久久久99蜜臂 | 中文字幕精品一区 | 日韩精品在线看 | 国产小视频在线 | 91网站在线播放 | 91精品国产手机 | 欧美亚洲国产一区二区三区 | 亚洲激情综合 | 免费黄色网址视频 | 国产欧美精品一区 | 韩日一区 | 一级做a爰片久久毛片免费看 | 国产精品久久精品 | 黄色免费观看 | 欧美日韩美女 | 国产黄色一级片 | 伊人免费网 | 麻豆hd | 午夜av成人| 黄色一级网 | www.久久99| 亚洲国产高清高潮精品美女 | 欧美中文在线 | 国产欧美一区二区三区久久手机版 | 99久久婷婷国产综合精品电影 | 色爽女| 999精品视频 | 欧美一区二区在线观看 | 国产综合久久 | 一区二区三区高清 | 欧美一级二级在线观看 | 中文字幕在线观看视频一区 | 国产精品7777777| 日韩美女在线看免费观看 | 日本免费一区二区三区 | 日韩av一区二区在线观看 | 久久在线 | 精品九九九 | 日本偷偷操 | 亚洲精品福利在线 | 亚洲欧美在线观看 |