深入解析 Guava Cache- 從基本用法、回收策略、刷新策略到實現原理
Guava Cache 是非常強大的本地緩存工具,提供了非常簡單 API 供開發者使用。
這篇文章,我們將詳細介紹 Guava Cache 的基本用法、回收策略,刷新策略,實現原理。
圖片
1.基本用法
1.1 依賴配置
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
1.2 創建緩存
Guava Cache 提供了基于 Builder 構建者模式的構造器,用戶只需要根據需求設置好各種參數即可使用。
(1)手工創建緩存對象
@Test
public void testHandCache() {
// 測試手工測試
Cache<String, String> cache = CacheBuilder.newBuilder().
// 最大容量為20(基于容量進行回收)
maximumSize(20)
// 配置寫入后多久未更新,緩存會過期
.expireAfterWrite(10, TimeUnit.SECONDS).build();
cache.put("hello", "value_HELLO");
assertEquals("value_HELLO", cache.getIfPresent("hello"));
Thread.sleep(10000);
// 過期后重新獲取
assertNull(cache.getIfPresent("hello"));
}
我們可以創建一個緩存對象 Cache ,通過 CacheBuilder 構造器,配置相關參數(最大容量 20 個條目、緩存過期時間 10 秒),最后調用構建方法。
(2)創建緩存加載器
CacheLoader 可以理解為一個固定的加載器,在創建 Cache 對象時指定,然后簡單地重寫 V load(K key) throws Exception
方法,就可以達到當檢索不存在的時候,會自動的加載數據。
@Test
public void testLoadingCache() throws InterruptedException, ExecutionException {
CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
//自動寫緩存數據的方法
@Override
public String load(String key) {
System.out.println("加載 key:" + key);
return"value_" + key.toUpperCase();
}
@Override
//重新刷新緩存
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
returnsuper.reload(key, oldValue);
}
};
LoadingCache<String, String> cache =
CacheBuilder.newBuilder()
// 最大容量為100(基于容量進行回收)
.maximumSize(20)
// 配置寫入后多久未更新,緩存會過期
.expireAfterWrite(10, TimeUnit.SECONDS)
//配置寫入后多久刷新緩存
.refreshAfterWrite(1, TimeUnit.SECONDS).build(cacheLoader);
assertEquals(0, cache.size());
assertEquals("value_HELLO", cache.getUnchecked("hello"));
assertEquals(1, cache.size());
// 通過 Callable 獲取數據
String key = "mykey";
String value = cache.get(key, new Callable<String>() {
@Override
public String call() throws Exception {
return"call_" + key;
}
});
System.out.println("call value:" + value);
}
和手工創建緩存對象不同,我們首先創建緩存加載器對象,并重寫 load 方法,然后通過緩存構造器創建 LoadingCache 對象 ,該對象支持寫入后刷新方法。
同時 LoadingCache 對象支持 Callable 模式,也就是調用 get 方法時,可以傳入 Callable 對象。這樣可以在使用緩存時,更加靈活。
2.回收策略
Guava Cache 提供了三種基本的緩存回收方式:
- 基于容量回收策略
- 基于時間的回收策略
- 基于引用回收策略
2.1 基于容量回收策略
基于容量的回收策略可以分為兩種:基于大小和基于權重。
基于大小:我們可以使用 maximumSize
方法設置最大緩存項數量,當緩存項數量達到設定的最大值時,舊的緩存項將會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.maximumSize(100)
.build();
基于權重:如果不同的緩存值,需要占據不同的內存空間,也就是不同的緩存項有不同的“權重”(weights)。
我們可以使用 CacheBuilder.weigher(Weigher)
指定一個權重函數,并且用 maximumWeight(long)
指定最大總重。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.maximumWeight(1000)
.weigher(new Weigher<Object, Object>() {
public int weigh(Object key, Object value) {
// 定義權重計算方法
return value.size();
}
}).build();
2.2 基于時間的回收策略
我們可以使用 expireAfterAccess
和 expireAfterWrite
方法設置緩存項的最大存活時間。
expireAfterAccess
表示緩存項在給定時間內沒有被讀/寫訪問會過期。expireAfterWrite
表示緩存項在被創建或最后一次更新后的指定時間內會過期。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
// 10分鐘沒有訪問后會被回收,或者重新加載
.expireAfterAccess(10, TimeUnit.MINUTES)
// 5分鐘沒有更新,緩存會被回收,或者重新加載
// .expireAfterWrite(5,TimeUnit.MINUTES)
.build();
2.3 基于引用回收策略
Guava Cache 提供了以下三個方法來配置基于引用的回收策略:
- weakKeys() 方法:
通過調用weakKeys()
方法,可以使緩存中的鍵使用弱引用。這意味著如果某個鍵沒有其他強引用指向它,那么該鍵可能會被垃圾回收,并且相應的緩存項也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.weakKeys()
.build();
- weakValues() 方法:
通過調用weakValues()
方法,可以使緩存中的值使用弱引用。這樣,如果某個值沒有其他強引用指向它,那么該值可能會被垃圾回收,相應的緩存項也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.weakValues()
.build();
- softValues() 方法:
通過調用softValues()
方法,可以使緩存中的值使用軟引用。軟引用相對于弱引用,更傾向于在內存不足時被垃圾回收。如果某個值沒有其他強引用指向它,且內存不足時,該值可能會被垃圾回收,相應的緩存項也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
.softValues()
.build();
一般來講,我們在生產環境使用的是(基于容量回收策略 + 基于時間的回收策略)兩者配合來使用。
當然 ,我們同樣可以使用手工回收的方式。
Cache<String,String> cache = CacheBuilder.newBuilder().build();
Object value = new Object();
cache.put("key1","value1");
cache.put("key2","value2");
cache.put("key3","value3");
//1.清除指定的key
cache.invalidate("key1");
//2.批量清除list中全部key對應的記錄
List<String> list = new ArrayList<String>();
list.add("key1");
list.add("key2");
cache.invalidateAll(list);
3.刷新策略
3.1 手工刷新
我們可以強制緩存加載器重新加載鍵的新值,調用 LoadingCache 對象的刷新方法。
String value = loadingCache.get("key");
loadingCache.refresh("key");
3.2 自動刷新
Guava Cache 提供了刷新(refresh)機制,可以通過 refreshAfterWrite
方法來設置刷新時間,當緩存項過期的同時可以重新加載新值。
Cache<String, String> cache = CacheBuilder.newBuilder()
.refreshAfterWrite(5, TimeUnit.MINUTES)
// 設置并發級別為3,并發級別是指可以同時寫緩存的線程數
.concurrencyLevel(3)
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
// 異步加載新值的邏輯
return fetchDataFromDataSource(key);
}
});
// 在獲取緩存值時,如果緩存項過期,將返回舊值
String value = cache.get("exampleKey");
配置刷新方法 refreshAfterWrite
,當大量線程同時訪問緩存項,緩存已過期時,更新線程調用 load 方法更新該緩存,其他請求線程并不需要等待,框架直接返回該緩存項的舊值。
因為更新線程同時也是請求線程,所以在上面的示例代碼里面,刷新緩存是個同步操作,可不可以異步的加載緩存呢 ?
我們有兩種方式:異步加載緩存的原理是重寫 reload 方法。
@Test
public void testAnsynRefreshMethod1() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
//自動寫緩存數據的方法
@Override
public String load(String key) {
System.out.println(Thread.currentThread().getName() + " 加載 key:" + key);
// 從數據庫加載數據
return"value_" + key.toUpperCase();
}
@Override
//異步刷新緩存
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
ListenableFutureTask<String> futureTask = ListenableFutureTask.create(() -> {
System.out.println(Thread.currentThread().getName() + " 異步加載 key:" + key + " oldValue:" + oldValue);
Thread.sleep(1000);
return load(key);
});
executorService.submit(futureTask);
return futureTask;
}
};
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
// 最大容量為20(基于容量進行回收)
.maximumSize(20)
//配置寫入后多久刷新緩存
.refreshAfterWrite(2, TimeUnit.SECONDS).build(cacheLoader);
String key = "hello";
// 第一次加載
String value = cache.get(key);
System.out.println(value);
Thread.sleep(3000);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
String value2 = cache.get(key);
System.out.println(Thread.currentThread().getName() + value2);
// 第二次加載
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(20000);
}
或者使用更優雅的使用方式:
ExecutorService executorService = Executors.newFixedThreadPool(5);
CacheLoader<String, String> cacheLoader = CacheLoader.asyncReloading(
new CacheLoader<String, String>() {
//自動寫緩存數據的方法
@Override
public String load(String key) {
System.out.println(Thread.currentThread().getName() + " 加載 key:" + key);
// 從數據庫加載數據
return "value_" + key.toUpperCase();
}
} , executorService);
自動刷新的缺點是:當緩存項到了指定過期時間,不管是同步刷新還是異步刷新,絕大部分請求線程都會返回舊的數據值,緩存值會有一定的延遲效果。
所以一般場景下,使用efreshAfterWrite
和 expireAfterWrite
配合使用 。
比如說控制緩存每1秒進行刷新,如果超過 2s 沒有訪問,那么則讓緩存失效,訪問時不會得到舊值,而是必須得待新值加載。
4.實現原理
Guava Cache 的數據結構跟 JDK1.7 的 ConcurrentHashMap 類似,如下圖所示:
圖片
4.1 構造函數
public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
CacheLoader<? super K1, V1> loader) {
checkWeightWithWeigher();
return new LocalCache.LocalLoadingCache<>(this, loader);
}
通過構造器 CacheBuilder
的構建方法創建本地緩存類 LocalCache
的靜態包裝類 LocalLoadingCache
對象。
class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
// ..... 省略代碼
staticclass LocalLoadingCache<K, V> extends LocalManualCache<K, V>
implements LoadingCache<K, V> {
LocalLoadingCache(
CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
super(new LocalCache<K, V>(builder, checkNotNull(loader)));
}
// LoadingCache methods
@Override
public V get(K key) throws ExecutionException {
return localCache.getOrLoad(key);
}
@Override
public V getUnchecked(K key) {
try {
return get(key);
} catch (ExecutionException e) {
thrownew UncheckedExecutionException(e.getCause());
}
}
@Override
public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
return localCache.getAll(keys);
}
@Override
public void refresh(K key) {
localCache.refresh(key);
}
// ..... 省略代碼
}
}
LocalLoadingCache
類對外暴露了若干方法,它的底層依然是 LocalCache
對象來執行相關緩存操作,LocalCache
本質上就是一個 Map 。
4.2 初始化緩存
LocalCache(
CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
// key的強度,即引用類型的強弱
keyStrength = builder.getKeyStrength();
// value的強度,即引用類型的強弱
valueStrength = builder.getValueStrength();
// key的比較策略,跟key的引用類型有關
keyEquivalence = builder.getKeyEquivalence();
// value的比較策略,跟value的引用類型有關
valueEquivalence = builder.getValueEquivalence();
maxWeight = builder.getMaximumWeight();
weigher = builder.getWeigher();
//訪問后的過期時間,設置了expireAfterAccess參數
expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
//寫入后的過期時間,設置了expireAfterWrite參數
expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
refreshNanos = builder.getRefreshNanos();
int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
if (evictsBySize() && !customWeigher()) {
initialCapacity = (int) Math.min(initialCapacity, maxWeight);
}
// Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
// maximumSize/Weight is specified in which case ensure that each segment gets at least 10
// entries. The special casing for size-based eviction is only necessary because that eviction
// happens per segment instead of globally, so too many segments compared to the maximum size
// will result in random eviction behavior.
int segmentShift = 0;
int segmentCount = 1;
while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
++segmentShift;
segmentCount <<= 1;
}
this.segmentShift = 32 - segmentShift;
segmentMask = segmentCount - 1;
this.segments = newSegmentArray(segmentCount);
int segmentCapacity = initialCapacity / segmentCount;
if (segmentCapacity * segmentCount < initialCapacity) {
++segmentCapacity;
}
int segmentSize = 1;
while (segmentSize < segmentCapacity) {
segmentSize <<= 1;
}
if (evictsBySize()) {
// Ensure sum of segment max weights = overall max weights
long maxSegmentWeight = maxWeight / segmentCount + 1;
long remainder = maxWeight % segmentCount;
for (int i = 0; i < this.segments.length; ++i) {
if (i == remainder) {
maxSegmentWeight--;
}
this.segments[i] =
createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
}
} else {
for (int i = 0; i < this.segments.length; ++i) {
this.segments[i] =
createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
}
}
}
LocalCache
維護一個 Segment 數組,數組大小滿足如下條件:
- 數組大小是 2 的冪次 ,并且小于并發度 concurrencyLevel ;
- 若指定了容量大小,數組大小乘以 20 要大于緩存權重 maxWeight (假如設置容量大小最大值為40,那么 maxWeight 為 40 )。
接下來,我們看看 Segment 類的核心屬性 :
static class Segment<K, V> extends ReentrantLock {
// 存活的元素大小
volatileint count;
// 存活的元素權重
long totalWeight;
//修改、更新的數量,用來做弱一致性
int modCount;
//擴容用
int threshold;
//存放Entry的數組,用來存放Entry,使用AtomicReferenceArray是因為要用CAS來保證原子性
volatile@Nullable AtomicReferenceArray<ReferenceEntry<K, V>> table;
//如果key是弱引用的話,那么被 GC 回收后,就會放到ReferenceQueue,要根據這個queue做一些清理工作
final@Nullable ReferenceQueue<K> keyReferenceQueue;
//如果value是弱引用的話,那么被 GC 回收后,就會放到ReferenceQueue,要根據這個queue做一些清理工作
final@Nullable ReferenceQueue<V> valueReferenceQueue;
//記錄哪些entry被訪問,用于accessQueue的更新。
final Queue<ReferenceEntry<K, V>> recencyQueue;
// 讀取次數計數器
final AtomicInteger readCount = new AtomicInteger();
// 如果一個元素新寫入,則會記到這個隊列的尾部,用來做expire
@GuardedBy("this")
final Queue<ReferenceEntry<K, V>> writeQueue;
//讀、寫都會放到這個隊列,用來進行LRU替換算法
@GuardedBy("this")
final Queue<ReferenceEntry<K, V>> accessQueue;
}
ReferenceEntry 有幾種引用類型 :
圖片
下圖展示了 StringEntry 核心屬性 :
圖片
每種 Entry 對象都有 Next 屬性 ,指向下一個 Entry 。對象值 valueReference 默認是一個占位符 unSet ,表示沒有被設置過值。
4.3 查詢流程
進入 LoadingCache 的 get(key) 方法 , 如下代碼所示:
// 1.調用LoadingCache的getOrLoad
V getOrLoad(K key) throws ExecutionException {
return get(key, defaultLoader);
}
// 2.計算 key 的哈希值,并判斷位于哪一個段 Segment,最后通過查詢
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
return segmentFor(hash).get(key, hash, loader);
}
(1)計算 key 對應的哈希值
int hash(@Nullable Object key) {
int h = keyEquivalence.hash(key);
return rehash(h);
}
(2)定位分段 Segment
Segment<K, V> segmentFor(int hash) {
// segmentMask = segmentCount - 1
return segments[(hash >>> segmentShift) & segmentMask];
}
第二步驟,和 ConcurrentHashMap 類似,通過哈希值計算數據存儲在哪一個分段 Segment 。
(3)從定位的分段查詢出對象
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
// 判斷 key、loader 是否為空
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
// 根據hash定位到 table 的第一個 Entry
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
// 獲取當前時間
long now = map.ticker.read();
// 獲取當前存活的 Value
V value = getLiveValue(e, now);
if (value != null) {
//記錄被訪問過
recordRead(e, now);
//記錄命中率
statsCounter.recordHits(1);
//判斷是否需要刷新,如果需要刷新,那么會去異步刷新,且返回舊值。
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
//如果 Entry 過期了且數據還在加載中,則等待直到加載完成。
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// at this point e is either null or expired;
// 走到這一步表示: 之前沒有寫入過數據 || 數據已經過期 || 數據不是在加載中。
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
thrownew ExecutionError((Error) cause);
} elseif (cause instanceof RuntimeException) {
thrownew UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}
A 定位第一個Entry
ReferenceEntry<K, V> getEntry(Object key, int hash) {
for (ReferenceEntry<K, V> e = getFirst(hash); e != null; e = e.getNext()) {
// 判斷哈希值
if (e.getHash() != hash) {
continue;
}
// 判斷key
K entryKey = e.getKey();
if (entryKey == null) {
tryDrainReferenceQueues();
continue;
}
if (map.keyEquivalence.equivalent(key, entryKey)) {
return e;
}
}
returnnull;
}
B 從第一個 Entry 獲取存活的值
V getLiveValue(ReferenceEntry<K, V> entry, long now) {
if (entry.getKey() == null) {
tryDrainReferenceQueues();
return null;
}
V value = entry.getValueReference().get();
if (value == null) {
tryDrainReferenceQueues();
return null;
}
if (map.isExpired(entry, now)) {
tryExpireEntries(now);
return null;
}
return value;
}
boolean isExpired(ReferenceEntry<K, V> entry, long now) {
checkNotNull(entry);
// 如果配置了 expireAfterAccess ,比較當前時間和 Entry 的 accessTime 比較
if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
returntrue;
}
// 如果配置了 expireAfterWrite ,比較當前時間和 Entry 的 writeTime 比較
if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
returntrue;
}
returnfalse;
}
假如 Entry 的 key 為空,或者 vlaue 為空,或者過期了,則返回空 。
C 調度刷新 scheduleRefresh
V scheduleRefresh(
ReferenceEntry<K, V> entry,
K key,
int hash,
V oldValue,
long now,
CacheLoader<? super K, V> loader) {
//1、是否配置了 refreshAfterWrite
//2、用 writeTime 判斷是否達到刷新的時間
//3、是否在加載中,如果是則沒必要再進行刷新
if (map.refreshes()
&& (now - entry.getWriteTime() > map.refreshNanos)
&& !entry.getValueReference().isLoading()) {
V newValue = refresh(key, hash, loader, true);
if (newValue != null) {
return newValue;
}
}
return oldValue;
}
調度刷新方法會判斷三個條件 :
- 配置了刷新時間 refreshAfterWrite
- 當前時間減去 Entry 的寫入時間大于刷新時間
- 當前 Entry 未處于加載中
當滿足了三個條件之后,調用 refresh 方法,當異步加載成功后,返回新值。
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
//插入一個 LoadingValueReference ,實質是把對應Entry的ValueReference替換為新建的LoadingValueReference
final LoadingValueReference<K, V> loadingValueReference =
insertLoadingValueReference(key, hash, checkTime);
if (loadingValueReference == null) {
returnnull;
}
// 調用異步加載方法loadAsync
ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
if (result.isDone()) {
try {
return Uninterruptibles.getUninterruptibly(result);
} catch (Throwable t) {
// don't let refresh exceptions propagate; error was already logged
}
}
returnnull;
}
首先將 Entry 對象的 ValueReference 包裝為新建的 LoadingValueReference , 表明當前對象正在加載中。
LoadingValueReference<K, V> insertLoadingValueReference(
final K key, final int hash, boolean checkTime) {
ReferenceEntry<K, V> e = null;
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()
|| (checkTime && (now - e.getWriteTime() < map.refreshNanos))) {
// refresh is a no-op if loading is pending
// if checkTime, we want to check *after* acquiring the lock if refresh still needs
// to be scheduled
returnnull;
}
// continue returning old value while loading
++modCount;
LoadingValueReference<K, V> loadingValueReference =
new LoadingValueReference<>(valueReference);
e.setValueReference(loadingValueReference);
return loadingValueReference;
}
}
++modCount;
LoadingValueReference<K, V> loadingValueReference = new LoadingValueReference<>();
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
return loadingValueReference;
} finally {
unlock();
postWriteCleanup();
}
}
接下來,分析異步加載loadAsync
方法:
ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(
new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
},
directExecutor());
return loadingFuture;
}
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
try {
// 記錄耗時時間
stopwatch.start();
V previousValue = oldValue.get();
if (previousValue == null) {
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// To avoid a race, make sure the refreshed value is set into loadingValueReference
// *before* returning newValue from the cache query.
return transform(
newValue,
new com.google.common.base.Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
},
directExecutor());
} catch (Throwable t) {
ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return result;
}
}
loadAsync 方法流程:
- 調用 loadingValueReference 對象的 loadFuture 方法,假如舊數據為空值,則同步調用加載器 loader 的 load 方法 ,并返回包裝了新值的 Future 。
- 假如舊數據不為空值,則調用加載器 loader 的 reload 方法(此處可以重新實現為異步的方式),經過轉換操作返回包裝了新值的 Future 。
- 將新的值存儲在 Entry 對象里。
D 查詢/加載 lockedGetOrLoad
如果之前沒有寫入過數據 、 數據已經過期、 數據不是在加載中,則會調用lockedGetOrLoad
方法。
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
//用來判斷是否需要創建一個新的Entry
boolean createNewEntry = true;
//segment上鎖
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
//做一些清理工作
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
//通過key定位entry
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
//找到entry
valueReference = e.getValueReference();
//如果value在加載中則不需要重復創建entry
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
//value為null說明已經過期且被清理掉了
if (value == null) {
//寫通知queue
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
//過期但還沒被清理
} elseif (map.isExpired(e, now)) {
//寫通知queue
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
//其他情況則直接返回value
//來到這步,是不是覺得有點奇怪,我們分析一下:
//進入lockedGetOrLoad方法的條件是數據已經過期 || 數據不是在加載中,但是在lock之前都有可能發生并發,進而改變entry的狀態,所以在上面中再次判斷了isLoading和isExpired。所以來到這步說明,原來數據是過期的且在加載中,lock的前一刻加載完成了,到了這步就有值了。
return value;
}
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
//創建一個Entry,且set一個新的 LoadingValueReference。
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
//同步加載數據
if (createNewEntry) {
try {
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
5.總結
通過解析 Guava Cache 的實現原理,我們發現 Guava LocalCache 與 ConcurrentHashMap 有以下不同:
- ConcurrentHashMap ”分段控制并發“是隱式的(實現中沒有Segment對象),而 LocalCache 是顯式的。
在 JDK 1.8 之后,ConcurrentHashMap 采用synchronized + CAS
實現:當 put 的元素在哈希桶數組中不存在時,直接 CAS 進行寫操作;在發生哈希沖突的情況下使用 synchronized 鎖定頭節點。其實是比分段鎖更細粒度的鎖實現,只在特定場景下鎖定其中一個哈希桶,降低鎖的影響范圍。 - Guava Cache 使用 ReferenceEntry 來封裝鍵值對,并且對于值來說,還額外實現了 ValueReference 引用對象來封裝對應 Value 對象。
- Guava Cache 支持過期 + 自動 loader 機制,這也使得其加鎖方式與 ConcurrentHashMap 不同。
- Guava Cache 支持 segment 粒度上支持了 LRU 機制, 體現在 Segment 上就是 writeQueue 和 accessQueue。
隊列中的元素按照訪問或者寫時間排序,新的元素會被添加到隊列尾部。如果,在隊列中已經存在了該元素,則會先delete掉,然后再尾部添加該節點。