成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

徹底搞清 Flink 中的 Window 機制

系統 Windows
在流處理應用中,數據是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。

[[432700]]

一、 為什么需要Window

在流處理應用中,數據是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。

在這種情況下,我們必須定義一個窗口(window),用來收集最近1分鐘內的數據,并對這個窗口內的數據進行計算

二、Window的分類

2.1 按照time和count分類

time-window:時間窗口:根據時間劃分窗口,如:每xx分鐘統計最近xx分鐘的數據

count-window:數量窗口:根據數量劃分窗口,如:每xx個數據統計最近xx個數據

2.2 按照slide和size分類

窗口有兩個重要的屬性: 窗口大小size和滑動間隔slide,根據它們的大小關系可分為:

tumbling-window:滾動窗口:size=slide,如:每隔10s統計最近10s的數據

sliding-window:滑動窗口:size>slide,如:每隔5s統計最近10s數據

注意:當size<slide的時候,如每隔15s統計最近10s的數據,那么中間5s

小結

按照上面窗口的分類方式進行組合,可以得出如下的窗口:

  • 基于時間的滾動窗口tumbling-time-window--用的較多
  • 基于時間的滑動窗口sliding-time-window--用的較多
  • 基于數量的滾動窗口tumbling-count-window--用的較少
  • 基于數量的滑動窗口sliding-count-window--用的較少

注意:Flink還支持一個特殊的窗口:Session會話窗口,需要設置一個會話超時時間,如30s,則表示30s內沒有數據到來,則觸發上個窗口的計算

三、WindowAPI

3.1 window和windowAll

使用keyby的流,應該使用window方法

未使用keyby的流,應該調用windowAll方法

區別:

Window算子:是可以設置并行度的

WindowAll 算子:并行度始終為1

3.2 WindowAssigner

Windows Assigner的作用是指定窗口的類型,定義如何將數據流分配到一個或者多個窗口,API中通過window (WindowsAssigner assigner)指定。在Flink中支持兩種類型的窗口,一種是基于時間的窗口(TimeWindow),另一種是基于數量的窗口(countWindow)。窗口所表現出的類型特性取決于window assigner的定義。

Flink底層Window模型僅有TimeWindow以及GlobalWindow。

Flink提供了很多各種場景用的WindowAssigner:

如果需要自定制數據分發策略,則可以實現一個 class,繼承自 WindowAssigner。

3.3 evictor

evictor 主要用于做一些數據的自定義操作,可以在執行用戶代碼之前,也可以在執行

用戶代碼之后,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。

Flink 提供了如下三種通用的 evictor:

CountEvictor 保留指定數量的元素

TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 范圍內的元

素,其中 max_ts 是窗口內時間戳的最大值。

DeltaEvictor 通過執行用戶給定的 DeltaFunction 以及預設的 theshold,判斷是否刪 除一個元素。

3.4 trigger

trigger 用來判斷一個窗口是否需要被觸發,每個 WindowAssigner 都自帶一個默認的trigger,

如果默認的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的接口以及含義:

onEventTime() 當 event-time timer 被觸發的時候會調用

onElement() 每次往 window 增加一個元素的時候都會觸發

onMerge() 對兩個 `rigger 的 state 進行 merge 操作

clear() window 銷毀的時候被調用

上面的接口中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選 擇:

  • CONTINUE 不做任何事情
  • FIRE 觸發 window
  • PURGE 清空整個 window 的元素并銷毀窗口
  • PURGE 清空整個 window 的元素并銷毀窗口

四、WindowAPI調用案例示例

4.1 基于時間的滾動和滑動窗口

測試數據

  1. 信號燈編號和通過該信號燈的車的數量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量--基于時間的滾動窗口

需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量--基于時間的滑動窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5. import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows} 
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7.  
  8. /** 
  9.  * @Package com.flink.source 
  10.  * @File :WindowDemo_TimeWindow.java 
  11.  * @author 大數據老哥 
  12.  * @date 2021/10/26 10:50 
  13.  * @version V1.0 
  14.  */ 
  15. object WindowDemo_TimeWindow { 
  16.   def main(args: Array[String]): Unit = { 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.  
  19.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  20.     val socketMap = socketData.map(new MapFunction[String, CartInfo]() { 
  21.       override def map(t: String): CartInfo = { 
  22.         val arr = t.split(","
  23.         CartInfo(arr(0), arr(1).toInt) 
  24.       } 
  25.     }) 
  26.     //需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量 
  27.     val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count"
  28.     //需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量 
  29.     val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count"
  30.     result.print() 
  31.     result2.print() 
  32.     env.execute("winds"
  33.   } 
  34.  
  35.  
  36. case class CartInfo(var sensorId: String, var countInt

4.2 基于數量的滾動和滑動窗口

測試數據

  1. 信號燈編號和通過該信號燈的車的數量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基于數量的滾動窗口

需求2:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基于數量的滑動窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5.  
  6. /** 
  7.  * @Package com.flink.source 
  8.  * @File :WindosDemoo_CountWindos.java 
  9.  * @author 大數據老哥 
  10.  * @date 2021/10/26 14:04 
  11.  * @version V1.0 
  12.  */ 
  13. object WindowDemo_CountWindow { 
  14.   def main(args: Array[String]): Unit = { 
  15.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  16.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  17.     val socketMap = socketData.map(new MapFunction[String, CartInfo] { 
  18.       override def map(t: String): CartInfo = { 
  19.         val arr = t.split(","
  20.         CartInfo(arr(0), arr(1).toInt) 
  21.       } 
  22.     }) 
  23.      // 需求1:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現5次進行統計 
  24.     val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count"
  25.      // 需求2:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現3次進行統計 
  26.     val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count"
  27.     result.print("result"
  28.     result2.print("result2"
  29.     env.execute() 
  30.  
  31.   } 
  32. case class CartInfo(var sensorId: String, var countInt

case class CartInfo(var sensorId: String, var count: Int)

4.3 會話窗口

測試數據

  1. 信號燈編號和通過該信號燈的車的數量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

設置會話超時時間為10s,10s內沒有數據到來,則觸發上個窗口的計算

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator 
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
  6. import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8.  
  9.   
  10. /** 
  11.  * @Package com.flink.source 
  12.  * @File :WindowDemo_SessionWindow.java 
  13.  * @author 大數據老哥 
  14.  * @date 2021/11/1 16:10 
  15.  * @version V1.0 
  16.  */ 
  17. object WindowDemo_SessionWindow { 
  18.   def main(args: Array[String]): Unit = { 
  19.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  20.  
  21.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  22.     val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() { 
  23.       override def map(t: String): CartInfo = { 
  24.         val arr = t.split(","
  25.         CartInfo(arr(0), arr(1).toInt) 
  26.       } 
  27.     }) 
  28.     //設置會話超時時間為10s,10s內沒有數據到來,則觸發上個窗口的計算 
  29.     val result = socketMap.keyBy(0) 
  30.       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) 
  31.       .sum("count"
  32.     result.print() 
  33.     env.execute("winds"
  34.   } 
  35.  
  36.  
  37. case class CartInfo(var sensorId: String, var countInt

 

責任編輯:武曉燕 來源: 大數據老哥
相關推薦

2024-02-27 08:05:32

Flink分區機制數據傳輸

2020-11-02 11:40:24

Node.jsRequire前端

2022-01-14 07:56:38

Checkpoint機制Flink

2024-06-21 08:32:24

2020-06-03 08:19:00

Kubernetes

2022-04-25 09:03:16

JavaScript代碼

2023-03-22 18:34:30

Flink調度部署

2024-05-11 08:31:20

中斷機制插隊機制React

2021-09-12 07:01:07

Flink SQL ETL datastream

2022-05-19 08:47:30

Flinkwatermark窗口計算

2020-10-14 09:11:44

IO 多路復用實現機

2021-12-29 17:29:07

KubernetesEvents集群

2024-05-28 08:02:08

Vue3父組件子組件

2021-09-04 07:29:57

Android

2023-04-12 08:38:44

函數參數Context

2018-07-19 10:16:25

華光昱能

2024-05-17 10:05:06

Java機制應用

2024-04-09 07:50:59

Flink語義Watermark

2018-11-30 09:03:55

HTTP緩存Web

2022-08-16 09:03:01

JavaScript前端
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 狠狠综合久久av一区二区小说 | 一区精品在线观看 | 欧美亚洲视频 | 欧美久久久网站 | 视频一区二区中文字幕 | 一级黄色淫片 | 日韩欧美在线免费观看视频 | 中文在线观看视频 | 91精品国产综合久久久密闭 | 国产精品福利视频 | 国产在线第一页 | 欧美小视频在线观看 | 成人精品一区二区三区四区 | 亚洲国产视频一区 | 老牛影视av一区二区在线观看 | 国产精品美女久久久久久免费 | 亚洲国产成人精品女人久久久 | 狠狠综合网 | 91影院| 国产成人精品区一区二区不卡 | 99精品一级欧美片免费播放 | 精品国产色 | 日韩中文字幕一区二区 | 精品久久久久久久久久久 | 国产一区二区欧美 | 欧美一级黄色免费看 | 日韩av手机在线观看 | 国产成人精品一区二区三区四区 | 亚洲国产一区二区三区在线观看 | 九九热在线免费视频 | 高清免费在线 | 日本久草视频 | 黄色片在线 | 亚洲人成在线播放 | 伊人久久成人 | 亚洲小说图片 | 色播av| 国产精品视频www | 日韩一区二区三区在线 | 另类 综合 日韩 欧美 亚洲 | 日韩激情视频一区 |