Kafka Connect如何實現同步RDS binlog數據?
1. 背景
在我們的業務開發中,往往會碰到下面這個場景:
- 業務更新數據寫到數據庫中
- 業務更新數據需要實時傳遞給下游依賴處理
所以傳統的處理架構可能會這樣:

但這個架構也存在著不少弊端:我們需要在項目中維護很多發送消息的代碼。新增或者更新消息都會帶來不少維護成本。所以,更好的處理方式應該是直接將數據庫的數據接入到流式系統中,如下圖:

本文將演示如何在E-MapReduce上實現將RDS binlog實時同步到Kafka集群中。
2. 環境準備
實驗中使用VPC網絡環境,以下實例創建時默認都是在VPC環境下。
2.1 準備一個測試RDS數據庫
創建一個RDS實例,版本選擇5.7。這里不贅述如何創建RDS,詳細流程請參考RDS文檔。創建完如圖:

2.2 準備一個Kafka集群
創建一個E-MapReduce Kafka集群,版本選擇EMR-3.11.0。需要注意,這里必須選擇EMR-3.11.0以上版本,否則不會默認安裝啟動Kafka Connect服務。詳細創建流程請參考E-MapReduce文檔。創建完如圖:

注意:RDS實例和E-MapReduce Kafka集群***在同一個VPC中,否則需要打通兩個VPC之間的網絡。
3. Kafka Connect
3.1 Connector
Kafka Connect是一個用于Kafka和其他數據系統之間進行數據傳輸的工具,它可以實現基于Kafka的數據管道,打通上下游數據源。我們需要做的就是在Kafka Connect服務上運行一個Connector,這個Connector是具體實現如何從/向數據源中讀/寫數據。Confluent提供了很多Connector實現,你可以在這里下載。不過今天我們使用Debezium提供的一個MySQL Connector插件,下載地址。
下載這個插件,并將解壓出來的jar包全部拷貝到kafka lib目錄下。注意:需要將這些jar包拷貝到Kafka集群所有機器上。
在Kafka集群的服務列表中重啟Kafka Connect組件。

3.2 啟動Connector
在創建connector前,我們需要做一番配置,這里羅列一些Debezium MySQL Connector的主要配置項:

登錄到Kafka集群,配置并創建一個connector,命令如下:

這時,我們可以看到一個創建好的connector,如圖:

3.3 注意事項
server_id是多少?:你可以在RDS執行"SELECT @@server_id;"查到。
創建connector時可能會出現連接失敗,請確保RDS的白名單已經授權了Kafka集群機器訪問。
4 測試
4.1 創建一張表

一會之后,Kafka集群中會自動創建一個對應的topic

插入幾條數據

查看binlog數據
查看fulfillment.mugen.students這個topic,是否有剛剛新插入的數據

結果如圖所示:

5. 資料
- confluent官方文檔 https://docs.confluent.io
- debezium官網 http://debezium.io/
- kafka官方文檔 http://kafka.apache.org/documentation.html