阿里二面:使用消息隊(duì)列怎樣防止消息重復(fù)?
大家好,我是君哥。
使用消息隊(duì)列時(shí),我們經(jīng)常會(huì)遇到一個(gè)可能對(duì)業(yè)務(wù)產(chǎn)生影響的問題,消息重復(fù)。在訂單、扣款、對(duì)賬等對(duì)冪有要求的場(chǎng)景,消息重復(fù)的問題必須解決。
那怎樣應(yīng)對(duì)重復(fù)消息呢?今天來聊一聊這個(gè)話題。
1.三個(gè)語義
正確使用消息隊(duì)列,我們會(huì)考慮到消息防丟失、防重復(fù),我們介紹 3 個(gè)語義:
- At Least Once:在消息隊(duì)列中,指消息不丟失,一條消息最少被消費(fèi)一次,但是可能會(huì)有重復(fù)消費(fèi)。
- Exactly Once:在消息隊(duì)列中,消息被精準(zhǔn)消費(fèi)一次,不丟失,也不會(huì)重復(fù);
- At Most Once:在消息隊(duì)列中,消息不會(huì)被重復(fù)消費(fèi),但是可能會(huì)有消息丟失
不同的消息場(chǎng)景,需要的語義不同。比如 Exactly Once 最難實(shí)現(xiàn),一般需要引入事務(wù)消息。
不同使用場(chǎng)景,對(duì)語義的要求也不一樣。比如日志收集類的場(chǎng)景,At Most Once 就可以滿足,而支付類的場(chǎng)景則要求 Exactly Once。
2.消息重復(fù)
什么情況下會(huì)導(dǎo)致消息重復(fù)呢?
生產(chǎn)者發(fā)送消息后,Broker 保存成功,但是沒有成功給生產(chǎn)者返回 ACK,生產(chǎn)者以為消息發(fā)送失敗,重試,再次給 Broker 發(fā)送。Broker 保存了重復(fù)消息,導(dǎo)致 Consumer 多次消費(fèi)。
圖片
消費(fèi)者消費(fèi)消息后,給 Broker 返回 ACK 失敗,導(dǎo)致 Broker 沒有修改偏移量,同一條消息再次發(fā)送給消費(fèi)者,或者被消費(fèi)者拉取到。
圖片
3.生產(chǎn)者防重
有的消息中間件是支持生產(chǎn)者冥等的。比如 Kafka 從 0.11.0 版本開始引入了冪等 Producer,可以使用下面代碼開啟冪等 Producer:
Properties props = new Properties();
//省略其他代碼
//配置冪等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
//創(chuàng)建生產(chǎn)者實(shí)例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Kafka 實(shí)現(xiàn)生產(chǎn)者冪等的原理是在生產(chǎn)者引入了 Producer ID(PID)和 Sequence Number 這兩個(gè)參數(shù)。
- PID:Producer 擁有的 ID,唯一標(biāo)識(shí)一個(gè) Producer。
- Sequence Number:自增的數(shù)值,唯一標(biāo)識(shí)同一個(gè) Producer 發(fā)送到指定分區(qū)的消息 ID。
有了這兩個(gè)參數(shù),Broker 單分區(qū)就可以唯一標(biāo)識(shí)一個(gè)生產(chǎn)者發(fā)送的唯一一條消息<PID,SequenceNumber>。Broker 收到消息時(shí),如果檢查到消息的<PID,SequenceNumber>已經(jīng)存在,就不會(huì)再保留這條消息。
但冪等 Producer 只能在單分區(qū)下生效,多分區(qū)情況下是不生效的。因?yàn)槎鄠€(gè)分區(qū)之間并不能相互訪問對(duì)方的<PID,SequenceNumber>。
圖片
4.Broker 防重
Broker 如果可以防重,那對(duì)于生產(chǎn)者和消費(fèi)者來說,節(jié)省了大量的工作。下面我們看下 Pulsar 是怎樣防重的。
Broker 通過參數(shù) BrokerDeduplicationEnabled 開啟防重功能。對(duì)于 Producer 發(fā)送的重復(fù)消息,Broker 返回響應(yīng) -1:-1。
Producer 發(fā)送消息時(shí),會(huì)帶一個(gè) sequenceId 字段,Broker 會(huì)按照 ProducerName 維度記錄當(dāng)前生產(chǎn)者最大的 sequenceId(highestSequenceId)。Broker 收到消息時(shí),首先會(huì)判斷消息中的 sequenceId 是否大于自己保存的當(dāng)前生產(chǎn)者的 highestSequenceId,如果是則保存消息并更新 highestSequenceId,否則丟棄消息,并且給 Producer 返回 -1:-1。
下面是三個(gè)極端情況:
- Producer 斷開連接:這種情況下,跟 Broker 重新建立連接后,本地保存的 sequenceId 還在,只要使用 sequenceId 遞增后發(fā)送消息即可;
- Producer 宕機(jī):Producer 重啟后,緩存的 sequenceId 肯定不存在了,這時(shí)跟 Broker 重新建立連接后,Broker 會(huì)根據(jù) ProducerName 找出 highestSequenceId 發(fā)給 Producer,Producer 使用這個(gè) sequenceId 來發(fā)送消息;
- Producer 和 Broker 都宕機(jī):Broker 重啟后,可以從宕機(jī)前保存的快照中恢復(fù)各 Producer 對(duì)應(yīng)的 highestSequenceId 發(fā)送給各 Producer。但這個(gè) highestSequenceId 不一定準(zhǔn)確,因?yàn)?nbsp;Broker 宕機(jī)瞬間很有可能最新的 sequenceId 沒有來得及保存快照。
需要注意的是,跟 Kafka 的冪等 Producer 類似,Pulsar 的 Broker 冪等也只能保證 Topic/Partition 級(jí)別。
5.消費(fèi)者防重
從上面的分析可以看出,靠生產(chǎn)者防重和 Broker 防重,只能在 Topic/Partition 級(jí)別生效,這通常并不能滿足我們的需求。而為了避免消費(fèi)者重復(fù)消費(fèi)對(duì)業(yè)務(wù)造成影響,消息防重還是必要的。這就要求我們做最后一道防線,在消費(fèi)端進(jìn)行防重或冪等處理。
消費(fèi)端做防重,就不再考慮消息中間件層面的配置(比如 sequenceId),而是從消息體進(jìn)行下手。
生產(chǎn)者發(fā)送消息時(shí),給消息體賦值一個(gè)全局唯一的 ID,消費(fèi)者處理消息時(shí),根據(jù)全局唯一 ID 做防重。
比如消費(fèi)端的邏輯是保存一條訂單消息,那把唯一 ID 保存到數(shù)據(jù)庫并且加一個(gè)唯一索引,這樣根據(jù)唯一索引就可以做消息去重。
不過使用唯一索引也有缺點(diǎn):
- 如果使用 MySQL 數(shù)據(jù)庫,不能使用 Change Buffer;
- 非插入的場(chǎng)景(比如更新庫存)不能去重。
對(duì)于唯一索引的缺點(diǎn),我們可以引入 Redis 對(duì)唯一 ID 做保存,利用 setNx 判斷消息是否已經(jīng)處理過。如下圖:
圖片
if (jedis.setnx(ID, "1") == 1) {
//處理業(yè)務(wù),返回 ACK
}else {
//直接返回 返回 ACK
}
6.總結(jié)
使用消息隊(duì)列,在一些場(chǎng)景下是需要防重的。主流消息隊(duì)列提供了一些防重的能力,但并不是完全可靠的。在對(duì)重復(fù)消息敏感的場(chǎng)景下,最好是在消費(fèi)端處理消息時(shí),從業(yè)務(wù)層面進(jìn)行消息防重。