成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

精通Java并發(fā)鎖機(jī)制:24種鎖技巧+業(yè)務(wù)鎖匹配方案

開(kāi)發(fā) 前端
為什么需要?CyclicBarrier?技術(shù): 在多階段數(shù)據(jù)處理的場(chǎng)景中,不同的處理任務(wù)可能由不同的線程執(zhí)行,而這些線程的執(zhí)行時(shí)間可能不同。?CyclicBarrier?允許每個(gè)階段的處理在開(kāi)始前等待所有相關(guān)線程完成上一階段的任務(wù),確保數(shù)據(jù)的一致性和完整性。

在 Java 并發(fā)編程中,鎖是確保線程安全、協(xié)調(diào)多線程訪問(wèn)共享資源的關(guān)鍵機(jī)制。從基本的 synchronized 同步關(guān)鍵字到高級(jí)的 ReentrantLock、讀寫(xiě)鎖 ReadWriteLock、無(wú)鎖設(shè)計(jì)如 AtomicInteger,再到復(fù)雜的同步輔助工具如 CountDownLatch、 CyclicBarrier 和 Semaphore,每種鎖都針對(duì)特定的并發(fā)場(chǎng)景設(shè)計(jì),以解決多線程環(huán)境下的同步問(wèn)題。 StampedLock 提供了樂(lè)觀讀鎖和悲觀寫(xiě)鎖的選項(xiàng),而 ConcurrentHashMap 和 ConcurrentLinkedQueue 等并發(fā)集合則通過(guò)內(nèi)部機(jī)制優(yōu)化了并發(fā)訪問(wèn)。了解不同鎖的特點(diǎn)和適用場(chǎng)景,對(duì)于構(gòu)建高效、穩(wěn)定的并發(fā)應(yīng)用程序至關(guān)重要。

1、鎖選擇維度

選擇適合的鎖通常依賴于特定的應(yīng)用場(chǎng)景和并發(fā)需求。以下是一個(gè)表格,概述了不同鎖類型的關(guān)鍵特性和選擇它們的考量維度:

鎖類型

適用場(chǎng)景

鎖模式

性能特點(diǎn)

公平性

鎖的粗細(xì)

條件支持

阻塞策略

用途舉例

synchronized

簡(jiǎn)單的同步需求,無(wú)需復(fù)雜控制

獨(dú)占式

適中,偏向鎖、輕量級(jí)鎖優(yōu)化

無(wú)公平策略

粗粒度鎖

不支持

阻塞等待

單例模式、簡(jiǎn)單的計(jì)數(shù)器

ReentrantLock

需要靈活的鎖控制,如可中斷、超時(shí)、嘗試鎖定等

獨(dú)占式

高,支持多種鎖定方式

可配置公平性

細(xì)粒度鎖

支持

可中斷、超時(shí)、嘗試

同步代碼塊或方法、復(fù)雜同步控制

ReadWriteLock

讀多寫(xiě)少的場(chǎng)景

共享-獨(dú)占式

高,提高讀操作并發(fā)性

不支持公平性

細(xì)粒度鎖

不支持

阻塞等待

緩存系統(tǒng)、文件系統(tǒng)

StampedLock

讀多寫(xiě)多,需要樂(lè)觀讀和悲觀寫(xiě)的場(chǎng)景

樂(lè)觀讀-悲觀寫(xiě)

高,提供讀寫(xiě)鎖的擴(kuò)展

可配置公平性

細(xì)粒度鎖

支持

可中斷、超時(shí)、嘗試

高性能計(jì)數(shù)器、數(shù)據(jù)緩存

CountDownLatch

需要等待一組操作完成的場(chǎng)景

無(wú)

低,一次性

不支持公平性

粗粒度鎖

不支持

阻塞等待

任務(wù)協(xié)調(diào)、初始化操作

Semaphore

需要控制資源訪問(wèn)數(shù)量的場(chǎng)景

信號(hào)量

高,控制并發(fā)數(shù)量

不支持公平性

細(xì)粒度鎖

支持

阻塞等待

限流、資源池管理

CyclicBarrier

需要周期性執(zhí)行一組操作的場(chǎng)景

無(wú)

低,重用性

支持公平性

粗粒度鎖

支持

阻塞等待

并行計(jì)算、批處理

2、鎖詳細(xì)分析

2.7. CyclicBarrier

CyclicBarrier 是 Java 中用于線程間同步的一種工具,它允許一組線程互相等待,直到所有線程都到達(dá)一個(gè)公共屏障點(diǎn)。 

圖片圖片

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要在某個(gè)點(diǎn)同步。
  • CyclicBarrier 實(shí)例:是 CyclicBarrier 類的實(shí)例,用于協(xié)調(diào)一組線程在屏障點(diǎn)同步。
  • 屏障:表示線程需要到達(dá)的同步點(diǎn),所有線程必須到達(dá)這個(gè)點(diǎn)才能繼續(xù)執(zhí)行。
  • 共享資源或任務(wù):表示線程需要訪問(wèn)的共享資源或執(zhí)行的任務(wù),它們?cè)谄琳宵c(diǎn)同步后可以安全地執(zhí)行。
  • 等待區(qū):表示等待其他線程到達(dá)屏障點(diǎn)的線程集合。
  • 計(jì)數(shù)器: CyclicBarrier 內(nèi)部維護(hù)一個(gè)計(jì)數(shù)器,用于跟蹤尚未到達(dá)屏障點(diǎn)的線程數(shù)量。
  • 屏障動(dòng)作(Runnable) :可選的,當(dāng)所有線程到達(dá)屏障點(diǎn)時(shí),可以執(zhí)行一個(gè)特定的動(dòng)作或任務(wù)。
綜合說(shuō)明:
  • 作用: CyclicBarrier 是一種同步幫助工具,允許一組線程相互等待,直到所有線程都到達(dá)某個(gè)公共屏障點(diǎn)。
  • 背景:在需要多個(gè)線程協(xié)作完成任務(wù)時(shí), CyclicBarrier 提供了一種機(jī)制,使得所有線程可以在屏障點(diǎn)同步,然后繼續(xù)執(zhí)行。
  • 優(yōu)點(diǎn):

可重復(fù)使用:與 CountDownLatch 不同, CyclicBarrier 可以重復(fù)使用,適用于周期性的任務(wù)同步。

支持屏障動(dòng)作:可以設(shè)置一個(gè)在所有線程到達(dá)屏障點(diǎn)后執(zhí)行的回調(diào)。

  • 缺點(diǎn):
  • 可能導(dǎo)致死鎖:如果一個(gè)或多個(gè)線程未到達(dá)屏障點(diǎn),其他線程將一直等待。

  • 復(fù)雜性:需要合理設(shè)計(jì)以避免線程永久等待。

  • 場(chǎng)景:適用于需要周期性同步多個(gè)線程的場(chǎng)景。

  • 業(yè)務(wù)舉例:在多階段數(shù)據(jù)處理流程中,每個(gè)階段需要所有數(shù)據(jù)都準(zhǔn)備好后才能開(kāi)始處理。使用 CyclicBarrier可以確保所有數(shù)據(jù)加載線程在每個(gè)階段開(kāi)始前都已準(zhǔn)備好。

使用方式:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Race {
    private final CyclicBarrier barrier;

    public Race(int numberOfRunners) {
        barrier = new CyclicBarrier(numberOfRunners, () -> {
            System.out.println("比賽開(kāi)始!");
            // 這里可以放置所有參與者到達(dá)屏障后要執(zhí)行的操作
        });
    }

    public void run() {
        System.out.println("等待其他參賽者...");
        try {
            barrier.await(); // 等待其他線程
            System.out.println("開(kāi)始跑步!");
            // 跑步時(shí)間
            Thread.sleep((long) (Math.random() * 10000));
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        final int numberOfRunners = 5;
        Race race = new Race(numberOfRunners);

        // 創(chuàng)建參賽者線程
        for (int i = 0; i < numberOfRunners; i++) {
            final int runnerNumber = i + 1;
            new Thread(() -> {
                System.out.println("參賽者 " + runnerNumber + " 已準(zhǔn)備就緒");
                race.run();
            }).start();
        }
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說(shuō)明: 在大數(shù)據(jù)處理系統(tǒng)中,經(jīng)常需要對(duì)大量數(shù)據(jù)進(jìn)行多階段處理,例如,數(shù)據(jù)清洗、轉(zhuǎn)換、聚合和加載。這些處理階段通常需要按順序執(zhí)行,且每個(gè)階段開(kāi)始前必須確保所有數(shù)據(jù)都已準(zhǔn)備好。

為什么需要 CyclicBarrier 技術(shù): 在多階段數(shù)據(jù)處理的場(chǎng)景中,不同的處理任務(wù)可能由不同的線程執(zhí)行,而這些線程的執(zhí)行時(shí)間可能不同。 CyclicBarrier 允許每個(gè)階段的處理在開(kāi)始前等待所有相關(guān)線程完成上一階段的任務(wù),確保數(shù)據(jù)的一致性和完整性。

沒(méi)有 CyclicBarrier 技術(shù)會(huì)帶來(lái)什么后果:

沒(méi)有使用 CyclicBarrier 或其他同步協(xié)調(diào)機(jī)制可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 數(shù)據(jù)不一致:如果后續(xù)階段的處理在前一階段的數(shù)據(jù)未完全準(zhǔn)備好時(shí)開(kāi)始,可能會(huì)導(dǎo)致處理結(jié)果不準(zhǔn)確。
  2. 資源浪費(fèi):在等待數(shù)據(jù)準(zhǔn)備的過(guò)程中,系統(tǒng)資源可能被無(wú)效占用,導(dǎo)致資源利用效率低下。
  3. 錯(cuò)誤和異常:由于階段間的依賴關(guān)系沒(méi)有得到妥善處理,可能會(huì)引發(fā)程序錯(cuò)誤或運(yùn)行時(shí)異常。

代碼實(shí)現(xiàn):

import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DataProcessingPipeline {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final CyclicBarrier barrier;
    private final int numberOfPhases;
    private final int numberOfTasks;

    public DataProcessingPipeline(int numberOfTasks, int numberOfPhases) {
        this.numberOfTasks = numberOfTasks;
        this.numberOfPhases = numberOfPhases;
        this.barrier = new CyclicBarrier(numberOfTasks, () -> {
            System.out.println("一個(gè)階段完成,準(zhǔn)備進(jìn)入下一階段");
        });
    }

    public void processData() throws Exception {
        for (int phase = 1; phase <= numberOfPhases; phase++) {
            System.out.println("階段 " + phase + " 開(kāi)始");
            for (int task = 0; task < numberOfTasks; task++) {
                final int currentTask = task;
                executor.submit(() -> {
                    try {
                        // 數(shù)據(jù)處理任務(wù)
                        System.out.println("任務(wù) " + currentTask + " 在階段 " + phase + " 執(zhí)行");
                        Thread.sleep((long) (Math.random() * 5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            barrier.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            barrier.await(); // 等待所有任務(wù)完成
        }
        executor.shutdown();
    }

    public static void main(String[] args) {
        DataProcessingPipeline pipeline = new DataProcessingPipeline(4, 3);
        try {
            pipeline.processData();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.8. Atomic Variables

原子變量是 Java 中 java.util.concurrent.atomic 包提供的一些類,它們利用底層硬件的原子性指令來(lái)保證操作的原子性,無(wú)需使用鎖。 

圖片圖片

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì)共享資源進(jìn)行原子操作。
  • Atomic Variables:表示原子變量的集合,包括 AtomicInteger、 AtomicLong、 AtomicReference 等。
  • AtomicInteger、AtomicLong、AtomicReference:分別表示整型、長(zhǎng)整型和引用類型的原子變量。
  • 硬件支持的原子指令:底層硬件提供的原子性指令,如 compare-and-swap (CAS)、load-linked、store-conditional 等。
  • 共享資源:表示被多個(gè)線程共享的數(shù)據(jù),如計(jì)數(shù)器、累加器等。
  • 內(nèi)存:表示 Java 程序使用的內(nèi)存空間,包括堆和棧等。
  • 變量狀態(tài):表示原子變量在內(nèi)存中的當(dāng)前狀態(tài)。
綜合說(shuō)明:
  • 作用:原子變量類(如 AtomicInteger, AtomicLong, AtomicReference 等)提供了一種機(jī)制,使得對(duì)變量的某些操作(如自增、自減、讀取和寫(xiě)入)是原子性的,無(wú)需使用傳統(tǒng)的鎖。
  • 背景:在多線程環(huán)境中,對(duì)共享變量的并發(fā)訪問(wèn)需要同步措施以防止數(shù)據(jù)競(jìng)爭(zhēng)。原子變量利用底層硬件的原子指令來(lái)保證操作的原子性,從而簡(jiǎn)化了線程同步。
  • 優(yōu)點(diǎn):

無(wú)鎖設(shè)計(jì):避免使用傳統(tǒng)鎖,減少了線程切換的開(kāi)銷。

性能優(yōu)化:對(duì)于高競(jìng)爭(zhēng)的簡(jiǎn)單變量訪問(wèn),原子變量通常比鎖有更好的性能。

  • 缺點(diǎn):
  • 功能限制:僅適用于簡(jiǎn)單的操作,復(fù)雜的操作無(wú)法通過(guò)原子變量實(shí)現(xiàn)。

  • 可組合性問(wèn)題:復(fù)雜的原子操作需要仔細(xì)設(shè)計(jì),否則可能引入競(jìng)態(tài)條件。

  • 場(chǎng)景:適用于對(duì)簡(jiǎn)單變量進(jìn)行原子操作的場(chǎng)景,如計(jì)數(shù)器、累加器等。

  • 業(yè)務(wù)舉例:在電商平臺(tái)的庫(kù)存管理中, AtomicInteger 可以用來(lái)原子地更新商品的庫(kù)存數(shù)量,確保在高并發(fā)環(huán)境下庫(kù)存數(shù)據(jù)的一致性。

使用方式:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Counter {
    // 使用 AtomicInteger 來(lái)確保計(jì)數(shù)器的線程安全
    private final AtomicInteger count = new AtomicInteger(0);

    // 提供一個(gè)方法來(lái)增加計(jì)數(shù)器的值
    public void increment() {
        // 原子地增加計(jì)數(shù)器的值
        count.incrementAndGet();
    }

    // 提供一個(gè)方法來(lái)獲取當(dāng)前計(jì)數(shù)器的值
    public int getCount() {
        // 原子地獲取計(jì)數(shù)器的值
        return count.get();
    }
}

public class DataStore {
    // 使用 AtomicLong 來(lái)統(tǒng)計(jì)數(shù)據(jù)總量
    private final AtomicLong dataCount = new AtomicLong(0);

    public void addData(long size) {
        // 原子地將數(shù)據(jù)大小累加到總量
        dataCount.addAndGet(size);
    }

    public long getDataCount() {
        // 原子地獲取當(dāng)前數(shù)據(jù)總量
        return dataCount.get();
    }
}

// 測(cè)試類
public class AtomicVariablesDemo {
    public static void main(String[] args) {
        Counter counter = new Counter();
        DataStore dataStore = new DataStore();

        // 多線程環(huán)境中對(duì)計(jì)數(shù)器和數(shù)據(jù)總量的更新
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                counter.increment();
                dataStore.addData(100);  // 假設(shè)每次操作增加100單位數(shù)據(jù)
            }).start();
        }

        // 等待所有線程完成
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }

        // 輸出計(jì)數(shù)器的值和數(shù)據(jù)總量
        System.out.println("Counter value: " + counter.getCount());
        System.out.println("Data store size: " + dataStore.getDataCount());
    }
}

業(yè)務(wù)代碼案例:

場(chǎng)景描述:社交網(wǎng)絡(luò)的實(shí)時(shí)消息計(jì)數(shù)

業(yè)務(wù)說(shuō)明: 社交網(wǎng)絡(luò)平臺(tái)需要顯示每個(gè)用戶的實(shí)時(shí)消息通知數(shù)。每當(dāng)用戶收到新消息時(shí),消息計(jì)數(shù)需要增加;用戶閱讀消息時(shí),計(jì)數(shù)可能會(huì)減少或被重置。此計(jì)數(shù)需要對(duì)所有用戶可見(jiàn),且在高并發(fā)環(huán)境下保持準(zhǔn)確。

為什么需要 AtomicVariables 技術(shù): 在社交網(wǎng)絡(luò)中,多個(gè)用戶可能同時(shí)發(fā)送消息給同一個(gè)接收者,或者一個(gè)用戶可能同時(shí)在多個(gè)設(shè)備上接收消息。這導(dǎo)致對(duì)消息計(jì)數(shù)的讀取和更新操作非常頻繁。使用 AtomicInteger 可以確保消息計(jì)數(shù)更新的原子性,并且在多線程環(huán)境下保持?jǐn)?shù)據(jù)的一致性。

沒(méi)有 AtomicVariables 技術(shù)會(huì)帶來(lái)什么后果:

沒(méi)有使用 AtomicVariables 或其他并發(fā)控制機(jī)制可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 數(shù)據(jù)不一致:消息計(jì)數(shù)可能會(huì)出錯(cuò),導(dǎo)致用戶看到不正確的消息數(shù)量。
  2. 用戶體驗(yàn)下降:如果消息通知不準(zhǔn)確,用戶可能會(huì)錯(cuò)過(guò)重要通知,或者對(duì)應(yīng)用的可靠性產(chǎn)生懷疑。
  3. 系統(tǒng)復(fù)雜度增加:在沒(méi)有有效同步機(jī)制的情況下,維護(hù)數(shù)據(jù)一致性將變得復(fù)雜且容易出錯(cuò)。

代碼實(shí)現(xiàn):

import java.util.concurrent.atomic.AtomicInteger;

public class MessageNotificationCounter {
    private final AtomicInteger messageCount = new AtomicInteger(0);

    // 接收新消息時(shí)調(diào)用此方法
    public void receiveMessage() {
        // 原子地增加消息計(jì)數(shù)
        messageCount.incrementAndGet();
        System.out.println("New message received. Total messages: " + messageCount.get());
    }

    // 用戶閱讀消息時(shí)調(diào)用此方法
    public void messagesRead() {
        // 原子地減少消息計(jì)數(shù)
        messageCount.decrementAndGet();
        System.out.println("Messages read. Remaining messages: " + messageCount.get());
    }

    // 獲取當(dāng)前消息計(jì)數(shù)
    public int getMessageCount() {
        return messageCount.get();
    }
}

// 測(cè)試類
public class AtomicVariablesDemo {
    public static void main(String[] args) {
        MessageNotificationCounter counter = new MessageNotificationCounter();

        // 多個(gè)用戶同時(shí)發(fā)送消息
        Thread sender1 = new Thread(() -> {
            counter.receiveMessage();
        });
        Thread sender2 = new Thread(() -> {
            counter.receiveMessage();
        });

        // 用戶閱讀消息
        Thread reader = new Thread(() -> {
            counter.messagesRead();
        });

        sender1.start();
        sender2.start();
        reader.start();

        sender1.join();
        sender2.join();
        reader.join();

        System.out.println("Final message count: " + counter.getMessageCount());
    }
}

2.9. ConcurrentHashMap

ConcurrentHashMap 是 Java 中一個(gè)線程安全的哈希表,它通過(guò)分段鎖(Segmentation)和 CAS 操作來(lái)支持高并發(fā)的讀寫(xiě)操作。 

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì) ConcurrentHashMap 進(jìn)行讀寫(xiě)操作。
  • ConcurrentHashMap 實(shí)例:是 ConcurrentHashMap 類的實(shí)例,用于存儲(chǔ)鍵值對(duì)并提供線程安全的訪問(wèn)。
  • Segment 數(shù)組: ConcurrentHashMap 將哈希表分為多個(gè)段(Segment),每個(gè)段維護(hù)一部分哈希桶,通過(guò)分段鎖減少鎖的競(jìng)爭(zhēng)。
  • Hash 桶:存儲(chǔ)哈希桶數(shù)組,每個(gè)桶可以包含一個(gè)或多個(gè)鍵值對(duì)。
  • 鏈表或紅黑樹(shù):在哈希桶中,鍵值對(duì)最初以鏈表形式存儲(chǔ),當(dāng)鏈表長(zhǎng)度超過(guò)閾值時(shí),鏈表可能會(huì)被轉(zhuǎn)換為紅黑樹(shù)以提高搜索效率。
  • 共享資源:表示存儲(chǔ)在 ConcurrentHashMap 中的鍵值對(duì)數(shù)據(jù)。
  • 讀操作:線程可以并發(fā)地讀取 ConcurrentHashMap 中的數(shù)據(jù),在讀多寫(xiě)少的場(chǎng)景下,讀操作不會(huì)阻塞其他讀操作。
  • 寫(xiě)操作:線程對(duì) ConcurrentHashMap 的寫(xiě)入操作,寫(xiě)操作需要獲取相應(yīng)段的鎖。
  • 鎖:每個(gè)段擁有自己的鎖,寫(xiě)操作需要獲取鎖,而讀操作通常不需要。
升級(jí)設(shè)計(jì)說(shuō)明:
Java 1.7 ConcurrentHashMap 鎖機(jī)制

在 Java 1.7 中, ConcurrentHashMap 使用分段鎖機(jī)制,其中每個(gè)段相當(dāng)于一個(gè)小的哈希表,擁有自己的鎖。 

Java 1.8 ConcurrentHashMap 鎖機(jī)制

在 Java 1.8 中, ConcurrentHashMap 摒棄了分段鎖機(jī)制,采用了 CAS 和 synchronized 來(lái)確保線程安全。

綜合說(shuō)明:
  • 作用: ConcurrentHashMap 是 Java 中提供的一個(gè)線程安全的哈希表,它通過(guò)分段鎖的概念來(lái)允許并發(fā)的讀寫(xiě)操作,從而提高并發(fā)訪問(wèn)的性能。
  • 背景:傳統(tǒng)的 HashMap 在多線程環(huán)境下需要外部同步,而 ConcurrentHashMap 通過(guò)鎖分離技術(shù)減少了鎖的競(jìng)爭(zhēng),提供了更好的并發(fā)性能。
  • 優(yōu)點(diǎn):
  • 高并發(fā):通過(guò)細(xì)分鎖到段,允許多個(gè)線程同時(shí)操作不同段的數(shù)據(jù)。
  • 動(dòng)態(tài)擴(kuò)容:內(nèi)部采用動(dòng)態(tài)數(shù)組和鏈表結(jié)構(gòu),提高了空間和時(shí)間效率。
  • 缺點(diǎn):
  • 復(fù)雜度高:實(shí)現(xiàn)復(fù)雜,需要維護(hù)多個(gè)鎖和復(fù)雜的數(shù)據(jù)結(jié)構(gòu)。

  • 性能調(diào)優(yōu):在極端高并發(fā)場(chǎng)景下,可能需要調(diào)整默認(rèn)的并發(fā)級(jí)別。

  • 場(chǎng)景:適用于需要高并發(fā)訪問(wèn)的緩存或數(shù)據(jù)存儲(chǔ)。

  • 業(yè)務(wù)舉例:在大數(shù)據(jù)處理系統(tǒng)中, ConcurrentHashMap 可以用來(lái)存儲(chǔ)實(shí)時(shí)計(jì)算結(jié)果,支持大量并發(fā)的讀寫(xiě)操作而不會(huì)導(dǎo)致性能瓶頸。

使用方式:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class ConcurrentHashMapDemo {
    // 創(chuàng)建一個(gè) ConcurrentHashMap 實(shí)例
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    // 將一個(gè)鍵值對(duì)插入到 Map 中
    public void put(String key, Integer value) {
        // put 方法是線程安全的
        map.put(key, value);
    }

    // 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
    public Integer get(String key) {
        // get 方法是線程安全的
        return map.get(key);
    }

    // 計(jì)算 Map 中的元素?cái)?shù)量
    public int size() {
        // size 方法是線程安全的
        return map.size();
    }

    // 演示刪除操作
    public void remove(String key) {
        // remove 方法是線程安全的
        map.remove(key);
    }

    // 演示如何批量添加數(shù)據(jù)
    public void addAll(Map<String, Integer> newData) {
        // putAll 方法是線程安全的
        map.putAll(newData);
    }

    public static void main(String[] args) {
        ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();

        // 批量添加數(shù)據(jù)
        demo.addAll(Map.of("key1", 1, "key2", 2, "key3", 3));

        // 單獨(dú)添加一條數(shù)據(jù)
        demo.put("key4", 4);

        // 獲取并打印一條數(shù)據(jù)
        System.out.println("Value for 'key1': " + demo.get("key1"));

        // 獲取 Map 的大小
        System.out.println("Map size: " + demo.size());

        // 刪除一條數(shù)據(jù)
        demo.remove("key2");

        // 再次獲取 Map 的大小
        System.out.println("Map size after removal: " + demo.size());
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說(shuō)明: 在分布式緩存系統(tǒng)中,經(jīng)常需要存儲(chǔ)和檢索用戶會(huì)話信息、應(yīng)用配置、熱點(diǎn)數(shù)據(jù)等。這些數(shù)據(jù)需要被多個(gè)應(yīng)用實(shí)例共享,并且要求在高并發(fā)環(huán)境下依然保持高性能。緩存數(shù)據(jù)通常有過(guò)期時(shí)間,需要定期清理。

為什么需要 ConcurrentHashMap 技術(shù): ConcurrentHashMap 提供了一種高效的方式來(lái)處理并發(fā)的讀取和更新操作,并且它的分段鎖機(jī)制允許多個(gè)線程同時(shí)對(duì)不同段進(jìn)行操作,從而提高并發(fā)處理能力。此外, ConcurrentHashMap 在 Java 8 中引入的紅黑樹(shù)結(jié)構(gòu)使得即使在高并發(fā)更新導(dǎo)致哈希沖突時(shí),也能保持高效的性能。

沒(méi)有 ConcurrentHashMap 技術(shù)會(huì)帶來(lái)什么后果:

沒(méi)有使用 ConcurrentHashMap 可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 性能瓶頸:在高并發(fā)環(huán)境下,如果使用 HashMap 加 synchronized,可能導(dǎo)致嚴(yán)重的性能瓶頸,因?yàn)樗芯€程必須等待一個(gè)鎖。
  2. 數(shù)據(jù)不一致:在沒(méi)有適當(dāng)同步的情況下,多個(gè)線程同時(shí)更新數(shù)據(jù)可能導(dǎo)致緩存數(shù)據(jù)不一致。
  3. 擴(kuò)展性差:隨著系統(tǒng)負(fù)載的增加,基于 HashMap 的緩存解決方案可能難以擴(kuò)展,因?yàn)殒i競(jìng)爭(zhēng)和線程安全問(wèn)題。

代碼實(shí)現(xiàn):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class DistributedCache<K, V> {
    private final ConcurrentHashMap<K, V> cacheMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>();

    public void put(K key, V value, long ttl) {
        cacheMap.put(key, value);
        expirationMap.put(key, System.currentTimeMillis() + ttl);
        scheduleEviction(key, ttl);
    }

    public V get(K key) {
        Long expirationTime = expirationMap.get(key);
        if (expirationTime == null || expirationTime < System.currentTimeMillis()) {
            cacheMap.remove(key);
            expirationMap.remove(key);
            return null;
        }
        return cacheMap.get(key);
    }

    private void scheduleEviction(final K key, final long ttl) {
        new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(ttl);
                cacheMap.computeIfPresent(key, (k, v) -> null);
                expirationMap.remove(key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    public static void main(String[] args) {
        DistributedCache<String, String> cache = new DistributedCache<>();
        cache.put("userSession", "sessionData", 5000); // 緩存設(shè)置5秒過(guò)期

        // 多個(gè)線程并發(fā)訪問(wèn)緩存
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            new Thread(() -> {
                String result = cache.get("userSession");
                System.out.println("Thread " + finalI + " retrieved: " + result);
            }).start();
        }
    }
}

2.10.ConcurrentSkipListMap

ConcurrentSkipListMap 是 Java 中實(shí)現(xiàn)的一個(gè)高性能并發(fā)的有序映射,它使用跳表(Skip List)作為其底層數(shù)據(jù)結(jié)構(gòu)。 

圖片圖片

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì) ConcurrentSkipListMap 進(jìn)行讀寫(xiě)操作。
  • ConcurrentSkipListMap 實(shí)例:是 ConcurrentSkipListMap 類的實(shí)例,用于存儲(chǔ)鍵值對(duì)并提供線程安全的訪問(wèn)。
  • Skip List 層級(jí)結(jié)構(gòu):跳表由多層索引構(gòu)成,每一層都是一個(gè)有序的鏈表。
  • 索引層:跳表中的索引層,用于加速搜索操作。
  • 數(shù)據(jù)層:跳表中的底層數(shù)據(jù)結(jié)構(gòu),存儲(chǔ)實(shí)際的鍵值對(duì)。
  • Node 節(jié)點(diǎn):跳表中的節(jié)點(diǎn),包含鍵值對(duì)和指向其他節(jié)點(diǎn)的鏈接。
  • 共享資源:表示存儲(chǔ)在 ConcurrentSkipListMap 中的鍵值對(duì)數(shù)據(jù)。
  • 讀操作:線程可以并發(fā)地讀取 ConcurrentSkipListMap 中的數(shù)據(jù)。
  • 寫(xiě)操作:線程可以并發(fā)地修改 ConcurrentSkipListMap 中的數(shù)據(jù)。
  • CAS 操作:在更新節(jié)點(diǎn)鏈接或修改數(shù)據(jù)時(shí),使用 CAS 操作來(lái)保證線程安全。
  • 自旋鎖/同步塊:在某些情況下,如果 CAS 操作失敗,可能會(huì)使用自旋鎖或同步塊來(lái)確保操作的原子性。

操作流程:

  1. 讀操作:

線程通過(guò)索引層快速定位到數(shù)據(jù)層的節(jié)點(diǎn)。

線程使用 volatile 讀取節(jié)點(diǎn)的值,確保內(nèi)存可見(jiàn)性。

  1. 寫(xiě)操作:
  • 線程在更新或添加節(jié)點(diǎn)時(shí),首先嘗試使用 CAS 操作。

  • 如果 CAS 操作失敗,線程可能會(huì)使用自旋鎖或同步塊來(lái)確保原子性。

綜合說(shuō)明:

作用: ConcurrentSkipListMap 是一種線程安全的有序映射,它通過(guò)使用跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)來(lái)支持高效的并發(fā)訪問(wèn)和排序操作。 背景:在需要高效并發(fā)訪問(wèn)和保持元素有序的場(chǎng)景中,傳統(tǒng)的 TreeMap 由于其加鎖策略在高并發(fā)環(huán)境下性能受限, ConcurrentSkipListMap 提供了一種替代方案。 優(yōu)點(diǎn):

  • 高性能并發(fā)訪問(wèn):通過(guò)跳表結(jié)構(gòu)和細(xì)粒度鎖定,實(shí)現(xiàn)了高效的并發(fā)讀取和更新。
  • 有序性:保持元素的有序性,支持范圍查詢等操作。
  • 動(dòng)態(tài)調(diào)整:可以根據(jù)訪問(wèn)模式動(dòng)態(tài)調(diào)整結(jié)構(gòu),優(yōu)化性能。 缺點(diǎn):
  • 內(nèi)存占用:相比無(wú)序的 ConcurrentHashMap,由于維護(hù)了有序性,內(nèi)存占用可能更高。
  • 復(fù)雜性:實(shí)現(xiàn)相對(duì)復(fù)雜,涉及多級(jí)索引和節(jié)點(diǎn)的管理。 場(chǎng)景:適用于需要有序數(shù)據(jù)且高并發(fā)訪問(wèn)的場(chǎng)景,如實(shí)時(shí)數(shù)據(jù)索引、范圍查詢等。 業(yè)務(wù)舉例:在一個(gè)金融市場(chǎng)分析系統(tǒng)中,需要維護(hù)一個(gè)實(shí)時(shí)更新的價(jià)格索引, ConcurrentSkipListMap 可以用來(lái)存儲(chǔ)和快速檢索各種金融工具的當(dāng)前價(jià)格。
使用方式:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentSkipListMapDemo {
    // 創(chuàng)建一個(gè) ConcurrentSkipListMap 實(shí)例
    private final ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();

    // 將一個(gè)鍵值對(duì)插入到 Map 中
    public void put(Integer key, String value) {
        // put 方法是線程安全的
        map.put(key, value);
    }

    // 從 Map 中獲取與指定鍵關(guān)聯(lián)的值
    public String get(Integer key) {
        // get 方法是線程安全的
        return map.get(key);
    }

    // 獲取 Map 的鍵集合
    public java.util.NavigableSet<Integer> keySet() {
        // keySet 方法返回 Map 的鍵集合視圖
        return map.keySet();
    }

    // 獲取 Map 的值集合
    public java.util.Collection<String> values() {
        // values 方法返回 Map 的值集合視圖
        return map.values();
    }

    // 獲取 Map 的大小
    public int size() {
        // size 方法是線程安全的
        return map.size();
    }

    // 演示刪除操作
    public void remove(Integer key) {
        // remove 方法是線程安全的
        map.remove(key);
    }

    public static void main(String[] args) {
        ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();

        // 插入一些數(shù)據(jù)
        demo.put(1, "One");
        demo.put(2, "Two");
        demo.put(3, "Three");

        // 獲取并打印一條數(shù)據(jù)
        System.out.println("Value for key 2: " + demo.get(2));

        // 獲取 Map 的大小
        System.out.println("Map size: " + demo.size());

        // 獲取并打印所有鍵
        System.out.println("All keys: " + demo.keySet());

        // 刪除一條數(shù)據(jù)
        demo.remove(2);

        // 再次獲取 Map 的大小
        System.out.println("Map size after removal: " + demo.size());

        // 獲取并打印所有值
        System.out.println("All values: " + demo.values());
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說(shuō)明: 實(shí)時(shí)股票交易系統(tǒng)需要維護(hù)一個(gè)動(dòng)態(tài)變化的股票價(jià)格索引,該索引需要根據(jù)實(shí)時(shí)的市場(chǎng)數(shù)據(jù)進(jìn)行更新,并且允許多個(gè)交易線程并發(fā)地讀取和更新股票價(jià)格。此外,系統(tǒng)還需要定期根據(jù)價(jià)格波動(dòng)進(jìn)行調(diào)整,如計(jì)算價(jià)格的平均值、執(zhí)行價(jià)格范圍查詢等。

為什么需要 ConcurrentSkipListMap 技術(shù): ConcurrentSkipListMap 是一個(gè)線程安全的有序映射,它允許高效的范圍查詢和有序訪問(wèn),這對(duì)于股票價(jià)格索引來(lái)說(shuō)至關(guān)重要。由于股票價(jià)格會(huì)頻繁更新,且需要快速響應(yīng)市場(chǎng)變化,使用 ConcurrentSkipListMap 可以提供高效的插入、刪除和查找操作,同時(shí)保持?jǐn)?shù)據(jù)的有序性。

沒(méi)有 ConcurrentSkipListMap 技術(shù)會(huì)帶來(lái)什么后果:

沒(méi)有使用 ConcurrentSkipListMap 或其他適合有序并發(fā)操作的數(shù)據(jù)結(jié)構(gòu)可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 性能瓶頸:如果使用 HashMap 或 ConcurrentHashMap,雖然可以實(shí)現(xiàn)并發(fā)更新,但無(wú)法高效執(zhí)行有序操作和范圍查詢,可能導(dǎo)致查詢性能不佳。
  2. 數(shù)據(jù)不一致:在高并發(fā)更新的情況下,如果沒(méi)有適當(dāng)?shù)耐綑C(jī)制,可能會(huì)導(dǎo)致價(jià)格信息的不一致。
  3. 復(fù)雜性增加:如果使用 synchronized 列表或數(shù)組來(lái)維護(hù)價(jià)格索引,可能需要手動(dòng)管理復(fù)雜的同步和排序邏輯,增加系統(tǒng)復(fù)雜性和出錯(cuò)的風(fēng)險(xiǎn)。

代碼實(shí)現(xiàn):

import java.util.concurrent.ConcurrentSkipListMap;

public class StockPriceIndex {
    private final ConcurrentSkipListMap<String, Double> priceIndex = new ConcurrentSkipListMap<>();

    public void updatePrice(String stockSymbol, Double newPrice) {
        // 更新股票價(jià)格
        priceIndex.put(stockSymbol, newPrice);
    }

    public Double getPrice(String stockSymbol) {
        // 獲取股票價(jià)格
        return priceIndex.get(stockSymbol);
    }

    public void removeStock(String stockSymbol) {
        // 移除股票信息
        priceIndex.remove(stockSymbol);
    }

    public ConcurrentSkipListMap<String, Double> headMap(String toKey) {
        // 獲取指定范圍內(nèi)的股票價(jià)格索引
        return priceIndex.headMap(toKey);
    }

    public static void main(String[] args) {
        StockPriceIndex index = new StockPriceIndex();
        index.updatePrice("AAPL", 150.00);
        index.updatePrice("GOOGL", 2750.50);
        index.updatePrice("MSFT", 250.00);

        System.out.println("Price of AAPL: " + index.getPrice("AAPL"));
        System.out.println("Price of GOOGL: " + index.getPrice("GOOGL"));

        // 獲取所有小于 "MSFT" 的股票價(jià)格索引
        ConcurrentSkipListMap<String, Double> subMap = index.headMap("MSFT");
        subMap.forEach((k, v) -> System.out.println("Stock: " + k + ", Price: " + v));
    }
}

2.11. ConcurrentLinkedQueue

ConcurrentLinkedQueue 是 Java 中一個(gè)線程安全的無(wú)鎖隊(duì)列,它使用 CAS (Compare-And-Swap) 操作來(lái)保證線程安全。 

圖片圖片

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要對(duì) ConcurrentLinkedQueue 進(jìn)行入隊(duì)或出隊(duì)操作。
  • ConcurrentLinkedQueue 實(shí)例:是 ConcurrentLinkedQueue 類的實(shí)例,用于存儲(chǔ)隊(duì)列中的元素并提供線程安全的訪問(wèn)。
  • Node 節(jié)點(diǎn)結(jié)構(gòu): ConcurrentLinkedQueue 使用內(nèi)部的 Node 類來(lái)存儲(chǔ)隊(duì)列中的每個(gè)元素。每個(gè)節(jié)點(diǎn)包含隊(duì)列中的一個(gè)元素和指向下一個(gè)節(jié)點(diǎn)的鏈接。
  • 虛擬頭節(jié)點(diǎn):隊(duì)列使用一個(gè)虛擬頭節(jié)點(diǎn)來(lái)簡(jiǎn)化出隊(duì)操作。虛擬頭節(jié)點(diǎn)不存儲(chǔ)實(shí)際的隊(duì)列元素。
  • 虛擬尾節(jié)點(diǎn):隊(duì)列使用一個(gè)虛擬尾節(jié)點(diǎn)來(lái)簡(jiǎn)化入隊(duì)操作。虛擬尾節(jié)點(diǎn)指向隊(duì)列中的最后一個(gè)節(jié)點(diǎn)。
  • 隊(duì)列元素:表示存儲(chǔ)在隊(duì)列中的實(shí)際數(shù)據(jù)。
  • 入隊(duì)操作:線程將新元素添加到隊(duì)列尾部的過(guò)程,通過(guò) CAS 更新虛擬尾節(jié)點(diǎn)的鏈接。
  • 出隊(duì)操作:線程從隊(duì)列頭部移除元素的過(guò)程,通過(guò) CAS 更新虛擬頭節(jié)點(diǎn)的鏈接。
  • CAS 操作: ConcurrentLinkedQueue 使用 CAS 操作來(lái)更新節(jié)點(diǎn)之間的鏈接,從而實(shí)現(xiàn)無(wú)鎖的線程安全隊(duì)列。
  • 自旋等待:在 CAS 操作失敗時(shí),線程可能會(huì)自旋等待直到操作成功。

操作流程:

  1. 入隊(duì)操作:線程通過(guò) CAS 操作將新節(jié)點(diǎn)插入到隊(duì)列尾部,并更新尾節(jié)點(diǎn)指針。
  2. 出隊(duì)操作:線程通過(guò) CAS 操作移除隊(duì)列頭部的節(jié)點(diǎn),并更新頭節(jié)點(diǎn)指針。
  3. CAS 操作:在入隊(duì)和出隊(duì)過(guò)程中,線程使用 CAS 來(lái)保證節(jié)點(diǎn)鏈接的原子性更新。
綜合說(shuō)明:

作用: ConcurrentLinkedQueue 是一種基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列,支持高并發(fā)的入隊(duì)和出隊(duì)操作。 背景:在多線程環(huán)境中,需要一種高效的隊(duì)列來(lái)處理任務(wù)或消息傳遞, ConcurrentLinkedQueue 提供了一種無(wú)鎖的解決方案。 優(yōu)點(diǎn):

  • 無(wú)鎖設(shè)計(jì):利用 CAS 操作實(shí)現(xiàn)無(wú)鎖的線程安全隊(duì)列,提高了并發(fā)性能。
  • 簡(jiǎn)單高效:提供了簡(jiǎn)單的入隊(duì)和出隊(duì)操作,適合作為任務(wù)隊(duì)列或消息傳遞隊(duì)列。
  • 無(wú)界隊(duì)列:理論上隊(duì)列大小無(wú)界,適用于處理大量任務(wù)。 缺點(diǎn):
  • 可能的內(nèi)存消耗:由于是無(wú)界隊(duì)列,在極端情況下可能會(huì)消耗大量?jī)?nèi)存。
  • 性能限制:在某些高競(jìng)爭(zhēng)場(chǎng)景下,CAS 操作可能導(dǎo)致性能瓶頸。 場(chǎng)景:適用于作為任務(wù)隊(duì)列或消息傳遞隊(duì)列,支持高并發(fā)的入隊(duì)和出隊(duì)操作。 業(yè)務(wù)舉例:在一個(gè)分布式計(jì)算系統(tǒng)中, ConcurrentLinkedQueue 可以用于收集各個(gè)計(jì)算節(jié)點(diǎn)的輸出結(jié)果,然后由一個(gè)或多個(gè)消費(fèi)者線程進(jìn)行處理。

這兩個(gè)并發(fā)集合類在 Java 中提供了強(qiáng)大的工具,以支持復(fù)雜的并發(fā)數(shù)據(jù)處理需求,它們各自適用于不同的應(yīng)用場(chǎng)景,可以根據(jù)具體需求選擇合適的并發(fā)集合。kedQueue

使用方式:
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {
    // 創(chuàng)建一個(gè) ConcurrentLinkedQueue 實(shí)例
    private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

    // 向隊(duì)列中添加元素
    public void add(int number) {
        // add 方法是線程安全的
        queue.add(number);
        System.out.println("Added " + number);
    }

    // 從隊(duì)列中獲取并移除元素
    public Integer poll() {
        // poll 方法是線程安全的,返回并移除隊(duì)列頭部的元素
        Integer result = queue.poll();
        if (result != null) {
            System.out.println("Polled " + result);
        } else {
            System.out.println("Queue is empty");
        }
        return result;
    }

    // 查看隊(duì)列頭部的元素但不移除
    public Integer peek() {
        // peek 方法是線程安全的,返回隊(duì)列頭部的元素但不移除
        Integer result = queue.peek();
        if (result != null) {
            System.out.println("Peeked " + result);
        } else {
            System.out.println("Queue is empty");
        }
        return result;
    }

    // 獲取隊(duì)列的大小
    public int size() {
        // size 方法估算隊(duì)列的大小
        int size = queue.size();
        System.out.println("Queue size: " + size);
        return size;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueueDemo demo = new ConcurrentLinkedQueueDemo();

        // 啟動(dòng)生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            demo.add(1);
            demo.add(2);
            demo.add(3);
        });

        // 啟動(dòng)消費(fèi)者線程
        Thread consumerThread = new Thread(() -> {
            demo.poll();
            demo.poll();
            demo.poll();
            demo.poll(); // 這次調(diào)用應(yīng)該會(huì)返回 null,因?yàn)殛?duì)列已空
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();

        // 在所有線程完成后獲取隊(duì)列大小
        demo.size();
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說(shuō)明: 大規(guī)模日志處理系統(tǒng)需要從多個(gè)源實(shí)時(shí)收集、存儲(chǔ)并分析日志數(shù)據(jù)。這些日志數(shù)據(jù)通常由分布在不同服務(wù)器上的應(yīng)用程序生成,并且需要被快速地處理以避免數(shù)據(jù)丟失或延遲問(wèn)題。

為什么需要 ConcurrentLinkedQueue 技術(shù): 在日志處理場(chǎng)景中,日志數(shù)據(jù)的產(chǎn)生速度往往非常快,且來(lái)源眾多,因此需要一個(gè)高效且線程安全的隊(duì)列來(lái)緩存這些日志數(shù)據(jù)。 ConcurrentLinkedQueue 提供了高吞吐量和低延遲的并發(fā)訪問(wèn),無(wú)需使用鎖,使得它特別適合用作日志數(shù)據(jù)的緩沖區(qū)。此外,由于 ConcurrentLinkedQueue 是無(wú)界的,因此不會(huì)阻塞生產(chǎn)者線程,即使在高負(fù)載情況下也能保持高性能。

沒(méi)有 ConcurrentLinkedQueue 技術(shù)會(huì)帶來(lái)什么后果: 沒(méi)有使用 ConcurrentLinkedQueue 或其他高效的并發(fā)隊(duì)列可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 數(shù)據(jù)丟失:如果使用有界隊(duì)列且沒(méi)有適當(dāng)?shù)纳a(chǎn)者速率控制,可能會(huì)因?yàn)殛?duì)列滿導(dǎo)致日志數(shù)據(jù)丟失。
  2. 性能瓶頸:如果使用鎖或其他同步機(jī)制來(lái)保護(hù)共享隊(duì)列,可能會(huì)導(dǎo)致性能瓶頸,尤其是在高并發(fā)場(chǎng)景下。
  3. 系統(tǒng)不穩(wěn)定:在高負(fù)載情況下,如果隊(duì)列處理速度跟不上數(shù)據(jù)產(chǎn)生速度,可能會(huì)導(dǎo)致系統(tǒng)崩潰或重啟。

代碼實(shí)現(xiàn):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LogProcessor {
    private final ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<>();
    private final ExecutorService processorPool = Executors.newFixedThreadPool(10);

    public void log(String message) {
        // 生產(chǎn)者線程調(diào)用此方法來(lái)添加日志到隊(duì)列
        logQueue.add(message);
    }

    public void startLogProcessing() {
        // 消費(fèi)者線程池,用于處理隊(duì)列中的日志
        processorPool.submit(() -> {
            while (true) {
                try {
                    // 消費(fèi)者線程調(diào)用此方法來(lái)處理隊(duì)列中的日志
                    String logEntry = logQueue.poll();
                    if (logEntry != null) {
                        processLog(logEntry);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(100); // 避免 CPU 過(guò)載
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    private void processLog(String logEntry) {
        // 實(shí)際處理日志的邏輯
        System.out.println("Processing log: " + logEntry);
    }

    public static void main(String[] args) {
        LogProcessor logProcessor = new LogProcessor();
        logProcessor.startLogProcessing();

        // 多個(gè)生產(chǎn)者線程生成日志
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            new Thread(() -> {
                logProcessor.log("Log entry " + finalI);
            }).start();
        }
    }
}

2.12. BlockingQueue

BlockingQueue 是 Java 中用于線程間通信的隊(duì)列,支持阻塞操作,當(dāng)隊(duì)列為空時(shí),獲取元素的操作會(huì)阻塞;當(dāng)隊(duì)列滿時(shí),插入元素的操作會(huì)阻塞。 

圖片圖片

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要向隊(duì)列中添加或移除元素。
  • BlockingQueue 實(shí)例:是 BlockingQueue 接口的具體實(shí)現(xiàn),如 ArrayBlockingQueue、 LinkedBlockingQueue 等,用于線程間通信。
  • 內(nèi)部數(shù)據(jù)結(jié)構(gòu):表示 BlockingQueue 內(nèi)部用于存儲(chǔ)元素的數(shù)據(jù)結(jié)構(gòu),如數(shù)組、鏈表等。
  • 隊(duì)列容量:表示 BlockingQueue 的最大容量,如果隊(duì)列有界,則插入操作在隊(duì)列滿時(shí)會(huì)阻塞。
  • 等待區(qū)(元素) :表示當(dāng)隊(duì)列為空時(shí),等待獲取元素的線程集合。
  • 等待區(qū)(空間) :表示當(dāng)隊(duì)列滿時(shí),等待空間釋放的線程集合。
  • 元素添加操作:表示向 BlockingQueue 中添加元素的操作,如果隊(duì)列滿,則操作會(huì)阻塞。
  • 元素移除操作:表示從 BlockingQueue 中移除元素的操作,如果隊(duì)列為空,則操作會(huì)阻塞。
綜合說(shuō)明:
  • 作用: BlockingQueue 是一個(gè)線程安全的隊(duì)列,支持阻塞操作,當(dāng)隊(duì)列為空時(shí),獲取元素的操作會(huì)阻塞;當(dāng)隊(duì)列滿時(shí),插入元素的操作會(huì)阻塞。
  • 背景:在生產(chǎn)者-消費(fèi)者模型中,需要一種機(jī)制來(lái)協(xié)調(diào)生產(chǎn)者和消費(fèi)者之間的操作, BlockingQueue 提供了這種協(xié)調(diào)。
  • 優(yōu)點(diǎn):
  • 線程協(xié)調(diào):自然地實(shí)現(xiàn)了生產(chǎn)者-消費(fèi)者之間的線程協(xié)調(diào)。
  • 阻塞操作:提供了阻塞獲取和阻塞插入的方法,簡(jiǎn)化了并發(fā)編程。
  • 缺點(diǎn):
  • 可能的死鎖:不當(dāng)使用可能導(dǎo)致死鎖,例如一個(gè)線程永久阻塞等待一個(gè)不會(huì)到來(lái)的元素。

  • 性能考慮:在高并發(fā)環(huán)境下,隊(duì)列的容量和鎖策略需要仔細(xì)調(diào)優(yōu)。

  • 場(chǎng)景:適用于生產(chǎn)者-消費(fèi)者場(chǎng)景,如任務(wù)分配、資源池管理等。

  • 業(yè)務(wù)舉例:在消息處理系統(tǒng)中, BlockingQueue 可以用于緩存待處理的消息,生產(chǎn)者線程生成消息并放入隊(duì)列,消費(fèi)者線程從隊(duì)列中取出并處理消息,確保了消息的順序性和系統(tǒng)的響應(yīng)性。

使用方式:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    // 創(chuàng)建一個(gè) LinkedBlockingQueue 實(shí)例,容量限制為10
    private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);

    // 向 BlockingQueue 中添加元素
    public void produce(Integer element) throws InterruptedException {
        // put 方法在隊(duì)列滿時(shí)阻塞,直到隊(duì)列中有空間
        blockingQueue.put(element);
        System.out.println("Produced: " + element);
    }

    // 從 BlockingQueue 中獲取元素
    public Integer consume() throws InterruptedException {
        // take 方法在隊(duì)列空時(shí)阻塞,直到隊(duì)列中有元素
        Integer element = blockingQueue.take();
        System.out.println("Consumed: " + element);
        return element;
    }

    // 獲取 BlockingQueue 的大小
    public int size() {
        // size 方法返回隊(duì)列當(dāng)前的元素?cái)?shù)量
        return blockingQueue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueueDemo demo = new BlockingQueueDemo();

        // 創(chuàng)建生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 15; i++) {
                    demo.produce(i);
                    Thread.sleep(100); // 生產(chǎn)延時(shí)
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 創(chuàng)建消費(fèi)者線程
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 15; i++) {
                    int element = demo.consume();
                    Thread.sleep(150); // 消費(fèi)延時(shí)
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();

        // 打印最終隊(duì)列的大小
        System.out.println("Final queue size: " + demo.size());
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說(shuō)明: 消息隊(duì)列系統(tǒng)在微服務(wù)架構(gòu)中用于異步處理任務(wù),例如發(fā)送郵件、短信通知等。這些服務(wù)通常由獨(dú)立的服務(wù)實(shí)例處理,以提高系統(tǒng)的響應(yīng)性和可擴(kuò)展性。消息隊(duì)列需要能夠處理高并發(fā)的消息生產(chǎn)和消費(fèi),確保消息的可靠傳遞。

為什么需要 BlockingQueue 技術(shù): BlockingQueue 提供了一種有效的機(jī)制來(lái)處理生產(chǎn)者-消費(fèi)者場(chǎng)景,特別是在面對(duì)高并發(fā)和需要線程安全時(shí)。它能夠使生產(chǎn)者在隊(duì)列滿時(shí)阻塞,消費(fèi)者在隊(duì)列空時(shí)阻塞,從而平衡生產(chǎn)和消費(fèi)的速度,確保系統(tǒng)的穩(wěn)定性和消息的不丟失。

沒(méi)有 BlockingQueue 技術(shù)會(huì)帶來(lái)什么后果:

沒(méi)有使用 BlockingQueue 或其他并發(fā)隊(duì)列可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 消息丟失:在高并發(fā)情況下,如果沒(méi)有適當(dāng)?shù)臋C(jī)制來(lái)控制消息的產(chǎn)生和消費(fèi),可能會(huì)導(dǎo)致消息丟失。
  2. 系統(tǒng)過(guò)載:如果沒(méi)有流控機(jī)制,生產(chǎn)者可能會(huì)過(guò)快地生成消息,導(dǎo)致系統(tǒng)資源耗盡,甚至崩潰。
  3. 數(shù)據(jù)不一致:在多線程環(huán)境下,如果不正確地管理消息的訪問(wèn),可能會(huì)導(dǎo)致數(shù)據(jù)處理的不一致性。

代碼實(shí)現(xiàn):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MessageQueueSystem {
    private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();

    public void produceMessage(String content) throws InterruptedException {
        // 將消息添加到隊(duì)列中,如果隊(duì)列滿了,生產(chǎn)者線程將被阻塞
        messageQueue.put(new Message(content));
        System.out.println("Message produced: " + content);
    }

    public Message consumeMessage() throws InterruptedException {
        // 從隊(duì)列中取出消息,如果隊(duì)列空了,消費(fèi)者線程將被阻塞
        Message message = messageQueue.take();
        System.out.println("Message consumed: " + message.getContent());
        return message;
    }

    public static void main(String[] args) throws InterruptedException {
        MessageQueueSystem messageQueueSystem = new MessageQueueSystem();

        // 創(chuàng)建生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    messageQueueSystem.produceMessage("Message " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 創(chuàng)建消費(fèi)者線程
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    messageQueueSystem.consumeMessage();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }
}

class Message {
    private final String content;

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }
}

2.13. Condition

Condition 是 Java 中 java.util.concurrent.locks 包提供的一個(gè)接口,它用于實(shí)現(xiàn)等待/通知機(jī)制。 Condition 通常與 Lock 接口配合使用,允許一個(gè)或多個(gè)線程在某些條件滿足之前掛起,并在條件滿足時(shí)被喚醒。 

圖片圖片

圖解說(shuō)明:
  • Java 線程:表示運(yùn)行中的線程,它們可能需要在某些條件滿足之前掛起。
  • Lock 實(shí)例:是 Lock 接口的具體實(shí)現(xiàn),如 ReentrantLock,用于控制對(duì)共享資源的訪問(wèn)。
  • Condition 實(shí)例:是 Condition 接口的具體實(shí)現(xiàn),與 Lock 實(shí)例配合使用,用于線程間的等待/通知機(jī)制。
  • 等待隊(duì)列(線程) :當(dāng)線程調(diào)用 Condition 的 await() 方法時(shí),如果條件不滿足,線程會(huì)被放入等待隊(duì)列。
  • 共享資源:表示被多個(gè)線程共享的數(shù)據(jù),需要通過(guò) Lock 和 Condition 來(lái)保護(hù)以確保線程安全。
  • 條件檢查:表示線程在嘗試獲取資源之前需要檢查的條件。
  • 喚醒信號(hào):當(dāng)條件滿足時(shí),其他線程會(huì)發(fā)送喚醒信號(hào)給等待隊(duì)列中的線程。
  • 鎖狀態(tài):表示鎖的當(dāng)前狀態(tài),如是否被鎖定,以及鎖定的線程等。
操作流程:
  1. 鎖定:線程通過(guò) Lock 實(shí)例獲取鎖。
  2. 條件檢查:線程檢查條件是否滿足。
  3. 等待:如果條件不滿足,線程調(diào)用 Condition 的 await() 方法,釋放鎖并進(jìn)入等待隊(duì)列。
  4. 喚醒:當(dāng)條件滿足時(shí),其他線程調(diào)用 Condition 的 signal() 或 signalAll() 方法,發(fā)送喚醒信號(hào)給等待隊(duì)列中的線程。
  5. 重新競(jìng)爭(zhēng)鎖:被喚醒的線程重新競(jìng)爭(zhēng)鎖。
  6. 再次檢查條件:線程在重新獲得鎖后,再次檢查條件是否滿足,如果滿足則繼續(xù)執(zhí)行。
綜合說(shuō)明:
  • 作用: Condition 是與 Lock 接口配合使用的同步輔助工具,它允許一個(gè)或多個(gè)線程等待,直到被其他線程喚醒。
  • 背景:在復(fù)雜的同步場(chǎng)景中,需要更細(xì)粒度的控制線程的等待和喚醒, Condition 提供了這種能力。
  • 優(yōu)點(diǎn):
  • 細(xì)粒度控制:提供了比 Object.wait()/ Object.notify() 更靈活的線程間協(xié)調(diào)機(jī)制。
  • 多條件支持:一個(gè)鎖可以關(guān)聯(lián)多個(gè)條件,每個(gè)條件可以獨(dú)立喚醒等待的線程。
  • 缺點(diǎn):
  • 使用復(fù)雜:需要與 Lock 一起使用,增加了編程復(fù)雜度。

  • 錯(cuò)誤使用可能導(dǎo)致死鎖或線程饑餓。

  • 場(chǎng)景:適用于需要線程間復(fù)雜協(xié)調(diào)的場(chǎng)景,如任務(wù)調(diào)度、資源分配等。

  • 業(yè)務(wù)舉例:在酒店預(yù)訂系統(tǒng)中, Condition 可以用于實(shí)現(xiàn)房間狀態(tài)的等待和通知機(jī)制。當(dāng)房間變?yōu)榭臻e時(shí),等待的顧客可以被通知并進(jìn)行預(yù)訂。

使用方式:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final Object[] buffer;
    private int putPtr, takePtr, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int size) {
        buffer = new Object[size];
    }

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) { // 等待直到緩沖區(qū)非滿
                notFull.await();
            }
            buffer[putPtr] = x;
            putPtr = (putPtr + 1) % buffer.length;
            count++;
            notEmpty.signal(); // 通知可能等待的消費(fèi)者
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) { // 等待直到緩沖區(qū)非空
                notEmpty.await();
            }
            Object x = buffer[takePtr];
            takePtr = (takePtr + 1) % buffer.length;
            count--;
            notFull.signal(); // 通知可能等待的生產(chǎn)者
            return x;
        } finally {
            lock.unlock();
        }
    }
}

public class ProducerConsumerDemo {
    private final BoundedBuffer buffer;

    public ProducerConsumerDemo(int size) {
        buffer = new BoundedBuffer(size);
    }

    public void produce(String item) {
        buffer.put(item);
    }

    public String consume() {
        return (String) buffer.take();
    }

    public static void main(String[] args) throws InterruptedException {
        final int SIZE = 10;
        final ProducerConsumerDemo demo = new ProducerConsumerDemo(SIZE);

        // 生產(chǎn)者線程
        Thread producerThread = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                demo.produce("Item " + i);
                try {
                    Thread.sleep(100); // 生產(chǎn)延時(shí)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 消費(fèi)者線程
        Thread consumerThread = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                String item = demo.consume();
                System.out.println("Consumed: " + item);
                try {
                    Thread.sleep(150); // 消費(fèi)延時(shí)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }
}
業(yè)務(wù)代碼案例:

業(yè)務(wù)說(shuō)明: 任務(wù)調(diào)度系統(tǒng)負(fù)責(zé)管理和執(zhí)行定時(shí)任務(wù)。這些任務(wù)可能包括數(shù)據(jù)備份、報(bào)告生成、系統(tǒng)維護(hù)等。系統(tǒng)需要能夠按預(yù)定時(shí)間觸發(fā)任務(wù),并確保任務(wù)在執(zhí)行時(shí)不會(huì)相互干擾。

為什么需要 Condition 技術(shù): 在任務(wù)調(diào)度系統(tǒng)中,任務(wù)的觸發(fā)通常依賴于時(shí)間,而任務(wù)的執(zhí)行可能需要等待特定條件滿足。 Condition 配合 Lock 使用,可以在沒(méi)有任務(wù)可執(zhí)行時(shí)讓調(diào)度器線程等待,直到有任務(wù)準(zhǔn)備好執(zhí)行。這種機(jī)制允許系統(tǒng)在沒(méi)有任務(wù)執(zhí)行需求時(shí)保持空閑,從而節(jié)省資源。

沒(méi)有 Condition 技術(shù)會(huì)帶來(lái)什么后果:

沒(méi)有使用 Condition 或其他等待/通知機(jī)制可能會(huì)導(dǎo)致以下問(wèn)題:

  1. 資源浪費(fèi):如果調(diào)度器不斷輪詢檢查新任務(wù),可能會(huì)浪費(fèi)大量 CPU 資源。
  2. 響應(yīng)性差:在新任務(wù)到來(lái)時(shí),如果沒(méi)有有效的機(jī)制來(lái)喚醒調(diào)度器,可能會(huì)導(dǎo)致任務(wù)執(zhí)行延遲。
  3. 代碼復(fù)雜度:沒(méi)有 Condition,可能需要使用更復(fù)雜的多線程同步機(jī)制,增加了代碼的復(fù)雜性和出錯(cuò)的風(fēng)險(xiǎn)。

代碼實(shí)現(xiàn):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;

public class TaskScheduler {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition taskAvailable = lock.newCondition();
    private final Queue<Runnable> tasks = new LinkedList<>();

    public void schedule(Runnable task, long delay) {
        lock.lock();
        try {
            tasks.add(() -> {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                task.run();
            });
            taskAvailable.signal(); // 通知調(diào)度器有新任務(wù)
        } finally {
            lock.unlock();
        }
    }

    public void startScheduling() {
        new Thread(this::runScheduler).start();
    }

    private void runScheduler() {
        lock.lock();
        try {
            while (true) {
                while (tasks.isEmpty()) { // 如果沒(méi)有任務(wù),等待
                    taskAvailable.await();
                }
                Runnable task = tasks.poll();
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TaskScheduler scheduler = new TaskScheduler();
        scheduler.schedule(() -> System.out.println("Task 1 executed at " + new Date()), 2000);
        scheduler.schedule(() -> System.out.println("Task 2 executed at " + new Date()), 4000);
        scheduler.startScheduling();
    }
}

責(zé)任編輯:武曉燕 來(lái)源: Solomon肖哥彈架構(gòu)
相關(guān)推薦

2020-07-06 08:03:32

Java悲觀鎖樂(lè)觀鎖

2024-03-18 12:21:28

Java輕量級(jí)鎖重量級(jí)鎖

2019-11-28 16:00:06

重入鎖讀寫(xiě)鎖樂(lè)觀鎖

2023-10-08 09:34:11

Java編程

2019-01-04 11:18:35

獨(dú)享鎖共享鎖非公平鎖

2024-01-29 01:08:01

悲觀鎖遞歸鎖讀寫(xiě)鎖

2019-10-17 08:51:00

Java悲觀鎖Monitor

2023-07-05 08:18:54

Atomic類樂(lè)觀鎖悲觀鎖

2010-07-26 15:17:46

SQL Server鎖

2010-04-16 15:12:12

ORACLE鎖機(jī)制

2020-04-24 15:44:50

MySQL數(shù)據(jù)庫(kù)鎖機(jī)制

2019-04-12 15:14:44

Python線程

2021-03-31 10:05:26

偏向鎖輕量級(jí)鎖

2024-10-10 09:40:29

2022-03-24 13:36:18

Java悲觀鎖樂(lè)觀鎖

2024-11-29 07:38:12

MySQL數(shù)據(jù)庫(kù)

2018-07-31 10:10:06

MySQLInnoDB死鎖

2025-06-04 02:55:00

MySQL意向鎖記錄鎖

2021-01-15 05:12:14

Java并發(fā)樂(lè)觀鎖

2024-12-16 00:52:26

MySQL數(shù)據(jù)庫(kù)并發(fā)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 黄色一级片aaa | 男女那个视频 | 成人免费视频网站在线看 | 久热精品在线观看视频 | www.99久久.com | 国产一级视频在线观看 | 国产精品久久久久久久久久久免费看 | 亚洲一区二区av | 国产污视频在线 | 毛片一级片 | 亚洲36d大奶网| 日韩有码一区 | 欧美成人免费在线视频 | 日本又色又爽又黄又高潮 | 一区精品在线观看 | 五月天婷婷狠狠 | 91免费在线播放 | 一区二区三区视频在线 | 在线一区 | 成人综合一区 | 亚洲女人天堂成人av在线 | 午夜成人免费视频 | 欧美不卡视频 | 91精品一区二区三区久久久久 | 91日b| 亚洲一区二区三区在线 | 色婷婷精品久久二区二区蜜臂av | 成人二区 | 日韩欧美网| 国产精品免费大片 | 久久精品国产精品青草 | 中文字幕一区在线观看视频 | 日韩av一区二区在线观看 | 中文一区二区 | 亚洲成人在线免费 | 国产欧美精品区一区二区三区 | 欧美激情在线精品一区二区三区 | 久久久久国产精品一区二区 | 免费观看av | 日韩av免费看 | 日韩欧美福利视频 |