消息隊列 CMQ 七大功能實踐案例
背景
消息隊列,在業務解耦、削峰填谷、流量控制、廣播消息等場景下都有很好的應用,已經成為很多企業IT系統內部通信重要手段。
現有常用的開源消息中間件有RabbitMQ、Kafka、RocketMQ等,但各自有著不同的應用場景和特點,例如,Kafka注重的是消息的吞吐量,不保證消息存儲的可靠性以及一致性,因此多用于日志系統數據的上報;RabbitMQ能保證消息可靠存儲投遞,但性能較差。
CMQ(Cloud Message Queue)是騰訊云開發的一款高可靠、高可用、高性能的分布式消息隊列服務,具有低耦合、消息可靠、強一致性、可擴展性等特點,支持Push/Pull消費模型、消息回溯、延時消息、發布訂閱、路由廣播、消息加密等一系列功能,以滿足更多的mq應用場景。
相對Kafka,CMQ更多注重消息高可靠的應用場景,例如金融、交易、訂單等業務;相比RabbitMQ,CMQ在可用性和性能上做了很大的優化和提升。更詳細的對比,請參考官網介紹。
本文先簡單介紹CMQ底層的架構實現,然后著重結合CMQ的功能特點來介紹CMQ的實踐案例,讓大家快速理解和上手CMQ的開發。
底層架構
CMQ整體架構如上圖所示,每個set由三個broker節點副本組成,保證消息的可靠存儲以及高可用性,且基于raft算法保證數據的一致性。CMQ單個set 在CAP理論中優先保證了CP,當SET中過半數節點都正常工作時,才能進行消息的生產消費。
實踐案例
一、廣播拉取消息模型
CMQ支持隊列(queue)和主題(topic)兩種模型,如下所示:
其中,queue模型是一對一的消息拉取(pull)模式,client端主動pull消息;而topic模型,也稱發布/訂閱模型,是一對多的消息推送(push)模式,CMQ服務端廣播消息時,根據各個訂閱地址主動推送消息給client。兩種模型基本能滿足大部分應用場景了,對比如下:
- queue模型,client端可以靈活根據自身能力去消費pull消息,消息實時性依賴client的消費速度,如果消費速度比生產速度慢,會引起大量消息堆積。
- topic模型,服務端主動推送消息,消息實時性比較高,但要求client性能上能及時處理大量推送過來的消息,并且在client發生故障的時候可能會導致丟消息(有消息重發策略做基本保障)。
對于topic模型,有以下特殊場景需求:
- client端想根據自身能力去pull消息
- 創建訂閱的時候需要暴露client端的接收消息的地址,但在一些企業內網、vpc網絡等特殊情況下,CMQ無法推送到,只能用pull方式獲取消息。
針對以上特殊場景,CMQ結合queue和topic兩種模型實現了一對多的廣播拉取消息模型,如下所示:
topic的訂閱者可以是一個queue實例,topic發布消息后,會自動將消息推送到queue,然后client和使用queue模型一樣去消費消息即可。
- # python sdk demo code: create subscription of queue protocal
- my_sub = my_account.get_subscription(topic_name, subscription_name)
- subscription_meta = SubscriptionMeta()
- subscription_meta.Endpoint = "queue1"
- subscription_meta.Protocal = "queue"
- my_sub.create(subscription_meta)
二、Pull長輪詢
對于Queue模型,消費者需要pull獲取消息,但問題是:消費者不知道隊列什么時候有消息,只能不停輪詢請求去pull,如果輪詢間隔時間短,在隊列長時間沒有消息時會耗費消費者請求資源且效率低,如果輪詢間隔時間長,則消費速度慢,消息實時性低,且造成消息大量堆積。
針對以上問題,CMQ解決方案是設計了長輪詢功能。例如,假設設置隊列長輪詢時間為10s
- 當消費者pull消息時,如果隊列中有消息則馬上返回
- 如果隊列暫時沒有消息,消費者pull請求不會馬上返回,而是會等待阻塞10s:當10s內有新的生產消息到達隊列,CMQ會馬上將消息投遞給正在阻塞等待的消費者,消費者端感知就是阻塞的pull請求被喚醒并且收到消息返回;當10s內隊列都沒有消息,則請求返回告訴消費者當前隊列沒有消息。
- # python sdk demo code: receive message through long polling
- pollingWaitSeconds = 3
- recv_msg = my_queue.receive_message(pollingWaitSeconds)
三、延時消息
CMQ提供延時消息功能:消息發送到隊列后,從入隊時間算起,消息在設置的延時時間后才對消費者可見,即才能被消費者消費到。延時消息功能可以很輕松實現一些定時任務的應用場景。
如上圖所示,根據CMQ延遲消息功能實現的定時任務檢查告警系統。
- # python sdk demo code: send delayed message
- msg_body = "I am delay message"
- msg = Message(msg_body)
- delaySeconds = 3
- my_queue.send_message(msg, delaySeconds)
四、消息回溯
CMQ提供類似于Kafka的消息回溯能力,已經消費刪除的消息是可以通過回溯來重新消費的。目前支持指定回溯時間點,在這個時間點開始被刪除的消息可以重新消費到。此功能在一些金融業務對賬、業務系統重試等場景下有很好的實用性。
***可回溯時間點 = 當前時間 - 設置的可回溯時長。消息生產時間在這個值之前的不可回溯,之后的可回溯,如下圖所示:
- # python sdk demo code: rewind the queue
- # backtrack one hour
- backTrackingTime = int(time.time()) - 3600
- my_queue.rewindQueue(backTrackingTime)
五、Topic路由匹配
CMQ topic模型提供類似于RabbitMQ的消息路由匹配功能,在消息廣播基礎上實現了消息的自動分發。
訂閱者可以指定bindingKey,即路由規則,如上所示,*(星號)可以匹配一個單詞,#(井號)可以匹配一個或多個單詞。例如,生產者發布一個消息,且消息的路由鍵(routingKey)是”quick.orange.elephant”,那么該消息只會推送給消費者C1;如果routingKey=”quick.orange.rabbit”,則消息會推送給C1和C2;如果routingKey=”lazy.brown.fox”,則消息只會推送給C2。
- # python sdk demo code: set topic-subscription route-rule
- my_sub = my_account.get_subscription(topic_name, subscription_name)
- subscription_meta = SubscriptionMeta()
- subscription_meta.Endpoint = "http://test.com"
- subscription_meta.Protocal = "http"
- subscription_meta.bindingKey = ['*.*.rabbit','lazy.#']
- my_sub.create(subscription_meta)
- message = Message()
- message.msgBody = "route msg test"
- my_topic.publish_message(message, 'quick.orange.rabbit')
六、超大消息傳輸
目前CMQ的隊列消息大小***限制為1MB,而當消息大小不超過64KB時,收發消息的***QPS限制分別為正常的5k(有特殊需求可調整),當消息大小超過64KB而小于1MB時,CMQ不保證收發消息的QPS性能。因此,支持大于64KB的消息只是為了考慮業務偶爾傳輸少量大消息且不想做消息分片的應用場景。
一般來說,64KB的消息限制大小基本能滿足大部分業務場景需求了,但在某些特殊場景下,消息數據大于64KB甚至大于1MB時,業務和CMQ如何支持這種超大消息的傳輸呢?這里有兩種解決方案:
1.消息分片。類似IP數據包分片傳輸原理,生產者對消息分片標記后分別發送到隊列,消費者從隊列取出所有分片消息進行組裝。個人方案如下:
- 每個消息body分為header和data兩部分。其中,data就是原消息分片后的內容,header包含三個標記:業務指定消息的ID號,唯一記錄一個消息的ID值,具有同一個ID號的消息分片才會在消費端重新組裝;分片序號(從1開始),記錄一個消息分片的次序編號,消費端依據分片序號依次組裝消息;下一分片是否存在的標記,如果是,說明消息包還不完整,否則消息組裝完畢。
- 由于可能存在多個消費者client,不同分片可能被不同client接收到,為了能夠組裝分片,需要一個集中式的地方存儲所有分片并最終組裝成完整的消息包,但無疑大大增加了系統設計的復雜度。
2.COS代理存儲(COS是騰訊云的對象存儲服務)。類似編程中的指針原理,方案如下(具體代碼實現參考附件):
- 生產者先把超大消息的數據以文件形式上傳到COS,并返回消息文件的COS URL地址;
- 生產者將URL地址作為消息發送到CMQ隊列中;
- 消費者從CMQ隊列中讀取消息,判斷消息內容是否是COS的URL地址信息,如果是,則根據URL地址從COS下載相應的消息文件,并從文件中讀取出超大消息的數據。
七、消息加密傳輸
騰訊云提供秘鑰管理服務KMS,能對數據進行安全加密。CMQ消息加密功能有以下兩種方案:
1.CMQ SDK客戶端加密方案。客戶端發送消息時,根據設置的CMK(KMS的秘鑰ID)調用KMS生成數據秘鑰接口,會返回數據秘鑰的明文key以及加密后的密文key,使用明文key對消息進行本地加密,然后將加密的數據和密文key作為消息 發送給CMQ;消費者接收消息時,先獲取消息中的密文key,調用KMS接口解密(不必每次均調用,可做緩存)得到對應的明文key,***根據明文key本地解密密文數據即可。具體代碼實現參考附件。
2.CMQ服務端加密方案。該方案,由CMQ服務端和KMS服務打通,CMQ自動對消息加解密,用戶無感知,例如,用戶通過https接口發送消息,由CMQ自動加密后存儲,通過https接口接收消息時,CMQ對消息自動解密后返回給用戶。此功能正在開發中。
結語
CMQ更多功能正在開發中,例如,死信隊列、FIFO順序消息等,歡迎體驗:)
原文鏈接:https://cloud.tencent.com/community/article/211497,作者:莊秋濤
【本文是51CTO專欄作者“騰訊云技術社區”的原創稿件,轉載請通過51CTO聯系原作者獲取授權】