小紅書(shū)基于數(shù)據(jù)湖的流批統(tǒng)一存儲(chǔ)實(shí)踐
一、Lambda架構(gòu)與實(shí)時(shí)數(shù)倉(cāng)開(kāi)發(fā)痛點(diǎn)
1、小紅書(shū)的數(shù)據(jù)平臺(tái)概覽
首先來(lái)整體介紹一下小紅書(shū)的數(shù)據(jù)平臺(tái)。
首先在最底層是一個(gè)個(gè) Cloud,包括計(jì)算、存儲(chǔ)等。在這一基礎(chǔ)之上,是數(shù)據(jù)采集層,采集一些原始數(shù)據(jù),比如用戶行為日志數(shù)據(jù)、RDBMS 關(guān)系型數(shù)據(jù)庫(kù)的增量日志數(shù)據(jù),以及其他一些文件系統(tǒng)等。
然后基于源頭數(shù)據(jù)層(ODS 層)之上是數(shù)據(jù)存儲(chǔ)和加工層,主要分為兩大塊:一是偏離線的部分,主要使用 Hive、Spark 計(jì)算,使用 AWS S3 存儲(chǔ);二是偏實(shí)時(shí)的部分,主要使用 Flink 計(jì)算,使用 Kafka 存儲(chǔ)。
再往上是一個(gè)數(shù)據(jù)共享層,我們把一些聚合數(shù)據(jù)、Join 數(shù)據(jù)和寬表數(shù)據(jù)寫(xiě)入數(shù)據(jù)共享的一些分析引擎中,比如 ClickHouse、StarRocks、TiDB、HBase 等等。這些都是作為數(shù)據(jù)共享層數(shù)據(jù)存儲(chǔ)的底座,以及計(jì)算分析引擎的一個(gè)入口。
最上面是應(yīng)用層,我們基于這一層做報(bào)表、即時(shí)查詢等,還會(huì)對(duì)數(shù)據(jù)做封裝,打造一些統(tǒng)一的數(shù)據(jù)產(chǎn)品。
2、典型的 Lambda 架構(gòu)在小紅書(shū)的實(shí)踐現(xiàn)狀
小紅書(shū)采用的是典型的 Lambda 架構(gòu)。實(shí)時(shí)鏈路主要使用 Flink 和 Kafka;離線鏈路主要使用 S3、Spark 和 Hive。Lambda 的特點(diǎn)就是兩條鏈路互相獨(dú)立建設(shè),互不影響。
3、實(shí)時(shí)數(shù)倉(cāng)開(kāi)發(fā)痛點(diǎn)
Lambda 架構(gòu)的痛點(diǎn)可以總結(jié)為三個(gè)方面:
① 實(shí)時(shí)和離線數(shù)據(jù)不一致,造成數(shù)據(jù)不一致的原因主要有三點(diǎn):計(jì)算引擎不一致,相同 SQL 定義也容易產(chǎn)生不同結(jié)果;作業(yè)不同,開(kāi)發(fā)人員需要維護(hù)兩套代碼,技術(shù)門(mén)檻高;數(shù)據(jù) TTL 不同,Join 分析天然誤差。
② Kafka 缺乏數(shù)據(jù)檢索能力,對(duì)用戶來(lái)說(shuō) Kafka 更像一個(gè)黑盒。不管 Kafka 中數(shù)據(jù)存儲(chǔ)的是一些類(lèi)似 protobuf 的數(shù)據(jù)還是 json 格式的數(shù)據(jù),在做檢索的時(shí)候都非常困難。如果用戶想要根據(jù)某個(gè)條件去檢索數(shù)據(jù),這個(gè)數(shù)據(jù)很難被查找。KSQL 產(chǎn)品更像是一個(gè) streaming 的處理,更注重的是實(shí)時(shí)流處理能力,用來(lái)做離線大規(guī)模檢索并不適合。
③ 流存儲(chǔ)存數(shù)據(jù)有限,回溯效率低。這一點(diǎn)最大的原因是成本高,數(shù)據(jù)不能無(wú)限存。而且如果要去回溯讀,從歷史上去回追數(shù)據(jù),它讀的性能也不及批量讀。
二、流批統(tǒng)一存儲(chǔ)架構(gòu)介紹
基于 Lambda 帶來(lái)的痛點(diǎn),我們萌生了去開(kāi)發(fā)一個(gè)流批存儲(chǔ)的產(chǎn)品的想法來(lái)解決 Lambda 的痛點(diǎn)。下面就來(lái)介紹一些設(shè)計(jì)細(xì)節(jié)。
1、流批統(tǒng)一存儲(chǔ)架構(gòu)介紹
如下是流批統(tǒng)一存儲(chǔ)的整體架構(gòu):
我們的流批統(tǒng)一存儲(chǔ)叫 Morphing Server,對(duì)用戶提供的 API 還是跟 Kafka 完全兼容,都是使用流式的方式去寫(xiě)入和消費(fèi),這些接口都沒(méi)有變,所以用戶的使用方式不會(huì)有任何變化。
區(qū)別在于用戶寫(xiě)入數(shù)據(jù)到 Kafka,Kafka 內(nèi)部會(huì)有一個(gè)線程,異步將數(shù)據(jù)同步到數(shù)據(jù)湖中。我們的數(shù)據(jù)湖是采用的 Iceberg,當(dāng)數(shù)據(jù)寫(xiě)入到 Kafka 中,內(nèi)部線程會(huì)去抓取 Leader 數(shù)據(jù),經(jīng)過(guò)一些 Schema 數(shù)據(jù)解析轉(zhuǎn)換為 Table Format 格式寫(xiě)入到 Iceberg 中,這個(gè)過(guò)程是異步的,對(duì)用戶來(lái)說(shuō)是無(wú)感的。
Kafka 的數(shù)據(jù)會(huì)被其他 Flink 作業(yè)消費(fèi),消費(fèi)完之后可以寫(xiě)到下一個(gè) Kafka 中,在下一個(gè) Kafka 依然是以異步的形式將數(shù)據(jù)落地到數(shù)據(jù)湖中。數(shù)據(jù)湖中的數(shù)據(jù)就可以提供批讀取和批存儲(chǔ)的能力。對(duì)于 Iceberg 中的數(shù)據(jù)如何去讀取的問(wèn)題,我們會(huì)根據(jù)實(shí)際情況選取一些高性能的分析引擎,比如 StarRocks、小紅書(shū)自研的 RedCK 等來(lái)讀取離線數(shù)據(jù)。
2、產(chǎn)品能力
這里我們總結(jié)了 6 點(diǎn)流批統(tǒng)一存儲(chǔ)所提供的能力。
① 流批統(tǒng)一:同時(shí)提供流存儲(chǔ)和批存儲(chǔ)的讀寫(xiě)能力,構(gòu)建多種應(yīng)用場(chǎng)景。
② 無(wú)感寫(xiě)入:對(duì)外提供的寫(xiě)入接口為原生 Kafka API,用戶無(wú)需關(guān)注落數(shù)據(jù)湖過(guò)程,自動(dòng)異步寫(xiě)湖。
③ Schema 解析:數(shù)據(jù)在落湖前會(huì)提前進(jìn)行 Schema 解析,以結(jié)構(gòu)化、半結(jié)構(gòu)化的 Table 形式提供查詢。
④ 高速分析:借助 StarRocks 引擎的強(qiáng)大湖上查詢能力,能夠提供向量化、CBO 等高速查詢能力。
⑤ Exactly-Once:流、批數(shù)據(jù)實(shí)現(xiàn) Exactly-Once 語(yǔ)義,數(shù)據(jù)一致性高。
⑥ 支持 Rollback:支持批數(shù)據(jù)的 Rollback 能力,在 Schema 變更不及時(shí)下,回溯修復(fù)數(shù)據(jù)。
接下去,我們介紹一下技術(shù)選項(xiàng)是如何去考量?關(guān)于技術(shù)選項(xiàng)分為兩個(gè)部分:自動(dòng)落湖的過(guò)程如何選擇;對(duì)于數(shù)據(jù)湖中的數(shù)據(jù)如何選取合適的引擎去更加高效讀取
3、選型考量:Builtin Or Extension?
對(duì)于自動(dòng)落湖過(guò)程我們考慮了兩種形式,Builtin(內(nèi)嵌)和 Extension(外掛插件),這兩種形式其實(shí)都是可以的。
(1)Builtin 形式?
在 Builtin 的形式下,我們看到只有一個(gè)獨(dú)立的進(jìn)程,在里面處理落日志之外,還會(huì)有一個(gè)異步的線程叫 Iceberg Syncer 去不斷拉取日志中的數(shù)據(jù),然后寫(xiě)入湖中,這種方式有優(yōu)勢(shì)也有劣勢(shì)。
優(yōu)勢(shì)如下:
① 產(chǎn)品形態(tài)完整,統(tǒng)一入口。
② 不需要額外維護(hù)外部組件。
③ 資源利用率高,共享進(jìn)程。
劣勢(shì)如下:
① 企業(yè)內(nèi)生成集群版本難以升級(jí),在企業(yè)中有一些集群并沒(méi)有流批一體的功能,在升級(jí)中會(huì)非常困難。
② 進(jìn)程隔離性弱,如果在異步線程中產(chǎn)生 bug,可能影響 Kafka 正常的讀寫(xiě)功能。
(2)Extension 形式?
針對(duì) Builtin 形式的一些劣勢(shì),我們當(dāng)初考慮了另外一種選項(xiàng) - Extension,這個(gè)方式相對(duì)更加直觀。
Extension 形式,也存在著一些優(yōu)勢(shì)和劣勢(shì)。
優(yōu)勢(shì)如下:
① 接入靈活,集群不需要升級(jí),我們把 Kafka 落湖進(jìn)程摘取到 Kafka 進(jìn)程之外,是一個(gè)單獨(dú)的進(jìn)程,這是最大的一個(gè)好處。
② 流存儲(chǔ)可替換,并不局限于 Kafka,可以替換成其他引擎。
③ 進(jìn)程隔離。
劣勢(shì)如下:
① 運(yùn)維成本高,組件依賴過(guò)多,需要維護(hù)兩套組件。
② 產(chǎn)品體驗(yàn)稍差,整體性弱。
目前我們落地的是 Builtin 的方式,所以后面介紹的一些細(xì)節(jié)方案都是基于 Builtin 方式的。
4、查詢 & 分析引擎選擇
接下來(lái)介紹查詢分析引擎的選型。我們希望找到一款 OLAP 產(chǎn)品,具備以下特點(diǎn):
① MPP 架構(gòu)、向量化和 CBO 來(lái)提高分析性能。
② 支持多場(chǎng)景,能夠在各種場(chǎng)景下滿足我們的需求。
③ 大規(guī)模,離線分析數(shù)量大,數(shù)據(jù)種類(lèi)多的情況下,在大規(guī)模數(shù)據(jù)量下性能不退化。
基于這些考量,有兩大類(lèi)選擇:左邊的是 Apache Doris 和 StarRocks 為代表的 OLAP 分析引擎;右邊是 ClickHouse 和小紅書(shū)基于 ClickHouse 自研的 RedCK 分析引擎。
左邊的分析引擎對(duì)分布式支持更好,對(duì) SQL 協(xié)議兼容性高,提供更加一站式的查詢平臺(tái)。右邊的分析引擎對(duì)單表性能更加優(yōu)秀,在超大規(guī)模下的數(shù)據(jù)承載能力更強(qiáng),特別是我們?cè)?RedCK 上做了一些深度的定制化自研去滿足更多應(yīng)用場(chǎng)景。
(1)StarRocks(湖上分析)?
下面介紹我們?cè)诜植际揭嫔线x擇的 StarRocks。
StarRocks 支持湖上分析能力。它本身支持讀數(shù)據(jù)湖,不需要將數(shù)據(jù)以任何形式同步到 StarRocks 上,更像一種外表的形式,可以通過(guò) Iceberg 的 Catalog 去查詢數(shù)據(jù),還會(huì)做一些 Cache 緩存來(lái)加速查詢。
(2)StarRocks vs Persto 在流批一體(Iceberg)上的查詢對(duì)比?
我們對(duì) StarRocks 和 Presto 在流批一體上做了查詢性能的對(duì)比,主要分為兩大類(lèi),四小類(lèi)的 SQL 進(jìn)行比對(duì)。
左邊主要是 Scan 全表掃描相關(guān),在這一方面 Presto 的性能更加優(yōu)越,但是兩者差距不大。右邊主要是 GroupBy 相關(guān)的聚合場(chǎng)景,具有 MPP 架構(gòu)的 StarRocks 在性能上明顯更加優(yōu)于 Presto。這也是我們選擇 StarRocks 的原因。因?yàn)樵谶@個(gè)應(yīng)用場(chǎng)景下 Join 使用較少,所以這里沒(méi)有進(jìn)行對(duì)比。
(3)RedCK 架構(gòu)?
還有一類(lèi)分析引擎就是之前提到的 ClickHouse 和 RedCK,如何去更好的分析湖上的數(shù)據(jù),這里介紹一下我們自研的 RedCK。
它是一個(gè)存算分離的架構(gòu),主要分為三個(gè)模塊:Service、Query Processing 和 Storage。
Service 主要提供 Gateway 網(wǎng)關(guān)和 Service Discovery 服務(wù)發(fā)現(xiàn),能夠讓業(yè)務(wù)更好的接入;Query Processing 是計(jì)算層,可以去解析 SQL 生成執(zhí)行計(jì)劃,分派這些任務(wù)去讀寫(xiě);Storage 是存儲(chǔ)層,支持文件存儲(chǔ)比如 HDFS 和 Juice FS,還支持對(duì)象存儲(chǔ)比如 OBS 和 COS。
(4)RedCK(湖上分析)?
接下來(lái)看一下 RedCK 和流批存儲(chǔ)是如何結(jié)合的。
RedCK 通過(guò) MergeTree 的格式跟其他查詢引擎打通,比如 Spark、Flink 等計(jì)算可以直接讀寫(xiě) MergeTree 上的數(shù)據(jù),然后通過(guò) RedCK 在 MergeTree 上做 OLAP 分析。這樣的好處是使用 Spark 在寫(xiě)數(shù)據(jù)的時(shí)候可以有一個(gè)更好的性能,做到了讀和寫(xiě)兩種引擎的解耦。
基于這個(gè)考慮,我們?cè)?Kafka 流批一體的引擎在落湖的過(guò)程中,原本只支持傳統(tǒng)的 Parquet 現(xiàn)在也支持寫(xiě) MergeTree 格式,同時(shí)也去提交一些和 RedCK 相兼容的元數(shù)據(jù)信息。這樣 RedCK 可以根據(jù)元數(shù)據(jù)信息直接找到 MergeTree 去做一些分析。
5、架構(gòu)設(shè)計(jì)細(xì)節(jié)
整體上,落湖分為兩大塊:Commit 模塊和 Broker 模塊。
Commit 模塊主要負(fù)責(zé):
① Iceberg 的元信息的管理。
② 協(xié)調(diào) Broker 觸發(fā) Broker 做 Checkpoint。
③ 更新寫(xiě)入 Iceberg 的 WaterMark 和 CheckpointID。
④ Controller 做 RollBack 工作。
Broker 模塊主要負(fù)責(zé)的是數(shù)據(jù)湖寫(xiě)入,利用 Kafka 本身的 Fetch 機(jī)制,將 Leader 上的最新數(shù)據(jù)進(jìn)行解析并且不斷寫(xiě)入,按照 Partition 維度來(lái)做單獨(dú)的線程寫(xiě)入數(shù)據(jù)。
(1)Broker 設(shè)計(jì)細(xì)節(jié)?
Broker 的設(shè)計(jì)主要包括如下內(nèi)容:
① Replica Leader:Kafka 原生部分,處理 Produce 請(qǐng)求和 Consume 請(qǐng)求。
② ReplicaRemoteFetcherThread:主要工作線程,異步 Fetch Leader 數(shù)據(jù),經(jīng)過(guò) Schema 解析,寫(xiě)入 Iceberg。
③ DefultSchemaTransform:Schema 解析模塊,提供寫(xiě)入 Schema Server 變更。
④ IcebergRemoteLogStorageManager:封裝 Iceberg 接口,提供寫(xiě)入 Iceberg 的 API 集合。
⑤ Schema Server:提供 Schema 管理服務(wù),支持 Protobuf、Json 等。
(2)Commiter 設(shè)計(jì)細(xì)節(jié)?
Committer 主要的工作內(nèi)容包括:
① Controller:暫時(shí)復(fù)用 Kafka Controller,實(shí)現(xiàn) Commit 邏輯。
② 與 Broker 交互:發(fā)送 Checkpoint 請(qǐng)求,協(xié)調(diào)各 Broker Checkpoint 信息。
③ 與 Iceberg 交互:發(fā)起 Commit 請(qǐng)求。
(3)Excatly-Once 實(shí)現(xiàn):兩階段提交?
Exactly-once 語(yǔ)義主要依托于兩階段提交來(lái)實(shí)現(xiàn)數(shù)據(jù)不丟不重,具體如下:
① 第一步,Committer 向所有 Broker 發(fā)起一個(gè) RPC 請(qǐng)求,也就是 Checkpoint 請(qǐng)求。
② 第二步,Broker 在接受到 Broker 請(qǐng)求之后將目前為止還沒(méi) Flush 的數(shù)據(jù) Flush 到 Iceberg,完成之后將 Checkpoint 信息記錄到 Checkpoint Storage 中。
③ 第三步,Broker 向 Commiter 返回一個(gè) ACK,告訴 Commiter 已經(jīng)完成 Flush 工作。
④ 第四步,Commiter 等到所有 Broker 返回的 ACK 信息之后,發(fā)起第一階段提交并且記錄到 Checkpoint Storage 中,實(shí)際上做一個(gè) Commiter 和 CheckpointID 關(guān)聯(lián)。
⑤ 最后一步,等第一階段完成之后,發(fā)起第二階段提交,發(fā)出一個(gè) Commit 提交告訴 Iceberg 可以落盤(pán)。
(4)Exactly-Once 實(shí)現(xiàn):故障 Failover?
實(shí)際生產(chǎn)中,常會(huì)出現(xiàn)一些故障。接下來(lái)介紹各種故障情況下,如何保證數(shù)據(jù)的不丟不重。
故障情況大概分為如下幾類(lèi):
① Broker 故障:比如突然宕機(jī),其實(shí)這個(gè)故障沒(méi)有太大關(guān)系,因?yàn)?Kafka 本身有 Leader 切換能力,Leader 切換到其他 Broker 之后,會(huì)在新的 Broker 拉起異步線程寫(xiě) Iceberg。它會(huì)從 Checkpoint Storage 中讀取上一次 Checkpoint,從上一次的 Checkpoint 恢復(fù)這些數(shù)據(jù)去重新寫(xiě)操作。在一次 Checkpoint 數(shù)據(jù)向 Iceberg 的數(shù)據(jù),因?yàn)槭?committer 還沒(méi)有進(jìn)行第二階段提交,對(duì)于 Iceberg 來(lái)說(shuō)是不可見(jiàn)的,可以直接丟棄這些不可見(jiàn)的數(shù)據(jù)。
② Controller 故障:在第一階段提交的時(shí)候失敗,會(huì)被自動(dòng)切換到別的機(jī)器上面去再起一個(gè) Commiter 線程,會(huì)發(fā)現(xiàn)第一階段還沒(méi)完成,那么會(huì)重新向所有 Broker 發(fā)起一輪新的 RPC 請(qǐng)求,重新做一次 Checkpoint,這一次其它 Broker 在接受到 RPC 請(qǐng)求之后會(huì)發(fā)現(xiàn)不需要做 flush 操作,就會(huì)立刻返回 ACK。在收到所有 ACK 之后,會(huì)重新做一次第一階段提交;第一階段提交之后成功了,但是在第二階段提交的時(shí)候失敗了,那么 Controller 切換到另外的一個(gè)機(jī)器首先會(huì)去 Checkpoint Storage 中查詢,如果第一階段提交信息已經(jīng)存在就會(huì)直接發(fā)起第二階段提交工作。
③ Object Store 故障/HMS 故障:我們會(huì)做一個(gè)無(wú)限重試,并且將一些告警信息發(fā)送出來(lái)。
三、流批統(tǒng)一存儲(chǔ)應(yīng)用實(shí)踐
流批統(tǒng)一存儲(chǔ)在公司內(nèi)部落地之后,可以解決一些 Lambda 架構(gòu)帶來(lái)的問(wèn)題,下面將從四個(gè)方面來(lái)介紹。
1、Kafka 數(shù)據(jù)檢索
在流批一體之前,開(kāi)發(fā)同學(xué)去檢索 Kafka 數(shù)據(jù)比較復(fù)雜,如左圖顯示:第一步需要去申請(qǐng)一個(gè) topic,按照需要寫(xiě)數(shù)倉(cāng)作業(yè);第二步找 DBA 申請(qǐng)一個(gè) OLAP 表;第三步再去寫(xiě) Flink JOB 去解析 topic 數(shù)據(jù)寫(xiě)到剛剛申請(qǐng)的 OLAP 表中,這個(gè)表純粹是用來(lái)查詢和排障,整個(gè)鏈路比較長(zhǎng)。在使用流批一體之后,開(kāi)發(fā)同學(xué)申請(qǐng)一個(gè) Topic,然后往 Topic 中寫(xiě)作業(yè),這個(gè)時(shí)候開(kāi)發(fā)同學(xué)可以直接查詢流批統(tǒng)一存儲(chǔ)。
2、強(qiáng)一致的數(shù)倉(cāng) ODS 層
流批統(tǒng)一的存儲(chǔ),可作為數(shù)倉(cāng) ODS 層,建設(shè)下游鏈路。因?yàn)榱髋y(tǒng)一存儲(chǔ)是 Excatly-once 語(yǔ)義,所以可以做到實(shí)時(shí)和離線存儲(chǔ)完全匹配,可以避免雙鏈路帶來(lái)的數(shù)據(jù)不一致問(wèn)題。
3、批量分區(qū)回刷,提升Backfill效率
結(jié)合 Flink 提供的流批統(tǒng)一的計(jì)算能力,同時(shí)從批存儲(chǔ)和流存儲(chǔ)回刷數(shù)據(jù),極大提升回刷性能。與 Kafka 相比,批存儲(chǔ)提供更長(zhǎng)的數(shù)據(jù)生命周期,數(shù)據(jù) SLA 更有保障。
4、多維分析能力
利用 StarRocks 良好的湖上分析能力,充分發(fā)揮向量化引擎和 CBO 優(yōu)勢(shì),在統(tǒng)一的計(jì)算引擎上實(shí)現(xiàn)多業(yè)務(wù)多維分析。例如用戶行為分析、用戶畫(huà)像、自助報(bào)表、跨域分析等多種分析場(chǎng)景,都可以在一站式平臺(tái)上去完成。