一文讀懂kafka的冪等生產者
本文轉載自微信公眾號「明哥的IT隨筆」,作者 IT明哥 。轉載本文請聯系明哥的IT隨筆公眾號。
1 前言
大家好,我是明哥!
KAFKA 作為開源分布式事件流平臺,在大數據和微服務領域都有著廣泛的應用場景,是實時流處理場景下消息隊列事實上的標準。用一句話概括,KAFKA 是實時數倉的基石,是事件驅動架構的靈魂。
但是一些技術小伙伴,尤其是一些很早就開始使用 KAFKA 的技術小伙伴們,對 KAFKA 的發展趨勢和一些新特性,并不太熟悉,在使用過程中也踩了不少坑。
有鑒于此,我們接下來會有一個 KAFKA 系列文章,專門講述 KAFKA 的這些新特性。
本文是該系列文章之一,講述 KAFAK 的冪等生產者。
以下是正文。
2 從歷史視角看 KAFKA 的發展
首先我們從歷史視角,看下 KAFKA 的發展:
- KAFKA 在2013年12月推出了一個重要的版本 0.8.0,該版本相當重要,因為它通過 KAFKA-50 首次引進了多副本機制,為容錯打下了堅實的基礎;
- 然后在后續版本中逐步增添了很多新的功能特性:
- 如逐步擺脫對 zookeeper的依賴;
- 如支持 compact 清理策略;
- 如支持 kafka tired storage;
- 如生產者冪等性;
- 如對事務的支持;
- 如大的 kafka 生態的 kafka connect api, kafka stream api 以及 KSQL, 還有 kafka schema registry;
- 到目前為止(202109),KAFKA 最新的穩定版已經演進到了 2.8.0;
- KAFKA 已經從最開始僅僅作為一個高吞吐的消息中間件,發展到了如今實時流處理場景下消息隊列事實上的標準,用一句話概括,KAFKA 是實時數倉的基石,是事件驅動架構的靈魂。
- 但是如今在市面上生產環境中,還不乏有使用早期版本如 0.8.0 版本的情況。
kafka-timeline
kafka-api
3 什么是冪等生產者?
我們知道,當 kafka producer 向 broker 中的 topic發送數據時,可能會因為網絡抖動等各種原因,造成 producer 收不到 broker 的 ack 確認信息。此時 producer 有兩種選擇:
producer 可以選擇忽略沒有收到 ack 確認消息,不做任何進一步處理:此時有可能會丟失消息。(之所以說有可能,是因為消息有可能沒有寫到 broker 的topic 中,但也有可能已經正確地寫到了 broker 的 topic 中,只是回調的 ack 消息因網絡抖動 producer 沒有收到;)
producer 也可以選擇多次嘗試重發消息,直到收到ack 確認消息或重試最大次數到達: 此時有可能會造成消息的重復寫,即 broker 端的 topic 中,重復地存儲了重試發送的這些消息;
producer 重發沒有收到 ack 確認的消息, 也可能會造成 broker 端 topic 的 partition 中 消息的順序混亂,即因失敗重發的消息在部分沒有失敗不需要重發的消息之后。
因 producer 重發沒有收到 ack 確認的消息造成數據重復的問題,可以參見如下示意圖,圖中 message 7/8/9/10 即為重復的消息。
producer-resend-failure
KAFKA 的冪等生產者即 idempotent producer,就是解決上述問題的:它可以確保消息被正確地投遞到 broker端,不會丟失沒有重復,而且是以正確的順序存儲在 topic 的各個 partition 中。
4 如何啟用冪等生產者?
- 啟用冪等生產者,不涉及任何代碼層面的改動,只涉及以下配置項的更改:
- enable.idempotence=true;//冪等生產者功能開關
- message.send.max.retries=xx //發送失敗重試次數,可以配置很大比如10000000,甚至Integer.MAX_VALUE;
- max.in.flight.requests.per.connection=xx //xx <= 5, 代表每個連接中在途請求次數,有的博文說該參數必須配置為=1,其實不然,只需要<=5即可(max.in.flight must be set <= 5 when enable.idempotence is true");
- Acks=All //ACK 確認參數,可選 0/1/-1/ALL,-1 與 ALL 等價。在開啟冪等生產者功能時,該參數必須配置為ALL/-1,即所有 ISR 都要確認收到了消息,才認為消息投遞成功(acks must be set to all when enable.idempotence is true");
- 在開啟冪等生產者即 enable.idempotence=true 的情況下,也可以不配置參數 max.in.flight.requests.per.connection 和參數 Acks,此時這兩個參數會被自動配置;
5 冪等生產者的原理是什么?
首先需要說明下,在啟用冪等生產者的情況下,消息失敗時的重新發送,是由 kafka client 自動實現的,對我們來講是透明的,我們不需要在代碼中重試發送。(事實上,在代碼中重試消息發送,反而會引起消息重復).
其內部工作原理如下:
- 在 producer 端,每個 producer 都被 broker 自動分配了一個 Producer Id (PID), producer 向 broker 發送的每條消息,在內部都附帶著該 pid 和一個遞增的 sequence number;
- 在 broker 端,broker 為每個 topic 的每個 partition 都維護了一個當前寫成功的消息的最大 PID-Sequence Number 元組;
- 當 broker 收到一個比當前最大 PID-Sequence Number 元組小的 sequence number 消息時,就會丟棄該消息,以避免造成數據重復存儲;
- 當 broker 失敗重新選舉新的 leader 時, 以上去重機制仍然有效:因為 broker 的 topic 中存儲的消息體中附帶了 PID-sequence number 信息,且 leader 的所有消息都會被復制到 followers 中。當某個原來的 follower 被選舉為新的 leader 時,它內部的消息中已經存儲了PID-sequence number 信息,也就可以執行消息去重了。
- 冪等生產者,在 broker 端去重的工作原理,如下圖所示:圖片
6 冪等生產者與事務有何關系?
冪等生產者是 kafka 事務的必要不充分條件,即:
開啟冪等生長者,不一定需要開啟事務;
開始 kafka 事務,必須要開啟冪等生產者;
事實上,開啟 kafka事務時,kafka 會自動開啟冪等生產者。