真實案例 | 如何處理 MQ 消費失敗的消息?
01、背景介紹
再次回顧一下當時提過的問題,為何項目中要引用 MQ 消息中間件?
我們知道,在電商平臺中常見的用戶下單,會經歷以下幾個流程。
當用戶下單時,創建完訂單之后,會調用第三方支付平臺,對用戶的賬戶金額進行扣款,如果平臺支付扣款成功,會將結果通知到對應的業務系統,接著業務系統會更新訂單狀態,同時調用倉庫接口,進行減庫存,通知物流進行發貨!
圖片
試想一下,從訂單狀態更新、到扣減庫存、通知物流發貨都在一個方法內同步完成,假如用戶支付成功、訂單狀態更新也成功,但是在扣減庫存或者通知物流發貨步驟失敗了,那么就會造成一個問題,用戶已經支付成功了,只是在倉庫扣減庫存方面失敗,從而導致整個交易失?。?/p>
一單失敗,老板可以假裝看不見,但是如果上千個單子都因此失敗,那么因系統造成的業務損失,將是巨大的,老板可能坐不住了!
因此,針對這種業務場景,架構師們引入了異步通信技術方案,從而保證服務的高可用,大體流程如下:
圖片
當訂單系統收到支付平臺發送的扣款結果之后,會將訂單消息發送到 MQ 消息中間件,同時也會更新訂單狀態。
在另一端,由倉庫系統來異步監聽訂單系統發送的消息,當收到訂單消息之后,再操作扣減庫存、通知物流公司發貨等服務!
在優化后的流程下,即使扣減庫存服務失敗,也不會影響用戶交易。
正如《人月神話》中所說的,軟件工程,沒有銀彈!
當引入了 MQ 消息中間件之后,同樣也會帶來另一個問題,假如 MQ 消息中間件突然宕機了,導致消息無法發送出去,那倉庫系統就無法接受到訂單消息,進而也無法發貨!
針對這個問題,業界主流的解決辦法是采用集群部署,一主多從模式,從而實現服務的高可用,即使一臺機器突然宕機了,也依然能保證服務可用,在服務器故障期間,通過運維手段,將服務重新啟動,之后服務依然能正常運行!
但是還有另一個問題,假如倉庫系統已經收到訂單消息了,但是業務處理異常,或者服務器異常,導致當前商品庫存并沒有扣減,也沒有發貨!
這個時候又改如何處理呢?
今天我們所要介紹的正是這種場景,假如消息消費失敗,我們應該如何處理?
02、解決方案
針對消息消費失敗的場景,我們一般會通過如下方式進行處理:
- 當消息消費失敗時,會對消息進行重新推送
- 如果重試次數超過最大值,會將異常消息存儲到數據庫,然后人工介入排查問題,進行手工重試
圖片
當消息在客戶端消費失敗時,我們會將異常的消息加入到一個消息重試對象中,同時設置最大重試次數,并將消息重新推送到 MQ 消息中間件里,當重試次數超過最大值時,會將異常的消息存儲到 MongoDB
數據庫中,方便后續查詢異常的信息。
基于以上系統模型,我們可以編寫一個公共重試組件,話不多說,直接干!
03、代碼實踐
本次補償服務采用 rabbitmq 消息中間件進行處理,其他消息中間件處理思路也類似!
3.1、創建一個消息重試實體類
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 原始消息body
*/
private String bodyMsg;
/**
* 消息來源ID
*/
private String sourceId;
/**
* 消息來源描述
*/
private String sourceDesc;
/**
* 交換器
*/
private String exchangeName;
/**
* 路由鍵
*/
private String routingKey;
/**
* 隊列
*/
private String queueName;
/**
* 狀態,1:初始化,2:成功,3:失敗
*/
private Integer status = 1;
/**
* 最大重試次數
*/
private Integer maxTryCount = 3;
/**
* 當前重試次數
*/
private Integer currentRetryCount = 0;
/**
* 重試時間間隔(毫秒)
*/
private Long retryIntervalTime = 0L;
/**
* 任務失敗信息
*/
private String errorMsg;
/**
* 創建時間
*/
private Date createTime;
@Override
public String toString() {
return "MessageRetryDTO{" +
"bodyMsg='" + bodyMsg + '\'' +
", sourceId='" + sourceId + '\'' +
", sourceDesc='" + sourceDesc + '\'' +
", exchangeName='" + exchangeName + '\'' +
", routingKey='" + routingKey + '\'' +
", queueName='" + queueName + '\'' +
", status=" + status +
", maxTryCount=" + maxTryCount +
", currentRetryCount=" + currentRetryCount +
", retryIntervalTime=" + retryIntervalTime +
", errorMsg='" + errorMsg + '\'' +
", createTime=" + createTime +
'}';
}
/**
* 檢查重試次數是否超過最大值
*
* @return
*/
public boolean checkRetryCount() {
retryCountCalculate();
//檢查重試次數是否超過最大值
if (this.currentRetryCount < this.maxTryCount) {
return true;
}
return false;
}
/**
* 重新計算重試次數
*/
private void retryCountCalculate() {
this.currentRetryCount = this.currentRetryCount + 1;
}
}
3.2、編寫服務重試抽象類
public abstract class CommonMessageRetryService {
private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MongoTemplate mongoTemplate;
/**
* 初始化消息
*
* @param message
*/
public void initMessage(Message message) {
log.info("{} 收到消息: {},業務數據:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
try {
//封裝消息
MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
if (log.isInfoEnabled()) {
log.info("反序列化消息:{}", messageRetryDto.toString());
}
prepareAction(messageRetryDto);
} catch (Exception e) {
log.warn("處理消息異常,錯誤信息:", e);
}
}
/**
* 準備執行
*
* @param retryDto
*/
protected void prepareAction(MessageRetryDTO retryDto) {
try {
execute(retryDto);
doSuccessCallBack(retryDto);
} catch (Exception e) {
log.error("當前任務執行異常,業務數據:" + retryDto.toString(), e);
//執行失敗,計算是否還需要繼續重試
if (retryDto.checkRetryCount()) {
if (log.isInfoEnabled()) {
log.info("重試消息:{}", retryDto.toString());
}
retrySend(retryDto);
} else {
if (log.isWarnEnabled()) {
log.warn("當前任務重試次數已經到達最大次數,業務數據:" + retryDto.toString(), e);
}
doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
}
}
}
/**
* 任務執行成功,回調服務(根據需要進行重寫)
*
* @param messageRetryDto
*/
private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
try {
successCallback(messageRetryDto);
} catch (Exception e) {
log.warn("執行成功回調異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}
/**
* 任務執行失敗,回調服務(根據需要進行重寫)
*
* @param messageRetryDto
*/
private void doFailCallBack(MessageRetryDTO messageRetryDto) {
try {
saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
failCallback(messageRetryDto);
} catch (Exception e) {
log.warn("執行失敗回調異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}
/**
* 執行任務
*
* @param messageRetryDto
*/
protected abstract void execute(MessageRetryDTO messageRetryDto);
/**
* 成功回調
*
* @param messageRetryDto
*/
protected abstract void successCallback(MessageRetryDTO messageRetryDto);
/**
* 失敗回調
*
* @param messageRetryDto
*/
protected abstract void failCallback(MessageRetryDTO messageRetryDto);
/**
* 構建消息補償實體
* @param message
* @return
*/
private MessageRetryDTO buildMessageRetryInfo(Message message){
//如果頭部包含補償消息實體,直接返回
Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
if(messageHeaders.containsKey("message_retry_info")){
Object retryMsg = messageHeaders.get("message_retry_info");
if(Objects.nonNull(retryMsg)){
return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
}
}
//自動將業務消息加入補償實體
MessageRetryDTO messageRetryDto = new MessageRetryDTO();
messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
messageRetryDto.setCreateTime(new Date());
return messageRetryDto;
}
/**
* 異常消息重新入庫
* @param retryDto
*/
private void retrySend(MessageRetryDTO retryDto){
//將補償消息實體放入頭部,原始消息內容保持不變
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
}
/**
* 將異常消息存儲到mongodb中
* @param retryDto
*/
private void saveMessageRetryInfo(MessageRetryDTO retryDto){
try {
mongoTemplate.save(retryDto, "message_retry_info");
} catch (Exception e){
log.error("將異常消息存儲到mongodb失敗,消息數據:" + retryDto.toString(), e);
}
}
}
3.3、編寫監聽服務類
在消費端應用的時候,也非常簡單,例如,針對扣減庫存操作,我們可以通過如下方式進行處理!
@Component
public class OrderServiceListener extends CommonMessageRetryService {
private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);
/**
* 監聽訂單系統下單成功消息
* @param message
*/
@RabbitListener(queues = "mq.order.add")
public void consume(Message message) {
log.info("收到訂單下單成功消息: {}", message.toString());
super.initMessage(message);
}
@Override
protected void execute(MessageRetryDTO messageRetryDto) {
//調用扣減庫存服務,將業務異常拋出來
}
@Override
protected void successCallback(MessageRetryDTO messageRetryDto) {
//業務處理成功,回調
}
@Override
protected void failCallback(MessageRetryDTO messageRetryDto) {
//業務處理失敗,回調
}
}
當消息消費失敗,并超過最大次數時,會將消息存儲到 mongodb 中,然后像常規數據庫操作一樣,可以通過web接口查詢異常消息,并針對具體場景進行重試!
04、小結
可能有的同學會問,為啥不將異常消息存在數據庫?
起初的確是存儲在 MYSQL 中,但是隨著業務的快速發展,訂單消息數據結構越來越復雜,數據量也非常的大,甚至大到 MYSQL 中的 text 類型都無法存儲,同時這種數據結構也不太適合在 MYSQL 中存儲,因此將其遷移到 mongodb!
本文主要圍繞消息消費失敗這種場景,進行基礎的方案和代碼實踐講解,如果有描述不對的地方,歡迎大家留言指出!
05、參考
1、https://blog.csdn.net/qq_42046105/article/details/114156904