Redis 做分布式鎖你會幾種姿勢?
Redis 簡簡單單的幾種數據類型,一個 key/value 數據庫,現在又是分布式鎖、又是限流工具、又是消息隊列......,感覺都要被玩壞了。不過話說回來,Redis 在這么多場合被開發者們喜歡,還是得益于它極高的性能與使用的簡潔性。
在面試的時候,說到 Redis ,很多人第一反應就是緩存,其實除了緩存,Redis 還有非常多豐富的使用場景,這些使用場景,松哥在未來都會和大家一一分享。
今天就先來看一個簡單的,用 Redis 做分布式鎖。
1.什么是分布式鎖
首先我們來看一個問題場景:
例如一個簡單的用戶操作,一個線程去修改用戶的狀態,首先從數據庫中讀出用戶的狀態,然后在內存中進行修改,修改完成后,再存回去。在單線程中,這個操作沒有問題,但是在多線程中,由于讀取、修改、存 這是三個操作,不是原子操作,所以在多線程中,這樣會出問題。
解決這個問題,我們就需要鎖,對于鎖,大家應該不會陌生,在 Java 中的 synchronized 以及 ReentrantLock 可重入鎖都是我們比較常見的,但是這種鎖都是本地鎖,現在微服務、分布式系統思想大行其道,在這樣的系統中,本地鎖顯然是不夠用的,于是大家紛紛想辦法,如何在分布式環境下解決鎖的問題。想出來的辦法很多,我們可以通過 MySQL、可以通過 ZK、也可以通過 Redis ,都可以用來解決分布式鎖的問題,這里我們主要來看看如何通過 Redis 解決分布式鎖問題。
2.解決方案
2.1 整體思路
分布式鎖實現的思路很簡單,就是進來一個線城先占位,當別的線城進來操作時,發現已經有人占位了,就會放棄或者稍后再試。
在 Redis 中,占位一般使用 setnx 指令,先進來的線程先占位,線程的操作執行完成后,再調用 del 指令釋放位子。同時為了防止死鎖,我們一般還要給鎖加一個過期時間,到期了自動釋放。
基于這樣的思路,我們來看兩種不同的實現方式:
2.2 解決方案一
基于我們前面所說的思路,可以使用 setnx 和 expire 實現分布式鎖,但是 setnx 和設置過期時間 expire 這是兩個操作,這兩個操作一起的話就不具備原子性(除非自己寫 Lua 腳本),為了解決這個問題,從 Redis2.8 開始,setnx 和 expire 可以通過一個命令一起來執行了,這個命令就是 set,set 中多了一個參數:
從圖中大家可以看到,在 key/value 之后,還有一個 EX 5 表示以秒計的過期時間(PX 表示以毫秒計的過期時間),最后的 NX 就是說如果 k1 不存在,這條命令執行成功,否則執行失敗,這就相當于 setnx 的效果了。
因此,我們封裝的鎖如下:
public class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis->{
String set = jedis.set("k1", "v1", new SetParams().nx().ex(5));
if (set !=null && "OK".equals(set)) {
//沒人占位
jedis.set("name", "javaboy");
String name = jedis.get("name");
System.out.println(name);
jedis.del("k1");//釋放資源
}else{
//有人占位,停止/暫緩 操作
}
});
}
}
對于上面這段代碼,大家重點看思路,不必深究代碼細節:
- 首先構造一個 Redis 實例,然后調用 execute 方法,這個是我自己封裝的方法,目的是為了配置 Jedis 連接池并及時回收使用過的資源。這一塊小伙伴們測試的時候可以直接使用自己創建的 Jedis 實例,效果是一樣的。
- 調用 jedis 中的 set 方法,注意第三個參數,我們設置了 nx 同時 設置了過期時間為 5 秒,這就相當于 setnx 和 expire 兩個命令的結合體。
- 如果成功執行了 set 命令,在 if 中就可以去寫自己的業務了。如果沒能搶到鎖,則可以進入到一個延遲消息隊列中,停一會再去嘗試。
但是這樣的封裝,又帶來了一個新的問題,那就是超時問題,關于超時問題,松哥通過一個??視頻??教程來和大家分享。
2.3 解決方案二
上面的代碼寫著還是蠻長的,那么有沒有簡單一點的辦法呢?當然是有的!那就是 Redisson。
相對于 Jedis 這種原生態的應用,Redisson 對 Redis 請求做了較多的封裝,對于鎖,也提供了對應的方法可以直接使用:
Config config = new Config();
//配置 Redis 基本連接信息
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123");
//獲取一個 RedissonClient 對象
RedissonClient redisson = Redisson.create(config);
//獲取一個鎖對象實例
RLock lock = redisson.getLock("lock");
try {
//獲取鎖
boolean b = lock.tryLock(500, 1000, TimeUnit.MILLISECONDS);
if (b) {
//獲取到鎖了,開始寫業務
RBucket<Object> bucket = redisson.getBucket("javaboy");
bucket.set("www.javaboy.org");
Object o = bucket.get();
System.out.println(o);
}else{
System.out.println("沒拿到鎖");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放鎖
lock.unlock();
}
在這段代碼中,核心的就是 lock.tryLock(500, 1000, TimeUnit.MILLISECONDS);,第一個參數是嘗試加鎖的等待時間為 500 毫秒,第二個參數表示鎖的超時時間為 1000 毫秒,也就是這個鎖在 1000 毫秒后會自動失效。
小伙伴們發現,這和我們在方案一里邊配置的參數是一樣的,其實思路是不變的,Redisson 只不過是將我們寫的和鎖相關的方法封裝起來了而已。
3.小結
當然,這里我只是先簡單介紹下加鎖的思路以及在 Redis 單機中如何加鎖,后面松哥再和大家分享 Redis 集群中如何加鎖。