Hiriver —— MySQL 數據變化的監控與分發框架
什么是hiriver?
hiriver是純java開發的、高性能的、基于解析mysql row base binlog技術實現的用于監控mysql數據變化并分發這些變化的框架。它提供了一套完整的框架,內置數據監控線程和數據消費線程,對外提供簡單的Consumer接口,開發者可以根據自己的業務場景自行實現Consumer接口,而不不必關心線程問題。
實現原理
hiriver實現了mysql主從復制協議,把自己偽裝成一個mysql的從庫,在接收到binlog后按照mysql binlog協議進行解析,由此獲取mysql的數據變化。由于基于mysql的主從復制協議,它監控數據變化特別快,理論上與mysql本身的主從同步一樣快,甚至更快。同時與在應用層監控數據變化不同,它不需要考慮事務是否成功問題。當然,***限制***是mysql binlog的方式必須是***row***方式。
名字的由來
hiriver是hidden river的簡稱,中文名稱”暗渠”,用于隱喻在數據庫的后面導流數據,而不必要在應用層做任何控制。
支持mysql的版本
hiriver支持mysql 5.6.9+和 mysql5.1+版本。
- 強烈推薦 使用5.6.9+版本,并使用binlog file name + position的方式處理同步點。
- 雖然5.6.9+版本提供 gtid 功能,它是用于表示事務的唯一的id,理論上,基于它可以實現HA功能,當mysql出現故障時可以自動從一臺mysql從庫切換到另一臺,并且不會丟失或者重復數據, 但是 在實際的使用過程中gtid依然存在bug,并不穩定,而且存在多個gtid時很難找到mysql認識的初始同步點。
- mysql5.6.9之前的版本,必須binlog file name和在該文件中的偏移位置作為同步點。
javadoc
使用教程
quickstart
總體說明
- hiriver模塊組主要由2個組件和一個示例組成:mysql-proto、hiriver和hiriver-sample
- mysql-proto實現了mysql的client-server協議,包括Text protocol和主從復制協議。Text protocol是從mysql 正常 讀取數據的協議,它是mysql jdbc驅動背后的協議。主從復制協議顧名思義就是實現主從之間復制數據的協議。
- hiriver是基于mysql-proto組件封裝的監聽mysql變化、記錄同步點、控制數據消費的上層應用框架。它是hiriver業務流程的實現。它需要與spirng集成使用
- hiriver-sample一個使用hirvier的示例
準備數據庫環境
1.創建自己的mysql 5.6.28
2.開啟row base和gtid 模式(如果使用gtid作為同步點,必須開啟)
- log-bin=mysql-bin
- binlog_format=Row
- log-slave-updates=ON
- enforce_gtid_consistency=true
- gtid_mode=ON
3.創建自己的復制賬號,創建repl database和一張表,并在表示寫入數據
快速使用-binlogname + 偏移地址模式
1.下載代碼,找到hiriver-sample模塊,它是一個基于spring的web應用,有3 spring xml配置文件,分別是:
- spring-boot.xml # spring容器描述入口文件
- spring-bin.xml # binlogname + 偏移地址模式
- spring-gtid.xml # gtid模式
2.修改示例中hiriver-sample.properties的參數,修改數據庫相關屬性、初始同步點、同步點存儲路徑和表名過濾黑、白名單配置
3.初始化同步點使用channel.0000.binlog和channel.0000.binlog.pos屬性,可以通過執行
- show master status
命令獲取對應信息
修改后如圖:
4.修改spring-boot.xml中的***一行為:
- <import resource="classpath:spring/spring-binlog.xml"/>
5.使用tomcat/jetty或maven jetty插件運行示例即可
快速使用-gtid模式
下載代碼,找到hiriver-sample模塊,它是一個基于spring的web應用,有3 spring xml配置文件,分別是:
- spring-boot.xml # spring容器描述入口文件
- spring-bin.xml # binlogname + 偏移地址模式
- spring-gtid.xml # gtid模式
2.修改示例中hiriver-sample.properties的參數,修改數據庫相關屬性、初始同步點、同步點存儲路徑和表名過濾黑、白名單配置,其中channel_0000.gtid參數的配置需要從mysql中查詢數獲取,執行
- show master status
命令,得到如下結果:
這是一個范圍,你只需要使用
- 8c80613e-ac5b-11e5-b170-148044d6636f:1 or 8c80613e-ac5b-11e5-b170-148044d6636f:8
即可.修改后如圖:
修改spring-boot.xml中的***一行為: <import resource="classpath:spring/spring-gtid.xml"/>
使用tomcat/jetty或maven jetty插件運行示例即可
詳細參數說明
底層socket控制參數(使用TransportConfig類描述)
binlog讀取參數(DefaultChannelStream類)
數據庫配置
重點類說明
底層通信類
binlog dump類(BinlogStreamBlockingTransportImpl)
實現mysql binlog dump協議,負責與mysql建立socket連接,完成用戶名密碼驗證后,執行數據dump命令,并持續的讀取、解析mysql binlog event數據。
數據庫數據讀取類(TextProtocolBlockingTransportImpl)
mysql文本協議的實現,mysql文本協議即jdbc背后的協議,主要用于執行sql讀取數據,也可以執行一些其他的命令,比如讀取表定義的元數據等,之所以不使用mysql jdbc是由于兩個原因:一是不想引入一個第三方包,降低依賴性;二是mysql的文本協議支持更多指令,比如COM_FIELD_LIST指令方便的獲取到表字段是否為空、是否是索引字段等信息,而jdbc是個通用的api,并沒有暴露這些指令實現。
表名過濾類 (BlackWhiteNameListTableFilter)
支持黑白名單的過濾實現。 按照表名進行過濾時,表名格式為database.table(可以為正則),以逗號分隔.
當白名單和黑名單同時存在時,只有不在黑名單中同時在白名單中存在的才起作用.
e.g,在properties文件中描述
白名單:filert_white=test.account,test.user_sharding*
白名單:filert_black=test.*bak
binlog row event數據描述類(BinlogDataSet)
binlog數據是二進制數據,它遵循mysql rowbase binlog協議,在協議內部event作為一個基本單位用于描述數據庫的變更,這里的“變更”不僅僅是數據的修改,也可能是事務的開啟、結束,表的變更等,在hiriver里我們僅僅關注表數據的變更,BinlogDataSet用于描述一條或多條數據的變化,類似于jdbc的RowSet。BinlogDataSet 包括:
- channelId
- sourceHostUrl,該數據來自哪個數據庫
- gtId, 該數據所在的事務的gtid,在不支持gtid模式下,為null
- binlogPos, 當前數據所在事務的binlogfile + pos,無論哪種模式,一定補位null
- isStartTransEvent, 當前是否一個事務的開啟
- isPositionStoreTrigger,當前是否一個事務的結束,當時true時需要記錄同步點。
- rowDataMap, 行數據,每一行使用BinlogResultRow描述
- columnDefMap, 類定義描述
BinlogResultRow內部是有二個列表,一個記錄變更之前的數據,另一個記錄變更之后的數據。
數據消費類 (Consumer)
描述消費BinlogDataSet數據的接口,這個留給業務實現方來實現。
binlog流(DefaultChannelStream)
mysql binlog dump被抽象成一個流,每一個流僅僅針對一個mysql實例,這個流稱之為ChannelStream, ChannelStream負責源源不斷的從mysql實例讀取數據并過濾、解析和消費。
DefaultChannelStream是ChannelStream的缺省實現,在內部它開啟了2條線程:provider和consumer線程,provider線程負責從數據庫讀取數據,識別事務、根據表名過濾、解析成BinlogDataSet并放入ChannelBuffer;consumer線程負責從ChannelBuffer讀取數據并調用Consumer進行數據消費。
當provider線程產生數據的速度大于consumer線程消費數據的速度時,數據會被積壓在ChannelBuffer中,為了防止內存被打爆,ChannelBuffer需要實現成有界的,當ChannelBuffer達到上限時會阻塞provider線程產生新數據。
數據緩存類 (DefaultChannelBuffer)
ChannelStream中provider和consumer線程的數據通信基礎,它是ChannelBuffer的缺省實現。謹記,需要配置上限。
事務識別類(TransactionRecognizer)
用于識別事務的開啟、結束,并且記錄當前事務的開始位置。針對gtid和binlog file name + pos兩種模式,提供2種實現:GTIDTransactionRecognizer和BinlogNameAndPosTransactionRecognizer。