深度剖析:Kafka 請求是如何處理? 看完這篇文章徹底懂了!
開篇三問: Kafka Broker 端網絡架構和請求處理到底是使用了哪些高大上的技術?它到底解決了什么問題?究竟是怎么解決的?
只有了解了這些, 我們才能深刻掌握 Kafka 服務端設計精髓所在,更加深刻理解一個高并發、高性能服務端架構該如何設計。
相信使用過 Kafka 的朋友都知道其吞吐量可以高達百萬,但很少人理解其中的設計原理。認真讀完這篇文章,你會對Kafka Broker請求處理流程和網絡架構設計實現細節,有更加深刻的理解。
這篇文章干貨很多,希望大家可以耐心讀完。
01 總體概述
要想理解 Kafka Broker 請求處理架構設計,我們需要從簡單請求處理模型來說起。
對于日常系統開發,我們都知道是基于 Request/Response 的模式來實現的, 對于 Kafka 來說, 無論是 Producer 端、Consumer 端 還是 Broker 端,他們之間的請求交互也都是基于「Request/Response」模式來完成的。比如,客戶端會通過網絡發送消息生產請求給 Broker,而 Broker 處理完成后,會發送對應的響應給到客戶端。
下面,我會從自我設計角度出發,如果是我們會如何設計,帶你一步步演化出來「kafka Broker 的網絡請求處理」架構。
在這個過程中,你會看到 Kafka 在處理請求的過程中會遇到哪些高性能和高并發問題,以及架構為什么要這樣演進,從而理解 Kafka 這么設計的意義和精妙之處。
02 順序處理模式
我們從最簡單的網絡編程思路處理方式講起。
因為對于 Kafka Broker 來說就是用來接收生產者發送過來的請求,那這個時候最簡單的實現大概是這樣的:
如上述代碼所示:我們可以理解 Kafka 每個服務器啟動起來后就是一個 while 循環, 不斷的 accept 生產者提交上來的請求, 然后進行處理并存儲到磁盤上,這種方式實現最簡單,也非常好理解,但是這種方式存在2個致命的缺陷?
1)請求阻塞:只能順序處理每個請求,即每個請求都必須等待前一個請求處理完畢才能得到處理。
2)吞吐量非常差:由于只能順序處理,無法并發,效率太低,所以吞吐量非常差,只適合請求發送非常不頻繁的系統。
從上面來看很明顯,如果你的 Kafka 系統請求并發量很大,意味著要處理的時間就會越久。那按照前面我們提到的 Kafka「吞吐量」的標準,這個方案遠遠無法滿足我們對高性能、高并發的要求。
那有什么更好的方案可以快速處理請求嗎?
接下來我們可以試著采取這個方案:獨立線程異步處理模式。
03 多線程異步處理模式
既然同步方式會阻塞請求,吞吐量差, 我們可以嘗試著使用獨立線程異步方式進行處理, 即經典的 connection per thread 模型, 那這個時候的實現大概是這樣的:
如上述代碼所示:同上還是一個 while 循環不斷的 accept 生產者提交上來的請求,但是這時候 Kafka 系統會為每個請求都創建一個「單獨的線程」來處理。
這個實現方案的好處就是:
1)吞吐量稍強:相對上面同步方式的方案,一定程度上極大地提高了服務器的吞吐量。
2)非阻塞:它是完全異步的,每個請求的處理都不會阻塞下一個請求。
但同樣缺陷也同樣很明顯:即為每個請求都創建線程的做法開銷很大,在某些高并發場景下會壓垮整個服務。可見,這個方案也只適用于請求發送頻率很低的業務場景。還是無法滿足我們對高性能、高并發的要求。
既然這種方案還是不能滿足, 那么我們究竟該使用什么方案來支撐高并發呢?
這個時候我們可以想想我們日常開發用到的7層負載Nginx或者Redis在處理高并發請求的時候是使用什么方案呢?
從上面啟發你可以看出,提升系統 I/O 并發性能的關鍵思路就是:事件驅動。
想必大家已經猜到了,沒錯,就是「多路復用」。那么Kafka 是不是也是采用這種方案來實現呢?
這里我們先考慮采用基于「事件驅動」的設計方案,當有事件觸發時,才會調用處理器進行數據處理。
04 Reactor 模式
在高性能網絡編程領域,有一個非常著名的模式——Reactor模式。那么何為「Reactor模式」,首先它是基于事件驅動的,有一個或多個并發輸入源,有一個Service Handler,有多個Request Handler;這個Service Handler會同步的將輸入的請求輪詢地分發給相應的Request Handler進行處理。
借助于 Doug Lea(就是那位讓人無限景仰的大爺)的 "Scalable IO in Java" 中講述的Reactor模式。
"Scalable IO in Java" 的地址是:
??http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf ??
簡單來說,Reactor 模式特別適合應用于處理多個客戶端并發向服務器端發送請求的場景。這里借用大神 PDF 中的一幅圖來說明 Reactor 架構:
從上面這張圖中,我們可以看出多個客戶端會發送請求給到 Reactor。Reactor 有個請求分發線程 Dispatcher,也就是圖中的綠色的 Acceptor,它會將不同的請求下分發到多個工作線程中處理。
在這個架構中,Acceptor 線程只是用來進行請求分發,所以非常輕量級,因此會有很高的吞吐量。而這些工作線程可以根據實際系統負載情況動態調節系統負載能力,從而達到請求處理的平衡性。
基于上面的 Reactor 架構, 我們來看看如果是我們該如何設計 Kafka 服務端的架構?
1)這里我們采用多路復用方案,Reactor 設計模式,并引用 Java NIO 的方式可以更好的解決上面并發請求問題。
2)當 Client 端將請求發送到 Server 端的時候, 首先在 Server 端有個多路復用器(Selector),然后會啟動一個 Accepter 線程將 OP_CONNECT 事件注冊到多路復用器上, 主要用來監聽連接事件到來。
3)當監聽到連接事件后,就會在多路復用器上注冊 OP_READ 事件, 這樣 Cient 端發送過來的請求, 都會被接收到。如果請求特別多的話, 我們這里進行優化, 創建一個 Read HandlePool 線程池。
4)當 Read HandlePool 線程池接收到請求數據后,最終會交給 Handler ThreadPool 線程池進行后續處理。比如如果是生產者發送過來的請求,肯定會解析請求體,處理并最終存儲到磁盤中,待處理完后要返回處理結果狀態, 這時候就由它在多路復用器上注冊 OP_WRITE 事件來完成。這樣多路復用器遍歷到OP_WRITE 事件后就會將請求返回到 Client 端。
5)在上圖中我們看到在整個流程中還有一個 MessageQueue 的隊列組件存在, 為什么要加這個組件呢? 我們可以想象一下, 如果請求量非常大,直接交給 Handler ThreadPool 線程池進行處理, 可能會出現該線程池處理不過來的情況發生,如果處理不過來,也會出現阻塞瓶頸。所以這里我們在 Server 端內部也設計一個消息隊列, 起到一個緩沖的作用,Handler ThreadPool 線程池會根據自己的負載能力進行處理。
以上就是我們引用了「多路復用」的設計方案,但是 Kafka Broker 端就是這樣的架構設計方案嗎?如果我們是 Kafka 系統架構的設計者,采用這樣的架構設計方案會不會還是有什么問題,有沒有哪個環節會出現系統性能瓶頸呢?
這是個值得思考的問題, 很考驗你的架構設計能力。
細心的讀者可能會發現:對于 Kafka 這種超高并發系統來說,一個 Selector 多路復用器是 Hold 不住的,從上圖可以得出,我們監聽這些連接、接收請求、處理響應結果都是同一個 Selector 在進行處理,很容易成為系統性能瓶頸。
接下來,我們將進一步進行優化,為了減輕當前 Selector 的處理負擔,引入另外一個Selector 處理隊列,如下圖所示:
1)首先上圖是目前我認為最接近 Kafka Broker 真實架構設計方案的。
2)整體架構跟上一版的類似,只不過這里多引入了一個多 Selector 處理隊列,原來的 Selector 只負責監聽連接, 這時候有讀者就會有疑問,請求量超級大的時候,一個 Selector 會不會成為瓶頸呢? 這里可以大可放心, 這時候它的工作非常單一,是完全能 hold 住的。
3)那么對于我們接收請求、處理請求、返回狀態操作都會交由多 Selector 處理隊列,至于這里到底需要多少個 Selector,又會跟什么參數和配置有關系,我們后續再進行分析,總之這里記住有多個 Selector 就行了,這樣系統壓力就會被分散處理。
4)另外我們要搞清楚的一點就是對于 Kafka 服務端指的是每個 Broker 節點,如果我們的服務集群總共有10個節點, 每個節點內部都是上面的這樣的架構,這樣我們就有理由相信如果采用這樣的架構設計方案,是可以支持高并發和高性能的。
架構設計方案演進到這里,基本上已經差不多了,接下來我們看看 Kafka 真實超高并發的網絡架構是如何設計的。
05 Kafka 超高并發網絡架構
在上面 Kafka 高性能、高吞吐量架構演進的時候,我們提到了 Java NIO 以及 Reactor 設計模式。實際上,搞透了「Kafka 究竟是怎么使用 NIO 來實現網絡通信的」,不僅能讓我們掌握 Kafka 請求處理全流程處理,也能讓我們對 Reactor 設計模式有更深的理解,還能幫助我們解決很多實際問題。
在<kafka 三高架構設計剖析> 和 <聊聊 Kafka Producer 那點事> 中也講過關于 Kafka 網絡架構,讀者可以去查閱,今天我們再次深入的對其進行剖析。
那么接下來我們就來深入剖析下 Kafka 的 NIO 通訊機制吧。
我們先從整體上看一下完整的網絡通信層架構,如下圖所示:
1)從上圖中我們可以看出,Kafka 網絡通信架構中用到的組件主要由兩大部分構成:SocketServer 和 RequestHandlerPool。
2)SocketServer 組件是 Kafka 超高并發網絡通信層中最重要的子模塊。它包含 Acceptor 線程、Processor 線程和 RequestChannel 等對象,都是網絡通信的重要組成部分。它主要實現了 Reactor 設計模式,主要用來處理外部多個 Clients(這里的 Clients 可能包含 Producer、Consumer 或其他 Broker)的并發請求,并負責將處理結果封裝進 Response 中,返還給 Clients。
3)RequestHandlerPool 組件就是我們常說的 I/O 工作線程池,里面定義了若干個 I/O 線程,主要用來執行真實的請求處理邏輯。
4)這里注意的是:跟 RequestHandler 相比, 上面所說的Acceptor、Processor 線程 還有 RequestChannel 等都不做請求處理, 它們只是請求和響應的「搬運工」。
接下來我們來具體聊聊SocketServer中的實現原理,這里先來講講:
- Acceptor 線程
- Processor 線程
以Kafka 2.5版本,源碼位置:
1、我們先來聊聊 Acceptor 線程
在經典的 Reactor 設計模式有個 「Dispatcher」 的角色,主要用來接收外部請求并分發給下面的實際處理線程。通過上面分析我們知道在 Kafka 網絡架構設計中,這個 Dispatcher 就是「Acceptor 線程」。
Acceptor 線程是用來接收和創建外部 TCP 連接的線程。在Broker 端每個 SocketServer 實例只會創建一個 Acceptor 線程。它的主要功能就是創建連接,并將接收到的 Request 請求傳遞給下游的 Processor 線程處理。
/**
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 1. 創建底層的NIO Selector對象,用來監聽連接創建請求、讀寫請求等
private val nioSelector = NSelector.open()
// 2. Broker端創建對應的ServerSocketChannel實例,然后將Channel注冊到Selector對象上
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 3. 創建Processor線程池
private val processors = new ArrayBuffer[Processor]()
......
/**
* Accept loop that checks for new connection attempts
*/
def run(): Unit = {
//注冊OP_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 等待Acceptor線程啟動完成
startupComplete()
try {
// 當前使用的Processor序號,從0開始
var currentProcessorIndex = 0
while (isRunning) {
try {
// 每500毫秒獲取一次就緒I/O事件
val ready = nioSelector.select(500)
// 如果有I/O事件準備就緒
if (ready > 0) {
........
// 調用accept方法創建Socket連接
accept(key).foreach { socketChannel =>
........
// 指定由哪個Processor線程進行處理
processor = synchronized {
.........
processors(currentProcessorIndex)
}
// 更新Processor線程序號
currentProcessorIndex += 1
}
.........
}
}
這里重點看下 Acceptor 線程中三個非常關鍵且重要的屬性和方法:
1.1、nioSelector:它就是我們所熟悉的 Java NIO 庫中的 Selector 對象實例,所有網絡通信組件實現 Java NIO 機制的基礎。
1.2、processors:通過源碼我們可以知道在Acceptor 線程在初始化時,需要創建對應的 Processor 線程池。由此可以得出,Processor 線程是在 Acceptor 線程中管理和維護的。
1.3、run方法:它是處理 Reactor 模式中分發邏輯的主要實現方法。
1)從上述源碼中,我們可以看出 Acceptor 線程主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式循環的輪詢準備就緒的 I/O 事件。
2)這里的 I/O 事件主要是指網絡連接創建事件即:SelectionKey.OP_ACCEPT。
3)這樣注冊好事件后,一旦后續接收到連接請求后,Acceptor 線程就會指定一個 Processor 線程,并將該請求交給它并創建網絡連接用于后續處理。
2、接下來我們來聊聊 Processor 線程:
從上面分析我們知道 Acceptor 只是做了請求入口連接處理的,那么,真正創建網絡連接以及分發網絡請求是由 Processor 線程來完成的。
override def run(): Unit = {
// 等待Processor線程啟動完成
startupComplete()
try {
while (isRunning) {
try {
// 創建新連接
configureNewConnections()
// 發送Response
processNewResponses()
// 執行NIO poll,獲取對應SocketChannel上準備就緒的I/O操作
poll()
// 將接收到的Request放入Request隊列
processCompletedReceives()
.......
} catch {
.........
}
}
} finally {
........
}
}
........
// 默認連接對接大小
val ConnectionQueueSize = 20
// 保存要創建的新連接信息
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
// 一個臨時 Response 隊列
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
// Response 隊列
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
從上面 Processor 線程源碼,可以看出 Kafka 的代碼邏輯實現的非常好,各個子方法的邊界非常清楚。
這里我們就不展開源碼分析了, 更深入詳細的等到源碼分析專題再進行。我們簡單的看下 Processor 線程初始化時要做的事情。
看上面代碼最后部分,我們知道每個 Processor 線程在創建時都會創建 3 個隊列。
1)newConnections 隊列: 它主要是用來保存要創建的新連接信息,也就是SocketChannel 對象,目前是硬編碼隊列長度大小為20。每當 Processor 線程接收到新的連接請求時,都會將對應的 SocketChannel 對象放入隊列,等到后面創建連接時,從該隊列中獲取 SocketChannel,然后注冊新的連接。
2)inflightResponse 隊列:它是一個臨時的 Response 隊列, 當 Processor 線程將 Repsonse 返回給 Client 之后,要將 Response 放入該隊列。它存在的意義:由于有些 Response 回調邏輯要在 Response 被發送回 Request 發送方后,才能執行,因此需要暫存到臨時隊列。
3)ResponseQueue 隊列:它主要是存放需要返回給Request 發送方的所有 Response 對象。通過源碼得知:每個 Processor 線程都會維護自己的 Response 隊列。
06 請求處理核心流程剖析
上面深入的剖析了 Kafka 超高并發網絡架構 以及 SocketServer 中的 Acceptor 線程跟 Processor 線程的實現原理, 接下來我們來將請求處理核心流程給串起來。
只有搞透這部分的實現原理,才能幫助我們有針對性的進行 Broker端請求處理的性能調優。
比如:在上面網絡架構圖,有兩個參數跟整個流程有關系,分別是num.network.threads、num.io.threads。如果我們不掌握請求處理的整個流程,就不能更好的對此進行調整,來達到更高的性能要求。
其中 num.io.threads 就是 I/O 工作線程池的大小配置,即 KafkaRequestHandlerPool 線程池,它才是「真正處理 Kafka 請求」的地方。
源碼位置:
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int, //I/O線程序號
brokerId: Int, //所在Broker序號,即broker.id值
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger, //I/O線程池大小
val requestChannel: RequestChannel, //請求處理通道
apis: KafkaApis, //KafkaApis類,用于真正實現請求處理邏輯的類
time: Time) extends Runnable with Logging {
......
def run(): Unit = {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從請求隊列中獲取下一個待處理的請求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
// 統計線程空閑時間
val idleTime = endTime - startSelectTime
// 更新線程空閑百分比指標
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
req match {
// 當關閉線程請求處理
case RequestChannel.ShutdownRequest =>
......
// 當普通請求到來時
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
// 由KafkaApis.handle方法執行相應處理邏輯
apis.handle(request)
} catch {
....
} finally {
// 釋放請求對象資源
request.releaseBuffer()
}
case null => // continue
}
}
shutdownComplete.countDown()
}
}
下面我們結合 Kafka 超高并發網絡架構圖來講解下一個完整請求處理核心流程:
1)Clients 發送請求給 Acceptor 線程。
2)Acceptor 線程會創建 NIO Selector 對象,并創建 ServerSocketChannel 實例,然后將Channel 和 OP_ACCEPT 事件到 Selector 多路復用器上。
3)Acceptor 線程還會默認創建3個大小的 Processor 線程池,參數:num.network.threads, 并輪詢的將請求對象 SocketChannel 放入到連接隊列中(newConnections)。
4)這時候連接隊列就源源不斷有請求數據了,然后不停地執行 NIO Poll, 獲取對應 SocketChannel 上已經準備就緒的 I/O 事件。
5)Processor 線程向 SocketChannel 注冊了 OP_READ/OP_WRITE 事件,這樣 客戶端發過來的請求就會被該 SocketChannel 對象獲取到,具體就是CompleteReceives。
6)這個時候客戶端就可以源源不斷進行請求發送了,服務端通過 Selector NIO Poll 不停的獲取準備就緒的 I/O 事件。
7)然后根據Channel中獲取已經完成的 Receive 對象,構建 Request 對象,并將其存入到 Requestchannel 的 RequestQueue 請求隊列中 。
8)這個時候就該 I/O 線程池上場了,KafkaRequestHandler 線程循環地從請求隊列中獲取 Request 實例,然后交由KafkaApis 的 handle 方法,執行真正的請求處理邏輯,并最終將數據存儲到磁盤中。
9)待處理完請求后,KafkaRequestHandler 線程會將 Response 對象放入 Processor 線程的 Response 隊列。
10)然后 Processor 線程通過 Request 中的 ProcessorID 不停地從 Response 隊列中來定位并取出 Response 對象,返還給 Request 發送方。
至此,我們深入剖析Kafka請求處理「核心流程」。
07 系統調優
搞透了 Kafka 超高并發網絡架構設計和請求處理核心流程后,我們來聊聊 Broker 端參數調優。
對 Kafka 而言,性能一般是指吞吐量和延時。所以高吞吐量、低延時是我們調優 Kafka 集群的主要目標。
Broker 端調優主要就是合理地設置 Broker 端參數值,以匹配你的生產環境。另外還有一點要說明的就是「保證服務器端和客戶端版本的一致」,做到這一點,就能獲得很多性能收益了。
num.network.threads
創建 Processor 處理網絡請求線程個數,建議設置為 Broker 當前CPU核心數*2,這個值太低經常出現網絡空閑太低而缺失副本。
num.io.threads
創建 KafkaRequestHandler 處理具體請求線程個數,建議設置為Broker磁盤個數*2。
num.replica.fetchers
建議設置為CPU核心數/4,適當提高可以提升CPU利用率及 Follower同步 Leader 數據當并行度
compression.type
建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時可以減少網絡傳輸數據量。
log.flush.xxx
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
這幾個參數表示日志數據刷新到磁盤的策略,應該保持默認配置,刷盤策略讓操作系統去完成,由操作系統來決定什么時候把數據刷盤;如果設置來這個參數,可能對吞吐量影響非常大
auto.leader.rebalance.enable
表示是否開啟leader自動負載均衡,默認true;我們應該把這個參數設置為false,因為自動負載均衡不可控,可能影響集群性能和穩定。
08 總結
這里,我們一起來總結一下這篇文章的重點。
1、對于 Kafka 這樣一個優秀的服務端系統架構來說,應該遵循高可用、高性能、高并發 3 大原則。
2、本文從最簡單的網絡編程思路出發一步一步演進到 Reactor 設計模式,假設我們就是 Kafka 架構的設計者,我們該如何設計其服務端網絡架構。
3、通過本文的深度剖析,提升系統I/O性能的核心是基于「事件驅動」模型實現。
4、在剖析完服務端網絡架構后,我們也深度剖析了 SocketServer中兩個最重要的線程:Acceptor 線程和 Processor 線程。
5、接著我們結合 Kafka 超高并發網絡架構圖又梳理了 Kafka 請求處理核心流程。
6、最后給大家分析并做了 Broker 端系統調優的方案。