聊聊Flink:Flink中的時(shí)間語(yǔ)義和Watermark詳解
該篇主要講Flink中的時(shí)間語(yǔ)義、Flink 水印機(jī)制以及Flink對(duì)亂序數(shù)據(jù)的三重保障。
一、Flink的三種時(shí)間語(yǔ)義
圖片
1.1 Event Time
Event Time指的是數(shù)據(jù)流中每個(gè)元素或者每個(gè)事件自帶的時(shí)間屬性,一般是事件發(fā)生的時(shí)間。由于事件從發(fā)生到進(jìn)入Flink時(shí)間算子之間有很多環(huán)節(jié),一個(gè)較早發(fā)生的事件因?yàn)檠舆t可能較晚到達(dá),因此使用Event Time意味著事件到達(dá)有可能是亂序的。
使用Event Time時(shí),最理想的情況下,我們可以一直等待所有的事件到達(dá)后再進(jìn)行時(shí)間窗口的處理。假設(shè)一個(gè)時(shí)間窗口內(nèi)的所有數(shù)據(jù)都已經(jīng)到達(dá),基于Event Time的流處理會(huì)得到正確且一致的結(jié)果:無論我們是將同一個(gè)程序部署在不同的計(jì)算環(huán)境還是在相同的環(huán)境下多次計(jì)算同一份數(shù)據(jù),都能夠得到同樣的計(jì)算結(jié)果。我們根本不同擔(dān)心亂序到達(dá)的問題。但這只是理想情況,現(xiàn)實(shí)中無法實(shí)現(xiàn),因?yàn)槲覀兗炔恢谰烤挂榷嚅L(zhǎng)時(shí)間才能確認(rèn)所有事件都已經(jīng)到達(dá),更不可能無限地一直等待下去。在實(shí)際應(yīng)用中,當(dāng)涉及到對(duì)事件按照時(shí)間窗口進(jìn)行統(tǒng)計(jì)時(shí),F(xiàn)link會(huì)將窗口內(nèi)的事件緩存下來,直到接收到一個(gè)Watermark,以確認(rèn)不會(huì)有更晚數(shù)據(jù)的到達(dá)。Watermark意味著在一個(gè)時(shí)間窗口下,F(xiàn)link會(huì)等待一個(gè)有限的時(shí)間,這在一定程度上降低了計(jì)算結(jié)果的絕對(duì)準(zhǔn)確性,而且增加了系統(tǒng)的延遲。在流處理領(lǐng)域,比起其他幾種時(shí)間語(yǔ)義,使用Event Time的好處是某個(gè)事件的時(shí)間是確定的,這樣能夠保證計(jì)算結(jié)果在一定程度上的可預(yù)測(cè)性。
一個(gè)基于Event Time的Flink程序中必須定義Event Time,以及如何生成Watermark。我們可以使用元素中自帶的時(shí)間,也可以在元素到達(dá)Flink后人為給Event Time賦值。
使用Event Time的優(yōu)勢(shì)是結(jié)果的可預(yù)測(cè)性,缺點(diǎn)是緩存較大,增加了延遲,且調(diào)試和定位問題更復(fù)雜。
1.2 Processing Time
對(duì)于某個(gè)算子來說,Processing Time指算子使用當(dāng)前機(jī)器的系統(tǒng)時(shí)鐘來定義時(shí)間。在Processing Time的時(shí)間窗口場(chǎng)景下,無論事件什么時(shí)候發(fā)生,只要該事件在某個(gè)時(shí)間段達(dá)到了某個(gè)算子,就會(huì)被歸結(jié)到該窗口下,不需要Watermark機(jī)制。對(duì)于一個(gè)程序在同一個(gè)計(jì)算環(huán)境來說,每個(gè)算子都有一定的耗時(shí),同一個(gè)事件的Processing Time,第n個(gè)算子和第n+1個(gè)算子不同。如果一個(gè)程序在不同的集群和環(huán)境下執(zhí)行時(shí),限于軟硬件因素,不同環(huán)境下前序算子處理速度不同,對(duì)于下游算子來說,事件的Processing Time也會(huì)不同,不同環(huán)境下時(shí)間窗口的計(jì)算結(jié)果會(huì)發(fā)生變化。因此,Processing Time在時(shí)間窗口下的計(jì)算會(huì)有不確定性。
Processing Time只依賴當(dāng)前執(zhí)行機(jī)器的系統(tǒng)時(shí)鐘,不需要依賴Watermark,無需緩存。Processing Time是實(shí)現(xiàn)起來非常簡(jiǎn)單也是延遲最小的一種時(shí)間語(yǔ)義。
1.3 Ingestion Time
Ingestion Time是事件到達(dá)Flink Souce的時(shí)間。從Source到下游各個(gè)算子中間可能有很多計(jì)算環(huán)節(jié),任何一個(gè)算子的處理速度快慢可能影響到下游算子的Processing Time。而Ingestion Time定義的是數(shù)據(jù)流最早進(jìn)入Flink的時(shí)間,因此不會(huì)被算子處理速度影響。
Ingestion Time通常是Event Time和Processing Time之間的一個(gè)折中方案。比起Event Time,Ingestion Time可以不需要設(shè)置復(fù)雜的Watermark,因此也不需要太多緩存,延遲較低。比起Processing Time,Ingestion Time的時(shí)間是Souce賦值的,一個(gè)事件在整個(gè)處理過程從頭至尾都使用這個(gè)時(shí)間,而且后續(xù)算子不受前序算子處理速度的影響,計(jì)算結(jié)果相對(duì)準(zhǔn)確一些,但計(jì)算成本稍高。
注:Ingestion Time1.13 版本已經(jīng)不再提了,這也是為啥官網(wǎng)的圖沒看到Ingestion Time的原因。目前推薦Event Time的時(shí)間語(yǔ)義。
1.4 Flink如何設(shè)置時(shí)間域
調(diào)用 setStreamTimeCharacteristic 設(shè)置時(shí)間域,枚舉類 TimeCharacteristic 預(yù)設(shè)了三種時(shí)間域,不顯式設(shè)置的情況下,默認(rèn)使用 TimeCharacteristic.EventTime(1.12 版本以前默認(rèn)是 TimeCharacteristic.ProcessingTime)。
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //過期方法
在 1.12 以后版本默認(rèn)是使用 EventTime,如果要顯示使用 ProcessingTime,可以關(guān)閉 watermark(自動(dòng)生成 watermark 的間隔設(shè)置為 0),設(shè)置
env.getConfig().setAutoWatermarkInterval(0);
二、Flink 水印機(jī)制
我們知道,流處理從事件產(chǎn)生,到流經(jīng) source,再到 operator,中間是有一個(gè)過程和時(shí)間的,雖然大部分情況下,流到 operator 的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指 Flink 接收到的事件的先后順序不是嚴(yán)格按照事件的 Event Time 順序排列的,為了保證計(jì)算結(jié)果的正確性,需要讓窗口等待延遲數(shù)據(jù)到達(dá)后再進(jìn)行計(jì)算,但是不能無限期地等待下去,必須有一種機(jī)制來確定何時(shí)觸發(fā)窗口計(jì)算,這種機(jī)制就是水印(Watermark)。
稍稍總結(jié)一下水位線的引入原因:
- 分布式系統(tǒng)的網(wǎng)絡(luò)傳輸?shù)牟淮_定性;
- 數(shù)據(jù)是亂序的;
- 支持事件時(shí)間的流處理器需要一種測(cè)量事件時(shí)間進(jìn)度的方法,用以正確的處理窗口等操作;
水位線的物理意義有兩點(diǎn):
- 水位線本質(zhì)是一個(gè)基于數(shù)據(jù)生成的、單調(diào)遞增的時(shí)間戳;
- 水位線 W(t)表示當(dāng)前數(shù)據(jù)流中的所有 t 時(shí)刻前的數(shù)據(jù)都已經(jīng)到了。
水印是一種用于衡量事件時(shí)間進(jìn)度的機(jī)制,其表示某個(gè)時(shí)刻(事件時(shí)間)以前的數(shù)據(jù)將不再產(chǎn)生,因此水印指的是一個(gè)時(shí)間點(diǎn)。水印作為數(shù)據(jù)流的一部分流動(dòng),并帶有時(shí)間戳t。t表示該流中不應(yīng)再有時(shí)間戳小于等于t的元素(即時(shí)間戳早于或等于水印的事件)。
下圖顯示了帶有時(shí)間戳和嵌入式水印的事件流,事件是按順序排列的(相對(duì)于其時(shí)間戳),這意味著水印只是流中的周期性標(biāo)記。
圖片
水印對(duì)于亂序流至關(guān)重要,如下圖所示,其中事件不是按其時(shí)間戳排序的。通常,水印是數(shù)據(jù)流中一個(gè)點(diǎn)的聲明,表示水印之前的所有事件都應(yīng)該到達(dá)。一旦水印到達(dá)算子,算子則認(rèn)為某個(gè)時(shí)間周期,所有事件已經(jīng)被收到,不會(huì)再有更多符合條件的事件。
圖片
水印是直接通過Source Function生成的或在后續(xù)的DataStream API中生成的。在實(shí)際的流計(jì)算中,一個(gè)作業(yè)往往會(huì)同時(shí)處理多個(gè)源的數(shù)據(jù),多個(gè)源的數(shù)據(jù)按照key分組后進(jìn)行Shuffle處理,數(shù)據(jù)會(huì)匯聚到同一個(gè)處理節(jié)點(diǎn)。而每個(gè)并行子任務(wù)通常獨(dú)立生成水印,這樣就容易導(dǎo)致匯聚到一起的水印不是單調(diào)遞增的。對(duì)于這種情況,F(xiàn)link會(huì)選擇所有流入的水印中事件時(shí)間最小的一個(gè)發(fā)往下游,如下圖所示。
圖片
多個(gè)流的水印流入算子后,由于當(dāng)前算子也有自己的水印,因此算子會(huì)綜合計(jì)算得出最終水印,計(jì)算規(guī)則為:取多個(gè)流中事件時(shí)間最小的水印與當(dāng)前算子的水印進(jìn)行對(duì)比,如果大于當(dāng)前算子水印,則更新當(dāng)前算子水印,并發(fā)往下游。例如抽象類AbstractStreamOperator中的源碼如下:
圖片
三、分布式環(huán)境下Watermark的傳播
在實(shí)際計(jì)算過程中,F(xiàn)link的算子一般分布在多個(gè)并行的分區(qū)(或者稱為實(shí)例)上,F(xiàn)link需要將Watermark在并行環(huán)境下向前傳播。如下圖所示,F(xiàn)link的每個(gè)并行算子子任務(wù)會(huì)維護(hù)針對(duì)該子任務(wù)的Event Time時(shí)鐘,這個(gè)時(shí)鐘記錄了這個(gè)算子子任務(wù)Watermark處理進(jìn)度,隨著上游Watermark數(shù)據(jù)不斷向下發(fā)送,算子子任務(wù)的Event Time時(shí)鐘也要不斷向前更新。由于上游各分區(qū)的處理速度不同,到達(dá)當(dāng)前算子的Watermark也會(huì)有先后快慢之分,每個(gè)算子子任務(wù)會(huì)維護(hù)來自上游不同分區(qū)的Watermark信息,這是一個(gè)列表,列表內(nèi)對(duì)應(yīng)上游算子各分區(qū)的Watermark時(shí)間戳等信息。
圖片
當(dāng)上游某分區(qū)有Watermark進(jìn)入該算子子任務(wù)后,F(xiàn)link先判斷新流入的Watermark時(shí)間戳是否大于Partition Watermark列表內(nèi)記錄的該分區(qū)的歷史Watermark時(shí)間戳,如果新流入的更大,則更新該分區(qū)的Watermark。例如,某個(gè)分區(qū)新流入的Watermark時(shí)間戳為4,算子子任務(wù)維護(hù)的該分區(qū)Watermark為1,那么Flink會(huì)更新Partition Watermark列表為最新的時(shí)間戳4。接著,F(xiàn)link會(huì)遍歷Partition Watermark列表中的所有時(shí)間戳,選擇最小的一個(gè)作為該算子子任務(wù)的Event Time。同時(shí),F(xiàn)link會(huì)將更新的Event Time作為Watermark發(fā)送給下游所有算子子任務(wù)。算子子任務(wù)Event Time的更新意味著該子任務(wù)將時(shí)間推進(jìn)到了這個(gè)時(shí)間,該時(shí)間之前的事件已經(jīng)被處理并發(fā)送到下游。例如,圖中第二步和第三步,Partition Watermark列表更新后,導(dǎo)致列表中最小時(shí)間戳發(fā)生了變化,算子子任務(wù)的Event Time時(shí)鐘也相應(yīng)進(jìn)行了更新。整個(gè)過程完成了數(shù)據(jù)流中的Watermark推動(dòng)算子子任務(wù)Watermark的時(shí)鐘更新過程。Watermark像一個(gè)幕后推動(dòng)者,不斷將流處理系統(tǒng)的Event Time向前推進(jìn)。我們可以將這種機(jī)制總結(jié)為:
- Flink某算子子任務(wù)根據(jù)各上游流入的Watermark來更新Partition Watermark列表。
- 選取Partition Watermark列表中最小的時(shí)間作為該算子的Event Time,并將這個(gè)時(shí)間發(fā)送給下游算子。
這樣的設(shè)計(jì)機(jī)制滿足了并行環(huán)境下Watermark在各算子中的傳播問題,但是假如某個(gè)上游分區(qū)的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯獨(dú)個(gè)別分區(qū)的時(shí)間停留在很早的某個(gè)時(shí)間,這會(huì)導(dǎo)致算子的Event Time時(shí)鐘不更新,相應(yīng)的時(shí)間窗口計(jì)算也不會(huì)被觸發(fā),大量的數(shù)據(jù)積壓在算子內(nèi)部得不到處理,整個(gè)流處理處于空轉(zhuǎn)狀態(tài)。這種問題可能出現(xiàn)在使用數(shù)據(jù)流自帶的Watermark,自帶的Watermark在某些分區(qū)下沒有及時(shí)更新。針對(duì)這種問題,一種解決辦法是根據(jù)機(jī)器當(dāng)前的時(shí)鐘周期性地生成Watermark。
此外,在union等多數(shù)據(jù)流處理時(shí),F(xiàn)link也使用上述Watermark更新機(jī)制,那就意味著,多個(gè)數(shù)據(jù)流的時(shí)間必須對(duì)齊,如果一方的Watermark時(shí)間較老,那整個(gè)應(yīng)用的Event Time時(shí)鐘也會(huì)使用這個(gè)較老的時(shí)間,其他數(shù)據(jù)流的數(shù)據(jù)會(huì)被積壓。一旦發(fā)現(xiàn)某個(gè)數(shù)據(jù)流不再生成新的Watermark,我們要在SourceFunction中的SourceContext里調(diào)用markAsTemporarilyIdle設(shè)置該數(shù)據(jù)流為空閑狀態(tài)。
四、Flink對(duì)亂序數(shù)據(jù)的三重保障
我們思考一個(gè)問題:怎樣避免亂序數(shù)據(jù)帶來計(jì)算不正確性?
常用的解決辦法是:當(dāng)最大的事件時(shí)間maxEventTime達(dá)到了窗口關(guān)閉時(shí)間,不應(yīng)該立刻觸發(fā)窗口計(jì)算,而是等待一段時(shí)間,等遲到的數(shù)據(jù)來了再關(guān)閉窗口。
但是,我們應(yīng)該等待多久的時(shí)間呢?由于網(wǎng)絡(luò)、分布式等原因造成的延時(shí),一般大多數(shù)遲到的數(shù)據(jù)都會(huì)在最近一段時(shí)間到來,這個(gè)最近一段時(shí)間一般是毫秒級(jí)的,Watermark就是做到了這樣的保障。還有很少的一部分?jǐn)?shù)據(jù)會(huì)遲到很久,我們可以通過allowedLateness和sideOutputLateData來兜底。
處理亂序數(shù)據(jù),三重保證機(jī)制:
3.1 Watermark
能夠保證遲到很短的時(shí)間的數(shù)據(jù)到來后(一般是遲到毫秒級(jí)別內(nèi)的數(shù)據(jù),最大不超過1s),觸發(fā)窗口關(guān)閉并輸出。(即能夠hold住短時(shí)間內(nèi)遲到的數(shù)據(jù))
3.2 allowedLateness
allowedLateness(lateness: Time):設(shè)置允許的延遲時(shí)間,默認(rèn)為0,該方法僅對(duì)事件時(shí)間窗口有效。在水印通過窗口結(jié)尾后(即水印>=窗口結(jié)束時(shí)間),該方法指定的允許延遲時(shí)間才開始生效。該延遲時(shí)間與水印指定的允許延遲時(shí)間不沖突,相當(dāng)于在水印延遲時(shí)間的基礎(chǔ)上進(jìn)行累加。落入該方法指定的允許延遲時(shí)間范圍內(nèi)的元素可能會(huì)導(dǎo)致窗口再次觸發(fā)(例如EventTimeTrigger)。為了使這些元素正常被計(jì)算,F(xiàn)link會(huì)保持窗口的狀態(tài),直到允許的延遲過期為止。一旦延遲過期,F(xiàn)link將刪除該窗口并刪除其狀態(tài)。
3.3 sideOutputLateData
sideOutputLateData(outputTag: OutputTag[T]):將延遲到達(dá)的數(shù)據(jù)保存到outputTag對(duì)象中,OutputTag是一種類型化的命名標(biāo)簽,用于標(biāo)記算子的側(cè)道輸出,單獨(dú)收集延遲數(shù)據(jù)。后面可通過DataStream的getSideOutput(outputTag)方法得到被丟棄數(shù)據(jù)組成的數(shù)據(jù)流。
當(dāng)指定的允許延遲大于0時(shí),在水印通過窗口結(jié)尾后,將保留窗口及其內(nèi)容。在這種情況下,當(dāng)一個(gè)遲到但未被丟棄的元素到達(dá)時(shí),它可能會(huì)導(dǎo)致該窗口的另一次觸發(fā)。這次觸發(fā)稱為延遲觸發(fā),因?yàn)槭怯裳舆t事件觸發(fā)的,與主觸發(fā)(即窗口的第一次觸發(fā))相反。對(duì)于會(huì)話窗口,后期觸發(fā)會(huì)進(jìn)一步導(dǎo)致窗口合并,因?yàn)榭赡芸s小兩個(gè)預(yù)先存在的未合并窗口之間的間隙。當(dāng)使用全局窗口時(shí),沒有數(shù)據(jù)是延遲的,因?yàn)槿执翱诘慕Y(jié)束時(shí)間戳是Long.MAX_VALUE。
注意:
后期觸發(fā)的元素應(yīng)更新先前計(jì)算的結(jié)果,即數(shù)據(jù)流將包含同一計(jì)算的多個(gè)結(jié)果。根據(jù)你的應(yīng)用程序,需要考慮這些重復(fù)的結(jié)果或?qū)λ鼈冞M(jìn)行重復(fù)數(shù)據(jù)刪除。
在水印的基礎(chǔ)上設(shè)置允許延遲機(jī)制后,數(shù)據(jù)可以延遲的時(shí)間范圍是多少?在只設(shè)置了水印的情況下,如果滿足當(dāng)前進(jìn)入Flink的最大事件時(shí)間>=窗口結(jié)束時(shí)間+允許的最大延遲時(shí)間,則觸發(fā)窗口計(jì)算,發(fā)射計(jì)算結(jié)果并銷毀窗口。在水印的基礎(chǔ)上設(shè)置了允許延遲機(jī)制后,如果滿足當(dāng)前進(jìn)入Flink的最大事件時(shí)間>=窗口結(jié)束時(shí)間+允許的最大延遲時(shí)間(水印指定的),則觸發(fā)窗口計(jì)算,發(fā)射計(jì)算結(jié)果,但不會(huì)銷毀窗口,窗口會(huì)保留計(jì)算狀態(tài)并繼續(xù)等待延遲數(shù)據(jù);每條延遲數(shù)據(jù)到達(dá)后,如果落入窗口內(nèi),都會(huì)再次觸發(fā)窗口計(jì)算,更新計(jì)算狀態(tài),發(fā)射出最新計(jì)算結(jié)果,直到滿足條件:當(dāng)前進(jìn)入Flink的最大事件時(shí)間>=窗口結(jié)束時(shí)間+允許的最大延遲時(shí)間(水印指定的)+允許延遲機(jī)制指定的延遲時(shí)間,則關(guān)閉并銷毀窗口。此后到達(dá)的延遲數(shù)據(jù),由于窗口已經(jīng)關(guān)閉,數(shù)據(jù)將進(jìn)入側(cè)道輸出流進(jìn)行單獨(dú)存放,后期根據(jù)業(yè)務(wù)單獨(dú)處理即可。
指定允許延遲時(shí)間可以使用如下代碼片段:
圖片
使用Flink的側(cè)道輸出機(jī)制可以獲得一個(gè)后來被丟棄的數(shù)據(jù)組成的數(shù)據(jù)流。使用時(shí)首先需要使用sideOutputLateData(OutputTag)方法指定要在窗口化流上獲取后期數(shù)據(jù)。然后可以使用getSideOutput(lateOutputTag)方法得到后期數(shù)據(jù)組成的數(shù)據(jù)流,代碼如下:
圖片
為了更好地理解允許延遲和側(cè)道輸出機(jī)制,假設(shè)有亂序數(shù)據(jù)按照ABCDEFG的順序依次到達(dá)Flink應(yīng)用程序,并且設(shè)置了水印允許的最大延遲時(shí)間為3分鐘,在水印的基礎(chǔ)上又通過allowedLateness(Time.minutes(3))方法設(shè)置了允許的延遲時(shí)間為3分鐘,使用sideOutputLateData(lateOutputTag)方法設(shè)置側(cè)道輸出,如下圖所示。
圖片
當(dāng)數(shù)據(jù)A到達(dá)時(shí),由于窗口開始時(shí)間<=數(shù)據(jù)A的事件時(shí)間<窗口結(jié)束時(shí)間,因此數(shù)據(jù)A落入窗口內(nèi)。
當(dāng)數(shù)據(jù)B到達(dá)時(shí),由于其事件時(shí)間>=窗口結(jié)束時(shí)間,因此數(shù)據(jù)B不屬于該窗口。此時(shí)Watermark=進(jìn)入Flink的當(dāng)前最大事件時(shí)間?允許的最大延遲時(shí)間=9:11?3分鐘=9:08。水印在窗口內(nèi),不會(huì)觸發(fā)窗口計(jì)算。
當(dāng)數(shù)據(jù)C到達(dá)時(shí),由于窗口開始時(shí)間<=數(shù)據(jù)C的事件時(shí)間<窗口結(jié)束時(shí)間,因此數(shù)據(jù)C落入窗口內(nèi)。
當(dāng)數(shù)據(jù)D到達(dá)時(shí),由于其事件時(shí)間>=窗口結(jié)束時(shí)間,因此數(shù)據(jù)D不屬于該窗口。此時(shí)Watermark=進(jìn)入Flink的當(dāng)前最大事件時(shí)間?允許的最大延遲時(shí)間=9:15?3(分鐘)=9:12>=窗口結(jié)束時(shí)間。水印在窗口外,觸發(fā)窗口計(jì)算并發(fā)射計(jì)算結(jié)果。由于設(shè)置了允許延遲機(jī)制的延遲時(shí)間為3分鐘,此時(shí)的窗口結(jié)束時(shí)間+允許的最大延遲時(shí)間(水印指定的)+允許延遲機(jī)制指定的延遲時(shí)間=9:10+3(分鐘)+3(分鐘)=9:16>9:15(進(jìn)入Flink的當(dāng)前最大事件時(shí)間),不滿足窗口關(guān)閉的條件,因此窗口會(huì)繼續(xù)等待延遲數(shù)據(jù),并保留計(jì)算狀態(tài)(此處的計(jì)算狀態(tài)指的就是計(jì)算結(jié)果,例如窗口內(nèi)數(shù)據(jù)的聚合結(jié)果)。
當(dāng)數(shù)據(jù)E到達(dá)時(shí),由于進(jìn)入Flink的當(dāng)前最大事件時(shí)間沒有改變,窗口不會(huì)關(guān)閉,而是繼續(xù)等待。窗口開始時(shí)間<=數(shù)據(jù)E的事件時(shí)間<窗口結(jié)束時(shí)間,因此數(shù)據(jù)E落入窗口內(nèi),并觸發(fā)窗口計(jì)算,與上次計(jì)算的結(jié)果進(jìn)行合并,發(fā)射出新的計(jì)算結(jié)果,如下圖所示。
圖片
當(dāng)數(shù)據(jù)F到達(dá)時(shí),此時(shí)的窗口結(jié)束時(shí)間+允許的最大延遲時(shí)間(水印指定的)+允許延遲機(jī)制指定的延遲時(shí)間=9:10+3(分鐘)+3(分鐘)=9:16<=9:16(進(jìn)入Flink的當(dāng)前最大事件時(shí)間),滿足窗口關(guān)閉的條件,因此窗口會(huì)關(guān)閉并銷毀。
當(dāng)數(shù)據(jù)G到達(dá)時(shí),窗口開始時(shí)間<=數(shù)據(jù)G的事件時(shí)間<窗口結(jié)束時(shí)間,但是窗口已經(jīng)關(guān)閉了,因此數(shù)據(jù)G將進(jìn)入側(cè)道輸出流進(jìn)行單獨(dú)存放。通過側(cè)道輸出API可從側(cè)道輸出流中取出延遲嚴(yán)重的數(shù)據(jù)進(jìn)行相應(yīng)的業(yè)務(wù)處理。