分布式場景下的事務機制
事務消息是RocketMQ的一個非常特色的高級特性,它的基礎訴求是通過RocketMQ的事務機制,來保證上下游的數據?致性。
我們在單機版本下面只需要在業務方法上加上對應的事務就可以達到效果,但是分布式的場景下,多個系統之間的協調配合,你無法知道到底是那個先執行那個后執行,當然在微服務里面存在Seate框架來保證事務,但是這事務的保證始終是心頭大患,只能用一句話形容魚和熊掌不可兼得。
而RocketMq的事務消息能夠在提升性能的情況下滿足要求,其主要實現是支持分布式情況下保障消息生產和本地事務的最終一致性,消息生產我們可以使用順序消息去執行,這樣我們只需要滿足這兩個的事務即可。
實現過程
圖片
準備階段:生產者將消息發送到Broker,Broker向生產者發送ack表示消息發送成功,但是此時的消息為一個等待狀態,不會被消費者去消費。(生產者繼續執行接下來的代碼)
確認階段:當我們執行完所有的代碼后,本地事務要么回滾要么提交,此時當我們了解本地事務的狀態后,將結果推送給Broker做二次確認結果,如果為Commit則將修改激活準備推送給消費者,如果為Rollback則將消息進行回滾。
補償機制:當出現異常情況沒有發生二次確認,此時我們在固定時間后將會進行回查,檢查回查消息對應的本地事務的狀態,重寫Commit或者Rollback。
涉及狀態以及注意點
事務消息存在三種狀態:
CommitTransaction:提交事務狀態,此狀態下允許消費者消費。
RollbackTransaction:回滾事務狀態,此狀態下消息會被刪除。
Unknown:中間狀態,此狀態下會等待本地事務處理結果進行對應操作。
注意點:
本消息狀態是一種對消費者不可見的狀態,將消息的內容放到系統Topic的RMQ_SYS_TRANS_HALF_TOPIC隊列里面去。
事務消息中的相關參數可以進行設置,比如:本地事務回查次數transactionCheckMax默認15次,本地事務回查的間隙transactionCheckInterval默認60s,超出后會直接將消息丟棄。
RocketMQ的事務消息是指應用本地事務和發送消息操作可以定義到全局事務中,要么同時成功,要么同時失敗,通過RocketMQ的事務信息可以實現可靠消息的最終一致性方案。
源碼解析
Producer端通過構建TransactionMQProducer對象綁定事務監聽。
TransactionListener transactionListener = new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.COMMIT_MESSAGE; }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return LocalTransactionState.COMMIT_MESSAGE; }};TransactionMQProducer producer = new TransactionMQProducer(producerGroupTemp);producer.setTransactionListener(transactionListener);producer.setNamesrvAddr("127.0.0.1:9876");product.start();SendResult result = producer.sendMessageInTransaction(message, arg);
執行sendMessageInTransaction方法來發送消息。
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// 檢查TransactionListener是否存在,如果不存在就直接拋異常
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// 事務消息不支持延遲等特性
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 設置half屬性,表明是事務屬性
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// 設置所屬生成者組
// broker向生產者發送回查事務請求根據這個producergroup找到指定的channel
// 生產者能找到所有在同一個組的機器實例從而檢查事務狀態
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// 同步發送
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// 消息返回信息
switch (sendResult.getSendStatus()) {
// 第一階段消息發送成功
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
// 設置事務ID屬性
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// 執行本地事務
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// 發送消息成功后,執行本地操作
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// 本地事務執行完畢向broker提交事務或回滾事務
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
首先發送第一階段信息直接返回半提交狀態,然后執行本地事務返回事務的三種狀態,未知,回滾,提交,最后執行endTransaction方法,把事務執行的狀態告訴broker。
endTransaction方法
根據本地事務執行狀態構建requestHeader對象執行二階段提交。
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
// 獲取消息中的MessageId
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
// 找到broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// 構建EndTransactionRequestHeader對象
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
// offset是prepare消息中offsetMsgId中獲取的
requestHeader.setCommitLogOffset(id.getOffset());
requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
// 社會提交/回滾狀態
switch (localTransactionState) {
case COMMIT_MESSAGE:
// 提交
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
// 回滾
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
// 未知
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 發送給broker端
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
將本地方法執行事務的結果發送給Broker,通過endTransactionOneway方法創建Code為END_TRANSACTION的消息,然后在Broker就會找出對應的Processor來處理。
Broker端處理
Broker總共存在兩個處理,首先針對第一個階段發送的Half消息,broker要進行相關的操作,后面endTransaction提交進來的事務狀態,針對三種狀態進行相關操作。
接收第一階段發送的Half消息
SendMessageProcessor的sendMessage方法中去執行處理事務消息。
// 發送Half消息時,在屬性中設置了PROPERTY_TRANSACTION_PREPARED為true,這里根據這個屬性判斷是否是事務消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// 事務消息進入這里,把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC,以同步刷盤的方式存入store
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}
如果消息攜帶事務標記就去執行TransactionMessageService類的prepareMessage方法進行相關的處理。
// 解析Half消息
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 把真實的topic和真實的queueId放在消息的屬性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 設置默認的事務狀態為TRANSACTION_NOT_TYPE=>unknow
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 將消息的topic設置為RMQ_SYS_TRANS_HALF_TOPIC,這個是對消費者不可見的
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// 設置queueId=0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
進行topic的切換,將原來的topic存入到消息的屬性里面,將消息的topic設置為RMQ_SYS_TRANS_HALF_TOPIC。
處理endTransaction方法
在endTransaction方法中將消息同步給Broker處理的Code對應為END_TRANSACTION,Broker就會找出對應的Processor來處理該類即調用EndTransactionProcessor類的processRequest方法處理。
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 根據commitLogOffset獲取文件中的message,獲取到了返回success
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 檢查消息是否一致
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 生成要保存的消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 把真實的topic消息存儲到CommitLog中
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 移除prepare消息,存入opQueueMap中
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
// 回滾
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 查詢到half消息則返回成功
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 檢查消息是否一致
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 移除prepare消息,存入opQueueMap中
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
僅僅展示相關核心代碼,其主要邏輯:首先去判斷請求的方式是commit還是rollback,如果是commit查詢到消息還原消息原來的topic,然后刪除half topic上的消息轉存到opQueueMap中,如果是rollback直接進行刪除half topic上的消息并轉存到opQueueMap中去。
注意:opQueueMap的引入為了解決有可能出現網絡、進程、線程等各種因素導致消費端未能成功處理消息的情況,該機制的作用是在消費者端將未成功處理的消息重新發送到服務端進行重試,直到確認消息已經被成功處理或者達到最大重試次數后進行回滾操作。而 Op 消息本身則是通過修改消息狀態來實現的。
消息回查
當網絡中斷或者響應超時等各種異常信息導致消息并沒有傳送到broker端去,為了解決這一問題在Broker就開啟一個回查線程每隔一分鐘執行一次處理超過6s未回查的消息,當超過15次回查后直接將消息丟棄。
在啟動BrokerController類時,會去調用startProcessorByHa方法如果是Master節點就會去啟動一個線程每隔6s處理未回查的消息,檢查最大次數為15次。
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// 檢查回查消息 timeout = 6s checkMax=15
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
在check方法里面去調用listener.resolveHalfMsg(msgExt)方法去處理事務消息。
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
執行sendCheckMessage方法發送一個檢查事務狀態的Code為CHECK_TRANSACTION_STATE的消息,在客戶端MQClientAPIImpl初始化的時候就會去注冊一個Code對應的Processor,最終就會去執行checkTransactionState方法,判斷本地事務的狀態,然后再去執行endTransactionOneway發起END_TRANSACTION處理。
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
// 執行線程方法
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
// 檢查本地事務
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
// 處理事務狀態
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
//
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
thisHeader.setBname(checkRequestHeader.getBname());
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
// 提交狀態
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
// 回滾狀態
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
// 未知狀態
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
try {
// 再次執行endTransactionOneway發起END_TRANSACTION
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
this.checkExecutor.submit(request);
}
總結
首先客戶端Producer通過sendMessageInTransaction方法發送事務消息,Broker判斷是事務消息就將消息topic存入到RMQ_SYS_TRANS_HALF_TOPIC返回給客戶端,客戶端繼續執行邏輯。
然后調用endTransaction方法去提交本地事務通過endTransactionOneway將消息提交給Broker端,Broker端通過Code為END_TRANSACTION的處理器去處理消息調用processRequest方法來處理對應的消息,
如果由于各種原因導致消息的失敗傳輸,為了防止這些現象的出現所以在BrokerController啟動時就啟動一個線程每隔6s處理未回查的消息(檢查最大次數為15次)的任務來進行消息的回查,簡單來說就是通過sendCheckMessage方法去注冊一個Code為CHECK_TRANSACTION_STATE的消息將內容發送給客戶端,然后客戶端在啟動時也注冊對應Code的處理邏輯,通過processTransactionState方法去處理事務的狀態,如果正常最后還是會去執行endTransactionOneway方法,完成事務消息。