RocketMQ這樣做,壓測后性能提高30%
從官方這邊獲悉,RocketMQ在4.9.1版本中對消息發(fā)送進(jìn)行了大量的優(yōu)化,性能提升十分顯著,接下來請跟著我一起來欣賞大神們的杰作。
根據(jù)RocketMQ4.9.1的更新日志,我們從中提取到關(guān)于消息發(fā)送性能優(yōu)化的【Issues:2883】,詳細(xì)鏈接如下:
具體優(yōu)化點如截圖所示:
首先先嘗試對上述優(yōu)化點做一個簡單的介紹:
- 對WaitNotifyObject的鎖進(jìn)行優(yōu)化(item2)
- 移除HAService中的鎖(item3)
- 移除GroupCommitService中的鎖(item4)
- 消除HA中不必要的數(shù)組拷貝(item5)
- 調(diào)整消息發(fā)送幾個參數(shù)的默認(rèn)值(item7)
- sendMessageThreadPoolNums
- useReentrantLockWhenPutMessage
- flushCommitLogTimed
- endTransactionThreadPoolNums
減少瑣的作用范圍(item8-12)
通過閱讀上述的變更,總結(jié)出優(yōu)化手段主要包括如下三點:
- 移除不必要的鎖
- 降低鎖粒度(范圍)
- 修改消息發(fā)送相關(guān)參數(shù)
接下來結(jié)合源碼,從中挑選具有代表性功能進(jìn)行詳細(xì)剖析,一起領(lǐng)悟Java高并發(fā)編程的魅力。
1、移除不必要的鎖
本次性能優(yōu)化,主要針對的是RocketMQ同步復(fù)制場景。
我們首先先來簡單介紹一下RocketMQ主從同步在編程方面的技巧。
RocketMQ主節(jié)點將消息寫入內(nèi)存后, 如果采用同步復(fù)制,需要等待從節(jié)點成功寫入后才能向消息發(fā)送客戶端返回成功,在代碼編寫方面也極具技巧性,時許圖如下圖所示:
溫馨提示:在RocketMQ4.7版本開始對消息發(fā)送進(jìn)行了優(yōu)化,同步消息發(fā)送模型引入了jdk的CompletableFuture實現(xiàn)消息的異步發(fā)送。
核心步驟解讀:
- 消息發(fā)送線程調(diào)用Commitlog的aysncPutMessage方法寫入消息。
- Commitlog調(diào)用submitReplicaRequest方法,將任務(wù)提交到GroupTransferService中,并獲取一個Future,實現(xiàn)異步編程。值得注意的是這里需要等待,待數(shù)據(jù)成功寫入從節(jié)點(內(nèi)部基于CompletableFuture機制的內(nèi)部線程池ForkJoin)。
- GroupTransferService中對提交的任務(wù)依次進(jìn)行判斷,判斷對應(yīng)的請求是否已同步到從節(jié)點。
- 如果已經(jīng)復(fù)制到從節(jié)點,則通過Future喚醒,并將結(jié)果返回給消息發(fā)送端。
GroupTransferService代碼如下圖所示:
為了更加方便大家理解接下來的優(yōu)化點,首先再總結(jié)提煉一下GroupTransferService的設(shè)計理念:
- 首先引入兩個List結(jié)合,分別命名為讀、寫鏈表。
- 外部調(diào)用GroupTransferService的putRequest請求,將存儲在寫鏈表中(requestWrite)。
- GroupTransferService的run方法從requestRead鏈表中獲取任務(wù),判斷這些任務(wù)對應(yīng)的請求的數(shù)據(jù)是否成功寫入到從節(jié)點。
- 每當(dāng)requestRead中沒有數(shù)據(jù)可讀時,兩個隊列進(jìn)行交互,從而實現(xiàn)讀寫分離,降低鎖競爭。
新版本的優(yōu)化點主要包括:
- 更改putRequest的鎖類型,用自旋鎖替換synchronized
- 去除doWaitTransfer方法中多余的鎖
1.1 使用自旋鎖替換synchronized
正入下圖所示,GroupTransferService向外提供接口putRequest,用來接受外部的同步任務(wù),需要對ArrayList加鎖進(jìn)行保護(hù),往ArrayList中添加數(shù)據(jù)屬于一個內(nèi)存操作,操作耗時小。
故這里沒必要采取synchronized這種synchronized,而是可以自旋鎖,自旋鎖的實現(xiàn)非常輕量級,其實現(xiàn)如下圖所示:
整個鎖的實現(xiàn)就只需引入一個AtomicBoolean,加鎖、釋放鎖都是基于CAS操作,非常的輕量,并且自旋鎖不會發(fā)生線程切換。
1.2 去除多余的鎖
“鎖”的濫用是一個非常普遍的現(xiàn)象,多線程環(huán)境編程是一個非常復(fù)雜的交互過程,在編寫代碼過程中我們可能覺得自己無法預(yù)知這段代碼是否會被多個線程并發(fā)執(zhí)行,為了謹(jǐn)慎起見,就直接簡單粗暴的對其進(jìn)行加鎖,帶來的自然是性能的損耗,這里將該鎖去除,我們就要結(jié)合該類的調(diào)用鏈條,判斷是否需要加鎖。
整個GroupTransferService中在多線程環(huán)境中運行需要被保護(hù)的主要是requestRead與requestWrite集合,引入的鎖的目的也是確保這兩個集合在多線程環(huán)境下安全訪問,故我們首先應(yīng)該梳理一下。
GroupTransferService的核心方法的運作流程:
doWaitTransfer方法操作的主要對象是requestRead鏈表,而且該方法只會被GroupTransferService線程調(diào)用,并且requestRead中方法會在swapRequest中被修改,但這兩個方法是串行執(zhí)行,而且在同一個線程中,故無需引入鎖,該鎖可以移除。
但由于該鎖被移除,在swapRequests中進(jìn)行加鎖,因為requestWrite這個隊列會被多個線程訪問,優(yōu)化后的代碼如下:
從這個角度來看,其實主要是將鎖的類型由synchronized替換為更加輕量的自旋鎖。
2、降低鎖的范圍
被鎖包裹的代碼塊是串行執(zhí)行,即無法并發(fā),在無法避免鎖的情況下,降低鎖的代碼塊,能有效提高并發(fā)度,圖解如下:
如果多個線程區(qū)訪問lock1,lock2,在lock1中domSomeThing1、domSomeThing2這兩個方法都必須串行執(zhí)行,而多個線程同時訪問lock2方法,doSomeThing1能被多個線程同時執(zhí)行,只有doSomething2時才需要串行執(zhí)行,其整體并發(fā)效果肯定是lock2,基于這樣理論:得出一個鎖使用的最佳實踐:被鎖包裹的代碼塊越少越好。
在老版本中,消息寫入加鎖的代碼塊比較大,一些可以并發(fā)執(zhí)行的動作也被鎖包裹,例如生成offsetMsgId。
新版本采用函數(shù)式編程的思路,只是定義來獲取msgId的方法,在進(jìn)行消息寫入時并不會執(zhí)行,降低鎖的粒度,使得offsetMsgId的生成并行化,其編程手段之巧妙,值得我們學(xué)習(xí)。
3、調(diào)整消息發(fā)送相關(guān)的參數(shù)
sendMessageThreadPoolNums
Broker端消息發(fā)送端線程池數(shù)量,該值在4.9.0版本之前默認(rèn)為1,新版本調(diào)整為操作系統(tǒng)的CPU核數(shù),并且不小于4。該參數(shù)的調(diào)整有利有弊。提高了消息發(fā)送的并發(fā)度,但同時會導(dǎo)致消息順序的亂序,其示例圖如下同步發(fā)送下不會有順序問題,可放心修改。
在順序消費場景,該參數(shù)不建議修改。在實際過程中應(yīng)該對RocketMQ集群進(jìn)行治理,順序消費的場景使用專門集群。
useReentrantLockWhenPutMessage MQ消息寫入時對內(nèi)存加鎖使用的鎖類型,低版本之前默認(rèn)為false,表示默認(rèn)使用自旋鎖;新版本使用ReentrantLock。自旋主要的優(yōu)勢是沒有線程切換成本,但自旋容易造成CPU的浪費,內(nèi)存寫入大部分情況下是很快,但RocketMQ比較依賴頁緩存,如果出現(xiàn)也緩存抖動,帶來的CPU浪費是非常不值得,在sendMessageThreadPoolNums設(shè)置超過1之后,鎖的類型使用ReentrantLock更加穩(wěn)定。
flushCommitLogTimed 首先我們通過觀察源碼了解一下該參數(shù)的含義:
其主要作用是控制刷盤線程阻塞等待的方式,低版本flushCommitLogTimed為false,默認(rèn)使用CountDownLatch,而高版本則直接使用Thread.sleep。猜想的原因是刷盤線程比較獨立,無需與其他線程進(jìn)行直接的交互協(xié)作,故無需使用CountDownLatch這種專門用來線程協(xié)作的“外來和尚”。
endTransactionThreadPoolNums
主要用于設(shè)置事務(wù)消息線程池的大小。
新版本主要是可通過調(diào)整發(fā)送線程池來動態(tài)調(diào)節(jié)事務(wù)消息的值,這個大家可以根據(jù)壓測結(jié)果動態(tài)調(diào)整。