阿里二面:Redis分布式鎖過期了但業務還沒有執行完,怎么辦
面試官:你們系統是怎么實現分布式鎖的?
我:我們使用了redis的分布式鎖。具體做法是后端接收到請求后加入一個分布式鎖,如果加鎖成功,就執行業務,如果加鎖失敗就等待鎖或者拒絕請求。業務執行完成后釋放鎖。
面試官:能說一下具體使用的命令嗎?
我:我們使用的是SETNX命令,具體如下:
- SETNX KEY_NAME VALUE
設置成功返回1,設置失敗返回0。如下圖,客戶端1加鎖成功,客戶端2獲取鎖失?。?/p>
面試官:這樣設置會不會有問題呢?如果加鎖成功的客戶端掛了怎么辦?
我:比如上圖中的客戶端1掛了,這個鎖就不能釋放了。可以設置一個過期時間,命令如下:
- SET key value [EX seconds] [PX milliseconds] NX
面試官:設置了過期時間,如果業務還沒有執行完成,但是redis鎖過期了,怎么辦?
我:需要對鎖進行續約。
面試官:能說一下具體怎么操作嗎?
我:設置鎖成功后,啟動一個watchdog,每隔一段時間(比如10s)為當前分布式鎖續約,也就是每隔10s重新設置當前key的超時時間。命令如下:
- EXPIRE <key> <seconds>
整個流程如下:
面試官:watchdog怎么實現呢?
我:當客戶端加鎖成功后,可以啟動一個定時任務,每隔10s(最好支持配置)來檢測業務是否處理完成,檢測的依據就是判斷分布式鎖的key是否還存在,如果存在,就進行續約。
面試官:如果當前線程已經處理完,這個key是被其他客戶端寫入的呢?
我:可以為每個客戶端指定一個clientID,在VALUE中增加一個clientID的前綴,這樣在續鎖的時候,可以判斷當前分布式鎖的value前綴來確定是不是當前客戶端的,如果是再續鎖,否則不做處理。
面試官:你們的續鎖功能是自己實現的嗎?
我:我們用的redisson的分布式鎖方案,使用redisson獲取分布式鎖非常簡單,代碼如下:
- RLock lock = redisson.getLock("client-lock");
- lock.lock();
- try {
- //處理業務
- } catch (Exception e) {
- //處理異常
- } finally {
- lock.unlock();
- }
具體原理是:如果客戶端1加鎖成功,這個分布式鎖超時時間默認是30秒(可以通過Config.lockWatchdogTimeout來修改)。加鎖成功后,就會啟動一個watchdog,watchdog是一個后臺線程,會每隔10秒檢查一下客戶端1是否還持有鎖key,如果是,就延長鎖key的生存時間,延長操作就是再次把鎖key的超時時間設置成30s。
面試官:redisson里的定時器怎么實現的?
我:redisson定時器使用的是netty-common包中的HashedWheelTime來實現的。
面試官:如果client1宕機了,這時分布式鎖還可以續期嗎?
我:因為分布式鎖的續期是在客戶端執行的,所以如果client1宕機了,續期線程就不能工作了,也就不能續期了。這時應該把分布式鎖刪除,讓其他客戶端來獲取。
面試官:那如果client1宕機了,其他客戶端需要等待30s才能有機會獲取到鎖,有辦法立刻刪除鎖嗎?
我:因為client1宕機了,只能等到超時時間后鎖被自動刪除。如果要立刻刪除,需要增加額外的工作,比如增加哨兵機制,讓哨兵來維護所有redis客戶端的列表。哨兵定時監控客戶端是否宕機,如果檢測到宕機,立刻刪除這個客戶端的鎖。如下圖:
這里的哨兵并不是redis的哨兵,而且為了檢測客戶端故障業務系統自己做的哨兵。
面試官:如果不用redisson,怎么實現分布式鎖續鎖呢?比如springboot2.0默認使用redis客戶端是Lettuce。
我:Lettuce并沒有提供像redisson這樣的watchdog機制,所以續鎖需要業務系統自己實現??梢苑譃橐韵聨撞絹韺崿F:
1.加鎖的命令,我們參照spring包里的分布式鎖代碼,如果鎖存在并且是當前客戶端加的鎖,那就續鎖,如果鎖不存在,則加鎖。代碼如下:
- private static final String OBTAIN_LOCK_SCRIPT =
- "local lockClientId = redis.call('GET', KEYS[1])\n" +
- "if lockClientId == ARGV[1] then\n" +
- " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
- " return true\n" +
- "elseif not lockClientId then\n" +
- " redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
- " return true\n" +
- "end\n" +
- "return false";
2.把鎖保存在一個數據結構里,比如HashMap,定時任務定時掃描這個map,對每個鎖進行續鎖操作。代碼如下:
- private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
3.續鎖命令
- private static final String RENEW_LOCK_SCRIPT =
- "local lockClientId = redis.call('GET', KEYS[1])\n" +
- "if lockClientId == ARGV[1] then\n" +
- " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
- " return true\n" +
- "end\n" +
- "return false";
4.如果鎖是當前客戶端加的,那就續鎖,否則失敗。
寫一個定時任務,定時執行續鎖代碼:
- redisTemplate.execute(renewLockScript,
- Collections.singletonList(lockKey), clientId,
- String.valueOf(expireAfter));
面試官:這個問題就聊到這里,咱們下一個問題...