Apache Spark 統(tǒng)一內(nèi)存管理模型詳解
本文將對 Spark 的內(nèi)存管理模型進(jìn)行分析,下面的分析全部是基于 Apache Spark 2.2.1 進(jìn)行的。為了讓下面的文章看起來不枯燥,我不打算貼出代碼層面的東西。文章僅對統(tǒng)一內(nèi)存管理模塊(UnifiedMemoryManager)進(jìn)行分析。
我們都知道 Spark 能夠有效的利用內(nèi)存并進(jìn)行分布式計算,其內(nèi)存管理模塊在整個系統(tǒng)中扮演著非常重要的角色。為了更好地利用 Spark,深入地理解其內(nèi)存管理模型具有非常重要的意義,這有助于我們對 Spark 進(jìn)行更好的調(diào)優(yōu);在出現(xiàn)各種內(nèi)存問題時,能夠摸清頭腦,找到哪塊內(nèi)存區(qū)域出現(xiàn)問題。下文介紹的內(nèi)存模型全部指 Executor 端的內(nèi)存模型, Driver 端的內(nèi)存模型本文不做介紹。統(tǒng)一內(nèi)存管理模塊包括了堆內(nèi)內(nèi)存(On-heap Memory)和堆外內(nèi)存(Off-heap Memory)兩大區(qū)域,下面對這兩塊區(qū)域進(jìn)行詳細(xì)的說明。
堆內(nèi)內(nèi)存(On-heap Memory)
默認(rèn)情況下,Spark 僅僅使用了堆內(nèi)內(nèi)存。Executor 端的堆內(nèi)內(nèi)存區(qū)域大致可以分為以下四大塊:
- Execution 內(nèi)存:主要用于存放 Shuffle、Join、Sort、Aggregation 等計算過程中的臨時數(shù)據(jù)
- Storage 內(nèi)存:主要用于存儲 spark 的 cache 數(shù)據(jù),例如RDD的緩存、unroll數(shù)據(jù);
- 用戶內(nèi)存(User Memory):主要用于存儲 RDD 轉(zhuǎn)換操作所需要的數(shù)據(jù),例如 RDD 依賴等信息。
- 預(yù)留內(nèi)存(Reserved Memory):系統(tǒng)預(yù)留內(nèi)存,會用來存儲Spark內(nèi)部對象。
整個 Executor 端堆內(nèi)內(nèi)存如果用圖來表示的話,可以概括如下:
如果想及時了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號:iteblog_hadoop
我們對上圖進(jìn)行以下說明:
- systemMemory = Runtime.getRuntime.maxMemory,其實(shí)就是通過參數(shù) spark.executor.memory 或 --executor-memory 配置的。
- reservedMemory 在 Spark 2.2.1 中是寫死的,其值等于 300MB,這個值是不能修改的(如果在測試環(huán)境下,我們可以通過 spark.testing.reservedMemory 參數(shù)進(jìn)行修改);
- usableMemory = systemMemory - reservedMemory,這個就是 Spark 可用內(nèi)存。
堆外內(nèi)存(Off-heap Memory)
Spark 1.6 開始引入了Off-heap memory(詳見SPARK-11389)。這種模式不在 JVM 內(nèi)申請內(nèi)存,而是調(diào)用 Java 的 unsafe 相關(guān) API 進(jìn)行諸如 C 語言里面的 malloc() 直接向操作系統(tǒng)申請內(nèi)存,由于這種方式不進(jìn)過 JVM 內(nèi)存管理,所以可以避免頻繁的 GC,這種內(nèi)存申請的缺點(diǎn)是必須自己編寫內(nèi)存申請和釋放的邏輯。
默認(rèn)情況下,堆外內(nèi)存是關(guān)閉的,我們可以通過 spark.memory.offHeap.enabled 參數(shù)啟用,并且通過 spark.memory.offHeap.size 設(shè)置堆外內(nèi)存大小,單位為字節(jié)。如果堆外內(nèi)存被啟用,那么 Executor 內(nèi)將同時存在堆內(nèi)和堆外內(nèi)存,兩者的使用互補(bǔ)影響,這個時候 Executor 中的 Execution 內(nèi)存是堆內(nèi)的 Execution 內(nèi)存和堆外的 Execution 內(nèi)存之和,同理,Storage 內(nèi)存也一樣。相比堆內(nèi)內(nèi)存,堆外內(nèi)存只區(qū)分 Execution 內(nèi)存和 Storage 內(nèi)存,其內(nèi)存分布如下圖所示:
如果想及時了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號:iteblog_hadoop
Execution 內(nèi)存和 Storage 內(nèi)存動態(tài)調(diào)整
細(xì)心的同學(xué)肯定看到上面兩張圖中的 Execution 內(nèi)存和 Storage 內(nèi)存之間存在一條虛線,這是為什么呢?
用過 Spark 的同學(xué)應(yīng)該知道,在 Spark 1.5 之前,Execution 內(nèi)存和 Storage 內(nèi)存分配是靜態(tài)的,換句話說就是如果 Execution 內(nèi)存不足,即使 Storage 內(nèi)存有很大空閑程序也是無法利用到的;反之亦然。這就導(dǎo)致我們很難進(jìn)行內(nèi)存的調(diào)優(yōu)工作,我們必須非常清楚地了解 Execution 和 Storage 兩塊區(qū)域的內(nèi)存分布。而目前 Execution 內(nèi)存和 Storage 內(nèi)存可以互相共享的。也就是說,如果 Execution 內(nèi)存不足,而 Storage 內(nèi)存有空閑,那么 Execution 可以從 Storage 中申請空間;反之亦然。所以上圖中的虛線代表 Execution 內(nèi)存和 Storage 內(nèi)存是可以隨著運(yùn)作動態(tài)調(diào)整的,這樣可以有效地利用內(nèi)存資源。Execution 內(nèi)存和 Storage 內(nèi)存之間的動態(tài)調(diào)整可以概括如下:
如果想及時了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號:iteblog_hadoop
具體的實(shí)現(xiàn)邏輯如下:
- 程序提交的時候我們都會設(shè)定基本的 Execution 內(nèi)存和 Storage 內(nèi)存區(qū)域(通過 spark.memory.storageFraction參數(shù)設(shè)置);
- 在程序運(yùn)行時,如果雙方的空間都不足時,則存儲到硬盤;將內(nèi)存中的塊存儲到磁盤的策略是按照 LRU 規(guī)則進(jìn)行的。若己方空間不足而對方空余時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
- Execution 內(nèi)存的空間被對方占用后,可讓對方將占用的部分轉(zhuǎn)存到硬盤,然后"歸還"借用的空間
- Storage 內(nèi)存的空間被對方占用后,目前的實(shí)現(xiàn)是無法讓對方"歸還",因?yàn)樾枰紤] Shuffle 過程中的很多因素,實(shí)現(xiàn)起來較為復(fù)雜;而且 Shuffle 過程產(chǎn)生的文件在后面一定會被使用到,而 Cache 在內(nèi)存的數(shù)據(jù)不一定在后面使用。
注意,上面說的借用對方的內(nèi)存需要借用方和被借用方的內(nèi)存類型都一樣,都是堆內(nèi)內(nèi)存或者都是堆外內(nèi)存,不存在堆內(nèi)內(nèi)存不夠去借用堆外內(nèi)存的空間。
Task 之間內(nèi)存分布
為了更好地使用使用內(nèi)存,Executor 內(nèi)運(yùn)行的 Task 之間共享著 Execution 內(nèi)存。具體的,Spark 內(nèi)部維護(hù)了一個 HashMap 用于記錄每個 Task 占用的內(nèi)存。當(dāng) Task 需要在 Execution 內(nèi)存區(qū)域申請 numBytes 內(nèi)存,其先判斷 HashMap 里面是否維護(hù)著這個 Task 的內(nèi)存使用情況,如果沒有,則將這個 Task 內(nèi)存使用置為0,并且以 TaskId 為 key,內(nèi)存使用為 value 加入到 HashMap 里面。之后為這個 Task 申請 numBytes 內(nèi)存,如果 Execution 內(nèi)存區(qū)域正好有大于 numBytes 的空閑內(nèi)存,則在 HashMap 里面將當(dāng)前 Task 使用的內(nèi)存加上 numBytes,然后返回;如果當(dāng)前 Execution 內(nèi)存區(qū)域無法申請到每個 Task 最小可申請的內(nèi)存,則當(dāng)前 Task 被阻塞,直到有其他任務(wù)釋放了足夠的執(zhí)行內(nèi)存,該任務(wù)才可以被喚醒。每個 Task 可以使用 Execution 內(nèi)存大小范圍為 1/2N ~ 1/N,其中 N 為當(dāng)前 Executor 內(nèi)正在運(yùn)行的 Task 個數(shù)。
比如如果 Execution 內(nèi)存大小為 10GB,當(dāng)前 Executor 內(nèi)正在運(yùn)行的 Task 個數(shù)為5,則該 Task 可以申請的內(nèi)存范圍為 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范圍。
一個示例
為了更好的理解上面堆內(nèi)內(nèi)存和堆外內(nèi)存的使用情況,這里給出一個簡單的例子。
只用了堆內(nèi)內(nèi)存
現(xiàn)在我們提交的 Spark 作業(yè)關(guān)于內(nèi)存的配置如下:
--executor-memory 18g
由于沒有設(shè)置 spark.memory.fraction 和 spark.memory.storageFraction 參數(shù),我們可以看到 Spark UI 關(guān)于 Storage Memory 的顯示如下:
如果想及時了解Spark、Hadoop或者Hbase相關(guān)的文章,歡迎關(guān)注微信公共帳號:iteblog_hadoop
上圖很清楚地看到 Storage Memory 的可用內(nèi)存是 10.1GB,這個數(shù)是咋來的呢?根據(jù)前面的規(guī)則,我們可以得出以下的計算:
現(xiàn)在終于對上了。
具體將字節(jié)轉(zhuǎn)換成 GB 的計算邏輯如下(core 模塊下面的 /core/src/main/resources/org/apache/spark/ui/static/utils.js):
用了堆內(nèi)和堆外內(nèi)存
現(xiàn)在如果我們啟用了堆外內(nèi)存,情況咋樣呢?我們的內(nèi)存相關(guān)配置如下: