成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

糾正誤區:這才是 SpringBoot Redis 分布式鎖的正確實現方式

數據庫 Redis
碼哥今天分享一個正確 Redis 分布式鎖代碼實戰,讓你一飛沖天,該代碼可直接用于生產,不是簡單的 demo。

我是碼哥,可以叫我靚仔。

在說分布式鎖之前,我們先說下為什么需要分布式鎖。

在單機部署的時候,我們可以使用 Java 中提供的 JUC 鎖機制避免多線程同時操作一個共享變量產生的安全問題。JUC 鎖機制只能保證同一個 JVM 進程中的同一時刻只有一個線程操作共享資源。

一個應用部署多個節點,多個進程如果要修改同一個共享資源,為了避免操作亂序導致的并發安全問題,這個時候就需要引入分布式鎖,分布式鎖就是用來控制同一時刻,只有一個 JVM 進程中的一個線程可以訪問被保護的資源。

分布式鎖很重要,然而很多公司的系統可能還在跑著有缺陷的分布式鎖方案,其中不乏一些大型公司。

所以,碼哥今天分享一個正確 Redis 分布式鎖代碼實戰,讓你一飛沖天,該代碼可直接用于生產,不是簡單的 demo。

溫馨提示:如果你只想看代碼實戰部分,可直接翻到 SpringBoot 實戰章節。

錯誤的分布式鎖

說正確方案之前,先來一個錯誤的,知道錯在哪,才能意識到如何寫正確。

在銀行工作的小白老師,使用 Redis SET 指令實現加鎖, 指令滿足了當 key 不存在則設置 value,同時設置超時時間,并且滿足原子語意。

SET lockKey 1 NX PX expireTime
  • lockKey 表示鎖的資源,value 設置成 1。
  • NX:表示只有 lockKey 不存在的時候才能 SET 成功,從而保證只有一個客戶端可以獲得鎖。
  • PX expireTime設置鎖的超時時間,單位是毫秒;也可以使用 EX seconds以秒為單位設置超時時間。

至于解鎖操作,小白老師果決的使用 DEL指令刪除。一個分布式鎖方案出來了,一氣呵成,組員不明覺厲,紛紛豎起大拇指,偽代碼如下。

//加鎖成功
if(jedis.set(lockKey, 1, "NX", "EX", 10) == 1){
  try {
      do work //執行業務

  } finally {
    //釋放鎖
     jedis.del(key);
  }
}

然而,這是一個錯誤的分布式鎖。問題在于解鎖的操作有可能出現釋放別人的鎖的情況。

有可能出現釋放別人的鎖的情況。

  • 客戶端 A 獲取鎖成功,設置超時時間 10 秒。
  • 客戶端 A 執行業務邏輯,但是因為某些原因(網絡問題、FullGC、代碼垃圾性能差)執行很慢,時間超過 10 秒,鎖因為超時自動釋放了。
  • 客戶端 B 加鎖成功。
  • 客戶端 A 執行 DEL 釋放鎖,相當于把客戶端 B 的鎖釋放了。

原因很簡單:客戶端加鎖時,沒有設置一個唯一標識。釋放鎖的邏輯并不會檢查這把鎖的歸屬,直接刪除。

殘血版分布式鎖

小白老師:“碼哥,怎么解決釋放別人的鎖的情況呢?”

解決方法:客戶端加鎖時設置一個“唯一標識”,可以讓 value 存儲客戶端的唯一標識,比如隨機數、 UUID 等;釋放鎖時判斷鎖的唯一標識與客戶端的標識是否匹配,匹配才能刪除。

加鎖

SET lockKey randomValue NX PX 3000

解鎖

刪除鎖的時候判斷唯一標識是否匹配偽代碼如下。

if (jedis.get(lockKey).equals(randomValue)) {
    jedis.del(lockKey);
}

加鎖、解鎖的偽代碼如下所示。

try (Jedis jedis = pool.getResource()) {
  //加鎖成功
  if(jedis.set(lockKey, randomValue, "NX", "PX", 3000) == 1){
    do work //執行業務
  }
} finally {
  //判斷是不是當前線程加的鎖,是才釋放
  if (randomValue.equals(jedis.get(keylockKey {
     jedis.del(lockKey); //釋放鎖
  }
}

到這里,很多公司可能都是使用這個方式來實現分布式鎖。

小白:“碼哥,還有問題。判斷鎖的唯一標識是否與當前客戶端匹配和刪除操作不是原子操作。”

聰明。這個方案還存在原子性問題,存在其他客戶端把鎖給釋放的問題。

  • 客戶端 A 執行唯一標識匹配成功,還來不及執行DEL釋放鎖操作,鎖過期被釋放。
  • 客戶端 B 獲取鎖成功,value 設置了自己的客戶端唯一標識。
  • 客戶端 A 繼續執行 DEL刪除鎖操作,相當于把客戶端 B 的鎖給刪了。

青銅版分布式鎖

雖然叫青銅版,這也是我們最常用的分布式鎖方案之一了,這個版本沒有太大的硬傷,并且比較簡單。

小白老師:“碼哥,這如何是好,如何解決解鎖不是原子操作的問題?分布式鎖這么多門道,是我膚淺了。”

解決方案很簡單,解鎖的邏輯我們可以通過 Lua 腳本來實現判斷和刪除的過程。

  • KEYS[1]是 lockKey。
  • ARGV[1] 表示客戶端的唯一標識 requestId。

返回 nil 表示鎖不存在,已經被刪除了。只有返回值是 1 才表示加鎖成功。

// key 不存在,返回 null
if (redis.call('exists', KEYS[1]) == 0) then
   return nil;
end;
// 獲取 KEY[1] 中的 value 與 ARGV[1] 匹配,匹配則 del,返回 1。不匹配 return 0 解鎖失敗
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1]);
else
    return 0;
end;

使用上面的腳本,每個鎖都用一個隨機值作為唯一標識,當刪除鎖的客戶端的“唯一標識”與鎖的 value 匹配的時候,才能執行刪除操作。這個方案已經相對完美,我們用的最多的可能就是這個方案了。

理論知識學完了,上實戰。

Spring Boot 環境準備

接下來碼哥,給你一個基于 Spring Boot 并且能用于生產實戰的代碼。在上實戰代碼之前,先把 Spring Boot 集成 Redis 的環境搞定。

添加依賴

代碼基于 Spring Boot 2.7.18 ,使用 lettuce 客戶端來操作 Redis。添加 spring-boot-starter-data-redis依賴。

<dependencyManagement>
    <dependencies>
        <dependency>
            <!-- Import dependency management from Spring Boot -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.7.18</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <optional>true</optional>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!--redis依賴-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!--Jackson依賴-->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

SpringBoot 配置

先配置 yaml。

server:
  servlet:
    context-path: /redis
  port: 9011
spring:
  application:
    name: redis
  redis:
    host: 127.0.0.1
    port: 6379
    password: magebyte
    timeout: 6000
    client-type: lettuce
    lettuce:
      pool:
        max-active: 300
        max-idle: 100
        max-wait: 1000ms
        min-idle: 5

RedisTemplate 默認序列化方式不具備可讀性,我們改下配置,使用 JSON 序列化。注意了,這一步是附加操作,與分布式鎖沒有關系,是碼哥順帶給你的彩蛋。

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(connectionFactory);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        redisTemplate.setKeySerializer(stringRedisSerializer); // key的序列化類型

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 方法過期,改為下面代碼
//        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,
                ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value的序列化類型
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

把分布式鎖接口定義出來,所謂面向接口和對象編程,代碼如有神。順帶用英文顯擺下什么叫做專業。

/**
 * 分布式鎖
 */
public interface Lock {

    /**
     * Tries to acquire the lock with defined <code>leaseTime</code>.
     * Waits up to defined <code>waitTime</code> if necessary until the lock became available.
     * <p>
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param waitTime  the maximum time to acquire the lock
     * @param leaseTime lease time
     * @param unit      time unit
     * @return <code>true</code> if lock is successfully acquired,
     * otherwise <code>false</code> if lock is already set.
     * @throws InterruptedException - if the thread is interrupted
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

    /**
     * Acquires the lock with defined <code>leaseTime</code>.
     * Waits if necessary until lock became available.
     * <p>
     * Lock will be released automatically after defined <code>leaseTime</code> interval.
     *
     * @param leaseTime the maximum time to hold the lock after it's acquisition,
     *                  if it hasn't already been released by invoking <code>unlock</code>.
     *                  If leaseTime is -1, hold the lock until explicitly unlocked.
     * @param unit      the time unit
     */
    void lock(long leaseTime, TimeUnit unit);

    /**
     * Releases the lock.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>A {@code Lock} implementation will usually impose
     * restrictions on which thread can release a lock (typically only the
     * holder of the lock can release it) and may throw
     * an (unchecked) exception if the restriction is violated.
     * Any restrictions and the exception
     * type must be documented by that {@code Lock} implementation.
     */
    void unlock();
}

青銅分布式鎖實戰

DistributedLock 實現 Lock 接口,構造方法實現 resourceName 和 StringRedisTemplate 的屬性設置。客戶端唯一標識使用uuid:threadId 組成。

DistributedLock

public class DistributedLock implements Lock {

    /**
     * 標識 id
     */
    private final String id = UUID.randomUUID().toString();

    /**
     * 資源名稱
     */
    private final String resourceName;

    private final List<String> keys = new ArrayList<>(1);


    /**
     * redis 客戶端
     */
    private final StringRedisTemplate redisTemplate;

    public DistributedLock(String resourceName, StringRedisTemplate redisTemplate) {
        this.resourceName = resourceName;
        this.redisTemplate = redisTemplate;
        keys.add(resourceName);
    }

    private String getRequestId(long threadId) {
        return id + ":" + threadId;
    }
}

加鎖 tryLock、lock

tryLock 以阻塞等待 waitTime 時間的方式來嘗試獲取鎖。獲取成功則返回 true,反之 false。tryAcquire 方法相當于執行了 Redis 的SET resourceName uuid:threadID NX PX {leaseTime} 指令。

與 tryLock不同的是, lock 一直嘗試自旋阻塞等待獲取分布式鎖,直到獲取成功為止。而 tryLock 只會阻塞等待 waitTime 時間。

此外,為了讓程序更加健壯,碼哥實現了阻塞等待獲取分布式鎖,讓你用的更加開心,面試不慌加薪不難。如果你不需要自旋阻塞等待獲取鎖,那把 while 代碼塊刪除即可。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 獲取鎖
    Boolean isAcquire = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (Boolean.TRUE.equals(isAcquire)) {
        return true;
    }

    time -= System.currentTimeMillis() - current;
    // 等待時間用完,獲取鎖失敗
    if (time <= 0) {
        return false;
    }
    // 自旋獲取鎖
    while (true) {
        long currentTime = System.currentTimeMillis();
        isAcquire = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (Boolean.TRUE.equals(isAcquire)) {
            return true;
        }

        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            return false;
        }
    }
}

@Override
public void lock(long leaseTime, TimeUnit unit) {
    long threadId = Thread.currentThread().getId();
    Boolean acquired;
    do {
        acquired = tryAcquire(leaseTime, unit, threadId);
    } while (Boolean.TRUE.equals(acquired));
}

private Boolean tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return redisTemplate.opsForValue().setIfAbsent(resourceName, getRequestId(threadId), leaseTime, unit);
}

解鎖 unlock

解鎖的邏輯是通過執行 lua 腳本實現。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();

    // 執行 lua 腳本
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LuaScript.unlockScript(), Long.class);
    Long opStatus = redisTemplate.execute(redisScript, keys, getRequestId(threadId));

    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + threadId);
    }


}

LuaScript

其實這個腳本就是在講解青銅板分布式鎖原理的那段代碼,具體邏輯已經解釋過,這里就不再重復分析。

public class LuaScript {

    private LuaScript() {

    }

    /**
     * 分布式鎖解鎖腳本
     *
     * @return 當且僅當返回 `1`才表示加鎖成功.
     */
    public static String unlockScript() {
        return "if (redis.call('exists', KEYS[1]) == 0) then " +
                "return nil;" +
                "end; "+
                "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                "   return redis.call('del',KEYS[1]);" +
                "else" +
                "   return 0;" +
                "end;";
    }
}

RedisLockClient

最后,還需要提供一個客戶端給方便使用。

@Component
public class RedisLockClient {

    @Autowired
    private StringRedisTemplate redisTemplate;


    public Lock getLock(String name) {
        return new DistributedLock(name, redisTemplate);
    }

}

單元測試來一個。

@Slf4j
@SpringBootTest(classes = RedisApplication.class)
public class RedisLockTest {

    @Autowired
    private RedisLockClient redisLockClient;


    @Test
    public void testLockSuccess() throws InterruptedException {
        Lock lock = redisLockClient.getLock("order:pay");
        try {
            boolean isLock = lock.tryLock(10, 30, TimeUnit.SECONDS);
            if (!isLock) {
                log.warn("加鎖失敗");
                return;
            }
            TimeUnit.SECONDS.sleep(3);
            log.info("業務邏輯執行完成");
        } finally {
            lock.unlock();
        }

    }

}

有兩個點需要注意。

  • 釋放鎖的代碼一定要放在 finally{} 塊中。否則一旦執行業務邏輯過程中拋出異常,程序就無法執行釋放鎖的流程。只能干等著鎖超時釋放。
  • 加鎖的代碼應該寫在 try {} 代碼中,放在 try 外面的話,如果執行加鎖異常(客戶端網絡連接超時),但是實際指令已經發送到服務端并執行,就會導致沒有機會執行解鎖的代碼。

小白:“碼哥,這個方案你管它叫青銅級別而已,這么說還有王者、超神版?我們公司還用錯誤版分布式鎖,難怪有時候出現重復訂單,是我膚淺了。”

趕緊將這個方案替換原來的錯誤或者殘血版的 Redis 分布式鎖吧。

責任編輯:姜華 來源: 碼哥字節
相關推薦

2021-11-29 00:18:30

Redis分布式

2018-04-03 16:24:34

分布式方式

2017-01-16 14:13:37

分布式數據庫

2023-01-13 07:39:07

2022-01-06 10:58:07

Redis數據分布式鎖

2023-08-21 19:10:34

Redis分布式

2019-06-19 15:40:06

分布式鎖RedisJava

2021-01-12 14:56:40

Redis分布式鎖工具

2019-02-26 09:51:52

分布式鎖RedisZookeeper

2021-09-17 07:51:24

RedissonRedis分布式

2023-03-01 08:07:51

2024-10-07 10:07:31

2024-04-01 05:10:00

Redis數據庫分布式鎖

2021-11-25 07:43:56

CIOIT董事會

2023-10-11 09:37:54

Redis分布式系統

2024-11-28 15:11:28

2020-07-30 09:35:09

Redis分布式鎖數據庫

2020-07-15 16:50:57

Spring BootRedisJava

2019-12-25 14:35:33

分布式架構系統

2020-02-11 16:10:44

Redis分布式鎖Java
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: www.99久久.com | 精品美女久久久久久免费 | 国产99热在线 | 国产91av视频 | 国产在线中文字幕 | 成人免费视频7777777 | 黄色a视频 | 韩国毛片视频 | 久草免费在线视频 | 日韩不卡在线 | 91在线视频国产 | 毛片免费观看 | 国产高清精品一区二区三区 | 天天操天天插天天干 | 国产精品久久久久久久久免费桃花 | 国产精品久久国产精品 | 美女天天操| 超碰日韩| 天堂中文在线播放 | 欧美一区二区免费视频 | 涩涩视频在线观看免费 | 国产一区久久久 | 日本精品一区二区三区在线观看视频 | 亚洲欧美综合 | 国产日韩精品一区二区三区 | 欧美一级免费看 | 在线观看日本网站 | 黑人巨大精品 | 福利一区二区在线 | 久久精品国产久精国产 | 日韩超碰在线 | 色一级| 天堂中文在线观看 | 欧美一区二区久久 | 亚洲成人精品国产 | 噜噜噜色网 | 国产欧美一区二区三区久久手机版 | 欧美在线观看免费观看视频 | 免费黄色录像片 | 91社区在线观看播放 | 日韩av在线一区二区 |