一篇學會NioEventLoopGroup源碼解析
本文轉(zhuǎn)載自微信公眾號「源碼學徒」,作者皇甫嗷嗷叫 。轉(zhuǎn)載本文請聯(lián)系源碼學徒公眾號。
NioEventLoopGroup的初始化源碼
一、尋找源碼的過程
我們前面說到過,NioEventLoopGroup我們可以近乎把它看作是一個線程池,該線程池會執(zhí)行一個一個的任務,我們常用的NioEventLoopGroup大概有兩種,NioEventLoopGroup(int nThreads),NioEventLoopGroup(),即一個是指定線程數(shù)量的,一個是默認指定線程數(shù)量的!這里我們以無參構(gòu)造為入口進行分析!
- EventLoopGroup work = new NioEventLoopGroup();
- public NioEventLoopGroup() {
- this(0);
- }
當我們使用默認的數(shù)量的時候,他會傳遞一個0,我們繼續(xù)往下跟!
- public NioEventLoopGroup(int nThreads) {
- this(nThreads, (Executor) null);
- }
注意這里傳遞的參數(shù)是:0,null
- public NioEventLoopGroup(int nThreads, Executor executor) {
- //每個 group維護一個 SelectorProvider 主要用它獲取selector選擇器
- this(nThreads, executor, SelectorProvider.provider());
- }
這里面多傳遞了一個 SelectorProvider.provider(),該方法是JDK NIO提供的API主要可以獲取NIO選擇器或者Channel,如下圖:
我們回歸主線繼續(xù)跟:
- public NioEventLoopGroup(
- int nThreads, Executor executor, final SelectorProvider selectorProvider) {
- this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
- }
這里多傳遞了一個 DefaultSelectStrategy選擇策略,這在后面講解NioEventLoop會具體講解,不做闡述!
- public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
- final SelectStrategyFactory selectStrategyFactory) {
- super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
- }
我們會發(fā)現(xiàn)這里還會默認傳遞一個拒絕策略RejectedExecutionHandlers.reject(),這個拒絕策略是干嘛的呢?
- @Override
- public void rejected(Runnable task, SingleThreadEventExecutor executor) {
- throw new RejectedExecutionException();
- }
我們得到一個結(jié)論,當某些條件觸發(fā)這個拒絕策略,那么他會拋出一個RejectedExecutionException異常,具體什么時候觸發(fā),后續(xù)也會詳細說明,這里只需要記住就OK了!
我們繼續(xù)回到主線, 這里我們開始調(diào)用父類,還記得上一節(jié)課我們分析的NioEventLoopGroup的父類是誰嗎?沒錯是:MultithreadEventLoopGroup, 我們會進入到MultithreadEventLoopGroup里面:
- protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
- //線程數(shù)量為0時 使用默認的cpu * 2
- super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
- }
nThreads還記得是幾嗎?是0對不對,這里有個判斷,當你的線程數(shù)量為0的時候,會使用DEFAULT_EVENT_LOOP_THREADS當作線程池的數(shù)量,DEFAULT_EVENT_LOOP_THREADS是多少呢?
- DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
默認是CPU的兩倍,所以我們現(xiàn)在得到一個結(jié)論,當我們使用默認的NioEventLoopGroup的時候,系統(tǒng)會默認使用系統(tǒng)CPU核數(shù)*2當作線程池的數(shù)量!
我們上一步傳遞過來的selectorProvider、拒絕策略、selectStrategyFactory被封裝為數(shù)組,并放在args[0],args[1], args[2]的位置!
我們繼續(xù)回到主線,這里又再次調(diào)用到父類,MultithreadEventExecutorGroup:
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
- this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
- }
注意,這里又再次多傳遞了一個參數(shù):DefaultEventExecutorChooserFactory一個選擇器工廠,這里會返回一個選擇器,他是DefaultEventExecutorChooserFactory類型的,具體分析后面會分析!我們繼續(xù)回到主線:
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
- ..........后續(xù)源碼補充..........
- }
到這里我們終于看到了一大段代碼,這里是EventLoopGroup的主要邏輯,我們逐行分析:
二、構(gòu)建線程執(zhí)行器
1. 源碼解析
- //newDefaultThreadFactory 構(gòu)建線程工廠
- if (executor == null) {
- //創(chuàng)建并保存線程執(zhí)行器 執(zhí)行器 執(zhí)行任務的 默認是 DefaultThreadFactory 線程池
- executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- }
這里會判斷我們傳入的執(zhí)行器是否為空,否則就新建一個,我們還記得executor是什么值嗎?是null,對不對,所以它會進入到這里的邏輯我們進入到newDefaultThreadFactory源碼里面看一下:
newDefaultThreadFactory()主要邏輯
- protected ThreadFactory newDefaultThreadFactory() {
- return new DefaultThreadFactory(getClass());
- }
可以看到,這里向執(zhí)行器里面?zhèn)魅肓艘粋€ DefaultThreadFactory一個默認的線程工廠!
ThreadPerTaskExecutor主要邏輯:
- /**
- * io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)
- *
- * 執(zhí)行任務 每次執(zhí)行任務都會創(chuàng)建一個線程實體對象
- * @param command 線程
- */
- @Override
- public void execute(Runnable command) {
- //執(zhí)行任務
- threadFactory.newThread(command).start();
- }
我們發(fā)現(xiàn),這里調(diào)用了一個我們傳入的線程工廠,創(chuàng)建了一個新的線程并調(diào)用start方法啟動了起來,那么他是如何創(chuàng)建的呢? 我們進入到newThread源碼里面查看,由于我們默認使用的線程工廠是 DefaultThreadFactory, 所以,我們會進入到 DefaultThreadFactory#newThread
- @Override
- public Thread newThread(Runnable r) {
- //創(chuàng)建一個線程 每次執(zhí)行任務的時候都會創(chuàng)建一個線程實體
- Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
- try {
- if (t.isDaemon() != daemon) {
- t.setDaemon(daemon);
- }
- if (t.getPriority() != priority) {
- t.setPriority(priority);
- }
- } catch (Exception ignored) {
- // Doesn't matter even if failed to set.
- }
- return t;
- }
這里沒有太多的操作,只是會將一個 Runnable封裝為一個 Thread進行返回,我們重點關注一下這個Thread,它和我們傳統(tǒng)使用的Thread是一樣的嗎? 我們跟進到 newThread方法看一下:
- protected Thread newThread(Runnable r, String name) {
- //Netty自己封裝的線程
- return new FastThreadLocalThread(threadGroup, r, name);
- }
邏輯很簡單,就是將一個Thread包裝為Netty自定義的 FastThreadLocalThread,至于為什么,我們暫時不往下多做解釋,后續(xù)章節(jié)會很詳細的解釋它!
2. 線程執(zhí)行器總結(jié)
這里我們會創(chuàng)建一個線程執(zhí)行器 ThreadPerTaskExecutor,使用默認的線程工廠DefaultThreadFactory,線程執(zhí)行器會將一個任務包裝為一個 FastThreadLocalThread對象,然后調(diào)用start方法開啟一個新的線程執(zhí)行任務!
三、創(chuàng)建對應數(shù)量的執(zhí)行器
- //創(chuàng)建執(zhí)行器數(shù)組 數(shù)量和預設線程數(shù)量一致
- children = new EventExecutor[nThreads];
- for (int i = 0; i < nThreads; i ++) {
- boolean success = false;
- try {
- //創(chuàng)建執(zhí)行器 開始創(chuàng)建執(zhí)行器 這里的執(zhí)行機估計就會EventLoop 是NioEventLoop
- children[i] = newChild(executor, args);
- success = true;
- } catch (Exception e) {
- .....省略不必要代碼
- } finally {
- .....省略不必要代碼
- }
- }
1. 源碼解析
- children = new EventExecutor[nThreads];
首先他會創(chuàng)建一個空的EventExecutor執(zhí)行器數(shù)組,然后遍歷填充!
還記得 nThreads是幾嗎? 默認是CPU*2的大小,所以這里會創(chuàng)建 CPU * 2數(shù)量的執(zhí)行器! 我們發(fā)現(xiàn),for循環(huán)中填充的主要邏輯是newChild,所以,我們進入到 newChild方法, 這里提示一點,我們創(chuàng)建的Group對象是一個什么對象? 是NioEventLoopGroup對象對不對,所以我們這里會進入到 NioEventLoopGroup#newChild方法:
- @Override
- protected EventLoop newChild(Executor executor, Object... args) throws Exception {
- EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
- return new NioEventLoop(this, executor, (SelectorProvider) args[0],
- ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
- }
我們傳遞過來 的args長度為3,前面做過解析 :
args[0]為 selectorProvider、args[1]為拒絕策略、args[2]為selectStrategyFactory
所以 queueFactory為null, 然后我們再重點關注 NioEventLoop對象,可以看出,newChild方法返回的是 NioEventLoop,那么我們初步就可以確定,EventExecutor數(shù)組里面存在的是NioEventLoop對象!至此,我們就不深究了,NioEventLoop的初始化源碼分析我會放到下一節(jié)課分析,這里我們可以確定一件事, EventExecutor數(shù)組里面存在的是NioEventLoop對象!我們繼續(xù)回到主線:
2. 執(zhí)行器數(shù)組總結(jié)
for循環(huán)完畢之后,此時的EventExecutor[nThreads];數(shù)組就被填充滿了,里面的每一個元素都是NioEventLoop對象,每一個NioEventLoop對象都包含一個 ThreadPerTaskExecutor線程執(zhí)行器對象!
四、創(chuàng)建一個執(zhí)行器選擇器
1. 源碼解析
- chooser = chooserFactory.newChooser(children);
還記得 chooserFactory是什么類型的嗎? 是DefaultEventExecutorChooserFactory類型的,忘了的可以往上翻一下尋找源碼的過程中的代碼或者調(diào)試一下!
我們進入到 DefaultEventExecutorChooserFactory#newChooser 源碼邏輯中,并傳入剛剛我們循環(huán)填充好的數(shù)組:
- @Override
- public EventExecutorChooser newChooser(EventExecutor[] executors) {
- //判斷2的冪 isPowerOfTwo
- if (isPowerOfTwo(executors.length)) {
- return new PowerOfTwoEventExecutorChooser(executors);
- } else {
- //簡單的
- return new GenericEventExecutorChooser(executors);
- }
- }
可以看到,這里似乎有兩種情況,返回的是不同的策略對象,當你的數(shù)組長度是2的冪等次方的時候,返回的是 PowerOfTwoEventExecutorChooser對象,否則返回 GenericEventExecutorChooser對象,我們就兩種情況全部分析一下:
I、PowerOfTwoEventExecutorChooser
- PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- //2的冪等性 實現(xiàn)這個 也能實現(xiàn)循環(huán)取數(shù)的
- //executors 就是NioEventLoop數(shù)組 按照2次冪求本次獲取的EventLoop是個啥
- return executors[idx.getAndIncrement() & executors.length - 1];
- }
這段代碼的主要邏輯是,取一個自增的CAS類,與數(shù)組長度做&運算,最終會出現(xiàn)循環(huán)取數(shù)的結(jié)果:
從上面的圖片可以基本看出來, 該功能可以實現(xiàn)一個循環(huán)取數(shù)的功能,每次達到數(shù)組的尾部部都會重新回到頭部重新獲取!
代碼案例:
- public static void main(String[] args) {
- String[] strings = {"第一個", "第二個", "第三個", "第四個"};
- AtomicInteger idx = new AtomicInteger();
- for (int i = 0; i < 9; i++) {
- System.out.println(strings[idx.getAndIncrement() & strings.length -1]);
- }
- }
結(jié)果集
- 第一個
- 第二個
- 第三個
- 第四個
- 第一個
- 第二個
- 第三個
- 第四個
- 第一個
II、GenericEventExecutorChooser
當你的線程數(shù)量不是2的冪次方的時候,會走一個通用的選擇器,具體實現(xiàn)源碼如下:
- GenericEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- //自增 取模 以達到循環(huán)的目的
- //假設executors 長度為5 那么 不斷的循環(huán)就會不斷的得到 0 1 2 3 4 0 1 2 3 4。。。
- return executors[Math.abs(idx.getAndIncrement() % executors.length)];
- }
這個代碼就不用了我做演示了吧,他的功能和上面那個功能是一樣的能 能夠達到一個循環(huán)取數(shù)的功能
思考
為什么Netty要分為兩個策略類來實現(xiàn)呢,直接用第二種不行嗎?
Netty官方對性能的要求達到了極致,大家要知道位運算速度要高于直接取模運算的,所以Netty官方即使是這一點也做了一個優(yōu)化!
2. 執(zhí)行器選擇器總結(jié)
我們通過上述可以了解到,這里會通過一個選擇器工廠創(chuàng)建一個選擇器,并保存在NioEvenetLoopGroup中,調(diào)用該選擇器的next方法會返回一個NioEventLoop對象,其中的獲取方式是不斷的循環(huán),依次獲取NioEventLoop對象,這也是一個NioEventLoop對SocketChannel為一對多的基礎!這都是后話!
NioEventLoopGroup源碼總結(jié)
- 創(chuàng)建一個線程執(zhí)行器,當調(diào)用該線程執(zhí)行器的execute方法的時候,會講一個Runable對象包裝為Thread對象,再將Thread對象包裝為FastThreadLocalThread對象,然后啟動起來! 簡單來說,每調(diào)用一次execute方法,都去創(chuàng)建并啟動一條新線程執(zhí)行任務!
- 創(chuàng)建一個執(zhí)行器數(shù)組,數(shù)組長度與我們傳遞的數(shù)量有關,默認為CPU*2個數(shù)量,然后再循環(huán)填充這個空數(shù)組,數(shù)組里面的元素是一個NioEventLoop對象,每一個NioEventLoop對會持有一個線程執(zhí)行器的引用!
- 創(chuàng)建一個執(zhí)行器選擇器,調(diào)用該執(zhí)行器選擇器的next方法可以返回一個NioEventLoop對象,內(nèi)部是進行循環(huán)取數(shù)的,每一個NioEventLoop都可能會被多次獲取!