縱騰湖倉全鏈路落地實踐
一、總體架構
面對日益增長的數據量,Lambda 架構使用離線/實時兩條鏈路和兩種存儲完成數據的保存和處理。這種繁雜的架構體系帶來了不一致的問題,需要通過修數、補數等一系列監(jiān)控運維手段去彌補。為了統(tǒng)一簡化架構,提高開發(fā)效率,減少運維負擔,我們實施了基于數據湖 Hudi+Flink 的流批一體架構,達到了降本增效的目的。
如下圖所示,總體架構包括數據采集、ETL、查詢、調度、監(jiān)控、數據服務等。要解決的是數據從哪里來到哪里去,怎么過去,怎么用,以及過程中的調度和監(jiān)控、元數據管理、權限管理等問題。
“數據從哪里來”,我們的數據來自 MySQL、MongoDB、Tablestore、Hana?!皵祿侥睦锶ァ保覀兊臄祿懭氲?Hudi、Doris,其中 Doris 負責存儲部分應用層的數據。“數據怎么過去”,將在后面的實時入湖部分進行介紹?!皵祿迷谀睦铩?,我們的數據會被 OLAP、機器學習、API、BI 查詢使用,其中 OLAP 和 BI 都通過 Kyuubi 的服務進行查詢。
任務的調度主要通過 DolpuinScheduler 來執(zhí)行,基于 quartz 的 cronTrigger 完成 shell、SQL 等調度。監(jiān)控部分則是通過 Prometheus 和 Grafana,這是業(yè)界通用的解決方案。元數據采集通過 DataHub 完成,采用了 datahub 的 ingestion framework 框架來采集各種數據源的元數據。權限管理主要包括 Kyuubi 服務端的統(tǒng)一認證和引擎端的獨立鑒權。
二、入湖方案選型
數據入湖方案設計上,我們比較了三種入湖的實現思路。
1、入湖方案一
如下圖所示,包含了兩條支線:
- 分支①:Flink SQL 通過 MySQL-CDC connector 和 Hudi connector 完成 source 和 sink 端讀寫。這樣 MySQL 每張表由單獨的 binlog dump 線程讀取 binlog。
- 分支②:通過 MySQL 多庫表配置一個 Debezium Connector 實現單獨 binlog dump 線程讀取多庫表,解析后發(fā)送到 Kafka 的多個 topic。即一張表一個 topic。之后用 Flink SQL通過 Kafka connector 和 Hudi connector 完成 source 和 sink 端讀寫。
這種方案的主要優(yōu)點是 Flink 和 CDC 組件都經過了充分驗證,已經非常穩(wěn)定成熟了。而主要缺點是 Flink SQL 需要定義表 DDL。但我們已經開發(fā) DDL 列信息從元數據系統(tǒng)獲取,無須自定義。并且寫 Hudi 是每張表一個 Flink 任務,這樣會導致資源占用過多。另外 Flink CDC 還不支持 Schema 演變,一旦 Schema 變更,需要重新拉取數據。
2、入湖方案二
這一方案是在前一個方案分支二的基礎上進行了一定的改進,通過 Dinky 完成整庫數據同步,其優(yōu)點是同源數據合并成一個 source 節(jié)點,減輕源庫壓力,根據 schema、database、table 分流 sink 到對應表。其缺點是不支持 schema 演變,表結構變更須重新導數。如下圖所示,mysql_biz 庫中有3張表,從 flink dag 圖看到 mysql cdc source 分3條流 sink 到 Hudi 的3張表。
3、入湖方案三
主要流程如下圖所示。其主要優(yōu)點是支持 Schema 演變。Schema 變更的信息由 Debezium 注冊到 Confluence Schema Registry,schema change 的信息通過 DeltaStreamer 執(zhí)行任務變更到 Hudi,使得任務執(zhí)行過程中不需要重新拉起。其主要缺點是依賴于 Spark 計算引擎,而我們部門主要用 Flink,當然,這會因各個公司實際情況而不同。
下圖分別是 Yarn 的 deltastreamer 任務, Kafka schema-change topic 的 DML message 和 Hudi 表變更后的數據。
4、入湖方案總結
在方案選型時,可以根據下面的流程圖進行比較選擇:
(1) 先看計算框架是 Spark 還是Flink,如果是Spark 則選擇方案三,即 Deltastreamer,這一方案適用于表結構變更頻繁,重新拉取代價高,主要技術棧是Spark 的情況。
(2) 如果是 Flink,再看數據量是否較少,以及表結構是否較穩(wěn)定,如果是的話,選擇方案二,Dinky 整庫同步方案支持表名過濾,適用數據量較少且表結構較穩(wěn)定的表。
(3) 如果否,再考慮 mysql 能否抗較大壓力,如果否,那么選擇方案一下分支,即 Kafka Connect,Debezium 拉取發(fā)送 Kafka,從 Kafka 讀取后寫 Hudi。適用數據量較大的多張表。
(4) 如果是,則選擇方案一上分支,即 Flink SQL mysql-cdc 寫 Hudi,適用于對實時穩(wěn)定要求高于資源敏感的重要業(yè)務場景。
三、實時入湖優(yōu)化
我們的入湖場景是 Flink Stream API 讀取Pulsar 寫 Hudi MOR 表,特點是數據量大,并且源端的每條消息都只包含了部分的列數據。我們通過使用 Hudi 的 MOR 表格式和 PartialUpdateAvroPayload 實現了這個需求。使用 Hudi 的 MOR 格式,是因為 COW 的寫放大問題,不適合數據量大的實時場景,而 MOR 是增量數據寫行存 Avro 格式log,通過在線或離線方式壓縮合并至列存格式 parquet。在保證寫效率的同時也兼顧了查詢的性能。不過需要通過合并任務定期地對數據進行合并處理,這是引入復雜度的地方。
以下面這張圖為例,recordKey 是 ID1 的3條 msg,每條分別包含一個列值,其余字段為空,按 ts 列 precombine,當 ts3 > ts2 > ts1時,最終 Hudi 存的 ID1 行的值是 v1,v2,v3,ts3。
此入湖場景痛點包括,MOR 表索引選擇不當,壓縮異常導致越寫越慢,直至 checkpoint 超時,某分區(qū)存在重復文件導致寫任務出錯,MOR 表某個壓縮計劃 pending阻礙此 bucket 的壓縮及后續(xù)的壓縮計劃生成,以及如何平衡效率與資源等。
我們在實踐過程中針對一些痛點實施了相應的解決方案。
Hudi 表索引類型選擇不當,導致越寫越慢至 CK 超時,這是因為 Bucket 索引通過 hash 映射 recordKey 到 fileGroup。而 Bloom 索引是保存 recordKey 和 partition、fileGroup 值來實現,因此 checkpoint size 會隨數據量的增加而增長。Bloom Filter 索引基于布隆過濾器實現,索引信息存儲在 parquet 的 footer 中,Bloom 的假陽性問題也會導致更新越來越慢,假陽性是指只能判斷數據一定不在某個文件而不能保證數據一定在某個文件,因此存在多個文件都可能存在某條數據,即須讀取多個文件才能準確判斷。
我們做的優(yōu)化是使用 Bucket 索引代替 Bloom 索引,Hudi 目前也支持了可以動態(tài)擴容的 Bucket 參數。
MOR 表壓縮執(zhí)行異常,具體來說有以下三個場景:
- 單 log 超過1G,使寫延遲提高,導致越寫越慢至 checkpoint 超時,checkpoint 端到端耗時增長至3-6分鐘。
- 在 inline schedule 的壓縮模式下,offline execute 出現報錯:log文件不存在。
- Compaction 一直處于 Infight 狀態(tài),即進行中,不能完成;同時存在無效 compaction,既不能被壓縮,也不能被取消。
此3種現象的原因都是 Sink:compact_commit 算子的并行度 > 1,我們做的優(yōu)化是降低壓縮過程的并發(fā)度,設置 compact_commit Parallelism = 1。并行度改成1后1G的 log 壓縮正常。整張表size 明顯減少。log 到 parquet 的壓縮比默認是0.35。
MOR 表某分區(qū)存在重復文件,導致寫任務出錯。出現這個問題的原因是某個 instant 已寫 log 文件但未成功提交到 timeline 時,發(fā)生異常重啟后未 rollback 這個 instant,即未清理已有 log,繼續(xù)寫此 instant 則有重復。
我們做的優(yōu)化是在遇到重復文件時,通過 Hudi-Cli 執(zhí)行去重任務,再恢復執(zhí)行。具體來說,需要拆分成以下四個步驟:
- 停止當前的 Flink 任務。
- 通過 Hudi-cli 執(zhí)行去重命令。
repair deduplicate --duplicatedPartitionPath 20220604 --repairedOutputPath hdfs:///hudi/hudi_tis.db/track_detail_3_repair/20220604 --dedupeType upsert_type --sparkMaster local
- 刪除 partition 文件,修復文件移到原分區(qū)。
- 重新啟動 Flink 任務。
MOR 表某個壓縮計劃 pending,阻礙此 bucket 的壓縮及后續(xù)的壓縮計劃生成。這個問題是由于環(huán)境問題導致的 zombie compaction 或 bug。上圖中第一列是compaction instant time,即壓縮計劃生成時間,第二列是狀態(tài),第三列是此壓縮計劃包含的文件數。8181的 instant 卡住,且此壓縮計劃包含2198個文件,即涉及到大量的 file group,涉及的 file group不會有新的壓縮計劃生成。導致表的 size 增加,寫延時。
我們做的優(yōu)化是回滾不正常的合并任務,重新處理。即利用較多資源快速離線壓縮完。保證之后啟動的 Flink 任務在相對少的資源情況下仍然可以保證更新和在線壓縮的效率。
具體來說,包括下面的命令:
- 執(zhí)行HoodieFlinkCompactor把所有inflight instant回滾成requested狀態(tài)。
- 執(zhí)行compaction unschedule命。
sh bin/hudi-compactor.sh hudi_tis track_detail_3 100
compaction unschedule --instant 20230613180604970 --parallelism 200 --sparkMaster local --sparkMemory 5g
經過多次的修改和驗證,我們的入湖任務在性能和穩(wěn)定性上取得了明顯的改善。在穩(wěn)定性上,做到了在十幾天內任務無異常。在時延上,做到了分鐘級別的 checkpoint 和數據可見。在資源使用上,對 Hadoop YARN 資源的占用明顯減少。
下圖總結了我們對實時入湖做的參數優(yōu)化方案,包括:
- 索引選擇GLOBAL_BLOOM-> BUCKET_INDEX #Bucket索引較布隆索引寫吞吐性能高。
- BUCKET_NUM 20 #Bucket數量:根據單分區(qū)數據量評估,保證File Slice2GB,平衡讀寫性能。
Flink增量checkpoint:Rockdb #Flink ck存儲,rockdb支持增量ck,減少單ck數據量,提高寫吞吐。
- Yarn資源:
jobmanager 5G #Flink jobmanager內存,減少oom,保證穩(wěn)定。
taskmanager 50G 20S #Flink taskmanager內存與slot數,slot與并發(fā)度、bucket數一致。
- write.rate.limit 30000 #寫速度限制,過載保護,保證作業(yè)穩(wěn)定運行。
- write.max.size 2560 #寫用到最大內存,于taskmanager每個slot內存一致。
- write.batch.size 512 #批量寫,適量調大減少刷盤頻率。
- compaction.max.memory 2048 #壓縮用到的最大內存,適量調大提升壓縮速度。
- compaction.trigger.strategy num_and_time #壓縮策略 增量提交個數或時間達標觸發(fā)生成壓縮策略。
- compaction.delta_seconds 30 #壓縮策略之時間,減少時間間隔,減少單個壓縮文件數。
- compaction.delta_commits 2 #壓縮策略之增量提交個數,減少個數,減少單個壓縮文件數。
實時任務入湖的優(yōu)化思路流程包括下面幾個步驟:
- 先確定 bucket 數量,觀察 fileSlice 大小,估算調整。
- 根據 bucket 數確定 Flink job 并行度,與 bucket 數保持一致。
- 根據并行度確定 tm 資源,即并行度 = 總 slot 數。
- 根據總 slot 數確定內存,即總內存 = num(slot) * (write.max.size)。
- 根據 pulsar topic 流量確定 write.rate.limit,一般峰值 * 1.5。
- 根據 Flink job 內存使用情況及平穩(wěn)度確定 write.max.size,可拿追存量數據測試,一般內存降到寫速度明顯下降即為內存最低值。
- 根據write.max.size 確定 write.batch.size 和 compaction.max.memory,前者是后兩個的和。
- 根據 pulsar topic 流量確定壓縮類型,一般超過 10w/s 考慮使用 inline schedule 和 offline execution。
四、數據湖上的查詢
在引入 Kyuubi 前,我們通過 JDBC、Beeline、Spark Client、Flink Client 等客戶端訪問服務層執(zhí)行查詢,沒有統(tǒng)一入口,多個平臺不互通,多賬號權限體系。用戶的痛點是跨多平臺開發(fā)體驗差,低效率。平臺層的痛點是問題定位運維復雜,存在資源浪費。
在引入 Kyuubi 后,我們基于社區(qū)版 Kyuubi 做了一定的改造,包括 JDBC 引擎開發(fā)、JDBC 引擎 Ranger 鑒權開發(fā)、BI、JDBC 客戶端元數據適配修改、Spark 引擎大結果集存 HDFS、支持導數開發(fā)、JDBC 引擎 SQL 攔截控流開發(fā)等,實現了統(tǒng)一數據服務入口,做到了統(tǒng)一認證權限管理和統(tǒng)一易用原則。
下圖展示了 Kyuubi 的架構和權限管控:
Kyuubi 查詢流程是:客戶端請求通過 LDAP 認證后,連接 Kyuubi Server 生成 Kyuubi session,之后 Kyuubi server 根據連接用戶以及用戶隔離級別路由到已經啟動的 engine 或啟動一個新的 engine。Spark 引擎會先申請 container 運行 AppMaster,后申請 container 運行 executor 執(zhí)行 task。Flink 引擎會完成 StreamGraph 至 JobGraph 至 executionGraph 構建并通過 Jobmanager 和 taskmanager 運行。其中 engine 端 RangerPlungin 會在 SQL 解析后拉取 RangerAdmin 由用戶配置的策略進行鑒權。RangerAdmin 完成用戶同步,策略刷新等。
Kyuubi on Flink 跨庫查詢的目的是嘗試基于 Flink實現流批一體,支持跨數據源導數 SQL 化。我們的實現方案是通過 Flink Metadata Catalog Connector 的開發(fā),即基于元數據系統(tǒng)以統(tǒng)一 datasource.db.table 的格式查詢所有數據源,且讓用戶免于自定義 DDL。其中元數據采集是用 datahub 的 ingestion framework 采集各種數據源的元數據,并生成對應 Flink 表屬性。Flink 端是擴展 AbstractCatalog 查詢 metadata DB,實現 CatalogFactory 接口。
其基本流程如下圖所示:
完整流程是1 發(fā)起采集請求2和3是采集服務調 Datahub ingestion framework 完成元數據采集并寫到 metadata DB 同時寫 Flink 表屬性。4是 用戶發(fā)送 SQL 到 Kyuubi server 5是 Kyuubi server 發(fā)送 SQL 到 Flink engine 6和7是 Flink metadata catalog 會讀取 metadata DB 根據 Flink 表屬性讀取對應數據源。
Kyuubi on JDBC Doris 可以通過外表查詢 Hudi,但在 Doris 1.2 版本,仍然有一定的限制,Hudi 目前僅支持Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Read Optimized Query。后續(xù)將支持 Incremental Query 和 Merge On Read 表的 Snapshot Query。
Doris 的架構示意和其基本使用流程如下圖所示: