Kafka生產(chǎn)環(huán)境實戰(zhàn)經(jīng)驗深度總結(jié),讓你少走彎路
1.背景
在實際項目中接入 Kafka 已經(jīng)成為高并發(fā)系統(tǒng)的標配。然而,從簡單的“能跑”到“穩(wěn)定高效地跑”,中間有太多坑值得記錄和總結(jié)。本文結(jié)合本人在多個生產(chǎn)項目中使用 Kafka 的經(jīng)驗,圍繞以下幾個方面展開:消息丟失防范、重復(fù)消費控制、性能瓶頸優(yōu)化、集群運維策略,以及 Topic、分區(qū)、副本機制的設(shè)計要點。先來看看kafka的基礎(chǔ)架構(gòu)圖,有個整體認識:
圖片
接下我們我們就從生產(chǎn)者、服務(wù)端broker、服務(wù)端去講述下實戰(zhàn)經(jīng)驗心得。
2.生產(chǎn)者如何提高吞吐量?
下面來看看生產(chǎn)者發(fā)送一條消息到kafka服務(wù)端的流程:
圖片
在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊列RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka服務(wù)端 Broker
可以適當調(diào)整以下四個生產(chǎn)者的參數(shù)來提高吞吐量:
參數(shù) | 說明 |
batch.size | 提交一批數(shù)據(jù)到緩沖區(qū)的最大值,默認 16k。適當增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會導(dǎo)致數(shù)據(jù)傳輸延遲增加。 |
linger.ms | 如果數(shù)據(jù)遲遲未達到 batch.size,sender 等待 linger.time之后就會發(fā)送數(shù)據(jù)。單位 ms,默認值是 0ms,表示沒有延遲。生產(chǎn)環(huán)境建議該值大小為 5-100ms 之間。 |
buffer.memory | RecordAccumulator 緩沖區(qū)總大小,默認 32m??梢赃m當增加該值提高緩沖區(qū)的存儲能力 |
compression.type | 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。 |
這些參數(shù)的應(yīng)用思想都很好理解,就好比我們現(xiàn)實生活中集散中心大巴車拉人,一次拉一個,有人就走。這種方式效率低下,浪費資源。所以一般都是車到了多等一下,等人數(shù)差不多才走,這就是參數(shù)batch.size和linger.ms
的提醒,buffer.memory
其實也好理解,就好比車送到目的地只能容納100個人,你使勁送過去,收不下,只能目的把這個100個人安頓好,才能接著送,所以適當調(diào)大,可以增加吞吐量,至于壓縮compression.type
就是讓一次可以拉更多的人,就好比讓小孩子和大人用一個座位。
3.如何保證消息不丟失?
消息丟失可能發(fā)生在生產(chǎn)者發(fā)送消息、broker保存消息、消費者消費消息等環(huán)節(jié)。
3.1 生產(chǎn)者丟失消息
生產(chǎn)者丟失消息是比較常見的場景,生產(chǎn)者發(fā)送消息到kafka,因為網(wǎng)絡(luò)抖動最后發(fā)現(xiàn)kakfa沒保存,這鍋該誰背?答案是生產(chǎn)者,因為 Kafka Producer 是異步發(fā)送消息的,也就是說如果你調(diào)用的是 producer.send(msg) 這個 API,那么它通常會立即返回,但此時你不能認為消息發(fā)送已成功完成。解決辦法也很簡單:**Producer 永遠要使用帶有回調(diào)通知的發(fā)送 API,也就是說不要使用 producer.send(msg),而要使用 producer.send(msg, callback)**,通過回調(diào)callback才能真正知道消息是否成功發(fā)送
設(shè)置重試 retrie
s 。這里的 retries 同樣是 Producer 的參數(shù),對應(yīng)前面提到的 Producer 自動重試。當出現(xiàn)網(wǎng)絡(luò)的瞬時抖動時,消息發(fā)送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失。
3.2 broker丟失消息
設(shè)置 acks = all
。acks 是 Producer 的一個參數(shù),代表了你對“已提交”消息的定義。如果設(shè)置成 all,表示生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader和ISR隊列里面的所有節(jié)點收到數(shù)據(jù)后才應(yīng)答。
參數(shù) | 說明 |
acks | 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。 |
設(shè)置 unclean.leader.election.enable = false
。這是 Broker 端的參數(shù),它控制的是哪些 Broker 有資格競選分區(qū)的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。
設(shè)置 replication.factor >= 3
。這也是 Broker 端的參數(shù)。其實這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。
設(shè)置 min.insync.replicas > 1
。這依然是 Broker 端參數(shù),控制的是消息至少要被寫入到多少個副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實際環(huán)境中千萬不要使用默認值 1。
確保 replication.factor > min.insync.replicas
。如果兩者相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成 replication.factor = min.insync.replicas + 1
3.3 消費者丟失消息
Consumer 程序有個“位移”的概念,表示的是這個 Consumer 當前消費到的 Topic 分區(qū)的位置。如果我們一次消費offset為0-9的10條消息,拉取到消息之后就自動提交了位移,但是消費到位移5的時候報錯了,那么位移5-9的消息就被丟失了。
解決辦法也很簡單就是確保消息消費完成再提交。Consumer 端有個參數(shù) enable.auto.commit
,最好把它設(shè)置成 false關(guān)閉自動提交,并采用手動提交位移的方式。如果啟用了自動提交,Consumer 端還有個參數(shù)就派上用場了:auto.commit.interval.ms
。它的默認值是 5 秒,表明 Kafka 每 5 秒會為你自動提交一次位移。
手動提交位移是保證消費者消息消息過程中不丟失消息的核心所在,手動提交分為同步和異步,同步提交會使消費者處于阻塞狀態(tài),直到遠端的 Broker 返回提交結(jié)果。而異步提交它會立即返回,不會阻塞,因此不會影響 Consumer 應(yīng)用的 TPS。但是異步條件有一個缺點就是發(fā)生了異常我們無法立刻感知到并相應(yīng)邏輯處理,所以代碼里面的提交位移邏輯一般是:同步+異步
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
process(records); // 處理消息
kafkaConsumer.commitAsync(); // 使用異步提交規(guī)避阻塞
}
} catch (Exception e) {
handle(e); // 處理異常
} finally {
try {
kafkaConsumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
kafkaConsumer.close();
}
}
這段代碼同時使用了 commitSync() 和 commitAsync()。對于常規(guī)性、階段性的手動提交,我們調(diào)用 commitAsync() 避免程序阻塞,而在 Consumer 要關(guān)閉前,我們調(diào)用 commitSync() 方法執(zhí)行同步阻塞式的位移提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)。將兩者結(jié)合后,我們既實現(xiàn)了異步無阻塞式的位移管理,也確保了 Consumer 位移的正確性,所以,如果你需要自行編寫代碼開發(fā)一套 Kafka Consumer 應(yīng)用,那么我推薦你使用上面的代碼范例來實現(xiàn)手動的位移提交。
關(guān)于提交位移有一個可能發(fā)生的異常:CommitFailedException
,顧名思義就是 Consumer 客戶端在提交位移時出現(xiàn)了錯誤或異常,而且還是那種不可恢復(fù)的嚴重異常。這是因為在拉取到消息消費完之后提交位移這期間,消費者組發(fā)生了重平衡。關(guān)于什么是重平衡可以看后續(xù)總結(jié)講述。
4.如何保證消息不會重復(fù)消費?
在生產(chǎn)者端可能由于開啟了重試機制導(dǎo)致同一條消息被發(fā)送了兩次,這時候可以讓生產(chǎn)者開啟冪等性配置參數(shù):enable.idempotence
默認為true, 即開啟的。
消費者端就是要保證實際消費消息的位移和提交的位移一致,使用手工同步位移。當然我們也可以在消費消息的代碼邏輯保證消費的冪等性:使用唯一索引或者分布式鎖都行
5.如何解決消息積壓問題
消息積壓會導(dǎo)致很多問題,?如磁盤被打滿、?產(chǎn)端發(fā)消息導(dǎo)致kafka性能過慢,最后導(dǎo)致出現(xiàn)服務(wù)雪崩不可用,解決方案如下:
- 如果是Kafka消費能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費組的消費者數(shù)量,消費者數(shù) = 分區(qū)數(shù)。因為主題的一個分區(qū)只能被消費者組中一個消費者消費,假如我們消費者組里有3個消費者,但是主題就一個分區(qū),這就白白空著兩個消費者無所事事。如果已經(jīng)是多個消費者對應(yīng)多個分區(qū)了,還是消費比較慢,就說明是消息消息的代碼邏輯過重處理過慢,可以引入多線程異步操作,但這時候需要自己控制代碼邏輯來保證消費的順序性,因為一個分區(qū)內(nèi)的消息是有序的,被一個消費者順序消費,但是當消費者開啟多線程處理之后就不能保證順序消費了。
- 如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間 < 生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。比如說可以從一次最多拉取500條,調(diào)整為一次最多拉取1000條。簡單來說就是在消費能力跟得上的同時,盡量保證消費速度>生產(chǎn)速度,這樣就不會堆積了。
6.如何保證消息的有序性。
生產(chǎn)者:在發(fā)送時將ack不能設(shè)置0,關(guān)閉重試,使?同步發(fā)送,等到發(fā)送成功再發(fā)送下?條。確保消息是順序發(fā)送的。
消費者:消息是發(fā)送到?個分區(qū)中,只能有?個消費組的消費者來接收消息。
因此,kafka的順序消費會犧牲掉性能。
7.什么是重平衡?
Rebalance
就是讓一個 Consumer Group 下所有的 Consumer 實例就如何消費訂閱主題的所有分區(qū)達成共識的過程。在 Rebalance 過程中,所有 Consumer 實例共同參與,在協(xié)調(diào)者組件的幫助下,完成訂閱主題分區(qū)的分配。但是,在整個過程中,所有實例都不能消費任何消息,因此它對 Consumer 的 TPS 影響很大。重平衡觸發(fā)的場景:
- 消費者組訂閱的主題的分區(qū)數(shù)增加了,注意主題分區(qū)數(shù)只能增加,不能減少
- 消費者組訂閱的主題數(shù)有變化,可能變多了也可能變少了。
- 消費者組成員有變化,可能變多了也可能變少了。
前兩個訂閱的分區(qū)數(shù)增加還是主題數(shù)變化,都是一個主動發(fā)起Rebalance,我們是能提前感知到的。Consumer 實例增加的情況很好理解,當我們啟動一個配置有相同 group.id 值的 Consumer 程序時,實際上就向這個 Group 添加了一個新的 Consumer 實例。此時,Coordinator 會接納這個新實例,將其加入到組中,并重新分配分區(qū)。通常來說,增加 Consumer 實例的操作都是計劃內(nèi)的,可能是出于增加 TPS 或提高伸縮性的需要。但是對于Consumer 實例減少,大部分不是人為操作下線的,更多情況是Consumer 實例會被 Coordinator 錯誤地認為“已停止”從而被“踢出”Group。如果是這個原因?qū)е碌?Rebalance,這種情況就得引起我們重視了。
Coordinator 會在什么情況下認為某個 Consumer 實例已掛從而要退組呢?
當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發(fā)送心跳請求,表明它還存活著。如果某個 Consumer 實例不能及時地發(fā)送這些心跳請求,Coordinator 就會認為該 Consumer 已經(jīng)“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個參數(shù),叫 session.timeout.ms
,就是被用來表征此事的。該參數(shù)的默認值是 145秒,即如果 Coordinator 在 45 秒之內(nèi)沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經(jīng)掛了??梢赃@么說,session.timout.ms 決定了 Consumer 存活性的時間間隔。
Consumer 端還有一個參數(shù),用于控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms
參數(shù)。它限定了 Consumer 端應(yīng)用程序兩次調(diào)用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
8.kafka作為消息隊列為什么發(fā)送和消費消息這么快?
- 消息分區(qū):不受單臺服務(wù)器的限制,可以不受限的處理更多的數(shù)據(jù)
- 順序讀寫:磁盤順序讀寫,提升讀寫效率
- 頁緩存:把磁盤中的數(shù)據(jù)緩存到內(nèi)存中,把對磁盤的訪問變?yōu)閷?nèi)存的訪問
- 零拷貝:減少上下文切換及數(shù)據(jù)拷貝
- 消息壓縮:減少磁盤IO和網(wǎng)絡(luò)IO
- 分批發(fā)送:將消息打包批量發(fā)送,減少網(wǎng)絡(luò)開銷