成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

微服務:剖析一下源碼,Nacos的健康檢查竟如此簡單

開發 架構
Nacos中臨時實例基于心跳上報方式維持活性,基本的健康檢查流程基本如下:Nacos客戶端會維護一個定時任務,每隔5秒發送一次心跳請求,以確保自己處于活躍狀態。

[[409195]]

本文轉載自微信公眾號「程序新視界」,作者二師兄。轉載本文請聯系程序新視界公眾號。

前言

前面我們多次提到Nacos的健康檢查,比如《微服務之:服務掛的太干脆,Nacos還沒反應過來,怎么辦?》一文中還對健康檢查進行了自定義調優。那么,Nacos的健康檢查和心跳機制到底是如何實現的呢?在項目實踐中是否又可以參考Nacos的健康檢查機制,運用于其他地方呢?

這篇文章,就帶大家來揭開Nacos健康檢查機制的面紗。

Nacos的健康檢查

Nacos中臨時實例基于心跳上報方式維持活性,基本的健康檢查流程基本如下:Nacos客戶端會維護一個定時任務,每隔5秒發送一次心跳請求,以確保自己處于活躍狀態。Nacos服務端在15秒內如果沒收到客戶端的心跳請求,會將該實例設置為不健康,在30秒內沒收到心跳,會將這個臨時實例摘除。

原理很簡單,關于代碼層的實現,下面來就逐步來進行解析。

客戶端的心跳

實例基于心跳上報的形式來維持活性,當然就離不開心跳功能的實現了。這里以客戶端心跳實現為基準來進行分析。

Spring Cloud提供了一個標準接口ServiceRegistry,Nacos對應的實現類為NacosServiceRegistry。Spring Cloud項目啟動時會實例化NacosServiceRegistry,并調用它的register方法來進行實例的注冊。

  1. @Override 
  2. public void register(Registration registration) {  
  3.    // ... 
  4.    NamingService namingService = namingService(); 
  5.    String serviceId = registration.getServiceId(); 
  6.    String group = nacosDiscoveryProperties.getGroup(); 
  7.  
  8.    Instance instance = getNacosInstanceFromRegistration(registration); 
  9.  
  10.    try { 
  11.       namingService.registerInstance(serviceId, group, instance); 
  12.       log.info("nacos registry, {} {} {}:{} register finished"group, serviceId, 
  13.             instance.getIp(), instance.getPort()); 
  14.    }catch (Exception e) { 
  15.       // ... 
  16.    } 

在該方法中有兩處需要注意,第一處是構建Instance的getNacosInstanceFromRegistration方法,該方法內會設置Instance的元數據(metadata),通過源元數據可以配置服務器端健康檢查的參數。比如,在Spring Cloud中配置的如下參數,都可以通過元數據項在服務注冊時傳遞給Nacos的服務端。

  1. spring: 
  2.   application: 
  3.     nameuser-service-provider 
  4.   cloud: 
  5.     nacos: 
  6.       discovery: 
  7.         server-addr: 127.0.0.1:8848 
  8.         heart-beat-interval: 5000 
  9.         heart-beat-timeout: 15000 
  10.        ip-delete-timeout: 30000 

其中的heart-beat-interval、heart-beat-timeout、ip-delete-timeout這些健康檢查的參數,都是基于元數據上報上去的。

register方法的第二處就是調用NamingService#registerInstance來進行實例的注冊。NamingService是由Nacos的客戶端提供,也就是說Nacos客戶端的心跳本身是由Nacos生態提供的。

在registerInstance方法中最終會調用到下面的方法:

  1. @Override 
  2. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { 
  3.     NamingUtils.checkInstanceIsLegal(instance); 
  4.     String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); 
  5.     if (instance.isEphemeral()) { 
  6.         BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); 
  7.         beatReactor.addBeatInfo(groupedServiceName, beatInfo); 
  8.     } 
  9.     serverProxy.registerService(groupedServiceName, groupName, instance); 

其中BeatInfo#addBeatInfo便是進行心跳處理的入口。當然,前提條件是當前的實例需要是臨時(瞬時)實例。

對應的方法實現如下:

  1. public void addBeatInfo(String serviceName, BeatInfo beatInfo) { 
  2.     NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); 
  3.     String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); 
  4.     BeatInfo existBeat = null
  5.     //fix #1733 
  6.     if ((existBeat = dom2Beat.remove(key)) != null) { 
  7.         existBeat.setStopped(true); 
  8.     } 
  9.     dom2Beat.put(key, beatInfo); 
  10.     executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); 
  11.     MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); 

在倒數第二行可以看到,客戶端是通過定時任務來處理心跳的,具體的心跳請求由BeatTask完成。定時任務的執行頻次,封裝在BeatInfo,回退往上看,會發現BeatInfo的Period來源于Instance#getInstanceHeartBeatInterval()。該方法具體實現如下:

  1. public long getInstanceHeartBeatInterval() { 
  2.     return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL); 

可以看出定時任務的執行間隔就是配置的metadata中的數據preserved.heart.beat.interval,與上面提到配置heart-beat-interval本質是一回事,默認是5秒。

BeatTask類具體實現如下:

  1. class BeatTask implements Runnable { 
  2.      
  3.     BeatInfo beatInfo; 
  4.      
  5.     public BeatTask(BeatInfo beatInfo) { 
  6.         this.beatInfo = beatInfo; 
  7.     } 
  8.      
  9.     @Override 
  10.     public void run() { 
  11.         if (beatInfo.isStopped()) { 
  12.             return
  13.         } 
  14.         long nextTime = beatInfo.getPeriod(); 
  15.         try { 
  16.             JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); 
  17.             long interval = result.get("clientBeatInterval").asLong(); 
  18.             boolean lightBeatEnabled = false
  19.             if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { 
  20.                 lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); 
  21.             } 
  22.             BeatReactor.this.lightBeatEnabled = lightBeatEnabled; 
  23.             if (interval > 0) { 
  24.                 nextTime = interval; 
  25.             } 
  26.             int code = NamingResponseCode.OK; 
  27.             if (result.has(CommonParams.CODE)) { 
  28.                 code = result.get(CommonParams.CODE).asInt(); 
  29.             } 
  30.             if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { 
  31.                 Instance instance = new Instance(); 
  32.                 instance.setPort(beatInfo.getPort()); 
  33.                 instance.setIp(beatInfo.getIp()); 
  34.                 instance.setWeight(beatInfo.getWeight()); 
  35.                 instance.setMetadata(beatInfo.getMetadata()); 
  36.                 instance.setClusterName(beatInfo.getCluster()); 
  37.                 instance.setServiceName(beatInfo.getServiceName()); 
  38.                 instance.setInstanceId(instance.getInstanceId()); 
  39.                 instance.setEphemeral(true); 
  40.                 try { 
  41.                     serverProxy.registerService(beatInfo.getServiceName(), 
  42.                             NamingUtils.getGroupName(beatInfo.getServiceName()), instance); 
  43.                 } catch (Exception ignore) { 
  44.                 } 
  45.             } 
  46.         } catch (NacosException ex) { 
  47.             NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}"
  48.                     JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); 
  49.              
  50.         } 
  51.         executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); 
  52.     } 

在run方法中通過NamingProxy#sendBeat完成了心跳請求的發送,而在run方法的最后,再次開啟了一個定時任務,這樣周期性的進行心跳請求。

NamingProxy#sendBeat方法實現如下:

  1. public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { 
  2.      
  3.     if (NAMING_LOGGER.isDebugEnabled()) { 
  4.         NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); 
  5.     } 
  6.     Map<String, String> params = new HashMap<String, String>(8); 
  7.     Map<String, String> bodyMap = new HashMap<String, String>(2); 
  8.     if (!lightBeatEnabled) { 
  9.         bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); 
  10.     } 
  11.     params.put(CommonParams.NAMESPACE_ID, namespaceId); 
  12.     params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); 
  13.     params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); 
  14.     params.put("ip", beatInfo.getIp()); 
  15.     params.put("port", String.valueOf(beatInfo.getPort())); 
  16.     String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); 
  17.     return JacksonUtils.toObj(result); 

實際上,就是調用了Nacos服務端提供的"/nacos/v1/ns/instance/beat"服務。

在客戶端的常量類Constants中定義了心跳相關的默認參數:

  1. static { 
  2.     DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); 
  3.     DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); 
  4.     DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L); 

這樣就呼應了最開始說的Nacos健康檢查機制的幾個時間維度。

服務端接收心跳

分析客戶端的過程中已經可以看出請求的是/nacos/v1/ns/instance/beat這個服務。Nacos服務端是在Naming項目中的InstanceController中實現的。

  1. @CanDistro 
  2. @PutMapping("/beat"
  3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) 
  4. public ObjectNode beat(HttpServletRequest request) throws Exception { 
  5.  
  6.     // ... 
  7.     Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); 
  8.  
  9.     if (instance == null) { 
  10.         // ... 
  11.         instance = new Instance(); 
  12.         instance.setPort(clientBeat.getPort()); 
  13.         instance.setIp(clientBeat.getIp()); 
  14.         instance.setWeight(clientBeat.getWeight()); 
  15.         instance.setMetadata(clientBeat.getMetadata()); 
  16.         instance.setClusterName(clusterName); 
  17.         instance.setServiceName(serviceName); 
  18.         instance.setInstanceId(instance.getInstanceId()); 
  19.         instance.setEphemeral(clientBeat.isEphemeral()); 
  20.  
  21.         serviceManager.registerInstance(namespaceId, serviceName, instance); 
  22.     } 
  23.  
  24.     Service service = serviceManager.getService(namespaceId, serviceName); 
  25.     // ... 
  26.     service.processClientBeat(clientBeat); 
  27.     // ... 
  28.     return result; 

服務端在接收到請求時,主要做了兩件事:第一,如果發送心跳的實例不存在,則將其進行注冊;第二,調用其Service的processClientBeat方法進行心跳處理。

processClientBeat方法實現如下:

  1. public void processClientBeat(final RsInfo rsInfo) { 
  2.     ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); 
  3.     clientBeatProcessor.setService(this); 
  4.     clientBeatProcessor.setRsInfo(rsInfo); 
  5.     HealthCheckReactor.scheduleNow(clientBeatProcessor); 

再來看看ClientBeatProcessor中對具體任務的實現:

  1. @Override 
  2. public void run() { 
  3.     Service service = this.service; 
  4.     // logging     
  5.     String ip = rsInfo.getIp(); 
  6.     String clusterName = rsInfo.getCluster(); 
  7.     int port = rsInfo.getPort(); 
  8.     Cluster cluster = service.getClusterMap().get(clusterName); 
  9.     List<Instance> instances = cluster.allIPs(true); 
  10.      
  11.     for (Instance instance : instances) { 
  12.         if (instance.getIp().equals(ip) && instance.getPort() == port) { 
  13.             // logging 
  14.             instance.setLastBeat(System.currentTimeMillis()); 
  15.             if (!instance.isMarked()) { 
  16.                 if (!instance.isHealthy()) { 
  17.                     instance.setHealthy(true); 
  18.                     // logging 
  19.                     getPushService().serviceChanged(service); 
  20.                 } 
  21.             } 
  22.         } 
  23.     } 

在run方法中先檢查了發送心跳的實例和IP是否一致,如果一致則更新最后一次心跳時間。同時,如果該實例之前未被標記且處于不健康狀態,則將其改為健康狀態,并將變動通過PushService提供事件機制進行發布。事件是由Spring的ApplicationContext進行發布,事件為ServiceChangeEvent。

通過上述心跳操作,Nacos服務端的實例的健康狀態和最后心跳時間已經被刷新。那么,如果沒有收到心跳時,服務器端又是如何判斷呢?

服務端心跳檢查

客戶端發起心跳,服務器端來檢查客戶端的心跳是否正常,或者說對應的實例中的心跳更新時間是否正常。

服務器端心跳的觸發是在服務實例注冊時觸發的,同樣在InstanceController中,register注冊實現如下:

  1. @CanDistro 
  2. @PostMapping 
  3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) 
  4. public String register(HttpServletRequest request) throws Exception { 
  5.     // ... 
  6.     final Instance instance = parseInstance(request); 
  7.  
  8.     serviceManager.registerInstance(namespaceId, serviceName, instance); 
  9.     return "ok"

ServiceManager#registerInstance實現代碼如下:

  1. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { 
  2.      
  3.     createEmptyService(namespaceId, serviceName, instance.isEphemeral()); 
  4.     // ... 

心跳相關實現在第一次創建空的Service中實現,最終會調到如下方法:

  1. public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) 
  2.         throws NacosException { 
  3.     Service service = getService(namespaceId, serviceName); 
  4.     if (service == null) { 
  5.          
  6.         Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); 
  7.         service = new Service(); 
  8.         service.setName(serviceName); 
  9.         service.setNamespaceId(namespaceId); 
  10.         service.setGroupName(NamingUtils.getGroupName(serviceName)); 
  11.         // now validate the service. if failed, exception will be thrown 
  12.         service.setLastModifiedMillis(System.currentTimeMillis()); 
  13.         service.recalculateChecksum(); 
  14.         if (cluster != null) { 
  15.             cluster.setService(service); 
  16.             service.getClusterMap().put(cluster.getName(), cluster); 
  17.         } 
  18.         service.validate(); 
  19.          
  20.         putServiceAndInit(service); 
  21.         if (!local) { 
  22.             addOrReplaceService(service); 
  23.         } 
  24.     } 

在putServiceAndInit方法中對Service進行初始化:

  1. private void putServiceAndInit(Service service) throws NacosException { 
  2.     putService(service); 
  3.     service = getService(service.getNamespaceId(), service.getName()); 
  4.     service.init(); 
  5.     consistencyService 
  6.             .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); 
  7.     consistencyService 
  8.             .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); 
  9.     Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); 

service.init()方法實現:

  1. public void init() { 
  2.     HealthCheckReactor.scheduleCheck(clientBeatCheckTask); 
  3.     for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { 
  4.         entry.getValue().setService(this); 
  5.         entry.getValue().init(); 
  6.     } 

HealthCheckReactor#scheduleCheck方法實現:

  1. public static void scheduleCheck(ClientBeatCheckTask task) { 
  2.     futureMap.computeIfAbsent(task.taskKey(), 
  3.             k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS)); 

延遲5秒執行,每5秒檢查一次。

在init方法的第一行便可以看到執行健康檢查的Task,具體Task是由ClientBeatCheckTask來實現,對應的run方法核心代碼如下:

  1. @Override 
  2. public void run() { 
  3.     // ...         
  4.     List<Instance> instances = service.allIPs(true); 
  5.      
  6.     // first set health status of instances: 
  7.     for (Instance instance : instances) { 
  8.         if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { 
  9.             if (!instance.isMarked()) { 
  10.                 if (instance.isHealthy()) { 
  11.                     instance.setHealthy(false); 
  12.                     // logging... 
  13.                     getPushService().serviceChanged(service); 
  14.                     ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); 
  15.                 } 
  16.             } 
  17.         } 
  18.     } 
  19.      
  20.     if (!getGlobalConfig().isExpireInstance()) { 
  21.         return
  22.     } 
  23.      
  24.     // then remove obsolete instances: 
  25.     for (Instance instance : instances) { 
  26.          
  27.         if (instance.isMarked()) { 
  28.             continue
  29.         } 
  30.          
  31.         if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { 
  32.             // delete instance 
  33.             deleteIp(instance); 
  34.         } 
  35.     } 

在第一個for循環中,先判斷當前時間與上次心跳時間的間隔是否大于超時時間。如果實例已經超時,且為被標記,且健康狀態為健康,則將健康狀態設置為不健康,同時發布狀態變化的事件。

在第二個for循環中,如果實例已經被標記則跳出循環。如果未標記,同時當前時間與上次心跳時間的間隔大于刪除IP時間,則將對應的實例刪除。

小結

 

通過本文的源碼分析,我們從Spring Cloud開始,追蹤到Nacos Client中的心跳時間,再追蹤到Nacos服務端接收心跳的實現和檢查實例是否健康的實現。想必通過整個源碼的梳理,你已經對整個Nacos心跳的實現有所了解。關注我,持續更新Nacos的最新干貨。

 

責任編輯:武曉燕 來源: 程序新視界
相關推薦

2023-03-02 07:20:10

GRPC服務健康檢查協議

2023-03-01 08:33:37

gRPC健康檢查代碼

2023-03-03 08:19:35

KubernetesgRPC

2021-06-29 21:36:21

微服務Nacos日志

2017-08-25 10:20:46

Docker容器機制

2022-02-28 07:40:23

Nacos注冊中心客戶端

2023-02-18 13:34:14

Nacos健康檢查機制

2021-07-15 10:25:15

集群節點檢查

2020-12-07 06:29:13

SpringBoot

2023-05-09 07:34:25

Docker健康檢查方式

2023-10-14 15:36:14

PodKubernetes

2024-02-27 17:30:11

2022-09-07 09:19:49

Docker健康檢查

2024-09-04 10:44:19

2022-07-08 08:37:23

Nacos服務注冊動態配置

2021-02-26 13:59:41

RocketMQProducer底層

2017-05-03 16:36:32

Android圖片動畫

2019-10-11 09:39:44

HTTP調用系統

2021-09-18 16:10:48

Spring BootJava微服務

2021-08-02 07:57:03

注冊Nacos源碼
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美男人天堂 | 一区二区不卡 | 91天堂网| 天堂一区二区三区 | 精品久久久久久中文字幕 | 亚洲社区在线 | 伊人网综合在线 | 一级做a爰片久久毛片 | 欧美a在线看 | 黄久久久 | 国产在线观看 | 日韩精品一区二区三区免费观看 | 国产三区视频在线观看 | 亚洲五码久久 | 亚洲乱码一区二区三区在线观看 | 欧美专区在线视频 | 欧美a在线看 | 一区二区不卡视频 | 91在线一区二区三区 | 国产精品成人久久久久a级 久久蜜桃av一区二区天堂 | 中文字幕一区二区在线观看 | 本道综合精品 | 日韩国产在线观看 | 亚洲精品99999| 精品欧美一区二区在线观看 | a黄视频| 中文字幕亚洲免费 | 国产精品久久久久久久久久免费看 | 激情网站在线观看 | 久久久91精品国产一区二区三区 | 成人综合视频在线观看 | 国外成人在线视频网站 | 欧美一级特黄aaa大片在线观看 | 欧美一区二区在线免费观看 | 一级日批片| 国产一区二 | 亚洲欧美精品 | 天堂网色 | 国产精品区二区三区日本 | 午夜寂寞影院在线观看 | 91精品国产乱码久久久 |