騰訊面試:Spark 內存如何優化?包含哪幾個方面?
一、Spark內存架構概述
Apache Spark作為一個高效的分布式計算引擎,其性能很大程度上取決于內存的使用效率。本文將詳細介紹Spark內存管理機制及優化策略,包括緩存優化、內存配置、狀態存儲優化等方面及并給出相應的樣例代碼。
Spark的內存管理分為執行內存(Execution Memory)和存儲內存(Storage Memory)兩大部分:
- 執行內存:用于shuffle、join、sort等計算操作
- 存儲內存:用于緩存RDD、DataFrame和廣播變量
在Spark 1.6之前,這兩部分內存是靜態分配的,而在Spark 1.6及以后版本中,采用了統一內存管理(Unified Memory Management),允許兩種類型的內存相互借用。
二、緩存優化策略
1. 緩存級別選擇
Spark提供了多種緩存級別,可以通過persist()或cache()方法設置:
以下是一個使用不同存儲級別的示例:
from
pyspark.storagelevel
import
StorageLevel
# 默認緩存級別 MEMORY_AND_DISK_DESER
df.cache()
# 僅使用內存
df.persist(StorageLevel.MEMORY_ONLY)
# 僅使用磁盤
df.persist(StorageLevel.DISK_ONLY)
# 使用內存和磁盤,序列化存儲
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
# 使用堆外內存
df.persist(StorageLevel.OFF_HEAP)
2. 列式存儲優化
Spark SQL使用列式存儲格式來緩存數據,這種方式比行式存儲更節省內存,并且支持壓縮。
列式存儲的主要優勢:
- 更高的壓縮率:相同類型的數據放在一起,壓縮效率更高
- 謂詞下推:可以只讀取查詢所需的列
- 向量化處理:支持批量處理,提高CPU效率
以下配置可以優化列式緩存:
// 啟用列式緩存壓縮
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
// 設置批處理大小
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
// 啟用向量化讀取
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", true)
3. 堆外內存使用
Spark支持使用堆外內存(Off-heap Memory)來存儲數據,減少GC壓力:
啟用堆外內存的配置:
// 啟用堆外內存
spark.conf.set("spark.memory.offHeap.enabled", true)
// 設置堆外內存大小(字節)
spark.conf.set("spark.memory.offHeap.size", "10g")
// 啟用列向量堆外內存
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)
三、內存配置優化
1. 執行器內存配置
合理配置執行器內存是優化Spark性能的關鍵:
// 設置執行器內存
spark.conf.set("spark.executor.memory", "8g")
// 設置內存開銷因子
spark.conf.set("spark.executor.memoryOverheadFactor", "0.1")
// 設置執行器核心數
spark.conf.set("spark.executor.cores", "4")
2. 內存分配比例調整
調整執行內存和存儲內存的比例:
// 設置存儲內存占比(默認0.5,即50%)
spark.conf.set("spark.memory.storageFraction", "0.4")
3. 動態內存管理
Spark 1.6引入的統一內存管理允許執行內存和存儲內存動態共享:
// 啟用動態內存分配(默認開啟)
spark.conf.set("spark.memory.useLegacyMode", false)
四、Shuffle優化
Shuffle是Spark中最消耗內存和磁盤I/O的操作之一。
1. Shuffle內存占比
// 設置shuffle內存占比
spark.conf.set("spark.shuffle.memoryFraction", "0.2")
2. Shuffle合并
// 啟用shuffle文件合并
spark.conf.set("spark.shuffle.consolidateFiles", true)
3. Shuffle溢出優化
// 設置溢出前內存中排序的條目數
spark.conf.set("spark.shuffle.sort.bypassMergeThreshold", 200)
五、狀態存儲優化
對于Structured Streaming等有狀態操作,內存優化尤為重要。
1. RocksDB狀態存儲
Spark 3.2引入了RocksDB狀態存儲實現,可以有效減少JVM GC壓力:
啟用RocksDB狀態存儲:
// 設置狀態存儲提供者為RocksDB
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass"
, "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
2. RocksDB內存管理
RocksDB提供了內存使用限制功能,避免OOM問題:
配置RocksDB內存限制:
// 啟用RocksDB內存限制
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)
// 設置最大內存使用量(MB)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 1000)
// 設置寫緩沖區占比
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", 0.4)
// 設置高優先級池占比
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", 0.1)
3. 狀態存儲優化示例
以下是一個使用RocksDB狀態存儲的Structured Streaming示例:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// 配置Spark Session
val spark = SparkSession.builder()
.appName("StatefulStreamingWithMemoryOptimization")
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
.config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)
.config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 500)
.getOrCreate()
// 創建輸入流
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
// 解析并處理數據
val processedStream = inputStream
.selectExpr("CAST(value AS STRING)")
.as[String]
.flatMap(_.split(" "))
.groupBy("value")
.count()
// 輸出結果
val query = processedStream.writeStream
.outputMode("update")
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
query.awaitTermination()
六、內存泄漏檢測與處理
Spark提供了內存泄漏檢測機制:
啟用內存泄漏檢測:
// 內存泄漏時拋出異常
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", true)
七、實際案例分析與優化
1. 數據傾斜處理
數據傾斜會導致某些執行器內存壓力過大。解決方案:
// 示例:處理傾斜的join
// 1. 識別傾斜鍵
val skewedKeys = df1.groupBy("key").count().filter("count > 1000").select("key")
// 2. 對傾斜鍵進行特殊處理
val skewedKeysBroadcast = spark.sparkContext.broadcast(skewedKeys.collect().map(_.getString(0)).toSet)
// 3. 將數據分為傾斜和非傾斜部分
val dfSkewed = df1.filter(r => skewedKeysBroadcast.value.contains(r.getString(0)))
val dfNormal = df1.filter(r => !skewedKeysBroadcast.value.contains(r.getString(0)))
// 4. 對傾斜部分進行加鹽處理
val saltedDfSkewed = dfSkewed.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat($"key", lit("_"), $"salt"))
val saltedDf2 = df2.join(skewedKeys, "key")
.withColumn("salt", explode(array((0 until 10).map(lit): _*)))
.withColumn("salted_key", concat($"key", lit("_"), $"salt"))
// 5. 分別join并合并結果
val joinSkewed = saltedDfSkewed.join(saltedDf2, "salted_key").drop("salt", "salted_key")
val joinNormal = dfNormal.join(df2, "key")
val result = joinSkewed.union(joinNormal)
2. 緩存優化實例
以下是一個優化DataFrame緩存的實例:
import org.apache.spark.storage.StorageLevel
// 創建測試數據
val df = spark.range(0, 1000000)
.withColumn("square", $"id" * $"id")
.withColumn("cube", $"square" * $"id")
// 1. 基準測試 - 不使用緩存
val t1 = System.nanoTime()
val count1 = df.filter($"square" > 1000).count()
val count2 = df.filter($"cube" > 10000).count()
val duration1 = (System.nanoTime() - t1) / 1e9d
println(s"未緩存執行時間: $duration1 秒")
// 2. 使用默認緩存
df.cache()
df.count() // 觸發緩存
val t2 = System.nanoTime()
val count3 = df.filter($"square" > 1000).count()
val count4 = df.filter($"cube" > 10000).count()
val duration2 = (System.nanoTime() - t2) / 1e9d
println(s"默認緩存執行時間: $duration2 秒")
df.unpersist()
// 3. 使用序列化緩存
df.persist(StorageLevel.MEMORY_ONLY_SER)
df.count() // 觸發緩存
val t3 = System.nanoTime()
val count5 = df.filter($"square" > 1000).count()
val count6 = df.filter($"cube" > 10000).count()
val duration3 = (System.nanoTime() - t3) / 1e9d
println(s"序列化緩存執行時間: $duration3 秒")
df.unpersist()
// 4. 使用列式緩存(默認已啟用)
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 20000)
df.cache()
df.count() // 觸發緩存
val t4 = System.nanoTime()
val count7 = df.filter($"square" > 1000).count()
val count8 = df.filter($"cube" > 10000).count()
val duration4 = (System.nanoTime() - t4) / 1e9d
println(s"優化列式緩存執行時間: $duration4 秒")
df.unpersist()
3. 內存監控與調優
// 創建監控函數
def monitorMemory(sc: SparkContext): Unit = {
val executorMemoryInfo = sc.getExecutorMemoryStatus
println("==== 內存使用情況 ====")
executorMemoryInfo.foreach { case (executorId, (usedMem, maxMem)) =>
println(s"執行器 $executorId: 已用內存 ${usedMem / 1024 / 1024}MB, 最大內存 ${maxMem / 1024 / 1024}MB")
}
// 打印存儲內存使用情況
println("==== 緩存塊信息 ====")
sc.getRDDStorageInfo.foreach { info =>
println(s"RDD: ${info.name}, 分區數: ${info.numPartitions}, " +
s"緩存級別: ${info.storageLevel}, 內存使用: ${info.memoryUsed / 1024 / 1024}MB")
}
}
// 定期監控內存使用情況
val monitorThread = new Thread(() => {
while (true) {
monitorMemory(spark.sparkContext)
Thread.sleep(10000) // 每10秒監控一次
}
})
monitorThread.setDaemon(true)
monitorThread.start()
八、自適應查詢執行(AQE)內存優化
Spark 3.0引入的自適應查詢執行(Adaptive Query Execution)可以根據運行時統計信息動態調整執行計劃,對內存使用也有積極影響。
1. 啟用AQE
// 啟用自適應查詢執行
spark.conf.set("spark.sql.adaptive.enabled", true)
// 設置合并shuffle分區的目標大小
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
// 啟用shuffle分區合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)
// 設置合并后的最小分區大小
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")
2. AQE內存優化示例
// 創建測試數據
val largeDF = spark.range(0, 10000000)
.withColumn("key", $"id" % 1000)
.withColumn("value", rand() * 100)
// 設置較大的初始shuffle分區數
spark.conf.set("spark.sql.shuffle.partitions", 200)
// 執行聚合查詢
val result = largeDF.groupBy("key")
.agg(
avg("value").as("avg_value"),
max("value").as("max_value"),
min("value").as("min_value")
)
.filter($"avg_value" > 50)
// 查看執行計劃
result.explain(true)
// 執行查詢并觀察實際使用的分區數
result.collect()
九、列式存儲與壓縮優化
Spark SQL的列式存儲是內存優化的重要手段,通過壓縮和編碼技術可以顯著減少內存使用。
1. 列式存儲優化示例
// 啟用列式存儲壓縮
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
// 設置批處理大小
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
// 創建測試數據
val wideDF = spark.range(0, 100000)
.withColumn("col1", $"id" % 100)
.withColumn("col2", $"id" * 2)
.withColumn("col3", $"id" * 3)
.withColumn("col4", $"id" * 4)
.withColumn("col5", $"id" * 5)
.withColumn("col6", concat(lit("value-"), $"id".cast("string")))
// 緩存數據
wideDF.cache()
wideDF.count() // 觸發緩存
// 查看緩存統計信息
println(s"列式緩存內存使用: ${spark.sparkContext.getRDDStorageInfo.filter(_.id == wideDF.rdd.id).map(_.memoryUsed).sum / 1024 / 1024}MB")
// 執行查詢
val result = wideDF.filter($"col1" < 10).select("id", "col1", "col6")
result.explain()
result.show(5)
十、堆外內存優化
堆外內存(Off-heap Memory)是減輕GC壓力的有效方法,特別適合大數據量處理。
1. 堆外內存示例
// 啟用堆外內存
spark.conf.set("spark.memory.offHeap.enabled", true)
spark.conf.set("spark.memory.offHeap.size", "4g")
// 啟用列向量堆外內存
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)
// 創建測試數據
val largeDF = spark.range(0, 10000000)
.withColumn("value", rand() * 1000)
// 執行聚合操作
val result = largeDF.groupBy($"id" % 100 as "key")
.agg(sum("value") as "sum_value")
.orderBy("key")
// 查看執行計劃
result.explain()
// 執行查詢
result.show()
十一、Structured Streaming內存優化
對于Structured Streaming應用,狀態管理是內存優化的關鍵。
Structured Streaming優化示例:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// 配置Spark Session
val spark = SparkSession.builder()
.appName("StreamingMemoryOptimization")
// 啟用RocksDB狀態存儲
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
// 啟用RocksDB內存限制
.config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)
.config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 1000)
.config("spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", 0.4)
.config("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", 0.1)
// 啟用RocksDB壓縮
.config("spark.sql.streaming.stateStore.rocksdb.compression", "lz4")
// 啟用RocksDB變更日志檢查點
.config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", true)
.getOrCreate()
// 創建輸入流
val inputStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.option("startingOffsets", "latest")
.load()
// 解析JSON數據
val parsedStream = inputStream
.selectExpr("CAST(value AS STRING) as json")
.select(from_json($"json",
"id STRING, timestamp LONG, value DOUBLE").as("data"))
.select("data.*")
.withColumn("event_time",
to_timestamp($"timestamp" / 1000))
.withWatermark("event_time", "10 minutes")
// 執行窗口聚合
val windowedAggregation = parsedStream
.groupBy(
window($"event_time", "5 minutes", "1 minute"),
$"id"
)
.agg(
avg("value").as("avg_value"),
count("*").as("event_count")
)
.select(
$"window.start".as("window_start"),
$"window.end".as("window_end"),
$"id",
$"avg_value",
$"event_count"
)
// 輸出結果
val query = windowedAggregation.writeStream
.outputMode("update")
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("1 minute"))
.start()
十二、內存泄漏檢測與處理
Spark提供了內存泄漏檢測機制,可以幫助識別和解決內存問題。
內存泄漏監控示例:
// 啟用內存泄漏檢測
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", true)
// 創建自定義累加器監控內存使用
val memoryLeakMonitor = spark.sparkContext.longAccumulator("MemoryLeakMonitor")
// 創建可能導致內存泄漏的函數
def processWithPotentialLeak(df: DataFrame): DataFrame = {
// 模擬處理邏輯
val result = df.mapPartitions { iter =>
// 記錄處理前內存
val runtime = Runtime.getRuntime
val beforeMem = runtime.totalMemory() - runtime.freeMemory()
// 處理數據
val resultIter = iter.map(row => {
// 處理邏輯
row
})
// 記錄處理后內存
val afterMem = runtime.totalMemory() - runtime.freeMemory()
memoryLeakMonitor.add(afterMem - beforeMem)
resultIter
}
result
}
// 使用監控函數處理數據
val df = spark.range(0, 1000000).withColumn("value", rand())
val processed = processWithPotentialLeak(df)
processed.count()
// 檢查累加器值
println(s"內存增長: ${memoryLeakMonitor.value} 字節")
十三、綜合優化案例
以下是一個綜合應用多種內存優化技術的實際案例。
1. 大規模數據處理優化
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
// 配置Spark Session
val spark = SparkSession.builder()
.appName("ComprehensiveMemoryOptimization")
// 啟用自適應查詢執行
.config("spark.sql.adaptive.enabled", true)
.config("spark.sql.adaptive.coalescePartitions.enabled", true)
// 啟用堆外內存
.config("spark.memory.offHeap.enabled", true)
.config("spark.memory.offHeap.size", "8g")
.config("spark.sql.columnVector.offheap.enabled", true)
// 列式存儲優化
.config("spark.sql.inMemoryColumnarStorage.compressed", true)
.config("spark.sql.inMemoryColumnarStorage.batchSize", 20000)
.getOrCreate()
// 讀取大規模數據
val rawData = spark.read
.format("parquet")
.load("/path/to/large/dataset")
// 數據預處理
val processedData = rawData
.filter($"value" > 0)
.withColumn("category", when($"value" < 100, "low")
.when($"value" < 500, "medium")
.otherwise("high"))
.withColumn("date", to_date($"timestamp"))
// 使用優化的存儲級別緩存
processedData.persist(StorageLevel.MEMORY_AND_DISK_SER)
processedData.count() // 觸發緩存
// 檢測數據傾斜
val keyDistribution = processedData
.groupBy("key")
.count()
.cache()
val maxCount = keyDistribution.agg(max("count")).first().getLong(0)
val avgCount = keyDistribution.agg(avg("count")).first().getDouble(0)
println(s"最大鍵計數: $maxCount, 平均鍵計數: $avgCount, 傾斜比例: ${maxCount / avgCount}")
// 處理傾斜鍵
val skewThreshold = avgCount * 5
val skewedKeys = keyDistribution
.filter($"count" > skewThreshold)
.select("key")
.collect()
.map(_.getString(0))
.toSet
val skewedKeysBroadcast = spark.sparkContext.broadcast(skewedKeys)
// 分離傾斜和非傾斜數據
val skewedData = processedData
.filter(row => skewedKeysBroadcast.value.contains(row.getAs[String]("key")))
// 對兩部分數據分別進行聚合
val skewedAggregated = saltedSkewedData
.groupBy("salted_key")
.agg(
sum("value").as("sum_value"),
count("*").as("count")
)
.withColumn("key", split($"salted_key", "_").getItem(0))
.drop("salted_key")
.groupBy("key")
.agg(
sum("sum_value").as("sum_value"),
sum("count").as("count")
)
val normalAggregated = normalData
.groupBy("key")
.agg(
sum("value").as("sum_value"),
count("*").as("count")
)
// 合并結果
val finalResult = skewedAggregated.union(normalAggregated)
2. 廣播變量優化
廣播變量可以有效減少數據傳輸和內存使用:
// 創建一個大型查找表
val lookupTable = spark.range(0, 100000)
.withColumn("value", rand() * 1000)
.collect()
.map(row => (row.getLong(0), row.getDouble(1)))
.toMap
// 廣播查找表
val broadcastLookupTable = spark.sparkContext.broadcast(lookupTable)
// 使用廣播變量進行查找
val result = spark.range(0, 1000000)
.withColumn("key", $"id" % 100000)
.mapPartitions { iter =>
val lookup = broadcastLookupTable.value
iter.map { row =>
val id = row.getLong(0)
val key = row.getLong(1)
val value = lookup.getOrElse(key, 0.0)
(id, key, value)
}
}
十四、內存調優優秀實踐
1. 內存配置原則
- 合理設置執行器內存:根據集群節點內存和并行度設置合適的執行器內存
- 避免過度分配:每個執行器內存不宜過大,以免GC時間過長
- 預留系統開銷:為操作系統和其他進程預留足夠內存
- 調整存儲與執行內存比例:根據應用特點調整存儲內存和執行內存的比例
// 示例:4節點集群,每節點64GB內存,16核
// 設置每個執行器使用4核
spark.conf.set("spark.executor.cores", "4")
// 每節點運行3個執行器,每執行器約16GB內存
spark.conf.set("spark.executor.memory", "16g")
// 設置內存開銷因子
spark.conf.set("spark.executor.memoryOverhead", "2g")
// 調整存儲內存比例
spark.conf.set("spark.memory.storageFraction", "0.4")
2. 緩存策略選擇
根據數據特點和查詢模式選擇合適的緩存策略:
數據特點 | 推薦緩存策略 |
高頻訪問,內存充足 | MEMORY_ONLY |
高頻訪問,內存有限 | MEMORY_ONLY_SER |
數據量大,查詢少 | MEMORY_AND_DISK_SER |
數據量極大,內存緊張 | OFF_HEAP |
// 示例:根據數據大小選擇緩存策略
def smartCache(df: DataFrame): DataFrame = {
val sizeEstimate = SparkContext.getActive.get.estimateRDDSize(df.rdd)
val availableMemory = SparkContext.getActive.get.getExecutorMemoryStatus
.map(_._2._2).sum * 0.6 // 可用內存的60%
if (sizeEstimate < availableMemory * 0.5) {
// 數據較小,使用MEMORY_ONLY
df.persist(StorageLevel.MEMORY_ONLY)
} else if (sizeEstimate < availableMemory * 0.8) {
// 數據中等,使用MEMORY_ONLY_SER
df.persist(StorageLevel.MEMORY_ONLY_SER)
} else {
// 數據較大,使用MEMORY_AND_DISK_SER
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
}
df
}
十五、高級內存優化技術
1. 列裁剪和謂詞下推
列裁剪和謂詞下推可以減少處理的數據量,從而降低內存使用:
// 啟用列裁剪和謂詞下推
spark.conf.set("spark.sql.optimizer.columnPruning.enabled", true)
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.enabled", true)
// 示例:只選擇需要的列并盡早過濾
val optimizedQuery = spark.table("large_table")
.select("id", "name", "value") // 列裁剪
.filter($"value" > 100) // 謂詞下推
.join(spark.table("small_table").select("id", "category"), "id")
2. 分區修剪
分區修剪可以減少讀取的數據量:
// 啟用動態分區修剪
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", true)
// 示例:使用分區字段進行過濾
val result = spark.table("partitioned_table")
.filter($"date" >= "2023-01-01" && $"date" <= "2023-01-31")
.join(spark.table("dimension_table"), "id")
3. 內存使用監控與調優
定期監控內存使用情況,及時調整配置:
// 創建內存使用監控函數
def monitorMemoryUsage(sc: SparkContext): Unit = {
// 獲取執行器內存狀態
val executorMemoryStatus = sc.getExecutorMemoryStatus
// 計算總內存和已用內存
val totalMem = executorMemoryStatus.map(_._2._2).sum
val usedMem = executorMemoryStatus.map(_._2._1).sum
// 計算內存使用率
val memoryUtilization = usedMem.toDouble / totalMem
println(s"內存使用率: ${memoryUtilization * 100}%")
println(s"已用內存: ${usedMem / 1024 / 1024} MB")
println(s"總內存: ${totalMem / 1024 / 1024} MB")
// 獲取存儲內存使用情況
val storageMemoryUsed = sc.getRDDStorageInfo.map(_.memoryUsed).sum
println(s"存儲內存使用: ${storageMemoryUsed / 1024 / 1024} MB")
// 檢查是否需要調整配置
if (memoryUtilization > 0.85) {
println("警告:內存使用率過高,考慮增加執行器內存或減少并行度")
}
if (storageMemoryUsed > usedMem * 0.7) {
println("警告:存儲內存占比過高,考慮調整存儲內存比例或減少緩存數據量")
}
}
// 定期執行監控
val monitoringThread = new Thread(() => {
while (true) {
try {
monitorMemoryUsage(spark.sparkContext)
Thread.sleep(60000) // 每分鐘監控一次
} catch {
case e: Exception => println(s"監控異常: ${e.getMessage}")
}
}
})
monitoringThread.setDaemon(true)
monitoringThread.start()
十六、特定場景的內存優化
1. 機器學習應用優化
機器學習應用通常需要處理大量特征和模型參數:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassifier
// 啟用ML優化配置
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.memory.offHeap.enabled", true)
spark.conf.set("spark.memory.offHeap.size", "4g")
// 讀取數據
val data = spark.read.parquet("/path/to/features")
// 特征工程
val featureCols = data.columns.filter(_ != "label")
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
// 使用列式存儲優化特征數據
val assembled = assembler.transform(data)
.select("features", "label")
assembled.cache()
assembled.count()
// 訓練模型
val rf = new RandomForestClassifier()
.setNumTrees(100)
.setMaxDepth(10)
.setFeatureSubsetStrategy("sqrt")
// 使用checkpoint減少RDD依賴鏈
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val model = rf.fit(assembled)
2. 圖計算優化
圖計算應用通常需要處理大量頂點和邊的數據:
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel
// 配置圖計算優化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
// 創建頂點和邊
val vertices = spark.sparkContext.parallelize(
(1L to 1000000L).map(id => (id, Map("name" -> s"vertex_$id")))
)
val edges = spark.sparkContext.parallelize(
(1L to 2000000L).map { i =>
val src = scala.util.Random.nextInt(1000000) + 1L
val dst = scala.util.Random.nextInt(1000000) + 1L
Edge(src, dst, 1.0)
}
)
// 創建圖并使用優化的存儲級別
val graph = Graph(vertices, edges, Map.empty[String, Any],
StorageLevel.MEMORY_AND_DISK_SER,
StorageLevel.MEMORY_AND_DISK_SER)
// 緩存圖
graph.cache()
graph.vertices.count()
graph.edges.count()
// 執行PageRank算法
val ranks = graph.pageRank(0.0001).vertices
十七、總結與優秀實踐
1. 內存優化核心原則
- 了解應用特點:分析應用的數據量、計算模式和內存需求
- 合理配置資源:根據集群規模和應用特點配置執行器數量和內存
- 選擇適當的緩存策略:根據數據特點選擇合適的存儲級別
- 利用列式存儲和壓縮:減少內存占用,提高查詢效率
- 使用堆外內存:減輕GC壓力,提高大數據處理能力
- 監控和調優:定期監控內存使用情況,及時調整配置
2. 常見問題及解決方案
問題 | 解決方案 |
OOM錯誤 | 增加執行器內存、減少并行度、使用序列化緩存 |
GC時間過長 | 使用堆外內存、調整GC策略、減少執行器內存大小 |
數據傾斜 | 加鹽處理、拆分任務、使用AQE |
緩存效率低 | 調整批處理大小、啟用壓縮、使用列式存儲 |
Shuffle溢出 | 增加shuffle內存比例、調整分區數、使用AQE |