基于 RocketMQ 的可靠事件處理策略
Seata 框架本身并沒有內(nèi)置針對可靠事件模式的解決方案,但我們可以使用另一款已經(jīng)介紹過的框架來實(shí)現(xiàn)這一目標(biāo),就是 RocketMQ。
RocketMQ 為開發(fā)人員提供了事務(wù)消息這一消息類型,專門用來應(yīng)對分布式環(huán)境下的數(shù)據(jù)一致性問題。
事務(wù)消息的基本概念
事務(wù)消息是RocketMQ提供的一種高級(jí)消息類型,支持在分布式場景下消息生產(chǎn)和本地事務(wù)的最終一致性。我們可以分別從生產(chǎn)者和消費(fèi)者維度出發(fā)來分析可靠事件實(shí)現(xiàn)上的需求。
- 消息發(fā)送方:對于消息發(fā)送方而言,我們需要解決執(zhí)行本地事務(wù)與發(fā)送消息的原子性問題,即保證本地事務(wù)執(zhí)行成功,消息一定發(fā)送成功。
- 消息接收方:對于消息接收方而言,我們需要解決接收消息與本地事務(wù)的原子性問題,即保證接收消息成功后,本地事務(wù)也一定執(zhí)行成功。
事務(wù)消息的出現(xiàn)完美解決了可靠事件模式執(zhí)行過程中可能出現(xiàn)的問題。事務(wù)消息提供了類似X/Open XA的分布事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致性。
那么,RocketMQ 是如何做到這一點(diǎn)的呢?關(guān)鍵就在于它所提供的半消息機(jī)制。
所謂半消息(Half Message),是指暫不能投遞的消息。發(fā)送方已經(jīng)將消息成功發(fā)送到了服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成暫不能投遞狀態(tài),處于該種狀態(tài)下的消息就是半消息。
介紹完半消息的概念,我們再來明確什么是半消息回查。
我們知道由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,可能會(huì)導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失。RocketMQ 服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于半消息狀態(tài)時(shí),就會(huì)主動(dòng)向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或 Rollback),這一過程就是半消息回查。圖 1 展示了 RocketMQ 中事務(wù)消息的整體架構(gòu)。
圖1 RocketMQ 事務(wù)消息架構(gòu)
進(jìn)一步,我們梳理 RocketMQ 事務(wù)消息的執(zhí)行過程,如圖2所示。
圖2 RocketMQ 事務(wù)消息執(zhí)行過程
可以看到,圖 2 存在服務(wù)A和服務(wù)B這兩個(gè)微服務(wù)。其中服務(wù) A 是消息發(fā)布者,而服務(wù) B 是消息消費(fèi)者,我們需要確保兩者之間數(shù)據(jù)的一致性。這里有 7 個(gè)步驟。
- 服務(wù) A 向 RocketMQ 服務(wù)端發(fā)送事務(wù)消息。
- RocketMQ 將消息持久化成功之后,向服務(wù)A確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。
- 服務(wù) A 開始執(zhí)行本地事務(wù)邏輯。
- 服務(wù) A 根據(jù)本地事務(wù)執(zhí)行結(jié)果向 RocketMQ 提交二次確認(rèn)(Commit 或是 Rollback)。如果 RocketMQ 收到 Commit 結(jié)果,則將半消息標(biāo)記為可投遞,服務(wù) B 最終將收到該消息。而如果 RocketMQ 收到 Rollback 結(jié)果,則刪除半消息,服務(wù) B 將不會(huì)接受該消息。
- 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,步驟 4 提交的二次確認(rèn)最終未到達(dá) RocketMQ,經(jīng)過一定時(shí)間后 RocketMQ 將基于該消息向服務(wù) A 發(fā)起消息回查。
- 服務(wù) A 收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 服務(wù) A 根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),RocketMQ 仍按照步驟 4 對半消息進(jìn)行操作。
圖 2 更多是站在消息發(fā)布者的角度看待事務(wù)消息的發(fā)布流程。而針對消息消費(fèi)而言,如果消費(fèi)者處理事務(wù)消息時(shí)出現(xiàn)異常,RocketMQ 會(huì)進(jìn)行重試操作,直到消息消費(fèi)和本地事務(wù)處理都成功。這是一種回調(diào)機(jī)制,會(huì)被 RocketMQ 自動(dòng)調(diào)用。
事務(wù)消息開發(fā)模式
介紹完 RocketMQ 事務(wù)消息的基本概念和執(zhí)行流程之后,我們接著介紹它的開發(fā)模式。
實(shí)現(xiàn)消息發(fā)布者
當(dāng)我們在微服務(wù)架構(gòu)中引入事務(wù)消息之前,需要?jiǎng)?chuàng)建一張事務(wù)執(zhí)行記錄表。事務(wù)執(zhí)行記錄表的作用有兩個(gè):一個(gè)是實(shí)現(xiàn)事務(wù)回查,另一個(gè)則是實(shí)現(xiàn)業(yè)務(wù)層冪等控制。
事務(wù)執(zhí)行記錄表的創(chuàng)建腳本如以下代碼所示。
代碼清單1 事務(wù)執(zhí)行記錄表 SQL 定義代碼
CREATE TABLE `tx_record` (
`tx_no` varchar(64) NOT NULL COMMENT '事務(wù)Id',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事務(wù)記錄表'
接下來我們要引入 RocketMQ 內(nèi)置的TransactionListener接口。
為了實(shí)現(xiàn)事務(wù)消息,開發(fā)人員的主要開發(fā)工作量就體現(xiàn)在對這個(gè)接口的實(shí)現(xiàn)過程中。TransactionListener接口的定義如下所示。
代碼清單2 TransactionListener接口定義代碼
public interface TransactionListener {
//當(dāng)發(fā)送事務(wù)消息成功之后,該方法會(huì)被觸發(fā),本地事務(wù)將被執(zhí)行
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
//當(dāng)沒有收到事務(wù)消息的響應(yīng)時(shí),服務(wù)器會(huì)發(fā)送確認(rèn)消息來檢查事務(wù)狀態(tài),該方法會(huì)被觸發(fā)并獲取本地事務(wù)狀態(tài)
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
可以看到,TransactionListener接口的兩個(gè)方法分別完成了本地事務(wù)執(zhí)行和本地事務(wù)回查這兩個(gè)核心操作。那么我們應(yīng)該如何實(shí)現(xiàn)這兩個(gè)方法呢?這里給出這兩個(gè)方法的執(zhí)行偽代碼。
代碼清單3 TransactionListener接口兩個(gè)方法實(shí)現(xiàn)偽代碼
executeLocalTransaction {
執(zhí)行本地事務(wù)
如果失敗就選擇回滾事務(wù),反之提交事務(wù)
}
checkLocalTransaction {
實(shí)現(xiàn)事務(wù)回查
根據(jù)事務(wù)執(zhí)行記錄判斷,已執(zhí)行則提交事務(wù)
}
注意:這兩個(gè)方法需要消息的發(fā)布者來實(shí)現(xiàn),但調(diào)用方是 RocketMQ 自身,而且這個(gè)調(diào)用過程是自動(dòng)觸發(fā)的,不需要開發(fā)做任何干預(yù)。
圖 3 圍繞消息發(fā)布者展示了其所需要實(shí)現(xiàn)的各個(gè)核心步驟。
圖3 事務(wù)消息中消息發(fā)布者實(shí)現(xiàn)過程
如果我們使用 Spring 框架來集成 RocketMQ,那么圖 3 中的業(yè)務(wù)服務(wù)實(shí)現(xiàn)類的實(shí)現(xiàn)過程可以參考如下代碼示例。
代碼清單3 消息發(fā)布端業(yè)務(wù)服務(wù)實(shí)現(xiàn)類示例代碼。
@Service
public class CustomerTicketServiceImpl implements ICustomerTicketService {
@Autowired
TxRecordMapper txRecordMapper;
@Autowired
RocketMQTemplate rocketMQTemplate;
@Override
public void generateTicket(AddCustomerTicketReqVO addCustomerTicketReqVO) {
//從VO中創(chuàng)建TicketGeneratedEvent
TicketGeneratedEvent ticketGeneratedEvent = createTicketGeneratedEvent(addCustomerTicketReqVO);
//將Event轉(zhuǎn)化為JSON對象
JSONObject jsonObject =new JSONObject();
jsonObject.put("ticketGeneratedEvent",ticketGeneratedEvent);
String jsonString = jsonObject.toJSONString();
//生成消息對象
Message<String> message = MessageBuilder.withPayload(jsonString).build();
//發(fā)送事務(wù)消息
rocketMQTemplate.sendMessageInTransaction("producer_group_ticket","topic_ticket",message,null);
}
@Override
@Transactional
public void doGenerateTicket(TicketGeneratedEvent ticketGeneratedEvent) {
//冪等判斷
if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){
return ;
}
//插入工單
CustomerTicket customerTicket = CustomerTicketConverter.INSTANCE.convertEvent(ticketGeneratedEvent);
customerTicket.setStatus(1);
save(customerTicket);
//添加事務(wù)日志
txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());
}
...
}
上述代碼展示的是一個(gè)插入客服工單(CustomerTicket)的過程,generateTicket和doGenerateTicket方法分別對應(yīng)圖 3 中的發(fā)送消息和執(zhí)行本地事務(wù)這兩個(gè)環(huán)節(jié)。
注意:這里使用了RocketMQTemplate的sendMessageInTransaction方法來發(fā)送事務(wù)消息。同時(shí),我們也看到了事務(wù)執(zhí)行記錄表的一種應(yīng)用場景,即實(shí)現(xiàn)業(yè)務(wù)層冪等控制。
接下來繼續(xù)實(shí)現(xiàn)圖3所示的TransactionListener接口,示例代碼如下:
代碼清單4 TransactionListener接口實(shí)現(xiàn)類示例代碼。
@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_ticket")
public class ProducerListener implements RocketMQLocalTransactionListener {
@Autowired
ICustomerTicketService customerTicketService;
@Autowired
TxRecordMapper txRecordMapper;
//事務(wù)消息發(fā)送后的回調(diào)方法,當(dāng)消息發(fā)送給MQ成功,此方法被回調(diào)
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
//解析消息,轉(zhuǎn)成Event對象
TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);
//執(zhí)行本地事務(wù)
customerTicketService.doGenerateTicket(ticketGeneratedEvent);
//當(dāng)返回RocketMQLocalTransactionState.COMMIT,自動(dòng)向MQ發(fā)送commit消息,MQ將消息的狀態(tài)改為可消費(fèi)
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
//如果本地事務(wù)執(zhí)行失敗,就將消息設(shè)置為回滾狀態(tài)
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//事務(wù)狀態(tài)回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//解析消息,轉(zhuǎn)成Event對象
TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);
//根據(jù)事務(wù)Id判斷是否存在已執(zhí)行的事務(wù)
Boolean isTxNoExisted = Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()));
//如果事務(wù)已執(zhí)行則返回COMMIT,反之返回UNKNOWN狀態(tài)
if(isTxNoExisted){
return RocketMQLocalTransactionState.COMMIT;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
...
}
這段代碼清晰地展示了TransactionListener接口中兩個(gè)核心方法的實(shí)現(xiàn)過程。在executeLocalTransaction方法中,我們通過調(diào)用CustomerTicketService業(yè)務(wù)服務(wù)類的doGenerateTicket方法完成了本地事務(wù);而在checkLocalTransaction方法中,我們則實(shí)現(xiàn)了事務(wù)回查機(jī)制。這里同樣展示了事務(wù)執(zhí)行記錄表的另一種應(yīng)用場景,即實(shí)現(xiàn)事務(wù)回查。
實(shí)現(xiàn)消息消費(fèi)者
類似,當(dāng)使用事務(wù)消息時(shí),消息消費(fèi)者的實(shí)現(xiàn)過程同樣遵循一定的開發(fā)規(guī)范,如圖 4 所示。
圖4 事務(wù)消息中消息消費(fèi)者實(shí)現(xiàn)過程
可以看到,相比于消息發(fā)布者,消息消費(fèi)者的實(shí)現(xiàn)過程要簡單很多。
代碼清單5 消息消費(fèi)實(shí)現(xiàn)類示例代碼。
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_ticket",topic = "topic_ticket")
public class Consumer implements RocketMQListener<String> {
@Autowired
IChatRecordService chatRecordService;
//接收消息
@Override
public void onMessage(String message) {
log.info("開始消費(fèi)消息:{}",message);
//解析消息
JSONObject jsonObject = JSONObject.parseObject(message);
String ticketGeneratedEventString = jsonObject.getString("ticketGeneratedEvent");
//轉(zhuǎn)成TicketGeneratedEvent
TicketGeneratedEvent ticketGeneratedEvent = JSONObject.parseObject(ticketGeneratedEventString, TicketGeneratedEvent.class);
//添加本地聊天記錄
chatRecordService.generateChatRecord(ticketGeneratedEvent);
}
}
可以看到,這個(gè)消息消費(fèi)者的實(shí)現(xiàn)過程沒有任何特殊之處,我們只需要實(shí)現(xiàn)RocketMQListener接口的onMessage方法,并在該方法中調(diào)用業(yè)務(wù)服務(wù)實(shí)現(xiàn)類中的業(yè)務(wù)方法即可。
消費(fèi)者端的業(yè)務(wù)服務(wù)實(shí)現(xiàn)類的實(shí)現(xiàn)過程如下。
代碼清單6 消息消費(fèi)端業(yè)務(wù)服務(wù)實(shí)現(xiàn)類示例代碼:
@Service
public class ChatRecordServiceImpl implements IChatRecordService {
@Autowired
TxRecordMapper txRecordMapper;
@Override
@Transactional
public void generateChatRecord(TicketGeneratedEvent ticketGeneratedEvent) {
//冪等判斷
if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){
return ;
}
//插入聊天記錄
ChatRecord chatRecord = ChatRecordConverter.INSTANCE.convertEvent(ticketGeneratedEvent);
save(chatRecord);
//添加事務(wù)日志
txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());
}
}
這里同樣通過事務(wù)執(zhí)行記錄表實(shí)現(xiàn)了業(yè)務(wù)層冪等控制,并最終完成本地事務(wù)的提交。
作為總結(jié),我們使用時(shí)序圖來詳細(xì)展示事務(wù)消息發(fā)送和消費(fèi)過程,如圖 5 所示。
圖5 事務(wù)消息發(fā)送和消費(fèi)時(shí)序圖