消息隊列實現 Exactly Once,看 Pulsar 是怎樣實現的
大家好 ,我是君哥。
在使用消息隊列時,我們希望消息能夠精準推送(Exactly Once),不會丟失、也不會重復。Exactly Once 其實是很難實現的,Pulsar 這款消息中間件使用事務消息實現了 Exactly Once,今天就帶大家了解一下。
1.一個場景
為什么需要 Exactly Once 呢?下面我們看一個轉賬場景。
客戶從轉賬 APP 上操作,從 A 賬戶向 B 賬戶轉賬 100 元,但是 B 賬戶增加金額后,給 Broker 返回 ACK 失敗,導致 Broker 再次給賬戶 B 推送增加金額的消息,導致賬戶 B 增加了兩次,最終導致金額不一致。
當然,賬戶 B 通過消費者冪等可以避免這個問題,但如果是生產者重復發送導致 Broker 保存了兩條消息呢?
2.Pulsar 去重
通過消息去重可以解決上面的消息重復問題嗎?我們看一下 Pulsar 的去重機制。
Producer 發送消息時,消息體帶一個 sequenceId 字段,這個字段在同一個 Producer 內是嚴格遞增的。Broker 通過<ProducerName, sequenceId> 來記錄每一個 Producer 的最大 sequenceId。如果 Broker 收到 Producer 的消息小于等于保存的當前 Producer 的 sequenceId,說明是重復消息,直接返回失敗。
消息去重從一定程度上可以避免消息重復,但是只能保證在 Topic-Partition 這個維度進行去重,如果一個 Topic 對應多個 Partition,如下圖:
Producer 發送消息后,Broker1 保存成功,但是沒有返回 ack,Producer 把消息重新發送到了 Broker2,最終導致 Consumer 收到 2 條消息。
3.事務消息
Pusar 的事務消息不僅可以解決上面的去重問題,還可以解決一些復雜場景。比如下面這個場景:
Consumer 從 Topic1 的兩個 Partition 中各消費一條消息后,做加工計算(重復消費會影響加工結果),然后把結果分別發送到 Topic2 的兩個 Partition 中。這個復雜的事務,要保證消息既不會重復也不會丟失,僅僅靠去重,就很難實現了。Pulsar 參考了分布式事務的主流實現,支持了消息的分布式事務。
Pulsar 的事務模型能保證生產和消費都能精確一次,即使 Broker 宕機,也不會處理失敗。
同時,Pulsar 事務消息支持更復雜的場景,比如:
- 生產者在一個事務中分別發送一條消息到不同 Partition,要不同時成功,要不同時失?。?/span>
- 消費者從不同 Partition 消費多條消息,要不全部成功,要不全部失敗;
- 上面兩個場景的組合,見上面的圖。
那 Pulsar 的事務消息是怎么實現的呢?Pulsar 參考了分布式事務的實現方式,我們再回顧一下分布式事務的三個角色:
- TC: 事務協調器,管理全局事務和分支事務的狀態,Pulsar 會選擇 Topic 中 Partition 所在的一個 Broker 作為 TC;
- TM:管理全局事務,包括開啟全局事務,提交/回滾全局事務。Pulsar 使用
pulsarClient.newTransaction()
開啟一個事務,這會向 TC 注冊全局事務并且獲得全局事務 ID(TCID)。 - RM:管理分支事務。
下圖,我們把上面復雜的事務用分布式事務來實現:
說明幾點:
- Producer1 既是生產者也是 TM;
- Broker1 既是 TC 也是 RM;
- Producer 和 Consumer 的事務分開來管理。上圖中只是畫出了生產者的事務提交,消費者類似;
- 我們知道,分布式事務的實現模式一般包括 AT、TCC、SAGA 和 XA,那 Pulsar 的實現模式是哪一種呢?對于 Producer 和 Consumer,情況不一樣。
對于 Producer 的事務消息,更像是 AT 模式,消息直接發送給 Broker 并持久化,不過持久化之前會在 TopicTransactionBuffer 中記錄元數據(類似 AT 模式中的回滾日志),全局事務回滾時可以使用這些元數據回滾消息。當然回滾消息并不是刪除消息,而是讓消息不被消費到,具體做法是在回滾的事務會被打上 Aborted 標簽,根據這個標簽來決定消息不會推送給 Consumer。
對于 Consumer 的事務消息,我個人覺得有點參考 XA 模式,不過這里沒有數據源代理,而是用了消息緩存,這里緩存的不是消息本身,而是消費者的 ack 消息。也就是說消費者消費完成后并沒有直接發送 ack 給 Broker,而是先發送到 pendingAckSore 做緩存,在提交全局事務時才會真正地提交 ack 消息。
- 全局事務沒有提交之前,消息可能會被消費到嗎?不會,每個 Topic 都會記錄自己的 maxReadPosition 屬性,標識消費者可以從 Broker 拉取消息的最大位置,分布式事務提交全局事務之前,maxReadPosition 是不變的,所有未提交全局事務的消息不可能被消費到。但這里也會有一個隱患,那就是阻塞普通消息的消費,在當前事務提交之前,普通消息即使發送成功了,消費者也拉取不到。
4.總結
Pulsar 使用事務消息實現了 Exactly Once 這個消息投遞的最高要求。從上面的講解看,事務消息的實現還是比較復雜的,不過從 Producer 和 Consumer 端分開實現這個角度看 ,更容易理解一些。
最后,一起思考一個極端場景,如果分布式事務中有兩個消費者,一個消費者消費成功并且發送 ack,另一個消費者因為代碼問題消費失敗并且沒有回復 ack,最終全局事務因為超時而做回滾,那第一個消費者已經消費,這還能保證全局一致嗎?當然不能,除非消費者消費邏輯也加入這個全局事務。
消息隊列的分布式事務一直是一個復雜的話題,分布式事務的設計思想也非常值得我們借鑒學習。但無論使用哪個中間件,消費端冪等是保障業務正確性的底線,最靠譜的方式還是從業務代碼層面來保證冪等。