成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Kafka 核心全面總結,高可靠高性能核心原理探究

開發 架構
為了提升系統的吞吐,一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。

你好,我是碼哥,可以叫我靚仔

作者:mo

引言

在探究 Kafka 核心知識之前,我們先思考一個問題:什么場景會促使我們使用 Kafka?  說到這里,我們頭腦中或多或少會蹦出異步解耦和削峰填谷等字樣,是的,這就是 Kafka 最重要的落地場景。

異步解耦:同步調用轉換成異步消息通知,實現生產者和消費者的解耦。想象一個場景,在商品交易時,在訂單創建完成之后,需要觸發一系列其他的操作,比如進行用戶訂單數據的統計、給用戶發送短信、給用戶發送郵件等等。如果所有操作都采用同步方式實現,將嚴重影響系統性能。針對此場景,我們可以利用消息中間件解耦訂單創建操作和其他后續行為。

削峰填谷:利用 broker 緩沖上游生產者瞬時突發的流量,使消費者消費流量整體平滑。對于發送能力很強的上游系統,如果沒有消息中間件的保護,下游系統可能會直接被壓垮導致全鏈路服務雪崩。想象秒殺業務場景,上游業務發起下單請求,下游業務執行秒殺業務(庫存檢查,庫存凍結,余額凍結,生成訂單等等),下游業務處理的邏輯是相當復雜的,并發能力有限,如果上游服務不做限流策略,瞬時可能把下游服務壓垮。針對此場景,我們可以利用 MQ 來做削峰填谷,讓高峰流量填充低谷空閑資源,達到系統資源的合理利用。

通過上述例子可以發現交易、支付等場景常需要異步解耦和削峰填谷功能解決問題,而交易、支付等場景對性能、可靠性要求特別高。那么,我們本文的主角 Kafka 能否滿足相應要求呢?下面我們來探討下。

Kafka 宏觀認知

在探究 Kafka 的高性能、高可靠性之前,我們從宏觀上來看下 Kafka 的系統架構:

圖片

如上圖所示,Kafka 由 Producer、Broker、Consumer 以及負責集群管理的 ZooKeeper 組成,各部分功能如下:

  • Producer:生產者,負責消息的創建并通過一定的路由策略發送消息到合適的 Broker;
  • Broker:服務實例,負責消息的持久化、中轉等功能;
  • Consumer :消費者,負責從 Broker 中拉?。≒ull)訂閱的消息并進行消費,通常多個消費者構成一個分組,消息只能被同組中的一個消費者消費;
  • ZooKeeper:負責 broker、consumer 集群元數據的管理等;(注意:Producer 端直接連接 broker,不在 zk 上存任何數據,只是通過 ZK 監聽 broker 和 topic 等信息)

上圖消息流轉過程中,還有幾個特別重要的概念—主題(Topic)、分區(Partition)、分段(segment)、位移(offset)。

  • topic:消息主題。Kafka 按 topic 對消息進行分類,我們在收發消息時只需指定 topic。
  • partition:分區。為了提升系統的吞吐,一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。另外,為了提升系統的可靠性,partition 通常會分組,且每組有一個主 partition、多個副本 partition,且分布在不同的 broker 上,從而起到容災的作用。
  • segment:分段。宏觀上看,一個 partition 對應一個日志(Log)。由于生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據檢索效率低下,Kafka 采取了分段和索引機制,將每個 partition 分為多個 segment,同時也便于消息的維護和清理。每個 segment 包含一個.log 日志文件、兩個索引(.index、timeindex)文件以及其他可能的文件。每個 Segment 的數據文件以該段中最小的 offset 為文件名,當查找 offset 的 Message 的時候,通過二分查找快找到 Message 所處于的 Segment 中。
  • offset:消息在日志中的位置,消息在被追加到分區日志文件的時候都會分配一個特定的偏移量。offset 是消息在分區中的唯一標識,是一個單調遞增且不變的值。Kafka 通過它來保證消息在分區內的順序性,不過 offset 并不跨越分區,也就是說,Kafka 保證的是分區有序而不是主題有序。

Kafka 高可靠性、高性能探究

在對 Kafka 的整體系統框架及相關概念簡單了解后,下面我們來進一步深入探討下高可靠性、高性能實現原理。

Kafka 高可靠性探究

Kafka 高可靠性的核心是保證消息在傳遞過程中不丟失,涉及如下核心環節:

  • 消息從生產者可靠地發送至 Broker;-- 網絡、本地丟數據;
  • 發送到 Broker 的消息可靠持久化;-- Pagecache 緩存落盤、單點崩潰、主從同步跨網絡;
  • 消費者從 Broker 消費到消息且最好只消費一次 -- 跨網絡消息傳輸 。
消息從生產者可靠地發送至 Broker

為了保障消息從生產者可靠地發送至 Broker,我們需要確保兩點;

  1. Producer 發送消息后,能夠收到來自 Broker 的消息保存成功 ack;
  2. Producer 發送消息后,能夠捕獲超時、失敗 ack 等異常 ack 并做處理。
ack 策略

針對問題 1,Kafka 為我們提供了三種 ack 策略,

  • Request.required.acks = 0:請求發送即認為成功,不關心有沒有寫成功,常用于日志進行分析場景;
  • Request.required.acks = 1:當 leader partition 寫入成功以后,才算寫入成功,有丟數據的可能;
  • Request.required.acks= -1:ISR 列表里面的所有副本都寫完以后,這條消息才算寫入成功,強可靠性保證;

為了實現強可靠的 kafka 系統,我們需要設置 Request.required.acks= -1,同時還會設置集群中處于正常同步狀態的副本 follower 數量 min.insync.replicas>2,另外,設置 unclean.leader.election.enable=false 使得集群中 ISR 的 follower 才可變成新的 leader,避免特殊情況下消息截斷的出現。

消息發送策略

針對問題 2,kafka 提供兩類消息發送方式:同步(sync)發送和異步(async)發送,相關參數如下:

圖片

以 sarama 實現為例,在消息發送的過程中,無論是同步發送還是異步發送都會涉及到兩個協程--負責消息發送的主協程和負責消息分發的 dispatcher 協程。

異步發送

對于異步發送(ack != 0 場景,等于 0 時不關心寫 kafka 結果,后文詳細講解)而言,其流程大概如下:

  1. 在主協程中調用異步發送 kafka 消息的時候,其本質是將消息體放進了一個 input 的 channel,只要入 channel 成功,則這個函數直接返回,不會產生任何阻塞。相反,如果入 channel 失敗,則會返回錯誤信息。因此調用 async 寫入的時候返回的錯誤信息是入 channel 的錯誤信息,至于具體最終消息有沒有發送到 kafka 的 broker,我們無法從返回值得知。
  2. 當消息進入 input 的 channel 后,會有另一個dispatcher 的協程負責遍歷 input,來真正發送消息到特定 Broker 上的主 Partition 上。發送結果通過一個異步協程進行監聽,循環處理 err channel 和 success channel,出現了 error 就記一個日志。因此異步寫入場景時,寫 kafka 的錯誤信息,我們暫時僅能夠從這個錯誤日志來得知具體發生了什么錯,并且也不支持我們自建函數進行兜底處理,這一點在 trpc-go 的官方也得到了承認。

同步發送

同步發送(ack != 0 場景)是在異步發送的基礎上加以條件限制實現的。同步消息發送在 newSyncProducerFromAsyncProducer 中開啟兩個異步協程處理消息成功與失敗的“回調”,并使用 waitGroup 進行等待,從而將異步操作轉變為同步操作,其流程大概如下:

通過上述分析可以發現,kafka 消息發送本質上都是異步的,不過同步發送通過 waitGroup 將異步操作轉變為同步操作。同步發送在一定程度上確保了我們在跨網絡向 Broker 傳輸消息時,消息一定可以可靠地傳輸到 Broker。因為在同步發送場景我們可以明確感知消息是否發送至 Broker,若因網絡抖動、機器宕機等故障導致消息發送失敗或結果不明,可通過重試等手段確保消息至少一次(at least once) 發送到 Broker。另外,Kafka(0.11.0.0 版本后)還為 Producer 提供兩種機制來實現精確一次(exactly once) 消息發送:冪等性(Idempotence)和事務(Transaction)。

小結

通過 ack 策略配置、同步發送、事務消息組合能力,我們可以實現exactly once 語意跨網絡向 Broker 傳輸消息。但是,Producer 收到 Broker 的成功 ack,消息一定不會丟失嗎?為了搞清這個問題,我們首先要搞明白 Broker 在接收到消息后做了哪些處理。

發送到 Broker 的消息可靠持久化

為了確保 Producer 收到 Broker 的成功 ack 后,消息一定不在 Broker 環節丟失,我們核心要關注以下幾點:

  • Broker 返回 Producer 成功 ack 時,消息是否已經落盤;
  • Broker 宕機是否會導致數據丟失,容災機制是什么;
  • Replica 副本機制帶來的多副本間數據同步一致性問題如何解決;

Broker 異步刷盤機制

kafka 為了獲得更高吞吐,Broker 接收到消息后只是將數據寫入 PageCache 后便認為消息已寫入成功,而 PageCache 中的數據通過 linux 的 flusher 程序進行異步刷盤(刷盤觸發條:主動調用 sync 或 fsync 函數、可用內存低于閥值、dirty data 時間達到閥值),將數據順序寫到磁盤。消息處理示意圖如下:

由于消息是寫入到 pageCache,單機場景,如果還沒刷盤 Broker 就宕機了,那么 Producer 產生的這部分數據就可能丟失。為了解決單機故障可能帶來的數據丟失問題,Kafka 為分區引入了副本機制。

Replica 副本機制

Kafka 每組分區通常有多個副本,同組分區的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滯后)。副本之間是“一主多從”的關系,其中 leader 副本負責處理讀寫請求,follower 副本負責從 leader 拉取消息進行同步。分區的所有副本統稱為 AR(Assigned Replicas),其中所有與 leader 副本保持一定同步的副本(包括 leader 副本在內)組成 ISR(In-Sync Replicas),與 leader 同步滯后過多的副本組成 OSR(Out-of-Sync Replicas),由此可見,AR=ISR+OSR。

follower 副本是否與 leader 同步的判斷標準取決于 Broker 端參數 replica.lag.time.max.ms(默認為 10 秒),follower 默認每隔 500ms 向 leader fetch 一次數據,只要一個 Follower 副本落后 Leader 副本的時間不連續超過 10 秒,那么 Kafka 就認為該 Follower 副本與 leader 是同步的。在正常情況下,所有的 follower 副本都應該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合為空。

當 leader 副本所在 Broker 宕機時,Kafka 會借助 ZK 從 follower 副本中選舉新的 leader 繼續對外提供服務,實現故障的自動轉移,保證服務可用。為了使選舉的新 leader 和舊 leader 數據盡可能一致,當 leader 副本發生故障時,默認情況下只有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒有任何機會(可通過設置 unclean.leader.election.enable 改變)。

當 Kafka 通過多副本機制解決單機故障問題時,同時也帶來了多副本間數據同步一致性問題。Kafka 通過高水位更新機制、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數據同步一致性問題,下面我們來依次看下這幾大措施。

HW 和 LEO

首先,我們來看下兩個和 Kafka 中日志相關的重要概念 HW 和 LEO:

  • HW: High Watermark,高水位,表示已經提交(commit)的最大日志偏移量,Kafka 中某條日志“已提交”的意思是 ISR 中所有節點都包含了此條日志,并且消費者只能消費 HW 之前的數據;
  • LEO: Log End Offset,表示當前 log 文件中下一條待寫入消息的 offset;

如上圖所示,它代表一個日志文件,這個日志文件中有 8 條消息,0 至 5 之間的消息為已提交消息,5 至 7 的消息為未提交消息。日志文件的 HW 為 6,表示消費者只能拉取到 5 之前的消息,而 offset 為 5 的消息對消費者而言是不可見的。日志文件的 LEO 為 8,下一條消息將在此處寫入。

注意:所有副本都有對應的 HW 和 LEO,只不過 Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來定義所在分區的高水位。換句話說,分區的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特點:

  • Leader HW:min(所有副本 LEO),為此 Leader 副本不僅要保存自己的 HW 和 LEO,還要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO;
  • Follower HW:min(follower 自身 LEO,leader HW)。

注意:為方便描述,下面Leader HW簡記為HWL,Follower HW簡記為F,Leader LEO簡記為LEOL ,Follower LEO簡記為LEOF。

下面我們演示一次完整的 HW / LEO 更新流程:

  1. 初始狀態

HWL=0,LEOL=0,HWF=0,LEOF=0。

  1. Follower 第一次 fetch
  • Leader收到Producer發來的一條消息完成存儲, 更新LEOL=1;
  • Follower從Leader fetch數據,  Leader收到請求,記錄follower的LEOF =0,并且嘗試更新HWL =min(全部副本LEO)=0;
  • eade返回HWL=0和LEOL=1給Follower,Follower存儲消息并更新LEOF =1, HW=min(LEOF,HWL)=0。
  1. Follower 第二次 fetch
  • Follower再次從Leader fetch數據,  Leader收到請求,記錄follower的LEOF =1,并且嘗試更新HWL =min(全部副本LEO)=1;
  • leade返回HWL=1和LEOL=1給Follower,Leader收到請求,更新自己的 HW=min(LEOF,HWL)=1。

上述更新流程中 Follower 和 Leader 的 HW 更新有時間 GAP。如果 Leader 節點在此期間發生故障,則 Follower 的 HW 和 Leader 的 HW 可能會處于不一致狀態,如果 Followe 被選為新的 Leader 并且以自己的 HW 為準對外提供服務,則可能帶來數據丟失或數據錯亂問題。

KIP-101 問題:數據丟失&數據錯亂 ^參 5^

數據丟失

第 1 步:

  1. 副本 B 作為 leader 收到 producer 的 m2 消息并寫入本地文件,等待副本 A 拉取。
  2. 副本 A 發起消息拉取請求,請求中攜帶自己的最新的日志 offset(LEO=1),B 收到后更新自己的 HW 為 1,并將 HW=1 的信息以及消息 m2 返回給 A。
  3. A 收到拉取結果后更新本地的 HW 為 1,并將 m2 寫入本地文件。發起新一輪拉取請求(LEO=2),B 收到 A 拉取請求后更新自己的 HW 為 2,沒有新數據只將 HW=2 的信息返回給 A,并且回復給 producer 寫入成功。此處的狀態就是圖中第一步的狀態。

第 2 步:

此時,如果沒有異常,A 會收到 B 的回復,得知目前的 HW 為 2,然后更新自身的 HW 為 2。但在此時 A 重啟了,沒有來得及收到 B 的回復,此時 B 仍然是 leader。A 重啟之后會以 HW 為標準截斷自己的日志,因為 A 作為 follower 不知道多出的日志是否是被提交過的,防止數據不一致從而截斷多余的數據并嘗試從 leader 那里重新同步。

第 3 步:

B 崩潰了,min.isr 設置的是 1,所以 zookeeper 會從 ISR 中再選擇一個作為 leader,也就是 A,但是 A 的數據不是完整的,從而出現了數據丟失現象。

問題在哪里?在于 A 重啟之后以 HW 為標準截斷了多余的日志。不截斷行不行?不行,因為這個日志可能沒被提交過(也就是沒有被 ISR 中的所有節點寫入過),如果保留會導致日志錯亂。

數據錯亂

在分析日志錯亂的問題之前,我們需要了解到 kafka 的副本可靠性保證有一個前提:在 ISR 中至少有一個節點。如果節點均宕機的情況下,是不保證可靠性的,在這種情況會出現數據丟失,數據丟失是可接受的。這里我們分析的問題比數據丟失更加槽糕,會引發日志錯亂甚至導致整個系統異常,而這是不可接受的。

第 1 步:

  1. A 和 B 均為 ISR 中的節點。副本 A 作為 leader,收到 producer 的消息 m2 的請求后寫入 PageCache 并在某個時刻刷新到本地磁盤。
  2. 副本 B 拉取到 m2 后寫入 PageCage 后(尚未刷盤)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A 收到更新自己的 HW 為 1 并回復給 producer 成功。
  3. 此時 A 和 B 同時宕機,B 的 m2 由于尚未刷盤,所以 m2 消息丟失。此時的狀態就是第 1 步的狀態。

第 2 步:

由于 A 和 B 均宕機,而 min.isr=1 并且 unclean.leader.election.enable=true(關閉 unclean 選擇策略),所以 Kafka 會等到第一個 ISR 中的節點恢復并選為 leader,這里不幸的是 B 被選為 leader,而且還接收到 producer 發來的新消息 m3。注意,這里丟失 m2 消息是可接受的,畢竟所有節點都宕機了。

第 3 步:

A 恢復重啟后發現自己是 follower,而且 HW 為 2,并沒有多余的數據需要截斷,所以開始和 B 進行新一輪的同步。但此時 A 和 B 均沒有意識到,offset 為 1 的消息不一致了。

問題在哪里?在于日志的寫入是異步的,上面也提到 Kafka 的副本策略的一個設計是消息的持久化是異步的,這就會導致在場景二的情況下被選出的 leader 不一定包含所有數據,從而引發日志錯亂的問題。

Leader Epoch

為了解決上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期號的概念很類似,每次重新選擇 leader 的時候,用一個嚴格單調遞增的 id 來標志,可以讓所有 follower 意識到 leader 的變化。而 follower 也不再以 HW 為準,每次奔潰重啟后都需要去 leader 那邊確認下當前 leader 的日志是從哪個 offset 開始的。下面看下 Leader Epoch 是如何解決上面兩個問題的。

數據丟失解決

這里的關鍵點在于副本 A 重啟后作為 follower,不是忙著以 HW 為準截斷自己的日志,而是先發起 LeaderEpochRequest 詢問副本 B 第 0 代的最新的偏移量是多少,副本 B 會返回自己的 LEO 為 2 給副本 A,A 此時就知道消息 m2 不能被截斷,所以 m2 得到了保留。當 A 選為 leader 的時候就保留了所有已提交的日志,日志丟失的問題得到解決。

如果發起 LeaderEpochRequest 的時候就已經掛了怎么辦?這種場景下,不會出現日志丟失,因為副本 A 被選為 leader 后不會截斷自己的日志,日志截斷只會發生在 follower 身上。

數據錯亂解決

這里的關鍵點還是在第 3 步,副本 A 重啟作為 follower 的第一步還是需要發起 LeaderEpochRequest 詢問 leader 當前第 0 代最新的偏移量是多少,由于副本 B 已經經過換代,所以會返回給 A 第 1 代的起始偏移(也就是 1),A 發現沖突后會截斷自己偏移量為 1 的日志,并重新開始和 leader 同步。副本 A 和副本 B 的日志達到了一致,解決了日志錯亂。

小結

Broker 接收到消息后只是將數據寫入 PageCache 后便認為消息已寫入成功,但是,通過副本機制并結合 ACK 策略可以大概率規避單機宕機帶來的數據丟失問題,并通過 HW、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數據同步一致性問題,最終實現了 Broker 數據的可靠持久化。

消費者從 Broker 消費到消息且最好只消費一次

Consumer 在消費消息的過程中需要向 Kafka 匯報自己的位移數據,只有當 Consumer 向 Kafka 匯報了消息位移,該條消息才會被 Broker 認為已經被消費。因此,Consumer 端消息的可靠性主要和 offset 提交方式有關,Kafka 消費端提供了兩種消息提交方式:

正常情況下我們很難實現 exactly once 語意的消息,通常是通過手動提交+冪等實現消息的可靠消費。

Kafka 高性能探究

Kafka 高性能的核心是保障系統低延遲、高吞吐地處理消息,為此,Kafaka 采用了許多精妙的設計:

  • 異步發送
  • 批量發送
  • 壓縮技術
  • Pagecache 機制&順序追加落盤
  • 零拷貝
  • 稀疏索引
  • broker & 數據分區
  • 多 reactor 多線程網絡模型

異步發送

如上文所述,Kafka 提供了異步和同步兩種消息發送方式。在異步發送中,整個流程都是異步的。調用異步發送方法后,消息會被寫入 channel,然后立即返回成功。Dispatcher 協程會從 channel 輪詢消息,將其發送到 Broker,同時會有另一個異步協程負責處理 Broker 返回的結果。同步發送本質上也是異步的,但是在處理結果時,同步發送通過 waitGroup 將異步操作轉換為同步。使用異步發送可以最大化提高消息發送的吞吐能力。

批量發送

Kafka 支持批量發送消息,將多個消息打包成一個批次進行發送,從而減少網絡傳輸的開銷,提高網絡傳輸的效率和吞吐量。Kafka 的批量發送消息是通過以下兩個參數來控制的:

  1. batch.size:控制批量發送消息的大小,默認值為 16KB,可適當增加 batch.size 參數值提升吞吐。但是,需要注意的是,如果批量發送的大小設置得過大,可能會導致消息發送的延遲增加,因此需要根據實際情況進行調整。
  2. linger.ms:控制消息在批量發送前的等待時間,默認值為 0。當 linger.ms 大于 0 時,如果有消息發送,Kafka 會等待指定的時間,如果等待時間到達或者批量大小達到 batch.size,就會將消息打包成一個批次進行發送。可適當增加 linger.ms 參數值提升吞吐,比如 10 ~ 100。

在 Kafka 的生產者客戶端中,當發送消息時,如果啟用了批量發送,Kafka 會將消息緩存到緩沖區中。當緩沖區中的消息大小達到 batch.size 或者等待時間到達 linger.ms 時,Kafka 會將緩沖區中的消息打包成一個批次進行發送。如果在等待時間內沒有達到 batch.size,Kafka 也會將緩沖區中的消息發送出去,從而避免消息積壓。

壓縮技術

Kafka 支持壓縮技術,通過將消息進行壓縮后再進行傳輸,從而減少網絡傳輸的開銷(壓縮和解壓縮的過程會消耗一定的 CPU 資源,因此需要根據實際情況進行調整。),提高網絡傳輸的效率和吞吐量。Kafka 支持多種壓縮算法,在 Kafka2.1.0 版本之前,僅支持 GZIP,Snappy 和 LZ4,2.1.0 后還支持 Zstandard 算法(Facebook 開源,能夠提供超高壓縮比)。這些壓縮算法性能對比(兩指標都是越高越好)如下:

  • 吞吐量:LZ4>Snappy>zstd 和 GZIP,壓縮比:zstd>LZ4>GZIP>Snappy。

在 Kafka 中,壓縮技術是通過以下兩個參數來控制的:

  1. compression.type:控制壓縮算法的類型,默認值為 none,表示不進行壓縮。
  2. compression.level:控制壓縮的級別,取值范圍為 0-9,默認值為-1。當值為-1 時,表示使用默認的壓縮級別。

在 Kafka 的生產者客戶端中,當發送消息時,如果啟用了壓縮技術,Kafka 會將消息進行壓縮后再進行傳輸。在消費者客戶端中,如果消息進行了壓縮,Kafka 會在消費消息時將其解壓縮。注意:Broker 如果設置了和生產者不通的壓縮算法,接收消息后會解壓后重新壓縮保存。Broker 如果存在消息版本兼容也會觸發解壓后再壓縮。

Pagecache 機制&順序追加落盤

kafka 為了提升系統吞吐、降低時延,Broker 接收到消息后只是將數據寫入PageCache后便認為消息已寫入成功,而 PageCache 中的數據通過 linux 的 flusher 程序進行異步刷盤(避免了同步刷盤的巨大系統開銷),將數據順序追加寫到磁盤日志文件中。由于 pagecache 是在內存中進行緩存,因此讀寫速度非???,可以大大提高讀寫效率。順序追加寫充分利用順序 I/O 寫操作,避免了緩慢的隨機 I/O 操作,可有效提升 Kafka 吞吐。

如上圖所示,消息被順序追加到每個分區日志文件的尾部。

零拷貝

Kafka 中存在大量的網絡數據持久化到磁盤(Producer 到 Broker)和磁盤文件通過網絡發送(Broker 到 Consumer)的過程,這一過程的性能直接影響 Kafka 的整體吞吐量。傳統的 IO 操作存在多次數據拷貝和上下文切換,性能比較低。Kafka 利用零拷貝技術提升上述過程性能,其中網絡數據持久化磁盤主要用 mmap 技術,網絡數據傳輸環節主要使用 sendfile 技術。

索引加速之 mmap

傳統模式下,數據從網絡傳輸到文件需要 4 次數據拷貝、4 次上下文切換和兩次系統調用。如下圖所示:

為了減少上下文切換以及數據拷貝帶來的性能開銷,Kafka使用mmap來處理其索引文件。Kafka中的索引文件用于在提取日志文件中的消息時進行高效查找。這些索引文件被維護為內存映射文件,這允許Kafka快速訪問和搜索內存中的索引,從而加速在日志文件中定位消息的過程。mmap 將內核中讀緩沖區(read buffer)的地址與用戶空間的緩沖區(user buffer)進行映射,從而實現內核緩沖區與應用程序內存的共享,省去了將數據從內核讀緩沖區(read buffer)拷貝到用戶緩沖區(user buffer)的過程,整個拷貝過程會發生 4 次上下文切換,1 次CPU 拷貝和 2次 DMA 拷貝。

網絡數據傳輸之 sendfile

傳統方式實現:先讀取磁盤、再用 socket 發送,實際也是進過四次 copy。如下圖所示:

為了減少上下文切換以及數據拷貝帶來的性能開銷,Kafka 在 Consumer 從 Broker 讀數據過程中使用了 sendfile 技術。具體在這里采用的方案是通過 NIO 的 transferTo/transferFrom 調用操作系統的 sendfile 實現零拷貝??偣舶l生 2 次內核數據拷貝、2 次上下文切換和一次系統調用,消除了 CPU 數據拷貝,如下:

稀疏索引

為了方便對日志進行檢索和過期清理,kafka 日志文件除了有用于存儲日志的.log 文件,還有一個位移索引文件.index和一個時間戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下:

Kafka 的索引文件是按照稀疏索引的思想進行設計的。稀疏索引的核心是不會為每個記錄都保存索引,而是寫入一定的記錄之后才會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數進行控制,默認大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數據之后,才會在索引文件中增加一個索引項。可見,單條消息大小會影響 Kakfa 索引的插入頻率,因此 log.index.interval.bytes 也是 Kafka 調優一個重要參數值。由于索引文件也是按照消息的順序性進行增加索引項的,因此 Kafka 可以利用二分查找算法來搜索目標索引項,把時間復雜度降到了 O(lgN),大大減少了查找的時間。

位移索引文件.index

位移索引文件的索引項結構如下:

相對位移:保存于索引文件名字上面的起始位移的差值,假設一個索引文件為:00000000000000000100.index,那么起始位移值即 100,當存儲位移為 150 的消息索引時,在索引文件中的相對位移則為 150 - 100 = 50,這么做的好處是使用 4 字節保存位移即可,可以節省非常多的磁盤空間。

文件物理位置:消息在 log 文件中保存的位置,也就是說 Kafka 可根據消息位移,通過位移索引文件快速找到消息在 log 文件中的物理位置,有了該物理位置的值,我們就可以快速地從 log 文件中找到對應的消息了。下面我用圖來表示 Kafka 是如何快速檢索消息:

假設 Kafka 需要找出位移為 3550 的消息,那么 Kafka 首先會使用二分查找算法找到小于 3550 的最大索引項:[3528, 2310272],得到索引項之后,Kafka 會根據該索引項的文件物理位置在 log 文件中從位置 2310272 開始順序查找,直至找到位移為 3550 的消息記錄為止。

時間戳索引文件.timeindex

Kafka 在 0.10.0.0 以后的版本當中,消息中增加了時間戳信息,為了滿足用戶需要根據時間戳查詢消息記錄,Kafka 增加了時間戳索引文件,時間戳索引文件的索引項結構如下:

時間戳索引文件的檢索與位移索引文件類似,如下快速檢索消息示意圖:

broker & 數據分區

Kafka 集群包含多個 broker。一個 topic 下通常有多個 partition,partition 分布在不同的 Broker 上,用于存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了并行的消息處理能力和橫向擴容能力。

多 reactor 多線程網絡模型

多 Reactor 多線程網絡模型 是一種高效的網絡通信模型,可以充分利用多核 CPU 的性能,提高系統的吞吐量和響應速度。Kafka 為了提升系統的吞吐,在 Broker 端處理消息時采用了該模型,示意如下:

圖片

SocketServer和KafkaRequestHandlerPool是其中最重要的兩個組件:

  • SocketServer:實現 Reactor 模式,用于處理多個 Client(包括客戶端和其他 broker 節點)的并發請求,并將處理結果返回給 Client
  • KafkaRequestHandlerPool:Reactor 模式中的 Worker 線程池,里面定義了多個工作線程,用于處理實際的 I/O 請求邏輯。

整個服務端處理請求的流程大致分為以下幾個步驟:

  1. Acceptor 接收客戶端發來的請求
  2. 輪詢分發給 Processor 線程處理
  3. Processor 將請求封裝成 Request 對象,放到 RequestQueue 隊列
  4. KafkaRequestHandlerPool 分配工作線程,處理 RequestQueue 中的請求
  5. KafkaRequestHandler 線程處理完請求后,將響應 Response 返回給 Processor 線程
  6. Processor 線程將響應返回給客戶端

其他知識探究

負載均衡

生產者負載均衡

Kafka 生產端的負載均衡主要指如何將消息發送到合適的分區。Kafka 生產者生產消息時,根據分區器將消息投遞到指定的分區中,所以 Kafka 的負載均衡很大程度上依賴于分區器。Kafka 默認的分區器是 Kafka 提供的 DefaultPartitioner。它的分區策略是根據 Key 值進行分區分配的:

  • 如果 key 不為 null:對 Key 值進行 Hash 計算,從所有分區中根據 Key 的 Hash 值計算出一個分區號;擁有相同 Key 值的消息被寫入同一個分區,順序消息實現的關鍵;
  • 如果 key 為 null:消息將以輪詢的方式,在所有可用分區中分別寫入消息。如果不想使用 Kafka 默認的分區器,用戶可以實現 Partitioner 接口,自行實現分區方法。

消費者負載均衡

在 Kafka 中,每個分區(Partition)只能由一個消費者組中的一個消費者消費。當消費者組中有多個消費者時,Kafka 會自動進行負載均衡,將分區均勻地分配給每個消費者。在 Kafka 中,消費者負載均衡算法可以通過設置消費者組的 partition.assignment.strategy 參數來選擇。目前主流的分區分配策略以下幾種:

  • range: 在保證均衡的前提下,將連續的分區分配給消費者,對應的實現是 RangeAssignor;
  • round-robin:在保證均衡的前提下,輪詢分配,對應的實現是 RoundRobinAssignor;
  • 0.11.0.0 版本引入了一種新的分區分配策略 StickyAssignor,其優勢在于能夠保證分區均衡的前提下盡量保持原有的分區分配結果,從而避免許多冗余的分區分配操作,減少分區再分配的執行時間。

集群管理

Kafka 借助 ZooKeeper 進行集群管理。Kafka 中很多信息都在 ZK 中維護,如 broker 集群信息、consumer 集群信息、 topic 相關信息、 partition 信息等。Kafka 的很多功能也是基于 ZK 實現的,如 partition 選主、broker 集群管理、consumer 負載均衡等,限于篇幅本文將不展開陳述,這里先附一張網上截圖大家感受下:

圖片

參考文獻

  1. https://www.cnblogs.com/arvinhuang/p/16437948.html
  2. https://segmentfault.com/a/1190000039133960
  3. http://matt33.com/2018/11/04/kafka-transaction/
  4. https://blog.51cto.com/u_14020077/5836698
  5. https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
  6. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
  7. https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
  8. https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
  9. https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
  10. https://cloud.tencent.com/developer/article/1657649
  11. https://www.cnblogs.com/vivotech/p/16347074.html
責任編輯:武曉燕 來源: 碼哥字節
相關推薦

2021-06-21 17:00:05

云計算Hologres云原生

2024-07-12 08:42:58

Redis高性能架構

2009-08-12 17:48:56

存儲高性能計算曙光

2011-07-01 09:36:30

高性能Web

2025-01-27 11:49:55

2022-06-28 08:42:03

磁盤kafka高性能

2010-03-11 15:31:11

核心交換機

2021-12-26 00:03:25

Spark性能調優

2020-11-02 09:35:04

ReactHook

2021-09-06 08:31:11

Kafka架構主從架構

2019-09-03 09:41:48

運維架構技術

2020-01-07 16:16:57

Kafka開源消息系統

2009-11-17 10:14:27

核心路由器

2020-12-03 08:14:45

Axios核心Promise

2024-08-15 06:51:31

2015-09-23 09:35:42

高性能高可靠塊存儲

2023-03-09 10:22:00

SpringBootRabbitMQ

2019-09-12 08:50:37

Kafka分布式系統服務器

2020-06-24 08:43:29

5G核心網通信

2025-04-03 00:20:00

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久久久国产一区二区三区四区 | 亚洲精品一区在线观看 | 国产精品99久久久久久动医院 | 国产激情一区二区三区 | 亚洲精品欧美 | av网址在线 | 色一情一乱一伦一区二区三区 | 国产超碰人人爽人人做人人爱 | 91视频在线观看 | 一级黄色毛片子 | 精品国产91乱码一区二区三区 | 日韩中文字幕视频 | 欧美成人精品激情在线观看 | 欧美大片一区二区 | 毛片一区二区三区 | 成人精品一区二区 | 久久91精品国产一区二区 | 精品国产乱码久久久久久牛牛 | 久久免费福利 | 精品国产欧美一区二区三区不卡 | 欧美精品一区二区三区四区五区 | 久久999| 蜜月va乱码一区二区三区 | 国产精品欧美一区喷水 | 国产美女一区二区三区 | 日韩久久久一区二区 | 精品在线播放 | 精一区二区 | 亚洲精品日韩一区二区电影 | 在线高清免费观看视频 | 亚洲高清视频一区 | 91精品久久久久久久 | 天天综合操 | avav在线看| 成人精品国产免费网站 | 99精品欧美一区二区三区综合在线 | 一区二区在线免费观看视频 | 天天av综合| www久久国产 | 日本a∨视频 | 中文字幕在线一区 |