成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊Flink:這次把Flink的觸發(fā)器(Trigger)、移除器(Evictor)講透

開發(fā) 前端
窗口的計算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對應的窗口觸發(fā)機制,都有一個默認的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時候來觸發(fā)計算。flink內部定義多種觸發(fā)器,每種觸發(fā)器對應于不同的WindowAssigner。

一、觸發(fā)器(Trigger)

Trigger 決定了一個窗口(由 window assigner 定義)何時可以被 window function 處理。每個 WindowAssigner 都有一個默認的 Trigger。如果默認 trigger 無法滿足你的需要,你可以在 trigger(…) 調用中指定自定義的 trigger。

1.1 Flink中預置的Trigger

窗口的計算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對應的窗口觸發(fā)機制,都有一個默認的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時候來觸發(fā)計算。flink內部定義多種觸發(fā)器,每種觸發(fā)器對應于不同的WindowAssigner。常見的觸發(fā)器如下:

  • EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
  • ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計算,否則窗口繼續(xù)等待。
  • ProcessingTimeoutTrigger:可以將任何觸發(fā)器轉變?yōu)槌瑫r觸發(fā)器。
  • ContinuousEventTimeTrigger:根據間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前EndTime觸發(fā)窗口計算。
  • ContinuousProcessingTimeTrigger:根據間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前ProcessTime觸發(fā)窗口計算。
  • CountTrigger:根據接入數據量是否超過設定的闕值判斷是否觸發(fā)窗口計算。
  • DeltaTrigger:根據接入數據計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發(fā)窗口計算。
  • PurgingTrigger:可以將任意觸發(fā)器作為參數轉換為Purge類型的觸發(fā)器,計算完成后數據將被清理。
  • NeverTrigger:任何時候都不觸發(fā)窗口計算

1.2 Trigger的抽象類

Trigger 接口提供了五個方法來響應不同的事件:

  • onElement() 方法在每個元素被加入窗口時調用。
  • onEventTime() 方法在注冊的 event-time timer 觸發(fā)時調用。
  • onProcessingTime() 方法在注冊的 processing-time timer 觸發(fā)時調用。
  • canMerge() 方法判斷是否可以合并。
  • onMerge() 方法與有狀態(tài)的 trigger 相關。該方法會在兩個窗口合并時, 將窗口對應 trigger 的狀態(tài)進行合并,比如使用會話窗口時。
  • clear() 方法處理在對應窗口被移除時所需的邏輯。

觸發(fā)器接口的源碼如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine whether
     * the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which the element is being added.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Returns true if this trigger supports merging of trigger state and can therefore be used with
     * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
     *
     * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
     * OnMergeContext)}
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * Called when several windows have been merged into one window by the {@link
     * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
     *
     * @param window The new window that results from the merge.
     * @param ctx A context object that can be used to register timer callbacks and access state.
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * Clears any state that the trigger might still hold for the given window. This is called when
     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {
        // ...
    }

    /**
     * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
     * OnMergeContext)}.
     */
    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(
                StateDescriptor<S, ?> stateDescriptor);
    }
}

關于上述方法,需要注意三件事:

(1)前三個方法返回TriggerResult枚舉類型,其包含四個枚舉值:

  • CONTINUE:表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計算,也不刪除元素。
  • FIRE:觸發(fā)窗口計算,但是保留窗口元素。
  • PURGE:不觸發(fā)窗口計算,丟棄窗口,并且刪除窗口的元素。
  • FIRE_AND_PURGE:觸發(fā)窗口計算,輸出結果,然后將窗口中的數據和窗口進行清除。

源碼如下:

public enum TriggerResult {

    // 不觸發(fā),也不刪除元素
    CONTINUE(false, false),

    // 觸發(fā)窗口,窗口出發(fā)后刪除窗口中的元素
    FIRE_AND_PURGE(true, true),

    // 觸發(fā)窗口,但是保留窗口元素
    FIRE(true, false),

    // 不觸發(fā)窗口,丟棄窗口,并且刪除窗口的元素
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}

(2) 每一個窗口分配器都擁有一個屬于自己的 Trigger,Trigger上會有定時器,用來決定一個窗口何時能夠被計算或清除,當定時器觸發(fā)后,會調用對應的回調返回,返回TriggerResult。Trigger的返回結果可以是 continue(不做任何操作),fire(處理窗口數據),purge(移除窗口和窗口中的數據),或者 fire + purge。一個Trigger的調用結果只是fire的話,那么會計算窗口并保留窗口原樣,也就是說窗口中的數據仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計算。一個窗口可以被重復計算多次知道它被 purge 了。在purge之前,窗口會一直占用著內存。

1.3 ProcessingTimeTrigger源碼分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    /** Creates a new trigger that fires once system time passes the end of the window. */
    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發(fā)并調用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數據的計算,但是會保留窗口元素。

需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發(fā)窗口函數的計算,計算完成后并不會清除窗口中的數據,這些數據存儲在內存中,除非調用PURGE或FIRE_AND_PURGE,否則數據將一直存在內存中。實際上,Flink中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數據進行清除。

EventTimeTriggerr在onElement設置的定時器:

圖片圖片

EventTime通過registerEventTimeTimer注冊定時器,在內部Watermark達到或超過Timer設定的時間戳時觸發(fā)。

二、移除器(Evictor)

2.1 Evictor扮演的角色

圖片圖片

當一個元素進入stream中之后,一般要經歷Window(開窗)、Trigger(觸發(fā)器)、Evitor(移除器)、Windowfunction(窗口計算操作),具體過程如下:


  • Window中的WindowAssigner(窗口分配器)定義了數據應該被分配到哪個窗口中,每一個 WindowAssigner都會有一個默認的Trigger,如果用戶在代碼中指定了窗口的trigger,默認的 trigger 將會被覆蓋。
  • Trigger上會有定時器,用來決定一個窗口何時能夠被計算或清除。Trigger的返回結果可以是 continue(不做任何操作),fire(處理窗口數據),purge(移除窗口和窗口中的數據),或者 fire + purge。一個Trigger的調用結果只是fire的話,那么會計算窗口并保留窗口原樣,也就是說窗口中的數據仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計算。一個窗口可以被重復計算多次知道它被 purge 了。在purge之前,窗口會一直占用著內存。
  • 當Trigger fire了,窗口中的元素集合就會交給Evictor(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表,并決定最先進入窗口的多少個元素需要被移除。剩余的元素會交給用戶指定的函數進行窗口的計算。如果沒有 Evictor 的話,窗口中的所有元素會一起交給WindowFunction進行計算。
  • WindowFunction收到了窗口的元素(可能經過了 Evictor 的過濾),并計算出窗口的結果值,并發(fā)送給下游。窗口計算操作有很多,比如預定義的sum(),min(),max(),還有 ReduceFunction,WindowFunction。WindowFunction 是最通用的計算函數,其他的預定義的函數基本都是基于該函數實現的。

現在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪個位置,讓我們繼續(xù)看為何使用Evictor。

Evictor接口定義如下:

圖片圖片

evictBefore()包含要在窗口函數之前應用的清除邏輯,而evictAfter()包含要在窗口函數之后應用的清除邏輯。應用窗口函數之前清除的元素將不會被窗口函數處理。

窗格是具有相同Key和相同窗口的元素組成的桶,即同一個窗口中相同Key的元素一定屬于同一個窗格。一個元素可以在多個窗格中(當一個元素被分配給多個窗口時),這些窗格都有自己的清除器實例。

注:window默認沒有evictor,一旦把window指定Evictor,該window會由EvictWindowOperator類來負責操作。

2.2 Flink內置的Evitor

  • CountEvictor:保留窗口中用戶指定的元素數量,并丟棄窗口緩沖區(qū)剩余的元素。
  • DeltaEvictor:依次計算窗口緩沖區(qū)中的最后一個元素與其余每個元素之間的delta值,若delta值大于等于指定的閾值,則該元素會被移除。使用DeltaEvictor清除器需要指定兩個參數,一個是double類型的閾值;另一個是DeltaFunction接口的實例,DeltaFunction用于指定具體的delta值計算邏輯。
  • TimeEvictor:傳入一個以毫秒為單位的時間間隔參數(例如以size表示),對于給定的窗口,取窗口中元素的最大時間戳(例如以max表示),使用TimeEvictor清除器將刪除所有時間戳小于或等于max-size的元素(即清除從窗口開頭到指定的截止時間之間的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // 小于最大數量,不做處理
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // 移除前size - maxCount個元素,只剩下最后maxCount個元素
                iterator.remove();
            }
        }
    }
}

2.2.2 DeltaEvictor

DeltaEvictor通過計算DeltaFunction的值(依次傳入每個元素和最后一個元素),并將其與threshold進行對比,如果DeltaFunction計算結果大于等于threshold,則該元素會被移除。DeltaEvictor的實現如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {

    // 獲取最后一個元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);

    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
        TimestampedValue<T> element = iterator.next();
        // 依次計算每個元素和最后一個元素的delta值,同時和threshold的值進行比較
        // 若計算結果大于threshold值或者是相等,則該元素會被移除
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}

2.2.3 TimeEvictor

TimeEvictor以時間為判斷標準,決定元素是否會被移除。TimeEvictor會獲取窗口中所有元素的最大時間戳currentTime,currentTime減去窗口大小(windowSize) 可得到能保留最久的元素的時間戳evictCutoff,然后再遍歷窗口中的元素,如果元素的時間戳小于evictCutoff,就執(zhí)行移除操作,否則不移除。具體邏輯如下圖所示:

TimeEvictor的代碼實現如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {

    // 如果element沒有timestamp,直接返回
    if (!hasTimestamp(elements)) {
        return;
    }

    // 獲取elements中最大的時間戳(到來最晚的元素的時間)
    long currentTime = getMaxTimestamp(elements);
    // 截止時間為: 到來最晚的元素的時間 - 窗口大小(可以理解為保留最近的多久的元素)
    long evictCutoff = currentTime - windowSize;

    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();

        // 清除所有時間戳小于截止時間的元素
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}


責任編輯:武曉燕 來源: 老周聊架構
相關推薦

2010-05-04 09:44:12

Oracle Trig

2024-02-27 08:05:32

Flink分區(qū)機制數據傳輸

2011-05-20 14:06:25

Oracle觸發(fā)器

2024-01-29 08:07:42

FlinkYARN架構

2024-04-09 07:50:59

Flink語義Watermark

2011-05-19 14:29:49

Oracle觸發(fā)器語法

2011-04-14 13:54:22

Oracle觸發(fā)器

2010-05-31 18:06:07

MySQL 觸發(fā)器

2010-10-12 10:04:15

MySQL觸發(fā)器

2009-09-18 14:31:33

CLR觸發(fā)器

2011-03-28 10:05:57

sql觸發(fā)器代碼

2009-04-26 22:27:54

觸發(fā)器密碼修改數據庫

2009-10-22 17:18:20

CLR觸發(fā)器

2009-04-07 13:56:03

SQL Server觸發(fā)器實例

2010-05-18 15:36:44

MySQL觸發(fā)器

2010-10-11 14:52:43

Mysql觸發(fā)器

2009-11-18 13:15:06

Oracle觸發(fā)器

2021-07-30 10:33:57

MySQL觸發(fā)器數據

2011-03-03 09:30:24

downmoonsql登錄觸發(fā)器

2010-05-18 15:58:39

MySQL觸發(fā)器
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 99爱在线| 欧美成年网站 | a级性视频 | 国产精品国产亚洲精品看不卡15 | 午夜一区二区三区 | 日韩av一二三区 | 国产精品1区 | 亚洲成人动漫在线观看 | 国产免费一二三区 | 日本三级电影免费 | 国产精品爱久久久久久久 | 国产精品日韩欧美一区二区 | 欧美激情久久久 | 亚洲一区二区三区国产 | 久久免费视频网 | 中文字幕精品一区二区三区精品 | 国产精品视频久久久 | 欧美在线成人影院 | 二区视频| 欧美在线激情 | 久久久成人免费一区二区 | 亚洲 欧美 日韩 在线 | 久久99这里只有精品 | 不卡视频一区二区三区 | 久久精品一级 | 中文字幕在线二区 | 日韩成人精品在线 | 中文字幕在线观看视频一区 | 日本不卡一区二区三区在线观看 | 国产精品久久久久久久免费观看 | 91.xxx.高清在线 | 岛国精品 | 中文字幕日韩一区 | 亚洲欧美男人天堂 | 亚洲成人免费在线 | 在线观看日韩精品视频 | 国产美女在线观看 | 人人澡人人射 | 欧一区| 久久精品小视频 | 亚洲精品视频播放 |