攻破主流數倉缺陷,字節跳動基于Doris的湖倉分析探索實踐
Doris是一種MPP架構的分析型數據庫,主要面向多維分析、數據報表、用戶畫像分析等場景。自帶分析引擎和存儲引擎,支持向量化執行引擎,不依賴其他組件,兼容MySQL協議。
一、Doris簡介
Apache Doris具備以下幾個特點:
1)良好的架構設計,支持高并發低延時的查詢服務,支持高吞吐量的交互式分析。多FE均可對外提供服務,并發增加時,線性擴充FE和BE即可支持高并發的查詢請求。
2)支持批量數據load和流式數據load,支持數據更新。支持Update/Delete語法,unique/aggregate數據模型,支持動態更新數據,實時更新聚合指標。
3)提供了高可用,容錯處理,高擴展的企業級特性。FE Leader錯誤異常,FE Follower秒級切換為新Leader繼續對外提供服務。
4)支持聚合表和物化視圖。多種數據模型,支持aggregate, replace等多種數據模型,支持創建rollup表,支持創建物化視圖。rollup表和物化視圖支持動態更新,無需用戶手動處理。
5)MySQL協議兼容,支持直接使用MySQL客戶端連接,非常易用的數據應用對接。
Doris 由 Frontend(以下簡稱FE)和 Backend(以下簡稱BE)組成,其中FE負責接受用戶請求、編譯、優化、分發執行計劃、元數據管理、BE節點的管理等功能,BE負責執行由FE下發的執行計劃,存儲和管理用戶數據。
二、數據湖格式Hudi簡介
Hudi是下一代流式數據湖平臺,為數據湖提供了表格式管理的能力,提供事務,ACID,MVCC,數據更新刪除,增量數據讀取等功能。支持Spark、Flink、Presto、Trino等多種計算引擎。
Hudi根據數據更新時行為不同分為兩種表類型:
針對Hudi的兩種表格式,存在3種不同的查詢類型:
三、Doris分析Hudi數據的技術背景
在數倉業務中,隨著業務對數據實時性的要求越來越高,T+1數倉業務逐漸往小時級、分鐘級,甚至秒級演進。實時數倉的應用也越來越廣,也經歷了多個發展階段。目前存在著多種解決方案。
1、Lambda架構
Lambda將數據處理流分為在線分析和離線分析兩條不同的處理路徑,兩條路徑互相獨立,互不影響。
離線分析處理T+1數據,使用Hive/Spark處理大數據量,不可變數據,數據一般存儲在HDFS等系統上。如果遇到數據更新,需要overwrite整張表或整個分區,成本比較高。
在線分析處理實時數據,使用Flink/Spark Streaming處理流式數據,分析處理秒級或分鐘級流式數據,數據保存在Kafka或定期(分鐘級)保存到HDFS中。
該套方案存在以下缺點:
- 同一套指標可能需要開發兩份代碼來進行在線分析和離線分析,維護復雜。
- 數據應用查詢指標時可能需要同時查詢離線數據和在線數據,開發復雜。
- 同時部署批處理和流式計算兩套引擎,運維復雜。
- 數據更新需要overwrite整張表或分區,成本高。
2、Kappa架構
隨著在線分析業務越來越多,Lambda架構的弊端就越來越明顯,增加一個指標需要在線離線分別開發,維護困難,離線指標可能和在線指標對不齊,部署復雜,組件繁多。于是Kappa架構應運而生。
Kappa架構使用一套架構處理在線數據和離線數據,使用同一套引擎同時處理在線和離線數據,數據存儲在消息隊列上。
Kappa架構也有一定的局限:
- 流式計算引擎批處理能力較弱,處理大數據量性能較弱。
- 數據存儲使用消息隊列,消息隊列對數據存儲有有效性限制,歷史數據無法回溯。
- 數據時序可能亂序,可能對部分在時序要求方面比較嚴格的應用造成數據錯誤。
- 數據應用需要從消息隊列中取數,需要開發適配接口,開發復雜。
3、基于數據湖的實時數倉
針對Lambda架構和Kappa架構的缺陷,業界基于數據湖開發了Iceberg、Hudi、DeltaLake這些數據湖技術,使得數倉支持ACID、Update/Delete,數據Time Travel、Schema Evolution等特性,使得數倉的時效性從小時級提升到分鐘級,數據更新也支持部分更新,大大提高了數據更新的性能。兼具流式計算的實時性和批計算的吞吐量,支持的是近實時的場景。
以上方案中其中基于數據湖的應用最廣,但數據湖模式無法支撐更高的秒級實時性,也無法直接對外提供數據服務,需要搭建其他的數據服務組件,系統較為復雜。基于此背景下,部分業務開始使用Doris來承接,業務數據分析師需要對Doris與Hudi中的數據進行聯邦分析,此外在Doris對外提供數據服務時既要能查詢Doris中數據,也要能加速查詢離線業務中的數據湖數據,因此我們開發了Doris訪問數據湖Hudi中數據的特性。
四、Doris分析Hudi數據的設計原理
基于以上背景,我們設計了Apache Doris中查詢數據湖格式Hudi數據,因Hudi生態為java語言,而Apache Doris的執行節點BE為C++環境,C++ 無法直接調用Hudi java SDK,針對這一點,我們有四種解決方案。
1)實現Hudi C++ client,在BE中直接調用Hudi C++ client去讀寫Hudi表。
該方案需要完整實現一套Hudi C++ client,開發周期較長,后期Hudi行為變更需要同步修改Hudi C++ client,維護較為困難。
2)BE通過thrift協議發送讀寫請求至Broker,由Broker調用Hudi java client讀取Hudi表。
該方案需要在Broker中增加讀寫Hudi數據的功能,目前Broker定位僅為fs的操作接口,引入Hudi打破了Broker的定位。第二,數據需要在BE和Broker之間傳輸,性能較低。
3)在BE中使用JNI創建JVM,加載Hudi java client去讀寫Hudi表。
該方案需要在BE進程中維護JVM,有JVM調用Hudi java client對Hudi進行讀寫。讀寫邏輯使用Hudi社區java實現,可以維護與社區同步;同時數據在同一個進程中進行處理,性能較高。但需要在BE維護一個JVM,管理較為復雜。
4)使用BE arrow parquet c++ api讀取hudi parquet base file,hudi表中的delta file暫不處理。
該方案可以由BE直接讀取hudi表的parquet文件,性能最高。但當前不支持base file和delta file的合并讀取,因此僅支持COW表Snapshot Queries和MOR表的Read Optimized Queries,不支持Incremental Queries。
綜上,我們選擇方案四,第一期實現了COW表Snapshot Queries和MOR表的Read Optimized Queries,后面聯合Hudi社區開發base file和delta file合并讀取的C++接口。
五、Doris分析Hudi數據的技術實現
Doris中查詢分析Hudi外表使用步驟非常簡單。
1、創建Hudi外表
建表時指定engine為Hudi,同時指定Hudi外表的相關信息,如hive metastore uri,在hive metastore中的database和table名字等。
建表僅僅在Doris的元數據中增加一張表,無任何數據移動。
建表時支持指定全部或部分hudi schema,也支持不指定schema創建hudi外表。指定schema時必須與hiveMetaStore中hudi表的列名,類型一致。
Example:
Plaintext
CREATE TABLE example_db.t_hudi
ENGINE=HUDI
PROPERTIES (
"hudi.database" = "hudi_db",
"hudi.table" = "hudi_table",
"hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
CREATE TABLE example_db.t_hudi (
column1 int,
column2 string)
ENGINE=HUDI
PROPERTIES (
"hudi.database" = "hudi_db",
"hudi.table" = "hudi_table",
"hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
2、查詢Hudi外表
查詢Hudi數據表時,FE在analazy階段會查詢元數據獲取到Hudi外表的的hive metastore地址,從Hive metastore中獲取hudi表的schema信息與文件路徑。
- 獲取hudi表的數據地址。
- FE規劃fragment增加HudiScanNode。HudiScanNode中獲取Hudi table對應的data file文件列表。
- 根據Hudi table獲取的data file列表生成scanRange。
- 下發HudiScan 任務至BE節點。
- BE節點根據HudiScanNode指定的Hudi外表文件路徑調用native parquet reader進行數據讀取。
六、后期規劃
目前Apche Doris查詢Hudi表已合入社區,當前已支持COW表的Snapshot Query,支持MOR表的Read Optimized Query。對MOR表的Snapshot Query暫時還未支持,流式場景中的Incremental Query也沒有支持。
后續還有幾項工作需要處理,我們和社區也在積極合作進行中:
- MOR表的Snapshot Query。MOR表實時讀需要合并讀取Data file與對應的Delta file,BE需要支持Delta file AVRO格式的讀取,需要增加avro的native讀取方式。
- COW/MOR表的Incremental Query。支持實時業務中的增量讀取。
- BE讀取Hudi base file和delta file的native接口。目前BE讀取Hudi數據時,僅能讀取data file,使用的是parquet的C++ SDK。后期我們和聯合Hudi社區提供Huid base file和delta file的C++/Rust等語言的讀取接口,在Doris BE中直接使用native接口來查詢Hudi數據。