成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實戰!Spring Boot 整合 阿里開源中間件 Canal 實現數據增量同步!

開發 架構
數據同步一直是一個令人頭疼的問題。在業務量小,場景不多,數據量不大的情況下我們可能會選擇在項目中直接寫一些定時任務手動處理數據,例如從多個表將數據查出來,再匯總處理,再插入到相應的地方。

[[442171]]

本文轉載自微信公眾號「碼猿技術專欄」,作者不才陳某 。轉載本文請聯系碼猿技術專欄公眾號。

數據同步一直是一個令人頭疼的問題。在業務量小,場景不多,數據量不大的情況下我們可能會選擇在項目中直接寫一些定時任務手動處理數據,例如從多個表將數據查出來,再匯總處理,再插入到相應的地方。

但是隨著業務量增大,數據量變多以及各種復雜場景下的分庫分表的實現,使數據同步變得越來越困難。

今天這篇文章使用阿里開源的中間件Canal解決數據增量同步的痛點。

文章目錄如下:

Canal是什么?

canal譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。

從這句話理解到了什么?

基于MySQL,并且通過MySQL日志進行的增量解析,這也就意味著對原有的業務代碼完全是無侵入性的。

工作原理:解析MySQL的binlog日志,提供增量數據。

基于日志增量訂閱和消費的業務包括

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務 cache 刷新
  • 帶業務邏輯的增量數據處理

當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官方文檔:https://github.com/alibaba/canal

Canal數據如何傳輸?

先來一張官方圖:

Canal分為服務端和客戶端,這也是阿里常用的套路,比如前面講到的注冊中心Nacos:

  • 服務端:負責解析MySQL的binlog日志,傳遞增量數據給客戶端或者消息中間件
  • 客戶端:負責解析服務端傳過來的數據,然后定制自己的業務處理。

目前為止支持的消息中間件很全面了,比如Kafka、RocketMQ,RabbitMQ。

數據同步還有其他中間件嗎?

有,當然有,還有一些開源的中間件也是相當不錯的,比如Bifrost。

常見的幾款中間件的區別如下:

當然要我選擇的話,首選阿里的中間件Canal。

Canal服務端安裝

服務端需要下載壓縮包,下載地址:https://github.com/alibaba/canal/releases

目前最新的是v1.1.5,點擊下載:

下載完成解壓,目錄如下:

本文使用Canal+RabbitMQ進行數據的同步,因此下面步驟完全按照這個base進行。

1、打開MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

  1. [mysqld] 
  2. log-bin=mysql-bin # 開啟 binlog 
  3. binlog-format=ROW # 選擇 ROW 模式 
  4. server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復 

2、設置MySQL的配置

需要設置服務端配置文件中的MySQL配置,這樣Canal才能知道需要監聽哪個庫、哪個表的日志文件。

一個 Server 可以配置多個實例監聽 ,Canal 功能默認自帶的有個 example 實例,本篇就用 example 實例 。如果增加實例,復制 example 文件夾內容到同級目錄下,然后在 canal.properties 指定添加實例的名稱。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

  1. # url 
  2. canal.instance.master.address=127.0.0.1:3306 
  3. # username/password 
  4. canal.instance.dbUsername=root 
  5. canal.instance.dbPassword=root 
  6. # 監聽的數據庫 
  7. canal.instance.defaultDatabaseName=test 
  8.  
  9. # 監聽的表,可以指定,多個用逗號分割,這里正則是監聽所有 
  10. canal.instance.filter.regex=.*\\..* 

3、設置RabbitMQ的配置

服務端默認的傳輸方式是tcp,需要在配置文件中設置MQ的相關信息。

這里需要修改兩處配置文件,如下;

1)、canal.deployer-1.1.5\conf\canal.properties

這個配置文件主要是設置MQ相關的配置,比如URL,用戶名、密碼...

  1. # 傳輸方式:tcp, kafka, rocketMQ, rabbitMQ 
  2. canal.serverMode = rabbitMQ 
  3. ################################################## 
  4. #########           RabbitMQ         ############# 
  5. ################################################## 
  6. rabbitmq.host = 127.0.0.1 
  7. rabbitmq.virtual.host =/ 
  8. # exchange 
  9. rabbitmq.exchange =canal.exchange 
  10. # 用戶名、密碼 
  11. rabbitmq.username =guest 
  12. rabbitmq.password =guest 
  13. ## 是否持久化 
  14. rabbitmq.deliveryMode = 2 

2)、canal.deployer-1.1.5\conf\example\instance.properties

這個文件設置MQ的路由KEY,這樣才能路由到指定的隊列中,如下:

  1. canal.mq.topic=canal.routing.key 

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一個canal.exchange(必須和配置中的相同)的exchange和一個名稱為 canal.queue(名稱隨意)的隊列。

其中綁定的路由KEY為:canal.routing.key(必須和配置中的相同),如下圖:

5、啟動服務端

點擊bin目錄下的腳本,windows直接雙擊startup.bat,啟動成功如下:

6、測試

在本地數據庫test中的oauth_client_details插入一條數據,如下:

  1. INSERT INTO `oauth_client_details` VALUES ('myjszl''res1''$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W''all''password,refresh_token,authorization_code,client_credentials,implicit''http://www.baidu.com'NULL, 1000, 1000, NULL'false'); 

此時查看MQ中的canal.queue已經有了數據,如下:

其實就是一串JSON數據,這個JSON如下:

  1.  "data": [{ 
  2.   "client_id""myjszl"
  3.   "resource_ids""res1"
  4.   "client_secret""$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W"
  5.   "scope""all"
  6.   "authorized_grant_types""password,refresh_token,authorization_code,client_credentials,implicit"
  7.   "web_server_redirect_uri""http://www.baidu.com"
  8.   "authorities"null
  9.   "access_token_validity""1000"
  10.   "refresh_token_validity""1000"
  11.   "additional_information"null
  12.   "autoapprove""false" 
  13.  }], 
  14.  "database""test"
  15.  "es": 1640337532000, 
  16.  "id": 7, 
  17.  "isDdl"false
  18.  "mysqlType": { 
  19.   "client_id""varchar(48)"
  20.   "resource_ids""varchar(256)"
  21.   "client_secret""varchar(256)"
  22.   "scope""varchar(256)"
  23.   "authorized_grant_types""varchar(256)"
  24.   "web_server_redirect_uri""varchar(256)"
  25.   "authorities""varchar(256)"
  26.   "access_token_validity""int(11)"
  27.   "refresh_token_validity""int(11)"
  28.   "additional_information""varchar(4096)"
  29.   "autoapprove""varchar(256)" 
  30.  }, 
  31.  "old"null
  32.  "pkNames": ["client_id"], 
  33.  "sql"""
  34.  "sqlType": { 
  35.   "client_id": 12, 
  36.   "resource_ids": 12, 
  37.   "client_secret": 12, 
  38.   "scope": 12, 
  39.   "authorized_grant_types": 12, 
  40.   "web_server_redirect_uri": 12, 
  41.   "authorities": 12, 
  42.   "access_token_validity": 4, 
  43.   "refresh_token_validity": 4, 
  44.   "additional_information": 12, 
  45.   "autoapprove": 12 
  46.  }, 
  47.  "table""oauth_client_details"
  48.  "ts": 1640337532520, 
  49.  "type""INSERT" 

每個字段的意思已經很清楚了,有表名稱、方法、參數、參數類型、參數值.....

客戶端要做的就是監聽MQ獲取JSON數據,然后將其解析出來,處理自己的業務邏輯。

Canal客戶端搭建

客戶端很簡單實現,要做的就是消費Canal服務端傳遞過來的消息,監聽canal.queue這個隊列。

1、創建消息實體類

MQ傳遞過來的是JSON數據,當然要創建個實體類接收數據,如下:

  1. /** 
  2.  * @author 公眾號 碼猿技術專欄 
  3.  * Canal消息接收實體類 
  4.  */ 
  5. @NoArgsConstructor 
  6. @Data 
  7. public class CanalMessage<T> { 
  8.     @JsonProperty("type"
  9.     private String type; 
  10.  
  11.     @JsonProperty("table"
  12.     private String table
  13.  
  14.     @JsonProperty("data"
  15.     private List<T> data; 
  16.  
  17.     @JsonProperty("database"
  18.     private String database
  19.  
  20.     @JsonProperty("es"
  21.     private Long es; 
  22.  
  23.     @JsonProperty("id"
  24.     private Integer id; 
  25.  
  26.     @JsonProperty("isDdl"
  27.     private Boolean isDdl; 
  28.  
  29.     @JsonProperty("old"
  30.     private List<T> old; 
  31.  
  32.     @JsonProperty("pkNames"
  33.     private List<String> pkNames; 
  34.  
  35.     @JsonProperty("sql"
  36.     private String sql; 
  37.  
  38.     @JsonProperty("ts"
  39.     private Long ts; 

2、MQ消息監聽業務

接下來就是監聽隊列,一旦有Canal服務端有數據推送能夠及時的消費。

代碼很簡單,只是給出個接收的案例,具體的業務邏輯可以根據業務實現,如下:

  1. import cn.hutool.json.JSONUtil; 
  2. import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage; 
  3. import lombok.RequiredArgsConstructor; 
  4. import lombok.extern.slf4j.Slf4j; 
  5. import org.springframework.amqp.rabbit.annotation.Exchange; 
  6. import org.springframework.amqp.rabbit.annotation.Queue; 
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding; 
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener; 
  9. import org.springframework.stereotype.Component; 
  10.  
  11. /** 
  12.  * 監聽MQ獲取Canal增量的數據消息 
  13.  */ 
  14. @Component 
  15. @Slf4j 
  16. @RequiredArgsConstructor 
  17. public class CanalRabbitMQListener { 
  18.  
  19.     @RabbitListener(bindings = { 
  20.             @QueueBinding( 
  21.                     value = @Queue(value = "canal.queue", durable = "true"), 
  22.                     exchange = @Exchange(value = "canal.exchange"), 
  23.                     key = "canal.routing.key" 
  24.             ) 
  25.     }) 
  26.     public void handleDataChange(String message) { 
  27.         //將message轉換為CanalMessage 
  28.         CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class); 
  29.         String tableName = canalMessage.getTable(); 
  30.         log.info("Canal 監聽 {} 發生變化;明細:{}", tableName, message); 
  31.         //TODO 業務邏輯自己完善............... 
  32.     } 

3、測試

下面向表中插入數據,看下接收的消息是什么樣的,SQL如下:

  1. INSERT INTO `oauth_client_details` 
  2. VALUES 
  3.  ( 'myjszl''res1''$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W''all''password,refresh_token,authorization_code,client_credentials,implicit''http://www.baidu.com'NULL, 1000, 1000, NULL'false' ); 

客戶端轉換后的消息如下圖:

上圖可以看出所有的數據都已經成功接收到,只需要根據數據完善自己的業務邏輯即可。

客戶端案例源碼已經上傳GitHub,關注公眾號:碼猿技術專欄,回復關鍵詞:9530 獲取!

總結

 

數據增量同步的開源工具并不只有Canal一種,根據自己的業務需要選擇合適的組件。

 

責任編輯:武曉燕 來源: 碼猿技術專欄
相關推薦

2023-10-06 22:35:19

2023-02-17 07:54:39

2022-12-27 08:56:28

2023-07-27 08:29:09

2024-12-06 08:29:29

2023-08-31 08:32:52

2011-10-28 09:20:36

dorado

2022-08-09 08:31:29

RocketMQ消息中間件

2011-10-24 07:41:38

SOA中間件應用服務器

2014-06-20 09:18:54

Dustjs中間件

2024-11-11 10:02:37

Spring搜索數據

2021-09-09 09:05:30

開源字節跳動CloudWeGo

2022-04-11 09:15:44

中間件開源

2025-04-29 08:36:28

SpringCanal數據庫

2015-12-21 14:56:12

Go語言Http網絡協議

2011-05-24 15:10:48

2021-02-11 08:21:02

中間件開發CRUD

2024-07-03 11:33:02

2013-05-17 15:08:19

紅帽

2013-05-17 17:01:32

紅帽OpenShifPaaS云
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲网站在线观看 | 亚洲视频在线一区 | 日韩精品一区二区三区在线播放 | 色婷婷一区二区三区四区 | 男女深夜网站 | 日韩在线中文 | 久久久做| 中文字幕 在线观看 | 毛片视频网址 | 欧美精品一区二区在线观看 | 欧美色综合一区二区三区 | 四季久久免费一区二区三区四区 | 少妇午夜一级艳片欧美精品 | 国产精品 欧美精品 | 国产精品成人在线 | 中文在线视频 | 91传媒在线观看 | 国产乱码精品一区二区三区五月婷 | 精品久久久久久亚洲综合网 | 国产精品美女久久久久aⅴ国产馆 | 亚洲xx在线 | 爱爱视频在线观看 | 亚洲成av人影片在线观看 | 黄色成人在线网站 | 伊人中文网| 国产欧美一区二区三区日本久久久 | 婷婷激情综合 | 亚洲精品一区二区三区 | 日日干夜夜操 | 国产探花在线精品一区二区 | 亚洲一区视频在线 | 日韩欧美高清dvd碟片 | 久草资源网站 | 亚洲精品久久久久久久不卡四虎 | 国产精品视频一区二区三区不卡 | 国产在线一区二区三区 | 日韩欧美精品一区 | 亚洲色图第一页 | 久久综合亚洲 | 久久久久久久久久久久亚洲 | 精品一二三 |