成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用線程執行框架的一次經歷

開發 架構
一個線程從某個地方接收消息(數據),可以是其他主機或者消息隊列,然后轉由另外的一個線程池來執行具體處理消息的邏輯,并且消息的處理速度小于接收消息的速度。這種情景很常見,試想一下,你會怎么設計和實現?

場景

一個線程從某個地方接收消息(數據),可以是其他主機或者消息隊列,然后轉由另外的一個線程池來執行具體處理消息的邏輯,并且消息的處理速度小于接收消息的速度。這種情景很常見,試想一下,你會怎么設計和實現?

直觀想法

很顯然采用JUC的線程框架,可以迅速寫出代碼。

消息接收者:

 

  1. public class Receiver { 
  2.     private static volatile boolean inited = false
  3.     private static volatile boolean shutdown = false
  4.     private static volatile int cnt = 0
  5.  
  6.     private MessageHandler messageHandler; 
  7.  
  8.     public void start(){ 
  9.         Executors.newSingleThreadExecutor().execute(new Runnable() { 
  10.             @Override 
  11.             public void run() { 
  12.                 while(!shutdown){ 
  13.                     init(); 
  14.                     recv(); 
  15.                 } 
  16.             } 
  17.         }); 
  18.     } 
  19.  
  20.     /** 
  21.      * 模擬消息接收 
  22.      */ 
  23.     public void recv(){ 
  24.             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); 
  25.     } 

消息處理:

 

  1. public class MessageHandler { 
  2.  
  3.     private static final int THREAD_POOL_SIZE = 4
  4.  
  5.     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); 
  6.  
  7.     public void handle(Message msg) { 
  8.         try { 
  9.             service.execute(new Runnable() { 
  10.                 @Override 
  11.                 public void run() { 
  12.                     parseMsg(msg); 
  13.                 } 
  14.             }); 
  15.         } catch (Throwable e) { 
  16.             System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) { 
  17.                 e.printStackTrace(); 
  18.             } 
  19.  
  20.         } 
  21.     } 

效果:這種方案導致的現象是接收到的消息會迅速堆積,我們從消息隊列(或者其他地方)取出了大量消息,但是處理線程的速度又跟不上,所以導致的問題是大量的Task會堆積在線程池底層維護的一個阻塞隊列中,這會極大的耗費存儲空間,影響系統的性能。

分析:當execute()一個任務的時候,如果有空閑的worker線程,那么投入運行,否則看設置的***線程個數,沒有達到線程個數限制就創建新線程,接新任務,否則就把任務緩沖到一個阻塞隊列中,問題就是這個隊列,默認的大小是沒有限制的,所以就會大量的堆積任務,必然耗費heap空間。

 

  1. public static ExecutorService newFixedThreadPool(int nThreads) { 
  2.         return new ThreadPoolExecutor(nThreads, nThreads, 
  3.                                       0L, TimeUnit.MILLISECONDS, 
  4.                                       new LinkedBlockingQueue<Runnable>()); 
  5.     } 
  6.  
  7. public LinkedBlockingQueue() { 
  8.         this(Integer.MAX_VALUE); // capacity 
  9.     } 

計數限制

面對上述問題,想到了要限制消息接收的速度,自然就想到了各種線程同步的原語,不過在這里最簡單的就是使用一個Volatile的計數器。

消息接收者:

 

  1. public class Receiver { 
  2.     private static volatile boolean inited = false
  3.     private static volatile boolean shutdown = false
  4.     private static volatile int cnt = 0
  5.     private MessageHandler messageHandler; 
  6.     public void start(){ 
  7.         Executors.newSingleThreadExecutor().execute(new Runnable() { 
  8.             @Override 
  9.             public void run() { 
  10.                 while(!shutdown){ 
  11.                     init(); 
  12.                     recv(); 
  13.                 } 
  14.             } 
  15.         }); 
  16.     } 
  17.  
  18.     /** 
  19.      * 模擬消息接收 
  20.      */ 
  21.     public void recv(){ 
  22.             Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); 
  23.     } 

消息處理:

 

  1. public class MessageHandler { 
  2.     private static final int THREAD_POOL_SIZE = 1
  3.     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); 
  4.  
  5.     public void handle(Message msg){ 
  6.         try { 
  7.             service.execute(new Runnable() { 
  8.  
  9.                 @Override 
  10.                 public void run() { 
  11.                     parseMsg(msg); 
  12.                 } 
  13.             }); 
  14.         } catch (Throwable e) { 
  15.             System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message){ try { Thread.sleep(10000); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); }finally { 
  16.             Receiver.limit --; 
  17.         } 
  18.  
  19.     } 

效果:通過控制消息的個數來阻塞消息的接收過程,就不會導致任務的堆積,系統的內存消耗會比較平緩,限制消息的個數本質就和下面限制任務隊列大小一樣。

使用同步隊列 SynchronousQueue

SynchronousQueue 雖名為隊列,但是其實不會緩沖任務的對象,只是作為對象傳遞的控制點,如果有空閑線程或者沒有達到***線程限制,就會交付給worker線程去執行,否則就會拒絕,我們需要自己實現對應的拒絕策略RejectedExecutionHandler,默認的是拋出異常RejectedExecutionException。

消息接收者同上。

消息處理:

 

  1. public class MessageHandler { 
  2.     private static final int THREAD_POOL_SIZE = 4
  3.  
  4.     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, 
  5.             new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() { 
  6.         @Override 
  7.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  8.             System.out.println("自定義拒絕策略"); try { executor.getQueue().put(r); System.out.println("重新放任務回隊列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { System.out.println(service.getTaskCount()); System.out.println(service.getQueue().size()); System.out.println(service.getCompletedTaskCount()); service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("線程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); Thread.sleep(1000); } catch (InterruptedException e) { 
  9.                 e.printStackTrace(); 
  10.             } 
  11.         } 
  12.     } 

效果:能夠控制消息的接收速度,但是我們需要在rejectedExecution中實現某種阻塞的操作,但是選擇在發生拒絕的時候把任務重新放回隊列,帶來的問題就是這個Task會發生饑餓現象。

[[178175]]

使用大小限制的阻塞隊列

使用LinkedBlockingQueue作為線程框架底層的任務緩沖區,并且設置大小限制,思想上和上述方案一樣,都是有一個阻塞的點,但是通過***的jvm monitor看到這里的CPU消耗更少,內存使用有所降低,并且波動小(具體原因有待探索)。

消息接收者同上。

消息處理:

  1. public class MessageHandler { 
  2.     private static final int THREAD_POOL_SIZE = 4
  3.     private static final int BLOCK_QUEUE_CAP = 500
  4.     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, 
  5.             new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() { 
  6.         @Override 
  7.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  8.             System.out.println("自定義拒絕策略"); try { executor.getQueue().put(r); System.out.println("重新放任務回隊列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息處理異常" + e); } } /** * 比較耗時的消息處理流程 */ public void parseMsg(Message message) { try { Thread.sleep(5000); System.out.println("線程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); } } static class SimpleThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Thread-" + System.currentTimeMillis()); return thread; 
  9.         } 
  10.     } 

總結

多線程是比較容易出問題的地方,特別當對方法不熟悉的時候

 

責任編輯:張燕妮 來源: Chown
相關推薦

2012-08-28 09:21:59

Ajax查錯經歷Web

2023-03-29 09:36:32

2025-03-17 10:01:07

2021-12-06 19:29:17

LRU內存算法

2013-01-17 10:31:13

JavaScriptWeb開發firebug

2021-04-13 18:17:48

Hbase集群配置

2011-04-13 09:21:30

死鎖SQL Server

2013-04-01 10:27:37

程序員失業

2021-01-22 05:35:19

Lvm模塊Multipath

2014-08-06 11:24:24

Elasticsear劫持掛馬

2012-07-12 14:35:31

面試經歷

2015-04-28 15:31:09

2018-09-14 10:48:45

Java內存泄漏

2022-06-10 11:06:23

服務下線

2020-11-23 07:13:13

Nodejs源碼

2017-11-09 09:06:29

流量暴增優化

2019-04-04 15:00:40

SQL索引數據庫

2020-07-15 08:11:05

Linuxc++程序

2022-07-13 08:31:18

React問題排查

2018-12-06 16:25:39

數據庫服務器線程池
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品国产乱码久久久久久1区2区 | 在线中文字幕亚洲 | 91精品免费 | 一区在线观看 | 草草影院ccyy | 日本免费黄色一级片 | 99久久久国产精品免费消防器 | 国产精品久久久久久吹潮日韩动画 | 亚洲女优在线播放 | 精品国产黄a∨片高清在线 成人区精品一区二区婷婷 日本一区二区视频 | 国产精品免费一区二区三区 | 国产精品欧美一区二区三区不卡 | 亚洲不卡在线观看 | 日韩精品在线播放 | 成人国产精品久久 | 午夜伊人 | 99久久精品国产麻豆演员表 | 久久精品二区 | 欧美成ee人免费视频 | 91原创视频在线观看 | 午夜一级做a爰片久久毛片 精品综合 | 在线精品亚洲欧美日韩国产 | 国产欧美在线观看 | 91久久国产 | 精品国产aⅴ | 日韩在线观看 | 蜜月va乱码一区二区三区 | 久久99久久 | 伊人精品国产 | 日韩影音 | 国产精品久久久久久久一区探花 | 精品欧美一区二区精品久久久 | 91精品国产一区二区在线观看 | 午夜免费| 精品视频一区二区三区在线观看 | av大全在线 | 精国产品一区二区三区四季综 | 亚洲精品亚洲人成人网 | 欧美区在线 | 久久久久久国产精品 | 亚洲久久一区 |