Apache Flink 漫談系列 - Watermark是個啥?
原創實際問題(亂序)
在介紹Watermark相關內容之前我們先拋出一個具體的問題,在實際的流式計算中數據到來的順序對計算結果的正確性有至關重要的影響,比如:某數據源中的某些數據由于某種原因(如:網絡原因,外部存儲自身原因)會有5秒的延時,也就是在實際時間的第1秒產生的數據有可能在第5秒中產生的數據之后到來(比如到Window處理節點).選具體某個delay的元素來說,假設在一個5秒的Tumble窗口(詳見后續Window篇介紹),有一個EventTime是 11秒的數據,在第16秒時候到來了。圖示第11秒的數據,在16秒到來了,如下圖:
那么對于一個Count聚合的Tumble(5s)的window,上面的情況如何處理才能window2=4,window3=2 呢?
Apache Flink的時間類型
開篇我們描述的問題是一個很常見的TimeWindow中數據亂序的問題,亂序是相對于事件產生時間和到達Apache Flink 實際處理算子的順序而言的,關于時間在Apache Flink中有如下三種時間類型,如下圖:
- ProcessingTime 是數據流入到具體某個算子時候相應的系統時間。ProcessingTime 有最好的性能和最低的延遲。但在分布式計算環境中ProcessingTime具有不確定性,相同數據流多次運行有可能產生不同的計算結果。
- IngestionTimeIngestionTime是數據進入Apache Flink框架的時間,是在Source Operator中設置的。與ProcessingTime相比可以提供更可預測的結果,因為IngestionTime的時間戳比較穩定(在源處只記錄一次),同一數據在流經不同窗口操作時將使用相同的時間戳,而對于ProcessingTime同一數據在流經不同窗口算子會有不同的處理時間戳。
- EventTimeEventTime是事件在設備上產生時候攜帶的。在進入Apache Flink框架之前EventTime通常要嵌入到記錄中,并且EventTime也可以從記錄中提取出來。在實際的網上購物訂單等業務場景中,大多會使用EventTime來進行數據計算。
開篇描述的問題和本篇要介紹的Watermark所涉及的時間類型均是指EventTime類型。
什么是Watermark
Watermark是Apache Flink為了處理EventTime 窗口計算提出的一種機制,本質上也是一種時間戳,由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統Event,與普通數據流Event一樣流轉到對應的下游算子,接收到Watermark Event的算子以此不斷調整自己管理的EventTime clock。Apache Flink 框架保證Watermark單調遞增,算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數據元素到來了,所以Watermark可以看做是告訴Apache Flink框架數據流已經處理到什么位置(時間維度)的方式。Watermark的產生和Apache Flink內部處理邏輯如下圖所示:
Watermark的產生方式
目前Apache Flink 有兩種生產Watermark的方式,如下:
- Punctuated數據流中每一個遞增的EventTime都會產生一個Watermark。 在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
- Periodic周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。
所以Watermark的生成方式需要根據業務場景的不同進行不同的選擇。
Watermark的接口定義
對應Apache Flink Watermark兩種不同的生成方式,我們了解一下對應的接口定義,如下:
- Periodic Watermarks - AssignerWithPeriodicWatermarks
/**
* Returns the current watermark. This method is periodically called by the
* system to retrieve the current watermark. The method may return {@code null} to
* indicate that no new Watermark is available.
*
* <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
* is larger than that of the previously emitted watermark (to preserve the contract of
* ascending watermarks). If the current watermark is still
* identical to the previous one, no progress in EventTime has happened since
* the previous call to this method. If a null value is returned, or theTimestamp
* of the returned watermark is smaller than that of the last emitted one, then no
* new watermark will be generated.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*
* @see org.Apache.flink.streaming.api.watermark.Watermark
* @see ExecutionConfig#getAutoWatermarkInterval()
*
* @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
*/
@Nullable
Watermark getCurrentWatermark();
- Punctuated Watermarks - AssignerWithPunctuatedWatermarks
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
/**
* Asks this implementation if it wants to emit a watermark. This method is called right after
* the {@link #extractTimestamp(Object, long)} method.
*
* <p>The returned watermark will be emitted only if it is non-null and its timestamp
* is larger than that of the previously emitted watermark (to preserve the contract of
* ascending watermarks). If a null value is returned, or the timestamp of the returned
* watermark is smaller than that of the last emitted one, then no new watermark will
* be generated.
*
* <p>For an example how to use this method, see the documentation of
* {@link AssignerWithPunctuatedWatermarks this class}.
*
* @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
*/
@Nullable
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
- AssignerWithPunctuatedWatermarks 繼承了TimestampAssigner接口 -TimestampAssigner
public interface TimestampAssigner<T> extends Function {
/**
* Assigns a timestamp to an element, in milliseconds since the Epoch.
*
* <p>The method is passed the previously assigned timestamp of the element.
* That previous timestamp may have been assigned from a previous assigner,
* by ingestion time. If the element did not carry a timestamp before, this value is
* {@code Long.MIN_VALUE}.
*
* @param element The element that the timestamp will be assigned to.
* @param previousElementTimestamp The previous internal timestamp of the element,
* or a negative value, if no timestamp has been assigned yet.
* @return The new timestamp.
*/
long extractTimestamp(T element, long previousElementTimestamp);
}
從接口定義可以看出,Watermark可以在Event(Element)中提取EventTime,進而定義一定的計算邏輯產生Watermark的時間戳。
Watermark解決如上問題
從上面的Watermark生成接口和Apache Flink內部對Periodic Watermark的實現來看,Watermark的時間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時間戳不等于Event中的EventTime,Event中的EventTime自產生那一刻起就不可以改變了,不受Apache Flink框架控制,而Watermark的產生是在Apache Flink的Source節點或實現的Watermark生成器計算產生(如上Apache Flink內置的 Periodic Watermark實現), Apache Flink內部對單流或多流的場景有統一的Watermark處理。
回過頭來我們在看看Watermark機制如何解決上面的問題,上面的問題在于如何將遲來的EventTime 位11的元素正確處理。要解決這個問題我們還需要先了解一下EventTime window是如何觸發的?EventTime window 計算條件是當Window計算的Timer時間戳 小于等于 當前系統的Watermak的時間戳時候進行計算。
- 當Watermark的時間戳等于Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結果如下:
上面對應的DDL定義如下:
create table t1(
ts timestamp(3),
other bigint,
WATERMARK FOR ts AS ts
) with (
'connector' = 'xx'
)
- 如果想正確處理遲來的數據可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:
上面對應的DDL定義如下:
create table t1(
ts timestamp(3),
other bigint,
WATERMARK FOR ts AS ts - interval '5' SECOND
) with (
'connector' = 'xx'
)
上面正確處理的根源是我們采取了 延遲觸發 window 計算 的方式正確處理了 Late Event. 與此同時,我們發現window的延時觸發計算,也導致了下游的LATENCY變大,本例子中下游得到window的結果就延遲了5s.
多流的Watermark處理
在實際的流計算中往往一個job中會處理多個Source的數據,對Source的數據進行GroupBy分組,那么來自不同Source的相同key值會shuffle到同一個處理節點,并攜帶各自的Watermark,Apache Flink內部要保證Watermark要保持單調遞增,多個Source的Watermark匯聚到一起時候可能不是單調自增的,這樣的情況Apache Flink內部是如何處理的呢?如下圖所示:
Apache Flink內部實現每一個邊上只能有一個遞增的Watermark, 當出現多流攜帶Eventtime匯聚到一起(Join or Union)時候,Apache Flink會選擇所有流入的Eventtime中最小min(stream1, stream2...streamN)的一個向下游流出。從而保證watermark的單調遞增和保證數據的完整性.如下圖:
小結
本節以一個流計算常見的亂序問題介紹了Apache Flink如何利用Watermark機制來處理亂序問題. 本篇內容在一定程度上也體現了EventTime Window中的Trigger機制依賴了Watermark(后續Window篇章會介紹)。Watermark機制是流計算中處理亂序,正確處理Late Event的核心手段。更多細節歡迎關注《Apache Flink 知其然,知其所以然》系列視頻課程!
作者介紹
孫金城,51CTO社區編輯,Apache Flink PMC 成員,Apache Beam Committer,Apache IoTDB PMC 成員,ALC Beijing 成員,Apache ShenYu 導師,Apache 軟件基金會成員。關注技術領域流計算和時序數據存儲。