手動擼一個 Redis 分布式鎖
大家好呀,我是樓仔。
今天第一天開工,收拾心情,又要開始好好學習,好好工作了。
對于使用 Java 的小伙伴,其實我們完全不用手動擼一個分布式鎖,直接使用 Redisson 就行。
但是因為這些封裝好的組建,讓我們越來越懶。
我們使用一些封裝好的開源組建時,可以了解其中的原理,或者自己動手寫一個,可以更好提升你的技術(shù)水平。
今天我就教大家用原生的 Redis,手動擼一個 Redis 分布式鎖,很有意思。
01 問題引入
其實通過 Redis 實現(xiàn)分布式鎖,經(jīng)常會有面試官會問,很多同學都知道用 SetNx() 去獲取鎖,解決并發(fā)問題。
SetNx() 是什么?我簡單解答一下。
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在時,為 key 設(shè)置指定的值。
對于下面 2 種問題,你知道如何解決么?
- 如果獲取鎖的機器掛掉,如何處理?
- 當鎖超時時,A、B 兩個線程同時獲取鎖,可能導致鎖被同時獲取,如何解決?
這個就是我們實現(xiàn) Redis 分布式鎖時,需要重點解決的 2 個問題。
02 理論知識
剛才說過,通過 SetNx() 去獲取鎖,可以解決并發(fā)問題。
當獲取到鎖,處理完業(yè)務(wù)邏輯后,會將鎖釋放。
圖片
但當機器宕機,或者重啟時,沒有執(zhí)行 Del() 刪除鎖操作,會導致鎖一直沒有釋放。
所以,我們還需要記錄鎖的超時時間,判斷鎖是否超時。
圖片
這里我們通過 GetKey() 獲取鎖的超時時間 A,通過和當前時間比較,判斷鎖是否超時。
如果鎖未超時,直接返回,如果鎖超時,重新設(shè)置鎖的超時時間,成功獲取鎖。
還有其它問題么?當然!
因為在并發(fā)場景下,會存在 A、B 兩個線程同時執(zhí)行 SetNx(),導致兩個線程同時獲取到鎖。
那如何解決呢?將 SetNx() 用 GetSet() 替換。
圖片
GetSet() 是什么?我簡單解答一下。
Redis Getset 命令用于設(shè)置指定 key 的值,并返回 key 的舊值。
這里不太好理解,我舉個例子。
假如 A、B 兩個線程,A 先執(zhí)行,B 后執(zhí)行:
- 對于線程 A 和 B,通過 GetKey 獲取的超時時間都是 T1 = 100;
- 對于線程 A,將超時時間 Ta = 200 通過 GetSet() 設(shè)置,返回 T2 = 100,此時滿足條件 “T1 == T2”,獲取鎖成功;
- 對于線程 B,將超時時間 Tb = 201 通過 GetSet() 設(shè)置,由于鎖超時時間已經(jīng)被 A 重新設(shè)置,所以返回 T2 = 200,此時不滿足條件 “T1 == T2”,獲取鎖失敗。
可能有同學會繼續(xù)問,之前設(shè)置的超時是 Ta = 200,現(xiàn)在變成了 Tb = 201,延長或縮短了鎖的超時時間,不會有問題么?
其實在現(xiàn)實并發(fā)場景中,能走到這一步,基本是“同時”進來的,兩者的時間差非常小,可以忽略此影響。
03 代碼實戰(zhàn)
這里給出 Go 代碼,注釋都寫得非常詳細,即使你不會 Go,讀注釋也能讀懂。
// 獲取分布式鎖,需要考慮以下情況:
// 1. 機器A獲取到鎖,但是在未釋放鎖之前,機器掛掉或者重啟,會導致其它機器全部hang住,這時需要根據(jù)鎖的超時時間,判斷該鎖是否需要重置;
// 2. 當鎖超時時,需要考慮兩臺機器同時去獲取該鎖,需要通過GETSET方法,讓先執(zhí)行該方法的機器獲取鎖,另外一臺繼續(xù)等待。
func GetDistributeLock(key string, expireTime int64) bool {
currentTime := time.Now().Unix()
expires := currentTime + expireTime
redisAlias := "jointly"
// 1.獲取鎖,并將value值設(shè)置為鎖的超時時間
redisRet, err := redis.SetNx(redisAlias, key, expires)
if nil == err && utils.MustInt64(1) == redisRet {
// 成功獲取到鎖
return true
}
// 2.當獲取到鎖的機器突然重啟&掛掉時,就需要判斷鎖的超時時間,如果鎖超時,新的機器可以重新獲取鎖
// 2.1 獲取鎖的超時時間
currentLockTime, err := redis.GetKey(redisAlias, key)
if err != nil {
return false
}
// 2.2 當"鎖的超時時間"大于等于"當前時間",證明鎖未超時,直接返回
if utils.MustInt64(currentLockTime) >= currentTime {
return false
}
// 2.3 將最新的超時時間,更新到鎖的value值,并返回舊的鎖的超時時間
oldLockTime, err := redis.GetSet(redisAlias, key, expires)
if err != nil {
return false
}
// 2.4 當鎖的兩個"舊的超時時間"相等時,證明之前沒有其它機器進行GetSet操作,成功獲取鎖
// 說明:這里存在并發(fā)情況,如果有A和B同時競爭,A會先GetSet,當B再去GetSet時,oldLockTime就等于A設(shè)置的超時時間
if utils.MustString(oldLockTime) == currentLockTime {
return true
}
return false
}
刪除鎖邏輯:
// 刪除分布式鎖
// @return bool true-刪除成功;false-刪除失敗
func DelDistributeLock(key string) bool {
redisAlias := "jointly"
redisRet := redis.Del(redisAlias, key)
if redisRet != nil {
return false
}
return true
}
業(yè)務(wù)邏輯:
func DoProcess(processId int) {
fmt.Printf("啟動第%d個線程\n", processId)
redisKey := "redis_lock_key"
for {
// 獲取分布式鎖
isGetLock := GetDistributeLock(redisKey, 10)
if isGetLock {
fmt.Printf("Get Redis Key Success, id:%d\n", processId)
time.Sleep(time.Second * 3)
// 刪除分布式鎖
DelDistributeLock(redisKey)
} else {
// 如果未獲取到該鎖,為了避免redis負載過高,先睡一會
time.Sleep(time.Second * 1)
}
}
}
最后起個 10 個多線程,去執(zhí)行這個 DoProcess():
func main() {
// 初始化資源
var group string = "group"
var name string = "name"
var host string
// 初始化資源
host = "http://ip:port"
_, err := xrpc.NewXRpcDefault(group, name, host)
if err != nil {
panic(fmt.Sprintf("initRpc when init rpc failed, err:%v", err))
}
redis.SetRedis("louzai", "redis_louzai")
// 開啟10個線程,去搶Redis分布式鎖
for i := 0; i <= 9; i ++ {
go DoProcess(i)
}
// 避免子線程退出,主線程睡一會
time.Sleep(time.Second * 100)
return
}
程序跑了100 s,我們可以看到,每次都只有 1 個線程獲取到鎖,分別是 2、1、5、9、3,執(zhí)行結(jié)果如下:
啟動第0個線程
啟動第6個線程
啟動第9個線程
啟動第4個線程
啟動第5個線程
啟動第2個線程
啟動第1個線程
啟動第8個線程
啟動第7個線程
啟動第3個線程
Get Redis Key Success, id:2
Get Redis Key Success, id:2
Get Redis Key Success, id:1
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
04 后記
這個代碼,其實是我很久之前寫的,因為當時 Go 沒有開源的分布式鎖,但是我又需要通過單機去執(zhí)行某個任務(wù),所以就自己手動擼了一個,后來在線上跑了 2 年,一直都沒有問題。
不過期間也遇到過一個坑,就是我們服務(wù)遷移時,忘了將舊機器的分布式鎖停掉,導致鎖經(jīng)常被舊機器搶占,當時覺得很奇怪,我的鎖呢?
寫這篇文章時,又讓我想到當時工作的場景。
最后再切回正題,本文由淺入深,詳細講解了 Redis 實現(xiàn)的詳細過程,以及鎖超時、并發(fā)場景下,如何保證鎖能正常釋放,且只有一個線程去獲取鎖。