消息隊列的七種經典應用場景
在筆者心中,消息隊列,緩存,分庫分表是高并發解決方案三劍客。
在職業生涯中,筆者曾經使用過 ActiveMQ 、RabbitMQ 、Kafka 、RocketMQ 這些知名的消息隊列 。
這篇文章,筆者結合自己的真實經歷,和大家分享消息隊列的七種經典應用場景。
圖片
1 異步&解耦
筆者曾經負責某電商公司的用戶服務,該服務提供用戶注冊,查詢,修改等基礎功能。用戶注冊成功之后,需要給用戶發送短信。
圖片
圖中,新增用戶和發送短信都揉在用戶中心服務里,這種方式缺點非常明顯:
- 假如短信渠道接口不穩定,發送短信發生超時,用戶注冊接口耗時很大,影響前端用戶體驗;
- 短信渠道接口發生變化,用戶中心代碼就必須修改了。但用戶中心是核心系統。每次上線都必要謹小慎微。這種感覺很別扭,非核心功能影響到核心系統了。
為了解決這個問題,筆者采用了消息隊列進行了重構。
圖片
- 異步:用戶中心服務保存用戶信息成功后,發送一條消息到消息隊列 ,立即將結果返回給前端,這樣能避免總耗時比較長,從而影響用戶的體驗的問題。
- 解耦:任務服務收到消息調用短信服務發送短信,將核心服務與非核心功能剝離,顯著的降低了系統間的耦合度。
2 消峰
高并發場景下,面對突然出現的請求峰值,非常容易導致系統變得不穩定,比如大量請求訪問數據庫,會對數據庫造成極大的壓力,或者系統的資源 CPU 、IO 出現瓶頸。
筆者曾服務于神州專車訂單團隊,在訂單的載客生命周期里,訂單的修改操作先修改訂單緩存,然后發送消息到 MetaQ ,訂單落盤服務消費消息,并判斷訂單信息是否正常(比如有無亂序),若訂單數據無誤,則存儲到數據庫中。
圖片
當面對請求峰值時,由于消費者的并發度在一個閾值范圍內,同時消費速度相對均勻,因此不會對數據庫造成太大的影響,同時真正面對前端的訂單系統生產者也會變得更穩定。
3 消息總線
所謂總線,就是像主板里的數據總線一樣, 具有數據的傳遞和交互能力,各方不直接通信,使用總線作為標準通信接口。
筆者曾經服務于某彩票公司訂單團隊,在彩票訂單的生命周期里,經過創建,拆分子訂單,出票,算獎等諸多環節。每一個環節都需要不同的服務處理,每個系統都有自己獨立的表,業務功能也相對獨立。假如每個應用都去修改訂單主表的信息,那就會相當混亂了。
因此,公司的架構師設計了調度中心的服務,調度中心維護訂單的信息,但它不與子服務通訊,而是通過消息隊列和出票網關,算獎服務等系統傳遞和交換信息。
圖片
消息總線這種架構設計,可以讓系統更加解耦,同時也可以讓每個系統各司其職。
4 延時任務
用戶在美團 APP 下單,假如沒有立即支付,進入訂單詳情會顯示倒計時,如果超過支付時間,訂單就會被自動取消。
非常優雅的方式是:使用消息隊列的延時消息。
訂單服務生成訂單后,發送一條延時消息到消息隊列。消息隊列在消息到達支付過期時間時,將消息投遞給消費者,消費者收到消息之后,判斷訂單狀態是否為已支付,假如未支付,則執行取消訂單的邏輯。
圖片
RocketMQ 4.X 生產者發送延遲消息代碼如下:
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設置延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
RocketMQ 4.X 版本默認支持 18 個 level 的延遲消息, 通過 broker 端的 messageDelayLevel 配置項確定的。
圖片
RocketMQ 5.X 版本支持任意時刻延遲消息,客戶端在構造消息時提供了 3 個 API 來指定延遲時間或定時時間。
圖片
5 廣播消費
廣播消費是指每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。
圖片
廣播消費主要用于兩種場景:消息推送和緩存同步。
01 消息推送
下圖是專車的司機端推送機制,用戶下單之后,訂單系統生成專車訂單,派單系統會根據相關算法將訂單派給某司機,司機端就會收到派單推送消息。
圖片
推送服務是一個 TCP 服務(自定義協議),同時也是一個消費者服務,消息模式是廣播消費。
司機打開司機端 APP 后,APP 會通過負載均衡和推送服務創建長連接,推送服務會保存 TCP 連接引用 (比如司機編號和 TCP channel 的引用)。
派單服務是生產者,將派單數據發送到 MetaQ , 每個推送服務都會消費到該消息,推送服務判斷本地內存中是否存在該司機的 TCP channel , 若存在,則通過 TCP 連接將數據推送給司機端。
02 緩存同步
高并發場景下,很多應用使用本地緩存,提升系統性能 。
本地緩存可以是 HashMap 、ConcurrentHashMap ,也可以是緩存框架 Guava Cache 或者 Caffeine cache 。
圖片
如上圖,應用A啟動后,作為一個 RocketMQ 消費者,消息模式設置為廣播消費。為了提升接口性能,每個應用節點都會將字典表加載到本地緩存里。
當字典表數據變更時,可以通過業務系統發送一條消息到 RocketMQ ,每個應用節點都會消費消息,刷新本地緩存。
6 分布式事務
以電商交易場景為例,用戶支付訂單這一核心操作的同時會涉及到下游物流發貨、積分變更、購物車狀態清空等多個子系統的變更。
圖片
1、傳統XA事務方案:性能不足
為了保證上述四個分支的執行結果一致性,典型方案是基于 XA 協議的分布式事務系統來實現。將四個調用分支封裝成包含四個獨立事務分支的大事務。基于 XA 分布式事務的方案可以滿足業務處理結果的正確性,但最大的缺點是多分支環境下資源鎖定范圍大,并發度低,隨著下游分支的增加,系統性能會越來越差。
2、基于普通消息方案:一致性保障困難
圖片
該方案中消息下游分支和訂單系統變更的主分支很容易出現不一致的現象,例如:
- 消息發送成功,訂單沒有執行成功,需要回滾整個事務。
- 訂單執行成功,消息沒有發送成功,需要額外補償才能發現不一致。
- 消息發送超時未知,此時無法判斷需要回滾訂單還是提交訂單變更。
3、基于 RocketMQ 分布式事務消息:支持最終一致性
上述普通消息方案中,普通消息和訂單事務無法保證一致的原因,本質上是由于普通消息無法像單機數據庫事務一樣,具備提交、回滾和統一協調的能力。
而基于 RocketMQ 實現的分布式事務消息功能,在普通消息基礎上,支持二階段的提交能力。將二階段提交和本地事務綁定,實現全局提交結果的一致性。
交互流程如下圖所示:
圖片
1、生產者將消息發送至 Broker 。
2、Broker 將消息持久化成功之后,向生產者返回 Ack 確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。
3、生產者開始執行本地事務邏輯。
4、生產者根據本地事務執行結果向服務端提交二次確認結果( Commit 或是 Rollback ),Broker 收到確認結果后處理邏輯如下:
- 二次確認結果為 Commit :Broker 將半事務消息標記為可投遞,并投遞給消費者。
- 二次確認結果為 Rollback :Broker 將回滾事務,不會將半事務消息投遞給消費者。
5、在斷網或者是生產者應用重啟的特殊情況下,若 Broker 未收到發送者提交的二次確認結果,或 Broker 收到的二次確認結果為 Unknown 未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。
- 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
- 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
7 數據中轉樞紐
近10多年來,諸如 KV 存儲(HBase)、搜索(ElasticSearch)、流式處理(Storm、Spark、Samza)、時序數據庫(OpenTSDB)等專用系統應運而生。這些系統是為單一的目標而產生的,因其簡單性使得在商業硬件上構建分布式系統變得更加容易且性價比更高。
通常,同一份數據集需要被注入到多個專用系統內。
例如,當應用日志用于離線日志分析時,搜索單個日志記錄同樣不可或缺,而構建各自獨立的工作流來采集每種類型的數據再導入到各自的專用系統顯然不切實際,利用消息隊列 Kafka 作為數據中轉樞紐,同份數據可以被導入到不同專用系統中。
日志同步主要有三個關鍵部分:日志采集客戶端,Kafka 消息隊列以及后端的日志處理應用。
- 日志采集客戶端,負責用戶各類應用服務的日志數據采集,以消息方式將日志“批量”“異步”發送Kafka客戶端。Kafka客戶端批量提交和壓縮消息,對應用服務的性能影響非常小。
- Kafka 將日志存儲在消息文件中,提供持久化。
- 日志處理應用,如 Logstash,訂閱并消費Kafka中的日志消息,最終供文件搜索服務檢索日志,或者由 Kafka 將消息傳遞給 Hadoop 等其他大數據應用系統化存儲與分析。
圖片
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支持會激勵我輸出更高質量的文章,非常感謝!