成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實戰派 | Java項目中玩轉Redis6.0客戶端緩存!

數據庫 Redis
首先介紹一下今天要使用到的工具Lettuce,它是一個可伸縮線程安全的redis客戶端。多個線程可以共享同一個RedisConnection,利用nio框架Netty來高效地管理多個連接。

哈嘍大家好啊,我是Hydra。

在前面的文章中,我們介紹了Redis6.0中的新特性客戶端緩存client-side caching,通過telnet連接模擬客戶端,測試了三種客戶端緩存的工作模式,這篇文章我們就來點硬核實戰,看看客戶端緩存在java項目中應該如何落地。

鋪墊

首先介紹一下今天要使用到的工具Lettuce,它是一個可伸縮線程安全的redis客戶端。多個線程可以共享同一個RedisConnection,利用nio框架Netty來高效地管理多個連接。

放眼望向現在常用的redis客戶端開發工具包,雖然能用的不少,但是目前率先擁抱redis6.0,支持客戶端緩存功能的卻不多,而lettuce就是其中的領跑者。

我們先在項目中引入最新版本的依賴,下面正式開始實戰環節:

<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.8.RELEASE</version>
</dependency>

實戰

在項目中應用lettuce,開啟并使用客戶端緩存功能,只需要下面這一段代碼:

public static void main(String[] args) throws InterruptedException {
// 創建 RedisClient 連接信息
RedisURI redisURI= RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connect = client.connect();

Map<String, String> map = new HashMap<>();
CacheFrontend<String,String> frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
connect, TrackingArgs.Builder.enabled().noloop());

String key="user";
while (true){
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(10);
}
}

上面的代碼主要完成了幾項工作:

  • 通過RedisURI配置redis連接的標準信息,并建立連接。
  • 創建用于充當本地緩存的Map,開啟客戶端緩存功能,建立一個緩存訪問器CacheFrontend。
  • 在循環中使用CacheFrontend,不斷查詢同一個key對應的值并打印。

啟動上面的程序,控制臺會不斷的打印user對應的緩存,在啟動一段時間后,我們在其他的客戶端修改user對應的值,運行的結果如下:

可以看到,在其他客戶端修改了key所對應的值后,打印結果也發生了變化。但是到這里,我們也不知道lettuce是不是真的使用了客戶端緩存,雖然結果正確,但是說不定是它每次都重新執行了get命令呢?

所以我們下面來看看源碼,分析一下具體的代碼執行流程。

分析

在上面的代碼中,最關鍵的類就是CacheFrontend了,我們再來仔細看一下上面具體實例化時的語句:

CacheFrontend<String,String> frontend=ClientSideCaching.enable(
CacheAccessor.forMap(map),
connect,
TrackingArgs.Builder.enabled().noloop()
);

首先調用了ClientSideCaching的enable()方法,我們看一下它的源碼:

解釋一下傳入的3個參數:

  • CacheAccessor:一個定義對客戶端緩存進行訪問接口,上面調用它的forMap方法返回的是一個MapCacheAccessor,它的底層使用的我們自定義的Map來存放本地緩存,并且提供了get、put、evict等方法操作Map。
  • StatefulRedisConnection:使用到的redis連接。
  • TrackingArgs:客戶端緩存的參數配置,使用noloop后不會接收當前連接修改key后的通知。

向redis服務端發送開啟tracking的命令后,繼續向下調用create()方法:

這個過程中實例化了一個重要對象,它就是實現了RedisCache接口的DefaultRedisCache對象,實際向redis執行查詢時的get請求、寫入的put請求,都是由它來完成。

實例化完成后,繼續向下調用同名的create()方法:

在這個方法中,實例化了ClientSideCaching對象,注意一下傳入的兩個參數,通過前面的介紹也很好理解它們的分工:

  • 當本地緩存存在時,直接從CacheAccessor中讀取。
  • 當本地緩存不存在時,使用RedisCache從服務端讀取。

需要額外注意一下的是返回前的兩行代碼,先看第一句(行號114的那行)。

這里向RedisCache添加了一個監聽,當監聽到類型為invalidate的作廢消息時,拿到要作廢的key,傳遞給消費者。一般情況下,keys中只會有一個元素。

消費時會遍歷當前ClientSideCaching的消費者列表invalidationListeners:

而這個列表中的所有,就是在上面的第二行代碼中(行號115的那行)添加的,看一下方法的定義:

而實際傳入的方法引用則是下面MapCacheAccessor的evict()方法,也就是說,當收到key作廢的消息后,會移除掉本地緩存Map中緩存的這個數據。

客戶端緩存的作廢邏輯我們梳理清楚了,再來看看它是何時寫入的,直接看ClientSideCaching的get()方法:

可以看到,get方法會先從本地緩存MapCacheAccessor中嘗試獲取,如果取到則直接返回,如果沒有再使用RedisCache讀取redis中的緩存,并將返回的結果存入到MapCacheAccessor中。

圖解

源碼看到這里,是不是基本邏輯就串聯起來了,我們再畫兩張圖來梳理一下這個流程。先看get的過程:

再來看一下通知客戶端緩存失效的過程:

怎么樣,配合這兩張圖再理解一下,是不是很完美?

其實也不是…回憶一下我們之前使用兩級緩存Caffeine+Redis時,當時使用的通知機制,會在修改redis緩存后通知所有主機修改本地緩存,修改成為最新的值。目前的lettuce看來,顯然不滿足這一功能,只能做到作廢刪除緩存但是不會主動更新。

擴展

那么,如果想實現本地客戶端緩存的實時更新,我們應該如何在現在的基礎上進行擴展呢?仔細想一下的話,思路也很簡單:

  • 首先,移除掉lettuce的客戶端緩存本身自帶的作廢消息監聽器
  • 然后,添加我們自己的作廢消息監聽器

回顧一下上面源碼分析的圖,在調用DefaultRedisCache的addInvalidationListener()方法時,其實是調用的是StatefulRedisConnection的addListener()方法,也就是說,這個監聽器其實是添加在redis連接上的。

如果我們再看一下這個方法源碼的話,就會發現,在它的附近還有一個對應的removeListener()方法,一看就是我們要找的東西,準備用它來移除消息監聽。

不過再仔細看看,這個方法是要傳參數的啊,我們明顯不知道現在里面已經存在的PushListener有什么,所以沒法直接使用,那么無奈只能再接著往下看看這個pushHandler是什么玩意…

通過注釋可以知道,這個PushHandler就是一個用來操作PushListener的處理工具,雖然我們不知道具體要移除的PushListener是哪一個,但是驚喜的是,它提供了一個getPushListeners()方法,可以獲取當前所有的監聽器。

這樣一來就簡單了,我上來直接清除掉這個集合中的所有監聽器,問題就迎刃而解了~

不過,在StatefulRedisConnectionImpl中的pushHandler是一個私有對象,也沒有對外進行暴露,想要操作起來還是需要費上一點功夫的。下面,我們就在分析的結果上進行代碼的修改。

魔改

首先,我們需要自定義一個工具類,它的主要功能是操作監聽器,所以就命名為ListenerChanger好了。它要完成的功能主要有三個:

  • 移除原有的全部消息監聽。
  • 添加新的自定義消息監聽。
  • 更新本地緩存MapCacheAccessor中的數據。

首先定義構造方法,需要傳入StatefulRedisConnection和CacheAccessor作為參數,在后面的方法中會用到,并且創建一個RedisCommands,用于后面向redis服務端發送get命令請求。

public class ListenerChanger<K, V> {
private StatefulRedisConnection<K, V> connection;
private CacheAccessor<K, V> mapCacheAccessor;
private RedisCommands<K, V> command;
public ListenerChanger(StatefulRedisConnection<K, V> connection,
CacheAccessor<K, V> mapCacheAccessor) {
this.connection = connection;
this.mapCacheAccessor = mapCacheAccessor;
this.command = connection.sync();
}

//其他方法先省略……
}

移除監聽

前面說過,pushHandler是一個私有對象,我們無法直接獲取和操作,所以只能先使用反射獲得。PushHandler中的監聽器列表存儲在一個CopyOnWriteArrayList中,我們直接使用迭代器移除掉所有內容即可。

public void removeAllListeners() {
try {
Class connectionClass = StatefulRedisConnectionImpl.class;
Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
pushHandlerField.setAccessible(true);
PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);

CopyOnWriteArrayList<PushListener> pushListeners
= (CopyOnWriteArrayList) pushHandler.getPushListeners();
Iterator<PushListener> it = pushListeners.iterator();
while (it.hasNext()) {
PushListener listener = it.next();
pushListeners.remove(listener);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}

添加監聽

這里我們模仿DefaultRedisCache中addInvalidationListener()方法的寫法,添加一個監聽器,除了最后處理的代碼基本一致。對于監聽到的要作廢的keys集合,另外啟動一個線程更新本地數據。

public void addNewListener() {
this.connection.addListener(new PushListener() {
@Override
public void onPushMessage(PushMessage message) {
if (message.getType().equals("invalidate")) {
List<Object> content = message.getContent(StringCodec.UTF8::decodeKey);
List<K> keys = (List<K>) content.get(1);
System.out.println("modifyKeys:"+keys);

// start a new thread to update cacheAccessor
new Thread(()-> updateMap(keys)).start();
}
}
});
}

本地更新

使用RedisCommands重新從redis服務端獲取最新的數據,并更新本地緩存mapCacheAccessor中的數據。

private void updateMap(List<K> keys){
for (K key : keys) {
V newValue = this.command.get(key);
System.out.println("newValue:"+newValue);
mapCacheAccessor.put(key, newValue);
}
}

至于為什么執行這個方法時額外啟動了一個新線程,是因為我在測試中發現,當在PushListener的onPushMessage方法中執行RedisCommands的get()方法時,會一直取不到值,但是像這樣新啟動一個線程就沒有問題。

測試

下面,我們來寫一段測試代碼,來測試上面的改動。

public static void main(String[] args) throws InterruptedException {
// 省略之前創建連接代碼……

Map<String, String> map = new HashMap<>();
CacheAccessor<String, String> mapCacheAccessor = CacheAccessor.forMap(map);
CacheFrontend<String, String> frontend = ClientSideCaching.enable(mapCacheAccessor,
connect,
TrackingArgs.Builder.enabled().noloop());

ListenerChanger<String, String> listenerChanger
= new ListenerChanger<>(connect, mapCacheAccessor);
// 移除原有的listeners
listenerChanger.removeAllListeners();
// 添加新的監聽器
listenerChanger.addNewListener();

String key = "user";
while (true) {
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(30);
}
}

可以看到,代碼基本上在之前的基礎上沒有做什么改動,只是在創建完ClientSideCaching后,執行了我們自己實現的ListenerChanger的兩個方法。先移除所有監聽器、再添加新的監聽器。下面我們以debug模式啟動測試代碼,簡單看一下代碼的執行邏輯。

首先,在未執行移除操作前,pushHandler中的監聽器列表中有一個監聽器:

移除后,監聽器列表為空:

在添加完自定義監聽器、并且執行完第一次查詢操作后,在另外一個redis客戶端中修改user的值,這時PushListener會收到作廢類型的消息監聽:

啟動一個新線程,查詢redis中user對應的最新值,并放入cacheAccessor中:

當循環中CacheFrontend的get()方法再被執行時,會直接從cacheAccessor中取到刷新后的值,不需要再次去訪問redis服務端了:

總結

到這里,我們基于lettuce的客戶端緩存的基本使用、以及在這個基礎上進行的魔改就基本完成了??梢钥吹?,lettuce客戶端已經在底層封裝了一套比較成熟的API,能讓我們在將redis升級到6.0以后,開箱即用式地使用客戶端緩存這一新特性。在使用中,不需要我們關注底層原理,也不用做什么業務邏輯的改造,總的來說,使用起來還是挺香的。


責任編輯:姜華 來源: 碼農參上
相關推薦

2020-05-11 21:31:02

Redis 6.0緩存客戶端

2020-05-20 14:45:38

緩存MySQL數據庫

2022-02-11 16:35:41

Redis6.0代碼命令

2022-04-28 13:58:41

Redis6客戶端服務端

2011-11-30 14:21:19

Java分布式緩存

2015-08-17 09:48:29

C#客戶端分布式緩存

2011-08-17 10:10:59

2021-06-15 09:20:08

Redis數據類型

2021-09-22 15:46:29

虛擬桌面瘦客戶端胖客戶端

2010-05-31 10:11:32

瘦客戶端

2011-10-26 13:17:05

2014-08-11 16:35:35

KafkaJava客戶端

2022-09-23 08:02:42

Kafka消息緩存

2011-03-24 13:00:31

配置nagios客戶端

2011-03-02 14:36:24

Filezilla客戶端

2010-12-21 11:03:15

獲取客戶端證書

2009-06-27 20:32:00

LinuxNFS客戶端

2013-03-20 11:01:37

Redis客戶端連接

2023-12-01 08:18:24

Redis網絡

2010-03-18 16:49:43

Java Socket
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品68久久久一区 | 欧美精品久久久久 | 国产精品久久欧美久久一区 | 一级免费在线视频 | 色吧久久 | 91国内外精品自在线播放 | 亚洲国产免费 | 91视频.com| 亚洲一区在线播放 | 91av在线电影| 91久久精品国产免费一区 | 午夜视频在线免费观看 | com.色.www在线观看 | 日本在线一二 | 大香网伊人 | 一级黄色在线 | 中文字幕精品一区二区三区在线 | 天天曰天天干 | 免费人成激情视频在线观看冫 | 午夜a区| 日韩精品一区二区三区第95 | 激情三区| 在线免费观看毛片 | 国产精品成人一区二区三区 | 久久久久久久久久久高潮一区二区 | 在线第一页 | 三级免费av | 日韩欧美一区二区三区四区 | 欧美日韩在线观看视频 | аⅴ资源新版在线天堂 | 国产精品久久久久久久久久99 | 亚洲国产成人av好男人在线观看 | 91精品国产91久久综合桃花 | 日本aa毛片a级毛片免费观看 | 成人影院在线观看 | 亚洲国产一区二区三区 | 国产在线色 | 一区二区三区国产在线观看 | 国产成人小视频 | 色婷婷精品 | 日韩一区二区在线观看视频 |