Flink 在 B 站的多元化探索與實踐
本文整理自嗶哩嗶哩基礎架構部資深研發工程師張楊在 Flink Forward Asia 2021 平臺建設專場的演講。主要內容包括:
- 平臺建設
- 增量化
- AI On Flink
在過去的一年里,B站圍繞 Flink 主要做了三個方面的工作:平臺建設、增量化和 AI on Flink。實時平臺是實時業務的技術底座,也是 Flink 面向用戶的窗口,需要堅持持續迭代優化,不斷增強功能,提升用戶效率。增量化是我們在增量化數倉和流批一體上的嘗試,在實時和離線之間找到一個更好的平衡,加速數倉效率,解決計算口徑問題。AI 方向,我們也正在結合業務做進一步的探索,與 AIFlow 社區進行合作,完善優化機器學習工作流。
一、 平臺建設
1.1 基礎功能完善
在平臺的基礎功能方面,我們做了很多新的功能和優化。其中兩個重點的是支持 Kafka 的動態 sink 和任務提交引擎的優化。
我們遇到了大量這樣的 ETL 場景,業務的原始實時數據流是一條較大的混合數據流,包含了數個子業務數據。數據通過 Kafka 傳輸,末端的每個子業務都對應單獨的處理邏輯,每個子業務都去消費全量數據,再進行過濾,這樣的資源消耗對業務來說是難以接受的,Kafka 的 IO 壓力也很大。因此我們會開發一個 Flink 任務,對混合數據流按照子業務進行拆分,寫到子業務對應的 topic 里,讓業務使用。
技術實現上,早期 Flink SQL 的寫法就是寫一個 source 再寫多個 sink,每個 sink 對應一個業務的 topic,這確實可以滿足短期的業務訴求,但存在的問題也較多:
第一是數據的傾斜,不同的子業務數據量不同,數據拆分后,不同 sink 之處理的數據量也存在較大差別,而且 sink 都是獨立的 Kafka producer,高峰期間會造成 sink 之間資源的爭搶,對性能會有明顯的影響;
第二是無法動態增減 sink,需要改變 Flink SQL 代碼,然后重啟任務才能完成增減 sink。過程中,不僅所有下游任務都會抖動,還有一個嚴重的問題就是無法從 savepoint 恢復,也就意味著數據的一致性無法保證;
第三是維護成本高,部分業務存在上百個子分流需求,會導致 SQL 太長,維護成本極高。
基于以上原因,我們開發了一套 Kafka 動態 sink 的功能,支持在一個 Kafka sink 里面動態地寫多個 topic 數據,架構如上圖。我們對 Kafka 表的 DDL 定義進行了擴展,在 topic 屬性里支持了 UDF 功能,它會根據入倉的數據計算出這條數據應該寫入哪個 Kafka 集群和 topic。sink 收到數據后會先調用 UDF 進行計算,拿到結果后再進行目標集群和 topic 數據的寫入,這樣業務就不需要在 SQL 里編寫多個 sink,代碼很干凈,也易于維護,并且這個 sink 被所有 topic 共用,不會產生傾斜問題。UDF 直接面向業務系統,分流規則也會平臺化,業務方配置好規則后,分流實施自動生效,任務不需要做重啟。而且為了避免 UDF 的性能問題,避免用戶自己去開發 UDF,我們提供了一套標準的分流,做了大量的緩存優化,只要按照規范定義好分流,規則的業務表就可以直接使用 UDF。
目前內部幾個千億級別的分流場景,都在這套方案下高效運行中。
基礎功能上做的第二個優化就是任務的提交引擎優化。做提交器的優化主要是因為存在以下幾個問題:
第一,本地編譯問題。Flink SQL 任務在 Yarn 上的部署有三種模式:per-job、application 和 yarn-session。早前我們一直沿用 per-job 模式,但是隨著任務規模變大,這個模式出現了很多的問題。per-job 模式下,任務的編譯是在本地進行再提交到遠程 app master,編譯消耗提交引擎的服務性能,在短時批量操作時很容易導致性能不足;
第二,多版本的支持問題。我們支持多個 Flink 版本,因此在版本與提交引擎耦合的情況下,需要維護多個不同代碼版本的提交引擎,維護成本高;
第三,UDF 的加載。我們一直使用 Flink 命令里的 -c 命令進行 UDF 傳遞,UDF 代碼包存在 UDFS 上,通過 Hadoop 的 web HDFS 協議進行 cluster 加載,一些大的任務啟動時,web HDFS 的 HTTP 端口壓力會瞬間增大,存在很大的穩定隱患;
第四,代碼包的傳輸效率。用戶代碼包或者 Flink 引擎代碼包都要做多次的上傳下載操作,遇到 HDFS 反應較慢的場景,耗時較長,而實時任務希望做到極致的快速上下線。
因此我們做了提交器的優化:
首先引入了 1.11 版本以上支持的 application 模式,這個模式與 per-job 最大的區別就是 Flink 任務的編譯全部移到了 APP master 里做,這樣就解決了提交引擎的瓶頸問題;
在多版本的支持上面,我們對提交引擎也做了改造,把提交器與 Flink 的代碼徹底解耦,所有依賴 Flink 代碼的操作全部抽象了標準的接口放到了 Flink 源碼側,并在 Flink 源碼側增加了一個模塊,這個模塊會隨著 Flink 的版本一起升級提交引擎,對通用接口的調用全部進行反射和緩存,在性能上也是可接受的;
而且 Flink 的多版本源碼全部按照 maled 模式進行管理,存放在 HDFS。按照業務指定的任務版本,提交引擎會從遠程下載 Flink 相關的版本包緩存到本地,所以只需要維護一套提交器的引擎。Flink 任何變更完全和引擎無關,升級版本提交引擎也不需要參與;
完成 application 模式升級后,我們對 UDF 和其他資源包的上傳下載機制也進行了修改,通過 HDFS 遠程直接分發到 GM/TM 上,減少了上傳下載次數,同時也避免了 cluster 的遠程加載。
1.2 新任務構建模式
平臺之前支持 Flink 的構建模式主要有兩種, SQL 和 JAR 包。兩者的優劣勢都很明顯,SQL 簡單易用門檻低,但是不夠靈活,比如一些定時操作在 SQL 里面無法進行。JAR 包功能完善也靈活,但是門檻高,需要學習 Flink datastream 一整套 API 的概念,非開發人員難以掌握,而我們大量的用戶是數倉,這種JAR包的任務難以標準化管理。業務方大多希望使用 SQL,避免使用 JAR 包。
我們調研了平臺已有的 Datastream JAR 包任務,發現大部分的 JAR 包任務還是以 Table API 為主,只有少量過程用 Datastream 做了一些數據的轉換,完成之后還是注冊成了 Table 進行 Table 操作。如果平臺可以支持在 SQL 里面做一些復雜的自定義轉換,業務其實完全不需要編寫代碼。
因此我們支持了一種新的任務構建模式——算子化,模塊化地構建一個 Flink 任務,混合 JAR 包與 SQL,在進行任務構建時,先定義一段 SQL,再定義一個 JAR 包,再接一段 SQL,每段都稱為算子,算子之間相互串聯,構成一個完整的任務。
采用 Flink 標準的 SQL 語法,對 JAR 包進行了接口的限制,必須繼承平臺的接口定義進行開發。輸入輸出都是定義好的 Datastream。它比 UDF 的擴展性更強,靈活性也更好。而且整個任務的輸入輸出基本可以做到和 SQL 同級別的管控力,算子的開發也比純 JAR 包簡單得多,不需要學習太多 Flink API 的操作,只需要對 Datastream 進行變換。而且對于一些常用的公共算子,平臺可以統一開發提供,擁有更專業的性能優化,業務方只要引用即可。
目前在實時數倉等一些偏固定業務的場景,我們都在嘗試進行標準化算子的推廣和使用。
1.3 智能診斷
平臺建設的第三點是流任務的智能診斷。目前實時支持的業務場景包括 ETL、AI、數據集成等,且任務規模增長速度很快。越來越大的規模對平臺的服務能力也提出了更高的要求。
此前,平臺人員需要花費很多的時間在協助業務解決資源或各種業務問題上,主要存在以下幾個方面的問題:
- 資源配置:初始資源確認困難,碎片化嚴重,使用資源周期性變化;
- 性能調優:數據傾斜,網絡資源優化,state 性能調優,gc 性能調優;
- 錯誤診斷:任務失敗原因分析,修復建議。
這些問題日常都靠平臺人員兜底,規模小的時候大家勉強可以負擔,但是規模快速變大后已經完全無力消化,需要一套自動化的系統來解決這些問題。
因此我們做了一套流任務的智能診斷系統,架構如上圖。
系統會持續抓取任務運行時的 metrics 進行性能分析,分析完成后推給用戶,讓用戶自己執行具體的優化改進操作;也會實時抓取任務失敗的日志,并與詞庫進行匹配,將錯誤進行翻譯,使用戶更容易理解,同時也會給出更好理解的解決方案,讓用戶自行進行故障處理;同時還會根據任務的歷史運行資源進行自動化縮容處理,解決資源浪費和資源不足的問題。
目前此功能已經節省了整個隊列 10% 的資源左右,分擔了相當一部分平臺的運維壓力,在未來我們會持續進行優化迭代,更進一步提高這套系統在自動化運維上面的能力以及覆蓋度。
未來,在提交引擎方面,我們希望融合 Yarn session 模式與 application 模式做 session 的復用,解決任務上線的資源申請效率問題。同時希望大 state 任務也能夠在 session 的基礎上復用本地的 state,啟動時無需重新下載 state。
智能診斷方面,我們希望實現更多自動化的操作,實現自動進行優化改進,而不需要用戶手動操作,做到用戶低感知;擴容縮容也會持續提速,目前縮容的頻率只在天級,擴容還未實現自動化。未來我們希望整個操作的周期和頻率做到分鐘級的自動化。
算子方面,我們希望能統一目前的 SQL 和 JAR 包兩種模式,統一任務構建方式,讓用戶以更低的成本更多復雜的操作,平臺也更方便管理。
二、增量化
上圖是我們早期的數據架構,是典型的 Lambda 架構。實時和離線從源頭上就完全分離、互不干涉,實時占較低,離線數倉是核心的數倉模型,占主要的比例,但它存在幾個明顯的問題。
第一,時效性。數倉模型是分層架構,層與層之間的轉換靠調度系統驅動,而調度系統是有周期的,常見的基本都是天或小時。源頭生產的數據,數倉各層基本需要隔一天或幾個小時才可見,無法滿足實時性要求稍高的場景;
第二,數據的使用效率低。ETL 和 adhoc 的數據使用完全一樣,沒有針對性的讀寫優化,也沒有按照用戶的查詢習慣進行重新組織,缺乏數據布局優化的能力。
針對第一個問題,是否全部實時化即可?但是實時數倉的成本高,而且不太好做大規模的數據回溯。大部分業務也不需要做到 Kafka 的秒級時效。第二個問題也不好解決,流式寫入為了追求效率,對數據的布局能力較弱,不具備數據的重新組織能力。因此我們在實時和離線之間找到了一個平衡——做分鐘級的增量化。
我們采用 Flink 作為計算引擎,它的 checkpoint 是一個天然的增量化機制,實時任務進行一次 checkpoint,產出一批增量數據進行增量化處理。數倉來源主要有日志數據和 binlog 數據,日志數據使用 Append 傳統的 HDFS 存儲即可做到增量化的生產;binlog 數據是 update 模式,但 HDFS 對 update 的支持并不好,因此我們引入了 Hudi 存儲,它能夠支持 update 操作,并且具備一定的數據布局能力,同時它也可以做 Append 存儲,并且能夠解決 HDFS 的一些小文件問題。因此日志數據也選擇了 Hudi 存儲,采用 Append 模式。
最終我們的增量化方案由 Flink 計算引擎 + Hudi 存儲引擎構成。
增量化場景的落地上,考慮到落地的復雜性,我們先選取了業務邏輯相對簡單、沒有復雜聚合邏輯的 ODS 和 DWD 層進行落地。目前的數據是由 Flink 直接寫到 Hive 的 ODS 層,我們對此進行了針對性的適配,支持了 Hive 表的增量化讀取,開發了 HDFSStreamingSource,同時為了避免對 HDFS 路徑頻繁掃描的壓力,ODS 層寫入時會進行索引創建,記錄寫入的文件路徑和時間,只需要追蹤索引文件即可。
source 也是分層架構,有文件分發層和讀取層,文件分發層進行協調,分配讀取文件數,防止讀取層某個文件讀取過慢堆積過多文件,中間的轉換能夠支持 FlinkSQL 操作,具備完整的實時數倉的能力。
sink 側我們引入了 Hudi connector,支持數據 Append 寫入 Hudi,我們還對 Hudi 的 compaction 機制進行了一些擴展,主要有三個:DQC 檢測、數據布局的優化以及映射到 Hive 表的分區目錄。目前數據的布局依舊還很弱,主要依賴 Hudi 本身的 min、max 和 bloom 的優化。
完成所有上述操作后,ODS 到 DWD 的數據時效性有了明顯提升。
從數據生產到 DWD 可見,提高到了分鐘級別;DWD 層的生產完成時間也從傳統的 2:00~5:00 提前到了凌晨 1 點之前。此外,采用 Hudi 存儲也為日后的湖倉一體打下了以一個好的基礎。
除了日志數據,我們對 CDC 也采用這套方案進行加速。基于 Flink 的 CDC 能力,針對 MySQL 的數據同步實現了全增量一體化操作。依賴 Hudi 的 update 能力,單任務完成了 MySQL 的數據同步工作,并且數據只延遲了一個 checkpoint 周期。CDC 暫時不支持全量拉取,需要額外進行一次全量的初始化操作,其他的流程則完全一致。
Hudi 本身的模型和離線的分區全量有較大的區別,為了兼容離線調度需要的分區全量數據,我們也修改了 Hudi 的 compaction 機制。在做劃分區的 compaction 時會做一次數據的全量拷貝,生成全量的歷史數據分區,映射到 Hive 表的對應分區。同時對于 CDC 場景下的數據質量,我們也做了很多的保障工作。
為了保證 CDC 數據的一致性,我們從以下 4 個方面進行了完善和優化:
第一,binlog 條數的一致性。按照時間窗口進行 binlog 生產側和消費側的條數校驗,避免中間件丟數據;
第二,數據內容抽樣檢測。考慮到成本,我們在 DB 端和源端、Hudi 存儲端抽樣增量數據進行內容的精確比較,避免 update 出錯;
第三,全鏈路的黑盒測試。測試庫表模擬了線上情況,進行 7×24 小時不間斷的 Kafka 生產 MySQL 數據,然后串通整套流程防止鏈路故障;
第四,定期的全量對比。業務的庫表一般比較大,歷史數據會低頻地定期進行全量比對,防止抽樣觀測漏掉的錯誤。
剛開始使用 Hudi 的時候,Hudi on Flink 還是處于初級的階段,因此存在大量問題,我們也一起和 Hudi 社區做了大量優化工作,主要有 4 個方面:Hudi 表的冷啟動優化、checkpoint 一致性問題解決、Append 效率低的優化以及 get list 的性能問題。
首先是冷啟動的問題。Hudi 的索引存儲在 Flink state 里,一張存在的 Hudi 表如果要通過 Flink 進行增量化更新寫入,就必然面臨一個問題:如何把 Hudi 表已有的信息寫入到 Flink state 里。
MySQL 可以借助 Flink CDC 完成全量 + 增量的過程構建,可以繞開從已有 Hudi 表冷啟動的過程,但是 TiDB 不行,它的存量表在借助別的手段構建完之后,想要增量化就會面臨如何從 FlinkSQL 冷啟動的問題。
社區有個原始方案,在記錄所有的算子 BucketAssigner 里面讀取全部的 Hudi 表數據,然后進行 state 構建,從功能上是可行的,但是在性能上根本無法接受,尤其是大表,由于 Flink 的 key state 機制原理,BucketAssigner 每個并發度都要讀取全表數據,然后挑選出屬于當前這個并發的數據存儲到自己的 state 里面,每個并方案都要去讀全量的表,這在性能上難以滿足。
業務能啟動的時間太長了,很多百億級別的表能啟動的時間可能是在幾個小時,而且讀取的數據太多,很容易失敗。
和社區進行了溝通交流后,他們提供了一套全新的方案,新增了獨立的 Bootstrap 機制,專門負責冷啟動過程。Bootstrap 由 coordinator 和 IndexBootstrap 兩個算子組成,IndexBootstrap 負責讀取工作,coordinator 負責協調分配文件讀取,防止單個 IndexBootstrap 讀取速度慢而降低整個初始化流程的效率。
IndexBootstrap 算子讀取到數據后,會按照與業務數據一樣的 Keyby 規則,Keyby 到對應的 BucketAssigner 算子上,并在數據上面打標,告知 BucketAssigner 這條數據是有 Bootstrap 的,不需要往下游 writer 發送。整個流程里,原始數據只需讀取一遍,而且是多并發一起讀,效率獲得了極大的提升。而且 BucketAssigner 只需要處理自己應該處理的數據,不再需要處理全表的數據。
其次是 Hudi 的 checkpoint 一致性問題。Hudi on checkpoint 在每次 checkpoint 完成的時候會進行一次 commit 操作,具體流程是 writer 算子在 checkpoint 的時候 flush 內存數據,然后給 writer coordinator 算子匯報匯總信息,writer coordinor 算子收到匯報信息時會將其緩存起來,checkpoin 完成后,收到 notification 信息時會進行一次 commit 操作。
但是在 Flink 的 checkpoint 機制里,notification 無法保證一定成功,因為它并不在 checkpoint 的生命周期里,而是一個回調操作,是在 checkpoin 成功后執行。checkpoin 成功后,如果這個接口還沒有執行完成,commit 操作就會丟失,也就意味著 checkpoint 周期內的數據會丟失。
針對上述問題,我們進行了重構。Writer 算子在 cehckpoint 時,會對匯報的 writer coordinator 的信息進行 state 持久化,任務重啟后重新匯報給 writer coordinator 算子。writer coordinator 算子再收集所有 writer 算子信息并做一次 commit 判斷,確保對應的 commit 已經完成。此時,Writer 算子也會保持阻塞,確保上次持久化的 commit 完成之后才會處理最新的數據,這樣就對齊了 Hudi 與 Flink 的 checkpoint 機制,保證了邊界場景數據的一致性。
第三是針對 Hudi 在 Append 寫入場景下的優化。
由于 Append 模式是復用 update 模式的代碼,所以在沒有重復 key 的 Append 場景下,很多操作是可以簡化的,因為 update 為了處理重復,需要做很多額外的操作。如果能夠簡化這些操作,吞吐能力可以有較大的提升。
第一個操作是小文件的查找,每次 checkpoint 后,update 都會重新 list 文件,然后從文件中找到大小不達標的文件繼續 open 并寫入。update 場景存在傾斜,會造成很多文件大小不均勻,但是 Append 場景不存在這種問題,它所有的文件大小都很均勻;
第二個是 keyby。在 update 的模式下面,單個 key 只能被一個節點處理,因此上游需要按照 Hudi key 進行 keyby 操作。但是 Append 場景沒有重復 key,可以直接用 chain 代替 keyby,大大減少了節點之間序列化傳輸的開銷。同時 Append 場景下不存在內存合并,整體效率也會更高。
最后一個是 GetListing 的優化。Hudi 表與底層 HDFS 文件的映射是通過 ViewManager 來做的,Hudi table 對象和 TimelineService 都會自己去初始化一個 ViewManager,每個 ViewManager 在初始化的時候都會進行 HDFS 目錄的 list 操作,由于每個并發都持有多個 Hudi table 或 TimelineService,會造成大并發任務啟動時 HDFS 的壓力很大。我們對 TimelineService 進行了單例化的優化,保證每個進程只有一 TimelineService,能夠數倍地降低 HDFS list 的壓力。后續我們還會基于 Flink 的 coordinator 機制做任務級別的單例化。
未來,我們會繼續挖掘增量的能力,給業務帶來更多的價值。
三、AI on Flink
傳統的機器學習鏈路里數據的傳輸、特征的計算以及模型的訓練,都是離線處理的,存在兩個大的問題。
第一個是時效性低,模型和特征的更新周期基本是 t+1 天或者 t+1 小時,在追求時效性的場景下體驗并不好。第二個是計算訓練的效率很低,必須等天或小時的分區數據全部準備好之后才能開始特征計算和訓練。全量分區數據導致計算和訓練的壓力大。
在實時技術成熟后,大部分模型訓練流程都切換到實時架構上,數據傳輸、特征計算和訓練都可以做到幾乎實時,從全量變成了短時的小批量增量進行,訓練的壓力也大大減輕。同時由于實時對離線的兼容性,在很多場景比如特征回補上,也可以嘗試使用 Flink 的流批一體進行落地。
上圖是我們典型的機器學習鏈路圖。從圖上可以看出,樣本數據生產特征的計算、模型的訓練和效果的評估都大量實時化,中間也夾雜著少量離線過程,比如一些超長周期的特征計算。
同時也可以看出,完整的業務的模型訓練鏈路長,需要管理和維護大量的實時任務和離線任務。出現故障的時候,具體問題的定位也異常艱難。如何在整個機器學習的鏈路中同時管理號這么多實時和離線任務,并且讓任務之間的協同和調度有序進行、高效運維,是我們一直在思考的問題。
因此我們引入了 Flink 生態下 AIFlow 系統。AIFlow 本身的定位就是做機器學習鏈路的管理,核心的機器計算引擎是 Flink,這和我們的訴求不謀而合。這套系統有三個主要的特性符合我們的業務需求。
第一,流批的混合調度。在我們實際的業務生產上,一套完整的實時鏈路都會夾雜著實時和離線兩種類型的任務。AIFlow 支持流批的混合調度,支持數據依賴與控制依賴,能夠很好地支持我們現有的業務形態。并且未來在 Flink 流批一體方面也會有更多的發揮空間;
第二,元數據的管理,AIFlow 對所有數據和模型都支持版本管理。有了版本管理,各種實驗效果和實驗參數就都可追溯;
第三,開放的 notification 機制。整個鏈路中存在很多的外部系統節點,難以歸納到平臺內部,但是通過 notification 機制,可以打通 AIFlow 內部節點與外部節點的依賴。整套系統的部署分為三部分,notification service、 meta service 以及 scheduler,擴展性也很好,我們在內部化的過程中實現了很多自己的擴展。
實時平臺在今年引入 AIFlow 的之后已經經歷了兩個版本的迭代,V2 版本是社區 release 之前的一個內部版本,我們進行了分裝提供試用。V3 版本是今年 7 月社區正式 release 之后,我們進行了版本的對接。
AIFlow 的構建使用 Python 進行描述,運行時會有可視化的節點展示,可以很方便地追蹤各個節點的狀態,運維也可以做到節點級的管理,不需要做整個鏈路級別的運維。
未來我們會對這套系統在流批一體、特征管理以及模型訓練三個方向進行重點的迭代與開發,更好地發揮它的價值。