RabbitMQ是如何確定消息是否投遞到隊(duì)列中的
1. 前言
在使用RabbitMQ消息中間件時(shí),因?yàn)橄⒌耐哆f是異步的,默認(rèn)情況下,RabbitMQ會(huì)刪除那些無(wú)法路由的消息。為了能夠檢出消息是否順利投遞到隊(duì)列,我們需要相應(yīng)的處理機(jī)制。今天就來(lái)驗(yàn)證一下相關(guān)的驗(yàn)證機(jī)制。
2. 消息投遞失敗
那么哪些情況消息會(huì)投遞失敗呢?RabbitMQ消息會(huì)先到達(dá)指定的交換機(jī),然后由交換機(jī)路由到對(duì)應(yīng)的隊(duì)列。所以以下幾種情況會(huì)導(dǎo)致消息投遞失敗。
- 投遞的交換機(jī)不可用。
- 投遞的交換機(jī)可用,但是沒(méi)有匹配到隊(duì)列。
3. 投遞失敗的處理機(jī)制
對(duì)應(yīng)上面的兩種情況,RabbitMQ提供了對(duì)應(yīng)的解決方案。
ConfirmCallback
RabbitMQ提供了ConfirmCallback接口用于實(shí)現(xiàn)消息發(fā)送到RabbitMQ交換器后進(jìn)行確認(rèn)回調(diào)。
在Spring Boot中需要開(kāi)啟:
- spring:
- rabbitmq:
- # 通常選擇 correlated
- publisher-confirm-type:
通常有三種選擇:
- NONE ,禁用發(fā)布確認(rèn)模式,是默認(rèn)值。
- CORRELATED,發(fā)布消息時(shí)會(huì)攜帶一個(gè)CorrelationData,被ack/nack時(shí)CorrelationData會(huì)被返回進(jìn)行對(duì)照處理,CorrelationData可以包含比較豐富的元信息進(jìn)行回調(diào)邏輯的處理。
- SIMPLE,當(dāng)被ack/nack后會(huì)等待所有消息被發(fā)布,如果超時(shí)會(huì)觸發(fā)異常,甚至關(guān)閉連接通道。
這里我使用CORRELATED模式,聲明一個(gè)ConfirmCallback并設(shè)置到RabbitTemplate中
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- // correlationData 可能為空
- if (ack) {
- log.debug("消息發(fā)送到exchange成功,id: {}", correlationData.getId());
- } else {
- log.debug("消息發(fā)送到exchange失敗,原因: {}", cause);
- }
- });
當(dāng)消息投遞到一個(gè)不存在的交換機(jī)Exchange且ack=false時(shí)會(huì)輸出日志:
- - Publishing message [(Body:'"hello"' MessageProperties [headers={spring_listener_return_correlation=a088eb3f-a234-4e15-bb7a-3aa9a6f043e6, spring_returned_message_correlation=29975bc1-f363-4e3a-85ca-010d13888720, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=7, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [DIRECT_EXCHANGE1], routingKey = [DIRECT_ROUTING_KEY2]
- - 消息發(fā)送到exchange失敗,原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE1' in vhost 'my_vhost', class-id=60, method-id=40)
這里實(shí)現(xiàn)的比較簡(jiǎn)單你可以增加一些消息投遞到交換機(jī)失敗后的操作處理邏輯。
ReturnCallback
ReturnCallback接口用于實(shí)現(xiàn)消息已經(jīng)成功發(fā)送到RabbitMQ交換機(jī),但沒(méi)有匹配到隊(duì)列時(shí)的回調(diào)。
在Spring Boot中需要同時(shí)開(kāi)啟:
- spring:
- rabbitmq:
- publisher-returns: true
- template:
- mandatory: true
RabbitTemplate中的mandatory設(shè)置值優(yōu)先級(jí)要高一些。
我們聲明一個(gè)ReturnCallback并設(shè)置到RabbitTemplate中
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- String correlationId = message.getMessageProperties()
- .getHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);
- log.debug("消息:{} 發(fā)送失敗, 應(yīng)答碼:{} 原因:{} 交換機(jī): {} 路由鍵: {}", correlationId,
- replyCode, replyText, exchange, routingKey);
- });
當(dāng)消息成功投遞到交換機(jī)但是無(wú)法匹配到隊(duì)列時(shí):
- - Publishing message [(Body:'"hello"' MessageProperties [headers={spring_listener_return_correlation=155648bd-fc3e-4c8b-a650-7b1ce720c7a6, spring_returned_message_correlation=7029ee49-357a-42fc-8532-dc41b4bb8e87, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=7, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [DIRECT_EXCHANGE], routingKey = [DIRECT_ROUTING_KEY2]
- - 消息:7029ee49-357a-42fc-8532-dc41b4bb8e87 發(fā)送失敗, 應(yīng)答碼:312 原因:NO_ROUTE 交換機(jī): DIRECT_EXCHANGE 路由鍵: DIRECT_ROUTING_KEY2
- - 消息發(fā)送到exchange成功,id: 7029ee49-357a-42fc-8532-dc41b4bb8e87
從上面我們也可以看出ReturnCallback只處理投遞到隊(duì)列失敗的情況,并不像ConfirmCallback既能處理失敗的情況也能處理成功的情況。
4. 總結(jié)
消息投遞失敗的處理在使用RabbitMQ的使用中時(shí)非常必要的,能夠幫助我們追蹤消息的投遞情況,以及處理消息投遞異常或者成功后的邏輯處理,為消息丟失進(jìn)行一些兜底或者記錄。但是請(qǐng)注意這個(gè)并不是發(fā)生在消費(fèi)階段,是否成功消費(fèi)并不是由這兩種回調(diào)來(lái)處理,我們有空再對(duì)消息的消費(fèi)確認(rèn)進(jìn)行講解。
本文轉(zhuǎn)載自微信公眾號(hào)「碼農(nóng)小胖哥」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系碼農(nóng)小胖哥公眾號(hào)。