成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

1.8w字圖解Java并發容器框架:并發安全 Map、JUC 集合、Java 7 種阻塞隊列正確使用場景和原理詳解

開發 前端
所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。

今天進入 Java 并發編程第二章節,圍繞著并發容器展開,主要內容如下:

  • ConcurrentHashMap的使用和原理
  • ConcurrentLinkedQueue 的使用和原理
  • Java 7 種阻塞隊列使用場景和原理詳解:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque

ConcurrentHashMap 的使用和原理

Map 是一個接口,它的實現方式有很多種,比如常見的 HashMap、LinkedHashMap,但是這些 Map 的實現并不是線程安全的,在多線程高并發的環境中會出現線程安全的問題。

鑒于 Map 是一個在高并發的應用環境中應用比較廣泛的數據結構,Doug Lea 自 JDK 1.5 版本起在 Java 中引入了 ConcurrentHashMap。

ConcurrentHashMap 的使用

對一個技術的掌握,從使用開始。我們現在來實現一個一個高并發計數器,例如記錄網站訪問量、接口調用次數等。

@Service
public class Counter {
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    public void increase(String key) {
        map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public int get(String key) {
        return map.getOrDefault(key, 0);
    }
}
  • compute :這是一個并發安全原子操作,我們使用 compute 方法實現對計數器的增加操作。

如果 key 不存在則新建一個值為 1 的計數器;

否則將其 value 遞增 1。

  • 通過 get 方法可以獲取指定 key 對應的計數器值。

這個例子比較簡單,現在開始上強度。ConcurrentHashMap 還可以用來實現緩存管理器,例如存儲經常使用的業務數據、系統配置等信息,從而避免頻繁的數據庫查詢或網絡請求。

以下是一個支持過期時間、自動刷新和并發控制的緩存管理器實現,包含詳細注釋和最佳實踐:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * 高性能并發緩存管理器
 * @param <K> 鍵類型
 * @param <V> 值類型
 */
publicclass ConcurrentCache<K, V> {

    privatefinal ConcurrentHashMap<K, CacheEntry<V>> cache = new ConcurrentHashMap<>();
    privatefinal Function<K, V> loader;  // 緩存加載器
    privatefinal ScheduledExecutorService cleaner;  // 過期清理線程

    // 默認配置
    privatelong defaultTTL = 30_000;          // 默認30秒
    privatelong cleanupInterval = 5_000;      // 5秒清理一次
    privateint maxRetries = 3;                // 最大重試次數
    privateboolean refreshOnAccess = true;    // 訪問時刷新TTL

    public ConcurrentCache(Function<K, V> loader) {
        this.loader = loader;
        this.cleaner = Executors.newSingleThreadScheduledExecutor();
        startCleanupTask();
    }

    /**
     * 獲取緩存值(線程安全)
     */
    public V get(K key) {
        CacheEntry<V> entry = cache.get(key);

        // 無緩存或已過期時加載
        if (entry == null || entry.isExpired()) {
            return loadAndCache(key);
        }

        // 更新訪問時間(可選)
        if (refreshOnAccess) {
            entry.touch();
        }
        return entry.value;
    }

    /**
     * 原子性的加載和緩存操作
     */
    private V loadAndCache(K key) {
        int retry = 0;
        while (retry++ < maxRetries) {
            try {
                // 使用compute保證原子性
                CacheEntry<V> newEntry = cache.compute(key, (k, oldEntry) -> {
                    // 檢查其他線程是否已經加載
                    if (oldEntry != null && !oldEntry.isExpired()) {
                        return oldEntry;
                    }

                    V value = loader.apply(k);
                    returnnew CacheEntry<>(value, defaultTTL, TimeUnit.MILLISECONDS);
                });

                return newEntry.value;
            } catch (Exception ex) {
                if (retry >= maxRetries) {
                    thrownew CacheLoadException("加載緩存失敗,key=" + key, ex);
                }
                // 指數退避重試
                sleepUninterruptibly((long) Math.pow(2, retry), TimeUnit.MILLISECONDS);
            }
        }
        thrownew CacheLoadException("超過最大重試次數,key=" + key);
    }

    /**
     * 主動放入緩存(支持自定義TTL)
     */
    public void put(K key, V value, long ttl, TimeUnit unit) {
        cache.put(key, new CacheEntry<>(value, ttl, unit));
    }

    /**
     * 啟動定期清理任務(雙重檢查鎖模式)
     */
    private void startCleanupTask() {
        if (cleaner.isShutdown()) return;

        cleaner.scheduleWithFixedDelay(() -> {
            cache.forEach((key, entry) -> {
                if (entry.isExpired()) {
                    cache.remove(key, entry); // 使用CAS刪除
                }
            });
        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
    }

    // 其他實用方法
    public void remove(K key) { cache.remove(key); }
    public void clear() { cache.clear(); }
    public long size() { return cache.mappingCount(); }

    // 配置方法(Builder模式風格)
    public ConcurrentCache<K, V> defaultTTL(long ttl, TimeUnit unit) {
        this.defaultTTL = unit.toMillis(ttl);
        returnthis;
    }

    public ConcurrentCache<K, V> cleanupInterval(long interval, TimeUnit unit) {
        this.cleanupInterval = unit.toMillis(interval);
        returnthis;
    }

    // 異常處理
    privatestaticclass CacheLoadException extends RuntimeException {
        CacheLoadException(String message, Throwable cause) {
            super(message, cause);
        }

        CacheLoadException(String message) {
            super(message);
        }
    }

    // 工具方法:不可中斷的休眠
    private static void sleepUninterruptibly(long duration, TimeUnit unit) {
        try {
            Thread.sleep(unit.toMillis(duration));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

    // 關閉時釋放資源
    public void shutdown() {
        cleaner.shutdownNow();
        cache.clear();
    }

    // 緩存條目:包含值、過期時間和訪問時間戳
    privatestaticclass CacheEntry<V> {
        final V value;
        finallong expireAt;  // 絕對過期時間(納秒)
        final AtomicLong accessTime = new AtomicLong(); // 最后訪問時間(納秒)

        CacheEntry(V value, long ttl, TimeUnit unit) {
            this.value = value;
            this.expireAt = System.nanoTime() + unit.toNanos(ttl);
            touch();
        }
        // 刷新訪問時間
        void touch() {
            accessTime.set(System.nanoTime());
        }
        // 判斷是否已過期
        boolean isExpired() {
            return System.nanoTime() > expireAt;
        }
    }
}

實現原理

  • CHM 的源碼有 6k 多行,包含的內容多,精巧,不容易理解;建議在查看源碼的時候,可以首先把握整體結構脈絡,對于一些精巧的優化,哈希技巧可以先了解目的就可以了,不用深究;
  • 對整體把握比較清楚后,在逐步分析,可以比較快速的看懂;
  • JDK1.8 版本中的 CHM,和 JDK1.7 版本的差別非常大,在查看資料的時候要注意區分,1.7 中主要是使用 Segment 分段鎖 來解決并發問題的。

JDK 1.7 版本 ConcurrentHashMap

在 JDK1.7 版本中,ConcurrentHashMap 的數據結構是由一個 Segment 數組和多個 HashEntry 組成。

而每一個 Segment 元素存儲的是 HashEntry 數組+鏈表,并對應一個 ReentrantLock 鎖,用于并發訪問控制。

圖片圖片

以 put 操作為例,來看一下 ConcurrentHashMap 的實現過程:

  1. 首先計算 key 的哈希值;
  2. 根據哈希值找到對應的 Segment;
  3. 獲取 Segment 對應的鎖;
  4. 如果還沒有元素,就直接插入到 Segment 中;
  5. 如果已經存在元素,就循環比較 key 是否相等;
  6. 如果 key 已經存在,就根據要求更新 value;
  7. 如果 key 不存在,就插入新的元素(鏈表或者紅黑樹)。

上述操作中,步驟 2 到 3 相當于對對應的 Segment 加了一個悲觀鎖,如果 Segment 數組只有一個 Segment 元素,效果與 Hashtable 類似;

如果存在多個 Segment,效果就相當于使用了分段鎖機制,提高了并發訪問性能。

JDK 1.8 ConcurrentHashMap

在 JDK1.8 中,ConcurrentHashMap 的實現原理摒棄了這種設計,而是選擇了與 HashMap 類似的數組+鏈表+紅黑樹的方式實現,而加鎖則采用 CAS 和 synchronized 實現。

圖片圖片

其主要區別就在 CHM 支持并發:

  • 使用 Unsafe 方法操作數組內部元素,保證可見性;(U.getObjectVolatile、U.compareAndSwapObject、U.putObjectVolatile);
  • 在更新和移動節點的時候,直接鎖住對應的哈希桶,鎖粒度更小,且動態擴展;
  • 針對擴容慢操作進行優化。

a.首先擴容過程的中,節點首先移動到過度表 nextTable ,所有節點移動完畢時替換散列表 table;

b.移動時先將散列表定長等分,然后逆序依次領取任務擴容,設置 sizeCtl 標記正在擴容;

c.移動完成一個哈希桶或者遇到空桶時,將其標記為 ForwardingNode 節點,并指向 nextTable ;

d.后有其他線程在操作哈希表時,遇到 ForwardingNode 節點,則先幫助擴容(繼續領取分段任務),擴容完成后再繼續之前的操作;

  • 優化哈希表計數器,采用 LongAdder、Striped64 類似思想;
  • 以及大量的哈希算法優化和狀態變量優化;
  • 關注「碼哥跳動」,并設置星標,一起擁抱硬核技術和對象,面向人民幣編程。
類定義和成員變量
// node數組最大容量:2^30=1073741824
privatestaticfinalint MAXIMUM_CAPACITY = 1 << 30;
// 默認初始值,必須是2的幕數
privatestaticfinalint DEFAULT_CAPACITY = 16;
//數組可能最大值,需要與toArray()相關方法關聯
staticfinalint MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并發級別,遺留下來的,為兼容以前的版本
privatestaticfinalint DEFAULT_CONCURRENCY_LEVEL = 16;
// 負載因子
privatestaticfinalfloat LOAD_FACTOR = 0.75f;
// 鏈表轉紅黑樹閥值,> 8 鏈表轉換為紅黑樹
staticfinalint TREEIFY_THRESHOLD = 8;
//樹轉鏈表閥值,小于等于6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
staticfinalint UNTREEIFY_THRESHOLD = 6;
staticfinalint MIN_TREEIFY_CAPACITY = 64;
privatestaticfinalint MIN_TRANSFER_STRIDE = 16;
privatestaticint RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大線程數
privatestaticfinalint MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中記錄size大小的偏移量
privatestaticfinalint RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
staticfinalint MOVED     = -1;
// 樹根節點的hash值
staticfinalint TREEBIN   = -2;
// ReservationNode的hash值
staticfinalint RESERVED  = -3;
// 可用處理器數量
staticfinalint NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數組
transientvolatile Node<K,V>[] table;
/*控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
 *當為負數時:-1代表正在初始化,-N代表有N-1個線程正在 進行擴容
 *當為0時:代表當時的table還沒有被初始化
 *當為正數時:表示初始化或者下一次進行擴容的大小
 */
privatetransientvolatileint sizeCtl;

上面有幾個重要的地方這里單獨講:

LOAD_FACTOR:

這里的負載系數,同 HashMap 等其他 Map 的系數有明顯區別:

  • 通常的系數默認 0.75,可以由構造函數傳入,當節點數 size 超過 loadFactor * capacity 時擴容;
  • 而 CMH 的系數則固定 0.75(使用 n - (n >>> 2) 表示),構造函數傳入的系數只影響初始化容量,見第 5 個構造函數。

sizeCtl:

sizeCtl 是 CHM 中最重要的狀態變量,其中包括很多中狀態,這里先整體介紹幫助后面源碼理解;

  • sizeCtl = 0 :初始值,還未指定初始容量;
  • sizeCtl > 0 :

table 未初始化,表示初始化容量;

table 已初始化,表示擴容閾值(0.75n);

  • sizeCtl = -1 :表示正在初始化;
  • sizeCtl < -1 :表示正在擴容,具體結構如圖所示:

圖片圖片

Node 節點

Node 是 ConcurrentHashMap 存儲結構的基本單元,繼承于 HashMap 中的 Entry,用于存儲數據,源代碼如下。

static class Node<K,V> implements Map.Entry<K,V> {  // 哈希表普通節點
finalint hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node<K,V> find(int h, Object k) {}   // 主要在擴容時,利用多態查詢已轉移節點
}

staticfinalclass ForwardingNode<K,V> extends Node<K,V> {  // 標識擴容節點
final Node<K,V>[] nextTable;  // 指向成員變量 ConcurrentHashMap.nextTable

  ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);  // hash = -1,快速確定 ForwardingNode 節點
    this.nextTable = tab;
  }

Node<K,V> find(int h, Object k) {}
}

staticfinalclass TreeBin<K,V> extends Node<K,V> { // 紅黑樹根節點
  TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);  // hash = -2,快速確定紅黑樹,
    ...
  }
}
staticfinalclass TreeNode<K,V> extends Node<K,V> { } // 紅黑樹普通節點,其 hash 同 Node 普通節點 > 0;
哈希計算
static finalint MOVED     = -1;          // hash for forwarding nodes
staticfinalint TREEBIN   = -2;          // hash for roots of trees
staticfinalint RESERVED  = -3;          // hash for transient reservations
staticfinalint HASH_BITS = 0x7fffffff;  // usable bits of normal node hash

// 讓高位16位,參與哈希桶定位運算的同時,保證 hash 為正
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
哈希桶可見性

一個數組即使聲明為 volatile,也只能保證這個數組引用本身的可見性,其內部元素的可見性是無法保證的,如果每次都加鎖,則效率必然大大降低,在 CHM 中則使用 Unsafe 方法來保證:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

staticfinal <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

staticfinal <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
  U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put 操作

思路是對當前的 table 進行無條件自循環直到 put 成功,可以分成以下六步流程來概述。

  1. 如果沒有初始化就先調用 initTable()方法來進行初始化過程
  2. 如果沒有 hash 沖突就直接 CAS 插入
  3. 如果還在進行擴容操作就先進行擴容
  4. 如果存在 hash 沖突,就加鎖來保證線程安全,這里有兩種情況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結構插入,
  5. 最后一個如果該鏈表的數量大于閾值 8,就要先轉換成黑紅樹的結構,break 再一次進入循環
  6. 如果添加成功就調用 addCount()方法統計 size,并且檢查是否需要擴容
public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) thrownew NullPointerException();
    int hash = spread(key.hashCode()); //兩次hash,減少hash沖突,可以均勻分布
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //對這個table進行迭代
        Node<K,V> f; int n, i, fh;
        //這里就是上面構造方法沒有進行初始化,在這里進行判斷,為null就調用initTable進行初始化,屬于懶漢模式初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        elseif ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒有數據,就直接無鎖插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        elseif ((fh = f.hash) == MOVED)//如果在進行擴容,則先進行擴容操作
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //如果以上條件都不滿足,那就要進行加鎖操作,也就是存在hash沖突,鎖住鏈表或者紅黑樹的頭結點
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { //表示該節點是鏈表結構
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //這里涉及到相同的key進行put就會覆蓋原先的value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {  //插入鏈表尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    elseif (f instanceof TreeBin) {//紅黑樹結構
                        Node<K,V> p;
                        binCount = 2;
                        //紅黑樹結構旋轉插入
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { //如果鏈表的長度大于8時就會進行紅黑樹的轉換
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);//統計size,并且檢查是否需要擴容
    returnnull;
}

流程圖如下所示:

圖片圖片

get 操作

get 方法可能看代碼不是很長,但是他卻能 保證無鎖狀態下的內存一致性 。

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 計算 hash
if ((tab = table) != null && (n = tab.length) > 0 &&  // 確保 table 已經初始化

    // 確保對應的哈希桶不為空,注意這里是 Volatile 語義獲取;因為擴容的時候,是完全拷貝,所以只要不為空,則鏈表必然完整
    (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    }

    // hash < 0,則必然在擴容,原來位置的節點可能全部移動到 i + oldCap 位置,所以利用多態到 nextTable 中查找
    elseif (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;

    while ((e = e.next) != null) { // 遍歷鏈表
      if (e.hash == h &&
        ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
returnnull;
}

ConcurrentHashMap 的 get 操作的流程很簡單,也很清晰,可以分為三個步驟來描述.

  1. 計算 hash 值,定位到該 table 索引位置,如果是首節點符合就返回
  2. 如果遇到擴容的時候,會調用標志正在擴容節點 ForwardingNode 的 find 方法,查找該節點,匹配就返回
  3. 以上都不符合的話,就往下遍歷節點,匹配就返回,否則最后就返回 null
size 操作

在 JDK1.8 版本中,對于 size 的計算,在擴容和 addCount()方法就已經有處理了,JDK1.7 是在調用 size()方法才去計算,其實在并發集合中去計算 size 是沒有多大的意義的,因為 size 是實時在變的,只能計算某一刻的大小,但是某一刻太快了,人的感知是一個時間段,所以并不是很精確。

擴容

擴容操作一直都是比較慢的操作,而 CHM 中巧妙的利用任務劃分,使得多個線程可能同時參與擴容;

另外擴容條件也有兩個:

  • 有鏈表長度超過 8,但是容量小于 64 的時候,發生擴容;
  • 節點數超過閾值的時候,發生擴容;

其擴容的過程可描述為:

  • 首先擴容過程的中,節點首先移動到過度表 nextTable ,所有節點移動完畢時替換散列表 table;
  • 移動時先將散列表定長等分,然后逆序依次領取任務擴容,設置 sizeCtl 標記正在擴容;
  • 移動完成一個哈希桶或者遇到空桶時,將其標記為 ForwardingNode 節點,并指向 nextTable ;
  • 后有其他線程在操作哈希表時,遇到 ForwardingNode 節點,則先幫助擴容(繼續領取分段任務),擴容完成后再繼續之前的操作;

如圖:

圖片圖片

具體源碼如下:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // 根據 CPU 數量計算任務步長
if (nextTab == null) {          // 初始化 nextTab
    try {
      @SuppressWarnings("unchecked")
      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  // 擴容一倍
      nextTab = nt;
    } catch (Throwable ex) {
      sizeCtl = Integer.MAX_VALUE; // 發生 OOM 時,不再擴容
      return;
    }
    nextTable = nextTab;
    transferIndex = n;
  }
int nextn = nextTab.length;
  ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);  // 標記空桶,或已經轉移完畢的桶
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {  // 逆向遍歷擴容
    Node<K,V> f; int fh;
    while (advance) {  // 向前獲取哈希桶
      int nextIndex, nextBound;
      if (--i >= bound || finishing)               // 已經取到哈希桶,或已完成時退出
        advance = false;
      elseif ((nextIndex = transferIndex) <= 0) { // 遍歷到達頭節點,已經沒有待遷移的桶,線程準備退出
        i = -1;
        advance = false;
      }
      elseif (U.compareAndSwapInt
           (this, TRANSFERINDEX, nextIndex,
            nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {  // 當前任務完成,領取下一批哈希桶
        bound = nextBound;
        i = nextIndex - 1;  // 索引指向下一批哈希桶
        advance = false;
      }
    }

    // i < 0  :表示擴容結束,已經沒有待移動的哈希桶
    // i >= n :擴容結束,再次檢查確認
    // i + n >= nextn : 在使用 nextTable 替換 table 時,有線程進入擴容就會出現
    if (i < 0 || i >= n || i + n >= nextn) { // 完成擴容準備退出
      int sc;
      if (finishing) {  // 兩次檢查,只有最后一個擴容線程退出時,才更新變量
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n
        return;
      }
      if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  // 擴容線程減一
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return;  // 不是最后一個線程,直接退出
        finishing = advance = true;   // 最后一個線程,再次檢查
        i = n;                        // recheck before commit
      }
    }
    elseif ((f = tabAt(tab, i)) == null)  // 當前節點為空,直接標記為 ForwardingNode,然后繼續獲取下一個桶
      advance = casTabAt(tab, i, null, fwd);

    // 之前的線程已經完成該桶的移動,直接跳過,正常情況下自己的任務區間,不會出現 ForwardingNode 節點,
    elseif ((fh = f.hash) == MOVED)  // 此處為極端條件下的健壯性檢查
      advance = true; // already processed

    // 開始處理鏈表
    else {
      // 注意在 get 的時候,可以無鎖獲取,是因為擴容是全拷貝節點,完成后最后在更新哈希桶
      // 而在 put 的時候,是直接將節點加入尾部,獲取修改其中的值,此時如果允許 put 操作,最后就會發生臟讀,
      // 所以 put 和 transfer,需要競爭同一把鎖,也就是對應的哈希桶,以保證內存一致性效果
      synchronized (f) {
        if (tabAt(tab, i) == f) {  // 確認鎖定的是同一個桶
          Node<K,V> ln, hn;
          if (fh >= 0) {  // 正常節點
            int runBit = fh & n;  // hash & n,判斷擴容后的索引
            Node<K,V> lastRun = f;

            // 此處找到鏈表最后擴容后處于同一位置的連續節點,這樣最后一節就不用再一次復制了
            for (Node<K,V> p = f.next; p != null; p = p.next) {
              int b = p.hash & n;
              if (b != runBit) {
                runBit = b;
                lastRun = p;
              }
            }
            if (runBit == 0) {
              ln = lastRun;
              hn = null;
            }
            else {
              hn = lastRun;
              ln = null;
            }

            // 依次將鏈表拆分成,lo、hi 兩條鏈表,即位置不變的鏈表,和位置 + oldCap 的鏈表
            // 注意最后一節鏈表沒有new,而是直接使用原來的節點
            // 同時鏈表的順序也被打亂了,lastRun 到最后為正序,前面一節為逆序
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
              int ph = p.hash; K pk = p.key; V pv = p.val;
              if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
              else
                hn = new Node<K,V>(ph, pk, pv, hn);
            }
            setTabAt(nextTab, i, ln);      // 插入 lo 鏈表
            setTabAt(nextTab, i + n, hn);  // 插入 hi 鏈表
            setTabAt(tab, i, fwd);         // 哈希桶移動完成,標記為 ForwardingNode 節點
            advance = true;                // 繼續獲取下一個桶
          }
          elseif (f instanceof TreeBin) { // 拆分紅黑樹
            TreeBin<K,V> t = (TreeBin<K,V>)f;
            TreeNode<K,V> lo = null, loTail = null; // 為避免最后在反向遍歷,先留頭結點的引用,
            TreeNode<K,V> hi = null, hiTail = null; // 因為順序的鏈表,可以加速紅黑樹構造
            int lc = 0, hc = 0;  // 同樣記錄 lo,hi 鏈表的長度
            for (Node<K,V> e = t.first; e != null; e = e.next) {  // 中序遍歷紅黑樹
              int h = e.hash;
              TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);  // 構造紅黑樹節點
              if ((h & n) == 0) {
                if ((p.prev = loTail) == null)
                  lo = p;
                else
                  loTail.next = p;
                loTail = p;
                ++lc;
              }
              else {
                if ((p.prev = hiTail) == null)
                  hi = p;
                else
                  hiTail.next = p;
                hiTail = p;
                ++hc;
              }
            }

            // 判斷是否需要將其轉化為紅黑樹,同時如果只有一條鏈,那么就可以不用在構造
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
            advance = true;
          }
        }
      }
    }
  }
}

ConcurrentLinkedQueue 的使用和原理

ConcurerntLinkedQueue 一個基于單向鏈表的無界線程安全隊列,支持高并發的隊列操作,無需顯式的鎖,而且容量沒有上限。

此隊列按照 FIFO(先進先出)原則對元素進行排序。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。

應用場景

常見的使用場景可能包括任務調度、事件處理、日志記錄等。比如訂單處理系統,其中多個生產者生成訂單,多個消費者處理訂單。

// 訂單事件處理器(生產環境級實現)
publicclass OrderEventProcessor {
    // 使用隊列作為訂單緩沖區(無容量限制)
    privatefinal ConcurrentLinkedQueue<OrderEvent> queue = new ConcurrentLinkedQueue<>();
    privatefinal ExecutorService workers = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() * 2,
        new NamedThreadFactory("order-processor")
    );

    // 初始化處理線程
    public void start() {
        for (int i = 0; i < workers.getCorePoolSize(); i++) {
            workers.submit(this::processEvents);
        }
    }

    // 接收訂單事件(來自網絡IO線程)
    public void receiveEvent(OrderEvent event) {
        queue.offer(event); // 無阻塞插入
        metrics.recordEnqueue(); // 監控埋點
    }

    // 事件處理核心邏輯
    private void processEvents() {
        while (!Thread.currentThread().isInterrupted()) {
            OrderEvent event = queue.poll(); // 無阻塞獲取
            if (event != null) {
                try {
                    handleEvent(event);
                } catch (Exception ex) {
                    handleFailure(event, ex); // 異常處理
                }
            } else {
                // 隊列空時自適應休眠(避免CPU空轉)
                sleepBackoff();
            }
        }
    }

    // 指數退避休眠(動態調節CPU使用率)
    private void sleepBackoff() {
        long delay = 1; // 初始1ms
        while (queue.isEmpty() && delay < 100) {
            try {
                TimeUnit.MILLISECONDS.sleep(delay);
                delay <<= 1; // 指數增加等待時間
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 優雅關閉
    public void shutdown() {
        workers.shutdown();
        while (!queue.isEmpty()) {
            // 等待剩余任務處理完成
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
        }
    }
}

需要注意的是,以下場景不適合。

  1. 阻塞需求,需要 take()阻塞等待。
  2. 有界隊列。
  3. 強一致性,要求精確的 size。

實現原理

以 JDK 17 源碼為基線,ConcurrentLinkedQueue 是 Java 并發包中基于無鎖算法實現的線程安全隊列,專為高并發場景設計。其核心設計目標包括:

  1. 無阻塞操作:通過 CAS 實現非阻塞算法
  2. 線性擴展能力:性能隨 CPU 核心數增加而提升
  3. 弱一致性:迭代器與 size() 方法返回近似值
  4. 內存效率:每個元素僅需 24 字節存儲開銷

ConcurrentLinkedQueue 的結構

ConcurrentLinkedQueue 由 head 節點和 tail 節點組成,每個節點(Node)由節點元素(item)和指向下一個節點的引用(next)組成,節點與節點之間就是通過這個 next 關聯起來,從而組成一張鏈表結構的隊列。

ConcurrentLinkedQueue 的節點都是 Node 類型的。

static finalclass Node<E> {
    volatile E item;
    volatile Node<E> next;


    Node(E item) {
        ITEM.set(this, item);
    }


    Node() {}

    void appendRelaxed(Node<E> next) {

        NEXT.set(this, next);
    }

    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }
}

入隊操作(offer)

流程圖如下。

圖片圖片

代碼解析。

public boolean offer(E e) {
    final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // CAS插入新節點
            if (NEXT.compareAndSet(p, null, newNode)) {
                // 惰性更新tail(允許失敗)
                if (p != t)
                    TAIL.weakCompareAndSet(this, t, newNode);
                returntrue;
            }
        }
        elseif (p == q) // 處理已移除節點
            p = (t != (t = tail)) ? t : head;
        else// 推進指針
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

優化點:

  1. weakCompareAndSet 減少內存屏障開銷
  2. 允許尾指針最多滯后 log(n) 個節點
  3. 通過 VarHandle 實現精確內存排序

出隊操作(poll)

核心機制:

  1. 兩階段出隊:先標記 item 為 null,再更新 head
  2. 頭指針可能跳躍多個已消費節點
  3. 自動清理無效節點

流程圖如下。

圖片圖片

核心源碼。

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item;
            if ((item = p.item) != null && p.casItem(item, null)) {
                // 成功獲取數據
                if (p != h)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            elseif ((q = p.next) == null) {
                updateHead(h, p);
                returnnull;
            }
            elseif (p == q)
              // 繼續循環
                 continue restartFromHead;
        }
    }
}

Java 7 種阻塞隊列

Chaya:“什么是阻塞隊列?”

阻塞隊列,顧名思義,首先它是一個隊列,線程 1 往阻塞隊列中添加元素,而線程 2 從阻塞隊列中移除元素。

  • 當阻塞隊列是空時,從隊列中獲取元素的操作將會被阻塞。
  • 當阻塞隊列是滿時,從隊列中添加元素的操作將會被阻塞。

JDK1.8 中的阻塞隊列實現共有 7 個,分別是:

  • ArrayBlockingQueue:基于數組的有界隊列;
  • LinkedBlockingQueue:基于鏈表的無界隊列(可以設置容量);
  • PriorityBlockingQueue:基于二叉堆的無界優先級隊列;
  • DelayQueue:基于 PriorityBlockingQueue 的無界延遲隊列;
  • SynchronousQueue:無容量的阻塞隊列(Executors.newCachedThreadPool() 中使用的隊列);
  • LinkedTransferQueue:基于鏈表的無界隊列;
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

阻塞隊列核心接口

阻塞隊列統一實現了BlockingQueue接口,BlockingQueue接口在java.util包Queue接口的基礎上提供了put(e)以及take()兩個阻塞方法。

除了阻塞功能,BlockingQueue 接口還定義了定時的offer以及poll,以及一次性移除方法drainTo。

//插入元素,隊列滿后會拋出異常
boolean add(E e);
//移除元素,隊列為空時會拋出異常
E remove();

//插入元素,成功反會true
boolean offer(E e);
//移除元素
E poll();

//插入元素,隊列滿后會阻塞
void put(E e) throws InterruptedException;
//移除元素,隊列空后會阻塞
E take() throws InterruptedException;

//限時插入
boolean offer(E e, long timeout, TimeUnit unit)
//限時移除
E poll(long timeout, TimeUnit unit);

//獲取所有元素到Collection中
int drainTo(Collection<? super E> c);

阻塞隊列 6 大使用場景

Java 阻塞隊列(BlockingQueue)是并發編程中的核心工具,其線程安全和阻塞特性使其在以下場景中發揮重要作用。

    生產者-消費者模型(經典場景)

    電商系統中,用戶下單后需異步處理庫存扣減、支付回調、物流通知等操作。

    痛點:生產(下單)與消費(處理)速度不一致,需解耦并保證高吞吐。

    public class OrderProcessor {
        // 生產級配置:建議隊列大小為 CPU 核心數*2~4
        privatestaticfinal BlockingQueue<Order> queue = new LinkedBlockingQueue<>(2048);
        privatestaticfinal ExecutorService consumerPool = Executors.newFixedThreadPool(8);
    
        // 生產者(Web 服務線程)
        public void submitOrder(Order order) {
            if (!queue.offer(order)) {  // 隊列滿時快速失敗
                log.warn("Order queue overflow! Reject order: {}", order.getId());
                thrownew ServiceException("系統繁忙,請稍后重試");
            }
            log.info("Order submitted: {}", order.getId());
        }
    
        // 消費者(后臺線程池)
        @PostConstruct
        public void initConsumers() {
            for (int i = 0; i < 8; i++) {
                consumerPool.execute(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Order order = queue.take();  // 阻塞直到有訂單
                            processOrder(order);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
        }
    
        private void processOrder(Order order) {
            // 扣減庫存、支付回調等業務邏輯
        }
    }

    線程池任務調度

    場景問題

    視頻平臺需將上傳的視頻轉碼為不同分辨率,任務具有突發性。

    public class VideoTranscoder {
        // 使用 PriorityBlockingQueue 確保 VIP 用戶優先處理
        privatestaticfinal BlockingQueue<TranscodeTask> queue =
            new PriorityBlockingQueue<>(1000, Comparator.comparing(TranscodeTask::getPriority));
    
        // 自定義線程池(核心線程數=CPU數, 最大線程數=CPU數*2)
        privatestaticfinal ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 30, TimeUnit.SECONDS, queue
        );
    
        public void submitTranscodeTask(TranscodeTask task) {
            executor.execute(() -> {
                // 執行實際轉碼操作
                process(task);
            });
        }
    
        // 監控隊列狀態(生產環境建議接入 Prometheus)
        public MonitorData getQueueStatus() {
            returnnew MonitorData(
                queue.size(),
                executor.getActiveCount(),
                queue.remainingCapacity()
            );
        }
    }

    生產級要點:

    1. 使用有界隊列避免 OOM
    2. RejectedExecutionHandler 需配置合理拒絕策略
    3. 隊列監控接入告警系統

    流量削峰

    瞬時流量高峰可達平時 100 倍,數據庫無法承受直接壓力。

    public class SeckillService {
        // 隊列容量=商品庫存*2(內存可控)
        privatefinal BlockingQueue<SeckillRequest> queue =
            new ArrayBlockingQueue<>(20000);
    
        // 異步消費隊列
        @Scheduled(fixedRate = 100)
        public void processQueue() {
            List<SeckillRequest> batch = new ArrayList<>(100);
            queue.drainTo(batch, 100);  // 批量取100條
            if (!batch.isEmpty()) {
                seckillDao.batchProcess(batch); // 批量寫入數據庫
            }
        }
    
        public boolean trySeckill(SeckillRequest request) {
            return queue.offer(request);  // 非阻塞提交
        }
    }

    圖片圖片

    生產級設計:

    1. 隊列容量與數據庫吞吐量匹配
    2. 批量處理減少數據庫壓力
    3. 前端配合顯示排隊狀態

    延遲任務調度:訂單超時關閉

    需在訂單創建 30 分鐘后檢查支付狀態,未支付自動關閉。

    public class OrderTimeoutChecker implements Runnable {
        privatefinal DelayQueue<DelayedOrder> queue = new DelayQueue<>();
    
        public void addOrder(Order order) {
            queue.put(new DelayedOrder(order, 30, TimeUnit.MINUTES));
        }
    
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    DelayedOrder order = queue.take();  // 阻塞直到有到期訂單
                    checkPayment(order.getOrderId());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        privatestaticclass DelayedOrder implements Delayed {
            privatefinal Order order;
            privatefinallong expireTime;
    
            // 實現 getDelay() 和 compareTo()
        }
    }

    生產級注意:

    1. 分布式場景需用 Redis/ZooKeeper 替代
    2. 集群環境下需防重復處理
    3. 添加 JVM 關閉鉤子確保任務不丟失

    異步日志系統

    需要記錄詳細業務日志但磁盤 I/O 不能影響主線程性能。

    public class AsyncLogger {
        privatestaticfinal BlockingQueue<LogEvent> queue =
            new LinkedTransferQueue<>();  // 高吞吐無界隊列
    
        static {
            // 守護線程消費日志
            Thread loggerThread = new Thread(() -> {
                while (true) {
                    try {
                        LogEvent event = queue.take();
                        writeToDisk(event);
                    } catch (InterruptedException e) {
                        // 優雅關閉處理
                        drainRemainingLogs();
                        break;
                    }
                }
            });
            loggerThread.setDaemon(true);
            loggerThread.start();
        }
    
        public static void log(LogEvent event) {
            if (!queue.offer(event)) {  // 防御性設計
                fallbackLog(event);
            }
        }
    }

    線程池隊列

    • 線程池中活躍線程數達到 corePoolSize 時,線程池將會將后續的 task 提交到 BlockingQueue 中;
    • 線程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任務的阻塞隊列,被提交但尚未被執行的任務。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

    線程池在內部實際也是構建了一個生產者消費者模型,將線程和任務兩者解耦,并不直接關聯,從而良好的緩沖任務,復用線程。

    圖片圖片

    不同的線程池實現用的是不同的阻塞隊列,newFixedThreadPool 和 newSingleThreadExecutor 用的是 LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。

    各類隊列對比和選型

    圖片圖片

    生產選型建議:

    • 網絡請求緩沖 → ArrayBlockingQueue(可控內存)
    • 任務調度 → PriorityBlockingQueue(優先級控制)
    • 線程間直接通信 → SynchronousQueue
    • 磁盤 I/O 解耦 → LinkedBlockingQueue(吞吐優先)

    性能優化要點

    隊列監控:

    // 通過 JMX 暴露指標
    public class QueueMonitor implements QueueMonitorMXBean {
        private final BlockingQueue<?> queue;
    
        public int getQueueSize() {
            return queue.size();
        }
    
        // 注冊到 MBeanServer...
    }

    拒絕策略(以線程池為例):

    new ThreadPoolExecutor.CallerRunsPolicy() {  // 生產推薦策略
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            log.warn("Task rejected, running in caller thread");
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    動態擴縮容:

    // 根據監控指標調整隊列容量
    public void adjustQueueCapacity(int newSize) {
        if (queue instanceof ResizableBlockingQueue) {
            ((ResizableBlockingQueue) queue).setCapacity(newSize);
        }
    }

    掌握了各種阻塞隊列的使用場景,接下來我們深入拆解每個阻塞隊列的實現原理。

    ArrayBlockingQueue

    ArrayBlockingQueue是一個底層用數組實現的有界阻塞隊列,有界是指他的容量大小是固定的,不能擴充容量,在初始化時就必須確定隊列大小。

    它通過可重入的獨占鎖ReentrantLock來控制并發,Condition來實現阻塞和通知喚醒。

    結構概述

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
        final Object[] items;               // 容器數組
        int takeIndex;                      // 出隊索引
        int putIndex;                       // 入隊索引
        int count;                          // 排隊個數
        final ReentrantLock lock;           // 全局鎖
        private final Condition notEmpty;   // 出隊條件隊列
        private final Condition notFull;    // 入隊條件隊列
        ...
    }

    ArrayBlockingQueue 的結構如圖所示:

    圖片圖片

    • ArrayBlockingQueue 的數組其實是一個邏輯上的環狀結構,在添加、取出數據的時候,并沒有像 ArrayList 一樣發生數組元素的移動(當然除了 removeAt(final int removeIndex));
    • 并且由 takeIndex 和 putIndex 指示讀寫位置;
    • 在讀寫的時候還有兩個讀寫條件隊列。

    阻塞入隊

    阻塞入隊 put 方法:

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //獲取獨占鎖
        lock.lockInterruptibly();
        try {
          //如果隊列已滿則通過await阻塞put方法
            while (count == items.length)
                notFull.await();
            //滿足條件,插入元素,并喚醒因notEmpty等待的消費線程
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
    //插入元素后將putIndex+1,當隊列使用完后重置為0
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
    //隊列添加元素后喚醒因notEmpty等待的消費線程
        notEmpty.signal();
    }

    阻塞出隊

    //移除隊列中的元素
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
    //獲取獨占鎖
        lock.lockInterruptibly();
        try {
          //如果隊列已空則通過await阻塞take方法
            while (count == 0)
                notEmpty.await();
            return dequeue(); //移除元素
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
    //移除元素后將takeIndex+1,當隊列使用完后重置為0
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    //隊列消費元素后喚醒因notFull等待的消費線程
        notFull.signal();
        return x;
    }

    隊列滿后通過notFull.await()來阻塞生產者線程,消費元素后通過 notFull.signal()來喚醒阻塞的生產者線程。

    隊列為空后通過notEmpty.await()來阻塞消費者線程,生產元素后通過notEmpty.signal()喚醒阻塞的消費者線程。

    drainTo

    drainTo方法可以一次性獲取隊列中所有的元素,它減少了鎖定隊列的次數,使用得當在某些場景下對性能有不錯的提升。

    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this)
            thrownew IllegalArgumentException();
        if (maxElements <= 0)
            return0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock; //僅獲取一次鎖
        lock.lock();
        try {
            int n = Math.min(maxElements, count); //獲取隊列中所有元素
            int take = takeIndex;
            int i = 0;
            try {
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x); //循環插入元素
                    items[take] = null;
                    if (++take == items.length)
                        take = 0;
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                            itrs.queueIsEmpty();
                        elseif (i > take)
                            itrs.takeIndexWrapped();
                    }
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal(); //喚醒等待的生產者線程
                }
            }
        } finally {
            lock.unlock();
        }
    }

    LinkedBlockingQueue

    LinkedBlockingQueue是一個底層用單向鏈表實現的有界阻塞隊列,和ArrayBlockingQueue一樣,采用ReentrantLock來控制并發,不同的是它使用了兩個獨占鎖來控制消費和生產。

    如果不是特殊業務,LinkedBlockingQueue 使用時,切記要定義容量 new LinkedBlockingQueue(capacity),防止過度膨脹。

    結構概述

    public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    // 默認 Integer.MAX_VALUE
    privatefinalint capacity;
    // 容量
    privatefinal AtomicInteger count = new AtomicInteger();
    // 頭結點 head.item == null
    transient Node<E> head;
    // 尾節點 last.next == null
    privatetransient Node<E> last;
    // take鎖,出隊鎖,只有take,poll方法會持有
    privatefinal ReentrantLock takeLock = new ReentrantLock();
    // 出隊等待條件
    // 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
    privatefinal Condition notEmpty = takeLock.newCondition();
    // 入隊鎖,只有put,offer會持有
    privatefinal ReentrantLock putLock = new ReentrantLock();
    // 入隊等待條件
    // 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
    privatefinal Condition notFull = putLock.newCondition();
    // 基于鏈表實現,肯定要有結點類,典型的單鏈表結構
    staticclass Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
      }
    }

    LinkedBlockingQueue 的結構如圖所示:

    圖片圖片

    如圖所示,

    • LinkedBlockingQueue 其實就是一個簡單的單向鏈表,其中頭部元素的數據為空,尾部元素的 next 為空;
    • 因為讀寫都有競爭,所以在頭部和尾部分別有一把鎖;同時還有對應的兩個條件隊列;

    put 和 take 方法

    public void put(E e) throws InterruptedException {
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        //因為使用了雙鎖,需要使用AtomicInteger計算元素總量,避免并發計算不準確
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //隊列已滿,阻塞生產線程
                notFull.await();
            }
          //插入元素到隊列尾部
            enqueue(node);
          //count + 1
            c = count.getAndIncrement();
          //如果+1后隊列還未滿,通過其他生產線程繼續生產
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
    //只有當之前是空時,消費隊列才會阻塞,否則是不需要通知的
        if (c == 0)
            signalNotEmpty();
    }
    
    private void enqueue(Node<E> node) {
        //將新元素添加到鏈表末尾,然后將last指向尾部元素
        last = last.next = node;
    }
    
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
              //隊列為空,阻塞消費線程
                notEmpty.await();
            }
          //消費一個元素
            x = dequeue();
          //count - 1
            c = count.getAndDecrement();
          // 通知其他等待的消費線程繼續消費
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
          //只有當前隊列是滿的,生產隊列才會阻塞,否則是不需要通知的
            signalNotFull();
        return x;
    }
    
    //消費隊列頭部的下一個元素,同時將新頭部置空
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    可以看到LinkedBlockingQueue通過takeLock和putLock兩個鎖來控制生產和消費,互不干擾,只要隊列未滿,生產線程可以一直生產,只要隊列不為空,消費線程可以一直消費,不會相互因為獨占鎖而阻塞。

    Chaya:“為什么 ArrayBlockingQueue 中不使用雙鎖來實現隊列的生產和消費呢?”

    我的理解是 ArrayBlockingQueue 也能使用雙鎖來實現功能,但由于它底層使用了數組這種簡單結構,相當于一個共享變量,如果通過兩個鎖,需要更加精確的鎖控制。

    LinkedBlockingQueue不存在這個問題,鏈表這種數據結構頭尾節點都相對獨立,存儲上也不連續,雙鎖控制不存在復雜性。

    PriorityBlockingQueue

    PriorityBlockingQueue是一個底層由數組實現的無界隊列,并帶有排序功能,同樣采用ReentrantLock來控制并發。

    由于是無界的,所以插入元素時不會阻塞,沒有隊列滿的狀態,只有隊列為空的狀態。

    PriorityBlockingQueue 具有以下特性:

    • 基于優先級排序:元素按自然順序(Comparable)或自定義 Comparator 排序。
    • 線程安全:通過 ReentrantLock 保證并發操作的安全性。
    • 動態擴容:底層是數組實現的二叉堆,容量不足時自動擴容。
    • 阻塞操作:take() 在隊列為空時阻塞;put() 不會阻塞(因為隊列無界)。

    適用場景

    例子中,創建了一個優先級阻塞隊列,用于存儲和檢索PriorityTask對象,這些對象根據它們的優先級進行排序,client 代碼會向隊列中添加任務,并從隊列中檢索并處理優先級最高的任務。

    // 定義一個具有優先級的任務類
    class PriorityTask implements Comparable<PriorityTask>{
        privateint priority;
        private String name;
    
        public PriorityTask(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }
    
        @Override
        public int compareTo(PriorityTask o) {
            if (this.priority < o.priority) {
                return -1;
            } elseif (this.priority > o.priority) {
                return1;
            } else {
                return0;
            }
        }
    
        @Override
        public String toString() {
            return"PriorityTask{" +
                    "priority=" + priority +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    
    
    publicclass PriorityBlockingQueueExample {
    
        public static void main(String[] args) throws InterruptedException {
            // 創建一個優先級阻塞隊列,使用PriorityTask類的自然順序進行排序
            PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();
    
            // 添加任務到隊列
            queue.put(new PriorityTask(3, "Task 3"));
            queue.put(new PriorityTask(1, "Task 1"));
            queue.put(new PriorityTask(2, "Task 2"));
    
            // 從隊列中取出并打印任務,優先級高的先出隊
            while (!queue.isEmpty()) {
                PriorityTask task = queue.take();
                System.out.println("Processing: " + task);
            }
        }
    
    }

    在這個示例中,定義了一個名為 PriorityTask 的類,它實現了 Comparable 接口,并且重寫了 compareTo 方法來定義優先級規則。

    隊列中的元素將根據這個規則自動排序,從而保證優先級高的任務先被處理。

    實現原理

    PriorityBlockingQueue 內部通過 數組 維護一個 最小二叉堆(默認),堆頂元素始終是優先級最高的(最小元素)。 數組下標關系:

    • 父節點:parent = (childIndex - 1) / 2
    • 左子節點:leftChild = parent * 2 + 1
    • 右子節點:rightChild = parent * 2 + 2
    關鍵字段
    // 存儲元素的數組
    privatetransient Object[] queue;
    
    // 元素數量
    privatetransientint size;
    
    // 排序規則(為 null 時使用自然排序)
    privatetransient Comparator<? super E> comparator;
    
    // 保證線程安全的鎖
    privatefinal ReentrantLock lock = new ReentrantLock();
    
    // 非空條件變量(用于 take() 阻塞)
    privatefinal Condition notEmpty = lock.newCondition();
    入隊源碼
    public boolean offer(E e) {
        if (e == null)
            thrownew NullPointerException();
       // 首先獲取鎖對象。
        final ReentrantLock lock = this.lock;
       // 只有一個線程操作入隊和出隊動作。
        lock.lock();
       // n代表數組的實際存儲內容的大小
       // cap代表隊列的整體大小,也就是數組的長度。
        int n, cap;
        Object[] array;
       // 如果數組實際長度大于等于數組的長度時,需要進行擴容操作。
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
           // 如果用戶指定比較器,則使用用戶指定的比較器來進行比較,如果沒有則使用默認比較器。
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
               // 進行上浮操作。
                siftUpComparable(n, e, array);
            else
               // 進行上浮操作。
                siftUpUsingComparator(n, e, array, cmp);
           // 實際長度增加1,由于有且僅有一個線程操作隊列,所以這里并沒有使用原子性操作。
            size = n + 1;
           // 通知等待的線程,隊列已經有數據,可以獲取數據。
            notEmpty.signal();
        } finally {
           // 解鎖操作。
            lock.unlock();
        }
       // 返回操作成功。
        returntrue;
    }
    關鍵步驟
    • 加鎖:確保線程安全。
    • 擴容檢查:若數組已滿,調用 tryGrow() 擴容(通常擴容 50%)。
    • 堆上浮:將新元素插入數組末尾,逐步與父節點比較并交換,直到滿足堆性質。
    • 喚醒消費者:通知可能阻塞的 take() 線程。
    出隊
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
    
    private E dequeue() {
       // 數組的元素的個數。
        int n = size - 1;
       // 如果數組中不存在元素則直接返回null。
        if (n < 0)
            returnnull;
        else {
           // 獲取隊列數組。
            Object[] array = queue;
           // 將第一個元素也就是二叉堆的根結點堆頂元素作為返回結果。
            E result = (E) array[0];
           // 獲取數組中最后一個元素。
            E x = (E) array[n];
           // 將最后一個元素設置為null。
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
               // 進行下沉操作。
                siftDownComparable(0, x, array, n);
            else
               // 進行下沉操作。
                siftDownUsingComparator(0, x, array, n, cmp);
           // 實際元素大小減少1.
            size = n;
           // 返回結果。
            return result;
        }
    }
    關鍵步驟
    • 加鎖:確保線程安全。
    • 取出堆頂:返回堆頂元素(優先級最高)。
    • 堆下沉:將末尾元素移到堆頂,逐步與子節點比較并交換,直到滿足堆性質。

    DelayQueue

    DelayQueue 也是一個無界隊列,它是在PriorityQueue基礎上實現的,先按延遲優先級排序,延遲時間短的排在前面。

    和PriorityBlockingQueue相似,底層也是數組,采用一個ReentrantLock來控制并發。

    由于是無界的,所以插入元素時不會阻塞,沒有隊列滿的狀態。

    private finaltransient ReentrantLock lock = new ReentrantLock();
    privatefinal PriorityQueue<E> q = new PriorityQueue<E>();//優先級隊列
    
    public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e); //插入元素到優先級隊列
            if (q.peek() == e) { //如果插入的元素在隊列頭部
                leader = null;
                available.signal(); //通知消費線程
            }
            returntrue;
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek(); //獲取頭部元素
                if (first == null)
                    available.await(); //空隊列阻塞
                else {
                    long delay = first.getDelay(NANOSECONDS); //檢查元素是否延遲到期
                    if (delay <= 0)
                        return q.poll(); //到期則彈出元素
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay); //阻塞未到期的時間
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    SynchronousQueue

    SynchronousQueue相比較之前的 4 個隊列就比較特殊了,它是一個沒有容量的隊列,也就是說它內部時不會對數據進行存儲,每進行一次 put 之后必須要進行一次 take,否則相同線程繼續 put 會阻塞。

    這種特性很適合做一些傳遞性的工作,一個線程生產,一個線程消費。

    這里只對它的非公平實現下的 take 和 put 方法做下簡單分析:

    //非公平情況下調用內部類TransferStack的transfer方法put
    public void put(E e) throws InterruptedException {
        if (e == null) thrownew NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            thrownew InterruptedException();
        }
    }
    //非公平情況下調用內部類TransferStack的transfer方法take
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        thrownew InterruptedException();
    }
    
    //具體的put以及take方法,只有E的區別,通過E來區別REQUEST還是DATA模式
    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; // constructed/reused as needed
        int mode = (e == null) ? REQUEST : DATA;
    
        for (;;) {
            SNode h = head;
            //棧無元素或者元素和插入的元素模式相匹配,也就是說都是插入元素
            if (h == null || h.mode == mode) {
                //有時間限制并且超時
                if (timed && nanos <= 0) {
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);  // 重新設置頭節點
                    else
                        returnnull;
                }
                //未超時cas操作嘗試設置頭節點
                elseif (casHead(h, s = snode(s, e, h, mode))) {
                    //自旋一段時間后未消費元素則掛起put線程
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled
                        clean(s);
                        returnnull;
                    }
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            }
            //棧不為空并且和頭節點模式不匹配,存在元素則消費元素并重新設置head節點
            elseif (!isFulfilling(h.mode)) { // try to fulfill
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    for (;;) { // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else                  // lost match
                            s.casNext(m, mn);   // help unlink
                    }
                }
            }
            //節點正在匹配階段
            else {                            // help a fulfiller
                SNode m = h.next;               // m is h's match
                if (m == null)                  // waiter is gone
                    casHead(h, null);           // pop fulfilling node
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }
    
    //先自旋后掛起的核心方法
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        finallong deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        //計算自旋的次數
        int spins = (shouldSpin(s) ?
                        (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            //匹配成功過返回節點
            if (m != null)
                return m;
            //超時控制
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            //自旋檢查,是否進行下一次自旋
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            elseif (s.waiter == null)
                s.waiter = w; // establish waiter so can park next iter
            elseif (!timed)
                LockSupport.park(this); //在這里掛起線程
            elseif (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

    代碼非常復雜,這里說下我所理解的核心邏輯。

    代碼中可以看到put以及take方法都是通過調用transfer方法來實現的,然后通過參數mode來區別,在生產元素時如果是同一個線程多次put則會采取自旋的方式多次嘗試put元素,可能自旋過程中元素會被消費,這樣可以及時put,降低線程掛起的性能損耗,高吞吐量的核心也在這里。

    消費線程一樣,空棧時也會先自旋,自旋失敗然后通過線程的LockSupport.park方法掛起。

    LinkedTransferQueue

    LinkedTransferQueue 是一個由鏈表結構組成的無界阻塞 TransferQueue 隊列。

    LinkedTransferQueue采用一種預占模式。意思就是消費者線程取元素時,如果隊列不為空,則直接取走數據,若隊列為空,那就生成一個節點(節點元素為 null)入隊,然后消費者線程被等待在這個節點上,后面生產者線程入隊時發現有一個元素為 null 的節點,生產者線程就不入隊了,直接就將元素填充到該節點,并喚醒該節點等待的線程,被喚醒的消費者線程取走元素,從調用的方法返回。我們稱這種節點操作為“匹配”方式。

    隊列實現了 TransferQueue 接口重寫了 tryTransfer 和transfer方法,這組方法和SynchronousQueue` 公平模式的隊列類似,具有匹配的功能.。

    LinkedBlockingDeque

    LinkedBlockingDeque 是一個由鏈表結構組成的雙向阻塞隊列。

    所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。

    相比其他的阻塞隊列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。

    以 Last 單詞結尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。

    另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。

    在初始化 LinkedBlockingDeque 時可以設置容量防止其過渡膨脹,默認容量也是 Integer.MAX_VALUE。

    另外雙向阻塞隊列可以運用在“工作竊取”模式中。

    責任編輯:武曉燕 來源: 碼哥跳動
    相關推薦

    2025-04-22 08:32:50

    2012-03-06 11:01:44

    Java

    2021-01-09 13:57:05

    阻塞隊列并發

    2025-05-08 08:31:33

    2025-04-14 08:31:20

    2025-04-23 08:31:26

    Java并發框架

    2017-02-14 10:00:19

    Java開發Lock

    2018-09-19 14:53:02

    NIOBIO運行

    2016-09-01 09:01:00

    MySQLRedisJMQ

    2025-06-13 08:00:00

    Java并發編程volatile

    2025-04-27 08:30:48

    2025-06-10 10:15:00

    Java容器并發

    2019-07-18 11:08:09

    Java并發框架

    2024-05-29 08:49:45

    2024-02-23 10:10:00

    List接口Java

    2021-04-30 08:39:10

    架構消息隊列高并發

    2019-08-28 14:25:00

    線程安全容器

    2017-04-12 10:02:21

    Java阻塞隊列原理分析

    2024-08-26 09:51:57

    2020-09-22 12:00:23

    Javahashmap高并發
    點贊
    收藏

    51CTO技術棧公眾號

    主站蜘蛛池模板: hitomi一区二区三区精品 | 日韩欧美国产精品一区二区 | 日本一区二区视频 | 精品国模一区二区三区欧美 | 亚洲精品一二三区 | 999久久久 | 欧美 日韩 国产 成人 在线 91 | 丁香综合| 中文字幕一区在线观看视频 | 久久久青草婷婷精品综合日韩 | www四虎com | 亚洲欧美在线一区 | 日韩欧美在线播放 | jlzzjlzz欧美大全| 翔田千里一区二区 | 午夜成人在线视频 | 97超碰在线播放 | 国产精品国产成人国产三级 | 性色在线 | 国产精品视频播放 | 暴草美女 | 欧美午夜视频 | 日本不卡免费新一二三区 | 欧洲亚洲一区二区三区 | 欧美在线观看免费观看视频 | 黑人巨大精品欧美一区二区一视频 | 99久久久无码国产精品 | 91欧美| 精品国产一二三区 | 免费一级毛片 | 中文字幕 国产精品 | 亚洲精品乱码久久久久久按摩观 | 农村真人裸体丰满少妇毛片 | 99热这里都是精品 | 91麻豆久久久 | 在线观看 亚洲 | 精品欧美激情在线观看 | 日本免费一区二区三区四区 | 成人伊人 | 欧美精品一区二区三区四区 在线 | 亚洲欧美日韩在线一区二区 |