字節(jié)跳動(dòng)數(shù)據(jù)湖技術(shù)選型的思考
字節(jié)跳動(dòng)數(shù)據(jù)集成的現(xiàn)狀
在2018年,我們基于Flink構(gòu)造了異構(gòu)數(shù)據(jù)源之間批式同步通道,主要用于將在線數(shù)據(jù)庫(kù)導(dǎo)入到離線數(shù)倉(cāng),和不同數(shù)據(jù)源之間的批式傳輸。
在2020年,我們基于Flink構(gòu)造了MQ-Hive的實(shí)時(shí)數(shù)據(jù)集成通道,主要用于將消息隊(duì)列中的數(shù)據(jù)實(shí)時(shí)寫(xiě)入到Hive和HDFS,在計(jì)算引擎上做到了流批統(tǒng)一。
到了2021年,我們基于Flink構(gòu)造了實(shí)時(shí)數(shù)據(jù)湖集成通道,從而完成了湖倉(cāng)一體的數(shù)據(jù)集成系統(tǒng)的構(gòu)建。
字節(jié)跳動(dòng)數(shù)據(jù)集成系統(tǒng)目前支持了幾十條不同的數(shù)據(jù)傳輸管道,涵蓋了線上數(shù)據(jù)庫(kù),例如Mysql Oracle和MangoDB;消息隊(duì)列,例如Kafka RocketMQ;大數(shù)據(jù)生態(tài)系統(tǒng)的各種組件,例如HDFS、HIVE和ClickHouse。
在字節(jié)跳動(dòng)內(nèi)部,數(shù)據(jù)集成系統(tǒng)服務(wù)了幾乎所有的業(yè)務(wù)線,包括抖音、今日頭條等大家耳熟能詳?shù)膽?yīng)用。
整個(gè)系統(tǒng)主要分成3種模式——批式集成、流式集成和增量集成。
- 批式集成模式基于Flink Batch模式打造,將數(shù)據(jù)以批的形式在不同系統(tǒng)中傳輸,目前支持了20多種不同數(shù)據(jù)源類型。
- 流式集成模式主要是從MQ將數(shù)據(jù)導(dǎo)入到Hive和HDFS,任務(wù)的穩(wěn)定性和實(shí)時(shí)性都受到了用戶廣泛的認(rèn)可。
- 增量模式即CDC模式,用于支持通過(guò)數(shù)據(jù)庫(kù)變更日志Binlog,將數(shù)據(jù)變更同步到外部組件的數(shù)據(jù)庫(kù)。這種模式目前支持5種數(shù)據(jù)源,雖然數(shù)據(jù)源不多,但是任務(wù)數(shù)量非常龐大,其中包含了很多核心鏈路,例如各個(gè)業(yè)務(wù)線的計(jì)費(fèi)、結(jié)算等,對(duì)數(shù)據(jù)準(zhǔn)確性要求非常高。在CDC鏈路的整體鏈路比較長(zhǎng)。首先,首次導(dǎo)入為批式導(dǎo)入,我們通過(guò)Flink Batch模式直連Mysql庫(kù)拉取全量數(shù)據(jù)寫(xiě)入到Hive,增量Binlog數(shù)據(jù)通過(guò)流式任務(wù)導(dǎo)入到HDFS。由于Hive不支持更新操作,我們依舊使用了一條基于Spark的批處理鏈路,通過(guò)T-1增量合并的方式,將前一天的Hive表和新增的Binlog進(jìn)行合并從而產(chǎn)出當(dāng)天的Hive表。
隨著業(yè)務(wù)的快速發(fā)展,這條鏈路暴露出來(lái)的問(wèn)題也越來(lái)越多。
- 首先,這條基于Spark的離線鏈路資源消耗嚴(yán)重,每次產(chǎn)出新數(shù)據(jù)都會(huì)涉及到一次全量數(shù)據(jù)Shuffle以及一份全量數(shù)據(jù)落盤(pán),中間所消耗的儲(chǔ)存以及計(jì)算資源都比較嚴(yán)重。
- 同時(shí),隨著字節(jié)跳動(dòng)業(yè)務(wù)的快速發(fā)展,近實(shí)時(shí)分析的需求也越來(lái)越多。
- 最后,整條鏈路流程太長(zhǎng),涉及到Spark和Flink兩個(gè)計(jì)算引擎,以及3個(gè)不同的任務(wù)類型,用戶使用成本和學(xué)習(xí)成本都比較高,并且?guī)?lái)了不小的運(yùn)維成本。
為了解決這些問(wèn)題,我們希望對(duì)增量模式做一次徹底的架構(gòu)升級(jí),將增量模式合并到流式集成中,從而可以擺脫對(duì)Spark的依賴,在計(jì)算引擎層面做到統(tǒng)一。
改造完成后,基于Flink的數(shù)據(jù)集成引擎就能同時(shí)支持批式、流式和增量模式,幾乎可以覆蓋所有的數(shù)據(jù)集成場(chǎng)景。
同時(shí),在增量模式上,提供和流式通道相當(dāng)?shù)臄?shù)據(jù)延遲,賦予用戶近實(shí)時(shí)分析能力。在達(dá)到這些目標(biāo)的同時(shí),還可以進(jìn)一步降低計(jì)算成本、提高效率。
經(jīng)過(guò)一番探索,我們關(guān)注到了正在興起的數(shù)據(jù)湖技術(shù)。
關(guān)于數(shù)據(jù)湖技術(shù)選型的思考我們的目光集中在了Apache軟件基金會(huì)旗下的兩款開(kāi)源數(shù)據(jù)湖框架Iceberg和Hudi中。Iceberg和Hudi兩款數(shù)據(jù)湖框架都非常優(yōu)秀。但兩個(gè)項(xiàng)目被創(chuàng)建的目的是為了解決不同的問(wèn)題,所以在功能上的側(cè)重點(diǎn)也有所不同。
- Iceberg:核心抽象對(duì)接新的計(jì)算引擎的成本比較低,并且提供先進(jìn)的查詢優(yōu)化功能和完全的schema變更。
- Hudi:更注重于高效率的Upsert和近實(shí)時(shí)更新,提供了Merge On Read文件格式,以及便于搭建增量ETL管道的增量查詢功能。
一番對(duì)比下來(lái),兩個(gè)框架各有千秋,并且離我們想象中的數(shù)據(jù)湖最終形態(tài)都有一定距離,于是我們的核心問(wèn)題便集中在了以下兩個(gè)問(wèn)題:
- 哪個(gè)框架可以更好的支持我們CDC數(shù)據(jù)處理的核心訴求?
- 哪個(gè)框架可以更快速補(bǔ)齊另一個(gè)框架的功能,從而成長(zhǎng)為一個(gè)通用并且成熟的數(shù)據(jù)湖框架?
經(jīng)過(guò)多次的內(nèi)部討論,我們認(rèn)為:Hudi在處理CDC數(shù)據(jù)上更為成熟,并且社區(qū)迭代速度非常快,特別是最近一年補(bǔ)齊了很多重要的功能,與Flink的集成也愈發(fā)成熟,最終我們選擇了Hudi作為我們的數(shù)據(jù)湖底座。
01 - 索引系統(tǒng)
我們選擇Hudi,最為看重的就是Hudi的索引系統(tǒng)。
這張圖是一個(gè)有索引和沒(méi)有索引的對(duì)比。在CDC數(shù)據(jù)寫(xiě)入的過(guò)程中,為了讓新增的Update數(shù)據(jù)作用在底表上,我們需要明確知道這條數(shù)據(jù)是否出現(xiàn)過(guò)、出現(xiàn)在哪里,從而把數(shù)據(jù)寫(xiě)到正確的地方。
在合并的時(shí)候,我們就可以只合并單個(gè)文件,而不需要去管全局?jǐn)?shù)據(jù)。如果沒(méi)有索引,合并的操作只能通過(guò)合并全局?jǐn)?shù)據(jù),帶來(lái)的就是全局的shuffle。在圖中的例子中,沒(méi)有索引的合并開(kāi)銷是有索引的兩倍,并且如果隨著底表數(shù)據(jù)量的增大,這個(gè)性能差距會(huì)呈指數(shù)型上升。
所以,在字節(jié)跳動(dòng)的業(yè)務(wù)數(shù)據(jù)量級(jí)下,索引帶來(lái)的性能收益是非常巨大的。Hudi提供了多種索引來(lái)適配不同的場(chǎng)景,每種索引都有不同的優(yōu)缺點(diǎn),索引的選擇需要根據(jù)具體的數(shù)據(jù)分布來(lái)進(jìn)行取舍,從而達(dá)到寫(xiě)入和查詢的最優(yōu)解。下面舉兩個(gè)不同場(chǎng)景的例子。
日志數(shù)據(jù)去重場(chǎng)景
在日志數(shù)據(jù)去重的場(chǎng)景中,數(shù)據(jù)通常會(huì)有一個(gè)create_time的時(shí)間戳,底表的分布也是按照這個(gè)時(shí)間戳進(jìn)行分區(qū),最近幾小時(shí)或者幾天的數(shù)據(jù)會(huì)有比較頻繁的更新,但是更老的數(shù)據(jù)則不會(huì)有太多的變化。冷熱分區(qū)的場(chǎng)景就比較適合布隆索引、帶TTL的State索引和哈希索引。
CDC場(chǎng)景
第二個(gè)例子是一個(gè)數(shù)據(jù)庫(kù)導(dǎo)出的例子,也就是CDC場(chǎng)景。這個(gè)場(chǎng)景更新數(shù)據(jù)會(huì)隨機(jī)分布,沒(méi)有什么規(guī)律可言,并且底表的數(shù)據(jù)量會(huì)比較大,新增的數(shù)據(jù)量通常相比底表會(huì)比較小。在這種場(chǎng)景下,我們可以選用哈希索引、State索引和Hbase索引來(lái)做到高效率的全局索引。這兩個(gè)例子說(shuō)明了不同場(chǎng)景下,索引的選擇也會(huì)決定了整個(gè)表讀寫(xiě)性能。Hudi提供多種開(kāi)箱即用的索引,已經(jīng)覆蓋了絕大部分場(chǎng)景,用戶使用成本非常低。
02 - Merge On Read表格式
除了索引系統(tǒng)之外,Hudi的Merge On Read表格式也是一個(gè)我們看重的核心功能之一。這種表格式讓實(shí)時(shí)寫(xiě)入、近實(shí)時(shí)查詢成為了可能。在大數(shù)據(jù)體系的建設(shè)中,寫(xiě)入引擎和查詢引擎存在著天然的沖突:
- 寫(xiě)入引擎更傾向于寫(xiě)小文件,以行存的數(shù)據(jù)格式寫(xiě)入,盡可能避免在寫(xiě)入過(guò)程中有過(guò)多的計(jì)算包袱,最好是來(lái)一條寫(xiě)一條。
- 查詢引擎則更傾向于讀大文件,以列存的文件格式儲(chǔ)存數(shù)據(jù),比如說(shuō)parquet和orc,數(shù)據(jù)以某種規(guī)則嚴(yán)格分布,比如根據(jù)某個(gè)常用字段進(jìn)行排序,從而做到可以在查詢的時(shí)候,跳過(guò)掃描無(wú)用的數(shù)據(jù),來(lái)減少計(jì)算開(kāi)銷。
為了在這種天然的沖突下找到最佳的取舍,Hudi支持了Merge On Read的文件格式。
MOR格式中包含兩種文件:一種是基于行存Avro格式的log文件,一種是基于列存格式的base文件,包括Parquet或者ORC。log文件通常體積較小,包含了新增的更新數(shù)據(jù)。base文件體積較大,包含了所有的歷史數(shù)據(jù)。
- 寫(xiě)入引擎可以低延遲的將更新的數(shù)據(jù)寫(xiě)入到log文件中。
- 查詢引擎在讀的時(shí)候?qū)og文件與base文件進(jìn)行合并,從而可以讀到最新的視圖;compaction任務(wù)定時(shí)觸發(fā)合并base文件和log文件,避免log文件持續(xù)膨脹。在這個(gè)機(jī)制下,Merge On Read文件格式做到了實(shí)時(shí)寫(xiě)入和近實(shí)時(shí)查詢。
03 - 增量計(jì)算
索引系統(tǒng)和Merge On Read格式給實(shí)時(shí)數(shù)據(jù)湖打下了非常堅(jiān)實(shí)的基礎(chǔ),增量計(jì)算則是這個(gè)基礎(chǔ)之上的Hudi的又一個(gè)亮眼功能:
?增量計(jì)算賦予了Hudi類似于消息隊(duì)列的能力。用戶可以通過(guò)類似于offset的時(shí)間戳,在Hudi的時(shí)間線上拉取一段時(shí)間內(nèi)的新增數(shù)據(jù)。在一些數(shù)據(jù)延遲容忍度在分鐘級(jí)別的場(chǎng)景中,基于Hudi可以統(tǒng)一Lambda架構(gòu),同時(shí)服務(wù)于實(shí)時(shí)場(chǎng)景和離線場(chǎng)景,在儲(chǔ)存上做到流批一體。
結(jié)語(yǔ)在選擇了基于Hudi的數(shù)據(jù)湖框架后,我們基于字節(jié)跳動(dòng)內(nèi)部的場(chǎng)景,打造定制化落地方案。我們的目標(biāo)是通過(guò)Hudi來(lái)支持所有帶Update的數(shù)據(jù)鏈路。