B站離線計算的實踐
1. 背景介紹
2018年B站基于Hadoop開始搭建離線計算服務,計算集群規(guī)模從最初的兩百臺到發(fā)展到目前近萬臺,從單機房發(fā)展到多機房。我們先后在生產(chǎn)上大規(guī)模的使用了 Hive、Spark、Presto 作為離線計算引擎,其中 Hive 和 Spark 部署在 Yarn 上,具體的架構(gòu)如下,目前每天有約20w的離線批作業(yè)運行在 Spark 和 Hive 上,下面介紹下我們做了哪些工作來確保這些作業(yè)的高效與穩(wěn)定。
2. 從Hive到Spark
21年初的時候Hive還是B站主要的離線計算引擎,80%以上的離線作業(yè)使用 Hive 執(zhí)行,Spark2.4作業(yè)占比接近20%,集群資源的使用率長期維持在80%以上。21年3月 Spark3.1 發(fā)布,相較于 Spark2.4 性能有了較大的提升,我們開始推動Spark3.1 在B站的落地,同時將 Hive-SQL 整體遷移至 Spark-SQL。
在B站,離線計算的調(diào)度已經(jīng)完成了收口,80%以上的作業(yè)來自于自建的 BSK 調(diào)度平臺,其余的作業(yè)基本也都是 airflow 提交上來的,只有少量的任務來自散落的開發(fā)機。在推動 Hive 升級 Spark 時只要將調(diào)度平臺的作業(yè)完成遷移就可以覆蓋90%以上的作業(yè)。起步階段我們進行了少量的人工遷移,對用戶 SQL 進行了簡單改寫,修改了輸入輸出表后由兩個引擎執(zhí)行,開發(fā)了一個結(jié)果對比的工具,通過對雙跑結(jié)果分析保障遷移效果。基于這個操作鏈路我們自研了一個自動遷移工具,減少人工失誤和人力壓力。
2.1 語句轉(zhuǎn)換
我們重寫了 SparkSqlParser,將從調(diào)度系統(tǒng)中收集到的 SQL 進行輸入輸出表的替換,避免對生產(chǎn)環(huán)境的影響。調(diào)度平臺進行作業(yè)調(diào)度時以 DAG 為單位,一個調(diào)度任務里面可能存在多條 SQL,這些 SQL的輸入輸出表間存在依賴關(guān)系,為了保證雙跑盡可能的模擬生產(chǎn)表現(xiàn),對一個 DAG 里面的多個調(diào)度作業(yè)進行輸入輸出表替換時進行整體替換,保證了相互間依賴。對于 Select語句因為本身沒有輸出表,需要將 Select 語句轉(zhuǎn)換為 CTAS 語句,這樣就能將執(zhí)行結(jié)果落地進行對比,需要注意的是轉(zhuǎn)換過程中要將列名進行編碼防止中文列導致的建表失敗。當遷移工具識別出 SQL 語句為 DDL 語句,如果不是 CTAS 這種需要消耗計算資源的就直接跳過對比,同時對該語句進行標記,保證交由 Hive 執(zhí)行,防止意外的元信息修改。
2.2 結(jié)果對比
雙跑輸出結(jié)果的對比是保證數(shù)據(jù)準確性的關(guān)鍵。首先對兩個結(jié)果表的 Schema 進行對比,這個通過調(diào)用 DESC 語法返回結(jié)果對照就可以完成。對于 Schema 一致的兩個表則進行下一步操作,兩表全量數(shù)據(jù)對比,我們設計了一個 SQL 對數(shù)據(jù)按行進行整體對比,具體的對比思路如圖:
第一步將兩表按所有列(這里是 name 和 num 字段)進行 GROUP BY,第二步 UNION ALL 兩表數(shù)據(jù),第三步再按所有列(這里是 name, num 和 cnt 字段) GROUP BY 一次產(chǎn)生最終表,在最終表中 cnts 值為2的行表示這行數(shù)據(jù)在兩表中都有且重復值一致,對于值非2的數(shù)據(jù)就是差異行了。從上圖的例子來說差異行 Jack|1|2|1 表示 Jack|1 這行數(shù)據(jù)數(shù)據(jù)在一個表中存在兩行,結(jié)合差異行 Jack|1|1|1 來看其實就是 Jack|1 這行數(shù)據(jù)一個表有一行另一個表有兩行。通過這個方式就可以對雙跑產(chǎn)出的結(jié)果表進行一個全量的對比。通過這種結(jié)果對比方法可以完成大部分雙跑任務的結(jié)果對比,但是對于結(jié)果表中存在 LIST、SET、MAP 這種容器類型的,因為在 toString 時順序是無法保證的,所以會被識別為不一致,此外對于非穩(wěn)定性的 SQL 如某列數(shù)據(jù)是 random 產(chǎn)生,因為每次執(zhí)行產(chǎn)出的結(jié)果不一致,也會識別為對比失敗,這兩種情況下就需用人工的介入來分析了。
資源利用率的提升是做引擎升級的出發(fā)點,除了結(jié)果對比來保證數(shù)據(jù)準確性,我們還做了資源消耗對比來保證遷移的收益。對比系統(tǒng)收集了每個作業(yè)的執(zhí)行時間以及消耗的資源,從執(zhí)行時間、CPU 和內(nèi)存的資源消耗進行兩個引擎執(zhí)行性能的對比,在執(zhí)行最終遷移前依據(jù)收集的數(shù)據(jù)為用戶提供了遷移的預期收益,提高了用戶遷移任務的積極性。從遷移中收集的數(shù)據(jù)來看 hive 切到 spark 可以減少40%以上的執(zhí)行時間,同時整體資源消耗降低30%以上。
2.3 遷移&回滾
遷移系統(tǒng)對每個任務都執(zhí)行了至少3次的雙跑對比,但依然不能完全消除執(zhí)行遷移的風險,在實際遷移過程中的幾次問題都是遷移后穩(wěn)定性不符合預期導致的,因此遷移系統(tǒng)對于遷移后的任務增加了監(jiān)控,在一個任務遷移后,該任務的前3次調(diào)度執(zhí)行消耗的時間、CPU 和內(nèi)存資源將被用來和遷移前的七次平均執(zhí)行數(shù)據(jù)對比,如果存在負優(yōu)化的情況則會將這個任務執(zhí)行引擎進行回滾并通知我們介入進行進一步分析。
3. Spark 在B站的實踐
3.1 穩(wěn)定性改進
3.1.1 小文件問題
隨著B站業(yè)務高速發(fā)展,數(shù)據(jù)量和作業(yè)數(shù)增長越來越快,伴隨而來的小文件數(shù)也快速增長,小文件太多會增加 HDFS 元數(shù)據(jù)的壓力,在計算引擎讀取時也大大增加了讀請求的數(shù)量降低了讀取效率。為了解決小文件的問題,在寫表場景下對 Spark 做了如下兩種改造。
兜底小文件合并:我們修改了數(shù)據(jù)的寫出目錄,引擎計算先寫到一個中間目錄,在 FileFormatWriter.write 結(jié)束后 refreshUpdatedPartitions 前,插入了一個文件合并邏輯,從中間目錄中獲取分區(qū)下文件的平均大小,對于不存在小文件情況的目錄直接MV到最終目錄,對于存在小文件的目錄新增一個讀 RDD coalesce 到一個合適值寫出后 MV 到最終目錄。
基于 reparation 的小文件合并:可以看到兜底小文件合并方式需要先將數(shù)據(jù)落地到 HDFS,重新讀取后再寫出,這樣做放大了 HDFS寫操作(三副本),降低了計算引擎的執(zhí)行性能。而 Spark3的 AQE 特性可以在有 shuffle 的場景下有效解決小文件的問題,很多情況下對于沒有 shuffle 的場景新增一個 reparation 操作就可以借助 AQE 的能力解決小文件的問題。社區(qū) AQE 對于 reparation 這個 hint 是不會調(diào)整目標分區(qū)數(shù)的,我們新增了一個 rebalance hint,本質(zhì)上和reparation 一樣只是將 AQE 的特性應用在了這個操作上,同時將 AQE 目標 size 相關(guān)的屬性和 rebalance 設置屬性做了隔離方便更好的設置文件大小而不影響計算的并行度。rebalance 操作會在最終寫出前增加一個 shuffle stage,有些情況下沒有這個 stage 上游輸出就已經(jīng)沒有小文件了,為此作業(yè)是否增加 rebalance 操作依賴于我們對任務的畫像通過 HBO 系統(tǒng)開啟。
3.1.2 shuffle 穩(wěn)定性問題
Shuffle 穩(wěn)定性直接影響了 Spark 作業(yè)的 SLA,在B站推動 Spark 升級過程中成為用戶顧慮的點。
shuffle 磁盤分級:B站 Yarn 主集群采用 DataNode 和 NodeManage 混部模式,節(jié)點配置了多塊 HDD 盤和少量 SSD 盤,NM 以 HDD 盤作為計算盤,由于和 DN 沒有做到 IO 隔離,DN 和shuffle service 經(jīng)常互相影響,因此我們對DiskBlockManager 進行了改造,優(yōu)先使用 SSD 盤下的目錄作為工作目錄,當 SSD 盤存儲空間或者 inode 緊張時則降級到 Yarn 配置的計算目錄,借助 SSD 優(yōu)異的隨機 IO 能力,有效的提高的了 shuffle 穩(wěn)定性。
remote shuffle service:push based shuffle 方案可以大量降低磁盤隨機IO讀請求,如下圖:
通過中間服務將同屬一個分區(qū)的數(shù)據(jù)進行歸并,后續(xù) reduce 操作就不需要從上游所有的 Map 節(jié)點拉取數(shù)據(jù),在 shuffle 上下游 Task 數(shù)量多的情況下會對磁盤 IO 壓力指數(shù)放大,生產(chǎn)上 shuffle heavy 的任務表現(xiàn)很不穩(wěn)定,經(jīng)常出現(xiàn)FetchFailed Exception。B站在推動 RSS 落地時選擇了社區(qū)3.2 Push based shuffle 的方案,這個方案主要的優(yōu)點是對 AQE 支持比較好,缺點是因為本地也要寫一份數(shù)據(jù)放大了寫。
將數(shù)據(jù)先寫本地后異步的發(fā)送到 driver 維護的 executor 節(jié)點的 external shuffle 節(jié)點上,后續(xù)生產(chǎn)實踐中該方案有個問題,就是當作業(yè)啟動時通常 driver 維護的 executor 數(shù)不足以滿足遠程節(jié)點的選擇,而 SQL 作業(yè)參與計算的數(shù)據(jù)量通常是隨著過濾條件層層遞減的,通常 shuffle 數(shù)據(jù)量大的時候因為沒有足夠的節(jié)點會 fall back 到原先的 shuffle 方式,為了解決這個問題,我們新增了 shuffle service master 節(jié)點,具體調(diào)用流程如下圖,所有的 external shuffle 節(jié)點啟動時都會注冊到 shuffle master 節(jié)點上,后續(xù)節(jié)點本身也會周期性的上報心跳和節(jié)點繁忙程度,DAGScheduler 后續(xù)請求遠程節(jié)點都從 shuffle master 申請,這樣不僅解決了冷啟動節(jié)點不足的問題,在節(jié)點選擇上也考慮了節(jié)點的健康程度。
因為是先落盤后發(fā)送,在 stage 執(zhí)行結(jié)束后會有一個等待時間,這里面會有個性能回退的問題,對小任務不友好,所以在生產(chǎn)應用中我們基于任務畫像系統(tǒng) HBO 自動決定任務是否啟用RSS服務,目前生產(chǎn)大約7%的大任務在使用RSS 服務,這些任務平均執(zhí)行時間縮短了25%,穩(wěn)定性有了顯著提升。
目前B站生產(chǎn)中使用該方案基本解決了 shuffle 穩(wěn)定性的問題,不過這套方案依舊需要計算節(jié)點配置本地 shuffle 盤,在本地落 shuffle 數(shù)據(jù),無法支持存算分離的架構(gòu)。后續(xù)我們在 k8s 上會大規(guī)模上線混部集群,需要盡量不依賴本地磁盤,避免對在線應用的影響,我們也關(guān)注到騰訊和阿里相繼開源各自的 RSS 方案,我們也在嘗試在生產(chǎn)中使用純遠程 shuffle 方案來滿足 Spark on K8s 的技術(shù)需要。
3.1.3 大結(jié)果集溢寫到磁盤
在adhoc 場景中用戶通常會拉取大量結(jié)果到 driver 中,造成了大量的內(nèi)存消耗,driver 穩(wěn)定性又直接影響著用戶即席查詢的體驗,為此專門優(yōu)化了 executor fetch result 的過程,在獲取結(jié)果時會監(jiān)測 driver 內(nèi)存使用情況,在高內(nèi)存使用下將拉取到的結(jié)果直接寫出到文件中,返回給用戶時則直接分批從文件中獲取,增加 driver 的穩(wěn)定性。
3.1.4 單 SQL task 并行度、task 數(shù)、執(zhí)行時間限制
生產(chǎn)上我們按隊列隔離了用戶的 adhoc 查詢,在實踐過程中經(jīng)常性的遇到單個大作業(yè)直接占用了全部并行度,有些短作業(yè)直接因為獲取不到資源導致長時間的 pending 的情況,為了解決這種問題首先對單個 SQL 執(zhí)行時間和總 task 數(shù)進行了限制,此外考慮到在 task 調(diào)度時有資源就會全部調(diào)度出去,后續(xù) SQL 過來就面臨著完全無資源可用的情況,我們修改了調(diào)度方法對單個 SQL 參與調(diào)度的 task 數(shù)進行了限制,具體的限制數(shù)隨著可用資源進行一個動態(tài)變化,在 current executor 數(shù)接近于 max executor 的情況下進行嚴格限制 ,在 current executor 數(shù)明顯少于 max executor 的情況下,提高單 SQL 并行的 task 總數(shù)限制。
3.1.5 危險 join condition 發(fā)現(xiàn)& join 膨脹率檢測
- 危險 join condition 發(fā)現(xiàn)
在選擇 join 方式的時候如果是等值 join 則按照 BHJ,SHJ,SMJ 的順序選擇,如果還沒有選擇出則判斷 Cartesian Join,如果 join 類型是 InnerType 的就使用 Cartesian Join,Cartesian Join 會產(chǎn)生笛卡爾積比較慢,如果不是 InnerType,則使用 BNLJ,在判斷 BHJ 時,表的大小就超過了 broadcast 閾值,因此將表 broadcast 出去可能會對 driver 內(nèi)存造成壓力,性能比較差甚至可能會 OOM,因此將這兩種 join 類型定義為危險 join。
如果不是等值 join 則只能使用 BNLJ 或者 Cartesian Join,如果在第一次 BNLJ 時選不出 build side 說明兩個表的大小都超過了 broadcast 閾值,則使用 Cartesian Join,如果 Join Type 不是 InnerType 則只能使用 BNLJ,因此 Join 策略選擇Cartesian Join 和第二次選擇 BNLJ 時為危險 join。
- join 膨脹率檢測
ShareState 中的 statusScheduler 用于收集 Execution 的狀態(tài)和指標,這其中的指標就是按照 nodes 匯總了各個 task 匯報上來的 metrics,我們啟動了一個 join 檢測的線程定時的監(jiān)控 Join 節(jié)點的 "number of output rows"及 Join 的2個父節(jié)點的 "number of output rows" 算出該 Join 節(jié)點的膨脹率。
- 傾斜 Key 發(fā)現(xiàn)
數(shù)據(jù)傾斜是 ETL 任務比較常見的問題,以 shuffle 過程中的傾斜為例,通常有以下幾個解決方法:增大 shuffle 的分區(qū)數(shù)量從而使數(shù)據(jù)分散到更多的分區(qū)中;修改邏輯,將 shuffle 時的 key 盡可能打散;單獨找出產(chǎn)生了極大傾斜的 key,在邏輯中單獨處理等等。但在進行這些處理之前,我們都需要先知道傾斜發(fā)生在 SQL 邏輯的哪個部分以及發(fā)生傾斜的是哪些 key。為了幫助用戶自助高效的解決數(shù)據(jù)傾斜問題,我們實現(xiàn)了傾斜 key 發(fā)現(xiàn)的功能。以 SortMergeJoin 為例,在 shuffle fetch 階段,首先根據(jù) mapStatuses 計算出每個 partition size,并根據(jù)一定策略判斷該 task 所處理的 partition 是否傾斜。如果傾斜,則在 join 階段對數(shù)據(jù)進行采樣,找到發(fā)生傾斜的 key,通過 TaskMetric 發(fā)送到 driver 端,driver 端消費 metric后會記錄傾斜信息。
上面這些 bad case 在運行時發(fā)現(xiàn)后會自動將信息發(fā)送到我們內(nèi)部作業(yè)診斷平臺,用戶可以查看并對語句做優(yōu)化和改進。
3.2 性能優(yōu)化
3.2.1 DPP 和 AQE 兼容
spark3.1 的 DPP 和 AQE 存在兼容問題,在使用 AQE 后 DPP 的策略就無法生效,這個問題在3.2得到了修復,我們將3.2的相關(guān)代碼 backport 回來,從 TPCDS 測試上看對3.1有很明顯的提升。
3.2.2 AQE 支持 ShuffledHashJoin
AQE 通過對 map 階段收集的指標數(shù)據(jù)來優(yōu)化 Join 方式,對于存在小表的情況能將 SMJ 優(yōu)化為 BHJ,這個操作可以顯著的優(yōu)化性能。Spark的 shuffle 策略還有一個就是 ShuffledHashJoin,該策略性能相對較好,但內(nèi)存壓力大,在默認情況下為了保證任務的穩(wěn)定性我們將其關(guān)閉,基于 AQE 的思想,在 map 完成后收集 partition size,當最大的 partition size 小于定義的值后,通過新增 DynamicJoin 優(yōu)化策略將 SMJ 優(yōu)化為 SHJ。
3.2.3 Runtime filter
DPP 通過對大表直接進行 partition 級別的裁剪,可以大大提高查詢速度,但 DPP 的適用條件也相對嚴格,需要大表的分區(qū)列參與 join,但如果大表參與 join 的列為非分區(qū)列則無法應用。我們知道 shuffle 是比較耗時的操作,shuffle 的數(shù)據(jù)量越大,耗時越久,而且對網(wǎng)絡,機器 IO 都會產(chǎn)生比較大的壓力。如果能在大表 shuffle 前根據(jù)非分區(qū)列的 join 列對其進行過濾,即使無法像 DPP 一樣直接減少從存儲中讀取的數(shù)據(jù)量,但減小了其參與 shuffle 以及后續(xù)操作的數(shù)據(jù)量,也能獲得比較不錯的收益,這就是 runtime filter 的動機,即運行時預先掃描小表獲取 join 列的值,構(gòu)造 bloom filter 對大表進行過濾。具體實現(xiàn)思路和 DPP 基本一致,首先在 SparkOptimizer 新增 DynamicBloomFilterPruning 規(guī)則,邏輯上類似PartitionPruning,符合一系列判斷條件后插入一個節(jié)點 DynamicBloomFilterPruningSubquery。與 DPP 不同的是,如果 join 可以被轉(zhuǎn)化為 BroadcastHashJoin,則不會應用該規(guī)則,因為在 BroadcastHashJoin 的情況下對大表進行預先的過濾其實是多余的(非 pushdown 的情況下)。判斷是否加入 filter 節(jié)點的主要邏輯如下,這里以裁剪左表(左右兩側(cè)都為 logicalPlan,為了方便表達,用左右表指代)為例進行說明,需要滿足以下條件:
- 右表 rowCount 需要小于左表
- Join 類型支持裁剪左表
- 右表 rowCount > 0
- 右表 rowCount 小于 spark.sql.optimizer.dynamicBloomFilterJoinPruning.maxBloomFilterEntries,默認值為100000000,避免 bloom filter 占用內(nèi)存過大
- 右表中沒有DynamicBloomFilterPruningSubquery
- 右表不是 stream 且存在 SelectivePredicate
- 左表(這里的左表是真正的左表或者包含左表的Filter節(jié)點)沒有 SelectivePredicate,因為如果存在 SelectivePredicate,那么下一步便無法根據(jù)統(tǒng)計信息去計算過濾收益
在 prepare 階段,PlanAdaptiveSubqueries 會把 DynamicBloomFilterPruningSubquery 節(jié)點替換為 DynamicPruningExpression(InBloomFilterSubqueryExec(_, _, _)),擴展了PlanAdaptiveDynamicPruningFilters,支持對以上節(jié)點進行處理。新增了 BuildBloomFilter 和 InBloomFilter 兩個 UDF。BuildBloomFilter 在 sparkPlan prepare 階段提交任務構(gòu)造 BloomFilter 并 broadcast 出去,具體的 evaluate 邏輯還是交給 InBloomFilter。另外在 AQE 的reOptimize 階段也新增了規(guī)則 OptimizeBloomFilterJoin,這個規(guī)則主要是用來根據(jù)執(zhí)行過程的 metric 信息更新BuildBloomFilter的expectedNumItems。
可以看到在開啟了runtime filter后數(shù)據(jù)量在join前從120億條降至3W條,收益還是相當明顯的。
3.2.4 Data skipping
目前B站離線表存儲主要使用 orc、parquet 格式,列式存儲都支持一定程度的 data skipping,比如 orc 有三個級別的統(tǒng)計信息,file/stripe/row group,統(tǒng)計信息中會包含count,對于原始類型的列,還會記錄 min/max 值,對于數(shù)值類型的列,也會記錄 sum 值。在查詢時,就可以根據(jù)不同粒度的統(tǒng)計信息以及 index 決定該 file/stripe/row 是否符合條件,不符合條件的直接跳過。對于統(tǒng)計信息及索引的細節(jié)見orc format (https://orc.apache.org/specification/ORCv1/) 和 orc index (https://orc.apache.org/docs/indexes.html) 。Parquet 與 orc 類似,也有相應的設計,具體見parquet format (https://github.com/apache/parquet-format) 和 parquet pageIndex (https://github.com/apache/parquet-format/blob/master/PageIndex.md) 。雖然 orc/parquet 都有 data skipping 的能力,但這種能力非常依賴數(shù)據(jù)的分布。前面提到統(tǒng)計信息中會包含每一列的 min/max 值,理論上如果查詢條件(比如> < =)不在這個范圍內(nèi),那么這個file/stripe/row group 就可以被跳過。但如果數(shù)據(jù)沒有按照 filter 列排序,那最壞的情況下,可能每個 file/stripe/row group的min/max 值都一樣,這樣就造成任何粒度的數(shù)據(jù)都不能被跳過。為了增加列式存儲 data skipping 效果,可以通過對數(shù)據(jù)增加額外的組織,如下:
select
count(1)
from
tpcds.archive_spl_cluster
where
log_date = '20211124'
and state = -16
表 archive_spl,不調(diào)整任何分布與排序
表 archive_spl_order,order by state,avid
通過對 state 進行 order 后 scan 階段數(shù)據(jù)量直接從億級別降至數(shù)十萬級別。在生產(chǎn)中我們通過對 SQL 進行血緣分析找到那些熱點表及高頻 filter 列,將這些熱列作為 table properties 存入 hms 中,在 Spark 執(zhí)行時根據(jù)從 hms 中獲取的列信息,通過相應的優(yōu)化規(guī)則,物理計劃自動增加 sort 算子,完成對數(shù)據(jù)組織。這個方案是基于列存優(yōu)化數(shù)據(jù)組織來進行 data skipping,目前我們也在往索引方向上進一步探索。
3.3 功能性改進
3.3.1 對于ZSTD的支持
Spark 社區(qū)在3.2版本全面支持了 ZSTD 壓縮,為了更好的使用 ZSTD,我們在 Spark3.1 的基礎(chǔ)上引入了社區(qū)的相關(guān) patch。其中也遇到了一些問題。在測試 ZSTD 的過程中偶然發(fā)現(xiàn)下推到 ORC 的過濾條件沒有生效,經(jīng)調(diào)查發(fā)現(xiàn)是 ORC 代碼的 bug,在和社區(qū)討論之后,我們修復了該 bug并將 patch提交給了社區(qū):https://issues.apache.org/jira/browse/ORC-1121 。
離線平臺的 Presto 也承接了很多 ETL 任務,由于 Presto 使用的是自己實現(xiàn)的 ORC reader/writer,所以在 Spark 升級 ORC 版本之后,對一些 Presto 寫出的表,出現(xiàn)了查詢結(jié)果錯誤的問題。正常情況下,Apache ORC writer 在寫文件時會記錄每個 stripe/rowGroup 中每列的統(tǒng)計信息,如 min/max 等。Apache ORC reader 在讀取文件時會根據(jù)這些統(tǒng)計信息結(jié)合下推的過濾條件進行 stripe/rowGroup 級別的過濾。但 Presto ORC writer 在寫文件時,如果 String 類型的列長度超過64 bytes,則這一列不會記錄 min/max 信息。雖然 Presto ORC reader 可以正常處理這類文件,但 Spark/Hive 使用的 Apache ORC reader 因為無法正常的反序列化 columnStatistics 得到正確的統(tǒng)計信息,導致做 stripe/rowGroup 級別的過濾時出現(xiàn)了錯誤的結(jié)果。我們也發(fā)現(xiàn)這個問題是由于 ORC 1.6 版本的一次代碼重構(gòu)導致,1.5及之前是沒有該問題的。我們已在內(nèi)部分支修復了該問題,也已將問題反饋給社區(qū)。
3.3.2 多格式混合讀兼容
歷史上很多表使用了 text 存儲,在資源上造成了很大的浪費,通過修改表的元信息可以保障新增分區(qū)切換到列存,這就造成了一個離線表可能存在多種 fileformat 的情況,為了兼容我們修改了 DataSourceScanExec 相關(guān)的邏輯,將reader 的實例化從基于table元信息粒度細化到分區(qū)元信息粒度。
3.3.3 轉(zhuǎn)表&小文件合并語法
為了方便用戶修改表的存儲格式和文件壓縮格式我們在引擎層提供了相關(guān)語法及具體實現(xiàn)。用戶可以通過指定分區(qū)條件對特定分區(qū)進行轉(zhuǎn)換。
CONVERT TABLE target=tableIdentifier
(convertFormat | compressType) partitionClause? #convertTable
MERGE TABLE target=tableIdentifier
partitionClause? #mergeTable
3.3.4 字段血緣
作業(yè)間的依賴關(guān)系分析、數(shù)據(jù)地圖等業(yè)務都需要SQL血緣的支持,團隊后續(xù)工作(z-order , analyze , index)也需要依賴血緣,我們通過注冊一個 LineageQueryListener 繼承 QueryExecutionListener,在 onSuccess 方法拿到當前執(zhí)行的QueryExecution,通過 analyzedLogicalPlan,利用 NamedExpression 的 exprId 映射關(guān)系,對其進行遍歷和解析,構(gòu)建出字段級血緣(PROJECTION/PREDICATE)和 levelRelation(層級關(guān)系)。
3.4 基于歷史執(zhí)行的自動參數(shù)優(yōu)化(HBO)
Spark 提供了大量的參數(shù)設置,對于用戶而言了解這些參數(shù)并使用好需要花費很大的代價,在很多情況下不同的參數(shù)調(diào)優(yōu)對于 spark 的作業(yè)執(zhí)行和資源消耗會有很大差異。為了盡可能的適配任務執(zhí)行,我們預設了一組參數(shù),這種統(tǒng)一配置存在很多問題,以內(nèi)存而言為了適配盡可能多的任務,該值設置偏大,通過對執(zhí)行的分析發(fā)現(xiàn)大量的任務存在資源浪費的問題,整體的內(nèi)存利用率僅20%左右。要求每個用戶成為專家對作業(yè)進行細致的調(diào)優(yōu)顯然不可能,因此我們設計了 HBO 系統(tǒng),具體的思路如下圖:
首先對任務執(zhí)行的 SQL 進行了指紋計算,通過指紋來標識該任務每天執(zhí)行情況,將每次執(zhí)行中采集到的 metrics 收集后用策略進行分析給出相應的參數(shù)優(yōu)化建議,在下次執(zhí)行的時候根據(jù)指紋來獲取推薦的執(zhí)行參數(shù),對于使用默認參數(shù)的任務則進行覆蓋,對于那些用戶指定的參數(shù)則優(yōu)先使用用戶參數(shù)。
- 內(nèi)存優(yōu)化策略:通過收集每個 executor 的峰值內(nèi)存,如果峰值內(nèi)存占配置內(nèi)存比值低于30%,就推薦使用更少的內(nèi)存來執(zhí)行此次的計算,對于峰值內(nèi)存占比過高的任務,則調(diào)大內(nèi)存配置。通過這個策略生產(chǎn)上的內(nèi)存使用率提升至50%左右。
- 并行度優(yōu)化策略:生產(chǎn)上開啟了動態(tài)資源配置,在對數(shù)據(jù)分析時發(fā)現(xiàn)有些節(jié)點從分配后就沒有task執(zhí)行過,完全浪費了節(jié)點的資源,對于這些任務會在下次執(zhí)行的時候降低 spark.dynamicAllocation.executorAllocationRatio 值來降低執(zhí)行并行度,此外默認提供的 spark.sql.shuffle.partitions 值對于大任務來說執(zhí)行并行度不夠,后續(xù)也將進行自動的調(diào)整。
- 優(yōu)化shuffle策略:如上文所講 RSS 對小任務存在性能下降的問題,通過對 block size、shuffle 數(shù)據(jù)量的分析,HBO 系統(tǒng)只會對那些 shuffle heavy 任務開啟使用 RSS 服務。
- 小文件合并策略:小文件合并會消耗額外的資源,對于不存在小文件情況的作業(yè) HBO 系統(tǒng)會關(guān)閉小文件合并相關(guān)的配置。
- 此外平時工作中一些 feature 的上線也會依賴該系統(tǒng)進行一個灰度過程。
3.5 Smart Data Manager (SDM)
Smart Data Manager(SDM)是我們自研的一個對數(shù)據(jù)進行組織和分析的服務,通過對數(shù)據(jù)的額外處理將我們對 Spark 的一些技改真正落地。它的整體架構(gòu)如圖,目前提供了如下的幾個數(shù)據(jù)組織和分析能力:
- 表存儲和壓縮方式的轉(zhuǎn)換:將表從 Text 存儲轉(zhuǎn)換為 ORC 或 Parquet 存儲,將壓縮類型從 None 或 Snappy 轉(zhuǎn)換為 ZSTD 可以帶來不錯的存儲和性能收益,SDM 提供了按分區(qū)對表異步進行轉(zhuǎn)換的能力。
- 數(shù)據(jù)重組織:在分區(qū)內(nèi)部按列對數(shù)據(jù)進行 order/zorder 組織可以有效的提高 data skipping 的效果,新增分區(qū)通過查詢 table properties 中的排序列 meta 來改寫執(zhí)行計劃應用,存量分區(qū)就可以通過 SDM 重刷。
- Statistics 的統(tǒng)計:開啟 CBO 時需要依賴對表統(tǒng)計信息的收集,在對 hive 表的列進行索引時也依賴收集到的列基數(shù)和操作信息選擇合適的索引類型,通過 sdm 監(jiān)聽 hms 的 partition 事件就可以在分區(qū)更新時異步完成信息采樣。
- 小文件合并:對有小文件較多的分區(qū)和表異步進行小文件合并,減少 namenode 的壓力
- Hive 表索引:通過分析血緣信息得到熱表熱列上的高頻操作(點查,范圍查詢),基于此在分區(qū)文件層面異步的建立索引來加速查詢。
- 血緣解析:解析語句,分析字段血緣,吐出 UDF 血緣、算子(order by / sort by / group by...)影響關(guān)系等
對數(shù)據(jù)進行重組織時會涉及到對數(shù)據(jù)的讀寫,為了防止對生產(chǎn)作業(yè)的影響我們在進行操作時會修改相關(guān)表的 Table Properties 增加鎖表標記,各個計算引擎適配實現(xiàn)了類 Hive 的鎖管理機制,由 Hive metastore 統(tǒng)一作為 lock manager,在對表和分區(qū)并發(fā)操作場景下,做到對用戶完全透明。
4. Hive Meta Store 上的優(yōu)化
B站使用 HMS(Hive MetaStore)管理所有的離線表元信息,整個的離線計算的可用性都依賴 HMS 的穩(wěn)定性。業(yè)務方在使用分區(qū)表時存在不少4級及以上分區(qū)的情況,有多個表分區(qū)數(shù)超百萬。分區(qū)元信息龐大單次分區(qū)獲取代價高,原生 HMS 基于單個 MySQL 實例存在性能瓶頸。
4.1 MetaStore Federation
隨著多機房業(yè)務的推進,獨立業(yè)務的 HDFS 數(shù)據(jù)和計算資源已經(jīng)遷移到新機房,但是 HIVE 元數(shù)據(jù)仍在原有機房的 Mysql 中,這時候如果發(fā)生機房間的網(wǎng)絡分區(qū),就會影響新機房的任務。
為了解決上述問題,我們進行了方案調(diào)研,有兩種方案供我們選擇:
- WaggleDance
- HMS Federation
4.1.1 WaggleDance
WaggleDance是開源的一個項目(https://github.com/ExpediaGroup/waggle-dance),該項目主要是聯(lián)合多個 HMS 的數(shù)據(jù)查詢服務,實現(xiàn)了一個統(tǒng)一的路由接口解決多套 HMS 環(huán)境間的元數(shù)據(jù)共享問題。并且 WaggleDance 支持 HMS Client的接口調(diào)用。主要是通過 DB,把請求路由到對應的 HMS。
4.1.2 HMS Federation
HMS Federation 是解決多機房場景下的 HIVE 元數(shù)據(jù)存儲問題,HIVE 元數(shù)據(jù)和 HDFS 數(shù)據(jù)存儲在同一個機房,并且允許跨機房訪問 HIVE 元數(shù)據(jù)。比如主站業(yè)務的 HDFS 數(shù)據(jù)存放在 IDC1,那么主站業(yè)務 HDFS 數(shù)據(jù)對應的 HIVE 元數(shù)據(jù)就存在IDC1 的 Mysql,同樣直播業(yè)務的 HDFS 數(shù)據(jù)和 HIVE 元數(shù)據(jù)都存放在 IDC2。
同時 HMS Federation 也提供了 Mysql 的橫向擴容能力,允許一個機房可以有多個 Mysql 來存放 HIVE 元數(shù)據(jù),如果單個 Mysql 的壓力過大,可以把單個 Mysql 的數(shù)據(jù)存放到多個 Mysql 里面,分擔 Mysql 的壓力。比如主站業(yè)務的 HIVE 庫,zhu_zhan 和 zhu_zhan_tmp,可以分別放在 idc1-mysql1 和 idc1-mysql2。
我們在 HMS Federation 中加入了一個 StateStore 的角色,該角色可以理解為一個路由器,HMS 在查詢 Hive 庫/表/分區(qū)之前,先問 StateStore 所要訪問的 HIVE 元信息存放在哪一個 Mysql 中,獲取到了對應的 Mysql 后,構(gòu)建相應的ObjectStore,進行 SQL 拼接或者是利用 JDO 查詢后端 Mysql。
4.1.3 HMS Federation 與 WaggleDance 的對比
數(shù)據(jù)遷移
我們的主要目的是實現(xiàn) HIVE 元數(shù)據(jù)按業(yè)務劃分到各自 IDC 的 Mysql
- WaggleDance 并沒有提供相應元數(shù)據(jù)遷移工具,要遷移需要停止整個 HIVE 庫新建表/分區(qū),才能夠開始遷移過去,對業(yè)務影響較大。
- HMS Federation 可以按表的粒度遷移,對業(yè)務影響較小,并且可以指定某個 HIVE 庫下,新建表在新的 Mysql,舊的等待著鎖表遷移。
運維復雜度
- WaggleDance 方案需要不同的 HMS,配置不同的 Mysql 地址,增加了 HMS 配置的復雜度。WaggleDance 是一個獨立的服務,為了保證可用性,運維復雜度會再一次提升。
- HMS Fedration 是 HMS 的功能升級,在 HMS 代碼上開發(fā),并且使用統(tǒng)一的配置。
綜合上述對比,我們最終選擇了 HMS Federation 的方案。通過修改 HMS 的代碼,實現(xiàn)元數(shù)據(jù)跨 Mysql 存儲。
4.2 MetaStore 請求追蹤和流量控制
HMS 在處理 getPartitions 相關(guān)請求的時候,如果拉取的分區(qū)數(shù)量非常多,會給 HMS 的堆內(nèi)存,以及后端的 Mysql 帶來很大的壓力,導致 HMS 服務響應延遲。
為了能夠快速的定位到有問題的任務,我們在 Driver 中將 Job 相關(guān)的信息保存到 Hadoop CallerContext 中,在調(diào)用 HMS 接口的時候?qū)?CallerContext 中的相關(guān)屬性設置到 EnvironmentContext 中透傳到 HMS 端,同時擴展了所有g(shù)etPartitions 相關(guān)的接口支持傳遞 EnvironmentContext,EnvironmentContext 中的 properties 會在 HMS 的 audit log 中打印出來,方便問題任務的定位。
同時為了提高 HMS 服務的穩(wěn)定性,我們在 HMS 端也做了接口的限流和主動關(guān)閉大查詢。對于限流,我們新增了一個 TrafficControlListener,當接口被調(diào)用的時候會以 function 和 user 為單位記錄 Counters 保存在該 Listener 中,同時在該Listener 中啟動采集 used memory 和 counters 的線程,當平均使用內(nèi)存達到閾值時,檢查接口的QPS,如果qps達到閾值會讓調(diào)用接口的線程 sleep 一段時間,下一次檢查通過或者達到最大等待時間后放行。HMS 也有可能因為 getPartitions 方法返回的分區(qū)數(shù)量太大導致內(nèi)存被打滿,一方面我們限制了 getPartitions 從 mysql 返回的分區(qū)數(shù)量,超過一定數(shù)量就直接拒絕該請求,另一方面我們在 TProcessor 中以 threadId 和 socket 為 key 和 value 保存當前的連接,在檢查 partition 數(shù)量時我們也按照 threadId 和 num partitions 為 key 和 value 保存 partition 的 cost,當 HMS 平均使用內(nèi)存達到閾值超過一定時間后,會選擇 num partitions 最大的 threadId,再根據(jù) threadId 獲取對應的連接,主動 close 該連接,來緩解內(nèi)存壓力。
5. 未來的一些工作
調(diào)研不落地的 Remote Shuffle Service 來更好的適配 K8S 混部的場景
使用向量化技術(shù)加速 Spark 的執(zhí)行引擎,提升計算性能
增強自動排錯診斷系統(tǒng),提升平臺用戶體驗
我們會和業(yè)界同行和開源社區(qū)保持密切技術(shù)交流,在服務好內(nèi)部用戶作業(yè)的同時,也會積極反饋社區(qū),共建社區(qū)生態(tài)。