RabbitMQ如何保證消息的可靠投遞?
Spring Boot整合RabbitMQ
github地址:
https://github.com/erlieStar/rabbitmq-examples
Spring有三種配置方式
- 基于XML
- 基于JavaConfig
- 基于注解
當然現在已經很少使用XML來做配置了,只介紹一下用JavaConfig和注解的配置方式
RabbitMQ整合Spring Boot,我們只需要增加對應的starter即可
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
基于注解
在application.yaml的配置如下
- spring:
- rabbitmq:
- host: myhost
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- log:
- exchange: log.exchange
- info:
- queue: info.log.queue
- binding-key: info.log.key
- error:
- queue: error.log.queue
- binding-key: error.log.key
- all:
- queue: all.log.queue
- binding-key: '*.log.key'
消費者代碼如下
- @Slf4j
- @Component
- public class LogReceiverListener {
- /**
- * 接收info級別的日志
- */
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "${log.info.queue}", durable = "true"),
- exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
- key = "${log.info.binding-key}"
- )
- )
- public void infoLog(Message message) {
- String msg = new String(message.getBody());
- log.info("infoLogQueue 收到的消息為: {}", msg);
- }
- /**
- * 接收所有的日志
- */
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "${log.all.queue}", durable = "true"),
- exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
- key = "${log.all.binding-key}"
- )
- )
- public void allLog(Message message) {
- String msg = new String(message.getBody());
- log.info("allLogQueue 收到的消息為: {}", msg);
- }
- }
生產者如下
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class MsgProducerTest {
- @Autowired
- private AmqpTemplate amqpTemplate;
- @Value("${log.exchange}")
- private String exchange;
- @Value("${log.info.binding-key}")
- private String routingKey;
- @SneakyThrows
- @Test
- public void sendMsg() {
- for (int i = 0; i < 5; i++) {
- String message = "this is info message " + i;
- amqpTemplate.convertAndSend(exchange, routingKey, message);
- }
- System.in.read();
- }
- }
Spring Boot針對消息ack的方式和原生api針對消息ack的方式有點不同
原生api消息ack的方式
消息的確認方式有2種
自動確認(autoAck=true)
手動確認(autoAck=false)
消費者在消費消息的時候,可以指定autoAck參數
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ會等待消費者顯示回復確認消息后才從內存(或者磁盤)中移出消息
autoAck=true: RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者是否真正的消費了這些消息
手動確認的方法如下,有2個參數
basicAck(long deliveryTag, boolean multiple)
deliveryTag: 用來標識信道中投遞的消息。RabbitMQ 推送消息給Consumer時,會附帶一個deliveryTag,以便Consumer可以在消息確認時告訴RabbitMQ到底是哪條消息被確認了。
RabbitMQ保證在每個信道中,每條消息的deliveryTag從1開始遞增
multiple=true: 消息id<=deliveryTag的消息,都會被確認
myltiple=false: 消息id=deliveryTag的消息,都會被確認
消息一直不確認會發生啥?
如果隊列中的消息發送到消費者后,消費者不對消息進行確認,那么消息會一直留在隊列中,直到確認才會刪除。
如果發送到A消費者的消息一直不確認,只有等到A消費者與rabbitmq的連接中斷,rabbitmq才會考慮將A消費者未確認的消息重新投遞給另一個消費者
Spring Boot中針對消息ack的方式
有三種方式,定義在AcknowledgeMode枚舉類中
方式 | 解釋 |
---|---|
NONE | 沒有ack,等價于原生api中的autoAck=true |
MANUAL | 用戶需要手動發送ack或者nack |
AUTO | 方法正常結束,spring boot 框架返回ack,發生異常spring boot框架返回nack |
spring boot針對消息默認的ack的方式為AUTO。
在實際場景中,我們一般都是手動ack。
application.yaml的配置改為如下
- spring:
- rabbitmq:
- host: myhost
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- listener:
- simple:
- acknowledge-mode: manual # 手動ack,默認為auto
相應的消費者代碼改為
- @Slf4j
- @Component
- public class LogListenerManual {
- /**
- * 接收info級別的日志
- */
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "${log.info.queue}", durable = "true"),
- exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
- key = "${log.info.binding-key}"
- )
- )
- public void infoLog(Message message, Channel channel) throws Exception {
- String msg = new String(message.getBody());
- log.info("infoLogQueue 收到的消息為: {}", msg);
- try {
- // 這里寫各種業務邏輯
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- } catch (Exception e) {
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- }
- }
- }
我們上面用到的注解,作用如下
注解 | 作用 |
---|---|
RabbitListener | 消費消息,可以定義在類上,方法上,當定義在類上時需要和RabbitHandler配合使用 |
QueueBinding | 定義綁定關系 |
Queue | 定義隊列 |
Exchange | 定義交換機 |
RabbitHandler | RabbitListener定義在類上時,需要用RabbitHandler指定處理的方法 |
基于JavaConfig
既然用注解這么方便,為啥還需要JavaConfig的方式呢?
JavaConfig方便自定義各種屬性,比如同時配置多個virtual host等
具體代碼看GitHub把
RabbitMQ如何保證消息的可靠投遞
一個消息往往會經歷如下幾個階段
在這里插入圖片描述
所以要保證消息的可靠投遞,只需要保證這3個階段的可靠投遞即可
生產階段
這個階段的可靠投遞主要靠ConfirmListener(發布者確認)和ReturnListener(失敗通知)
前面已經介紹過了,一條消息在RabbitMQ中的流轉過程為
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer
ConfirmListener可以獲取消息是否從producer發送到broker
ReturnListener可以獲取從exchange路由不到queue的消息
我用Spring Boot Starter 的api來演示一下效果
application.yaml
- spring:
- rabbitmq:
- host: myhost
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- listener:
- simple:
- acknowledge-mode: manual # 手動ack,默認為auto
- log:
- exchange: log.exchange
- info:
- queue: info.log.queue
- binding-key: info.log.key
發布者確認回調
- @Component
- public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
- @Autowired
- private MessageSender messageSender;
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- String msgId = correlationData.getId();
- String msg = messageSender.dequeueUnAckMsg(msgId);
- if (ack) {
- System.out.println(String.format("消息 {%s} 成功發送給mq", msg));
- } else {
- // 可以加一些重試的邏輯
- System.out.println(String.format("消息 {%s} 發送mq失敗", msg));
- }
- }
- }
失敗通知回調
- @Component
- public class ReturnCallback implements RabbitTemplate.ReturnCallback {
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- String msg = new String(message.getBody());
- System.out.println(String.format("消息 {%s} 不能被正確路由,routingKey為 {%s}", msg, routingKey));
- }
- }
- @Configuration
- public class RabbitMqConfig {
- @Bean
- public ConnectionFactory connectionFactory(
- @Value("${spring.rabbitmq.host}") String host,
- @Value("${spring.rabbitmq.port}") int port,
- @Value("${spring.rabbitmq.username}") String username,
- @Value("${spring.rabbitmq.password}") String password,
- @Value("${spring.rabbitmq.virtual-host}") String vhost) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- connectionFactory.setVirtualHost(vhost);
- connectionFactory.setPublisherConfirms(true);
- connectionFactory.setPublisherReturns(true);
- return connectionFactory;
- }
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
- ReturnCallback returnCallback, ConfirmCallback confirmCallback) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setReturnCallback(returnCallback);
- rabbitTemplate.setConfirmCallback(confirmCallback);
- // 要想使 returnCallback 生效,必須設置為true
- rabbitTemplate.setMandatory(true);
- return rabbitTemplate;
- }
- }
這里我對RabbitTemplate做了一下包裝,主要就是發送的時候增加消息id,并且保存消息id和消息的對應關系,因為RabbitTemplate.ConfirmCallback只能拿到消息id,并不能拿到消息內容,所以需要我們自己保存這種映射關系。在一些可靠性要求比較高的系統中,你可以將這種映射關系存到數據庫中,成功發送刪除映射關系,失敗則一直發送
- @Component
- public class MessageSender {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>();
- public void convertAndSend(String exchange, String routingKey, String message) {
- String msgId = UUID.randomUUID().toString();
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(msgId);
- rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
- unAckMsgQueue.put(msgId, message);
- }
- public String dequeueUnAckMsg(String msgId) {
- return unAckMsgQueue.remove(msgId);
- }
- }
測試代碼為
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class MsgProducerTest {
- @Autowired
- private MessageSender messageSender;
- @Value("${log.exchange}")
- private String exchange;
- @Value("${log.info.binding-key}")
- private String routingKey;
- /**
- * 測試失敗通知
- */
- @SneakyThrows
- @Test
- public void sendErrorMsg() {
- for (int i = 0; i < 3; i++) {
- String message = "this is error message " + i;
- messageSender.convertAndSend(exchange, "test", message);
- }
- System.in.read();
- }
- /**
- * 測試發布者確認
- */
- @SneakyThrows
- @Test
- public void sendInfoMsg() {
- for (int i = 0; i < 3; i++) {
- String message = "this is info message " + i;
- messageSender.convertAndSend(exchange, routingKey, message);
- }
- System.in.read();
- }
- }
先來測試失敗者通知
輸出為
- 消息 {this is error message 0} 不能被正確路由,routingKey為 {test}
- 消息 {this is error message 0} 成功發送給mq
- 消息 {this is error message 2} 不能被正確路由,routingKey為 {test}
- 消息 {this is error message 2} 成功發送給mq
- 消息 {this is error message 1} 不能被正確路由,routingKey為 {test}
- 消息 {this is error message 1} 成功發送給mq
消息都成功發送到broker,但是并沒有被路由到queue中
再來測試發布者確認
輸出為
- 消息 {this is info message 0} 成功發送給mq
- infoLogQueue 收到的消息為: {this is info message 0}
- infoLogQueue 收到的消息為: {this is info message 1}
- 消息 {this is info message 1} 成功發送給mq
- infoLogQueue 收到的消息為: {this is info message 2}
- 消息 {this is info message 2} 成功發送給mq
消息都成功發送到broker,也成功被路由到queue中
存儲階段
這個階段的高可用還真沒研究過,畢竟集群都是運維搭建的,后續有時間的話會把這快的內容補充一下
消費階段
消費階段的可靠投遞主要靠ack來保證。
總而言之,在生產環境中,我們一般都是單條手動ack,消費失敗后不會重新入隊(因為很大概率還會再次失敗),而是將消息重新投遞到死信隊列,方便以后排查問題
總結一下各種情況
- ack后消息從broker中刪除
- nack或者reject后,分為如下2種情況
(1) reque=true,則消息會被重新放入隊列
(2) reque=fasle,消息會被直接丟棄,如果指定了死信隊列的話,會被投遞到死信隊列
本文轉載自微信公眾號「Java識堂」,可以通過以下二維碼關注。轉載本文請聯系Java識堂公眾號。