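Writing an RPC Framework in Java from Scratch: How Does the Client Call the Server?
With the client and the server both written, how does the client actually invoke the server?
Let's walk through it.
Interface definition
Calculation method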
- package com.github.houbb.rpc.common.service;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- /**
- * <p> Calculator service interface </p>
- *
- * <pre> Created: 2018/8/24 4:47 PM </pre>
- * <pre> Project: fake </pre>
- *
- * @author houbinbin
- * @since 0.0.1
- */
- public interface Calculator {
- /**
- * Adds the two operands.
- * @param request the request parameters
- * @return the result
- */
- CalculateResponse sum(final CalculateRequest request);
- }
POJOs
The corresponding parameter and result objects:
- CalculateRequest
- package com.github.houbb.rpc.common.model;
- import java.io.Serializable;
- /**
- * <p> Request parameters </p>
- *
- * <pre> Created: 2018/8/24 5:05 PM </pre>
- * <pre> Project: fake </pre>
- *
- * @author houbinbin
- * @since 0.0.3
- */
- public class CalculateRequest implements Serializable {
- private static final long serialVersionUID = 6420751004355300996L;
- /**
- * First operand
- */
- private int one;
- /**
- * Second operand
- */
- private int two;
- public CalculateRequest() {
- }
- public CalculateRequest(int one, int two) {
- this.one = one;
- this.two = two;
- }
- //getter setter toString
- }
- CalculateResponse
- package com.github.houbb.rpc.common.model;
- import java.io.Serializable;
- /**
- * <p> Response result </p>
- *
- * <pre> Created: 2018/8/24 5:05 PM </pre>
- * <pre> Project: fake </pre>
- *
- * @author houbinbin
- * @since 0.0.3
- */
- public class CalculateResponse implements Serializable {
- private static final long serialVersionUID = -1972014736222511341L;
- /**
- * Whether the call succeeded
- */
- private boolean success;
- /**
- * Sum of the two operands
- */
- private int sum;
- public CalculateResponse() {
- }
- public CalculateResponse(boolean success, int sum) {
- this.success = success;
- this.sum = sum;
- }
- //getter setter toString
- }
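The listings above leave out the getters, setters and toString. As a rough sketch of what the omitted part of CalculateRequest might look like: the accessor names getOne and getTwo are the ones the encoder later in this post calls, and the toString format is inferred from the log output at the end of the post. The setters are an assumption based on the "getter setter" comment, and Serializable is left out to keep the sketch short.

```java
// Sketch only: a standalone version of CalculateRequest with the omitted members filled in.
class CalculateRequest {
    private int one;
    private int two;

    public CalculateRequest() {
    }

    public CalculateRequest(int one, int two) {
        this.one = one;
        this.two = two;
    }

    public int getOne() {
        return one;
    }

    public void setOne(int one) {
        this.one = one;
    }

    public int getTwo() {
        return two;
    }

    public void setTwo(int two) {
        this.two = two;
    }

    @Override
    public String toString() {
        // Format inferred from the client log: CalculateRequest{one=1, two=2}
        return "CalculateRequest{one=" + one + ", two=" + two + "}";
    }

    public static void main(String[] args) {
        System.out.println(new CalculateRequest(1, 2)); // prints: CalculateRequest{one=1, two=2}
    }
}
```

CalculateResponse would follow the same pattern, with isSuccess and getSum as its getters.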
Client
Core
RpcClient needs the corresponding handlers added to its pipeline. The adjusted bootstrap code:
- Bootstrap bootstrap = new Bootstrap();
- ChannelFuture channelFuture = bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<Channel>(){
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- .addLast(new LoggingHandler(LogLevel.INFO))
- .addLast(new CalculateRequestEncoder())
- .addLast(new CalculateResponseDecoder())
- .addLast(new RpcClientHandler());
- }
- })
- .connect(RpcConstant.ADDRESS, port)
- .syncUninterruptibly();
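Netty's handler pipeline is elegantly designed: each concern lives in its own handler, so the code is easy to extend by adding or swapping handlers.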
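One consequence of the pipeline design is that the order of the addLast calls decides which handlers an event passes through. The following is a toy model in plain Java, not Netty's API: Stage, inboundPath and outboundPath are hypothetical names introduced only for illustration. It assumes standard Netty 4 behaviour, where inbound events travel from the first handler to the last through inbound handlers, and a write issued with ctx.writeAndFlush travels from the calling handler back toward the head through outbound handlers. LoggingHandler is modelled as taking part in both directions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PipelineOrderSketch {
    /** Toy stand-in for a handler: a name plus the directions it takes part in. */
    static final class Stage {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Stage(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Same order as the addLast(...) calls in the client bootstrap.
    static final List<Stage> CLIENT = Arrays.asList(
            new Stage("LoggingHandler", true, true),
            new Stage("CalculateRequestEncoder", false, true),
            new Stage("CalculateResponseDecoder", true, false),
            new Stage("RpcClientHandler", true, false));

    /** Inbound events visit the inbound stages from first to last. */
    static List<String> inboundPath(List<Stage> pipeline) {
        List<String> path = new ArrayList<>();
        for (Stage stage : pipeline) {
            if (stage.inbound) {
                path.add(stage.name);
            }
        }
        return path;
    }

    /** A write issued by the stage at fromIndex visits the earlier outbound stages, last to first. */
    static List<String> outboundPath(List<Stage> pipeline, int fromIndex) {
        List<String> path = new ArrayList<>();
        for (int i = fromIndex - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) {
                path.add(pipeline.get(i).name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(inboundPath(CLIENT));
        // prints: [LoggingHandler, CalculateResponseDecoder, RpcClientHandler]
        System.out.println(outboundPath(CLIENT, 3));
        // prints: [CalculateRequestEncoder, LoggingHandler]
    }
}
```

This ordering is consistent with the client log at the end of the post, where LoggingHandler reports WRITE: 8B: by the time the write reaches it, the encoder has already turned the request into bytes.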
Next, let's look at each handler's implementation.
RpcClientHandler
- package com.github.houbb.rpc.client.handler;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.core.RpcClient;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * <p> Client-side handler </p>
- *
- * <pre> Created: 2019/10/16 11:30 PM </pre>
- * <pre> Project: rpc </pre>
- *
- * @author houbinbin
- * @since 0.0.2
- */
- public class RpcClientHandler extends SimpleChannelInboundHandler<CalculateResponse> {
- // Note: the logger is created with RpcClient.class, so log lines are attributed to RpcClient.
- private static final Log log = LogFactory.getLog(RpcClient.class);
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // Fire the call as soon as the connection is established; the operands are hard-coded for simplicity.
- CalculateRequest request = new CalculateRequest(1, 2);
- ctx.writeAndFlush(request);
- log.info("[Client] request is :{}", request);
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, CalculateResponse response) throws Exception {
- // The decoder has already turned the received bytes into a CalculateResponse.
- log.info("[Client] response is :{}", response);
- }
- }
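This handler is straightforward: channelActive fires the call as soon as the connection is established, and the request object is hard-coded to keep things simple.
channelRead0 listens for the server's response and logs it.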
CalculateRequestEncoder
The request is a Java object, which Netty cannot put on the wire as-is, so we convert it into primitive values written to a ByteBuf:
- package com.github.houbb.rpc.client.encoder;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- /**
- * @author binbin.hou
- * @since 0.0.3
- */
- public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {
- @Override
- protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
- int one = msg.getOne();
- int two = msg.getTwo();
- out.writeInt(one);
- out.writeInt(two);
- }
- }
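To make the resulting wire format concrete, here is a minimal sketch that reproduces it with java.nio.ByteBuffer instead of Netty's ByteBuf. The class and method names are hypothetical. It relies on the fact that both ByteBuf.writeInt and ByteBuffer.putInt write 4-byte big-endian integers by default, so a request is always 8 bytes.

```java
import java.nio.ByteBuffer;

class RequestWireFormat {
    /** Encodes the two operands as two 4-byte big-endian ints (8 bytes in total). */
    static byte[] encode(int one, int two) {
        ByteBuffer buffer = ByteBuffer.allocate(8); // big-endian by default
        buffer.putInt(one);
        buffer.putInt(two);
        return buffer.array();
    }

    /** Formats bytes as space-separated two-digit hex values. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encode(1, 2))); // prints: 00 00 00 01 00 00 00 02
    }
}
```

The output matches the 8-byte WRITE dump in the client log at the end of this post.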
CalculateResponseDecoder
The server's response is handled the same way, in reverse.
We read the primitive values off the wire and wrap them in the object we need.
- package com.github.houbb.rpc.client.decoder;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import java.util.List;
- /**
- * Decodes the response.
- * @author binbin.hou
- * @since 0.0.3
- */
- public class CalculateResponseDecoder extends ByteToMessageDecoder {
- @Override
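- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- // A response is 1 byte (boolean) + 4 bytes (int); wait until the whole frame has arrived.
- if (in.readableBytes() < 5) {
- return;
- }
- boolean success = in.readBoolean();
- int sum = in.readInt();
- CalculateResponse response = new CalculateResponse(success, sum);
- out.add(response);
- }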
- }
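Because TCP is a byte stream, a decoder can be handed fewer bytes than one full response, so it is worth checking how many bytes are available before reading. The sketch below shows that idea with java.nio.ByteBuffer rather than Netty's ByteBuf. ResponseFrameSketch, Decoded and tryDecode are hypothetical names, and the 5-byte frame length follows from the fields the server writes: one boolean byte plus one 4-byte int.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

class ResponseFrameSketch {
    // 1 byte for the success flag + 4 bytes for the sum.
    static final int FRAME_LENGTH = 5;

    /** Hypothetical stand-in for CalculateResponse. */
    static final class Decoded {
        final boolean success;
        final int sum;

        Decoded(boolean success, int sum) {
            this.success = success;
            this.sum = sum;
        }
    }

    /** Returns a value only when a whole frame is available; otherwise consumes nothing. */
    static Optional<Decoded> tryDecode(ByteBuffer in) {
        if (in.remaining() < FRAME_LENGTH) {
            return Optional.empty();
        }
        boolean success = in.get() != 0;
        int sum = in.getInt(); // big-endian by default
        return Optional.of(new Decoded(success, sum));
    }

    public static void main(String[] args) {
        ByteBuffer partial = ByteBuffer.wrap(new byte[] {1, 0, 0});
        System.out.println(tryDecode(partial).isPresent()); // prints: false

        ByteBuffer full = ByteBuffer.wrap(new byte[] {1, 0, 0, 0, 3});
        Decoded decoded = tryDecode(full).get();
        System.out.println(decoded.success + " " + decoded.sum); // prints: true 3
    }
}
```

The second input is the same 5-byte payload that appears in the READ dump of the client log at the end of this post.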
Server
Registering the handlers
The handlers registered in RpcServer need a small adjustment; everything else stays the same.
- ServerBootstrap serverBootstrap = new ServerBootstrap();
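- // group(parentGroup, childGroup): the boss group accepts connections, the worker group serves them
- serverBootstrap.group(bossGroup, workerGroup)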
- .channel(NioServerSocketChannel.class)
- // Log events on the server (acceptor) channel
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- .addLast(new CalculateRequestDecoder())
- .addLast(new CalculateResponseEncoder())
- .addLast(new RpcServerHandler());
- }
- })
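- // Bounds the queue of connections that have not yet been taken by accept()
- .option(ChannelOption.SO_BACKLOG, 128)
- // If a connection has been idle for a while, TCP sends a keepalive probe to check whether the client is still alive.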
- .childOption(ChannelOption.SO_KEEPALIVE, true);
RpcServerHandler
This handler started out as an empty implementation; let's fill it in.
- package com.github.houbb.rpc.server.handler;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import com.github.houbb.rpc.common.service.Calculator;
- import com.github.houbb.rpc.server.service.CalculatorService;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * @author binbin.hou
- * @since 0.0.1
- */
- public class RpcServerHandler extends SimpleChannelInboundHandler<CalculateRequest> {
- private static final Log log = LogFactory.getLog(RpcServerHandler.class);
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- final String id = ctx.channel().id().asLongText();
- // Pass id as an argument; concatenating it onto the message leaves the {} placeholder unfilled,
- // which is why the server log captured at the end of this post still shows a literal {}.
- log.info("[Server] channel {} connected", id);
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, CalculateRequest request) throws Exception {
- final String id = ctx.channel().id().asLongText();
- log.info("[Server] receive channel {} request: {} from ", id, request);
- Calculator calculator = new CalculatorService();
- CalculateResponse response = calculator.sum(request);
- // Write the response back to the client
- ctx.writeAndFlush(response);
- log.info("[Server] channel {} response {}", id, response);
- }
- }
Once the server reads a client request, the decoded CalculateRequest is passed to sum, and the resulting CalculateResponse is written back to the client.
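Stripped of Netty, the server's job for each request is a pure bytes-in, bytes-out transformation. The sketch below illustrates that flow in plain Java, under the assumption that the request is two big-endian ints and the response is one boolean byte followed by one big-endian int. ServerFlowSketch and handle are hypothetical names, and the real server splits these steps across a decoder, a handler and an encoder.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class ServerFlowSketch {
    /** Decodes an 8-byte request, adds the operands, and encodes a 5-byte response. */
    static byte[] handle(byte[] requestBytes) {
        // Step 1: decode the request (what the request decoder does).
        ByteBuffer in = ByteBuffer.wrap(requestBytes);
        int one = in.getInt();
        int two = in.getInt();

        // Step 2: run the business logic (what the service does).
        int sum = one + two;

        // Step 3: encode the response (what the response encoder does).
        ByteBuffer out = ByteBuffer.allocate(5);
        out.put((byte) 1); // success = true is written as a single byte with value 1
        out.putInt(sum);
        return out.array();
    }

    public static void main(String[] args) {
        byte[] request = {0, 0, 0, 1, 0, 0, 0, 2};
        System.out.println(Arrays.toString(handle(request))); // prints: [1, 0, 0, 0, 3]
    }
}
```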
CalculateRequestDecoder
This mirrors the client's encoder: we first turn the primitive values Netty delivers into a CalculateRequest object.
- package com.github.houbb.rpc.server.decoder;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import java.util.List;
- /**
- * Decodes the request.
- * @author binbin.hou
- * @since 0.0.3
- */
- public class CalculateRequestDecoder extends ByteToMessageDecoder {
- @Override
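- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- // A request is two 4-byte ints; wait until all 8 bytes have arrived.
- if (in.readableBytes() < 8) {
- return;
- }
- int one = in.readInt();
- int two = in.readInt();
- CalculateRequest request = new CalculateRequest(one, two);
- out.add(request);
- }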
- }
CalculateResponseEncoder
As on the client, the response has to be converted into primitive values for network transmission.
- package com.github.houbb.rpc.server.encoder;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- /**
- * @author binbin.hou
- * @since 0.0.3
- */
- public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {
- @Override
- protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
- boolean success = msg.isSuccess();
- int result = msg.getSum();
- out.writeBoolean(success);
- out.writeInt(result);
- }
- }
CalculatorService
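The server-side implementation of the Calculator interface.
- package com.github.houbb.rpc.server.service;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import com.github.houbb.rpc.common.service.Calculator;
- public class CalculatorService implements Calculator {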
- @Override
- public CalculateResponse sum(CalculateRequest request) {
- int sum = request.getOne()+request.getTwo();
- return new CalculateResponse(true, sum);
- }
- }
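In the implementation above, success is always true. One case where a caller might want false is integer overflow, since int addition in Java wraps around silently. The following variant is only a sketch and not part of the original project: CheckedSumSketch and Result are hypothetical names, and reporting overflow through the success flag is an assumed design choice.

```java
class CheckedSumSketch {
    /** Hypothetical result holder mirroring the two fields of CalculateResponse. */
    static final class Result {
        final boolean success;
        final int sum;

        Result(boolean success, int sum) {
            this.success = success;
            this.sum = sum;
        }
    }

    /** Adds the operands, reporting overflow through the success flag instead of wrapping. */
    static Result sum(int one, int two) {
        try {
            // Math.addExact throws ArithmeticException when the result overflows an int.
            return new Result(true, Math.addExact(one, two));
        } catch (ArithmeticException overflow) {
            return new Result(false, 0);
        }
    }

    public static void main(String[] args) {
        Result ok = sum(1, 2);
        System.out.println(ok.success + " " + ok.sum); // prints: true 3

        Result overflowed = sum(Integer.MAX_VALUE, 1);
        System.out.println(overflowed.success); // prints: false
    }
}
```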
Testing
Server
Start the server:
- new RpcServer().start();
Server startup log:
- [DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
- [INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務開始啟動服務端
- 十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler channelRegistered
- 信息: [id: 0xd399474f] REGISTERED
- 十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler bind
- 信息: [id: 0xd399474f] BIND: 0.0.0.0/0.0.0.0:9527
- 十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler channelActive
- 信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
- [INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服務端啟動完成,監聽【9527】端口
Client
Start the client:
- new RpcClient().start();
The log output is as follows:
- [DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
- [INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服務開始啟動客戶端
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRegistered
- 信息: [id: 0x4d75c580] REGISTERED
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler connect
- 信息: [id: 0x4d75c580] CONNECT: /127.0.0.1:9527
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelActive
- 信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] ACTIVE
- [INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服務啟動客戶端完成,監聽端口:9527
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler write
- 信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] WRITE: 8B
-          +-------------------------------------------------+
-          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
- +--------+-------------------------------------------------+----------------+
- |00000000| 00 00 00 01 00 00 00 02                         |........        |
- +--------+-------------------------------------------------+----------------+
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler flush
- 信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] FLUSH
- [INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelActive] - [Client] request is :CalculateRequest{one=1, two=2}
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRead
- 信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ: 5B
-          +-------------------------------------------------+
-          |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
- +--------+-------------------------------------------------+----------------+
- |00000000| 01 00 00 00 03                                  |.....           |
- +--------+-------------------------------------------------+----------------+
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
- 信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ COMPLETE
- [INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}
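As you can see, both the request parameters and the response result are logged.
Meanwhile, the server has logged the corresponding new entries: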
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRead
- 信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L:/127.0.0.1:9527 - R:/127.0.0.1:54030]
- 十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
- 信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE
- [INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927
- [INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from
- [INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}