在家辦公這些天整理的Kafka知識點大全
Kakfa 廣泛應用于國內外大廠,例如 BAT、字節跳動、美團、Netflix、Airbnb、Twitter 等等。今天我們通過這篇文章深入了解 Kafka 的工作原理。
圖片來自 Pexels
Kafka 概述
Kakfa 是一個分布式的基于發布/訂閱模式的消息隊列(Message Queue),主要應用于大數據的實時處理領域。
消息隊列
傳統消息隊列與新式消息隊列模式如下圖:
上面是傳統的消息隊列,比如一個用戶要注冊信息,當用戶信息寫入數據庫后,后面還有一些其他流程,比如發送短信,則需要等這些流程處理完成后,再返回給用戶。
而新式隊列,比如一個用戶注冊信息,數據直接丟進數據庫,就直接返回給用戶成功。
使用消息隊列的好處如下:
- 解耦
- 可恢復性
- 緩沖
- 靈活性與峰值處理能力
- 異步通信
消息隊列的模式如下:
①點對點模式:消息生產者發送消息到消息隊列中,然后消息消費者從隊列中取出并且消費消息,消息被消費后,隊列中不在存儲。
所以消息消費者不可能消費到已經被消費的消息;隊列支持存在多個消費者,但是對于一個消息而言,只會有一個消費者可以消費;如果想發給多個消費者,則需要多次發送該條消息。
②發布/訂閱模式(一對多,消費者消費數據之后不會清除消息):消息生產者將消息發布到 Topic 中,同時有多個消息消費者(訂閱)消費該消息。
和點對點的方式不同,發布到 Topic 的消息會被所有的訂閱者消費;但是數據保留是有期限的,默認是 7 天,因為它不是存儲系統。
Kafka 就是這種模式的。有兩種方式,一種是消費者去主動去消費(拉取)消息,而不是生產者推送消息給消費者;另外一種就是生產者主動推送消息給消費者,類似公眾號。
Kafka 基礎架構
Kafka 的架構如下圖:
Kafka 的基礎架構主要有 Broker、生產者、消費者組構成,當前還包括 ZooKeeper。
生產者負責發送消息,Broker 負責緩沖消息,Broker 中可以創建 Topic,每個 Topic 又有 Partition 和 Replication 的概念。
消費者組負責處理消息,同一個消費者組的消費者不能消費同一個 Partition 中的數據。
消費者組主要是提高消費能力,比如之前是一個消費者消費 100 條數據,現在是 2 個消費者消費 100 條數據,可以提高消費能力。
所以消費者組的消費者的個數要小于 Partition 的個數,不然就會有消費者沒有 Partition 可以消費,造成資源的浪費。
注意:不同消費者組的消費者是可以消費相同的 Partition 數據。
Kakfa 如果要組件集群,則只需要注冊到一個 ZooKeeper 中就可以了,ZooKeeper 中還保留消息消費的進度或者說偏移量或者消費位置:
- 0.9 之前的版本偏移量存儲在 ZooKeeper。
- 0.9 之后的版本偏移量存儲在 Kafka中。Kafka 定義了一個系統 Topic,專用用來存儲偏移量的數據。
為什么要改?主要是考慮到頻繁更改偏移量,對 ZooKeeper 的壓力較大,而且 Kafka 本身自己的處理也較復雜。
安裝 Kafka
①Kafka 的安裝只需要解壓安裝包就可以完成安裝。
tar -zxvf kafka_2.11-2.1.1.tgz -C /usr/local/
②查看配置文件:
- [root@es1 config]# pwd
- /usr/local/kafka/config
- [root@es1 config]# ll
- total 84
- -rw-r--r--. 1 root root 906 Feb 8 2019 connect-console-sink.properties
- -rw-r--r--. 1 root root 909 Feb 8 2019 connect-console-source.properties
- -rw-r--r--. 1 root root 5321 Feb 8 2019 connect-distributed.properties
- -rw-r--r--. 1 root root 883 Feb 8 2019 connect-file-sink.properties
- -rw-r--r--. 1 root root 881 Feb 8 2019 connect-file-source.properties
- -rw-r--r--. 1 root root 1111 Feb 8 2019 connect-log4j.properties
- -rw-r--r--. 1 root root 2262 Feb 8 2019 connect-standalone.properties
- -rw-r--r--. 1 root root 1221 Feb 8 2019 consumer.properties
- -rw-r--r--. 1 root root 4727 Feb 8 2019 log4j.properties
- -rw-r--r--. 1 root root 1925 Feb 8 2019 producer.properties
- -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-1.properties
- -rw-r--r--. 1 root root 6865 Jan 16 22:00 server-2.properties
- -rw-r--r--. 1 root root 6873 Jan 16 03:57 server.properties
- -rw-r--r--. 1 root root 1032 Feb 8 2019 tools-log4j.properties
- -rw-r--r--. 1 root root 1169 Feb 8 2019 trogdor.conf
- -rw-r--r--. 1 root root 1023 Feb 8 2019 zookeeper.properties
③修改配置文件 server.properties。
設置 broker.id 這個是 Kafka 集群區分每個節點的唯一標志符。
④設置 Kafka 的數據存儲路徑:
注意:這個目錄下不能有其他非 Kafka 目錄,不然會導致 Kafka 集群無法啟動。
⑤設置是否可以刪除 Topic,默認 Kafka 的 Topic 是不允許刪除的。
⑥Kafka 的數據保留的時間,默認是 7 天。
⑦Log 文件最大的大小,如果 Log 文件超過 1 G 會創建一個新的文件。
⑧Kafka 連接的 ZooKeeper 的地址和連接 Kafka 的超時時間。
⑨默認的 Partition 的個數。
啟動 Kafka
①啟動方式一,Kafka 只能單節點啟動,所以每個 Kakfa 節點都需要手動啟動,下面的方式是以阻塞的方式啟動。
②啟動方式二,守護的方式啟動,推薦使用。
Kafka 操作
①查看當前 Kafka 集群已有的 Topic。
注意:這里連接的 ZooKeeper,而不是連接的 Kafka。
②創建 Topic,指定分片和副本個數。
說明:replication-factor 副本數,replication-factor 分區數,topic 主題名。
如果當前 Kafka 集群只有 3 個 Broker 節點,則 replication-factor 最大就是 3 了,下面的例子創建副本為 4,則會報錯。
③刪除 Topic。
④查看 Topic 信息。
啟動生產者生產消息
Kafka 自帶一個生產者和消費者的客戶端。
①啟動一個生產者,注意此時連的 9092 端口,連接的 Kafka 集群。
②啟動一個消費者,注意此時連接的還是 9092 端口,在 0.9 版本之前連接的還是 2181 端口。
這里我們啟動 2 個消費者來測試一下。
說明:如果不指定消費者組的配置文件的話,默認每個消費者都屬于不同的消費者組。
③發送消息,可以看到每個消費者都能收到消息。
④Kakfa 中的實際數據。
Kafka 架構深入
Kafka 不能保證消息的全局有序,只能保證消息在 Partition 內有序,因為消費者消費消息是在不同的 Partition 中隨機的。
Kafka 的工作流程
Kafka 中的消息是以 Topic 進行分類的,生產者生成消息、消費者消費消息都面向 Topic。
Topic 是一個邏輯上的概念,而 Partition 是物理上的概念。每個 Partition 又有副本的概念。
每個 Partition 對應于一個 Log 文件,該 Log 文件中存儲的就是生產者生成的數據,生產者生成的數據會不斷的追加到該 Log 的文件末端。
且每條數據都有自己的 Offset,消費者都會實時記錄自己消費到了那個 Offset,以便出錯的時候從上次的位置繼續消費,這個 Offset 就保存在 Index 文件中。
Kafka 的 Offset 是分區內有序的,但是在不同分區中是無順序的,Kafka 不保證數據的全局有序。
Kafka 原理
由于生產者生產的消息會不斷追加到 Log 文件的末尾,為防止 Log 文件過大導致數據定位效率低下,Kafka 采用分片和索引的機制,將每個 Partition 分為多個 Segment,每個 Segment 對應 2 個文件 Index 文件和 Log 文件。
兩個文件位于一個相同的文件夾下,文件夾的命名規則為:Topic 名稱+分區序號。
Index 和 Log 的文件的文件名是當前這個索引是最小的數據的 Offset。Kafka 如何快速的消費數據呢?
Index 文件中存儲的數據的索引信息,第一列是 Offset,第二列這個數據所對應的 Log 文件中的偏移量,就像我們去讀文件,使用 seek() 設置當前鼠標的位置一樣,可以更快的找到數據。
如果要去消費 Offset 為 3 的數據,首先通過二分法找到數據在哪個 Index 文件中,然后在通過 Index 中 Offset 找到數據在 Log 文件中的 Offset;這樣就可以快速的定位到數據,并消費。
所以,Kakfa 雖然把數據存儲在磁盤中,但是他的讀取速度還是非??斓?。
Kafka 生產者和消費者
Kafka 生產者
Kafka 的 Partition 分區的作用:Kafka 分區的原因主要就是提供并發提高性能,因為讀寫是 Partition 為單位讀寫的。
那生產者發送消息是發送到哪個 Partition 中呢?
在客戶端中指定 Partition。
輪詢(推薦)消息 1 去 p1,消息 2 去 p2,消息 3 去 p3,消息 4 去 p1,消息 5 去 p2,消息 6 去 p3……
Kafka 如何保證數據可靠性
Kafka 如何保證數據可靠性呢?通過 Ack 來保證!
為保證生產者發送的數據,能可靠的發送到指定的 Topic,Topic 的每個 Partition 收到生產者發送的數據后,都需要向生產者發送 Ack(確認收到),如果生產者收到 Ack,就會進行下一輪的發送,否則重新發送數據。
那么 Kafka 什么時候向生產者發送 Ack?確保 Follower 和 Leader 同步完成,Leader 在發送 Ack 給生產者,這樣才能確保 Leader 掛掉之后,能在 Follower 中選舉出新的 Leader 后,數據不會丟失。
那多少個 Follower 同步完成后發送 Ack?
- 方案 1:半數已經完成同步,就發送 Ack。
- 方案 2:全部完成同步,才發送 Ack(Kafka 采用這種方式)。
采用第二種方案后,設想以下場景:Leader 收到數據,所有的 Follower 都開始同步數據,但是有一個 Follower 因為某種故障,一直無法完成同步,那 Leader 就要一直等下,直到他同步完成,才能發送 Ack。
這樣就非常影響效率,這個問題怎么解決?
Leader 維護了一個動態的 ISR 列表(同步副本的作用),只需要這個列表中的 Follower 和 Leader 同步。
當 ISR 中的 Follower 完成數據的同步之后,Leader 就會給生產者發送 Ack,如果 Follower 長時間未向 Leader 同步數據,則該 Follower 將被剔除 ISR,這個時間閾值也是自定義的。
同樣 Leader 故障后,就會從 ISR 中選舉新的 Leader。
怎么選擇 ISR 的節點呢?首先通信的時間要快,要和 Leader 可以很快的完成通信,這個時間默認是 10s。
然后就看 Leader 數據差距,消息條數默認是 10000 條(后面版本被移除)。
為什么移除?因為 Kafka 發送消息是批量發送的,所以會一瞬間 Leader 接受完成,但是 Follower 還沒有拉取,所以會頻繁踢出和加入 ISR,這個數據會保存到 ZooKeeper 和內存中,所以會頻繁更新 ZooKeeper 和內存。
但是對于某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的 Follower 全部接受成功。
所以 Kafka 為用戶提供了三種可靠性級別,用戶可以根據可靠性和延遲進行權衡,這個設置在 kafka 的生成中設置:Ack 參數設置。
①Acks 為 0:生產者不等 Ack,只管往 Topic 丟數據就可以了,這個丟數據的概率非常高。
②Ack 為 1:leader 落盤后就會返回 Ack,會有數據丟失的現象,如果 leader 在同步完成后出現故障,則會出現數據丟失。
③Ack 為 -1(all):Leader 和 Follower(ISR)落盤才會返回 Ack,會有數據重復現象,如果在 Leader 已經寫完成,且 Follower 同步完成,但是在返回 Ack 時出現故障,則會出現數據重復現象。
極限情況下,這個也會有數據丟失的情況,比如 Follower 和 Leader 通信都很慢,所以 ISR 中只有一個 Leader 節點。
這個時候,Leader 完成落盤,就會返回 Ack,如果此時 Leader 故障后,就會導致丟失數據。
Kafka 如何保證消費數據一致性
Kafka 如何保證消費數據的一致性?通過 HW 來保證:
- LEO:指每個 Follower 的最大的 Offset。
- HW(高水位):指消費者能見到的最大的 Offset,LSR 隊列中最小的 LEO,也就是說消費者只能看到 1~6 的數據,后面的數據看不到,也消費不了。
避免 Leader 掛掉后,比如當前消費者消費 8 這條數據后,Leader 掛了,此時比如 f2 成為 Leader,f2 根本就沒有 9 這條數據,那么消費者就會報錯,所以設計了 HW 這個參數,只暴露最少的數據給消費者,避免上面的問題。
HW 保證數據存儲的一致性:
①Follower 故障:Follower 發生故障后會被臨時踢出 LSR,待該 Follower 恢復后,Follower 會讀取本地的磁盤記錄的上次的 HW,并將該 Log 文件高于 HW 的部分截取掉,從 HW 開始向 Leader 進行同步,等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 LSR。
②Leader 故障:Leader 發生故障后,會從 ISR 中選出一個新的 Leader,之后,為了保證多個副本之間的數據一致性,其余的 Follower 會先將各自的 Log 文件高于 HW 的部分截掉(新 Leader 自己不會截掉),然后從新的 Leader 同步數據。
注意:這個是為了保證多個副本間的數據存儲的一致性,并不能保證數據不丟失或者不重復。
精準一次(冪等性),保證數據不重復:
- Ack 設置為 -1,則可以保證數據不丟失,但是會出現數據重復(at least once)。
- Ack 設置為 0,則可以保證數據不重復,但是不能保證數據不丟失(at most once)。
但是如果魚和熊掌兼得,該怎么辦?這個時候就就引入了 Exact Once(精準一次)。
在 0.11 版本后,引入冪等性解決 Kakfa 集群內部的數據重復,在 0.11 版本之前,在消費者處自己做處理。
如果啟用了冪等性,則 Ack 默認就是 -1,Kafka 就會為每個生產者分配一個 Pid,并未每條消息分配 Seqnumber。
如果 Pid、Partition、Seqnumber 三者一樣,則 Kafka 認為是重復數據,就不會落盤保存。
但是如果生產者掛掉后,也會出現有數據重復的現象;所以冪等性解決在單次會話的單個分區的數據重復,但是在分區間或者跨會話的是數據重復的是無法解決的。
Kafka 消費者
①消費方式
消息隊列有兩種消費消息的方式,Push(微信公眾號)Pull(kafka)。
Push 模式很難適應消費速率不同的消費者,因為消費發送速率是由 Broker 決定的,他的目標是盡可能以最快的的速度傳遞消息。
但是這樣很容易造成消費者來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 Pull 的方式可以消費者的消費能力以適當的速率消費消息。
Pull 模式的不足之處是如果 Kafka 沒有數據,消費者可能會陷入死循環,一直返回空數據,針對這一點,Kafka 消費者在消費數據時候回傳遞一個 Timeout 參數,如果當時沒有數據可供消費,消費者會等待一段時間在返回。
②分區分配策略
一個消費者組有多個消費者,一個 Topic 有多個 Partition。所以必然會涉及到 Partition 的分配問題,即確定哪個 Partition 由哪個消費者來消費。
Kafka 提供兩種方式,一種是輪詢(RountRobin)對于 Topic 組生效,一種是(Range)對于單個 Topic 生效。
輪詢:前置條件是需要一個消費者里的消費者訂閱的是相同的 Topic。不然就會出現問題;非默認的的方式。
同一個消費者組里的消費者不能同時消費同一個分區,比如三個消費者消費一個 Topic 的 9 個分區。
如果一個消費者組里有 2 個消費者,這個消費者組里同時消費 2 個 Topic,每個 Topic 又有三個 Partition。
首先會把 2 個 Topic 當做一個主題,然后根據 Topic 和 Partition 做 Hash,然后在按照 Hash 排序。然后輪詢分配給一個消費者組中的 2 個消費者。
如果是下面這樣的方式訂閱的呢?比如有 3 個 Topic,每個 Topic 有 3 個 Partition,一個消費者組中有 2 個消費者。
消費者 1 訂閱 Topic1 和 Topic2,消費者 2 訂閱 Topic2 和 Topic3。那么這樣的場景,使用輪詢的方式訂閱 Topic 就會有問題。
如果是下面這種方式訂閱呢?比如有 2 個 Topic,每個 Topic 有 3 個 Partition,一個消費者組有 2 個消費者,消費者 1 訂閱 Topic1,消費者 2 訂閱 Topic2,這樣使用輪詢的方式訂閱 Topic 也會有問題。
所以我們一直強調,使用輪詢的方式訂閱 Topic 的前提是一個消費者組中的所有消費者訂閱的主題是一樣的;所以輪詢的方式不是 Kafka 默認的方式;Range 是按照單個 Topic 來劃分的,默認的分配方式。
Range 的問題會出現消費者數據不均衡的問題。比如下面的例子,一個消費者組訂閱了 2 個 Topic,就會出現消費者 1 消費 4 個 Partition,而另外一個消費者只消費 2 個 Partition。
分區策略什么時候會觸發呢?當消費者組里的消費者個數變化的時候,會觸發分區策略調整,比如消費者里增加消費者,或者減少消費者。
③維護 Offset
由于消費者在消費過程中可能會出現斷電宕機等故障,消費者恢復后,需要從故障前的位置繼續消費,所以消費者需要實施記錄自己消費哪個 Offset,以便故障恢復后繼續消費。
Offset 保存的位置有 2 個,一個 ZooKeeper,一個是 Kafka。首先看下 Offset 保存到 ZooKeeper,由消費者組、Topic、Partition 三個元素確定唯一的 Offset。
所以消費者組中的某個消費者掛掉之后,或者消費者還是可以拿到這個 Offset。
Controller 這個節點和 ZooKeeper 通信,同步數據,這個節點就是誰先起來,誰就先注冊 Controller,誰就是 Controller。其他節點和 Controller 信息保持同步。
④消費者組的案例
修改消費者組 id:
啟動一個消費者發送 3 條數據:
指定消費者組啟動消費者,啟動三個消費者,可以看到每個消費者消費了一條數據。
在演示下不同組可以消費同一個 Topic 的,我們看到 2 個消費者的消費者都消費到同一條數據。再次啟動一個消費者,這個消費者屬于另外一個消費者組。
Kafka 的高效讀寫機制
分布式部署
多節點并行操作。
順序寫磁盤
Kafka 的 producer 生產數據,要寫入到 log 文件中,寫的過程中一直追加到文件末尾,為順序寫,官網有數據表明。
同樣的磁盤,順序寫能到 600M/S,而隨機寫只有 100K/S。這與磁盤的機械結構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間。
零復制技術
正常情況下,先把數據讀到內核空間,在從內核空間把數據讀到用戶空間,然后在調操作系統的 IO 接口寫到內核空間,最終在寫到硬盤中。
Kafka 是這樣做的,直接在內核空間流轉 IO 流,所以 Kafka 的性能非常高。
ZooKeeper 在 Kafka 中的作用
Kafka 集群中有一個 Broker 會被選舉為 Controller,負責管理集群 Broker 的上下線,所有的 Topic 的分區副本分配和 Leader 選舉等工作。