成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink常見維表Join方案,收藏學習開發很有用!

大數據
由于維表是一張不斷變化的表(靜態表只是動態表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統的 JOIN 語法來表達維表 JOIN,是不完整的。因為維表是一直在更新變化的,如果用這個語法那么關聯上的是哪個時刻的維表呢?我們是不知道的,結果是不確定的。

本文轉載自微信公眾號「大數據左右手」,作者左右 。轉載本文請聯系大數據左右手公眾號。

前言

實時數倉,難免會遇到join維表的業務。現總結幾種方案,供各位看官選擇:

  • 查找關聯(同步,異步)
  • 狀態編程,預加載數據到狀態中,按需取
  • 冷熱數據
  • 廣播維表
  • Temporal Table Join
  • Lookup Table Join

其中中間留下兩個問題,供大家思考,可留言一起討論?

查找關聯

查找關聯就是在主流數據中直接訪問外部數據(mysql,redis,impala ...)去根據主鍵或者某種關鍵條件去關聯取值。

適合: 維表數據量大,但是主數據不大的業務實時計算。

缺點:數據量大的時候,會給外部數據源庫帶來很大的壓力,因為某條數據都需要關聯。

同步

訪問數據庫是同步調用,導致 subtak 線程會被阻塞,影響吞吐量

  1. import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} 
  2. import com.wang.stream.env.{FlinkStreamEnv, KafkaSourceEnv} 
  3. import org.apache.flink.api.common.functions.FlatMapFunction 
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema 
  5. import org.apache.flink.streaming.api.scala._ 
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer 
  7. import org.apache.flink.util.Collector 
  8.  
  9.  def analyses(): Unit ={ 
  10.     val env: StreamExecutionEnvironment = FlinkStreamEnv.get() 
  11.     KafkaSourceEnv.getKafkaSourceStream(env,List("test")) 
  12.       .map(JSON.parseObject(_)) 
  13.       .filter(_!=null
  14.       .flatMap( 
  15.         new FlatMapFunction[JSONObject,String] { 
  16.           override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = { 
  17.             // 如果topic就一張表,不用區分,如果多張表,可以通過database 與 table 區分,放到下一步去處理 
  18.             // 表的名字 
  19.             val databaseName:String = jSONObject.getString("database"
  20.             // 表的名字 
  21.             val tableName:String = jSONObject.getString("table"
  22.             // 數據操作類型 INSERT UPDATE DELETE 
  23.             val operationType:String = jSONObject.getString("type"
  24.             // 主體數據 
  25.             val tableData: JSONArray = jSONObject.getJSONArray("data"
  26.             // old 值 
  27.             val old: JSONArray = jSONObject.getJSONArray("old"
  28.             // canal json 可能存在批處理出現data數據多條 
  29.             for (i <- 0 until tableData.size()) { 
  30.               val data: String = tableData.get(i).toString 
  31.               val nObject: JSONObject = JSON.parseObject(data) 
  32.               val orderId: AnyRef = nObject.get("order_id"
  33.               // 下面寫(mysql,redis或者hbase)的連接,利用api 通過orderId查找 
  34.                
  35.               // 最后封裝數據格式 就是join所得 
  36.               collector.collect(null
  37.             } 
  38.           } 
  39.         } 
  40.       ) 
  41.       .addSink( 
  42.         new FlinkKafkaProducer[String]( 
  43.           ""
  44.           ""
  45.           new SimpleStringSchema() 
  46.         ) 
  47.       ) 
  48.     env.execute(""

異步

AsyncIO 可以并發地處理多個請求,很大程度上減少了對 subtask 線程的阻塞。

  1. def analyses(): Unit ={ 
  2.     val env: StreamExecutionEnvironment = FlinkStreamEnv.get() 
  3.     val source: DataStream[String] = KafkaSourceEnv.getKafkaSourceStream(env, List("test")) 
  4.       .map(JSON.parseObject(_)) 
  5.       .filter(_ != null
  6.       .flatMap( 
  7.         new FlatMapFunction[JSONObject, String] { 
  8.           override def flatMap(jSONObject: JSONObject, collector: Collector[String]): Unit = { 
  9.             // 如果topic就一張表,不用區分,如果多張表,可以通過database 與 table 區分,放到下一步去處理 
  10.             // 表的名字 
  11.             val databaseName: String = jSONObject.getString("database"
  12.             // 表的名字 
  13.             val tableName: String = jSONObject.getString("table"
  14.             // 數據操作類型 INSERT UPDATE DELETE 
  15.             val operationType: String = jSONObject.getString("type"
  16.             // 主體數據 
  17.             val tableData: JSONArray = jSONObject.getJSONArray("data"
  18.             // old 值 
  19.             val old: JSONArray = jSONObject.getJSONArray("old"
  20.             // canal json 可能存在批處理出現data數據多條 
  21.             for (i <- 0 until tableData.size()) { 
  22.               val data: String = tableData.get(i).toString 
  23.               collector.collect(data) 
  24.             } 
  25.           } 
  26.         } 
  27.       ) 
  28.        
  29.     AsyncDataStream.unorderedWait( 
  30.       source, 
  31.       new RichAsyncFunction[String,String] {//自定義的數據源異步處理類 
  32.         override def open(parameters: Configuration): Unit = { 
  33.           // 初始化 
  34.         } 
  35.         override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = { 
  36.            
  37.           // 將數據搜集 
  38.           resultFuture.complete(null
  39.        } 
  40.  
  41.         override def close(): Unit = { 
  42.           // 關閉 
  43.         } 
  44.     }, 
  45.     1000,//異步超時時間 
  46.     TimeUnit.MILLISECONDS,//時間單位 
  47.     100)//最大異步并發請求數量 
  48.      .addSink( 
  49.         new FlinkKafkaProducer[String]( 
  50.           ""
  51.           ""
  52.           new SimpleStringSchema() 
  53.         ) 
  54.       ) 
  55.  
  56.     env.execute(""
  57.   } 

狀態編程,預加載數據到狀態中,按需取

首先把維表數據初始化到state中,設置好更新時間,定時去把維表。

優點:flink 自己維護狀態數據,"榮辱與共",不需要頻繁鏈接外部數據源,達到解耦。

缺點:不適合大的維表和變化大的維表。

  1. .keyBy(_._1) 
  2. .process( 
  3.   new KeyedProcessFunction[String,(String,String,String,String,String), String]{ 
  4.     private var mapState:MapState[String,Map[String,String]] = _ 
  5.     private var first: Boolean = true 
  6.      
  7.     override def open(parameters: Configuration): Unit = { 
  8.       val config: StateTtlConfig = StateTtlConfig 
  9.         .newBuilder(org.apache.flink.api.common.time.Time.minutes(5)) 
  10.         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) 
  11.         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
  12.         .build() 
  13.       val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]]) 
  14.       join.enableTimeToLive(config) 
  15.       mapState = getRuntimeContext.getMapState(join
  16.     } 
  17.     override def processElement( 
  18.                                  in: (String, String, String, String, String), 
  19.                                  context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context, 
  20.                                  collector: Collector[String]): Unit = { 
  21.       // 加載維表 
  22.       if(first){ 
  23.         first = false 
  24.         val time: Long = System.currentTimeMillis() 
  25.         getSmallDimTableInfo() 
  26.         // 設置好更新時間,定時去把維表 
  27.         context.timerService().registerProcessingTimeTimer(time + 86400000) 
  28.       } 
  29.        
  30.       // 數據處理,過來一條條數據,然后按照自己的業務邏輯去取維表的數據即可 
  31.        
  32.       // 然后封裝 放到collect中 
  33.       collector.collect(null
  34.     } 
  35.  
  36.     override def onTimer( 
  37.                           timestamp: Long, 
  38.                           ctx: KeyedProcessFunction[String, (String, String, String, String, String),String]#OnTimerContext, 
  39.                           out: Collector[String]): Unit = { 
  40.       println("觸發器執行"
  41.       mapState.clear() 
  42.       getSmallDimTableInfo() 
  43.       println(mapState) 
  44.       ctx.timerService().registerProcessingTimeTimer(timestamp + 86400000) 
  45.     } 
  46.     def getSmallDimTableInfo(): Unit ={ 
  47.  
  48.       // 加載 字典數據 
  49.       val select_dictionary="select dic_code,pre_dictionary_id,dic_name from xxxx" 
  50.       val dictionary: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(select_dictionary, null
  51.       dictionary.foreach(item=>{ 
  52.         mapState.put("dic_dictionary_"+item.get("pre_dictionary_id").toString,item) 
  53.       }) 
  54.  
  55.     } 
  56.   } 
  57. .filter(_!=null
  58. .addSink( 
  59.   new FlinkKafkaProducer[String]( 
  60.     ""
  61.     ""
  62.     new SimpleStringSchema() 
  63.   ) 
  64. v.execute(""

思考下:直接定義一個Map集合這樣的優缺點是什么?可以留言說出自己的看法?

冷熱數據

思想:先去狀態去取,如果沒有,去外部查詢,同時去存到狀態里面。StateTtlConfig 的過期時間可以設置短點。

優點:中庸取值方案,熱備常用數據到內存,也避免了數據join相對過多外部數據源。

缺點:也不能一勞永逸解決某些問題,熱備數據過多,或者冷數據過大,都會對state 或者 外部數據庫造成壓力。

  1. .filter(_._1 != null
  2. .keyBy(_._1) 
  3. .process( 
  4.   new KeyedProcessFunction[String,(String,String,String,String,String), String]{ 
  5.     private var mapState:MapState[String,Map[String,String]] = _ 
  6.     private var first: Boolean = true 
  7.     override def open(parameters: Configuration): Unit = { 
  8.       val config: StateTtlConfig = StateTtlConfig 
  9.         .newBuilder(org.apache.flink.api.common.time.Time.days(1)) 
  10.         .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) 
  11.         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
  12.         .build() 
  13.       val join = new MapStateDescriptor[String,Map[String,String]]("join",classOf[String],classOf[Map[String,String]]) 
  14.       join.enableTimeToLive(config) 
  15.       mapState = getRuntimeContext.getMapState(join
  16.     } 
  17.     override def processElement( 
  18.                                  in: (String, String, String, String, String), 
  19.                                  context: KeyedProcessFunction[String, (String, String, String, String, String),String]#Context, 
  20.                                  collector: Collector[String]): Unit = { 
  21.  
  22.       // 數據處理,過來一條條數據,然后按照自己的業務邏輯先去mapState去找,如果沒有再去 外部去找 
  23.       if (mapState.contains("xx_id")){ 
  24.         // 如果存在就取 
  25.  
  26.       }else
  27.         // 如果不存在去外部拿,然后放到mapState中 
  28.         val dim_sql="select dic_code,pre_dictionary_id,dic_name from xxxx where id=xx_id" 
  29.         val dim: util.List[util.Map[String, AnyRef]] = MysqlUtil.executeQuery(dim_sql, null
  30.         mapState.put("xx_id",null
  31.       } 
  32.       // 然后封裝 放到collect中 
  33.       collector.collect(null
  34.     } 
  35.   } 

廣播維表

比如上面提到的字典表,每一個Task都需要這份數據,那么需要join這份數據的時候就可以使用廣播維表。

  1. val dimStream=env.addSource(MysqlSource) 
  2.  
  3. // 廣播狀態 
  4. val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcaststate", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(Long.class, Dim.class)) 
  5.  
  6. // 廣播流 
  7. val broadStream=dimStream.broadcast() 
  8.  
  9. // 主數據流 
  10. val mainConsumer = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), kafkaConfig) 
  11.  
  12. val mainStream=env.addSource(mainConsumer) 
  13.  
  14. // 廣播狀態與維度表關聯 
  15. val connectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1) 
  16.  
  17. connectedStream.process(new KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String] { 
  18.  
  19.      override def processElement(value: User, ctx: KeyedBroadcastProcessFunction[String,User,Map[Long,Dim],String]#ReadOnlyContext, out: Collector[String]): Unit = { 
  20.       // 取到數據就可以愉快的玩耍了 
  21.      val state=ctx.getBroadcastState(broadcastStateDesc) 
  22.        xxxxxx 
  23.          
  24.   } 

「思考:」 如果把維表流也通過實時監控binlog到kafka,當維度數據發生變化時,更新放到狀態中,這種方式,是不是更具有時效性呢?

(1)通過canal把變更binlog方式發送到kafka中。

(2)數據流定義成為廣播流,廣播到數據到主數據流中。

(3)定義一個廣播狀態存儲數據,在主數據進行查找匹配,符合要求則join成功。

Temporal Table Join(FlinkSQL與Flink Table API)

由于維表是一張不斷變化的表(靜態表只是動態表的一種特例)。那如何 JOIN 一張不斷變化的表呢?如果用傳統的 JOIN 語法來表達維表 JOIN,是不完整的。因為維表是一直在更新變化的,如果用這個語法那么關聯上的是哪個時刻的維表呢?我們是不知道的,結果是不確定的。所以 Flink SQL 的維表 JOIN 語法引入了Temporal Table 的標準語法,用來聲明關聯的是維表哪個時刻的快照。

普通關聯會一直保留關聯雙側的數據,數據也就會一直膨脹,直到撐爆內存導致任務失敗,Temporal Join則可以定期清理過期數據,在合理的內存配置下即可避免內存溢出。

Event Time Temporal Join

語法

  1. SELECT [column_list] 
  2. FROM table1 [AS <alias1>] 
  3. [LEFTJOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>] 
  4. ON table1.column-name1 = table2.column-name1 

使用事件時間屬性(即行時間屬性),可以檢索過去某個時間點的鍵值。這允許在一個共同的時間點連接兩個表。

舉例

假設我們有一個訂單表,每個訂單都有不同貨幣的價格。為了將此表正確地規范化為單一貨幣,每個訂單都需要與下訂單時的適當貨幣兌換率相結合。

  1. CREATE TABLE orders ( 
  2.     order_id    STRING, 
  3.     price       DECIMAL(32,2), 
  4.     currency    STRING, 
  5.     order_time  TIMESTAMP(3), 
  6.     WATERMARK FOR order_time AS order_time 
  7. WITH (/* ... */); 
  8.  
  9.  
  10. CREATE TABLE currency_rates ( 
  11.     currency STRING, 
  12.     conversion_rate DECIMAL(32, 2), 
  13.     update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL 
  14.     WATERMARK FOR update_time AS update_time, 
  15.     PRIMARY KEY(currency) NOT ENFORCED 
  16. WITH ( 
  17.    'connector' = 'upsert-kafka'
  18.    /* ... */ 
  19. ); 
  20.  
  21. event-time temporal join需要temporal join條件的等價條件中包含的主鍵 
  22.  
  23. SELECT  
  24.      order_id, 
  25.      price, 
  26.      currency, 
  27.      conversion_rate, 
  28.      order_time, 
  29. FROM orders 
  30. LEFT JOIN currency_rates FOR SYSTEM TIME AS OF orders.order_time 
  31. ON orders.currency = currency_rates.currency 

Processing Time Temporal Join

處理時間時態表連接使用處理時間屬性將行與外部版本表中鍵的最新版本相關聯。

根據定義,使用processing-time屬性,連接將始終返回給定鍵的最新值。可以將查找表看作是一個簡單的HashMap,它存儲來自構建端的所有記錄。這種連接的強大之處在于,當在Flink中無法將表具體化為動態表時,它允許Flink直接針對外部系統工作。

使用FOR SYSTEM_TIME AS OF table1.proctime表示當左邊表的記錄與右邊的維表join時,只匹配當前處理時間維表所對應的的快照數據。

Lookup Table Join

Lookup Join 通常用于通過連接外部表(維度表)補充信息,要求一個表具有處理時間屬性,另一個表使 Lookup Source Connector。

JDBC 連接器可以用在時態表關聯中作為一個可 lookup 的 source (又稱為維表)。用到的語法是 Temporal Joins 的語法。

  1. s""
  2.         |CREATE TABLE users( 
  3.         |id int
  4.         |name string, 
  5.         |PRIMARY KEY (id) NOT ENFORCED 
  6.         |) 
  7.         |WITH ( 
  8.         |'connector' = 'jdbc'
  9.         |'url' = 'xxxx'
  10.         |'driver'='$DRIVER_CLASS_NAME'
  11.         |'table-name'='$tableName'
  12.         |'lookup.cache.max-rows'='100'
  13.         |'lookup.cache.ttl'='30s' 
  14.         |) 
  15.         |""".stripMargin 
  16.          
  17.          
  18. s""
  19.        |CREATE TABLE car( 
  20.        |`id`   bigint , 
  21.        |`user_id` bigint
  22.        |`proctime` as PROCTIME() 
  23.        |) 
  24.        |WITH ( 
  25.        |    'connector' = 'kafka'
  26.        |    'topic' = '$topic'
  27.        |    'scan.startup.mode' = 'latest-offset'
  28.        |    'properties.bootstrap.servers' = '$KAFKA_SERVICE'
  29.        |    'properties.group.id' = 'indicator'
  30.        |    'format' = 'canal-json' 
  31.        |) 
  32.        |""".stripMargin 
  33.         
  34.         
  35.         
  36.     SELECT 
  37.         mc.user_id user_id, 
  38.         count(1) AS `value` 
  39.     FROM car mc 
  40.         inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=s.id 
  41.     group by mc.user_id 

總結

總體來講,關聯維表有四個基礎的方式:

(1)查找外部數據源關聯

(2)預加載維表關聯(內存,狀態)

(3)冷熱數據儲備(算是1和2的結合使用)

(4)維表變更日志關聯(廣播也好,其他方式的流關聯也好)

「同時考慮:」 吞吐量,時效性,外部數據源的負載,內存資源,解耦性等等方面。

四種join方式不存在絕對的一勞永逸,更多的是針對業務場景在各指標上的權衡取舍,因此看官需要結合場景來選擇適合的。

 

責任編輯:武曉燕 來源: 大數據左右手
相關推薦

2021-07-13 10:02:52

Pandas函數Linux

2013-08-15 09:52:45

開發框架開發工具開發腳本

2016-12-14 20:53:04

Linuxgcc命令行

2016-12-14 19:19:19

Linuxgcc命令行

2014-06-13 11:26:53

CSS庫Web開發

2015-10-27 11:02:06

Web開發CSS 庫

2013-07-12 09:45:16

PHP功能

2023-03-06 10:42:34

CSS前端

2013-08-23 09:28:37

GitGit 命令

2023-09-07 16:28:46

JavaScrip

2021-06-29 10:50:30

Python函數文件

2011-05-10 08:47:55

開發者HTML 5W3C

2022-06-29 09:09:38

Python代碼

2022-08-23 09:01:02

HTMLWeb

2021-11-30 23:30:45

sql 性能異步

2014-09-09 09:32:50

項目管理管理工具

2020-03-06 08:35:45

GitHub設計瀏覽器

2011-05-16 08:37:56

JavaScript庫

2017-10-25 16:22:58

OpenStack操作Glance

2020-11-18 11:14:27

運維架構技術
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 福利在线看 | 亚洲免费一区二区 | 亚洲免费精品 | 亚洲精品区 | 中文字幕高清视频 | 欧美精产国品一二三区 | 欧美一级二级视频 | 午夜精品91 | 羞羞视频免费观看 | 国产激情视频在线观看 | 久久久久久亚洲 | 在线只有精品 | 97日日碰人人模人人澡分享吧 | 精品欧美黑人一区二区三区 | 国产视频二区 | 国产一区二区三区在线 | 日韩免费1区二区电影 | 日本免费视频 | 欧美日韩国产在线观看 | 久草精品在线 | 国产精品有限公司 | 亚洲欧美日韩成人在线 | 日韩一级免费观看 | 黄免费看 | 久久成人人人人精品欧 | 日韩久久久一区二区 | 亚洲在线一区二区三区 | 奇米在线 | 中文字幕综合 | 精品伊人 | 国产精品 亚洲一区 | 欧美精品中文字幕久久二区 | 日韩欧美在线不卡 | 我要看免费一级毛片 | 亚洲精品 在线播放 | 日韩精品一区中文字幕 | 日日爽 | 欧美日韩一| 人人爽人人草 | 超碰91在线 | 久久综合影院 |