Worth bookmarking: a source-code walkthrough of Spark's dynamic memory management!
1. Spark Memory Management Modes
Spark has two memory management modes: static memory management (StaticMemoryManager) and dynamic, or unified, memory management (UnifiedMemoryManager). Dynamic memory management was introduced in Spark 1.6. As the source in SparkEnv.scala shows, Spark now defaults to dynamic memory management; setting spark.memory.useLegacyMode to true switches back to static memory management.
```scala
// SparkEnv.scala
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
```
2. Space Allocation under Dynamic Memory Management
Compared with the StaticMemoryManager mode, the UnifiedMemoryManager model removes the hard boundary between storage memory and execution memory, letting each region grow and shrink dynamically, which lowers the probability of OOM errors. Executor JVM memory consists of the following regions:
(1) Reserved Memory: memory set aside for the system, 300 MB by default, configurable via spark.testing.reservedMemory.
```scala
// UnifiedMemoryManager.scala
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
```
The minimum JVM heap size is also tied to reserved memory: minSystemMemory = reservedMemory * 1.5, so by default the JVM heap must be at least 300 MB * 1.5 = 450 MB.
```scala
// UnifiedMemoryManager.scala
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
```
(2) Spark Memory: split into execution memory and storage memory. After subtracting reserved memory, a fraction of the remaining usableMemory goes to these two heap regions; the fraction defaults to 0.6 and is set via spark.memory.fraction. For example, with a 1 GB JVM heap, execution and storage together get (1024 - 300) * 0.6 ≈ 434 MB by default.
```scala
// UnifiedMemoryManager.scala
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory * memoryFraction).toLong
```
The boundary between them is set by spark.memory.storageFraction, which defaults to 0.5, so by default storage memory and execution memory split 1:1.
```scala
// UnifiedMemoryManager.scala
onHeapStorageRegionSize =
  (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
```
(3) User Memory: the remaining memory, available for user code; by default it takes (1 - 0.6) = 0.4 of usableMemory.
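To make the sizing rules above concrete, here is a small, hypothetical Scala sketch that recomputes each region for a 1 GB executor heap. All the names are local illustrations rather than Spark APIs, though the formulas mirror the snippets quoted above.

```scala
// Hypothetical recomputation of the default memory layout for a 1 GB heap.
val systemMemory   = 1024L * 1024 * 1024 // executor JVM heap: 1 GB
val reservedMemory = 300L * 1024 * 1024  // RESERVED_SYSTEM_MEMORY_BYTES

// the JVM heap must be at least 1.5x the reserved memory: 450 MB
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

// spark.memory.fraction = 0.6 of usable memory goes to execution + storage
val usableMemory = systemMemory - reservedMemory
val sparkMemory  = (usableMemory * 0.6).toLong // ~434 MB

// spark.memory.storageFraction = 0.5 sets the initial storage/execution boundary
val storageRegionSize   = (sparkMemory * 0.5).toLong
val executionRegionSize = sparkMemory - storageRegionSize

// the rest is user memory: ~0.4 of usable memory
val userMemory = usableMemory - sparkMemory

println(s"min heap = ${minSystemMemory / 1024 / 1024} MB, " +
  s"spark = ${sparkMemory / 1024 / 1024} MB, user = ${userMemory / 1024 / 1024} MB")
```

Running this prints a 450 MB minimum heap and roughly a 434 MB / 290 MB split between Spark memory and user memory, matching the worked numbers above.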
3. Memory Control in Detail
First, let's look at how Spark's memory management classes relate to one another.
1. MemoryManager's main responsibilities are: (1) tracking how much storage and execution memory is in use; (2) acquiring storage, execution, and unroll memory; (3) releasing storage and execution memory.
Execution memory is used for shuffles, joins, sorts, and aggregations, while storage memory is used for caching and broadcasting data. Each JVM has exactly one MemoryManager. Constructing a MemoryManager requires the onHeapStorageMemory and onHeapExecutionMemory parameters.
```scala
// MemoryManager.scala
private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {
```
It creates StorageMemoryPool and ExecutionMemoryPool objects for the on-heap and off-heap storage and execution memory pools, which manage the allocation of storage and execution memory.
```scala
// MemoryManager.scala
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
```
By default, off-heap memory is not used; it is enabled via spark.memory.offHeap.enabled, and its size, 0 by default, is set via spark.memory.offHeap.size.
```scala
// MemoryManager.scala
final val tungstenMemoryMode: MemoryMode = {
  if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
    require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    require(Platform.unaligned(),
      "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}

protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
```
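The mode-selection rule can be sketched as a standalone helper. `chooseMemoryMode` and the simplified `Mode` values below are our own illustration, not Spark's API; the real code also checks `Platform.unaligned()`, which is omitted here.

```scala
// Standalone sketch of the off-heap mode selection rule described above.
sealed trait Mode
case object OnHeap  extends Mode
case object OffHeap extends Mode

def chooseMemoryMode(offHeapEnabled: Boolean, offHeapSizeBytes: Long): Mode =
  if (offHeapEnabled) {
    // mirrors the require() in MemoryManager: a positive size is mandatory
    require(offHeapSizeBytes > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    OffHeap
  } else {
    OnHeap
  }
```

With the flag off, the size is ignored and on-heap mode is chosen; with the flag on and a zero size, the `require` fails just as Spark's startup check would.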
The method that releases numBytes of execution memory:
```scala
// MemoryManager.scala
def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}
```
Releases all execution memory held by a given task and marks the task as inactive:
```scala
// MemoryManager.scala
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
  offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
```
The method that releases numBytes of storage memory:
```scala
// MemoryManager.scala
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}
```
The method that releases all storage memory:
```scala
// MemoryManager.scala
final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}
```
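All of these release methods reduce to simple bookkeeping on a pool. A minimal sketch, assuming a single pool with a fixed size (`SimplePool` is our own class, not Spark's MemoryPool hierarchy):

```scala
// Minimal bookkeeping sketch of a memory pool: acquire fails when the pool
// lacks free space, and releases are clamped so used memory never goes negative.
class SimplePool(var poolSize: Long) {
  private var used: Long = 0L
  def memoryUsed: Long = used
  def memoryFree: Long = poolSize - used
  def acquire(numBytes: Long): Boolean =
    if (numBytes <= memoryFree) { used += numBytes; true } else false
  def release(numBytes: Long): Unit =
    used = math.max(0L, used - numBytes) // over-releasing just empties the pool
  def releaseAll(): Unit = used = 0L
}
```

The real pools add per-task accounting and synchronization on the owning MemoryManager, but the free/used arithmetic is the same idea.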
2. Next: how does UnifiedMemoryManager control memory, and how is dynamic allocation actually implemented?
UnifiedMemoryManager extends MemoryManager:
```scala
// UnifiedMemoryManager.scala
private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {
```
It overrides the maxOnHeapStorageMemory method: the maximum storage memory is the maximum heap memory minus the execution memory currently in use.
```scala
// UnifiedMemoryManager.scala
override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
```
The core method acquireStorageMemory requests storage memory.
```scala
// UnifiedMemoryManager.scala
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // select the execution/storage pools and the max memory for the given memory mode
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  if (numBytes > maxMemory) {
    // if the request exceeds the maximum storage memory, fail fast
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  if (numBytes > storagePool.memoryFree) {
    // if the storage pool lacks free memory, borrow from the execution pool;
    // storage can only borrow execution memory that is currently free
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution) // shrink the execution pool
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)   // grow the storage pool
  }
  storagePool.acquireMemory(blockId, numBytes)
}
```
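The borrowing step above can be traced with a hand-rolled simulation. The pool sizes and the `acquireStorage` helper below are hypothetical, and the eviction that may happen inside `storagePool.acquireMemory` is not modeled; only the pool-resizing arithmetic is shown.

```scala
// Hand-rolled model of the borrowing step: when the storage pool lacks free
// space, it grows by at most the execution pool's free memory (in MB here).
var executionPoolSize = 217L
var executionUsed     = 50L
var storagePoolSize   = 217L
var storageUsed       = 200L

def acquireStorage(numBytes: Long): Unit = {
  val storageFree   = storagePoolSize - storageUsed
  val executionFree = executionPoolSize - executionUsed
  if (numBytes > storageFree) {
    // only free execution memory can be borrowed
    val borrowed = math.min(executionFree, numBytes)
    executionPoolSize -= borrowed // shrink the execution pool
    storagePoolSize   += borrowed // grow the storage pool
  }
}

acquireStorage(100L) // needs 100, storage has only 17 free, so it borrows 100
```

After the call, the storage pool has grown from 217 to 317 while the execution pool has shrunk from 217 to 117, illustrating how the boundary moves at runtime.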
The core method acquireExecutionMemory requests execution memory.
```scala
// UnifiedMemoryManager.scala
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  // synchronized: this call may block until the execution pool has enough memory
  ...
  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
```
This method ultimately calls ExecutionMemoryPool's acquireMemory method, which takes two functions as parameters: maybeGrowExecutionPool() and computeMaxExecutionPoolSize().
The memory each task can use is bounded between poolSize / (2 * numActiveTasks) and maxPoolSize / numActiveTasks, where maxPoolSize is the maximum size of the execution pool and poolSize is its current size.
```scala
// ExecutionMemoryPool.scala
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
```
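A quick worked example of these bounds, assuming a 434 MB execution pool that has neither grown nor shrunk (so poolSize equals maxPoolSize) and 2 active tasks:

```scala
// Per-task bounds for a 434 MB execution pool with 2 active tasks (MB units).
val maxPoolSize    = 434L
val poolSize       = 434L
val numActiveTasks = 2
val maxMemoryPerTask = maxPoolSize / numActiveTasks    // at most 217 MB per task
val minMemoryPerTask = poolSize / (2 * numActiveTasks) // at least 108 MB guaranteed
```

So each task is guaranteed at least 1/(2N) of the pool before it can be forced to spill, and can claim at most 1/N of the pool's maximum size.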
The maybeGrowExecutionPool() method implements the dynamic growth of the execution memory region. Each time execution memory is requested, the execution pool may make several attempts, and each attempt may reclaim some storage memory.
```scala
// UnifiedMemoryManager.scala
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  if (extraMemoryNeeded > 0) {
    // execution can reclaim the larger of storage's free memory and the memory
    // storage has borrowed beyond its original region
    val memoryReclaimableFromStorage = math.max(
      storagePool.memoryFree,
      storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) {
      // reclaim the smaller of what is needed and what is reclaimable
      val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
        math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
      storagePool.decrementPoolSize(spaceToReclaim)   // shrink the storage pool
      executionPool.incrementPoolSize(spaceToReclaim) // grow the execution pool
    }
  }
}
```
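The reclaim computation can be traced with concrete numbers. The figures below are hypothetical, and the eviction performed by freeSpaceToShrinkPool is not modeled; only the two min/max expressions are reproduced.

```scala
// Hypothetical numbers tracing maybeGrowExecutionPool's reclaim computation (MB).
val storageRegionSize = 217L // storage's original region size
val storagePoolNow    = 317L // current pool size: storage borrowed 100 MB earlier
val storageMemoryFree = 30L  // free memory inside the storage pool
val extraMemoryNeeded = 120L // what execution still needs

// execution can reclaim the larger of storage's free memory and the memory
// storage borrowed beyond its original region: max(30, 317 - 217) = 100
val memoryReclaimableFromStorage =
  math.max(storageMemoryFree, storagePoolNow - storageRegionSize)

// actually reclaim the smaller of what's needed and what's reclaimable: 100
val spaceToReclaim = math.min(extraMemoryNeeded, memoryReclaimableFromStorage)
```

Note that execution gets back everything storage borrowed plus any free storage space, but it can never evict blocks out of storage's original region; that asymmetry is what keeps cached data from being thrashed by execution demand.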