MySQL 同步 ES 實戰,肝到爆!
大家好呀,我是樓仔。
技術是什么?就是拿來玩的,邊玩邊學,才能成長得更快。
之前已經給大家講解了 MySQL 同步 ES 的幾種方案,下面就教大家如何通過 Canal,將 MySQL 同步到 ES,文章內容絕對妥妥干貨!
本文會先講解需要用到的基礎知識,然后再是軟件安裝,最后就是實戰部分。
不 BB,上文章目錄:
01 基礎知識
1.1 主從復制原理
MySQL 的主從復制是依賴于 binlog,也就是記錄 MySQL 上的所有變化并以二進制形式保存在磁盤上二進制日志文件。
主從復制就是將 binlog 中的數據從主庫傳輸到從庫上,一般這個過程是異步的,即主庫上的操作不會等待 binlog 同步地完成。
詳細流程如下:
- 主庫寫 binlog:主庫的更新 SQL(update、insert、delete) 被寫到 binlog;
- 主庫發送 binlog:主庫創建一個 log dump 線程來發送 binlog 給從庫;
- 從庫寫 relay log:從庫在連接到主節點時會創建一個 IO 線程,以請求主庫更新的 binlog,并且把接收到的 binlog 信息寫入一個叫做 relay log 的日志文件;
- 從庫回放:從庫還會創建一個 SQL 線程讀取 relay log 中的內容,并且在從庫中做回放,最終實現主從的一致性。
1.2 Cannel 基礎
Canel 是一款常用的數據同步工具,其原理是基于 Binlog 訂閱的方式實現,模擬一個 MySQL Slave 訂閱 Binlog 日志,從而實現 CDC(Change Data Capture),將已提交的更改發送到下游。
主要流程如下:
- Canal 服務端向 MySQL 的 master 節點傳輸 dump 協議;
- MySQL 的 master 節點接收到 dump 請求后推送 Binlog 日志給 Canal 服務端,解析 Binlog 對象(原始為 byte 流)轉成 Json 格式;
- Canal 客戶端通過 TCP 協議或 MQ 形式監聽 Canal 服務端,同步數據到 ES。
下面是 Cannel 執行的核心流程,其中 Binlog Parser 主要負責 Binlog 的提取、解析和推送,EventSink 負責數據的過濾 、路由和加工,僅作了解即可。
02 軟件下載安裝
我的電腦是 Macos-x64,所以后面的軟件安裝,都是基于這個。
2.1 Java JDK
- 官網:https://www.oracle.com/java/technologies/downloads/
- JDK 版本:11.0.19
由于 Canal 和 ES 的安裝,都強依賴 JDK,所以這里有必要先說明。
前方高能,這里有坑!!!
如果你選的版本不對,ES 安裝可能會失敗,然后 Canal 同步數據到 ES 時,也會出現很多詭異的問題。
2.2 MySQL
MySQL 大家應該都安裝了,這里需要打開 MySQL 的 BinLog。
我是 Mac,主要新建一個 my.cnf 文件,然后再重啟 MySQL。
這里重啟 MySQL,我搞了半天,BinLog 開啟后,會看到 BinLog 日志。
然后需要創建一個賬號,賬號和密碼都是 Cannal,給后面 Canal 使用。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' IDENTIFIED BY 'canal' ;
2.3 Canal
- 官網:https://github.com/alibaba/canal/releases
- 版本:v1.1.6
下載 canal.adapter 和 canal.deployer 兩個就可以:
- canal.deployer:相當于 canal 的服務端,啟動它才可以在客戶端接收數據庫變更信息。
- canal.adapter:增加客戶端數據落地的適配及啟動功能(當 deployer 接收到消息后,會根據不同的目標源做適配,比如是 es 目標源適配和 hbase 適配等等)。
備注:canal.admin 為 canal提供整體配置管理、節點運維等面向運維的功能,提供相對友好的 WebUI 操作界面,方便更多用戶快速和安全的操作,我這邊使用的是單機的,因此就沒有下載安裝,大家也可以拉 source code 源碼去研究下。
2.4 ES
- ES 官網:https://www.elastic.co/cn/downloads/elasticsearch
- ES 版本:7.17.4
Mac 安裝 ES 非常簡單:
brew install elasticsearch
安裝細節不贅述,安裝成功后,輸入以下網址:
http://localhost:9200/?pretty
2.5 Kibana
- 下載網址:https://www.elastic.co/cn/downloads/past-releases#kibana
- 版本:7.14.0
它是 ES 的界面化操作工具,安裝細節不贅述,安裝成功后,輸入以下網址:
http://localhost:5601/app/dev_tools#/console
2.6 IK 分詞器
- 下載網址:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.17.2
- 版本:v7.17.2
它是 ES 的分詞器,安裝細節不贅述,安裝成功后,可以驗證一下分詞效果。
2.7 小節
MySQL 開啟 BinLog,這個不難,主要觀察是否有 BinLog 日志。
Canal 的安裝是最復雜的,涉及到很多配置修改,后面會講解。
最后是 ES + Kibana + IK 分詞器,這個其實也不難,主要關注 ES 綁定的 JDK 版本,三款軟件的安裝可以參考這篇:https://blog.csdn.net/weixin_46049028/article/details/129956485
03 Canal 配置
3.1 canal.deployer 配置
修改 conf—>example 文件夾的 instance.properties 監聽的數據庫配置。
這里主要修改的監聽 MySQL 的 URL、用戶名和密碼。
這里默認的賬號密碼就是 canal,前面已經教大家如何創建了。
3.2 canal.deployer 啟動
在 canal.deployer 中的 bin 文件下去啟動命令 startup.sh
這樣就代表已經啟動了,我們可以去看下啟動日志。
上面 start successful 代表已經啟動成功,并且已經監聽我的 MySQL 數據庫。
3.3 canal.adapte 配置
Step1: 先把 adapter 下面的 bootstrap.yml,全部注釋掉,否則會提示你 XX 表不存在,這里坑了我好慘。
Step2: 再修改 adapter 的 application.yml 配置文件。
這里的坑,一般就是 mysql 的賬號密碼不對,或者給的 es 鏈接,沒有"http://"前綴,這些都是我過踩的坑。
Step3: 修改我們在 application.yml 中配置的目標數據源 es7 文件夾內容。
由于我們這里是對表 article 進行去監聽,因此我們在 es7 文件夾中去創建 article.yml 文件。
由于我們需要把技術派項目中的文章查詢功能,改造成 ES 的查詢方式,所以我們就把技術派的文章表 article,同步到 ES 中。
yml文件配置如下:
dataSourceKey: defaultDS # 源數據源的key, 對應上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的數據
esMapping:
_index: article # es 的索引名稱
_id: _id # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
sql: "SELECT t.id AS _id,t.id,t.user_id,t.article_type,t.title,t.short_title,t.picture,
t.summary,t.category_id,t.source,t.source_url,t.offical_stat,t.topping_stat,
t.cream_stat,t.`status`,t.deleted,t.create_time,t.update_time
FROM article t" # sql映射
commitBatch: 1 # 提交批大小
Step4: 在 Kibana 中創建 ES 的 article 索引
代碼如下:
PUT /article
{
"mappings" : {
"properties" : {
"id" : {
"type" : "integer"
},
"user_id" : {
"type" : "integer"
},
"article_type" : {
"type" : "integer"
},
"title" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"short_title" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"picture" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"summary" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"category_id" : {
"type" : "integer"
},
"source" : {
"type" : "integer"
},
"source_url" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"offical_stat" : {
"type" : "integer"
},
"topping_stat" : {
"type" : "integer"
},
"cream_stat" : {
"type" : "integer"
},
"status" : {
"type" : "integer"
},
"deleted" : {
"type" : "integer"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
3.4 canal.adapte 啟動
我們看下啟動日志:
上面沒有任何報錯,并且已經啟動了 8081 端口,說明已經啟動成功,此時我們就可以操作了。
04 數據同步實戰
4.1 全量同步
在開始 adapter 之后,我們應該先來一把全量數據同步,在源碼中提供了一個接口進行全量同步,命令如下:
curl http://127.0.0.1:8081/etl/es7/article.yml -X POST
上面就是執行同步成功后,提示已經導入 10 條。
4.2 增量同步
增量數據就是當我在 MySQL 中 update、delete 和 insert 時,那么 ES 中數據也會對應發生變化,我下面演示下修改:
日志打印如下:
ES查詢結果如下:
上面結果中說明 ES 已經更改成功。
05 總結
我們再回顧一下整體執行流程:
寫到這里,就結束了,是不是滿滿的干貨呢?基本是手把手教你如何將 MySQL 同步到 ES,不僅是增量同步,還包括全量同步,如果你的項目也需要用到該場景,基本可以直接照搬。