成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

事物消息的實現-RocketMQ知識體系6

開發 前端
RocketMQ提供了事務消息的功能,采用2PC(兩段式協議)+補償機制(事務回查)的分布式事務功能,通過消息隊列 RocketMQ 版事務消息能達到分布式事務的最終一致。

[[411281]]

分布式事務是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統的不同節點之上。例如在大型電商系統中,下單接口通常會扣減庫存、減去優惠、生成訂單 id, 而訂單服務與庫存、優惠、訂單 id 都是不同的服務,下單接口的成功與否,不僅取決于本地的 db 操作,而且依賴第三方系統的結果,這時候分布式事務就保證這些操作要么全部成功,要么全部失敗。本質上來說,分布式事務就是為了保證不同數據庫的數據一致性。

目前解決分布式事物的解決方案有seata,lcn 等。

RocketMQ 分布式事物實現

RocketMQ提供了事務消息的功能,采用2PC(兩段式協議)+補償機制(事務回查)的分布式事務功能,通過消息隊列 RocketMQ 版事務消息能達到分布式事務的最終一致。

首先,我們要知道什么是半事物消息和消息回查:

  • 半事務消息:

暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列 RocketMQ 版服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半事務消息。

  • 消息回查:

由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列 RocketMQ 版服務端通過掃描發現某條消息長期處于“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該詢問過程即消息回查。

【交互流程】

事務消息發送步驟如下:

  1. 發送方將半事務消息發送至消息隊列 RocketMQ 版服務端。
  2. 消息隊列 RocketMQ 版服務端將消息持久化成功之后,向發送方返回 Ack。確認消息已經發送成功,此時消息為半事務消息。
  3. 發送方開始執行本地事務邏輯。
  4. 發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半事務消息,訂閱方將不會接受該消息。

事務消息回查步驟如下:

  1. 在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。
  2. 發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
  3. 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。

總體而言RocketMQ事務消息分為兩條主線:

  • 發送流程:發送half message(半消息),執行本地事務,發送事務執行結果
  • 定時任務回查流程:MQ定時任務掃描半消息,回查本地事務,發送事務執行結果

源碼相關

Producer發送事務半消息的(prepare)

在本地應用發送事務消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復用大部分發送消息相關的邏輯,這個類的代碼量非常少只有100來行,下面是這個類的sendMessageTransaction方法

這里的transactionListener就是上面所說的消息回查的類,它提供了2個方法:

  • executeLocalTransaction

執行本地事務

  • checkLocalTransaction

回查本地事務

接著看DefaultMQProducer.sendMessageInTransaction()方法:

該方法主要做了以下事情

  • 給消息打上事務消息相關的tag,用于broker區分普通消息和事務消息
  • 發送半消息(half message)
  • 發送成功則由transactionListener執行本地事務
  • 執行endTransaction方法,告訴 broker 執行 commit/rollback。

執行本地事務

Producer 半事務消息發送成功后,會調用transactionListener.executeLocalTransaction方法執行本地事務。只有半消息發送成功后,才會執行本地事務,如果半消息發送失敗,則設置回滾。

結束事務(commit/rollback)

本地事務執行后,則調用this.endTransaction()方法,根據本地事務執行狀態,去提交事務或者回滾事務。

如果半消息發送失敗或本地事務執行失敗告訴服務端是刪除半消息,半消息發送成功且本地事務執行成功則告訴服務端生效半消息

Broker端處理事務消息

Broker端通過SendMessageProcessor.processRequest()方法接收處理 Producer 發送的消息 最后會調用到SendMessageProcessor.sendMessage(),判斷消息類型,進行消息存儲。

存儲半消息

代碼 prepareMessage(msgInner) :

在這一步,備份消息的原主題名稱與原隊列ID,然后取消事務消息的消息標簽,重新設置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。與其他普通消息區分開,然后完成消息持久化。

到這里,Broker 就初步處理完了 Producer 發送的事務半消息。

半消息事務回查

兩段式協議發送與提交回滾消息,執行完本地事務消息的狀態為UNKNOW時,結束事務不做任何操作。通過事務狀態定時回查得到發送端的事務狀態是rollback或commit。

通過TransactionalMessageCheckService線程定時去檢測RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務狀態。

  • RMQ_SYS_TRANS_HALF_TOPIC

prepare消息的主題,事務消息首先先進入到該主題。

  • RMQ_SYS_TRANS_OP_HALF_TOPIC

當消息服務器收到事務消息的提交或回滾請求后,會將消息存儲在該主題下。

Broker處理END_TRANSACTION

當Producer或者回查定時任務提交/回滾事務的時候,Broker如何處理事務消息提交、回滾命令的?其核心實現如下:

  • 根據commitlogOffset找到消息
  • 如果是提交動作,就恢復原消息的主題與隊列,再次存入commitlog文件進而轉到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
  • 回滾消息,則直接將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理。

整體實現流程

如果消費端消費失敗了怎么辦?

如果有消息消費失敗了,則將失敗的消息回傳給broker,即重新寫入commitLog文件,消費者將重新消費;如果消息回傳的時候,consumer和broker之間網絡斷開,則consumer會調用submitConsumeRequestLater()方法,在consumer端進行重新消費,如果仍然消費失敗,會不斷重試直到達到默認的16次,你可以使用msg.getReconsumeTimes()方法來獲取當前重試次數,如果重試次數足夠多之后仍然無法消費成功,必須通過工單、日志等方式進行人工干預以讓producer事務進行回退處理。

Producer發送半消息失敗

可能由于網絡或者mq故障,導致 Producer 訂單系統 發送半消息(prepare)失敗。

這時訂單系統可以執行回滾操作,比如“訂單關閉”等,走逆向流程退款給用戶。

半消息發送成功,本地事務執行失敗

如果訂單系統發送的半消息成功了,但是執行本地事務失敗了,如更新訂單狀態為“已完成”。

這種情況下,執行本地事務失敗后,會返回rollback給 MQ,MQ會刪除之前發送的半消息。 也就不會調用優惠券系統了。

半消息發送成功,沒收到MQ返回的響應

假如訂單系統發送半消息成功后,沒有收到MQ返回的響應。

這個時候可能是因為網絡問題,或者其他異常報錯,訂單系統誤以為發送MQ半消息失敗,執行了逆向回滾流程。

但這個時候其實mq已經保存半消息成功了,那這個消息怎么處理?

這個時候MQ的后臺消息回查定時任務TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊列,判斷是否需要消息回查,然后回查訂單系統的本地事務,這時MQ就會發現訂單已經變成“已關閉”,此時就要發送rollback請求給mq,刪除之前的半消息。

如果commit/rollback失敗了

這個其實也是通過定時任務TransactionalMessageCheckService,它會發現這個消息超過一定時間還沒有進行二階段處理,就會回查本地事務。

小結

消息隊列RocketMQ分布式事務消息不僅可以實現應用之間的解耦,又能保證數據的最終一致性。同時,傳統的大事務可以被拆分為小事務,不僅能提升效率,還不會因為某一個關聯應用的不可用導致整體回滾,從而最大限度保證核心系統的可用性。在極端情況下,如果關聯的某一個應用始終無法處理成功,也只需對當前應用進行補償或數據訂正處理,而無需對整體業務進行回滾。

從RocketMQ事務型消息鏈路體現了面向失敗的設計思路,也體現了事務型系統的嚴謹性,在第二階段的消息沒有送達的時候,broker會主動請求producer端去做check,producer做完check后會將事務的狀態再次返回。雖然說實現最終一致的方案有很多,但是事務型消息是比較優雅實現方式之一。

本文轉載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉載本文請聯系小汪哥寫代碼公眾號。

 

責任編輯:武曉燕 來源: 小汪哥寫代碼
相關推薦

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-07-08 07:16:24

RocketMQ數據結構Message

2021-07-07 15:29:52

存儲RocketMQ體系

2021-07-09 07:15:48

RocketMQ數據結構kafka

2021-07-16 18:44:42

RocketMQ知識

2021-07-12 10:25:03

RocketMQ數據結構kafka

2023-07-18 09:03:01

RocketMQ場景消息

2021-07-07 07:06:31

Brokerkafka架構

2015-07-28 17:52:36

IOS知識體系

2017-06-22 13:07:21

2012-03-08 11:13:23

企業架構

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構

2021-07-05 06:26:08

生產者kafka架構

2021-07-08 05:52:34

Kafka架構主從架構

2015-07-16 10:15:44

web前端知識體系

2020-10-26 08:34:18

知識體系普適性

2020-09-09 09:15:58

Nginx體系進程

2020-03-09 10:31:58

vue前端開發

2017-07-25 17:34:54

大數據機器學習數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 操操日| 激情av在线| av毛片在线播放 | 色综合天天综合网国产成人网 | 污片在线免费观看 | 免费v片在线观看 | 中文字幕日韩专区 | 美女操网站 | 久久性av| 亚洲一二三视频 | 国产日韩欧美在线观看 | 国产福利在线视频 | 国产成人叼嘿视频在线观看 | 国产91亚洲精品一区二区三区 | 热久久性| 69福利影院 | 午夜视频在线观看一区二区 | 国产欧美久久精品 | 成人国产精品久久 | 国产精品久久久久久久久久免费看 | 日韩免费av | 日本成人中文字幕 | 精品一区二区三区不卡 | 日韩欧美手机在线 | 欧美九九九| 国产欧美精品区一区二区三区 | 日韩一级电影免费观看 | 亚洲高清视频一区二区 | 黄网站在线播放 | 欧美性精品 | 99久久精品国产毛片 | 天天操操 | 污片在线免费观看 | 久久99久久久久 | 久久久久国色av免费观看性色 | 亚洲精久| 福利视频网址 | 亚洲成人免费av | 91资源在线| 国产一级大片 | 久久久国产一区二区三区 |