聊聊 Netty 客戶端斷線重連的設計與實現
其實Netty基于網絡連接聲明周期暴露了很多提供用戶自實現的API,而本文將基于其中的一個拓展點實現連接可靠性,希望對你有幫助。
詳解Netty客戶端斷線重連的設計和實現
Netty生命周期中的channelInactive方法
讀過筆者往期文章的讀者大體是都知道channelInactive這個回調方法,我們從其注釋即可知曉:注冊的ChannelHandlerContext 的 Channel現在已經是不活躍即已經不可用的連接,就會調用pipeline上所有的處理器執行其內部實現的channelInactive處理剩余業務:
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
實際上channelInactive的執行我們也可以通過源碼的方式讓讀者了解,我們以客戶端連接為例,一旦客戶端斷開連接,客戶端的selector就會輪循到連接關閉事件,便會將對應客戶端的channel取消并調用channelInactive方法:
從源碼角度來NioEventLoop輪詢到關閉事件后會直接執行該事件closeOnRead方法,其內部判斷連接非open狀態則會直接調用close進行連接關閉操作:
protected class NioByteUnsafe extends AbstractNioUnsafe {
private void closeOnRead(ChannelPipeline pipeline) {
if (isOpen()) {
//......
} else {
//調用close執行關閉連接
close(voidPromise());
}
}
}
close邏輯內部最終會定位到客戶端的socketchannel執行到AbstractChannel的close方法,其內部會向eventLoop注冊一個doDeregister的事件,該事件會將客戶端socket注冊的讀寫事件取消,完成后就會調用fireChannelInactive走到channelInactive回調,通知當前客戶端netty這個socket的遠程連接不再活躍,已經斷開了:
對此我們給出上圖所示的源碼片段,改代碼位于AbstractChannel的close方法,其內部核心邏輯就是調用fireChannelInactiveAndDeregister移除客戶端socket的讀寫事件并觸發channelInactive的回調通知:
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
//......
if (closeExecutor != null) {
//......
} else {
//......
} else {
//調用fireChannelInactiveAndDeregister移除斷開連接的客戶端socket并觸發channelInactive回調
fireChannelInactiveAndDeregister(wasActive);
}
}
}
fireChannelInactiveAndDeregister內部核心邏輯就是deregister方法,可以看到該方法核心邏輯就是提交給eventLoop一個異步任務,也就是我們上圖所說的移除客戶端讀寫事件的方法,方法名是doDeregister,完成該方法調用后就會調用fireChannelInactive方法,告知服務端這個客戶端channel連接已斷開:
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
//......
invokeLater(new Runnable() {
@Override
public void run() {
try {
//移除客戶端讀寫事件
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
//觸發客戶端channel的channelInactive回調
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
//......
}
}
});
}
對此我們給出doDeregister的邏輯,可以看到其內部拿到eventLoop事件輪詢器,通過調用cancel移除當前客戶端socket讀寫事件:
@Override
protected void doDeregister() throws Exception {
//通過selectionKey獲取斷開連接的客戶端讀寫事件的key,通過cancel移除這些事件
eventLoop().cancel(selectionKey());
}
Netty斷線重連思路與實現
由此我們知曉要想實現斷線重連,客戶端可以通過重寫channelInactive方法,確保在感知到連接斷開時再次提交一個連接的延遲事件,知道斷線的連接再次恢復,由此保證客戶端連接可靠性:
最終我們給出斷線重連的ReconnectHandler,其內部邏輯很簡單,延遲5秒后向eventLoop提交一個斷線重連的異步連接任務直到成功,完成后我們將這個處理器添加到客戶端的pipeline即可:
public class ReconnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//提交斷線重連的延遲任務
scheduledDoReConnect(ctx);
ctx.fireChannelInactive();
}
private ScheduledFuture<?> scheduledDoReConnect(ChannelHandlerContext ctx) {
//拿到當前channel的eventLoop提交一個連接遠程服務端的延遲任務
ScheduledFuture<?> scheduledFuture = ctx.channel().eventLoop().schedule(() -> {
ChannelFuture channelFuture = ctx.channel().connect(new InetSocketAddress("127.0.0.1", 8888));
channelFuture.addListener(f -> {
if (!f.isSuccess()) {
//如果失敗則遞歸調用scheduledDoReConnect再次嘗試
scheduledDoReConnect(ctx);
} else {
System.out.println("reconnect success.");
}
});
}, 5, TimeUnit.SECONDS);
return scheduledFuture;
}
}
小結
自此我們基于Netty生命周期的源碼剖析給出客戶端斷線重連的設計和落地思路,希望對你有幫助。