成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

請求合并的三種方式,大大提高接口性能!

開發 后端
文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實現BatchCollapser 三種請求合并技術,并通過其具體實現對比各自適用的場景。

[[441351]]

 將相似或重復請求在上游系統中合并后發往下游系統,可以大大降低下游系統的負載,提升系統整體吞吐率。文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實現BatchCollapser 三種請求合并技術,并通過其具體實現對比各自適用的場景。

前言

工作中,我們常見的請求模型都是”請求-應答”式,即一次請求中,服務給請求分配一個獨立的線程,一塊獨立的內存空間,所有的操作都是獨立的,包括資源和系統運算。我們也知道,在請求中處理一次系統 I/O 的消耗是非常大的,如果有非常多的請求都進行同一類 I/O 操作,那么是否可以將這些 I/O 操作都合并到一起,進行一次 I/O 操作,是否可以大大降低下游資源服務器的負擔呢?

最近我工作之余的大部分時間都花在這個問題的探究上了,對比了幾個現有類庫,為了解決一個小問題把 hystrix javanica 的代碼翻了一遍,也根據自己工作中遇到的業務需求實現了一個簡單的合并類,收獲還是挺大的。可能這個需求有點”偏門”,在網上搜索結果并不多,也沒有綜合一點的資料,索性自己總結分享一下,希望能幫到后來遇到這種問題的小伙伴。

Hystrix Collapser

hystrix

開源的請求合并類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix 了, hystrix 專注于保持 WEB 服務器在高并發環境下的系統穩定,我們常用它的熔斷器(Circuit Breaker) 來實現服務的服務隔離和災時降級,有了它,可以使整個系統不至于被某一個接口的高并發洪流沖塌,即使接口掛了也可以將服務降級,返回一個人性化的響應。請求合并作為一個保障下游服務穩定的利器,在 hystrix 內實現也并不意外。

我們在使用 hystrix 時,常用它的 javanica 模塊,以注解的方式編寫 hystrix 代碼,使代碼更簡潔而且對業務代碼侵入更低。所以在項目中我們一般至少需要引用 hystrix-core 和 hystrix-javanica 兩個包。

另外,hystrix 的實現都是通過 AOP,我們要還要在項目 xml 里顯式配置 HystrixAspect 的 bean 來啟用它。 

  1. <aop:aspectj-autoproxy/>  
  2. <bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" /> 

collapser

hystrix collapser 是 hystrix 內的請求合并器,它有自定義 BatchMethod 和 注解兩種實現方式,自定義 BatchMethod 網上有各種教程,實現起來很復雜,需要手寫大量代碼,而注解方式只需要添加兩行注解即可,但配置方式我在官方文檔上也沒找見,中文方面本文應該是獨一份兒了。

其實現需要注意的是:

  •  我們在需要合并的方法上添加 @HystrixCollapser 注解,在定義好的合并方法上添加 @HystrixCommand 注解;
  •  single 方法只能傳入一個參數,多參數情況下需要自己包裝一個參數類,而 batch 方法需要 java.util.List<SingleParam>;
  •  single 方法返回 java.util.concurrent.Future<SingleReturn>, batch 方法返回 java.util.List<SingleReturn>,且要保證返回的結果數量和傳入的參數數量一致。

下面是一個簡單的示例: 

  1. public class HystrixCollapserSample {  
  2.     @HystrixCollapser(batchMethod = "batch" 
  3.     public Future<Boolean> single(String input) {  
  4.         return null; // single方法不會被執行到  
  5.     }  
  6.     public List<Boolean> batch(List<String> inputs) {  
  7.         return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());  
  8.     }  

源碼實現

為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這里簡單總結一下 hystrix 請求合并器的具體實現,源碼的詳細解析在我的筆記:Hystrix collasper 源碼解析。

  •  在 spring-boot 內注冊切面類的 bean,里面包含 @HystrixCollapser 注解切面;
  •  在方法執行時檢測到方法被 HystrixCollapser 注解后,spring 調用 methodsAnnotatedWithHystrixCommand方法來執行 hystrix 代理;
  •  hystrix 獲取一個 collapser 實例(在當前 scope 內檢測不到即創建);
  •  hystrix 將當前請求的參數提交給 collapser, 由 collapser 存儲在一個 concurrentHashMap (RequestArgumentType -> CollapsedRequest)內,此方法會創建一個 Observable 對象,并返回一個 觀察此對象的 Future 給業務線程;
  •  collpser 在創建時會創建一個 timer 線程,定時消費存儲的請求,timer 會將多個請求構造成一個合并后的請求,調用 batch 執行后將結果順序映射到輸出參數,并通知 Future 任務已完成。

需要注意,由于需要等待 timer 執行真正的請求操作,collapser 會導致所有的請求的 cost 都會增加約 timerInterval/2 ms;

配置

hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括兩個部分,專有配置和 hystrixCommand 通用配置;

專有配置包括:

  •  collapserKey,這個可以不用配置,hystrix 會默認使用當前方法名;
  •  batchMethod,配置 batch 方法名,我們一般會將 single 方法和 batch 方法定義在同一個類內,直接填方法名即可;
  •  scope,最坑的配置項,也是逼我讀源碼的元兇,com.netflix.hystrix.HystrixCollapser.Scope 枚舉類,有 REQUEST, GLOBAL 兩種選項,在 scope 為 REQUEST 時,hystrix 會為每個請求都創建一個 collapser, 此時你會發現 batch 方法執行時,傳入的請求數總為1。而且 REQUEST 項還是默認項,不明白這樣請求合并還有什么意義;
  •  collapserProperties, 在此選項內我們可以配置 hystrixCommand 的通用配置;

通用配置包括:

  •  maxRequestsInBatch, 構造批量請求時,使用的單個請求的最大數量;
  •  timerDelayInMilliseconds, 此選項配置 collapser 的 timer 線程多久會合并一次請求;
  •  requestCache.enabled, 配置提交請求時是否緩存;

一個完整的配置如下: 

  1. @HystrixCollapser(  
  2.             batchMethod = "batch" 
  3.             collapserKey = "single" 
  4.             scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,  
  5.             collapserProperties = {  
  6.                     @HystrixProperty(name = "maxRequestsInBatch"value = "100"),  
  7.                     @HystrixProperty(name = "timerDelayInMilliseconds"value = "1000"),  
  8.                     @HystrixProperty(name = "requestCache.enabled"value = "true" 
  9.             }) 

BatchCollapser

設計

由于業務需求,我們并不太關心被合并請求的返回值,而且覺得 hystrix 保持那么多的 Future 并沒有必要,于是自己實現了一個簡單的請求合并器,業務線程簡單地將請求放到一個容器里,請求數累積到一定量或延遲了一定的時間,就取出容器內的數據統一發送給下游系統。

設計思想跟 hystrix 類似,合并器有一個字段作為存儲請求的容器,且設置一個 timer 線程定時消費容器內的請求,業務線程將請求參數提交到合并 器的容器內。不同之處在于,業務線程將請求提交給容器后立即同步返回成功,不必管請求的消費結果,這樣便實現了時間維度上的合并觸發。

另外,我還添加了另外一個維度的觸發條件,每次將請求參數添加到容器后都會檢驗一下容器內請求的數量,如果數量達到一定的閾值,將在業務線程內合并執行一次。

由于有兩個維度會觸發合并,就不可避免會遇到線程安全問題。為了保證容器內的請求不會被多個線程重復消費或都漏掉,我需要一個容器能滿足以下條件:

  •  是一種 Collection,類似于 ArrayList 或 Queue,可以存重復元素且有順序;
  •  在多線程環境中能安全地將里面的數據全取出來進行消費,而不用自己實現鎖。

java.util.concurrent 包內的 LinkedBlockingDeque 剛好符合要求,首先它實現了 BlockingDeque 接口,多線程環境下的存取操作是安全的;此外,它還提供 drainTo(Collection<? super E> c, int maxElements)方法,可以將容器內 maxElements 個元素安全地取出來,放到 Collection c 中。

實現

以下是具體的代碼實現: 

  1. public class BatchCollapser<E> implements InitializingBean {  
  2.      private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);  
  3.      private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();  
  4.      private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);  
  5.      private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();  
  6.      private Handler<List<E>, Boolean> cleaner;  
  7.      private long interval;  
  8.      private int threshHold;  
  9.      private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {  
  10.          this.cleaner = cleaner;  
  11.          this.threshHold = threshHold;  
  12.          this.interval = interval;  
  13.      }  
  14.      @Override  
  15.      public void afterPropertiesSet() throws Exception {  
  16.          SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {  
  17.              try {  
  18.                  this.clean();  
  19.              } catch (Exception e) {  
  20.                  logger.error("clean container exception", e);  
  21.              }  
  22.          }, 0, interval, TimeUnit.MILLISECONDS);  
  23.      }  
  24.      public void submit(E event) {  
  25.          batchContainer.add(event);  
  26.          if (batchContainer.size() >= threshHold) {  
  27.              clean();  
  28.          }  
  29.      }  
  30.      private void clean() {  
  31.          List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);  
  32.          batchContainer.drainTo(transferList, 100);  
  33.          if (CollectionUtils.isEmpty(transferList)) {  
  34.              return;  
  35.          }  
  36.          try {  
  37.              cleaner.handle(transferList);  
  38.          } catch (Exception e) {  
  39.              logger.error("batch execute error, transferList:{}", transferList, e);  
  40.          }  
  41.      }  
  42.      public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {  
  43.          Class jobClass = cleaner.getClass();  
  44.          if (instance.get(jobClass) == null) {  
  45.              synchronized (BatchCollapser.class) {  
  46.                  if (instance.get(jobClass) == null) {  
  47.                      instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));  
  48.                  }  
  49.              }  
  50.          }  
  51.          return instance.get(jobClass); 
  52.      }  
  53.  } 

以下代碼內需要注意的點:

  •  由于合并器的全局性需求,需要將合并器實現為一個單例,另外為了提升它的通用性,內部使用使用 concurrentHashMap 和 double check 實現了一個簡單的單例工廠。
  •  為了區分不同用途的合并器,工廠需要傳入一個實現了 Handler 的實例,通過實例的 class 來對請求進行分組存儲。
  •  由于 java.util.Timer 的阻塞特性,一個 Timer 線程在阻塞時不會啟動另一個同樣的 Timer 線程,所以使用 ScheduledExecutorService 定時啟動 Timer 線程。

ConcurrentHashMultiset

設計

上面介紹的請求合并都是將多個請求一次發送,下游服務器處理時本質上還是多個請求,最好的請求合并是在內存中進行,將請求結果簡單合并成一個發送給下游服務器。如我們經常會遇到的需求:元素分值累加或數據統計,就可以先在內存中將某一項的分值或數據累加起來,定時請求數據庫保存。

Guava 內就提供了這么一種數據結構:ConcurrentHashMultiset,它不同于普通的 set 結構存儲相同元素時直接覆蓋原有元素,而是給每個元素保持一個計數 count, 插入重復時元素的 count 值加1。而且它在添加和刪除時并不加鎖也能保證線程安全,具體實現是通過一個 while(true) 循環嘗試操作,直到操作夠所需要的數量。

ConcurrentHashMultiset 這種排重計數的特性,非常適合數據統計這種元素在短時間內重復率很高的場景,經過排重后的數量計算,可以大大降低下游服務器的壓力,即使重復率不高,能用少量的內存空間換取系統可用性的提高,也是很劃算的。

實現

使用 ConcurrentHashMultiset 進行請求合并與使用普通容器在整體結構上并無太大差異,具體類似于: 

  1. if (ConcurrentHashMultiset.isEmpty()) {  
  2.     return;  
  3.  
  4. List<Request> transferList = Lists.newArrayList();  
  5. ConcurrentHashMultiset.elementSet().forEach(request -> {  
  6.     int count = ConcurrentHashMultiset.count(request);  
  7.     if (count <= 0) {  
  8.         return;  
  9.     }  
  10.     transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));  
  11.     ConcurrentHashMultiset.remove(request, count);  
  12. }); 

小結

最后總結一下各個技術適用的場景:

  •  hystrix collapser: 需要每個請求的結果,并且不在意每個請求的 cost 會增加;
  •  BatchCollapser: 不在意請求的結果,需要請求合并能在時間和數量兩個維度上觸發;
  •  ConcurrentHashMultiset:請求重復率很高的統計類場景;

另外,如果選擇自己來實現的話,完全可以將 BatchCollapser 和 ConcurrentHashMultiset 結合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作為容器,這樣就可以結合兩者的優勢了。 

 

責任編輯:龐桂玉 來源: Java知音
相關推薦

2022-02-28 10:02:54

Linux技巧命令

2024-03-17 20:01:51

2024-05-20 09:19:45

請求合并容器

2022-08-02 16:38:53

惡意軟件密碼

2022-09-22 08:42:14

接口請求合并技巧

2023-03-27 08:25:28

技巧技術吞吐率

2013-03-29 10:23:02

數據庫癌癥治療

2021-09-09 08:23:11

Vue 技巧 開發工具

2023-11-20 23:02:36

Spring系統

2011-06-03 11:53:06

Spring接口

2024-07-08 09:03:31

2016-11-03 09:34:13

跳槽大數據求職

2011-07-22 17:22:20

Spring

2012-07-17 09:16:16

SpringSSH

2022-03-23 10:25:32

區塊鏈安全技術

2022-01-17 08:19:51

Javascript 接口前端

2022-01-20 08:38:02

Java接口Lambda

2021-11-05 21:33:28

Redis數據高并發

2019-11-20 18:52:24

物聯網智能照明智能恒溫器

2014-12-31 17:42:47

LBSAndroid地圖
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 99久久精品国产毛片 | 亚洲欧美激情四射 | 国产一级片91 | 超碰天天 | www.国产精| 久草在线 | 一区二区三区中文字幕 | 亚洲高清视频在线观看 | 超碰免费观看 | 成人日韩精品 | 亚洲免费一区二区 | 欧美日韩在线电影 | 91精品国产色综合久久 | 一级片免费观看 | 欧美9999 | 国产69久久精品成人看动漫 | 国产一区久久 | 国产69精品久久久久777 | 99视频入口 | 亚洲在线视频 | 欧美一区二不卡视频 | 日韩视频在线播放 | 欧美精品一区二区三区在线播放 | 国产农村妇女毛片精品久久麻豆 | 久久久久中文字幕 | 久久国产三级 | 天堂一区二区三区 | 久久综合一区 | 国产一区二区三区日韩 | 中文字幕成人av | 伊人色综合久久久天天蜜桃 | av在线成人| 日本亚洲一区 | 欧美日韩精品专区 | 国产一区二区视频在线观看 | 99精品久久久 | 日本又色又爽又黄的大片 | 日韩三级在线观看 | 成人动慢 | 乱一性一乱一交一视频a∨ 色爱av | 欧美日韩久久 |