Netty-Reactor 模型常見知識點小結(jié)
Netty作為一款強大的高性能網(wǎng)絡(luò)編程框架,其底層Reactor的設(shè)計理念和實現(xiàn)都是非常值得我們研究和學習了解的,本文將從以下幾個問題為導(dǎo)向,帶讀者深入理解netty reactor線程模型。
1. Netty有Reactor線程模型
Reactor模型的用戶層面的IO模型,按照結(jié)構(gòu)它可分為:
- Reactor單線程
- Reactor多線程
- 主從Reactor模型
先來說說單Reactor單線程模型,每個客戶端與服務(wù)端建立連接時,所有的請求建立、讀寫事件分發(fā)都由這個Reactor線程處理。很明顯,所有的連接建立、讀寫和業(yè)務(wù)邏輯處理等工作都分配到一個線程上,對于現(xiàn)如今多核的服務(wù)器場景,這種方案未能很好的利用CPU資源,對應(yīng)高并發(fā)場景表現(xiàn)也不算特別出色(會比傳統(tǒng)的BIO好一些):
于是就有了單Reactor多線程模型,與前者相比,Reactor監(jiān)聽到就緒的IO連接并建立連接后,它會將所有的讀寫請求交給一個業(yè)務(wù)線程池進行處理。 該模型較好利用了CPU資源,提升的程序執(zhí)行效率,但是面對大量的并發(fā)連接請求時,因為只有一個Reactor處理IO請求,系統(tǒng)的吞吐量還是沒有提升。
最后就是主從Reactor模型,也就是如今主流的Reactor模型,該模型為用分為主Reactor和從Reactor,各自都是以線程池的形式存在,由主Reactor專門處理連接事件,隨后將每個建立連接的客戶端socket讀寫事件注冊到從Reactor中,由從Reactor負責處理這些讀寫以及業(yè)務(wù)邏輯。主從Reactor模型是一種改進的事件驅(qū)動編程模型,相比于單Reactor單線程模型,它具有以下幾個優(yōu)勢:
- 多線程并發(fā)處理:主從Reactor模型允許多個線程同時處理事件,每個線程都有一個獨立的Reactor負責事件分發(fā)。這樣可以充分利用多核處理器的優(yōu)勢,提高系統(tǒng)的并發(fā)處理能力和性能。
- 高吞吐量:由于使用了多線程并發(fā)處理,主從Reactor模型能夠同時處理多個事件,從而提高系統(tǒng)的吞吐量。每個線程都可以獨立處理事件,不會被其他事件的處理阻塞。
- 負載均衡:主從Reactor模型中,主Reactor負責監(jiān)聽和接收連接請求,然后將連接分配給從Reactor進行具體的事件處理。這種分配方式可以實現(xiàn)負載均衡,將連接均勻地分配給多個Reactor,避免某個Reactor的負載過重。
- 異步IO支持:主從Reactor模型可以結(jié)合異步IO技術(shù),充分利用操作系統(tǒng)提供的異步IO接口。這樣可以在進行IO操作時立即返回,不會阻塞線程,提高系統(tǒng)的并發(fā)性和響應(yīng)性能。
- 容錯能力:通過使用多個Reactor和線程,主從Reactor模型具有更好的容錯能力。如果某個Reactor或線程出現(xiàn)錯誤或崩潰,其他Reactor和線程仍然可以繼續(xù)處理事件,保證系統(tǒng)的正常運行。
主從Reactor模型通過多線程并發(fā)處理、負載均衡、異步IO支持和容錯能力的提升,能夠更好地滿足高并發(fā)、高性能的網(wǎng)絡(luò)應(yīng)用程序的需求。
2. Netty如何實現(xiàn)Reactor模式
通過上文我們大體了解了幾種常見的Reactor模式,實際上Netty已經(jīng)將這三種Reactor模式都封裝好了,假設(shè)我們需要單Reactor服務(wù)端,只需指明NioEventLoopGroup的線程數(shù)為1即可:
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
serverBootstrap.group(nioEventLoopGroup);
同理多Reactor則將線程數(shù)設(shè)置為大于1即可,當然我們也可以設(shè)置NioEventLoopGroup參數(shù)為空,因為如果NioEventLoopGroup不設(shè)置參數(shù)時,該分發(fā)內(nèi)部會創(chuàng)建CPU核心數(shù)2倍的線程:
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
serverBootstrap.group(nioEventLoopGroup);
這一點我們直接步入NioEventLoopGroup內(nèi)部只需流程即可看到,默認情況下我們傳入的thread為0,它就取DEFAULT_EVENT_LOOP_THREADS 的值,而這個值初始情況下回去CPU核心數(shù)2倍:
//不傳參時nThreads值為0,super即MultithreadEventLoopGroup
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
//MultithreadEventLoopGroup看到nThreads為0則取DEFAULT_EVENT_LOOP_THREADS
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//DEFAULT_EVENT_LOOP_THREADS 取CPU核心數(shù)的2倍
privatestaticfinalint DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
3. 為什么 main Reactor大部分場景只用到一個線程
上文介紹Reactor模式時已經(jīng)介紹到了,它主要負責處理新連接,而Netty服務(wù)端初始化時只會綁定一個ip和端口號然后生成serverSocketChannel,而每個channel只能和一個線程綁定,這就導(dǎo)致了main Reactor主服務(wù)端連接大部分場景(連接沒有斷開)只會用到一個線程:
我們不妨通過代碼的方式進行印證,我們服務(wù)端初始化時都是通過這個bind方法完成連接建立:
// Start the server.
ChannelFuture f = serverBootstrap.bind(PORT).sync();
查看bind內(nèi)部的調(diào)用doBind即可看到它通過異步任務(wù)完成服務(wù)端serverSocketChannel創(chuàng)建之后,就會調(diào)用doBind0完成ip和端口號綁定:
private ChannelFuture doBind(final SocketAddress localAddress) {
//生成創(chuàng)建服務(wù)端serverSocketChannel的regFuture
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//如果regFuture完成了則將channel和ip端口號即localAddress綁定
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//......
}
}
隨后我們查看doBind0(一般帶有do+0的方法都是執(zhí)行核心邏輯的方法)方法,即可看到它會從當前channel的eventLoopGroup找到一個線程真正執(zhí)行ip和端口綁定,這也就是我們所說的為什么main Reactor大部分場景只用到一個線程:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//從channel的eventLoopGroup中找到一個線程執(zhí)行bind
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
有讀者可能會問,為什么時大部分呢?那么小部分是什么情況?
答案是連接失敗的情況,一旦綁定ip端口失敗,Netty內(nèi)部會拋出異常,如果服務(wù)端有斷線重連機制,進行重新綁定時,channel可能會綁定eventGroup中的另一個線程:
這里筆者也給出斷線重連的服務(wù)端實現(xiàn),可以看到我們通過channelInactive監(jiān)聽到斷線后會重新創(chuàng)建channel進行綁定ip端口生成新的socket,此時我們就可以用到線程組中別的線程了:
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.channel().eventLoop().execute(()->{
//創(chuàng)建新的引導(dǎo)類
ServerBootstrap serverBootstrap =......;
//在地調(diào)用bind
serverBootstrap.bind("127.0.0.1",8080);
});
}
4. Netty線程分配策略是什么
線程分配的負載均衡策略,也是在這里完成初始化的,chooserFactory會根據(jù)我們傳入的線程數(shù)給定一個負載均衡算法。對于負載均衡算法Netty也做了很多的優(yōu)化。我們查看chooserFactory創(chuàng)建策略可以看到,如果當前線程數(shù)的2的次冪則返回PowerOfTowEventExecutorChooser改選擇使用位運算替代取模,反之返回GenericEventExecutorChooser這就是常規(guī)的取模運算。
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//如果是2的次冪則用PowerOfTwoEventExecutorChooser選擇器
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
//反之取常規(guī)的取模運算選擇器
return new GenericEventExecutorChooser(executors);
}
}
先來說說PowerOfTowEventExecutorChooser ,其實它們的本質(zhì)就是基于一個索引idx 通過原子自增并取模得到線程索引,只不過若線程數(shù)為2的次冪則可以通過位運算完成取模的工作,這么做的原因也是因為計算機對于位運算的執(zhí)行效率遠遠高于算術(shù)運算。
這種算法通過位運算的方式提升計算效率,那么是否存在索引越界問題呢?假設(shè)線程數(shù)組長度為8,也就是2的3次方,那么實際進行與運算的值就是7,這個值也正是線程數(shù)組索引的最大值。筆者分別帶入索引0、5、8,進行與運算時,真正參與的二進制永遠是和永遠是7以內(nèi)的進制,得出的結(jié)果分別是0、5、0,永遠不會越界,并且運算性能還能得到保證。
對此我們給出PowerOfTowEventExecutorChooser 選擇器的實現(xiàn),思路正如上文所說,通過按位與一個線程索引范圍的最大值得到executors線程組索引范圍以內(nèi)的線程:
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
//......
//原子類自增并和線程索引最大值進行按位與運算得到線程
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
而GenericEventExecutorChooser 則是原子自增和線程數(shù)組長度進行取模%運算得到線程,實現(xiàn)比較簡單,這里筆者就直接給出代碼了:
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
//......
@Override
public EventExecutor next() {
//原子類自增和線程長度進行取模
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
5. Netty中的IO多路復(fù)用的概念
默認情況,Netty是通過JDK的NIO的selector組件實現(xiàn)IO多路復(fù)用的,其實現(xiàn)的特點為:
- 功能上:輪詢其是多路復(fù)用的,它支持將多個客戶端(channel)的讀寫事件注冊到一個selector上,由單個selector進行輪詢。
- 非阻塞:selector進行輪詢是采用非阻塞輪詢,即非阻塞的到內(nèi)核態(tài)查看注冊的讀寫事件是否就緒,如果沒就緒則直接返回未就緒,而不是阻塞等待。
- 事件驅(qū)動:輪詢到就緒事件后selector就會將結(jié)果返回給對應(yīng)的EventLoop,交由其chanel pipeline上的處理器進行處理。
6. Netty基于那幾個組件搭配實現(xiàn)IO多路復(fù)用
我們以最經(jīng)典的reactor模型來探討這個問題,整體來說,Netty是通過以下幾個組件完成IO多路復(fù)用的方案落地:
- 聲明boss group線程組作為main reactor,它本質(zhì)是Selector(對于Linux系統(tǒng)下是EpollEventLoop 的封裝),對于接收新連接的客戶端socket,并通過acceptHandler分發(fā)連接請求。
- 通過work group為分發(fā)過過來的連接分配一個線程,對應(yīng)的它們都會被抽象為一個Channel對象,后續(xù)該socket的讀寫時間都在work group的線程上的處理,而這個線程內(nèi)部也會有一個EventLoop通過selector針對這幾個socket的讀寫事件進行io輪詢查看是否就緒。
- 而每個一個客戶端channel注冊到從reactor后續(xù)的讀寫事件都會通過對應(yīng)的channel pipeline上的handler處理器進行處理。
對此我們也將這些組件的協(xié)作流程進行總結(jié):
- 服務(wù)端初始化所有線程,各自都綁定一個selector。
- BossGroup 初始化連接,綁定ip和端口,其底層selector輪詢器會監(jiān)聽當前ServerSocketChannel 對應(yīng)的客戶端新接入的連接事件。
- 客戶端連接到達時,BossGroup將就緒的客戶端channel件分發(fā)到worker group的某個線程的EventLoop上。
- work group為該channel分配處理器并將其讀寫事件注冊到自己的selector上,同時監(jiān)聽其讀寫事件。
- 后續(xù)讀寫事件就緒時,EventLoop就會觸發(fā)ChannelPipeline 中的處理器處理事件。
7. Netty如何實現(xiàn)通用NIO多路復(fù)用器
實際上Netty對于JDK NIO SelectorProvider 做了一些靈活的處理,它可以讓用戶通過JVM參數(shù)或者SPI文件配置等方式讓用戶直接JDK NIO提供的selector。
我們配置引導(dǎo)類的時候,通常會聲明b.channel(NioServerSocketChannel.class);,一旦我們通過引導(dǎo)類進行初始化的時候,其底層就會按照如下順序執(zhí)行:
- 首先會通過loadProviderFromProperty查看用戶是否有通過系統(tǒng)配置指定創(chuàng)建,即通過JVM參數(shù)-D java.nio.channels.spi.SelectorProvider指定selectorProvider的全限定名稱,若存在則通過應(yīng)用程序加載器即(Application Classloader)完成反射創(chuàng)建。
- 若步驟1明確沒有配置,則查看SPI是否有配置,即查看工廠目錄META-INF/services下是否有定義名為SelectorProvider的SPI文件,若存在則會拿著第一個SelectorProvider的全限定名稱進行反射創(chuàng)建。
- 若都沒有則是創(chuàng)建DefaultSelectorProvider這個DefaultSelectorProvider會根據(jù)操作系統(tǒng)內(nèi)核版本決定提供那個DefaultSelectorProvider,以筆者為例是Windows操作系統(tǒng)所以提供的Provider是WindowsSelectorProvider,同理如果是Linux內(nèi)核2.6以上則是EpollSelectorProvider。
這就是Netty如何保證NIO多路復(fù)用器通用的原因:
我們直接查看NioServerSocketChannel,可以看到其默認構(gòu)造方法內(nèi)部使用默認DEFAULT_SELECTOR_PROVIDER 進行NioServerSocketChannel創(chuàng)建:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
//......
//使用DEFAULT_SELECTOR_PROVIDER創(chuàng)建server socket channel
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
我們查看DEFAULT_SELECTOR_PROVIDER 的實現(xiàn),即SelectorProvider.provider()內(nèi)部邏輯,可以正如我們上文所說的順序,這里筆者就不多做贅述了:
public static SelectorProvider provider() {
//臨界加載上個鎖
synchronized (lock) {
//......
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
//使用jvm方式嘗試反射創(chuàng)建
if (loadProviderFromProperty())
return provider;
//使用spi的方式進行反射創(chuàng)建
if (loadProviderAsService())
return provider;
//返回通過系統(tǒng)平臺jdk系統(tǒng)的DefaultSelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
8. Netty如何優(yōu)化工作線程調(diào)度平衡
Netty設(shè)計者為了提升單個NIO線程的利用率,對每一個線程調(diào)度分配都做了極致的壓榨,其工作流程為先查看定時任務(wù)隊列scheduledTaskQueue中查看是否有就緒的任務(wù),若有則查看它的到期時間距今的時差,并基于這個時差進行非阻塞輪詢查看是否存在就緒的任務(wù)。當然如果定時隊列中沒有就緒的任務(wù),那么輪詢IO任務(wù)的方法select就會阻塞輪詢,直到被移步任務(wù)喚醒或者select有就緒事件。
得到就緒的IO事件后,Netty會調(diào)用processSelectedKeys進行處理,然后基于這個IO事件的處理時長,按照同等執(zhí)行比例從taskQueue和tailTasks中獲取任務(wù)并執(zhí)行,可以看出Netty中的節(jié)點針對每一個時間點都做好了很好的安排,并完成相對公平的調(diào)度:
對應(yīng)的我們給出Netty每一個線程NioEventLoop的run方法,邏輯和筆者上文描述一致,讀者可自行參閱:
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
//......
case SelectStrategy.SELECT:
//查看是否有就緒的定時任務(wù),如果有則設(shè)置到期時間curDeadlineNanos
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
//如果沒有任務(wù),則基于curDeadlineNanos進行定長時阻塞輪詢就緒IO事件
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
//......
}
default:
}
} catch (IOException e) {
//......
}
//......
//默認情況下ioRatio 為50,我們直接看else邏輯
if (ioRatio == 100) {
//......
} elseif (strategy > 0) {
finallong ioStartTime = System.nanoTime();
try {
//處理IO事件
processSelectedKeys();
} finally {
//基于IO事件處理的耗時繼續(xù)處理其他異步任務(wù)
finallong ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
//......
}
}
9. Netty如何解決CPU 100% 即空輪詢問題
JDK的NIO底層由Epoll實現(xiàn),在部分Linux的2.6的kernel中,poll和epoll對于突然中斷的連接socket會對返回的eventSet事件集合置為POLLHUP或POLLERR,進而導(dǎo)致eventSet事件集合發(fā)生了變化,這就可能導(dǎo)致selector會被喚醒,由此引發(fā)CPU 100%.問題。
關(guān)于這個問題的bug,感興趣的讀者可移步下面這個鏈接查看bug詳情:
JDK-6670302:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6670302
而Netty的NIO線程解決方案則比較簡單了,每一次循環(huán)它都會查看本次是否有執(zhí)行任務(wù),如果有則不做處理,反之它會累加一個selectCnt,一旦selectCnt值大于或者等于512(默認值)時,就會調(diào)用rebuildSelector重新構(gòu)建選擇器從而解決這個問題:
對應(yīng)的源碼仍然在NioEventLoop的run方法,當我們執(zhí)行了異步任務(wù)則ranTasks 為true,如果有輪詢到IO事件則strategy 大于0,在后續(xù)邏輯中selectCnt(這個變量代表空輪詢次數(shù)) 會被重置,反之selectCnt會不斷被累加直到超過512次,通過執(zhí)行rebuildSelector重新構(gòu)建輪詢器避免CPU100%問題:
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//輪詢并處理任務(wù)
//......
//累加一次selectCnt
selectCnt++;
//如果有執(zhí)行任務(wù)則重置selectCnt
if (ranTasks || strategy > 0) {
//......
selectCnt = 0;
} elseif (unexpectedSelectorWakeup(selectCnt)) { //反之視為異常喚醒,執(zhí)行unexpectedSelectorWakeup
selectCnt = 0;
}
} catch (CancelledKeyException e) {
//......
} //......
}
}
步入unexpectedSelectorWakeup即可印證筆者所說的,當空輪詢大于或者等于512次之后就會重新構(gòu)建輪詢器:
private boolean unexpectedSelectorWakeup(int selectCnt) {
//......
//如果selectCnt 大于SELECTOR_AUTO_REBUILD_THRESHOLD(512)則執(zhí)行rebuildSelector重新構(gòu)建當前eventLoop的輪詢器
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//......
rebuildSelector();
return true;
}
return false;
}
10. Netty對于事件輪詢器做了哪些優(yōu)化
默認情況下JDK的DefaultSelectorProvider在Windows系統(tǒng)下創(chuàng)建的是WindowsSelectorImpl,而Linux則是EpollSelectorImpl,它們都繼承自SelectorImpl,查看SelectorImpl的源碼可以發(fā)現(xiàn)它如下幾個核心參數(shù):
- selectedKeys :存放就緒IO事件集。
- publicSelectedKeys:和上述概念一致,只不過是selectedKeys 的一個視圖,給用戶讀取就緒IO事件時用的,且外部線程對于這個publicSelectedKeys只能做刪除操作。
- keys :我們都知道對于socket感興趣的IO事件都會注冊到keys上。
- publicKeys:和上述概念類似,只不過是keys 一個對外的視圖,不可增加元素,只能讀取和刪除。
public abstractclass SelectorImpl extends AbstractSelector {
//存儲感興趣的IO事件
protected HashSet<SelectionKey> keys = new HashSet();
//keys的只讀視圖層
private Set<SelectionKey> publicKeys;
//存放就緒IO事件的集合
protected Set<SelectionKey> selectedKeys = new HashSet();
//上一個集合的視圖層
private Set<SelectionKey> publicSelectedKeys;
//......
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
//......
} else {
//......
//使用ungrowableSet封裝selectedKeys作為視圖
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
}
應(yīng)用程序從內(nèi)核獲取就緒的IO事件也就是添加到selectedKeys 上,以服務(wù)端接收客戶端讀寫請求為例,我們的主reactor為了拿到客戶端的連接請求,就會將自己的channel依附即attach到SelectionKeyImpl上,一旦這個輪循到就緒連接事件繼續(xù)后就會調(diào)用attachment方法通知這個channel處理連接。很明顯遍歷就緒的key用HashSet效率不是很高效(無需的哈希集):
所以Netty為了提高處理時遍歷的效率,對存儲就緒事件的集合進行了優(yōu)化,它會判斷創(chuàng)建的selector 是否是默認的selector ,且DISABLE_KEYSET_OPTIMIZATION 這個變量是否為false(默認為false),如果符合這兩個條件,則初始化時會通過反射將selectedKeys改為數(shù)組,通過數(shù)組的連續(xù)性保證CPU緩存可以一次性加載盡可能多的key以及提升迭代效率:
對此我們給出NioEventLoop的構(gòu)造方法,可以看到NioEventLoop初始化時回調(diào)用openSelector完成selector創(chuàng)建,其內(nèi)部就存在我們上述所說的如果是原生jdk的selector且DISABLE_KEYSET_OPTIMIZATION為false(即允許key優(yōu)化)則通過反射修改集合類型:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
//......
//創(chuàng)建selector,如果是原生jdk的selector且DISABLE_KEYSET_OPTIMIZATION為false(即允許key優(yōu)化)則通過反射修改集合類型
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
//......
}
最終步入openSelector即可看到我們所說的條件判斷和反射修改集合的邏輯:
private SelectorTuple openSelector() {
//......
//反射獲取當前selector類型
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
//非平臺提供的selector則直接封裝返回
if (!(maybeSelectorImplClass instanceof Class) ||
//......
returnnew SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
//創(chuàng)建一個1024長度的SelectionKey數(shù)組存放事件
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//反射獲取當前selector字段
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
//......
//通過反射將selector設(shè)置為數(shù)組類型的selector
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
returnnull;
} catch (NoSuchFieldException e) {
//......
}
}
});
//......
}
我們不妨看看SelectedSelectionKeySet做了那些優(yōu)化,首先從定義來看它的SelectionKey是一個數(shù)組,很明顯數(shù)組的添加和遍歷效率都是順序的所以處理效率相較于HashSet會高效需多。而且因為數(shù)組內(nèi)存空間是連續(xù)的,可以更好的利用CPU緩存行從而一次性讀取并遍歷更多的key進行高效處理,所以每次CPU都可以加載對應(yīng)元素和其鄰接元素,所以處理效率相較于不規(guī)則的HashSet要高效許多。
11. Netty無鎖化的串行設(shè)計理念
為盡可能提升NioEventLoop的執(zhí)行效率,出了上述提到的空閑等待、基于定時任務(wù)定長時輪詢以及IO和計算任務(wù)平衡配比等設(shè)計以外,在提交任務(wù)時,Netty采用MpscChunkedArrayQueue作為任務(wù)隊列,這是一個無鎖的多生產(chǎn)者單消費者的任務(wù)隊列,提交任務(wù)時,該隊列就會基于CAS得到這個隊列的索引位置,然后將任務(wù)提交到隊列中,然后我們的NioEventLoop一樣通過原子操作或者可以消費的索引位置進行任務(wù)消費:
這一點我們可以直接查看NioEventLoopGroup 的構(gòu)造函數(shù)即可看到,初始化時其內(nèi)部會調(diào)用newTaskQueue創(chuàng)建MpscChunkedArrayQueue來管理任務(wù):
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
//......
//指明創(chuàng)建隊列為mpscQueue
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
}
addTask本質(zhì)上就是調(diào)用MpscChunkedArrayQueue的offer方法,其本質(zhì)就是通過CAS操作獲得可以添加元素的索引位置pIdex,然后基于這個pIndex得到物理地址并完成賦值:
@Override
public boolean offer(final E e)
{
if (null == e)
{
thrownew NullPointerException();
}
long mask;
E[] buffer;
long pIndex;
while (true)
{
//各種索引位計算
//cas獲取生產(chǎn)者索引位置
if (casProducerIndex(pIndex, pIndex + 2))
{
break;
}
}
// 獲取cas之后得到的pIndex的位置然后賦值
finallong offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
soRefElement(buffer, offset, e); // release element e
returntrue;
}
而數(shù)據(jù)消費也是同理,Netty的NIO線程通過poll進行獲取,其內(nèi)部通過lpConsumerIndex進行CAS獲得消費者的消費端索引,然后通過原子操作拿到元素值,如果e不存在則繼續(xù)CAS自旋直到可以得到這個值為止:
@Override
public E poll()
{
//CAS獲取索引位置
finallong index = lpConsumerIndex();
//......
//定位到索引偏移量
finallong offset = modifiedCalcCircularRefElementOffset(index, mask);
Object e = lvRefElement(buffer, offset);
//如果元素為空,不斷自旋拿到值為止
if (e == null)
{
if (index != lvProducerIndex())
{
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
// check the producer index. If the queue is indeed not empty we spin until element is
// visible.
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
}
//......
}
//......
//返回元素
return (E) e;
}
關(guān)于網(wǎng)絡(luò)IO框架的一些展望——網(wǎng)絡(luò)IO模型io_uring
文章補充更新:近期和業(yè)界的一些大牛進行深入交流時了解到一個除了epoll以外更強大的io模型——io_uring,相較于epoll和其它io模型,它有著如下優(yōu)點:
- 用戶態(tài)和內(nèi)存態(tài)進行IO操作時共享一塊內(nèi)存區(qū)域,由此避免切態(tài)開銷。
- 發(fā)起IO調(diào)用無需內(nèi)核態(tài)調(diào)用,在SQPOLL模式下,sq線程會自行從提交隊列中獲取IO事件并處理,完成后會將結(jié)果寫入共享區(qū)域的完成隊列告知用戶。
- 用戶態(tài)的應(yīng)用程序可可直接通過完成隊列這個環(huán)形緩沖區(qū)獲得完成的IO事件并進行進一步操作。
這里我們以一個簡單的到磁盤中讀取數(shù)據(jù)的流程為例看看io_uring的整體流程:
- 用戶發(fā)起IO請求,希望從/tmp目錄下讀取某個文本文件的內(nèi)容
- 發(fā)起IO請求,該調(diào)用會在io_uring的提交隊列(它是一個環(huán)形緩沖區(qū))中追加該事件,用tail指針指向該事件。
- 底層的sq線程輪詢提交隊列中待完成的事件指針,拿到這個IO事件,發(fā)起磁盤IO調(diào)用。
- 完成數(shù)據(jù)讀寫之后,將該事件和結(jié)果寫入完成隊列。
- 應(yīng)用程序直接從完成隊列中讀取該事件結(jié)果并進行業(yè)務(wù)處理。
用戶態(tài)從完成隊列中獲取到我們的磁盤讀取事件的指針地址,從而拿到數(shù)據(jù)。可以看到,整個流程用戶態(tài)在完成一次IO期間完全沒有進行切態(tài)和數(shù)據(jù)拷貝的開銷,相較于epoll來說性能損耗小了很多。