成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

要給Nacos的Udp通信功能點個贊

網絡 通信技術
Nacos在服務注冊功能中使用到了UDP的通信方式,主要功能就是用來輔助服務實例變化時對客戶端進行通知。然而,對于大多數使用Nacos的程序員來說,可能還不知道這個功能,更別說靈活運用了。

[[421857]]

本文轉載自微信公眾號「程序新視界」,作者二師兄。轉載本文請聯系程序新視界公眾號。

學習不用那么功利,二師兄帶你從更高維度輕松閱讀源碼~

Nacos在服務注冊功能中使用到了UDP的通信方式,主要功能就是用來輔助服務實例變化時對客戶端進行通知。然而,對于大多數使用Nacos的程序員來說,可能還不知道這個功能,更別說靈活運用了。

看完整個源碼的實現,還是要為這一功能點個贊的,可以說非常巧妙和實用。但在實現上有一些不足,文末會進行指出。

本篇文章就帶大家從源碼層面來分析一下Nacos 2.0中是如何基于UDP協議來實現服務實例變更的通知。

UDP通知基本原理

在分析源碼之前,先來從整體上看一下Nacos中UDP的實現原理。

Nacos UDP基本原理

我們知道,UDP協議通信是雙向的,沒有所謂的客戶端和服務端,因此在客戶端和服務器端都會開啟UDP的監聽。客戶端是單獨開啟一個線程來處理UDP消息的。當采用HTTP協議與注冊中心通信時,,在客戶端調用服務訂閱接口時,會將客戶端的UPD信息(IP和端口)上送到注冊中心,注冊中心以PushClient對象來進行封裝和存儲。

當注冊中心有實例變化時,會發布一個ServiceChangeEvent事件,注冊中心監聽到這個事件之后,會遍歷存儲的PushClient,基于UDP協議對客戶端進行通知。客戶端接收到UDP通知,即可更新本地緩存的實例列表。

前面我們已經知道,基于HTTP協議進行服務注冊時,會有一個實例更新的時間差,因為是通過客戶端定時拉取服務器中的實例列表。如果拉取太頻繁,注冊中心壓力比較大,如果拉取的周期比較長,實例的變化又沒辦法快速感知到。而UDP協議的通知,恰恰彌補了這一缺點,所以說,要為基于UDP通知這個功能點個贊。

下面就來看看源碼層面是如何實現的。

客戶端UDP通知監聽與處理

客戶端在實例化NamingHttpClientProxy時,在其構造方法中會初始化PushReceiver。

  1. public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager, 
  2.         Properties properties, ServiceInfoHolder serviceInfoHolder) { 
  3.     // ... 
  4.     // 構建BeatReactor 
  5.     this.beatReactor = new BeatReactor(this, properties); 
  6.     // 構建UDP端口監聽 
  7.     this.pushReceiver = new PushReceiver(serviceInfoHolder); 
  8.     // ... 

PushReceiver的構造方法,如下:

  1. public PushReceiver(ServiceInfoHolder serviceInfoHolder) { 
  2.     try { 
  3.         // 持有ServiceInfoHolder引用 
  4.         this.serviceInfoHolder = serviceInfoHolder; 
  5.         // 獲取UDP端口 
  6.         String udpPort = getPushReceiverUdpPort(); 
  7.         // 根據端口情況,構建DatagramSocket,如果未設置端口,則采用隨機端口 
  8.         if (StringUtils.isEmpty(udpPort)) { 
  9.             this.udpSocket = new DatagramSocket(); 
  10.         } else { 
  11.             this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); 
  12.         } 
  13.         // 創建只有一個線程的ScheduledExecutorService 
  14.         this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { 
  15.             @Override 
  16.             public Thread newThread(Runnable r) { 
  17.                 Thread thread = new Thread(r); 
  18.                 thread.setDaemon(true); 
  19.                 thread.setName("com.alibaba.nacos.naming.push.receiver"); 
  20.                 return thread; 
  21.             } 
  22.         }); 
  23.  
  24.         // 執行線程,PushReceiver實現了Runnable接口 
  25.         this.executorService.execute(this); 
  26.     } catch (Exception e) { 
  27.         NAMING_LOGGER.error("[NA] init udp socket failed", e); 
  28.     } 

PushReceiver的構造方法做了以下操作:

  • 第一、持有ServiceInfoHolder對象引用;
  • 第二、獲取UDP端口;
  • 第三、實例化DatagramSocket對象,用于發送和接收Socket數據;
  • 第四,創建線程池,并執行PushReceiver(實現了Runnable接口);

既然PushReceiver實現了Runnable接口,run方法肯定是需要重新實現的:

  1. @Override 
  2. public void run() { 
  3.     while (!closed) { 
  4.         try { 
  5.              
  6.             // byte[] is initialized with 0 full filled by default 
  7.             byte[] buffer = new byte[UDP_MSS]; 
  8.             // 創建DatagramPacket用于存儲接收到的報文 
  9.             DatagramPacket packet = new DatagramPacket(buffer, buffer.length); 
  10.             // 接收報文,在未接收到報文時會進行線程阻塞 
  11.             udpSocket.receive(packet); 
  12.             // 將報文轉換為json格式 
  13.             String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); 
  14.             NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); 
  15.             // 將json格式的報文轉換為PushPacket對象 
  16.             PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); 
  17.             String ack; 
  18.             // 如果符合條件,則調用ServiceInfoHolder進行接收報文處理,并返回應答報文 
  19.             if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) { 
  20.                 serviceInfoHolder.processServiceInfo(pushPacket.data); 
  21.                  
  22.                 // send ack to server 
  23.                 ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" 
  24.                         + "\"\"}"
  25.             } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) { 
  26.                 // dump data to server 
  27.                 ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" 
  28.                         + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap())) 
  29.                         + "\"}"
  30.             } else { 
  31.                 // do nothing send ack only 
  32.                 ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime 
  33.                         + "\", \"data\":" + "\"\"}"
  34.             } 
  35.             // 發送應答報文 
  36.             udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, 
  37.                     packet.getSocketAddress())); 
  38.         } catch (Exception e) { 
  39.             if (closed) { 
  40.                 return
  41.             } 
  42.             NAMING_LOGGER.error("[NA] error while receiving push data", e); 
  43.         } 
  44.     } 

PushReceiver#run方法主要處理了以下操作:

  • 第一、構建DatagramPacket用于接收報文數據;
  • 第二、通過DatagramSocket#receive方法阻塞等待報文的到來;
  • 第三、DatagramSocket#receive接收到報文之后,方法繼續執行;
  • 第四、解析JSON格式的報文為PushPacket對象;
  • 第五、判斷報文類型,調用ServiceInfoHolder#processServiceInfo處理接收到的報文信息,在該方法中會將PushPacket轉化為ServiceInfo對象;
  • 第六、封裝ACK信息(即應答報文信息);
  • 第七、通過DatagramSocket發送應答報文;

上面我們看到了Nacos客戶端是如何基于UDP進行報文的監聽和處理的,但并未找到客戶端是如何將UDP信息上送給注冊中心的。下面我們就來梳理一下,上送UDP信息的邏輯。

客戶端上送UDP信息

在NamingHttpClientProxy中存儲了UDP_PORT_PARAM,即UDP的端口參數信息。

UDP端口信息通過實例查詢類接口進行傳遞,比如:查詢實例列表、查詢單個健康實例、查詢所有實例、訂閱接口、訂閱的更新任務UpdateTask等接口。在這些方法中都調用了NamingClientProxy#queryInstancesOfService方法。

NamingHttpClientProxy中的queryInstancesOfService方法實現:

  1. @Override 
  2. public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort, 
  3.         boolean healthyOnly) throws NacosException { 
  4.     final Map<String, String> params = new HashMap<String, String>(8); 
  5.     params.put(CommonParams.NAMESPACE_ID, namespaceId); 
  6.     params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName)); 
  7.     params.put(CLUSTERS_PARAM, clusters); 
  8.     // 獲取UDP端口 
  9.     params.put(UDP_PORT_PARAM, String.valueOf(udpPort)); 
  10.     params.put(CLIENT_IP_PARAM, NetUtils.localIP()); 
  11.     params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly)); 
  12.     String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); 
  13.     if (StringUtils.isNotEmpty(result)) { 
  14.         return JacksonUtils.toObj(result, ServiceInfo.class); 
  15.     } 
  16.     return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters); 

但查看源碼會發現,查詢實例列表、查詢單個健康實例、查詢所有實例、訂閱的更新任務UpdateTask中,UDP端口傳遞的參數值均為0。只有HTTP協議的訂閱接口取值為PushReceiver中的UDP端口號。

  1. @Override 
  2. public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { 
  3.     return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false); 

在上面的代碼中我們已經知道PushReceiver中有一個getPushReceiverUdpPort的方法:

  1. public static String getPushReceiverUdpPort() { 
  2.     return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT); 

很明顯,UDP的端口是通過環境變量設置的,對應的key為“push.receiver.udp.port”。

而在1.4.2版本中,HostReactor中的NamingProxy成員變量的queryList方法也會傳遞UDP端口:

  1. public void updateService(String serviceName, String clusters) throws NacosException { 
  2.     ServiceInfo oldService = getServiceInfo0(serviceName, clusters); 
  3.     try { 
  4.         String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); 
  5.         if (StringUtils.isNotEmpty(result)) { 
  6.             processServiceJson(result); 
  7.         } 
  8.     } finally { 
  9.         // ... 
  10.     } 

關于1.4.2版本中的實現,大家自行看源碼即可,這里不再展開。

完成了客戶端UDP基本信息的傳遞,再來看看服務器端是如何接收和存儲這些信息的。

UDP服務存儲

服務器端在獲取實例列表的接口中,對UDP端口進行了處理。

  1. @GetMapping("/list"
  2. @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ
  3. public Object list(HttpServletRequest request) throws Exception { 
  4.     // ... 
  5.     // 如果沒有獲得UDP端口信息,則默認端口為0 
  6.     int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort""0")); 
  7.     // ... 
  8.     // 客戶端的IP、UDP端口封裝到Subscriber對象中 
  9.     Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName, 
  10.             udpPort, clusters); 
  11.     return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly); 

在getInstanceOperator()方法中會獲得當前采用的哪個協議,然后選擇對應的處理類:

  1. /** 
  2.  * 判斷并返回采用V1版本或V2版本的操作服務 
  3.  * @return V1:Jraft協議(服務器端);V2:gRpc協議(客戶端) 
  4.  */ 
  5. private InstanceOperator getInstanceOperator() { 
  6.     return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1; 

這里具體的實現類為InstanceOperatorServiceImpl:

  1. @Override 
  2. public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster, 
  3.         boolean healthOnly) throws Exception { 
  4.     ClientInfo clientInfo = new ClientInfo(subscriber.getAgent()); 
  5.     String clientIP = subscriber.getIp(); 
  6.     ServiceInfo result = new ServiceInfo(serviceName, cluster); 
  7.     Service service = serviceManager.getService(namespaceId, serviceName); 
  8.     long cacheMillis = switchDomain.getDefaultCacheMillis(); 
  9.     // now try to enable the push 
  10.     try { 
  11.         // 處理支持UDP協議的客戶端信息 
  12.         if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) { 
  13.             subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(), 
  14.                     new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY, 
  15.                     StringUtils.EMPTY); 
  16.             cacheMillis = switchDomain.getPushCacheMillis(serviceName); 
  17.         } 
  18.     } catch (Exception e) { 
  19.         // ... 
  20.     } 
  21.     // ... 

當UDP端口大于0,且agent參數定義的客戶端支持UDP,則將對應的客戶端信息封裝到InetSocketAddress對象中,然后放入NamingSubscriberServiceV1Impl中(該類已經被廢棄,看后續如何調整該方法實現)。

在NamingSubscriberServiceV1Impl中,會將對應的參數封裝為PushClient,存放在Map當中。

  1. public void addClient(String namespaceId, String serviceName, String clusters, String agent, 
  2.         InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) { 
  3.      
  4.     PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant, 
  5.             app); 
  6.     addClient(client); 

addClient方法會將PushClient信息存放到ConcurrentMap

  1. private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>(); 
  2.  
  3. public void addClient(PushClient client) { 
  4.         // client is stored by key 'serviceName' because notify event is driven by serviceName change 
  5.         String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName()); 
  6.         ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey); 
  7.         if (clients == null) { 
  8.             clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024)); 
  9.             clients = clientMap.get(serviceKey); 
  10.         } 
  11.          
  12.         PushClient oldClient = clients.get(client.toString()); 
  13.         if (oldClient != null) { 
  14.             oldClient.refresh(); 
  15.         } else { 
  16.             PushClient res = clients.putIfAbsent(client.toString(), client); 
  17.            // ... 
  18.         } 
  19.     } 

此時,UDP的IP、端口信息已經封裝到PushClient當中,并存儲在NamingSubscriberServiceV1Impl的成員變量當中。

注冊中心的UDP通知

當服務端發現某個實例發生了變化,比如主動注銷了,會發布一個ServiceChangeEvent事件,UdpPushService會監聽到該事件,并進行業務處理。

在UdpPushService的onApplicationEvent方法中,會根據PushClient的具體情況進行移除或發送UDP通知。onApplicationEvent中核心邏輯代碼如下:

  1. ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap() 
  2.         .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); 
  3. if (MapUtils.isEmpty(clients)) { 
  4.     return
  5.  
  6. Map<String, Object> cache = new HashMap<>(16); 
  7. long lastRefTime = System.nanoTime(); 
  8. for (PushClient client : clients.values()) { 
  9.     // 移除僵尸客戶端 
  10.     if (client.zombie()) { 
  11.         Loggers.PUSH.debug("client is zombie: " + client); 
  12.         clients.remove(client.toString()); 
  13.         Loggers.PUSH.debug("client is zombie: " + client); 
  14.         continue
  15.     } 
  16.      
  17.     AckEntry ackEntry; 
  18.     String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); 
  19.     byte[] compressData = null
  20.     Map<String, Object> data = null
  21.     if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { 
  22.         org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); 
  23.         compressData = (byte[]) (pair.getValue0()); 
  24.         data = (Map<String, Object>) pair.getValue1(); 
  25.     } 
  26.      
  27.     // 封裝AckEntry對象 
  28.     if (compressData != null) { 
  29.         ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); 
  30.     } else { 
  31.         ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); 
  32.         if (ackEntry != null) { 
  33.             cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData())); 
  34.         } 
  35.     } 
  36.     // 通過UDP通知其他客戶端 
  37.     udpPush(ackEntry); 

事件處理的核心邏輯是就是先判斷PushClient的狀態信息,如果已經是僵尸客戶端,則移除。然后將發送UDP的報文信息和接收客戶端的信息封裝為AckEntry對象,然后調用udpPush方法,進行UDP消息的發送。

注冊中心的UDP接收

在看客戶端源碼的時候,我們看到客戶端不僅會接收UDP請求,而且還會進行應答。那么注冊中心怎么接收應答呢?也在UdpPushService類中,該類內部的靜態代碼塊初始化一個UDP的DatagramSocket,用來接收消息:

  1. static { 
  2.     try { 
  3.         udpSocket = new DatagramSocket(); 
  4.         Receiver receiver = new Receiver(); 
  5.         Thread inThread = new Thread(receiver); 
  6.         inThread.setDaemon(true); 
  7.         inThread.setName("com.alibaba.nacos.naming.push.receiver"); 
  8.         inThread.start(); 
  9.     } catch (SocketException e) { 
  10.         Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service"); 
  11.     } 

Receiver是一個內部類,實現了Runnable接口,在其run方法中主要就是接收報文信息,然后進行報文消息的判斷,根據判斷結果,操作本地Map中數據。

UDP設計不足

文章最開始就寫到,UDP的設計非常棒,即彌補了HTTP定時拉取的不足,又不至于太影響性能。但目前Nacos在UDP方面有一些不足,也可能是個人的吹毛求疵吧。

第一,文檔中沒有明確說明UDP的功能如何使用,這導致很多使用者在使用時并不知道UDP功能的存在,以及使用的限制條件。

第二,對云服務不友好。客戶端的UDP端口可以自定義,但服務器端的UDP端口是隨機獲取到。在云服務中,即便是內網服務,UDP端口也是被防火墻限制的。如果服務端的UDP端口是隨機獲取(客戶端默認也是),那么UDP的通信將直接被防火墻攔截掉,而用戶根本看不到任何異常(UDP協議不關注客戶端是否收到消息)。

至于這兩點,說起來算是瑕不掩瑜,讀完源碼或讀過我這篇文章的朋友大概已經知道怎么用了。后續可以給官方提一個Issue,看看是否可以改進。

小結

本文重點從三個方面講解的Nacos基于UDP的服務實例變更通知:

第一,客戶端監聽UDP端口,當接收注冊中心發來的服務實例變化,可以及時的更新本地的實例緩存;

第二,客戶端通過訂閱接口,將自身的UDP信息發送給注冊中心,注冊中心進行存儲;

第三,注冊中心中實例發生了變化,通過事件機制,將變更信息通過UDP協議發送給客戶端。

 

經過本篇文章,想必你不僅了解了Nacos中UDP協議的通知機制。同時,也開拓了一個新的思路,即如何使用UDP,在什么場景下使用UDP,以及在云服務中使用UDP可能會存在的問題。如果這篇文章對你有幫助,關注或點贊都可以。

 

責任編輯:武曉燕 來源: 程序新視界
相關推薦

2010-07-06 15:10:05

UDP協議

2010-06-21 17:51:50

UDP協議

2016-12-02 15:23:42

銳捷互聯網技術核心交換機

2021-02-04 07:54:40

JS工具擴展運算符

2022-02-08 15:37:22

微軟Windows 11

2014-09-16 17:00:02

UDP

2020-08-03 08:10:52

UDPTCP通信

2023-03-01 08:15:10

NginxNacos

2020-05-11 17:00:30

點贊MySQLRedis

2010-06-09 14:36:44

TCP與UDP協議

2020-07-02 16:40:40

MySQLRedis數據庫

2022-04-29 14:51:22

iOS蘋果電池

2025-04-07 00:55:00

RustUDP編程

2010-06-09 14:42:21

UDP協議TCP協議

2023-10-17 17:13:14

內存程序源碼

2022-11-27 08:08:42

2010-06-09 11:38:37

傳輸層通信協議

2024-10-10 17:27:12

2020-01-10 15:15:53

Redis點贊數據庫

2015-08-24 10:31:14

Windows 10功能
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 一区二区在线不卡 | 精品一区二区三区91 | 欧美中文一区 | 精品不卡 | 久久久国产视频 | 亚洲成人免费视频 | 国产精品一区二区三区久久久 | 中文字幕视频在线 | 黄色片免费在线观看 | 国产精品久久久久久久白浊 | 亚洲一区二区精品视频在线观看 | 久草资源网站 | 性色av一区二区三区 | 中文字幕日韩欧美一区二区三区 | 91高清免费 | 99re热精品视频国产免费 | 成人a在线 | 亚洲欧美精 | 中文字幕在线免费视频 | 呦呦在线视频 | 精品1区2区3区4区 | 亚洲精品乱码久久久久久按摩 | 美国一级黄色片 | 一级片视频免费观看 | 亚洲网站在线观看 | 日本精品一区二区三区视频 | 午夜影院 | 成人精品毛片国产亚洲av十九禁 | 国产粉嫩尤物极品99综合精品 | 日本激情视频中文字幕 | 精品国产99 | 国产免费一区二区三区最新6 | 日韩在线免费 | 毛片免费观看视频 | 一区二区在线 | 欧美成人a∨高清免费观看 91伊人 | 欧美成人激情视频 | 亚洲激情网站 | 91精品一区二区三区久久久久 | 91操操操| 色婷婷综合网 |