Nacos 服務變更推送流程全解析
我們以Nacos 2.3.x版本為例,作為服務注冊中心,它的服務注冊和服務下線實時性感知相較于Consul的Raft一致性確認,亦或者Eureka定時拉取同步的機制來說做了很好的折中,既避免了實現的復雜性又能保證較好的實時性,所以本文將從一次服務下線的請求結合并結合源碼分析的方式來講解一下Nacos服務實例狀態變更時是如何實時推送的服務消費者的。
一、項目架構介紹
1. 服務通信流程
這里我們先簡單介紹的一下項目的架構,筆者將基于Nacos的源碼搭建一套基礎的服務注冊中心,然后提供兩個nacos-provider服務作為服務提供者(端口號分別是9001和9002),而nacos-consumer作為服務消費者通過nacos或者nacos-provider的信息發起服務調用。
基于這個架構,我們會嚴格按照如下步驟完成實驗和源碼分析:
- 將9002端口的服務提供者下線,查看服務提供者如何完成服務下線通知。
- 查看nacos收到該請求后,內部如何處理該消息,并將消息通知給消費者。
- 消費者收到該請求后,如何更新本地緩存。
2. 服務提供者查詢接口
這里我們也給出服務提供者nacos-provider的http接口,可以看到該接口會返回當前服務的名稱和端口號:
@GetMapping("/provide")
public String provide() {
log.info("請求打到服務提供者provide上");
Map<String, String> map = new HashMap<>();
map.put("provider", env.getProperty("spring.application.name"));
map.put("port", env.getProperty("server.port"));
return JSONUtil.toJsonStr(map);
}
3. 服務消費者
而服務消費者也通過feign聲明引入該調用:
@FeignClient(name = "nacos-provider")
public interface NacosProvider {
@GetMapping("/provide")
String provide();
}
后續我們就可以通過服務提供者的test接口調用到nacos-provider的provide接口:
@Resource
private NacosProvider nacosProvider;
@GetMapping("/test")
public String test() {
return nacosProvider.provide();
}
需要注意的是,nacos-consumer如果沒有顯示調用nacos-provider是不會訂閱該提供者的所有實例信息,所以我們為了方便索性在服務啟動時主動發起訂閱:
@Component
publicclass TestRunner implements CommandLineRunner {
privatefinalstatic Logger log = LoggerFactory
.getLogger(TestRunner.class);
@Override
public void run(String... args) throws Exception {
//主動向nacos發起服務訂閱請求
NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
naming.subscribe("nacos-provider", event -> {
if (event instanceof NamingEvent) {
//日志打印監聽到的服務名稱和結果
log.info("監聽到服務名稱:{},實例信息:{}", ((NamingEvent) event).getServiceName(),
((NamingEvent) event).getInstances());
}
});
}
}
二、基于服務提供者下線詳解Nacos實例狀態推送
1. 服務提供者優雅關閉并推送服務下線消息
基于上述架構,我們通過IDEA將9002的服務提供者關閉,注意如果用IDEA停止按鈕操作就會斷開調試的連接,我們就無法調試服務下線的源碼,正確是做法是如下代碼的方式主動獲取啟動時的上下文通過close方法顯示關閉:
@SpringBootApplication
@EnableDiscoveryClient
public class NacosProviderApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(NacosProviderApplication.class, args);
//主動將springboot容器關閉
context.close();
}
}
Spring上下文close方法執行關閉操作,此時spring就會遍歷所有的虛擬機鉤子即shutdown hook,對應我們的服務提供者在啟動時注冊的shutdown Hook即NacosAutoServiceRegistration的close方法就會發起服務下線請求,一旦完成服務下線請求通知之后,服務提供者就會銷毀RPC連接以及所有工作線程:
對應的我給出這個close方法的入口,因為NacosAutoServiceRegistration繼承自AbstractAutoServiceRegistration,所以它繼承了這個抽象類的shutdown hook方法destroy,這就使得spring boot容器關閉后,就會觸發下面這個方法:
@PreDestroy
public void destroy() {
stop();
}
此時,這個stop方法在進行CAS樂觀鎖狀態修改后,執行如下兩件事:
- 發起RPC下線請求。
- 銷毀相關工作線程和nacos維護的RPC連接。
public void stop() {
if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
//發起RPC下線請求
deregister();
//......
//銷毀相關工作線程和nacos維護的RPC連接
this.serviceRegistry.close();
}
}
此時deregister會通過getRegistration拿到nacos的元信息,再通過NacosServiceRegistry的deregister發起服務下線請求:
protected void deregister() {
this.serviceRegistry.deregister(getRegistration());
}
最終就會走到NacosNamingService的deregisterInstance,很直觀的看到,它通過RPC代理clientProxy傳入服務名、分組、和實例信息并調用deregisterService發起服務下線請求:
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
clientProxy.deregisterService(serviceName, groupName, instance);
}
對應我們給出發起調用時傳入的參數信息:
2. Nacos服務端基于RPC推送服務下線
隨后Nacos服務端GrpcRequestAcceptor收到該請求后,流程比較長,執行如下步驟:
- 基于請求定位到處理器RequestHandler以服務下線為例就是InstanceRequestHandler。
- InstanceRequestHandler發布一個ClientDeregisterServiceEvent事件,交由NotifyCenter投遞到任務隊列中。
- NamingEventPublisher從隊列獲取到這個任務之后,找到ClientServiceIndexesManager處理該事件。
- ClientServiceIndexesManager還是發布一個ServiceChangedEvent到上述的阻塞隊列中。
- NamingSubscriberServiceV2Impl將其封成一個延遲任務提交到tasks中。
此時有個100ms執行一次的定時器也就是PushDelayTaskExecuteEngine,將任務取出分發給TaskExecuteWorker,這個執行者就會生成RPC請求將服務狀態變更通知給所有服務消費者。
總體來說,Nacos服務端收到下線請求后,為避免下線通知影響服務端整體性能,其內部設計了一套非常好的事件通知訂閱模型,當服務端收到請求后,其內部會根據請求類型找到相應的處理器發布事件,讓對應的訂閱者異步處理該消息。基于該消息最終會封裝成指定類型的任務,提交到工作線程池中的某個worker的隊列中讓其異步消費,由此種大量解耦結合線程池的方式基于了nacos服務端最大的吞吐量和調優空間。
對應的我們也給出整體的業務流程圖,讀者可以參考該圖了解一下全過程:
對應我們找到GrpcRequestAcceptor的request方法,可以看到它會基于該請求找到對應的處理器,然后調用處理器的handleRequest方法處理該請求:
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
traceIfNecessary(grpcRequest, true);
//需要使用的服務器類型,例如服務下線就是 InstanceRequest
String type = grpcRequest.getMetadata().getType();
long startTime = System.nanoTime();
//......
//基于type到容器中獲取到響應的處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//......
Request request = (Request) parseObj;
try {
//組裝連接信息
Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
requestMeta.setAbilityTable(connection.getAbilityTable());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
//實際處理rpc請求的方法
Response response = requestHandler.handleRequest(request, requestMeta);
//......
} catch (Throwable e) {
//......
}
}
于是就找到了InstanceRequestHandler,該方法就會通過clientOperationService(也就是EphemeralClientOperationServiceImpl)發布ClientDeregisterServiceEvent事件:
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
//基于ClientDeregisterServiceEvent發布服務下線事件
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
//.....
}
隨后NamingEventPublisher收到該事件后調用handleEvent找到對應的事件處理器處理器該事件:
private void handleEvents() {
while (!shutdown) {
try {
//取出上述的任務
final Event event = queue.take();
handleEvent(event);//處理發布的事件
} catch (InterruptedException e) {
//......
}
}
}
如下便是筆者的調試記錄,可以看到服務下線事件定位到了ClientServiceIndexesManager這個管理器進行處理:
于是就來到了ClientServiceIndexesManager的onEvent方法,再次發布一個ServiceChangedEvent事件到上述提到的同一個阻塞隊列中:
@Override
public void onEvent(Event event) {
if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
} elseif (event instanceof ClientOperationEvent) {//處理服務注冊或者下線后的事件
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//處理服務注冊事件,實際上就是發布一個ServiceChangedEvent事件
addPublisherIndexes(service, clientId);
} elseif (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { //......
} elseif (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//......
} elseif (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
//......
}
}
隊列會通知NamingSubscriberServiceV2Impl進行處理,它會將事件推送到延遲隊列中,這個隊列內部是采用并發安全的ConcurrentHashMap進行管理。
@Override
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {//給客戶端的服務改變事件
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
//將處理事件推送到隊列中
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service);
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
//......
}
}
任務提交之后就會另外一個線程processingExecutor(100ms處理一次)會將其取出后找到任務處理器處理PushDelayTaskExecuteEngine,隨后,這個任務處理引擎將任務交給NacosExecuteTaskExecuteEngine這個任務處理引擎:
protected void processTasks() {//通過remove拿出隊列的數據
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
//......
//找到相應處理器即PushDelayTaskExecuteEngine
NacosTaskProcessor processor = getProcessor(taskKey);
try {
// ReAdd task if process failed
if (!processor.process(task)) {//PushDelayTaskExecuteEngine將任務交給NacosExecuteTaskExecuteEngine這個任務處理引擎
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
//......
}
}
}
最后PushDelayTaskExecuteEngine會將該任務交給NacosExecuteTaskExecuteEngine中的某個工作線程TaskExecuteWorker的阻塞隊列中,最后TaskExecuteWorker就會取出該任務并消費:
@Override
public void run() {
while (!closed.get()) {
try {
//取出任務并處理
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
//......
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e, e);
}
}
}
}
最后這個服務下線的任務即PushExecuteTask就會遍歷所有客戶端并通知它們nacos-provider下線:
@Override
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
for (String each : getTargetClientIds()) {//逐個遍歷客戶端,然后事件推送
Client client = clientManager.getClient(each);
//......
//通過RPC接口推送服務下線通知
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));//發起RPC通知消費者
}
} catch (Exception e) {
//......
}
}
3. 消費者更新實例緩存
服務消費者RpcClient收到該請求后,會基于請求類型定位到服務端請求處理器,以我們下線通知為例就是NamingPushRequestHandler,由該處理器更新客戶端中記錄9002端口號的服務提供者nacos-provider狀態更新為下線,后續服務消費者看到緩存中記錄的提供不可用時就會調用9001端口號的nacos-provider:
對應的我們給出RpcClient的處理服務端請求的方法handleServerRequest,該方法會遍歷所有的服務端請求處理器,只要有一個處理器處理結果非空,就說明找到相應的處理器處理了,直接將響應結果返回:
protected Response handleServerRequest(final Request request) {
//.....
//遍歷所有的服務端請求處理器
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
try {
//交給該處理器看看能否處理,若能處理則返回值非空
Response response = serverRequestHandler.requestReply(request);
//若非空說明處理完成,直接返回結果
if (response != null) {
//.....
return response;
}
} catch (Exception e) {
//.....
}
}
returnnull;
}
實際上上述的步驟會走到NamingPushRequestHandler處理服務端下線請求,該處理器會調用serviceInfoHolder將請求中的實例信息更新到緩存中,由此保證客戶端可以完成正確的服務調用:
@Override
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
//從請求中拿到服務實例信息,并調用processServiceInfo更新緩存
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
最終ServiceInfoHolder的processServiceInfo就會基于入參拿到服務示例信息,并將緩存更新,然后發布一個實例更新的事件并將更新結果持久化到磁盤中:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//拿到服務實例緩存
String serviceKey = serviceInfo.getKey();
//若為空直接返回
if (serviceKey == null) {
returnnull;
}
//取出緩存中原有緩存信息
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
//......
//基于請求更新緩存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//比對新舊緩存變化
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
//......
//如果緩存發生變化,則發布一個實例更新的事件InstancesChangeEvent,并將更新結果采用零拷貝的方式持久化到磁盤中
if (changed) {
//.....
//發布實例更新事件
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//零拷貝持久化
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
對應的我們給出下線9002端口的nacos-provider下線的請求值,可以看到基于這個結果,服務端給出的可用服務實例值僅有9001號端口的nacos-provider,服務消費者基于此信息更新緩存,保證了服務消費的正確性:
三、小結
本文以代碼示例為導向通過源碼的方式完成了Nacos服務實例狀態變更推送的講解,這里筆者也簡單的補充一下個人對于源碼閱讀的一些技巧:
- 在閱讀源碼前,明確了解項目的設計理念和原理,即對項目有個基礎的認知。
- 以問題為導向針對性的進行調試理解。
- 適當查找一些高質量的源碼分析文章,針對性的梳理源碼結構。
- 如果能夠明確源碼的最終斷點,我們可以采用以終為始的方式,在目標斷點上打住,結合調試的棧幀了解整體調用過程。
- 調試過程中注意觀察各個類之間的繼承、聚合等關系,以便梳理設計架構和理念。
- 最后一點,建議直接拉取源碼進行調試,方便注釋。