要給Nacos的Udp通信功能點個贊
本文轉載自微信公眾號「程序新視界」,作者二師兄。轉載本文請聯系程序新視界公眾號。
學習不用那么功利,二師兄帶你從更高維度輕松閱讀源碼~
Nacos在服務注冊功能中使用到了UDP的通信方式,主要功能就是用來輔助服務實例變化時對客戶端進行通知。然而,對于大多數使用Nacos的程序員來說,可能還不知道這個功能,更別說靈活運用了。
看完整個源碼的實現,還是要為這一功能點個贊的,可以說非常巧妙和實用。但在實現上有一些不足,文末會進行指出。
本篇文章就帶大家從源碼層面來分析一下Nacos 2.0中是如何基于UDP協議來實現服務實例變更的通知。
UDP通知基本原理
在分析源碼之前,先來從整體上看一下Nacos中UDP的實現原理。
Nacos UDP基本原理
我們知道,UDP協議通信是雙向的,沒有所謂的客戶端和服務端,因此在客戶端和服務器端都會開啟UDP的監聽。客戶端是單獨開啟一個線程來處理UDP消息的。當采用HTTP協議與注冊中心通信時,,在客戶端調用服務訂閱接口時,會將客戶端的UPD信息(IP和端口)上送到注冊中心,注冊中心以PushClient對象來進行封裝和存儲。
當注冊中心有實例變化時,會發布一個ServiceChangeEvent事件,注冊中心監聽到這個事件之后,會遍歷存儲的PushClient,基于UDP協議對客戶端進行通知。客戶端接收到UDP通知,即可更新本地緩存的實例列表。
前面我們已經知道,基于HTTP協議進行服務注冊時,會有一個實例更新的時間差,因為是通過客戶端定時拉取服務器中的實例列表。如果拉取太頻繁,注冊中心壓力比較大,如果拉取的周期比較長,實例的變化又沒辦法快速感知到。而UDP協議的通知,恰恰彌補了這一缺點,所以說,要為基于UDP通知這個功能點個贊。
下面就來看看源碼層面是如何實現的。
客戶端UDP通知監聽與處理
客戶端在實例化NamingHttpClientProxy時,在其構造方法中會初始化PushReceiver。
- public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
- Properties properties, ServiceInfoHolder serviceInfoHolder) {
- // ...
- // 構建BeatReactor
- this.beatReactor = new BeatReactor(this, properties);
- // 構建UDP端口監聽
- this.pushReceiver = new PushReceiver(serviceInfoHolder);
- // ...
- }
PushReceiver的構造方法,如下:
- public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
- try {
- // 持有ServiceInfoHolder引用
- this.serviceInfoHolder = serviceInfoHolder;
- // 獲取UDP端口
- String udpPort = getPushReceiverUdpPort();
- // 根據端口情況,構建DatagramSocket,如果未設置端口,則采用隨機端口
- if (StringUtils.isEmpty(udpPort)) {
- this.udpSocket = new DatagramSocket();
- } else {
- this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
- }
- // 創建只有一個線程的ScheduledExecutorService
- this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setDaemon(true);
- thread.setName("com.alibaba.nacos.naming.push.receiver");
- return thread;
- }
- });
- // 執行線程,PushReceiver實現了Runnable接口
- this.executorService.execute(this);
- } catch (Exception e) {
- NAMING_LOGGER.error("[NA] init udp socket failed", e);
- }
- }
PushReceiver的構造方法做了以下操作:
- 第一、持有ServiceInfoHolder對象引用;
- 第二、獲取UDP端口;
- 第三、實例化DatagramSocket對象,用于發送和接收Socket數據;
- 第四,創建線程池,并執行PushReceiver(實現了Runnable接口);
既然PushReceiver實現了Runnable接口,run方法肯定是需要重新實現的:
- @Override
- public void run() {
- while (!closed) {
- try {
- // byte[] is initialized with 0 full filled by default
- byte[] buffer = new byte[UDP_MSS];
- // 創建DatagramPacket用于存儲接收到的報文
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- // 接收報文,在未接收到報文時會進行線程阻塞
- udpSocket.receive(packet);
- // 將報文轉換為json格式
- String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
- NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
- // 將json格式的報文轉換為PushPacket對象
- PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
- String ack;
- // 如果符合條件,則調用ServiceInfoHolder進行接收報文處理,并返回應答報文
- if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
- serviceInfoHolder.processServiceInfo(pushPacket.data);
- // send ack to server
- ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
- + "\"\"}";
- } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
- // dump data to server
- ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
- + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
- + "\"}";
- } else {
- // do nothing send ack only
- ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
- + "\", \"data\":" + "\"\"}";
- }
- // 發送應答報文
- udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
- packet.getSocketAddress()));
- } catch (Exception e) {
- if (closed) {
- return;
- }
- NAMING_LOGGER.error("[NA] error while receiving push data", e);
- }
- }
- }
PushReceiver#run方法主要處理了以下操作:
- 第一、構建DatagramPacket用于接收報文數據;
- 第二、通過DatagramSocket#receive方法阻塞等待報文的到來;
- 第三、DatagramSocket#receive接收到報文之后,方法繼續執行;
- 第四、解析JSON格式的報文為PushPacket對象;
- 第五、判斷報文類型,調用ServiceInfoHolder#processServiceInfo處理接收到的報文信息,在該方法中會將PushPacket轉化為ServiceInfo對象;
- 第六、封裝ACK信息(即應答報文信息);
- 第七、通過DatagramSocket發送應答報文;
上面我們看到了Nacos客戶端是如何基于UDP進行報文的監聽和處理的,但并未找到客戶端是如何將UDP信息上送給注冊中心的。下面我們就來梳理一下,上送UDP信息的邏輯。
客戶端上送UDP信息
在NamingHttpClientProxy中存儲了UDP_PORT_PARAM,即UDP的端口參數信息。
UDP端口信息通過實例查詢類接口進行傳遞,比如:查詢實例列表、查詢單個健康實例、查詢所有實例、訂閱接口、訂閱的更新任務UpdateTask等接口。在這些方法中都調用了NamingClientProxy#queryInstancesOfService方法。
NamingHttpClientProxy中的queryInstancesOfService方法實現:
- @Override
- public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
- boolean healthyOnly) throws NacosException {
- final Map<String, String> params = new HashMap<String, String>(8);
- params.put(CommonParams.NAMESPACE_ID, namespaceId);
- params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));
- params.put(CLUSTERS_PARAM, clusters);
- // 獲取UDP端口
- params.put(UDP_PORT_PARAM, String.valueOf(udpPort));
- params.put(CLIENT_IP_PARAM, NetUtils.localIP());
- params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly));
- String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
- if (StringUtils.isNotEmpty(result)) {
- return JacksonUtils.toObj(result, ServiceInfo.class);
- }
- return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
- }
但查看源碼會發現,查詢實例列表、查詢單個健康實例、查詢所有實例、訂閱的更新任務UpdateTask中,UDP端口傳遞的參數值均為0。只有HTTP協議的訂閱接口取值為PushReceiver中的UDP端口號。
- @Override
- public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
- return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
- }
在上面的代碼中我們已經知道PushReceiver中有一個getPushReceiverUdpPort的方法:
- public static String getPushReceiverUdpPort() {
- return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT);
- }
很明顯,UDP的端口是通過環境變量設置的,對應的key為“push.receiver.udp.port”。
而在1.4.2版本中,HostReactor中的NamingProxy成員變量的queryList方法也會傳遞UDP端口:
- public void updateService(String serviceName, String clusters) throws NacosException {
- ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
- try {
- String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
- if (StringUtils.isNotEmpty(result)) {
- processServiceJson(result);
- }
- } finally {
- // ...
- }
- }
關于1.4.2版本中的實現,大家自行看源碼即可,這里不再展開。
完成了客戶端UDP基本信息的傳遞,再來看看服務器端是如何接收和存儲這些信息的。
UDP服務存儲
服務器端在獲取實例列表的接口中,對UDP端口進行了處理。
- @GetMapping("/list")
- @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
- public Object list(HttpServletRequest request) throws Exception {
- // ...
- // 如果沒有獲得UDP端口信息,則默認端口為0
- int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
- // ...
- // 客戶端的IP、UDP端口封裝到Subscriber對象中
- Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
- udpPort, clusters);
- return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
- }
在getInstanceOperator()方法中會獲得當前采用的哪個協議,然后選擇對應的處理類:
- /**
- * 判斷并返回采用V1版本或V2版本的操作服務
- * @return V1:Jraft協議(服務器端);V2:gRpc協議(客戶端)
- */
- private InstanceOperator getInstanceOperator() {
- return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
- }
這里具體的實現類為InstanceOperatorServiceImpl:
- @Override
- public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
- boolean healthOnly) throws Exception {
- ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
- String clientIP = subscriber.getIp();
- ServiceInfo result = new ServiceInfo(serviceName, cluster);
- Service service = serviceManager.getService(namespaceId, serviceName);
- long cacheMillis = switchDomain.getDefaultCacheMillis();
- // now try to enable the push
- try {
- // 處理支持UDP協議的客戶端信息
- if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
- subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
- new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,
- StringUtils.EMPTY);
- cacheMillis = switchDomain.getPushCacheMillis(serviceName);
- }
- } catch (Exception e) {
- // ...
- }
- // ...
- }
當UDP端口大于0,且agent參數定義的客戶端支持UDP,則將對應的客戶端信息封裝到InetSocketAddress對象中,然后放入NamingSubscriberServiceV1Impl中(該類已經被廢棄,看后續如何調整該方法實現)。
在NamingSubscriberServiceV1Impl中,會將對應的參數封裝為PushClient,存放在Map當中。
- public void addClient(String namespaceId, String serviceName, String clusters, String agent,
- InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
- PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
- app);
- addClient(client);
- }
addClient方法會將PushClient信息存放到ConcurrentMap
- private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
- public void addClient(PushClient client) {
- // client is stored by key 'serviceName' because notify event is driven by serviceName change
- String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
- ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
- if (clients == null) {
- clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
- clients = clientMap.get(serviceKey);
- }
- PushClient oldClient = clients.get(client.toString());
- if (oldClient != null) {
- oldClient.refresh();
- } else {
- PushClient res = clients.putIfAbsent(client.toString(), client);
- // ...
- }
- }
此時,UDP的IP、端口信息已經封裝到PushClient當中,并存儲在NamingSubscriberServiceV1Impl的成員變量當中。
注冊中心的UDP通知
當服務端發現某個實例發生了變化,比如主動注銷了,會發布一個ServiceChangeEvent事件,UdpPushService會監聽到該事件,并進行業務處理。
在UdpPushService的onApplicationEvent方法中,會根據PushClient的具體情況進行移除或發送UDP通知。onApplicationEvent中核心邏輯代碼如下:
- ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
- .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
- if (MapUtils.isEmpty(clients)) {
- return;
- }
- Map<String, Object> cache = new HashMap<>(16);
- long lastRefTime = System.nanoTime();
- for (PushClient client : clients.values()) {
- // 移除僵尸客戶端
- if (client.zombie()) {
- Loggers.PUSH.debug("client is zombie: " + client);
- clients.remove(client.toString());
- Loggers.PUSH.debug("client is zombie: " + client);
- continue;
- }
- AckEntry ackEntry;
- String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
- byte[] compressData = null;
- Map<String, Object> data = null;
- if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
- org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
- compressData = (byte[]) (pair.getValue0());
- data = (Map<String, Object>) pair.getValue1();
- }
- // 封裝AckEntry對象
- if (compressData != null) {
- ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
- } else {
- ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
- if (ackEntry != null) {
- cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
- }
- }
- // 通過UDP通知其他客戶端
- udpPush(ackEntry);
- }
事件處理的核心邏輯是就是先判斷PushClient的狀態信息,如果已經是僵尸客戶端,則移除。然后將發送UDP的報文信息和接收客戶端的信息封裝為AckEntry對象,然后調用udpPush方法,進行UDP消息的發送。
注冊中心的UDP接收
在看客戶端源碼的時候,我們看到客戶端不僅會接收UDP請求,而且還會進行應答。那么注冊中心怎么接收應答呢?也在UdpPushService類中,該類內部的靜態代碼塊初始化一個UDP的DatagramSocket,用來接收消息:
- static {
- try {
- udpSocket = new DatagramSocket();
- Receiver receiver = new Receiver();
- Thread inThread = new Thread(receiver);
- inThread.setDaemon(true);
- inThread.setName("com.alibaba.nacos.naming.push.receiver");
- inThread.start();
- } catch (SocketException e) {
- Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
- }
- }
Receiver是一個內部類,實現了Runnable接口,在其run方法中主要就是接收報文信息,然后進行報文消息的判斷,根據判斷結果,操作本地Map中數據。
UDP設計不足
文章最開始就寫到,UDP的設計非常棒,即彌補了HTTP定時拉取的不足,又不至于太影響性能。但目前Nacos在UDP方面有一些不足,也可能是個人的吹毛求疵吧。
第一,文檔中沒有明確說明UDP的功能如何使用,這導致很多使用者在使用時并不知道UDP功能的存在,以及使用的限制條件。
第二,對云服務不友好。客戶端的UDP端口可以自定義,但服務器端的UDP端口是隨機獲取到。在云服務中,即便是內網服務,UDP端口也是被防火墻限制的。如果服務端的UDP端口是隨機獲取(客戶端默認也是),那么UDP的通信將直接被防火墻攔截掉,而用戶根本看不到任何異常(UDP協議不關注客戶端是否收到消息)。
至于這兩點,說起來算是瑕不掩瑜,讀完源碼或讀過我這篇文章的朋友大概已經知道怎么用了。后續可以給官方提一個Issue,看看是否可以改進。
小結
本文重點從三個方面講解的Nacos基于UDP的服務實例變更通知:
第一,客戶端監聽UDP端口,當接收注冊中心發來的服務實例變化,可以及時的更新本地的實例緩存;
第二,客戶端通過訂閱接口,將自身的UDP信息發送給注冊中心,注冊中心進行存儲;
第三,注冊中心中實例發生了變化,通過事件機制,將變更信息通過UDP協議發送給客戶端。
經過本篇文章,想必你不僅了解了Nacos中UDP協議的通知機制。同時,也開拓了一個新的思路,即如何使用UDP,在什么場景下使用UDP,以及在云服務中使用UDP可能會存在的問題。如果這篇文章對你有幫助,關注或點贊都可以。