成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

踩坑記 | Flink Sql Count 還有這種坑!

開發 前端
在各類游戲中都會有一種場景,一個用戶可以從 A 等級升級到 B 等級,用戶可以不斷的升級,但是一個用戶同一時刻只會在同一個等級。需求指標就是當前分鐘各個等級的用戶數。

[[421468]]

本文轉載自微信公眾號「大數據羊說」,作者antigeneral了呀。轉載本文請聯系大數據羊說公眾號。

 1.序篇

通過本文你可了解到

  1. 踩坑場景篇-這個坑是啥樣的
  2. 問題排查篇-坑的排查過程
  3. 問題原理解析篇-導致問題的機制是什么
  4. 避坑篇-如何避免這種問題
  5. 展望篇-有什么機制可以根本避免這種情況

先說下結論:在非窗口類 flink sql 任務中,會存在 retract 機制,即上游會向下游發送「撤回消息(做減法)」,**最新的結果消息(做加法)**兩條消息來計算結果,保證結果正確性。

而如果我們在上下游中間使用了映射類 udf 改變了**撤回消息(做減法)「的一些字段值時,就可能會導致」撤回消息(做減法)**不能被正常處理,最終導致結果的錯誤。

2.踩坑場景篇-這個坑是啥樣的

在介紹坑之前我們先介紹下我們的需求、實現方案的背景。

2.1.背景

在各類游戲中都會有一種場景,一個用戶可以從 A 等級升級到 B 等級,用戶可以不斷的升級,但是一個用戶同一時刻只會在同一個等級。需求指標就是當前分鐘各個等級的用戶數。

2.2.預期效果

2

2.3.解決思路

獲取到當前所有用戶的最新等級

一個用戶同一時刻只會在一個等級,所以對每一個等級的用戶做 count 操作

2.4.解決方案

獲取到當前所有用戶的最新等級:flink sql row_number() 就可以實現,按照數據的 rowtime 進行逆序排序就可以獲取到用戶當前最新的等級

對每一個等級的用戶做 count 操作:對 row_number() 的后的明細結果進行 count 操作

2.4.1.sql

具體實現 sql 如下,非常簡單:

  1. WITH detail_tmp AS ( 
  2.   SELECT 
  3.     等級, 
  4.     id, 
  5.     `timestamp
  6.   FROM 
  7.     ( 
  8.       SELECT 
  9.         等級, 
  10.         id, 
  11.         `timestamp`, 
  12.         -- row_number 獲取最新狀態 
  13.         row_number() over( 
  14.           PARTITION by id 
  15.           ORDER BY 
  16.             `timestampDESC 
  17.         ) AS rn 
  18.       FROM 
  19.         source_db.source_table 
  20.     ) 
  21.   WHERE 
  22.     rn = 1 
  23. SELECT 
  24.   DIM.中文等級 as 等級, 
  25.   sum(part_uv) as uv 
  26. FROM 
  27.   ( 
  28.     SELECT 
  29.       等級, 
  30.       count(id) as part_uv 
  31.     FROM 
  32.       detail_tmp 
  33.     GROUP BY 
  34.       等級, 
  35.       mod(id, 1024) 
  36.   ) 
  37. -- 上游數據的等級名稱是數字,需求方要求給轉換成中文,所以這里加了一個 udf 映射 
  38. LEFT JOIN LATERAL TABLE(等級中文映射_UDF(等級)) AS DIM(中文等級) ON TRUE 
  39. GROUP BY 
  40.   DIM.中文等級 

2.4.2.參數配置

使用 minibatch 參數方式控制數據輸出頻率。

  1. table.exec.mini-batch.enabled : true 
  2. -- 設定 60s 的觸發間隔 
  3. table.exec.mini-batch.allow-latency : 60s 
  4. table.exec.mini-batch.size : 10000000000 

任務 plan。

1

2.5.問題場景

這段 SQL 跑了 n 年都沒有問題,但是有一天運營在配置【等級中文映射_UDF】時,不小心將一個等級的中文名給映射錯了,雖然馬上恢復了,但是當天的實時數據和離線數據對比后卻發現,實時產出的數值比離線大很多!!!而之前都是保持一致的。

3.問題排查篇-坑的排查過程

首先我們想一下,這個指標是算 uv 的,運營將等級中文名配置錯了,也應該是把原有等級的最終結果算少啊,怎么會算多呢???

然后我們將場景復現了下,來看看代碼:

任務代碼,大家可以直接 copy 到本地運行:

  1. public class Test { 
  2.  
  3.     public static void main(String[] args) throws Exception { 
  4.  
  5.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  6.  
  7.         env.setParallelism(1); 
  8.  
  9.         EnvironmentSettings settings = EnvironmentSettings 
  10.                 .newInstance() 
  11.                 .useBlinkPlanner() 
  12.                 .inStreamingMode().build(); 
  13.  
  14.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); 
  15.  
  16.         // 模擬輸入 
  17.         DataStream<Tuple3<String, Long, Long>> tuple3DataStream = 
  18.                 env.fromCollection(Arrays.asList( 
  19.                         Tuple3.of("2", 1L, 1627218000000L), 
  20.                         Tuple3.of("2", 101L, 1627218000000L + 6000L), 
  21.                         Tuple3.of("2", 201L, 1627218000000L + 7000L), 
  22.                         Tuple3.of("2", 301L, 1627218000000L + 7000L))); 
  23.         // 分桶取模 udf 
  24.         tEnv.registerFunction("mod", new Mod_UDF()); 
  25.  
  26.         // 中文映射 udf 
  27.         tEnv.registerFunction("status_mapper", new StatusMapper_UDF()); 
  28.  
  29.         tEnv.createTemporaryView("source_db.source_table", tuple3DataStream, 
  30.                 "status, id, timestamp"); 
  31.  
  32.         String sql = "WITH detail_tmp AS (\n" 
  33.                 + "  SELECT\n" 
  34.                 + "    status,\n" 
  35.                 + "    id,\n" 
  36.                 + "    `timestamp`\n" 
  37.                 + "  FROM\n" 
  38.                 + "    (\n" 
  39.                 + "      SELECT\n" 
  40.                 + "        status,\n" 
  41.                 + "        id,\n" 
  42.                 + "        `timestamp`,\n" 
  43.                 + "        row_number() over(\n" 
  44.                 + "          PARTITION by id\n" 
  45.                 + "          ORDER BY\n" 
  46.                 + "            `timestamp` DESC\n" 
  47.                 + "        ) AS rn\n" 
  48.                 + "      FROM source_db.source_table" 
  49.                 + "    )\n" 
  50.                 + "  WHERE\n" 
  51.                 + "    rn = 1\n" 
  52.                 + ")\n" 
  53.                 + "SELECT\n" 
  54.                 + "  DIM.status_new as status,\n" 
  55.                 + "  sum(part_uv) as uv\n" 
  56.                 + "FROM\n" 
  57.                 + "  (\n" 
  58.                 + "    SELECT\n" 
  59.                 + "      status,\n" 
  60.                 + "      count(distinct id) as part_uv\n" 
  61.                 + "    FROM\n" 
  62.                 + "      detail_tmp\n" 
  63.                 + "    GROUP BY\n" 
  64.                 + "      status,\n" 
  65.                 + "      mod(id, 100)\n" 
  66.                 + "  )\n" 
  67.                 + "LEFT JOIN LATERAL TABLE(status_mapper(status)) AS DIM(status_new) ON TRUE\n" 
  68.                 + "GROUP BY\n" 
  69.                 + "  DIM.status_new"
  70.  
  71.         Table result = tEnv.sqlQuery(sql); 
  72.  
  73.         tEnv.toRetractStream(result, Row.class).print(); 
  74.  
  75.         env.execute(); 
  76.     } 
  77.  

UDF 代碼:

  1. public class StatusMapper_UDF extends TableFunction<String> { 
  2.  
  3.     public void eval(String status) { 
  4.         if (status.equals("1")) { 
  5.             collector.collect("等級1"); 
  6.         } else if (status.equals("2")) { 
  7.             collector.collect("等級2"); 
  8.         } else if (status.equals("3")) { 
  9.             collector.collect("等級3"); 
  10.         } 
  11.     } 
  12.  

在正確情況(模擬 UDF 沒有任何變動的情況下)的輸出結果:

  1. (true,等級2,1) 
  2. (false,等級2,1) 
  3. (true,等級2,2) 
  4. (false,等級2,2) 
  5. (true,等級2,3) 
  6. (false,等級2,3) 
  7. (true,等級2,4) 

最終等級2 的 uv 數為 4,結果復合預期?。

模擬下用戶修改了 udf 配置之后,UDF 代碼如下:

  1. public class StatusMapper_UDF extends TableFunction<String> { 
  2.  
  3.     private int i = 0; 
  4.  
  5.     public void eval(String status) { 
  6.  
  7.         if (i == 5) { 
  8.             collect("等級4"); 
  9.         } else { 
  10.             if ("1".equals(status)) { 
  11.                 collector.collect("等級1"); 
  12.             } else if ("2".equals(status)) { 
  13.                 collector.collect("等級2"); 
  14.             } else if ("3".equals(status)) { 
  15.                 collector.collect("等級3"); 
  16.             } 
  17.         } 
  18.         i++; 
  19.     } 
  20.  

得到的結果如下:

  1. (true,等級2,1) 
  2. (false,等級2,1) 
  3. (true,等級2,2) 
  4. (false,等級2,2) 
  5. (true,等級2,3) 
  6. (false,等級2,3) 
  7. (true,等級2,7) 

最終等級2 的 uv 數為 7,很明顯這是錯誤結果?。

因此可以確定是由于這個 UDF 的處理邏輯變換而導致的結果出現錯誤。

下文就讓我們來分析下其中緣由。

問題原理解析篇-導致問題的機制是什么

我們首先來分析下上述 SQL,可以發現整個 flink sql 任務是使用了 unbounded + minibatch 實現的,在 minibatch 觸發條件觸發時,上游算子會將之前的結果撤回,然后將最新的結果發出。

這個任務的 execution plan 如圖所示。

7

可以從算子圖中的一些計算邏輯可以看到,整個任務都是基于 retract 機制運行(count_retract、sum_retract 等)。

而涉及到 udf 的核心邏輯是在 Operator(ID = 7),和 Operator(ID = 12) 之間。當 Operator(ID = 7) GroupAggregate 結果發生改變之后,會發一條「撤回消息(做減法)」,一條**最新的結果消息(做加法)**到 Operator(ID = 12) GroupAggregate。

5

Notes:簡單解釋下上面說的「撤回消息(做減法)」,「最新的結果消息(做加法)」。舉個算 count 的例子:當整個任務的第一條數據來之后,之前沒有數據,所以不用撤回,結果就是 0(沒有數據) + 1(第一條數據) = 1(結果),當第二條結果來之后,就要將上次發的 1 消息(可以理解為是整個任務的一個中間結果)撤回,將最新的結果 2 發下去。那么計算方法就是 1(上次的結果) - 1(撤回) + 2(當前最新的結果消息)= 2(結果)。

通過算子圖可以發現,【中文名稱映射】UDF 是處于兩個 GroupAggregate 之間的。也就是說 Operator(ID = 7) GroupAggregate 發出的「撤回消息(做減法)」,**最新的結果消息(做加法)「都會執行這個 UDF,那么就有可能」撤回消息(做減法)「中的某個作為下游 GroupAggregate 算子 key 的字段會被更改成其他值,那么這條消息就不會發到原來下游 GroupAggregate 算子的原始 key 中,那么原來的 key 的歷史結果就撤回不了了。。。但是」最新的結果消息(做加法)**的字段沒有被更改時,那么這個消息依然被發到了下游 GroupAggregate 算子,這就會導致沒做減法,卻做了加法,就會導致結果增加,如下圖所示。

從這個角度出發,我們來分析下上面的 case,從內層發給外層的消息一條一條來分析。

內層消息怎么來看呢?其實就是將上面的 SQL 中的 left join 刪除,重新跑一遍就可以得到結果,結果如下:

  1. (true,等級2,1) 
  2. (false,等級2,1) 
  3. (true,等級2,2) 
  4. (false,等級2,2) 
  5. (true,等級2,3) 
  6. (false,等級4,3) 
  7. (true,等級2,4) 

來分析下內層消息發出之后對應到外層消息的操作:

內層 外層
(true,等級2,1) (true,等級2,1)
(false,等級2,1) (false,等級2,1)
(true,等級2,2) (true,等級2,2)
(false,等級2,2) (false,等級2,2)
(true,等級2,3) (true,等級2,3)

前五條消息不會導致錯誤,不用詳細說明。

內層 外層
(true,等級2,1) (true,等級2,1)
(false,等級2,1) (false,等級2,1)
(true,等級2,2) (true,等級2,2)
(false,等級2,2) (false,等級2,2)
(true,等級2,3) (true,等級2,3)
(false,等級4,3)  

第六條消息發出之后,經過 udf 的處理之后,中文名被映射成了【等級4】,而其再通過 hash partition 策略向下發送消息時,就不能將這條撤回消息發到原本 key 為【等級2】的算子中了,這條撤回消息也無法被處理了。

內層 外層
(true,等級2,1) (true,等級2,1)
(false,等級2,1) (false,等級2,1)
(true,等級2,2) (true,等級2,2)
(false,等級2,2) (false,等級2,2)
(true,等級2,3) (true,等級2,3)
(false,等級4,3)  
(true,等級2,4) (false,等級2,3) (true,等級2,7)

第七條消息 (true,等級2,4) 發出后,外層 GroupAggregate 算子首先會將上次發出的記過撤回,即(false,等級2,3),然后將(true,等級2,4)累加到當前的記過上,即 3(上次結果)+ 4(這次最新的結果)= 7(結果)。就導致了上述的錯誤結果。

定位到問題原因之后,我們來看看怎么避免上述錯誤。

6.避坑篇-如何避免這種問題

6.1.從源頭避免

udf 這種映射維度的 udf 盡量在上線前就固定下來,避免后續更改造成的數據錯誤。

6.2.替換為 ScalarFunction 進行映射

  1. WITH detail_tmp AS ( 
  2.   SELECT 
  3.     status, 
  4.     id, 
  5.     `timestamp
  6.   FROM 
  7.     ( 
  8.       SELECT 
  9.         status, 
  10.         id, 
  11.         `timestamp`, 
  12.         row_number() over( 
  13.           PARTITION by id 
  14.           ORDER BY 
  15.             `timestampDESC 
  16.         ) AS rn 
  17.       FROM 
  18.         ( 
  19.           SELECT 
  20.             status, 
  21.             id, 
  22.             `timestamp
  23.           FROM 
  24.             source_db.source_table 
  25.         ) t1 
  26.     ) t2 
  27.   WHERE 
  28.     rn = 1 
  29. SELECT 
  30.   -- 在此處進行中文名稱映射 
  31.   等級中文映射_UDF(status) as status, 
  32.   sum(part_uv) as uv 
  33. FROM 
  34.   ( 
  35.     SELECT 
  36.       status, 
  37.       count(distinct id) as part_uv 
  38.     FROM 
  39.       detail_tmp 
  40.     GROUP BY 
  41.       status, 
  42.       mod(id, 100) 
  43.   ) 
  44. GROUP BY 
  45.   status 

還是剛剛的邏輯,剛剛的配方,我們先來看一下結果。

  1. public class StatusMapper_UDF extends ScalarFunction { 
  2.  
  3.     private int i = 0; 
  4.  
  5.     public String eval(String status) { 
  6.  
  7.         if (i == 5) { 
  8.             i++; 
  9.             return "等級4"
  10.         } else { 
  11.             i++; 
  12.             if ("1".equals(status)) { 
  13.                 return "等級1"
  14.             } else if ("2".equals(status)) { 
  15.                 return "等級2"
  16.             } else if ("3".equals(status)) { 
  17.                 return "等級3"
  18.             } 
  19.         } 
  20.         return "未知"
  21.     } 
  22.  

發現雖然依然會有 (false,等級4,3) 這樣的錯誤撤回數據(這是 udf 決定的,沒法避免),但是我們可以發現最終的結果是 (true,等級2,4),結果依然是正確的。

再來分析下問什么這種方式可以解決,如圖 plan。

6

發現映射 udf 算子所處的位置不在兩個 GroupAggregrate 之間了,因此在 retract 消息發送之后,不會被映射到錯誤其他 key 中,因此所有的 retract 消息都會正常處理。

7.展望篇-有什么機制可以根本避免這種情況

可以將「撤回消息(做減法)」,**最新的結果消息(做加法)**做成一個原子消息從上游發給下游,下游統一進行原子性處理,關聯 udf 時,也只對 group key 關聯一次即可。

 

責任編輯:武曉燕 來源: 大數據羊說
相關推薦

2020-09-15 08:46:26

Kubernetes探針服務端

2023-10-30 08:16:33

數據庫插件Mybatis

2021-10-28 19:10:02

Go語言編碼

2017-05-05 08:12:51

Spark共享變量

2018-07-12 14:59:44

獵網

2022-01-07 11:48:59

RabbitMQGolang 項目

2022-03-21 08:55:53

RocketMQ客戶端過濾機制

2022-07-26 01:00:12

Eureka延遲注冊

2015-09-07 10:15:53

移動端開發

2023-01-18 23:20:25

編程開發

2024-04-10 08:39:56

BigDecimal浮點數二進制

2019-12-12 14:32:26

SQL語句數據庫

2023-02-20 08:11:04

2021-06-30 13:31:18

線程安全ThreadLocal

2023-06-30 08:10:14

JavaBigDecimal

2020-08-20 07:38:51

Java字符串整形

2023-09-22 11:29:11

JavasubList

2021-05-27 22:46:00

Nacos Clien版本Nacos

2017-07-17 15:46:20

Oracle并行機制

2022-07-06 11:47:27

JAVAfor循環
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产丝袜一区二区三区免费视频 | 中文字幕一区二区三区在线观看 | 91看片官网 | 看av片网站 | 日韩欧美中文字幕在线观看 | 围产精品久久久久久久 | 最新国产视频 | 免费亚洲网站 | 国产视频一区二区三区四区五区 | 欧美久久久 | 欧美日一区二区 | 亚洲精品色 | 免费毛片网 | 超碰电影 | 午夜国产| m豆传媒在线链接观看 | 在线免费观看视频你懂的 | 国产伦精品一区二区三区四区视频 | 亚洲九九 | 韩日有码 | 国产视频黄色 | 91av在线免费 | 日本在线一二 | 日韩精品在线看 | 国产精品亚洲二区 | 人人干天天干 | 91福利电影在线观看 | 免费一级欧美在线观看视频 | 国产精品视频一区二区三区四蜜臂 | 午夜a√ | 国产乱码精品一区二区三区中文 | www视频在线观看 | 国产 亚洲 网红 主播 | 日韩视频免费 | 能看的av | 黄色大片毛片 | 国内自拍视频在线观看 | 亚洲视频在线观看 | 午夜影院在线观看 | 国产日韩一区二区三区 | 九九伦理片 |