成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實時數倉 | 三分鐘搞定Flink Cdc

大數據
Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。

簡介

Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。因此,它可以充分利用Debezium的功能。

特點

  • 支持讀取數據庫快照,并且能夠持續讀取數據庫的變更日志,即使發生故障,也支持exactly-once 的處理語義
  • 對于DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個作業中使用多個數據庫和表上的變更數據。
  • 對于Table/SQL API 的CDC connector,用戶可以使用SQL DDL創建CDC數據源,來監視單個表上的數據變更。

使用場景

  • 數據庫之間的增量數據同步
  • 審計日志
  • 數據庫之上的實時物化視圖
  • 基于CDC的維表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具體如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用過程中的注意點

使用MySQL CDC的注意點

如果要使用MySQL CDC connector,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-connector-mysql-cdc</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

使用canal-json的注意點

如果要使用Kafka的canal-json,對于程序而言,需要添加如下依賴:

  1. <!-- universal --> 
  2. <dependency> 
  3.     <groupId>org.apache.flink</groupId> 
  4.     <artifactId>flink-connector-kafka_2.11</artifactId> 
  5.     <version>1.11.0</version> 
  6. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。由于Flink1.11的安裝包 的lib目錄下并沒有提供該jar包,所以必須要手動添加依賴包,否則會報如下錯誤:

  1. [ERROR] Could not execute SQL statement. Reason: 
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 
  3.  
  4. Available factory identifiers are: 
  5.  
  6. datagen 
  7. mysql-cdc 

使用changelog-json的注意點

如果要使用Kafka的changelog-json Format,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-format-changelog-json</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

mysql-cdc的操作實踐

創建MySQL數據源表

在創建MySQL CDC表之前,需要先創建MySQL的數據表,如下:

  1. -- MySQL 
  2. /*Table structure for table `order_info` */ 
  3. DROP TABLE IF EXISTS `order_info`; 
  4. CREATE TABLE `order_info` ( 
  5.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  6.   `consignee` varchar(100) DEFAULT NULL COMMENT '收貨人'
  7.   `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話'
  8.   `total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額'
  9.   `order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀態,1表示下單,2表示支付'
  10.   `user_id` bigint(20) DEFAULT NULL COMMENT '用戶id'
  11.   `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式'
  12.   `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨地址'
  13.   `order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注'
  14.   `out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)'
  15.   `trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)'
  16.   `create_time` datetime DEFAULT NULL COMMENT '創建時間'
  17.   `operate_time` datetime DEFAULT NULL COMMENT '操作時間'
  18.   `expire_time` datetime DEFAULT NULL COMMENT '失效時間'
  19.   `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編號'
  20.   `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編號'
  21.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片路徑'
  22.   `province_id` int(20) DEFAULT NULL COMMENT '地區'
  23.   PRIMARY KEY (`id`) 
  24. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表'
  25. -- ---------------------------- 
  26. -- Records of order_info 
  27. -- ---------------------------- 
  28. INSERT INTO `order_info`  
  29. VALUES (476, 'lAXjcL''13408115089', 433.00, '2', 10, '2''OYyAdSdLxedceqovndCD''ihjAYsSjrgJMQVdFQnSy''8728720206''''2020-06-18 02:21:38'NULLNULLNULLNULLNULL, 9); 
  30. INSERT INTO `order_info` 
  31. VALUES (477, 'QLiFDb''13415139984', 772.00, '1', 90, '2''OizYrQbKuWvrvdfpkeSZ''wiBhhqhMndCCgXwmWVQq''1679381473''''2020-06-18 09:12:25'NULLNULLNULLNULLNULL, 3); 
  32. INSERT INTO `order_info` 
  33. VALUES (478, 'iwKjQD''13320383859', 88.00, '1', 107, '1''cbXLKtNHWOcWzJVBWdAs''njjsnknHxsxhuCCeNDDi''0937074290''''2020-06-18 15:56:34'NULLNULLNULLNULLNULL, 7); 
  34.  
  35. /*Table structure for table `order_detail` */ 
  36. CREATE TABLE `order_detail` ( 
  37.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  38.   `order_id` bigint(20) DEFAULT NULL COMMENT '訂單編號'
  39.   `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id'
  40.   `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(冗余)'
  41.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片名稱(冗余)'
  42.   `order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)'
  43.   `sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數'
  44.   `create_time` datetime DEFAULT NULL COMMENT '創建時間'
  45.   PRIMARY KEY (`id`) 
  46. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細表'
  47.  
  48. -- ---------------------------- 
  49. -- Records of order_detail 
  50. -- ---------------------------- 
  51. INSERT INTO `order_detail`  
  52. VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3''2020-06-18 02:21:38'); 
  53. INSERT INTO `order_detail`  
  54. VALUES (1330, 477, 9, '榮耀10 GT游戲加速 AIS手持夜景 6GB+64GB 幻影藍全網通 移動聯通電信''http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4''2020-06-18 09:12:25'); 
  55. INSERT INTO `order_detail` 
  56. VALUES (1331, 478, 4, '小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍 全網通4G 雙卡雙待 小水滴全面屏拍照游戲智能手機''http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1''2020-06-18 15:56:34'); 
  57. INSERT INTO `order_detail`  
  58. VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3''2020-06-18 15:56:34'); 
  59. INSERT INTO `order_detail`  
  60. VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1''2020-06-18 15:56:34'); 

Flink SQL Cli創建CDC數據源

啟動 Flink 集群,再啟動 SQL CLI,執行下面命令:

  1. -- 創建訂單信息表 
  2. CREATE TABLE order_info( 
  3.     id BIGINT
  4.     user_id BIGINT
  5.     create_time TIMESTAMP(0), 
  6.     operate_time TIMESTAMP(0), 
  7.     province_id INT
  8.     order_status STRING, 
  9.     total_amount DECIMAL(10, 5) 
  10.   ) WITH ( 
  11.     'connector' = 'mysql-cdc'
  12.     'hostname' = 'kms-1'
  13.     'port' = '3306'
  14.     'username' = 'root'
  15.     'password' = '123qwe'
  16.     'database-name' = 'mydw'
  17.     'table-name' = 'order_info' 
  18. ); 

在Flink SQL Cli中查詢該表的數據:result-mode: tableau,+表示數據的insert。

在SQL CLI中創建訂單詳情表:

  1. CREATE TABLE order_detail( 
  2.     id BIGINT
  3.     order_id BIGINT
  4.     sku_id BIGINT
  5.     sku_name STRING, 
  6.     sku_num BIGINT
  7.     order_price DECIMAL(10, 5), 
  8.  create_time TIMESTAMP(0) 
  9.  ) WITH ( 
  10.     'connector' = 'mysql-cdc'
  11.     'hostname' = 'kms-1'
  12.     'port' = '3306'
  13.     'username' = 'root'
  14.     'password' = '123qwe'
  15.     'database-name' = 'mydw'
  16.     'table-name' = 'order_detail' 
  17. ); 

查詢結果如下:

執行JOIN操作:

  1. SELECT 
  2.     od.id, 
  3.     oi.id order_id, 
  4.     oi.user_id, 
  5.     oi.province_id, 
  6.     od.sku_id, 
  7.     od.sku_name, 
  8.     od.sku_num, 
  9.     od.order_price, 
  10.     oi.create_time, 
  11.     oi.operate_time 
  12. FROM 
  13.    ( 
  14.     SELECT *  
  15.     FROM order_info 
  16.     WHERE  
  17.       order_status = '2'-- 已支付 
  18.    ) oi 
  19.    JOIN 
  20.   ( 
  21.     SELECT * 
  22.     FROM order_detail 
  23.   ) od  
  24.   ON oi.id = od.order_id; 

canal-json的操作實踐

關于cannal的使用方式,可以參考我的另一篇文章:基于Canal與Flink實現數據實時增量同步(一)。我已經將下面的表通過canal同步到了kafka,具體格式為:

  1.     "data":[ 
  2.         { 
  3.             "id":"1"
  4.             "region_name":"華北" 
  5.         }, 
  6.         { 
  7.             "id":"2"
  8.             "region_name":"華東" 
  9.         }, 
  10.         { 
  11.             "id":"3"
  12.             "region_name":"東北" 
  13.         }, 
  14.         { 
  15.             "id":"4"
  16.             "region_name":"華中" 
  17.         }, 
  18.         { 
  19.             "id":"5"
  20.             "region_name":"華南" 
  21.         }, 
  22.         { 
  23.             "id":"6"
  24.             "region_name":"西南" 
  25.         }, 
  26.         { 
  27.             "id":"7"
  28.             "region_name":"西北" 
  29.         } 
  30.     ], 
  31.     "database":"mydw"
  32.     "es":1597128441000, 
  33.     "id":102, 
  34.     "isDdl":false
  35.     "mysqlType":{ 
  36.         "id":"varchar(20)"
  37.         "region_name":"varchar(20)" 
  38.     }, 
  39.     "old":null
  40.     "pkNames":null
  41.     "sql":""
  42.     "sqlType":{ 
  43.         "id":12, 
  44.         "region_name":12 
  45.     }, 
  46.     "table":"base_region"
  47.     "ts":1597128441424, 
  48.     "type":"INSERT" 

在SQL CLI中創建該canal-json格式的表:

  1. CREATE TABLE region ( 
  2.   id BIGINT
  3.   region_name STRING 
  4. WITH ( 
  5.  'connector' = 'kafka'
  6.  'topic' = 'mydw.base_region'
  7.  'properties.bootstrap.servers' = 'kms-3:9092'
  8.  'properties.group.id' = 'testGroup'
  9.  'format' = 'canal-json' , 
  10.  'scan.startup.mode' = 'earliest-offset'  
  11. ); 

查詢結果如下:

changelog-json的操作實踐

創建MySQL數據源

參見上面的order_info

Flink SQL Cli創建changelog-json表

  1. CREATE TABLE order_gmv2kafka ( 
  2.   day_str STRING, 
  3.   gmv DECIMAL(10, 5) 
  4. WITH ( 
  5.     'connector' = 'kafka'
  6.     'topic' = 'order_gmv_kafka'
  7.     'scan.startup.mode' = 'earliest-offset'
  8.     'properties.bootstrap.servers' = 'kms-3:9092'
  9.     'format' = 'changelog-json' 
  10. ); 
  11.  
  12. INSERT INTO order_gmv2kafka 
  13. SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd'as day_str, SUM(total_amount) as gmv 
  14. FROM order_info 
  15. WHERE order_status = '2' -- 訂單已支付 
  16. GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');  

查詢表看一下結果:

再查一下kafka的數據:

  1. {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"

當將另外兩個訂單的狀態order_status更新為2時,總金額=443+772+88=1293再觀察數據:

再看kafka中的數據:

 

責任編輯:武曉燕 來源: 大數據技術與數倉
相關推薦

2009-11-05 16:04:19

Oracle用戶表

2020-11-20 08:36:59

Jpa數據代碼

2024-05-16 11:13:16

Helm工具release

2009-11-09 12:55:43

WCF事務

2024-12-18 10:24:59

代理技術JDK動態代理

2022-02-16 19:42:25

Spring配置開發

2024-01-16 07:46:14

FutureTask接口用法

2021-04-20 13:59:37

云計算

2022-02-17 09:24:11

TypeScript編程語言javaScrip

2024-08-30 08:50:00

2023-12-27 08:15:47

Java虛擬線程

2009-11-12 09:16:15

ADO.NET數據庫連

2020-06-30 10:45:28

Web開發工具

2013-06-28 14:30:26

棱鏡計劃棱鏡棱鏡監控項目

2021-12-17 07:47:37

IT風險框架

2020-06-29 07:42:20

邊緣計算云計算技術

2023-12-04 18:13:03

GPU編程

2024-07-05 09:31:37

2024-10-15 09:18:30

2024-01-12 07:38:38

AQS原理JUC
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 综合久久一区 | 日韩在线免费视频 | 婷婷在线免费 | 欧美在线一区二区三区 | h网站在线观看 | 久久久网 | 午夜免费看 | 亚洲人精品午夜 | 日本一区二区高清不卡 | 国精品一区二区 | 少妇午夜一级艳片欧美精品 | 亚洲自拍偷拍视频 | 日韩爱爱网 | 国产精品一区在线观看 | 男人天堂999 | h视频在线播放 | 久久综合国产精品 | 亚洲精品久久国产高清情趣图文 | 亚洲一区二区三区免费在线观看 | 日本精品一区二区三区在线观看视频 | 国产免费av在线 | 国产免费一级一级 | 久久爱一区| 日本欧美在线视频 | 久久精品国产一区 | 国产精品2区 | 91人人看| 日韩在线中文字幕 | 波多野结衣在线观看一区二区三区 | 日本不卡一区二区三区在线观看 | 久久久久亚洲 | 亚洲免费观看视频网站 | 国产婷婷精品av在线 | 狠狠干2020 | 国产精品日韩一区 | 国产乱一区二区三区视频 | 国产成人免费在线 | 国产精品日韩欧美 | 婷婷久久综合 | 99精品国产在热久久 | 国产毛片av |