我們在順序消息和事務消息方面的實踐
第一部分: 基本介紹
1. 領域模型概述
1.1 消息生產
生產者(Producer):
Apache RocketMQ 中用于產生消息的運行實體,一般集成于業務調用鏈路的上游。生產者是輕量級匿名無身份的。
1.2 消息存儲
- 主題(Topic):
Apache RocketMQ 消息傳輸和存儲的分組容器,主題內部由多個隊列組成,消息的存儲和水平擴展實際是通過主題內的隊列實現的。
- 隊列(MessageQueue):
Apache RocketMQ 消息傳輸和存儲的實際單元容器,類比于其他消息隊列中的分區。Apache RocketMQ 通過流式特性的無限隊列結構來存儲消息,消息在隊列內具備順序性存儲特征。
- 消息(Message):
Apache RocketMQ 的最小傳輸單元。消息具備不可變性,在初始化發送和完成存儲后即不可變。
1.3 消息消費
- 消費者分組(ConsumerGroup):
Apache RocketMQ 發布訂閱模型中定義的獨立的消費身份分組,用于統一管理底層運行的多個消費者(Consumer)。同一個消費組的多個消費者必須保持消費邏輯和配置一致,共同分擔該消費組訂閱的消息,實現消費能力的水平擴展。
- 消費者(Consumer):
Apache RocketMQ 消費消息的運行實體,一般集成在業務調用鏈路的下游。消費者必須被指定到某一個消費組中。
- 訂閱關系(Subscription):
Apache RocketMQ 發布訂閱模型中消息過濾、重試、消費進度的規則配置。訂閱關系以消費組粒度進行管理,消費組通過定義訂閱關系控制指定消費組下的消費者如何實現消息過濾、消費重試及消費進度恢復等。
Apache RocketMQ 的訂閱關系除過濾表達式之外都是持久化的,即服務端重啟或請求斷開,訂閱關系依然保留。
2. 消息傳輸模型介紹
主流的消息中間件的傳輸模型主要為點對點模型和發布訂閱模型。
點對點模型
點對點模型也叫隊列模型,具有如下特點:
- 消費匿名:消息上下游溝通的唯一的身份就是隊列,下游消費者從隊列獲取消息無法申明獨立身份。
- 一對一通信:基于消費匿名特點,下游消費者即使有多個,但都沒有自己獨立的身份,因此共享隊列中的消息,每一條消息都只會被唯一一個消費者處理。因此點對點模型只能實現一對一通信。
發布訂閱模型
發布訂閱模型具有如下特點:
- 消費獨立:相比隊列模型的匿名消費方式,發布訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關系),不同訂閱組之間相互獨立不會相互影響。
- 一對多通信:基于獨立身份的設計,同一個主題內的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發布訂閱模型可以實現一對多通信。
傳輸模型對比
點對點模型和發布訂閱模型各有優勢,點對點模型更為簡單,而發布訂閱模型的擴展性更高。Apache RocketMQ 使用的傳輸模型為發布訂閱模型,因此也具有發布訂閱模型的特點。
注:以上信息來源于官網
3. 普通消息的可靠性
普通消息一般應用于微服務解耦、事件驅動、數據集成等場景,這些場景大多數要求數據傳輸通道具有可靠傳輸的能力,且對消息的處理時機、處理順序沒有特別要求。
3.1 發送端怎么保證可靠性
3.1.1 ACK機制
3.2 存儲端怎么保證消息可靠性
RocketMQ存儲端也即Broker端在存儲消息的時候會面臨以下的存儲可靠性挑戰:
- Broker正常關閉
- Broker異常Crash
- OS Crash
- 機器掉電,但是能立即恢復供電情況
- 機器無法開機(可能是cpu、主板、內存等關鍵設備損壞)
- 磁盤設備損壞
1正常關閉,Broker可以正常啟動并恢復所有數據。2、3、4同步刷盤可以保證數據不丟失,異步刷盤可能導致少量數據丟失。5、6屬于單點故障,且無法恢復。解決單點故障可以采用增加Slave節點,主從異步復制仍然可能有極少量數據丟失,同步復制可以完全避免單點問題。
這里一般來說就需要在性能和可靠性之間做出取舍,對于RocketMQ來說,Broker的可靠性主要由兩個方面保障:
- 單機的刷盤機制
- 主從同步
3.2.1 單機的刷盤機制
頁緩存:操作系統中用于存儲文件系統緩存的內存區域。RocketMQ通過將消息首先寫入頁緩存,實現了消息在內存中的持久化。
CommitLog:是RocketMQ中消息的物理存儲結構,包含了所有已發送的消息。CommitLog的持久化保證了即使在異常情況下,如Broker宕機,消息也能夠被恢復。同步刷盤:是指將內存中的數據同步刷寫到磁盤。RocketMQ確保消息在被發送后,首先在內存中得到持久化,然后再刷寫到磁盤,從而防止數據的丟失。
異步刷盤:消息寫入到頁緩存中,就立刻給客戶端返回寫操作成功,當頁緩存中的消息積累到一定的量時,觸發一次寫操作,或者定時等策略將頁緩存中的消息寫入到磁盤中。這種方式吞吐量大,性能高,但是頁緩存中的數據可能丟失,不能保證數據絕對的安全。
實際應用中要結合業務場景,合理設置刷盤方式,尤其是同步刷盤的方式,由于頻繁的觸發磁盤寫動作,會明顯降低性能。
3.2.2 主從同步
主 Broker:負責消息的讀寫和寫入 CommitLog。從 Broker:用于備份主 Broker 的消息,確保在主 Broker 故障時可以順利切換。同步復制:主節點將消息同步復制到所有從節點,確保從節點具有相同的消息副本。切換:在主節點發生故障時,從節點可以快速切換為新的主節點,確保消息服務的持續性。
3.3 消費端怎么保證可靠性
3.3.1 ACK
Rocket Mq是通過offset來標記一個消費者組在隊列上的消費進度,消費成功之后都會返回一個ACK消息告訴broker去更新offset,但是RocketMQ并不是每消費一條消息就做一次ACK,而是消費完批量消息后只做一次ACK。
所以ACK機制是為了準確的告知Broker批量消費成功的信息并且更新消費進度。
那批量消費時具體是如何更新消費進度?
- 每一條消息消費成功后,會按照當前消息最小的offset來更新本地的消費進度
- 由5秒的定時任務將offset提交到Broker
優點:防止消息丟失。
缺點:會造成消息重復消費(使用方需要做冪等。
3.3.2 重試
消費者從RocketMQ拉取到消息之后,需要返回消費成功來表示業務方正常消費完成。因此只有返回CONSUME_SUCCESS才算消費完成,如果返回 CONSUME_LATER 則會按照【重試次數】進行再次消費,【重試間隔為階梯時間】。如果消費滿16次之后還是未能消費成功,則不再重試,會將消息發送到死信隊列,從而保證【消息消費】的可靠性。
3.3.3 死信消息
默認最多重試16次,總時長4小時46分鐘。
未能成功消費的消息,消息隊列并不會立刻將消息丟棄,而是將消息發送到死信隊列,其名稱是在【消費組】前加 %DLQ% 的【特殊 topic】,如果消息最終進入了死信隊列,則可以通過RocketMQ提供的相關接口從死信隊列獲取到相應的消息,進行報警人工干預或其他手段,保證了消費組的至少一次消費。
小節
至此應該清晰的知道RocketMq為了保證可靠性做了哪些工作。接下來我們再把視角切換到今天的第一個核心問題順序消息。
第二部分: 順序消息
1. 我們遇到的線上問題
客服同學: @XXX 我們判責了,為啥客戶還收不到退款? 售后單號xxxxxx。
研發同學: 讓我看看。
…..
30分鐘后
研發同學: 修復了,再看一下。
測試同學: 提個online, 研發填一下問題原因,責任歸屬。
研發同學:問題原因: 歷史代碼,我們沒有順序消費消息,正常的流程是 先判責完成,再打款. 這一單 先消費了打款消息, 還沒有消費判責完成消息,狀態不對導致打款失敗。
測試同學:那后續如何修改啊。
研發同學: 為了保證消息的有序性,我等會把消息修改為順序消費。
……
以上故事純屬于虛構
2. RocketMQ消息隊列為什么會有順序問題?
從上面的消息隊列模型我們知道,1個topic有N個queue,將數據均勻分配到各個queue上,這樣可以提升消費端總體的消費性能。比如一個topic發送10條消息,這10條消息會自動分散在topic下的所有queue中,所以消費的時候不一定是先消費哪個queue,后消費哪個queue,這就導致了無序消費。
3. 順序消息的使用場景
- 金融交易、訂單流程處理。比如我們的暗拍場景下,相同出價,先出價商戶優先成單,比如我們的訂單的發生售后時,售后訂單的處理流程。
- 實時同步數據的場景,如數據庫增量同步,順序消息也可以發揮其作用。通過使用順序消息,可以確保數據按照正確的順序進行同步,從而保持數據的一致性和準確性。
4. 實際開發過程中如何保證消息的順序性?
4.1 生產順序性
- 多生產者單線程
消息生產的順序性僅支持單一生產者,如果不同生產者分布在不同的系統,那么不同生產者之間產生的消息,我們無法知道消息之間實際的先后順序。
- 單生產者多線程
- 相同業務的消息按照先后順序被存儲在同一個隊列。
- 不同業務的消息可以混合在同一個隊列中,且不保證連續。
- 如果生產者使用多個線程進行并行發送,那么不同線程間產生的消息,我們無法知道消息之間實際的先后順序。
4.2 消費者順序性
消費消息時需要嚴格按照接收—處理—應答的順序處理消息,避免使用異步回調或多線程處理,這樣可以防止消息處理過程中的并發問題。對于每條消息,只有當它完全處理完畢并發送應答后,才繼續處理下一條消息。
5. 使用順序消息需要注意的點
- 消費消息時異常如何處理
- 如果發生異常,需要消費方進行處理,順序消費默認是 無限重試消費的 , 無限重試會導致當前消息隊列阻塞,影響后續消息消費。
- 需要我們在重試一定次數后進行處理,即一條消息如果一直重試失敗,超過最大重試次數后將不再重試,跳過這條消息消費 并 監控和告警,人工介入處理,不能一直阻塞后續消息處理。
- 比如我們業務售后流程中,某個節點的售后單消息消費異常,我們目前解決方案是:把重試3次仍然失敗的消息存儲到數據庫中,同時把同一售后單后續消息也先存入數據庫中,同時發出告警,后續人工介入處理后重新消費該異常售后單的消息。保證不影響同一隊列下其它售后單消息消費。
- 消息組盡可能打散,避免集中導致熱點
- Apache RocketMQ 保證相同消息組的消息存儲在同一個隊列中,如果不同業務場景的消息都集中在少量或一個消息組中,則這些消息存儲壓力都會集中到服務端的少量隊列或一個隊列中。容易導致性能熱點,為了提高系統的吞吐量和穩定性,避免因為某些消息組過于集中而導致資源瓶頸或性能下降。
- 在設計消息鍵時,應盡量避免讓消息集中到少數幾個MessageQueue中。可以考慮將業務相關的多個字段組合成消息鍵,比如訂單ID、用戶ID作為消息鍵,或者使用哈希算法來生成消息鍵,以增加消息分發的隨機性和均勻性。可實現同一用戶、同一訂單的消息按照順序處理,不同用戶或者不同訂單的消息無需保證順序。
小節
順序消息是一個老生常談的問題,但是簡單粗暴的硬搬網上的解決方案往往效果不盡如意,實際想要解決的徹底的確不是那么容易,希望可以帶給各位看官一些思考和幫助。我們再把視角切換到事務消息身上去。
第三部分: 事務消息
1. 我們遇到的線上問題
客服同學: @XXX 用戶在我們的app上一直獲取不到報價,比較急,麻煩看一下怎么回事。
產品同學: 好的,收到,我這邊馬上找研發同學看一下,@XXX 需要幫忙看一下。
研發同學:好的,我看一下。
…..
30分鐘后
研發同學: 修復了,再看一下。
產品同學: 把問題原因同步一下吧。
研發同學:詢價過程中,風控命中了特殊報價池,我們這邊把特殊報價池的數據存到了數據庫,同時發送了MQ消息,結果MQ消息發送成功了,但是數據庫存儲失敗,導致再次詢價的時候,查不到數據導致的。
測試同學:那后續如何修改啊。
研發同學: 后續我們會把普通消息改成事務消息,這樣就能保證消息發送和數據庫存儲的一致性了。
……
以上故事純屬于虛構
2. 為什么使用了消息隊列反而不可控呢?
MQ消息本身就具有解耦性,消息本身并不關注接收方的狀態是否符合預期,只要消息成功發送并且被成功接收,在MQ本身看來就是成功,如果想要保證發送方和接受方的狀態變更符合預期,就要保證本次事務操作和消息發送的一致性,這里我們就必須要提到事務消息。
所謂事務消息,其實是為了解決上下游寫一致性,也即是完成當前操作的同時給下游發送指令,并且保證上下游要么同時成功或者同時失敗。
3. 事務消息的使用場景
- 經典場景
- 支付發起后,當筆訂單處于中間狀態,給支付網關發起指令,如果發起轉賬失敗則不發送指令,發送成功后等待支付網關反饋更新支付狀態。如果在同一個數據庫中進行,事務可以保證這兩步操作,要么同時成功,要么同時不成功。這樣就保證了轉賬的數據一致性。但是在微服務架構中,因為各個服務都是獨立的模塊,都是遠程調用,都沒法在同一個事務中,都會遇到分布式事務問題。
- 我方場景
- 在我們售后判責(用戶與賣家發生了糾紛)的過程中,如果用戶對于判責結果不滿意,可以進行復檢申訴,我方需要對復檢申訴進行改判或者維持原判,如果需要改判,我方需要將改判結果進行保存,同時,對于我們下游的行星售后等系統進行發送消息,之所以發送消息而不是直接調用,是因為消息的接收方不止一個,如果全部是RPD調用的話,代碼的侵入性太強,但是消息有很強的解耦性,并不能保證上下游狀態的一致性,這個時候,事務消息就很符合這個場景,如果我們本地事務提交成功,就發送事務消息,下游同步修改如果我們本地事務失敗,就不在發送消息,從而保持本地事務與消息的一致性。
4. 為什么需要引入分布式事務消息
- MQ本身就具備了實現了系統之間的解耦特性。
- 分布式事務保障本地事務和消息發送的原子性。
- 具備以上特性的同時還可以保證最終的數據一致性。
5. 開源版本事務消息
5.1 基本實現原理
基于MQ的事務消息方案主要依靠MQ的Half消息機制來實現投遞消息和參與者自身本地事務的一致性保障。
Half消息:在原有隊列消息執行后的邏輯,如果后面的本地邏輯出錯,則不發送該消息,如果通過則告知MQ發送。Half消息機制實現原理其實借鑒的2PC的思路,是二階段提交的廣義拓展。
- 事務發起方producer首先發送Half消息到broker
- MQ通知發送方消息發送成功
- 在發送Half消息成功后producer執行本地事務
- 本地事務完畢,根據事務的狀態,Producer向Broker發送二次確認消息,確認該Half Message的Commit或者Rollback狀態。Broker收到二次確認消息后,對于Commit狀態,則直接發送到Consumer端執行消費邏輯,而對于Rollback則直接標記為失敗,一段時間后清除,并不會發給Consumer。正常情況下,到此分布式事務已經完成,剩下要處理的就是超時問題,即一段時間后Broker仍沒有收到Producer的二次確認消息;
- 針對超時狀態,Broker主動向Producer發起消息回查;
- Producer處理回查消息,返回對應的本地事務的執行結果;
- Broker針對回查消息的結果,執行Commit或Rollback操作,同4;
事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:
CommitTransaction: 提交事務,它允許消費者消費此消息。RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。事務消息的核心類為TransactionListenerImpl,里面提供了兩個核心方法,具體的代碼如下圖:
executeLocalTransaction方法:用來執行本地事務,返回本地事務給到broker,同時,將事務狀態進行記錄:
checkLocalTransaction 方法:用來查詢本地事務的執行結果提供給broker
5.2 事務消息發送邏輯–producer發送
事務消息是由兩個消息來實現的,一個是RMQ_SYS_TRANS_HALF_TOPIC消息,作用是用來存儲第一階段的parpare消息,事務消息首先先進入到該主題消息,消息具體是提交還是回滾要根據第二階段的消息來判斷。另一個是RMQ_SYS_TRANS_OP_HALF_TOPIC消息,用來接收第二階段的Commit或Rollback消息。
特別需要注意的一點,RMQ_SYS_TRANS_HALF_TOPIC消息是用來存儲不能被消費者發現的消息,通過RMQ_SYS_TRANS_OP_HALF_TOPIC消息,來對RMQ_SYS_TRANS_HALF_TOPIC消息對應的事務狀態來進行確認的,確認commit之后,需要將一階段中設置的特殊Topic和Queue替換成真正的目標的Topic和Queue,后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息。
5.3 事務消息發送邏輯--broker回查
如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網絡問題導致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機制,稱為“回查”。
Broker端對未確定狀態的消息發起回查,將消息發送到對應的Producer端(同一個Group的Producer),由Producer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。Broker端通過對比Half消息和Op消息進行事務消息的回查并且推進CheckPoint(記錄那些事務消息的狀態是確定的)。
需要注意的是,RocketMQ并不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,RocketMQ默認回滾該消息。
6. 轉轉版本事務消息
6.1 差異
- 設計方式的不同
- 開源版本基于MQ消息本身立場,在垂直方向做了拓展,在RocketMQ的服務里面,直接嵌入了事務消息,相當于把這種能力重新下沉到MQ中,便于使用,不用在做任何的額外工作。
- 轉轉版本基于公司業務場景發展,在水平方向做了拓展,對RocketMQ統一做了一層封裝(此時開源版本并沒有事務消息),方便使用,我們的設計思想可以遷移到任何不支持事務消息的MQ中,沒有額外依賴,便于拓展,使得事務消息不在局限于RocketMQ本身。
- 開源版本
- 轉轉版本
- 實現方式的不同
- 開源版本是通過內部隊列和狀態回查實現了事務的最終一致性。
- 轉轉版本由數據庫的本地事務來保證事務的原子性,并由重試機制保證消息發送的可靠性。相當于把業務系統和消息隊列的分布式事務重新“降級”為數據庫中的本地事務。
- 開源版本
- 轉轉版本
6.2 基本實現原理
- 事務消息的發送流程,在事務過程中,會將事務信息消息記錄到數據庫中。
- 獲取到msg之后,執行校驗邏輯,在發送失敗或者未查詢到數據后,會將這個msg丟入到另一個隊列timeWheelQueue ,由另一個定時任務去處理。具體的流程如下圖。
- 因為涉及到jvm的內存存儲,所以要考慮上線或者其他情況導致服務重啟,未發送完的消息該如何處理。
小節
事務消息無論是開源版本還是轉轉版本,都是繞不過去的點,因為我們作為業務側團隊在如今遍地都是分布式系統的情況下太需要這樣的能力來幫我們兜底了。而作為各位看官,為了能夠正確得使用事務消息以及方便排查這里的問題,也是很有必要了解清楚這里的技術實現細節。
作者
黃培祖 轉轉采貨俠后端工程師
朱洪旭 轉轉采貨俠后端工程師