成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然之維表 Join 的性能優化之路(上)附源碼

運維 數據庫運維
維表作為 sql 任務中一種常見表的類型,其本質就是關聯表數據的額外數據屬性,通常在 join 語句中進行使用。比如源數據有人的 id,你現在想要得到人的性別、年齡,那么可以通過用戶 id 去關聯人的性別、年齡,就可以得到更全的數據。

[[437676]]

本文轉載自微信公眾號「大數據羊說」,作者antigeneral了呀 。轉載本文請聯系大數據羊說公眾號。

1.序篇

源碼公眾號后臺回復1.13.2 sql lookup join獲取。

廢話不多說,咱們先直接上本文的目錄和結論,小伙伴可以先看結論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  • 背景及應用場景介紹:博主期望你能了解到,flink sql 提供了輕松訪問外部存儲的 lookup join(與上節不同,上節說的是流與流的 join)。lookup join 可以簡單理解為使用 flatmap 訪問外部存儲數據然后將維度字段拼接到當前這條數據上面
  • 來一個實戰案例:博主以曝光用戶日志流關聯用戶畫像(年齡、性別)維表為例介紹 lookup join 應該達到的關聯的預期效果。
  • flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實戰案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時間的,并且 lookup join 經常會由于訪問外部存儲的 qps 過高而導致背壓,產出延遲等性能問題。我們可以借鑒在 DataStream api 中的維表 join 優化思路在 flink sql 使用 local cache,異步訪問維表,批量訪問維表三種方式去解決性能問題。
  • 總結及展望:官方并沒有提供 批量訪問維表 的能力,因此博主自己實現了一套,具體使用方式和原理實現敬請期待下篇文章。

2.背景及應用場景介紹

維表作為 sql 任務中一種常見表的類型,其本質就是關聯表數據的額外數據屬性,通常在 join 語句中進行使用。比如源數據有人的 id,你現在想要得到人的性別、年齡,那么可以通過用戶 id 去關聯人的性別、年齡,就可以得到更全的數據。

維表 join 在離線數倉中是最常見的一種數據處理方式了,在實時數倉的場景中,flink sql 目前也支持了維表的 join,即 lookup join,生產環境可以用 mysql,redis,hbase 來作為高速維表存儲引擎。

Notes:

在實時數倉中,常用實時維表有兩種更新頻率

實時的更新:維度信息是實時新建的,實時寫入到高速存儲引擎中。然后其他實時任務在做處理時實時的關聯這些維度信息。

周期性的更新:對于一些緩慢變化維度,比如年齡、性別的用戶畫像等,幾萬年都不變化一次的東西??,實時維表的更新可以是小時級別,天級別的。

3.來一個實戰案例

來看看在具體場景下,對應輸入值的輸出值應該長啥樣。

需求指標:使用曝光用戶日志流(show_log)關聯用戶畫像維表(user_profile)關聯到用戶的維度之后,提供給下游計算分性別,年齡段的曝光用戶數使用。此處我們只關心關聯維表這一部分的輸入輸出數據。

來一波輸入數據:

曝光用戶日志流(show_log)數據(數據存儲在 kafka 中):

log_id timestamp user_id
1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 c

用戶畫像維表(user_profile)數據(數據存儲在 redis 中):

user_id(主鍵) age sex
a 12-18
b 18-24
c 18-24

注意:redis 中的數據結構存儲是按照 key,value 去存儲的。其中 key 為 user_id,value 為 age,sex 的 json。如下圖所示:

user_profile redis

預期輸出數據如下:

log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18
2 2021-11-01 00:03:00 b 18-24
3 2021-11-01 00:05:00 c 18-24
4 2021-11-01 00:06:00 b 18-24
5 2021-11-01 00:07:00 c 18-24  

flink sql lookup join 登場。下面是官網的鏈接。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join

4.flink sql lookup join

4.1.lookup join 定義

以上述案例來說,lookup join 其實簡單理解來,就是每來一條數據去 redis 里面摟一次數據。然后把關聯到的維度數據給拼接到當前數據中。

熟悉 DataStream api 的小伙伴萌,簡單來理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中處理每一條來的數據,針對每一條數據去訪問用戶畫像的 redis。(實際上,flink sql api 中也確實是這樣實現的!sql 生成的 lookup join 代碼就是繼承了 flatmap)

4.2.上述案例解決方案

來看看上述案例的 flink sql lookup join sql 怎么寫:

  1. CREATE TABLE show_log ( 
  2.     log_id BIGINT
  3.     `timestampas cast(CURRENT_TIMESTAMP as timestamp(3)), 
  4.     user_id STRING, 
  5.     proctime AS PROCTIME() 
  6. WITH ( 
  7.   'connector' = 'datagen'
  8.   'rows-per-second' = '10'
  9.   'fields.user_id.length' = '1'
  10.   'fields.log_id.min' = '1'
  11.   'fields.log_id.max' = '10' 
  12. ); 
  13.  
  14. CREATE TABLE user_profile ( 
  15.     user_id STRING, 
  16.     age STRING, 
  17.     sex STRING 
  18.     ) WITH ( 
  19.   'connector' = 'redis'
  20.   'hostname' = '127.0.0.1'
  21.   'port' = '6379'
  22.   'format' = 'json'
  23.   'lookup.cache.max-rows' = '500'
  24.   'lookup.cache.ttl' = '3600'
  25.   'lookup.max-retries' = '1' 
  26. ); 
  27.  
  28. CREATE TABLE sink_table ( 
  29.     log_id BIGINT
  30.     `timestampTIMESTAMP(3), 
  31.     user_id STRING, 
  32.     proctime TIMESTAMP(3), 
  33.     age STRING, 
  34.     sex STRING 
  35. WITH ( 
  36.   'connector' = 'print' 
  37. ); 
  38.  
  39. -- lookup join 的 query 邏輯 
  40. INSERT INTO sink_table 
  41. SELECT  
  42.     s.log_id as log_id 
  43.     , s.`timestampas `timestamp
  44.     , s.user_id as user_id 
  45.     , s.proctime as proctime 
  46.     , u.sex as sex 
  47.     , u.age as age 
  48. FROM show_log AS s 
  49. LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u 
  50. ON s.user_id = u.user_id 

這里使用了 for SYSTEM_TIME as of 時態表的語法來作為維表關聯的標識語法。

Notes:

實時的 lookup 維表關聯能使用處理時間去做關聯。

運行結果如下:

log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18
2 2021-11-01 00:03:00 b 18-24
3 2021-11-01 00:05:00 c 18-24
4 2021-11-01 00:06:00 b 18-24
5 2021-11-01 00:07:00 c 18-24

flink web ui 算子圖如下:

flink web ui

但是!!!但是!!!但是!!!

flink 官方并沒有提供 redis 的維表 connector 實現。

沒錯,博主自己實現了一套。關于 redis 維表的 connector 實現,直接參考下面的文章。都是可以從 github 上找到源碼拿來用的!

flink sql 知其所以然(二)| 自定義 redis 數據維表(附源碼)

4.3.關于維表使用的一些注意事項

同一條數據關聯到的維度數據可能不同:實時數倉中常用的實時維表都是在不斷的變化中的,當前流表數據關聯完維表數據后,如果同一個 key 的維表的數據發生了變化,已關聯到的維表的結果數據不會再同步更新。舉個例子,維表中 user_id 為 1 的數據在 08:00 時 age 由 12-18 變為了 18-24,那么當我們的任務在 08:01 failover 之后從 07:59 開始回溯數據時,原本應該關聯到 12-18 的數據會關聯到 18-24 的 age 數據。這是有可能會影響數據質量的。所以小伙伴萌在評估你們的實時任務時要考慮到這一點。

會發生實時的新建及更新的維表博主建議小伙伴萌應該建立起數據延遲的監控機制,防止出現流表數據先于維表數據到達,導致關聯不到維表數據

4.4.再說說維表常見的性能問題及優化思路

所有的維表性能問題都可以總結為:高 qps 下訪問維表存儲引擎產生的任務背壓,數據產出延遲問題。

舉個例子:

  • 在沒有使用維表的情況下:一條數據從輸入 flink 任務到輸出 flink 任務的時延假如為 0.1 ms,那么并行度為 1 的任務的吞吐可以達到 1 query / 0.1 ms = 1w qps。
  • 在使用維表之后:每條數據訪問維表的外部存儲的時長為 2 ms,那么一條數據從輸入 flink 任務到輸出 flink 任務的時延就會變成 2.1 ms,那么同樣并行度為 1 的任務的吞吐只能達到 1 query / 2.1 ms = 476 qps。兩者的吞吐量相差 21 倍。

這就是為什么維表 join 的算子會產生背壓,任務產出會延遲。

那么當然,解決方案也是有很多的。拋開 flink sql 想一下,如果我們使用 DataStream api,甚至是在做一個后端應用,需要訪問外部存儲時,常用的優化方案有哪些?這里列舉一下:

按照 redis 維表的 key 分桶 + local cache:通過按照 key 分桶的方式,讓大多數據的維表關聯的數據訪問走之前訪問過得 local cache 即可。這樣就可以把訪問外部存儲 2.1 ms 處理一個 query 變為訪問內存的 0.1 ms 處理一個 query 的時長。

異步訪問外存:DataStream api 有異步算子,可以利用線程池去同時多次請求維表外部存儲。這樣就可以把 2.1 ms 處理 1 個 query 變為 2.1 ms 處理 10 個 query。吞吐可變優化到 10 / 2.1 ms = 4761 qps。

批量訪問外存:除了異步訪問之外,我們還可以批量訪問外部存儲。舉一個例子:在訪問 redis 維表的 1 query 占用 2.1 ms 時長中,其中可能有 2 ms 都是在網絡請求上面的耗時 ,其中只有 0.1 ms 是 redis server 處理請求的時長。那么我們就可以使用 redis 提供的 pipeline 能力,在客戶端(也就是 flink 任務 lookup join 算子中),攢一批數據,使用 pipeline 去同時訪問 redis sever。這樣就可以把 2.1 ms 處理 1 個 query 變為 7ms(2ms + 50 * 0.1ms) 處理 50 個 query。吞吐可變為 50 query / 7 ms = 7143 qps。博主這里測試了下使用 redis pipeline 和未使用的時長消耗對比。如下圖所示。

redis pipeline

博主認為上述優化效果中,最好用的是 1 + 3,2 相比 3 還是一條一條發請求,性能會差一些。

既然 DataStream 可以這樣做,flink sql 必須必的也可以借鑒上面的這些優化方案。具體怎么操作呢?看下文騷操作

4.5.lookup join 的具體性能優化方案

按照 redis 維表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做訪問 redis 處理,并且 udaf 產出的結果只能是一條,所以這種實現起來非常復雜。我們選擇不做 keyby 分桶。但是我們可以直接使用 local cache 去做本地緩存,雖然【直接緩存】的效果比【先按照 key 分桶再做緩存】的效果差,但是也能一定程度上減少訪問 redis 壓力。在博主實現的 redis connector 中,內置了 local cache 的實現,小伙伴萌可以參考下面這部篇文章進行配置。

異步訪問外存:目前博主實現的 redis connector 不支持異步訪問,但是官方實現的 hbase connector 支持這個功能,參考下面鏈接文章的,點開之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/

批量訪問外存:這玩意官方必然沒有實現啊,但是,但是,但是,經過博主周末兩天的瘋狂 debug,改了改源碼,搞定了基于 redis 的批量訪問外存優化的功能。

4.6.基于 redis connector 的批量訪問機制優化

先描述一下大概是個什么東西,具體怎么用。

你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.mode 為 true,sql 不用做任何改動的情況下,flink lookup join 算子會自動優化,優化效果如下:

lookup join 算子的每個 task 上,每攢夠 30 條數據 or 每隔五秒(處理時間) 去觸發一次批量訪問 redis 的請求,使用的是 jedis client 的 pipeline 功能訪問 redis server。實測性能有很大提升。

關于這個批量訪問機制的優化介紹和使用方式介紹,小伙伴們先別急,下篇文章會詳細介紹到。

5.總結與展望

源碼公眾號后臺回復1.13.2 sql lookup join獲取。

本文主要介紹了 flink sql lookup join 的使用方式,并介紹了一些經常出現的性能問題以及優化思路,總結如下:

背景及應用場景介紹:博主期望你能了解到,flink sql 提供了輕松訪問外部存儲的 lookup join(與上節不同,上節說的是流與流的 join)。lookup join 可以簡單理解為使用 flatmap 訪問外部存儲數據然后將維度字段拼接到當前這條數據上面

來一個實戰案例:博主以曝光用戶日志流關聯用戶畫像(年齡、性別)維表為例介紹 lookup join 應該達到的關聯的預期效果。

flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實戰案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時間的,并且 lookup join 經常會由于訪問外部存儲的 qps 過高而導致背壓,產出延遲等性能問題。我們可以借鑒在 DataStream api 中的維表 join 優化思路在 flink sql 使用 local cache,異步訪問維表,批量訪問維表三種方式去解決性能問題。

 

總結及展望:官方并沒有提供 批量訪問維表 的能力,因此博主自己實現了一套,具體使用方式和原理實現敬請期待下篇文章。

 

責任編輯:武曉燕 來源: 大數據羊說
相關推薦

2021-11-27 09:03:26

flink join數倉

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2021-11-28 11:36:08

SQL Flink Join

2021-12-09 06:59:24

FlinkSQL 開發

2022-05-18 09:02:28

Flink SQLSQL字符串

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-05 08:28:39

Flink SQLbatch lookuSQL

2022-05-15 09:57:59

Flink SQL時間語義

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-06-18 09:26:00

Flink SQLJoin 操作

2021-12-13 07:57:47

Flink SQL Flink Hive Udf

2022-06-29 09:01:38

FlinkSQL時間屬性

2022-05-27 09:02:58

SQLHive語義

2022-05-12 09:02:47

Flink SQL數據類型

2022-05-09 09:03:04

SQL數據流數據

2022-08-10 10:05:29

FlinkSQL

2021-12-06 07:15:47

開發Flink SQL

2021-12-17 07:54:16

Flink SQLTable DataStream
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久精品视频12 | 欧美伊人久久久久久久久影院 | 国产三区视频在线观看 | 国产精品九九视频 | 亚洲精品不卡 | 亚洲一区二区在线视频 | 最新av中文字幕 | 日韩精品一区二区三区中文字幕 | 国产精品国产成人国产三级 | 亚洲 欧美 日韩在线 | 色橹橹欧美在线观看视频高清 | 国产一级片免费视频 | 亚洲一区二区电影网 | 亚洲在线一区 | 91啪影院 | 欧美一区二区网站 | 成人免费大片黄在线播放 | 久久久久久久久久一区二区 | 成人不卡一区二区 | 国产午夜在线观看 | 久久精品天堂 | 亚洲视频一区 | 黄色毛片视频 | 99综合| 一区二区三区四区不卡视频 | 黄色成人av | 一区二区精品 | 黄色国产在线视频 | 日韩免费激情视频 | 福利片在线看 | 国产精品久久国产精品久久 | 国产免费播放视频 | 久久久久久91香蕉国产 | 国产在线中文字幕 | 亚洲免费网| 成人久久一区 | 综合久久亚洲 | 国产日韩一区二区三免费高清 | 天天操天天舔 | 妞干网福利视频 | 久久精品亚洲 |