成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

深入解析 Guava Cache- 從基本用法、回收策略、刷新策略到實現原理

開發 前端
Guava Cache 支持 segment 粒度上支持了 LRU 機制, 體現在 Segment 上就是 writeQueue 和 accessQueue。 隊列中的元素按照訪問或者寫時間排序,新的元素會被添加到隊列尾部。如果,在隊列中已經存在了該元素,則會先delete掉,然后再尾部添加該節點。

Guava Cache 是非常強大的本地緩存工具,提供了非常簡單 API 供開發者使用。

這篇文章,我們將詳細介紹 Guava Cache 的基本用法回收策略刷新策略實現原理

圖片圖片

1.基本用法

1.1 依賴配置

<dependency>
     <groupId>com.google.guava</groupId>
     <artifactId>guava</artifactId>
     <version>31.0.1-jre</version>
</dependency>

1.2 創建緩存

Guava Cache 提供了基于 Builder 構建者模式的構造器,用戶只需要根據需求設置好各種參數即可使用。

(1)手工創建緩存對象

@Test
public void testHandCache() {
      // 測試手工測試
      Cache<String, String> cache = CacheBuilder.newBuilder().
              // 最大容量為20(基于容量進行回收)
                      maximumSize(20)
              // 配置寫入后多久未更新,緩存會過期
              .expireAfterWrite(10, TimeUnit.SECONDS).build();
      cache.put("hello", "value_HELLO");
      assertEquals("value_HELLO", cache.getIfPresent("hello"));
      Thread.sleep(10000);
      // 過期后重新獲取 
      assertNull(cache.getIfPresent("hello"));
}

我們可以創建一個緩存對象 Cache ,通過 CacheBuilder 構造器,配置相關參數(最大容量 20 個條目、緩存過期時間 10 秒),最后調用構建方法。

(2)創建緩存加載器

CacheLoader 可以理解為一個固定的加載器,在創建 Cache 對象時指定,然后簡單地重寫 V load(K key) throws Exception 方法,就可以達到當檢索不存在的時候,會自動的加載數據。

@Test
public void testLoadingCache() throws InterruptedException, ExecutionException {
      CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
          //自動寫緩存數據的方法
          @Override
          public String load(String key) {
              System.out.println("加載 key:" + key);
              return"value_" + key.toUpperCase();
          }
          @Override
          //重新刷新緩存
          public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
              returnsuper.reload(key, oldValue);
          }
      };

      LoadingCache<String, String> cache =
              CacheBuilder.newBuilder()
                      // 最大容量為100(基于容量進行回收)
                      .maximumSize(20)
                      // 配置寫入后多久未更新,緩存會過期
                      .expireAfterWrite(10, TimeUnit.SECONDS)
                      //配置寫入后多久刷新緩存
                      .refreshAfterWrite(1, TimeUnit.SECONDS).build(cacheLoader);
        assertEquals(0, cache.size());
        assertEquals("value_HELLO", cache.getUnchecked("hello"));
        assertEquals(1, cache.size());

     // 通過 Callable 獲取數據
       String key = "mykey";
       String value = cache.get(key, new Callable<String>() {
          @Override
           public String call() throws Exception {
               return"call_" + key;
          }
        });
       System.out.println("call value:" + value);
}

和手工創建緩存對象不同,我們首先創建緩存加載器對象,并重寫 load 方法,然后通過緩存構造器創建 LoadingCache 對象 ,該對象支持寫入后刷新方法。

同時 LoadingCache 對象支持 Callable 模式,也就是調用 get 方法時,可以傳入 Callable 對象。這樣可以在使用緩存時,更加靈活。

2.回收策略

Guava Cache 提供了三種基本的緩存回收方式:

  • 基于容量回收策略
  • 基于時間的回收策略
  • 基于引用回收策略

2.1 基于容量回收策略

基于容量的回收策略可以分為兩種:基于大小基于權重

基于大小:我們可以使用 maximumSize 方法設置最大緩存項數量,當緩存項數量達到設定的最大值時,舊的緩存項將會被移除。

Cache<Object, Object> cache = CacheBuilder.newBuilder()
    .maximumSize(100)
    .build();

基于權重:如果不同的緩存值,需要占據不同的內存空間,也就是不同的緩存項有不同的“權重”(weights)。

我們可以使用 CacheBuilder.weigher(Weigher) 指定一個權重函數,并且用 maximumWeight(long) 指定最大總重。

Cache<Object, Object> cache = CacheBuilder.newBuilder()
    .maximumWeight(1000)
    .weigher(new Weigher<Object, Object>() {
        public int weigh(Object key, Object value) {
            // 定義權重計算方法
            return value.size();
        }
    }).build();

2.2 基于時間的回收策略

我們可以使用 expireAfterAccess 和 expireAfterWrite 方法設置緩存項的最大存活時間。

  • expireAfterAccess 表示緩存項在給定時間內沒有被讀/寫訪問會過期。
  • expireAfterWrite 表示緩存項在被創建或最后一次更新后的指定時間內會過期。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
    // 10分鐘沒有訪問后會被回收,或者重新加載
    .expireAfterAccess(10, TimeUnit.MINUTES)
    // 5分鐘沒有更新,緩存會被回收,或者重新加載
    // .expireAfterWrite(5,TimeUnit.MINUTES)
.build();

2.3 基于引用回收策略

Guava Cache 提供了以下三個方法來配置基于引用的回收策略:

  • weakKeys() 方法:
    通過調用 weakKeys() 方法,可以使緩存中的鍵使用弱引用。這意味著如果某個鍵沒有其他強引用指向它,那么該鍵可能會被垃圾回收,并且相應的緩存項也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
       .weakKeys()
       .build();
  • weakValues() 方法:
    通過調用 weakValues() 方法,可以使緩存中的值使用弱引用。這樣,如果某個值沒有其他強引用指向它,那么該值可能會被垃圾回收,相應的緩存項也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
       .weakValues()
       .build();
  • softValues() 方法:
    通過調用 softValues() 方法,可以使緩存中的值使用軟引用。軟引用相對于弱引用,更傾向于在內存不足時被垃圾回收。如果某個值沒有其他強引用指向它,且內存不足時,該值可能會被垃圾回收,相應的緩存項也會被移除。
Cache<Object, Object> cache = CacheBuilder.newBuilder()
       .softValues()
       .build();

一般來講,我們在生產環境使用的是(基于容量回收策略 + 基于時間的回收策略)兩者配合來使用。

當然 ,我們同樣可以使用手工回收的方式。

Cache<String,String> cache = CacheBuilder.newBuilder().build();
Object value = new Object();
cache.put("key1","value1");
cache.put("key2","value2");
cache.put("key3","value3");

//1.清除指定的key
cache.invalidate("key1");

//2.批量清除list中全部key對應的記錄
List<String> list = new ArrayList<String>();
list.add("key1");
list.add("key2");
cache.invalidateAll(list);

3.刷新策略

3.1 手工刷新

我們可以強制緩存加載器重新加載鍵的新值,調用 LoadingCache 對象的刷新方法。

String value = loadingCache.get("key");
loadingCache.refresh("key");

3.2 自動刷新

Guava Cache 提供了刷新(refresh)機制,可以通過 refreshAfterWrite 方法來設置刷新時間,當緩存項過期的同時可以重新加載新值。

Cache<String, String> cache = CacheBuilder.newBuilder()
    .refreshAfterWrite(5, TimeUnit.MINUTES)
     // 設置并發級別為3,并發級別是指可以同時寫緩存的線程數
    .concurrencyLevel(3)
    .build(new CacheLoader<String, String>() {
        @Override
        public String load(String key) throws Exception {
            // 異步加載新值的邏輯
            return fetchDataFromDataSource(key);
        }
    });
// 在獲取緩存值時,如果緩存項過期,將返回舊值 
String value = cache.get("exampleKey");

配置刷新方法 refreshAfterWrite,當大量線程同時訪問緩存項,緩存已過期時,更新線程調用 load 方法更新該緩存,其他請求線程并不需要等待,框架直接返回該緩存項的舊值。

因為更新線程同時也是請求線程,所以在上面的示例代碼里面,刷新緩存是個同步操作,可不可以異步的加載緩存呢 ?

我們有兩種方式:異步加載緩存的原理是重寫 reload 方法

@Test
public void testAnsynRefreshMethod1() throws InterruptedException, ExecutionException {
      ExecutorService executorService = Executors.newFixedThreadPool(5);
      CacheLoader<String, String> cacheLoader = new CacheLoader<String, String>() {
          //自動寫緩存數據的方法
          @Override
          public String load(String key) {
              System.out.println(Thread.currentThread().getName() + " 加載 key:" + key);
              // 從數據庫加載數據
              return"value_" + key.toUpperCase();
          }

          @Override
          //異步刷新緩存
          public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
              ListenableFutureTask<String> futureTask = ListenableFutureTask.create(() -> {
                  System.out.println(Thread.currentThread().getName() + " 異步加載 key:" + key + " oldValue:" + oldValue);
                  Thread.sleep(1000);
                  return load(key);
              });
              executorService.submit(futureTask);
              return futureTask;
          }
      };

      LoadingCache<String, String> cache = CacheBuilder.newBuilder()
              // 最大容量為20(基于容量進行回收)
              .maximumSize(20)
              //配置寫入后多久刷新緩存
              .refreshAfterWrite(2, TimeUnit.SECONDS).build(cacheLoader);

       String key = "hello";
       // 第一次加載
       String value = cache.get(key);
       System.out.println(value);
       Thread.sleep(3000);
      for (int i = 0; i < 10; i++) {
          executorService.execute(new Runnable() {
              @Override
              public void run() {
                  try {
                      String value2 = cache.get(key);
                      System.out.println(Thread.currentThread().getName() + value2);
                      // 第二次加載
                  } catch (Exception e) {
                       e.printStackTrace();
                  }
              }
          });
      }
      Thread.sleep(20000);
}

或者使用更優雅的使用方式:

ExecutorService executorService = Executors.newFixedThreadPool(5);
CacheLoader<String, String> cacheLoader = CacheLoader.asyncReloading(
           new CacheLoader<String, String>() {
                  //自動寫緩存數據的方法
                  @Override
                  public String load(String key) {
                      System.out.println(Thread.currentThread().getName() + " 加載 key:" + key);
                      // 從數據庫加載數據
                      return "value_" + key.toUpperCase();
                  }
            } , executorService);

自動刷新的缺點是:當緩存項到了指定過期時間,不管是同步刷新還是異步刷新,絕大部分請求線程都會返回舊的數據值,緩存值會有一定的延遲效果。

所以一般場景下,使用efreshAfterWrite和 expireAfterWrite配合使用 。

比如說控制緩存每1秒進行刷新,如果超過 2s 沒有訪問,那么則讓緩存失效,訪問時不會得到舊值,而是必須得待新值加載。

4.實現原理

Guava Cache 的數據結構跟 JDK1.7 的 ConcurrentHashMap 類似,如下圖所示:

圖片圖片

4.1 構造函數

public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
      CacheLoader<? super K1, V1> loader) {
   checkWeightWithWeigher();
   return new LocalCache.LocalLoadingCache<>(this, loader);
}

通過構造器 CacheBuilder 的構建方法創建本地緩存類 LocalCache 的靜態包裝類 LocalLoadingCache對象。

class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
   // ..... 省略代碼 
   staticclass LocalLoadingCache<K, V> extends LocalManualCache<K, V>
      implements LoadingCache<K, V> {
    
    LocalLoadingCache(
        CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
      super(new LocalCache<K, V>(builder, checkNotNull(loader)));
    }
    // LoadingCache methods
    @Override
    public V get(K key) throws ExecutionException {
      return localCache.getOrLoad(key);
    }
    @Override
    public V getUnchecked(K key) {
      try {
        return get(key);
      } catch (ExecutionException e) {
        thrownew UncheckedExecutionException(e.getCause());
      }
    }
    @Override
    public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
      return localCache.getAll(keys);
    }
    @Override
    public void refresh(K key) {
      localCache.refresh(key);
    }
   // ..... 省略代碼 
  }
}

LocalLoadingCache 類對外暴露了若干方法,它的底層依然是 LocalCache 對象來執行相關緩存操作,LocalCache 本質上就是一個 Map 。

4.2 初始化緩存

LocalCache(
      CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
    concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
    // key的強度,即引用類型的強弱
    keyStrength = builder.getKeyStrength();
    // value的強度,即引用類型的強弱
    valueStrength = builder.getValueStrength();
    // key的比較策略,跟key的引用類型有關
    keyEquivalence = builder.getKeyEquivalence();
    // value的比較策略,跟value的引用類型有關
    valueEquivalence = builder.getValueEquivalence();

    maxWeight = builder.getMaximumWeight();
    weigher = builder.getWeigher();
    //訪問后的過期時間,設置了expireAfterAccess參數
    expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
     //寫入后的過期時間,設置了expireAfterWrite參數
    expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
    refreshNanos = builder.getRefreshNanos();

    int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
    if (evictsBySize() && !customWeigher()) {
      initialCapacity = (int) Math.min(initialCapacity, maxWeight);
    }
    // Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
    // maximumSize/Weight is specified in which case ensure that each segment gets at least 10
    // entries. The special casing for size-based eviction is only necessary because that eviction
    // happens per segment instead of globally, so too many segments compared to the maximum size
    // will result in random eviction behavior.
    int segmentShift = 0;
    int segmentCount = 1;
    while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
      ++segmentShift;
      segmentCount <<= 1;
    }
    this.segmentShift = 32 - segmentShift;
    segmentMask = segmentCount - 1;

    this.segments = newSegmentArray(segmentCount);
    
    int segmentCapacity = initialCapacity / segmentCount;
    if (segmentCapacity * segmentCount < initialCapacity) {
      ++segmentCapacity;
    }
    int segmentSize = 1;
    while (segmentSize < segmentCapacity) {
      segmentSize <<= 1;
    }
    if (evictsBySize()) {
      // Ensure sum of segment max weights = overall max weights
      long maxSegmentWeight = maxWeight / segmentCount + 1;
      long remainder = maxWeight % segmentCount;
      for (int i = 0; i < this.segments.length; ++i) {
        if (i == remainder) {
          maxSegmentWeight--;
        }
        this.segments[i] =
            createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
      }
    } else {
      for (int i = 0; i < this.segments.length; ++i) {
        this.segments[i] =
            createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
      }
    }
}

LocalCache 維護一個 Segment 數組,數組大小滿足如下條件:

  1. 數組大小是 2 的冪次 ,并且小于并發度 concurrencyLevel ;
  2. 若指定了容量大小,數組大小乘以 20 要大于緩存權重 maxWeight (假如設置容量大小最大值為40,那么 maxWeight 為 40 )。

接下來,我們看看 Segment 類的核心屬性 :

static class Segment<K, V> extends ReentrantLock {
    // 存活的元素大小
    volatileint count;
    // 存活的元素權重
    long totalWeight;
    //修改、更新的數量,用來做弱一致性
    int modCount;
    //擴容用
    int threshold;
    //存放Entry的數組,用來存放Entry,使用AtomicReferenceArray是因為要用CAS來保證原子性
    volatile@Nullable AtomicReferenceArray<ReferenceEntry<K, V>> table;
     //如果key是弱引用的話,那么被 GC 回收后,就會放到ReferenceQueue,要根據這個queue做一些清理工作
    final@Nullable ReferenceQueue<K> keyReferenceQueue;
    //如果value是弱引用的話,那么被 GC 回收后,就會放到ReferenceQueue,要根據這個queue做一些清理工作
    final@Nullable ReferenceQueue<V> valueReferenceQueue;
    //記錄哪些entry被訪問,用于accessQueue的更新。
    final Queue<ReferenceEntry<K, V>> recencyQueue;
    // 讀取次數計數器
    final AtomicInteger readCount = new AtomicInteger();
    // 如果一個元素新寫入,則會記到這個隊列的尾部,用來做expire
    @GuardedBy("this")
    final Queue<ReferenceEntry<K, V>> writeQueue;
    //讀、寫都會放到這個隊列,用來進行LRU替換算法
    @GuardedBy("this")
    final Queue<ReferenceEntry<K, V>> accessQueue;
}

ReferenceEntry 有幾種引用類型 :

圖片圖片

下圖展示了 StringEntry 核心屬性 :

圖片圖片

每種 Entry 對象都有 Next 屬性 ,指向下一個 Entry 。對象值 valueReference 默認是一個占位符 unSet ,表示沒有被設置過值。

4.3 查詢流程

進入 LoadingCache 的 get(key) 方法 , 如下代碼所示:

// 1.調用LoadingCache的getOrLoad 
V getOrLoad(K key) throws ExecutionException {
    return get(key, defaultLoader);
}
// 2.計算 key 的哈希值,并判斷位于哪一個段 Segment,最后通過查詢
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
    int hash = hash(checkNotNull(key));
    return segmentFor(hash).get(key, hash, loader);
}

(1)計算 key 對應的哈希值

int hash(@Nullable Object key) {
    int h = keyEquivalence.hash(key);
    return rehash(h);
}

(2)定位分段 Segment

Segment<K, V> segmentFor(int hash) {
   // segmentMask =  segmentCount - 1
   return segments[(hash >>> segmentShift) & segmentMask];
}

第二步驟,和 ConcurrentHashMap 類似,通過哈希值計算數據存儲在哪一個分段 Segment 。

(3)從定位的分段查詢出對象

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      // 判斷 key、loader 是否為空 
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // don't call getLiveEntry, which would ignore loading values
          // 根據hash定位到 table 的第一個 Entry
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            // 獲取當前時間
            long now = map.ticker.read();
            // 獲取當前存活的 Value 
            V value = getLiveValue(e, now);
            if (value != null) {
              //記錄被訪問過
              recordRead(e, now);
              //記錄命中率
              statsCounter.recordHits(1);
              //判斷是否需要刷新,如果需要刷新,那么會去異步刷新,且返回舊值。
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference<K, V> valueReference = e.getValueReference();
            //如果 Entry 過期了且數據還在加載中,則等待直到加載完成。
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }
        // at this point e is either null or expired;
        // 走到這一步表示: 之前沒有寫入過數據 || 數據已經過期 || 數據不是在加載中。
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          thrownew ExecutionError((Error) cause);
        } elseif (cause instanceof RuntimeException) {
          thrownew UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
 }
A 定位第一個Entry
ReferenceEntry<K, V> getEntry(Object key, int hash) {
    for (ReferenceEntry<K, V> e = getFirst(hash); e != null; e = e.getNext()) {
      // 判斷哈希值
      if (e.getHash() != hash) {
        continue;
      }
      // 判斷key
      K entryKey = e.getKey();
      if (entryKey == null) {
        tryDrainReferenceQueues();
        continue;
      }
      if (map.keyEquivalence.equivalent(key, entryKey)) {
        return e;
      }
    }
    returnnull;
}
B 從第一個 Entry 獲取存活的值
V getLiveValue(ReferenceEntry<K, V> entry, long now) {
     if (entry.getKey() == null) {
        tryDrainReferenceQueues();
        return null;
     }
     V value = entry.getValueReference().get();
     if (value == null) {
       tryDrainReferenceQueues();
       return null;
     }
     if (map.isExpired(entry, now)) {
       tryExpireEntries(now);
       return null;
     }
     return value;
}

boolean isExpired(ReferenceEntry<K, V> entry, long now) {
    checkNotNull(entry);
    // 如果配置了 expireAfterAccess ,比較當前時間和 Entry 的 accessTime 比較
    if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
      returntrue;
    }
    // 如果配置了 expireAfterWrite ,比較當前時間和 Entry 的 writeTime 比較
    if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
      returntrue;
    }
    returnfalse;
}

假如 Entry 的 key 為空,或者 vlaue 為空,或者過期了,則返回空 。

C 調度刷新 scheduleRefresh
V scheduleRefresh(
        ReferenceEntry<K, V> entry,
        K key,
        int hash,
        V oldValue,
        long now,
        CacheLoader<? super K, V> loader) {
       //1、是否配置了 refreshAfterWrite
       //2、用 writeTime 判斷是否達到刷新的時間
       //3、是否在加載中,如果是則沒必要再進行刷新
      if (map.refreshes()
          && (now - entry.getWriteTime() > map.refreshNanos)
          && !entry.getValueReference().isLoading()) {
          V newValue = refresh(key, hash, loader, true);
          if (newValue != null) {
              return newValue;
          }
      }
     return oldValue;
}

調度刷新方法會判斷三個條件 :

  • 配置了刷新時間 refreshAfterWrite
  • 當前時間減去 Entry 的寫入時間大于刷新時間
  • 當前 Entry 未處于加載中

當滿足了三個條件之后,調用 refresh 方法,當異步加載成功后,返回新值。

V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
     //插入一個 LoadingValueReference ,實質是把對應Entry的ValueReference替換為新建的LoadingValueReference
     final LoadingValueReference<K, V> loadingValueReference =
         insertLoadingValueReference(key, hash, checkTime);
     if (loadingValueReference == null) {
       returnnull;
     }
     // 調用異步加載方法loadAsync
     ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
     if (result.isDone()) {
       try {
         return Uninterruptibles.getUninterruptibly(result);
       } catch (Throwable t) {
         // don't let refresh exceptions propagate; error was already logged
       }
     }
     returnnull;
}

首先將 Entry 對象的 ValueReference 包裝為新建的 LoadingValueReference , 表明當前對象正在加載中。

LoadingValueReference<K, V> insertLoadingValueReference(
        final K key, final int hash, boolean checkTime) {
      ReferenceEntry<K, V> e = null;
      lock();
      try {
        long now = map.ticker.read();
        preWriteCleanup(now);
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);
        // Look for an existing entry.
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            // We found an existing entry.
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()
                || (checkTime && (now - e.getWriteTime() < map.refreshNanos))) {
              // refresh is a no-op if loading is pending
              // if checkTime, we want to check *after* acquiring the lock if refresh still needs
              // to be scheduled
              returnnull;
            }
            // continue returning old value while loading
            ++modCount;
            LoadingValueReference<K, V> loadingValueReference =
                new LoadingValueReference<>(valueReference);
            e.setValueReference(loadingValueReference);
            return loadingValueReference;
          }
        }
        ++modCount;
        LoadingValueReference<K, V> loadingValueReference = new LoadingValueReference<>();
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
        return loadingValueReference;
      } finally {
        unlock();
        postWriteCleanup();
      }
}

接下來,分析異步加載loadAsync方法:

ListenableFuture<V> loadAsync(
        final K key,
        final int hash,
        final LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader) {
      final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      loadingFuture.addListener(
          new Runnable() {
            @Override
            public void run() {
              try {
                getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
              } catch (Throwable t) {
                logger.log(Level.WARNING, "Exception thrown during refresh", t);
                loadingValueReference.setException(t);
              }
            }
          },
          directExecutor());
      return loadingFuture;
}

public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      try {
        // 記錄耗時時間 
        stopwatch.start();
        V previousValue = oldValue.get();
        if (previousValue == null) {
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return transform(
            newValue,
            new com.google.common.base.Function<V, V>() {
              @Override
              public V apply(V newValue) {
                LoadingValueReference.this.set(newValue);
                return newValue;
              }
            },
            directExecutor());
      } catch (Throwable t) {
        ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return result;
      }
 }

loadAsync 方法流程:

  • 調用 loadingValueReference 對象的 loadFuture 方法,假如舊數據為空值,則同步調用加載器 loader 的 load 方法 ,并返回包裝了新值的 Future 。
  • 假如舊數據不為空值,則調用加載器 loader 的 reload 方法(此處可以重新實現為異步的方式),經過轉換操作返回包裝了新值的 Future 。
  • 將新的值存儲在 Entry 對象里。
D 查詢/加載 lockedGetOrLoad

如果之前沒有寫入過數據 、 數據已經過期、 數據不是在加載中,則會調用lockedGetOrLoad方法。

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
    ReferenceEntry<K, V> e;
    ValueReference<K, V> valueReference = null;
    LoadingValueReference<K, V> loadingValueReference = null;
    //用來判斷是否需要創建一個新的Entry
    boolean createNewEntry = true;
    //segment上鎖
    lock();
    try {
      // re-read ticker once inside the lock
      long now = map.ticker.read();
      //做一些清理工作
      preWriteCleanup(now);

      int newCount = this.count - 1;
      AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
      int index = hash & (table.length() - 1);
      ReferenceEntry<K, V> first = table.get(index);

      //通過key定位entry
      for (e = first; e != null; e = e.getNext()) {
        K entryKey = e.getKey();
        if (e.getHash() == hash
            && entryKey != null
            && map.keyEquivalence.equivalent(key, entryKey)) {
          //找到entry
          valueReference = e.getValueReference();
          //如果value在加載中則不需要重復創建entry
          if (valueReference.isLoading()) {
            createNewEntry = false;
          } else {
            V value = valueReference.get();
            //value為null說明已經過期且被清理掉了
            if (value == null) {
              //寫通知queue
              enqueueNotification(
                  entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
            //過期但還沒被清理
            } elseif (map.isExpired(e, now)) {
              //寫通知queue
              // This is a duplicate check, as preWriteCleanup already purged expired
              // entries, but let's accomodate an incorrect expiration queue.
              enqueueNotification(
                  entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
            } else {
              recordLockedRead(e, now);
              statsCounter.recordHits(1);
              //其他情況則直接返回value
              //來到這步,是不是覺得有點奇怪,我們分析一下: 
              //進入lockedGetOrLoad方法的條件是數據已經過期 || 數據不是在加載中,但是在lock之前都有可能發生并發,進而改變entry的狀態,所以在上面中再次判斷了isLoading和isExpired。所以來到這步說明,原來數據是過期的且在加載中,lock的前一刻加載完成了,到了這步就有值了。
              return value;
            }
            writeQueue.remove(e);
            accessQueue.remove(e);
            this.count = newCount; // write-volatile
          }
          break;
        }
      }
      //創建一個Entry,且set一個新的 LoadingValueReference。
      if (createNewEntry) {
        loadingValueReference = new LoadingValueReference<>();

        if (e == null) {
          e = newEntry(key, hash, first);
          e.setValueReference(loadingValueReference);
          table.set(index, e);
        } else {
          e.setValueReference(loadingValueReference);
        }
      }
    } finally {
      unlock();
      postWriteCleanup();
    }
    //同步加載數據
    if (createNewEntry) {
      try {
        synchronized (e) {
          return loadSync(key, hash, loadingValueReference, loader);
        }
      } finally {
        statsCounter.recordMisses(1);
      }
    } else {
      // The entry already exists. Wait for loading.
      return waitForLoadingValue(e, key, valueReference);
    }
}

5.總結

通過解析 Guava Cache 的實現原理,我們發現 Guava LocalCache 與 ConcurrentHashMap 有以下不同:

  • ConcurrentHashMap ”分段控制并發“是隱式的(實現中沒有Segment對象),而 LocalCache 是顯式的。
    在 JDK 1.8 之后,ConcurrentHashMap 采用synchronized + CAS 實現:當 put 的元素在哈希桶數組中不存在時,直接 CAS 進行寫操作;在發生哈希沖突的情況下使用 synchronized 鎖定頭節點。其實是比分段鎖更細粒度的鎖實現,只在特定場景下鎖定其中一個哈希桶,降低鎖的影響范圍。
  • Guava Cache 使用 ReferenceEntry 來封裝鍵值對,并且對于值來說,還額外實現了 ValueReference 引用對象來封裝對應 Value 對象。
  • Guava Cache 支持過期 + 自動 loader 機制,這也使得其加鎖方式與 ConcurrentHashMap 不同。
  • Guava Cache 支持 segment 粒度上支持了 LRU 機制, 體現在 Segment 上就是 writeQueue 和 accessQueue。
    隊列中的元素按照訪問或者寫時間排序,新的元素會被添加到隊列尾部。如果,在隊列中已經存在了該元素,則會先delete掉,然后再尾部添加該節點。
責任編輯:武曉燕 來源: 勇哥Java實戰
相關推薦

2024-08-29 08:28:17

2017-06-29 09:15:36

推薦算法策略

2009-12-11 11:08:31

靜態路由策略

2017-04-12 11:15:52

ReactsetState策略

2012-02-01 10:29:13

2011-11-04 14:07:20

微軟Hotmail策略

2020-02-10 09:35:18

數據中心服務器技術

2010-09-27 09:01:26

JVM分代垃圾回收

2024-11-20 11:55:58

2024-01-02 15:41:04

CythonPython語言

2024-01-04 08:33:11

異步JDK數據結構

2009-03-09 18:46:11

Windows phoWindows Mob

2018-10-24 14:30:30

緩存服務更新

2024-12-03 10:59:36

2024-07-30 14:31:01

2009-02-03 09:04:51

Oracle數據庫Oracle安全策略Oracle備份

2023-03-14 11:00:05

過期策略Redis

2015-10-30 09:33:48

ChromeAndroid合一

2010-11-11 14:36:17

MySQL

2025-02-21 12:00:00

SpringBoot防重復提交緩存機制
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 一区二区三区高清 | xnxx 日本免费 | 亚洲欧美在线视频 | 久久蜜桃资源一区二区老牛 | 欧美久久视频 | 一区二区三区四区免费在线观看 | 黄a网站| 亚洲精品久久久久久一区二区 | 手机看片在线播放 | 日本视频在线播放 | 国产又色又爽又黄又免费 | 久久不卡| 久久精品色欧美aⅴ一区二区 | 欧美一区二区视频 | 欧美激情欧美激情在线五月 | 日日干日日操 | 欧美色综合一区二区三区 | 国户精品久久久久久久久久久不卡 | 国产成人精品av | 成人毛片在线视频 | 亚洲综合在线播放 | 精品国产一区二区国模嫣然 | 337p日本欧洲亚洲大胆鲁鲁 | 久久久片 | 国产91av视频| 三级黄色片在线观看 | 中文字幕av在线播放 | 国产激情一区二区三区 | 国产一区二区三区在线 | 亚洲一区二区中文字幕在线观看 | 凹凸日日摸日日碰夜夜 | 欧美日韩在线一区二区 | 神马久久av | 在线午夜 | 亚洲人成人一区二区在线观看 | 久久五月婷 | 国产午夜久久久 | 91嫩草精品 | 久久精品国产免费看久久精品 | 成人免费视频播放 | 一区中文字幕 |