Kafka集群是如何選擇Leader,你知道嗎?
前言
kafka集群是由多個broker節點組成,這里面包含了許多的知識點,以下的這些問題你都知道嗎?
- 你知道topic的分區leader是怎么選舉的嗎?
- 你知道zookeeper中存儲了kafka的什么信息嗎?起到什么做呢?
- 你知道kafka消息文件是怎么存儲的嗎?
- 如果kafka中leader節點或者follower節點發生故障,消息會丟失嗎?如何保證消息的一致性和可靠性呢?
如果你對這些問題比較模糊的話,那么很有必要看看本文,去了解以下kafka的核心設計,本文主要基于kafka3.x版本講解。
kafka broker核心機制
kafka集群整體架構
kafka集群是由多個kafka broker通過連同一個zookeeper組成,那么他們是如何協同工作對外提供服務的呢?zookeeper中又存儲了什么信息呢?
- kafka broker啟動后,會在zookeeper的/brokers/ids路徑下注冊。
- 同時,其中一個broker會被選舉為控制器(Kafka Controller)。選舉規則也很簡單,誰先注冊到zookeeper中的/controller節點,誰就是控制器。Controller主要負責管理整個集群中所有分區和副本的狀態。
- Kafka Controller會進行Leader選擇,比如上圖中針對TopicA中的0號分區,選擇broker0作為Leader, 然后會將選擇的節點信息注冊到zookeeper的/brokers/topics路徑下,記錄誰是Leader,有哪些服務器可用。
- 被選舉為Leader的topic分區提供對外的讀寫服務。為什么只有Leader節點提供讀寫服務,而不是設計成主從方式,Follower提供讀服務呢?
- 為了保證數據的一致性,因為消息同步延遲,可能導致消費者從不同節點讀取導致不一致。
- kafka設計目的是分布式日志系統,不是一個讀多寫少的場景,kafka的讀寫基本是對等的。
- 主從方式的話帶來設計上的復雜度。
kafka leader選舉機制
那么問題來了,kafka中topic分區是如何選擇leader的呢?為了更好的闡述,我們先來理解下面3個概念。
- ****ISR:表示和 Leader 保持同步的 Follower 集合。如果 Follower 長時間未向 Leader 發送通信請求或同步數據,則該 Follower 將被踢出 ISR。該時間閾值由replica.lag.time.max.ms參數設定,默認 30s。Leader 發生故障之后,就會從 ISR 中選舉新的Leader。
- ****OSR:表示 Follower 與 Leader 副本同步時,延遲過多的副本。
- ****AR: 指的是分區中的所有副本,所以AR = ISR + OSR。
Kafka Controller選舉Leader的規則:在isr隊列中存活為前提,按照AR中排在前面的優先。例如ar[1,0,2], isr [1,0,2],那么leader就會按照1,0,2的順序輪詢。而AR中的這個順序kafka會進行打散,分攤kafka broker的壓力。
當運行中的控制器突然宕機或意外終止時,Kafka 通過監聽zookeeper能夠快速地感知到,并立即啟用備用控制器來代替之前失敗的控制器。這個過程就被稱為 Failover,該過程是自動完成的,無需你手動干預。
開始的時候,Broker 0 是控制器。當 Broker 0 宕機后,ZooKeeper 通過`` Watch 機制感知到并刪除了 /controller 臨時節點。之后,所有存活的 Broker 開始競選新的控制器身份。Broker 3最終贏得了選舉,成功地在 ZooKeeper 上重建了 /controller 節點。之后,Broker 3 會從 ZooKeeper 中讀取集群元數據信息,并初始化到自己的緩存中,后面就有Broker 3來接管選擇Leader的功能了。
Leader 和 Follower 故障處理機制
如果topic分區的leader和follower發生了故障,那么對于數據的一致性和可靠性會有什么樣的影響呢?
- LEO(Log End Offset):每個副本的最后一個offset,LEO就是最新的offset + 1。
- HW(High Watermark):水位線,所有副本中最小的LEO ,消費者只能看到這個水位線左邊的消息,從而保證數據的一致性。
上圖所示,如果follower發生故障怎么辦?
- Follower發生故障后會被臨時踢出ISR隊列。
- 這個期間Leader和Follower繼續接收數據。
- 待該Follower恢復后,Follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向Leader進行同步。
- 等該Follower的LEO大于等于該Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。
如果leader發生故障怎么辦?
- Leader發生故障之后,會從ISR中選出一個新的Leader
- 為保證多個副本之間的數據一致性,其余的Follower會先將各自的log文件高于HW的部分截掉,然后從新的Leader同步數據。
所以為了讓kafka broker保證消息的可靠性和一致性,我們要做如下的配置:
- 設置 生產者producer 的配置acks=all或者-1。leader 在返回確認或錯誤響應之前,會等待所有副本收到悄息,需要配合min.insync.replicas配置使用。這樣就意味著leader和follower的LEO對齊。
- 設置topic 的配置replication.factor>=3副本大于3個,并且 min.insync.replicas>=2表示至少兩個副本應答。
- 設置broker配置unclean.leader.election.enable=false,默認也是false,表示不對落后leader很多的follower也就是非ISR隊列中的副本選擇為Leader, 這樣可以避免數據丟失和數據 不一致,但是可用性會降低。
Leader Partition 負載平衡
正常情況下,Kafka本身會自動把Leader Partition均勻分散在各個機器上,來保證每臺機器的讀寫吞吐量都是均勻的。但是如果某些broker宕機,會導致Leader Partition過于集中在其他少部分幾臺broker上,這會導致少數幾臺broker的讀寫請求壓力過高,其他宕機的broker重啟之后都是follower partition,讀寫請求很低,造成集群負載不均衡。那么該如何負載平衡呢?
- 自動負載均衡
通過broker配置設置自動負載均衡。
- auto.leader.rebalance.enable:默認是 true。自動 Leader Partition 平衡。生產環境中,leader 重選舉的代價比較大,可能會帶來性能影響,建議設置為 false 關閉。
- leader.imbalance.per.broker.percentage:默認是 10%。每個 broker 允許的不平衡的 leader的比率。如果每個 broker 超過了這個值,控制器會觸發 leader 的平衡。
- leader.imbalance.check.interval.seconds:默認值 300 秒。檢查 leader 負載是否平衡的間隔時間。
- 手動負載均衡
- 對所有topic進行負載均衡
./bin/kafka-preferred-replica-election.sh --zookeeper hadoop16:2181,hadoop17:2181,hadoop18:2181/kafka08
- 對指定topic負載均衡
cat topicPartitionList.json
{
"partitions":
[
{"topic":"test.example","partition": "0"}
]
}
./bin/kafka-preferred-replica-election.sh --zookeeper hadoop16:2181,hadoop17:2181,hadoop18:2181/kafka08 --path-to-json-file topicPartitionList.json
kafka的存儲機制
kafka消息最終會存儲到磁盤文件中,那么是如何存儲的呢?清理策略是什么呢?
一個topic分為多個partition,每個partition對應于一個log文件,為防止log文件過大導致數據定位效率低下,Kafka采取了分片和索引機制,每個partition分為多個segment。每個segment包括:“.index”文件、“.log”文件和.timeindex等文件,Producer生產的數據會被不斷追加到該log文件末端。
上圖中t1即為一個topic的名稱,而“t1-0/t1-1”則表明這個目錄是t1這個topic的哪個partition。
kafka中的索引文件以稀疏索引(sparseindex)的方式構造消息的索引,如下圖所示:
1.根據目標offset定位segment文件
2.找到小于等于目標offset的最大offset對應的索引項
3.定位到log文件
4.向下遍歷找到目標Record
注意:index為稀疏索引,大約每往log文件寫入4kb數據,會往index文件寫入一條索引。通過參數log.index.interval.bytes控制,默認4kb。
那kafka中磁盤文件保存多久呢?
kafka 中默認的日志保存時間為 7 天,可以通過調整如下參數修改保存時間。
- log.retention.hours,最低優先級小時,默認 7 天。
- log.retention.minutes,分鐘。
- log.retention.ms,最高優先級毫秒。
- log.retention.check.interval.ms,負責設置檢查周期,默認 5 分鐘。
kafka broker重要參數
前面講解了kafka broker中的核心機制,我們再來看下重要的配置參數。
首先來說下kafka服務端配置屬性Update Mode的作用:
- read-only。被標記為read-only 的參數和原來的參數行為一樣,只有重啟 Broker,才能令修改生效。
- per-broker。被標記為 per-broker 的參數屬于動態參數,修改它之后,無需重啟就會在對應的 broker 上生效。
- cluster-wide。被標記為 cluster-wide 的參數也屬于動態參數,修改它之后,會在整個集群范圍內生效,也就是說,對所有 broker 都生效。也可以為具體的 broker 修改cluster-wide 參數。
Broker重要參數
參數名稱 | 描述 |
replica.lag.time.max.ms | ISR 中,如果 Follower 長時間未向 Leader 發送通信請求或同步數據,則該 Follower 將被踢出 ISR。該時間閾值,默認 30s。 |
auto.leader.rebalance.enable | 默認是 true。自動 Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默認是 10%。每個 broker 允許的不平衡的 leader的比率。如果每個 broker 超過了這個值,控制器會觸發 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默認值 300 秒。檢查 leader 負載是否平衡的間隔時間。 |
log.segment.bytes | Kafka 中 log 日志是分成一塊塊存儲的,此配置是指 log 日志劃分 成塊的大小,默認值 1G。 |
log.index.interval.bytes | 默認 4kb,kafka 里面每當寫入了 4kb 大小的日志(.log),然后就往 index 文件里面記錄一個索引。 |
log.retention.hours | Kafka 中數據保存的時間,默認 7 天。 |
log.retention.minutes | Kafka 中數據保存的時間,分鐘級別,默認關閉。 |
log.retention.ms | Kafka 中數據保存的時間,毫秒級別,默認關閉。 |
log.retention.check.interval.ms | 檢查數據是否保存超時的間隔,默認是 5 分鐘。 |
log.retention.bytes | 默認等于-1,表示無窮大。超過設置的所有日志總大小,刪除最早的 segment。 |
log.cleanup.policy | 默認是 delete,表示所有數據啟用刪除策略;如果設置值為 compact,表示所有數據啟用壓縮策略。 |
num.io.threads | 默認是 8。負責寫磁盤的線程數。整個參數值要占總核數的 50%。 |
num.replica.fetchers | 副本拉取線程數,這個參數占總核數的 50%的 1/3 |
num.network.threads | 默認是 3。數據傳輸線程數,這個參數占總核數的50%的 2/3 。 |
log.flush.interval.messages | 強制頁緩存刷寫到磁盤的條數,默認是 long 的最大值,9223372036854775807。一般不建議修改,交給系統自己管理。 |
log.flush.interval.ms | 每隔多久,刷數據到磁盤,默認是 null。一般不建議修改,交給系統自己管理。 |
總結
Kafka集群的分區多副本架構是 Kafka 可靠性保證的核心,把消息寫入多個副本可以使 Kafka 在發生崩潰時仍能保證消息的持久性。本文圍繞這樣的核心架構講解了其中的一些核心機制,包括Leader的選舉、消息的存儲機制等等。