聊聊Flink:這次把Flink的觸發(fā)器(Trigger)、移除器(Evictor)講透
一、觸發(fā)器(Trigger)
Trigger 決定了一個窗口(由 window assigner 定義)何時可以被 window function 處理。每個 WindowAssigner 都有一個默認的 Trigger。如果默認 trigger 無法滿足你的需要,你可以在 trigger(…) 調用中指定自定義的 trigger。
1.1 Flink中預置的Trigger
窗口的計算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對應的窗口觸發(fā)機制,都有一個默認的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時候來觸發(fā)計算。flink內部定義多種觸發(fā)器,每種觸發(fā)器對應于不同的WindowAssigner。常見的觸發(fā)器如下:
- EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
- ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計算,否則窗口繼續(xù)等待。
- ProcessingTimeoutTrigger:可以將任何觸發(fā)器轉變?yōu)槌瑫r觸發(fā)器。
- ContinuousEventTimeTrigger:根據間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前EndTime觸發(fā)窗口計算。
- ContinuousProcessingTimeTrigger:根據間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前ProcessTime觸發(fā)窗口計算。
- CountTrigger:根據接入數據量是否超過設定的闕值判斷是否觸發(fā)窗口計算。
- DeltaTrigger:根據接入數據計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發(fā)窗口計算。
- PurgingTrigger:可以將任意觸發(fā)器作為參數轉換為Purge類型的觸發(fā)器,計算完成后數據將被清理。
- NeverTrigger:任何時候都不觸發(fā)窗口計算
1.2 Trigger的抽象類
Trigger 接口提供了五個方法來響應不同的事件:
- onElement() 方法在每個元素被加入窗口時調用。
- onEventTime() 方法在注冊的 event-time timer 觸發(fā)時調用。
- onProcessingTime() 方法在注冊的 processing-time timer 觸發(fā)時調用。
- canMerge() 方法判斷是否可以合并。
- onMerge() 方法與有狀態(tài)的 trigger 相關。該方法會在兩個窗口合并時, 將窗口對應 trigger 的狀態(tài)進行合并,比如使用會話窗口時。
- clear() 方法處理在對應窗口被移除時所需的邏輯。
觸發(fā)器接口的源碼如下:
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
/**
* Called for every element that gets added to a pane. The result of this will determine whether
* the pane is evaluated to emit results.
*
* @param element The element that arrived.
* @param timestamp The timestamp of the element that arrived.
* @param window The window to which the element is being added.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
throws Exception;
/**
* Called when a processing-time timer that was set using the trigger context fires.
*
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception;
/**
* Called when an event-time timer that was set using the trigger context fires.
*
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception;
/**
* Returns true if this trigger supports merging of trigger state and can therefore be used with
* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
*
* <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
* OnMergeContext)}
*/
public boolean canMerge() {
return false;
}
/**
* Called when several windows have been merged into one window by the {@link
* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
*
* @param window The new window that results from the merge.
* @param ctx A context object that can be used to register timer callbacks and access state.
*/
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
* Clears any state that the trigger might still hold for the given window. This is called when
* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
*/
public abstract void clear(W window, TriggerContext ctx) throws Exception;
// ------------------------------------------------------------------------
/**
* A context object that is given to {@link Trigger} methods to allow them to register timer
* callbacks and deal with state.
*/
public interface TriggerContext {
// ...
}
/**
* Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
* OnMergeContext)}.
*/
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(
StateDescriptor<S, ?> stateDescriptor);
}
}
關于上述方法,需要注意三件事:
(1)前三個方法返回TriggerResult枚舉類型,其包含四個枚舉值:
- CONTINUE:表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計算,也不刪除元素。
- FIRE:觸發(fā)窗口計算,但是保留窗口元素。
- PURGE:不觸發(fā)窗口計算,丟棄窗口,并且刪除窗口的元素。
- FIRE_AND_PURGE:觸發(fā)窗口計算,輸出結果,然后將窗口中的數據和窗口進行清除。
源碼如下:
public enum TriggerResult {
// 不觸發(fā),也不刪除元素
CONTINUE(false, false),
// 觸發(fā)窗口,窗口出發(fā)后刪除窗口中的元素
FIRE_AND_PURGE(true, true),
// 觸發(fā)窗口,但是保留窗口元素
FIRE(true, false),
// 不觸發(fā)窗口,丟棄窗口,并且刪除窗口的元素
PURGE(false, true);
// ------------------------------------------------------------------------
private final boolean fire;
private final boolean purge;
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return fire;
}
public boolean isPurge() {
return purge;
}
}
(2) 每一個窗口分配器都擁有一個屬于自己的 Trigger,Trigger上會有定時器,用來決定一個窗口何時能夠被計算或清除,當定時器觸發(fā)后,會調用對應的回調返回,返回TriggerResult。Trigger的返回結果可以是 continue(不做任何操作),fire(處理窗口數據),purge(移除窗口和窗口中的數據),或者 fire + purge。一個Trigger的調用結果只是fire的話,那么會計算窗口并保留窗口原樣,也就是說窗口中的數據仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計算。一個窗口可以被重復計算多次知道它被 purge 了。在purge之前,窗口會一直占用著內存。
1.3 ProcessingTimeTrigger源碼分析
@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "ProcessingTimeTrigger()";
}
/** Creates a new trigger that fires once system time passes the end of the window. */
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
}
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發(fā)并調用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數據的計算,但是會保留窗口元素。
需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發(fā)窗口函數的計算,計算完成后并不會清除窗口中的數據,這些數據存儲在內存中,除非調用PURGE或FIRE_AND_PURGE,否則數據將一直存在內存中。實際上,Flink中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數據進行清除。
EventTimeTriggerr在onElement設置的定時器:
圖片
EventTime通過registerEventTimeTimer注冊定時器,在內部Watermark達到或超過Timer設定的時間戳時觸發(fā)。
二、移除器(Evictor)
2.1 Evictor扮演的角色
圖片
當一個元素進入stream中之后,一般要經歷Window(開窗)、Trigger(觸發(fā)器)、Evitor(移除器)、Windowfunction(窗口計算操作),具體過程如下:
- Window中的WindowAssigner(窗口分配器)定義了數據應該被分配到哪個窗口中,每一個 WindowAssigner都會有一個默認的Trigger,如果用戶在代碼中指定了窗口的trigger,默認的 trigger 將會被覆蓋。
- Trigger上會有定時器,用來決定一個窗口何時能夠被計算或清除。Trigger的返回結果可以是 continue(不做任何操作),fire(處理窗口數據),purge(移除窗口和窗口中的數據),或者 fire + purge。一個Trigger的調用結果只是fire的話,那么會計算窗口并保留窗口原樣,也就是說窗口中的數據仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計算。一個窗口可以被重復計算多次知道它被 purge 了。在purge之前,窗口會一直占用著內存。
- 當Trigger fire了,窗口中的元素集合就會交給Evictor(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表,并決定最先進入窗口的多少個元素需要被移除。剩余的元素會交給用戶指定的函數進行窗口的計算。如果沒有 Evictor 的話,窗口中的所有元素會一起交給WindowFunction進行計算。
- WindowFunction收到了窗口的元素(可能經過了 Evictor 的過濾),并計算出窗口的結果值,并發(fā)送給下游。窗口計算操作有很多,比如預定義的sum(),min(),max(),還有 ReduceFunction,WindowFunction。WindowFunction 是最通用的計算函數,其他的預定義的函數基本都是基于該函數實現的。
現在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪個位置,讓我們繼續(xù)看為何使用Evictor。
Evictor接口定義如下:
圖片
evictBefore()包含要在窗口函數之前應用的清除邏輯,而evictAfter()包含要在窗口函數之后應用的清除邏輯。應用窗口函數之前清除的元素將不會被窗口函數處理。
窗格是具有相同Key和相同窗口的元素組成的桶,即同一個窗口中相同Key的元素一定屬于同一個窗格。一個元素可以在多個窗格中(當一個元素被分配給多個窗口時),這些窗格都有自己的清除器實例。
注:window默認沒有evictor,一旦把window指定Evictor,該window會由EvictWindowOperator類來負責操作。
2.2 Flink內置的Evitor
- CountEvictor:保留窗口中用戶指定的元素數量,并丟棄窗口緩沖區(qū)剩余的元素。
- DeltaEvictor:依次計算窗口緩沖區(qū)中的最后一個元素與其余每個元素之間的delta值,若delta值大于等于指定的閾值,則該元素會被移除。使用DeltaEvictor清除器需要指定兩個參數,一個是double類型的閾值;另一個是DeltaFunction接口的實例,DeltaFunction用于指定具體的delta值計算邏輯。
- TimeEvictor:傳入一個以毫秒為單位的時間間隔參數(例如以size表示),對于給定的窗口,取窗口中元素的最大時間戳(例如以max表示),使用TimeEvictor清除器將刪除所有時間戳小于或等于max-size的元素(即清除從窗口開頭到指定的截止時間之間的元素)。
2.2.1 CountEvictor
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (size <= maxCount) {
// 小于最大數量,不做處理
return;
} else {
int evictedCount = 0;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
iterator.next();
evictedCount++;
if (evictedCount > size - maxCount) {
break;
} else {
// 移除前size - maxCount個元素,只剩下最后maxCount個元素
iterator.remove();
}
}
}
}
2.2.2 DeltaEvictor
DeltaEvictor通過計算DeltaFunction的值(依次傳入每個元素和最后一個元素),并將其與threshold進行對比,如果DeltaFunction計算結果大于等于threshold,則該元素會被移除。DeltaEvictor的實現如下:
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
// 獲取最后一個元素
TimestampedValue<T> lastElement = Iterables.getLast(elements);
for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
TimestampedValue<T> element = iterator.next();
// 依次計算每個元素和最后一個元素的delta值,同時和threshold的值進行比較
// 若計算結果大于threshold值或者是相等,則該元素會被移除
if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
iterator.remove();
}
}
}
2.2.3 TimeEvictor
TimeEvictor以時間為判斷標準,決定元素是否會被移除。TimeEvictor會獲取窗口中所有元素的最大時間戳currentTime,currentTime減去窗口大小(windowSize) 可得到能保留最久的元素的時間戳evictCutoff,然后再遍歷窗口中的元素,如果元素的時間戳小于evictCutoff,就執(zhí)行移除操作,否則不移除。具體邏輯如下圖所示:
TimeEvictor的代碼實現如下:
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
// 如果element沒有timestamp,直接返回
if (!hasTimestamp(elements)) {
return;
}
// 獲取elements中最大的時間戳(到來最晚的元素的時間)
long currentTime = getMaxTimestamp(elements);
// 截止時間為: 到來最晚的元素的時間 - 窗口大小(可以理解為保留最近的多久的元素)
long evictCutoff = currentTime - windowSize;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
// 清除所有時間戳小于截止時間的元素
if (record.getTimestamp() <= evictCutoff) {
iterator.remove();
}
}
}