徹底搞清 Flink 中的 Window 機制
一、 為什么需要Window
在流處理應用中,數據是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。
在這種情況下,我們必須定義一個窗口(window),用來收集最近1分鐘內的數據,并對這個窗口內的數據進行計算
二、Window的分類
2.1 按照time和count分類
time-window:時間窗口:根據時間劃分窗口,如:每xx分鐘統計最近xx分鐘的數據
count-window:數量窗口:根據數量劃分窗口,如:每xx個數據統計最近xx個數據
2.2 按照slide和size分類
窗口有兩個重要的屬性: 窗口大小size和滑動間隔slide,根據它們的大小關系可分為:
tumbling-window:滾動窗口:size=slide,如:每隔10s統計最近10s的數據
sliding-window:滑動窗口:size>slide,如:每隔5s統計最近10s數據
注意:當size<slide的時候,如每隔15s統計最近10s的數據,那么中間5s
小結
按照上面窗口的分類方式進行組合,可以得出如下的窗口:
- 基于時間的滾動窗口tumbling-time-window--用的較多
- 基于時間的滑動窗口sliding-time-window--用的較多
- 基于數量的滾動窗口tumbling-count-window--用的較少
- 基于數量的滑動窗口sliding-count-window--用的較少
注意:Flink還支持一個特殊的窗口:Session會話窗口,需要設置一個會話超時時間,如30s,則表示30s內沒有數據到來,則觸發上個窗口的計算
三、WindowAPI
3.1 window和windowAll
使用keyby的流,應該使用window方法
未使用keyby的流,應該調用windowAll方法
區別:
Window算子:是可以設置并行度的
WindowAll 算子:并行度始終為1
3.2 WindowAssigner
Windows Assigner的作用是指定窗口的類型,定義如何將數據流分配到一個或者多個窗口,API中通過window (WindowsAssigner assigner)指定。在Flink中支持兩種類型的窗口,一種是基于時間的窗口(TimeWindow),另一種是基于數量的窗口(countWindow)。窗口所表現出的類型特性取決于window assigner的定義。
Flink底層Window模型僅有TimeWindow以及GlobalWindow。
Flink提供了很多各種場景用的WindowAssigner:
如果需要自定制數據分發策略,則可以實現一個 class,繼承自 WindowAssigner。
3.3 evictor
evictor 主要用于做一些數據的自定義操作,可以在執行用戶代碼之前,也可以在執行
用戶代碼之后,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。
Flink 提供了如下三種通用的 evictor:
CountEvictor 保留指定數量的元素
TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 范圍內的元
素,其中 max_ts 是窗口內時間戳的最大值。
DeltaEvictor 通過執行用戶給定的 DeltaFunction 以及預設的 theshold,判斷是否刪 除一個元素。
3.4 trigger
trigger 用來判斷一個窗口是否需要被觸發,每個 WindowAssigner 都自帶一個默認的trigger,
如果默認的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的接口以及含義:
onEventTime() 當 event-time timer 被觸發的時候會調用
onElement() 每次往 window 增加一個元素的時候都會觸發
onMerge() 對兩個 `rigger 的 state 進行 merge 操作
clear() window 銷毀的時候被調用
上面的接口中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選 擇:
- CONTINUE 不做任何事情
- FIRE 觸發 window
- PURGE 清空整個 window 的元素并銷毀窗口
- PURGE 清空整個 window 的元素并銷毀窗口
四、WindowAPI調用案例示例
4.1 基于時間的滾動和滑動窗口
測試數據
- 信號燈編號和通過該信號燈的車的數量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量--基于時間的滾動窗口
需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量--基于時間的滑動窗口
- package com.flink.source
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
- import org.apache.flink.streaming.api.windowing.time.Time;
- /**
- * @Package com.flink.source
- * @File :WindowDemo_TimeWindow.java
- * @author 大數據老哥
- * @date 2021/10/26 10:50
- * @version V1.0
- */
- object WindowDemo_TimeWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap = socketData.map(new MapFunction[String, CartInfo]() {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- //需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量
- val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count")
- //需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量
- val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count")
- result.print()
- result2.print()
- env.execute("winds")
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)
4.2 基于數量的滾動和滑動窗口
測試數據
- 信號燈編號和通過該信號燈的車的數量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
需求1:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基于數量的滾動窗口
需求2:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基于數量的滑動窗口
- package com.flink.source
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.scala._
- /**
- * @Package com.flink.source
- * @File :WindosDemoo_CountWindos.java
- * @author 大數據老哥
- * @date 2021/10/26 14:04
- * @version V1.0
- */
- object WindowDemo_CountWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap = socketData.map(new MapFunction[String, CartInfo] {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- // 需求1:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現5次進行統計
- val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count")
- // 需求2:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現3次進行統計
- val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count")
- result.print("result")
- result2.print("result2")
- env.execute()
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)
case class CartInfo(var sensorId: String, var count: Int)
4.3 會話窗口
測試數據
- 信號燈編號和通過該信號燈的車的數量
- 9,3
- 9,2
- 9,7
- 4,9
- 2,6
- 1,5
- 2,3
- 5,7
- 5,4
設置會話超時時間為10s,10s內沒有數據到來,則觸發上個窗口的計算
- package com.flink.source
- import org.apache.flink.api.common.functions.MapFunction
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
- import org.apache.flink.streaming.api.windowing.time.Time
- /**
- * @Package com.flink.source
- * @File :WindowDemo_SessionWindow.java
- * @author 大數據老哥
- * @date 2021/11/1 16:10
- * @version V1.0
- */
- object WindowDemo_SessionWindow {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val socketData = env.socketTextStream("192.168.100.101", 9999)
- val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() {
- override def map(t: String): CartInfo = {
- val arr = t.split(",")
- CartInfo(arr(0), arr(1).toInt)
- }
- })
- //設置會話超時時間為10s,10s內沒有數據到來,則觸發上個窗口的計算
- val result = socketMap.keyBy(0)
- .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
- .sum("count")
- result.print()
- env.execute("winds")
- }
- }
- case class CartInfo(var sensorId: String, var count: Int)