成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Nacos源碼—8.Nacos升級gRPC分析三

發(fā)布于 2025-5-15 20:11
瀏覽
0收藏

大綱

7.服務(wù)端對服務(wù)實(shí)例進(jìn)行健康檢查

8.服務(wù)下線如何注銷注冊表和客戶端等信息

9.事件驅(qū)動架構(gòu)源碼分析

7.服務(wù)端對服務(wù)實(shí)例進(jìn)行健康檢查

(1)服務(wù)端對服務(wù)實(shí)例進(jìn)行健康檢查的設(shè)計(jì)邏輯

(2)服務(wù)端對服務(wù)實(shí)例進(jìn)行健康檢查的源碼

(3)服務(wù)端檢查服務(wù)實(shí)例不健康后的注銷處理

(1)服務(wù)端對服務(wù)實(shí)例進(jìn)行健康檢查的設(shè)計(jì)邏輯

一.首先會獲取所有客戶端的Connection連接對象

Connection連接對象里有個屬性叫l(wèi)astActiveTime,表示的是最后存活時間。

二.然后判斷當(dāng)前時間-最后存活時間是否大于20s

如果大于,則把該Connection連接對象的connectionId放入到一個集合里。這個集合是一個名為outDatedConnections的待移除集合Set,此時該Connection連接對象并不會馬上刪除。

三.當(dāng)判斷完全部的Connection連接對象后會遍歷outDatedConnections集合

向遍歷到的Connection連接對象發(fā)起一次請求,確認(rèn)是否真的下線。如果響應(yīng)成功,則往successConnections集合中添加connectionId,并且刷新Connection連接對象的lastActiveTime屬性。這個機(jī)制有一個專業(yè)的名稱叫做:探活機(jī)制。

四.遍歷待移除集合進(jìn)行注銷并且在注銷之前先判斷一下是否探活成功

也就是connectionId存在于待移除集合outDatedConnections中,但是不存在于探活成功集合successConnections中,那么這個connectionId對應(yīng)的客戶端就會被注銷掉。

(2)服務(wù)端對服務(wù)實(shí)例進(jìn)行健康檢查的源碼

對服務(wù)實(shí)例進(jìn)行健康檢查的源碼入口是ConnectionManager的start()方法。

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
Map<String, Connection> connections = new ConcurrentHashMap<>();

//Start Task:Expel the connection which active Time expire.
@PostConstruct
public void start() {
    //Start UnHealthy Connection Expel Task.
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            ...
            //一.首先獲取所有的連接
            Set<Map.Entry<String, Connection>> entries = connections.entrySet();
            ...
            //二.然后判斷客戶端是否超過20s沒有發(fā)來心跳信息了,如果是則會將clientId加入outDatedConnections集合中
            Set<String> outDatedConnections = new HashSet<>();
            long now = System.currentTimeMillis();


            for (Map.Entry<String, Connection> entry : entries) {
                Connection client = entry.getValue();
                String clientIp = client.getMetaInfo().getClientIp();
                AtomicInteger integer = expelForIp.get(clientIp);
                if (integer != null && integer.intValue() > 0) {
                    integer.decrementAndGet();
                    expelClient.add(client.getMetaInfo().getConnectionId());
                    expelCount--;
                } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {//判斷心跳時間
                    //添加到待移除列表
                    outDatedConnections.add(client.getMetaInfo().getConnectionId());
                }
            }
            ...
            //client active detection.
            //三.初次檢測完超過20s的Connection連接對象后,并不會立馬進(jìn)行刪除,而是進(jìn)行探活,服務(wù)端主動請求客戶端,來確認(rèn)是否真的下線
            Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
            if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                Set<String> successConnections = new HashSet<>();
                final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                //遍歷超過20s沒有心跳的客戶端clientId
                for (String outDateConnectionId : outDatedConnections) {
                    try {
                        Connection connection = getConnection(outDateConnectionId);
                        if (connection != null) {
                            ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                            //調(diào)用GrpcConnection.asyncRequest()方法異步發(fā)送請求
                            connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                @Override
                                public Executor getExecutor() {
                                    return null;
                                }


                                @Override
                                public long getTimeout() {
                                    return 1000L;
                                }


                                @Override
                                public void onResponse(Response response) {
                                    latch.countDown();
                                    if (response != null && response.isSuccess()) {
                                        //響應(yīng)成功刷新心跳時間
                                        connection.freshActiveTime();
                                        //并且加入到探活成功的集合列表中
                                        successConnections.add(outDateConnectionId);
                                    }
                                }

                                @Override
                                public void onException(Throwable e) {
                                    latch.countDown();
                                }
                            });
                            Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);
                        } else {
                            latch.countDown();
                        }                            
                    } catch (ConnectionAlreadyClosedException e) {
                        latch.countDown();
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e);
                        latch.countDown();
                    }
                }


                latch.await(3000L, TimeUnit.MILLISECONDS);
                Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());
                //經(jīng)過探活還是不成功的Connection連接對象,就準(zhǔn)備進(jìn)行移除了
                //遍歷20s沒有心跳的客戶端,準(zhǔn)備移除客戶端信息
                for (String outDateConnectionId : outDatedConnections) {
                    //判斷探活是否成功,如果成功了則不需要移除
                    if (!successConnections.contains(outDateConnectionId)) {
                        Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                        //執(zhí)行客戶端注銷邏輯
                        unregister(outDateConnectionId);
                    }
                }
            }
            ...
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}
...

}
(3)服務(wù)端檢查服務(wù)實(shí)例不健康后的注銷處理

進(jìn)行注銷處理的方法是ConnectionManager的unregister()方法。該方法主要會移除Connection連接對象 + 清除一些數(shù)據(jù),以及發(fā)布一個ClientDisconnectEvent客戶端注銷事件。

標(biāo)簽
收藏
回復(fù)
舉報(bào)
回復(fù)
相關(guān)推薦
主站蜘蛛池模板: 天堂视频中文在线 | 亚洲成人精品久久 | 日韩一级免费观看 | 久久精品免费观看 | 91一区二区三区 | 精品在线播放 | 国产午夜久久久 | 国产欧美日韩在线 | 亚洲免费在线 | 欧美日韩亚洲视频 | 国产一级片av | 精品一区二区在线观看 | 国产成人a亚洲精品 | 久久久久久久久久一区 | 精区3d动漫一品二品精区 | 中文精品视频 | 人人干视频在线 | 成人免费影院 | 亚洲精品888 | 日本一区二区高清不卡 | 亚洲精品一区二区 | 亚洲风情在线观看 | 天堂中文av | 一级黄色片在线看 | 天天操夜夜操免费视频 | 久久国产成人 | 北条麻妃av一区二区三区 | 在线欧美亚洲 | 99在线免费观看视频 | 日韩国产一区 | xx性欧美肥妇精品久久久久久 | 日韩视频―中文字幕 | 国产日韩欧美一区二区 | 国产日韩精品在线 | 国产精品久久久久久久久久免费看 | 久草青青草 | 一级久久久久久 | 精品国产第一区二区三区 | 黄网站涩免费蜜桃网站 | av在线免费观看网站 | 成人3d动漫一区二区三区91 |