自己手寫RPC如何實(shí)現(xiàn)同步、異步、單向調(diào)用?直接上代碼!
很多好用的RPC框架都支持服務(wù)消費(fèi)者以同步、異步和單向調(diào)用的方式與服務(wù)提供者進(jìn)行交互,冰河你開發(fā)的這個(gè)RPC框架也可以嗎?
一、前言
在前面的章節(jié)中,實(shí)現(xiàn)了服務(wù)消費(fèi)者屏蔽掉基于Netty連接服務(wù)提供者的實(shí)現(xiàn)細(xì)節(jié)的前提下,以異步轉(zhuǎn)同步的方式調(diào)用服務(wù)提供者。在外部服務(wù)調(diào)用服務(wù)消費(fèi)者向服務(wù)提供者發(fā)送數(shù)據(jù)的方法時(shí),能夠直接獲取到服務(wù)提供者調(diào)用真實(shí)方法返回的結(jié)果數(shù)據(jù)。
那RPC框架只支持同步調(diào)用的話,在高并發(fā)環(huán)境下肯定會(huì)出現(xiàn)性能問題,我想讓RPC框架支持同步、異步和單向調(diào)用,這也是很多優(yōu)秀的RPC框架都支持的功能,這個(gè)有辦法實(shí)現(xiàn)嗎?
我:安排。。。
二、目標(biāo)
在服務(wù)提供者一端實(shí)現(xiàn)了按照自定義網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼對(duì)接收到的數(shù)據(jù)進(jìn)行解析,并且能夠?qū)⒔馕龅降臄?shù)據(jù)作為參數(shù)調(diào)用真實(shí)方法,并接收真實(shí)方法返回的結(jié)果數(shù)據(jù),通過自定義網(wǎng)絡(luò)協(xié)議和數(shù)據(jù)編解碼,將數(shù)據(jù)編碼成二進(jìn)制字節(jié)流,傳輸給服務(wù)消費(fèi)者。
在服務(wù)消費(fèi)者一端實(shí)現(xiàn)了按照自定義的網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼,將數(shù)據(jù)編碼成二進(jìn)制字節(jié)流發(fā)送給服務(wù)提供者,能夠接收到服務(wù)提供者響應(yīng)回來的二進(jìn)制字節(jié)流數(shù)據(jù),并且能夠根據(jù)自定義網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼,將接收到的二進(jìn)制字節(jié)流數(shù)據(jù)解碼成對(duì)應(yīng)的明文數(shù)據(jù),接下來,進(jìn)行進(jìn)一步處理。
同時(shí),服務(wù)消費(fèi)者支持在屏蔽掉基于Netty連接服務(wù)提供者的實(shí)現(xiàn)細(xì)節(jié)的前提下,使得外部服務(wù)調(diào)用服務(wù)消費(fèi)者向服務(wù)提供者發(fā)送數(shù)據(jù)的方法時(shí),能夠直接獲取到服務(wù)提供者調(diào)用真實(shí)方法返回的結(jié)果數(shù)據(jù)。
做到這里,已經(jīng)初步實(shí)現(xiàn)了RPC框架最基本的功能。這還遠(yuǎn)遠(yuǎn)不夠,服務(wù)消費(fèi)者除了能夠以同步的方式調(diào)用服務(wù)提供者,也要支持異步調(diào)用和單向調(diào)用,看看人家Dubbo,做的是真特么牛逼。
好了,不羨慕人家,我們自己踏踏實(shí)實(shí)手?jǐn)]吧,接下來,我們就實(shí)現(xiàn)服務(wù)消費(fèi)者以同步、異步、單向調(diào)用的方式與服務(wù)提供者進(jìn)行交互。
三、設(shè)計(jì)
服務(wù)消費(fèi)者與服務(wù)提供者之間基于同步、異步和單向調(diào)用的設(shè)計(jì)圖分別如下圖所示
- 同步調(diào)用
- 異步調(diào)用
- 單向調(diào)用
通過上圖可以看出:
(1)同步調(diào)用的方式,服務(wù)消費(fèi)者發(fā)起數(shù)據(jù)請(qǐng)求后,會(huì)同步等待返回結(jié)果。
(2)異步調(diào)用的方式,服務(wù)消費(fèi)者發(fā)起數(shù)據(jù)請(qǐng)求后,會(huì)立刻返回,后續(xù)會(huì)通過異步的方式獲取數(shù)據(jù)。
(3)單向調(diào)用的方式,服務(wù)消費(fèi)者發(fā)起數(shù)據(jù)請(qǐng)求后,會(huì)立刻返回,不必關(guān)注后續(xù)數(shù)據(jù)的處理結(jié)果。
可以看到,從設(shè)計(jì)上還是比較簡(jiǎn)單的,接下來,我們就一起實(shí)現(xiàn)它。
四、實(shí)現(xiàn)
1.工程結(jié)構(gòu)
- bhrpc-annotation:實(shí)現(xiàn)bhrpc框架的核心注解工程。
- bhrpc-codec:實(shí)現(xiàn)bhrpc框架的自定義編解碼功能。
- bhrpc-common:實(shí)現(xiàn)bhrpc框架的通用工具類,包含服務(wù)提供者注解與服務(wù)消費(fèi)者注解的掃描器。
- bhrpc-constants:存放實(shí)現(xiàn)bhrpc框架通用的常量類。
- bhrpc-consumer:服務(wù)消費(fèi)者父工程
- bhrpc-consumer-common:服務(wù)消費(fèi)者通用工程
- bhrpc-protocol:實(shí)現(xiàn)bhrpc框架的自定義網(wǎng)絡(luò)傳輸協(xié)議的工程。
- bhrpc-provider:服務(wù)提供者父工程。
- bhrpc-provider-common:服務(wù)提供者通用工程。
- bhrpc-provider-native:以純Java方式啟動(dòng)bhrpc框架的工程。
- bhrpc-serialization:實(shí)現(xiàn)bhrpc框架序列化與反序列化功能的父工程。
- bhrpc-serialization-api:實(shí)現(xiàn)bhrpc框架序列化與反序列化功能的通用接口工程。
- bhrpc-serialization-jdk:以JDK的方式實(shí)現(xiàn)序列化與反序列化功能。
- bhrpc-test:測(cè)試bhrpc框架的父工程。
- bhrpc-test-consumer-codec:測(cè)試服務(wù)消費(fèi)者基于自定義網(wǎng)絡(luò)協(xié)議與編解碼與服務(wù)提供者進(jìn)行數(shù)據(jù)交互
- bhrpc-test-consumer-handler:測(cè)試屏蔽服務(wù)消費(fèi)者基于Netty與服務(wù)提供者建立連接的細(xì)節(jié)后,與服務(wù)提供者進(jìn)行數(shù)據(jù)通信
- bhrpc-test-api:測(cè)試的通用Servcie接口工程
- bhrpc-test-provider:測(cè)試服務(wù)提供者的工程。
- bhrpc-test-consumer:測(cè)試服務(wù)消費(fèi)者的工程
- bhrpc-test-scanner:測(cè)試掃描器的工程。
2.核心類實(shí)現(xiàn)關(guān)系
服務(wù)消費(fèi)者與服務(wù)提供者之間基于同步、異步和單向調(diào)用的實(shí)現(xiàn)類關(guān)系如下圖所示。
可以看到,核心類之間的實(shí)現(xiàn)關(guān)系還是比較清晰的。
3.RPC上下文RpcContext類的實(shí)現(xiàn)
RpcContext類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.context.RpcContext,源碼如下所示。
public class RpcContext {
private RpcContext(){
}
/**
* RpcContext實(shí)例
*/
private static final RpcContext AGENT = new RpcContext();
/**
* 存放RPCFuture的InheritableThreadLocal
*/
private static final InheritableThreadLocal<RPCFuture> RPC_FUTURE_INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<>();
/**
* 獲取上下文
* @return RPC服務(wù)的上下文信息
*/
public static RpcContext getContext(){
return AGENT;
}
/**
* 將RPCFuture保存到線程的上下文
* @param rpcFuture
*/
public void setRPCFuture(RPCFuture rpcFuture){
RPC_FUTURE_INHERITABLE_THREAD_LOCAL.set(rpcFuture);
}
/**
* 獲取RPCFuture
*/
public RPCFuture getRPCFuture(){
return RPC_FUTURE_INHERITABLE_THREAD_LOCAL.get();
}
/**
* 移除RPCFuture
*/
public void removeRPCFuture(){
RPC_FUTURE_INHERITABLE_THREAD_LOCAL.remove();
}
}
可以看到,在RpcContext類中主要是通過InheritableThreadLocal在維護(hù)RPCFuture,并且每個(gè)線程維護(hù)RPCFuture時(shí),都是相互隔離的。RpcContext類中維護(hù)的RPCFuture會(huì)在RPC框架全局有效。
4.修改消費(fèi)者RpcConsumerHandler處理器類
RpcConsumerHandler類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.handler.RpcConsumerHandler,具體修改步驟如下所示。
(1)新增sendRequestSync()方法
sendRequestSync()方法表示同步調(diào)用的方法,源碼如下所示。
private RPCFuture sendRequestSync(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = this.getRpcFuture(protocol);
channel.writeAndFlush(protocol);
return rpcFuture;
}
可以看到,在sendRequestSync()方法中,調(diào)用channel的writeAndFlush()方法發(fā)送數(shù)據(jù)后,會(huì)返回RPCFuture對(duì)象。
(2)新增sendRequestAsync()方法
sendRequestAsync()方法表示異步調(diào)用的方法,源碼如下所示。
private RPCFuture sendRequestAsync(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = this.getRpcFuture(protocol);
//如果是異步調(diào)用,則將RPCFuture放入RpcContext
RpcContext.getContext().setRPCFuture(rpcFuture);
channel.writeAndFlush(protocol);
return null;
}
可以看到,sendRequestAsync()方法中,會(huì)將RPCFuture對(duì)象放入RpcContext上下文中,最終返回null。外部服務(wù)調(diào)用服務(wù)消費(fèi)者向服務(wù)提供者發(fā)送數(shù)據(jù)后,會(huì)通過RpcContext獲取到RPCFuture對(duì)象,進(jìn)而通過RPCFuture對(duì)象獲取最終結(jié)果數(shù)據(jù)。
(3)新增sendRequestOneway()方法
sendRequestOneway()方法表示單向調(diào)用的方法,源碼如下所示。
private RPCFuture sendRequestOneway(RpcProtocol<RpcRequest> protocol) {
channel.writeAndFlush(protocol);
return null;
}
可以看到,單向調(diào)用方法并不關(guān)心返回結(jié)果。sendRequestOneway()方法直接調(diào)用channel的writeAndFlush()方法,并返回null。
(4)修改sendRequest()方法
在sendRequest()方法的參數(shù)中新增是否是異步調(diào)用的async參數(shù)和是否是單向調(diào)用的oneway參數(shù),以這些參數(shù)來判斷是執(zhí)行同步調(diào)用、異步調(diào)用還是單向調(diào)用,源碼如下所示。
public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol, boolean async, boolean oneway){
logger.info("服務(wù)消費(fèi)者發(fā)送的數(shù)據(jù)===>>>{}", JSONObject.toJSONString(protocol));
return oneway ? this.sendRequestOneway(protocol) : async ?
sendRequestAsync(protocol) : this.sendRequestSync(protocol);
}
5.修改RpcConsumer服務(wù)消費(fèi)者類
RpcConsumer類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.RpcConsumer,主要是修改RpcConsumer類中的sendRequest()方法,調(diào)用RpcConsumerHandler處理器類的sendRequest()方法時(shí),需要傳遞是否是異步調(diào)用async的標(biāo)識(shí)和是否是單向調(diào)用oneway的標(biāo)識(shí),源碼如下所示。
public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception {
//################省略其他代碼################
RpcRequest request = protocol.getBody();
return handler.sendRequest(protocol, request.getAsync(), request.getOneway());
}
至此,整個(gè)實(shí)現(xiàn)就完畢了。實(shí)現(xiàn)起來是不是很簡(jiǎn)單呢?
五、測(cè)試
1.啟動(dòng)服務(wù)提供者
整個(gè)測(cè)試過程不需要修改服務(wù)提供者的代碼,所以,先啟動(dòng)服務(wù)提供者,啟動(dòng)bhrpc-test-provider工程下的io.binghe.rpc.test.provider.single.RpcSingleServerTest,輸出的結(jié)果信息如下所示。
INFO BaseServer:82 - Server started on 127.0.0.1:27880
可以看到,服務(wù)提供者啟動(dòng)成功。
2.測(cè)試同步調(diào)用
(1)修改同步調(diào)用的main()方法
修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
RPCFuture future = consumer.sendRequest(getRpcRequestProtocol());
LOGGER.info("從服務(wù)消費(fèi)者獲取到的數(shù)據(jù)===>>>" + future.get());
consumer.close();
}
可以看到,同步調(diào)用時(shí),會(huì)直接回去方法調(diào)用的結(jié)果數(shù)據(jù)。
(2)啟動(dòng)服務(wù)消費(fèi)者
啟動(dòng)bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示。
13:45:12,576 INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:45:12,693 INFO RpcConsumerHandler:90 - 服務(wù)消費(fèi)者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:45:12,868 INFO RpcConsumerHandler:77 - 服務(wù)消費(fèi)者接收到的數(shù)據(jù)===>>>{"body":{"async":false,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}
13:45:12,869 INFO RpcConsumerHandlerTest:38 - 從服務(wù)消費(fèi)者獲取到的數(shù)據(jù)===>>>hello binghe
可以看到,在服務(wù)消費(fèi)者輸出的信息中,除了向服務(wù)提供者發(fā)送的數(shù)據(jù)與接收服務(wù)提供者響應(yīng)的數(shù)據(jù)外,還在RpcConsumerHandlerTest類的main()方法中打印出了通過自定義的RPCFuture對(duì)象獲取的最終結(jié)果數(shù)據(jù)為hello binghe。符合預(yù)期的效果。
(3)再次查看服務(wù)提供者日志
再次查看服務(wù)提供者輸出的日志信息,如下所示。
13:45:12,748 INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:45:12,748 INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe
可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實(shí)的方法。
3.測(cè)試異步調(diào)用
(1)修改同步調(diào)用的main()方法
修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
consumer.sendRequest(getRpcRequestProtocol());
RPCFuture future = RpcContext.getContext().getRPCFuture();
LOGGER.info("從服務(wù)消費(fèi)者獲取到的數(shù)據(jù)===>>>" + future.get());
consumer.close();
}
可以看到,執(zhí)行異步調(diào)用時(shí),并沒有從調(diào)用consumer的sendRequest()方法直接獲取返回的RPCFuture結(jié)果數(shù)據(jù),而是通過RpcContext上下文獲取到RPCFuture對(duì)象,再由RPCFuture對(duì)象獲取結(jié)果數(shù)據(jù)。
(2)修改構(gòu)建RpcProtocol對(duì)象的方法
修改getRpcRequestProtocol()方法中構(gòu)建RpcRequest的方法參數(shù),將是否是異步調(diào)用的參數(shù)設(shè)置為true,源碼如下所示。
private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
//模擬發(fā)送數(shù)據(jù)
RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
//################省略其他代碼##########################
request.setAsync(true);
request.setOneway(false);
protocol.setBody(request);
return protocol;
}
(3)啟動(dòng)服務(wù)消費(fèi)者
啟動(dòng)bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示
13:47:55,800 INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:47:55,905 INFO RpcConsumerHandler:90 - 服務(wù)消費(fèi)者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":true,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:47:55,971 INFO RpcConsumerHandler:77 - 服務(wù)消費(fèi)者接收到的數(shù)據(jù)===>>>{"body":{"async":true,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}
13:47:55,971 INFO RpcConsumerHandlerTest:40 - 從服務(wù)消費(fèi)者獲取到的數(shù)據(jù)===>>>hello binghe
可以看到,在服務(wù)消費(fèi)者輸出的信息中,除了向服務(wù)提供者發(fā)送的數(shù)據(jù)與接收服務(wù)提供者響應(yīng)的數(shù)據(jù)外,還在RpcConsumerHandlerTest類的main()方法中打印出了通過自定義的RPCFuture對(duì)象獲取的最終結(jié)果數(shù)據(jù)為hello binghe。符合預(yù)期的效果。
(4)再次查看服務(wù)提供者日志
再次查看服務(wù)提供者輸出的日志信息,如下所示。
13:47:55,948 INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:47:55,948 INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe
可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實(shí)的方法。
4.測(cè)試單向調(diào)用
(1)修改同步調(diào)用的main()方法
修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
consumer.sendRequest(getRpcRequestProtocol());
LOGGER.info("無需獲取返回的結(jié)果數(shù)據(jù)");
consumer.close();
}
可以看到,在單向調(diào)用中,并沒有獲取返回結(jié)果。
(2)修改構(gòu)建RpcProtocol對(duì)象的方法
修改getRpcRequestProtocol()方法中構(gòu)建RpcRequest的方法參數(shù),將是否是單向調(diào)用的參數(shù)設(shè)置為true,源碼如下所示。
private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
//模擬發(fā)送數(shù)據(jù)
//#############省略其他代碼#################
request.setAsync(false);
request.setOneway(true);
protocol.setBody(request);
return protocol;
}
(3)啟動(dòng)服務(wù)消費(fèi)者
啟動(dòng)bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示。
13:58:26,417 INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:58:26,524 INFO RpcConsumerHandler:90 - 服務(wù)消費(fèi)者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":true,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:58:26,531 INFO RpcConsumerHandlerTest:39 - 無需獲取返回的結(jié)果數(shù)據(jù)
可以看到,服務(wù)消費(fèi)者向服務(wù)提供者發(fā)送數(shù)據(jù)后,并沒有獲取返回的結(jié)果數(shù)據(jù)。
(4)再次查看服務(wù)提供者日志
再次查看服務(wù)提供者輸出的日志信息,如下所示。
13:58:26,565 INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:58:26,566 INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe
可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實(shí)的方法。
六、總結(jié)
目前實(shí)現(xiàn)的RPC框架以Java原生進(jìn)程的方式啟動(dòng)后,能夠?qū)崿F(xiàn)服務(wù)消費(fèi)者以同步、異步和單向調(diào)用的方式與服務(wù)提供者之間進(jìn)行數(shù)據(jù)交互。至此,我們寫的RPC框架的功能又進(jìn)一步得到了增強(qiáng)。
我們寫的RPC框架正在一步步實(shí)現(xiàn)它該有的功能。