Nacos 服務注冊原理全解析
本著引導性啟發的理念,筆者將以個人調試閱讀的角度帶讀者了解一下筆者如何完成nacos服務注冊模塊閱讀,希望對你有幫助。
一、從自動裝配入手
1. 基于配置定位裝配信息
一般來說我們服務進行注冊時都會引入nacos服務發現的starter:
<!-- nacos服務注冊 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${revision}</version>
</dependency>
通過引入這個依賴,服務啟動時就會將自己的信息發送給nacos從而完成服務注冊,結合spring boot自動裝配的工作機制,我們不難猜出這個依賴的spring.factories文件肯定存在某個完成服務注冊的bean。
于是我們查看spring-cloud-starter-alibaba-nacos-discovery的spring.factories文件就看到了NacosServiceRegistryAutoConfiguration這個帶有服務自動注冊的bean:
2. NacosServiceRegistry注冊器的裝配
先來說說第一個裝配的bean即nacosServiceRegistry,通過對整個類的結構和內部邏輯我們可知曉,它會基于我們如下的服務啟動的配置參數完成初始化:
# 服務名稱
spring.application.name=nacos-provider
# 服務注冊地址
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
在spring boot啟動后并發布ServletWebServerInitializedEvent這個web初始化事件之后,spring cloud的AbstractAutoServiceRegistration就會調用register方法完成服務注冊,此時該方法內部就會走到我們的NacosAutoServiceRegistration的register從而完成服務注冊:
對此我們也給出NacosServiceRegistry 的初始化裝配函數,可以看到它的入參就是和服務注冊相關的屬性配置和管理類:
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
//基于服務管理類和服務注冊屬性配置完成初始化
return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
}
查看其實現的register方法也可以看出,本質上NacosServiceRegistry就是基于namingService獲取注冊中心地址,然后拿著服務的id(serviceId)、分組(group )等信息通過通過注冊中心實例的registerInstance發起RPC調用完成服務注冊:
@Override
public void register(Registration registration) {
//......
//解析出服務的id和服務使用的分組
NamingService namingService = namingService();//獲取name server
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
//基于注冊信息生成實例信息
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//向nacos發起服務注冊
namingService.registerInstance(serviceId, group, instance);
//......
}
catch (Exception e) {
//......
}
}
3. NacosRegistration的裝配
隨后這個裝配會執行NacosRegistration 的初始化,這個類我們著重了解nacosDiscoveryProperties 和ApplicationContext 參數即可,所以我們大體可以推測出這個bean是基于spring上下文完成一些服務注冊的元信息的維護:
/**
*
* @param registrationCustomizers
* @param nacosDiscoveryProperties 服務發現屬性配置
* @param context spring上下文
* @return
*/
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
這一點我們可直接通過該裝配bean的init方法查看其作用,可以看到這個類做了如下幾件事:
- 通過nacosDiscoveryProperties拿到一些元信息配置存入metadata,這個bean默認情況下這些信息都是空,所以沒有做什么很核心的事情
- 通過Spring context拿到環境變量配置
對應NacosRegistration的init源碼如下,讀者可參考注釋了解:
@PostConstruct
public void init() {
Map<String, String> metadata = nacosDiscoveryProperties.getMetadata();
//基于上下文拿到環境的配置
Environment env = context.getEnvironment();
//元信息存入 management.endpoints.web.base-path即actuator的基本路徑
String endpointBasePath = env.getProperty(MANAGEMENT_ENDPOINT_BASE_PATH);
if (StringUtils.hasLength(endpointBasePath)) {
metadata.put(MANAGEMENT_ENDPOINT_BASE_PATH, endpointBasePath);
}
//基于spring上下文拿到管理端口號
Integer managementPort = ManagementServerPortUtils.getPort(context);
//......
//完成一些心跳、超時、ip過期刪除等元信息的維護
if (null != nacosDiscoveryProperties.getHeartBeatInterval()) {
metadata.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
nacosDiscoveryProperties.getHeartBeatInterval().toString());
}
if (null != nacosDiscoveryProperties.getHeartBeatTimeout()) {
metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,
nacosDiscoveryProperties.getHeartBeatTimeout().toString());
}
if (null != nacosDiscoveryProperties.getIpDeleteTimeout()) {
metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT,
nacosDiscoveryProperties.getIpDeleteTimeout().toString());
}
//......
}
4. 服務注冊執行者NacosAutoServiceRegistration的裝配
最后一個就是比較核心的bean了,也就是NacosAutoServiceRegistration,它從spring的容器中拿到上述兩個bean作為成員變量存入,同時繼承了AbstractAutoServiceRegistration完成register的調用。也就是說第一個bean是以成員變量的方式聚合在當前bean被調用從而完成服務注冊的:
我們首先看看這個bean的初始化邏輯:
/**
* 用上述兩個bean作為入參,即NacosAutoServiceRegistration是服務注冊的核心bean
* @param registry
* @param autoServiceRegistrationProperties
* @param registration
* @return
*/
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
//基于上述初始化的NacosServiceRegistry 和NacosRegistration完成初始化
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
其內部在spring完成web容器初始化之后就會調用一個start方法,該方法內部就會調用register方法最終走到第一個bean的注冊邏輯:
public void start() {
//......
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
//發起服務注冊
register();
//......
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
二、服務注冊請求
1. 構建服務請求入參
基于上述步驟該register最終就會走到自動裝配的第一個bean即NacosServiceRegistry發起RPC服務注冊請求,如下圖所示,可以看到NacosServiceRegistry會拿著當前服務的id、組號以及實例信息通過NacosNamingService的registerInstance方法發起服務注冊:
2. 發起RPC調用
最終NacosNamingService的registerInstance的內部會通過服務初始化的RPC客戶端即clientProxy向nacos發起服務注冊請求:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
//基于初始化得到的RPC客戶端發起服務注冊請求
clientProxy.registerService(serviceName, groupName, instance);
}
順著這個代理的調用最終就會走到NamingGrpcClientProxy的doRegisterService方法,其內部的requestToServer就是實質發起請求的方法,該方法內部會組裝RPC請求頭并基于服務名稱等入參信息組裝RPC 請求頭向9848端口發起服務注冊請求:
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
//基于服務名稱、組名、實例信息生成入參
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
//調用requestToServer發起RPC調用
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
//基于服務名稱等入參信息組裝RPC header
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// 發起RPC調用與獲得響應
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
//......
} catch (NacosException e) {
//......
}
//......
}
對此筆者也通過抓包工具tcp.dstport==9848 && tcp.srcport==50155捕獲到這個請求:
將data數據序列化解析之后即可看到這個請求的數據,很明顯就是我們上文源碼調試得到的入參:
{
"headers": {},
"namespace": "public",
"serviceName": "nacos-provider",
"groupName": "DEFAULT_GROUP",
"type": "registerInstance",
"instance": {
"ip": "192.168.x.x",
"port": 8080,
"weight": 1.0,
"healthy": true,
"enabled": true,
"ephemeral": true,
"clusterName": "DEFAULT",
"metadata": {
"IPv6": "[240e:466:640:4adc:e06d:bf83:ef55:c3f]",
"preserved.register.source": "SPRING_CLOUD"
},
"ipDeleteTimeout": 30000,
"instanceHeartBeatInterval": 5000,
"instanceHeartBeatTimeOut": 15000
},
"module": "naming"
}
三、服務端解析客戶端注冊請求
1. 請求處理器的初始化
為了能夠處理不同的RPC請求,nacos根據不同的請求創建了不同的請求處理器RequestHandler,啟動時,RequestHandlerRegistry在ContextRefreshedEvent即容器刷新完成后被調用并執行擴展點函數onApplicationEvent,獲取所有的RequestHandler并緩存,便于后續收到請求后直接基于這個緩存獲取處理器處理:
@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {
/**
* 基于擴展點遍歷所有的 RequestHandler 包括注冊實例的方法包括 instanceRequestHandler
* @param event
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//獲取所有的RequestHandler
Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
Collection<RequestHandler> values = beansOfType.values();
for (RequestHandler requestHandler : values) {
//......
try {
//.......
//將rpc對應的請求處理器直接丟到registryHandlers這個map中
registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
}
//......
}
2. 注冊服務注冊監聽回調
我們再來說說nacos服務端,nacos服務端在啟動初始化時就會初始化一個BaseGrpcServer來監聽RPC請求。 而BaseGrpcServer是繼承自BaseRpcServer,所以在啟動時也沿襲父類的start調用startServer執行RPC服務端啟動:
對應的BaseGrpcServer的startServer源碼如下:
@PostConstruct
public void start() throws Exception {
String serverName = getClass().getSimpleName();
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
//啟動服務
startServer();
//......
}
而startServer內部執行如下步驟:
- addServices啟動rpc請求處理器并存入緩存中
- 完成rpcServer基礎構建
- 啟動rpcServer
@Override
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
//handlerRegistry 記錄注冊服務的容器
addServices(handlerRegistry, getSeverInterceptors().toArray(new ServerInterceptor[0]));
//rpc 為nacos端口加上1000
NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
//......
//構建rpcserver
server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
.permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();
//啟動rpcserver
server.start();
}
我們著重查看addServices的調用,它會創建一個處理異步RPC調用的ServerCallHandler。當客戶端發起一個請求時,grpcCommonRequestAcceptor.request方法會被調用,從而解析請求并處理返回響應:
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
//創建一個處理異步RPC調用的ServerCallHandler。當客戶端發起一個請求時,grpcCommonRequestAcceptor.request方法會被調用
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
(request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition
.builder(GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler)
.build();
//將處理器存入緩存中
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
//......
}
3. 以策略模式的姿態處理服務注冊請求
基于上述的鋪墊,當nacos收到客戶端的RPC請求時,grpcCommonRequestAcceptor的request就會收到這個請求并獲取其類型,處理器的獲取就以策略模式的姿態從requestHandlerRegistry底層的緩存registryHandlers(也就是我們上文說明的緩存處理器的容器)中獲取對應的requestHandler:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
//......
//需要使用的服務器類型,例如服務注冊就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type以策略模式的姿態到找到對應的請求處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
Request request = (Request) parseObj;
try {
//......
//解析參數并處理該請求
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
上述的解析最終會走到InstanceRequestHandler即實例請求處理器,同樣的基于報文的細節定位到這個是服務注冊請求,便直接調用registerInstance進行處理:
對應我們給出InstanceRequestHandler的handle方法解析和處理的邏輯,和上文說明一致,讀者可參考注釋了解一下:
@Override
@TpsControl(pointName = "RemoteNamingInstanceRegisterDeregister", name = "RemoteNamingInstanceRegisterDeregister")
@Secured(action = ActionTypes.WRITE)
@ExtractorManager.Extractor(rpcExtractor = InstanceRequestParamExtractor.class)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
//基于請求元信息生成服務實例
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
//......
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE: //執行服務端的服務注冊
return registerInstance(service, request, meta);
//......
}
}
4. 緩存并通知刷新
基于上一步的調用我們來到的registerInstance方法,其內部邏輯為:
- 將注冊實例信息解析并緩存后
- 發布服務注冊事件告知訂閱者服務更新
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
//獲取單例的服務信息
Service singleton = ServiceManager.getInstance().getSingleton(service);
//......
InstancePublishInfo instanceInfo = getPublishInfo(instance);
//將服務信息緩存到publishers這個map中
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
//發布服務注冊事件通知感興趣的客戶端服務上線
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
四、小結
自此我們從服務裝配、nacos服務端初始化以及客戶端自動服務注冊三個角度的源碼完成的Nacos服務注冊原理的全解析,希望對你有幫助。