一文講清 Kafka 工作流程和存儲機制
一、Kafka 文件存儲機制
topic構(gòu)成
Kafka 中消息是以 topic 進行分類的,生產(chǎn)者生產(chǎn)消息,消費者消費消息,都是面向 topic 的。
在 Kafka 中,一個 topic 可以分為多個 partition,一個 partition 分為多個 segment,每個 segment 對應(yīng)兩個文件:.index 和 .log 文件
topic 是邏輯上的概念,而 patition 是物理上的概念,每個 patition 對應(yīng)一個 log 文件,而 log 文件中存儲的就是 producer 生產(chǎn)的數(shù)據(jù),patition 生產(chǎn)的數(shù)據(jù)會被不斷的添加到 log 文件的末端,且每條數(shù)據(jù)都有自己的 offset。
消費組中的每個消費者,都是實時記錄自己消費到哪個 offset,以便出錯恢復(fù),從上次的位置繼續(xù)消費。
消息存儲原理
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應(yīng)兩個文件——.index文件和 .log文件。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。
如下,我們創(chuàng)建一個只有一個分區(qū)一個副本的 topic
- > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic starfish
然后可以在 kafka-logs 目錄(server.properties 默認配置)下看到會有個名為 starfish-0 的文件夾。如果,starfish 這個 topic 有三個分區(qū),則其對應(yīng)的文件夾為 starfish-0,starfish-1,starfish-2。
這些文件的含義如下:
類別 | 作用 |
---|---|
.index | 偏移量索引文件,存儲數(shù)據(jù)對應(yīng)的偏移量 |
.timestamp | 時間戳索引文件 |
.log | 日志文件,存儲生產(chǎn)者生產(chǎn)的數(shù)據(jù) |
.snaphot | 快照文件 |
leader-epoch-checkpoint | 保存了每一任leader開始寫入消息時的offset,會定時更新。follower被選為leader時會根據(jù)這個確定哪些消息可用 |
index 和 log 文件以當前 segment 的第一條消息的 offset 命名。偏移量 offset 是一個 64 位的長整形數(shù),固定是20 位數(shù)字,長度未達到,用 0 進行填補,索引文件和日志文件都由此作為文件名命名規(guī)則。所以從上圖可以看出,我們的偏移量是從 0 開始的,.index 和 .log 文件名稱都為 00000000000000000000。
接著往 topic 中發(fā)送一些消息,并啟動消費者消費
- > bin /kafka-console-producer.sh --bootstrap-server localhost:9092 --topic starfishone
- > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic starfish --from-beginningone
查看 .log 文件下是否有數(shù)據(jù) one
內(nèi)容存在一些”亂碼“,因為數(shù)據(jù)是經(jīng)過序列化壓縮的。
那么數(shù)據(jù)文件 .log 大小有限制嗎,能保存多久時間?這些我們都可以通過 Kafka 目錄下 conf/server.properties 配置文件修改:
- # log文件存儲時間,單位為小時,這里設(shè)置為1周
- log.retention.hours=168
- # log文件大小的最大值,這里為1g,超過這個值,則會創(chuàng)建新的segment(也就是新的.index和.log文件)
- log.segment.bytes=1073741824
比如,當生產(chǎn)者生產(chǎn)數(shù)據(jù)量較多,一個 segment 存儲不下觸發(fā)分片時,在日志 topic 目錄下你會看到類似如下所示的文件:
- 00000000000000000000.index
- 00000000000000000000.log
- 00000000000000170410.index
- 00000000000000170410.log
- 00000000000000239430.index
- 00000000000000239430.log
下圖展示了Kafka查找數(shù)據(jù)的過程:
.index文件 存儲大量的索引信息,.log文件 存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 message 的物理偏移地址。
比如現(xiàn)在要查找偏移量 offset 為 3 的消息,根據(jù) .index 文件命名我們可以知道,offset 為 3 的索引應(yīng)該從00000000000000000000.index 里查找。根據(jù)上圖所示,其對應(yīng)的索引地址為 756-911,所以 Kafka 將讀取00000000000000000000.log 756~911區(qū)間的數(shù)據(jù)。
二、Kafka 生產(chǎn)過程
Kafka 生產(chǎn)者用于生產(chǎn)消息。通過前面的內(nèi)容我們知道,Kafka 的 topic 可以有多個分區(qū),那么生產(chǎn)者如何將這些數(shù)據(jù)可靠地發(fā)送到這些分區(qū)?生產(chǎn)者發(fā)送數(shù)據(jù)的不同的分區(qū)的依據(jù)是什么?針對這兩個疑問,這節(jié)簡單記錄下。
3.2.1 寫入流程
producer 寫入消息流程如下:
- producer 先從 zookeeper 的 "/brokers/.../state"節(jié)點找到該 partition 的 leader
- producer 將消息發(fā)送給該 leader
- leader 將消息寫入本地 log
- followers 從 leader pull 消息,寫入本地 log 后向 leader 發(fā)送 ACK
- leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 發(fā)送 ACK
2.1 寫入方式
producer 采用推(push) 模式將消息發(fā)布到 broker,每條消息都被追加(append) 到分區(qū)(patition) 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障 kafka 吞吐率)。
2.2 分區(qū)(Partition)
消息發(fā)送時都被發(fā)送到一個 topic,其本質(zhì)就是一個目錄,而 topic 是由一些 Partition Logs(分區(qū)日志)組成
分區(qū)的原因:
方便在集群中擴展,每個 Partition 可以通過調(diào)整以適應(yīng)它所在的機器,而一個 topic 又可以有多個 Partition 組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
可以提高并發(fā),因為可以以 Partition 為單位讀寫了。
分區(qū)的原則:
我們需要將 producer 發(fā)送的數(shù)據(jù)封裝成一個 ProducerRecord 對象。
- public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
- public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value)
- public ProducerRecord (String topic, Integer partition, K key, V value, Iterable<Header> headers)
- public ProducerRecord (String topic, Integer partition, K key, V value)
- public ProducerRecord (String topic, K key, V value)
- public ProducerRecord (String topic, V value)
2.3 副本(Replication) 同一個 partition 可能會有多個 replication( 對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有 replication 的情況下,一旦 broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費,同時 producer 也不能再將數(shù)據(jù)存于其上的 patition。引入 replication 之后,同一個 partition 可能會有多個 replication,而這時需要在這些 replication 之間選出一 個 leader, producer 和 consumer 只與這個 leader 交互,其它 replication 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。 2.4 數(shù)據(jù)可靠性保證 為保證 producer 發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的 topic,topic 的每個 partition 收到 producer 數(shù)據(jù)后,都需要向 producer 發(fā)送 ack(acknowledgement確認收到),如果 producer 收到 ack,就會進行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。 a) 副本數(shù)據(jù)同步策略主要有如下兩種 Kafka 選擇了第二種方案,原因如下: b) ISR 采用第二種方案之后,設(shè)想一下情景:leader 收到數(shù)據(jù),所有 follower 都開始同步數(shù)據(jù),但有一個 follower 掛了,遲遲不能與 leader 保持同步,那 leader 就要一直等下去,直到它完成同步,才能發(fā)送 ack,這個問題怎么解決呢? leader 維護了一個動態(tài)的 in-sync replica set(ISR),意為和 leader 保持同步的 follower 集合。當 ISR 中的follower 完成數(shù)據(jù)的同步之后,leader 就會給 follower 發(fā)送 ack。如果 follower 長時間未向 leader 同步數(shù)據(jù),則該 follower 將會被踢出 ISR,該時間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定。leader 發(fā)生故障之后,就會從 ISR 中選舉新的 leader。(之前還有另一個參數(shù),0.9 版本之后 replica.lag.max.messages 參數(shù)被移除了) c) ack應(yīng)答機制 對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的follower全部接收成功。 所以Kafka為用戶提供了三種可靠性級別,用戶根據(jù)對可靠性和延遲的要求進行權(quán)衡,選擇以下的acks 參數(shù)配置 0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker 一接收到還沒有寫入磁盤就已經(jīng)返回,當 broker 故障時有可能丟失數(shù)據(jù); 1:producer 等待 broker 的 ack,partition 的 leader 落盤成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么將會丟失數(shù)據(jù); -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功后才返回 ack。但是 如果在 follower 同步完成后,broker 發(fā)送 ack 之前,leader 發(fā)生故障,那么就會造成數(shù)據(jù)重復(fù)。 d) 故障處理 由于我們并不能保證 Kafka 集群中每時每刻 follower 的長度都和 leader 一致(即數(shù)據(jù)同步是有時延的),那么當leader 掛掉選舉某個 follower 為新的 leader 的時候(原先掛掉的 leader 恢復(fù)了成為了 follower),可能會出現(xiàn)leader 的數(shù)據(jù)比 follower 還少的情況。為了解決這種數(shù)據(jù)量不一致帶來的混亂情況,Kafka 提出了以下概念: 消費者和 leader 通信時,只能消費 HW 之前的數(shù)據(jù),HW 之后的數(shù)據(jù)對消費者不可見。 針對這個規(guī)則: 所以數(shù)據(jù)一致性并不能保證數(shù)據(jù)不丟失或者不重復(fù),這是由 ack 控制的。HW 規(guī)則只能保證副本之間的數(shù)據(jù)一致性! 2.5 Exactly Once語義 將服務(wù)器的 ACK 級別設(shè)置為 -1,可以保證 Producer 到 Server 之間不會丟失數(shù)據(jù),即 At Least Once 語義。相對的,將服務(wù)器 ACK 級別設(shè)置為 0,可以保證生產(chǎn)者每條消息只會被發(fā)送一次,即 At Most Once語義。 At Least Once 可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)。相對的,At Most Once 可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失。但是,對于一些非常重要的信息,比如說交易數(shù)據(jù),下游數(shù)據(jù)消費者要求數(shù)據(jù)既不重復(fù)也不丟失,即 Exactly Once 語義。在 0.11 版本以前的 Kafka,對此是無能為力的,只能保證數(shù)據(jù)不丟失,再在下游消費者對數(shù)據(jù)做全局去重。對于多個下游應(yīng)用的情況,每個都需要單獨做全局去重,這就對性能造成了很大的影響。 0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發(fā)送多少次重復(fù)數(shù)據(jù)。Server 端都會只持久化一條,冪等性結(jié)合 At Least Once 語義,就構(gòu)成了 Kafka 的 Exactily Once 語義,即:At Least Once + 冪等性 = Exactly Once 要啟用冪等性,只需要將 Producer 的參數(shù)中 enable.idompotence 設(shè)置為 true 即可。Kafka 的冪等性實現(xiàn)其實就是將原來下游需要做的去重放在了數(shù)據(jù)上游。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發(fā)往同一 Partition 的消息會附帶 Sequence Number。而 Broker 端會對 但是 PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區(qū)會話的 Exactly Once。 三、Broker 保存消息 3.1 存儲方式 物理上把 topic 分成一個或多個 patition(對應(yīng) server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應(yīng)一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件)。 3.2 存儲策略 無論消息是否被消費, kafka 都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù): 基于時間:log.retention.hours=168 基于大小:log.retention.bytes=1073741824 需要注意的是,因為 Kafka 讀取特定消息的時間復(fù)雜度為 O(1),即與文件大小無關(guān), 所以這里刪除過期文件與提高 Kafka 性能無關(guān)。 四、Kafka 消費過程 Kafka 消費者采用 pull 拉模式從 broker 中消費數(shù)據(jù)。與之相對的 push(推)模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息。而 pull 模式則可以根據(jù) consumer 的消費能力以適當?shù)乃俾氏M消息。 pull 模式不足之處是,如果 kafka 沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大小,或者傳入等待超時時間)。 4.1 消費者組 消費者是以 consumer group 消費者組的方式工作,由一個或者多個消費者組成一個組, 共同消費一個 topic。每個分區(qū)在同一時間只能由 group 中的一個消費者讀取,但是多個 group 可以同時消費這個 partition。在圖中,有一個由三個消費者組成的 group,有一個消費者讀取主題中的兩個分區(qū),另外兩個分別讀取一個分區(qū)。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的 group 成員會自動負載均衡讀取之前失敗的消費者讀取的分區(qū)。 消費者組最為重要的一個功能是實現(xiàn)廣播與單播的功能。一個消費者組可以確保其所訂閱的 Topic 的每個分區(qū)只能被從屬于該消費者組中的唯一一個消費者所消費;如果不同的消費者組訂閱了同一個 Topic,那么這些消費者組之間是彼此獨立的,不會受到相互的干擾。 如果我們希望一條消息可以被多個消費者所消費,那么可以將這些消費者放到不同的消費者組中,這實際上就是廣播的效果;如果希望一條消息只能被一個消費者所消費,那么可以將這些消費者放到同一個消費者組中,這實際上就是單播的效果。 4.2 分區(qū)分配策略 一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 consumer 來消費。 Kafka 有兩種分配策略,一是 RoundRobin,一是 Range。 RoundRobin RoundRobin 即輪詢的意思,比如現(xiàn)在有一個三個消費者 ConsumerA、ConsumerB 和 ConsumerC 組成的消費者組,同時消費 TopicA 主題消息,TopicA 分為 7 個分區(qū),如果采用 RoundRobin 分配策略,過程如下所示: 圖片:mrbird.cc 這種輪詢的方式應(yīng)該很好理解。但如果消費者組消費多個主題的多個分區(qū),會發(fā)生什么情況呢?比如現(xiàn)在有一個兩個消費者 ConsumerA 和 ConsumerB 組成的消費者組,同時消費 TopicA 和 TopicB 主題消息,如果采用RoundRobin 分配策略,過程如下所示: 注:TAP0 表示 TopicA Partition0 分區(qū)數(shù)據(jù),以此類推。 這種情況下,采用 RoundRobin 算法分配,多個主題會被當做一個整體來看,這個整體包含了各自的 Partition,比如在 Kafka-clients 依賴中,與之對應(yīng)的對象為 TopicPartition。接著將這些 TopicPartition 根據(jù)其哈希值進行排序,排序后采用輪詢的方式分配給消費者。 但這會帶來一個問題:假如上圖中的消費者組中,ConsumerA 只訂閱了 TopicA 主題,ConsumerB 只訂閱了TopicB 主題,采用 RoundRobin 輪詢算法后,可能會出現(xiàn) ConsumerA 消費了 TopicB 主題分區(qū)里的消息,ConsumerB 消費了 TopicA 主題分區(qū)里的消息。 綜上所述,RoundRobin 算法只適用于消費者組中消費者訂閱的主題相同的情況。同時會發(fā)現(xiàn),采用 RoundRobin 算法,消費者組里的消費者之間消費的消息個數(shù)最多相差 1 個。 Range Kafka 默認采用 Range 分配策略,Range 顧名思義就是按范圍劃分的意思。 比如現(xiàn)在有一個三個消費者 ConsumerA、ConsumerB 和 ConsumerC 組成的消費者組,同時消費 TopicA 主題消息,TopicA分為7個分區(qū),如果采用 Range 分配策略,過程如下所示: 假如現(xiàn)在有一個兩個消費者 ConsumerA 和 ConsumerB 組成的消費者組,同時消費 TopicA 和 TopicB 主題消息,如果采用 Range 分配策略,過程如下所示: Range 算法并不會把多個主題分區(qū)當成一個整體。 從上面的例子我們可以總結(jié)出Range算法的一個弊端:那就是同一個消費者組內(nèi)的消費者消費的消息數(shù)量相差可能較大。 4.3 offset 的維護 由于 consumer 在消費過程中可能會出現(xiàn)斷電宕機等故障,consumer 恢復(fù)后,需要從故障前的位置繼續(xù)消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復(fù)后繼續(xù)消費。 Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,consumer 默認將 offset保存在 Kafka 一個內(nèi)置的 topic 中,該 topic 為 _consumer_offsets。 消費 topic 后,查看 kafka-logs 目錄,會發(fā)現(xiàn)多出 50 個分區(qū)。 默認情況下__consumer_offsets 有 50 個分區(qū),如果你的系統(tǒng)中 consumer group 也很多的話,那么這個命令的輸出結(jié)果會很多 五、Kafka事務(wù) Kafka 從 0.11 版本開始引入了事務(wù)支持。事務(wù)可以保證 Kafka 在 Exactly Once 語義的基礎(chǔ)上,生產(chǎn)和消費可以跨分區(qū)和會話,要么全部成功,要么全部失敗。 5.1 Producer事務(wù) 為了了實現(xiàn)跨分區(qū)跨會話的事務(wù),需要引入一個全局唯一的 TransactionID,并將 Producer 獲得的 PID 和Transaction ID 綁定。這樣當 Producer 重啟后就可以通過正在進行的 TransactionID 獲得原來的 PID。 為了管理 Transaction,Kafka 引入了一個新的組件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應(yīng)的任務(wù)狀態(tài)。Transaction Coordinator 還負責將事務(wù)所有寫入 Kafka 的一個內(nèi)部 Topic,這樣即使整個服務(wù)重啟,由于事務(wù)狀態(tài)得到保存,進行中的事務(wù)狀態(tài)可以得到恢復(fù),從而繼續(xù)進行。 5.2 Consumer事務(wù) 對 Consumer 而言,事務(wù)的保證就會相對較弱,尤其是無法保證 Commit 的消息被準確消費。這是由于Consumer 可以通過 offset 訪問任意信息,而且不同的 SegmentFile 生命周期不同,同一事務(wù)的消息可能會出現(xiàn)重啟后被刪除的情況。 參考: 尚硅谷Kafka教學 部分圖片來源:mrbird.cc https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html
方案
優(yōu)點
缺點
半數(shù)以上完成同步,就發(fā)送ack
延遲低
選舉新的 leader 時,容忍n臺節(jié)點的故障,需要2n+1個副本
全部完成同步,才發(fā)送ack
選舉新的 leader 時,容忍n臺節(jié)點的故障,需要 n+1 個副本
延遲高