成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Apache Spark Delta Lake寫(xiě)數(shù)據(jù)使用及實(shí)現(xiàn)原理代碼解析

大數(shù)據(jù) Spark
Delta Lake 寫(xiě)數(shù)據(jù)是其最基本的功能,而且其使用和現(xiàn)有的 Spark 寫(xiě) Parquet 文件基本一致,在介紹 Delta Lake 實(shí)現(xiàn)原理之前先來(lái)看看如何使用它,具體使用如下。

[[278252]]

Delta Lake 寫(xiě)數(shù)據(jù)是其最基本的功能,而且其使用和現(xiàn)有的 Spark 寫(xiě) Parquet 文件基本一致,在介紹 Delta Lake 實(shí)現(xiàn)原理之前先來(lái)看看如何使用它,具體使用如下: 

  1. df.write.format("delta").save("/data/yangping.wyp/delta/test/"
  2.   
  3. //數(shù)據(jù)按照 dt 分區(qū) 
  4. df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/"
  5.   
  6. // 覆蓋之前的數(shù)據(jù) 
  7. df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/"

大家可以看出,使用寫(xiě) Delta 數(shù)據(jù)是非常簡(jiǎn)單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。

Delta Lake 寫(xiě)數(shù)據(jù)原理

前面簡(jiǎn)單了解了如何使用 Delta Lake 來(lái)寫(xiě)數(shù)據(jù),本小結(jié)我們將深入介紹 Delta Lake 是如何保證寫(xiě)數(shù)據(jù)的基本原理以及如何保證事務(wù)性。

得益于 Apache Spark 強(qiáng)大的數(shù)據(jù)源 API,我們可以很方便的給 Spark 添加任何數(shù)據(jù)源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實(shí)現(xiàn)的一種新的數(shù)據(jù)源,我們調(diào)用 df.write.format("delta") 其實(shí)底層調(diào)用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類(lèi)。為了簡(jiǎn)單起見(jiàn),本文介紹的是 Delta Lake 批量寫(xiě)的實(shí)現(xiàn),實(shí)時(shí)流寫(xiě) Delta Lake 本文不涉及,后面有機(jī)會(huì)再介紹。 Delta Lake 批量寫(xiě)擴(kuò)展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質(zhì),并實(shí)現(xiàn)了其中的方法。我們調(diào)用上面的寫(xiě)數(shù)據(jù)方法首先會(huì)調(diào)用 DeltaDataSource 類(lèi)的 createRelation 方法,它的具體實(shí)現(xiàn)如下: 

  1. override def createRelation( 
  2.     sqlContext: SQLContext, 
  3.     mode: SaveMode, 
  4.     parameters: Map[String, String], 
  5.     data: DataFrame): BaseRelation = { 
  6.   
  7.   // 寫(xiě)數(shù)據(jù)的路徑 
  8.   val path = parameters.getOrElse("path", { 
  9.     throw DeltaErrors.pathNotSpecifiedException 
  10.   }) 
  11.   
  12.   // 分區(qū)字段 
  13.   val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY) 
  14.     .map(DeltaDataSource.decodePartitioningColumns) 
  15.     .getOrElse(Nil) 
  16.   
  17.   
  18.   // 事務(wù)日志對(duì)象 
  19.   val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) 
  20.   
  21.   // 真正的寫(xiě)操作過(guò)程 
  22.   WriteIntoDelta( 
  23.     deltaLog = deltaLog, 
  24.     mode = mode, 
  25.     new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf), 
  26.     partitionColumns = partitionColumns, 
  27.     configuration = Map.empty, 
  28.     data = data).run(sqlContext.sparkSession) 
  29.   
  30.   deltaLog.createRelation() 

其中 mode 就是保持?jǐn)?shù)據(jù)的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個(gè)傳遞的參數(shù),比如分區(qū)字段、數(shù)據(jù)保存路徑以及 Delta 支持的一些參數(shù)(replaceWhere、mergeSchema、overwriteSchema 等,具體參見(jiàn) org.apache.spark.sql.delta.DeltaOptions);data 就是我們需要保存的數(shù)據(jù)。

createRelation 方法緊接著就是獲取數(shù)據(jù)保存的路徑,分區(qū)字段等信息。然后初始化 deltaLog,deltaLog 的初始化會(huì)做很多事情,比如會(huì)讀取磁盤(pán)所有的事務(wù)日志(_delta_log 目錄下),并構(gòu)建最新事務(wù)日志的最新快照,里面可以拿到最新數(shù)據(jù)的版本。由于 deltaLog 的初始化成本比較高,所以 deltaLog 初始化完之后會(huì)緩存到 deltaLogCache 中,這是一個(gè)使用 Guava 的 CacheBuilder 類(lèi)實(shí)現(xiàn)的一個(gè)緩存,緩存的數(shù)據(jù)保持一小時(shí),緩存大小可以通過(guò) delta.log.cacheSize 參數(shù)進(jìn)行設(shè)置。只要寫(xiě)數(shù)據(jù)的路徑是一樣的,就只需要初始化一次 deltaLog,后面直接從緩存中拿即可。除非之前緩存的 deltaLog 被清理了,或者無(wú)效才會(huì)再次初始化。DeltaLog 類(lèi)是 Delta Lake 中最重要的類(lèi)之一,涉及的內(nèi)容非常多,所以我們會(huì)單獨(dú)使用一篇文章進(jìn)行介紹。

緊接著初始化 WriteIntoDelta,WriteIntoDelta 擴(kuò)展自 RunnableCommand,Delta Lake 中的更新、刪除、合并都是擴(kuò)展這個(gè)類(lèi)的。初始化完 WriteIntoDelta 之后,就會(huì)調(diào)用 run 方法執(zhí)行真正的寫(xiě)數(shù)據(jù)操作。WriteIntoDelta 的 run 方法實(shí)現(xiàn)如下: 

  1. override def run(sparkSession: SparkSession): Seq[Row] = { 
  2.     deltaLog.withNewTransaction { txn => 
  3.       val actions = write(txn, sparkSession) 
  4.       val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere) 
  5.       txn.commit(actions, operation) 
  6.     } 
  7.     Seq.empty 

Delta Lake 所有的更新操作都是在事務(wù)中進(jìn)行的,deltaLog.withNewTransaction 就是一個(gè)事務(wù),withNewTransaction 的實(shí)現(xiàn)如下: 

  1. def withNewTransaction[T](thunk: OptimisticTransaction => T): T = { 
  2.   try { 
  3.     // 更新當(dāng)前表事務(wù)日志的快照 
  4.     update() 
  5.     // 初始化樂(lè)觀事務(wù)鎖對(duì)象 
  6.     val txn = new OptimisticTransaction(this) 
  7.     // 開(kāi)啟事務(wù) 
  8.     OptimisticTransaction.setActive(txn) 
  9.     // 執(zhí)行寫(xiě)數(shù)據(jù)操作 
  10.     thunk(txn) 
  11.   } finally { 
  12.     // 關(guān)閉事務(wù) 
  13.     OptimisticTransaction.clearActive() 
  14.   } 

在開(kāi)啟事務(wù)之前,需要更新當(dāng)前表事務(wù)的快照,因?yàn)樵趫?zhí)行寫(xiě)數(shù)據(jù)之前,這張表可能已經(jīng)被修改了,執(zhí)行 update 操作之后,就可以拿到當(dāng)前表的最新版本,緊接著開(kāi)啟樂(lè)觀事務(wù)鎖。thunk(txn) 就是需要執(zhí)行的事務(wù)操作,對(duì)應(yīng) deltaLog.withNewTransaction 里面的所有代碼。

我們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執(zhí)行寫(xiě)數(shù)據(jù)的操作,它的實(shí)現(xiàn)如下: 

  1.   def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = { 
  2.     import sparkSession.implicits._ 
  3.     // 如果不是第一次往表里面寫(xiě)數(shù)據(jù),需要判斷寫(xiě)數(shù)據(jù)的模式是否符合條件 
  4.     if (txn.readVersion > -1) { 
  5.       // This table already exists, check if the insert is valid. 
  6.       if (mode == SaveMode.ErrorIfExists) { 
  7.         throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath) 
  8.       } else if (mode == SaveMode.Ignore) { 
  9.         return Nil 
  10.       } else if (mode == SaveMode.Overwrite) { 
  11.         deltaLog.assertRemovable() 
  12.       } 
  13.     } 
  14.   
  15.     // 更新表的模式,比如是否覆蓋現(xiàn)有的模式,是否和現(xiàn)有的模式進(jìn)行 merge 
  16.     updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation) 
  17.   
  18.     // 是否定義分區(qū)過(guò)濾條件 
  19.     val replaceWhere = options.replaceWhere 
  20.     val partitionFilters = if (replaceWhere.isDefined) { 
  21.       val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get) 
  22.       if (mode == SaveMode.Overwrite) { 
  23.         verifyPartitionPredicates( 
  24.           sparkSession, txn.metadata.partitionColumns, predicates) 
  25.       } 
  26.       Some(predicates) 
  27.     } else { 
  28.       None 
  29.     } 
  30.   
  31.     // 第一次寫(xiě)數(shù)據(jù)初始化事務(wù)日志的目錄 
  32.     if (txn.readVersion < 0) { 
  33.       // Initialize the log path 
  34.       deltaLog.fs.mkdirs(deltaLog.logPath) 
  35.     } 
  36.   
  37.     // 寫(xiě)數(shù)據(jù)到文件系統(tǒng)中 
  38.     val newFiles = txn.writeFiles(data, Some(options)) 
  39.       
  40.     val deletedFiles = (mode, partitionFilters) match { 
  41.        // 全量覆蓋,直接拿出緩存在內(nèi)存中最新事務(wù)日志快照里面的所有 AddFile 文件 
  42.       case (SaveMode.Overwrite, None) => 
  43.         txn.filterFiles().map(_.remove) 
  44.       // 從事務(wù)日志快照中獲取對(duì)應(yīng)分區(qū)里面的所有 AddFile 文件 
  45.       case (SaveMode.Overwrite, Some(predicates)) => 
  46.         // Check to make sure the files we wrote out were actually valid. 
  47.         val matchingFiles = DeltaLog.filterFileList( 
  48.           txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect() 
  49.         val invalidFiles = newFiles.toSet -- matchingFiles 
  50.         if (invalidFiles.nonEmpty) { 
  51.           val badPartitions = invalidFiles 
  52.             .map(_.partitionValues) 
  53.             .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") } 
  54.             .mkString(", "
  55.           throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions) 
  56.         } 
  57.   
  58.         txn.filterFiles(predicates).map(_.remove) 
  59.       case _ => Nil 
  60.     } 
  61.   
  62.     newFiles ++ deletedFiles 
  63.   } 

如果 txn.readVersion == -1,說(shuō)明是第一次寫(xiě)數(shù)據(jù)到 Delta Lake 表,所以當(dāng)這個(gè)值大于 -1 的時(shí)候,需要判斷一下寫(xiě)數(shù)據(jù)的操作是否合法。

由于 Delta Lake 底層使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,這就是 updateMetadata 函數(shù)對(duì)應(yīng)的操作。

因?yàn)?Delta Lake 表支持分區(qū),所以我們可能在寫(xiě)數(shù)據(jù)的時(shí)候指定某個(gè)分區(qū)進(jìn)行覆蓋。

真正寫(xiě)數(shù)據(jù)的操作是 txn.writeFiles 函數(shù)執(zhí)行的,具體實(shí)現(xiàn)如下: 

  1. def writeFiles( 
  2.       data: Dataset[_], 
  3.       writeOptions: Option[DeltaOptions], 
  4.       isOptimize: Boolean): Seq[AddFile] = { 
  5.     hasWritten = true 
  6.   
  7.     val spark = data.sparkSession 
  8.     val partitionSchema = metadata.partitionSchema 
  9.     val outputPath = deltaLog.dataPath 
  10.   
  11.     val (queryExecution, output) = normalizeData(data, metadata.partitionColumns) 
  12.     val partitioningColumns = 
  13.       getPartitioningColumns(partitionSchema, outputoutput.length < data.schema.size
  14.   
  15.     // 獲取 DelayedCommitProtocol,里面可以設(shè)置寫(xiě)文件的名字, 
  16.     // commitTask 和 commitJob 等做一些事情 
  17.     val committer = getCommitter(outputPath) 
  18.   
  19.     val invariants = Invariants.getFromSchema(metadata.schema, spark) 
  20.   
  21.     SQLExecution.withNewExecutionId(spark, queryExecution) { 
  22.       val outputSpec = FileFormatWriter.OutputSpec( 
  23.         outputPath.toString, 
  24.         Map.empty, 
  25.         output
  26.   
  27.       val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants) 
  28.   
  29.       FileFormatWriter.write( 
  30.         sparkSession = spark, 
  31.         plan = physicalPlan, 
  32.         fileFormat = snapshot.fileFormat, 
  33.         committer = committer, 
  34.         outputSpec = outputSpec, 
  35.         hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration), 
  36.         partitionColumns = partitioningColumns, 
  37.         bucketSpec = None, 
  38.         statsTrackers = Nil, 
  39.         options = Map.empty) 
  40.     } 
  41.   
  42.     // 返回新增的文件 
  43.     committer.addedStatuses 

Delta Lake 寫(xiě)操作最終調(diào)用 Spark 的 FileFormatWriter.write 方法進(jìn)行的,通過(guò)這個(gè)方法的復(fù)用將我們真正的數(shù)據(jù)寫(xiě)入到 Delta Lake 表里面去了。

在 Delta Lake 中,如果是新增文件則會(huì)在事務(wù)日志中使用 AddFile 類(lèi)記錄相關(guān)的信息,AddFile 持久化到事務(wù)日志里面的內(nèi)容如下:

  1. {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}} 

可以看出 AddFile 里面記錄了新增文件的保存路徑,分區(qū)信息,新增的文件大小,修改時(shí)間等信息。如果是刪除文件,也會(huì)在事務(wù)日志里面記錄這個(gè)刪除操作,對(duì)應(yīng)的就是使用 RemoveFile 類(lèi)存儲(chǔ),RemoveFile 持久化到事務(wù)日志里面的內(nèi)容如下:

  1. {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}} 

RemoveFile 里面保存了刪除文件的路徑,刪除時(shí)間等信息。如果新增一個(gè)文件,再刪除一個(gè)文件,那么最新的事務(wù)日志快照里面只會(huì)保存刪除這個(gè)文件的記錄。從這里面也可以看出, Delta Lake 刪除、新增 ACID 是針對(duì)文件級(jí)別的。

上面的寫(xiě)操作肯定會(huì)產(chǎn)生新的文件,所以寫(xiě)操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要?jiǎng)h除的文件(RemoveFile)。針對(duì)那些文件需要?jiǎng)h除需要做一些判斷,主要分兩種情況(具體參見(jiàn) write 方法里面的):

  • 如果是全表覆蓋,則直接從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出所有 AddFile 文件,然后將其標(biāo)記為 RemoveFile;
  • 如果是分區(qū)內(nèi)的覆蓋,則從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出對(duì)應(yīng)分區(qū)下的 AddFile 文件,然后將其標(biāo)記為 RemoveFile。

最后 write 方法返回新增的文件和需要?jiǎng)h除的文件(newFiles ++ deletedFiles),這些文件最終需要記錄到事務(wù)日志里面去。關(guān)于事務(wù)日志是如何寫(xiě)進(jìn)去的請(qǐng)參見(jiàn)這篇文章的詳細(xì)分析。

 

責(zé)任編輯:未麗燕 來(lái)源: 阿里云棲社區(qū)
相關(guān)推薦

2022-07-06 09:53:04

開(kāi)源數(shù)據(jù)湖

2023-06-30 07:40:31

數(shù)據(jù)分析Delta lake

2018-04-09 12:25:11

2015-03-10 13:55:31

JavaScript預(yù)解析原理及實(shí)現(xiàn)

2022-03-17 08:55:43

本地線程變量共享全局變量

2020-12-04 14:31:45

大數(shù)據(jù)Spark

2023-12-22 13:58:00

C++鏈表開(kāi)發(fā)

2016-12-20 09:47:38

Apache SparLambda架構(gòu)

2021-07-07 10:13:56

大數(shù)據(jù)Delta Lake 湖倉(cāng)一體

2014-02-14 15:43:16

ApacheSpark

2018-05-22 09:47:07

2018-04-10 10:32:07

NginxApache服務(wù)器

2022-06-01 13:52:11

開(kāi)源大數(shù)據(jù)

2012-04-11 15:41:48

JavaNIO

2018-07-27 08:39:44

負(fù)載均衡算法實(shí)現(xiàn)

2019-10-08 17:38:17

開(kāi)源技術(shù) 趨勢(shì)

2015-10-16 09:21:13

SparkMySQL數(shù)據(jù)分析

2020-07-10 09:04:55

HTTPS瀏覽器網(wǎng)絡(luò)協(xié)議

2017-06-26 15:00:17

2011-08-04 15:52:48

Objective-C HTML
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 另类专区成人 | 一区二区不卡视频 | 亚洲精品免费在线 | 亚洲九九 | 久久亚洲一区二区三 | 91伊人网| 日韩成人高清在线 | 欧美日韩亚洲国产综合 | 欧美日韩在线播放 | 精品久久网 | 国产精品久久在线 | 一区二区欧美在线 | 亚洲三区在线观看 | 欧美综合精品 | 国产一区二区三区久久久久久久久 | 精品国产91乱码一区二区三区 | 久久久视 | 亚洲精品国产第一综合99久久 | 日韩最新网站 | 噜啊噜在线 | 日韩欧美一区二区三区免费观看 | 999免费网站 | 亚洲一区二区三区四区五区午夜 | 亚洲高清视频一区二区 | 一区二区三区不卡视频 | 欧美天堂在线观看 | 亚洲人在线观看视频 | 成人二区 | 三级黄视频在线观看 | 精品九九| 免费视频一区 | 亚洲免费视频网站 | 国产成人一区二区 | 一二三区在线 | av电影手机在线看 | 成人网av | 91视频网| 久久精品视频播放 | 国产成人精品网站 | 国产精品视频免费看 | 国产精品一区二区在线免费观看 |