聊聊 Kafka 那點破事!
本文轉載自微信公眾號「微觀技術」,作者微觀技術。轉載本文請聯系微觀技術公眾號。
大家好,我是Tom哥~
Kafka作為一款開源的消息引擎,很多人并不陌生,但深入其源碼的同學估計不多,除非你是中間件團隊消息系統維護者。但術業有專攻,市面上那么多開源框架且每個框架又經常迭代升級,花精力深入了解每一個框架源碼不太現實,本文會以業務視角羅列工作中大家需要熟知的一些知識
本篇文章的目錄:
首先,為什么使用kafka?
- 削峰填谷。緩沖上下游瞬時突發流量,保護“脆弱”的下游系統不被壓垮,避免引發全鏈路服務“雪崩”。
- 系統解耦。發送方和接收方的松耦合,一定程度簡化了開發成本,減少了系統間不必要的直接依賴。
Kafka 名詞術語,一網打盡
- Broker:接收客戶端發送過來的消息,對消息進行持久化
- 主題:Topic。主題是承載消息的邏輯容器,在實際使用中多用來區分具體的業務。
- 分區:Partition。一個有序不變的消息序列。每個主題下可以有多個分區。
- 消息:這里的消息就是指 Kafka 處理的主要對象。
- 消息位移:Offset。表示分區中每條消息的位置信息,是一個單調遞增且不變的值。
- 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個地方以提供數據冗余,這些地方就是所謂的副本。副本還分為領導者副本和追隨者副本,各自有不同的角色劃分。每個分區可配置多個副本實現高可用。一個分區的N個副本一定在N個不同的Broker上。
- 生產者:Producer。向主題發布新消息的應用程序。
- 消費者:Consumer。從主題訂閱新消息的應用程序。
- 消費者位移:Consumer Offset。表示消費者消費進度,每個消費者都有自己的消費者位移。offset保存在broker端的內部topic中,不是在clients中保存
- 消費者組:Consumer Group。多個消費者實例共同組成的一個組,同時消費多個分區以實現高吞吐。
- 重平衡:Rebalance。消費者組內某個消費者實例掛掉后,其他消費者實例自動重新分配訂閱主題分區
ZooKeeper 在里面的職責是什么?
它是一個分布式協調框架,負責協調管理并保存 Kafka 集群的所有元數據信息,比如集群都有哪些 Broker 在運行、創建了哪些 Topic,每個 Topic 都有多少分區以及這些分區的 Leader 副本都在哪些機器上等信息。
消息傳輸的格式
純二進制的字節序列。當然消息還是結構化的,只是在使用之前都要將其轉換成二進制的字節序列。
消息傳輸協議
- 點對點模型。系統 A 發送的消息只能被系統 B 接收,其他任何系統都不能讀取 A 發送的消息
- 發布/訂閱模型。該模型也有發送方和接收方,只不過提法不同。發送方也稱為發布者(Publisher),接收方稱為訂閱者(Subscriber)。和點對點模型不同的是,這個模型可能存在多個發布者向相同的主題發送消息,而訂閱者也可能存在多個,它們都能接收到相同主題的消息。
消息壓縮
生產者程序中配置compression.type 參數即表示啟用指定類型的壓縮算法。
props.put(“compression.type”, “gzip”),它表明該 Producer 的壓縮算法使用的是GZIP。這樣 Producer 啟動后生產的每個消息集合都是經 GZIP 壓縮過的,故而能很好地節省網絡傳輸帶寬以及 Kafka Broker 端的磁盤占用。
但如果Broker又指定了不同的壓縮算法,如:Snappy,會將生產端的消息解壓然后按自己的算法重新壓縮。
各壓縮算法比較:吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
kafka默認不指定壓縮算法。
消息解壓縮
當 Consumer pull消息時,Broker 會原樣發送出去,當消息到達 Consumer 端后,由 Consumer 自行解壓縮還原成之前的消息。
分區策略
編寫一個類實現org.apache.kafka.clients.Partitioner接口。實現內部兩個方法:partition()和close()。然后顯式地配置生產者端的參數partitioner.class
常見的策略:
- 輪詢策略(默認)。保證消息最大限度地被平均分配到所有分區上。
- 隨機策略。隨機策略是老版本生產者使用的分區策略,在新版本中已經改為輪詢了。
- 按key分區策略。key可能是uid或者訂單id,將同一標志位的所有消息都發送到同一分區,這樣可以保證一個分區內的消息有序
- 其他分區策略。如:基于地理位置的分區策略
生產者管理TCP連接
在new KafkaProducer 實例時,生產者應用會在后臺創建并啟動一個名為 Sender 的線程,該 Sender 線程開始運行時首先會創建與 Broker 的連接。此時還不知道給哪個topic發消息,所以Producer 啟動時會發起與所有的 Broker 的連接。
Producer 通過metadata.max.age.ms 參數定期地去更新元數據信息,默認值是 300000,即 5 分鐘,不管集群那邊是否有變化,Producer 每 5 分鐘都會強制刷新一次元數據以保證它是最新的數據。
Producer 發送消息:
Producer 使用帶回調通知的發送 API, producer.send(msg, callback)。
設置 acks = all。Producer 的一個參數,表示所有副本都成功接收到消息,該消息才算是“已提交”,最高等級,acks的其它值說明。min.insync.replicas > 1,表示消息至少要被寫入到多少個副本才算是“已提交”
retries 是 Producer 的參數。當出現網絡的瞬時抖動時,消息發送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發送,避免消息丟失。
冪等性 Producer
設置參數props.put(“enable.idempotence”, ture),Producer 自動升級成冪等性 Producer,其他所有的代碼邏輯都不需要改變。Kafka 自動幫你做消息的重復去重。
原理很簡單,就是經典的空間換時間,即在 Broker 端多保存一些字段。當 Producer 發送了具有相同字段值的消息后,Broker 能夠自動知曉這些消息已經重復了,可以在后臺默默地把它們“丟棄”掉。
只能保證單分區、單會話上的消息冪等性。一個冪等性 Producer 能夠保證某個topic的一個分區上不出現重復消息,但無法實現多個分區的冪等性。比如采用輪詢,下一次提交換了一個分區就無法解決
事務型 Producer
能夠保證將消息原子性地寫入到多個分區中。這批消息要么全部寫入成功,要么全部失敗。能夠保證跨分區、跨會話間的冪等性。
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- //提交事務
- producer.commitTransaction();
- } catch (KafkaException e) {
- //事務終止
- producer.abortTransaction();
- }
實際上即使寫入失敗,Kafka 也會把它們寫入到底層的日志中,也就是說 Consumer 還是會看到這些消息。要不要處理在 Consumer 端設置 isolation.level ,這個參數有兩個值:
- read_uncommitted:這是默認值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息
- read_committed:表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的消息
Kafka Broker 是如何存儲數據?
Kafka 使用消息日志(Log)來保存數據,一個日志就是磁盤上一個只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,故避免了緩慢的隨機 I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實現 Kafka 高吞吐量特性的一個重要手段。
不過如果你不停地向一個日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?
簡單來說就是通過日志段(Log Segment)機制。在 Kafka 底層,一個日志又近一步細分成多個日志段,消息被追加寫到當前最新的日志段中,當寫滿了一個日志段后,Kafka 會自動切分出一個新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時任務會定期地檢查老的日志段是否能夠被刪除,從而實現回收磁盤空間的目的。
Kafka 的備份機制
相同的數據拷貝到多臺機器上。副本的數量是可以配置的。Kafka 中follow副本不會對外提供服務。
副本的工作機制也很簡單:生產者總是向leader副本寫消息;而消費者總是從leader副本讀消息。至于follow副本,它只做一件事:向leader副本以異步方式發送pull請求,請求leader把最新的消息同步給它,必然有一個時間窗口導致它和leader中的數據是不一致的,或者說它是落后于leader。
為什么要引入消費者組?
主要是為了提升消費者端的吞吐量。多個消費者實例同時消費,加速整個消費端的吞吐量(TPS)。
在一個消費者組下,一個分區只能被一個消費者消費,但一個消費者可能被分配多個分區,因而在提交位移時也就能提交多個分區的位移。如果1個topic有2個分區,消費者組有3個消費者,有一個消費者將無法分配到任何分區,處于idle狀態。
理想情況下,Consumer 實例的數量應該等于該 Group 訂閱topic(可能多個)的分區總數。
消費端拉取(批量)、ACK
消費端先拉取并消費消息,然后再ack更新offset。
1)消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer 實例,負責完整的消息拉取、消息處理流程。一個KafkaConsumer負責一個分區,能保證分區內的消息消費順序。
缺點:線程數受限于 Consumer 訂閱topic的總分區數。
2)任務切分成了消息獲取和消息處理兩個部分。消費者程序使用單或多線程拉取消息,同時創建專門線程池執行業務邏輯。優點:可以靈活調節消息獲取的線程數,以及消息處理的線程數。
缺點:無法保證分區內的消息消費順序。另外引入了多組線程,使得整個消息消費鏈路被拉長,最終導致正確位移提交會變得異常困難,可能會出現消息的重復消費或丟失。
消費端offset管理
1)老版本的 Consumer組把位移保存在 ZooKeeper 中,但很快發現zk并不適合頻繁的寫更新。
2)在新版本的 Consumer Group 中,Kafka 社區重新設計了 Consumer組的位移管理方式,采用了將位移保存在 Broker端的內部topic中,也稱為“位移主題”,由kafka自己來管理。原理很簡單, Consumer的位移數據作為一條條普通的 Kafka 消息,提交到__consumer_offsets 中。它的消息格式由 Kafka 自己定義,用戶不能修改。位移主題的 Key 主要包括 3 部分內容:
Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。
Kafka 使用Compact策略來刪除位移主題中的過期消息,避免該topic無限期膨脹。提供了專門的后臺線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數據。
Rebalance 觸發條件
1)組成員數發生變更。比如有新的 Consumer 實例加入組或者離開組,又或是有 Consumer 實例崩潰被“踢出”組。(99%原因是由它導致)
2) 訂閱topic數發生變更。Consumer Group 可以使用正則表達式的方式訂閱topic,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的topic。在 Consumer Group 的運行過程中,你新創建了一個滿足這樣條件的topic,那么該 Group 就會發生 Rebalance。
3) 訂閱topic的分區數發生變化。Kafka 目前只允許增加topic的分區數。當分區數增加時,也會觸發訂閱該topic的所有 Group 開啟 Rebalance。
消息的順序性
Kafka的設計中多個分區的話無法保證全局的消息順序。如果一定要實現全局的消息順序,只能單分區。
方法二:通過有key分組,同一個key的消息放入同一個分區,保證局部有序
歷史數據清理策略
基于保存時間,log.retention.hours
基于日志大小的清理策略。通過log.retention.bytes控制
組合方式