Java 并發(fā)容器總結(jié)
一、詳解并發(fā)場(chǎng)景下的Map容器
1. 詳解JDK7版本HashMap
(1) jdk7版本下的HashMap數(shù)據(jù)結(jié)構(gòu)
jdk7版本的hashMap底層采用數(shù)組加鏈表的形式存儲(chǔ)元素,假如需要存儲(chǔ)的鍵值對(duì)經(jīng)過(guò)計(jì)算發(fā)現(xiàn)存放的位置已經(jīng)存在鍵值對(duì)了,那么就是用頭插法將新節(jié)點(diǎn)插入到這個(gè)位置。
圖片
對(duì)應(yīng)的我們也給出JDK7版本下的put方法,該版本進(jìn)行元素插入時(shí)會(huì)通過(guò)hash散列計(jì)算得元素對(duì)應(yīng)的索引位置,也就是我們常說(shuō)的bucket,然后遍歷查看是否存在重復(fù)的key,若存在則直接將value覆蓋。反之,則會(huì)在循環(huán)結(jié)束后調(diào)用addEntry采用頭插法將元素插入:
public V put(K key, V value) {
//......
//計(jì)算key的散列值
int hash = hash(key);
int i = indexFor(hash, table.length);
//定位到對(duì)應(yīng)桶的位置,查看是否存在重復(fù)的key,如果有則直接覆蓋
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
//走到這里說(shuō)明要在一個(gè)空的位置添加節(jié)點(diǎn),將modCount自增,并調(diào)用addEntry采用頭插法完成節(jié)點(diǎn)插入
modCount++;
addEntry(hash, key, value, i);
returnnull;
}
對(duì)應(yīng)我們也給出addEntry的邏輯實(shí)現(xiàn),它會(huì)判斷數(shù)組是否需要擴(kuò)容,然后調(diào)用createEntry執(zhí)行頭插法的三步驟:
- 定位到對(duì)應(yīng)bucket的頭節(jié)點(diǎn)
- 將新插入節(jié)點(diǎn)封裝為Entry,后繼節(jié)點(diǎn)指向bucket的頭節(jié)點(diǎn),構(gòu)成以我們節(jié)點(diǎn)為頭節(jié)點(diǎn)的鏈表
- 當(dāng)前bucket指向我們新插入的頭節(jié)點(diǎn)
對(duì)應(yīng)源碼如下所示,讀者可結(jié)合筆者說(shuō)明和注釋了解一下過(guò)程:
void addEntry(int hash, K key, V value, int bucketIndex) {
//查看數(shù)組是否達(dá)到閾值,若達(dá)到則進(jìn)行擴(kuò)容操作
if ((size >= threshold) && (null != table[bucketIndex])) {
resize(2 * table.length);
hash = (null != key) ? hash(key) : 0;
bucketIndex = indexFor(hash, table.length);
}
//使用頭插法將節(jié)點(diǎn)插入
createEntry(hash, key, value, bucketIndex);
}
void createEntry(int hash, K key, V value, int bucketIndex) {
//定位bucket的第一個(gè)節(jié)點(diǎn)
Entry<K,V> e = table[bucketIndex];
//采用頭插法將bucket對(duì)應(yīng)的節(jié)點(diǎn)作為新插入節(jié)點(diǎn)的后繼節(jié)點(diǎn),再讓table[bucketIndex] 指向我們插入的新節(jié)點(diǎn)
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
(2) jdk7版本下的HashMap的擴(kuò)容
還記得我們上文說(shuō)明HashMap的put操作時(shí)提到的擴(kuò)容方法resize嘛?它的具體實(shí)現(xiàn)如下,可以看到它會(huì)根據(jù)newCapacity創(chuàng)建一個(gè)新的容器newTable ,然后將原數(shù)組的元素通過(guò)transfer方法轉(zhuǎn)移到新的容器newTable中。
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}
//創(chuàng)建新的容器
Entry[] newTable = new Entry[newCapacity];
//將舊的容器的元素轉(zhuǎn)移到新數(shù)組中
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
關(guān)于transfer的邏輯,這里涉及到鏈表元素的轉(zhuǎn)移操作,這里我們也直接以圖文的方式進(jìn)行說(shuō)明,執(zhí)行擴(kuò)容前會(huì)記錄帶轉(zhuǎn)移元素的e及其后繼節(jié)點(diǎn)next:
然后計(jì)算該節(jié)點(diǎn)e擴(kuò)容后要存放到新空間的索引位置i,我們假設(shè)為4,此時(shí)節(jié)點(diǎn)e就會(huì)指向新空間索引4的頭節(jié)點(diǎn)元素,因?yàn)槲覀兪莈ntry-0是第一個(gè)執(zhí)行遷移的元素,此時(shí)新bucket索引4空間為空,所以我們的entry-0指向空:
待遷移節(jié)點(diǎn)entry-0指向 newTable的頭節(jié)點(diǎn)后,對(duì)應(yīng)newTable直接指向這個(gè)遷移節(jié)點(diǎn),由此完成一個(gè)元素entry-0的遷移,同時(shí)e指針指向entry-0的后繼節(jié)點(diǎn)entry-1:
同理,假設(shè)entry-1通過(guò)計(jì)算后也是要遷移到索引4上,entry-1依然按照:指向newTable索引4位置的頭節(jié)點(diǎn),也就是entry-0作為后繼節(jié)點(diǎn)、newTable[4]指向entry-1等步驟不斷循環(huán)完成邏輯元素遷移:
有了上述圖解的基礎(chǔ),我們就可以很好的理解transfer這個(gè)元素遷移的源碼邏輯了:
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
//獲取遷移節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Entry<K,V> next = e.next;
//......
//計(jì)算遷移節(jié)點(diǎn)到新空間的索引位置
int i = indexFor(e.hash, newCapacity);
//節(jié)點(diǎn)e的next指向指向插入位置的頭節(jié)點(diǎn),構(gòu)成一個(gè)以自己為頭節(jié)點(diǎn)的鏈表
e.next = newTable[i];
//newTable[i]位置指向我們的節(jié)點(diǎn)e,完成一個(gè)元素遷移
newTable[i] = e;
//e指向第一步記錄的next指針,執(zhí)行下一輪的元素遷移
e = next;
}
}
}
(3) jdk7版本下的HashMap并發(fā)擴(kuò)容問(wèn)題
當(dāng)我們了解了JDK7版本的hashMap擴(kuò)容過(guò)程之后,我們就從多線程角度看看什么時(shí)候會(huì)出現(xiàn)問(wèn)題,我們不妨想象有兩個(gè)線程同時(shí)在執(zhí)行多線程操作。
我們假設(shè)線程0和線程1并發(fā)執(zhí)行擴(kuò)容,單位時(shí)間內(nèi)二者所維護(hù)的e和next如下圖所示:
假設(shè)線程0先執(zhí)行,按照擴(kuò)容的代碼邏輯完成頭插法將entry-0和entry-1都遷移到索引4上,如下圖所示:
重點(diǎn)來(lái)了,此時(shí)線程1再次獲得CPU時(shí)間片指向代碼邏輯,此時(shí):
- e還是指向entry-0,而next還是指向entry-1
- 執(zhí)行e.next = newTable[i];就會(huì)拿到已遷移的entry-1
- 執(zhí)行 newTable[i] = e;再次指向entry-0,由此關(guān)系構(gòu)成下圖所示的環(huán)路
e = next;再次獲得entry-1,兩個(gè)元素不斷循環(huán)導(dǎo)致CPU100%問(wèn)題:
通過(guò)圖解我們得知CPU100%原因之后,我們不妨通過(guò)代碼來(lái)重現(xiàn)這個(gè)問(wèn)題。
首先我們將項(xiàng)目JDK版本設(shè)置為JDK7。然后定義一個(gè)大小為2的map,閾值為1.5,這也就以為著插入時(shí)看到size為3的時(shí)候會(huì)觸發(fā)擴(kuò)容。
/**
* 這個(gè)map 桶的長(zhǎng)度為2,當(dāng)元素個(gè)數(shù)達(dá)到 2 * 1.5 = 3 的時(shí)候才會(huì)觸發(fā)擴(kuò)容
*/
private static HashMap<Integer,String> map = new HashMap<Integer,String>(2,1.5f);
所以我們的工作代碼如下,先插入3個(gè)元素,然后兩個(gè)線程分別插入第4個(gè)元素。需要補(bǔ)充一句,這幾個(gè)元素的key值是筆者經(jīng)過(guò)調(diào)試后確定存放位置都在同一個(gè)索引上,所以這段代碼會(huì)觸發(fā)擴(kuò)容的邏輯,讀者自定義數(shù)據(jù)樣本時(shí),最好和讀者保持一致。
try{
map.put(5,"5");
map.put(7,"7");
map.put(3,"3");
System.out.println("此時(shí)元素已經(jīng)達(dá)到3了,再往里面添加就會(huì)產(chǎn)生擴(kuò)容操作:" + map);
new Thread("T1") {
public void run() {
map.put(11, "11");
System.out.println(Thread.currentThread().getName() + "擴(kuò)容完畢 " );
};
}.start();
new Thread("T2") {
public void run() {
map.put(15, "15");
System.out.println(Thread.currentThread().getName() + "擴(kuò)容完畢 " + map);
};
}.start();
Thread.sleep(60_000);//時(shí)間根據(jù)debug時(shí)間調(diào)整
//死循環(huán)后打印直接OOM,思考一下為什么?
//因?yàn)榇蛴〉臅r(shí)候回調(diào)用toString回遍歷鏈表,但此時(shí)鏈表已經(jīng)成環(huán)狀了
//那么就會(huì)無(wú)限拼接字符串
// System.out.println(map);
System.out.println(map.get(5));
System.out.println(map.get(7));
System.out.println(map.get(3));
System.out.println(map.get(11));
System.out.println(map.get(15));
System.out.println(map.size());
}catch (Exception e){
}
我們?cè)跀U(kuò)容的核心方法插個(gè)斷點(diǎn),斷點(diǎn)條件設(shè)置為:
Thread.currentThread().getName().equals("T1")||Thread.currentThread().getName().equals("T2")
并且斷點(diǎn)的調(diào)試方式改成thread:
我們首先將線程1斷點(diǎn)調(diào)試到記錄next引用這一步,然后將線程切換為線程2,模擬線程1被掛起。
我們直接將線程2走完,模擬線程2完成擴(kuò)容這一步,然后IDEA會(huì)自動(dòng)切回線程1,我們也將線程1直接走完。
從控制臺(tái)輸出結(jié)果來(lái)看,控制臺(tái)遲遲無(wú)法結(jié)束,說(shuō)明擴(kuò)容的操作遲遲無(wú)法完成,很明顯線程1的擴(kuò)容操作進(jìn)入死循環(huán),CPU100%問(wèn)題由此印證。
2. 詳解JDK8版本的HashMap
(1) 基本數(shù)據(jù)結(jié)構(gòu)
jdk8對(duì)HashMap底層數(shù)據(jù)結(jié)構(gòu)做了調(diào)整,從原本的數(shù)組+鏈表轉(zhuǎn)為數(shù)組+鏈表/紅黑樹的形式,即保證在數(shù)組長(zhǎng)度大于64且當(dāng)前節(jié)點(diǎn)鏈表長(zhǎng)度達(dá)到8的情況下,為避免元素哈希定位退化為O(n)級(jí)別的遍歷,通過(guò)鏈表樹化為紅黑樹來(lái)保證查詢效率:
對(duì)此我們也給出該版本的HashMap源碼,因?yàn)樽髡叩娘L(fēng)格比較經(jīng)典,筆者這里就按照核心的4條主線進(jìn)行說(shuō)明:
- 經(jīng)過(guò)哈希運(yùn)算后,對(duì)應(yīng)bucket不存在元素,直接基于key和value生成Node插入。
- 如果定位到的元素key一樣,默認(rèn)情況下直接將元素值覆蓋并返回舊元素。
- 如果定位到的key對(duì)應(yīng)bucket非空且為樹節(jié)點(diǎn)TreeNode則到樹節(jié)點(diǎn)中找到重復(fù)元素覆蓋或者將新節(jié)點(diǎn)插入。
- 如果key對(duì)應(yīng)的bucket為鏈表,則遍歷找到重復(fù)節(jié)點(diǎn)覆蓋或者找到后繼節(jié)點(diǎn)插入。
對(duì)應(yīng)我們put方法對(duì)應(yīng)的核心源碼如下,讀者可以結(jié)合注釋了解一下:
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
//用p記錄哈希定位后的bucket,若為空則直接創(chuàng)建節(jié)點(diǎn)存入該bucket中
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
//若定位到的元素key和當(dāng)前key一致則將該引用存到e中,后續(xù)進(jìn)行覆蓋處理
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
elseif (p instanceof TreeNode)//說(shuō)明定位到的bucket
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
//定位到鏈表中的最后一個(gè)節(jié)點(diǎn)
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
//定位到key相等的元素,如果onlyIfAbsent 設(shè)置為false即允許存在時(shí)覆蓋,則直接將元素覆蓋,返回就有值
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
//......
returnnull;
}
(2) 多線程操作下的鍵值對(duì)覆蓋問(wèn)題
筆者截取上述片段中的某個(gè)代碼段,即哈希定位桶為空的節(jié)點(diǎn)添加操作:
//如果數(shù)組對(duì)應(yīng)的索引里面沒(méi)有元素,則直接插入
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
這段代碼,在并發(fā)操作下是存在多線程都判斷到空,然后后者將前者鍵值對(duì)覆蓋的情況,如下圖:
所以我們不妨寫個(gè)代碼印證這個(gè)問(wèn)題,我們創(chuàng)建一個(gè)長(zhǎng)度為2的map,用兩個(gè)線程往map底層數(shù)組的同一個(gè)位置中插入鍵值對(duì)。兩個(gè)線程分別起名為t1、t2,這樣方便后續(xù)debug調(diào)試。
為了驗(yàn)證這個(gè)問(wèn)題,筆者使用countDownLatch阻塞一下流程,只有兩個(gè)線程都完成工作之后,才能執(zhí)行后續(xù)輸出邏輯。
private static HashMap<String, Long> map = new HashMap<>(2, 1.5f);
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
map.put("3", 3L);
countDownLatch.countDown();
}, "t1").start();
new Thread(() -> {
map.put("5", 5L);
countDownLatch.countDown();
}, "t2").start();
//等待上述線程執(zhí)行完,繼續(xù)執(zhí)行后續(xù)輸出邏輯
countDownLatch.await();
System.out.println(map.get("3"));
System.out.println(map.get("5"));
}
然后在插入新節(jié)點(diǎn)的地方打個(gè)斷點(diǎn),debug模式設(shè)置為thread,條件設(shè)置為:
"t1".equals(Thread.currentThread().getName())||"t2".equals(Thread.currentThread().getName())
啟動(dòng)程序,我們?cè)趖1完成判斷,正準(zhǔn)備執(zhí)行創(chuàng)建節(jié)點(diǎn)的操作時(shí)將線程切換為t2:
可以看到t2準(zhǔn)備將(5,5)這個(gè)鍵值對(duì)插入到數(shù)組中,我們直接放行這個(gè)邏輯:
此時(shí)線程自動(dòng)切回t1,我們放行斷點(diǎn),將(3,3)節(jié)點(diǎn)插入到數(shù)組中。此時(shí),我們已經(jīng)順利將線程2的鍵值對(duì)覆蓋了。
可以看到輸出結(jié)果key為5的value為null,hashMap在多線程情況下的索引覆蓋問(wèn)題得以印證。
(3) 如何解決Map的線程安全問(wèn)題
解決map線程安全問(wèn)題有兩種手段,一種是JDK自帶的collections工具,另一種則是并發(fā)容器ConcurrentHashMap
為了演示沖突情況下的性能,我們使用不同的map執(zhí)行100_0000次循環(huán)。
@Slf4j
publicclass MapTest {
@Test
public void mapTest() {
StopWatch stopWatch = new StopWatch();
stopWatch.start("synchronizedMap put");
Map<Object, Object> synchronizedMap = Collections.synchronizedMap(new HashMap<>());
IntStream.rangeClosed(0, 100_0000).parallel().forEach(i -> {
synchronizedMap.put(i, i);
});
stopWatch.stop();
stopWatch.start("concurrentHashMap put");
Map<Object, Object> concurrentHashMap = new ConcurrentHashMap<>();
IntStream.rangeClosed(0, 100_0000).parallel().forEach(i -> {
concurrentHashMap.put(i, i);
});
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
}
從輸出結(jié)果來(lái)看concurrentHashMap 在沖突頻繁的情況下性能更加優(yōu)異。
2023-03-14 20:29:25,669 INFO MapTest:37 - StopWatch '': running time (millis) = 1422
-----------------------------------------
ms % Task name
-----------------------------------------
00930 065% synchronizedMap put
00492 035% concurrentHashMap put
原因很簡(jiǎn)單synchronizedMap的put方法,每次操作都會(huì)上鎖,這意味著無(wú)論要插入的鍵值對(duì)在數(shù)組哪個(gè)位置,執(zhí)行插入操作前都必須先得到操作map的鎖,鎖的粒度非常大:
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
反觀concurrentHashMap 它本質(zhì)的設(shè)計(jì)是利用了一種鎖升級(jí)的思想,即先通過(guò)CAS完成節(jié)點(diǎn)插入,失敗后才利用synchronized關(guān)鍵字進(jìn)行鎖定操作,同時(shí)鎖的僅僅只是數(shù)組中某個(gè)索引對(duì)應(yīng)的bucket即利用了鎖分段的思想,分散了鎖的粒度和競(jìng)爭(zhēng)的壓力:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) thrownew NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//獲取當(dāng)前鍵值對(duì)要存放的位置f
elseif ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
elseif ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//鎖定的范圍是對(duì)應(yīng)的某個(gè)bucket
synchronized (f) {
//......
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
returnnull;
}
二、詳解ConcurrentHashMap中的操作注意事項(xiàng)
1. 非原子化操作
使用ConcurrentHashMap存放鍵值對(duì),并不一定意味著所有存的操作都是線程安全的。對(duì)于非原子化操作仍然是存在線程安全問(wèn)題。
如下所示,我們的代碼首先會(huì)得到一個(gè)含有900的元素的ConcurrentHashMap,然后開(kāi)10個(gè)線程去查看map中還差多少個(gè)鍵值對(duì)夠1000個(gè),缺多少補(bǔ)多少。
//線程數(shù)
privatestaticint THREAD_COUNT = 10;
//數(shù)據(jù)項(xiàng)的大小
privatestaticint ITEM_COUNT = 1000;
//返回一個(gè)size大小的ConcurrentHashMap
private ConcurrentHashMap<String, Object> getData(int size) {
return LongStream.rangeClosed(1, size)
.parallel()
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(),
Function.identity(),
(o1, o2) -> o1,
ConcurrentHashMap::new));
}
@GetMapping("wrong")
public String wrong() throws InterruptedException {
//900個(gè)元素的ConcurrentHashMap
ConcurrentHashMap<String, Object> map = getData(ITEM_COUNT - 100);
log.info("init size:{}", map.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> {
IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//判斷當(dāng)前map缺多少個(gè)元素就夠1000個(gè),缺多少補(bǔ)多少
int gap = ITEM_COUNT - map.size();
log.info("{} the gap:{}",Thread.currentThread().getName(), gap);
map.putAll(getData(gap));
});
});
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", map.size());
return"ok";
}
從輸出結(jié)果可以看出,ConcurrentHashMap只能保存put的時(shí)候是線程安全,但無(wú)法保證put意外的操作線程安全,這段代碼計(jì)算ConcurrentHashMap還缺多少鍵值對(duì)的操作很可能出現(xiàn)多個(gè)線程得到相同的差值,結(jié)果補(bǔ)入相同大小的元素,導(dǎo)致ConcurrentHashMap多存放鍵值對(duì)的情況。
2023-03-1420:52:52,471 INFO ConcurrentHashMapMisuseController:44 - init size:900
2023-03-1420:52:52,473 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-9 the gap:100
2023-03-1420:52:52,473 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-2 the gap:100
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-6 the gap:100
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-4 the gap:100
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-13 the gap:100
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-11 the gap:100
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-9 the gap:0
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-15 the gap:0
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-10 the gap:-100
2023-03-1420:52:52,474 INFO ConcurrentHashMapMisuseController:51 - ForkJoinPool-1-worker-9 the gap:0
2023-03-1420:52:52,476 INFO ConcurrentHashMapMisuseController:60 - finish size:1500
解決方式也很簡(jiǎn)單,將查詢?nèi)鄙賯€(gè)數(shù)和put操作原子化,說(shuō)的通俗一點(diǎn)就是對(duì)查和插兩個(gè)操作上一把鎖確保多線程互斥即可。
@GetMapping("right")
public String right() throws InterruptedException {
ConcurrentHashMap<String, Object> map = getData(ITEM_COUNT - 100);
log.info("init size:{}", map.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> {
IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
synchronized (map){
int gap = ITEM_COUNT - map.size();
log.info("{} the gap:{}",Thread.currentThread().getName(), gap);
map.putAll(getData(gap));
}
});
});
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", map.size());
return"ok";
}
可以看到輸出結(jié)果正常了:
2023-03-1420:59:56,730 INFO ConcurrentHashMapMisuseController:69 - init size:900
2023-03-1420:59:56,732 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-9 the gap:100
2023-03-1420:59:56,733 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-4 the gap:0
2023-03-1420:59:56,734 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-8 the gap:0
2023-03-1420:59:56,734 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-9 the gap:0
2023-03-1420:59:56,734 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-1 the gap:0
2023-03-1420:59:56,734 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-15 the gap:0
2023-03-1420:59:56,734 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-2 the gap:0
2023-03-1420:59:56,734 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-6 the gap:0
2023-03-1420:59:56,735 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-11 the gap:0
2023-03-1420:59:56,735 INFO ConcurrentHashMapMisuseController:76 - ForkJoinPool-2-worker-13 the gap:0
2023-03-1420:59:56,737 INFO ConcurrentHashMapMisuseController:87 - finish size:1000
2. 合理使用API發(fā)揮ConcurrentHashMap最大性能
我們會(huì)循環(huán)1000w次,在這1000w次隨機(jī)生成10以內(nèi)的數(shù)字,以10以內(nèi)數(shù)字為key,出現(xiàn)次數(shù)為value存放到ConcurrentHashMap中。
你可能會(huì)寫出這樣一段代碼:
//map中的項(xiàng)數(shù)
privatestaticint ITEM_COUNT = 10;
//線程數(shù)
privatestaticint THREAD_COUNT = 10;
//循環(huán)次數(shù)
privatestaticint LOOP_COUNT = 1000_0000;
private Map<String, Long> normaluse() throws InterruptedException {
Map<String, Long> map = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
LongStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (map) {
if (map.containsKey(key)) {
map.put(key, map.get(key) + 1);
} else {
map.put(key, 1L);
}
}
});
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return map;
}
實(shí)際上判斷key是否存在,若不存在則初始化這個(gè)key的操作,在ConcurrentHashMap中已經(jīng)提供好了這樣的API。 我們通過(guò)computeIfAbsent進(jìn)行判斷key是否存在,若不存在則初始化的原子操作,注意此時(shí)的value是一個(gè)Long類型的累加器,這個(gè)LongAdder是一個(gè)線程安全的累加器,通過(guò)LongAdder的increment方法確保多線程情況下,這一點(diǎn)我們可以在LongAdder的注釋中得知。
LongAdders can be used with a {@link
* java.util.concurrent.ConcurrentHashMap} to maintain a scalable
* frequency map (a form of histogram or multiset). For example, to
* add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
* initializing if not already present, you can use {@code
* freqs.computeIfAbsent(k -> new LongAdder()).increment();}
大概意思是說(shuō)LongAdder可以用于統(tǒng)計(jì)頻率等場(chǎng)景,所以我們的代碼就直接簡(jiǎn)化為下面這段代碼,基于computeIfAbsent和LongAdder的良好設(shè)計(jì),這段代碼的語(yǔ)義非常豐富,大體是執(zhí)行這樣一段操作:
- computeIfAbsent執(zhí)行k插入,如果k不存在則插入k,value為L(zhǎng)ongAdder,若存在執(zhí)行步驟2。
- 不覆蓋原有k,直接返回容器中k對(duì)應(yīng)的LongAdder的引用
- 基于LongAdder的increment完成計(jì)數(shù)累加
由此也就實(shí)現(xiàn)了我們并發(fā)詞頻統(tǒng)計(jì)的需求了:
ConcurrentHashMap<String,LongAdder> freqs
freqs.computeIfAbsent(k -> new LongAdder()).increment();
所以我們改進(jìn)后的代碼如下:
private Map<String, Long> gooduse() throws InterruptedException {
Map<String, LongAdder> map = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
LongStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
map.computeIfAbsent(key, k -> new LongAdder()).increment();
});
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return map.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey()
, e -> e.getValue().longValue()));
}
完成后我們不妨對(duì)這段代碼進(jìn)行性能壓測(cè):
@GetMapping("good")
public String good() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
Assert.isTrue(normaluse.entrySet()
.stream()
.mapToLong(i -> i.getValue().longValue())
.reduce(0, Long::sum)
== LOOP_COUNT, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = gooduse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.entrySet()
.stream()
.mapToLong(i -> i.getValue().longValue())
.reduce(0, Long::sum)
== LOOP_COUNT, "gooduse count error");
log.info(stopWatch.prettyPrint());
return"ok";
}
很明顯后者的性能要優(yōu)于前者,那么原因是什么呢?
-----------------------------------------
ms % Task name
-----------------------------------------
03458 080% normaluse
00871 020% gooduse
從ConcurrentHashMap的computeIfAbsent中不難看出,其底層實(shí)現(xiàn)"若key不存在則初始化"是通過(guò)ReservationNode+CAS實(shí)現(xiàn)的,相比于上一段代碼那種非原子化的操作性能自然高出不少。
三、詳解ArrayList線程安全問(wèn)題
1. 問(wèn)題重現(xiàn)以原因
我們使用并行流在多線程情況下往list中插入100w個(gè)元素。
@Test
public void listTest() {
StopWatch stopWatch = new StopWatch();
List<Object> list=new ArrayList<>();
IntStream.rangeClosed(1, 100_0000).parallel().forEach(i -> {
list.add(i);
});
Assert.assertEquals(100_0000,list.size());
}
從輸出結(jié)果來(lái)看,list確實(shí)發(fā)生了線程安全問(wèn)題。
java.lang.AssertionError:
Expected :1000000
Actual :377628
我們不妨看看arrayList的add方法,它的邏輯為:
- 判斷當(dāng)前數(shù)組空間是否可以容納新元素,若不夠則創(chuàng)建一個(gè)新數(shù)組,并將舊數(shù)組的元素全部轉(zhuǎn)移到新數(shù)組中
- 將元素e追加到數(shù)組末尾
public boolean add(E e) {
//確定當(dāng)前數(shù)組空間是否足夠,若不足則擴(kuò)容
ensureCapacityInternal(size + 1); // Increments modCount!!
//將元素添加到末尾
elementData[size++] = e;
return true;
}
所以如果我們兩個(gè)線程同時(shí)得到線程空間足夠,然后兩個(gè)線程分別執(zhí)行插入邏輯,如下圖所示,因?yàn)楦髯悦鞔_加上自己的元素?cái)?shù)組空間2是足夠的,所以執(zhí)行elementData[size++] = e;時(shí),線程2定位到的索引位置為2出現(xiàn)索引越界:
我們同樣可以寫一段簡(jiǎn)單的代碼就能輕易重現(xiàn)這個(gè)問(wèn)題:
@Test
public void listTest() throws InterruptedException {
ArrayList<Object> list = new ArrayList<>(2);
CountDownLatch countDownLatch = new CountDownLatch(2);
list.add(0);
new Thread(() -> {
list.add(1);
countDownLatch.countDown();
}, "t1").start();
new Thread(() -> {
list.add(2);
countDownLatch.countDown();
}, "t2").start();
countDownLatch.await();
System.out.println(list.toString());
}
我們的add方法上打一個(gè)斷點(diǎn),并設(shè)置條件為t1和t2兩個(gè)線程:
在t1線程正準(zhǔn)備插入元素時(shí),切換線程到t2:
然后直接將t2線程放行,回到t1線程放行后續(xù)操作。問(wèn)題得以重現(xiàn):
2. 解決ArrayList線程安全問(wèn)題的兩個(gè)思路
在此回到這段代碼,解決這段代碼線程安全問(wèn)題的方式有兩種:
@Test
public void listTest() {
StopWatch stopWatch = new StopWatch();
List<Object> list=new ArrayList<>();
IntStream.rangeClosed(1, 100_0000).parallel().forEach(i -> {
list.add(i);
});
Assert.assertEquals(100_0000,list.size());
}
- 第一種是使用synchronizedList這個(gè)api將容器包裝為線程安全容器:
@Test
public void listTest() {
List<Object> list=Collections.synchronizedList(new ArrayList<>());
IntStream.rangeClosed(1, 100_0000).parallel().forEach(i -> {
list.add(i);
});
Assert.assertEquals(100_0000,list.size());
}
- 第二種則是使用CopyOnWriteArrayList這個(gè)基于COW思想即寫時(shí)復(fù)制的并發(fā)容器:
@Test
public void listTest() {
List<Object> list=new CopyOnWriteArrayList<>();
IntStream.rangeClosed(1, 100_0000).parallel().forEach(i -> {
list.add(i);
});
Assert.assertEquals(100_0000,list.size());
}
3. synchronizedList和CopyOnWriteArrayList區(qū)別
雖然兩者都可以保證并發(fā)操作的線程安全,但我們還是需要注意兩者使用場(chǎng)景上的區(qū)別:
synchronizedList保證多線程操作安全的原理很簡(jiǎn)單,每次執(zhí)行插入或者讀取操作前上鎖。
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
CopyOnWriteArrayList意味寫時(shí)復(fù)制,從源碼中不難看出它保證線程安全的方式開(kāi)銷非常大:
- 獲得寫鎖。
- 復(fù)制一個(gè)新數(shù)組newElements 。
- 在newElements 添加元素。
- 將數(shù)組修改為newElements。
對(duì)應(yīng)的我們也給出相應(yīng)的add源碼的實(shí)現(xiàn)邏輯:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//上鎖
lock.lock();
try {
//復(fù)制數(shù)組
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
//添加元素
newElements[len] = e;
//原子覆蓋
setArray(newElements);
returntrue;
} finally {
lock.unlock();
}
}
而對(duì)于讀CopyOnWriteArrayList則非常簡(jiǎn)單,直接返回原數(shù)組的值,所以CopyOnWriteArrayList更適合與讀多寫少的場(chǎng)景:
private E get(Object[] a, int index) {
return (E) a[index];
}
對(duì)此我們對(duì)兩者讀寫性能進(jìn)行了一次壓測(cè),首先是寫性能壓測(cè):
@GetMapping("testWrite")
public Map testWrite() {
int loopCount = 10_0000;
CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
//使用copyOnWriteArrayList添加10w個(gè)數(shù)據(jù)
StopWatch stopWatch = new StopWatch();
stopWatch.start("copyOnWriteArrayList add");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
//使用synchronizedList添加10w個(gè)數(shù)據(jù)
stopWatch.start("synchronizedList add");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map<String, Integer> result = new HashMap<>();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
可以看出,高并發(fā)寫的情況下synchronizedList 性能更佳。
2023-03-15 00:16:14,532 INFO CopyOnWriteListMisuseController:39 - StopWatch '': running time (millis) = 5556
-----------------------------------------
ms % Task name
-----------------------------------------
05527 099% copyOnWriteArrayList add
00029 001% synchronizedList add
讀取性能壓測(cè)代碼:
@GetMapping("testRead")
public Map testRead() {
int loopCount = 100_0000;
CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
//為兩個(gè)list設(shè)置100_0000個(gè)元素
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
//隨機(jī)讀取copyOnWriteArrayList中的元素
StopWatch stopWatch = new StopWatch();
stopWatch.start("copyOnWriteArrayList read");
IntStream.rangeClosed(0, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
//隨機(jī)讀取synchronizedList中的元素
stopWatch.start("synchronizedList read");
IntStream.rangeClosed(0, loopCount)
.parallel()
.forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map<String, Integer> result = new HashMap<>();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
private void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 100_0000)
.parallel()
.boxed()
.collect(Collectors.toList()));
}
而在高并發(fā)讀的情況下synchronizedList 性能更加:
2023-03-15 00:16:54,335 INFO CopyOnWriteListMisuseController:74 - StopWatch '': running time (millis) = 310
-----------------------------------------
ms % Task name
-----------------------------------------
00037 012% copyOnWriteArrayList read
00273 088% synchronizedList read
四、阻塞隊(duì)列ArrayBlockingQueue和延遲隊(duì)列DelayQueue
筆者近期已經(jīng)將阻塞隊(duì)列和延遲隊(duì)列的文章提交給了開(kāi)源項(xiàng)目JavaGuide,關(guān)于阻塞的隊(duì)列讀者可以參考這篇文章:
- https://github.com/Snailclimb/JavaGuide/blob/main/docs/java/collection/arrayblockingqueue-source-code.md
- https://github.com/Snailclimb/JavaGuide/blob/main/docs/java/collection/delayqueue-source-code.md
五、小結(jié)
以上筆者對(duì)高并發(fā)容器的個(gè)人理解,總的來(lái)說(shuō)讀者必須掌握以下幾點(diǎn):
- 通過(guò)閱讀源碼了解容器工作機(jī)制,代入多線程繪圖推算出可能存在的線程安全問(wèn)題,并學(xué)會(huì)使用IDEA加以實(shí)踐落地推算結(jié)果。
- 了解并發(fā)容器工作原理和所有API,確定在指定的場(chǎng)景可以正確使用并發(fā)容器保證線程安全和性能。