成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

十分鐘入門Fink SQL

運維 數(shù)據(jù)庫運維
本篇文章主要講解了Flink SQL 入門操作,后面我會分享一些關于Flink SQL連接Kafka、輸出到kafka、MySQL等

[[358221]]

前言

Flink 本身是批流統(tǒng)一的處理框架,所以 Table API 和 SQL,就是批流統(tǒng)一的上層處理 API。目前功能尚未完善,處于活躍的開發(fā)階段。 Table API 是一套內嵌在 Java 和 Scala 語言中的查詢 API,它允許我們以非常直觀的方式,組合來自一些關系運算符的查詢(比如 select、filter 和 join)。而對于 Flink SQL,就是直接可以在代碼中寫 SQL,來實現(xiàn)一些查詢(Query)操作。Flink 的 SQL 支持,基于實現(xiàn)了 SQL 標準的 Apache Calcite(Apache 開源 SQL 解析工具)。圖片

1、導入所需要的的依賴包

  1. <dependency> 
  2.           <groupId>org.apache.flink</groupId> 
  3.           <artifactId>flink-table-planner_2.12</artifactId> 
  4.           <version>1.10.1</version> 
  5.       </dependency> 
  6.       <dependency> 
  7.           <groupId>org.apache.flink</groupId> 
  8.           <artifactId>flink-table-api-scala-bridge_2.12</artifactId> 
  9.           <version>1.10.1</version> 
  10.       </dependency> 
  11.       <dependency> 
  12.           <groupId>org.apache.flink</groupId> 
  13.           <artifactId>flink-csv</artifactId> 
  14.           <version>1.10.1</version> 
  15.      </dependency> 

flink-table-planner:planner 計劃器,是 table API 最主要的部分,提供了運行時環(huán)境和生成程序執(zhí)行計劃的 planner; flink-table-api-scala-bridge:bridge 橋接器,主要負責 table API 和 DataStream/DataSet API的連接支持,按照語言分 java 和 scala。

這里的兩個依賴,是 IDE 環(huán)境下運行需要添加的;如果是生產(chǎn)環(huán)境,lib 目錄下默認已經(jīng)有了 planner,就只需要有 bridge 就可以了。

當然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個 SQL client,這個包含在 flink-table-common 里。

2、兩種 planner(old& blink)的區(qū)別

  1. 批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和DataSet 之間的轉換,批處理作業(yè)將不轉換為 DataSet 應用程序,而是跟流處理一樣,轉換為 DataStream 程序來處理。
  2. 因 為 批 流 統(tǒng) 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
  3. Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。
  4. 舊 planner 和 Blink planner 的 FilterableTableSource 實現(xiàn)不兼容。舊的 planner 會把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會把 Expressions 下推。
  5. 基于字符串的鍵值配置選項僅適用于 Blink planner。
  6. PlannerConfig 在兩個 planner 中的實現(xiàn)不同。
  7. Blink planner 會將多個 sink 優(yōu)化在一個 DAG 中(僅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個 sink 放在一個新的 DAG 中,其中所有 DAG 彼此獨立。
  8. 舊的 planner 不支持目錄統(tǒng)計,而 Blink planner 支持。

3、表(Table)的概念

TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表。它會維護一個Catalog-Table 表之間的 map。 表(Table)是由一個標識符來指定的,由 3 部分組成:Catalog 名、數(shù)據(jù)庫(database)名和對象名(表名)。如果沒有指定目錄或數(shù)據(jù)庫,就使用當前的默認值。

4、連接到文件系統(tǒng)(Csv 格式)

連接外部系統(tǒng)在 Catalog 中注冊表,直接調用 tableEnv.connect()就可以,里面參數(shù)要傳入一個 ConnectorDescriptor,也就是 connector 描述器。對于文件系統(tǒng)的 connector 而言,flink內部已經(jīng)提供了,就叫做 FileSystem()。

5、測試案例 (新)

需求: 將一個txt文本文件作為輸入流讀取數(shù)據(jù)過濾id不等于sensor_1的數(shù)據(jù)實現(xiàn)思路: 首先我們先構建一個table的env環(huán)境通過connect提供的方法來讀取數(shù)據(jù)然后設置表結構將數(shù)據(jù)注冊為一張表就可進行我們的數(shù)據(jù)過濾了(使用sql或者流處理方式進行解析)

準備數(shù)據(jù)

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代碼實現(xiàn)

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.{DataTypes} 
  3. import org.apache.flink.table.api.scala._ 
  4. import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema
  5.  
  6. /** 
  7.  * @Package 
  8.  * @author 大數(shù)據(jù)老哥 
  9.  * @date 2020/12/12 21:22 
  10.  * @version V1.0 
  11.  *          第一個Flinksql測試案例 
  12.  */ 
  13.  
  14. object FlinkSqlTable { 
  15.   def main(args: Array[String]): Unit = { 
  16.     // 構建運行流處理的運行環(huán)境 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.     // 構建table環(huán)境 
  19.     val tableEnv = StreamTableEnvironment.create(env) 
  20.      //通過 connect 讀取數(shù)據(jù) 
  21.     tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) 
  22.       .withFormat(new Csv()) //設置類型 
  23.       .withSchema(new Schema() // 給數(shù)據(jù)添加元數(shù)信息 
  24.         .field("id", DataTypes.STRING()) 
  25.         .field("time", DataTypes.BIGINT()) 
  26.         .field("temperature", DataTypes.DOUBLE()) 
  27.       ).createTemporaryTable("inputTable")  // 創(chuàng)建一個臨時表 
  28.      
  29.     val resTable = tableEnv.from("inputTable"
  30.       .select("*").filter('id === "sensor_1"
  31.     // 使用sql的方式查詢數(shù)據(jù) 
  32.     var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'"
  33.     // 將數(shù)據(jù)轉為流進行輸出 
  34.     resTable.toAppendStream[(String, Long, Double)].print("resTable"
  35.     resSql.toAppendStream[(String, Long, Double)].print("resSql"
  36.  
  37.     env.execute("FlinkSqlWrodCount"
  38.   } 

6、TableEnvironment 的作用

  • 注冊 catalog
  • 在內部 catalog 中注冊表
  • 執(zhí)行 SQL 查詢
  • 注冊用戶自定義函數(shù)
  • 注冊用戶自定義函數(shù)
  • 保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在創(chuàng)建 TableEnv 的時候,可以多傳入一個 EnvironmentSettings 或者 TableConfig 參數(shù),可以用來配置 TableEnvironment 的一些特性。

7、 老版本創(chuàng)建流處理批處理

7.1老版本流處理

  1. val settings = EnvironmentSettings.newInstance() 
  2. .useOldPlanner() // 使用老版本 planner 
  3. .inStreamingMode() // 流處理模式 
  4. .build() 
  5. val tableEnv = StreamTableEnvironment.create(env, settings) 

7.2 老版本批處理

  1. val batchEnv = ExecutionEnvironment.getExecutionEnvironment  
  2. val batchTableEnv = BatchTableEnvironment.create(batchEnv) 

7.3 blink 版本的流處理環(huán)境

  1. val bsSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inStreamingMode().build() 
  4. val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) 

7.4 blink 版本的批處理環(huán)境

  1. val bbSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inBatchMode().build() 
  4. val bbTableEnv = TableEnvironment.create(bbSettings) 

總結:

本篇文章主要講解了Flink SQL 入門操作,后面我會分享一些關于Flink SQL連接Kafka、輸出到kafka、MySQL等

本文轉載自微信公眾號「 大數(shù)據(jù)老哥」,可以通過以下二維碼關注。轉載本文請聯(lián)系 大數(shù)據(jù)老哥公眾號。

 

責任編輯:武曉燕 來源: 大數(shù)據(jù)老哥
相關推薦

2012-07-10 01:22:32

PythonPython教程

2022-06-16 07:31:41

Web組件封裝HTML 標簽

2024-05-13 09:28:43

Flink SQL大數(shù)據(jù)

2019-04-01 14:59:56

負載均衡服務器網(wǎng)絡

2023-06-07 08:27:10

Docker容器

2015-09-06 09:22:24

框架搭建快速高效app

2023-11-30 10:21:48

虛擬列表虛擬列表工具庫

2024-06-19 09:58:29

2021-09-07 09:40:20

Spark大數(shù)據(jù)引擎

2023-04-12 11:18:51

甘特圖前端

2023-10-07 00:06:09

SQL數(shù)據(jù)庫

2023-11-09 14:44:27

Docker鏡像容器

2020-12-11 09:40:10

DevOpsCICD

2015-11-06 11:03:36

2022-04-13 22:01:44

錯誤監(jiān)控系統(tǒng)

2021-07-29 08:57:23

ViteReact模塊

2023-12-08 13:19:00

前端Reactour流行庫

2019-09-16 09:14:51

2009-10-09 14:45:29

VB程序

2023-07-15 18:26:51

LinuxABI
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕在线视频观看 | 国产一区二区三区久久 | 成人精品国产一区二区4080 | 正在播放国产精品 | 久久久久久免费毛片精品 | 国产日韩精品一区 | 91天堂 | 国产激情91久久精品导航 | 亚洲视频 欧美视频 | 亚洲精品9999 | av国产精品毛片一区二区小说 | 国产高清性xxxxxxxx | 一区二区三区高清在线观看 | 国产69久久精品成人看动漫 | 免费观看毛片 | 成人av免费在线观看 | 岛国毛片| 欧美激情va永久在线播放 | 亚洲精品久久久久久国产精华液 | 欧美中文字幕一区 | 免费高潮视频95在线观看网站 | 极情综合网 | 91中文在线观看 | 久久精品视频一区二区三区 | 久久精品亚洲一区 | 97视频久久 | av在线免费观看网站 | 美女爽到呻吟久久久久 | 亚洲va国产日韩欧美精品色婷婷 | 在线欧美激情 | 天天操夜夜操免费视频 | 国产在线视频三区 | 国产精品福利在线观看 | 天天夜夜操 | 国产伦精品一区二区三区四区视频 | 欧美a在线观看 | 久久精品国产一区 | 欧美一级片在线观看 | 日本电影网站 | 亚洲精品68久久久一区 | 天堂色综合 |