Apache Spark Delta Lake寫(xiě)數(shù)據(jù)使用及實(shí)現(xiàn)原理代碼解析
Delta Lake 寫(xiě)數(shù)據(jù)是其最基本的功能,而且其使用和現(xiàn)有的 Spark 寫(xiě) Parquet 文件基本一致,在介紹 Delta Lake 實(shí)現(xiàn)原理之前先來(lái)看看如何使用它,具體使用如下:
- df.write.format("delta").save("/data/yangping.wyp/delta/test/")
- //數(shù)據(jù)按照 dt 分區(qū)
- df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")
- // 覆蓋之前的數(shù)據(jù)
- df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")
大家可以看出,使用寫(xiě) Delta 數(shù)據(jù)是非常簡(jiǎn)單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。
Delta Lake 寫(xiě)數(shù)據(jù)原理
前面簡(jiǎn)單了解了如何使用 Delta Lake 來(lái)寫(xiě)數(shù)據(jù),本小結(jié)我們將深入介紹 Delta Lake 是如何保證寫(xiě)數(shù)據(jù)的基本原理以及如何保證事務(wù)性。
得益于 Apache Spark 強(qiáng)大的數(shù)據(jù)源 API,我們可以很方便的給 Spark 添加任何數(shù)據(jù)源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實(shí)現(xiàn)的一種新的數(shù)據(jù)源,我們調(diào)用 df.write.format("delta") 其實(shí)底層調(diào)用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類(lèi)。為了簡(jiǎn)單起見(jiàn),本文介紹的是 Delta Lake 批量寫(xiě)的實(shí)現(xiàn),實(shí)時(shí)流寫(xiě) Delta Lake 本文不涉及,后面有機(jī)會(huì)再介紹。 Delta Lake 批量寫(xiě)擴(kuò)展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質(zhì),并實(shí)現(xiàn)了其中的方法。我們調(diào)用上面的寫(xiě)數(shù)據(jù)方法首先會(huì)調(diào)用 DeltaDataSource 類(lèi)的 createRelation 方法,它的具體實(shí)現(xiàn)如下:
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- // 寫(xiě)數(shù)據(jù)的路徑
- val path = parameters.getOrElse("path", {
- throw DeltaErrors.pathNotSpecifiedException
- })
- // 分區(qū)字段
- val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
- .map(DeltaDataSource.decodePartitioningColumns)
- .getOrElse(Nil)
- // 事務(wù)日志對(duì)象
- val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
- // 真正的寫(xiě)操作過(guò)程
- WriteIntoDelta(
- deltaLog = deltaLog,
- mode = mode,
- new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
- partitionColumns = partitionColumns,
- configuration = Map.empty,
- data = data).run(sqlContext.sparkSession)
- deltaLog.createRelation()
- }
其中 mode 就是保持?jǐn)?shù)據(jù)的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個(gè)傳遞的參數(shù),比如分區(qū)字段、數(shù)據(jù)保存路徑以及 Delta 支持的一些參數(shù)(replaceWhere、mergeSchema、overwriteSchema 等,具體參見(jiàn) org.apache.spark.sql.delta.DeltaOptions);data 就是我們需要保存的數(shù)據(jù)。
createRelation 方法緊接著就是獲取數(shù)據(jù)保存的路徑,分區(qū)字段等信息。然后初始化 deltaLog,deltaLog 的初始化會(huì)做很多事情,比如會(huì)讀取磁盤(pán)所有的事務(wù)日志(_delta_log 目錄下),并構(gòu)建最新事務(wù)日志的最新快照,里面可以拿到最新數(shù)據(jù)的版本。由于 deltaLog 的初始化成本比較高,所以 deltaLog 初始化完之后會(huì)緩存到 deltaLogCache 中,這是一個(gè)使用 Guava 的 CacheBuilder 類(lèi)實(shí)現(xiàn)的一個(gè)緩存,緩存的數(shù)據(jù)保持一小時(shí),緩存大小可以通過(guò) delta.log.cacheSize 參數(shù)進(jìn)行設(shè)置。只要寫(xiě)數(shù)據(jù)的路徑是一樣的,就只需要初始化一次 deltaLog,后面直接從緩存中拿即可。除非之前緩存的 deltaLog 被清理了,或者無(wú)效才會(huì)再次初始化。DeltaLog 類(lèi)是 Delta Lake 中最重要的類(lèi)之一,涉及的內(nèi)容非常多,所以我們會(huì)單獨(dú)使用一篇文章進(jìn)行介紹。
緊接著初始化 WriteIntoDelta,WriteIntoDelta 擴(kuò)展自 RunnableCommand,Delta Lake 中的更新、刪除、合并都是擴(kuò)展這個(gè)類(lèi)的。初始化完 WriteIntoDelta 之后,就會(huì)調(diào)用 run 方法執(zhí)行真正的寫(xiě)數(shù)據(jù)操作。WriteIntoDelta 的 run 方法實(shí)現(xiàn)如下:
- override def run(sparkSession: SparkSession): Seq[Row] = {
- deltaLog.withNewTransaction { txn =>
- val actions = write(txn, sparkSession)
- val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere)
- txn.commit(actions, operation)
- }
- Seq.empty
- }
Delta Lake 所有的更新操作都是在事務(wù)中進(jìn)行的,deltaLog.withNewTransaction 就是一個(gè)事務(wù),withNewTransaction 的實(shí)現(xiàn)如下:
- def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
- try {
- // 更新當(dāng)前表事務(wù)日志的快照
- update()
- // 初始化樂(lè)觀事務(wù)鎖對(duì)象
- val txn = new OptimisticTransaction(this)
- // 開(kāi)啟事務(wù)
- OptimisticTransaction.setActive(txn)
- // 執(zhí)行寫(xiě)數(shù)據(jù)操作
- thunk(txn)
- } finally {
- // 關(guān)閉事務(wù)
- OptimisticTransaction.clearActive()
- }
- }
在開(kāi)啟事務(wù)之前,需要更新當(dāng)前表事務(wù)的快照,因?yàn)樵趫?zhí)行寫(xiě)數(shù)據(jù)之前,這張表可能已經(jīng)被修改了,執(zhí)行 update 操作之后,就可以拿到當(dāng)前表的最新版本,緊接著開(kāi)啟樂(lè)觀事務(wù)鎖。thunk(txn) 就是需要執(zhí)行的事務(wù)操作,對(duì)應(yīng) deltaLog.withNewTransaction 里面的所有代碼。
我們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執(zhí)行寫(xiě)數(shù)據(jù)的操作,它的實(shí)現(xiàn)如下:
- def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
- import sparkSession.implicits._
- // 如果不是第一次往表里面寫(xiě)數(shù)據(jù),需要判斷寫(xiě)數(shù)據(jù)的模式是否符合條件
- if (txn.readVersion > -1) {
- // This table already exists, check if the insert is valid.
- if (mode == SaveMode.ErrorIfExists) {
- throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
- } else if (mode == SaveMode.Ignore) {
- return Nil
- } else if (mode == SaveMode.Overwrite) {
- deltaLog.assertRemovable()
- }
- }
- // 更新表的模式,比如是否覆蓋現(xiàn)有的模式,是否和現(xiàn)有的模式進(jìn)行 merge
- updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
- // 是否定義分區(qū)過(guò)濾條件
- val replaceWhere = options.replaceWhere
- val partitionFilters = if (replaceWhere.isDefined) {
- val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
- if (mode == SaveMode.Overwrite) {
- verifyPartitionPredicates(
- sparkSession, txn.metadata.partitionColumns, predicates)
- }
- Some(predicates)
- } else {
- None
- }
- // 第一次寫(xiě)數(shù)據(jù)初始化事務(wù)日志的目錄
- if (txn.readVersion < 0) {
- // Initialize the log path
- deltaLog.fs.mkdirs(deltaLog.logPath)
- }
- // 寫(xiě)數(shù)據(jù)到文件系統(tǒng)中
- val newFiles = txn.writeFiles(data, Some(options))
- val deletedFiles = (mode, partitionFilters) match {
- // 全量覆蓋,直接拿出緩存在內(nèi)存中最新事務(wù)日志快照里面的所有 AddFile 文件
- case (SaveMode.Overwrite, None) =>
- txn.filterFiles().map(_.remove)
- // 從事務(wù)日志快照中獲取對(duì)應(yīng)分區(qū)里面的所有 AddFile 文件
- case (SaveMode.Overwrite, Some(predicates)) =>
- // Check to make sure the files we wrote out were actually valid.
- val matchingFiles = DeltaLog.filterFileList(
- txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
- val invalidFiles = newFiles.toSet -- matchingFiles
- if (invalidFiles.nonEmpty) {
- val badPartitions = invalidFiles
- .map(_.partitionValues)
- .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
- .mkString(", ")
- throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
- }
- txn.filterFiles(predicates).map(_.remove)
- case _ => Nil
- }
- newFiles ++ deletedFiles
- }
- }
如果 txn.readVersion == -1,說(shuō)明是第一次寫(xiě)數(shù)據(jù)到 Delta Lake 表,所以當(dāng)這個(gè)值大于 -1 的時(shí)候,需要判斷一下寫(xiě)數(shù)據(jù)的操作是否合法。
由于 Delta Lake 底層使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,這就是 updateMetadata 函數(shù)對(duì)應(yīng)的操作。
因?yàn)?Delta Lake 表支持分區(qū),所以我們可能在寫(xiě)數(shù)據(jù)的時(shí)候指定某個(gè)分區(qū)進(jìn)行覆蓋。
真正寫(xiě)數(shù)據(jù)的操作是 txn.writeFiles 函數(shù)執(zhí)行的,具體實(shí)現(xiàn)如下:
- def writeFiles(
- data: Dataset[_],
- writeOptions: Option[DeltaOptions],
- isOptimize: Boolean): Seq[AddFile] = {
- hasWritten = true
- val spark = data.sparkSession
- val partitionSchema = metadata.partitionSchema
- val outputPath = deltaLog.dataPath
- val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
- val partitioningColumns =
- getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)
- // 獲取 DelayedCommitProtocol,里面可以設(shè)置寫(xiě)文件的名字,
- // commitTask 和 commitJob 等做一些事情
- val committer = getCommitter(outputPath)
- val invariants = Invariants.getFromSchema(metadata.schema, spark)
- SQLExecution.withNewExecutionId(spark, queryExecution) {
- val outputSpec = FileFormatWriter.OutputSpec(
- outputPath.toString,
- Map.empty,
- output)
- val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)
- FileFormatWriter.write(
- sparkSession = spark,
- plan = physicalPlan,
- fileFormat = snapshot.fileFormat,
- committer = committer,
- outputSpec = outputSpec,
- hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
- partitionColumns = partitioningColumns,
- bucketSpec = None,
- statsTrackers = Nil,
- options = Map.empty)
- }
- // 返回新增的文件
- committer.addedStatuses
- }
Delta Lake 寫(xiě)操作最終調(diào)用 Spark 的 FileFormatWriter.write 方法進(jìn)行的,通過(guò)這個(gè)方法的復(fù)用將我們真正的數(shù)據(jù)寫(xiě)入到 Delta Lake 表里面去了。
在 Delta Lake 中,如果是新增文件則會(huì)在事務(wù)日志中使用 AddFile 類(lèi)記錄相關(guān)的信息,AddFile 持久化到事務(wù)日志里面的內(nèi)容如下:
- {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}}
可以看出 AddFile 里面記錄了新增文件的保存路徑,分區(qū)信息,新增的文件大小,修改時(shí)間等信息。如果是刪除文件,也會(huì)在事務(wù)日志里面記錄這個(gè)刪除操作,對(duì)應(yīng)的就是使用 RemoveFile 類(lèi)存儲(chǔ),RemoveFile 持久化到事務(wù)日志里面的內(nèi)容如下:
- {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}}
RemoveFile 里面保存了刪除文件的路徑,刪除時(shí)間等信息。如果新增一個(gè)文件,再刪除一個(gè)文件,那么最新的事務(wù)日志快照里面只會(huì)保存刪除這個(gè)文件的記錄。從這里面也可以看出, Delta Lake 刪除、新增 ACID 是針對(duì)文件級(jí)別的。
上面的寫(xiě)操作肯定會(huì)產(chǎn)生新的文件,所以寫(xiě)操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要?jiǎng)h除的文件(RemoveFile)。針對(duì)那些文件需要?jiǎng)h除需要做一些判斷,主要分兩種情況(具體參見(jiàn) write 方法里面的):
- 如果是全表覆蓋,則直接從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出所有 AddFile 文件,然后將其標(biāo)記為 RemoveFile;
- 如果是分區(qū)內(nèi)的覆蓋,則從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出對(duì)應(yīng)分區(qū)下的 AddFile 文件,然后將其標(biāo)記為 RemoveFile。
最后 write 方法返回新增的文件和需要?jiǎng)h除的文件(newFiles ++ deletedFiles),這些文件最終需要記錄到事務(wù)日志里面去。關(guān)于事務(wù)日志是如何寫(xiě)進(jìn)去的請(qǐng)參見(jiàn)這篇文章的詳細(xì)分析。