成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink state 序列化Java enum 竟然岔劈了

開發(fā) 前端
本文主要介紹 flink 枚舉值 serde 中的坑,當(dāng)在 enum 中添加刪除枚舉值時(shí),就有可能導(dǎo)致狀態(tài)岔劈。隨后給出了原因是由于 enum serde 器的實(shí)現(xiàn)導(dǎo)致的這種情況,最后給出了解決方案。

[[419876]]

1.序篇-先說結(jié)論

本文主要記錄博主在生產(chǎn)環(huán)境中踩的 flink 針對(duì) java enum serde 時(shí)的坑。

結(jié)論:在 flink 程序中,如果狀態(tài)中有存儲(chǔ) java enum,那么添加或者刪除 enum 中的一個(gè)枚舉值時(shí),就有可能導(dǎo)致狀態(tài)恢復(fù)異常,這里的異常可能不是在恢復(fù)過程中會(huì)實(shí)際拋出一個(gè)異常,而是有可能是 enum A 的值恢復(fù)給 enum B。

我從以下幾個(gè)章節(jié)說明、解決這個(gè)問題,希望能拋磚引玉,帶給大家一些啟發(fā)。

  • 踩坑場景篇-這個(gè)坑是啥樣的
  • 問題排查篇-坑的排查過程
  • 問題原理解析篇-導(dǎo)致問題的機(jī)制是什么
  • 避坑篇-如何避免這種問題
  • 總結(jié)篇

2.踩坑場景篇-這個(gè)坑是啥樣的

對(duì)任務(wù)做一個(gè)簡單的過濾條件修改,任務(wù)重新上線之后,從 flink web ui 確認(rèn)是從 savepoint 重啟成功了,但是實(shí)際最終產(chǎn)出的數(shù)據(jù)上來看卻像是沒有從 savepoint 重啟。

邏輯就是計(jì)算分維度的當(dāng)天累計(jì) pv。代碼很簡單,在后面會(huì)貼出來。

如下圖:

在 00:04 分重啟時(shí)出現(xiàn)了當(dāng)天累計(jì) pv 出現(xiàn)了從零累計(jì)的情況。

但是預(yù)期正常的曲線應(yīng)該張下面這樣。

任務(wù)是使用 DataStream 編寫(基于 flink 1.13.1)。

  1. public class SenerioTest { 
  2.      
  3.     public static void main(String[] args) throws Exception { 
  4.         StreamExecutionEnvironment env = 
  5.                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); 
  6.  
  7.         env.setParallelism(1); 
  8.  
  9.         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 
  10.  
  11.         env.addSource(new SourceFunction<SourceModel>() { 
  12.  
  13.             private volatile boolean isCancel = false
  14.  
  15.             @Override 
  16.             public void run(SourceContext<SourceModel> ctx) throws Exception { 
  17.                 // 數(shù)據(jù)源 
  18.             } 
  19.  
  20.             @Override 
  21.             public void cancel() { 
  22.                 this.isCancel = true
  23.             } 
  24.         }) 
  25.         .keyBy(new KeySelector<SourceModel, Long>() { 
  26.             @Override 
  27.             public Long getKey(SourceModel value) throws Exception { 
  28.                 return value.getUserId() % 1000; 
  29.             } 
  30.         }) 
  31.         .timeWindow(Time.minutes(1)) 
  32.         .aggregate( 
  33.             new AggregateFunction<SourceModel, Map<Tuple2<DimNameEnum, String>, Long>, Map<Tuple2<DimNameEnum, String>, Long>>() { 
  34.  
  35.                 @Override 
  36.                 public Map<Tuple2<DimNameEnum, String>, Long> createAccumulator() { 
  37.                     return new HashMap<>(); 
  38.                 } 
  39.  
  40.                 @Override 
  41.                 public Map<Tuple2<DimNameEnum, String>, Long> add(SourceModel value, 
  42.                         Map<Tuple2<DimNameEnum, String>, Long> accumulator) { 
  43.  
  44.                     Lists.newArrayList(Tuple2.of(DimNameEnum.province, value.getProvince()) 
  45.                             , Tuple2.of(DimNameEnum.age, value.getAge()) 
  46.                             , Tuple2.of(DimNameEnum.sex, value.getSex())) 
  47.                             .forEach(t -> { 
  48.                                 Long l = accumulator.get(t); 
  49.  
  50.                                 if (null == l) { 
  51.                                     accumulator.put(t, 1L); 
  52.                                 } else { 
  53.                                     accumulator.put(t, l + 1); 
  54.                                 } 
  55.                             }); 
  56.  
  57.                     return accumulator; 
  58.                 } 
  59.  
  60.                 @Override 
  61.                 public Map<Tuple2<DimNameEnum, String>, Long> getResult( 
  62.                         Map<Tuple2<DimNameEnum, String>, Long> accumulator) { 
  63.                     return accumulator; 
  64.                 } 
  65.  
  66.                 @Override 
  67.                 public Map<Tuple2<DimNameEnum, String>, Long> merge( 
  68.                         Map<Tuple2<DimNameEnum, String>, Long> a, 
  69.                         Map<Tuple2<DimNameEnum, String>, Long> b) { 
  70.                     return null
  71.                 } 
  72.             }, 
  73.             new ProcessWindowFunction<Map<Tuple2<DimNameEnum, String>, Long>, SinkModel, Long, TimeWindow>() { 
  74.  
  75.                 private transient ValueState<Map<Tuple2<DimNameEnum, String>, Long>> todayPv; 
  76.  
  77.                 @Override 
  78.                 public void open(Configuration parameters) throws Exception { 
  79.                     super.open(parameters); 
  80.                     this.todayPv = getRuntimeContext().getState(new ValueStateDescriptor<Map<Tuple2<DimNameEnum, String>, Long>>( 
  81.                             "todayPv", TypeInformation.of
  82.                             new TypeHint<Map<Tuple2<DimNameEnum, String>, Long>>() { 
  83.                             }))); 
  84.                 } 
  85.  
  86.                 @Override 
  87.                 public void process(Long aLong, Context context, 
  88.                         Iterable<Map<Tuple2<DimNameEnum, String>, Long>> elements, Collector<SinkModel> out
  89.                         throws Exception { 
  90.                     // 將 elements 數(shù)據(jù) merge 到 todayPv 中 
  91.                     // 每天零點(diǎn)將 state 清空重新累計(jì) 
  92.                     // 然后 out#collect 出去即可 
  93.                 } 
  94.             }); 
  95.  
  96.         env.execute(); 
  97.     } 
  98.  
  99.     @Data 
  100.     @Builder 
  101.     private static class SourceModel { 
  102.         private long userId; 
  103.         private String province; 
  104.         private String age; 
  105.         private String sex; 
  106.         private long timestamp
  107.     } 
  108.  
  109.  
  110.     @Data 
  111.     @Builder 
  112.     private static class SinkModel { 
  113.         private String dimName; 
  114.         private String dimValue; 
  115.         private long timestamp
  116.     } 
  117.  
  118.     enum DimNameEnum { 
  119.         province, 
  120.         age, 
  121.         sex, 
  122.         ; 
  123.     } 
  124.  

3.問題排查篇-坑的排查過程

3.1.愚蠢的懷疑引擎

首先懷疑是狀態(tài)沒有正常恢復(fù)。

但是查看 flink web ui 以及 tm 日志,都顯示是從 savepoint 正常恢復(fù)了。

還懷疑是不是出現(xiàn)了 flink web ui 展示的內(nèi)容和實(shí)際的執(zhí)行不一致的情況。

但是發(fā)現(xiàn)任務(wù)的 ck 大小是正常的,復(fù)合預(yù)期的。

3.2.老老實(shí)實(shí)打 log 吧

既然能從 savepoint 正常恢復(fù),那么就把狀態(tài)值用 log 打出來看看到底發(fā)生了什么事情唄。

如下列代碼,在 ProcessWindowFunction 中加上 log 日志。

  1. this.todayPv.value() 
  2.     .forEach(new BiConsumer<Tuple2<DimNameEnum, String>, Long>() { 
  3.         @Override 
  4.         public void accept(Tuple2<DimNameEnum, String> k, 
  5.                 Long v) { 
  6.             log.info("key 值:{},value 值:{}", k.toString(), v); 
  7.         } 
  8.     }); 

發(fā)現(xiàn)結(jié)果如下:

  1. ... 
  2. key 值:(uv_type,男),value 值:1000 
  3. ... 

發(fā)現(xiàn)狀態(tài)中存儲(chǔ)的 DimNameEnum.province,DimNameEnum.age 的數(shù)據(jù)都是正確的,但是缺缺少了 DimNameEnum.sex,多了 (uv_type,男) 這樣的數(shù)據(jù),于是查看代碼,發(fā)現(xiàn)之前多加了一種枚舉類型 DimNameEnum.uv_type。代碼如下:

  1. enum DimNameEnum { 
  2.     province, 
  3.     age, 
  4.     uv_type, 
  5.     sex, 
  6.     ; 

于是懷疑 flink 針對(duì)枚舉值的 serde 不是按照枚舉值名稱來進(jìn)行匹配的,而是按照枚舉值下標(biāo)來進(jìn)行匹配的。因此就出現(xiàn)了 DimNameEnum.uv_type 將 DimNameEnum.sex 的位置占了的情況。

4.問題原理解析篇-導(dǎo)致問題的機(jī)制是什么

來看看源碼吧。

測試代碼如下:

  1. public class EnumsStateTest { 
  2.  
  3.     public static void main(String[] args) throws Exception { 
  4.         StreamExecutionEnvironment env = 
  5.                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); 
  6.  
  7.         env.setParallelism(1); 
  8.  
  9.         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 
  10.  
  11.         TypeInformation<StateTestEnums> t = TypeInformation.of(StateTestEnums.class); 
  12.  
  13.         EnumSerializer<StateTestEnums> e = (EnumSerializer<StateTestEnums>) t.createSerializer(env.getConfig()); 
  14.  
  15.         DataOutputSerializer d = new DataOutputSerializer(10000); 
  16.  
  17.         e.serialize(StateTestEnums.A, d); 
  18.  
  19.         env.execute(); 
  20.     } 
  21.  
  22.     enum StateTestEnums { 
  23.         A, 
  24.         B, 
  25.         C 
  26.         ; 
  27.     } 

debug 結(jié)果如下:

首先看看對(duì)應(yīng)的 TypeInformation 和 TypeSerializer。

發(fā)現(xiàn) enum 類型的序列化器是 EnumSerializer, 看看 EnumSerializer 的 serde 實(shí)現(xiàn),如圖所示:

最關(guān)鍵的兩個(gè)變量:

  • 序列化時(shí)用 valueToOrdinal
  • 反序列化時(shí)用 values

從而印證了上面的說法。flink enum 序列化時(shí)使用的是枚舉值下標(biāo)進(jìn)行 serde,因此一旦枚舉值順序發(fā)生改變,或者添加、刪除一個(gè)枚舉值,就會(huì)導(dǎo)致其他枚舉值的下標(biāo)出現(xiàn)錯(cuò)位的情況。從而導(dǎo)致數(shù)據(jù)錯(cuò)誤。

5.避坑篇-如何避免這種問題

5.1.枚舉解決

在上述場景中,如果又想要把新枚舉值加進(jìn)去,又需要狀態(tài)能夠正常恢復(fù),正常產(chǎn)出數(shù)據(jù)。

那么可以把新的枚舉值在尾部添加,比如下面這樣。

  1. enum DimNameEnum { 
  2.     province, 
  3.     age, 
  4.     sex, 
  5.     uv_type, // 添加在尾部 
  6.     ; 

5.2.非枚舉解決

還有一種方法如標(biāo)題,就是別用枚舉值,直接用 string 就 vans 了。

6.總結(jié)篇

本文主要介紹了 flink 枚舉值 serde 中的坑,當(dāng)在 enum 中添加刪除枚舉值時(shí),就有可能導(dǎo)致狀態(tài)岔劈。隨后給出了原因是由于 enum serde 器的實(shí)現(xiàn)導(dǎo)致的這種情況,最后給出了解決方案。

本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)羊說」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)羊說公眾號(hào)。

 

責(zé)任編輯:姜華 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2018-03-19 10:20:23

Java序列化反序列化

2022-08-06 08:41:18

序列化反序列化Hessian

2013-03-11 13:55:03

JavaJSON

2009-06-14 22:01:27

Java對(duì)象序列化反序列化

2011-06-01 15:05:02

序列化反序列化

2009-08-24 17:14:08

C#序列化

2011-03-04 09:25:51

Java序列化

2009-08-06 11:16:25

C#序列化和反序列化

2011-05-18 15:20:13

XML

2023-12-13 13:49:52

Python序列化模塊

2010-03-19 15:54:21

Java Socket

2011-06-01 14:26:11

序列化

2009-03-10 13:38:01

Java序列化字節(jié)流

2016-09-21 00:15:27

2011-06-01 14:50:48

2019-11-20 10:07:23

web安全PHP序列化反序列化

2009-08-25 14:24:36

C#序列化和反序列化

2021-11-18 07:39:41

Json 序列化Vue

2011-04-02 09:04:49

Java序列化

2016-01-05 15:10:59

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 日本在线网址 | 色爽女 | 二区久久 | 六月婷婷久久 | 免费麻豆视频 | 成人精品视频在线观看 | 天天躁日日躁性色aⅴ电影 免费在线观看成年人视频 国产欧美精品 | 在线国产一区 | 免费a大片 | 福利视频网 | 99热精品在线观看 | 欧美xxxx做受欧美 | 亚洲精品视频一区二区三区 | 天堂成人国产精品一区 | 激情五月婷婷综合 | 在线观看中文视频 | 国产精华一区 | 毛片黄| 久久久久久久国产精品 | 亚洲激情综合 | 日韩一区二区三区在线 | 免费h视频| 国产精品久久久久久久久 | 欧美日韩在线国产 | 国产黄色大片在线免费观看 | 亚洲免费在线 | 天天夜夜操 | 91欧美激情一区二区三区成人 | 久久久久亚洲精品 | 亚洲最大福利网 | 欧美日批| 91在线观看视频 | 91精品国产综合久久久动漫日韩 | 久久婷婷av | 国产精品2区 | 亚洲一区| 久久久美女 | 久久亚洲一区 | 欧美中文字幕在线 | 伦理一区二区 | 在线视频a|