讓API并行調用變得如絲般順滑的絕招
本文轉載自微信公眾號「猿天地」,作者尹吉歡。轉載本文請聯系猿天地公眾號。
當數據量較大的時候,都會通過分庫分表來拆分,分擔讀寫的壓力。分庫分表后比較麻煩的就是查詢的問題,如果不是直接根據分片鍵去查詢的話,需要對多個表進行查詢。
在一些復雜的業務場景下,比如訂單搜索,除了訂單號,用戶,商家 這些常用的搜索條件,可能還有時間,商品等等。
目前常見的做法將數據同步到 ES 這類搜索框架中進行查詢,然后通過搜出來的結果,一般是主鍵 ID, 再去具體的數據表中查詢完整的數據,組裝返回給調用方。
比如下面這段代碼,首先查詢出文章信息,然后根據文章中的用戶 ID 去查詢用戶的昵稱。
- List<ArticleBO> articleBos = articleDoPage.getRecords().stream().map(r -> {
- String nickname = userManager.getNickname(r.getUserId());
- return articleBoConvert.convertPlus(r, nickname);
- }).collect(Collectors.toList());
如果文章有 10 條數據,那么就需要調用 10 次用戶服務提供的接口,而且是同步調用操作。
當然我們也可以用并行流來實現并發調用,代碼如下:
- List<ArticleBO> articleBos = articleDoPage.getRecords().parallelStream().map(r -> {
- String nickname = userManager.getNickname(r.getUserId());
- return articleBoConvert.convertPlus(r, nickname);
- }).collect(Collectors.toList());
并行流的優點很明顯,代碼不用做特別大的改動。需要注意如果用并行流,最好單獨定義一個 ForkJoinPool。
除了用并行流,還可以使用批量查詢的方式來提高性能,降低 RPC 的調用次數,代碼如下:
- List<Long> userIds = articleDoPage.getRecords().stream().map(article -> article.getUserId()).collect(Collectors.toList());
- Map<Long, String> nickNameMap = userManager.queryByIds(userIds).stream().collect(Collectors.toMap(UserResponse::getId, UserResponse::getNickname));
- List<ArticleBO> articleBos = articleDoPage.getRecords().stream().map(r -> {
- String nickname = nickNameMap.containsKey(r.getUserId()) ? nickNameMap.get(r.getUserId()) : CommonConstant.DEFAULT_EMPTY_STR;
- return articleBoConvert.convertPlus(r, nickname);
- }).collect(Collectors.toList());
但批量查詢還是同步模式,下面介紹如果使用 CompletableFuture 來實現異步并發調用,直接用原生的 CompletableFuture 也可以,但是編排能力沒有那么強,這里我們選擇一款基于 CompletableFuture 封裝的并行編排框來實現。
稍微做了下封裝,提供了更方便使用的工具類來實現并發調用多個接口的邏輯。
第一種方式,適用于比如從 ES 查出了一批 ID, 然后根據 ID 去數據庫中或者調用 RPC 查詢真實數據,最后得到一個 Map,可以根據 Key 獲取對應的數據。
內部是多線程并發調用,會等到結果全部返回。
- public Object aggregationApi() {
- long s = System.currentTimeMillis();
- List<String> ids = new ArrayList<>();
- ids.add("1");
- ids.add("2");
- ids.add("3");
- Map<String, UserResponse> callResult = AsyncTemplate.call(ids, id -> {
- return userService.getUser(id);
- }, u -> u.getId(), COMMON_POOL);
- long e = System.currentTimeMillis();
- System.out.println("耗時:" + (e-s) + "ms");
- return "";
- }
另一個場景就是 API 聚合的場景,需要并行調用多個接口,將結果進行組裝。
- List<AsyncCall> params = new ArrayList<>();
- AsyncCall<Integer, Integer> goodsQuery = new AsyncCall("goodsQuery", 1);
- params.add(goodsQuery);
- AsyncCall<String, OrderResponse> orderQuery = new AsyncCall("orderQuery", "100");
- params.add(orderQuery);
- UserQuery q = new UserQuery();
- q.setAge(18);
- q.setName("yinjihuan");
- AsyncCall<UserQuery, UserResponse> userQuery = new AsyncCall("userQuery", q);
- params.add(userQuery);
- AsyncTemplate.call(params, p -> {
- if (p.getTaskId().equals("goodsQuery")) {
- AsyncCall<Integer, Integer> query = p;
- return goodsService.getGoodsName(query.getParam());
- }
- if (p.getTaskId().equals("orderQuery")) {
- AsyncCall<String, OrderResponse> query = p;
- return orderService.getOrder(query.getParam());
- }
- if (p.getTaskId().equals("userQuery")) {
- AsyncCall<UserQuery, UserResponse> query = p;
- return userService.getUser(query.getParam());
- }
- return null;
- });
AsyncCall 中定義參數和響應的類型,響應結果會在執行完后會自動設置到 AsyncCall 中。在 call 方法中需要根據 taskId 去做對應的處理邏輯,不同的 taskId 調用的接口不一樣。
源碼參考:https://github.com/yinjihuan/kitty
關于作者:尹吉歡,簡單的技術愛好者,《Spring Cloud 微服務-全棧技術與案例解析》, 《Spring Cloud 微服務 入門 實戰與進階》作者, 公眾號 猿天地 發起人。
原文鏈接:http://cxytiandi.com/blog/user/1