成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Concurrent In Java

開發 后端
本篇文章一系列只是對JUC各個部分做了說明和介紹,希望對大家有所啟發。

這一系列只是對JUC各個部分做了說明和介紹,沒人深入原理!

concurrent并發包,讓你易于編寫并發程序。并發下我們經常需要使用的基礎設施和解決的問題有ThreadPool、Lock、管道、集合點、線程之間等待和喚醒、線程間數據傳輸、共享資源訪問控制、并發線程之間的相互等待,等待。

concurrent提供的工具能夠解決絕大部分的場景,還能提高程序吞吐量。

現代的服務器多采用多核CPU,從而不同線程之間有可能真正地在同時運行而不是cpu時間切片。在處理大計算量的程序上要盡可能利用CPU多核特性,提高系統吞吐量。

并發編程主要面臨三個問題:

1.如何讓多個線程同時為同一個任務工作(并發編程設計)

2.多個線程之間對共享資源的爭用。

3.多個線程之間如何相互合作、傳遞數據。

1. concurrent包提供的集合

concurrent包直接提供了標準集合的一些實現,在下面做簡單介紹。在大部分情況下可以使用它們提供高并發環境下對集合訪問的吞吐量。

1.1 ConcurrentHashMap

Map的一個并發實現。在多線程環境下,它具有很高的吞吐量和具備可靠的數據一致性。它支持并發讀和一定程度的并發修改(默認16個并發,可以通過構造函數修改)。

HashMap的實現是非線程安全的,高并發下會get方法常會死鎖,有的時候會表現為CPU居高不下。

  1. public V get(Object key) {  
  2. if (key == null)  
  3. return getForNullKey();  
  4. int hash = hash(key.hashCode());  
  5. for (Entry e = table[indexFor(hash, table.length)];  
  6. e != null;  
  7. e = e.next) {  
  8. Object k;  
  9. if (e.hash == hash && ((k = e.key) == key || key.equals(k)))  
  10. return e.value;  
  11. }  
  12. return null;  
  13. }  
  14.  

在get操作里面for循環取對象的操作,由于高并發同時讀寫,for循環的結果變得不可預知,所以有可能一直循環。

所以高并發環境下盡量不要直接使用HashMap,對系統造成的影響很難排除。

和Collections.synchronizedMap(new HashMap(...))相比,外ConcurrentHashMap在高并發的環境下有著更優秀的吞吐量。因為ConcurrentHashMap可以支持寫并發,基本原理是內部分段,分段的數量決定著并發程度。通過concurrencyLevel參數可以設置。如果你能預期并發數量那么設置該參數可以獲取更優吞吐量。

另外為ConcurrentHashMap還實現了:

V putIfAbsent(K key, V value);

boolean remove(Object key, Object value);

boolean replace(K key, V oldValue, V newValue);

V replace(K key, V value);

這四個一致性的操作方法。

1.2 BlockingQueue

BlockingQueue定義了一個接口,繼承了Queue接口。Queue是一種數據結構,意思是它的項以先入先出(FIFO)順序存儲。

BlockingQueue為我們提供了一些多線程阻塞語義的方法,新增和重定義了一些方法插入:

 

 

BlockingQueue是線程安全的,非常適合多個生產者和多個消費者線程之間傳遞數據。

形象地理解,BlockingQueue好比有很多格子的傳輸帶系統,不過當你(生產者)調用put方法的時候,如果有空閑的格子那么放入物體后立刻返回,如果沒有空閑格子那么一直處于等待狀態。add方法意味著如果沒有空閑格子系統就會報警,然后如果處理該報警則按照你的意愿。offer方法優先于add方法,它通過返回true 或 flase來告訴你是否放入成功。offer超時方法,如果不空閑的情況下,嘗試等待一段時間。

BlockingQueue有很多實現ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue

補充Dueue是個雙向隊列,可以當做堆棧來使用。

BlockingQueue在ThreadPool中,作為任務隊列來使用,用來保存沒有立刻執行的工作任務對象。

1.3 SynchronousQueue

SychronousQueue是BlockingQueue的一個實現,它看起來是一個隊列,但是其實沒有容量,是特定條件下的一個精簡實現。

做個比喻,SychronousQueue對象就像一個接力棒,現在有兩個運動員交棒者和接棒者(線程)要做交接。在交接點,交棒者沒有交出之前是不能松開的(一種等待狀態),接棒者在接到棒之前是必須等待。換一句話說不管誰先到交接點,必須處于等待狀態。

在生產者和消費者模型中。如果生產者向SychronousQueue進行put操作,直到有另外的消費者線程進行take操作時才能返回。對消費者也是一樣,take操作會被阻塞,直到生產者put。

在這種生產者-消費者模型下,生產者和消費者是進行手對手傳遞產品,在消費者消費一個產品之前,生產者必須處于等待狀態。它給我們提供了在線程之間交換單一元素的極輕量級方法,并且具有阻塞語義。

提示:上面舉例中有寫局限性。其實生產者和消費者進程是可以任意數量的。M:N。生產線程之間會對SychronousQueue進行爭用,消費者也是一樣。

對SychronousQueue類似于其他語境中“會合通道”或 “連接”點問題。它非常適合于傳遞性設計,在這種設計中,在一個線程中運行的對象要將某些信息、事件或任務傳遞給在另一個線程中運行的對象,它就必須與該對象同步。

1.4Exchanger

是SychronousQueue的雙向實現。用來伙伴線程間交互對象。Exchanger 可能在比如遺傳算法和管道設計中很有用。

形象地說,就是兩個人在預定的地方交互物品,任何一方沒到之前都處于等待狀態。

1.5 CopyOnWriteArrayList 和 CopyOnWriteArraySet

它們分別是List接口和Set接口的實現。正如類名所描述的那樣,當數據結構發生變化的時候,會復制自身的內容,來保證一致性。大家都知道復制全部副本是非常昂貴的操作,看來這是一個非常不好的實現。事實上沒有最好和最差的方案,只有最合適的方案。一般情況下,處理多線程同步問題,我們傾向使用同步的 ArrayList,但同步也有其成本。

那么在什么情況下使用CopyOnWriteArrayList 或者CopyOnWriteArraySet呢?

數據量小。

對數據結構的修改是偶然發生的,相對于讀操作。

舉例來說,如果我們實現觀察者模式的話,作為監聽器集合是非常合適的。

1.6 TimeUnit

雖然是個時間單位,但是它也是concurrent包里面的。也許你以前的代碼里面經常出現1*60*1000來表示一分鐘,代碼可讀性很差。現在你可以通過TimeUnit來編寫可讀性更好的代碼,concurrent的api里面涉及到時間的地方都會使用該對象。

我之所以先進并發框架常用的集合,是因為線程池的實現特性都利用了BlockingQueue的一些特性。

#p#

2. ThreadPool

雖然線程和進程相比是輕量級許多,但是線程的創建成本還是不可忽律,所以就有了線程池化的設計。線程池的創建、管理、回收、任務隊列管理、任務分配等細節問題依然負責,沒有必要重復發明輪子,concurrent包已經為我們準備了一些優秀線程池的實現。

2.1 認識ExecutorService 接口

ExecutorService 接口,它能提供的功能就是用來在將來某一個時刻異步地執行一系列任務。雖然簡單一句話,但是包含了很多需求點。它的實現至少包含了線程池和任務隊列兩個方面,其實還包括了任務失敗處理策略等。

 

 

經常使用submit方法,用來提交任務對象。

簡單的例子:

ExecutorService es = Executors.newCachedThreadPool();

es.submit(new Runnable(){

@Override

public void run() {

System.out.println("do some thing");

}

});

es.shutdown();

上面的例子只是完成了提交了一個任務,異步地去執行它。但是有些使用場景更為復雜,比如等待獲得異步任務的返回結果,或者最多等上固定的時間。

submit 方法返回一個對象,Future。看起來有點別扭,代表將來的對象。其實看一下Future的方法就明白了。

 

其實Future對象代表了一個異步任務的結果,可以用來取消任務、查詢任務狀態,還有通過get方法獲得異步任務返回的結果。當調用get方法的時候,當前線程被阻塞直到任務被處理完成或者出現異常。

 

我們可以通過保存Future對象來跟蹤查詢異步任務的執行情況。

顯然Runnable接口中定義的 public void run();方法并不能返回結果對象,所以concurrent包提供了Callable接口,它可以被用來返回結果對象。

2.2 ThreadPoolExecutor

ThreadPoolExecutor實現了ExecutorService 接口,也是我們最主要使用的實現類。

首先非常有必要看一些類的最完整的構造函數

 

  1. ThreadPoolExecutor(int corePoolSize,  
  2.  
  3.    int maximumPoolSize,  
  4.  
  5. long keepAliveTime,  
  6.  
  7. TimeUnit unit,  
  8.  
  9. BlockingQueue workQueue,  
  10.  
  11. ThreadFactory threadFactory,  
  12.  
  13. RejectedExecutionHandler handler)  
  14.  

 

ThreadPoolExecutor對象中有個poolSize變量表示當前線程池中正在運行的線程數量。

注意:這個有關非常重要的關系,常常被誤解。poolSize變量和corePoolSize、maximumPoolSize以及workQueue的關系。

首先線程池被創建初期,還沒有執行任何任務的時候,poolSize等于0;

每次向線程池提交任務的時候,線程池處理過程如下:

1. 如果poolSize少于 corePoolSize,則首選添加新的線程,而不進行排隊。

2. 如果poolSize等于或多于 corePoolSize,則首選將請求加入隊列workQueue,而不添加新的線程。

3. 如果第二步執行失敗(隊已滿),則創建新的線程執行任務,但是如果果poolSize已經達到maximumPoolSize,那么就拒絕該任務。如果處理被拒絕的任務就取決于RejectedExecutionHandler handler的設置了,默認情況下會拋出異常。

系統存在四種任務拒絕策略:

在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。

在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。

在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。

在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程)。

keepAliveTime活動線程如果空閑一段時間是否可以回收,通常只作用于超出corePoolSize的線程。corePoolSize的線程創建了就不會被回收。但是到java 6 之后增加了public void allowCoreThreadTimeOut(boolean value)方法,允許core進程也可以根據keepAliveTime來回收,默認為false。

決定線程池特性的還有workQueue的實現類,有三種類SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue,分別對應同步隊列、無界隊列、有界隊列。

(摘自JavaDoc)

類SynchronousQueue,直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(設置maximumPoolSizes 為Integer.MAX_VALUE)。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

LinkedBlockingQueue,無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用于處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

ArrayBlockingQueue,有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

綜上:構造參數的設置是互相制約和影響的。只有當你重復了解其相互關系的時候、或有特殊需求的時候,才可以自己構造ThreadPoolExecutor對象,否則可以使用Executores是個工廠類。

提示使用線程池是注意處理shutdown,確保你系統關閉的時候主動關閉shutdown。

2.3 ScheduledExecutorService

擴展了ExecutorService接口,提供時間排程的功能。

 

 

schedule方法被用來延遲指定時間來執行某個指定任務。如果你需要周期性重復執行定時任務可以使用scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執行,后者以相對固定頻率執行。

(感謝wenbois2000 提出原先的錯誤,我在這里重新描述!對于原先的錯誤,實在不好意思啊,再次感謝!)不管任務執行耗時是否大于間隔時間,scheduleAtFixedRate和scheduleWithFixedDelay都不會導致同一個任務并發地被執行。唯一不同的是scheduleWithFixedDelay是當前一個任務結束的時刻,開始結算間隔時間,如0秒開始執行第一次任務,任務耗時5秒,任務間隔時間3秒,那么第二次任務執行的時間是在第8秒開始。

ScheduledExecutorService的實現類,是ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor對象包含的線程數量是沒有可伸縮性的,只會有固定數量的線程。不過你可以通過其構造函數來設定線程的優先級,來降低定時任務線程的系統占用。

特別提示:通過ScheduledExecutorService執行的周期任務,如果任務執行過程中拋出了異常,那么過ScheduledExecutorService就會停止執行任務,且也不會再周期地執行該任務了。所以你如果想保住任務都一直被周期執行,那么catch一切可能的異常。

2.4 Executors

Executores是個工廠類,用來生成ThreadPoolExecutor對象,它提供了一些常用的線程池配置方案,滿足我們大部分場景。

1. newCachedThreadPool

 

  1. public static ExecutorService newCachedThreadPool() {  
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3. 60L, TimeUnit.SECONDS,  
  4. new SynchronousQueue());  
  5. }  
  6.  

 

分析下出這個線程池配置的工作模式,當沒有空閑進程時就新建線程執行,當有空閑線程時就使用空閑線程執行。當線程空閑大60秒時,系統自動回收線程。

該線程池非常適合執行短小異步任務時吞吐量非常高,會重復利用CPU的能力。但是如果任務處理IO邊界任務,那么會消耗大量線程切換,降低系統吞吐量。所以執行短小的計算任務非常高效,且當沒有任務時不會消耗系統資源。

注意:線程池中沒有變量表示線程是否空閑。那么程序是如何控制的呢?不得不贊嘆concurrent實現的非常精巧。當創建出來的線程完成原來的任務后,會調用BlockingQueue的Poll方法,對于SynchronousQueue實現而言會阻塞調用線程,直到另外的線程offer調用。

然而ThreadPool在分配任務的時候總是先去嘗試調用offer方法,所以就會觸發空閑線程再次調用。

精妙的是ThreadPoolExecutor的處理邏輯一樣,但是用BlockingQueue實現變了就產生不同的行為。

2. newFixedThreadPool

 

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.  
  3. return new ThreadPoolExecutor(nThreads, nThreads,  
  4.  
  5. 0L, TimeUnit.MILLISECONDS,  
  6.  
  7. new LinkedBlockingQueue());  
  8.  
  9. }  
  10.  

 

創建固定線程數量的線程池,采用無界隊列,當有更多任務的時候將被放入工作隊列中排隊。如果線程池不經常執行任務時,你可以調用allowCoreThreadTimeOut(boolean value)的方法讓系統自動回收core的進程,以節約系統資源。

3. newSingleThreadExecutor

 

  1. public static ExecutorService newSingleThreadExecutor() {  
  2.  
  3. return new FinalizableDelegatedExecutorService  
  4.  
  5. (new ThreadPoolExecutor(11,  
  6.  
  7. 0L, TimeUnit.MILLISECONDS,  
  8.  
  9. new LinkedBlockingQueue()));  
  10.  
  11. }  
  12.  

只有一個工作線程的線程池。和newFixedThreadPool(1)相比,不同之處有兩點:

1. 不可以重新配置newSingleThreadExecutor創建出來的線程池。

2. 當創建出來的線程池對象被GC回收時,會自動調用shutdown方法。

4.newScheduledThreadPool

 

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.        return new ScheduledThreadPoolExecutor(corePoolSize);  
  3.    }  

 

生成一個可以執行時間調度的線程池。其實內部使用無界工作隊列,線程數量最多能達到corePoolSize。

2.5 ExecutorCompletionService

這是個巧妙的設計,內部維護了一已經完成了任務結果隊列,通過take方法可以同步地等待一個個結果對象。

詳情見http://www.oschina.net/uploads/doc/javase-6-doc-api-zh_CN/java/util/concurrent/ExecutorCompletionService.html

#p#

3. java.util.concurrent.locks

java 早期內置synchronized關鍵字解決多線程對共享資源訪問的一些問題,和其還配套了Object的notify 和 wait方法,用來控制線程之間的同步。

concurrent軟件包提供了更為高級和抽象的Lock工具,能解決更多的問題。

Lock是控制多個線程對共享資源進行訪問的工具。通常Lock限定多線程對同一個共享資源訪問的限制,一次只允許一個線程獲得Lock,即獲得對共享資源的訪問權限,線程間是互斥的。但是也有一些鎖如果ReadWriteLock是允許部分線程同時訪問共享資源的。

幾個術語:

爭用:當多個Thread在同一時間內(相對概念)想要占有同一個Lock對象。那么JVM會調度解決爭用。

獲取順序:當多個線程爭用同一個Lock對象,那么JVM就要決定哪個線程將會獲得鎖權限。存在兩種模式:公平和不公平。 默認都是不公平模式,包括synchronized關鍵字,jvm決定順序的時候也是采用不公平策略。因為公平策略需要系統記錄大量輔助信息來判斷分配順序,而不公平策略由JVM決定一直快速高效的算法來分配Lock。所以不公平策略的系統吞吐量會比較高(花費更少的空間和計算在分配上),如果沒有特殊需要則默認采用不公平策略。

重入:當前線程獲取指定的鎖對象權限后,還可以再次獲取該鎖。Lock內部會有一個計數器來表明當前線程獲取了該鎖的數量。如果一個線程獲取了一個鎖兩次,那么線程必須釋放鎖兩次,才能被看作完全釋放了該鎖,所以編程的時候一定要注意使用重入。synchronized關鍵字也是支持重入語義的。

3.1 Lock & ReentrantLock

ReentrantLock實現了Lock接口,一個可重入(reentrant)的互斥鎖 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監視器鎖相同的一些基本行為和語義,但功能更強大。

摘自JavaDoc的一段獲取規則 “當鎖沒有被另一個線程所擁有時,調用 lock 的線程將成功獲取該鎖并返回。如果當前線程已經擁有該鎖,此方法將立即返回。ReentrantLock 將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有。”

經典使用方法。

 

  1. public void m() {   
  2.     lock.lock();  // block until condition holds  
  3.     try {  
  4.       // ... method body  
  5.     } finally {  
  6.       lock.unlock()  
  7.     }  
  8.   }  
  9.  

 

ReentrantLock除了實現了Lock規定的方法外,還實現了tryLock、isLocked 等方法,幫助你實現更多的場景。

Condition

和Object的wait和notify方法類似。ReentrantLock對象附加了Conditon對象,用來完成掛起和喚醒操作,使用lock.newCondition() 方法生成。

一個來自JKD的例子:

 

  1. class BoundedBuffer {  
  2.   final Lock lock = new ReentrantLock();  
  3.   final Condition notFull  = lock.newCondition();   
  4.   final Condition notEmpty = lock.newCondition();   
  5.  
  6.   final Object[] items = new Object[100];  
  7.   int putptr, takeptr, count;  
  8.  
  9.   public void put(Object x) throws InterruptedException {  
  10.     lock.lock();  
  11.     try {  
  12.       while (count == items.length)   
  13.         notFull.await();  
  14.       items[putptr] = x;   
  15.       if (++putptr == items.length) putptr = 0;  
  16.       ++count;  
  17.       notEmpty.signal();  
  18.     } finally {  
  19.       lock.unlock();  
  20.     }  
  21.   }  
  22.  
  23.   public Object take() throws InterruptedException {  
  24.     lock.lock();  
  25.     try {  
  26.       while (count == 0)   
  27.         notEmpty.await();  
  28.       Object x = items[takeptr];   
  29.       if (++takeptr == items.length) takeptr = 0;  
  30.       --count;  
  31.       notFull.signal();  
  32.       return x;  
  33.     } finally {  
  34.       lock.unlock();  
  35.     }  
  36.   }   
  37. }  

 

利用Conditon對象可以讓所有對同一個鎖對象進行爭用的Thread之間進行同步。

Lock VS synchronized

除非你有明確的需求或者并發遇到瓶頸的時候再決定使用ReentrantLock。synchronized在大部分時候還是可以工作的很好,jvm會自動處理和回收鎖。

ReentrantLock提供了更多的選擇和狀態信息。和

3.2 ReadWriteLock & ReentrantReadWriteLock

列舉一個場景對象X,擁有方法a、b、c。a和b方法不改表X的內部狀態,c改變內部狀態。在多線程環境下,我們要求只讀和寫(變更狀態)是不能同時進行的,而只讀操作是可以同時并發的,且實際運行過程中讀操作數量遠遠大于寫操作的數量。

如果用synchronized關鍵字的話,兩個只讀方法a、b也會互斥,并發性能收到限制。

那么這個情況下ReadWriteLock就非常有用,使用也非常簡單。

 

  1. class RWDictionary {  
  2.    private final Map m = new TreeMap();  
  3.    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  
  4.    private final Lock r = rwl.readLock();  
  5.    private final Lock w = rwl.writeLock();  
  6.  
  7.    public Data get(String key) {  
  8.        r.lock();  
  9.        try { return m.get(key); }  
  10.        finally { r.unlock(); }  
  11.    }  
  12.    public String[] allKeys() {  
  13.        r.lock();  
  14.        try { return m.keySet().toArray(); }  
  15.        finally { r.unlock(); }  
  16.    }  
  17.    public Data put(String key, Data value) {  
  18.        w.lock();  
  19.        try { return m.put(key, value); }  
  20.        finally { w.unlock(); }  
  21.    }  
  22.    public void clear() {  
  23.        w.lock();  
  24.        try { m.clear(); }  
  25.        finally { w.unlock(); }  
  26.    }  
  27. }  
  28.  

 

要記得write鎖是獨占的,它一樣可以使用ReentrantLock的Condition功能。

使用任何的鎖都要通過try catch 或者 finally 來處理異常,避免忘記unlock。

#p#

4. 同步輔助類

你提交了一些任務,但你想等它們都完成了再做另外一些事情;你提交了一些任務,但是不想讓它們立刻執行,等你喊123開始的時候,它們才開始執行;等等這些場景,線程之間需要相互配合,或者等待某一個條件成熟執行。這些場景想你就需要用到同步輔助類。

4.1 CountDownLatch

CountDownLatch 內部有個計數器,通過構造函數來指定。這個類就好比是倒計時的電子牌,當倒計時為0的時候就可以一起做一些事情。

摘自JavaDoc的方法介紹

 

 

摘自JavaDoc的例子

 

  1. class Driver { // ...  
  2.   void main() throws InterruptedException {  
  3.     CountDownLatch startSignal = new CountDownLatch(1);  
  4.     CountDownLatch doneSignal = new CountDownLatch(N);  
  5.  
  6.     for (int i = 0; i < N; ++i) // create and start threads  
  7.       new Thread(new Worker(startSignal, doneSignal)).start();  
  8.  
  9.     doSomethingElse();            // don't let run yet  
  10.     startSignal.countDown();      // let all threads proceed  
  11.     doSomethingElse();  
  12.     doneSignal.await();           // wait for all to finish  
  13.   }  
  14. }  
  15.  
  16. class Worker implements Runnable {  
  17.   private final CountDownLatch startSignal;  
  18.   private final CountDownLatch doneSignal;  
  19.   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  
  20.      this.startSignal = startSignal;  
  21.      this.doneSignal = doneSignal;  
  22.   }  
  23.   public void run() {  
  24.      try {  
  25.        startSignal.await();  
  26.        doWork();  
  27.        doneSignal.countDown();  
  28. catch (InterruptedException ex) {} // return;  
  29.   }  
  30.  
  31.   void doWork() { ... }  
  32. }  
  33.  

 

當CountDownLatch(1)的時候,它就好比是個信號槍了。

4.2 CyclicBarrier

  1. new CyclicBarrier(N,   
  2.             new Runnable() {  
  3.               public void run() {   
  4.                mergeRows(...);   
  5.     }  
  6. }); 

這個同步輔助類,它讓多個線程可以在多個屏障點進行等待,所以叫cyclic,而且有個附加選擇你可以在線程到達屏障點后執行一個任務(在釋放其他線程之前)

 

 

為了幫助你理解,假設一個場景。

有一個任務,A、B、C分別從三個倉庫(甲乙丙)搬運不同3個不同的零件到客戶X的公司,然后再一起組裝機器,完成后一起坐車去公司總部。

這個任務需要ABC三個線程同時進行,但是由于從倉庫到客戶X那邊距離不等、交通狀態未知的情況下,所花費的時間也不等。同時由于三個人負責的零件不同,所以安裝機器的時候花費時間也不一樣。這個場景中有兩個需要線程間等待的地方。CyclicBarrier就可以閃亮登場了。

 

  1. public class Main3 {  
  2.  
  3. public static void main(String[] args) {  
  4.  
  5. CyclicBarrier barrier = new CyclicBarrier(3,new Runnable() {  
  6.  
  7. @Override 
  8.  
  9. public void run() {  
  10.  
  11. System.out.println("到達公共屏障點");  
  12.  
  13. }  
  14.  
  15. });  
  16.  
  17. ExecutorService es = Executors.newCachedThreadPool();  
  18.  
  19. es.submit(new Worker("A"50008000, barrier));  
  20.  
  21. es.submit(new Worker("B"200016000, barrier));  
  22.  
  23. es.submit(new Worker("C"90002000, barrier));  
  24.  
  25. es.shutdown();  
  26.  
  27. }  
  28.  
  29. static class Worker implements Runnable {  
  30.  
  31. String name;  
  32.  
  33. int t1;// 搬運零件所需要的時間  
  34.  
  35. int t2;// 參與組裝工作需要的時間  
  36.  
  37. CyclicBarrier barrier;  
  38.  
  39. public Worker(String name, int t1, int t2, CyclicBarrier barrier) {  
  40.  
  41. super();  
  42.  
  43. this.name = name;  
  44.  
  45. this.t1 = t1;  
  46.  
  47. this.t2 = t2;  
  48.  
  49. this.barrier = barrier;  
  50.  
  51. }  
  52.  
  53. @Override 
  54.  
  55. public void run() {  
  56.  
  57. try {  
  58.  
  59. print(name + " 開始搬運零件");  
  60.  
  61. Thread.sleep(t1);// 模擬搬運時間  
  62.  
  63. print(name + " 到達目的地");  
  64.  
  65. int a = barrier.await(); // 等待其他人  
  66.  
  67. if(a==0){  
  68.  
  69. //說明是最后一個到的可以執行特殊操作  
  70.  
  71. }  
  72.  
  73. print(name + " 開始組裝機器");  
  74.  
  75. Thread.sleep(t2);// 模擬組裝時間.  
  76.  
  77. print(name + " 完成組裝機器");  
  78.  
  79. barrier.await(); // 等待其他人組裝完畢  
  80.  
  81. print(name + " 一起回總公司");  
  82.  
  83. catch (Exception e) {  
  84.  
  85. e.printStackTrace();  
  86.  
  87. }  
  88.  
  89. }  
  90.  
  91. }  
  92.  
  93. static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
  94.  
  95. static void print(String x) {  
  96.  
  97. System.out.println( sdf.format(new Date()) + ": "+x);  
  98.  
  99. }  
  100.  
  101. }  

 

4.3 Semaphore

一個經典的信號量計數器。一般被用來控制對共享資源同時訪問線程數量的控制。

特殊情況下信號量設置為1,那么就類似互斥鎖的功能。

此類的構造方法可選地接受一個公平 參數。當設置為 false 時,此類不對線程獲取鎖的順序做任何保證。和之前提到的爭用獲取順序一樣,在非公平模式下,系統將獲得更好的吞吐量,jvm也會保證在非公平模式下讓所有線程得到訪問機會。

【編輯推薦】

  1. java.util.concurrent 您不知道的 5 件事
  2. util.concurrent移植到C#
責任編輯:金賀 來源: JavaEye博客
相關推薦

2013-02-26 09:23:16

JavaJava類接口

2021-11-17 07:44:29

React 前端 組件

2010-07-12 10:03:50

ibmdwjava

2009-08-14 14:50:41

util.concur

2021-01-13 11:29:43

Python多線程異步

2024-01-17 12:44:23

Python并發編程

2024-12-24 08:03:56

2020-11-06 13:25:38

React Concu

2013-03-22 09:56:29

大數據應用框架MapReduce

2024-10-31 09:30:05

線程池工具Java

2022-06-15 07:32:35

Lock線程Java

2025-03-28 04:00:00

互聯網Java讀操作

2009-07-09 10:28:19

線程池JDK5

2022-12-15 19:27:33

多線程代碼性能

2021-06-07 14:04:13

并發編程Future

2013-04-03 11:07:46

JavaJava線程

2020-07-08 12:05:55

Java線程池策略

2009-06-18 08:51:03

Spring3.0 M

2021-01-20 08:36:15

工具AtomicRefer JDK

2025-04-03 07:41:55

API阻塞隊列數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品国产成人 | 亚洲精品久久久久久首妖 | 久久久精品影院 | 国产精品亚洲成在人线 | 亚洲不卡视频 | 欧美日韩国产精品一区 | 超碰日本 | 欧美6一10sex性hd | 久久久久久免费观看 | 欧美日韩精品免费观看 | 成人精品毛片国产亚洲av十九禁 | 日日干夜夜操天天操 | 一级免费毛片 | 亚洲欧美中文日韩在线v日本 | av三级在线观看 | 四虎影音| 欧美精品三区 | 不卡一二区 | 国产丝袜av | 亚洲精品二三区 | 五月天激情综合网 | 天天操伊人 | 国产伦精品一区二区三区高清 | 精品欧美一区二区在线观看欧美熟 | 日韩色在线 | 亚洲精品久久久久中文字幕欢迎你 | 亚洲一区二区三区欧美 | 亚洲欧美精品在线 | 日韩网站免费观看 | 日韩av在线一区二区 | 欧美高清性xxxxhdvideosex | 欧美成人精品激情在线观看 | 欧美成年人网站 | 日本视频一区二区三区 | 欧美一区二区三区在线视频 | 欧美激情区| 亚洲精品一区二区三区 | 999久久久久久久久6666 | 91极品尤物在线播放国产 | 国产精品三级 | 亚洲女人天堂网 |