Flink state 序列化Java enum 竟然岔劈了
1.序篇-先說結(jié)論
本文主要記錄博主在生產(chǎn)環(huán)境中踩的 flink 針對(duì) java enum serde 時(shí)的坑。
結(jié)論:在 flink 程序中,如果狀態(tài)中有存儲(chǔ) java enum,那么添加或者刪除 enum 中的一個(gè)枚舉值時(shí),就有可能導(dǎo)致狀態(tài)恢復(fù)異常,這里的異常可能不是在恢復(fù)過程中會(huì)實(shí)際拋出一個(gè)異常,而是有可能是 enum A 的值恢復(fù)給 enum B。
我從以下幾個(gè)章節(jié)說明、解決這個(gè)問題,希望能拋磚引玉,帶給大家一些啟發(fā)。
- 踩坑場景篇-這個(gè)坑是啥樣的
- 問題排查篇-坑的排查過程
- 問題原理解析篇-導(dǎo)致問題的機(jī)制是什么
- 避坑篇-如何避免這種問題
- 總結(jié)篇
2.踩坑場景篇-這個(gè)坑是啥樣的
對(duì)任務(wù)做一個(gè)簡單的過濾條件修改,任務(wù)重新上線之后,從 flink web ui 確認(rèn)是從 savepoint 重啟成功了,但是實(shí)際最終產(chǎn)出的數(shù)據(jù)上來看卻像是沒有從 savepoint 重啟。
邏輯就是計(jì)算分維度的當(dāng)天累計(jì) pv。代碼很簡單,在后面會(huì)貼出來。
如下圖:
在 00:04 分重啟時(shí)出現(xiàn)了當(dāng)天累計(jì) pv 出現(xiàn)了從零累計(jì)的情況。
但是預(yù)期正常的曲線應(yīng)該張下面這樣。
任務(wù)是使用 DataStream 編寫(基于 flink 1.13.1)。
- public class SenerioTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
- env.setParallelism(1);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- env.addSource(new SourceFunction<SourceModel>() {
- private volatile boolean isCancel = false;
- @Override
- public void run(SourceContext<SourceModel> ctx) throws Exception {
- // 數(shù)據(jù)源
- }
- @Override
- public void cancel() {
- this.isCancel = true;
- }
- })
- .keyBy(new KeySelector<SourceModel, Long>() {
- @Override
- public Long getKey(SourceModel value) throws Exception {
- return value.getUserId() % 1000;
- }
- })
- .timeWindow(Time.minutes(1))
- .aggregate(
- new AggregateFunction<SourceModel, Map<Tuple2<DimNameEnum, String>, Long>, Map<Tuple2<DimNameEnum, String>, Long>>() {
- @Override
- public Map<Tuple2<DimNameEnum, String>, Long> createAccumulator() {
- return new HashMap<>();
- }
- @Override
- public Map<Tuple2<DimNameEnum, String>, Long> add(SourceModel value,
- Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
- Lists.newArrayList(Tuple2.of(DimNameEnum.province, value.getProvince())
- , Tuple2.of(DimNameEnum.age, value.getAge())
- , Tuple2.of(DimNameEnum.sex, value.getSex()))
- .forEach(t -> {
- Long l = accumulator.get(t);
- if (null == l) {
- accumulator.put(t, 1L);
- } else {
- accumulator.put(t, l + 1);
- }
- });
- return accumulator;
- }
- @Override
- public Map<Tuple2<DimNameEnum, String>, Long> getResult(
- Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
- return accumulator;
- }
- @Override
- public Map<Tuple2<DimNameEnum, String>, Long> merge(
- Map<Tuple2<DimNameEnum, String>, Long> a,
- Map<Tuple2<DimNameEnum, String>, Long> b) {
- return null;
- }
- },
- new ProcessWindowFunction<Map<Tuple2<DimNameEnum, String>, Long>, SinkModel, Long, TimeWindow>() {
- private transient ValueState<Map<Tuple2<DimNameEnum, String>, Long>> todayPv;
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.todayPv = getRuntimeContext().getState(new ValueStateDescriptor<Map<Tuple2<DimNameEnum, String>, Long>>(
- "todayPv", TypeInformation.of(
- new TypeHint<Map<Tuple2<DimNameEnum, String>, Long>>() {
- })));
- }
- @Override
- public void process(Long aLong, Context context,
- Iterable<Map<Tuple2<DimNameEnum, String>, Long>> elements, Collector<SinkModel> out)
- throws Exception {
- // 將 elements 數(shù)據(jù) merge 到 todayPv 中
- // 每天零點(diǎn)將 state 清空重新累計(jì)
- // 然后 out#collect 出去即可
- }
- });
- env.execute();
- }
- @Data
- @Builder
- private static class SourceModel {
- private long userId;
- private String province;
- private String age;
- private String sex;
- private long timestamp;
- }
- @Data
- @Builder
- private static class SinkModel {
- private String dimName;
- private String dimValue;
- private long timestamp;
- }
- enum DimNameEnum {
- province,
- age,
- sex,
- ;
- }
- }
3.問題排查篇-坑的排查過程
3.1.愚蠢的懷疑引擎
首先懷疑是狀態(tài)沒有正常恢復(fù)。
但是查看 flink web ui 以及 tm 日志,都顯示是從 savepoint 正常恢復(fù)了。
還懷疑是不是出現(xiàn)了 flink web ui 展示的內(nèi)容和實(shí)際的執(zhí)行不一致的情況。
但是發(fā)現(xiàn)任務(wù)的 ck 大小是正常的,復(fù)合預(yù)期的。
3.2.老老實(shí)實(shí)打 log 吧
既然能從 savepoint 正常恢復(fù),那么就把狀態(tài)值用 log 打出來看看到底發(fā)生了什么事情唄。
如下列代碼,在 ProcessWindowFunction 中加上 log 日志。
- this.todayPv.value()
- .forEach(new BiConsumer<Tuple2<DimNameEnum, String>, Long>() {
- @Override
- public void accept(Tuple2<DimNameEnum, String> k,
- Long v) {
- log.info("key 值:{},value 值:{}", k.toString(), v);
- }
- });
發(fā)現(xiàn)結(jié)果如下:
- ...
- key 值:(uv_type,男),value 值:1000
- ...
發(fā)現(xiàn)狀態(tài)中存儲(chǔ)的 DimNameEnum.province,DimNameEnum.age 的數(shù)據(jù)都是正確的,但是缺缺少了 DimNameEnum.sex,多了 (uv_type,男) 這樣的數(shù)據(jù),于是查看代碼,發(fā)現(xiàn)之前多加了一種枚舉類型 DimNameEnum.uv_type。代碼如下:
- enum DimNameEnum {
- province,
- age,
- uv_type,
- sex,
- ;
- }
于是懷疑 flink 針對(duì)枚舉值的 serde 不是按照枚舉值名稱來進(jìn)行匹配的,而是按照枚舉值下標(biāo)來進(jìn)行匹配的。因此就出現(xiàn)了 DimNameEnum.uv_type 將 DimNameEnum.sex 的位置占了的情況。
4.問題原理解析篇-導(dǎo)致問題的機(jī)制是什么
來看看源碼吧。
測試代碼如下:
- public class EnumsStateTest {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
- env.setParallelism(1);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- TypeInformation<StateTestEnums> t = TypeInformation.of(StateTestEnums.class);
- EnumSerializer<StateTestEnums> e = (EnumSerializer<StateTestEnums>) t.createSerializer(env.getConfig());
- DataOutputSerializer d = new DataOutputSerializer(10000);
- e.serialize(StateTestEnums.A, d);
- env.execute();
- }
- enum StateTestEnums {
- A,
- B,
- C
- ;
- }
- }
debug 結(jié)果如下:
首先看看對(duì)應(yīng)的 TypeInformation 和 TypeSerializer。
發(fā)現(xiàn) enum 類型的序列化器是 EnumSerializer, 看看 EnumSerializer 的 serde 實(shí)現(xiàn),如圖所示:
最關(guān)鍵的兩個(gè)變量:
- 序列化時(shí)用 valueToOrdinal
- 反序列化時(shí)用 values
從而印證了上面的說法。flink enum 序列化時(shí)使用的是枚舉值下標(biāo)進(jìn)行 serde,因此一旦枚舉值順序發(fā)生改變,或者添加、刪除一個(gè)枚舉值,就會(huì)導(dǎo)致其他枚舉值的下標(biāo)出現(xiàn)錯(cuò)位的情況。從而導(dǎo)致數(shù)據(jù)錯(cuò)誤。
5.避坑篇-如何避免這種問題
5.1.枚舉解決
在上述場景中,如果又想要把新枚舉值加進(jìn)去,又需要狀態(tài)能夠正常恢復(fù),正常產(chǎn)出數(shù)據(jù)。
那么可以把新的枚舉值在尾部添加,比如下面這樣。
- enum DimNameEnum {
- province,
- age,
- sex,
- uv_type, // 添加在尾部
- ;
- }
5.2.非枚舉解決
還有一種方法如標(biāo)題,就是別用枚舉值,直接用 string 就 vans 了。
6.總結(jié)篇
本文主要介紹了 flink 枚舉值 serde 中的坑,當(dāng)在 enum 中添加刪除枚舉值時(shí),就有可能導(dǎo)致狀態(tài)岔劈。隨后給出了原因是由于 enum serde 器的實(shí)現(xiàn)導(dǎo)致的這種情況,最后給出了解決方案。
本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)羊說」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)羊說公眾號(hào)。