解密 Netty 高性能之謎:NioEventLoop 線程池阻塞分析與調優策略
我們使用NioEventLoop常會出現一個奇怪的現象,在消息密集的情況下,服務端處理會斷斷續續的,偶發出現消息處理阻塞,經過不斷的摸索排查發現是線程池使用不當導致的,遂此文簡單介紹一下這個故障的現象和排查思路。
詳解NioEventLoop阻塞問題分析與解決過程
1. 故障復現
在演示代碼之前,我們不妨先來了解一下這個需求,客戶端和服務端建立連接之后,會向該通道不斷發送消息。然后服務端收到消息,會將消息提交到業務線程池中異步處理:
2. 客戶端代碼實現分析
先來看看客戶端的connect代碼,就是一套標準的模板代碼,設置好對應參數以及業務處理器之后,直接向服務端的9999端口發起連接:
public class NettyClient {
public void connect() throws Exception {
EventLoopGroup group = new NioEventLoopGroup(8);
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//業務處理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 9999).sync();
//......
}
}
對應我們給出客戶端處理器的代碼,和服務端建立了連接之后,創建一個線程,無限循環,每次刷一個消息就休息1ms:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
static final int MSG_SIZE = 256;
@Override
public void channelActive(ChannelHandlerContext ctx) {
new Thread(() -> {
//無限循環,每隔一毫秒發送一次消息
while (true) {
ByteBuf firstMessage = Unpooled.buffer(MSG_SIZE);
for (int i = 0; i < firstMessage.capacity(); i++) {
firstMessage.writeByte((byte) i);
}
//刷一次消息后休眠1ms
ctx.writeAndFlush(firstMessage);
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (Exception e) {
//......
}
}
}).start();
}
}
3. 服務端處理邏輯分析
而服務端啟動類也比較簡單,就是一套比較經典的NIO模板:
public class NettyServer {
public static void main(String[] args) throws Exception {
//聲明主從reactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//追加業務處理器NettyServerHandler
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyServerHandler());
}
});
//監聽9999端口
ChannelFuture f = b.bind(9999).sync();
//......
}
}
NettyServerHandler 處理器的邏輯也比較簡單,簡單的將消息提交到業務線程池中執行即可,注意筆者代碼中的一行代碼Thread.currentThread() == ctx.channel().eventLoop()這就是后續問題引發的關鍵:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger sum = new AtomicInteger(0);
//設置一個最大線程數為3的線程池,當線程處理不過來的時候采用CallerRunsPolicy策略
private static ExecutorService executorService = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//原子類記錄收到消息數以及打印消息時間
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
String date = simpleDateFormat.format(new Date());
System.out.println("--> Server receive client message : " + sum.incrementAndGet() + "time: " + date);
//將消息提交到業務線程池中處理
executorService.execute(() -> {
ByteBuf req = (ByteBuf) msg;
//如果當前執行線程是nio線程則休眠15s
if (Thread.currentThread() == ctx.channel().eventLoop())
try {
TimeUnit.SECONDS.sleep(15);
} catch (Exception e) {
e.printStackTrace();
}
//轉發消息,此處代碼省略,轉發成功之后返回響應給終端
ctx.writeAndFlush(req);
});
}
//......
}
自此我們的代碼都編寫完成,我們不妨將服務端和客戶端代碼都啟動。通過控制臺可以發現,1毫秒發送的消息,會時不時的卡15s才能繼續處理消息。
4. 排查思路
這類問題我們用jvisualvm看看GC情況是否正常,看看是不是頻繁的Full GC導致整個進程處于STW狀態導致消息任務阻塞。
監控結果如下,很明顯GC沒有問題,我們只能看看CPU使用情況。
很明顯的CPU使用情況也是正常,沒有什么奇奇怪怪的任務導致使用率飆升。
所以我們只能看看線程使用情況了,果然,我們發現NioEventLoop居然長時間的處于休眠狀態:
所以我們用jps定位Java進程id后鍵入jstack查看線程使用情況:
jstack -l 17892
自此我們終于找到了線程長期休眠的原因,從下面的堆棧我們可以看出,正是任務量巨大,導致業務線程池無法及時處理消息,最終業務線程池走到了拒絕策略,這就使得業務線程池一直走到CallerRunsPolicy,也就是說業務線程池忙不過來的時候會將任務交由NioEventLoop執行。而一個連接只會有一個NioEventLoop的線程執行,使得原本非常忙碌的NioEventLoop還得分神處理一下我們業務線程池的任務。
為了驗證這一點,我們不妨在業務線程池中打印線程名:
//將消息提交到業務線程池中處理
executorService.execute(() -> {
System.out.println(" executorService execute thread name: "+Thread.currentThread().getName());
ByteBuf req = (ByteBuf) msg;
//其它業務邏輯處理,訪問數據庫
if ((Thread.currentThread() == ctx.channel().eventLoop()))
try {
//訪問數據庫,模擬偶現的數據庫慢,同步阻塞15秒
TimeUnit.SECONDS.sleep(15);
} catch (Exception e) {
e.printStackTrace();
}
//轉發消息,此處代碼省略,轉發成功之后返回響應給終端
ctx.writeAndFlush(req);
});
最終我們可以看到,線程池中的任務都被nioEventLoopGroup這個線程執行,所以這也是筆者為什么在模擬問題時在if中增加 (Thread.currentThread() == ctx.channel().eventLoop())的原因,就是為了模仿那些耗時的業務被nioEventLoopGroup的線程執行的情況,例如:一個耗時需要15s的任務剛剛好因為拒絕策略被nioEventLoopGroup執行,那么Netty服務端的消息處理自然就會阻塞,出現本文所說的問題。
5. 解決方案
從上文的分析中我們可以得出下面這樣一個結果,所以解決該問題的方式又兩種:
- 調整業務線程池大小,提升線程池處理效率并適當增加隊列長度。
- 調整拒絕策略,處理不過來時直接丟棄。
以筆者為例,結合各種耗時工具排查后發現夯住線程池的業務功能存在可以優化的空間,所以將功能優化后結合arthas等工具大體可以定位到阻塞隊列穩定的消息數,最終給的策略就是優化功能代碼+調大阻塞隊列和最大線程數:
對應我們給出線程池優化后的參數,整體上又優化了任務處理速度避免了線程池夯住:
//調大阻塞隊列
private static ExecutorService executorService = new ThreadPoolExecutor(1, 8, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10_0000), new ThreadPoolExecutor.CallerRunsPolicy());
自此之后我們再查看控制臺輸出和NioEventLoop線程狀態,發現運行都沒有阻塞,那些實在無法處理的消息都被丟棄了:
總結
自此我們對于本次的事件總結出以下幾點要求和建議:
- 耗時操作不要用NioEventLoop,尤其是本次這種高并發且拒絕策略配置為用執行線程接收忙碌任務的方式。
- 服務端收不到消息時,建議優先從CPU、GC、線程等角度分析問題。
- 服務端開發時建議使用兩個NioEventLoop構成主從Reactor模式,并結合業務場景壓測出合適的線程數。