成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然:Table 與 DataStream 的轉轉轉

數據庫 其他數據庫
相信大家看到本文的標題時,會比較好奇,要寫 SQL 就純 SQL 唄,要寫 DataStream 就純 DataStream 唄,為啥還要把這兩個接口做集成呢?

1.序篇

廢話不多說,咱們先直接上本文的目錄和結論,小伙伴可以先看結論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  1. 背景及應用場景介紹:博主期望你能了解到,Flink 支持了 SQL 和 Table API 中的 Table 與 DataStream 互轉的接口。通過這種互轉的方式,我們就可以將一些自定義的數據源(DataStream)創建為 SQL 表,也可以將 SQL 執行結果轉換為 DataStream 然后后續去完成一些在 SQL 中實現不了的復雜操作。肥腸的方便。
  2. 目前只有流任務支持互轉,批任務不支持:在 1.13 版本中,由于流和批的 env 接口不一樣,流任務為 StreamTableEnvironment,批任務為 TableEnvironment,目前只有 StreamTableEnvironment 支持了互轉的接口,TableEnvironment 沒有這樣的接口,因此目前流任務支持互轉,批任務不支持。但是 1.14 版本中流批任務的 env 都統一到了 StreamTableEnvironment 中,流批任務中就都可以進行互轉了。
  3. Retract 語義 SQL 轉 DataStream 需要重點注意:Append 語義的 SQL 轉為 DataStream 使用的 API 為 StreamTableEnvironment::toDataStream,Retract 語義的 SQL 轉為 DataStream 使用的 API 為 StreamTableEnvironment::toRetractStream,兩個接口不一樣,小伙伴萌一定要特別注意。

2.背景及應用場景介紹

相信大家看到本文的標題時,會比較好奇,要寫 SQL 就純 SQL 唄,要寫 DataStream 就純 DataStream 唄,為啥還要把這兩個接口做集成呢?

博主舉一個案例:在拼多多發優惠券的場景下,為了控制成本,希望能在每日優惠券發放金額加和超過 1w 時,及時報警出來,控制預算。

優惠券表的發放數據:

 

最終期望的結果是:每天的 money 之和超過 1w 的時候,報警報警報警!!!

那么針對上述場景,有兩種對應的解決方案:

  1. 方案 1:可想而知,DataStream 是必然能夠解決我們的問題的。
  2. 方案 2:DataStream 開發效率不高,可以使用 SQL 計算優惠券發放的結果,但是 SQL 無法做到報警。所以可以將 SQL 的查詢的結果(即 Table)轉為 DataStream,然后在 DataStream 后自定義報警邏輯的算子,超過閾值進行報警。

本節就介紹方案 2 的實現思路。

注意:

當然還有一些其他的比如模式識別監控異常然后報警的場景使用 DataStream 去實現就更加復雜了,所以我們也可以使用類似的思路,先 SQL 實現業務邏輯,然后接一個 DataStream 算子實現報警邏輯。

3.Table 與 DataStream API 的轉換具體實現

3.1.先看一個官網的簡單案例

官網的案例主要是讓大家看看要做到 Table 與 DataStream API 的轉換會涉及到使用哪些接口。

  1. import org.apache.flink.streaming.api.datastream.DataStream; 
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
  3. import org.apache.flink.table.api.Table
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
  5. import org.apache.flink.types.Row; 
  6.  
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  8. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); 
  9.  
  10. DataStream<String> dataStream = env.fromElements("Alice""Bob""John"); 
  11.  
  12. // 1. 使用 StreamTableEnvironment::fromDataStream API 將 DataStream 轉為 Table 
  13. Table inputTable = tableEnv.fromDataStream(dataStream); 
  14.  
  15. // 將 Table 注冊為一個臨時表 
  16. tableEnv.createTemporaryView("InputTable", inputTable); 
  17.  
  18. // 然后就可以在這個臨時表上做一些自定義的查詢了 
  19. Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable"); 
  20.  
  21. // 2. 也可以使用 StreamTableEnvironment::toDataStream 將 Table 轉為 DataStream 
  22. // 注意:這里只能轉為 DataStream<Row>,其中的數據類型只能為 Row 
  23. DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); 
  24.  
  25. // 將 DataStream 結果打印到控制臺 
  26. resultStream.print(); 
  27. env.execute(); 
  28.  
  29. // prints: 
  30. // +I[Alice] 
  31. // +I[Bob] 
  32. // +I[John] 

可以看到重點的接口就是:

  1. StreamTableEnvironment::toDataStream:將 Table 轉為 DataStream
  2. StreamTableEnvironment::fromDataStream:將 DataStream 轉為 Table

3.2.實現第 2 節中的邏輯

我們使用上面介紹的兩個接口對優惠券發放金額預警的案例做一個實現。

  1. @Slf4j 
  2. public class AlertExample { 
  3.  
  4.     public static void main(String[] args) throws Exception { 
  5.  
  6.         FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args); 
  7.  
  8.         String createTableSql = "CREATE TABLE source_table (\n" 
  9.                 + "    id BIGINT,\n" 
  10.                 + "    money BIGINT,\n" 
  11.                 + "    row_time AS cast(CURRENT_TIMESTAMP as timestamp_LTZ(3)),\n" 
  12.                 + "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" 
  13.                 + ") WITH (\n" 
  14.                 + "  'connector' = 'datagen',\n" 
  15.                 + "  'rows-per-second' = '1',\n" 
  16.                 + "  'fields.id.min' = '1',\n" 
  17.                 + "  'fields.id.max' = '100000',\n" 
  18.                 + "  'fields.money.min' = '1',\n" 
  19.                 + "  'fields.money.max' = '100000'\n" 
  20.                 + ")\n"
  21.  
  22.         String querySql = "SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, \n" 
  23.                 + "      window_start, \n" 
  24.                 + "      sum(money) as sum_money,\n" 
  25.                 + "      count(distinct id) as count_distinct_id\n" 
  26.                 + "FROM TABLE(CUMULATE(\n" 
  27.                 + "         TABLE source_table\n" 
  28.                 + "         , DESCRIPTOR(row_time)\n" 
  29.                 + "         , INTERVAL '5' SECOND\n" 
  30.                 + "         , INTERVAL '1' DAY))\n" 
  31.                 + "GROUP BY window_start, \n" 
  32.                 + "        window_end"
  33.  
  34.         // 1. 創建數據源表,即優惠券發放明細數據 
  35.         flinkEnv.streamTEnv().executeSql(createTableSql); 
  36.         // 2. 執行 query 查詢,計算每日發放金額 
  37.         Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql); 
  38.  
  39.         // 3. 報警邏輯(toDataStream 返回 Row 類型),如果 sum_money 超過 1w,報警 
  40.         flinkEnv.streamTEnv() 
  41.                 .toDataStream(resultTable, Row.class) 
  42.                 .flatMap(new FlatMapFunction<Row, Object>() { 
  43.                     @Override 
  44.                     public void flatMap(Row value, Collector<Object> out) throws Exception { 
  45.                         long l = Long.parseLong(String.valueOf(value.getField("sum_money"))); 
  46.  
  47.                         if (l > 10000L) { 
  48.                             log.info("報警,超過 1w"); 
  49.                         } 
  50.                     } 
  51.                 }); 
  52.  
  53.         flinkEnv.env().execute(); 
  54.     } 
  55.  

 執行效果如下:

3.3.Table 和 DataStream 轉換注意事項

3.3.1.目前只支持流任務互轉(1.13)

目前在 1.13 版本中,Flink 對于 Table 和 DataStream 的轉化是有一些限制的:

目前流任務使用的 env 為 StreamTableEnvironment,批任務為 TableEnvironment,而 Table 和 DataStream 之間的轉換目前只有 StreamTableEnvironment 的接口支持。

所以其實小伙伴萌可以理解為只有流任務才支持 Table 和 DataStream 之間的轉換,批任務是不支持的(雖然可以使用流模式處理有界流(批數據),但效率較低,這種騷操作不建議大家搞)。

那什么時候才能支持批任務的 Table 和 DataStream 之間的轉換呢?

1.14 版本支持。1.14 版本中,流和批的都統一到了 StreamTableEnvironment 中,因此就可以做 Table 和 DataStream 的互相轉換了。

3.3.2.Retract 語義 SQL 轉 DataStream 注意事項

Retract 語義的 SQL 使用 toDataStream 轉換會報錯不支持。具體報錯截圖如下。意思是不支持 update 類型的結果數據。

如果要把 Retract 語義的 SQL 轉為 DataStream,我們需要使用 toRetractStream。如下案例:

  1. @Slf4j 
  2. public class AlertExampleRetract { 
  3.  
  4.     public static void main(String[] args) throws Exception { 
  5.  
  6.         FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args); 
  7.  
  8.         String createTableSql = "CREATE TABLE source_table (\n" 
  9.                 + "    id BIGINT,\n" 
  10.                 + "    money BIGINT,\n" 
  11.                 + "    `time` as cast(CURRENT_TIMESTAMP as bigint) * 1000\n" 
  12.                 + ") WITH (\n" 
  13.                 + "  'connector' = 'datagen',\n" 
  14.                 + "  'rows-per-second' = '1',\n" 
  15.                 + "  'fields.id.min' = '1',\n" 
  16.                 + "  'fields.id.max' = '100000',\n" 
  17.                 + "  'fields.money.min' = '1',\n" 
  18.                 + "  'fields.money.max' = '100000'\n" 
  19.                 + ")\n"
  20.  
  21.         String querySql = "SELECT max(`time`), \n" 
  22.                 + "      sum(money) as sum_money\n" 
  23.                 + "FROM source_table\n" 
  24.                 + "GROUP BY (`time` + 8 * 3600 * 1000) / (24 * 3600 * 1000)"
  25.  
  26.         // 1. 創建數據源表,即優惠券發放明細數據 
  27.         flinkEnv.streamTEnv().executeSql(createTableSql); 
  28.         // 2. 執行 query 查詢,計算每日發放金額 
  29.         Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql); 
  30.         // 3. 報警邏輯(toRetractStream 返回 Tuple2<Boolean, Row> 類型),如果 sum_money 超過 1w,報警 
  31.         // Tuple2<Boolean, Row> f0 的 Boolean 標識是否是回撤消息 
  32.         flinkEnv.streamTEnv() 
  33.                 .toRetractStream(resultTable, Row.class) 
  34.                 .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() { 
  35.                     @Override 
  36.                     public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception { 
  37.                         long l = Long.parseLong(String.valueOf(value.f1.getField("sum_money"))); 
  38.  
  39.                         if (l > 10000L) { 
  40.                             log.info("報警,超過 1w"); 
  41.                         } 
  42.                     } 
  43.                 }); 
  44.  
  45.         flinkEnv.env().execute(); 
  46.     } 
  47.  

4.總結與展望

本文主要介紹了 flink 中 Table 和 DataStream 互轉使用方式,并介紹了一些使用注意事項,總結如下:

  1. 背景及應用場景介紹:博主期望你能了解到,Flink 支持了 SQL 和 Table API 中的 Table 與 DataStream 互轉的接口。通過這種互轉的方式,我們就可以將一些自定義的數據源(DataStream)創建為 SQL 表,也可以將 SQL 執行結果轉換為 DataStream 然后后續去完成一些在 SQL 中實現不了的復雜操作。肥腸的方便。
  2. 目前只有流任務支持互轉,批任務不支持:在 1.13 版本中,由于流和批的 env 接口不一樣,流任務為 StreamTableEnvironment,批任務為 TableEnvironment,目前只有 StreamTableEnvironment 支持了互轉的接口,TableEnvironment 沒有這樣的接口,因此目前流任務支持互轉,批任務不支持。但是 1.14 版本中流批任務的 env 都統一到了 StreamTableEnvironment 中,流批任務中就都可以進行互轉了。
  3. Retract 語義 SQL 轉 DataStream 需要重點注意:Append 語義的 SQL 轉為 DataStream 使用的 API 為 StreamTableEnvironment::toDataStream,Retract 語義的 SQL 轉為 DataStream 使用的 API 為 StreamTableEnvironment::toRetractStream,兩個接口不一樣,小伙伴萌一定要特別注意。

 

責任編輯:姜華 來源: 大數據羊說
相關推薦

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時間語義

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-06-29 09:01:38

FlinkSQL時間屬性

2021-12-09 06:59:24

FlinkSQL 開發

2022-05-27 09:02:58

SQLHive語義

2022-05-12 09:02:47

Flink SQL數據類型

2021-11-28 11:36:08

SQL Flink Join

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數倉

2021-09-12 07:01:07

Flink SQL ETL datastream

2022-06-18 09:26:00

Flink SQLJoin 操作

2021-12-06 07:15:47

開發Flink SQL

2022-05-09 09:03:04

SQL數據流數據

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2018-08-27 06:30:49

InnoDBMySQLMyISAM

2021-12-13 07:57:47

Flink SQL Flink Hive Udf
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 美美女高清毛片视频免费观看 | 一区二区不卡高清 | 91精品国产91久久久久久密臀 | 成人深夜福利 | 国产一区| 性色视频 | 中文字幕高清在线 | 亚洲欧洲综合av | 亚洲国产一区二区三区, | 亚洲精品国产电影 | 伊人看片 | 国产精品久久国产精品 | 性视频一区 | 成人av一区二区三区 | 日韩av一区二区在线观看 | 一色一黄视频 | 韩日有码| 亚洲乱码国产乱码精品精的特点 | 色姑娘综合网 | 中文字幕 亚洲一区 | 91看片免费 | 玖玖视频 | 国产精品自在线 | 亚洲国产精品久久久 | 夜夜干夜夜操 | 九九热最新地址 | 欧美一区二区在线观看 | 久久国产成人精品国产成人亚洲 | 中国一级特黄真人毛片 | 亚洲精品一区二区二区 | 国产一区在线免费观看视频 | 欧美一级在线 | 国产精品视频在 | av黄色在线 | 亚洲一区二区三区四区视频 | 日本一二三区高清 | 一区二区三区在线 | 欧 | 国产精品亚洲成在人线 | 337p日韩| 国产美女精品 | 亚洲视频在线免费 |