背感壓力,Flink背壓你了解多少?
本文轉載自微信公眾號「大數據左右手」,作者左右。轉載本文請聯系大數據左右手公眾號。
了解背壓
什么是背壓
在流式處理系統中,如果出現下游消費的速度跟不上上游生產數據的速度,就種現象就叫做背壓(backpressure,有人叫反壓,不糾結,本篇叫背壓)。本篇主要以Flink作為流式計算框架來簡單背壓機制,為了更好理解,只做簡單分享。
背壓產生的原因
下游消費的速度跟不上上游生產數據的速度,可能出現的原因如下:
(1)節點有性能瓶頸,可能是該節點所在的機器有網絡、磁盤等等故障,機器的網絡延遲和磁盤不足、頻繁GC、數據熱點等原因。
(2)數據源生產數據的速度過快,計算框架處理不及時。比如消息中間件kafka,生產者生產數據過快,下游flink消費計算不及時。
(3)flink算子間并行度不同,下游算子相比上游算子過小。
背壓導致的影響
首先,背壓不會直接導致系統的崩盤,只是處在一個不健康的運行狀態。
(1)背壓會導致流處理作業數據延遲的增加。
(2)影響到Checkpoint,導致失敗,導致狀態數據保存不了,如果上游是kafka數據源,在一致性的要求下,可能會導致offset的提交不上。
原理: 由于Flink的Checkpoint機制需要進行Barrier對齊,如果此時某個Task出現了背壓,Barrier流動的速度就會變慢,導致Checkpoint整體時間變長,如果背壓很嚴重,還有可能導致Checkpoint超時失敗。
(3)影響state的大小,還是因為checkpoint barrier對齊要求。導致state變大。
原理:接受到較快的輸入管道的barrier后,它后面數據會被緩存起來但不處理,直到較慢的輸入管道的barrier也到達。這些被緩存的數據會被放到state 里面,導致state變大。
如何查找定位背壓
(1)在web頁面發現fink的checkpoint生成超時, 失敗。
(2)查看jobmanager日志
- 2021-10-17 19:43:19,235 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- -Checkpoint 236663 of job d521558603f6ef25dfd053c665d6afbe expired before completing
(3)在BackPressure界面直接可以看到。背壓狀態可以大致鎖定背壓可能存在的算子,但具體背壓是由于當前Task自身處理速度慢還是由于下游Task處理慢導致的,需要通過metric監控進一步判斷。
原理:BackPressure界面會周期性的對Task線程棧信息采樣,通過線程被阻塞在請求Buffer的頻率來判斷節點是否處于背壓狀態。計算緩沖區阻塞線程數與總線程數的比值 rate。其中,rate < 0.1 為 OK,0.1 <= rate <= 0.5 為 LOW,rate > 0.5 為 HIGH。
(4)Metrics 監控背壓。緩沖區的數據處理不過來,barrier流動慢,導致checkpoint生成時間長, 出現超時的現象。input 和 output緩沖區都占滿。
outPoolUsage 與 inPoolUsage
指標 | 描述 |
---|---|
outPoolUsage | 發送端Buffer的使用率 |
inPoolUsage | 接收端Buffer的使用率 |
指標可能出現以下情況:
(1)outPoolUsage與inPoolUsage 都低,代表當前Subtask正常。
(2)outPoolUsage與inPoolUsage 都高,代表當前Subtask下游背壓。
(3)outPoolUsage 高,通常是被下游 Task 所影響。
(4)inPoolUsage高,則表明它有可能是背壓的根源。因為通常背壓會傳導至其上游,導致上游某些 Subtask 的 outPoolUsage 為高。
inputFloatingBuffersUsage 與 inputExclusiveBuffersUsage
指標 | 描述 |
---|---|
inputFloatingBuffersUsage | 每個 Operator 實例對應一個FloatingBuffers,inputFloatingBuffersUsage 表示 Operator 對應的FloatingBuffers 使用率。 |
inputExclusiveBuffersUsage | 每個 Operator實例的每個遠程輸入 通道(RemoteInputChannel)都有自己的一組獨占緩沖區(ExclusiveBuffer),inputExclusiveBuffersUsage表示 ExclusiveBuffer 的使用率。 |
指標可能出現以下情況:
(1)floatingBuffersUsage高,則表明背壓正在傳導至上游。
(2)floatingBuffersUsage 高、exclusiveBuffersUsage 低,則表明了背壓可能存在傾斜。
背壓的原理
基于 Credit-based Flow Control的背壓機制
Credit 的反饋策略,保證每次上游發送的數據都是下游 InputChannel 可以承受的數據量。具體原理是這樣的:
(1)上游 SubTask 給下游 SubTask 發送數據時,會把 Buffer 中要發送的數據和上游 ResultSubPartition堆積的數據量 Backlog size 發給下游,下游接收到上游發來的 Backlog size 后,會向上游反饋現在的 Credit值,Credit 值表示目前下游可以接收上游的 Buffer 量,1 個Buffer 等價于 1 個 Credit。上游接收到下游反饋的Credit 值后,上游下次最多只會發送 Credit 個數據到下游,保障不會有數據積壓在 Socket 這一層。
(2)當下游 SubTask 反壓比較嚴重時,可能就會向上游反饋 Channel Credit = 0,此時上游就知道下游目前對應的InputChannel 沒有可用空間了,所以就不向下游發送數據了。
(3)上游會定期向下游發送探測信號,檢測下游返回的 Credit 是否大于 0,當下游返回的 Credit 大于 0 表示下游有可用的Buffer 空間,上游就可以開始向下游發送數據了。
圖集流程上面流程
(1)上游 SubTask a 發送完數據后,還有 4 個 Buffer 被積壓,那么會把發送數據和 Backlog size = 4 一塊發送給下游 SubTask b。
(2)下游接受到數據后,知道上游積壓了 4 個Buffer,于是向 Buffer Pool 申請 Buffer,由于容量有限,下游 InputChannel 目前僅有 2 個 Buffer 空間。
(3)SubTask b 會向上游 SubTask a 反饋 Channel Credit = 2。然后上游下一次最多只給下游發送 2 個 Buffer 的數據,這樣每次上游發送的數據都是下游 InputChannel 的 Buffer 可以承受的數據量。
建議
參考官網【https://flink.apache.org/2019/07/23/flink-network-stack-2.html】
自行了解老版本TCP-based 背壓機制,這里不再闡述。
解決背壓
Flink不需要一個特殊的機制來處理背壓,因為Flink中的數據傳輸相當于已經提供了應對背壓的機制。所以只有從代碼上與資源上去做一些調整。
(1)背壓部分原因可能是由于數據傾斜造成的,我們可以通過 Web UI 各個 SubTask 的 指標值來確認。Checkpoint detail 里不同 SubTask 的 State size 也是一個分析數據傾斜的有用指標。解決方式把數據分組的 key 預聚合來消除數據傾斜。
(2)代碼的執行效率問題,阻塞或者性能問題。
(3)TaskManager 的內存大小導致背壓。