Nacos 服務(wù)訂閱流程全解析
本文將基于之前源碼搭建系列的前置步驟針對nacos服務(wù)訂閱流程源碼進(jìn)行深入梳理和分析,希望對你有幫助。
客戶端發(fā)起服務(wù)訂閱
為了方便講解,筆者基于CommandLineRunner 這個擴(kuò)展點(diǎn)主動在服務(wù)完成初始化之后通過NamingService 的subscribe發(fā)起服務(wù)訂閱請求:
@Component
public class TestRunner implements CommandLineRunner {
private final static Logger log = LoggerFactory
.getLogger(TestRunner.class);
@Override
public void run(String... args) throws Exception {
//主動向nacos發(fā)起服務(wù)訂閱請求
NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
//主動訂閱nacos-provider這個服務(wù)的實(shí)例信息
naming.subscribe("nacos-provider", event -> {
if (event instanceof NamingEvent) {
//日志打印監(jiān)聽到的服務(wù)名稱和結(jié)果
log.info("監(jiān)聽到服務(wù)名稱:{},實(shí)例信息:{}", ((NamingEvent) event).getServiceName(),
((NamingEvent) event).getInstances());
}
});
}
}
查看NamingService 的subscribe源碼可知,該方法會基于我們給定的服務(wù)名稱以及分組等信息主動發(fā)起RPC主動向nacos獲取nacos-provider的實(shí)例信息:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
//......
//基于RPC代理發(fā)起服務(wù)訂閱
clientProxy.subscribe(serviceName, groupName, clusterString);
}
而clientProxy(底層就是NamingClientProxyDelegate)的執(zhí)行邏輯也比較簡單:
- 從緩存管理器serviceInfoHolder中嘗試獲取需要訂閱的服務(wù)信息。
- 如果不存在或者未訂閱則向nacos發(fā)起RPC請求。
- 通過serviceInfoHolder緩存訂閱的服務(wù)實(shí)例信息。
對應(yīng)的我們給出NamingClientProxyDelegate獲取服務(wù)實(shí)例的源碼段,和上述語義一致,讀者可以參考注釋了解一下過程:
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
//......
//查看緩存中是否存在訂閱的服務(wù)實(shí)例
//......
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//如果不存在或者未訂閱則發(fā)起RPC請求
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//將查詢或者請求結(jié)果緩存到本地serviceInfoMap中
serviceInfoHolder.processServiceInfo(result);
return result;
}
默認(rèn)情況下,我們的緩存是沒有訂閱服務(wù)的信息的,所以會觸發(fā)RPC請求,然后nacos就會返回當(dāng)前服務(wù)的元信息,如下便是從nacos服務(wù)端返回的nacos-provider實(shí)例節(jié)點(diǎn)信息,后續(xù)如果需要調(diào)用,都會基于這份元信息發(fā)起請求:
我們也可以通過抓包工具看到這個請求的詳細(xì)內(nèi)容,如下圖所示,可以看到筆者基于nacos客戶端端口64393作為源端口號,nacos服務(wù)端端口號即9848作為目的端口進(jìn)行抓取捕獲到的TCP網(wǎng)絡(luò)包,可以看到這個訂閱的RPC接口請求參數(shù)詳情:
解碼后即可看到這個請求的數(shù)據(jù)就是源碼調(diào)試時看到的請求參數(shù),也就是對于nacos-provider的訂閱:
基于服務(wù)端的返回結(jié)果,客戶端會進(jìn)行如下操作:
- 將實(shí)例信息更新到本地緩存。
- 查看對應(yīng)服務(wù)實(shí)例在本地緩存中的數(shù)據(jù),并和服務(wù)端響應(yīng)的數(shù)據(jù)進(jìn)行比對,若一致則說明實(shí)例信息沒有更新,直接返回,如果發(fā)現(xiàn)不一致,進(jìn)入步驟3。
- 則說明服務(wù)發(fā)生變化,則基于零拷貝將響應(yīng)結(jié)果寫入本地持久化,便于后續(xù)服務(wù)重啟恢復(fù)數(shù)據(jù)。
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//獲取原有服務(wù)信息
String serviceKey = serviceInfo.getKey();
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
//......
//以服務(wù)名作為key,實(shí)例信息作為value寫入緩存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//比對實(shí)例,如果不一致則說明服務(wù)發(fā)生變化
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
//......
if (changed) {
//......
//如果發(fā)生變化則基于零拷貝刷盤技術(shù)將訂閱的服務(wù)信息寫入本地
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
Nacos基于緩存發(fā)起回復(fù)并注冊該客戶端
了解了客戶端的訂閱請求之后,我們再來聊聊nacos是如何處理該請求的,在之前的文章中我們說過,nacos是通過grpcCommonRequestAcceptor處理客戶端的RPC請求,對應(yīng)服務(wù)訂閱請求,服務(wù)端收到的將會收到一個類型為SubscribeServiceRequest 的服務(wù)訂閱請求:
找到對應(yīng)的處理器SubscribeServiceRequestHandler之后,grpcCommonRequestAcceptor會從RPC請求報(bào)文中拿到參數(shù)交由該處理器進(jìn)行處理:
對應(yīng)的我們也給出nacos服務(wù)端處理客戶端服務(wù)訂閱請求的代碼段,即位于GrpcRequestAcceptor的request方法:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
//......
traceIfNecessary(grpcRequest, true);
//需要使用的服務(wù)器類型,例如服務(wù)注冊就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type到找到對應(yīng)的請求處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
//......
Request request = (Request) parseObj;
try {
//......
//解析參數(shù)并處理該請求
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
隨后SubscribeServiceRequestHandler的handle就會執(zhí)行如下步驟:
- 基于參數(shù)生成要獲取的服務(wù)請求參數(shù)service,service包含要請求的服務(wù)名稱、命名空間、分組等信息。
- 從緩存serviceStorage管理的緩存serviceDataIndexes中查詢出對應(yīng)服務(wù)實(shí)例即nacos-provider的所有實(shí)例信息。
- 過濾出有效即健康可用的實(shí)例響應(yīng)給客戶端。
對應(yīng)SubscribeServiceRequestHandler的源碼如下,讀者可參考注釋查閱:
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
//解析參數(shù)信息
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//基于參數(shù)生成服務(wù)元信息,即要獲取的服務(wù)信息
Service service = Service.newService(namespaceId, groupName, serviceName, true);
//......
//基于serviceStorage從緩存中拿到訂閱的服務(wù)信息,再通過selectInstancesWithHealthyProtection篩選出健康的示例
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());
//......
//返回結(jié)果
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
小結(jié)
自此我們通過源碼閱讀、網(wǎng)絡(luò)抓包、斷點(diǎn)調(diào)測查看等方式對nacos服務(wù)訂閱的源碼進(jìn)行的較為詳細(xì)的分析,希望對你有幫助。