成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark streaming中持久保存的RDD/有狀態的內存

存儲 存儲軟件 Spark
以spark streaming為例,就是希望有個數據集能夠在當前批次中更新,再下個批次后又可以繼續訪問。一個最簡單的實現是在driver的內存中,我們可以自行保存一個大的內存結構。這種hack的方式就是我們無法利用spark提供的分布式計算的能力。

在面向流處理的分布式計算中,經常會有這種需求,希望需要處理的某個數據集能夠不隨著流式數據的流逝而消失。

以spark streaming為例,就是希望有個數據集能夠在當前批次中更新,再下個批次后又可以繼續訪問。一個最簡單的實現是在driver的內存中,我們可以自行保存一個大的內存結構。這種hack的方式就是我們無法利用spark提供的分布式計算的能力。

對此,spark streaming提供了stateful streaming, 可以創建一個有狀態的DStream,我們可以操作一個跨越不同批次的RDD。

[[226324]]

1 updateStateByKey

該方法提供了這樣的一種機制: 維護了一個可以跨越不同批次的RDD, 姑且成為StateRDD,在每個批次遍歷StateRDD的所有數據,對每條數據執行update方法。當update方法返回None時,淘汰StateRDD中的該條數據。

具體接口如下:

  1. /** 
  2.  * Return a new "state" DStream where the state for each key is updated by applying 
  3.  * the given function on the previous state of the key and the new values of each key
  4.  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
  5.  * @param updateFunc State update function. If `this` function returns None, then 
  6.  *                   corresponding state key-value pair will be eliminated. 
  7.  * @param numPartitions Number of partitions of each RDD in the new DStream. 
  8.  * @tparam S State type 
  9.  */ 
  10. def updateStateByKey[S: ClassTag]( 
  11.     updateFunc: (Seq[V], Option[S]) => Option[S], 
  12.     numPartitions: Int 
  13.   ): DStream[(K, S)] = ssc.withScope { 
  14.   updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) 

即用戶需要實現一個updateFunc的函數,該函數的參數:

Seq[V] 該批次中相同key的數據,以Seq數組形式傳遞

Option[S] 歷史狀態中的數據

返回值: 返回需要保持的歷史狀態數據,為None時表示刪除該數據

  1. def updateStateFunc(lines: Seq[Array[String]], state: Option[Array[String]]): Option[Array[String]] = {...} 

這種做法簡單清晰明了,但是其中有一些可以優化的地方:

a) 如果DRDD增長到比較大的時候,而每個進入的批次數據量相比并不大,此時每次都需要遍歷DRDD,無論該批次中是否有數據需要更新DRDD。這種情況有的時候可能會引發性能問題。

b) 需要用戶自定義數據的淘汰機制。有的時候顯得不是那么方便。

c) 返回的類型需要和緩存中的類型相同。類型不能發生改變。

2 mapWithState

該接口是對updateSateByKey的改良,解決了updateStateFunc中可以優化的地方:

  1. * :: Experimental :: 
  2. Return a [[MapWithStateDStream]] by applying a function to every key-value element of 
  3. * `this` stream, while maintaining some state data for each unique key. The mapping function 
  4. and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this 
  5. * transformation can be specified using [[StateSpec]] class. The state data is accessible in 
  6. as a parameter of type [[State]] in the mapping function
  7. * Example of using `mapWithState`: 
  8. * {{{ 
  9. *    // A mapping function that maintains an integer state and return a String 
  10. *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = { 
  11. *      // Use state.exists(), state.get(), state.update() and state.remove() 
  12. *      // to manage state, and return the necessary string 
  13. *    } 
  14. *    val spec = StateSpec.function(mappingFunction).numPartitions(10) 
  15. *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec) 
  16. * }}} 
  17. * @param spec          Specification of this transformation 
  18. * @tparam StateType    Class type of the state data 
  19. * @tparam MappedType   Class type of the mapped data 
  20. */ 
  21. @Experimental 
  22. def mapWithState[StateType: ClassTag, MappedType: ClassTag]( 
  23.     spec: StateSpec[K, V, StateType, MappedType] 
  24.   ): MapWithStateDStream[K, V, StateType, MappedType] = { 
  25.   new MapWithStateDStreamImpl[K, V, StateType, MappedType]( 
  26.     self, 
  27.     spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]] 
  28.   ) 

其中spec封裝了用戶自定義的函數,用以更新緩存數據:

  1. mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType 

實現樣例如下:

  1. val mappingFunc = (k: String, line: Option[Array[String]], state: State[Array[String]]) => {...} 

參數分別代表:

數據的key: k

RDD中的每行數據: line

state: 緩存數據

當對state調用remove方法時,該數據會被刪除。

注意,如果數據超時,不要調用remove方法,因為spark會在mappingFunc后自動調用remove。

a) 與updateStateByKey 每次都要遍歷緩存數據不同,mapWithState每次遍歷每個批次中的數據,更新緩存中的數據。對于緩存數據較大的情況來說,性能會有較大提升。

b) 提供了內置的超時機制,當數據一定時間內沒有更新時,淘汰相應數據。

注意,當有數據到來或者有超時發生時,mappingFunc都會被調用。

3 checkpointing

通常情況下,在一個DStream鐘,對RDD的各種轉換而依賴的數據都是來自于當前批次中。但是當在進行有狀態的transformations時,包括updateStateByKey/reduceByKeyAndWindow 、mapWithSate,還會依賴于以前批次的數據,RDD的容錯機制,在異常情況需要重新計算RDD時,需要以前批次的RDD信息。如果這個依賴的鏈路過長,會需要大量的內存,即使有些RDD的數據在內存中,不需要計算。此時spark通過checkpoint來打破依賴鏈路。checkpoint會生成一個新的RDD到hdfs中,該RDD是計算后的結果集,而沒有對之前的RDD依賴。

此時一定要啟用checkpointing,以進行周期性的RDD Checkpointing

在StateDstream在實現RDD的compute方法時,就是將之前的PreStateRDD與當前批次中依賴的ParentRDD進行合并。

而checkpoint的實現是將上述合并的RDD寫入HDFS中。

現在checkpoint的實現中,數據寫入hdfs的過程是由一個固定的線程池異步完成的。一種存在的風險是上次checkpoint的數據尚未完成,此次又來了新的要寫的checkpoint數據,會加大集群的負載,可能會引發一系列的問題。

4 checkpoint周期設置:

對mapWithStateByKey/updateStateByKey返回的DStream可以調用checkpoint方法設置checkpoint的周期。注意傳遞的時間只能是批次時間的整數倍。

另外,對于mapWithState而言,checkpoint執行時,才會進行數據的刪除。 State.remove方法只是設置狀態,標記為刪除,數據并不會真的刪除。 SnapShot方法還是可以獲取得到。

責任編輯:武曉燕 來源: 數客聯盟
相關推薦

2016-10-24 09:52:45

SparkRDD容錯

2016-10-24 23:04:56

SparkRDD數據

2016-01-28 10:11:30

Spark StreaSpark大數據平臺

2017-04-25 09:50:16

SparkRDD核心

2017-08-14 10:30:13

SparkSpark Strea擴容

2017-06-06 08:31:10

Spark Strea計算模型監控

2016-12-19 14:35:32

Spark Strea原理剖析數據

2024-04-30 11:14:19

KubernetesReplicaSet數量

2017-10-13 10:36:33

SparkSpark-Strea關系

2019-10-08 11:10:18

React自動保存前端

2023-10-24 20:32:40

大數據

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數據

2018-04-09 12:25:11

2018-05-28 08:54:45

SparkRDD Cache緩存

2016-05-11 10:29:54

Spark Strea數據清理Spark

2021-07-09 10:27:12

SparkStreaming系統

2017-08-04 10:58:55

RDDSpark算子

2018-05-10 09:51:39

Spark內存Hadoop

2019-10-17 09:25:56

Spark StreaPVUV
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久热电影| 久久久久国产 | 亚洲欧美日韩一区 | 国户精品久久久久久久久久久不卡 | 国产精品一区二区av | 国产精品欧美日韩 | 超碰在线免费公开 | 欧美中文一区 | 中文字幕一区二区三区在线视频 | 欧美久久一级特黄毛片 | 欧美成人第一页 | 成人在线小视频 | www.成人.com | 超碰在线免费公开 | 91精品一区二区三区久久久久 | 久久精品亚洲 | 成年人在线观看 | 欧美在线资源 | 国产乱码精品一区二区三区忘忧草 | 老外黄色一级片 | 久久成 | av播播| 欧美综合国产精品久久丁香 | 6080亚洲精品一区二区 | 亚洲色在线视频 | 亚洲欧洲精品成人久久奇米网 | 婷婷色国产偷v国产偷v小说 | 成人深夜福利 | 亚洲久视频 | 亚洲人成人一区二区在线观看 | 日韩黄色av | 亚洲精品三级 | 1000部精品久久久久久久久 | 欧美aaaa视频 | 夜夜草导航 | www.激情.com | 韩三级在线观看 | xxxxx免费视频 | 视频1区| 亚洲区一 | 久久69精品久久久久久久电影好 |