成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

五分鐘了解Flink狀態管理

開發 架構
本文主要介紹了Flink的狀態和狀態管理,以及Demo和相關代碼,希望對你有幫助!

什么叫做Flink的有狀態計算呢?說白了就是將之前的中間結果暫時存儲起來,等待后續的事件數據過來后,可以使用之前的中間結果繼續計算。本文主要介紹Flink狀態計算和管理、代碼示例。

1、有狀態的計算

什么是Flink的有狀態的計算。在流式計算過程中將算子的中間結果保存在內存或者文件系統中,等下一個事件進入算子后可以從之前的狀態中獲取中間結果,以便計算當前的結果,從而無需每次都基于全部的原始數據來統計結果,極大地提升了系統性能。

每一個具有一定復雜度的流計算應用都是有狀態的,任何運行基本業務邏輯的流處理應用都需要在一定時間內存儲所接受的事件或者中間結果。

2、狀態管理

Flink如何管理狀態?主要就是:本地訪問和存儲、容錯性(可以自動按一定的時間間隔產生快照,并且在任務失敗后進行恢復)。

狀態(State)操作是指需要把當前數據和歷史計算結果進行累加計算,即當前數據的處理需要使用之前的數據或中間結果。

例如,對數據流中的實時單詞進行計數,每當接收到新的單詞時,需要將當前單詞數量累加到之前的結果中。這里單詞的數量就是狀態,對單詞數量的更新就是狀態的更新。如下圖:

狀態的計算模型,如下圖:

如下圖,Source、map()、keyBy()/window()/apply()算子的并行度為2,Sink算子的并行度為1。keyBy()/window()/apply()算子是有狀態的,并且map()與keyBy()/window()/apply()算子之間通過網絡進行數據分發。

Flink應用程序的狀態訪問都在本地進行,這樣有助于提高吞吐量和降低延遲。通常情況下,Flink應用程序都是將狀態存儲在JVM堆內存中,但如果狀態數據太大,也可以選擇將其以結構化數據格式存儲在高速磁盤中。

通過狀態快照,Flink能夠提供可容錯的、精確一次的計算語義。Flink應用程序在執行時會獲取并存儲分布式Pipeline(流處理管道)中整體的狀態,它會將數據源中消費數據的偏移量記錄下來,并將整個作業圖中算子獲取到該數據(記錄的偏移量對應的數據)時的狀態記錄并存儲下來。

當發生故障時,Flink作業會恢復上次存儲的狀態,重置數據源從狀態中記錄的上次消費的偏移量,開始重新進行消費處理。而且狀態快照在執行時會異步獲取狀態并存儲,并不會阻塞正在進行的數據處理邏輯。這個機制跟Kafka等消息中間件的消費方式很像。

Flink需要知道狀態,以便可以使用Checkpoint和Savepoint來保證容錯(下一篇會繼續介紹)。狀態還允許重新調整Flink應用程序,這意味著Flink負責在并行實例之間重新分配狀態。

3、Keyed State

Keyed State是Flink提供的按照Key進行分區的狀態機制。

在通過keyBy()分組的KeyedStream上使用,對每個Key的數據進行狀態存儲和管理,狀態是跟每個Key綁定的,即每個Key對應一個狀態對象。

Keyed State支持的狀態數據類型如下:ValueState、ListState、ReducingState、AggregatingState<IN, OUT>、MapState<UK, UV>。下文以ValueState為例,介紹如何使用。

4、狀態管理示例和代碼

我們來模擬這樣一個場景:如果某個用戶1分鐘內連續兩次退款,第二次則發出告警。

模擬訂單對象:

public class OrderBO {
    /**
     * ID
     */
    private Integer id;
    /***
     * 訂單標題
     */
    private String title;
    /**
     * 訂單金額
     */
    private Integer amount;
    /**
     * 訂單狀態:1-已支付,2-已退款
     */
    private Integer state;
    /**
     * 用戶ID
     */
    private String userId;
}

利用狀態管理,處理告警邏輯:

/**
* 告警處理邏輯
**/
private static class AlarmLogic extends KeyedProcessFunction<String, OrderBO, OrderBO> {
    // 是否已經出現退款的標記
    private ValueState<Boolean> flagState;
    // 定時器,時間到了會清掉狀態
    private ValueState<Long> timerState;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(OrderBO value, KeyedProcessFunction<String, OrderBO, OrderBO>.Context context, Collector<OrderBO> collector) throws Exception {
        Boolean refundFlag = flagState.value();

        // 如果已經退款過一次了,如果再出現退款則發射給下個算子,然后清理掉定時器。狀態2代表退款
        if (refundFlag != null && refundFlag) {
            if (value.getState() == 2) {
                collector.collect(value);
            }
            cleanUp(context);
        } else {
            // 如果第一次出現退款,則寫入狀態,同時開啟定時器。狀態2代表退款
            if (value.getState() == 2) {
                flagState.update(true);
                long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
                context.timerService().registerProcessingTimeTimer(timer);
                timerState.update(timer);
            }
        }
    }

    /**
     * 定時器到了之后,清理狀態值
     */
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, OrderBO, OrderBO>.OnTimerContext ctx, Collector<OrderBO> out) throws Exception {
        timerState.clear();
        flagState.clear();
    }

    /**
     * 手動清理狀態值
     *
     * @param ctx
     * @throws Exception
     */
    private void cleanUp(Context ctx) throws Exception {
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        timerState.clear();
        flagState.clear();
    }
}

模式生成數據和主流程代碼:

public static void main(String[] args) throws Exception {
    // 1、執行環境創建
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2、讀取Socket數據端口。實際根據具體業務對接數據來源
    DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);
    // 3、數據讀取個切割方式
    SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream
            .flatMap(new CleanDataAnd2Order()) // 清洗和處理數據
            .keyBy(x -> x.getUserId()) // 分區
            .process(new AlarmLogic()); // 處理告警邏輯

    // 4、打印分析結果
    resultDataStream.print("告警===>");
    // 5、環境啟動
    environment.execute("OrderAlarmApp");
}

模擬數據:

模擬場景:某個用戶1分鐘內連續兩次退款,第二次發出告警。
示例數據:
1,aaa,100,1,user1
2,bbb,200,1,user2
3,ccc,300,2,user1
4,ddd,400,2,user1

5,ddd,400,2,user1
6,bbb,200,2,user2
7,bbb,400,2,user2

完整代碼地址:https://github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/java/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43

責任編輯:趙寧寧 來源: 不焦躁的程序員
相關推薦

2021-10-19 07:27:08

HTTP代理網絡

2009-11-05 14:53:54

Visual Stud

2022-12-16 09:55:50

網絡架構OSI

2024-06-25 12:25:12

LangChain路由鏈

2024-05-13 09:28:43

Flink SQL大數據

2009-11-06 10:25:34

WCF元數據交換

2024-09-23 17:05:44

2009-10-26 15:45:43

VB.NET類構造

2020-02-19 19:26:27

K8S開源平臺容器技術

2020-05-12 09:10:24

瀏覽器服務器網絡

2023-07-26 07:11:50

LVM底層抽象

2020-03-06 10:45:48

機器學習人工智能神經網絡

2024-08-13 11:13:18

2009-11-02 18:07:58

Oracle數據庫

2021-09-18 11:36:38

混沌工程云原生故障

2024-04-28 12:55:46

redis頻道機制

2023-12-12 08:00:50

節點哈希算法

2024-12-11 07:00:00

面向對象代碼

2025-03-13 06:22:59

2009-11-16 10:53:30

Oracle Hint
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 老妇激情毛片免费 | 97伊人| 99视频免费播放 | 播放一级毛片 | 日韩精品免费在线 | 国产成人精品免高潮在线观看 | 精品国产第一区二区三区 | 日韩欧美在线观看视频 | 亚洲精品久久国产高清情趣图文 | 国产传媒在线观看 | 91视频亚洲| 久久婷婷香蕉热狠狠综合 | 日本精品一区二区三区视频 | 在线观看不卡av | 国产一区二区三区在线 | 狠狠操电影 | 天天弄天天操 | 日韩中文一区二区 | 青久草视频 | 一区免费观看 | 国产一级片av | 亚洲欧美激情四射 | 户外露出一区二区三区 | 男人视频网站 | 国产日韩精品视频 | 欧美激情网站 | 羞羞视频网站免费观看 | av免费在线播放 | 懂色av一区二区三区在线播放 | 午夜电影网站 | 成人av一区二区在线观看 | 国产a区 | 亚洲成人三级 | 亚洲国产电影 | 国产97在线 | 日韩 | 久久亚洲一区二区三 | 久久久91 | av天天干 | 日韩手机在线视频 | 成人在线免费电影 | 欧美成人免费 |