阿里二面:要保證消息不丟失,又不重復,消息隊列怎么選型?
大家好,我是君哥。
在使用消息隊列時,有兩個經常讓我們煩惱的問題,消息丟失和消息重復。那我們在做技術選型時,有沒有一個消息隊列能解決消息丟失和消息重復這兩個問題呢?
消息丟失
如上圖,從生產者發(fā)送消息,Broker 保存消息,消費者消費消息,每一個環(huán)節(jié)都有可能丟失消息。
發(fā)送丟失
生產者發(fā)送消息時,如果處理不當,很可能會造成消息丟失。
生產者發(fā)送消息,主流消息隊列都支持同步發(fā)送和異步發(fā)送。如果使用同步發(fā)送,生產者發(fā)送消息后,會同步等待 Broker 返回的 ACK,收到 ACK 消息,就認為消息發(fā)送成功。如果長時間沒有收到,則會認為消息發(fā)送失敗,需要進行重試。
同步發(fā)送可以保證消息不丟失,但是會有性能問題,所以多數情況會選擇異步發(fā)送。異步發(fā)送如何保證消息不丟失呢?主流消息隊列(比如 Kafka 和 RocketMQ)實現方法基本類似,使用回調函數來實現。下面看一下 Kafka 的異步發(fā)送代碼:
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("發(fā)送消息失敗:", exception);
}
if (metadata != null) {
logger.info("消息發(fā)送成功");
}
}
});
消息存儲
生產者發(fā)送消息成功,也不能保證消息絕對不丟失。因為即使消息發(fā)送到 Broker,如果在消費者拉取到消息之前,Broker 宕機了,消息還沒有落盤,也會導致消息丟失。
在存儲階段要保證消息不丟失,可以考慮幾個方面:
同步刷盤
采用異步刷盤,如果在消息落盤之前 Broker 宕機了,就會造成消息丟失。而采用同步刷盤,等待消息落盤之后,再給 Sender 返回發(fā)送成功,可以從消息發(fā)送環(huán)節(jié)保證消息不丟失。
在 RocketMQ 中,把 flushDiskType 參數配置為 SYNC_FLUSH 就可以開啟同步刷盤。
Broker 集群
如果 Broker 集群中只有一個節(jié)點,即使消息落盤成功了,Broker 發(fā)送故障,在 Broker 恢復以前消費者也會拉取不到消息。而且如果 Broker 磁盤故障不可恢復,消息也會丟失。
采用 Broker 集群可以很好地解決這個問題。見下圖:
在 Broker 集群時,可以等待 2 個以上的節(jié)點同步消息完成后再給 Producer 返回成功。這樣即使一個 Broker 掛了,也可以很容易找到替代的 Broker。
消息消費
消費者保證不丟失消息,需要消費完成后再給 Broker 返回 ACK。在主流的消息隊列中,如果 Broker 收不到 ACK,都會給消費者再次發(fā)送這條消息。
有時候為了解決消息積壓的問題,消費者拉取到消息后會直接返回 ACK,然后再異步執(zhí)行消息處理邏輯。這樣要保證消息不丟失,需要在返回 ACK 之前把消息保存到本地,比如持久化到數據庫,后面可以取數據庫保存的消息進行處理。
消息重復
消息重復一般有兩個原因,一個是生產者發(fā)送消息后沒有收到 ACK,然后進行重復發(fā)送,另一個原因是消費者消費完成后 Broker 沒有收到 ACK,導致消息重復推送給消費者。
重復消息會對業(yè)務造成影響,比如電商場景中的重復支付、賬務場景中的重復記賬,對業(yè)務造成的影響都比較嚴重。
從目前主流的消息隊列來看,并沒有一個消息隊列能解決消息重復消費的問題,只能在消費端做冪等處理。下面提供幾個思路作為參考。
數據庫唯一鍵約束
如果消息會落本地數據庫,可以采用消息 ID 作為唯一鍵。如果消息不落數據庫,可以將消息 ID 或者消息中其他唯一能標識消息的屬性作為唯一鍵落業(yè)務數據表。
保存消費記錄
我們也可以將消息 ID 保存 Redis,消費消息前判斷消息 ID 是否已存在。
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
Boolean result = valueOperations.setIfAbsent(messageId, messageId);
if (result) {
//消費邏輯;
} else {
logger.error("這條消息已經消費,跳過,消息ID:{}", messageId);
}
這里有一個注意點,如果消費失敗了,需要刪除 Redis 中保存的消息 ID。
總結
消息不丟失、不重復是消息隊列的基本要求,但這個基本要求還是很難滿足的。
消息丟失這個要求,主流消息隊列通過消息重試和消息持久化的方式可以滿足。
但消息重試也同時帶來了消息重復的可能性,主流消息隊列在解決重復消息的問題上并沒有現成的方案,對不允許重復消費的場景,需要開發(fā)人員在消費端做冪等處理。