糾正誤區:這才是 SpringBoot Redis 分布式鎖的正確實現方式
我是碼哥,可以叫我靚仔。
在說分布式鎖之前,我們先說下為什么需要分布式鎖。
在單機部署的時候,我們可以使用 Java 中提供的 JUC 鎖機制避免多線程同時操作一個共享變量產生的安全問題。JUC 鎖機制只能保證同一個 JVM 進程中的同一時刻只有一個線程操作共享資源。
一個應用部署多個節點,多個進程如果要修改同一個共享資源,為了避免操作亂序導致的并發安全問題,這個時候就需要引入分布式鎖,分布式鎖就是用來控制同一時刻,只有一個 JVM 進程中的一個線程可以訪問被保護的資源。
分布式鎖很重要,然而很多公司的系統可能還在跑著有缺陷的分布式鎖方案,其中不乏一些大型公司。
所以,碼哥今天分享一個正確 Redis 分布式鎖代碼實戰,讓你一飛沖天,該代碼可直接用于生產,不是簡單的 demo。
溫馨提示:如果你只想看代碼實戰部分,可直接翻到 SpringBoot 實戰章節。
錯誤的分布式鎖
說正確方案之前,先來一個錯誤的,知道錯在哪,才能意識到如何寫正確。
在銀行工作的小白老師,使用 Redis SET 指令實現加鎖, 指令滿足了當 key 不存在則設置 value,同時設置超時時間,并且滿足原子語意。
SET lockKey 1 NX PX expireTime
- lockKey 表示鎖的資源,value 設置成 1。
- NX:表示只有 lockKey 不存在的時候才能 SET 成功,從而保證只有一個客戶端可以獲得鎖。
- PX expireTime設置鎖的超時時間,單位是毫秒;也可以使用 EX seconds以秒為單位設置超時時間。
至于解鎖操作,小白老師果決的使用 DEL指令刪除。一個分布式鎖方案出來了,一氣呵成,組員不明覺厲,紛紛豎起大拇指,偽代碼如下。
//加鎖成功
if(jedis.set(lockKey, 1, "NX", "EX", 10) == 1){
try {
do work //執行業務
} finally {
//釋放鎖
jedis.del(key);
}
}
然而,這是一個錯誤的分布式鎖。問題在于解鎖的操作有可能出現釋放別人的鎖的情況。
有可能出現釋放別人的鎖的情況。
- 客戶端 A 獲取鎖成功,設置超時時間 10 秒。
- 客戶端 A 執行業務邏輯,但是因為某些原因(網絡問題、FullGC、代碼垃圾性能差)執行很慢,時間超過 10 秒,鎖因為超時自動釋放了。
- 客戶端 B 加鎖成功。
- 客戶端 A 執行 DEL 釋放鎖,相當于把客戶端 B 的鎖釋放了。
原因很簡單:客戶端加鎖時,沒有設置一個唯一標識。釋放鎖的邏輯并不會檢查這把鎖的歸屬,直接刪除。
殘血版分布式鎖
小白老師:“碼哥,怎么解決釋放別人的鎖的情況呢?”
解決方法:客戶端加鎖時設置一個“唯一標識”,可以讓 value 存儲客戶端的唯一標識,比如隨機數、 UUID 等;釋放鎖時判斷鎖的唯一標識與客戶端的標識是否匹配,匹配才能刪除。
加鎖
SET lockKey randomValue NX PX 3000
解鎖
刪除鎖的時候判斷唯一標識是否匹配偽代碼如下。
if (jedis.get(lockKey).equals(randomValue)) {
jedis.del(lockKey);
}
加鎖、解鎖的偽代碼如下所示。
try (Jedis jedis = pool.getResource()) {
//加鎖成功
if(jedis.set(lockKey, randomValue, "NX", "PX", 3000) == 1){
do work //執行業務
}
} finally {
//判斷是不是當前線程加的鎖,是才釋放
if (randomValue.equals(jedis.get(keylockKey {
jedis.del(lockKey); //釋放鎖
}
}
到這里,很多公司可能都是使用這個方式來實現分布式鎖。
小白:“碼哥,還有問題。判斷鎖的唯一標識是否與當前客戶端匹配和刪除操作不是原子操作。”
聰明。這個方案還存在原子性問題,存在其他客戶端把鎖給釋放的問題。
- 客戶端 A 執行唯一標識匹配成功,還來不及執行DEL釋放鎖操作,鎖過期被釋放。
- 客戶端 B 獲取鎖成功,value 設置了自己的客戶端唯一標識。
- 客戶端 A 繼續執行 DEL刪除鎖操作,相當于把客戶端 B 的鎖給刪了。
青銅版分布式鎖
雖然叫青銅版,這也是我們最常用的分布式鎖方案之一了,這個版本沒有太大的硬傷,并且比較簡單。
小白老師:“碼哥,這如何是好,如何解決解鎖不是原子操作的問題?分布式鎖這么多門道,是我膚淺了。”
解決方案很簡單,解鎖的邏輯我們可以通過 Lua 腳本來實現判斷和刪除的過程。
- KEYS[1]是 lockKey。
- ARGV[1] 表示客戶端的唯一標識 requestId。
返回 nil 表示鎖不存在,已經被刪除了。只有返回值是 1 才表示加鎖成功。
// key 不存在,返回 null
if (redis.call('exists', KEYS[1]) == 0) then
return nil;
end;
// 獲取 KEY[1] 中的 value 與 ARGV[1] 匹配,匹配則 del,返回 1。不匹配 return 0 解鎖失敗
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1]);
else
return 0;
end;
使用上面的腳本,每個鎖都用一個隨機值作為唯一標識,當刪除鎖的客戶端的“唯一標識”與鎖的 value 匹配的時候,才能執行刪除操作。這個方案已經相對完美,我們用的最多的可能就是這個方案了。
理論知識學完了,上實戰。
Spring Boot 環境準備
接下來碼哥,給你一個基于 Spring Boot 并且能用于生產實戰的代碼。在上實戰代碼之前,先把 Spring Boot 集成 Redis 的環境搞定。
添加依賴
代碼基于 Spring Boot 2.7.18 ,使用 lettuce 客戶端來操作 Redis。添加 spring-boot-starter-data-redis依賴。
<dependencyManagement>
<dependencies>
<dependency>
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.18</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
<optional>true</optional>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!--redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--Jackson依賴-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
SpringBoot 配置
先配置 yaml。
server:
servlet:
context-path: /redis
port: 9011
spring:
application:
name: redis
redis:
host: 127.0.0.1
port: 6379
password: magebyte
timeout: 6000
client-type: lettuce
lettuce:
pool:
max-active: 300
max-idle: 100
max-wait: 1000ms
min-idle: 5
RedisTemplate 默認序列化方式不具備可讀性,我們改下配置,使用 JSON 序列化。注意了,這一步是附加操作,與分布式鎖沒有關系,是碼哥順帶給你的彩蛋。
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(connectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer); // key的序列化類型
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 方法過期,改為下面代碼
// objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value的序列化類型
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
把分布式鎖接口定義出來,所謂面向接口和對象編程,代碼如有神。順帶用英文顯擺下什么叫做專業。
/**
* 分布式鎖
*/
public interface Lock {
/**
* Tries to acquire the lock with defined <code>leaseTime</code>.
* Waits up to defined <code>waitTime</code> if necessary until the lock became available.
* <p>
* Lock will be released automatically after defined <code>leaseTime</code> interval.
*
* @param waitTime the maximum time to acquire the lock
* @param leaseTime lease time
* @param unit time unit
* @return <code>true</code> if lock is successfully acquired,
* otherwise <code>false</code> if lock is already set.
* @throws InterruptedException - if the thread is interrupted
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Acquires the lock with defined <code>leaseTime</code>.
* Waits if necessary until lock became available.
* <p>
* Lock will be released automatically after defined <code>leaseTime</code> interval.
*
* @param leaseTime the maximum time to hold the lock after it's acquisition,
* if it hasn't already been released by invoking <code>unlock</code>.
* If leaseTime is -1, hold the lock until explicitly unlocked.
* @param unit the time unit
*/
void lock(long leaseTime, TimeUnit unit);
/**
* Releases the lock.
*
* <p><b>Implementation Considerations</b>
*
* <p>A {@code Lock} implementation will usually impose
* restrictions on which thread can release a lock (typically only the
* holder of the lock can release it) and may throw
* an (unchecked) exception if the restriction is violated.
* Any restrictions and the exception
* type must be documented by that {@code Lock} implementation.
*/
void unlock();
}
青銅分布式鎖實戰
DistributedLock 實現 Lock 接口,構造方法實現 resourceName 和 StringRedisTemplate 的屬性設置。客戶端唯一標識使用uuid:threadId 組成。
DistributedLock
public class DistributedLock implements Lock {
/**
* 標識 id
*/
private final String id = UUID.randomUUID().toString();
/**
* 資源名稱
*/
private final String resourceName;
private final List<String> keys = new ArrayList<>(1);
/**
* redis 客戶端
*/
private final StringRedisTemplate redisTemplate;
public DistributedLock(String resourceName, StringRedisTemplate redisTemplate) {
this.resourceName = resourceName;
this.redisTemplate = redisTemplate;
keys.add(resourceName);
}
private String getRequestId(long threadId) {
return id + ":" + threadId;
}
}
加鎖 tryLock、lock
tryLock 以阻塞等待 waitTime 時間的方式來嘗試獲取鎖。獲取成功則返回 true,反之 false。tryAcquire 方法相當于執行了 Redis 的SET resourceName uuid:threadID NX PX {leaseTime} 指令。
與 tryLock不同的是, lock 一直嘗試自旋阻塞等待獲取分布式鎖,直到獲取成功為止。而 tryLock 只會阻塞等待 waitTime 時間。
此外,為了讓程序更加健壯,碼哥實現了阻塞等待獲取分布式鎖,讓你用的更加開心,面試不慌加薪不難。如果你不需要自旋阻塞等待獲取鎖,那把 while 代碼塊刪除即可。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 獲取鎖
Boolean isAcquire = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (Boolean.TRUE.equals(isAcquire)) {
return true;
}
time -= System.currentTimeMillis() - current;
// 等待時間用完,獲取鎖失敗
if (time <= 0) {
return false;
}
// 自旋獲取鎖
while (true) {
long currentTime = System.currentTimeMillis();
isAcquire = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (Boolean.TRUE.equals(isAcquire)) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
return false;
}
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
long threadId = Thread.currentThread().getId();
Boolean acquired;
do {
acquired = tryAcquire(leaseTime, unit, threadId);
} while (Boolean.TRUE.equals(acquired));
}
private Boolean tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return redisTemplate.opsForValue().setIfAbsent(resourceName, getRequestId(threadId), leaseTime, unit);
}
解鎖 unlock
解鎖的邏輯是通過執行 lua 腳本實現。
@Override
public void unlock() {
long threadId = Thread.currentThread().getId();
// 執行 lua 腳本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LuaScript.unlockScript(), Long.class);
Long opStatus = redisTemplate.execute(redisScript, keys, getRequestId(threadId));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
}
}
LuaScript
其實這個腳本就是在講解青銅板分布式鎖原理的那段代碼,具體邏輯已經解釋過,這里就不再重復分析。
public class LuaScript {
private LuaScript() {
}
/**
* 分布式鎖解鎖腳本
*
* @return 當且僅當返回 `1`才表示加鎖成功.
*/
public static String unlockScript() {
return "if (redis.call('exists', KEYS[1]) == 0) then " +
"return nil;" +
"end; "+
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]);" +
"else" +
" return 0;" +
"end;";
}
}
RedisLockClient
最后,還需要提供一個客戶端給方便使用。
@Component
public class RedisLockClient {
@Autowired
private StringRedisTemplate redisTemplate;
public Lock getLock(String name) {
return new DistributedLock(name, redisTemplate);
}
}
單元測試來一個。
@Slf4j
@SpringBootTest(classes = RedisApplication.class)
public class RedisLockTest {
@Autowired
private RedisLockClient redisLockClient;
@Test
public void testLockSuccess() throws InterruptedException {
Lock lock = redisLockClient.getLock("order:pay");
try {
boolean isLock = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!isLock) {
log.warn("加鎖失敗");
return;
}
TimeUnit.SECONDS.sleep(3);
log.info("業務邏輯執行完成");
} finally {
lock.unlock();
}
}
}
有兩個點需要注意。
- 釋放鎖的代碼一定要放在 finally{} 塊中。否則一旦執行業務邏輯過程中拋出異常,程序就無法執行釋放鎖的流程。只能干等著鎖超時釋放。
- 加鎖的代碼應該寫在 try {} 代碼中,放在 try 外面的話,如果執行加鎖異常(客戶端網絡連接超時),但是實際指令已經發送到服務端并執行,就會導致沒有機會執行解鎖的代碼。
小白:“碼哥,這個方案你管它叫青銅級別而已,這么說還有王者、超神版?我們公司還用錯誤版分布式鎖,難怪有時候出現重復訂單,是我膚淺了。”
趕緊將這個方案替換原來的錯誤或者殘血版的 Redis 分布式鎖吧。