來聊聊 Netty 幾個開箱即用的處理器框架
本篇文章我們將基于Netty幾個開箱即用的封裝快速落地一個易于拓展與維護的客戶端服務端通信示例,希望對你有所幫助。
基于Netty快速落地自定義協議消息通信
1.提出需求
我們需要使用Netty快速落地一套客戶端和服務端系統通信程序,客戶端會在與服務端建立連接后發送自定義協議的登錄包,然后服務端完成校驗之后返回自定義協議的登錄處理結果:
2.服務端設計與實現
按照我們以往的處理器邏輯,對于服務端我們可能會編寫一個處理器handler,其內部負責:
- 對收到的數據包解碼。
- 根據數據包類型走不同的if-else邏輯。
- 回復相應的加密后的數據包。
這種做法將編碼、解碼、數據邏輯全部耦合在一個處理器上,違背了單一職責的設計,導致代碼臃腫,后續的功能的拓展和維護都十分不便。
對此本文做法是針對不同數據包指定相應處理器,通過pipeline自帶的責任鏈模式將這些處理器串聯起來,并將編碼和解碼的handler單獨抽離出來維護:
因為客戶端會向服務端發送登錄包,對應文件編碼規則為:
- 第一個整形位,設置為登錄包類型為1。
- 第二個整型為設置為登錄包數據長度。
- 第三個字節數組設置為序列化后的數據包。
所以我們解碼的邏輯為:
- 獲取4個字節知曉類型。
- 獲取4個字節解析長度。
- 讀取對應長度的字節數組將其反序列化為對應類型的數據包。
而Netty也為我們解碼的邏輯提供了一個類MessageToMessageDecoder,我們只需繼承并重寫其decode方法,將bytebuf解碼后的結果傳入out列表中即可傳播到對應的處理器上:
對此我們給出解碼器的處理器Handler的邏輯,可以看到該解碼器會按照編碼的要求進行解析:
public class ServerDecodeHandler extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//獲取消息類型
int type = msg.readInt();
if (type == 1) {
//獲取實際消息長度
int length = msg.readInt();
//讀取數據并反序列化
byte[] data = new byte[length];
msg.readBytes(data);
LoginPacket loginPacket = JSON.parseObject(data, LoginPacket.class);
out.add(loginPacket);
}
}
}
消息被解碼器解碼之后,就可以傳播到對應業務處理器上,為了保證讀取到不同的消息被不同業務處理器處理,Netty提供了一個開箱即用的讀消息處理器,它會根據我們的指定的泛型為數據包進行匹配,只有與泛型類一致才會進行處理:
所以我們的認證處理器AuthHandler 繼承SimpleChannelInboundHandler并指明泛型LoginPacket專門處理讀取到的LoginPacket:
public class AuthHandler extends SimpleChannelInboundHandler<LoginPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginPacket msg) throws Exception {
//如果用戶名和密碼一致則通過loginResp發送一個hello包,反之回復發送失敗
if (msg.getUserName().equals("user") && msg.getPassword().equals("123456")) {
LoginRespPacket loginRespPacket = new LoginRespPacket();
loginRespPacket.setType(2);
loginRespPacket.setMessage("hello netty client");
ctx.writeAndFlush(loginRespPacket);
} else {
LoginRespPacket loginRespPacket = new LoginRespPacket();
loginRespPacket.setType(2);
loginRespPacket.setMessage("login failed");
ctx.writeAndFlush(loginRespPacket);
}
}
}
該處理器匹配消息包的邏輯我們可以通過源碼進行簡單介紹,當解碼后的數據包通過pipeline傳播來到AuthHandler 時,它會調用繼承自SimpleChannelInboundHandler的channelRead方法并通過acceptInboundMessage查看這個消息類型和泛型是否一致,如果一致則會調用channelRead0最終回調到我們的channelRead0方法,而且相較于channelHandler,我們的SimpleChannelInboundHandler還會在finally語句自動按需檢查并釋放bytebuf內存:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
//判斷當前消息類類型和指明的泛型是否匹配
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
//如果匹配則直接調用我們重寫的channelRead0
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
//調用結束后還會檢查按需釋放bytebuf內存
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
認證處理器確定登錄包正確,則發送loginResp響應,對應的數據包也需要按照類型、長度、序列化包字符串的格式進行編碼,所以我們還需要編寫一個編碼器,同理我們還是使用Netty開箱即用的MessageToByteEncoder將編碼后數據寫到out這個bytebuf中:
public class ServerEncodeHandler extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { //如果是Resp類型,則依次寫入類型、長度、序列化包到ByteBuf中
if (msg.getType() == 2) {
LoginRespPacket loginRespPacket = (LoginRespPacket) msg;
out.writeInt(loginRespPacket.getType());
byte[] bytes = JSON.toJSONBytes(loginRespPacket);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
}
3.客戶端設計與實現
而客戶端也和上文類型,我們先編寫一個連接激活后發送登錄包的處理器:
public class LoginHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LoginPacket loginPacket = new LoginPacket();
loginPacket.setUserName("user");
loginPacket.setPassword("123456");
ctx.writeAndFlush(loginPacket).
}
}
然后就是編碼器,同樣是繼承MessageToByteEncoder實現:
public class ClientEncodeHandler extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
//按照類型、長度、序列化包進行編碼
if (msg.getType() == 1) {
LoginPacket loginPacket = (LoginPacket) msg;
byte[] jsonBytes = JSON.toJSONBytes(loginPacket);
out.writeInt(msg.getType());
out.writeInt(jsonBytes.length);
out.writeBytes(jsonBytes);
}
}
}
收到包后,根據第一個整型字節匹配到LoginRespPacket,將其解碼為LoginRespPacket:
public class ClientDecodeHandler extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
int type = msg.readInt();
//如果type為2則說明是loginResp,按照類型、長度、反序列化包處理器
if (type == 2) {
int length = msg.readInt();
byte[] data = new byte[length];
msg.readBytes(data);
LoginRespPacket loginRespPacket = JSON.parseObject(data, LoginRespPacket.class);
out.add(loginRespPacket);
}
}
}
最終傳播到LoginRespHandler打印輸出:
public class LoginRespHandler extends SimpleChannelInboundHandler<LoginRespPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRespPacket msg) throws Exception {
System.out.println(JSONUtil.toJsonStr(msg));
}
}
4.最終成果驗收
按照上述解耦的處理器完成開發之后,我們分別啟動服務端和客戶端,最終客戶端就會得到如下輸出:
{"message":"hello netty client","type":2}
由此基于Netty開箱即用的客戶端服務端通信模型完成。