成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

spark 自己的分布式存儲系統 - BlockManager

存儲 存儲軟件 大數據 Spark 分布式
BlockManager 是 spark 中至關重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。 今天我們來揭開 BlockaManager 的底層原理和設計思路,

整體架構

BlockManager 是 spark 中至關重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。 今天我們來揭開 BlockaManager 的底層原理和設計思路,

BlockManager 是一個嵌入在 spark 中的 key-value型分布式存儲系統,是為 spark 量身打造的,

BlockManager 在一個 spark 應用中作為一個本地緩存運行在所有的節點上, 包括所有 driver 和 executor上。 BlockManager 對本地和遠程提供一致的 get 和set 數據塊接口, BlockManager 本身使用不同的存儲方式來存儲這些數據, 包括 memory, disk, off-heap。

 

上面是一個整體的架構圖, BlockManagerMaster擁有BlockManagerMasterEndpoint 的actor和所有BlockManagerSlaveEndpoint的ref, 可以通過這些引用對 slave 下達命令

executor 節點上的BlockManagerMaster 則擁有BlockManagerMasterEndpoint的ref和自身BlockManagerSlaveEndpoint的actor。可以通過 Master的引用注冊自己。

在master 和 slave 可以正常的通信之后, 就可以根據設計的交互協議進行交互, 整個分布式緩存系統也就運轉起來了,

初始化

我們知道, sparkEnv 啟動的時候會啟動各個組件, BlockManager 也不例外, 也是這個時候啟動的,

啟動的時候會根據自己是在 driver 還是 executor 上進行不同的啟動過程,

  1. def registerOrLookupEndpoint( 
  2.         name: String, endpointCreator: => RpcEndpoint): 
  3.       RpcEndpointRef = { 
  4.       if (isDriver) { 
  5.         logInfo("Registering " + name
  6.         rpcEnv.setupEndpoint(name, endpointCreator) 
  7.       } else { 
  8.         RpcUtils.makeDriverRef(name, conf, rpcEnv) 
  9.       } 
  10.     } 

上圖是 sparkEnv 在 master上啟動的時候, 構造了一個 BlockManagerMasterEndpoint, 然后把這個Endpoint 注冊在 rpcEnv中, 同時也會啟動自己的 BlockManager

上圖是 sparkEnv 在executor上啟動的時候, 通過 setupEndpointRef 方法獲取到了  BlockManagerMaster的引用 BlockManagerMasterRef, 同時也會啟動自己的 BlockManager,

在 BlockManager 初始化自己的時候, 會向 BlockManagerMasterEndpoint 注冊自己, BlockManagerMasterEndpoint 發送 registerBlockManager消息,  BlockManagerMasterEndpoint 接受到消息, 把 BlockManagerSlaveEndpoint  的引用 保存在自己的  blockManagerInfo 數據結構中以待后用。

分布式協議

下面的一個表格是 master 和 slave 接受到各種類型的消息, 以及接受到消息后,做的處理。

  • BlockManagerMasterEndpoint  接受的消息

  • BlockManagerSlaveEndpoint 接受的消息

根據以上的協議, 相信我們可以很清楚的猜測整個交互的流程, 一般過程應該是這樣的, slave的 BlockManager  在自己接的上存儲一個 Block, 然后把這個 BlockId 匯報到master的BlockManager , 經過 cache, shuffle 或者 Broadcast后,別的節點需要上一步的Block的時候, 會到 master 獲取數據所在位置, 然后去相應節點上去 fetch。

存儲層

在RDD層面上我們了解到RDD是由不同的partition組成的,我們所進行的transformation和action是在partition上面進行的;而在storage模塊內部,RDD又被視為由不同的block組成,對于RDD的存取是以block為單位進行的,本質上partition和block是等價的,只是看待的角度不同。在Spark storage模塊中中存取數據的最小單位是block,所有的操作都是以block為單位進行的。

 

BlockManager對象被創建的時候會創建出MemoryStore和DiskStore對象用以存取block,如果內存中擁有足夠的內存, 就 使用 MemoryStore存儲,  如果 不夠, 就 spill 到 磁盤中, 通過 DiskStore進行存儲。

  • DiskStore 有一個DiskBlockManager,DiskBlockManager 主要用來創建并持有邏輯 blocks 與磁盤上的 blocks之間的映射,一個邏輯 block 通過 BlockId 映射到一個磁盤上的文件。 在 DiskStore 中會調用  diskManager.getFile 方法, 如果子文件夾不存在,會進行創建, 文件夾的命名方式為(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一個隨機數), 所有的block都會存儲在所創建的folder里面。
  • MemoryStore 相對于DiskStore需要根據block id hash計算出文件路徑并將block存放到對應的文件里面,MemoryStore管理block就顯得非常簡單:MemoryStore內部維護了一個hash map來管理所有的block,以block id為key將block存放到hash map中。而從MemoryStore中取得block則非常簡單,只需從hash map中取出block id對應的value即可。

BlockManager 的 PUT 和GET接口

BlockManager 提供了 Put 接口和 Get 接口, 這兩個 api 屏蔽了底層的細節, 我們來看下底層是如何實現的

  • GET操作 如果 local 中存在就直接返回, 從本地獲取一個Block, 會先判斷如果是 useMemory, 直接從內存中取出, 如果是 useDisk, 會從磁盤中取出返回, 然后根據useMemory判斷是否在內存中緩存一下,方便下次獲取,  如果local 不存在, 從其他節點上獲取, 當然元信息是存在 drive上的,要根據我們上文中提到的 GETlocation 協議獲取 Block 所在節點位置, 然后到其他節點上獲取。
  • PUT操作 操作之前會加鎖來避免多線程的問題, 存儲的時候會根據 存儲級別, 調用對應的是 memoryStore 還是  diskStore, 然后在具體存儲器上面調用 存儲接口。 如果有 replication 需求, 會把數據備份到其他的機器上面。

blockManager 和 blockTransferService 關系

spark 歷史上使用過兩套網絡框架, 最開始的時候, rpc 調用使用的是 akka, 大文件傳輸使用的是 netty,  后面統一全部使用 netty,  這里的大文件傳輸其實走的是 netty,  在啟動 blockManager的時候會啟動一個 blockTransferService 服務, 這個服務就是用來傳輸大文件用的, 對應的具體類是  NettyBlockTransferService, 這個實例中也會有 BlocakManager的引用, 會啟動一個 NettyBlockRpcServer的 netty Handler, 也擁有 BlocakManager 的引用,  用來提供服務, BlocakManager 根據 BlockId 獲取一個 Block 然后包裝為一個 ManagedBuffer 對象,

當我們需要從遠端獲取一個 Block的時候,就需要 blockTransferService 傳輸大的字節數組,

首先需要從 driver上獲取到 Block的真正存儲位置, 然后調用 blockTransferService 的 fetchBlocks方法, 去其他真正存儲節點上去fetch數據, 會從 client 資源池中獲取一個client,  如果是一對一的進行fetch,  使用的是 OneForOneBlockFetcher, 這個Fetcher 是以 Chunks 為單位分別單獨fetch,  每個 Chunks 也就對應一個Block的數據, 根據配置,會進行重試直到***重試次數,發送 OpenBlocks消息,  里面會包裝對應的是哪個  BlockId,  其他節點服務端會根據 BlockId 從 blockManager中拿到數據, 然后用來傳輸, 使用的是 netty 的流式傳輸方式, 同時也會有回調函數,

如果是備份的時候同步上傳一個 Block,  其他節點服務端會根據,uploadBlock消息中包含的BlockId, 在本地的BlockManager 中冗余存儲一份,

ChunkFetch也有一個類似Stream的概念,ChunkFetch的對象是“一個內存中的Iterator[ManagedBuffer]”,即一組Buffer,每一個Buffer對應一個chunkIndex,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據StreamChunkId獲取為一個Buffer并返回給Client; 不管是Stream還是ChunkFetch,在Server的內存中都需要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個接口來分別響應ChunkFetch與Stream兩種操作,并且針對Server的ChunkFetch提供一個registerStream接口來注冊一組Buffer,比如可以將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]注冊到StreamManager,從而支持遠程Block Fetch操作。

對于ExternalShuffleService(一種單獨shuffle服務進程,對其他計算節點提供本節點上面的所有shuffle map輸出),它為遠程Executor提供了一種OpenBlocks的RPC接口,即根據請求的appid,executorid,blockid(appid+executor對應本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內存,并返回加載后的streamId返回給客戶端,從而支持后續的ChunkFetch的操作。

Partition 與 Block 的關系

我們都知道, RDD 的運算是基于 partition, 每個 task 代表一個 分區上一個 stage 內的運算閉包, task 被分別調度到 多個 executor上去運行, 那么是在哪里變成了 Block 呢,  我們以 spark 2.11 源碼為準, 看看這個轉變過程,

一個 RDD 調度到 executor 上會運行調用 getOrCompute方法,

  1. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 
  2.       readCachedBlock = false 
  3.       computeOrReadCheckpoint(partition, context) 
  4.     }) 

如果 Block 在 BlockManager 中存在, 就會從 BlockManager 中獲取,如果不存在, 就進行計算這個Block, 然后在 BlockManager 中進行存儲持久化, 方便下次使用,

當然獲取的時候是先從本地的 BlockManager 中獲取, 如果本地沒有, 然后再 從 remote 獲取, 先從 driver 上獲取到元數據 Block的位置, 然后去到真正的節點上fetch

如果沒有, 就進行計算, 然后根據存儲級別,存儲到計算節點本地的BlockManager 的內存或磁盤中,

這樣RDD的transformation、action就和block數據建立了聯系,雖然抽象上我們的操作是在partition層面上進行的,但是partition最終還是被映射成為block,因此實際上我們的所有操作都是對block的處理和存取。

blockManager 在 spark 中扮演的角色

blockManager 是非常非常重要的一個 spark 組件, 我們隨便舉幾個例子, 你就知道 BlockManager 多重要了 ,

  • spark  shuffle 的過程總用到了 BlockManager 作為數據的中轉站
  • spark broadcast 調度 task 到多個 executor 的時候, broadCast 底層使用的數據存儲層
  • spark streaming  一個 ReceiverInputDStream 接受到的數據也是先放在 BlockManager 中, 然后封裝為一個 BlockRdd 進行下一步運算的
  • 如果我們 對一個 rdd 進行了cache, cacheManager 也是把數據放在了 blockmanager 中, 截斷了計算鏈依賴, 后續task 運行的時候可以直接從 cacheManager 中獲取到 cacherdd ,不用再從頭計算。

spark cache  與  spark   broadcast task

我隨便舉兩個例子, 看看具體 spark cache 和 spark  broadcast 調度 task 的時候怎么用的 blockManager的

spark cache

rdd 計算的時候, 首先根據RDD id和partition index構造出block id (rdd_xx_xx), 接著從BlockManager中取出相應的block, 如果該block存在,表示此RDD在之前已經被計算過和存儲在BlockManager中,因此取出即可,無需再重新計算。 如果 block 不存在我們可以 計算出來, 然后吧 block 通過   doPutIterator 函數存儲在 節點上的 BlockManager上面, 匯報block信息到 driver, 下次如果使用同一個 rdd, 就可以直接從分布式存儲中 直接取出相應的 block

下面看一下源碼

  1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 
  2.     if (storageLevel != StorageLevel.NONE) { 
  3.       getOrCompute(split, context) 
  4.     } else { 
  5.       computeOrReadCheckpoint(split, context) 
  6.     } 
  7.   } 

如果存儲級別不是 NONE類型就會調用 getOrCompute 這個我們已經看過了,  里面實際調用  SparkEnv.get.blockManager.getOrElseUpdate 方法, 如果 Block 在 BlockManager 中存在, 就會從 BlockManager 中獲取,如果不存在, 就進行計算這個Block, 然后在 BlockManager 中進行存儲持久化, 方便下次使用,

在  BlockManager 進行存儲后, 會調用下面的代碼把 匯報block信息到 driver,

  1. private def tryToReportBlockStatus( 
  2.      blockId: BlockId, 
  3.      status: BlockStatus, 
  4.      droppedMemorySize: Long = 0L): Boolean = { 
  5.    val storageLevel = status.storageLevel 
  6.    val inMemSize = Math.max(status.memSize, droppedMemorySize) 
  7.    val onDiskSize = status.diskSize 
  8.    master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize) 
  9.  } 

實際上上想  masterEndpoint 的引用發送一條 UpdateBlockInfo消息,  master 會把這個 blockId 對應的 location 放在 driver 上,

同樣的如果一個 Block已經計算過了,會到 driver 上獲取到 location 信息

  1. private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { 
  2.    val locs = Random.shuffle(master.getLocations(blockId)) 
  3.    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } 
  4.    preferredLocs ++ otherLocs 
  5.  } 

spark   broadcast task

這個調度 task 到多個 task 上面過程代碼太多,我就不貼了, 直接說一下流程,

  • DAGScheduler 在  submitMissingTasks 方法提交 task的時候, 會把 task 包裝為一個 Broadcast 類型, 里面使用 TorrentBroadcastFactory 創建一個 TorrentBroadcast 的類型, 使用的是p2p的協議, 會減輕 master 的壓力,  這個里面會 調用 writeBlocks 里面把taskBinary  通過 blockManager.putSingle 放在 BlockManager 緩存中
  • ShuffleMapTask 或者 ResultTask,然后調用 runTask 方法, 里面實際上會調用 Broadcast 的value 方法, 里面最終調用了 BlockManager 的 getLocalBytes 或者 getRemoteBytes 方法

blockManager 在  spark streaming 中的應用

  • ReceiverTracker 在啟動的時候,會運行一個 job, 這個job 就是到 各個executor上去啟動 ReceiverSupervisorImpl, 然后啟動各個具體的數據接收器,  如果是SocketInputDStream, 就會啟動一個 SocketReceiver,
  • Receiver 接收到數據后, 先在 BlockGenerator 中緩存, 等到達一定的大小后,  調用 BlockManagerBasedBlockHandler 的 storeBlock方法持久化到 BlockManager 中, 然后把數據信息匯報到 ReceiverTracker上, 最終 匯總到   ReceivedBlockTracker 中的 timeToAllocatedBlocks中,
  • ReceiverInputDStream compute的時候,  receivedBlockTracker 會根據時間獲取到  BlockManager 中的元信息,里面最終對應的還是 BlockManager 的存儲位置, 最終獲取到數據進行計算,

測試 blockManager

我們做一個簡單的測試,兩端代碼的區別就是 一個 進行了cache ,一個沒有進行cache。

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz" 
  2. file.count()  
  3. file.count() 

我們從日志可以觀察出來, ***段代碼, 兩個 job 中都從 hdfs 中讀取文件, 讀取了兩次,

  1. val file = sc.textFile("/fusionlog/midsourcenew/2017-03-13-18-15_2.gz").cache() 
  2. file.count() 
  3. file.count() 

有以下日志

  1. MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 1354.9 MB, free 4.9 GB) 
  2. BlockManager: Found block rdd_1_0 locally 

我們發現在***次讀取文件后, 把文件 cache 在了 blockManager 中, 下一個 job 運行的時候, 在本地 BlockManager 直接發現獲取到了 block , 沒有讀取 hdfs 文件 ,

在 spark ui 中也發現了 cache的 Block, 全部是在內存中緩存的, 

責任編輯:武曉燕 來源: spark技術分享
相關推薦

2017-04-14 09:48:25

分布式存儲系統

2018-09-29 14:08:04

存儲系統分布式

2017-10-16 10:24:47

LogDevice存儲系統

2017-10-12 09:36:54

分布式存儲系統

2017-10-19 08:45:15

存儲系統HBase

2018-11-20 09:19:58

存儲系統雪崩效應

2017-07-18 09:51:36

文件存儲系統

2017-10-17 08:33:31

存儲系統分布式

2017-12-18 10:47:04

分布式存儲數據

2019-10-15 10:59:43

分布式存儲系統

2019-05-13 15:20:42

存儲系統算法

2018-10-24 11:01:53

分布式存儲系統

2018-10-29 12:42:23

Ceph分布式存儲

2014-02-19 11:37:57

分布式對象存儲Sheepdog

2013-12-27 10:56:42

分布式對象存儲Sheepdog性能測試

2010-07-02 10:08:12

BigtableGoogle

2021-08-07 05:00:20

存儲系統

2018-03-13 08:45:08

存儲系統DHT算法

2025-01-26 11:54:39

分布式存儲系統

2021-07-04 07:07:06

Ceph分布式存儲架構
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产欧美一区二区三区在线播放 | 99久久免费精品国产免费高清 | 国产一区二区三区四区在线观看 | 91在线一区 | 中文字幕av在线一二三区 | 五月激情婷婷六月 | 一区二区免费高清视频 | 视频一区二区在线观看 | 亚洲精品日韩一区二区电影 | 91视视频在线观看入口直接观看 | 国产欧美日韩综合精品一 | 国产精品亚洲综合 | 久久一区二区三区免费 | 亚洲一区二区三区在线播放 | 91精品国产高清久久久久久久久 | 国产一二三区免费视频 | 国产精品1区2区3区 欧美 中文字幕 | 91九色麻豆 | 亚洲va国产日韩欧美精品色婷婷 | 国产成人99 | 国产伦精品一区二区三区精品视频 | 国产日韩欧美 | 日韩精品在线看 | 国产激情免费视频 | 久久久www成人免费无遮挡大片 | 国产1区2区在线观看 | 欧美色偷拍| 国产视频一区二区在线观看 | 国产精品一区二区三区在线播放 | 国产91av视频 | 国产成人啪免费观看软件 | 在线国产精品一区 | 成人免费黄色片 | 色综久久 | 夜夜爽99久久国产综合精品女不卡 | 91精品综合久久久久久五月天 | 免费小视频在线观看 | 一级毛片大全免费播放 | 久久久久亚洲精品 | 亚洲一区 | 国产精品视频久久 |