快手自研Spark向量化引擎正式發布,性能提升200% 原創
Blaze 是快手自研的基于Rust語言和DataFusion框架開發的Spark向量化執行引擎,旨在通過本機矢量化執行技術來加速Spark SQL的查詢處理。Blaze在快手內部上線的數倉生產作業也觀測到了平均30%的算力提升,實現了較大的降本增效。本文將深入剖析blaze的技術原理、實現細節及在快手實際生產環境中的真實表現。
一、研究背景
當下,Spark 的重要發展方向之一是通過向量化執行進一步提升性能。向量化執行的思想是將算子的執行粒度從每次處理一行變成每次處理一個行組,以此來避免大量的函數調用。通過對行組內部處理按列進行計算,同時利用編譯技術減少分支判斷檢查以及更多的 SIMD 優化執行計劃。
Blaze 是快手自研的基于Rust語言和DataFusion框架開發的Spark向量化執行引擎,旨在通過本機矢量化執行技術來加速Spark SQL的查詢處理。在性能方面,Blaze展現出顯著的優勢:在TPC-DS 1TB的測試中,Blaze相較于Spark 3.3版本減少了60%的計算時間、Spark 3.5版本減少了40%的計算時間,并大幅降低了集群資源的消耗;此外,Blaze在快手內部上線的數倉生產作業也觀測到了平均30%的算力提升,實現了較大幅度的降本增效。
如今,Blaze已開源,擁抱更廣闊的開發者社區。開源版本全面兼容Spark 3.0~3.5,用戶能夠輕松集成Blaze至現有Spark環境中,僅需簡單添加Jar包,即可解鎖Blaze帶來的極致性能優化,享受前所未有的數據處理速度與資源效率。
Github地址: https://github.com/kwai/blaze
二、Blaze的整體架構及核心
Spark on Blaze架構的整體流向
Spark+Blaze 的架構設計原理如下圖:
對比Spark原生的執行流程,引入Blaze Session Extension組件所帶來的作用是顯著的,特別是在提升數據處理效率和性能方面。
Spark原生執行流程主要依賴于Java虛擬機(JVM)進行任務的執行,盡管JVM在提供跨平臺、內存管理等方面有著卓越的表現,但在大數據處理場景下,尤其是涉及大規模數據計算和復雜查詢時,JVM的性能開銷可能會成為瓶頸。
Blaze Session Extension組件的引入,巧妙地解決了這一問題。該組件在Spark生成物理執行計劃之后介入,通過其翻譯邏輯將這一計劃轉換為等效的、native向量化引擎可以識別的形式,隨后提交到Executor端由native引擎執行計算,從而實現了數據處理效率的飛躍。
而這一切的背后,離不開Native向量化引擎這一核心模塊的支持。該引擎由Rust語言實現,憑借其卓越的性能、安全性和并發處理能力,成功實現了Spark中大多數關鍵算子的等效替代,包括但不限于Project、Filter、Sort等。這些經過優化的算子在執行過程中,通過向量化技術顯著提升了計算效率,使得數據處理過程更加流暢、快速。
四大核心組件
Blaze 架構中的核心模塊有四個,共同驅動著大數據性能的顯著提升。這些模塊分別為:
- Native Engine:基于 Datafusion 框架實現的與 Spark 功能一致的 Native 算子,以及相關內存管理、FFI 交互等功能。
- ProtoBuf:定義用于 JVM 和 native 之間的算子描述協議,對 Datafusion 執行計劃進行序列化和反序列化。
- JNI Bridge:實現 Spark Extension 和 Native Engine 之間的互相調用。
- Spark Extension:Spark 插件,實現 Spark 算子到 Native 算子之間的翻譯。
具體的執行過程中,遵循以下步驟:
物理執行計劃的轉換:首先,Spark Extension將 Spark 生成的物理執行計劃轉換為對應的 Native Plan;
生成和提交Native Plan:轉換完成后,Native Plan通過JNI Bridge被提交給Native Engine進行進一步的處理。
Native 執行層:最后,Native Engine利用其高效的內存管理機制和向量化處理技術,對Native Plan進行真正的執行。執行結果通過JNI Bridge返回給Spark,從而完成整個數據處理流程。
三、Blaze的技術優勢:面向生產的深度優化
在跑通 tpch 和 tpcds 測試集并取得預期性能提升后,我們面向線上生產環境進一步做了系列深度優化,包括性能和穩定性等方面工作:
細粒度的FailBack機制
我們針對Spark執行效率的提升,設計并實現了演進式向量化執行方案。這一方案旨在逐步優化算子與表達式的向量化覆蓋,同時解決Java UDF無法直接轉化為Native執行的問題。通過引入細粒度的FailBack機制,Blaze在翻譯過程中遇到暫無Native實現的算子、單個表達式或UDF時,支持算子/單個表達式粒度的回退,能夠靈活回退到Spark原生執行。此機制首先確定算子/表達式的依賴參數列,利用Arrow FFI技術將這些參數列作為行傳遞給Spark進行處理,然后將結果以列的形式回傳至Blaze,從而在JVM與Native執行之間構建了一座橋梁。
此方案不僅加速了向量化執行的全面部署,還確保了即便在用戶SQL中有少量UDF等不支持的場景,細粒度回退單個表達式開銷較小,作業整體依然可以得到優化。
更高效的向量化數據傳輸格式
在Spark中,Shuffle操作因其復雜的數據流轉過程成為性能瓶頸,涉及編碼、壓縮、網絡傳輸、解壓及解碼等多個環節。原生Spark采用基于行的序列化與壓縮方式,而業界向量化數據則傾向于Arrow格式傳輸,但實踐中發現Arrow與主流輕量壓縮算法適配不佳,導致壓縮率低下且存在冗余信息。針對此問題,Blaze定制了優化的數據傳輸格式,不僅去除了列名、數據類型等冗余數據,還使用了byte-transpose列式數據序列化技術,通過重組整型/浮點型數據的字節順序,顯著提升數據壓縮效率。這一改進大幅減少了Shuffle過程中的數據傳輸量,并在實際測試與TPC-DS基準測試中展現出顯著的性能提升與資源消耗降低,有效解決了原有問題并優化了系統整體性能。
::: hljs-center
線上2000多個作業的真實數據,上線后輸入數據量小幅上漲的情況下,Shuffle數據量相比spark降近30%
:::
減少用戶成本的多級內容管理策略
面對Spark與Native執行模式在內存管理上的差異,我們設計了跨堆內堆外的自適應內存管理機制。該機制根據任務的向量化覆蓋情況智能調整內存分配,確保資源高效利用。同時,我們構建了堆外內存、堆內內存與磁盤文件之間的多級管理體系,有效防止了內存不足及頻繁數據溢寫的問題。這些措施不僅保障了向量化引擎上線后任務的穩定運行,無需用戶手動調整內存參數,大幅降低了用戶操作成本,提升了整體系統的易用性與可靠性。
復雜度更優的聚合算法實現
為深度適配Spark的復雜需求,Blaze在aggregate、sort、shuffle等關鍵算子的實現上并未直接采用DataFusion的現成方案,而是進行了定制化開發。以HashAggregate為例,當面對大規模group-by聚合且內存不足時,Spark會轉而采用基于排序的聚合,這涉及高復雜度的排序與歸并過程。而在Blaze中,我們采用了基于分桶的歸并方式,利用基數排序在spill時進行分桶、溢寫,并在合并階段通過hash 表快速合并,整個流程保持O(n)的復雜度,顯著提升了聚合算子的執行效率與資源利用率。
向量化計算場景的表達式重復計算優化
針對SQL執行中算子間常見的重復表達式計算問題,Blaze借鑒了Spark的Whole-stage codegen技術,應用了這一項優化策略。該策略能夠智能識別并合并包含重復表達式的算子,如下圖中的Project與Filter合并為一個大算子,并在其中對表達式計算結果進行緩存、復用,達到了減少重復計算、提高執行效率的目的。這一優化在應對復雜計算邏輯(如JSON解析多個字段、UDF調用)時尤為顯著,能將執行效率提升一倍以上。特別是在內部業務場景中,對于高頻調用的重負載UDF,該優化成功減少了約40%的計算開銷,顯著增強了系統的整體性能與響應速度。
四、當前進展及未來規劃
當前進展
Blaze 作為一款高性能數據處理引擎,已取得了顯著進展,全面支持多項核心功能,展現出強大的技術實力與廣泛的應用潛力。具體而言,Blaze 目前已具備以下關鍵能力:
- Parquet向量化讀寫能力:實現了對Parquet格式數據的高效向量化讀寫,極大地提升了數據處理的速度與效率。
- 全面算子與表達式支持:覆蓋了線上常用的所有算子與表達式,少量不支持的表達式和UDF也可以細粒度回退,確保用戶能夠無縫遷移并享受向量化處理帶來的性能提升。
- Remote Shuffle Service集成:內部集成了自研的Remote Shuffle Service,同時我們也在和阿里合作,增加對Apache Celeborn 的支持,預計9月份可以提交到社區。
- TPC-H/TPC-DS測試優異表現:在業界權威的TPC-H/TPC-DS基準測試中,Blaze成功通過全部測試場景,并以TPC-H平均3倍以上、TPC-DS 2.5倍的性能提升展示了其在復雜查詢處理上的卓越能力。
在真實的生產環境中,向量化引擎大規模上線應用,算力平均提升 30%+,成本節約年化數千萬元。
未來規劃
1、持續迭代優化,內部線上推全:通過不斷收集用戶反饋與性能數據,我們將精準定位并修復潛在問題,同時引入更多先進的算法與優化策略,以進一步提升Blaze的性能與穩定性。
2、支持更多引擎或場景,例如數據湖等:為了滿足用戶日益多樣化的數據處理需求,我們將不斷拓展Blaze的應用場景,支持更多類型的數據處理引擎與場景,如數據湖等。通過加強與業界主流技術的兼容性,我們將為用戶提供更加靈活、便捷的數據處理方案,助力用戶解鎖數據價值,推動業務創新與發展。
3、加強開源社區運營建設,共建生態繁榮:我們深知開源社區對于技術發展與生態繁榮的重要性。因此,我們將在之后加強Blaze開源社區的運營建設,積極構建一個開放、包容、協作的社區環境。當前我們已經與阿里、B站、攜程、聯通云等公司達成合作。
如果您對該項目感興趣,歡迎您為項目點個star。
項目地址:https://github.com/kwai/blaze
本文作者:快手技術王磊
