超詳細的RabbitMQ入門與實戰介紹,看這篇文章就夠了
一、前情提示
上一篇文章《?教你面試的時候如何迅速完成90%以上的海量數據處理題?》,我們已經給出了一整套的數據一致性的保障方案。
我們從如下三個角度,給出了方案如何實現。并且通過數據平臺和電商系統進行了舉例分析。
- 核心數據的監控。
- 數據鏈路追蹤。
- 自動化數據鏈路分析。
目前為止,我們的架構圖大概如下所示:
并且咱們之前對于這種架構下,如何基于MQ進行解耦的實現也做了詳細的說明。
那么這篇文章,我們就基于這個架構,在數據一致性方面做進一步的說明。同樣,我們以RabbitMQ這個消息中間件來舉例。
二、選擇性的訂閱部分核心數據
?首先一個基于MQ實現的細節點就在于,比如對數據監控系統而言,他可能僅僅只是要從MQ里訂閱部分數據來消費罷了。
這個是啥意思呢?因為比如實時計算平臺他是會將自己計算出來的所有的數據指標都投遞到MQ里去的。
但是這些數據指標可能是多達幾十個甚至是幾百個的,這里面不可能所有數據指標都是核心數據吧?
基本上按照我們過往經驗而言,對于這種數據類的系統核心數據指標,大概就占到10%左右的比例而已。
然后對于數據查詢平臺而言,他可能是需要把所有的數據指標都消費出來,然后落地到自己的存儲里去的。
但是對于數據監控系統而言,他只需要過濾出10%的核心數據指標即可,所以他需要的是有選擇性的訂閱數據。
咱們看看下面的圖,立馬就明白是什么意思了。?
三、RabbitMQ的queue與exchange的綁定
不知道大家是否還記得之前講解基于RabbitMQ實現多系統訂閱同一份數據的場景。
我們采用的是每個系統使用自己的一個queue,但是都綁定到一個fanout exchange上去,然后生產者直接投遞數據到fanout exchange。
fanout exchange會分發一份數據,綁定到自己的所有queue上去,然后各個系統都會從自己的queue里拿到相同的一份數據。
大家再看看下面的圖回顧一下。
在這里有一個關鍵的代碼如下所示:
?也就是說,把自己創建的queue綁定到exchange上去,這個綁定關系在RabbitMQ里有一個專業的術語叫做:binding。
四、direct exchange實現消息路由
如果僅僅使用之前的fanout exchange,那么是無法實現不同的系統按需訂閱數據的,如果要實現允許不同的系統按需訂閱數據,那么需要使用direct exchange。
direct exchange允許你在投遞消息的時候,給每個消息打上一個routing key。同時direct exchange還允許binding到自己的queue指定一個binding key。
這樣,direct exchange就會根據消息的routing key將這個消息路由到相同binding key對應的queue里去,這樣就可以實現不同的系統按需訂閱數據了。
說了這么多,是不是感覺有點暈,老規矩,咱們來一張圖,直觀的感受一下怎么回事兒:
而且一個queue是可以使用多個binding key的,比如說使用“k1”和“k2”兩個binding key的話,那么routing key為“k1”和“k2”的消息都會路由到那個queue里去。
同時不同的queue也可以指定相同的ruoting key,這個時候就跟fanout exchange其實是一樣的了,一個消息會同時路由到多個queue里去。
五、按需訂閱的代碼實現
首先在生產者那塊,比如說實時計算平臺吧,他就應該是要定義一個direct exchange了。
如下代碼所示,所有的數據都是投遞到這個exchange里去,比如我們這里使用的exchange名字就是“rt_data”,意思就是實時數據計算結果,類型是“direct”:
channel.exchangeDeclare(
"rt_data",
"direct");
而且,在投遞消息的時候,要給一個消息打上標簽,也就是他的routing key,表明這個消息是普通數據還是核心數據,這樣才能實現路由,如下代碼所示:
上面第一個參數是指定要投遞到哪個exchange里去,第二個參數就是routing key,這里的“common_data”代表了是普通數據,也可以用“core_data”代表核心數據,實時計算平臺根據自己的情況指定普通或者核心數據。
然后消費者在進行queue和exchange的binding的時候,需要指定binding key,代碼如下所示:
上面第一行就是在消費者那里,比如數據監控系統那里,也是定義一下direct exchange。
然后第二行就是定義一個“rt_data_monitor“這個queue。
第三行就是對queue和exchange進行綁定,指定了binding key是“core_data”。
如果是數據查詢系統,他是普通數據和核心數據都要的,那么就可以在binding key里指定多個值,用逗號隔開,如下所示:
channel.queueBind(
"rt_data_query",
"rt_data",
"common_data, core_data");
到這里,大家就明白如何對數據打上不同的標簽(也就是routing key),然后讓不同的系統按需訂閱自己需要的數據了(也就是指定binding key),這種方式用到了direct exchange這種類型,非常的靈活。
最后,再看看之前畫的那幅圖,大家再來感受一下即可:
六、更加強大而且靈活的按需訂閱
?RabbitMQ 還支持更加強大而且靈活的按需數據訂閱,也就是使用topic exchange,其實跟direct exchange是類似的,只不過功能更加的強大罷了。
比如說你定義一個topic exchange,然后routing key就需要指定為用點號隔開的多個單詞,如下所示:?
然后,你在設置binding key的時候,他是支持通配符的。 * 匹配一個單詞,# 匹配0個或者多個單詞,比如說你的binding key可以這么來設置:
這個product.*.* ,就會跟“product.common.data”匹配上,意思就是,可能某個系統就是對商品類的數據指標感興趣,不管是普通數據還是核心數據。
所以到這里,大家就應該很容易明白了,通過RabbitMQ的direct、topic兩種exchange,我們可以輕松實現各種強大的數據按需訂閱的功能。
通過本文,我們就將最近講的數據一致性保障方案里的一些MQ中間件落地的細節給大家說明白了。