Apache Flink 在小米的穩(wěn)定性優(yōu)化和實(shí)踐
摘要:本文整理自小米大數(shù)據(jù)部高級軟件工程師張蛟在 Flink Forward Asia 2021 生產(chǎn)實(shí)踐專場的演講。主要內(nèi)容包括:
- 發(fā)展現(xiàn)狀和規(guī)模
- 穩(wěn)定性優(yōu)化及實(shí)踐
- 運(yùn)維優(yōu)化及實(shí)踐
- 未來規(guī)劃與展望
01發(fā)展現(xiàn)狀及規(guī)模
現(xiàn)階段,我們的整體架構(gòu)可以分成5層,數(shù)據(jù)從下往上流動,如上圖。
數(shù)據(jù)采集層主要負(fù)責(zé)收集各類數(shù)據(jù),數(shù)據(jù)的來源分為兩類,一類是埋點(diǎn)和業(yè)務(wù)日志以及服務(wù)日志,經(jīng)由 LCS Agent 進(jìn)行采集,另一類是數(shù)據(jù)庫數(shù)據(jù)經(jīng)由 Binlog 或 Checkpoint 數(shù)據(jù)集成等方式收集到消息隊列中。以 Flink、Spark 為主的計算層對其進(jìn)行處理,并最終存儲到各類存儲和查詢服務(wù)中,供業(yè)務(wù)使用。Flink 是計算層實(shí)時和準(zhǔn)實(shí)時處理的主要框架,在其中正發(fā)揮著越來越重要的作用,尤其是 Flink+Iceberg 數(shù)據(jù)湖技術(shù),正在讓流批一體成為現(xiàn)實(shí)。
目前我們的集群上運(yùn)行著 3000 多個作業(yè),主力版本是 1.12,1.14 版本也已經(jīng)合并上線,日均處理 10 萬億+ 條消息,PB 級的數(shù)據(jù)量,峰值數(shù)據(jù) 2 億條/秒,運(yùn)行在國內(nèi)外 10 多個集群,使用超過 45000 個 CPU core,內(nèi)存使用超過 200tb。
在這樣規(guī)模的數(shù)據(jù)處理過程中,我們遇到了許多問題。
- 作業(yè)內(nèi)存占用不可控,on Yarn 模式非常容易出現(xiàn) Yarn container OOM kill,導(dǎo)致 container lost,引發(fā)作業(yè)頻繁重啟,包括框架內(nèi)重啟。
- on Yarn 模式無法支持作業(yè)自動平滑重啟,在機(jī)器過保、下線、機(jī)房遷移等過程中,只能觸發(fā) failover。
- 實(shí)時作業(yè)對負(fù)載較為敏感,啟動和運(yùn)行的過程中需要保證機(jī)器性能,避免因離線和在線混部造成影響。
- Checkpoint 作為 Flink 有狀態(tài)計算數(shù)據(jù)一致性的保障,存在穩(wěn)定性問題。
- historyserver 默認(rèn)的清理策略不好設(shè)置,導(dǎo)致占用的磁盤空間比較大,訪問慢。
- 作業(yè)異常時難以確定異常原因和節(jié)點(diǎn),需要查看大量的作業(yè)日志,導(dǎo)致故障排查困難。
02穩(wěn)定性優(yōu)化及實(shí)踐
首先是 Yarn container lost 的優(yōu)化。Flink JobManager 首先會向 Yarn reCheckpointmanager 申請資源,Yarn reCheckpointmanager 為該申請分配資源后將分配信息返回給 JobManager,然后 JobManager 會根據(jù)分配信息去啟動 taskmanager,并使之與 JobManager 進(jìn)行心跳。
JobManager 包括 JobMaster 和 reCheckpointmanager,它會主動發(fā)送心跳請求,探測 taskmanager 是否存活。如果 taskexecutor 因?yàn)槟承┰蛞馔獗?kill,JobManager 的日志中就會提示 container lost。
上圖是 container lost 現(xiàn)象的提示之一,一般老版本的 Flink 中出現(xiàn)比較多。
上圖是 container lost 現(xiàn)象的另一種提示。
在出現(xiàn) container lost 時,如果去查看 Yarn的nodemanager 或 JobManager 中異常前后的日志,一般都可以看到類似 beyond the physical memory limit 的日志,這表明它是因?yàn)槲锢韮?nèi)存使用超限被 Yarn kill。
這里需要先介紹一下 Yarn 控制內(nèi)存超用的方式,Yarn Nodemanager 會啟動一個 containersmonitor 的線程,這個線程會定期掃描 Nodemanager 上的 container 內(nèi)存占用,從而實(shí)現(xiàn)內(nèi)存資源的隔離。
簡單來說,如果某個 container 對應(yīng)進(jìn)程樹中所有年齡大于 0 的進(jìn)程,總內(nèi)存使用量超過申請量的兩倍,或所有年齡大于 1 的進(jìn)程,總內(nèi)存使用量超過上限,就表明其內(nèi)存超用,需要被 kill。
但實(shí)際上這種方式存在一定的問題:
- 一是定期掃描對于內(nèi)存突增的隔離性比較差,可能還沒有開始掃描就已經(jīng)達(dá)到系統(tǒng)總內(nèi)存上限,導(dǎo)致被系統(tǒng) kill;
- 二是 Yarn 通常會開啟節(jié)點(diǎn)資源的超賣,此時如果所有資源都被使用,會導(dǎo)致節(jié)點(diǎn)不穩(wěn)定;
- 三是如果作業(yè)只是臨時的內(nèi)存需求,即使此時節(jié)點(diǎn)仍有富余內(nèi)存,也會觸發(fā) kill。
針對這些問題,我們采用 Cgroup + JDK升級 + Jemalloc 的方式進(jìn)行了優(yōu)化。可能有人會問為什么需要進(jìn)行 JDK 升級?這是因?yàn)槔习姹镜?JDK 使用 Jemalloc 存在線程死鎖的問題,另外升級最新的 JDK 也能避免其他的 JDK bug,通常這類 bug 都不容易被找到和復(fù)現(xiàn)。
Cgroup 的方式主要是開啟內(nèi)存軟限制,它對 container 的內(nèi)存限制不再是基于單個 container 的內(nèi)存申請量,而是整個 Nodemanager 的內(nèi)存量。這個時候如果 NodeManager 上仍有富余內(nèi)存,內(nèi)存超用的 container 就可以接著使用這些富余的內(nèi)存。一個節(jié)點(diǎn)上同時存在多個 Container 內(nèi)存超用導(dǎo)致整個節(jié)點(diǎn)內(nèi)存達(dá)到上限,才會觸發(fā) oom event。Oom listener 對該事件進(jìn)行監(jiān)聽并判斷,如果達(dá)到節(jié)點(diǎn)總內(nèi)存就會選取內(nèi)存實(shí)際占用量超過申請量且啟動時間最短、優(yōu)先級最低的作業(yè)觸發(fā) oom kill。
然而,Cgroup 只是在一定程度上解決了 container 頻繁被 Yarn oom kill 導(dǎo)致 lost 的問題,并沒有完全徹底地解決。在使用的過程中,依然存在某些 container 的內(nèi)存使用持續(xù)上漲,最終被 cgroup oom kill 的情況,然后我們發(fā)現(xiàn)該問題可能與 glibc 的內(nèi)存分配 bug 有關(guān),長期運(yùn)行的進(jìn)程會存在連續(xù)多塊大小為 65536 的 anon 塊,所以我們最終的解決方案如下:
使用 Cgroup 解決內(nèi)存臨時超用的問題,比如 RocksDB 對內(nèi)存的限制不嚴(yán)格、小白用戶對內(nèi)存的設(shè)置和使用不正確等造成的問題,然后升級 JDK 版本,解決 Jemalloc 分配時的線程死鎖 bug,最后切換 Jemalloc,解決 Linux 系統(tǒng)下的 64M anon 分配 bug。
經(jīng)過一系列的優(yōu)化,從上圖可以看出,container lost 的頻率由每月的近 5000 次減少到不到 100 次,因 Yarn oom kill 造成的作業(yè)異常重啟減少了 90% 以上,效果顯著。
第二個優(yōu)化實(shí)踐是節(jié)點(diǎn)的平滑重啟功能,流式作業(yè)是長時間運(yùn)行的作業(yè),由于大部分都運(yùn)行在廉價的機(jī)器上,因此機(jī)器出現(xiàn)過保、硬件故障、維修下線、機(jī)房遷移等都比較常見。為了提前預(yù)防可能出現(xiàn)的隱患,避免框架重啟造成的影響,提升云環(huán)境下作業(yè)的穩(wěn)定性,解決 Yarn 模式下恢復(fù)時間過長帶來的問題,我們開發(fā)了作業(yè)的平滑重啟功能。
將節(jié)點(diǎn)加入到 exclude 后,F(xiàn)link recheckpoint manager 會獲取到 decommission 的信息,通過解析該信息得到對應(yīng)的節(jié)點(diǎn),并判斷當(dāng)前運(yùn)行任務(wù)的 container 是否運(yùn)行在被 decommission 的節(jié)點(diǎn)上。如果是,就通過調(diào)用任務(wù)的 JobManager 的 stop with savepoint 接口去停止。平臺會自動檢測任務(wù)的運(yùn)行狀態(tài),如果某個作業(yè)不是通過平臺停止,則平臺會自動將該任務(wù)重新拉起,作業(yè)從 savepoint 恢復(fù)。這個過程會進(jìn)行周期性的觸發(fā)并批量合并后再處理,避免消息頻繁觸發(fā)造成瞬時負(fù)載壓力。此外,節(jié)點(diǎn)和 container 都會進(jìn)行去重,避免對同一任務(wù)多次觸發(fā)影響穩(wěn)定性。另外它的觸發(fā)周期遠(yuǎn)小于 sre 在下線節(jié)點(diǎn)時設(shè)置的下線周期,也緩解了運(yùn)維壓力。
JobManager 會啟動指標(biāo)收集監(jiān)控線程,并周期性地采集節(jié)點(diǎn)的 CPU、內(nèi)存、磁盤 io 和網(wǎng)絡(luò) io 等指標(biāo),然后匯聚成指標(biāo)集合,通過動態(tài)指標(biāo)規(guī)則對指標(biāo)進(jìn)行判定,如果滿足條件就會將其加入到節(jié)點(diǎn)黑名單,這樣該 Application 的 container 便不會再運(yùn)行在這個節(jié)點(diǎn)上。如果某個節(jié)點(diǎn)被多個 application 加入黑名單,則表明該節(jié)點(diǎn)可能存在問題,會自動觸發(fā)作業(yè)平滑重啟,并進(jìn)行監(jiān)控報警,以此來自動發(fā)現(xiàn)可能的異常節(jié)點(diǎn)。
上圖是 Flink Checkpoint 的大致流程,Checkpoint coordinator 會觸發(fā) Checkpoint Operator 進(jìn)行 Checkpoint,Checkpoint Operator 生成并向下游廣播 Checkpoint Barrier,然后 Snapshot State。Checkpoint Operator 完成 Checkpoint 后進(jìn)行 ack,下游節(jié)點(diǎn)收到 Checkpoint Barrier 后,根據(jù)是否要進(jìn)行對齊做對應(yīng)的處理,然后進(jìn)入 Checkpoint 邏輯。所有的節(jié)點(diǎn)都向 Checkpoint Coordinateor ack 之后,表示該次 Checkpoint 已經(jīng)完成,接著向所有參與 Checkpoint 的 Operator 發(fā)送完成通知,最后 Operator 做最后的提交操作等。
Checkpoint 過程中遇到的問題包括以下這些:
磁盤滿或其他 io 異常,會導(dǎo)致 Checkpoint 長期無法觸發(fā),但異常信息只存在于 JobManager 的日志中,并不影響作業(yè)的正常執(zhí)行,導(dǎo)致潛在的隱患不容易被感知。
作業(yè)因邏輯變更、調(diào)整并發(fā)、重新調(diào)度等原因,重啟時默認(rèn)不會從 Checkpoint 恢復(fù),導(dǎo)致狀態(tài)丟失或者消息積壓。
大并發(fā)度時 Checkpoint 小文件過多,引發(fā)大量的 HDFS RPC 負(fù)載壓力。
用戶錯誤配置 Checkpoint 目錄引發(fā)的恢復(fù)沖突非常不容易控制,也不易排查。
針對以上問題,我們也進(jìn)行了一些優(yōu)化。
針對磁盤滿、io 異常、Kerberos 文件損壞的問題,我們會捕獲異常棧,根據(jù)異常棧進(jìn)行判斷和重試,并在失敗時增加 Checkpoint 的失敗計數(shù),超過一定次數(shù)則進(jìn)行框架內(nèi)的重啟,或向用戶發(fā)送告警,保證作業(yè)不會出現(xiàn)長時間的 Checkpoint 失敗而從一個非常老的 Checkpoint 恢復(fù)。
針對作業(yè)重啟時無法從 Checkpoint 恢復(fù)的問題,優(yōu)化方式是對每個作業(yè)設(shè)置默認(rèn)的保留數(shù)量,并在進(jìn)行 Checkpoint 時先生成一個臨時的 Checkpoint Metadata 文件,只有在 Finalize 時才會被 rename 成正式的文件。接著將所有 Checkpoint 文件按最后修改時間降序排序,在其中尋找正式的 Checkpoint Metadata 文件。如果成功則表明其是一個完備的、可用于恢復(fù)的 Checkpoint 文件。
在這樣的設(shè)定下,必須確保文件最后修改時間的正確性。為此我們設(shè)置了任務(wù) finish 默認(rèn)不刪除 Checkpoint 文件,任務(wù)在做 Savepoint 時默認(rèn)不 discard 最新的 Checkpoint 文件,以確保這兩類文件最后修改時間的正確性。通過以上方式保證了任務(wù)能自動從最新的、完備的狀態(tài)進(jìn)行恢復(fù),需要重新處理的數(shù)據(jù)和狀態(tài)盡量少。另外,如果任務(wù)已經(jīng)找到最新的、完備的 Checkpoint 并可以用來恢復(fù),這表明前面的 Savepoint 和 Checkpoint 已經(jīng)可以清理,由此減少空間的占用。
于是我們通過為 Savepoint 設(shè)置生命周期來清理全量 Savepoint;對于增量的 Checkpoint,為了避免清除掉正在使用的狀態(tài),會先去讀取其 Metadata 文件的內(nèi)容,將其中用到的狀態(tài)文件對應(yīng)的父文件夾保留,其余的進(jìn)行清理,從而確保在不影響狀態(tài)恢復(fù)的前提下,盡量減少文件數(shù)和空間占用。
針對用戶隨意配置 Checkpoint 目錄導(dǎo)致狀態(tài)恢復(fù)沖突和引發(fā)負(fù)載壓力的問題,通過在 Metadata 文件中增加作業(yè)名和時間戳,當(dāng)前的作業(yè)名與存儲的作業(yè)名不同則會提示告警信息,恢復(fù)的 Checkpoint 的時間戳與當(dāng)前時間存在較大的差異,也會有告警信息。
小文件是使用 HDFS 經(jīng)常遇到的問題,由于 HDFS 適合于存儲大塊文件,所以必須對小文件進(jìn)行優(yōu)化來提升性能和穩(wěn)定性。方法是在進(jìn)行 Checkpoint 時對小文件寫入進(jìn)行合并,比如將多個小文件寫入到 sequence file 中,形成一個大的文件,這可能會造成空間浪費(fèi),但是對降低 HDFS Namenode 負(fù)載壓力效果比較明顯。
此外通過聯(lián)邦集群的方式,使用多個 Namenode 均衡 RPC 請求負(fù)載,每一個 Namenode 都是一個相對獨(dú)立的服務(wù),然后對用戶作業(yè)規(guī)范其 Checkpoint 目錄,使其訪問能夠被均衡到多個 Namenode 上,再對舊的 HDFS 文件通過掛載表的形式讀舊寫新,逐步實(shí)現(xiàn)自動遷移到新的統(tǒng)一的規(guī)范目錄下。
接下來介紹一個案例,該案例來自小米數(shù)據(jù)采集服務(wù),圖示是他們非常簡單的架構(gòu)圖,主要是將多個源端 SDK 的埋點(diǎn)和數(shù)據(jù)收集到消息隊列中,然后使用 Flink 進(jìn)行 ETL,最終存儲到 Doris 中并在看板上進(jìn)行展示。
目前該業(yè)務(wù)已經(jīng)接入 750+ 國內(nèi)外業(yè)務(wù),日均處理 1600億+ 條消息。通過采用 Checkpoint 相關(guān)的優(yōu)化手段,將 RPC 延遲降低了約 40%,減少了小文件。同時在作業(yè)通過 stop with savepoint 啟停時,保證了恢復(fù)的正確性,確保了 exactly once 的語義。
03運(yùn)維優(yōu)化實(shí)踐
Flink Historyserver 對作業(yè)運(yùn)維非常有效,尤其是它能在作業(yè)停止后,查看作業(yè)的統(tǒng)計信息,如果作業(yè)異常退出或處理結(jié)果有問題,我們又因?yàn)橐恍┰驘o法及時查看相關(guān)日志,就可以在將來通過 Historyserver 查看。
Flink Historyserver 會在每一次定時清理時獲取上一次清理已經(jīng)被緩存的作業(yè) ID,再獲取本次已經(jīng)打包的歷史日志信息,然后判斷歷史日志是否已經(jīng)超過了配置的最大值,若是,就會將后面的歷史日志直接執(zhí)行清理,否則就會判斷上一次緩存的作業(yè)在當(dāng)次歷史日志中是否存在,如果不存在也會執(zhí)行清理。
但上述流程存在一系列的問題,一個是服務(wù)重啟會造成當(dāng)前緩存的已下載的作業(yè)信息丟失,如果在重啟之間該作業(yè)的歷史日志也丟失,就會形成懸浮的緩存作業(yè),本地緩存的作業(yè)將會長期存在,無法清理。當(dāng)前已打包的歷史日志信息不支持過期,導(dǎo)致大量的日志存留于 HDFS 和本地磁盤,且會長期存在,不僅影響訪問的速度,也會造成磁盤空間的較大浪費(fèi)。緩存下來的作業(yè)歷史日志最大值難以確定,基礎(chǔ)服務(wù)如 HDFS 等如果出現(xiàn)異常,會導(dǎo)致同時出現(xiàn)大量失敗,沖走有效日志。另外當(dāng)前默認(rèn)并沒有記錄 Taskmanager 上的日志,非常不利于異常排查,
針對上述問題我們也做了相應(yīng)優(yōu)化。
一個是讀取當(dāng)前已經(jīng)緩存到本地磁盤的作業(yè)歷史日志信息,并將其與歷史日志記錄進(jìn)行對比,從而避免出現(xiàn)懸浮的緩存作業(yè);支持歷史日志的最長保留時間,超過其生命周期就會進(jìn)行清理,相比于當(dāng)前支持的歷史日志最大保留數(shù)量,更加科學(xué)合理;另外我們也支持了 Taskmanager 和 Container 歷史數(shù)據(jù)的打包和清理,更全面地記錄作業(yè)在異常退出時的各項(xiàng)信息,方便排查問題。
作業(yè)的全鏈路心跳監(jiān)控功能主要是對作業(yè)的鏈路延時進(jìn)行監(jiān)控,實(shí)現(xiàn)方式是通過在 Stream Checkpoint 中插入特殊標(biāo)記,標(biāo)記信息包括作業(yè)的名稱、當(dāng)前的時間,名稱的生成方式是 op+operator 在整個鏈路的 index 以及 subtask 在 operator 的 index,非 Checkpoint 節(jié)點(diǎn)會在收到標(biāo)記后更新名稱,并用當(dāng)前的時間減去 Checkpoint 插入的時間生成從 Checkpoint 到該 subtask 的耗時,并上報到 Metrics Reporter 中,最終對這些 metrics 進(jìn)行計算,通過這種方式可以發(fā)現(xiàn)鏈路中的異常節(jié)點(diǎn),監(jiān)測作業(yè)的數(shù)據(jù)異常丟失,還能夠通過心跳信息的插入頻率預(yù)估其影響。
心跳標(biāo)記在遇到多個下游鏈路時并不是隨機(jī)選擇鏈路,而是同時廣播到多條鏈路中,因此可能會出現(xiàn)心跳監(jiān)控標(biāo)記信息過多的情況,影響正常作業(yè)的處理性能。
這時就出現(xiàn)了一個矛盾點(diǎn),全鏈路心跳監(jiān)控采樣越頻繁,對各節(jié)點(diǎn)處理性能的監(jiān)控就越及時準(zhǔn)確,但同時也會造成信息過多、影響正常數(shù)據(jù)的處理。針對這個問題,我們進(jìn)行了以下三個方面的改進(jìn)和處理:
- 一是將 chain operator metrics 信息進(jìn)行合并上報,因?yàn)樗谋O(jiān)控信息基本相同,這樣可以減少上報的數(shù)據(jù)量。
- 二是通過 restful 接口動態(tài)啟停監(jiān)控,這樣只有在有異常時才會進(jìn)行采樣和監(jiān)控,正常情況下不影響作業(yè)的運(yùn)行。
- 三是通過對采樣進(jìn)行周期性的合并和處理,實(shí)現(xiàn)了對任務(wù) pipeline 數(shù)據(jù)量和延遲的預(yù)估以及監(jiān)控功能。
restful 接口動態(tài)啟停監(jiān)控功能不僅能動態(tài)啟停心跳監(jiān)控,我們發(fā)現(xiàn)還有其他場景也能從這個功能中受益,因此我們對其進(jìn)行了擴(kuò)展。簡單的代碼修改就能讓它支持其他配置的動態(tài)調(diào)整,包括 Checkpoint 配置,如 Checkpoint 周期和超時時間,動態(tài)日志的級別等。
當(dāng)作業(yè)出現(xiàn)性能或 Checkpoint 問題時,可以通過 restful 接口動態(tài)開啟、問題確定后動態(tài)停止,這樣就能解決心跳信息過多的問題。在負(fù)載突增、短時數(shù)據(jù)傾斜導(dǎo)致 Checkpoint 超時,動態(tài)調(diào)整 Checkpoint 超時時間能避免作業(yè)因 Checkpoint 超時而失敗,它也能避免由于 Checkpoint 長時間不成功導(dǎo)致數(shù)據(jù)積壓更多、數(shù)據(jù)傾斜問題更嚴(yán)重而陷入的死循環(huán)。同時它還能用于確定超時時間,用戶可以通過動態(tài)調(diào)整的方式,不斷測試最適合作業(yè)的超時時間,減少了壓測過程中的作業(yè)啟停次數(shù)。它也支持其他配置的調(diào)整,比如動態(tài)調(diào)整日志級別,但是需要注意的是調(diào)整后的配置并沒有持久化,會因?yàn)榭蚣苤貑⒒蜃鳂I(yè)的重啟而失效。
04未來規(guī)劃
未來,我們將在以下方面繼續(xù)探索:
- 持續(xù)開發(fā)并優(yōu)化自動彈性伸縮容的功能。Flink1.13 開始提供了自動彈性伸縮容的功能,但是目前并不完善,要在生產(chǎn)環(huán)境上用起來還需要做不少的工作。
- 版本收斂是很多Flink開發(fā)人員都會遇到的一個問題。Flink 社區(qū)的發(fā)展比較快,版本的發(fā)布和迭代也是非常快。為了降低運(yùn)維壓力,緊跟社區(qū),這也是勢在必行的。
- 對 state 讀寫性能進(jìn)行優(yōu)化,提升大狀態(tài)作業(yè)的性能。
- Heartbeat timeout 也是目前線上對穩(wěn)定性影響比較大的問題,我們也會進(jìn)行跟進(jìn)和優(yōu)化。
- 對作業(yè)啟動和恢復(fù)性能進(jìn)行優(yōu)化,減少作業(yè)因各種原因造成的斷流是 Flink 社區(qū)和許多業(yè)務(wù)非常關(guān)注的問題,我們同樣也有面臨著這樣的壓力。
- 繼續(xù)打磨批流融合能力,完善對 batch 模式和數(shù)據(jù)湖等的支持,也是現(xiàn)在的熱點(diǎn),我們希望能在這上面進(jìn)行更多探索,從而更好地支撐業(yè)務(wù),也讓 Flink 的應(yīng)用更加廣泛。?