基于 Golang 和 Redis 解決分布式系統下的并發問題
在分布式系統和數據庫的交互中,并發問題如同暗流般潛伏,稍有不慎就會掀起應用的驚濤駭浪。試想一下,我們正在構建一個股票交易平臺,允許不同用戶同時購買公司股票。每個公司都有一定數量的可用股票,用戶只能在剩余股票充足的情況下進行購買。
Golang 與 Redis 的解決方案:構建穩固的交易系統
為了解決這個問題,我們可以借助 Golang 和 Redis 的強大功能,構建一個安全可靠的交易系統。
數據層搭建:GoRedis 助力高效交互
首先,我們使用 goredis 客戶端庫創建一個數據層(Repository),用于與 Redis 數據庫進行交互:
type Repository struct {
client *redis.Client
}
var _ go_redis_concurrency.Repository = (*Repository)(nil)
func NewRepository(address, password string) Repository {
return Repository{
client: redis.NewClient(&redis.Options{
Addr: address,
Password: password,
}),
}
}
購買股票功能實現:并發問題初現端倪
接下來,我們實現 BuyShares 函數,模擬用戶購買股票的操作:
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
companySharesKey := BuildCompanySharesKey(companyId)
// --- (1) ----
// 獲取當前可用股票數量
currentShares, err := r.client.Get(ctx, companySharesKey).Int()
if err != nil {
fmt.Print(err.Error())
return err
}
// --- (2) ----
// 驗證剩余股票是否充足
if currentShares < numShares {
fmt.Print("error: 公司剩余股票不足\n")
return errors.New("error: 公司剩余股票不足")
}
currentShares -= numShares
// --- (3) ----
// 更新公司可用股票數量
_, err = r.client.Set(ctx, companySharesKey, currentShares, 0).Result()
return err
}
該函數包含三個步驟:
- 獲取公司當前可用股票數量。
- 驗證剩余股票是否足以滿足用戶購買需求。
- 更新公司可用股票數量。
看似邏輯清晰,但當多個用戶并發執行 BuyShares 函數時,問題就出現了。
模擬并發場景:問題暴露無遺
為了模擬并發場景,我們創建多個 Goroutine 同時執行 BuyShares 函數:
const (
total_clients = 30
)
func main() {
// --- (1) ----
// 初始化 Repository
repository := redis.NewRepository(fmt.Sprintf("%s:%d", config.Redis.Host, config.Redis.Port), config.Redis.Pass)
// --- (2) ----
// 并發執行 BuyShares 函數
companyId := "TestCompanySL"
var wg sync.WaitGroup
wg.Add(total_clients)
for idx := 1; idx <= total_clients; idx++ {
userId := fmt.Sprintf("user%d", idx)
go repository.BuyShares(context.Background(), userId, companyId, 100, &wg)
}
wg.Wait()
// --- (3) ----
// 獲取公司剩余股票數量
shares, err := repository.GetCompanyShares(context.Background(), companyId)
if err != nil {
panic(err)
}
fmt.Printf("公司 %s 剩余股票數量: %d\n", companyId, shares)
}
假設公司 TestCompanySL 初始擁有 1000 股可用股票,每個用戶購買 100 股。我們期望的結果是,只有 10 個用戶能夠成功購買股票,剩余用戶會因為股票不足而收到錯誤信息。
然而,實際運行結果卻出乎意料,公司剩余股票數量可能出現負數,這意味著多個用戶在讀取可用股票數量時,獲取到的是同一個未更新的值,導致最終結果出現偏差。
Redis 并發解決方案:精準打擊,逐個擊破
為了解決上述并發問題,Redis 提供了多種解決方案,讓我們來一一剖析。
原子操作:簡單場景下的利器
原子操作能夠在不加鎖的情況下,保證對數據的修改操作具有原子性。在 Redis 中,可以使用 INCRBY 命令對指定 key 的值進行原子遞增或遞減。
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
// ... (省略部分代碼) ...
// 使用 INCRBY 命令原子更新股票數量
_, err = r.client.IncrBy(ctx, companySharesKey, int64(-numShares)).Result()
return err
}
然而,在我們的股票交易場景中,原子操作并不能完全解決問題。因為在更新股票數量之前,還需要進行剩余股票數量的驗證。如果多個用戶同時讀取到相同的可用股票數量,即使使用原子操作更新,最終結果仍然可能出現錯誤。
事務:保證操作的原子性
Redis 事務可以將多個命令打包成一個原子操作,要么全部執行成功,要么全部回滾。通過 MULTI、EXEC、DISCARD 和 WATCH 命令,可以實現對數據的原子性操作。
- MULTI:標記事務塊的開始。
- EXEC:執行事務塊中的所有命令。
- DISCARD:取消事務塊,放棄執行所有命令。
- WATCH:監視指定的 key,如果 key 在事務執行之前被修改,則事務執行失敗。
在我們的例子中,可以使用 WATCH 命令監視公司可用股票數量的 key。如果 key 在事務執行之前被修改,則說明有其他用戶并發修改了數據,當前事務執行失敗,從而保證數據的一致性。
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
companySharesKey := BuildCompanySharesKey(companyId)
// 使用事務保證操作的原子性
tx := r.client.TxPipeline()
tx.Watch(ctx, companySharesKey)
// ... (省略部分代碼) ...
_, err = tx.Exec(ctx).Result()
return err
}
然而,在高并發場景下,使用事務可能會導致大量事務執行失敗,影響系統性能。
LUA 腳本:將邏輯移至 Redis 服務端執行
為了避免上述問題,可以借助 Redis 的 LUA 腳本功能,將業務邏輯移至 Redis 服務端執行。LUA 腳本在 Redis 中以原子方式執行,可以有效避免并發問題。
local sharesKey = KEYS[1]
local requestedShares = ARGV[1]
local currentShares = redis.call("GET", sharesKey)
if currentShares < requestedShares then
return {err = "error: 公司剩余股票不足"}
end
currentShares = currentShares - requestedShares
redis.call("SET", sharesKey, currentShares)
該 LUA 腳本實現了與 BuyShares 函數相同的邏輯,包括獲取可用股票數量、驗證剩余股票是否充足以及更新股票數量。
在 Golang 中,可以使用 goredis 庫執行 LUA 腳本:
var BuyShares = redis.NewScript(`
local sharesKey = KEYS[1]
local requestedShares = ARGV[1]
local currentShares = redis.call("GET", sharesKey)
if currentShares < requestedShares then
return {err = "error: 公司剩余股票不足"}
end
currentShares = currentShares - requestedShares
redis.call("SET", sharesKey, currentShares)
`)
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
keys := []string{BuildCompanySharesKey(companyId)}
err := BuyShares.Run(ctx, r.client, keys, numShares).Err()
if err != nil {
fmt.Println(err.Error())
}
return err
}
使用 LUA 腳本可以有效解決并發問題,并且性能優于事務機制。
分布式鎖:靈活控制并發訪問
除了 LUA 腳本,還可以使用分布式鎖來控制對共享資源的并發訪問。Redis 提供了 SETNX 命令,可以實現簡單的分布式鎖機制。
在 Golang 中,可以使用 redigo 庫的 Lock 函數獲取分布式鎖:
func (r *Repository) BuyShares(ctx context.Context, userId, companyId string, numShares int, wg *sync.WaitGroup) error {
defer wg.Done()
companySharesKey := BuildCompanySharesKey(companyId)
// 獲取分布式鎖
lockKey := "lock:" + companySharesKey
lock, err := r.client.Lock(ctx, lockKey, redislock.Options{
RetryStrategy: redislock.ExponentialBackoff{
InitialDuration: time.Millisecond * 100,
MaxDuration: time.Second * 3,
},
})
if err != nil {
return fmt.Errorf("獲取分布式鎖失敗: %w", err)
}
defer lock.Unlock(ctx)
// ... (省略部分代碼) ...
return nil
}
使用分布式鎖可以靈活控制并發訪問,但需要謹慎處理鎖的釋放和超時問題,避免出現死鎖情況。
總結
Redis 提供了多種解決并發問題的方案,包括原子操作、事務、LUA 腳本和分布式鎖等。在實際應用中,需要根據具體場景選擇合適的方案。
- 原子操作適用于簡單場景,例如計數器等。
- 事務可以保證多個操作的原子性,但性能較低。
- LUA 腳本可以將業務邏輯移至 Redis 服務端執行,性能較高,但需要熟悉 LUA 語法。
- 分布式鎖可以靈活控制并發訪問,但需要謹慎處理鎖的釋放和超時問題。
希望本文能夠幫助你更好地理解和解決 Redis 并發問題,構建更加穩定可靠的分布式系統。