我們一起深入理解Flink State
寫在前面
State是指流計算過程中計算節點的中間計算結果或元數據屬性,比如 在aggregation過程中要在state中記錄中間聚合結果,比如 Apache Kafka 作為數據源時候,我們也要記錄已經讀取記錄的offset,這些State數據在計算過程中會進行持久化(插入或更新)。本文將詳細介紹一下Flink State,通過本文,你可以了解到:
- State分類
- 什么是狀態后端(state backend)
- State對擴縮容的處理
感謝關注,希望本文對你有所幫助。
State分類
Flink 中的狀態分為兩種主要類型:Keyed State 和 Operator State。
Keyed State
- 概念:Keyed State 是和鍵(key)相關聯的狀態。在 Flink 的 Keyed Streams 上進行有狀態操作時(例如在使用 keyBy 方法后),每個 key 都會有自己的狀態實例,這個狀態是獨立的,即每個 key 的狀態對于其他 keys 不可見。
- 用法:Keyed State 常用于需要按 key 進行分區處理的情況,如聚合計算(sum、min、max)、窗口操作和其他需要按 key 維護和更新狀態的計算。在 SQL 語句中,Keyed State 對應的就是通過 GroupBy 或 PartitionBy 所定義的字段分組。
- 數據結構:Keyed State 底層通常是基于哈希表的實現,確保每個 key 都能快速地找到對應的狀態。這種狀態通常存儲在 Keyed State 后端中,可以是內存中,也可以是 RocksDB 這種本地存儲。
Operator State
- 概念:Operator State 與特定的操作符實例(Task)相關聯,而不是和特定的 key 關聯。每個操作符實例維護自己的狀態,所有的 Operator State 實例對于同一操作符是可見的。
- 用法:Operator State 通常用于記錄源(Source)和接收器(Sink)的相關狀態,或者用于需要操作符級別聚合的場合。例如,一個 Source Connector 可能會使用 Operator State 來記錄已經讀取的數據源的 offset。
- 實現:Flink 提供了幾種不同的 Operator State 類型,包括列表狀態(ListState)、聯合列表狀態(UnionListState)、廣播狀態(BroadcastState)等。這些狀態通常存儲在 Operator State 后端中,可以是內存中,也可以是持久化存儲。
值得注意的是:
在 Flink 的 Table API 或 SQL API 中,對于內部的 GroupBy/PartitionBy 操作,Flink 會自動管理 Keyed State。而對于 Source Connector 記錄 offset 這樣的操作,通常是在底層的 DataStream API 中實現的,可能直接使用 Operator State 來管理。例如,Flink Kafka Consumer 會使用 Operator State 來存儲 Kafka 主題的分區 offset,以便在發生故障時能夠從上次成功的檢查點恢復。
什么是狀態后端(state backend)
State的具體存儲、訪問和維護是由**狀態后端(state backend)**決定的。狀態后端主要負責兩件事情:
- 本地狀態管理
- 將狀態以checkpoint的形式寫入遠程存儲
Flink提供了三種狀態后端:
MemoryStateBackend(內存狀態后端)
- 存儲:狀態存儲在 TaskManager 的 JVM 堆內存上。生成checkpoint時,*MemoryStateBackend會將狀態發送至JobManager并保存到它的堆內存中。
- 使用場景:適用于小規模狀態或本地測試,因為它將所有狀態作為序列化數據保存在 JVM 堆上。如果 TaskManager 發生故障,狀態會丟失。
- 性能:由于狀態是直接存儲在內存中的,所以訪問速度很快。
- 限制:狀態大小受限于 TaskManager 可用內存。大規模狀態可能導致內存溢出錯誤。
FsStateBackend(文件系統狀態后端)
- 存儲:狀態存儲在 TaskManager 的 JVM 堆內存中(作為緩存),但在檢查點(checkpoint)時,會持久化到配置的文件系統(如 HDFS)中。
- 使用場景:適用于需要持久化狀態以避免數據丟失的場景。在發生故障時,Flink 作業可以從文件系統中的檢查點恢復狀態。
- 性能:由于狀態在內存中進行操作,并在檢查點時異步寫入文件系統,因此可以提供較快的狀態訪問速度,但可能受文件系統性能的限制。
- 限制:內存中的狀態大小仍然受限于 TaskManager 可用內存,但由于檢查點數據被寫入到更穩定的文件系統,因此可以支持更大的狀態。
RocksDBStateBackend(RocksDB 狀態后端)
RocksDB是一個嵌入式鍵值存儲(key-value store),它可以將數據保存到本地磁盤上,為了從RocksDB中讀寫數據,系統需要對數據進行序列化和反序列化。
- 存儲:狀態存儲在本地磁盤上的 RocksDB 數據庫中,檢查點數據會持久化到配置的文件系統中。
- 使用場景:適用于大規模狀態管理的場景。由于 RocksDB 是一個優化的鍵值存儲,因此可以有效地管理大量狀態數據。
- 性能:狀態訪問速度可能比內存狀態后端慢(磁盤讀寫以及序列化和反序列化對象的開銷),但 RocksDB 提供了針對大量狀態數據的優化。
- 限制:對本地磁盤空間有需求,但由于狀態是在本地磁盤上操作,因此可以支持非常大的狀態。
在選擇狀態后端時,需要考慮應用的狀態大小、恢復速度、持久性和部署環境。對于生產環境,通常推薦使用 RocksDBStateBackend,因為它能夠提供良好的擴展性和容錯性。
State對擴縮容的處理
Operator State 的擴容處理
在 Apache Flink 中,對于有狀態的流處理作業,當作業進行擴容(scaling out)或縮容(scaling in)時,即增加或減少并行子任務的數量時,Flink 需要重新分配 OperatorState。這個過程稱為狀態重分配(state redistribution)。
對于 Operator State 的擴容處理,Flink 提供了不同的重分配模式來處理狀態:
ListState
對于 ListState 類型的 Operator State,如果流任務的并行度從 N 增加到 M,Flink 會將每個并行實例的狀態分成 M 份,然后將這些分片分配給新的并行實例。如果并行度減少,則相反,狀態將會聚合起來。
圖片
擴容時:
- 假設原來有 2 個并行實例,每個實例有自己的 ListState。
- 擴容到 3 個并行實例。
- Flink 會將每個原來的 ListState 平均分成 3 份。
- 新的 3 個并行實例每個都會接收一份來自每個原始 ListState 的數據。
縮容時:
- 假設原來有 3個并行實例。
- 縮容到 1 個并行實例。
- 現有的狀態將會被聚合,確保新的 1 個實例完整地包含原始狀態的全部數據。
BroadcastState
BroadcastState 的數據在擴容或縮容時會被復制到所有的并行實例中。由于 BroadcastState 是以廣播的方式存儲數據,所有并行實例的狀態都是相同的。
圖片
UnionListState
對于 UnionListState 類型的 Operator State,在擴容或縮容時,狀態的每個元素將保持不變,原始狀態的所有元素將被統一地分發到新的并行實例中。這意味著每個元素僅分配給一個并行實例,但所有并行實例的狀態的并集會包括所有原始狀態的元素。隨后由任務自己決定哪些條目該保留,哪些該丟棄。
圖片
思考:Source的擴容(并發數)是否可以超過Source物理存儲的partition數量呢?
在使用像 Apache Kafka 這樣的消息隊列作為數據源(Source)時,消息隊列中的數據被劃分為多個分區(partitions)。這種設計主要是為了支持數據的并行處理以及提高吞吐量。在使用 Flink 或類似的流處理框架時,一個常見的做法是將每個分區分配給一個并行的 Source 實例(也稱為 Source Task 或 Source Operator)進行處理。
如果嘗試將 Source 的并行度(并發數)設置得比物理存儲(比如 Kafka 主題)的分區數量還要高,那么將會有一些并行實例分配不到任何分區,因為分區的數量是固定的,且每個分區只能被一個并行實例消費(至少在 Flink 的默認設置下是這樣)。這會導致資源浪費,因為超出分區數量的那部分并行實例不會做任何實際的數據處理工作,但仍然占用系統資源。
因此,在設置 Source 的并行度時,通常的最佳實踐是:
- 確保 Source 的并行度不超過其對應物理存儲(如 Kafka 主題)的分區數量。
如果需要增加并行度以提高處理能力,相應地也需要增加物理存儲的分區數量。對于 Kafka 來說,可以通過修改主題的分區配置來實現。
對于 Apache Flink,如果使用的是 Flink Kafka Connector,并且嘗試將并行度設置得比 Kafka 主題的分區數量還要高,Flink 會在作業啟動時進行檢查。如果發現這種配置不匹配的情況,Flink 會拋出異常并終止作業啟動,以避免資源浪費和潛在的配置錯誤。這種設計選擇確保了資源的有效利用和處理能力的合理分配,同時也避免了由于配置錯誤而導致的潛在問題。
KeyedState對擴容的處理
- 什么是Key-Groups
KeyedState的算子在擴容時會根據新的任務數量對key進行重分區,為了降低狀態在不同任務之間遷移的成本,Flink不會單獨對key進行在分配,而是會把所有的鍵值分別存到不同的key-group中,每個key-group都包含了部分鍵值對。一個key-group是State分配的原子單位。
- 什么決定Key-Groups的個數
key-group的數量在job啟動前必須是確定的且運行中不能改變。由于key-group是state分配的原子單位,而每個operator并行實例至少包含一個key-group,因此operator的最大并行度不能超過設定的key-group的個數,那么在Flink的內部實現上key-group的數量就是最大并行度的值。
- 如何決定key屬于哪個Key-Group
為了決定一個key屬于哪個Key-Group,通常會采用一種叫做一致性哈希(Consistent Hashing)的算法。一致性哈希算法的基本思想是將所有的Key和所有的Key-Group都映射到同一個哈希環上。對每個Key進行哈希運算得到一個哈希值,然后在哈希環上找到一個順時針方向最近的Key-Group,這個Key就屬于這個Key-Group。即:Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism取余操作的來分配的。
如下圖當parallelism=2,maxParallelism=10的情況下流上key與key-group的對應關系如下圖所示:
圖片
如上圖key(a)的hashCode是97,與最大并發10取余后是7,被分配到了KG-7中,流上每個event都會分配到KG-0至KG-9其中一個Key-Group中。
上面的Stateful Operation節點的最大并行度maxParallelism的值是10,也就是我們一共有10個Key-Group,當我們并發是2的時候和并發是3的時候分配的情況如下圖:
圖片
先計算每個Operator實例至少分配的Key-Group個數,將不能整除的部分N個,平均分給前N個實例。最終每個Operator實例管理的Key-Groups會在GroupRange中表示,本質是一個區間值。比如上圖是2->3擴容,那每個task的key-group的數量是:10/3≈3,也即是每個task先分3個key-group,然后把剩余的1個key-group分配給第一task。
值得注意的是:
Key-Group機制的特點就是每個具體的key(event)不關心落到具體的哪個task來處理,只關心會落到哪個Key-Group中:
- 首先 一個job運行之后,如果要復用state,不允許在修改maxParallelism。
- key 值的hash code決定落到哪個KG中,key本身不關系被哪個task處理,也就是說相同的KG在擴容前后可能被不同的task處理。
總結
State是Flink流計算的關鍵部分。Flink 中的狀態分為兩種主要類型:Keyed State 和 Operator State。Flink提供了三種狀態后端:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。對于Keyed State 和 Operator State應對擴縮容時有不同的分配方式。