成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Master 分配資源并在 Worker上啟動 Executor ,逐行代碼注釋版

開發 前端
這里有個假設是:Spark 集群以 Standalone 的方式來啟動的,作業也是提交到 Spark standalone 集群。

[[432016]]

本文轉載自微信公眾號「KK架構」,作者wangkai。轉載本文請聯系KK架構公眾號。

一、回顧一下之前的內容

上一次閱讀到了 SparkContext 初始化,繼續往下之前,先溫故一下之前的內容。

這里有個假設是:Spark 集群以 Standalone 的方式來啟動的,作業也是提交到 Spark standalone 集群。

首先需要啟動 Spark 集群,使用 start-all.sh 腳本依次啟動 Master (主備) 和多個 Worker。

啟動好之后,開始提交作業,使用 spark-submit 命令來提交。

  • 首先在提交任務的機器上使用 java 命令啟動了一個虛擬機,并且執行了主類 SparkSubmit 的 main 方法作為入口。
  • 然后根據提交到不同的集群,來 new 不同的客戶端類,如果是 standalone 的話,就 new 了一個 ClientApp;然后把 java DriverWrapper 這個命令封裝到 RequestSubmmitDriver 消息中,把這個消息發送給 Master;
  • Master 隨機找一個滿足資源條件的 Worker 來啟動 Driver,實際上是在虛擬機里執行 DriverWrapper 的 main 方法;
  • 然后 Worker 開始啟動 Driver,啟動的時候會執行用戶提交的 java 包里的 main 方法,然后開始執行 SparkContext 的初始化,依次在 Driver 中創建了 DAGScheduler、TaskScheduler、SchedulerBackend 三個重要的實例。并且啟動了 DriverEndpoint 和 ClientEndpoint ,用來和 Worker、Master 通信。

二、Master 處理應用的注冊

接著上次 ClientEndpoint 啟動之后,會向 Master 發送一個 RegisterApplication 消息,Master 開始處理這個消息。

然后看到 Matster 類處理 RegisterApplication 消息的地方:

可以看到,用應用程序的描述和 Driver 的引用創建了一個 Application,然后開始注冊這個 Application。

注冊 Application 很簡單,就是往 Master 的內存中加入各種信息,重點來了,把 ApplicationInfo 加入到了 waitingApps 這個結構里,然后 schedule() 方法會遍歷這個列表,為 Application 分配資源,并調度起來。

然后往 zk 中寫入了 Application 的信息,并且往 Driver 發送了一個 RegisteredApplication 應用已經注冊的消息。

接著開始 schedule(),這個方法上次講過,它會遍歷兩個列表,一個是遍歷 waitingDrivers 來啟動 Driver,一個是遍歷 waitingApps,來啟動 Application。

waitingDrivers 列表在客戶端請求啟動 Driver 的時候就處理過了,本次重點看這個方法:

  1. startExecutorsOnWorkers() 

三、Master 對資源的調度

有以下幾個步驟:

  • 遍歷 waitingApps 的所有 app;
  • 如果 app 需要的核數小于一個 Executor 可以提供的核數,就不為 app 分配新的 Executor;
  • 過濾出還有可供調度的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作為 usableWorkers;
  • 計算所有 usableWorkers 上要分配多少 CPU;
  • 然后遍歷可用的 Workers,分配資源并執行調度,啟動 Executor。

源碼從 Master 類的 schedule() 方法的最后一行 startExecutorsOnWorkers() 開始:

這個方法主要作用是計算 worker 的 executor 數量和分配的資源并啟動 executor。

  1. /** 
  2.  * Schedule and launch executors on workers 
  3.  */ 
  4. private def startExecutorsOnWorkers(): Unit = { 
  5.     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app 
  6.     // in the queue, then the second app, etc. 
  7.  
  8.     for (app <- waitingApps) { 
  9.         val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) 
  10.         // If the cores left is less than the coresPerExecutor,the cores left will not be allocated 
  11.         if (app.coresLeft >= coresPerExecutor) { 
  12.             // 1. 剩余內存大于單個 executor 需要的內存 
  13.             // 2. 剩余的內核數大于單個 executor 需要的內核數 
  14.             // 3. 按照內核數從大到小排序 
  15.             // Filter out workers that don't have enough resources to launch an executor 
  16.             val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) 
  17.                 .filter(canLaunchExecutor(_, app.desc)) 
  18.                 .sortBy(_.coresFree).reverse 
  19.             val appMayHang = waitingApps.length == 1 && 
  20.                 waitingApps.head.executors.isEmpty && usableWorkers.isEmpty 
  21.             if (appMayHang) { 
  22.                 logWarning(s"App ${app.id} requires more resource than any of Workers could have."
  23.             } 
  24.             // 計算每個 Worker 上可用的 cores 
  25.             val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 
  26.              
  27.             // Now that we've decided how many cores to allocate on each worker, let's allocate them 
  28.             for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 
  29.                 allocateWorkerResourceToExecutors( 
  30.                     app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) 
  31.             } 
  32.         } 
  33.     } 

(1)遍歷 waitingApps,如果 app 還需要的 cpu 核數大于每個執行器的核數,才繼續分配。

(2)過濾可用的 worker,條件一:該 worker 剩余內存大于單個 executor 需要的內存;條件二:該 worker 剩余 cpu 核數大于單個 executor 需要的核數;然后按照可用 cpu核數從大到小排序。

(3)下面兩個方法是關鍵的方法

scheduleExecutorsOnWorkers(),用來計算每個 Worker 上可用的 cpu 核數;

allocateWorkerResourceToExecutors() 用來真正在 Worker 上分配 Executor。

四、scheduleExecutorsOnWorkers 計算每個 Worker 可用的核數

這個方法很長,首先看方法注釋,大致翻譯了一下:

當執行器分配的 cpu 核數(spark.executor.cores)被顯示設置的時候,如果這個 worker 上有足夠的核數和內存的話,那么每個 worker 上可以執行多個執行器;反之,沒有設置的時候,每個 worker 上只能啟動一個執行器;并且,這個執行器會使用 worker 能提供出來的盡可能多的核數;

appA 和 appB 都有一個執行器運行在 worker1 上。但是 appA 還需要一些 cpu 核,當 appB 執行結束,釋放了它在 worker1 上的核數時, 下一次調度的時候,appA 會新啟動一個 executor 獲得了 worker1 上所有的可用的核心,因此 appA 就在 worker1 上啟動了多個執行器。

設置 coresPerExecutor (spark.executor.cores)很重要,考慮下面的例子:集群有4個worker,每個worker有16核;用戶請求 3 個執行器(spark.cores.max = 48,spark.executor.cores=16)。如果不設置這個參數,那么每次分配 1 個 cpu核心,每個 worker 輪流分配一個 cpu核,最終 4 個執行器分配 12 個核心給每個 executor,4 個 worker 也同樣分配了48個核心,但是最終每個 executor 只有 12核 < 16 核,所以最終沒有執行器被啟動。

如果看我的翻譯還是很費勁,我就再精簡下:

  • 如果沒有設置 spark.executor.cores,那么每個 Worker 只能啟動一個 Executor,并且這個 Executor 會占用所有 Worker 能提供的 cpu核數;
  • 如果顯示設置了,那么每個 Worker 可以啟動多個 Executor;

下面是源碼,每句都有挨個注釋過,中間有一個方法是判斷這個 Worker 上還能不能再分配 Executor 了。

重點是中間方法后面那一段,遍歷每個 Worker 分配 cpu,如果不是 Spend Out 模式,則在一個 Worker 上一直分配,直到 Worker 資源分配完畢。

  1. private def scheduleExecutorsOnWorkers( 
  2.     app: ApplicationInfo, 
  3.     usableWorkers: Array[WorkerInfo], 
  4.     spreadOutApps: Boolean): Array[Int] = { 
  5.     // 每個 executor 的核數 
  6.     val coresPerExecutor = app.desc.coresPerExecutor 
  7.     // 每個 executor 的最小核數 為1 
  8.     val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 
  9.     //  每個Worker分配一個Executor? 這個參數可以控制這個行為 
  10.     val oneExecutorPerWorker = coresPerExecutor.isEmpty 
  11.     //  每個Executor的內存 
  12.     val memoryPerExecutor = app.desc.memoryPerExecutorMB 
  13.     val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor 
  14.     // 可用 Worker 的總數 
  15.      
  16.     val numUsable = usableWorkers.length 
  17.     // 給每個Worker的cores數 
  18.     val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker 
  19.     // 給每個Worker上新的Executor數 
  20.     val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker 
  21.     // app 需要的核心數 和 所有 worker 能提供的核心總數,取最小值 
  22.     var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum
  23.  
  24.     //  判斷指定的worker是否可以為這個app啟動一個executor 
  25.     /** Return whether the specified worker can launch an executor for this app. */ 
  26.     def canLaunchExecutorForApp(pos: Int): Boolean = { 
  27.         // 如果能提供的核心數 大于等 executor 需要的最小核心數,則繼續分配 
  28.         val keepScheduling = coresToAssign >= minCoresPerExecutor 
  29.         // 是否有足夠的核心:當前 worker 能提供的核數 減去 每個 worker 已分配的核心數 ,大于每個 executor最小的核心數 
  30.         val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor 
  31.         // 當前 worker 新分配的 executor 個數 
  32.         val assignedExecutorNum = assignedExecutors(pos) 
  33.  
  34.         //  如果每個worker允許多個executor,就能一直在啟動新的的executor 
  35.         //  如果在這個worker上已經有executor,則給這個executor更多的core 
  36.         // If we allow multiple executors per worker, then we can always launch new executors. 
  37.         // Otherwise, if there is already an executor on this worker, just give it more cores. 
  38.  
  39.         // 如果一個 worker 上可以啟動多個 executor  或者 這個 worker 還沒分配 executor 
  40.         val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0 
  41.         if (launchingNewExecutor) { 
  42.             // 總共已經分配的內存 
  43.             val assignedMemory = assignedExecutorNum * memoryPerExecutor 
  44.             // 是否有足夠的內存:當前worker 的剩余內存 減去 已分配的內存 大于每個 executor需要的內存 
  45.             val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor 
  46.             // 
  47.             val assignedResources = resourceReqsPerExecutor.map { 
  48.                 req => req.resourceName -> req.amount * assignedExecutorNum 
  49.             }.toMap 
  50.             val resourcesFree = usableWorkers(pos).resourcesAmountFree.map { 
  51.                 case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0)) 
  52.             } 
  53.             val enoughResources = ResourceUtils.resourcesMeetRequirements( 
  54.                 resourcesFree, resourceReqsPerExecutor) 
  55.             // 所有已分配的核數+app需要的核數  小于 app的核數限制 
  56.             val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit 
  57.             keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit 
  58.         } else { 
  59.             // We're adding cores to an existing executor, so no need 
  60.             // to check memory and executor limits 
  61.             keepScheduling && enoughCores 
  62.         } 
  63.     } 
  64.  
  65.     // 不斷的啟動executor,直到不再有Worker可以容納任何Executor,或者達到了這個Application的要求 
  66.     // Keep launching executors until no more workers can accommodate any 
  67.     // more executors, or if we have reached this application's limits 
  68.     // 過濾出可以啟動 executor 的 workers 
  69.     var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp) 
  70.  
  71.     while (freeWorkers.nonEmpty) { 
  72.         // 遍歷每個 worker 
  73.         freeWorkers.foreach { pos => 
  74.             var keepScheduling = true 
  75.             while (keepScheduling && canLaunchExecutorForApp(pos)) { 
  76.                 coresToAssign -= minCoresPerExecutor 
  77.                 assignedCores(pos) += minCoresPerExecutor 
  78.  
  79.                 //  如果我們在每個worker上啟動一個executor,每次迭代為每個executor增加一個core 
  80.                 //  否則,每次迭代都會為新的executor分配cores 
  81.                 // If we are launching one executor per worker, then every iteration assigns 1 core 
  82.                 // to the executor. Otherwise, every iteration assigns cores to a new executor. 
  83.                 if (oneExecutorPerWorker) { 
  84.                     assignedExecutors(pos) = 1 
  85.                 } else { 
  86.                     assignedExecutors(pos) += 1 
  87.                 } 
  88.  
  89.                 //  如果不使用Spreading out方法,我們會在這個worker上繼續調度executor,直到使用它所有的資源 
  90.                 //  否則,就跳轉到下一個worker 
  91.                 // Spreading out an application means spreading out its executors across as 
  92.                 // many workers as possible. If we are not spreading outthen we should keep 
  93.                 // scheduling executors on this worker until we use all of its resources. 
  94.                 // Otherwise, just move on to the next worker. 
  95.                 if (spreadOutApps) { 
  96.                     keepScheduling = false 
  97.                 } 
  98.             } 
  99.         } 
  100.         freeWorkers = freeWorkers.filter(canLaunchExecutorForApp) 
  101.     } 
  102.     assignedCores 

接著真正開始在 Worker 上啟動 Executor:

在 launchExecutor 在方法里:

  1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 
  2.     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 
  3.     worker.addExecutor(exec
  4.     worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, 
  5.         exec.application.descexec.cores, exec.memory, exec.resources)) 
  6.     exec.application.driver.send( 
  7.         ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 

給 Worker 發送了一個 LaunchExecutor 消息。

然后給執行器對應的 Driver 發送了 ExecutorAdded 消息。

五、總結

本次我們講了 Master 處理應用的注冊,重點是把 app 信息加入到 waitingApps 列表中,然后調用 schedule() 方法,計算每個 Worker 可用的 cpu核數,并且在 Worker 上啟動執行器。

 

責任編輯:武曉燕 來源: KK架構
相關推薦

2011-04-19 13:32:52

2009-12-24 11:04:59

固定分配資源動態分配資源

2015-04-17 10:28:02

無線頻譜移動通信頻譜

2021-08-31 23:09:27

Spark資源分配

2012-06-05 08:59:35

Hadoop架構服務器

2012-03-09 17:38:17

ibmdw

2014-12-26 10:58:35

托管云托管私有云公共云

2010-04-07 15:55:17

無線接入頻段

2022-12-12 08:42:06

Java對象棧內存

2022-06-06 12:02:23

代碼注釋語言

2011-04-19 13:48:55

vCloud Dire

2013-04-17 15:10:07

銳捷寬帶寬帶網絡

2024-10-09 14:25:21

2013-05-21 09:08:24

服務器虛擬化網卡

2019-12-20 08:50:21

LinuxKsnip截圖

2021-06-22 16:40:32

鴻蒙HarmonyOS應用

2022-04-19 07:47:13

數據中心末端資源分配

2016-03-21 18:56:54

物聯網IoTIT基礎架構

2023-10-24 07:25:10

容器資源云分級

2011-01-26 11:01:37

虛擬機負載管理資源分配
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美激情 亚洲 | 99综合在线 | 亚洲精品自在在线观看 | 欧美精品一区二区三区视频 | 亚洲三区视频 | 91欧美激情一区二区三区成人 | 一级毛片在线播放 | 久久国产日韩欧美 | 日韩成人免费视频 | 日韩中文字幕在线观看 | 亚洲手机视频在线 | 中文字幕在线精品 | 羞羞视频免费在线 | 日本欧美在线视频 | 97在线观视频免费观看 | 亚洲一区日韩 | 午夜精品久久久久99蜜 | 亚洲欧美国产精品久久 | 欧美激情视频一区二区三区免费 | 久久久国| 国产视频精品在线 | 久草a√ | 一区二区免费在线 | 国产一区二区在线视频 | 亚洲精品久久久蜜桃 | 99免费在线观看 | 国产偷久久一级精品60部 | 久久久www成人免费精品张筱雨 | 中文字幕亚洲视频 | 日本a∨精品中文字幕在线 亚洲91视频 | av网址在线播放 | 日韩欧美精品在线 | 日韩av免费看 | 欧美一级片免费看 | 亚洲国产成人在线观看 | 日韩欧美国产一区二区三区 | 一区二区三区影院 | 911影院| 日韩中文一区二区三区 | 欧美一二三区 | 青青草原综合久久大伊人精品 |