Java并發編程中高效緩存設計的哲學
本文將基于并發編程和算法中經典的哈希取模、鎖分段、 異步化、原子化。這幾個核心設計理念編寫逐步推演出一個相對高效的緩存工具,希望對你有所啟發。
基于緩存存儲運算結果
我們有一批數據需要通過運算才能獲得結果,而每一次運算大約耗時是500ms,所以為了避免重復運算導致的等待,我們希望對應數據第一次運算的結果直接緩存到容器中,后續線程可直接通過容器獲得結果:
于是我們就有了第一個版本,利用緩存避免非必要的重復計算,從而提升程序在單位時間內的吞吐量
public class ComputeCache {
public final Map<Integer, Integer> cache = new HashMap<>();
public synchronized int compute(int arg) {
if (cache.containsKey(arg)) {//若存在直接返回結果
return cache.get(arg);
} else {//若不存在則計算后緩存并返回
int result = doCompute(arg);
cache.put(arg, result);
return result;
}
}
//模擬耗時的計算
private int doCompute(int key) {
ThreadUtil.sleep(500);
return key << 1;
}
public synchronized int size() {
return cache.size();
}
}
我們利用下面這段單元測試來驗證緩存的性能和正確性,這里筆者也簡單介紹一下幾個比較核心的點:
- 聲明本機CPU核心數+1的線程數執行并發運算
- 利用倒計時門閂控制線程并發流程起止,保證準確感知所有運算任務結束后,執行耗時統計
- 利用容器中最直觀且容易檢查出錯誤的屬性size進行比對判斷我們的緩存是否正確
最終在筆者的機器下5000并發的耗時大約是26765ms,整體還是不太符合我們的預期:
//初始化緩存工具
ComputeCache cache = new ComputeCache();
//聲明處理計算密集型任務的緩存
ExecutorService threadPool = ThreadUtil.newExecutor(Runtime.getRuntime().availableProcessors() + 1);
//執行5000次緩存調用
int size = 5000;
//通過hashSet數據比對來判斷容器正確性
ConcurrentHashSet<Integer> set = new ConcurrentHashSet<>();
CountDownLatch countDownLatch = new CountDownLatch(size);
long begin = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
//生成50的隨機數
int num = RandomUtil.randomInt(50);
//利用并發安全set完成計數
set.add(num);
//利用緩存工具進行計算
threadPool.submit(() -> {
try {
int res1 = cache.compute(num);
int res2 = cache.compute(num);
Assert.equals(res1, res2);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
//利用size判斷容器正確性
Assert.equals(set.size(), cache.size());
Console.log("耗時:{}ms", end - begin);
threadPool.shutdownNow();
仔細查看代碼也很很直觀看出原因,上述緩存鎖的粒度是緩存工具這個示例的,這也就意味著單位時間內只有一個線程可以操作該緩存工具,這使得在緩存運算初期大量運算任務必須串行的執行:
鎖分段散列減小鎖粒度
所以我們就需要考慮鎖小鎖的粒度,即針對數值1計算用數值1的鎖,針對數值2計算則使用數值2的鎖,本質上就是一種鎖分段的思想:
所以我們就考慮將synchronized關鍵字去掉,取而代之是使用ConcurrentHashMap這個線程安全的并發容器,利用其底層的entry級別的鎖來完成鎖的分段:
public final Map<Integer, Integer> cache = new ConcurrentHashMap<>();
public int compute(int arg) {
if (cache.containsKey(arg)) {//若存在直接返回結果
return cache.get(arg);
} else {//若不存在則計算后緩存并返回
int result = doCompute(arg);
cache.put(arg, result);
return result;
}
}
//模擬耗時的計算
private int doCompute(int key) {
ThreadUtil.sleep(500);
return key << 1;
}
public int size() {
return cache.size();
}
基于之前的單元測試壓測結果耗時大約是2022ms,相較于第一個版本有了很大的改進,但這還是不符合筆者的預期,仔細查看我們編寫的緩存可以試想這樣一個場景:
- 線程0希望獲得數值1的計算結果,通過contain發現沒有,調用doCompute執行運算
- 線程1在線程0運算期間也看到數值1沒有運算結果,也執行運算
因為數值計算的冪等性保證這種重復的運算不會帶來不好的結果,但針對這種耗時運算帶來的重復阻塞對于吞吐量要求較高的程序來說是非常不希望看到的:
這里補充說明一下,關于ConcurrentHashMap如果對其工作原理不太了解的讀者可以移步筆者這篇文章:《Java 并發容器總結》
異步化提升處理效率
所以我們必須想辦法避免這種非必要的重復運算,所以我們就必須借助一種手段將那些正在運算過程中的數值任務提前暴露,讓其他線程感知從而避免重復運算。就像Spring的依賴注入一樣,為避免循環依賴,先提前暴露一個未完全的對象讓被注入的bean提前感知,從而避免重復加載:
所以筆者這里使用FutureTask將緩存不存在的數值結果以任務維度存入緩存中,避免運算過程中其他線程看到緩存為空執行重復運算:
public int compute(int key) throws ExecutionException, InterruptedException {
FutureTask<Integer> f = cache.get(key);
if (f == null) {//若為空
FutureTask<Integer> futureTask = new FutureTask<>(() -> doCompute(key));
//緩存保證下一個線程看到時直接取出使用
cache.put(key, futureTask);
futureTask.run();
f = futureTask;
}
return f.get();
}
以同樣的單元測試結果耗時為1321ms,相較于上一個版本也是有著顯著的提升,但這還是不符合我們的預期。
原子化避免重復運算
查看我們的緩存代碼可以看到,判空和set存入緩存操作是兩個動作,這種非原子操作依然是存在重復運算的情況,試想這樣一個場景:
- 線程0查看數值1沒有緩存結果
- 線程1在隨后也看到數值1沒有緩存結果
- 線程0生成運算任務提交緩存
- 線程1生成運算任務提交緩存將線程0結果覆蓋
雖然雙方可以基于各自的異步任務獲取正確結果,但還是存在重復提交任務的情況:
于是我們就有了這個最終的版本,即通過ConcurrentHashMap內置的原子操作方法putIfAbsent將判空和保存操作以一個原子的維度進行操作,只有putIfAbsent返回null的情況下任務才能存入緩存并啟動運行,由此避免重復提交運行和開銷:
public int compute(int key) throws ExecutionException, InterruptedException {
FutureTask<Integer> f = cache.get(key);
if (f == null) {
FutureTask<Integer> futureTask = new FutureTask<>(() -> doCompute(key));
//利用putIfAbsent原子操作添加
f = cache.putIfAbsent(key, futureTask);
//若返回空說明第一次添加,則讓這個任務啟動,其他線程直接基于緩存中的任務獲取結果
if (f == null) {
f = futureTask;
f.run();
}
}
return f.get();
}
這一操作在單元測試的性能表現上差異不是很大,但是這種極致優化的設計理念確是每一個java工程師所必須具備的素質。