成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spring Boot + RabbitMQ 消息100%可靠?3大絕招 + 手動 ACK,徹底告別丟失!

開發(fā) 架構(gòu)
我們還提到 死信隊列 及 消息軌跡監(jiān)控 作為生產(chǎn)級增強方案,進一步提升系統(tǒng)穩(wěn)定性。如果你在生產(chǎn)環(huán)境中使用 RabbitMQ,建議你根據(jù)本篇內(nèi)容進行配置,確保消息 零丟失、零誤判,構(gòu)建更加健壯的消息隊列架構(gòu)。

在分布式系統(tǒng)架構(gòu)中,RabbitMQ 作為強大的消息中間件,廣泛應用于訂單、庫存、支付等核心業(yè)務場景。然而,消息丟失問題時有發(fā)生,例如:

  • 訂單支付通知丟失導致客戶已付款但系統(tǒng)未更新狀態(tài);
  • 庫存扣減消息丟失導致庫存數(shù)據(jù)與實際銷量不一致;
  • 消息無法到達消費者導致業(yè)務流程中斷,嚴重影響用戶體驗。

要解決這些問題,我們需要建立 高可靠性的 RabbitMQ 消息傳輸機制,確保消息 生產(chǎn)、存儲、消費 三個環(huán)節(jié)的穩(wěn)定性。本文將基于 Spring Boot 3.4,介紹 生產(chǎn)者確認機制、消息持久化、手動 ACK 三大核心策略,并提供完整的可運行代碼示例,幫助你徹底告別 RabbitMQ 消息丟失。

消息丟失的3大“案發(fā)現(xiàn)場”

生產(chǎn)者消息投遞失敗

  • 問題原因網(wǎng)絡抖動、RabbitMQ 服務宕機、路由配置錯誤;
  • 后果消息未成功發(fā)送,導致數(shù)據(jù)不一致;
  • 解決方案生產(chǎn)者 Confirm 模式 + Return 回調(diào) 機制。

MQ 服務崩潰

  • 問題原因RabbitMQ 服務器故障、磁盤損壞、未開啟消息持久化;
  • 后果未持久化的消息在 RabbitMQ 宕機后丟失;
  • 解決方案交換機、隊列、消息持久化,保證消息不會因重啟而丟失。

消費者崩潰

  • 問題原因:消費者在處理消息時異常退出,或者自動 ACK 機制導致 RabbitMQ 認為消息已消費;
  • 后果:消息被 RabbitMQ 移除,但實際業(yè)務未處理成功;
  • 解決方案手動 ACK + 冪等性控制,確保消息消費的可靠性。

生產(chǎn)者可靠性:Confirm模式 + Return機制

啟用Confirm與Return機制

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 啟用Confirm模式
    publisher-returns: true             # 啟用Return機制
    template:
      mandatory: true                   # 讓生產(chǎn)者接收未被路由的消息通知

實現(xiàn)Confirm回調(diào)(確保消息成功落庫)

package com.icoderoad.mq;


import com.icoderoad.mapper.MessageLogMapper;
import com.icoderoad.model.MessageStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {


    @Autowired
    private MessageLogMapper messageLogMapper;


    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("? 消息到達 Exchange,ID:{}", correlationData.getId());
            messageLogMapper.updateStatus(correlationData.getId(), MessageStatus.SENT);
        } else {
            log.error("? 消息投遞失敗,ID:{},原因:{}", correlationData.getId(), cause);
        }
    }


    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("?? 消息路由失敗!交換機:{},路由鍵:{},消息:{}",
                returned.getExchange(), returned.getRoutingKey(), new String(returned.getMessage().getBody()));
    }
}

發(fā)送消息(攜帶唯一消息ID)

public void sendOrder(Order order) {
    String msgId = UUID.randomUUID().toString();
    messageLogMapper.insert(new MessageLog(msgId, order, MessageStatus.SENDING));


    rabbitTemplate.convertAndSend(
        "order-exchange", 
        "order.create", 
        order, 
        message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            message.getMessageProperties().setMessageId(msgId);
            return message;
        },
        new CorrelationData(msgId)
    );
}

MQ可靠性:隊列/消息持久化

package com.icoderoad;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {


    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order-exchange", true, false);
    }


    @Bean
    public Queue orderQueue() {
        return new Queue("order.queue", true);
    }


    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.create");
    }
}

消費者可靠性:手動ACK + 冪等性

關(guān)閉自動ACK,改為手動

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 開啟手動ACK
        prefetch: 10              # 限制單次拉取消息數(shù)

處理訂單消息(確保冪等性 + 手動ACK

package com.icoderoad.consumer;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class OrderConsumer {


    @Autowired
    private OrderService orderService;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            String redisKey = "order:" + order.getOrderNo();
            if (redisTemplate.opsForValue().setIfAbsent(redisKey, "processing", 30, TimeUnit.MINUTES)) {
                orderService.process(order);
                redisTemplate.delete(redisKey);
                channel.basicAck(tag, false);
                log.info("?? 訂單處理成功:{}", order.getOrderNo());
            } else {
                log.warn("?? 訂單正在處理中,直接ACK:{}", order.getOrderNo());
                channel.basicAck(tag, false);
            }
        } catch (Exception e) {
            log.error("? 訂單處理失敗:{}", order.getOrderNo(), e);
            channel.basicNack(tag, false, true);
        }
    }
}

全鏈路監(jiān)控(生產(chǎn)環(huán)境推薦)

  1. 消息追蹤記錄消息流轉(zhuǎn)狀態(tài)
  2. 死信隊列避免消息無限重試
  3. 消息監(jiān)控Grafana監(jiān)控消息積壓、ACK率、重試次數(shù)
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable("order.queue")
            .deadLetterExchange("dlx.exchange")
            .deadLetterRoutingKey("dlx.order")
            .build();
}

核心配置一覽:

配置項

作用

publisher-confirm-type

確認消息到達Exchange

publisher-returns

監(jiān)聽未路由消息

隊列持久化

確保消息重啟后不丟失

acknowledge-mode=manual

關(guān)閉自動ACK,使用手動確認

delivery-mode=PERSISTENT

確保消息持久化

結(jié)論

在高并發(fā)分布式系統(tǒng)中,RabbitMQ 消息可靠性 直接關(guān)系到業(yè)務數(shù)據(jù)的完整性和一致性。本篇文章介紹了 Spring Boot 3.4 下 RabbitMQ 100% 可靠消息傳輸方案,核心思路包括:

  1. 生產(chǎn)者端采用 Confirm 機制 + Return 回調(diào),確保消息成功到達 RabbitMQ;
  2. RabbitMQ 服務器端開啟 隊列、交換機持久化,防止消息因宕機丟失;
  3. 消費者端使用 手動 ACK + 冪等控制,確保消息被正確消費。

此外,我們還提到 死信隊列 及 消息軌跡監(jiān)控 作為生產(chǎn)級增強方案,進一步提升系統(tǒng)穩(wěn)定性。如果你在生產(chǎn)環(huán)境中使用 RabbitMQ,建議你根據(jù)本篇內(nèi)容進行配置,確保消息 零丟失、零誤判,構(gòu)建更加健壯的消息隊列架構(gòu)。

責任編輯:武曉燕 來源: 路條編程
相關(guān)推薦

2022-07-27 18:34:32

RabbitMQ宕機服務器

2025-05-29 01:33:00

微服務架構(gòu)系統(tǒng)

2024-08-12 12:17:03

2020-10-14 08:36:10

RabbitMQ消息

2024-05-09 08:04:23

RabbitMQ消息可靠性

2021-09-15 09:02:20

Spring 6Spring BootJava

2021-09-03 06:46:34

Spring 6pring Boot 項目

2022-09-23 13:57:11

xxl-job任務調(diào)度中間件

2020-06-24 09:35:50

SpringSpring BooJava

2025-04-27 03:00:00

Spring集成測試

2025-04-30 07:43:21

2023-03-06 08:16:04

SpringRabbitMQ

2021-09-16 10:29:05

開發(fā)技能代碼

2022-08-29 18:14:55

MQ數(shù)據(jù)不丟失

2024-10-11 11:32:22

Spring6RSocket服務

2025-05-13 07:13:25

2021-08-10 09:59:15

RabbitMQ消息微服務

2024-07-03 11:33:02

2025-06-12 03:10:00

2024-01-30 08:01:15

RabbitMQ業(yè)務邏輯應用場景
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 三级在线视频 | 免费九九视频 | 国产精品一区二区av | 成人午夜免费在线视频 | 亚洲人在线 | 免费的日批视频 | 乱一性一乱一交一视频a∨ 色爱av | 国产精产国品一二三产区视频 | 中文字幕99 | 国产激情视频网 | 亚洲一区二区免费 | 97色在线视频 | 日日夜夜免费精品 | 免费一级片| 亚洲一区二区视频在线播放 | 成人久久久 | 久在线 | 天堂久久网| 中文在线一区二区 | 99热在线观看精品 | 精品日韩一区二区 | 国产美女网站 | 国产精品久久久久久久久久不蜜臀 | 精品久久久久久久久久久久久久久久久 | 久久www免费视频 | 欧美性video 精品亚洲一区二区 | 久久精品国产一区 | 自拍 亚洲 欧美 老师 丝袜 | 久热免费在线 | 亚洲国产欧美一区 | 欧美成人精品一区二区三区 | 蜜桃色网| 国产免费观看视频 | 国产探花在线观看视频 | 人人干人人草 | 天堂久久av| 国产精品地址 | 亚洲精品456 | 99久久99热这里只有精品 | 日韩欧美亚洲 | 精品久久99 |