高級測試:如何使用Flink對Strom任務的邏輯功能進行復現測試?
Flink和Strom都是時下較為流行的數據流平臺,考慮以下一種應用場景:已經使用Strom完成了對于某一邏輯功能的開發,如果現在期望使用Flink實現相同的邏輯,那么就需要考慮如何使用Flink來對Strom任務的邏輯功能進行最簡單的復現測試。
使用Flink來測試Strom任務的邏輯主要存在兩個最基本的問題:第一,Storm通過自定義的Bolt類實現自定義的邏輯,在Flink中如何實現?第二,Storm按照自定義標準實現數據分發的邏輯,在Flink中如何實現?
本文主要通過兩個最基本的Flink程序實例對上述兩個使用Flink測試Strom任務邏輯存在的基本問題進行解答。
第一個問題,我們可以通過Flink的ProcessFuction類進行實現,通過繼承該類,在該類的processElement方法中實現自定義邏輯。ProcessFuction類如下圖所示,我們可以通過var1這個參數直接獲取當前流中的數據,然后進行自定義的邏輯加工,再通過Collector類var3的collect方法將處理后的數據發送到下一個流中。
假設某一Strom任務的功能邏輯是:① 對初始數據源(一個字符串)末尾添加一個字符串。② 然后再次添加另一個字符串。
我們以上述對字符串加工的Strom任務為例,說明Flink程序如何通過ProcessFuction類對該任務實現復現測試。
(1)Flink主程序,假設初始數據源為“abc”。
(2)第一個業務加工類,給數據流末尾添加“def”。
(3)第二個業務加工類,給數據流末尾添加“ghi”。
(4)執行Flink程序,觀察輸出結果,“abc”被二次加工為“abcdefghi”。
第二個分發數據的問題,我們假設某一Strom任務的功能邏輯是對數據源(股票對象)進行分類,將股價高于X的分為一類,將股價小于等于X的分為另一類。
我們以上述對股票數據對象分類處理的Strom任務為例,說明Flink程序如何通過旁路輸出特性實現對數據流按照自定義標準分類,輸出到不同的子數據流中處理。
Flink 的旁路輸出依然涉及ProcessFunction類的processElement方法,該方法的Context類型的var2參數的主要作用是利用其output方法進行旁路輸出(我們用于進行數據分流)。
Flink的旁路輸出特性可以用來對數據進行分流,通過創建一個流的標簽(OutputTag),再利用這個OutputTag標簽對象作為參數,調用初始/父級數據流的getSideOutput(OutputTag)方法獲取子數據流。
每個流標簽都有一個id,也可以不創建對象,只要流標簽的id相同,其中的數據就相同。因此,可以通過匿名內部類的形式來獲取子數據流。第一個參數是id,第二個參數是數據類型(不能省略)。
(1)創建股票類Stock,屬性包括名稱和價格。
(2)創建消費消息的Flink程序。
(3)創建生產消息的Flink程序。
我們用“STOCK_LOW_PRICE”和“STOCK_HIGH_PRICE”這兩個ID作為兩個旁路輸出標簽的ID。
在processElement方法中,我們通過判斷股票的價格是否大于50區分出低價股和高價股,利用Context對象的output方法進行旁路輸出,把price小于50的Stock對象輸出到ID為“STOCK_LOW_PRICE”的低價股標簽旁路中,而把price大于等于50的Stock對象輸出到ID為“STOCK_HIGH_PRICE”的高價股標簽旁路中。
(4)依次啟動消費者程序、生產者程序,觀察消費者程序控制臺中的輸出:
此時,桌面生成了兩個文件夾,當中記錄了股票數據,result1記錄了小于50的低價股,result2中記錄了股價大于等于50的高價股。
? ?