成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

騰訊面試:Spark 內存如何優化?包含哪幾個方面?

大數據
本文將詳細介紹Spark內存管理機制及優化策略,包括緩存優化、內存配置、狀態存儲優化等方面及并給出相應的樣例代碼。

一、Spark內存架構概述

Apache Spark作為一個高效的分布式計算引擎,其性能很大程度上取決于內存的使用效率。本文將詳細介紹Spark內存管理機制及優化策略,包括緩存優化、內存配置、狀態存儲優化等方面及并給出相應的樣例代碼。

Spark的內存管理分為執行內存(Execution Memory)和存儲內存(Storage Memory)兩大部分:

  • 執行內存:用于shuffle、join、sort等計算操作
  • 存儲內存:用于緩存RDD、DataFrame和廣播變量

在Spark 1.6之前,這兩部分內存是靜態分配的,而在Spark 1.6及以后版本中,采用了統一內存管理(Unified Memory Management),允許兩種類型的內存相互借用。

二、緩存優化策略

1. 緩存級別選擇

Spark提供了多種緩存級別,可以通過persist()或cache()方法設置: 

以下是一個使用不同存儲級別的示例:

from
 pyspark.storagelevel 
import
 StorageLevel  
# 默認緩存級別 MEMORY_AND_DISK_DESER  
df.cache()  
# 僅使用內存  
df.persist(StorageLevel.MEMORY_ONLY)  
# 僅使用磁盤  
df.persist(StorageLevel.DISK_ONLY)  
# 使用內存和磁盤,序列化存儲  
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  
# 使用堆外內存  
df.persist(StorageLevel.OFF_HEAP)

2. 列式存儲優化

Spark SQL使用列式存儲格式來緩存數據,這種方式比行式存儲更節省內存,并且支持壓縮。

列式存儲的主要優勢:

  • 更高的壓縮率:相同類型的數據放在一起,壓縮效率更高
  • 謂詞下推:可以只讀取查詢所需的列
  • 向量化處理:支持批量處理,提高CPU效率

以下配置可以優化列式緩存:

// 啟用列式緩存壓縮  
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)  
// 設置批處理大小  
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)  
// 啟用向量化讀取  
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", true)

3. 堆外內存使用

Spark支持使用堆外內存(Off-heap Memory)來存儲數據,減少GC壓力: 

啟用堆外內存的配置:

// 啟用堆外內存  
spark.conf.set("spark.memory.offHeap.enabled", true)  
// 設置堆外內存大小(字節)  
spark.conf.set("spark.memory.offHeap.size", "10g")  
// 啟用列向量堆外內存  
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)

三、內存配置優化

1. 執行器內存配置

合理配置執行器內存是優化Spark性能的關鍵:

// 設置執行器內存  
spark.conf.set("spark.executor.memory", "8g")  
// 設置內存開銷因子  
spark.conf.set("spark.executor.memoryOverheadFactor", "0.1")  
// 設置執行器核心數  
spark.conf.set("spark.executor.cores", "4")

2. 內存分配比例調整

調整執行內存和存儲內存的比例:

// 設置存儲內存占比(默認0.5,即50%)  
spark.conf.set("spark.memory.storageFraction", "0.4")

3. 動態內存管理

Spark 1.6引入的統一內存管理允許執行內存和存儲內存動態共享:

// 啟用動態內存分配(默認開啟)  
spark.conf.set("spark.memory.useLegacyMode", false)

四、Shuffle優化

Shuffle是Spark中最消耗內存和磁盤I/O的操作之一。

1. Shuffle內存占比

// 設置shuffle內存占比  
spark.conf.set("spark.shuffle.memoryFraction", "0.2")

2. Shuffle合并

// 啟用shuffle文件合并  
spark.conf.set("spark.shuffle.consolidateFiles", true)

3. Shuffle溢出優化

// 設置溢出前內存中排序的條目數  
spark.conf.set("spark.shuffle.sort.bypassMergeThreshold", 200)

五、狀態存儲優化

對于Structured Streaming等有狀態操作,內存優化尤為重要。

1. RocksDB狀態存儲

Spark 3.2引入了RocksDB狀態存儲實現,可以有效減少JVM GC壓力: 

啟用RocksDB狀態存儲:

// 設置狀態存儲提供者為RocksDB  
spark.conf.set(  
"spark.sql.streaming.stateStore.providerClass"
,   "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)

2. RocksDB內存管理

RocksDB提供了內存使用限制功能,避免OOM問題:

配置RocksDB內存限制:

// 啟用RocksDB內存限制  
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)  
// 設置最大內存使用量(MB)  
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 1000)  
// 設置寫緩沖區占比  
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", 0.4)  
// 設置高優先級池占比  
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", 0.1)

3. 狀態存儲優化示例

以下是一個使用RocksDB狀態存儲的Structured Streaming示例:

import org.apache.spark.sql.streaming.Trigger  
import org.apache.spark.sql.functions._  


// 配置Spark Session  
val spark = SparkSession.builder()  
  .appName("StatefulStreamingWithMemoryOptimization")  
  .config("spark.sql.streaming.stateStore.providerClass",   
          "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")  
  .config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)  
  .config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 500)  
  .getOrCreate()  


// 創建輸入流  
val inputStream = spark.readStream  
  .format("kafka")  
  .option("kafka.bootstrap.servers", "localhost:9092")  
  .option("subscribe", "input-topic")  
  .load()  


// 解析并處理數據  
val processedStream = inputStream  
  .selectExpr("CAST(value AS STRING)")  
  .as[String]  
  .flatMap(_.split(" "))  
  .groupBy("value")  
  .count()  


// 輸出結果  
val query = processedStream.writeStream  
  .outputMode("update")  
  .format("console")  
  .trigger(Trigger.ProcessingTime("10 seconds"))  
  .start()  


query.awaitTermination()

六、內存泄漏檢測與處理

Spark提供了內存泄漏檢測機制: 

啟用內存泄漏檢測:

// 內存泄漏時拋出異常  
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", true)

七、實際案例分析與優化

1. 數據傾斜處理

數據傾斜會導致某些執行器內存壓力過大。解決方案:

// 示例:處理傾斜的join  
// 1. 識別傾斜鍵  
val skewedKeys = df1.groupBy("key").count().filter("count > 1000").select("key")  


// 2. 對傾斜鍵進行特殊處理  
val skewedKeysBroadcast = spark.sparkContext.broadcast(skewedKeys.collect().map(_.getString(0)).toSet)  


// 3. 將數據分為傾斜和非傾斜部分  
val dfSkewed = df1.filter(r => skewedKeysBroadcast.value.contains(r.getString(0)))  
val dfNormal = df1.filter(r => !skewedKeysBroadcast.value.contains(r.getString(0)))  


// 4. 對傾斜部分進行加鹽處理  
val saltedDfSkewed = dfSkewed.withColumn("salt", (rand() * 10).cast("int"))  
  .withColumn("salted_key", concat($"key", lit("_"), $"salt"))  


val saltedDf2 = df2.join(skewedKeys, "key")  
  .withColumn("salt", explode(array((0 until 10).map(lit): _*)))  
  .withColumn("salted_key", concat($"key", lit("_"), $"salt"))  


// 5. 分別join并合并結果  
val joinSkewed = saltedDfSkewed.join(saltedDf2, "salted_key").drop("salt", "salted_key")  
val joinNormal = dfNormal.join(df2, "key")  


val result = joinSkewed.union(joinNormal)

2. 緩存優化實例

以下是一個優化DataFrame緩存的實例:

import org.apache.spark.storage.StorageLevel  


// 創建測試數據  
val df = spark.range(0, 1000000)  
  .withColumn("square", $"id" * $"id")  
  .withColumn("cube", $"square" * $"id")  


// 1. 基準測試 - 不使用緩存  
val t1 = System.nanoTime()  
val count1 = df.filter($"square" > 1000).count()  
val count2 = df.filter($"cube" > 10000).count()  
val duration1 = (System.nanoTime() - t1) / 1e9d  
println(s"未緩存執行時間: $duration1 秒")  


// 2. 使用默認緩存  
df.cache()  
df.count() // 觸發緩存  
val t2 = System.nanoTime()  
val count3 = df.filter($"square" > 1000).count()  
val count4 = df.filter($"cube" > 10000).count()  
val duration2 = (System.nanoTime() - t2) / 1e9d  
println(s"默認緩存執行時間: $duration2 秒")  
df.unpersist()  


// 3. 使用序列化緩存  
df.persist(StorageLevel.MEMORY_ONLY_SER)  
df.count() // 觸發緩存  
val t3 = System.nanoTime()  
val count5 = df.filter($"square" > 1000).count()  
val count6 = df.filter($"cube" > 10000).count()  
val duration3 = (System.nanoTime() - t3) / 1e9d  
println(s"序列化緩存執行時間: $duration3 秒")  
df.unpersist()  


// 4. 使用列式緩存(默認已啟用)  
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)  
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 20000)  
df.cache()  
df.count() // 觸發緩存  
val t4 = System.nanoTime()  
val count7 = df.filter($"square" > 1000).count()  
val count8 = df.filter($"cube" > 10000).count()  
val duration4 = (System.nanoTime() - t4) / 1e9d  
println(s"優化列式緩存執行時間: $duration4 秒")  
df.unpersist()

3. 內存監控與調優

// 創建監控函數  
def monitorMemory(sc: SparkContext): Unit = {  
  val executorMemoryInfo = sc.getExecutorMemoryStatus  
  println("==== 內存使用情況 ====")  
  executorMemoryInfo.foreach { case (executorId, (usedMem, maxMem)) =>  
    println(s"執行器 $executorId: 已用內存 ${usedMem / 1024 / 1024}MB, 最大內存 ${maxMem / 1024 / 1024}MB")  
  }  


  // 打印存儲內存使用情況  
  println("==== 緩存塊信息 ====")  
  sc.getRDDStorageInfo.foreach { info =>  
    println(s"RDD: ${info.name}, 分區數: ${info.numPartitions}, " +  
      s"緩存級別: ${info.storageLevel}, 內存使用: ${info.memoryUsed / 1024 / 1024}MB")  
  }  
}  


// 定期監控內存使用情況  
val monitorThread = new Thread(() => {  
  while (true) {  
    monitorMemory(spark.sparkContext)  
    Thread.sleep(10000) // 每10秒監控一次  
  }  
})  
monitorThread.setDaemon(true)  
monitorThread.start()

八、自適應查詢執行(AQE)內存優化

Spark 3.0引入的自適應查詢執行(Adaptive Query Execution)可以根據運行時統計信息動態調整執行計劃,對內存使用也有積極影響。

1. 啟用AQE

// 啟用自適應查詢執行  
spark.conf.set("spark.sql.adaptive.enabled", true)  


// 設置合并shuffle分區的目標大小  
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")  


// 啟用shuffle分區合并  
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)  


// 設置合并后的最小分區大小  
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m")

2. AQE內存優化示例

// 創建測試數據  
val largeDF = spark.range(0, 10000000)  
  .withColumn("key", $"id" % 1000)  
  .withColumn("value", rand() * 100)  


// 設置較大的初始shuffle分區數  
spark.conf.set("spark.sql.shuffle.partitions", 200)  


// 執行聚合查詢  
val result = largeDF.groupBy("key")  
  .agg(  
    avg("value").as("avg_value"),  
    max("value").as("max_value"),  
    min("value").as("min_value")  
  )  
  .filter($"avg_value" > 50)  


// 查看執行計劃  
result.explain(true)  


// 執行查詢并觀察實際使用的分區數  
result.collect()

九、列式存儲與壓縮優化

Spark SQL的列式存儲是內存優化的重要手段,通過壓縮和編碼技術可以顯著減少內存使用。

1. 列式存儲優化示例

// 啟用列式存儲壓縮  
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)  


// 設置批處理大小  
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)  


// 創建測試數據  
val wideDF = spark.range(0, 100000)  
  .withColumn("col1", $"id" % 100)  
  .withColumn("col2", $"id" * 2)  
  .withColumn("col3", $"id" * 3)  
  .withColumn("col4", $"id" * 4)  
  .withColumn("col5", $"id" * 5)  
  .withColumn("col6", concat(lit("value-"), $"id".cast("string")))  


// 緩存數據  
wideDF.cache()  
wideDF.count() // 觸發緩存  


// 查看緩存統計信息  
println(s"列式緩存內存使用: ${spark.sparkContext.getRDDStorageInfo.filter(_.id == wideDF.rdd.id).map(_.memoryUsed).sum / 1024 / 1024}MB")  


// 執行查詢  
val result = wideDF.filter($"col1" < 10).select("id", "col1", "col6")  
result.explain()  
result.show(5)

十、堆外內存優化

堆外內存(Off-heap Memory)是減輕GC壓力的有效方法,特別適合大數據量處理。

1. 堆外內存示例

// 啟用堆外內存  
spark.conf.set("spark.memory.offHeap.enabled", true)  
spark.conf.set("spark.memory.offHeap.size", "4g")  


// 啟用列向量堆外內存  
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)  


// 創建測試數據  
val largeDF = spark.range(0, 10000000)  
  .withColumn("value", rand() * 1000)  


// 執行聚合操作  
val result = largeDF.groupBy($"id" % 100 as "key")  
  .agg(sum("value") as "sum_value")  
  .orderBy("key")  


// 查看執行計劃  
result.explain()  


// 執行查詢  
result.show()

十一、Structured Streaming內存優化

對于Structured Streaming應用,狀態管理是內存優化的關鍵。

Structured Streaming優化示例:

import org.apache.spark.sql.streaming.Trigger  
import org.apache.spark.sql.functions._  


// 配置Spark Session  
val spark = SparkSession.builder()  
  .appName("StreamingMemoryOptimization")  
  // 啟用RocksDB狀態存儲  
  .config("spark.sql.streaming.stateStore.providerClass",   
          "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")  
  // 啟用RocksDB內存限制  
  .config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", true)  
  .config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", 1000)  
  .config("spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", 0.4)  
  .config("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", 0.1)  
  // 啟用RocksDB壓縮  
  .config("spark.sql.streaming.stateStore.rocksdb.compression", "lz4")  
  // 啟用RocksDB變更日志檢查點  
  .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", true)  
  .getOrCreate()  


// 創建輸入流  
val inputStream = spark.readStream  
  .format("kafka")  
  .option("kafka.bootstrap.servers", "localhost:9092")  
  .option("subscribe", "input-topic")  
  .option("startingOffsets", "latest")  
  .load()  


// 解析JSON數據  
val parsedStream = inputStream  
  .selectExpr("CAST(value AS STRING) as json")  
  .select(from_json($"json",   
    "id STRING, timestamp LONG, value DOUBLE").as("data"))  
  .select("data.*")  
  .withColumn("event_time",   
    to_timestamp($"timestamp" / 1000))  
  .withWatermark("event_time", "10 minutes")  


// 執行窗口聚合  
val windowedAggregation = parsedStream  
  .groupBy(  
    window($"event_time", "5 minutes", "1 minute"),  
    $"id"  
  )  
  .agg(  
    avg("value").as("avg_value"),  
    count("*").as("event_count")  
  )  
  .select(  
    $"window.start".as("window_start"),  
    $"window.end".as("window_end"),  
    $"id",  
    $"avg_value",  
    $"event_count"  
  )  


// 輸出結果  
val query = windowedAggregation.writeStream  
  .outputMode("update")  
  .format("console")  
  .option("truncate", false)  
  .trigger(Trigger.ProcessingTime("1 minute"))  
  .start()

十二、內存泄漏檢測與處理

Spark提供了內存泄漏檢測機制,可以幫助識別和解決內存問題。

內存泄漏監控示例:

// 啟用內存泄漏檢測  
spark.conf.set("spark.unsafe.exceptionOnMemoryLeak", true)  


// 創建自定義累加器監控內存使用  
val memoryLeakMonitor = spark.sparkContext.longAccumulator("MemoryLeakMonitor")  


// 創建可能導致內存泄漏的函數  
def processWithPotentialLeak(df: DataFrame): DataFrame = {  
  // 模擬處理邏輯  
  val result = df.mapPartitions { iter =>  
    // 記錄處理前內存  
    val runtime = Runtime.getRuntime  
    val beforeMem = runtime.totalMemory() - runtime.freeMemory()  


    // 處理數據  
    val resultIter = iter.map(row => {  
      // 處理邏輯  
      row  
    })  


    // 記錄處理后內存  
    val afterMem = runtime.totalMemory() - runtime.freeMemory()  
    memoryLeakMonitor.add(afterMem - beforeMem)  


    resultIter  
  }  


  result  
}  


// 使用監控函數處理數據  
val df = spark.range(0, 1000000).withColumn("value", rand())  
val processed = processWithPotentialLeak(df)  
processed.count()  


// 檢查累加器值  
println(s"內存增長: ${memoryLeakMonitor.value} 字節")

十三、綜合優化案例

以下是一個綜合應用多種內存優化技術的實際案例。

1. 大規模數據處理優化

import org.apache.spark.storage.StorageLevel  
import org.apache.spark.sql.functions._  


// 配置Spark Session  
val spark = SparkSession.builder()  
  .appName("ComprehensiveMemoryOptimization")  
  // 啟用自適應查詢執行  
  .config("spark.sql.adaptive.enabled", true)  
  .config("spark.sql.adaptive.coalescePartitions.enabled", true)  
  // 啟用堆外內存  
  .config("spark.memory.offHeap.enabled", true)  
  .config("spark.memory.offHeap.size", "8g")  
  .config("spark.sql.columnVector.offheap.enabled", true)  
  // 列式存儲優化  
  .config("spark.sql.inMemoryColumnarStorage.compressed", true)  
  .config("spark.sql.inMemoryColumnarStorage.batchSize", 20000)  
  .getOrCreate()  


// 讀取大規模數據  
val rawData = spark.read  
  .format("parquet")  
  .load("/path/to/large/dataset")  


// 數據預處理  
val processedData = rawData  
  .filter($"value" > 0)  
  .withColumn("category", when($"value" < 100, "low")  
    .when($"value" < 500, "medium")  
    .otherwise("high"))  
  .withColumn("date", to_date($"timestamp"))  


// 使用優化的存儲級別緩存  
processedData.persist(StorageLevel.MEMORY_AND_DISK_SER)  
processedData.count() // 觸發緩存  


// 檢測數據傾斜  
val keyDistribution = processedData  
  .groupBy("key")  
  .count()  
  .cache()  


val maxCount = keyDistribution.agg(max("count")).first().getLong(0)  
val avgCount = keyDistribution.agg(avg("count")).first().getDouble(0)  
println(s"最大鍵計數: $maxCount, 平均鍵計數: $avgCount, 傾斜比例: ${maxCount / avgCount}")  


// 處理傾斜鍵  
val skewThreshold = avgCount * 5  
val skewedKeys = keyDistribution  
  .filter($"count" > skewThreshold)  
  .select("key")  
  .collect()  
  .map(_.getString(0))  
  .toSet  


val skewedKeysBroadcast = spark.sparkContext.broadcast(skewedKeys)  


// 分離傾斜和非傾斜數據  
val skewedData = processedData  
  .filter(row => skewedKeysBroadcast.value.contains(row.getAs[String]("key")))  
// 對兩部分數據分別進行聚合  
val skewedAggregated = saltedSkewedData  
  .groupBy("salted_key")  
  .agg(  
    sum("value").as("sum_value"),  
    count("*").as("count")  
  )  
  .withColumn("key", split($"salted_key", "_").getItem(0))  
  .drop("salted_key")  
  .groupBy("key")  
  .agg(  
    sum("sum_value").as("sum_value"),  
    sum("count").as("count")  
  )  


val normalAggregated = normalData  
  .groupBy("key")  
  .agg(  
    sum("value").as("sum_value"),  
    count("*").as("count")  
  )  


// 合并結果  
val finalResult = skewedAggregated.union(normalAggregated)

2. 廣播變量優化

廣播變量可以有效減少數據傳輸和內存使用:

// 創建一個大型查找表  
val lookupTable = spark.range(0, 100000)  
  .withColumn("value", rand() * 1000)  
  .collect()  
  .map(row => (row.getLong(0), row.getDouble(1)))  
  .toMap  
// 廣播查找表  
val broadcastLookupTable = spark.sparkContext.broadcast(lookupTable)  
// 使用廣播變量進行查找  
val result = spark.range(0, 1000000)  
  .withColumn("key", $"id" % 100000)  
  .mapPartitions { iter =>  
    val lookup = broadcastLookupTable.value  
    iter.map { row =>  
      val id = row.getLong(0)  
      val key = row.getLong(1)  
      val value = lookup.getOrElse(key, 0.0)  
      (id, key, value)  
    }  
  }

十四、內存調優優秀實踐

 1. 內存配置原則

  • 合理設置執行器內存:根據集群節點內存和并行度設置合適的執行器內存
  • 避免過度分配:每個執行器內存不宜過大,以免GC時間過長
  • 預留系統開銷:為操作系統和其他進程預留足夠內存
  • 調整存儲與執行內存比例:根據應用特點調整存儲內存和執行內存的比例
// 示例:4節點集群,每節點64GB內存,16核  
// 設置每個執行器使用4核  
spark.conf.set("spark.executor.cores", "4")  


// 每節點運行3個執行器,每執行器約16GB內存  
spark.conf.set("spark.executor.memory", "16g")  


// 設置內存開銷因子  
spark.conf.set("spark.executor.memoryOverhead", "2g")  


// 調整存儲內存比例  
spark.conf.set("spark.memory.storageFraction", "0.4")

2. 緩存策略選擇

根據數據特點和查詢模式選擇合適的緩存策略:

數據特點

推薦緩存策略

高頻訪問,內存充足

MEMORY_ONLY

高頻訪問,內存有限

MEMORY_ONLY_SER

數據量大,查詢少

MEMORY_AND_DISK_SER

數據量極大,內存緊張

OFF_HEAP

// 示例:根據數據大小選擇緩存策略  
def smartCache(df: DataFrame): DataFrame = {  
  val sizeEstimate = SparkContext.getActive.get.estimateRDDSize(df.rdd)  
  val availableMemory = SparkContext.getActive.get.getExecutorMemoryStatus  
    .map(_._2._2).sum * 0.6 // 可用內存的60%  


  if (sizeEstimate < availableMemory * 0.5) {  
    // 數據較小,使用MEMORY_ONLY  
    df.persist(StorageLevel.MEMORY_ONLY)  
  } else if (sizeEstimate < availableMemory * 0.8) {  
    // 數據中等,使用MEMORY_ONLY_SER  
    df.persist(StorageLevel.MEMORY_ONLY_SER)  
  } else {  
    // 數據較大,使用MEMORY_AND_DISK_SER  
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)  
  }  


  df  
}

十五、高級內存優化技術

1. 列裁剪和謂詞下推

列裁剪和謂詞下推可以減少處理的數據量,從而降低內存使用:

// 啟用列裁剪和謂詞下推  
spark.conf.set("spark.sql.optimizer.columnPruning.enabled", true)  
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.enabled", true)  


// 示例:只選擇需要的列并盡早過濾  
val optimizedQuery = spark.table("large_table")  
  .select("id", "name", "value") // 列裁剪  
  .filter($"value" > 100) // 謂詞下推  
  .join(spark.table("small_table").select("id", "category"), "id")

2. 分區修剪

分區修剪可以減少讀取的數據量:

// 啟用動態分區修剪  
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", true)  


// 示例:使用分區字段進行過濾  
val result = spark.table("partitioned_table")  
  .filter($"date" >= "2023-01-01" && $"date" <= "2023-01-31")  
  .join(spark.table("dimension_table"), "id")

3. 內存使用監控與調優

定期監控內存使用情況,及時調整配置:

// 創建內存使用監控函數  
def monitorMemoryUsage(sc: SparkContext): Unit = {  
  // 獲取執行器內存狀態  
  val executorMemoryStatus = sc.getExecutorMemoryStatus  


  // 計算總內存和已用內存  
  val totalMem = executorMemoryStatus.map(_._2._2).sum  
  val usedMem = executorMemoryStatus.map(_._2._1).sum  


  // 計算內存使用率  
  val memoryUtilization = usedMem.toDouble / totalMem  


  println(s"內存使用率: ${memoryUtilization * 100}%")  
  println(s"已用內存: ${usedMem / 1024 / 1024} MB")  
  println(s"總內存: ${totalMem / 1024 / 1024} MB")  


  // 獲取存儲內存使用情況  
  val storageMemoryUsed = sc.getRDDStorageInfo.map(_.memoryUsed).sum  
  println(s"存儲內存使用: ${storageMemoryUsed / 1024 / 1024} MB")  


  // 檢查是否需要調整配置  
  if (memoryUtilization > 0.85) {  
    println("警告:內存使用率過高,考慮增加執行器內存或減少并行度")  
  }  


  if (storageMemoryUsed > usedMem * 0.7) {  
    println("警告:存儲內存占比過高,考慮調整存儲內存比例或減少緩存數據量")  
  }  
}  


// 定期執行監控  
val monitoringThread = new Thread(() => {  
  while (true) {  
    try {  
      monitorMemoryUsage(spark.sparkContext)  
      Thread.sleep(60000) // 每分鐘監控一次  
    } catch {  
      case e: Exception => println(s"監控異常: ${e.getMessage}")  
    }  
  }  
})  
monitoringThread.setDaemon(true)  
monitoringThread.start()

十六、特定場景的內存優化

1. 機器學習應用優化

機器學習應用通常需要處理大量特征和模型參數:

import org.apache.spark.ml.feature.VectorAssembler  
import org.apache.spark.ml.classification.RandomForestClassifier  


// 啟用ML優化配置  
spark.conf.set("spark.sql.shuffle.partitions", 200)  
spark.conf.set("spark.memory.offHeap.enabled", true)  
spark.conf.set("spark.memory.offHeap.size", "4g")  


// 讀取數據  
val data = spark.read.parquet("/path/to/features")  


// 特征工程  
val featureCols = data.columns.filter(_ != "label")  
val assembler = new VectorAssembler()  
  .setInputCols(featureCols)  
  .setOutputCol("features")  


// 使用列式存儲優化特征數據  
val assembled = assembler.transform(data)  
  .select("features", "label")  
assembled.cache()  
assembled.count()  


// 訓練模型  
val rf = new RandomForestClassifier()  
  .setNumTrees(100)  
  .setMaxDepth(10)  
  .setFeatureSubsetStrategy("sqrt")  


// 使用checkpoint減少RDD依賴鏈  
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  
val model = rf.fit(assembled)

2. 圖計算優化

圖計算應用通常需要處理大量頂點和邊的數據:

import org.apache.spark.graphx._  
import org.apache.spark.storage.StorageLevel  


// 配置圖計算優化  
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  
spark.conf.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")  


// 創建頂點和邊  
val vertices = spark.sparkContext.parallelize(  
  (1L to 1000000L).map(id => (id, Map("name" -> s"vertex_$id")))  
)  


val edges = spark.sparkContext.parallelize(  
  (1L to 2000000L).map { i =>  
    val src = scala.util.Random.nextInt(1000000) + 1L  
    val dst = scala.util.Random.nextInt(1000000) + 1L  
    Edge(src, dst, 1.0)  
  }  
)  


// 創建圖并使用優化的存儲級別  
val graph = Graph(vertices, edges, Map.empty[String, Any],  
  StorageLevel.MEMORY_AND_DISK_SER,  
  StorageLevel.MEMORY_AND_DISK_SER)  


// 緩存圖  
graph.cache()  
graph.vertices.count()  
graph.edges.count()  


// 執行PageRank算法  
val ranks = graph.pageRank(0.0001).vertices

十七、總結與優秀實踐

1. 內存優化核心原則

  • 了解應用特點:分析應用的數據量、計算模式和內存需求
  • 合理配置資源:根據集群規模和應用特點配置執行器數量和內存
  • 選擇適當的緩存策略:根據數據特點選擇合適的存儲級別
  • 利用列式存儲和壓縮:減少內存占用,提高查詢效率
  • 使用堆外內存:減輕GC壓力,提高大數據處理能力
  • 監控和調優:定期監控內存使用情況,及時調整配置

2. 常見問題及解決方案

問題

解決方案

OOM錯誤

增加執行器內存、減少并行度、使用序列化緩存

GC時間過長

使用堆外內存、調整GC策略、減少執行器內存大小

數據傾斜

加鹽處理、拆分任務、使用AQE

緩存效率低

調整批處理大小、啟用壓縮、使用列式存儲

Shuffle溢出

增加shuffle內存比例、調整分區數、使用AQE

責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2017-12-07 08:49:02

機房租用

2025-05-30 20:08:03

2020-07-09 16:13:00

大數據就業大數據人才

2011-06-02 17:07:32

2021-03-11 09:00:14

云計算大數據數據中心

2018-01-29 09:02:51

2016-11-15 15:16:39

Linux操作系統Windows

2019-10-23 05:44:52

Linux 命令

2020-02-26 21:58:41

Linux命令

2017-11-08 09:02:23

CIO信息化轉型

2019-10-22 22:31:15

Python切片字符串

2020-08-17 08:00:54

計算機IT互聯網

2019-11-01 09:23:31

開源項目UI

2021-08-07 15:29:48

區塊鏈比特幣技術

2024-11-22 00:09:15

2022-05-22 07:29:24

工具插件客戶端軟件

2019-08-09 15:03:53

2021-08-23 19:00:13

數據分析大數據

2019-04-12 13:56:30

物聯網協議物聯網IOT

2018-08-09 17:45:58

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: www日本在线播放 | 精品久久不卡 | 精品久久国产老人久久综合 | xx性欧美肥妇精品久久久久久 | 日本不卡一区二区三区在线观看 | 国产乱码一区 | 欧美一区二区三区在线看 | 欧洲一区在线观看 | 亚洲成av人影片在线观看 | 国产午夜视频 | 欧美日韩三级 | 日韩精品免费 | 黄色毛片黄色毛片 | 久久久爽爽爽美女图片 | 欧美日韩久久精品 | 浮生影院免费观看中文版 | 久久综合久 | 国产精品美女久久久久久免费 | 久久欧美高清二区三区 | 中国毛片免费 | 中文字幕啪啪 | 亚洲精品久久久久久国产精华液 | 亚洲欧美日本在线 | 91视频观看| 日本精品视频在线 | 久久高清精品 | av电影一区二区 | 欧美精品一区二区三区蜜桃视频 | 国产成人精品免费视频大全最热 | 久久噜| 亚洲国产精品人人爽夜夜爽 | 在线观看不卡av | 国产免费视频 | 国产精品一区二区在线观看 | 中文字幕av在线 | 亚洲男人的天堂网站 | 91玖玖| 久久久久久久一区二区 | 国产黄色大片在线免费观看 | 久久久91 | 亚洲va欧美va人人爽午夜 |