成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

字節跳動使用 Flink State 的經驗分享

原創 精選
運維
本文主要分享字節跳動在使用 Flink State 上的實踐經驗,內容包括 Flink State 相關實踐以及部分字節內部在引擎上的優化,希望可以給 Flink 用戶的開發及調優提供一些借鑒意義。

本文主要分享字節跳動在使用 Flink State 上的實踐經驗,內容包括 Flink State 相關實踐以及部分字節內部在引擎上的優化,希望可以給 Flink 用戶的開發及調優提供一些借鑒意義。

前言

Flink 作業需要借助 State 來完成聚合、Join 等有狀態的計算任務,而 State 也一直都是作業調優的一個重點。目前 State 和 Checkpoint 已經在字節跳動內部被廣泛使用,業務層面上 State 支持了數據集成、實時數倉、特征計算、樣本拼接等典型場景;作業類型上支持了 Map-Only 類型的通道任務、ETL 任務,窗口聚合計算的指標統計任務,多流 Join 等存儲數據明細的數據拼接任務。

以 WordCount 為例,假設我們需要統計 60 秒窗口內 Word 出現的次數:

select
word,
TUMBLE_START(eventtime, INTERVAL '60' SECOND) as t,
count(1)
from
words_stream
group by
TUMBLE(eventtime, INTERVAL '60' SECOND), word

每個還未觸發的 60s 窗口內,每個 Word 對應的出現次數就是 Flink State,窗口每收到新的數據就會更新這個狀態直到最后輸出。為了防止作業失敗,狀態丟失,Flink 引入了分布式快照 Checkpoint 的概念,定期將 State 持久化到 Hdfs 上,如果作業 Failover,會從上一次成功的 checkpoint 恢復作業的狀態(比如 kafka 的 offset,窗口內的統計數據等)。

在不同的業務場景下,用戶往往需要對 State 和 Checkpoint 機制進行調優,來保證任務執行的性能和 Checkpoint 的穩定性。閱讀下方內容之前,我們可以回憶一下,在使用 Flink State 時是否經常會面臨以下問題:

  • 某個狀態算子出現處理瓶頸時,加資源也沒法提高性能,不知該如何排查性能瓶頸
  • Checkpoint 經常出現執行效率慢,barrier 對齊時間長,頻繁超時的現象
  • 大作業的 Checkpoint 產生過多小文件,對線上 HDFS 產生小文件壓力
  • RocksDB 的參數過多,使用的時候不知該怎么選擇
  • 作業擴縮容恢復時,恢復時間過長導致線上斷流

State 及 RocksDB 相關概念介紹

State 分類

由于 OperatorState 背后的 StateBackend 只有 DefaultOperatorStateBackend,所以用戶使用時通常指定的 FsStateBackend 和 RocksDBStateBackend 兩種,實際上指定的是 KeyedState 對應的 StateBackend 類型:

  • FsStateBackend:DefaultOperatorStateBackend 和 HeapKeyedStateBackend 的組合
  • RocksDBStateBackend:DefaultOperatorStateBackend 和 RocksDBKeyedStateBackend 的組合

RocksDB 介紹

RocksDB 是嵌入式的 Key-Value 數據庫,在 Flink 中被用作 RocksDBStateBackend 的底層存儲。如下圖所示,RocksDB 持久化的 SST 文件在本地文件系統上通過多個層級進行組織,不同層級之間會通過異步 Compaction 合并重復、過期和已刪除的數據。在 RocksDB 的寫入過程中,數據經過序列化后寫入到 WriteBuffer,WriteBuffer 寫滿后轉換為 Immutable Memtable 結構,再通過 RocksDB 的 flush 線程從內存 flush 到磁盤上;讀取過程中,會先嘗試從 WriteBuffer 和 Immutable Memtable 中讀取數據,如果沒有找到,則會查詢 Block Cache,如果內存中都沒有的話,則會按層級查找底層的 SST 文件,并將返回的結果所在的 Data Block 加載到 Block Cache,返回給上層應用。

RocksDBKeyedStateBackend 增量快照介紹

這里介紹一下大家在大狀態場景下經常需要調優的 RocksDBKeyedStateBackend 增量快照。RocksDB 具有 append-only 特性,Flink 利用這一特性將兩次 checkpoint 之間 SST 文件列表的差異作為狀態增量上傳到分布式文件系統上,并通過 JobMaster 中的 SharedStateRegistry 進行狀態的注冊和過期。

如上圖所示,Task 進行了 3 次快照(假設作業設置保留最近 2 次 Checkpoint):

  • CP-1:RocksDB 產生 sst-1 和 sst-2 兩個文件,Task 將文件上傳至 DFS,JM 記錄 sst 文件對應的引用計數
  • CP-2:RocksDB 中的 sst-1 和 sst-2 通過 compaction 生成了 sst-1,2,并且新生成了 sst-3 文件,Task 將兩個新增的文件上傳至 DFS,JM 記錄 sst 文件對應的引用計數
  • CP-3:RocksDB 中新生成 sst-4 文件,Task 將增量的 sst-4 文件上傳至 DFS,且在 CP-3 完成后,由于只保留最近 2 次 CP,JobMaster 將 CP-1 過期,同時將 CP-1 中的 sst 文件對應的引用計數減 1,并刪除引用計數歸 0 的 sst 文件(sst-1 和 sst-2)

增量快照涉及到 Task 多線程上傳/下載增量文件,JobMaster 引用計數統計,以及大量與分布式文件系統的交互等過程,相對其他的 StateBackend 要更為復雜,在 100+GB 甚至 TB 級別狀態下,作業比較容易出現性能和穩定性瓶頸的問題。

State 實踐經驗

提升 State 操作性能

用戶在使用 State 時,會發現操作 State 并不是一件很"容易"的事情,如果使用 FsStateBackend,會經常遇到 GC 問題、頻繁調參等問題;如果使用 RocksDBStateBackend,涉及到磁盤讀寫,對象序列化,在缺乏相關 Metrics 的情況下又不是很容易進行性能問題的定位,或者面對 RocksDB 的大量參數不知道如何調整到最優。

目前字節跳動內有 140+ 作業的狀態大小達到了 TB 級別,單作業的最大狀態為 60TB,在逐步支持大狀態作業的實踐中,我們積累了一些 State 的調優經驗,也做了一些引擎側的改造以支持更好的性能和降低作業調優成本。

選擇合適的 StateBackend

我們都知道 FsStateBackend 適合小狀態的作業,而 RocksDBStateBackend 適合大狀態的作業,但在實際選擇 FsStateBackend 時會遇到以下問題:

  • 進行開發之前,對狀態大小無法做一個準確的預估,或者做狀態大小預估的復雜度較高
  • 隨著業務增長,所謂的 "小狀態" 很快就變成了 "大狀態",需要人工介入做調整
  • 同樣的狀態大小,由于狀態過期時間不同,使用 FsStateBackend 產生 GC 壓力也不同

針對上面 FsStateBackend 中存在的若干個問題,可以看出 FsStateBackend 的維護成本還是相對較高的。在字節內部,我們暫時只推薦部分作業總狀態小于 1GB 的作業使用 FsStateBackend,而對于大流量業務如短視頻、直播、電商等,我們更傾向于推薦用戶使用 RocksDBStateBackend 以減少未來的 GC 風險,獲得更好的穩定性。

隨著內部硬件的更新迭代,ssd 的推廣,長遠來看我們更希望將 StateBackend 收斂到 RocksDBStateBackend 來提高作業穩定性和減少用戶運維成本;性能上期望在小狀態場景下,RocksDBStateBackend 可以和 FsStateBackend 做到比較接近或者打平。

觀測性能指標,使用火焰圖分析瓶頸

社區版本的 Flink 使用 RocksDBStateBackend 時,如果遇到性能問題,基本上是很難判斷出問題原因,此時建議打開相關指標進行排查[1]。另外,在字節跳動內部,造成 RocksDBStateBackend 性能瓶頸的原因較多,我們構建了一套較為完整的 RocksDB 指標體系,并在 Flink 層面上默認透出了部分關鍵的 RocksDB 指標,并新增了 State 相關指標,部分指標的示意圖如下:

造成 RocksDB 性能瓶頸的常見如下:

  • 單條記錄的 State Size 過大,由于 RocksDB 的 append-only 的特性,write buffer 很容易打滿,造成數據頻繁刷盤和 Compaction,搶占作業 CPU
  • Operator 內部的 RocksDB 容量過大,如 Operator 所在的 RocksDB 實例大小超過 15GB 我們就會比較明顯地看到 Compaction 更加頻繁,并且造成 RocksDB 頻繁的 Write Stall
  • 硬件問題,如磁盤 IO 打滿,從 State 操作的 Latency 指標可以看出來,如果長時間停留在秒級別,說明硬件或者機器負載偏高

除了以上指標外,另外一個可以相配合的方法是火焰圖,常見方法比如使用阿里的 arthas[2]?;鹧鎴D內部會展示 Flink 和 RocksDB 的 CPU 開銷,示意圖如下:

如上所示,可以看出火焰圖中 Compaction 開銷是占比非常大的,定位到 Compaction 問題后,我們可以再根據 Value Size、RocksDB 容量大小、作業并行度和資源等進行進一步的分析。

使用合理的 RocksDB 參數

除了 Flink 中提供的 RocksDB 參數[3]之外,RocksDB 還有很多調優參數可供用戶使用。用戶可以通過自定義 RocksDBOptionsFactory 來做 RocksDB 的調優[4]。經過內部的一些實踐,我們列舉兩個比較有效的參數:

  • 關閉 RocksDB 的 compression(需要自定義 RocksDBOptionsFactory):RocksDB 默認使用 snappy 算法對數據進行壓縮,由于 RocksDB 的讀寫、Compaction 都存在壓縮的相關操作,所以在對 CPU 敏感的作業中,可以通過ColumnFamilyOptions.setCompressionType(CompressionType.NO_COMPRESSION) 將壓縮關閉,采用磁盤空間容量換 CPU 的方式來減少 CPU 的損耗
  • 開啟 RocksDB 的 bloom-filter(需要自定義 RocksDBOptionsFactory):RocksDB 默認不使用 bloom-filter[5],開啟 bloom-filter 后可以節省一部分 RocksDB 的讀開銷
  • 其他 cache、writebuffer 和 flush/compaction 線程數的調整,同樣可以在不同場景下獲得不同的收益,比如在寫少多讀的場景下,我們可以通過調大 Cache 來減少磁盤 IO

這里要注意一點,由于很多參數都以內存或磁盤來換取性能上的提高,所以以上參數的使用需要結合具體的性能瓶頸分析才能達到最好的效果,比如在上方的火焰圖中可以明顯地看到 snappy 的壓縮占了較大的 CPU 開銷,此時可以嘗試 compression 相關的參數。

關注 RocksDBStateBackend 的序列化開銷

使用 RocksDB State 的相關 API,Key 和 Value 都是需要經過序列化和反序列化,如果 Java 對象較復雜,并且用戶沒有自定義 Serializer,那么它的序列化開銷也會相對較大。比如去重操作中常用的 RoaringBitmap,在序列化和反序列化時,MB 級別的對象的序列化開銷達到秒級別,這對于作業性能是非常大的損耗。因此對于復雜對象,我們建議:

  • 業務上嘗試在 State 中使用更精簡的數據結構,去除不需要存儲的字段
  • StateDescriptor 中通過自定義 Serializer 來減小序列化開銷
  • 在 KryoSerializer 顯式注冊 PB/Thrift Serializer[6]
  • 減小 State 的操作次數,比如下方的示例代碼,如果是使用 FsStateBackend ,則沒有太多性能損耗;但是在 RocksDBStateBackend 上因為兩次 State 的操作導致 userKey 產生了額外一次序列化的開銷,如果 userKey 本身是個相對復雜的對象就要注意了
if (mapState.contains(userKey)) {
UV userValue = mapState.get(userKey);
}

更多關于序列化的性能和指導可以參考社區的調優文檔[7]。

構建 RocksDB State 的緩存

上面提到 RocksDB 的序列化開銷可能會比較大,字節跳動內部在 StateBackend 和 Operator 中間構建了 StateBackend Cache Layer,負責緩存算子內部的熱點數據,并且根據 GC 情況進行動態擴縮容,對于有熱點的作業收益明顯。

同樣,對于用戶而言,如果作業熱點明顯的話,可以嘗試在內存中構建一個簡單的 Java 對象的緩存,但是需要注意以下幾點:

  • 控制緩存的閾值,防止緩存對象過多造成 GC 壓力過大
  • 注意緩存中 State TTL 邏輯處理,防止出現臟讀的情況

降低 Checkpoint 耗時

Checkpoint 持續時間和很多因素相關,比如作業反壓、資源是否足夠等,在這里我們從 StateBackend 的角度來看看如何提高 Checkpoint 的成功率。一次 Task 級別的快照可以劃分為以下幾個步驟:

等待 checkpointLock:Source Task 中,觸發 Checkpoint 的 Rpc 線程需要等待 Task 線程完成當前數據處理后,釋放 checkpointLock 后才能觸發 checkpoint,這一步的耗時主要取決于用戶的處理邏輯及每條數據的處理時延

收集 Barrier: 非 Source 的 Task 中,這一步是將上游所有 Task 發送的 checkpoint barrier 收集齊,這一步的耗時主要在 barrier 在 buffer 隊列中的排隊時間

同步階段:執行用戶自定義的 snapshot 方法以及 StateBackend 上的元信息快照,比如 FsStateBackend 在同步階段會對內存中的狀態結構做淺拷貝

異步階段:將狀態數據或文件上傳到 DFS

字節跳動內部,我們也針對這四個步驟構建了相關的監控看板:

生產環境中,「等待 checkpointLock」和「同步階段」更多是在業務邏輯上的耗時,通常耗時也會相對較短;從 StateBackend 的層面上,我們可以對「收集 Barrier」和「異步階段」這兩個階段進行優化來降低 Checkpoint 的時長。

減少 Barrier 對齊時間

減少 Barrier 對齊時間的核心是降低 in-flight 的 Buffer 總大小,即使是使用社區的 Unaligned Checkpoint 特性,如果 in-flight 的 Buffer 數量過多,會導致最后寫入到分布式存儲的狀態過大,有時候 in-flight 的 Buffer 大小甚至可能超過 State 本身的大小,反而會對異步階段的耗時產生負面影響。

  • 降低 channel 中 Buffer 的數量:Flink 1.11 版本支持在數據傾斜的環境下限制單個 channel 的最大 Buffer 數量,可以通過 taskmanager.network.memory.max-buffers-per-channel 參數進行調整
  • 降低單個 Buffer 的大小:如果單條數據 Size 在 KB 級別以下,我們可以通過降低 taskmanager.memory.segment-size 來減少單個 Buffer 的大小,從而減少 Barrier 的排隊時間

結合業務場景降低 DFS 壓力

如果在你的集群中,所有 Flink 作業都使用同一個 DFS 集群,那么業務增長到一定量級后,DFS 的 IO 壓力和吞吐量會成為「異步階段」中非常重要的一個參考指標。尤其是在 RocksDBStateBackend 的增量快照中,每個 Operator 產生的狀態文件會上傳到 DFS中,上傳文件的數量和作業并行度、作業狀態大小呈正比。而在 Flink 并行度較高的作業中,由于各個 Task 的快照基本都在同一時間發生,所以幾分鐘內,對 DFS 的寫請求數往往能夠達到幾千甚至上萬。

合理設置 state.backend.fs.memory-threshold 減小 DFS 文件數量:此參數表示生成 DFS 文件的最小閾值,小于此閾值的狀態會以 byte[] 的形式封裝在 RPC 請求內傳給 JobMaster 并持久化在 _metadata 里)。

  • 對于 Map-Only 類型的任務,通常狀態中存儲的是元信息相關的內容(如 Kafka 的消費位移),狀態相對較小,我們可以通過調大此參數避免將這些狀態落盤。Flink 1.11 版本之前,state.backend.fs.memory-threshold 默認的 1kb 閾值較小,比較容易地導致每個并行度都需要上傳自己的狀態文件,上傳文件個數和并行度成正比。我們可以結合業務場景調整此參數,將 DFS 的請求數從 N(N=并行度) 次優化到 1 次
  • 這里需要注意,如果閾值設置過高(MB級別),可能會導致 _metadata 過大,從而增大 JobMaster 恢復 Checkpoint 元信息和部署 Task 時的 GC 壓力,導致 JobMaster 頻繁 Full GC

合理設置 state.backend.rocksdb.checkpoint.transfer.thread.num 線程數減少 DFS 壓力:此參數表示制作快照時上傳和恢復快照時下載 RocksDB 狀態文件的線程數。

  • 在狀態較大的情況下,用戶為了提高 Checkpoint 效率,可能會將此線程數設置的比較大,比如超過 10,在這種情況下快照制作和快照恢復都會給 DFS 帶來非常大的瞬時壓力,尤其是對 HDFS NameNode,很有可能瞬間占滿 NameNode 的請求資源,影響其他正在執行的作業

調大 state.backend.rocksdb.writebuffer.size:此參數表示 RocksDB flush 到磁盤之前,在內存中存儲的數據大小。

  • 如果作業的吞吐比較高,Update 比較頻繁,造成了 RocksDB 目錄下的文件過多,通過調大此參數可以一定程度上通過加大文件大小來減少上傳的文件數量,減少 DFS IO 次數。

合并 RocksDBKeyedStateBackend 上傳的文件(FLINK-11937)

在社區版本的增量快照中,RocksDB 新生成的每個 SST 文件都需要上傳到 DFS,以 HDFS 為例,HDFS 的默認 Block 大小通常在 100+MB(字節跳動內部是 512MB),而 RocksDB 生成的文件通常為 100MB 以下,對于小數據量的任務甚至是 KB 級別的文件大小,Checkpoint 產生的大量且頻繁的小文件請求,對于 HDFS 的元數據管理和 NameNode 訪問都會產生比較大的壓力。

社區在 FLINK-11937 中提出了將小文件合并上傳的思路,類似的,在字節內部的實現中,我們將小文件合并的邏輯抽象成 Strategy,這樣我們可以根據 SST 文件數量、大小、存活時長等因素實現符合我們自己業務場景的上傳策略。

提高 StateBackend 恢復速度

除了 State 性能以及 DFS 瓶頸之外,StateBackend 的恢復速度也是實際生產過程中考慮的一個很重要的點,我們在生產過程中會發現,由于某些參數的設置不合理,改變作業配置和并發度會導致作業在重啟時,從快照恢復時性能特別差,恢復時間長達十分鐘以上。

謹慎使用 Union State

Union State 的特點是在作業恢復時,每個并行度恢復的狀態是所有并行度狀態的并集,這種特性導致 Union State 在 JobMaster 狀態分配和 TaskManager 狀態恢復上都比較重:

  • JobMaster 需要完成一個 NN 的遍歷,將每個并行度的狀態都賦值成所有并行度狀態的并集。(這里實際上可以使用 HashMap 將遍歷優化成 N1 的復雜度[8])
  • TaskManager 需要讀取全量 Union State 的狀態文件,比如 1000 并行度的作業在恢復時,每個并行度中的 Union State 在恢復狀態時都需要讀取 1000 個并行度 Operator 所產生的狀態文件,這個操作是非常低效的。(我們內部的優化是將 Union State 狀態在 JobMaster 端聚合成 1 個文件,這樣 TaskManager 在恢復時只需要讀取一個文件即可)

Union State 在實際使用中,除恢復速度慢的問題外,如果使用不當,對于 DFS 也會產生大量的壓力,所以建議在高并行度的作業中,盡量避免使用 Union State 以降低額外的運維負擔。

增量快照 vs 全量快照恢復

RocksDBStateBackend 中支持的增量快照和全量快照(或 Savepoint),這兩種快照的差異導致了它們在不同場景下的恢復速度也不同。其中增量快照是將 RocksDB 底層的增量 SST 文件上傳到 DFS;而全量快照是遍歷 RocksDB 實例的 Key-Value 并寫入到 DFS。

以是否擴縮容來界定場景,這兩種快照下的恢復速度如下:

非擴縮容場景:

  • 增量快照的恢復只需將 SST 文件拉到本地即可完成 RocksDB 的初始化*(多線程)
  • 全量快照的恢復需要遍歷屬于當前 Subtask 的 KeyGroup Range 下的所有鍵值對,寫入到本地磁盤并完成 RocksDB 初始化(單線程)

擴縮容場景:

  • 增量快照的恢復涉及到多組 RocksDB 的數據合并,涉及到多組 RocksDB 文件的下載以及寫入到同一個 RocksDB 中產生的大量 Compaction,Compaction 過程中會產生嚴重的寫放大
  • 全量快照的恢復和上面的非擴縮容場景一致(單線程)

這里比較麻煩的一點是擴縮容恢復時比較容易遇到長尾問題,由于單個并行度狀態過大而導致整體恢復時間被拉長,目前在社區版本下還沒有比較徹底的解決辦法,我們也在針對大狀態的作業進行恢復速度的優化,在這里基于社區已支持的功能,在擴縮容場景下給出一些加快恢復速度的建議:

  • 擴縮容恢復時盡量選擇從 Savepoint 進行恢復,可以避免增量快照下多組 Task 的 RocksDB 實例合并產生的 Compaction 開銷
  • 調整 RocksDB 相關參數,調大 WriteBuffer 大小和 Flush/Compaction 線程數,增強 RocksDB 批量將數據刷盤的能力

總結

本篇文章中,我們介紹了 State 和 RocksDB 的相關概念,并針對字節跳動內部在 State 應用上遇到的問題,給出了相關實踐的建議,希望大家在閱讀本篇文章之后,對于 Flink State 在日常開發工作中的應用,會有更加深入的認識和了解。

責任編輯:未麗燕 來源: 字節跳動技術團隊
相關推薦

2022-07-12 16:54:54

字節跳動Flink狀態查詢

2024-09-25 15:57:56

2010-09-02 12:54:30

CSS

2022-08-21 21:28:32

數據庫實踐

2011-04-18 10:56:41

PythonDropBox

2023-10-18 11:56:17

開源AI

2015-09-01 14:45:41

蘋果彭博Mesos

2018-10-16 08:54:35

Apache Flin流計算State

2022-06-08 13:25:51

數據

2022-05-23 13:30:48

數據胡實踐

2024-04-23 10:16:29

云原生

2023-01-10 09:08:53

埋點數據數據處理

2010-04-17 12:48:33

2024-11-01 17:00:03

2021-04-06 11:07:02

字節跳動組織架構

2023-01-03 16:54:27

字節跳動深度學習

2009-09-25 17:58:00

CCNA自學

2010-06-13 09:09:34

MySQL 4.0.2

2009-09-28 10:52:00

CCNA考試經驗CCNA
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91精品国产色综合久久不卡98 | 成年免费大片黄在线观看岛国 | 91精品久久久久久久久久入口 | 精品久久国产 | 日韩有码在线播放 | 久久久高清 | 在线观看av网站 | 日韩精品免费一区 | 黄色在线播放视频 | 一区二区三区精品在线视频 | 久久久性色精品国产免费观看 | 男女羞羞在线观看 | 日韩美女一区二区三区在线观看 | 中文字幕国产第一页 | 日韩不卡一区二区三区 | 欧美精品一区在线 | 成人水多啪啪片 | 国产黄色大片在线观看 | 国产综合在线视频 | 国产精品久久久亚洲 | 欧美一区二区网站 | 日本a∨精品中文字幕在线 亚洲91视频 | 久久久国产精品网站 | 午夜视频一区 | 色综合色综合 | 日本特黄a级高清免费大片 国产精品久久性 | 欧洲妇女成人淫片aaa视频 | av黄色网| 在线免费中文字幕 | 久久久久久久久久爱 | 久久宗合色 | 99久久久久 | 丝袜天堂| av黄色免费在线观看 | www.黄色网| 天堂av中文在线 | 欧美日日| 日本视频免费 | 国产精品久久久久久婷婷天堂 | 亚洲www啪成人一区二区 | 先锋资源网 |