一文搞定消息隊列(MQ)之生產者-消費者
大家好,我是狼王,一個愛打球的程序員
隨著互聯網的發展,技術也在快速的迭代中,由于大流量,高并發的出現,很多問題也隨之而來了,為了解決這些問題,一些高端的人才研究出了各種解決這些問題的東西,消息隊列就是其中一種。那么今天,我們就來聊聊消息隊列吧!
什么是消息隊列?
消息隊列不知道大家看到這個詞的時候,會不會覺得它是一個比較高端的技術,反正我是覺得它好像是挺牛逼的。
消息隊列,一般我們會簡稱它為MQ(Message Queue),嗯,就是很直白的簡寫。
我們先不管消息(Message)這個詞,來看看隊列(Queue)。這一看,隊列大家應該都熟悉吧。
隊列是一種先進先出的數據結構。
在Java里邊,已經實現了不少的隊列了。
那為什么還需要消息隊列(MQ)這種中間件呢???
其實這個問題,跟之前我學Redis的時候很像。Redis是一個以key-value形式存儲的內存數據庫,明明我們可以使用類似HashMap這種實現類就可以達到類似的效果了,那還為什么要Redis?
到這里,大家可以先猜猜為什么要用消息隊列(MQ)這種中間件
消息隊列可以簡單理解為:把要傳輸的數據放在隊列中。
科普:
- 把數據放到消息隊列叫做生產者
- 從消息隊列里邊取數據叫做消費者
市面上的消息隊列產品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前比較火的有Kafka ,和阿里巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種。
為什么要用消息隊列,也就是在問:用了消息隊列有什么好處。
解耦
以常見的訂單系統為例
用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發貨、發短信通知等。
在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的性能。
這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發貨、發短信通知等。
這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發貨或發短信之類的消息時,執行相應的業務邏輯。
簡單的說就是原來a服務需要調用b服務的接口或者方法來進行數據的傳遞,這個時候使用消息隊列的話,a服務只需將數據發送到消息隊列中,b服務從消息隊列中取出相應的數據即可,就實現了解耦
異步
異步其實就是a服務將數據發送到消息隊列之后就可以進行返回或者執行其他過程,不需要等待b服務處理數據,從而來提高一些使用異步的業務場景的效率問題
削峰/限流
我們再來一個場景,比如現在我們每個月要搞一次大促,大促期間的并發可能會很高的,比如每秒5000個請求。假設我們現在有兩臺機器處理請求,并且每臺機器只能每次處理2000個請求。
那多出來的1000個請求,可能就把我們整個系統給搞崩了,所以,有一種辦法,我們可以寫到消息隊列中:
服務器A和服務器B根據自己的能夠處理的請求數去消息隊列中拿數據,這樣即便有每秒有1w個請求,那只是把請求放在消息隊列中,去拿消息隊列的消息由系統自己去控制,這樣就不會把整個系統給搞崩。
使用消息隊列有什么問題?
經過我們上面的場景,我們已經可以發現,消息隊列能做的事其實還是蠻多的。
說到這里,我們先回到文章的開頭,"明明JDK已經有不少的隊列實現了,我們還需要消息隊列中間件呢?"
其實很簡單,JDK實現的隊列種類雖然有很多種,但是都是簡單的內存隊列。為什么我說JDK是簡單的內存隊列呢?
下面我們來看看要實現消息隊列(中間件)可能要考慮什么問題。
高可用
無論是我們使用消息隊列來做解耦、異步還是削峰,消息隊列肯定不能是單機的。試著想一下,如果是單機的消息隊列,萬一這臺機器掛了,那我們整個系統幾乎就是不可用了,就出現了單點故障。
所以,當我們項目中使用消息隊列,都是得集群/分布式的。要做集群/分布式就必然希望該消息隊列能夠提供現成的支持,而不是自己寫代碼手動去實現。
數據丟失問題
我們將數據寫到消息隊列上,服務器A和服務器B還沒來得及消費消息隊列的數據,就掛掉了。如果沒有做任何的措施,我們的數據就丟了。
學過Redis的都知道,Redis可以將數據持久化磁盤上,萬一Redis掛了,還能從磁盤將數據恢復過來。同樣地,消息隊列中的數據也需要存在別的地方,這樣才盡可能減少數據的丟失。
- 那存在哪呢?
- 磁盤?
- 數據庫?
- 同步存儲還是異步存儲?
不同的MQ針對消息丟失的處理和解決方案都有所不同,但是肯定都是從生產者和消費者兩端進行分析的。
生產者端丟失消息
生產者要確保消息發送到了MQ,就會有回調確認機制的處理和事務的方式
消息隊列丟失消息
在消息隊列中假如因為MQ掛了導致消息丟了,那么就可以將消息持久化,或者使用生產者端重發消息的方式
消費者端丟消息
一般消費者丟了消息的原因就是從MQ中取到了消息,但是可能消費失敗了需要重新消費,但是MQ中已經沒有該條消息了,這樣的話可以通過消費者端手動確認的機制,或者讓生產者端重發消息的方式
消費者怎么得到消息隊列的數據?
消費者怎么從消息隊列里邊得到數據?一般有兩種辦法:
生產者將數據放到消息隊列中,消息隊列有數據了,主動叫消費者去拿(俗稱push)
消費者不斷去輪訓消息隊列,看看有沒有新的數據,如果有就消費(俗稱pull)
其他問題
除了這些,我們在使用的時候還得考慮各種的問題:
消息重復消費了怎么辦啊?我想保證消息是絕對有順序的怎么做?……..
雖然消息隊列給我們帶來了那么多的好處,但同時我們發現引入消息隊列也會提高系統的復雜性。市面上現在已經有不少消息隊列輪子了,每種消息隊列都有自己的特點,選取哪種MQ還得好好斟酌。
這次我們先來講講RabbitMQ
RabbitMQ
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP : Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,為面向消息的中間件設計,基于此協議的客戶端與消息中間件可傳遞消息,并不受產品、開發語言等條件的限制。
RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:
- 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
- 靈活的路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。
- 消息集群(Clustering) 多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。
- 高可用(Highly Available Queues) 隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
- 多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
- 多語言客戶端(Many Clients) RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。
- 管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
- 跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
- 插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
RabbitMQ 中的概念模型
消息模型
所有 MQ 產品從模型抽象上來說都是一樣的過程:消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。
RabbitMQ 基本概念
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念:
- Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
- Publisher 消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
- Exchange 交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
- Binding 綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
- Queue 消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
- Connection 網絡連接,比如一個TCP連接。
- Channel 信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
- Consumer 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
- Virtual Host 虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
- Broker 表示消息隊列服務器實體。
本文主要講解了什么是消息隊列,消息隊列可以為我們帶來什么好處,以及一個消息隊列可能會涉及到哪些問題,后來會更加深入的去探討哦!希望給大家帶來一定的幫助。
好了。今天就說到這了,我還會不斷分享自己的所學所想,希望我們一起走在成功的道路上!