精通Java并發(fā)鎖機(jī)制:24種鎖技巧+業(yè)務(wù)鎖匹配方案
在 Java 并發(fā)編程中,鎖是確保線程安全、協(xié)調(diào)多線程訪問(wèn)共享資源的關(guān)鍵機(jī)制。從基本的 synchronized 同步關(guān)鍵字到高級(jí)的 ReentrantLock、讀寫(xiě)鎖 ReadWriteLock、無(wú)鎖設(shè)計(jì)如 AtomicInteger,再到復(fù)雜的同步輔助工具如 CountDownLatch、 CyclicBarrier 和 Semaphore,每種鎖都針對(duì)特定的并發(fā)場(chǎng)景設(shè)計(jì),以解決多線程環(huán)境下的同步問(wèn)題。 StampedLock 提供了樂(lè)觀讀鎖和悲觀寫(xiě)鎖的選項(xiàng),而 ConcurrentHashMap 和 ConcurrentLinkedQueue 等并發(fā)集合則通過(guò)內(nèi)部機(jī)制優(yōu)化了并發(fā)訪問(wèn)。了解不同鎖的特點(diǎn)和適用場(chǎng)景,對(duì)于構(gòu)建高效、穩(wěn)定的并發(fā)應(yīng)用程序至關(guān)重要。
1、鎖選擇維度
選擇適合的鎖通常依賴于特定的應(yīng)用場(chǎng)景和并發(fā)需求。以下是一個(gè)表格,概述了不同鎖類型的關(guān)鍵特性和選擇它們的考量維度:
鎖類型 | 適用場(chǎng)景 | 鎖模式 | 性能特點(diǎn) | 公平性 | 鎖的粗細(xì) | 條件支持 | 阻塞策略 | 用途舉例 |
| 簡(jiǎn)單的同步需求,無(wú)需復(fù)雜控制 | 獨(dú)占式 | 適中,偏向鎖、輕量級(jí)鎖優(yōu)化 | 無(wú)公平策略 | 粗粒度鎖 | 不支持 | 阻塞等待 | 單例模式、簡(jiǎn)單的計(jì)數(shù)器 |
| 需要靈活的鎖控制,如可中斷、超時(shí)、嘗試鎖定等 | 獨(dú)占式 | 高,支持多種鎖定方式 | 可配置公平性 | 細(xì)粒度鎖 | 支持 | 可中斷、超時(shí)、嘗試 | 同步代碼塊或方法、復(fù)雜同步控制 |
| 讀多寫(xiě)少的場(chǎng)景 | 共享-獨(dú)占式 | 高,提高讀操作并發(fā)性 | 不支持公平性 | 細(xì)粒度鎖 | 不支持 | 阻塞等待 | 緩存系統(tǒng)、文件系統(tǒng) |
| 讀多寫(xiě)多,需要樂(lè)觀讀和悲觀寫(xiě)的場(chǎng)景 | 樂(lè)觀讀-悲觀寫(xiě) | 高,提供讀寫(xiě)鎖的擴(kuò)展 | 可配置公平性 | 細(xì)粒度鎖 | 支持 | 可中斷、超時(shí)、嘗試 | 高性能計(jì)數(shù)器、數(shù)據(jù)緩存 |
| 需要等待一組操作完成的場(chǎng)景 | 無(wú) | 低,一次性 | 不支持公平性 | 粗粒度鎖 | 不支持 | 阻塞等待 | 任務(wù)協(xié)調(diào)、初始化操作 |
| 需要控制資源訪問(wèn)數(shù)量的場(chǎng)景 | 信號(hào)量 | 高,控制并發(fā)數(shù)量 | 不支持公平性 | 細(xì)粒度鎖 | 支持 | 阻塞等待 | 限流、資源池管理 |
| 需要周期性執(zhí)行一組操作的場(chǎng)景 | 無(wú) | 低,重用性 | 支持公平性 | 粗粒度鎖 | 支持 | 阻塞等待 | 并行計(jì)算、批處理 |
2、鎖詳細(xì)分析
2.7. CyclicBarrier
CyclicBarrier 是 Java 中用于線程間同步的一種工具,它允許一組線程互相等待,直到所有線程都到達(dá)一個(gè)公共屏障點(diǎn)。
圖片
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要在某個(gè)點(diǎn)同步。
- CyclicBarrier 實(shí)例:是 CyclicBarrier 類的實(shí)例,用于協(xié)調(diào)一組線程在屏障點(diǎn)同步。
- 屏障:表示線程需要到達(dá)的同步點(diǎn),所有線程必須到達(dá)這個(gè)點(diǎn)才能繼續(xù)執(zhí)行。
- 共享資源或任務(wù):表示線程需要訪問(wèn)的共享資源或執(zhí)行的任務(wù),它們?cè)谄琳宵c(diǎn)同步后可以安全地執(zhí)行。
- 等待區(qū):表示等待其他線程到達(dá)屏障點(diǎn)的線程集合。
- 計(jì)數(shù)器: CyclicBarrier 內(nèi)部維護(hù)一個(gè)計(jì)數(shù)器,用于跟蹤尚未到達(dá)屏障點(diǎn)的線程數(shù)量。
- 屏障動(dòng)作(Runnable) :可選的,當(dāng)所有線程到達(dá)屏障點(diǎn)時(shí),可以執(zhí)行一個(gè)特定的動(dòng)作或任務(wù)。
綜合說(shuō)明:
- 作用: CyclicBarrier 是一種同步幫助工具,允許一組線程相互等待,直到所有線程都到達(dá)某個(gè)公共屏障點(diǎn)。
- 背景:在需要多個(gè)線程協(xié)作完成任務(wù)時(shí), CyclicBarrier 提供了一種機(jī)制,使得所有線程可以在屏障點(diǎn)同步,然后繼續(xù)執(zhí)行。
- 優(yōu)點(diǎn):
可重復(fù)使用:與 CountDownLatch 不同, CyclicBarrier 可以重復(fù)使用,適用于周期性的任務(wù)同步。
支持屏障動(dòng)作:可以設(shè)置一個(gè)在所有線程到達(dá)屏障點(diǎn)后執(zhí)行的回調(diào)。
- 缺點(diǎn):
可能導(dǎo)致死鎖:如果一個(gè)或多個(gè)線程未到達(dá)屏障點(diǎn),其他線程將一直等待。
復(fù)雜性:需要合理設(shè)計(jì)以避免線程永久等待。
場(chǎng)景:適用于需要周期性同步多個(gè)線程的場(chǎng)景。
業(yè)務(wù)舉例:在多階段數(shù)據(jù)處理流程中,每個(gè)階段需要所有數(shù)據(jù)都準(zhǔn)備好后才能開(kāi)始處理。使用 CyclicBarrier可以確保所有數(shù)據(jù)加載線程在每個(gè)階段開(kāi)始前都已準(zhǔn)備好。
使用方式:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Race {
private final CyclicBarrier barrier;
public Race(int numberOfRunners) {
barrier = new CyclicBarrier(numberOfRunners, () -> {
System.out.println("比賽開(kāi)始!");
// 這里可以放置所有參與者到達(dá)屏障后要執(zhí)行的操作
});
}
public void run() {
System.out.println("等待其他參賽者...");
try {
barrier.await(); // 等待其他線程
System.out.println("開(kāi)始跑步!");
// 跑步時(shí)間
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final int numberOfRunners = 5;
Race race = new Race(numberOfRunners);
// 創(chuàng)建參賽者線程
for (int i = 0; i < numberOfRunners; i++) {
final int runnerNumber = i + 1;
new Thread(() -> {
System.out.println("參賽者 " + runnerNumber + " 已準(zhǔn)備就緒");
race.run();
}).start();
}
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說(shuō)明: 在大數(shù)據(jù)處理系統(tǒng)中,經(jīng)常需要對(duì)大量數(shù)據(jù)進(jìn)行多階段處理,例如,數(shù)據(jù)清洗、轉(zhuǎn)換、聚合和加載。這些處理階段通常需要按順序執(zhí)行,且每個(gè)階段開(kāi)始前必須確保所有數(shù)據(jù)都已準(zhǔn)備好。
為什么需要 CyclicBarrier 技術(shù): 在多階段數(shù)據(jù)處理的場(chǎng)景中,不同的處理任務(wù)可能由不同的線程執(zhí)行,而這些線程的執(zhí)行時(shí)間可能不同。 CyclicBarrier 允許每個(gè)階段的處理在開(kāi)始前等待所有相關(guān)線程完成上一階段的任務(wù),確保數(shù)據(jù)的一致性和完整性。
沒(méi)有 CyclicBarrier 技術(shù)會(huì)帶來(lái)什么后果:
沒(méi)有使用 CyclicBarrier 或其他同步協(xié)調(diào)機(jī)制可能會(huì)導(dǎo)致以下問(wèn)題:
- 數(shù)據(jù)不一致:如果后續(xù)階段的處理在前一階段的數(shù)據(jù)未完全準(zhǔn)備好時(shí)開(kāi)始,可能會(huì)導(dǎo)致處理結(jié)果不準(zhǔn)確。
- 資源浪費(fèi):在等待數(shù)據(jù)準(zhǔn)備的過(guò)程中,系統(tǒng)資源可能被無(wú)效占用,導(dǎo)致資源利用效率低下。
- 錯(cuò)誤和異常:由于階段間的依賴關(guān)系沒(méi)有得到妥善處理,可能會(huì)引發(fā)程序錯(cuò)誤或運(yùn)行時(shí)異常。
代碼實(shí)現(xiàn):
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DataProcessingPipeline {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
private final CyclicBarrier barrier;
private final int numberOfPhases;
private final int numberOfTasks;
public DataProcessingPipeline(int numberOfTasks, int numberOfPhases) {
this.numberOfTasks = numberOfTasks;
this.numberOfPhases = numberOfPhases;
this.barrier = new CyclicBarrier(numberOfTasks, () -> {
System.out.println("一個(gè)階段完成,準(zhǔn)備進(jìn)入下一階段");
});
}
public void processData() throws Exception {
for (int phase = 1; phase <= numberOfPhases; phase++) {
System.out.println("階段 " + phase + " 開(kāi)始");
for (int task = 0; task < numberOfTasks; task++) {
final int currentTask = task;
executor.submit(() -> {
try {
// 數(shù)據(jù)處理任務(wù)
System.out.println("任務(wù) " + currentTask + " 在階段 " + phase + " 執(zhí)行");
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
barrier.await(); // 等待所有任務(wù)完成
}
executor.shutdown();
}
public static void main(String[] args) {
DataProcessingPipeline pipeline = new DataProcessingPipeline(4, 3);
try {
pipeline.processData();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.8. Atomic Variables
原子變量是 Java 中 java.util.concurrent.atomic 包提供的一些類,它們利用底層硬件的原子性指令來(lái)保證操作的原子性,無(wú)需使用鎖。
圖片
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì)共享資源進(jìn)行原子操作。
- Atomic Variables:表示原子變量的集合,包括 AtomicInteger、 AtomicLong、 AtomicReference 等。
- AtomicInteger、AtomicLong、AtomicReference:分別表示整型、長(zhǎng)整型和引用類型的原子變量。
- 硬件支持的原子指令:底層硬件提供的原子性指令,如 compare-and-swap (CAS)、load-linked、store-conditional 等。
- 共享資源:表示被多個(gè)線程共享的數(shù)據(jù),如計(jì)數(shù)器、累加器等。
- 內(nèi)存:表示 Java 程序使用的內(nèi)存空間,包括堆和棧等。
- 變量狀態(tài):表示原子變量在內(nèi)存中的當(dāng)前狀態(tài)。
綜合說(shuō)明:
- 作用:原子變量類(如 AtomicInteger, AtomicLong, AtomicReference 等)提供了一種機(jī)制,使得對(duì)變量的某些操作(如自增、自減、讀取和寫(xiě)入)是原子性的,無(wú)需使用傳統(tǒng)的鎖。
- 背景:在多線程環(huán)境中,對(duì)共享變量的并發(fā)訪問(wèn)需要同步措施以防止數(shù)據(jù)競(jìng)爭(zhēng)。原子變量利用底層硬件的原子指令來(lái)保證操作的原子性,從而簡(jiǎn)化了線程同步。
- 優(yōu)點(diǎn):
無(wú)鎖設(shè)計(jì):避免使用傳統(tǒng)鎖,減少了線程切換的開(kāi)銷。
性能優(yōu)化:對(duì)于高競(jìng)爭(zhēng)的簡(jiǎn)單變量訪問(wèn),原子變量通常比鎖有更好的性能。
- 缺點(diǎn):
功能限制:僅適用于簡(jiǎn)單的操作,復(fù)雜的操作無(wú)法通過(guò)原子變量實(shí)現(xiàn)。
可組合性問(wèn)題:復(fù)雜的原子操作需要仔細(xì)設(shè)計(jì),否則可能引入競(jìng)態(tài)條件。
場(chǎng)景:適用于對(duì)簡(jiǎn)單變量進(jìn)行原子操作的場(chǎng)景,如計(jì)數(shù)器、累加器等。
業(yè)務(wù)舉例:在電商平臺(tái)的庫(kù)存管理中, AtomicInteger 可以用來(lái)原子地更新商品的庫(kù)存數(shù)量,確保在高并發(fā)環(huán)境下庫(kù)存數(shù)據(jù)的一致性。
使用方式:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class Counter {
// 使用 AtomicInteger 來(lái)確保計(jì)數(shù)器的線程安全
private final AtomicInteger count = new AtomicInteger(0);
// 提供一個(gè)方法來(lái)增加計(jì)數(shù)器的值
public void increment() {
// 原子地增加計(jì)數(shù)器的值
count.incrementAndGet();
}
// 提供一個(gè)方法來(lái)獲取當(dāng)前計(jì)數(shù)器的值
public int getCount() {
// 原子地獲取計(jì)數(shù)器的值
return count.get();
}
}
public class DataStore {
// 使用 AtomicLong 來(lái)統(tǒng)計(jì)數(shù)據(jù)總量
private final AtomicLong dataCount = new AtomicLong(0);
public void addData(long size) {
// 原子地將數(shù)據(jù)大小累加到總量
dataCount.addAndGet(size);
}
public long getDataCount() {
// 原子地獲取當(dāng)前數(shù)據(jù)總量
return dataCount.get();
}
}
// 測(cè)試類
public class AtomicVariablesDemo {
public static void main(String[] args) {
Counter counter = new Counter();
DataStore dataStore = new DataStore();
// 多線程環(huán)境中對(duì)計(jì)數(shù)器和數(shù)據(jù)總量的更新
for (int i = 0; i < 10; i++) {
new Thread(() -> {
counter.increment();
dataStore.addData(100); // 假設(shè)每次操作增加100單位數(shù)據(jù)
}).start();
}
// 等待所有線程完成
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 輸出計(jì)數(shù)器的值和數(shù)據(jù)總量
System.out.println("Counter value: " + counter.getCount());
System.out.println("Data store size: " + dataStore.getDataCount());
}
}
業(yè)務(wù)代碼案例:
場(chǎng)景描述:社交網(wǎng)絡(luò)的實(shí)時(shí)消息計(jì)數(shù)
業(yè)務(wù)說(shuō)明: 社交網(wǎng)絡(luò)平臺(tái)需要顯示每個(gè)用戶的實(shí)時(shí)消息通知數(shù)。每當(dāng)用戶收到新消息時(shí),消息計(jì)數(shù)需要增加;用戶閱讀消息時(shí),計(jì)數(shù)可能會(huì)減少或被重置。此計(jì)數(shù)需要對(duì)所有用戶可見(jiàn),且在高并發(fā)環(huán)境下保持準(zhǔn)確。
為什么需要 AtomicVariables 技術(shù): 在社交網(wǎng)絡(luò)中,多個(gè)用戶可能同時(shí)發(fā)送消息給同一個(gè)接收者,或者一個(gè)用戶可能同時(shí)在多個(gè)設(shè)備上接收消息。這導(dǎo)致對(duì)消息計(jì)數(shù)的讀取和更新操作非常頻繁。使用 AtomicInteger 可以確保消息計(jì)數(shù)更新的原子性,并且在多線程環(huán)境下保持?jǐn)?shù)據(jù)的一致性。
沒(méi)有 AtomicVariables 技術(shù)會(huì)帶來(lái)什么后果:
沒(méi)有使用 AtomicVariables 或其他并發(fā)控制機(jī)制可能會(huì)導(dǎo)致以下問(wèn)題:
- 數(shù)據(jù)不一致:消息計(jì)數(shù)可能會(huì)出錯(cuò),導(dǎo)致用戶看到不正確的消息數(shù)量。
- 用戶體驗(yàn)下降:如果消息通知不準(zhǔn)確,用戶可能會(huì)錯(cuò)過(guò)重要通知,或者對(duì)應(yīng)用的可靠性產(chǎn)生懷疑。
- 系統(tǒng)復(fù)雜度增加:在沒(méi)有有效同步機(jī)制的情況下,維護(hù)數(shù)據(jù)一致性將變得復(fù)雜且容易出錯(cuò)。
代碼實(shí)現(xiàn):
import java.util.concurrent.atomic.AtomicInteger;
public class MessageNotificationCounter {
private final AtomicInteger messageCount = new AtomicInteger(0);
// 接收新消息時(shí)調(diào)用此方法
public void receiveMessage() {
// 原子地增加消息計(jì)數(shù)
messageCount.incrementAndGet();
System.out.println("New message received. Total messages: " + messageCount.get());
}
// 用戶閱讀消息時(shí)調(diào)用此方法
public void messagesRead() {
// 原子地減少消息計(jì)數(shù)
messageCount.decrementAndGet();
System.out.println("Messages read. Remaining messages: " + messageCount.get());
}
// 獲取當(dāng)前消息計(jì)數(shù)
public int getMessageCount() {
return messageCount.get();
}
}
// 測(cè)試類
public class AtomicVariablesDemo {
public static void main(String[] args) {
MessageNotificationCounter counter = new MessageNotificationCounter();
// 多個(gè)用戶同時(shí)發(fā)送消息
Thread sender1 = new Thread(() -> {
counter.receiveMessage();
});
Thread sender2 = new Thread(() -> {
counter.receiveMessage();
});
// 用戶閱讀消息
Thread reader = new Thread(() -> {
counter.messagesRead();
});
sender1.start();
sender2.start();
reader.start();
sender1.join();
sender2.join();
reader.join();
System.out.println("Final message count: " + counter.getMessageCount());
}
}
2.9. ConcurrentHashMap
ConcurrentHashMap 是 Java 中一個(gè)線程安全的哈希表,它通過(guò)分段鎖(Segmentation)和 CAS 操作來(lái)支持高并發(fā)的讀寫(xiě)操作。
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì) ConcurrentHashMap 進(jìn)行讀寫(xiě)操作。
- ConcurrentHashMap 實(shí)例:是 ConcurrentHashMap 類的實(shí)例,用于存儲(chǔ)鍵值對(duì)并提供線程安全的訪問(wèn)。
- Segment 數(shù)組: ConcurrentHashMap 將哈希表分為多個(gè)段(Segment),每個(gè)段維護(hù)一部分哈希桶,通過(guò)分段鎖減少鎖的競(jìng)爭(zhēng)。
- Hash 桶:存儲(chǔ)哈希桶數(shù)組,每個(gè)桶可以包含一個(gè)或多個(gè)鍵值對(duì)。
- 鏈表或紅黑樹(shù):在哈希桶中,鍵值對(duì)最初以鏈表形式存儲(chǔ),當(dāng)鏈表長(zhǎng)度超過(guò)閾值時(shí),鏈表可能會(huì)被轉(zhuǎn)換為紅黑樹(shù)以提高搜索效率。
- 共享資源:表示存儲(chǔ)在 ConcurrentHashMap 中的鍵值對(duì)數(shù)據(jù)。
- 讀操作:線程可以并發(fā)地讀取 ConcurrentHashMap 中的數(shù)據(jù),在讀多寫(xiě)少的場(chǎng)景下,讀操作不會(huì)阻塞其他讀操作。
- 寫(xiě)操作:線程對(duì) ConcurrentHashMap 的寫(xiě)入操作,寫(xiě)操作需要獲取相應(yīng)段的鎖。
- 鎖:每個(gè)段擁有自己的鎖,寫(xiě)操作需要獲取鎖,而讀操作通常不需要。
升級(jí)設(shè)計(jì)說(shuō)明:
Java 1.7 ConcurrentHashMap 鎖機(jī)制
在 Java 1.7 中, ConcurrentHashMap 使用分段鎖機(jī)制,其中每個(gè)段相當(dāng)于一個(gè)小的哈希表,擁有自己的鎖。
Java 1.8 ConcurrentHashMap 鎖機(jī)制
在 Java 1.8 中, ConcurrentHashMap 摒棄了分段鎖機(jī)制,采用了 CAS 和 synchronized 來(lái)確保線程安全。
綜合說(shuō)明:
- 作用: ConcurrentHashMap 是 Java 中提供的一個(gè)線程安全的哈希表,它通過(guò)分段鎖的概念來(lái)允許并發(fā)的讀寫(xiě)操作,從而提高并發(fā)訪問(wèn)的性能。
- 背景:傳統(tǒng)的 HashMap 在多線程環(huán)境下需要外部同步,而 ConcurrentHashMap 通過(guò)鎖分離技術(shù)減少了鎖的競(jìng)爭(zhēng),提供了更好的并發(fā)性能。
- 優(yōu)點(diǎn):
- 高并發(fā):通過(guò)細(xì)分鎖到段,允許多個(gè)線程同時(shí)操作不同段的數(shù)據(jù)。
- 動(dòng)態(tài)擴(kuò)容:內(nèi)部采用動(dòng)態(tài)數(shù)組和鏈表結(jié)構(gòu),提高了空間和時(shí)間效率。
- 缺點(diǎn):
復(fù)雜度高:實(shí)現(xiàn)復(fù)雜,需要維護(hù)多個(gè)鎖和復(fù)雜的數(shù)據(jù)結(jié)構(gòu)。
性能調(diào)優(yōu):在極端高并發(fā)場(chǎng)景下,可能需要調(diào)整默認(rèn)的并發(fā)級(jí)別。
場(chǎng)景:適用于需要高并發(fā)訪問(wèn)的緩存或數(shù)據(jù)存儲(chǔ)。
業(yè)務(wù)舉例:在大數(shù)據(jù)處理系統(tǒng)中, ConcurrentHashMap 可以用來(lái)存儲(chǔ)實(shí)時(shí)計(jì)算結(jié)果,支持大量并發(fā)的讀寫(xiě)操作而不會(huì)導(dǎo)致性能瓶頸。
使用方式:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class ConcurrentHashMapDemo {
// 創(chuàng)建一個(gè) ConcurrentHashMap 實(shí)例
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 將一個(gè)鍵值對(duì)插入到 Map 中
public void put(String key, Integer value) {
// put 方法是線程安全的
map.put(key, value);
}
// 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
public Integer get(String key) {
// get 方法是線程安全的
return map.get(key);
}
// 計(jì)算 Map 中的元素?cái)?shù)量
public int size() {
// size 方法是線程安全的
return map.size();
}
// 演示刪除操作
public void remove(String key) {
// remove 方法是線程安全的
map.remove(key);
}
// 演示如何批量添加數(shù)據(jù)
public void addAll(Map<String, Integer> newData) {
// putAll 方法是線程安全的
map.putAll(newData);
}
public static void main(String[] args) {
ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();
// 批量添加數(shù)據(jù)
demo.addAll(Map.of("key1", 1, "key2", 2, "key3", 3));
// 單獨(dú)添加一條數(shù)據(jù)
demo.put("key4", 4);
// 獲取并打印一條數(shù)據(jù)
System.out.println("Value for 'key1': " + demo.get("key1"));
// 獲取 Map 的大小
System.out.println("Map size: " + demo.size());
// 刪除一條數(shù)據(jù)
demo.remove("key2");
// 再次獲取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說(shuō)明: 在分布式緩存系統(tǒng)中,經(jīng)常需要存儲(chǔ)和檢索用戶會(huì)話信息、應(yīng)用配置、熱點(diǎn)數(shù)據(jù)等。這些數(shù)據(jù)需要被多個(gè)應(yīng)用實(shí)例共享,并且要求在高并發(fā)環(huán)境下依然保持高性能。緩存數(shù)據(jù)通常有過(guò)期時(shí)間,需要定期清理。
為什么需要 ConcurrentHashMap 技術(shù): ConcurrentHashMap 提供了一種高效的方式來(lái)處理并發(fā)的讀取和更新操作,并且它的分段鎖機(jī)制允許多個(gè)線程同時(shí)對(duì)不同段進(jìn)行操作,從而提高并發(fā)處理能力。此外, ConcurrentHashMap 在 Java 8 中引入的紅黑樹(shù)結(jié)構(gòu)使得即使在高并發(fā)更新導(dǎo)致哈希沖突時(shí),也能保持高效的性能。
沒(méi)有 ConcurrentHashMap 技術(shù)會(huì)帶來(lái)什么后果:
沒(méi)有使用 ConcurrentHashMap 可能會(huì)導(dǎo)致以下問(wèn)題:
- 性能瓶頸:在高并發(fā)環(huán)境下,如果使用 HashMap 加 synchronized,可能導(dǎo)致嚴(yán)重的性能瓶頸,因?yàn)樗芯€程必須等待一個(gè)鎖。
- 數(shù)據(jù)不一致:在沒(méi)有適當(dāng)同步的情況下,多個(gè)線程同時(shí)更新數(shù)據(jù)可能導(dǎo)致緩存數(shù)據(jù)不一致。
- 擴(kuò)展性差:隨著系統(tǒng)負(fù)載的增加,基于 HashMap 的緩存解決方案可能難以擴(kuò)展,因?yàn)殒i競(jìng)爭(zhēng)和線程安全問(wèn)題。
代碼實(shí)現(xiàn):
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class DistributedCache<K, V> {
private final ConcurrentHashMap<K, V> cacheMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>();
public void put(K key, V value, long ttl) {
cacheMap.put(key, value);
expirationMap.put(key, System.currentTimeMillis() + ttl);
scheduleEviction(key, ttl);
}
public V get(K key) {
Long expirationTime = expirationMap.get(key);
if (expirationTime == null || expirationTime < System.currentTimeMillis()) {
cacheMap.remove(key);
expirationMap.remove(key);
return null;
}
return cacheMap.get(key);
}
private void scheduleEviction(final K key, final long ttl) {
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(ttl);
cacheMap.computeIfPresent(key, (k, v) -> null);
expirationMap.remove(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
public static void main(String[] args) {
DistributedCache<String, String> cache = new DistributedCache<>();
cache.put("userSession", "sessionData", 5000); // 緩存設(shè)置5秒過(guò)期
// 多個(gè)線程并發(fā)訪問(wèn)緩存
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
String result = cache.get("userSession");
System.out.println("Thread " + finalI + " retrieved: " + result);
}).start();
}
}
}
2.10.ConcurrentSkipListMap
ConcurrentSkipListMap 是 Java 中實(shí)現(xiàn)的一個(gè)高性能并發(fā)的有序映射,它使用跳表(Skip List)作為其底層數(shù)據(jù)結(jié)構(gòu)。
圖片
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì) ConcurrentSkipListMap 進(jìn)行讀寫(xiě)操作。
- ConcurrentSkipListMap 實(shí)例:是 ConcurrentSkipListMap 類的實(shí)例,用于存儲(chǔ)鍵值對(duì)并提供線程安全的訪問(wèn)。
- Skip List 層級(jí)結(jié)構(gòu):跳表由多層索引構(gòu)成,每一層都是一個(gè)有序的鏈表。
- 索引層:跳表中的索引層,用于加速搜索操作。
- 數(shù)據(jù)層:跳表中的底層數(shù)據(jù)結(jié)構(gòu),存儲(chǔ)實(shí)際的鍵值對(duì)。
- Node 節(jié)點(diǎn):跳表中的節(jié)點(diǎn),包含鍵值對(duì)和指向其他節(jié)點(diǎn)的鏈接。
- 共享資源:表示存儲(chǔ)在 ConcurrentSkipListMap 中的鍵值對(duì)數(shù)據(jù)。
- 讀操作:線程可以并發(fā)地讀取 ConcurrentSkipListMap 中的數(shù)據(jù)。
- 寫(xiě)操作:線程可以并發(fā)地修改 ConcurrentSkipListMap 中的數(shù)據(jù)。
- CAS 操作:在更新節(jié)點(diǎn)鏈接或修改數(shù)據(jù)時(shí),使用 CAS 操作來(lái)保證線程安全。
- 自旋鎖/同步塊:在某些情況下,如果 CAS 操作失敗,可能會(huì)使用自旋鎖或同步塊來(lái)確保操作的原子性。
操作流程:
- 讀操作:
線程通過(guò)索引層快速定位到數(shù)據(jù)層的節(jié)點(diǎn)。
線程使用 volatile 讀取節(jié)點(diǎn)的值,確保內(nèi)存可見(jiàn)性。
- 寫(xiě)操作:
線程在更新或添加節(jié)點(diǎn)時(shí),首先嘗試使用 CAS 操作。
如果 CAS 操作失敗,線程可能會(huì)使用自旋鎖或同步塊來(lái)確保原子性。
綜合說(shuō)明:
作用: ConcurrentSkipListMap 是一種線程安全的有序映射,它通過(guò)使用跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)來(lái)支持高效的并發(fā)訪問(wèn)和排序操作。 背景:在需要高效并發(fā)訪問(wèn)和保持元素有序的場(chǎng)景中,傳統(tǒng)的 TreeMap 由于其加鎖策略在高并發(fā)環(huán)境下性能受限, ConcurrentSkipListMap 提供了一種替代方案。 優(yōu)點(diǎn):
- 高性能并發(fā)訪問(wèn):通過(guò)跳表結(jié)構(gòu)和細(xì)粒度鎖定,實(shí)現(xiàn)了高效的并發(fā)讀取和更新。
- 有序性:保持元素的有序性,支持范圍查詢等操作。
- 動(dòng)態(tài)調(diào)整:可以根據(jù)訪問(wèn)模式動(dòng)態(tài)調(diào)整結(jié)構(gòu),優(yōu)化性能。 缺點(diǎn):
- 內(nèi)存占用:相比無(wú)序的 ConcurrentHashMap,由于維護(hù)了有序性,內(nèi)存占用可能更高。
- 復(fù)雜性:實(shí)現(xiàn)相對(duì)復(fù)雜,涉及多級(jí)索引和節(jié)點(diǎn)的管理。 場(chǎng)景:適用于需要有序數(shù)據(jù)且高并發(fā)訪問(wèn)的場(chǎng)景,如實(shí)時(shí)數(shù)據(jù)索引、范圍查詢等。 業(yè)務(wù)舉例:在一個(gè)金融市場(chǎng)分析系統(tǒng)中,需要維護(hù)一個(gè)實(shí)時(shí)更新的價(jià)格索引, ConcurrentSkipListMap 可以用來(lái)存儲(chǔ)和快速檢索各種金融工具的當(dāng)前價(jià)格。
使用方式:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentSkipListMapDemo {
// 創(chuàng)建一個(gè) ConcurrentSkipListMap 實(shí)例
private final ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
// 將一個(gè)鍵值對(duì)插入到 Map 中
public void put(Integer key, String value) {
// put 方法是線程安全的
map.put(key, value);
}
// 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
public String get(Integer key) {
// get 方法是線程安全的
return map.get(key);
}
// 獲取 Map 的鍵集合
public java.util.NavigableSet<Integer> keySet() {
// keySet 方法返回 Map 的鍵集合視圖
return map.keySet();
}
// 獲取 Map 的值集合
public java.util.Collection<String> values() {
// values 方法返回 Map 的值集合視圖
return map.values();
}
// 獲取 Map 的大小
public int size() {
// size 方法是線程安全的
return map.size();
}
// 演示刪除操作
public void remove(Integer key) {
// remove 方法是線程安全的
map.remove(key);
}
public static void main(String[] args) {
ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();
// 插入一些數(shù)據(jù)
demo.put(1, "One");
demo.put(2, "Two");
demo.put(3, "Three");
// 獲取并打印一條數(shù)據(jù)
System.out.println("Value for key 2: " + demo.get(2));
// 獲取 Map 的大小
System.out.println("Map size: " + demo.size());
// 獲取并打印所有鍵
System.out.println("All keys: " + demo.keySet());
// 刪除一條數(shù)據(jù)
demo.remove(2);
// 再次獲取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
// 獲取并打印所有值
System.out.println("All values: " + demo.values());
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說(shuō)明: 實(shí)時(shí)股票交易系統(tǒng)需要維護(hù)一個(gè)動(dòng)態(tài)變化的股票價(jià)格索引,該索引需要根據(jù)實(shí)時(shí)的市場(chǎng)數(shù)據(jù)進(jìn)行更新,并且允許多個(gè)交易線程并發(fā)地讀取和更新股票價(jià)格。此外,系統(tǒng)還需要定期根據(jù)價(jià)格波動(dòng)進(jìn)行調(diào)整,如計(jì)算價(jià)格的平均值、執(zhí)行價(jià)格范圍查詢等。
為什么需要 ConcurrentSkipListMap 技術(shù): ConcurrentSkipListMap 是一個(gè)線程安全的有序映射,它允許高效的范圍查詢和有序訪問(wèn),這對(duì)于股票價(jià)格索引來(lái)說(shuō)至關(guān)重要。由于股票價(jià)格會(huì)頻繁更新,且需要快速響應(yīng)市場(chǎng)變化,使用 ConcurrentSkipListMap 可以提供高效的插入、刪除和查找操作,同時(shí)保持?jǐn)?shù)據(jù)的有序性。
沒(méi)有 ConcurrentSkipListMap 技術(shù)會(huì)帶來(lái)什么后果:
沒(méi)有使用 ConcurrentSkipListMap 或其他適合有序并發(fā)操作的數(shù)據(jù)結(jié)構(gòu)可能會(huì)導(dǎo)致以下問(wèn)題:
- 性能瓶頸:如果使用 HashMap 或 ConcurrentHashMap,雖然可以實(shí)現(xiàn)并發(fā)更新,但無(wú)法高效執(zhí)行有序操作和范圍查詢,可能導(dǎo)致查詢性能不佳。
- 數(shù)據(jù)不一致:在高并發(fā)更新的情況下,如果沒(méi)有適當(dāng)?shù)耐綑C(jī)制,可能會(huì)導(dǎo)致價(jià)格信息的不一致。
- 復(fù)雜性增加:如果使用 synchronized 列表或數(shù)組來(lái)維護(hù)價(jià)格索引,可能需要手動(dòng)管理復(fù)雜的同步和排序邏輯,增加系統(tǒng)復(fù)雜性和出錯(cuò)的風(fēng)險(xiǎn)。
代碼實(shí)現(xiàn):
import java.util.concurrent.ConcurrentSkipListMap;
public class StockPriceIndex {
private final ConcurrentSkipListMap<String, Double> priceIndex = new ConcurrentSkipListMap<>();
public void updatePrice(String stockSymbol, Double newPrice) {
// 更新股票價(jià)格
priceIndex.put(stockSymbol, newPrice);
}
public Double getPrice(String stockSymbol) {
// 獲取股票價(jià)格
return priceIndex.get(stockSymbol);
}
public void removeStock(String stockSymbol) {
// 移除股票信息
priceIndex.remove(stockSymbol);
}
public ConcurrentSkipListMap<String, Double> headMap(String toKey) {
// 獲取指定范圍內(nèi)的股票價(jià)格索引
return priceIndex.headMap(toKey);
}
public static void main(String[] args) {
StockPriceIndex index = new StockPriceIndex();
index.updatePrice("AAPL", 150.00);
index.updatePrice("GOOGL", 2750.50);
index.updatePrice("MSFT", 250.00);
System.out.println("Price of AAPL: " + index.getPrice("AAPL"));
System.out.println("Price of GOOGL: " + index.getPrice("GOOGL"));
// 獲取所有小于 "MSFT" 的股票價(jià)格索引
ConcurrentSkipListMap<String, Double> subMap = index.headMap("MSFT");
subMap.forEach((k, v) -> System.out.println("Stock: " + k + ", Price: " + v));
}
}
2.11. ConcurrentLinkedQueue
ConcurrentLinkedQueue 是 Java 中一個(gè)線程安全的無(wú)鎖隊(duì)列,它使用 CAS (Compare-And-Swap) 操作來(lái)保證線程安全。
圖片
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì) ConcurrentLinkedQueue 進(jìn)行入隊(duì)或出隊(duì)操作。
- ConcurrentLinkedQueue 實(shí)例:是 ConcurrentLinkedQueue 類的實(shí)例,用于存儲(chǔ)隊(duì)列中的元素并提供線程安全的訪問(wèn)。
- Node 節(jié)點(diǎn)結(jié)構(gòu): ConcurrentLinkedQueue 使用內(nèi)部的 Node 類來(lái)存儲(chǔ)隊(duì)列中的每個(gè)元素。每個(gè)節(jié)點(diǎn)包含隊(duì)列中的一個(gè)元素和指向下一個(gè)節(jié)點(diǎn)的鏈接。
- 虛擬頭節(jié)點(diǎn):隊(duì)列使用一個(gè)虛擬頭節(jié)點(diǎn)來(lái)簡(jiǎn)化出隊(duì)操作。虛擬頭節(jié)點(diǎn)不存儲(chǔ)實(shí)際的隊(duì)列元素。
- 虛擬尾節(jié)點(diǎn):隊(duì)列使用一個(gè)虛擬尾節(jié)點(diǎn)來(lái)簡(jiǎn)化入隊(duì)操作。虛擬尾節(jié)點(diǎn)指向隊(duì)列中的最后一個(gè)節(jié)點(diǎn)。
- 隊(duì)列元素:表示存儲(chǔ)在隊(duì)列中的實(shí)際數(shù)據(jù)。
- 入隊(duì)操作:線程將新元素添加到隊(duì)列尾部的過(guò)程,通過(guò) CAS 更新虛擬尾節(jié)點(diǎn)的鏈接。
- 出隊(duì)操作:線程從隊(duì)列頭部移除元素的過(guò)程,通過(guò) CAS 更新虛擬頭節(jié)點(diǎn)的鏈接。
- CAS 操作: ConcurrentLinkedQueue 使用 CAS 操作來(lái)更新節(jié)點(diǎn)之間的鏈接,從而實(shí)現(xiàn)無(wú)鎖的線程安全隊(duì)列。
- 自旋等待:在 CAS 操作失敗時(shí),線程可能會(huì)自旋等待直到操作成功。
操作流程:
- 入隊(duì)操作:線程通過(guò) CAS 操作將新節(jié)點(diǎn)插入到隊(duì)列尾部,并更新尾節(jié)點(diǎn)指針。
- 出隊(duì)操作:線程通過(guò) CAS 操作移除隊(duì)列頭部的節(jié)點(diǎn),并更新頭節(jié)點(diǎn)指針。
- CAS 操作:在入隊(duì)和出隊(duì)過(guò)程中,線程使用 CAS 來(lái)保證節(jié)點(diǎn)鏈接的原子性更新。
綜合說(shuō)明:
作用: ConcurrentLinkedQueue 是一種基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列,支持高并發(fā)的入隊(duì)和出隊(duì)操作。 背景:在多線程環(huán)境中,需要一種高效的隊(duì)列來(lái)處理任務(wù)或消息傳遞, ConcurrentLinkedQueue 提供了一種無(wú)鎖的解決方案。 優(yōu)點(diǎn):
- 無(wú)鎖設(shè)計(jì):利用 CAS 操作實(shí)現(xiàn)無(wú)鎖的線程安全隊(duì)列,提高了并發(fā)性能。
- 簡(jiǎn)單高效:提供了簡(jiǎn)單的入隊(duì)和出隊(duì)操作,適合作為任務(wù)隊(duì)列或消息傳遞隊(duì)列。
- 無(wú)界隊(duì)列:理論上隊(duì)列大小無(wú)界,適用于處理大量任務(wù)。 缺點(diǎn):
- 可能的內(nèi)存消耗:由于是無(wú)界隊(duì)列,在極端情況下可能會(huì)消耗大量?jī)?nèi)存。
- 性能限制:在某些高競(jìng)爭(zhēng)場(chǎng)景下,CAS 操作可能導(dǎo)致性能瓶頸。 場(chǎng)景:適用于作為任務(wù)隊(duì)列或消息傳遞隊(duì)列,支持高并發(fā)的入隊(duì)和出隊(duì)操作。 業(yè)務(wù)舉例:在一個(gè)分布式計(jì)算系統(tǒng)中, ConcurrentLinkedQueue 可以用于收集各個(gè)計(jì)算節(jié)點(diǎn)的輸出結(jié)果,然后由一個(gè)或多個(gè)消費(fèi)者線程進(jìn)行處理。
這兩個(gè)并發(fā)集合類在 Java 中提供了強(qiáng)大的工具,以支持復(fù)雜的并發(fā)數(shù)據(jù)處理需求,它們各自適用于不同的應(yīng)用場(chǎng)景,可以根據(jù)具體需求選擇合適的并發(fā)集合。kedQueue
使用方式:
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueDemo {
// 創(chuàng)建一個(gè) ConcurrentLinkedQueue 實(shí)例
private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 向隊(duì)列中添加元素
public void add(int number) {
// add 方法是線程安全的
queue.add(number);
System.out.println("Added " + number);
}
// 從隊(duì)列中獲取并移除元素
public Integer poll() {
// poll 方法是線程安全的,返回并移除隊(duì)列頭部的元素
Integer result = queue.poll();
if (result != null) {
System.out.println("Polled " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 查看隊(duì)列頭部的元素但不移除
public Integer peek() {
// peek 方法是線程安全的,返回隊(duì)列頭部的元素但不移除
Integer result = queue.peek();
if (result != null) {
System.out.println("Peeked " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 獲取隊(duì)列的大小
public int size() {
// size 方法估算隊(duì)列的大小
int size = queue.size();
System.out.println("Queue size: " + size);
return size;
}
public static void main(String[] args) {
ConcurrentLinkedQueueDemo demo = new ConcurrentLinkedQueueDemo();
// 啟動(dòng)生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
demo.add(1);
demo.add(2);
demo.add(3);
});
// 啟動(dòng)消費(fèi)者線程
Thread consumerThread = new Thread(() -> {
demo.poll();
demo.poll();
demo.poll();
demo.poll(); // 這次調(diào)用應(yīng)該會(huì)返回 null,因?yàn)殛?duì)列已空
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 在所有線程完成后獲取隊(duì)列大小
demo.size();
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說(shuō)明: 大規(guī)模日志處理系統(tǒng)需要從多個(gè)源實(shí)時(shí)收集、存儲(chǔ)并分析日志數(shù)據(jù)。這些日志數(shù)據(jù)通常由分布在不同服務(wù)器上的應(yīng)用程序生成,并且需要被快速地處理以避免數(shù)據(jù)丟失或延遲問(wèn)題。
為什么需要 ConcurrentLinkedQueue 技術(shù): 在日志處理場(chǎng)景中,日志數(shù)據(jù)的產(chǎn)生速度往往非常快,且來(lái)源眾多,因此需要一個(gè)高效且線程安全的隊(duì)列來(lái)緩存這些日志數(shù)據(jù)。 ConcurrentLinkedQueue 提供了高吞吐量和低延遲的并發(fā)訪問(wèn),無(wú)需使用鎖,使得它特別適合用作日志數(shù)據(jù)的緩沖區(qū)。此外,由于 ConcurrentLinkedQueue 是無(wú)界的,因此不會(huì)阻塞生產(chǎn)者線程,即使在高負(fù)載情況下也能保持高性能。
沒(méi)有 ConcurrentLinkedQueue 技術(shù)會(huì)帶來(lái)什么后果: 沒(méi)有使用 ConcurrentLinkedQueue 或其他高效的并發(fā)隊(duì)列可能會(huì)導(dǎo)致以下問(wèn)題:
- 數(shù)據(jù)丟失:如果使用有界隊(duì)列且沒(méi)有適當(dāng)?shù)纳a(chǎn)者速率控制,可能會(huì)因?yàn)殛?duì)列滿導(dǎo)致日志數(shù)據(jù)丟失。
- 性能瓶頸:如果使用鎖或其他同步機(jī)制來(lái)保護(hù)共享隊(duì)列,可能會(huì)導(dǎo)致性能瓶頸,尤其是在高并發(fā)場(chǎng)景下。
- 系統(tǒng)不穩(wěn)定:在高負(fù)載情況下,如果隊(duì)列處理速度跟不上數(shù)據(jù)產(chǎn)生速度,可能會(huì)導(dǎo)致系統(tǒng)崩潰或重啟。
代碼實(shí)現(xiàn):
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LogProcessor {
private final ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<>();
private final ExecutorService processorPool = Executors.newFixedThreadPool(10);
public void log(String message) {
// 生產(chǎn)者線程調(diào)用此方法來(lái)添加日志到隊(duì)列
logQueue.add(message);
}
public void startLogProcessing() {
// 消費(fèi)者線程池,用于處理隊(duì)列中的日志
processorPool.submit(() -> {
while (true) {
try {
// 消費(fèi)者線程調(diào)用此方法來(lái)處理隊(duì)列中的日志
String logEntry = logQueue.poll();
if (logEntry != null) {
processLog(logEntry);
} else {
TimeUnit.MILLISECONDS.sleep(100); // 避免 CPU 過(guò)載
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
private void processLog(String logEntry) {
// 實(shí)際處理日志的邏輯
System.out.println("Processing log: " + logEntry);
}
public static void main(String[] args) {
LogProcessor logProcessor = new LogProcessor();
logProcessor.startLogProcessing();
// 多個(gè)生產(chǎn)者線程生成日志
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
logProcessor.log("Log entry " + finalI);
}).start();
}
}
}
2.12. BlockingQueue
BlockingQueue 是 Java 中用于線程間通信的隊(duì)列,支持阻塞操作,當(dāng)隊(duì)列為空時(shí),獲取元素的操作會(huì)阻塞;當(dāng)隊(duì)列滿時(shí),插入元素的操作會(huì)阻塞。
圖片
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要向隊(duì)列中添加或移除元素。
- BlockingQueue 實(shí)例:是 BlockingQueue 接口的具體實(shí)現(xiàn),如 ArrayBlockingQueue、 LinkedBlockingQueue 等,用于線程間通信。
- 內(nèi)部數(shù)據(jù)結(jié)構(gòu):表示 BlockingQueue 內(nèi)部用于存儲(chǔ)元素的數(shù)據(jù)結(jié)構(gòu),如數(shù)組、鏈表等。
- 隊(duì)列容量:表示 BlockingQueue 的最大容量,如果隊(duì)列有界,則插入操作在隊(duì)列滿時(shí)會(huì)阻塞。
- 等待區(qū)(元素) :表示當(dāng)隊(duì)列為空時(shí),等待獲取元素的線程集合。
- 等待區(qū)(空間) :表示當(dāng)隊(duì)列滿時(shí),等待空間釋放的線程集合。
- 元素添加操作:表示向 BlockingQueue 中添加元素的操作,如果隊(duì)列滿,則操作會(huì)阻塞。
- 元素移除操作:表示從 BlockingQueue 中移除元素的操作,如果隊(duì)列為空,則操作會(huì)阻塞。
綜合說(shuō)明:
- 作用: BlockingQueue 是一個(gè)線程安全的隊(duì)列,支持阻塞操作,當(dāng)隊(duì)列為空時(shí),獲取元素的操作會(huì)阻塞;當(dāng)隊(duì)列滿時(shí),插入元素的操作會(huì)阻塞。
- 背景:在生產(chǎn)者-消費(fèi)者模型中,需要一種機(jī)制來(lái)協(xié)調(diào)生產(chǎn)者和消費(fèi)者之間的操作, BlockingQueue 提供了這種協(xié)調(diào)。
- 優(yōu)點(diǎn):
- 線程協(xié)調(diào):自然地實(shí)現(xiàn)了生產(chǎn)者-消費(fèi)者之間的線程協(xié)調(diào)。
- 阻塞操作:提供了阻塞獲取和阻塞插入的方法,簡(jiǎn)化了并發(fā)編程。
- 缺點(diǎn):
可能的死鎖:不當(dāng)使用可能導(dǎo)致死鎖,例如一個(gè)線程永久阻塞等待一個(gè)不會(huì)到來(lái)的元素。
性能考慮:在高并發(fā)環(huán)境下,隊(duì)列的容量和鎖策略需要仔細(xì)調(diào)優(yōu)。
場(chǎng)景:適用于生產(chǎn)者-消費(fèi)者場(chǎng)景,如任務(wù)分配、資源池管理等。
業(yè)務(wù)舉例:在消息處理系統(tǒng)中, BlockingQueue 可以用于緩存待處理的消息,生產(chǎn)者線程生成消息并放入隊(duì)列,消費(fèi)者線程從隊(duì)列中取出并處理消息,確保了消息的順序性和系統(tǒng)的響應(yīng)性。
使用方式:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueDemo {
// 創(chuàng)建一個(gè) LinkedBlockingQueue 實(shí)例,容量限制為10
private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
// 向 BlockingQueue 中添加元素
public void produce(Integer element) throws InterruptedException {
// put 方法在隊(duì)列滿時(shí)阻塞,直到隊(duì)列中有空間
blockingQueue.put(element);
System.out.println("Produced: " + element);
}
// 從 BlockingQueue 中獲取元素
public Integer consume() throws InterruptedException {
// take 方法在隊(duì)列空時(shí)阻塞,直到隊(duì)列中有元素
Integer element = blockingQueue.take();
System.out.println("Consumed: " + element);
return element;
}
// 獲取 BlockingQueue 的大小
public int size() {
// size 方法返回隊(duì)列當(dāng)前的元素?cái)?shù)量
return blockingQueue.size();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueueDemo demo = new BlockingQueueDemo();
// 創(chuàng)建生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
demo.produce(i);
Thread.sleep(100); // 生產(chǎn)延時(shí)
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 創(chuàng)建消費(fèi)者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
int element = demo.consume();
Thread.sleep(150); // 消費(fèi)延時(shí)
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 打印最終隊(duì)列的大小
System.out.println("Final queue size: " + demo.size());
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說(shuō)明: 消息隊(duì)列系統(tǒng)在微服務(wù)架構(gòu)中用于異步處理任務(wù),例如發(fā)送郵件、短信通知等。這些服務(wù)通常由獨(dú)立的服務(wù)實(shí)例處理,以提高系統(tǒng)的響應(yīng)性和可擴(kuò)展性。消息隊(duì)列需要能夠處理高并發(fā)的消息生產(chǎn)和消費(fèi),確保消息的可靠傳遞。
為什么需要 BlockingQueue 技術(shù): BlockingQueue 提供了一種有效的機(jī)制來(lái)處理生產(chǎn)者-消費(fèi)者場(chǎng)景,特別是在面對(duì)高并發(fā)和需要線程安全時(shí)。它能夠使生產(chǎn)者在隊(duì)列滿時(shí)阻塞,消費(fèi)者在隊(duì)列空時(shí)阻塞,從而平衡生產(chǎn)和消費(fèi)的速度,確保系統(tǒng)的穩(wěn)定性和消息的不丟失。
沒(méi)有 BlockingQueue 技術(shù)會(huì)帶來(lái)什么后果:
沒(méi)有使用 BlockingQueue 或其他并發(fā)隊(duì)列可能會(huì)導(dǎo)致以下問(wèn)題:
- 消息丟失:在高并發(fā)情況下,如果沒(méi)有適當(dāng)?shù)臋C(jī)制來(lái)控制消息的產(chǎn)生和消費(fèi),可能會(huì)導(dǎo)致消息丟失。
- 系統(tǒng)過(guò)載:如果沒(méi)有流控機(jī)制,生產(chǎn)者可能會(huì)過(guò)快地生成消息,導(dǎo)致系統(tǒng)資源耗盡,甚至崩潰。
- 數(shù)據(jù)不一致:在多線程環(huán)境下,如果不正確地管理消息的訪問(wèn),可能會(huì)導(dǎo)致數(shù)據(jù)處理的不一致性。
代碼實(shí)現(xiàn):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueueSystem {
private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
public void produceMessage(String content) throws InterruptedException {
// 將消息添加到隊(duì)列中,如果隊(duì)列滿了,生產(chǎn)者線程將被阻塞
messageQueue.put(new Message(content));
System.out.println("Message produced: " + content);
}
public Message consumeMessage() throws InterruptedException {
// 從隊(duì)列中取出消息,如果隊(duì)列空了,消費(fèi)者線程將被阻塞
Message message = messageQueue.take();
System.out.println("Message consumed: " + message.getContent());
return message;
}
public static void main(String[] args) throws InterruptedException {
MessageQueueSystem messageQueueSystem = new MessageQueueSystem();
// 創(chuàng)建生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.produceMessage("Message " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創(chuàng)建消費(fèi)者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.consumeMessage();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
class Message {
private final String content;
public Message(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}
2.13. Condition
Condition 是 Java 中 java.util.concurrent.locks 包提供的一個(gè)接口,它用于實(shí)現(xiàn)等待/通知機(jī)制。 Condition 通常與 Lock 接口配合使用,允許一個(gè)或多個(gè)線程在某些條件滿足之前掛起,并在條件滿足時(shí)被喚醒。
圖片
圖解說(shuō)明:
- Java 線程:表示運(yùn)行中的線程,它們可能需要在某些條件滿足之前掛起。
- Lock 實(shí)例:是 Lock 接口的具體實(shí)現(xiàn),如 ReentrantLock,用于控制對(duì)共享資源的訪問(wèn)。
- Condition 實(shí)例:是 Condition 接口的具體實(shí)現(xiàn),與 Lock 實(shí)例配合使用,用于線程間的等待/通知機(jī)制。
- 等待隊(duì)列(線程) :當(dāng)線程調(diào)用 Condition 的 await() 方法時(shí),如果條件不滿足,線程會(huì)被放入等待隊(duì)列。
- 共享資源:表示被多個(gè)線程共享的數(shù)據(jù),需要通過(guò) Lock 和 Condition 來(lái)保護(hù)以確保線程安全。
- 條件檢查:表示線程在嘗試獲取資源之前需要檢查的條件。
- 喚醒信號(hào):當(dāng)條件滿足時(shí),其他線程會(huì)發(fā)送喚醒信號(hào)給等待隊(duì)列中的線程。
- 鎖狀態(tài):表示鎖的當(dāng)前狀態(tài),如是否被鎖定,以及鎖定的線程等。
操作流程:
- 鎖定:線程通過(guò) Lock 實(shí)例獲取鎖。
- 條件檢查:線程檢查條件是否滿足。
- 等待:如果條件不滿足,線程調(diào)用 Condition 的 await() 方法,釋放鎖并進(jìn)入等待隊(duì)列。
- 喚醒:當(dāng)條件滿足時(shí),其他線程調(diào)用 Condition 的 signal() 或 signalAll() 方法,發(fā)送喚醒信號(hào)給等待隊(duì)列中的線程。
- 重新競(jìng)爭(zhēng)鎖:被喚醒的線程重新競(jìng)爭(zhēng)鎖。
- 再次檢查條件:線程在重新獲得鎖后,再次檢查條件是否滿足,如果滿足則繼續(xù)執(zhí)行。
綜合說(shuō)明:
- 作用: Condition 是與 Lock 接口配合使用的同步輔助工具,它允許一個(gè)或多個(gè)線程等待,直到被其他線程喚醒。
- 背景:在復(fù)雜的同步場(chǎng)景中,需要更細(xì)粒度的控制線程的等待和喚醒, Condition 提供了這種能力。
- 優(yōu)點(diǎn):
- 細(xì)粒度控制:提供了比 Object.wait()/ Object.notify() 更靈活的線程間協(xié)調(diào)機(jī)制。
- 多條件支持:一個(gè)鎖可以關(guān)聯(lián)多個(gè)條件,每個(gè)條件可以獨(dú)立喚醒等待的線程。
- 缺點(diǎn):
使用復(fù)雜:需要與 Lock 一起使用,增加了編程復(fù)雜度。
錯(cuò)誤使用可能導(dǎo)致死鎖或線程饑餓。
場(chǎng)景:適用于需要線程間復(fù)雜協(xié)調(diào)的場(chǎng)景,如任務(wù)調(diào)度、資源分配等。
業(yè)務(wù)舉例:在酒店預(yù)訂系統(tǒng)中, Condition 可以用于實(shí)現(xiàn)房間狀態(tài)的等待和通知機(jī)制。當(dāng)房間變?yōu)榭臻e時(shí),等待的顧客可以被通知并進(jìn)行預(yù)訂。
使用方式:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Object[] buffer;
private int putPtr, takePtr, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int size) {
buffer = new Object[size];
}
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) { // 等待直到緩沖區(qū)非滿
notFull.await();
}
buffer[putPtr] = x;
putPtr = (putPtr + 1) % buffer.length;
count++;
notEmpty.signal(); // 通知可能等待的消費(fèi)者
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) { // 等待直到緩沖區(qū)非空
notEmpty.await();
}
Object x = buffer[takePtr];
takePtr = (takePtr + 1) % buffer.length;
count--;
notFull.signal(); // 通知可能等待的生產(chǎn)者
return x;
} finally {
lock.unlock();
}
}
}
public class ProducerConsumerDemo {
private final BoundedBuffer buffer;
public ProducerConsumerDemo(int size) {
buffer = new BoundedBuffer(size);
}
public void produce(String item) {
buffer.put(item);
}
public String consume() {
return (String) buffer.take();
}
public static void main(String[] args) throws InterruptedException {
final int SIZE = 10;
final ProducerConsumerDemo demo = new ProducerConsumerDemo(SIZE);
// 生產(chǎn)者線程
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
demo.produce("Item " + i);
try {
Thread.sleep(100); // 生產(chǎn)延時(shí)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 消費(fèi)者線程
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
String item = demo.consume();
System.out.println("Consumed: " + item);
try {
Thread.sleep(150); // 消費(fèi)延時(shí)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
業(yè)務(wù)代碼案例:
業(yè)務(wù)說(shuō)明: 任務(wù)調(diào)度系統(tǒng)負(fù)責(zé)管理和執(zhí)行定時(shí)任務(wù)。這些任務(wù)可能包括數(shù)據(jù)備份、報(bào)告生成、系統(tǒng)維護(hù)等。系統(tǒng)需要能夠按預(yù)定時(shí)間觸發(fā)任務(wù),并確保任務(wù)在執(zhí)行時(shí)不會(huì)相互干擾。
為什么需要 Condition 技術(shù): 在任務(wù)調(diào)度系統(tǒng)中,任務(wù)的觸發(fā)通常依賴于時(shí)間,而任務(wù)的執(zhí)行可能需要等待特定條件滿足。 Condition 配合 Lock 使用,可以在沒(méi)有任務(wù)可執(zhí)行時(shí)讓調(diào)度器線程等待,直到有任務(wù)準(zhǔn)備好執(zhí)行。這種機(jī)制允許系統(tǒng)在沒(méi)有任務(wù)執(zhí)行需求時(shí)保持空閑,從而節(jié)省資源。
沒(méi)有 Condition 技術(shù)會(huì)帶來(lái)什么后果:
沒(méi)有使用 Condition 或其他等待/通知機(jī)制可能會(huì)導(dǎo)致以下問(wèn)題:
- 資源浪費(fèi):如果調(diào)度器不斷輪詢檢查新任務(wù),可能會(huì)浪費(fèi)大量 CPU 資源。
- 響應(yīng)性差:在新任務(wù)到來(lái)時(shí),如果沒(méi)有有效的機(jī)制來(lái)喚醒調(diào)度器,可能會(huì)導(dǎo)致任務(wù)執(zhí)行延遲。
- 代碼復(fù)雜度:沒(méi)有 Condition,可能需要使用更復(fù)雜的多線程同步機(jī)制,增加了代碼的復(fù)雜性和出錯(cuò)的風(fēng)險(xiǎn)。
代碼實(shí)現(xiàn):
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
public class TaskScheduler {
private final ReentrantLock lock = new ReentrantLock();
private final Condition taskAvailable = lock.newCondition();
private final Queue<Runnable> tasks = new LinkedList<>();
public void schedule(Runnable task, long delay) {
lock.lock();
try {
tasks.add(() -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
task.run();
});
taskAvailable.signal(); // 通知調(diào)度器有新任務(wù)
} finally {
lock.unlock();
}
}
public void startScheduling() {
new Thread(this::runScheduler).start();
}
private void runScheduler() {
lock.lock();
try {
while (true) {
while (tasks.isEmpty()) { // 如果沒(méi)有任務(wù),等待
taskAvailable.await();
}
Runnable task = tasks.poll();
task.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
scheduler.schedule(() -> System.out.println("Task 1 executed at " + new Date()), 2000);
scheduler.schedule(() -> System.out.println("Task 2 executed at " + new Date()), 4000);
scheduler.startScheduling();
}
}