多線程與高并發干貨筆記分享,造起來
本文轉載自微信公眾號「牧小農」,作者牧小農。轉載本文請聯系牧小農公眾號。
1. 創建線程的四種方式
- 實現Runnable 重寫run方法
- 繼承Thread 重寫run方法
- 線程池創建 Executors.newCachedThreadPool()
- 實現Callable接口
2. Thread線程操作方法
當前線程睡眠指定mills毫秒
- Thread.sleep([mills])
當前線程優雅讓出執行權
- Thread.yield()
例如Thread t1, t2,在t2的run方法中調用t1.join(),線程t2將等待t1完成后執行
- join
3. Thread狀態
狀態 | 使用場景 |
---|---|
NEW | Thread被創建之后,未start之前 |
RUNNABLE | 在調用start()方法之后,這也是線程進入運行狀態的唯一一種方式。 具體分為ready跟running,當線程被掛起或者調用Thread.yield()的時候為ready |
WAITING | 當一個線程執行了Object.wait()的時候,它一定在等待另一個線程執行Object.notify()或者Object.notifyAll()。 或者一個線程thread,其在主線程中被執行了thread.join()的時候,主線程即會等待該線程執行完成。當一個線程執行了LockSupport.park()的時候,其在等待執行LockSupport.unpark(thread)。當該線程處于這種等待的時候,其狀態即為WAITING。需要關注的是,這邊的等待是沒有時間限制的,當發現有這種狀態的線程的時候,若其長時間處于這種狀態,也需要關注下程序內部有無邏輯異常。 |
TIMED_WAITING |
這個狀態和WAITING狀態的區別就是,這個狀態的等待是有一定時效的 Thread.sleep(long) Object.wait(long) Thread.join(long) LockSupport.parkNanos() LockSupport.parkUntil() |
BLOCKED | 在進入synchronized關鍵字修飾的方法或代碼塊(獲取鎖)時的狀態 |
TERMINATED | 線程執行結束之后的狀態。 線程一旦終止了,就不能復生。 在一個終止的線程上調用start()方法,會拋出java.lang.IllegalThreadStateException異常 |
4. synchronized
- 鎖住的是對象而不是代碼
- this 等價于 當前類.class
- 鎖定方法,非鎖定方法同時進行
- 鎖在執行過程中發生異常會自動釋放鎖
- synchronized獲得的鎖是可重入的
- 鎖升級 偏向鎖-自旋鎖-重量級鎖
- synchronized(object)不能用String常量/Integer,Long等基本數據類型
- 鎖定對象的時候要保證對象不能被重寫,最好加final定義
4. volatile
- 保證線程可見性
- 禁止指令重排序
- volatile并不能保證多個線程修改的一致性,要保持一致性還是需要synchronized關鍵字
- volatile 引用類型(包括數組)只能保證引用本身的可見性,不能保證內部字段的可見性 volatile關 鍵字只能用于變量而不可以修飾方法以及代碼塊
5. synchronized與AtomicLong以及LongAdder的效率對比
Synchronized 是需要加鎖的,效率偏低;AtomicLong 不需要申請鎖,使用CAS機制;LongAdder 使用分段鎖,所以效率好,在并發數量特別高的時候,LongAdder最合適
6. ConcurrentHashMap的分段鎖原理
分段鎖就是將數據分段上鎖,把鎖進一步細粒度化,有助于提升并發效率。HashTable容器在競爭激烈的并發環境下表現出效率低下的原因是所有訪問HashTable的線程都必須競爭同一把鎖,假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效提高并發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。首先將數據分成一段一段地存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
7. ReentrantLock
ReentrantLock可以替代synchronized 但是ReentrantLock必須手動開啟鎖/關閉鎖,synchronized遇到異常會自動釋放鎖,ReentrantLock需要手動關閉,一般都是放在finally中關閉 定義鎖 Lock lock = new ReentrantLock(); 開啟 lock.lock(); 關閉 lock.unlock(); 使用Reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,線程可以決定是否繼續等待。使用tryLock進行嘗試鎖定,不管鎖定與否,方法都將繼續執行 可以根據tryLock的返回值來判定是否鎖定 也可以指定tryLock的時間,由于tryLock(time)拋出異常,所以要注意unclock的處理,必須放到finally中,如果tryLock未鎖定,則不需要unlock 使用ReentrantLock還可以調用lockInterruptibly方法,可以對線程interrupt方法做出響應,在一個線程等待鎖的過程中,可以被打斷 new ReentrantLock(true) 表示公平鎖,不帶參數默認為false,非公平鎖
8. CountDownLatch
countDownLatch這個類可以使一個線程等待其他線程各自執行完畢后再執行。是通過一個計數器來實現的,計數器的初始值是線程的數量。當調用countDown()方法后,每當一個線程執行完畢后,計數器的值就-1,當計數器的值為0時,表示所有線程都執行完畢,然后在閉鎖上等待的線程就可以恢復工作了。
線程中調用countDown()方法開始計數;在調用await()方法的線程中,當計數器為0后續才會繼續執行,否則一直等待;也可以使用latch.await(timeout, unit)在等待timeout時間后如果計數器不為0,線程仍將繼續。countDown()之后的代碼不受計數器控制 與join區別,使用join的線程將被阻塞,使用countDown的線程不受影響,只有調用await的時候才會阻塞
8. CyclicBarrier
作用就是會讓指定數量的(數量由構造函數指定)所有線程都等待完成后才會繼續下一步行動。構造函數:public CyclicBarrier(int parties)
- public CyclicBarrier(int parties)
- public CyclicBarrier(int parties, Runnable barrierAction)
parties 是線程的個數;barrierAction為最后一個到達線程要做的任務
所有線程會等待全部線程到達柵欄之后才會繼續執行,并且最后到達的線程會完成 Runnable 的任務。
實現原理:在CyclicBarrier的內部定義了一個Lock對象,每當一個線程調用await方法時,將攔截的線程數減1,然后判斷剩余攔截數是否為初始值parties,如果不是,進入Lock對象的條件隊列等待。如果是,執行barrierAction對象的Runnable方法,然后將鎖的條件隊列中的所有線程放入鎖等待隊列中,這些線程會依次的獲取鎖、釋放鎖。
9. Phaser
可重復使用的同步屏障,功能類似于CyclicBarrier和CountDownLatch,但支持更靈活的使用。
Phaser使我們能夠建立在邏輯線程需要才去執行下一步的障礙等。
我們可以協調多個執行階段,為每個程序階段重用Phaser實例。每個階段可以有不同數量的線程等待前進到另一個階段。我們稍后會看一個使用階段的示例。
要參與協調,線程需要使用Phaser實例 register() 本身。請注意:這只會增加注冊方的數量,我們無法檢查當前線程是否已注冊 - 我們必須將實現子類化以支持此操作。
線程通過調用 arriAndAwaitAdvance() 來阻止它到達屏障,這是一種阻塞方法。當數量到達等于注冊的數量時,程序的執行將繼續,并且數量將增加。我們可以通過調用getPhase()方法獲取當前數量。
10. ReadWriteLock
ReadWriteLock的具體實現是ReentrantReadWriteLock
ReadWriteLock允許分別創建讀鎖跟寫鎖
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- Lock readLock = readWriteLock.readLock();
- Lock writeLock = readWriteLock.writeLock();
使用ReadWriteLock時,適用條件是同一個數據,有大量線程讀取,但僅有少數線程修改。ReadWriteLock可以保證:
只允許一個線程寫入(其他線程既不能寫入也不能讀取);
沒有寫入時,多個線程允許同時讀(提高性能)
讀寫分離鎖可以有效地幫助減少鎖競爭,以提高系統性能,讀寫鎖讀讀之間不互斥,讀寫,寫寫都是互斥的
11. Semaphore
Semaphore 是一個計數信號量,必須由獲取它的線程釋放。常用于限制可以訪問某些資源的線程數量,例如通過 Semaphore 限流。
對于Semaphore來說,它要保證的是資源的互斥而不是資源的同步,在同一時刻是無法保證同步的,但是卻可以保證資源的互斥。只是限制了訪問某些資源的線程數,其實并沒有實現同步。
常用方法:
1、acquire(int permits)
從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。就好比是一個學生占兩個窗口。這同時也對應了相應的release方法。
2、release(int permits)
釋放給定數目的許可,將其返回到信號量。這個是對應于上面的方法,一個學生占幾個窗口完事之后還要釋放多少
3、availablePermits()
返回此信號量中當前可用的許可數。也就是返回當前還有多少個窗口可用。
4、reducePermits(int reduction)
根據指定的縮減量減小可用許可的數目。
5、hasQueuedThreads()
查詢是否有線程正在等待獲取資源。
6、getQueueLength()
返回正在等待獲取的線程的估計數目。該值僅是估計的數字。
7、tryAcquire(int permits, long timeout, TimeUnit unit)
如果在給定的等待時間內此信號量有可用的所有許可,并且當前線程未被中斷,則從此信號量獲取給定數目的許可。
8、acquireUninterruptibly(int permits)
從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
12. Exchanger
用于兩個工作線程之間交換數據的封裝工具類,簡單說就是一個線程在完成一定的事務后想與另一個線程交換數據,則第一個先拿出數據的線程會一直等待第二個線程,直到第二個線程拿著數據到來時才能彼此交換對應數據。其定義為 Exchanger泛型類型,其中 V 表示可交換的數據類型,對外提供的接口很簡單,具體如下:
Exchanger():無參構造方法。
V exchange(V v):等待另一個線程到達此交換點(除非當前線程被中斷),然后將給定的對象傳送給該線程,并接收該線程的對象。
V exchange(V v, long timeout, TimeUnit unit):等待另一個線程到達此交換點(除非當前線程被中斷或超出了指定的等待時間),然后將給定的對象傳送給該線程,并接收該線程的對象。
13. LockSupport
LockSupport 是一個非常方便實用的線程阻塞工具,他可以在任意位置讓線程阻塞。
LockSupport 的靜態方法 park()可以阻塞當前線程,類似的還有 parkNanos(),parkUntil()等,他們實現了一個限時的等待。
方法 | 描述 |
---|---|
void park(): | 阻塞當前線程,如果調用unpark方法或者當前線程被中斷,從能從park()方法中返回 |
void park(Object blocker) | 功能同方法1,入參增加一個Object對象,用來記錄導致線程阻塞的阻塞對象,方便進行問題排查; |
void parkNanos(long nanos) | 阻塞當前線程,最長不超過nanos納秒,增加了超時返回的特性; |
void parkNanos(Object blocker, long nanos) | 功能同方法3,入參增加一個Object對象,用來記錄導致線程阻塞的阻塞對象,方便進行問題排查; |
void parkUntil(long deadline) | 阻塞當前線程,直到deadline; |
void parkUntil(Object blocker, long deadline) | 功能同方法5,入參增加一個Object對象,用來記錄導致線程阻塞的阻塞對象,方便進行問題排查; |
同樣的,有阻塞的方法,當然有喚醒的方法,什么呢?unpark(Thread) 方法。該方法可以將指定線程喚醒。
需要注意的是:park 方法和 unpark 方法執行順序不是那么的嚴格。比如我們在 Thread 類中提到的 suspend 方法 和resume 方法,如果順序錯誤,將導致永遠無法喚醒,但 park 方法和 unpark 方法則不會,因為 LockSupport 使用了類似信號量的機制。他為每一個線程準備了一個許可(默認不可用),如果許可能用,那么 park 函數會立即返回,并且消費這個許可(也就是將許可變為不可用),如果許可不可用,將會阻塞。而 unpark 方法則使得一個許可變為可用
14. AQS
AQS 為 AbstractQueuedSynchronizer 的簡稱
AQS是JDK下提供的一套用于實現基于FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。AQS管理一個關于狀態信息的單一整數,該整數可以表現任何狀態。比如 Semaphore 用它來表現剩余的許可數, ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀態(尚未開始、運行、完成和取消)
使用須知:
Usage
- To use this class as the basis of a synchronizer, redefine the
- following methods, as applicable, by inspecting and/or modifying
- the synchronization state using {@link #getState}, {@link
- #setState} and/or {@link #compareAndSetState}:
- {@link #tryAcquire}
- {@link #tryRelease}
- {@link #tryAcquireShared}
- {@link #tryReleaseShared}>
- {@link #isHeldExclusively}
以上方法不需要全部實現,根據獲取的鎖的種類可以選擇實現不同的方法: 支持獨占(排他)獲取鎖的同步器應該實現tryAcquire、 tryRelease、isHeldExclusively; 支持共享獲取鎖的同步器應該實現tryAcquireShared、tryReleaseShared、isHeldExclusively。
- AQS淺析
AQS的實現主要在于維護一個"volatile int state"(代表共享資源)和 一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。隊列中的每個節點是對線程的一個封裝,包含線程基本信息,狀態,等待的資源類型等。
state的訪問方式有三種:
getState() setState() compareAndSetState()
AQS定義兩種資源共享方式
Exclusive(獨占,只有一個線程能執行,如ReentrantLock) Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch) 不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可, 至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
以ReentrantLock為例
state初始化為0,表示未鎖定狀態。
A線程lock()時,會調用tryAcquire()獨占該鎖并將state+1。
此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。
當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。
但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的。
以CountDownLatch為例
任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。
這N個子線程是并行執行的,每個子線程執行完后countDown()一次,state會CAS減1。
等到所有子線程都執行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續后余動作。
一般來說,自定義同步器要么是獨占方法,要么是共享方式,
他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。
但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如"ReentrantReadWriteLock"。
15. 鎖基本概念
公平鎖/非公平鎖
可重入鎖
獨享鎖/共享鎖
互斥鎖/讀寫鎖
樂觀鎖/悲觀鎖
分段鎖
偏向鎖/輕量級鎖/重量級鎖
自旋鎖
- 公平鎖/非公平鎖
公平鎖是指多個線程按照申請鎖的順序來獲取鎖。
非公平鎖是指多個線程獲取鎖的順序并不是按照申請鎖的順序, 有可能后申請的線程比先申請的線程優先獲取鎖; 有可能會造成優先級反轉或者饑餓現象。
對于Java ReentrantLock而言,通過構造函數指定該鎖是否是公平鎖,默認是非公平鎖。
非公平鎖的優點在于吞吐量比公平鎖大。
對于Synchronized而言,也是一種非公平鎖。由于其并不像ReentrantLock是通過AQS的來實現線程調度, 所以并沒有任何辦法使其變成公平鎖。
- 可重入鎖
可重入鎖又名遞歸鎖,是指在同一個線程在外層方法獲取鎖的時候,在進入內層方法會自動獲取鎖。
ReentrantLock, Synchronized都是可重入鎖。
可重入鎖的一個好處是可一定程度避免死鎖
- 獨享(排他)鎖/共享鎖
獨享鎖是指該鎖一次只能被一個線程所持有。
共享鎖是指該鎖可被多個線程所持有。
對于ReentrantLock而言,其是獨享鎖。
但是對于Lock的另一個實現類ReadWriteLock,其讀鎖是共享鎖,其寫鎖是獨享鎖。讀鎖的共享鎖可保證并發讀是非常高效的,讀寫,寫讀 ,寫寫的過程是互斥的。獨享鎖與共享鎖也是通過AQS來實現的,通過實現不同的方法,來實現獨享或者共享。
對于Synchronized而言,當然是獨享鎖。
- 互斥鎖/讀寫鎖
上面講的獨享鎖/共享鎖就是一種廣義的說法,互斥鎖/讀寫鎖就是具體的實現。
互斥鎖在Java中的具體實現就是ReentrantLock
讀寫鎖在Java中的具體實現就是ReadWriteLock
- 樂觀鎖/悲觀鎖
樂觀鎖與悲觀鎖不是指具體的什么類型的鎖,而是指看待并發同步的角度。
悲觀鎖(Synchronized 和 ReentrantLock)
認為對于同一個數據的并發操作,一定是會發生修改的,哪怕沒有修改,也會認為修改。
因此對于同一個數據的并發操作,悲觀鎖采取加鎖的形式。悲觀的認為,不加鎖的并發操作一定會出問題。
- 樂觀鎖(java.util.concurrent.atomic包)
認為對于同一個數據的并發操作,是不會發生修改的。在更新數據的時候,會采用嘗試更新,不斷重新的方式更新數據。樂觀的認為,不加鎖的并發操作是沒有事情的。
悲觀鎖適合寫操作非常多的場景,樂觀鎖適合讀操作非常多的場景,
不加鎖會帶來大量的性能提升。
悲觀鎖在Java中的使用,就是利用各種鎖。
樂觀鎖在Java中的使用,是無鎖編程,常常采用的是CAS算法。典型的例子就是原子類,通過CAS自旋實現原子操作的更新。
- 分段鎖
分段鎖其實是一種鎖的設計,并不是具體的一種鎖,ConcurrentHashMap并發的實現就是通過分段鎖的形式來實現高效的并發操作。
ConcurrentHashMap中的分段鎖稱為Segment, 它類似于HashMap(JDK7與JDK8中HashMap的實現)的結構, 即內部擁有一個Entry數組,數組中的每個元素又是一個鏈表;同時又是一個ReentrantLock(Segment繼承了ReentrantLock)。當需要put元素的時候,并不是對整個hashmap進行加鎖, 而是先通過hashcode來知道他要放在那一個分段中,然后對這個分段進行加鎖, 所以當多線程put的時候,只要不是放在一個分段中,就實現了真正的并行的插入。但是,在統計size的時候,可就是獲取hashmap全局信息的時候,就需要獲取所有的分段鎖才能統計。
分段鎖的設計目的是細化鎖的粒度,當操作不需要更新整個數組的時候, 就僅僅針對數組中的一項進行加鎖操作。
- 偏向鎖/輕量級鎖/重量級鎖
這三種鎖是指鎖的狀態,并且是針對Synchronized。在Java 5通過引入鎖升級的機制來實現高效Synchronized。這三種鎖的狀態是通過對象監視器在對象頭中的字段來表明的。
- 偏向鎖
是指一段同步代碼一直被一個線程所訪問,那么該線程會自動獲取鎖。降低獲取鎖的代價。
- 輕量級鎖
是指當鎖是偏向鎖的時候,被另一個線程所訪問,偏向鎖就會升級為輕量級鎖, 其他線程會通過自旋的形式嘗試獲取鎖,不會阻塞,提高性能。
- 重量級鎖
是指當鎖為輕量級鎖的時候,另一個線程雖然是自旋,但自旋不會一直持續下去, 當自旋一定次數的時候,還沒有獲取到鎖,就會進入阻塞,該鎖膨脹為重量級鎖。重量級鎖會讓其他申請的線程進入阻塞,性能降低。
- 自旋鎖
在Java中,自旋鎖是指嘗試獲取鎖的線程不會立即阻塞,而是采用循環的方式去嘗試獲取鎖, 這樣的好處是減少線程上下文切換的消耗,缺點是循環會消耗CPU。典型的自旋鎖實現的例子,可以參考自旋鎖的實現