靈魂發問:重復消費順序消費分布式事務
hello大家好
我是大家的學習成長小伙伴Captain
我們繼續學習RocketMQ,上一篇我們學習了廣播消息、延遲消息、批量消息、過濾消息這些在RocketMQ中的特性,這一篇我們繼續來學習RocketMQ中的那些奇奇怪怪的特性,讓你在開發中如魚得水
這一篇我們要說的是重復消費、順序消費這兩個在消息隊列中常見的問題,以及一種事務消息,這種事務消息可以在消息隊列中完成分布式事務的特性
把之前的這些技術點有關的文章貼到這里,大家可以先讀一讀
- 搞懂什么是RocketMQ
- 我怎么不知道RocketMQ生產者有這么多用法?(圖片在末尾,不謝)
- 面試官問我:分布式事務是什么?
像這種啊,應該都是面試場上非Ban必選的技術點,除非面試官忘記了,否則他大概率會問起這些問題相關的技術棧,到時候可以到了發揮大家技術海和技術深度的時候了
01 重復消費問題
問題開始
我們來聊一聊消息隊列中的重復消費問題吧
這種問題應該是必然存在的,也是大家使用消費隊列必須考慮的問題之一,反正我用消息隊列這個問題都是首先考慮的,因為這個問題如果不去考慮,可能會造成業務上的不可接受的問題
重復消費,大家肯定也明白啥意思,就是同樣的消息消費了多次
為什么說這種問題必然存在呢,因為消息隊列一定有它的重試機制,也就是消息重發,一旦消費端出現異常的情況下,消息隊列會進行消息的重發
你重發消息重新處理沒問題,但是一般一個消息的監聽者不止一個,也就是可能多個系統都在監聽著處理這個消息,別的系統要是不支持重復消費,那豈不很糟糕
別的系統的數據就會出現混亂,各個系統之間的數據便會出現不一致的情況
舉個例子,電商系統中的支付成功消息,支付成功之后發送一個消息,積分系統、物流系統多個系統監聽這一消息,積分系統處理出現異常,該支付成功的消息重新發送了一條,物流系統要是不支持消息的重試,那就出現了兩個物流單子,那可能會造成客戶買了一件商品,付了一件商品的錢,結果呢,給用戶發了多個該商品
???
啊這...
這樣豈不糟糕透了?你也可能該收拾東西了
其實出現消息重試這真的真的是很常見的情況,也是大家在使用消息隊列必須必須要考慮的,比如網絡抖動、系統業務的處理bug等,這個問題不處理,系統后患無窮
那這種重復消費問題如何避免呢
解決方案:冪等
???
簡單來說,冪等是一個數學上的概念,通俗的解釋就是同樣的參數多次調用同樣的接口,調用的結果都是一樣的,也就是你支付成功的消息發送多少次,最終生成的物流數據還是一條
這樣就沒問題了
那如何去保證冪等呢
像這種問題我一般是分為兩種場景去回答的,一種是生產端的冪等,另一種是消費端的冪等
生產者端的冪等一般都是通過第三方的存儲來完成的,比如Redis,或者是流水表,在消息發送之后,將記錄暫時保存起來,下次發送消息之前,在Redis中檢查該消息是否發送過,不過這種在很多場景下是不合適的,這種會在生產端就限制了重試這一機制
如果生產端發送成功,消費失敗,則不會重新發送該消息
另一種消費者端的冪等,這種是屬于最常見的,生產者無論發送多少次同樣的消息,最終的執行結果都是一樣的,可以分為強冪等和弱冪等來處理
強冪等其實就是用于必須冪等的業務場景,不允許出現差錯的,這種更為謹慎些,比如上面的支付成功的這種消息,物流消費方的處理肯定要是強冪等咯
這里可以引進一個三方存儲,流水表或者Redis都可以,支付成功之后,記錄到流水表中,這里用Redis可能會丟失,把支付成功和記錄到流水表放入到同一個事務中,要么一起成功,要么一起失敗
每次消息過來之后根據訂單號去流水表中檢查是否有這條流水,有流水則直接return就可以了
也可以直接用數據庫的唯一約束來做insert操作
還以一種屬于是弱冪等性,這種不能保證百分百情況下冪等,比如用Redis來存儲業務ID作為唯一key來處理,Redis宕機可能導致短信發送情況的丟失,不過問題不大,用戶也是可以接受的,我們來看一下實例代碼
String idempotentValue = RedisUtil.get(RedisConstant.IDEMPOTENT.concat(msgId), String.class); if (!StringUtils.isEmpty(idempotentValue)) { log.info("========該消息已經被消費:【{}】", msgBody); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //業務代碼 //冪等處理 RedisUtil.setEx(RedisConstant.IDEMPOTENT.concat(msgId), "1", 5, TimeUnit.DAYS);
02 順序消費
那你說一下你有沒有遇到過順序消費這個場景呢?
順序消費這個場景其實不是特別的常見,但是也是必不可少的,因為在某些業務場景下順序是很關鍵的,保證消息的消費順序也是很關鍵的
比如我們有一個操作需要對數據進行刪除、增加、修改三個操作,這種在一般的系統中我們都會采用SQL來進行操作,但是當數據量很大的時候,我們做備份同步數據的時候,這種同步有的時候會通過消息隊列來慢慢的去執行,這個時候就很有必要保證消息的順序性,如果上面的三個操作變成了修改、刪除、增加這樣的順序,那就不是我們想要的效果了
普通的消息的消費當然是沒有固定順序的,消息發送的時候默認是采用的輪詢的方式發送到不同的分區中
???
而消費端消費的時候則是會分配到多個分區的,多個分區是同時拉取提交消費的,在同一個分區queue中,是可以保證FIFO的,但是普通消息是沒法達到順序消費的,只需要將消息投遞到同一條queue中即可
???
按照上面所說,我們只需要保證需要保持順序的消息投遞到相同的queue中即可,這樣同一個queue中的消息肯定會投遞到同一個消費實例,同一個消費實例肯定是順序拉取消息,然后順序的去消費
即使觸發重排導致queue分配給了別的消費者也沒有關系,由于queue的消息永遠是FIFO的,所以只需要保證消息的重復消費的冪等性即可,queue的內部順序還是沒問題的
順序消費分配全局順序和分區順序
- 全局順序:對于指定的一個Topic,所有消息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。
- 分區順序:對于指定的一個Topic,所有消息根據Sharding Key進行區塊分區。同一個分區內的消息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序消息中用來區分不同分區的關鍵字段,和普通消息的Key是完全不同的概念。
為什么全局魂虛順序消息消費性能一般
全局順序消息是嚴格按照FIFO的消息阻塞原則,即上一條消息沒有被成功消費,那么下一條消息會一直被存儲到Topic隊列中。如果想提高全局順序消息的TPS,可以升級實例配置,同時消息客戶端應用盡量減少處理本地業務邏輯的耗時。
在rocketmq中,一個topic下有多個隊列queue,于是乎為了保證消息的順序性,將消息發送到同一個queue中,rocketmq提供了MessageQueueSelector隊列選擇機制,有三種實現
???
使用Hash取模法讓需要順序消費的消息發送到同一個queue中,再使用同步發送,當然這個取模根據的是這些消息的共同屬性
rocketmq僅僅保證了發送的順序性,至于最終的順序消費還是要由消費者業務來保證,就是我保證我發給你的是按照順序的消息,但是你要是自己給處理亂了就不關我rocketmq的事了,那就是你自己的代碼問題了
其實還是存在一些異常的場景會導致出現亂序的情況,比如master宕機,導致寫入隊列的數量發生了變化,你想啊,采用上面的hash取模就會出現消息分散到其它的queue中,這樣就不能保證有序了,除非選擇master如果掛了就無法發送接下來的消息
03 分布式事務
聊一下分布式事務吧
大家看一下這篇面試官問我:分布式事務是什么?
簡單來說就是,事務是要么全部執行成功,要么全部執行失敗;而分布式事務就是跨機器的,跨服務的,跨系統的事務保證,現在的系統都是拆分成很多的服務,每個服務最少部署兩臺,分別部署在不同的機器上
這樣系統之間的事務保證就是分布式事務
而rocketmq中的事務消息則天然支持分布式事務
事務消息:實現類似X或者Open XA的分布式事務功能,以達到最終一致性
消息隊列RocketMQ版提供類似X或Open XA的分布式事務功能,通過消息隊列RocketMQ版事務消息,能達到分布式事務的最終一致。
半事務消息:暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列RocketMQ版服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半事務消息。
消息回查:由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列RocketMQ版服務端通過掃描發現某條消息長期處于“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit或是Rollback),該詢問過程即消息回查。
???
跟Captain來看看事務消息發送步驟:
1、發送方將半事務消息發送到服務端Broker,服務端會將消息持久化,成功之后會返回ACK確認消息已經發送成功,此時消息為半事務消息
2、發送方開始執行本地事務的邏輯
3、發送方會根據本地事務的執行結果向服務端提交二次確認,決定Commit還是Rollback,服務端收到Commit之后則把這個消息標記為可投遞,發送到消費方;服務端收到Rollback之后則刪除半事務消息,服務端不會發送,則消費方也不會收到
如可是如果斷網或者應用重啟這些情況,上述的步驟的二次確認信息無法到達服務端,怎么辦?
這里其實有個回查機制,發送方發送消息之后,需要本地執行事務,如果事務執行的過程出現卡死的情況,或者事務執行結果因為網絡等問題,無法傳遞事務結果到服務端,服務端會執行一個回查機制,來確認這個半事務消息的最終提交情況
本文轉載自微信公眾號「Java賊船」
???