消息中間件深度系列|異構消息隊列的海量數據流轉Connect架構解析
一、背景
5G時代,萬物互聯,越來越多的企業期望搭建數據分析業務中臺,利用大數據技術、通過全局規劃來治理企業的數據資產。而在業務系統,或者大數據系統中異構數據源之間的數據同步是十分有必要的,傳統的點對點的數據同步工具,應對越來越多的異構數據源同步會產生N*N的問題,付出的開發成本和維護成本都是非常高的。因此,移動云消息隊列MQTT團隊積極打通數據孤島,基于開源RocketMQ Connect組件推出全新的MQTT-RocketMQ Connect架構,助力海量物聯網消息自由流轉,為萬物互聯保駕護航。
二、MQTT-RocketMQ Connect介紹
首先,先簡單介紹一下MQTT-RocketMQ Connect架構的基石—RocketMQ Connect,它是RocketMQ數據集成的重要組件,可將各種系統中的數據通過高效、可靠、流的方式,流入流出到RocketMQ,可以實現各種異構數據系統的連接,構建數據管道、ETL、CDC、數據湖等能力。
從架構上看,RocketMQ Connect就是借助RocketMQ從其他異構系統獲取數據且以消息的方式發送到RocketMQ作為中轉,然后從RocketMQ消費消息并寫入到其他系統。
圖1 RocketMQ Connect 總覽
MQTT-RocketMQ Connect在開源的Apache RocketMQ Connect組件基礎之上,根據移動云消息隊列MQTT的數據模型、業務場景和流轉規則等特點,做了深度的架構優化與設計,實現了移動云消息隊列RocketMQ與MQTT之間的消息流轉與規則管理。它主要由Connector、Runtime、Worker和Task組成。
Connector
包含 Source Connector和 Sink Connector兩類,其中,
1.Source Connector:負責從源數據中獲取數據并將其發送到 RocketMQ。
2.Sink Connector:負責使用來自 RocketMQ的消息并將數據寫入目標存儲。
Runtime
Runtime是Source、Sink Connector的運行時環境,負責加載Connector,提供RESTful接口,啟動Connector任務,集群節點之間服務發現、配置同步、消費進度保存、故障轉移、負載均衡等能力。
Worker
一個Worker進程代表一個Runtime 運行時環境進程,多個Worker進程組成了一個集群,支持更多的Connector 和 Task的并行運行工作。
Task
Task是執行具體的數據解析和轉儲的任務,其中,
1.SourceTask:從源數據系統中,執行完成數據解析工作,通過poll()接口暴露給Runtime。
2.SinkTask:Runtime從內存獲取數據并通過put()接口方法解析至目標數據源系統中。
3.DirectTask:同時包含SourceTask和SinkTask,兩者直接交互,不再經過Runtime。
三、MQTT-RocketMQ Connect架構設計
消息隊列MQTT以RocketMQ作為消息的存儲層,消息數據會在RocketMQ中保存一份。因此,可以將消息隊列MQTT的存儲層RocketMQ作為源數據端。采用標準的Connect架構要實現異構數據源的數據流轉,Source Task 和Sink Task必須一一對應,兩者通過中間的RocketMQ關聯。按照現在的架構兩端都是RocketMQ,使用一個特殊的Direct Task,讓消息不再經過中間的RocketMQ,而是直接流入到目標RocketMQ中,反之亦然。通過優化架構可以有效降低時延,提升速率。
圖2 移動云消息隊列MQTT消息存儲架構
在Runtime進程組成的集群中,將源消息隊列的海量數據,通過端到端Connector和Task以數據解析和轉儲的方式異步復制至目標集群,完成異構消息隊列的數據流轉。其中Runtime集群中每個Worker節點啟動Connector相關的配置信息,也會像集群信息一樣在集群中每個節點全量同步,同時會持久化到每個節點。集群中如果有某個Worker節點掛掉,集群信息會發生變化,當每個節點檢查到集群信息發生了變化就會觸發負載均衡,對集群中運行的Connector和Task重新分配,從而保證故障節點的任務分配到其它節點處理,保證高可用。
圖3 MQTT-RocketMQ Connect架構圖
了解了MQTT-RocketMQ Connect的架構,下面看一下如何自己實現一個簡單的MQTT和RocketMQ之間的消息流轉。
通過前面的介紹,應該清楚,需要實現兩個Connector和Task,一個是從作為MQTT存儲層的RocketMQ到目標RocketMQ的Connector和Task,第二個是從RocketMQ讀數據寫入到目標MQTT的Connector和Task。
圖4 MQTT消息流轉到RocketMQ流程圖
以消息從MQTT流轉到RocketMQ為例,主要由三組接口組成:SourceConnector、SourceTask和SinkTask。
圖5 Connector和Task接口概覽
1.SourceConnector負責connector生命周期的管理、創建對應的Task并將接收到的Connector配置信息拆分出每個task的配置信息。
2.SourceTask負責拉取消息,并對消費者的生命周期進行管理。用戶還可以根據實際需要添加消息封裝、轉存等方法。
3.SinkTask負責接收SourceTask推送的消息,并對生產者的生命周期進行管理。同樣的,用戶還可以根據實際需要添加消息解析,過濾等方法。
一個connector的生命周期主要分為三個階段:啟動、運行、停止。
創建并啟動connector
創建并啟動Connector過程大致可以分為以下幾個階段:
- 控制臺創建規則階段
- 初始化配置階段
- 負載均衡階段
圖6 Connector啟動階段流程圖
運行task任務
- 在Connector 實例被啟動后,Connector可以根據配置信息,對解析任務進行拆分,分配出task。這么做的目的是為了提高并行度,提升處理效率。
停止并刪除connector
停止并刪除Connector過程大致可以分為以下幾個階段:
- 控制臺停止規則階段
- 更新配置階段
- 負載均衡階段
圖7 Connector停止階段流程圖
四、MQTT-RocketMQ Connect高可用部署
MQTT-RocketMQ Connect Worker支持兩種運行模式,集群和單機模式。
4.1/集群模式
集群模式,顧名思義,由多個Worker節點組成高可用集群。集群間的config、offset和status信息通過指定RocketMQ Topic存儲,新增Worker節點也會獲取到集群中的這些config、offset和status信息,并且觸發負載均衡,重新分配集群中的任務,使集群達到均衡的狀態。減少Woker節點或者Worker宕機也會觸發負載均衡,從而保障集群中所有的任務都可以均衡的在集群中存活的節點中正常運行。
圖8 MQTT-RocketMQ Connect集群模式示意圖
4.2 /單機模式
單機模式,Connector任務運行在單機上,Worker本身沒有高可用,任務offset信息持久化在本地。適合一些對高可用要求不高或者不需要Worker保障高可用的場景,例如部署在k8s集群中,由k8s集群保障高可用。
五、MQTT-RocketMQ Connect優秀特性
為了保證MQTT和RocketMQ之間有高速穩定的消息流轉通道,MQTT-RocketMQ Connect具有許多優秀的特性:
六、總結與展望
本文介紹了異構消息隊列海量數據流轉的設計與實踐,基于RocketMQ Connect和移動云消息隊列MQTT本身的架構特點,做了深度的架構優化與設計,實現了移動云消息隊列RocketMQ與MQTT之間的消息流轉與規則管理。隨著萬物互聯的持續深入,未來消息隊列MQTT團隊還將基于現在的架構繼續優化和創新,例如:
1 ? ? ? ? ?
增加對其他異構數據源(Redis、MySQL、Kafka)等組件的消息流轉支持
2 ? ? ? ? ?
增加對集群Worker、Connector、Task狀態的管理
3 ? ? ? ? ?
優化不支持poll方式獲取消息的服務