實戰派 | Java項目中玩轉Redis6.0客戶端緩存!
哈嘍大家好啊,我是Hydra。
在前面的文章中,我們介紹了Redis6.0中的新特性客戶端緩存client-side caching,通過telnet連接模擬客戶端,測試了三種客戶端緩存的工作模式,這篇文章我們就來點硬核實戰,看看客戶端緩存在java項目中應該如何落地。
鋪墊
首先介紹一下今天要使用到的工具Lettuce,它是一個可伸縮線程安全的redis客戶端。多個線程可以共享同一個RedisConnection,利用nio框架Netty來高效地管理多個連接。
放眼望向現在常用的redis客戶端開發工具包,雖然能用的不少,但是目前率先擁抱redis6.0,支持客戶端緩存功能的卻不多,而lettuce就是其中的領跑者。
我們先在項目中引入最新版本的依賴,下面正式開始實戰環節:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.8.RELEASE</version>
</dependency>
實戰
在項目中應用lettuce,開啟并使用客戶端緩存功能,只需要下面這一段代碼:
public static void main(String[] args) throws InterruptedException {
// 創建 RedisClient 連接信息
RedisURI redisURI= RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connect = client.connect();
Map<String, String> map = new HashMap<>();
CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
connect, TrackingArgs.Builder.enabled().noloop());
String key="user";
while (true){
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(10);
}
}
上面的代碼主要完成了幾項工作:
- 通過RedisURI配置redis連接的標準信息,并建立連接。
- 創建用于充當本地緩存的Map,開啟客戶端緩存功能,建立一個緩存訪問器CacheFrontend。
- 在循環中使用CacheFrontend,不斷查詢同一個key對應的值并打印。
啟動上面的程序,控制臺會不斷的打印user對應的緩存,在啟動一段時間后,我們在其他的客戶端修改user對應的值,運行的結果如下:
可以看到,在其他客戶端修改了key所對應的值后,打印結果也發生了變化。但是到這里,我們也不知道lettuce是不是真的使用了客戶端緩存,雖然結果正確,但是說不定是它每次都重新執行了get命令呢?
所以我們下面來看看源碼,分析一下具體的代碼執行流程。
分析
在上面的代碼中,最關鍵的類就是CacheFrontend了,我們再來仔細看一下上面具體實例化時的語句:
CacheFrontend<String,String> frontend=ClientSideCaching.enable(
CacheAccessor.forMap(map),
connect,
TrackingArgs.Builder.enabled().noloop()
);
首先調用了ClientSideCaching的enable()方法,我們看一下它的源碼:
解釋一下傳入的3個參數:
- CacheAccessor:一個定義對客戶端緩存進行訪問接口,上面調用它的forMap方法返回的是一個MapCacheAccessor,它的底層使用的我們自定義的Map來存放本地緩存,并且提供了get、put、evict等方法操作Map。
- StatefulRedisConnection:使用到的redis連接。
- TrackingArgs:客戶端緩存的參數配置,使用noloop后不會接收當前連接修改key后的通知。
向redis服務端發送開啟tracking的命令后,繼續向下調用create()方法:
這個過程中實例化了一個重要對象,它就是實現了RedisCache接口的DefaultRedisCache對象,實際向redis執行查詢時的get請求、寫入的put請求,都是由它來完成。
實例化完成后,繼續向下調用同名的create()方法:
在這個方法中,實例化了ClientSideCaching對象,注意一下傳入的兩個參數,通過前面的介紹也很好理解它們的分工:
- 當本地緩存存在時,直接從CacheAccessor中讀取。
- 當本地緩存不存在時,使用RedisCache從服務端讀取。
需要額外注意一下的是返回前的兩行代碼,先看第一句(行號114的那行)。
這里向RedisCache添加了一個監聽,當監聽到類型為invalidate的作廢消息時,拿到要作廢的key,傳遞給消費者。一般情況下,keys中只會有一個元素。
消費時會遍歷當前ClientSideCaching的消費者列表invalidationListeners:
而這個列表中的所有,就是在上面的第二行代碼中(行號115的那行)添加的,看一下方法的定義:
而實際傳入的方法引用則是下面MapCacheAccessor的evict()方法,也就是說,當收到key作廢的消息后,會移除掉本地緩存Map中緩存的這個數據。
客戶端緩存的作廢邏輯我們梳理清楚了,再來看看它是何時寫入的,直接看ClientSideCaching的get()方法:
可以看到,get方法會先從本地緩存MapCacheAccessor中嘗試獲取,如果取到則直接返回,如果沒有再使用RedisCache讀取redis中的緩存,并將返回的結果存入到MapCacheAccessor中。
圖解
源碼看到這里,是不是基本邏輯就串聯起來了,我們再畫兩張圖來梳理一下這個流程。先看get的過程:
再來看一下通知客戶端緩存失效的過程:
怎么樣,配合這兩張圖再理解一下,是不是很完美?
其實也不是…回憶一下我們之前使用兩級緩存Caffeine+Redis時,當時使用的通知機制,會在修改redis緩存后通知所有主機修改本地緩存,修改成為最新的值。目前的lettuce看來,顯然不滿足這一功能,只能做到作廢刪除緩存但是不會主動更新。
擴展
那么,如果想實現本地客戶端緩存的實時更新,我們應該如何在現在的基礎上進行擴展呢?仔細想一下的話,思路也很簡單:
- 首先,移除掉lettuce的客戶端緩存本身自帶的作廢消息監聽器
- 然后,添加我們自己的作廢消息監聽器
回顧一下上面源碼分析的圖,在調用DefaultRedisCache的addInvalidationListener()方法時,其實是調用的是StatefulRedisConnection的addListener()方法,也就是說,這個監聽器其實是添加在redis連接上的。
如果我們再看一下這個方法源碼的話,就會發現,在它的附近還有一個對應的removeListener()方法,一看就是我們要找的東西,準備用它來移除消息監聽。
不過再仔細看看,這個方法是要傳參數的啊,我們明顯不知道現在里面已經存在的PushListener有什么,所以沒法直接使用,那么無奈只能再接著往下看看這個pushHandler是什么玩意…
通過注釋可以知道,這個PushHandler就是一個用來操作PushListener的處理工具,雖然我們不知道具體要移除的PushListener是哪一個,但是驚喜的是,它提供了一個getPushListeners()方法,可以獲取當前所有的監聽器。
這樣一來就簡單了,我上來直接清除掉這個集合中的所有監聽器,問題就迎刃而解了~
不過,在StatefulRedisConnectionImpl中的pushHandler是一個私有對象,也沒有對外進行暴露,想要操作起來還是需要費上一點功夫的。下面,我們就在分析的結果上進行代碼的修改。
魔改
首先,我們需要自定義一個工具類,它的主要功能是操作監聽器,所以就命名為ListenerChanger好了。它要完成的功能主要有三個:
- 移除原有的全部消息監聽。
- 添加新的自定義消息監聽。
- 更新本地緩存MapCacheAccessor中的數據。
首先定義構造方法,需要傳入StatefulRedisConnection和CacheAccessor作為參數,在后面的方法中會用到,并且創建一個RedisCommands,用于后面向redis服務端發送get命令請求。
public class ListenerChanger<K, V> {
private StatefulRedisConnection<K, V> connection;
private CacheAccessor<K, V> mapCacheAccessor;
private RedisCommands<K, V> command;
public ListenerChanger(StatefulRedisConnection<K, V> connection,
CacheAccessor<K, V> mapCacheAccessor) {
this.connection = connection;
this.mapCacheAccessor = mapCacheAccessor;
this.command = connection.sync();
}
//其他方法先省略……
}
移除監聽
前面說過,pushHandler是一個私有對象,我們無法直接獲取和操作,所以只能先使用反射獲得。PushHandler中的監聽器列表存儲在一個CopyOnWriteArrayList中,我們直接使用迭代器移除掉所有內容即可。
public void removeAllListeners() {
try {
Class connectionClass = StatefulRedisConnectionImpl.class;
Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
pushHandlerField.setAccessible(true);
PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);
CopyOnWriteArrayList<PushListener> pushListeners
= (CopyOnWriteArrayList) pushHandler.getPushListeners();
Iterator<PushListener> it = pushListeners.iterator();
while (it.hasNext()) {
PushListener listener = it.next();
pushListeners.remove(listener);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
添加監聽
這里我們模仿DefaultRedisCache中addInvalidationListener()方法的寫法,添加一個監聽器,除了最后處理的代碼基本一致。對于監聽到的要作廢的keys集合,另外啟動一個線程更新本地數據。
public void addNewListener() {
this.connection.addListener(new PushListener() {
@Override
public void onPushMessage(PushMessage message) {
if (message.getType().equals("invalidate")) {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
List<K> keys = (List<K>) content.get(1);
System.out.println("modifyKeys:"+keys);
// start a new thread to update cacheAccessor
new Thread(()-> updateMap(keys)).start();
}
}
});
}
本地更新
使用RedisCommands重新從redis服務端獲取最新的數據,并更新本地緩存mapCacheAccessor中的數據。
private void updateMap(List<K> keys){
for (K key : keys) {
V newValue = this.command.get(key);
System.out.println("newValue:"+newValue);
mapCacheAccessor.put(key, newValue);
}
}
至于為什么執行這個方法時額外啟動了一個新線程,是因為我在測試中發現,當在PushListener的onPushMessage方法中執行RedisCommands的get()方法時,會一直取不到值,但是像這樣新啟動一個線程就沒有問題。
測試
下面,我們來寫一段測試代碼,來測試上面的改動。
public static void main(String[] args) throws InterruptedException {
// 省略之前創建連接代碼……
Map<String, String> map = new HashMap<>();
CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map);
CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor,
connect,
TrackingArgs.Builder.enabled().noloop());
ListenerChanger<String, String> listenerChanger
= new ListenerChanger<>(connect, mapCacheAccessor);
// 移除原有的listeners
listenerChanger.removeAllListeners();
// 添加新的監聽器
listenerChanger.addNewListener();
String key = "user";
while (true) {
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(30);
}
}
可以看到,代碼基本上在之前的基礎上沒有做什么改動,只是在創建完ClientSideCaching后,執行了我們自己實現的ListenerChanger的兩個方法。先移除所有監聽器、再添加新的監聽器。下面我們以debug模式啟動測試代碼,簡單看一下代碼的執行邏輯。
首先,在未執行移除操作前,pushHandler中的監聽器列表中有一個監聽器:
移除后,監聽器列表為空:
在添加完自定義監聽器、并且執行完第一次查詢操作后,在另外一個redis客戶端中修改user的值,這時PushListener會收到作廢類型的消息監聽:
啟動一個新線程,查詢redis中user對應的最新值,并放入cacheAccessor中:
當循環中CacheFrontend的get()方法再被執行時,會直接從cacheAccessor中取到刷新后的值,不需要再次去訪問redis服務端了:
總結
到這里,我們基于lettuce的客戶端緩存的基本使用、以及在這個基礎上進行的魔改就基本完成了??梢钥吹?,lettuce客戶端已經在底層封裝了一套比較成熟的API,能讓我們在將redis升級到6.0以后,開箱即用式地使用客戶端緩存這一新特性。在使用中,不需要我們關注底層原理,也不用做什么業務邏輯的改造,總的來說,使用起來還是挺香的。