談?wù)凞ubbo負(fù)載均衡是如何實(shí)現(xiàn)的?

dubbo的負(fù)載均衡全部由AbstractLoadBalance的子類來(lái)實(shí)現(xiàn)
RandomLoadBalance 隨機(jī)
在一個(gè)截面上碰撞的概率高,但調(diào)用量越大分布越均勻,而且按概率使用權(quán)重后也比較均勻,有利于動(dòng)態(tài)調(diào)整提供者權(quán)重。
- 獲取invoker的數(shù)量
- 獲取第一個(gè)invoker的權(quán)重,并復(fù)制給firstWeight
- 循環(huán)invoker集合,把它們的權(quán)重全部相加,并復(fù)制給totalWeight,如果權(quán)重不相等,那么sameWeight為false
- 如果invoker集合的權(quán)重并不是全部相等的,那么獲取一個(gè)隨機(jī)數(shù)在1到totalWeight之間,賦值給offset屬性
- 循環(huán)遍歷invoker集合,獲取權(quán)重并與offset相減,當(dāng)offset減到小于零,那么就返回這個(gè)inovker
- 如果權(quán)重相等,那么直接在invoker集合里面取一個(gè)隨機(jī)數(shù)返回
- @Override
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- int length = invokers.size(); // Number of invokers
- boolean sameWeight = true; // Every invoker has the same weight?
- int firstWeight = getWeight(invokers.get(0), invocation);
- int totalWeight = firstWeight; // The sum of weights
- for (int i = 1; i < length; i++) {
- int weight = getWeight(invokers.get(i), invocation);
- totalWeight += weight; // Sum
- if (sameWeight && weight != firstWeight) {
- sameWeight = false;
- }
- }
- if (totalWeight > 0 && !sameWeight) {
- // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
- int offset = ThreadLocalRandom.current().nextInt(totalWeight);
- // Return a invoker based on the random value.
- for (int i = 0; i < length; i++) {
- offset -= getWeight(invokers.get(i), invocation);
- if (offset < 0) {
- return invokers.get(i);
- }
- }
- }
- // If all invokers have the same weight value or totalWeight=0, return evenly.
- return invokers.get(ThreadLocalRandom.current().nextInt(length));
- }
RoundRobinLoadBalance 輪詢
存在慢的提供者累積請(qǐng)求的問(wèn)題,比如:第二臺(tái)機(jī)器很慢,但沒(méi)掛,當(dāng)請(qǐng)求調(diào)到第二臺(tái)時(shí)就卡在那,久而久之,所有請(qǐng)求都卡在調(diào)到第二臺(tái)上。
在老的版本上,dubbo會(huì)求出最大權(quán)重和最小權(quán)重,如果權(quán)重相等,那么就直接按取模的方式,每次取完后值加一;如果權(quán)重不相等,順序根據(jù)權(quán)重分配。
在新的版本上,對(duì)這個(gè)類進(jìn)行了重構(gòu)。
- 從methodWeightMap這個(gè)實(shí)例中根據(jù)ServiceKey+MethodName的方式獲取里面的一個(gè)map實(shí)例,如果沒(méi)有則說(shuō)明第一次進(jìn)到該方法,則實(shí)例化一個(gè)放入到methodWeightMap中,并把獲取到的實(shí)例命名為map
- 遍歷所有的invokers
- 拿到當(dāng)前的invoker的identifyString作為key,去map里獲取weightedRoundRobin實(shí)例,如果map里沒(méi)有則添加一個(gè)
- 如果weightedRoundRobin的權(quán)重和當(dāng)前invoker的權(quán)重不同,說(shuō)明權(quán)重變了,需要重新設(shè)置
- 獲取當(dāng)前invoker所對(duì)應(yīng)的weightedRoundRobin實(shí)例中的current,并加上當(dāng)前invoker的權(quán)重
- 設(shè)置weightedRoundRobin最后的更新時(shí)間
- maxCurrent一開(kāi)始是設(shè)置的0,如果當(dāng)前的weightedRoundRobin的current值大于maxCurrent則進(jìn)行賦值
- 遍歷完后會(huì)得到最大的權(quán)重的invoker的selectedInvoker和這個(gè)invoker所對(duì)應(yīng)的weightedRoundRobin賦值給了selectedWRR,還有權(quán)重之和totalWeight
- 然后把selectedWRR里的current屬性減去totalWeight,并返回selectedInvoker
這樣看顯然是不夠清晰的,我們來(lái)舉個(gè)例子:
- 假定有3臺(tái)dubbo provider:
- 10.0.0.1:20884, weight=2
- 10.0.0.1:20886, weight=3
- 10.0.0.1:20888, weight=4
- totalWeight=9;
- 那么第一次調(diào)用的時(shí)候:
- 10.0.0.1:20884, weight=2 selectedWRR -> current = 2
- 10.0.0.1:20886, weight=3 selectedWRR -> current = 3
- 10.0.0.1:20888, weight=4 selectedWRR -> current = 4
- selectedInvoker-> 10.0.0.1:20888
- 調(diào)用 selectedWRR.sel(totalWeight);
- 10.0.0.1:20888, weight=4 selectedWRR -> current = -5
- 返回10.0.0.1:20888這個(gè)實(shí)例
- 那么第二次調(diào)用的時(shí)候:
- 10.0.0.1:20884, weight=2 selectedWRR -> current = 4
- 10.0.0.1:20886, weight=3 selectedWRR -> current = 6
- 10.0.0.1:20888, weight=4 selectedWRR -> current = -1
- selectedInvoker-> 10.0.0.1:20886
- 調(diào)用 selectedWRR.sel(totalWeight);
- 10.0.0.1:20886 , weight=4 selectedWRR -> current = -3
- 返回10.0.0.1:20886這個(gè)實(shí)例
- 那么第三次調(diào)用的時(shí)候:
- 10.0.0.1:20884, weight=2 selectedWRR -> current = 6
- 10.0.0.1:20886, weight=3 selectedWRR -> current = 0
- 10.0.0.1:20888, weight=4 selectedWRR -> current = 3
- selectedInvoker-> 10.0.0.1:20884
- 調(diào)用 selectedWRR.sel(totalWeight);
- 10.0.0.1:20884, weight=2 selectedWRR -> current = -3
- 返回10.0.0.1:20884這個(gè)實(shí)例
- protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
- if (map == null) {
- methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
- map = methodWeightMap.get(key);
- }
- int totalWeight = 0;
- long maxCurrent = Long.MIN_VALUE;
- long now = System.currentTimeMillis();
- Invoker<T> selectedInvoker = null;
- WeightedRoundRobin selectedWRR = null;
- for (Invoker<T> invoker : invokers) {
- String identifyString = invoker.getUrl().toIdentityString();
- WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
- int weight = getWeight(invoker, invocation);
- if (weight < 0) {
- weight = 0;
- }
- if (weightedRoundRobin == null) {
- weightedRoundRobin = new WeightedRoundRobin();
- weightedRoundRobin.setWeight(weight);
- map.putIfAbsent(identifyString, weightedRoundRobin);
- weightedRoundRobin = map.get(identifyString);
- }
- if (weight != weightedRoundRobin.getWeight()) {
- //weight changed
- weightedRoundRobin.setWeight(weight);
- }
- long cur = weightedRoundRobin.increaseCurrent();
- weightedRoundRobin.setLastUpdate(now);
- if (cur > maxCurrent) {
- maxCurrent = cur;
- selectedInvoker = invoker;
- selectedWRR = weightedRoundRobin;
- }
- totalWeight += weight;
- }
- if (!updateLock.get() && invokers.size() != map.size()) {
- if (updateLock.compareAndSet(false, true)) {
- try {
- // copy -> modify -> update reference
- ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
- newMap.putAll(map);
- Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, WeightedRoundRobin> item = it.next();
- if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
- it.remove();
- }
- }
- methodWeightMap.put(key, newMap);
- } finally {
- updateLock.set(false);
- }
- }
- }
- if (selectedInvoker != null) {
- selectedWRR.sel(totalWeight);
- return selectedInvoker;
- }
- // should not happen here
- return invokers.get(0);
- }
LeastActiveLoadBalance 最少活躍調(diào)用數(shù)
使慢的提供者收到更少請(qǐng)求,因?yàn)樵铰奶峁┱叩恼{(diào)用前后計(jì)數(shù)差會(huì)越大。
- 遍歷所有的invoker
- 獲取當(dāng)前invoker的活躍數(shù),調(diào)用的是RpcStatus的getStatus方法,過(guò)濾器里面會(huì)記錄每個(gè)方法的活躍數(shù)
- 獲取當(dāng)前invoker的權(quán)重
- 如果是第一次進(jìn)來(lái)或者是當(dāng)前invoker的活躍數(shù)比最小的活躍數(shù)還小
- 那么把leastActive設(shè)置為當(dāng)前invoker的活躍數(shù),設(shè)置leastCount為1,leastIndexes數(shù)組的第一個(gè)位置設(shè)置為1,記錄一下totalWeight和firstWeight
- 如果不滿足第4點(diǎn)的條件,那么判斷當(dāng)前invoker的活躍數(shù)和最小的活躍數(shù)是否相等
- 如果滿足第6點(diǎn),那么把當(dāng)前的權(quán)重加入到totalWeight中,并把leastIndexes數(shù)組中記錄一下最小活躍數(shù)相同的下標(biāo);再看一下是否所有的權(quán)重相同
- 如果invoker集合中只有一個(gè)invoker活躍數(shù)是最小的,那么直接返回
- 如果權(quán)重不相等,隨機(jī)權(quán)重后,判斷在哪個(gè) Invoker 的權(quán)重區(qū)間中
- 權(quán)重相等,直接隨機(jī)選擇 Invoker 即可
- 最小活躍數(shù)算法實(shí)現(xiàn):
- 假定有3臺(tái)dubbo provider:
- 10.0.0.1:20884, weight=2,active=2
- 10.0.0.1:20886, weight=3,active=4
- 10.0.0.1:20888, weight=4,active=3
- active=2最小,且只有一個(gè)2,所以選擇10.0.0.1:20884
- 假定有3臺(tái)dubbo provider:
- 10.0.0.1:20884, weight=2,active=2
- 10.0.0.1:20886, weight=3,active=2
- 10.0.0.1:20888, weight=4,active=3
- active=2最小,且有2個(gè),所以從[10.0.0.1:20884,10.0.0.1:20886 ]中選擇;
- 接下來(lái)的算法與隨機(jī)算法類似:
- 假設(shè)offset=1(即random.nextInt(5)=1)
- 1-2=-1<0?是,所以選中 10.0.0.1:20884, weight=2
- 假設(shè)offset=4(即random.nextInt(5)=4)
- 4-2=2<0?否,這時(shí)候offset=2, 2-3<0?是,所以選中 10.0.0.1:20886, weight=3
- 1: public class LeastActiveLoadBalance extends AbstractLoadBalance {
- 2:
- 3: public static final String NAME = "leastactive";
- 4:
- 5: private final Random random = new Random();
- 6:
- 7: @Override
- 8: protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- 9: int length = invokers.size(); // 總個(gè)數(shù)
- 10: int leastActive = -1; // 最小的活躍數(shù)
- 11: int leastCount = 0; // 相同最小活躍數(shù)的個(gè)數(shù)
- 12: int[] leastIndexes = new int[length]; // 相同最小活躍數(shù)的下標(biāo)
- 13: int totalWeight = 0; // 總權(quán)重
- 14: int firstWeight = 0; // 第一個(gè)權(quán)重,用于于計(jì)算是否相同
- 15: boolean sameWeight = true; // 是否所有權(quán)重相同
- 16: // 計(jì)算獲得相同最小活躍數(shù)的數(shù)組和個(gè)數(shù)
- 17: for (int i = 0; i < length; i++) {
- 18: Invoker<T> invoker = invokers.get(i);
- 19: int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活躍數(shù)
- 20: int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 權(quán)重
- 21: if (leastActive == -1 || active < leastActive) { // 發(fā)現(xiàn)更小的活躍數(shù),重新開(kāi)始
- 22: leastActive = active; // 記錄最小活躍數(shù)
- 23: leastCount = 1; // 重新統(tǒng)計(jì)相同最小活躍數(shù)的個(gè)數(shù)
- 24: leastIndexes[0] = i; // 重新記錄最小活躍數(shù)下標(biāo)
- 25: totalWeight = weight; // 重新累計(jì)總權(quán)重
- 26: firstWeight = weight; // 記錄第一個(gè)權(quán)重
- 27: sameWeight = true; // 還原權(quán)重相同標(biāo)識(shí)
- 28: } else if (active == leastActive) { // 累計(jì)相同最小的活躍數(shù)
- 29: leastIndexes[leastCount++] = i; // 累計(jì)相同最小活躍數(shù)下標(biāo)
- 30: totalWeight += weight; // 累計(jì)總權(quán)重
- 31: // 判斷所有權(quán)重是否一樣
- 32: if (sameWeight && weight != firstWeight) {
- 33: sameWeight = false;
- 34: }
- 35: }
- 36: }
- 37: // assert(leastCount > 0)
- 38: if (leastCount == 1) {
- 39: // 如果只有一個(gè)最小則直接返回
- 40: return invokers.get(leastIndexes[0]);
- 41: }
- 42: if (!sameWeight && totalWeight > 0) {
- 43: // 如果權(quán)重不相同且權(quán)重大于0則按總權(quán)重?cái)?shù)隨機(jī)
- 44: int offsetWeight = random.nextInt(totalWeight);
- 45: // 并確定隨機(jī)值落在哪個(gè)片斷上
- 46: for (int i = 0; i < leastCount; i++) {
- 47: int leastIndex = leastIndexes[i];
- 48: offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
- 49: if (offsetWeight <= 0) {
- 50: return invokers.get(leastIndex);
- 51: }
- 52: }
- 53: }
- 54: // 如果權(quán)重相同或權(quán)重為0則均等隨機(jī)
- 55: return invokers.get(leastIndexes[random.nextInt(leastCount)]);
- 56: }
- 57:
- 58: }
ConsistentHashLoadBalance 一致性 Hash
相同參數(shù)的請(qǐng)求總是發(fā)到同一提供者。當(dāng)某一臺(tái)提供者掛時(shí),原本發(fā)往該提供者的請(qǐng)求,基于虛擬節(jié)點(diǎn),平攤到其它提供者,不會(huì)引起劇烈變動(dòng)。
- 基于 invokers 集合,根據(jù)對(duì)象內(nèi)存地址來(lái)計(jì)算定義哈希值
- 獲得 ConsistentHashSelector 對(duì)象。若為空,或者定義哈希值變更(說(shuō)明 invokers 集合發(fā)生變化),進(jìn)行創(chuàng)建新的 ConsistentHashSelector 對(duì)象
- 調(diào)用ConsistentHashSelector對(duì)象的select方法
- 1: public class ConsistentHashLoadBalance extends AbstractLoadBalance {
- 2:
- 3: /**
- 4: * 服務(wù)方法與一致性哈希選擇器的映射
- 5: *
- 6: * KEY:serviceKey + "." + methodName
- 7: */
- 8: private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
- 9:
- 10: @SuppressWarnings("unchecked")
- 11: @Override
- 12: protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
- 13: String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
- 14: // 基于 invokers 集合,根據(jù)對(duì)象內(nèi)存地址來(lái)計(jì)算定義哈希值
- 15: int identityHashCode = System.identityHashCode(invokers);
- 16: // 獲得 ConsistentHashSelector 對(duì)象。若為空,或者定義哈希值變更(說(shuō)明 invokers 集合發(fā)生變化),進(jìn)行創(chuàng)建新的 ConsistentHashSelector 對(duì)象
- 17: ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
- 18: if (selector == null || selector.identityHashCode != identityHashCode) {
- 19: selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
- 20: selector = (ConsistentHashSelector<T>) selectors.get(key);
- 21: }
- 22: return selector.select(invocation);
- 23: }
- 24: }
ConsistentHashSelector 一致性哈希選擇器
ConsistentHashSelector ,是 ConsistentHashLoadBalance 的內(nèi)部類,一致性哈希選擇器,基于 Ketama 算法。
- /**
- * 虛擬節(jié)點(diǎn)與 Invoker 的映射關(guān)系
- */
- private final TreeMap<Long, Invoker<T>> virtualInvokers;
- /**
- * 每個(gè)Invoker 對(duì)應(yīng)的虛擬節(jié)點(diǎn)數(shù)
- */
- private final int replicaNumber;
- /**
- * 定義哈希值
- */
- private final int identityHashCode;
- /**
- * 取值參數(shù)位置數(shù)組
- */
- private final int[] argumentIndex;
- 1: ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
- 2: this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
- 3: // 設(shè)置 identityHashCode
- 4: this.identityHashCode = identityHashCode;
- 5: URL url = invokers.get(0).getUrl();
- 6: // 初始化 replicaNumber
- 7: this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
- 8: // 初始化 argumentIndex
- 9: String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
- 10: argumentIndex = new int[index.length];
- 11: for (int i = 0; i < index.length; i++) {
- 12: argumentIndex[i] = Integer.parseInt(index[i]);
- 13: }
- 14: // 初始化 virtualInvokers
- 15: for (Invoker<T> invoker : invokers) {
- 16: String address = invoker.getUrl().getAddress();
- 17: // 每四個(gè)虛擬結(jié)點(diǎn)為一組,為什么這樣?下面會(huì)說(shuō)到
- 18: for (int i = 0; i < replicaNumber / 4; i++) {
- 19: // 這組虛擬結(jié)點(diǎn)得到惟一名稱
- 20: byte[] digest = md5(address + i);
- 21: // Md5是一個(gè)16字節(jié)長(zhǎng)度的數(shù)組,將16字節(jié)的數(shù)組每四個(gè)字節(jié)一組,分別對(duì)應(yīng)一個(gè)虛擬結(jié)點(diǎn),這就是為什么上面把虛擬結(jié)點(diǎn)四個(gè)劃分一組的原因
- 22: for (int h = 0; h < 4; h++) {
- 23: // 對(duì)于每四個(gè)字節(jié),組成一個(gè)long值數(shù)值,做為這個(gè)虛擬節(jié)點(diǎn)的在環(huán)中的惟一key
- 24: long m = hash(digest, h);
- 25: virtualInvokers.put(m, invoker);
- 26: }
- 27: }
- 28: }
- 29: }
- public Invoker<T> select(Invocation invocation) {
- // 基于方法參數(shù),獲得 KEY
- String key = toKey(invocation.getArguments());
- // 計(jì)算 MD5 值
- byte[] digest = md5(key);
- // 計(jì)算 KEY 值
- return selectForKey(hash(digest, 0));
- }
- private String toKey(Object[] args) {
- StringBuilder buf = new StringBuilder();
- for (int i : argumentIndex) {
- if (i >= 0 && i < args.length) {
- buf.append(args[i]);
- }
- }
- return buf.toString();
- }
- private Invoker<T> selectForKey(long hash) {
- // 得到大于當(dāng)前 key 的那個(gè)子 Map ,然后從中取出第一個(gè) key ,就是大于且離它最近的那個(gè) key
- Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
- // 不存在,則取 virtualInvokers 第一個(gè)
- if (entry == null) {
- entry = virtualInvokers.firstEntry();
- }
- // 存在,則返回
- return entry.getValue();
- }