《一起玩Dubbo》系列四之服務(wù)如何被調(diào)用
了解過rpc的大概都聽過,rpc就是為了解決遠(yuǎn)程方法的本地調(diào)用的難題的,其實(shí)說穿了,就是為了解決方法在被調(diào)用到遠(yuǎn)程服被執(zhí)行的流程問題,那么這個(gè)流程到底是怎么樣的呢?
同樣的,我繼續(xù)在 dubbo流程圖 中繼續(xù)繪畫我的流程
首先是根據(jù)文章一起玩dubbo,先入個(gè)門搭建起demo,包括注冊中心、服務(wù)消費(fèi)方和服務(wù)提供方,接下來來擼擼整個(gè)過程
這邊為了方便解說,先直接給個(gè)demo
這是服務(wù)提供方
- public class DemoServiceImpl implements DemoService {
- @Override
- public String sayHello(String name) {
- System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
- return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
- }
- }
這是服務(wù)消費(fèi)方
- public class Consumer {
- public static void main(String[] args) {
- ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring" +
- "/dubbo-demo-consumer.xml"});
- context.start();
- DemoService demoService = (DemoService) context.getBean("demoService");
- while (true) {
- try {
- Thread.sleep(1000);
- String hello = demoService.sayHello("world");
- System.out.println(hello);
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
- }
- }
- }
我斷點(diǎn)了下這里
image-20210714015553406
走到服務(wù)消費(fèi)方的最底層可以看到
在開始分析細(xì)節(jié)之前我們先在大腦風(fēng)暴下大致流程
一次調(diào)用過程需要經(jīng)歷哪些步驟?
不用看dubbo代碼都可以大概猜到:
- 要知道遠(yuǎn)程服務(wù)的地址,
- 把要調(diào)用的方法的具體信息告訴遠(yuǎn)程服務(wù),讓遠(yuǎn)程服務(wù)解析這些信息
- 遠(yuǎn)程服務(wù)根據(jù)這些信息找到對應(yīng)的實(shí)現(xiàn)類,進(jìn)行調(diào)用,調(diào)用完了
- 調(diào)用結(jié)果原路返回,然后客戶端解析響應(yīng)
第一點(diǎn),我們通過前幾篇文章已經(jīng)知道,消費(fèi)方在發(fā)起調(diào)用的時(shí)候已經(jīng)知曉了遠(yuǎn)程服務(wù)的地址
那么要調(diào)用的方法的具體信息包括哪些呢?
客戶端肯定要告訴服務(wù)方調(diào)用的哪個(gè)接口,所以需要方法名、方法的參數(shù)類型、方法的參數(shù)值,然后有可能存在多個(gè)版本的情況,所以還得帶上版本號,有這些數(shù)據(jù)后,服務(wù)方就可以精準(zhǔn)的調(diào)用具體的方法了。
我這邊將上面調(diào)用的例子先貼出來
mdata也就是我上面說的那些數(shù)據(jù)。
看到這個(gè)Request這里,應(yīng)該就清楚了遠(yuǎn)程調(diào)用的基本原理了。
這個(gè)時(shí)候很容易就想到另一個(gè)問題,消費(fèi)方和提供方是如何通信的?
消費(fèi)方和提供方如何通信?
其實(shí)很簡單,就是消費(fèi)方和提供方通過協(xié)議進(jìn)行了通信罷了,dubbo的協(xié)議屬于很常見的header+body 形式,而且也有特殊的字符 0xdabb,用來解決 TCP 網(wǎng)絡(luò)粘包問題的。這種header是固定長度的,然后header里面填寫 body 的長度是比較常見的做法,包括我司的游戲框架也是用這種模式。
我們可以看看dubbo協(xié)議的鬼樣
可以看到,協(xié)議分為協(xié)議頭和協(xié)議體,16 字節(jié)的頭部主要攜帶了魔法數(shù),也就是之前說的 0xdabb,然后一些請求的設(shè)置,消息體的長度等等,16 字節(jié)之后就是協(xié)議體了,包括協(xié)議版本、接口名字、接口版本、方法名字等等。
看到這里又很容易的引申出另一個(gè)問題了,協(xié)議是如何序列化的?
協(xié)議的序列化?
序列化的概念其實(shí)也簡答, 在消費(fèi)方先把Java對象轉(zhuǎn)換為字節(jié)序列,這個(gè)過程也被稱為對象的序列化,然后在服務(wù)方又把字節(jié)序列恢復(fù)為Java對象,這個(gè)過程稱為對象的反序列化。
dubbo默認(rèn)使用的是 hessian2 序列化協(xié)議,hessian2是阿里對于hessian進(jìn)行行了修改的版本,應(yīng)該還不錯(cuò)。
大致總結(jié)下,消費(fèi)方發(fā)起調(diào)用,在那一刻,實(shí)際調(diào)用的是代理類,代理類最終調(diào)用的是Client,Client將 Java 的對象序列化生成協(xié)議體,然后通過網(wǎng)絡(luò)傳輸給服務(wù)方,服務(wù)方Server接到這個(gè)請求之后,分發(fā)給業(yè)務(wù)線程池,由業(yè)務(wù)線程調(diào)用具體的實(shí)現(xiàn)方法。
先see see官網(wǎng)圖吧
分析下消費(fèi)方的調(diào)用鏈路
我們先看看服務(wù)消費(fèi)方的調(diào)用邏輯,大家可以對著我這張圖來
好了,我繼續(xù)說
可以看到調(diào)用的接口生成的代理類是
而在invoke的時(shí)候會(huì)先釋放掉部分不需要攔截的方法啦,比如toString什么的,這樣正常吧,這些方法確實(shí)不需要攔截的嘛
看看RpcInvocation是什么
可以看到生成的 RpcInvocation 包含了方法名、參數(shù)類和參數(shù)值什么的。
接下來往里進(jìn)一步看看MockClusterInvoker#invoke 代碼,先解釋下為啥會(huì)進(jìn)來了MockClusterInvoker,看過文章 想學(xué)dubbo的看過來,2萬字整理服務(wù)引入流程 應(yīng)該可以理解這個(gè)過程,這個(gè)過程可以認(rèn)為是套娃吧,A套B,B套C,一直套到最外層的invoker就是MockClusterInvoker,如果不理解這個(gè)過程可以往回看我的文章,很肝卻很實(shí)用
這里可以看到就是判斷配置里面有沒有配置mock,mock 的話后續(xù)再展開說,繼續(xù)看看this.invoker.invoke 的實(shí)現(xiàn),實(shí)際上會(huì)調(diào)用 AbstractClusterInvoker#invoker
這里倒是涉及到了一個(gè)模板方法的設(shè)計(jì)模式,其實(shí)很簡單,就是在抽象類中定好代碼的執(zhí)行骨架,之后將具體的實(shí)現(xiàn)延遲到子類中,由子類來決定邏輯,這樣可以在不改變整體執(zhí)行步驟的情況下修改步驟里面的實(shí)現(xiàn),減少了重復(fù)的代碼,也利于擴(kuò)展,符合了開閉原則。
接下來看看
做了啥,這一步算是比較重要吧,單獨(dú)拎出來講講
這里其實(shí)就是先由路由過濾一波,然后返回invoker
繼續(xù)看看doInvoke的流程,我們默認(rèn)使用的是 FailoverClusterInvoker,也就是失敗自動(dòng)切換的容錯(cuò)方式,
這里說說為啥默認(rèn)是這個(gè)哦,其實(shí)從實(shí)際應(yīng)用上來說,失敗后自動(dòng)切換下個(gè)服務(wù)實(shí)例還是比較符合場景的,如果想替換其他模式可以在xml里邊配置
那我們繼續(xù)看看那doInvoke的實(shí)現(xiàn)
- public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
- List<Invoker<T>> copyinvokers = invokers;
- checkInvokers(copyinvokers, invocation);
- String methodName = RpcUtils.getMethodName(invocation);
- int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
- if (len <= 0) {
- len = 1;
- }
- // retry loop.
- RpcException le = null; // last exception.
- List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
- Set<String> providers = new HashSet<String>(len);
- // 重試次數(shù)
- for (int i = 0; i < len; i++) {
- //Reselect before retry to avoid a change of candidate `invokers`.
- //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
- if (i > 0) {
- checkWhetherDestroyed();
- copyinvokers = list(invocation);
- // check again
- checkInvokers(copyinvokers, invocation);
- }
- // 通過負(fù)載選擇了一個(gè)invoker
- Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
- invoked.add(invoker);
- // 上下文保留了調(diào)用過的invoker
- RpcContext.getContext().setInvokers((List) invoked);
- try {
- // 發(fā)起調(diào)用
- Result result = invoker.invoke(invocation);
- if (le != null && logger.isWarnEnabled()) {
- logger.warn("Although retry the method " + methodName
- + " in the service " + getInterface().getName()
- + " was successful by the provider " + invoker.getUrl().getAddress()
- + ", but there have been failed providers " + providers
- + " (" + providers.size() + "/" + copyinvokers.size()
- + ") from the registry " + directory.getUrl().getAddress()
- + " on the consumer " + NetUtils.getLocalHost()
- + " using the dubbo version " + Version.getVersion() + ". Last error is: "
- + le.getMessage(), le);
- }
- return result;
- } catch (RpcException e) {
- if (e.isBiz()) { // biz exception.
- throw e;
- }
- le = e;
- } catch (Throwable e) {
- le = new RpcException(e.getMessage(), e);
- } finally {
- providers.add(invoker.getUrl().getAddress());
- }
- }
- throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
- + methodName + " in the service " + getInterface().getName()
- + ". Tried " + len + " times of the providers " + providers
- + " (" + providers.size() + "/" + copyinvokers.size()
- + ") from the registry " + directory.getUrl().getAddress()
- + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
- + Version.getVersion() + ". Last error is: "
- + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
- }
這個(gè)調(diào)用稍微總結(jié)一下就是FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表,并且經(jīng)過路由之后,通過LoadBalance 從 Invoker 列表中選擇一個(gè) Invoker,也就是負(fù)載均衡啦,最后FailoverClusterInvoker會(huì)將參數(shù)傳給選擇出的那個(gè) Invoker 實(shí)例的 invoke 方法,進(jìn)行真正的遠(yuǎn)程調(diào)用。
后面發(fā)起調(diào)用的這個(gè) invoke 又是調(diào)用抽象類中的 invoke 然后再調(diào)用子類的 doInvoker,抽象類中的方法很簡單我就不展示了,我們直接看子類 DubboInvoker 的 doInvoke 方法。
- protected Result doInvoke(final Invocation invocation) throws Throwable {
- RpcInvocation inv = (RpcInvocation) invocation;
- final String methodName = RpcUtils.getMethodName(invocation);
- inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
- inv.setAttachment(Constants.VERSION_KEY, version);
- ExchangeClient currentClient;
- // 選擇client
- if (clients.length == 1) {
- currentClient = clients[0];
- } else {
- currentClient = clients[index.getAndIncrement() % clients.length];
- }
- try {
- // 是否異步
- boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
- // 是否oneway方式發(fā)送,也就是需不需要返回值
- boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
- // 超時(shí)時(shí)間
- int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
- // 不需要返回值
- if (isOneway) {
- boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
- // 協(xié)議發(fā)送
- currentClient.send(inv, isSent);
- // future直接是Null
- RpcContext.getContext().setFuture(null);
- // 返回空的結(jié)果
- return new RpcResult();
- } else if (isAsync) {
- // 異步發(fā)送
- ResponseFuture future = currentClient.request(inv, timeout);
- // 設(shè)置future
- RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
- // 返回空結(jié)果
- return new RpcResult();
- } else {
- // 同步發(fā)送
- RpcContext.getContext().setFuture(null);
- // 直接調(diào)用了future.get去等待
- return (Result) currentClient.request(inv, timeout).get();
- }
- } catch (TimeoutException e) {
- throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- } catch (RemotingException e) {
- throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
這里可以看到調(diào)用的方式有三種,分別是 oneway、異步、同步,我分別說說
- oneway是比較常見的方式了,就是當(dāng)我們不關(guān)心請求是否發(fā)送成功的情況下,就用 oneway 的方式發(fā)送,這種方式消耗最小。
- 異步調(diào)用,我們可以看到其實(shí) Dubbo 天然支持異步的,client 發(fā)送請求之后會(huì)得到一個(gè) ResponseFuture,然后把 future 包裝一下塞到上下文中,這樣用戶就可以從上下文中拿到這個(gè) future,然后調(diào)用方可以做了一波操作之后再調(diào)用 future.whenComplete什么的異步做點(diǎn)什么。
- 同步調(diào)用,Dubbo 底層也幫我們做了,可以看到在 Dubbo 源碼中就調(diào)用了 future.get,所以給我們的感覺就是我調(diào)用了這個(gè)接口的方法之后就阻塞住了,必須要等待結(jié)果到了之后才能返回,所以就是同步的。
那么這個(gè)回調(diào)是怎么做的?
其實(shí)很簡單的,就是在調(diào)用的時(shí)候生成一個(gè)唯一的id,將回調(diào)和這個(gè)id緩存起來,然后將這個(gè)id傳遞到服務(wù)方,服務(wù)方在處理好業(yè)務(wù)后將結(jié)果和這個(gè)id重新發(fā)回到消費(fèi)方,消費(fèi)方拿到回調(diào)觸發(fā)即可。
我們看看代碼層面的
看看DefaultFuture是什么
看到啦,里邊生成了唯一id,然后放到FUTURES這個(gè)并發(fā)容器里邊,我們看看用的地方
這里比較清楚了吧,在收到返回的協(xié)議后將future拿出來去觸發(fā),基于這種思路,很多做回調(diào)的都可以用這種設(shè)計(jì)思路。
到這里服務(wù)消費(fèi)方怎么去觸發(fā)rpc的這個(gè)行為基本上就到這了,其實(shí)還是很清晰的,先是起服訂閱的時(shí)候?qū)訉臃庋b了invoker,然后搞出了一個(gè)代理對象注入到我們的接口中,然后在調(diào)用接口的時(shí)候就一個(gè)個(gè)調(diào)用invoker啦,最后就是發(fā)協(xié)議給服務(wù)提供方。
愛了愛了,簡單清晰的邏輯。
接下來說說服務(wù)提供方的調(diào)用流程。
分析下提供方的調(diào)用電路
同樣的,我們先看看服務(wù)提供方的調(diào)用鏈
這個(gè)流程也是特別長的,我這邊只拎幾個(gè)重點(diǎn)出來,先看下HeaderExchangeHandler,handleRequest
這里很容易理解啦,就是把request對象中的data取出來傳到DubboProtocol.requestHandler中,這個(gè)data就是前面的解碼后的DecodeableRpcInvocation對象它是Invocation接口的一個(gè)實(shí)現(xiàn),我們可以看看里邊有啥
可以看到調(diào)用信息都在這里啦,接下來就簡單了,根據(jù)這些參數(shù)拿到對應(yīng)的對象反射調(diào)用下就可以了,接下來看看DubboProtocol比較核心的reply方法
- @Override
- public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
- if (message instanceof Invocation) {
- Invocation inv = (Invocation) message;
- // 根據(jù)調(diào)用的參數(shù)拿到對應(yīng)的invoker,其實(shí)就是之前服務(wù)暴露的時(shí)候有說過的Exporter里邊取的
- Invoker<?> invoker = getInvoker(channel, inv);
- // 這里邊是對callback回來的一些處理,先不管
- if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
- String methodsStr = invoker.getUrl().getParameters().get("methods");
- boolean hasMethod = false;
- if (methodsStr == null || methodsStr.indexOf(",") == -1) {
- hasMethod = inv.getMethodName().equals(methodsStr);
- } else {
- String[] methods = methodsStr.split(",");
- for (String method : methods) {
- if (inv.getMethodName().equals(method)) {
- hasMethod = true;
- break;
- }
- }
- }
- if (!hasMethod) {
- logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
- + " not found in callback service interface ,invoke will be ignored."
- + " please update the api interface. url is:"
- + invoker.getUrl()) + " ,invocation is :" + inv);
- return null;
- }
- }
- RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
- // 最后invoke一下啦
- return invoker.invoke(inv);
- }
- throw new RemotingException(channel, "Unsupported request: "
- + (message == null ? null : (message.getClass().getName() + ": " + message))
- + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
- }
getInvoker的邏輯也簡單,之前的文章服務(wù)暴露有說過這個(gè)過程啦,其實(shí)就是從一個(gè)DubboProtocol.exporterMap內(nèi)找到一個(gè)Exporter,再從里邊取出invoker,那么key是啥呢,key其實(shí)是由URL生成的serviceKey,此時(shí)通過Invocation中的信息就可還原該serviceKey并且找到對應(yīng)的Exporter和Invoker。再看看之前提過的 JavassistProxyFactory,這是一個(gè)給提供方的服務(wù)對象生成代理的工廠類
這個(gè)也說過啦,調(diào)用invoker.invoke時(shí),通過反射調(diào)用最終的服務(wù)實(shí)現(xiàn)執(zhí)行相關(guān)邏輯,入口就是這里了。因?yàn)檫@塊之前的文章比較詳細(xì)的說過,這里就不重復(fù)了。
到了這一步,調(diào)用就已經(jīng)技術(shù)了,我們再看看調(diào)用結(jié)束后怎么將結(jié)果返回給服務(wù)消費(fèi)方。
調(diào)用結(jié)束后,服務(wù)提供方方就會(huì)創(chuàng)建一個(gè)Response對象返回給服務(wù)消費(fèi)方,那么自然在執(zhí)行服務(wù)實(shí)現(xiàn)時(shí)會(huì)出現(xiàn)兩種結(jié)果:成功和失敗
如果成功的話,則把返回值設(shè)置到Response的result中,Response的status設(shè)置成OK
如果失敗,把失敗異常設(shè)置到Response的errorMessage中,status設(shè)置成SERVICE_ERROR
我們會(huì)回到HeaderExchangeHandler.received中的代碼來看看,在handleRequest之后,調(diào)用channel.send把Response發(fā)送到客戶端,這個(gè)channel封裝客戶端-服務(wù)端通信鏈路,最終會(huì)調(diào)用Netty框架,把響應(yīng)寫回到客戶端。
慣例總結(jié)下
終于將調(diào)用這個(gè)過程說完啦,其實(shí)思路還是比較清晰的,不過最好是自己全程斷點(diǎn)細(xì)看下啦,可以學(xué)到很多東西的。
說說后續(xù)安排:
- SPI
- dubbo中的AOP機(jī)制
- 服務(wù)治理
- ....
等好幾個(gè)模塊,最后就是帶大家擼一個(gè)RPC框架了,還是那句話,想學(xué)dubbo的可以持續(xù)關(guān)注這一系列。