Apache Paimon 實(shí)時湖倉存儲底座
一、實(shí)時湖倉
這是一個非常常見的圖,展示了數(shù)據(jù)架構(gòu)的時效性演進(jìn)。
目前在企業(yè)中典型的數(shù)據(jù)架構(gòu)大致分為兩種,一種是批式數(shù)倉,傳統(tǒng)的 Hive 表加上 Hive 或 Spark 的計(jì)算,然后可能后面再對接一些 OLAP 引擎,包括 Doris 或 StarRocks。這套架構(gòu)的主要問題在于時效性。這里的時效性分為兩個含義:
- 第一個含義是 ETL 的時效性,也就是數(shù)據(jù)流入數(shù)倉中,什么時候能夠全部處理完畢,準(zhǔn)備好查詢,這個 ETL 的時間在批處理計(jì)算中通常是按天或小時計(jì)算的,根據(jù)分區(qū)來定義。
- 另一方面是查詢的時效性,會因使用的查詢引擎而異。例如,使用 Spark 或 Hive 進(jìn)行查詢通常需要分鐘級的時間,而使用 Doris 進(jìn)行查詢則可能達(dá)到秒級。
近年來,F(xiàn)link 在中國的實(shí)時數(shù)據(jù)倉庫領(lǐng)域得到了廣泛應(yīng)用,其架構(gòu)包括 Flink 流計(jì)算、Kafka 作為中間數(shù)據(jù)流轉(zhuǎn),以及將結(jié)果表直接存儲到 OLAP 系統(tǒng)中,這種純流式的架構(gòu)在許多企業(yè)中得到了推廣。其 ETL 的時效性可以達(dá)到秒級,當(dāng)然,這也取決于整個處理鏈路的不同,有些可能還是分鐘級。查詢的時效性可以更快,比如當(dāng)數(shù)據(jù)最終存儲到 ADS 層時,如果是 MySQL,可以提供毫秒級的查詢;如果存儲到 OLAP 系統(tǒng),也可以提供毫秒級或秒級的查詢。這兩種架構(gòu)在各個企業(yè)中都很常見。
作為一個早期從事流計(jì)算的研發(fā)人員,我一直在思考如何讓實(shí)時數(shù)據(jù)處理在更廣泛的范圍內(nèi)推廣,讓更多的數(shù)據(jù)能夠進(jìn)入實(shí)時處理領(lǐng)域,而不是所有數(shù)據(jù)都必須等到ETL(提取、轉(zhuǎn)換、加載)的次日才能被查看。
隨著近年來的推廣,幾乎所有企業(yè)都建立了實(shí)時架構(gòu)和實(shí)時數(shù)據(jù)倉庫架構(gòu),尤其是 Flink 架構(gòu)。然而,在企業(yè)中,大部分?jǐn)?shù)據(jù)仍然存儲在批處理系統(tǒng)中。實(shí)際上,人們只是將大約 10% 的數(shù)據(jù)轉(zhuǎn)換到實(shí)時數(shù)據(jù)倉庫中,以實(shí)現(xiàn)秒級的事務(wù)和操作(ETL)。
因此,我們進(jìn)行了許多嘗試,比如第一個嘗試是采用 Kappa 架構(gòu),將所有數(shù)據(jù)都導(dǎo)入實(shí)時數(shù)據(jù)倉庫。但這樣做整體的復(fù)雜性非常高,開發(fā)一條實(shí)時鏈路并不容易,中間結(jié)果不可查詢,開發(fā)過程也很復(fù)雜。此外,最核心的問題是成本非常高,與傳統(tǒng)的批式數(shù)據(jù)倉庫相比,成本可能高出十倍、幾十倍甚至上百倍。
第二個嘗試是流批一體的架構(gòu)。利用 Flink 的流計(jì)算能力,在滿足實(shí)時數(shù)據(jù)流計(jì)算的同時,也利用 Flink 批式 ETL 的能力,至少在周期性計(jì)算層,讓數(shù)據(jù)倉庫開發(fā)人員能夠編寫一套周期性工作,這套工作可以在兩種架構(gòu)中通用。但隨著這幾年的推廣,我們發(fā)現(xiàn)實(shí)現(xiàn)這一目標(biāo)的難度非常大。一方面,F(xiàn)link 的批處理功能還不夠成熟。另一方面,還有一個非常核心的問題,即兩套架構(gòu)的存儲方式不同。一邊是 Kafka 或者 OLAP 系統(tǒng)如 Doris 和 StarRocks,另一邊是 Hive 這種表存儲的格式,它們的操作方式完全不同。如果只是用一套計(jì)算引擎的SQL 來統(tǒng)一它們,會發(fā)現(xiàn)在業(yè)務(wù)上根本無法使用。
因此近年來聚焦于實(shí)時湖倉的架構(gòu)。我們分析了之前架構(gòu)的問題,主要是兩種架構(gòu)太過分離,只有實(shí)時鏈路,即實(shí)時數(shù)據(jù)倉庫這條鏈路,才能實(shí)現(xiàn)數(shù)據(jù)時效性的提升。
批處理架構(gòu)最大的問題在于存儲能力不足。比如使用 Hive 存儲,僅僅是將文件放置在一個文件夾中,至于文件如何組織、如何處理,它一概不負(fù)責(zé),只能通過寫一個大的 insert overwrite 語句來更新分區(qū),因此其能力極其有限。而湖格式的能力在于管理每一個文件,能夠完成 ACID(原子性、一致性、隔離性、持久性)操作,甚至能夠完成流式讀取和流式寫入,實(shí)現(xiàn)分鐘級的更新。這就能夠提升數(shù)據(jù)的時效性,或者使表向流式處理和實(shí)時處理方向發(fā)展。
這樣我們就得出了這樣一套架構(gòu)——實(shí)時湖倉。它不再局限于批處理計(jì)算,而是既能進(jìn)行批處理計(jì)算也能進(jìn)行流計(jì)算,各種計(jì)算都可以在上面進(jìn)行,存儲是完全統(tǒng)一的。基于這種統(tǒng)一的計(jì)算,結(jié)合湖格式和 OLAP,能夠達(dá)到分鐘級的時效性。這樣一套時效性的提升都是在原有的批式數(shù)據(jù)湖倉架構(gòu)中完成的。
因此,實(shí)時湖倉是批式數(shù)據(jù)倉庫的原地升級,它并不是一個替代關(guān)系,而是批式數(shù)據(jù)倉庫的直接升級。在原有的批式數(shù)據(jù)倉庫的時效性為 T+1 的情況下,通過流計(jì)算和實(shí)時更新技術(shù),能夠?qū)r效性提升到分鐘級。在原有讀寫操作非常粗粒度的情況下,能夠?qū)崿F(xiàn)流式更新、批式更新能夠?qū)崿F(xiàn)流式讀取,甚至能夠?qū)崿F(xiàn)對查詢文件的過濾,以實(shí)現(xiàn)高性能查詢的效果。
簡而言之,實(shí)時湖倉解鎖了完整的大數(shù)據(jù)全生態(tài),在一套存儲架構(gòu)上,能夠?qū)崿F(xiàn)流批一體的計(jì)算,也能完成典型的 OLAP 查詢。
原有的批式數(shù)據(jù)倉庫可以被認(rèn)為是經(jīng)典的綠皮車,運(yùn)行緩慢,但吞吐量大,成本低。而實(shí)時數(shù)據(jù)倉庫則像是飛機(jī),非常快,但成本很高,也容易出問題,就像下雨天常常有飛機(jī)晚點(diǎn)。而實(shí)時湖倉想要達(dá)到的效果是傳統(tǒng)火車的升級版,仍然在地上,仍然是傳統(tǒng)火車的成本,但可以變得更快,像高鐵一樣。
二、Apache Paimon
介紹完實(shí)時湖倉是什么之后,在介紹 Paimon 之前,需要先談?wù)?Iceberg。Apache Iceberg 在國外使用相當(dāng)廣泛,在最近一次峰會中,其創(chuàng)始人表示,Iceberg 是 Shared Database Storage for Big Data,即共享數(shù)據(jù)庫存儲。如何理解這句話呢?在傳統(tǒng)的大數(shù)據(jù)中,共享的不僅僅是 Iceberg,Hive 也是共享的,幾乎所有的計(jì)算引擎都可以訪問 Hive,包括 Doris、Spark、Flink 等。但為什么 Hive 的存儲方式不行呢?因?yàn)樗皇且粋€共享的文件存儲,而不是共享的數(shù)據(jù)庫存儲,它缺少了太多能力。所以,Iceberg 在國外的定位就是 Hive 存儲格式的升級。
Iceberg 增加的能力主要包括:
- 對象存儲友好
- ACID transactions
- INSERT & UPDATE & DELETE
- Time Travel and rollback
- Schema Evolution
- Tag & Branch
Paimon 則是站在 Iceberg 這個巨人的肩膀上做了全新的設(shè)計(jì)。Paimon 核心的創(chuàng)新點(diǎn)就是原生支持了在一張表上對它定義組件,定義組件之后就可以對于這張表進(jìn)行流式的更新。舉個例子,針對 MySQL,定義主鍵之后,可以對它進(jìn)行一些 update 的更新,同一個主鍵不用先去刪再去增,直接去寫 insert 即可。這樣就解鎖了流式處理的能力。可以在 Flink 中掛載一個 Sink,直接進(jìn)行流式更新這張表,然后基于它的組件進(jìn)行一次更新。在更新的過程中,也可以像 MySQL 一樣產(chǎn)生對應(yīng)的 changlog,讓流處理更加簡單。
那么 Paimon 是如何進(jìn)行主鍵更新的呢?主鍵更新的底層核心結(jié)構(gòu)是 LSM,也就是 Log-Structured Merge-Tree。這種結(jié)構(gòu)已經(jīng)得到了廣泛驗(yàn)證,更適合更新或偏實(shí)時的領(lǐng)域。Paimon 在這方面的創(chuàng)新是將 LSM 結(jié)構(gòu)引入湖格式中,將實(shí)時更新、實(shí)時消費(fèi)帶入了湖格式。
實(shí)際上 LSM 結(jié)構(gòu)很簡單,它是一個排好序的層次結(jié)構(gòu)。它給湖格式更新帶來的最大好處是,在進(jìn)行壓縮(compaction)時,不需要全部重寫一遍。從圖中可以看到,它實(shí)際上是一個三角形,越底層的數(shù)據(jù)量越大。LSM 結(jié)構(gòu)只需要維護(hù)幾層數(shù)據(jù),這意味著新來的數(shù)據(jù)只需要與最上層的數(shù)據(jù)進(jìn)行合并(merge),進(jìn)行小規(guī)模的壓縮(minor compaction),這樣整體的寫磁盤(write amplification)就非常小,因此壓縮的效率要高得多。至于讀取操作,LSM 結(jié)構(gòu)也是排好序的,可以進(jìn)行讀取時合并(merge on read),對每一層已經(jīng)排好序的數(shù)據(jù)進(jìn)行合并讀取,其成本也不會太大。
上圖展示了 Paimon 從過去到現(xiàn)在再到未來的發(fā)展歷程和方向。Paimon 最初作為 Flink 的一個子項(xiàng)目在 Flink 社區(qū)中發(fā)展,最初的名字叫做 Flink Table Store。隨著我們的發(fā)展,隨著一些業(yè)務(wù)的落地,我們發(fā)現(xiàn)實(shí)際上大家需要的是一個共享的湖格式,而不是一個簡單的 Flink 組件。Spark 和 OLAP 引擎等都需要讀取 Paimon 的數(shù)據(jù),并希望與 Paimon 進(jìn)行更深層次的集成。因此,我們決定將 Paimon 從 Flink 社區(qū)中獨(dú)立出來,成為一個全新的項(xiàng)目。經(jīng)過一年的孵化期,發(fā)布了通用可用(GA)版本,并且許多企業(yè)都在不斷優(yōu)化這個方案,直到 2024 年 3 月,Paimon 正式畢業(yè),成為 Apache 的一個頂級項(xiàng)目。
這次畢業(yè)實(shí)際上也標(biāo)志著 Paimon 不再是 Flink 的一個子項(xiàng)目,它不僅與 Flink,還與 Spark 和其他引擎,包括 Doris、StarRocks 等 OLAP 引擎都有了非常好的集成。預(yù)計(jì)在 2024 年下半年會正式發(fā)布 1.0 版本,這意味著 Paimon 在整個大數(shù)據(jù)引擎中的 OLAP 領(lǐng)域,已經(jīng)實(shí)現(xiàn)了非常好的集成。
三、應(yīng)用場景
第一個場景是 Paimon 最初開始應(yīng)用的場景,2023 年的主流應(yīng)用是這樣的簡單場景:數(shù)據(jù)庫 CDC 入湖,Paimon 可以使 CDC 入湖變得更簡單、更高效、更自動化,鏈路也更簡潔。你可以直接啟動一個 Flink 作業(yè)寫入 Paimon,然后用 Spark 來查詢,其它的清理、compaction(壓縮)等工作都為你自動完成。
在這個基礎(chǔ)上,Paimon 社區(qū)也提供了一套工具,可以幫助你進(jìn)行 schema evolution,將 MySQL 的數(shù)據(jù),甚至 Kafka 的數(shù)據(jù)同步到 Paimon 中。上游增加列,Paimon 也會跟著增加列。還有一些整庫同步的功能,通過一個 Paimon 作業(yè)就可以同步成百上千張這樣的小表。
這里分享一張阿里智能引擎實(shí)踐的示例圖。智能引擎的核心問題是下游有各種各樣的需求來讀取業(yè)務(wù)庫的表,可能需要將業(yè)務(wù)庫的表發(fā)送到 Kafka 中,或者并行讀取的需求,許多請求直接打到業(yè)務(wù)的備庫上,可能導(dǎo)致業(yè)務(wù)庫在很多時候不夠穩(wěn)定,整體的并發(fā)也受到限制。因此,業(yè)務(wù)庫偶爾有掛掉的風(fēng)險(xiǎn),而且只能安排在晚上處理,白天直接處理可能會導(dǎo)致系統(tǒng)崩潰。
這里進(jìn)行的一個改變就是通過 Paimon,將 Paimon 作為整個業(yè)務(wù)數(shù)據(jù)庫的統(tǒng)一鏡像表。Paimon 相比 Hive 的優(yōu)勢在于,可以通過 schema 離線地將 MySQL 表同步到 Paimon 中。Paimon 的下游可以支持分鐘級的流計(jì)算,可以進(jìn)行流式讀取,也可以批量查詢 Paimon 表,批量查詢的時效性是分鐘級的。因此,其核心是將流和批處理都統(tǒng)一到了 Paimon 這張表上,所有下游業(yè)務(wù)都通過 Paimon 的統(tǒng)一入口來消費(fèi)業(yè)務(wù)庫的數(shù)據(jù)。因此,整體的吞吐量沒有上限,因?yàn)楸娝苤琍aimon 是建立在文件系統(tǒng)上的,全天 24 小時都可以進(jìn)行數(shù)據(jù)拉取,對業(yè)務(wù)庫的壓力小了很多。
在這個場景中,Paimon 提供了很多更新的能力,不僅僅是更新,保留最后一條記錄,也可以在更新時定義部分更新,還可以在 Paimon 上定義聚合引擎,在湖上完成一個自動聚合的能力,或是通過 Paimon 的 change log producer 來實(shí)時產(chǎn)生 change log 給下游消費(fèi)。因此可以基于 Flink 加 Paimon 構(gòu)建出完整的一套流的這樣一個 ETL,這條鏈路當(dāng)中幾乎沒有 state 的存在,所有的數(shù)據(jù)都是基于分鐘級的批量更新,因此成本很低。查詢可以通過 Doris、StarRocks 來查詢。
另一個要分享的是螞蟻的一個應(yīng)用實(shí)踐。需要說明的一點(diǎn)是,這里講的并不是要替代實(shí)時鏈路,而是許多離線鏈路希望變得更加實(shí)時,但由于實(shí)時處理的成本太高,所以很難遷移過來。
在螞蟻計(jì)算 UV 指標(biāo)的例子中,之前是使用 Flink 的全狀態(tài)鏈路來實(shí)現(xiàn)的,但后來發(fā)現(xiàn)大量業(yè)務(wù)難以遷移到這種模式,因此將其替換為 Paimon。利用 Paimon 的 upsert(更新或插入)更新機(jī)制來進(jìn)行去重,并且利用 Paimon 的輕量級日志 changlog 來消費(fèi)數(shù)據(jù),為下游提供實(shí)時的 PV(Page View,頁面瀏覽量)和 UV 計(jì)算。
在整體資源消耗方面,Paimon 方案使得整體 CPU 使用率下降了 60%,同時 checkpoint 的穩(wěn)定性也得到了顯著提升。此外,由于 Paimon 支持 point-to-point(端到端)寫入,任務(wù)的回滾和重置時間也大幅減少。整體架構(gòu)因?yàn)樽兊酶雍唵危虼嗽跇I(yè)務(wù)研發(fā)成本上也實(shí)現(xiàn)了降低。
接下來分享的是偏向 OLAP(在線分析處理)的應(yīng)用場景。首先,Spark 與 Paimon 的集成非常好,不遜于 Spark 的內(nèi)表。通過 Spark 或 Flink 進(jìn)行一些 ETL(提取、轉(zhuǎn)換、加載)操作,將數(shù)據(jù)寫入 Paimon 中。基于 Paimon 進(jìn)行 z-order 排序、聚簇,甚至構(gòu)建文件級索引,然后通過 Doris 或 StarRocks 進(jìn)行 OLAP 查詢,這樣就可以達(dá)到全鏈路 OLAP 的效果。
內(nèi)部對阿里旗下的餓了么進(jìn)行了評測。當(dāng)然也可以將所有數(shù)據(jù)寫入 OLAP 類型的表,但 OLAP 系統(tǒng)的問題主要是其存儲是基于 SSD 的,它與計(jì)算緊密結(jié)合,為了達(dá)到 OLAP 性能,其成本非常高,導(dǎo)致大量數(shù)據(jù)無法實(shí)時化。
而將數(shù)據(jù)直接寫入 Paimon,因?yàn)?Paimon 背后是 OSS 這類對象存儲,其整體成本非常低,但時效性只有 1 到 5 分鐘,所以這里需要權(quán)衡,對于某些對時效性要求不高的數(shù)據(jù),可以直接寫入 Paimon,通過 Paimon 的一些排序或數(shù)據(jù)聚簇手段,使數(shù)據(jù)更利于 OLAP 查詢。然后使用 StarRocks 或 Doris 直接進(jìn)行 OLAP 查詢,其查詢延遲在大多數(shù)時候與 OLAP 內(nèi)表相差不大。但其成本能降到直接進(jìn)入 OLAP 系統(tǒng)成本的 1/10,這樣做的效果可以加速更多更大量的業(yè)務(wù)數(shù)據(jù)。
四、Paimon 前沿技術(shù)
最后來分享一下 Paimon 相關(guān)的一些前沿技術(shù)。
在數(shù)據(jù)湖格式中,大家聽得最多的可能是 merge on read 或 copy on write。Merge on read 即在更新時保留大量 Delta 數(shù)據(jù),查詢時會比較慢。Copy on write 即在更新時直接對數(shù)據(jù)進(jìn)行重寫,寫入成本非常大,但讀取數(shù)據(jù)非常高效。所以 merge on read 和 copy on write 是兩個極端。Merge on write 想做的是,比如在上圖的主鍵表定義一個主鍵,定義一個 deletion vectors 模式。它要做的是在寫入時,流式生成對原有數(shù)據(jù)的 deletion vectors,這樣不是只寫增量,而是先刪除之前的數(shù)據(jù)再寫增量,讀取時只需讀取之前的文件,再基于 deleting vector 直接進(jìn)行高效的 OLAP 查詢。所以大家把這種模式定義為 merge on write,即在寫入時進(jìn)行一定 merge,帶來的效果是寫入慢一些,但讀取快很多,因此這是一個更新和極速查詢兼得的方案。
Paimon 在最新版本中完全支持了標(biāo)簽(tag)和分支(branch)的功能。不僅支持標(biāo)簽,最新版本還支持了標(biāo)簽的自動 TTL(Time-To-Live,生存時間)管理。當(dāng)你將標(biāo)簽和分支結(jié)合起來使用,會覺得整個 Paimon 的數(shù)據(jù)操作可以像 Git 一樣。這在很多情況下非常有用,比如進(jìn)行工程驗(yàn)證或測試時,使用這些分支和標(biāo)簽會非常方便。
另外,我們內(nèi)部目前正在進(jìn)行的一件事是基于 branch 能力來實(shí)現(xiàn)完整的流批一體的概念。比如,有一個分支是用于流處理,正在進(jìn)行流式讀取和寫入,還有另一個 branch 是批處理的,我可以同時進(jìn)行批處理的寫操作。這樣,基于同一張表,從業(yè)務(wù)角度來看,它能夠?qū)崿F(xiàn)流和批完全隔離的流批一體效果。
關(guān)于通用索引的支持,正如剛才提到的 OLAP 場景,deletion vector 模式也是一種面向 OLAP 的技術(shù)手段,通用索引的支持也是向優(yōu)秀的 OLAP 引擎看齊的一種手段。例如,像 Doris、StarRocks 這樣的 OLAP 引擎,它們不僅支持 Min/Max 索引,還有 bitmap、Bloom Filter、倒排索引等能力。湖格式(Lake Format)也可以擁有這樣豐富的索引能力,并且可以在低廉的對象存儲基礎(chǔ)上,實(shí)現(xiàn)非常高效的基于過濾條件的數(shù)據(jù)跳過(data skipping),達(dá)到高效的 OLAP 查詢能力。所以 Paimon 的最新版本也在研發(fā) bitmap 索引,后續(xù)也會探索倒排索引的實(shí)現(xiàn)。大家都知道,一旦命中 bloomfilter,可能會有 10 到 100 倍的性能提升,命中 bitmap 索引也可能有數(shù)倍的性能提升。