成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊java高并發系統之異步非阻塞

開發 開發工具
在Java中,如使用Tomcat,一個請求會分配一個線程進行請求處理,該線程負責獲取數據、拼裝數據或模板然后返回給前端;在同步調用獲取數據接口的情況下(等待依賴系統返回數據),整個線程是一直被占用并阻塞的。

[[177170]]

在做電商系統時,流量入口如首頁、活動頁、商品詳情頁等系統承載了網站的大部分流量,而這些系統的主要職責包括聚合數據拼裝模板、熱點統計、緩存、下游功能降級開關、托底數據等等。其中聚合數據需要調用其它多個系統服務獲取數據、拼裝數據/模板然后返回給前端,聚合數據來源主要有依賴系統/服務、緩存、數據庫等;而系統之間的調用可以通過如http接口調用(如HttpClient)、SOA服務調用(如dubbo、thrift)等等。

在Java中,如使用Tomcat,一個請求會分配一個線程進行請求處理,該線程負責獲取數據、拼裝數據或模板然后返回給前端;在同步調用獲取數據接口的情況下(等待依賴系統返回數據),整個線程是一直被占用并阻塞的。如果有大量的這種請求,每個請求占用一個線程,但線程一直處于阻塞,降低了系統的吞吐量,這將導致應用的吞吐量下降;我們希望在調用依賴的服務響應比較慢,此時應該讓出線程和CPU來處理下一個請求,當依賴的服務返回了再分配相應的線程來繼續處理。而這應該有更好的解決方案:異步/協程。而Java是不支持協程的(雖然有些Java框架說支持,但還是高層API的封裝),因此在Java中我們還可以使用異步來提升吞吐量。目前java一些開源框架(HttpClient\HttpAsyncClient、dubbo、thrift等等)大部分都支持。

幾種調用方式

同步阻塞調用

即串行調用,響應時間為所有服務的響應時間總和;

半異步(異步Future)

線程池,異步Future,使用場景:并發請求多服務,總耗時為最長響應時間;提升總響應時間,但是阻塞主請求線程,高并發時依然會造成線程數過多,CPU上下文切換;

全異步(Callback)

Callback方式調用,使用場景:不考慮回調時間且只能對結果做簡單處理,如果依賴服務是兩個或兩個以上服務,則不能合并兩個服務的處理結果;不阻塞主請求線程,但使用場景有限。

異步回調鏈式編排

異步回調鏈式編排(JDK8 CompletableFuture),使用場景:其實不是異步調用方式,只是對依賴多服務的Callback調用結果處理做結果編排,來彌補Callback的不足,從而實現全異步鏈式調用。

接下來看看如何設計利用全異步Callback調用和異步回調鏈式編排處理結果來實現全異步系統設計。

同步阻塞調用

  1. public class Test { 
  2.    public static void main(String[] args) throws Exception { 
  3.        RpcService rpcService = new RpcService(); 
  4.        HttpService httpService = new HttpService(); 
  5.        //耗時10ms 
  6.        Map<String, String> result1 = rpcService.getRpcResult(); 
  7.        //耗時20ms 
  8.        Integer result2 = httpService.getHttpResult(); 
  9.        //總耗時30ms 
  10.     } 
  11.    static class RpcService { 
  12.        Map<String, String> getRpcResult() throws Exception { 
  13.            //調用遠程方法(遠程方法耗時約10ms,可以使用Thread.sleep模擬) 
  14.        } 
  15.     } 
  16.    static class HttpService { 
  17.        Integer getHttpResult() throws Exception { 
  18.            //調用遠程方法(遠程方法耗時約20ms,可以使用Thread.sleep模擬) 
  19.            Thread.sleep(20); 
  20.            return 0; 
  21.        } 
  22.     } 

半異步(異步Future)

  1. public class Test { 
  2.    final static ExecutorService executor = Executors.newFixedThreadPool(2); 
  3.    public static void main(String[] args) { 
  4.        RpcService rpcService = new RpcService(); 
  5.        HttpService httpService = new HttpService(); 
  6.        Future<Map<String, String>> future1 = null
  7.        Future<Integer> future2 = null
  8.        try { 
  9.            future1 = executor.submit(() -> rpcService.getRpcResult()); 
  10.            future2 = executor.submit(() -> httpService.getHttpResult()); 
  11.            //耗時10ms 
  12.            Map<String, String> result1 = future1.get(300, TimeUnit.MILLISECONDS); 
  13.            //耗時20ms 
  14.            Integer result2 = future2.get(300, TimeUnit.MILLISECONDS); 
  15.            //總耗時20ms 
  16.        } catch (Exception e) { 
  17.            if (future1 != null) { 
  18.                 future1.cancel(true); 
  19.            } 
  20.            if (future2 != null) { 
  21.                 future2.cancel(true); 
  22.            } 
  23.            throw new RuntimeException(e); 
  24.        } 
  25.     } 
  26.    static class RpcService { 
  27.        Map<String, String> getRpcResult() throws Exception { 
  28.            //調用遠程方法(遠程方法耗時約10ms,可以使用Thread.sleep模擬) 
  29.        } 
  30.     } 
  31.    static class HttpService { 
  32.        Integer getHttpResult() throws Exception { 
  33.            //調用遠程方法(遠程方法耗時約20ms,可以使用Thread.sleep模擬) 
  34.        } 
  35.     } 
  36.   

全異步(Callback)

  1. public class AsyncTest { 
  2. public staticHttpAsyncClient httpAsyncClient; 
  3.    public static CompletableFuture<String> getHttpData(String url) { 
  4.        CompletableFuture asyncFuture = new CompletableFuture(); 
  5.        HttpPost post = new HttpPost(url); 
  6.        HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post); 
  7.        AsyncCharConsumer<HttpResponse> consumer = newAsyncCharConsumer<HttpResponse>() { 
  8.             HttpResponse response; 
  9.            protected HttpResponse buildResult(final HttpContext context) { 
  10.                 return response; 
  11.            } 
  12. …... 
  13.        }; 
  14.        FutureCallback callback = new FutureCallback<HttpResponse>() { 
  15.            public void completed(HttpResponse response) { 
  16.                asyncFuture.complete(EntityUtils.toString(response.getEntity())); 
  17.            } 
  18. …... 
  19.        }; 
  20.        httpAsyncClient.execute(producer, consumer, callback); 
  21.        return asyncFuture; 
  22.     } 
  23.   
  24.    public static void main(String[] args) throws Exception { 
  25.        AsyncTest.getHttpData("http://www.jd.com"); 
  26.        Thread.sleep(1000000); 
  27.     } 

本示例使用HttpAsyncClient演示。

異步回調鏈式編排

CompletableFuture提供了50多個API,可以滿足所需的各種場景的異步處理的編排,在此列舉三個場景:

場景1:三個服務并發異步調用,返回CompletableFuture,不阻塞主線程;

三個服務并發異步調用,返回CompletableFuture

方法test1:

  1. public static void test1() throws Exception { 
  2.       HelloClientDemoTest service = new HelloClientDemoTest(); 
  3.       /** 
  4.        * 場景1 兩個以上服務并發異步調用,返回CompletableFuture,不阻塞主線程 
  5.        * 并且兩個服務也是異步非阻塞調用 
  6.        */ 
  7.       CompletableFuture future1 = service.getHttpData("http://www.jd.com"); 
  8.       CompletableFuture future2 = service.getHttpData("http://www.jd.com"); 
  9.       CompletableFuture future3 =service.getHttpData("http://www.jd.com"); 
  10.       List<CompletableFuture> futureList = Lists.newArrayList(future1,future2, future3); 
  11.       CompletableFuture<Void> allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()])); 
  12.       CompletableFuture<String> future4 =allDoneFuture.thenApply(v -> { 
  13.            List<Object> result =futureList.stream().map(CompletableFuture::join) 
  14.                   .collect(Collectors.toList()); 
  15.            //注意順序 
  16.            String result1 = (String)result.get(0); 
  17.            String result2 = (String)result.get(1); 
  18.            String result3 = (String)result.get(2); 
  19.            //處理業務.... 
  20.            return result1 + result2 + result3; 
  21.        }).exceptionally(e -> { 
  22.            //e.printStackTrace(); 
  23.            return ""; 
  24.        }); 
  25.       //返回 
  26.    } 

場景2、兩個服務并發異步調用,返回CompletableFuture,不阻塞主線程;

兩個服務并發異步調用,返回CompletableFuture

方法test2:

  1. public void test2() throws Exception { 
  2.       HelloClientDemoTest service = new HelloClientDemoTest(); 
  3.       /** 
  4.        * 場景2 兩個接口并發異步調用,返回CompletableFuture,不阻塞主線程 
  5.        * 并且兩個服務也是異步非阻塞調用 
  6.        */ 
  7.       CompletableFuture future1 = service.getHttpData("http://www.jd.com"); 
  8.       CompletableFuture future2 =service.getHttpData("http://www.jd.com"); 
  9.       CompletableFuture future3 =future1.thenCombine(future2, (f1, f2) -> { 
  10.            //處理業務.... 
  11.            return f1 + "," + f2; 
  12.        }).exceptionally(e -> { 
  13.            return ""; 
  14.        }); 
  15.       //返回 
  16.    } 

場景3、兩個服務,并發異步調用兩個服務,并且一個服務的結果返回后再次調用另一服務,然后將三個結果后并處理,返回CompletableFuture,整個處理過程中不阻塞任何線程;

方法test3:

  1. publicvoid test3() throws Exception { 
  2.        HelloClientDemoTest service = new HelloClientDemoTest(); 
  3.        /** 
  4.         * 場景3 兩請求依賴調用,然后與另一服務結果組合處理,返回CompletableFuture,不阻塞主線程 
  5.         * 并且兩個服務也是異步非阻塞調用 
  6.         */ 
  7.         CompletableFuture future1 = service.getHttpData("http://www.jd.com"); 
  8.         CompletableFuture future2 = service.getHttpData("http://www.jd.com"); 
  9.         CompletableFuture<String> future3future1.thenApply((param) -> { 
  10.             CompletableFuture future4 =service.getHttpData("http://www.jd.com"); 
  11.             return future4; 
  12.         }); 
  13.         CompletableFuture future5 =future2.thenCombine(future3, (f2, f3) -> { 
  14.             //....處理業務 
  15.             return f2 + "," + f3; 
  16.         }).exceptionally(e -> { 
  17.             return ""; 
  18.         }); 
  19.         //返回future5 
  20.     } 

全異步Web系統設計

主要技術:servlet3,JDK8 CompletableFuture,支持異步Callback調用的RPC框架。

先看一下處理流程圖:

全異步Web系統設計主要技術處理流程圖

servlet3:Servlet 接收到請求之后,可能首先需要對請求攜帶的數據進行一些預處理;接著,Servlet 線程將請求轉交給一個異步線程來執行業務處理,線程本身返回至容器。針對業務處理較耗時的情況,這將大大減少服務器資源的占用,并且提高并發處理速度。servlet3可參考商品詳情頁系統的Servlet3異步化實踐,結合其中講解的servlet3整合:

  1. public void submitFuture(finalHttpServletRequest req, final Callable<CompletableFuture> task) throwsException{ 
  2.        final String uri = req.getRequestURI(); 
  3.        final Map<String, String[]> params = req.getParameterMap(); 
  4.        final AsyncContext asyncContext = req.startAsync(); 
  5.        asyncContext.getRequest().setAttribute("uri", uri); 
  6.        asyncContext.getRequest().setAttribute("params", params); 
  7.        asyncContext.setTimeout(asyncTimeoutInSeconds * 1000); 
  8.        if(asyncListener != null) { 
  9.            asyncContext.addListener(asyncListener); 
  10.        } 
  11.        CompletableFuture future = task.call(); 
  12.        future.thenAccept(result -> { 
  13.            HttpServletResponse resp = (HttpServletResponse)asyncContext.getResponse(); 
  14.            try { 
  15.                 if(result instanceof String) { 
  16.                     byte[] bytes = new byte[0]; 
  17.                     if (StringUtils.isBlank(result)){ 
  18.                        resp.setContentType("text/html;charset=gbk"); 
  19.                        resp.setContentLength(0); 
  20.                     } else { 
  21.                         bytes =result.getBytes("GBK"); 
  22.                     } 
  23.                    //resp.setBufferSize(bytes.length); 
  24.                    resp.setContentType("text/html;charset=gbk"); 
  25.                    if(StringUtils.isNotBlank(localIp)) { 
  26.                        resp.setHeader("t.ser", localIp); 
  27.                     } 
  28.                    resp.setContentLength(bytes.length); 
  29.                    resp.getOutputStream().write(bytes); 
  30.                 } else { 
  31.                     write(resp,JSONUtils.toJSON(result)); 
  32.                 } 
  33.            } catch (Throwable e) { 
  34.                resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); //程序內部錯誤 
  35.                 try { 
  36.                     LOG.error("get infoerror, uri : {},  params : {}", uri,JSONUtils.toJSON(params), e); 
  37.                 } catch (Exception ex) { 
  38.                 } 
  39.            } finally { 
  40.                 asyncContext.complete(); 
  41.            } 
  42.        }).exceptionally(e -> { 
  43.            asyncContext.complete(); 
  44.            return null; 
  45.        }); 

另外還有Java中協程庫Quasar,可參考《Java的纖程庫 - Quasar》,目前沒有在應用中使用并在測試FiberHttpServlet的時候遇到很多坑,日后把Quasar自如運用后形成日記,希望能結實更多的朋友一起研究,踩坑。

作者:孫偉,目前負責京東商品詳情頁統一服務系統,寫過java,寫過ngx_lua,還寫過storm等,喜歡學習研究新事物。

【本文來自51CTO專欄作者張開濤的微信公眾號(開濤的博客),公眾號id: kaitao-1234567】

 戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 開濤的博客
相關推薦

2016-11-25 00:45:37

隊列數據

2016-11-28 08:40:17

系統降級服務

2016-11-28 09:00:10

瀏覽器瀏覽器緩存服務端

2018-03-28 08:52:53

阻塞非阻塞I

2016-11-28 08:58:43

系統限流

2016-11-28 08:58:43

系統限流算法

2023-12-06 07:28:47

阻塞IO異步IO

2024-09-23 17:15:28

Python并發并行

2021-01-10 11:21:33

JavaScript語言開發

2021-02-27 16:08:17

Java異步非阻塞

2019-07-23 11:01:57

Python同步異步

2012-10-10 10:00:27

同步異步開發Java

2016-11-25 00:38:45

隔離負載均衡系統

2012-02-22 21:15:41

unixIO阻塞

2025-02-17 13:23:34

Python同步阻塞MySQL

2022-06-22 08:16:29

異步非阻塞框架

2022-06-12 06:45:26

高并發防重

2024-10-14 12:34:08

2023-09-25 08:06:44

工具非阻塞式接口

2015-07-03 10:12:04

編程同步非阻塞
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美黑人体内she精在线观看 | 中文精品一区二区 | 中文字幕精品一区二区三区精品 | 精品国产亚洲一区二区三区大结局 | 一区二区三区四区不卡视频 | 国产成人网 | 国产超碰人人爽人人做人人爱 | 成人不卡视频 | 欧美日韩电影免费观看 | 久久久123 | 久久久精品一区二区三区 | 色婷婷综合久久久中字幕精品久久 | 国产馆 | 九九久久久 | av手机在线免费观看 | 精品国产成人 | 国产精品99久久久久久久久久久久 | 区一区二在线观看 | 国产亚洲精品久久久久动 | 久久久精品国产 | 国产一二三区电影 | 久久精品—区二区三区 | 黄色成人在线观看 | 亚洲一区在线播放 | 中文字幕亚洲区一区二 | 国产成人精品综合 | 国产99久久 | 久久69精品久久久久久久电影好 | 国产高清在线精品一区二区三区 | 国产一区二区成人 | 日韩精品一区二区三区中文在线 | 日韩中文字幕在线观看视频 | 九九亚洲| 久久久婷婷 | 久综合 | 在线中文视频 | 欧美日韩视频在线播放 | 国产农村一级国产农村 | 中文字幕一区二区三区四区五区 | 精品乱码一区二区 | 亚洲www|