千萬別這么用 Redis 和 @Transactional!我踩過的坑能繞地球三圈
兄弟們,咱今天來嘮嘮 Redis 和 @Transactional 這倆哥們兒,要是搭伙兒沒搭好,能讓你在代碼里踩坑到懷疑人生,我當年踩過的坑連起來都能繞地球三圈了。咱先說好,今兒這文章不賣關子,直接上硬貨,從坑怎么來的,到怎么填坑,全給你整明白咯。
一、那些年,我在 Redis 和事務里踩的第一個大坑
故事得從三年前說起,那時候咱剛接手一個電商項目,負責用戶積分系統。需求是用戶下單成功后,扣減賬戶積分,同時記錄積分變更日志。咱心想,這事兒簡單啊,Redis 存用戶實時積分,數據庫存積分變更記錄,再用 @Transactional 保證數據庫操作的原子性,完美!
于是,咱寫了這么段代碼:
@Transactional(rollbackFor = Exception.class)
public void deduct積分(Long userId, Integer deductScore) {
// 先扣 Redis 里的積分
redisTemplate.opsForValue().increment(userId.toString(), -deductScore);
// 再往數據庫里插積分變更記錄
IntegralLog integralLog = new IntegralLog();
integralLog.setUserId(userId);
integralLog.setDeductScore(deductScore);
integralLog.setCreateTime(new Date());
integralLogMapper.insert(integralLog);
// 模擬一個可能出現的異常
if (deductScore > 1000) {
throw new RuntimeException("積分扣減超過限制");
}
}
代碼寫完,自測的時候沒問題,下單扣積分,記錄日志,一切正常。可上線沒兩天,運營小姐姐就來找咱了,說有用戶反饋積分扣了,但訂單沒成功,而且積分也沒恢復。咱趕緊查日志,發現確實有數據庫操作拋異常回滾了,但 Redis 里的積分沒回來。
這咋回事呢?咱一拍腦袋,恍然大悟:Redis 的操作根本不在數據庫事務里啊!@Transactional 管的是數據庫的事兒,對 Redis 那是鞭長莫及。咱先操作了 Redis,再操作數據庫,要是數據庫操作失敗回滾了,Redis 里的數據可不會跟著回滾,這不就出現數據不一致了嘛。就好比你先把兜里的錢給別人了,然后發現別人沒給你貨,想把錢要回來,可人家已經把錢揣兜里跑了,你說糟心不糟心。
二、以為把 Redis 操作放事務里就萬事大吉?Too Young Too Simple!
吃一塹長一智,咱知道不能先操作 Redis 了,那咱把 Redis 的操作放到數據庫事務里面總行了吧?咱又改了改代碼,這次先操作數據庫,再操作 Redis,而且都放在 @Transactional 注解的方法里:
@Transactional(rollbackFor = Exception.class)
public void deduct積分(Long userId, Integer deductScore) {
// 先往數據庫里插積分變更記錄
IntegralLog integralLog = new IntegralLog();
integralLog.setUserId(userId);
integralLog.setDeductScore(deductScore);
integralLog.setCreateTime(new Date());
integralLogMapper.insert(integralLog);
// 再扣 Redis 里的積分
redisTemplate.opsForValue().increment(userId.toString(), -deductScore);
// 模擬一個可能出現的異常
if (deductScore > 1000) {
throw new RuntimeException("積分扣減超過限制");
}
}
這次咱想,數據庫操作和 Redis 操作都在事務里,要是拋異常了,數據庫回滾,Redis 也能回滾吧?結果一測試,傻眼了:數據庫操作回滾了,可 Redis 里的積分還是被扣了。這是為啥呢?咱去查了查 Redis 的文檔和 Spring 事務的原理,才知道 Redis 的操作本身不是事務性的,Spring 的 @Transactional 只能管理數據庫事務,對于 Redis 這種外部資源的操作,它管不了。也就是說,雖然 Redis 操作寫在了事務方法里,但它不會隨著數據庫事務的回滾而回滾。就好比你帶著一個不聽話的小弟去辦事,你說咱們得一起行動,要是出事兒了就一起撤,結果這小弟自己跑了,根本不管你。
那有人可能會問了,能不能讓 Redis 支持事務,然后和數據庫事務一起提交或回滾呢?理論上是可以的,但實際操作起來可麻煩了。Redis 本身的事務和數據庫的事務不一樣,它的事務只是把多個命令打包執行,不支持回滾(除非在命令入隊時出錯),而且和數據庫事務的協調需要復雜的分布式事務解決方案,比如兩階段提交(2PC),這會增加系統的復雜度和性能開銷,一般不太建議這么做。
三、當 Redis 遇見可重復讀事務,又一個坑在等著你
咱再來說說另一個場景,在一個事務里多次讀取 Redis 的數據,而且數據庫事務的隔離級別是可重復讀。比如咱要根據用戶的積分來判斷是否能參加某個活動,在事務里先讀取 Redis 里的積分,然后進行一些業務處理,最后再讀取一次積分,看看有沒有變化。
@Transactional(isolation = Isolation.REPEATABLE_READ, rollbackFor = Exception.class)
public void checkAndDeduct積分(Long userId, Integer deductScore) {
// 第一次讀取 Redis 積分
Integer currentScore = Integer.parseInt(redisTemplate.opsForValue().get(userId.toString()));
// 業務處理,比如判斷積分是否足夠
if (currentScore < deductScore) {
throw new RuntimeException("積分不足");
}
// 模擬耗時的業務處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 第二次讀取 Redis 積分
Integer updatedScore = Integer.parseInt(redisTemplate.opsForValue().get(userId.toString()));
// 發現積分可能已經被其他線程修改了
if (updatedScore < deductScore) {
throw new RuntimeException("積分不足");
}
// 扣減數據庫積分記錄
IntegralLog integralLog = new IntegralLog();
integralLog.setUserId(userId);
integralLog.setDeductScore(deductScore);
integralLog.setCreateTime(new Date());
integralLogMapper.insert(integralLog);
// 扣減 Redis 積分
redisTemplate.opsForValue().increment(userId.toString(), -deductScore);
}
在數據庫的可重復讀隔離級別下,事務內多次讀取數據庫數據是一致的,因為數據庫通過 MVCC(多版本并發控制)實現了可重復讀。但 Redis 是內存數據庫,沒有 MVCC 機制,它的數據是實時更新的。所以在事務執行過程中,其他線程可能會修改 Redis 里的數據,導致同一事務內多次讀取 Redis 的數據不一致,這就會出現業務判斷錯誤的情況。
比如在上面的例子中,第一次讀取積分時足夠扣減,但是在業務處理的這一秒鐘內,其他線程可能已經扣減了該用戶的積分,導致第二次讀取時積分不足,這時候再進行扣減就會出錯。而數據庫的可重復讀并不能保證 Redis 數據的一致性,這就是一個典型的分布式數據不一致問題。
四、這些 Redis 數據結構和事務搭配,分分鐘讓你翻車
(一)List 結構的左進右出操作
咱在使用 Redis 的 List 結構做隊列的時候,經常會用到 lpush 和 rpop 操作,模擬一個先進先出的隊列。比如在訂單處理系統中,把訂單號按順序存入 List,然后消費者從另一端取出訂單號進行處理。
@Transactional(rollbackFor = Exception.class)
public void processOrder() {
// 從 Redis 隊列右彈出一個訂單號
String orderId = redisTemplate.opsForList().rightPop("orderQueue");
if (orderId != null) {
// 根據訂單號處理訂單,比如更新訂單狀態到數據庫
Order order = orderMapper.selectByPrimaryKey(orderId);
order.setStatus("處理中");
orderMapper.updateByPrimaryKey(order);
// 模擬處理過程中出現異常
if (order.getAmount() > 10000) {
throw new RuntimeException("大額訂單處理異常");
}
// 處理完成,記錄處理日志
OrderProcessLog log = new OrderProcessLog();
log.setOrderId(orderId);
log.setProcessTime(new Date());
orderProcessLogMapper.insert(log);
}
}
看起來沒問題吧?但是如果在處理訂單的過程中,數據庫操作拋異常回滾了,但是 Redis 里的訂單號已經被 rpop 出去了,這個訂單就相當于丟失了,不會再被其他消費者處理。這就是因為 Redis 的 rpop 操作和數據庫事務沒有關聯,一旦執行就無法回滾,導致數據不一致。
(二)Set 結構的交集操作
還有一次,咱需要根據用戶的標簽來推薦商品,使用 Redis 的 Set 結構存儲用戶標簽和商品標簽,然后通過交集操作找出符合用戶標簽的商品。
@Transactional(rollbackFor = Exception.class)
public List<String> recommendGoods(String userId) {
// 獲取用戶的標簽集合
Set<String> userTags = redisTemplate.opsForSet().members("user:tags:" + userId);
if (userTags == null || userTags.isEmpty()) {
return new ArrayList<>();
}
// 獲取所有商品的標簽集合,并計算交集
Set<String> allGoods = redisTemplate.opsForSet().members("all:goods");
Set<String> recommendedGoods = new HashSet<>();
for (String good : allGoods) {
Set<String> goodTags = redisTemplate.opsForSet().members("good:tags:" + good);
if (goodTags != null && !goodTags.isEmpty() && goodTags.containsAll(userTags)) {
recommendedGoods.add(good);
}
}
// 將推薦的商品存入數據庫推薦表
for (String good : recommendedGoods) {
RecommendRecord record = new RecommendRecord();
record.setUserId(userId);
record.setGoodId(good);
record.setRecommendTime(new Date());
recommendRecordMapper.insert(record);
}
// 模擬異常
if (recommendedGoods.size() > 100) {
throw new RuntimeException("推薦商品數量過多");
}
return new ArrayList<>(recommendedGoods);
}
這里的問題在于,在計算交集的過程中,Redis 的 Set 成員可能會被其他線程修改,導致推薦結果不準確。而且,如果數據庫操作回滾了,已經計算出來的推薦結果并不會影響 Redis 里的數據,但是推薦記錄沒有存入數據庫,這就會出現推薦結果和數據庫記錄不一致的情況。
(三)SortedSet 結構的分數更新
在積分排名系統中,咱常用 SortedSet 來存儲用戶的積分和排名。當用戶積分變化時,需要更新 SortedSet 中的分數。
@Transactional(rollbackFor = Exception.class)
public void updateScore(Long userId, Integer score) {
// 更新 Redis 里的積分和排名
redisTemplate.opsForZSet().add("user:score:rank", userId.toString(), score);
// 更新數據庫里的用戶積分
User user = userMapper.selectByPrimaryKey(userId);
user.setScore(score);
userMapper.updateByPrimaryKey(user);
// 模擬異常
if (score < 0) {
throw new RuntimeException("積分不能為負數");
}
}
同樣的問題,要是數據庫更新失敗回滾了,Redis 里的積分和排名已經更新了,這就導致數據庫和 Redis 數據不一致。而且 SortedSet 的排名是根據分數實時計算的,一旦分數錯誤,排名也會跟著錯,影響整個排名系統的準確性。
五、填坑指南:正確使用 Redis 和 @Transactional 的姿勢
(一)先數據庫后 Redis,事務保護數據庫
經過前面的坑,咱總結出一個基本原則:在涉及數據庫和 Redis 操作的事務中,優先操作數據庫,再操作 Redis,并且利用數據庫事務的原子性來保證業務的一致性。如果數據庫操作失敗,回滾事務,Redis 操作就不會執行(因為 Redis 操作在數據庫操作之后)。
比如前面的扣積分場景,正確的做法是:
public void deduct積分(Long userId, Integer deductScore) {
// 這里不使用 @Transactional 注解,而是在數據庫操作的服務層方法使用
try {
// 先執行數據庫操作,利用數據庫事務
doDeductInDatabase(userId, deductScore);
// 數據庫操作成功后,再操作 Redis
redisTemplate.opsForValue().increment(userId.toString(), -deductScore);
} catch (Exception e) {
// 如果出現異常,需要根據情況處理 Redis 數據,比如恢復積分
// 這里需要注意,Redis 的回滾需要手動處理,因為它沒有事務
redisTemplate.opsForValue().increment(userId.toString(), deductScore);
throw new RuntimeException("積分扣減失敗", e);
}
}
@Transactional(rollbackFor = Exception.class)
private void doDeductInDatabase(Long userId, Integer deductScore) {
// 檢查數據庫中的積分是否足夠
UserIntegral userIntegral = userIntegralMapper.selectByUserId(userId);
if (userIntegral.getScore() < deductScore) {
throw new RuntimeException("積分不足");
}
// 扣減數據庫積分
userIntegral.setScore(userIntegral.getScore() - deductScore);
userIntegralMapper.updateByUserId(userIntegral);
// 插入積分變更記錄
IntegralLog integralLog = new IntegralLog();
integralLog.setUserId(userId);
integralLog.setDeductScore(deductScore);
integralLog.setCreateTime(new Date());
integralLogMapper.insert(integralLog);
}
這樣做的好處是,數據庫操作在事務中,保證了原子性,只有數據庫操作成功了,才會去操作 Redis。如果數據庫操作失敗,事務回滾,Redis 也不會有錯誤的數據。不過要注意,如果 Redis 操作拋出異常,需要手動處理數據庫事務,比如可以使用 @Transactional 的異常處理機制,或者在 Redis 操作失敗時回滾數據庫事務。
(二)異步處理 Redis,解耦事務依賴
如果對實時性要求不是特別高,可以把 Redis 的操作放到異步線程或者消息隊列中處理,這樣就不會和數據庫事務耦合在一起了。比如使用 Spring 的 @Async 注解,或者引入 RabbitMQ、Kafka 等消息中間件。
以 @Async 為例:
@Transactional(rollbackFor = Exception.class)
public void deduct積分(Long userId, Integer deductScore) {
// 執行數據庫操作
UserIntegral userIntegral = userIntegralMapper.selectByUserId(userId);
if (userIntegral.getScore() < deductScore) {
throw new RuntimeException("積分不足");
}
userIntegral.setScore(userIntegral.getScore() - deductScore);
userIntegralMapper.updateByUserId(userIntegral);
IntegralLog integralLog = new IntegralLog();
integralLog.setUserId(userId);
integralLog.setDeductScore(deductScore);
integralLog.setCreateTime(new Date());
integralLogMapper.insert(integralLog);
// 異步處理 Redis 操作
asyncService.updateRedisScore(userId, deductScore);
}
// 異步服務類
@Service
public class AsyncService {
@Async
public void updateRedisScore(Long userId, Integer deductScore) {
try {
redisTemplate.opsForValue().increment(userId.toString(), -deductScore);
} catch (Exception e) {
// 記錄異常日志,后續可以通過補償機制處理
log.error("更新 Redis 積分失敗,userId: {}, deductScore: {}", userId, deductScore, e);
}
}
}
這樣數據庫事務和 Redis 操作解耦了,數據庫事務成功提交后,異步執行 Redis 操作。即使 Redis 操作失敗,也可以通過日志記錄,后續通過定時任務或者補償接口來修復數據,提高了系統的可用性和容錯性。
(三)利用 Redis 管道和事務,減少網絡開銷
雖然 Redis 本身的事務不能和數據庫事務協同工作,但在批量操作 Redis 時,可以使用 Redis 的管道(Pipeline)和事務來減少網絡開銷,保證一批 Redis 命令的原子性(在命令入隊階段不出錯的情況下)。
比如批量插入數據到 Redis 時:
public void batchUpdateRedis(List<Long> userIds, List<Integer> deductScores) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (int i = 0; i < userIds.size(); i++) {
String userId = userIds.get(i).toString();
Integer deductScore = deductScores.get(i);
connection.incrBy(userId.getBytes(), -deductScore);
}
return null;
});
}
管道可以將多個 Redis 命令打包發送,減少客戶端和服務器之間的網絡往返次數,提高性能。而 Redis 自身的事務可以保證這一批命令要么全部執行,要么都不執行(如果在入隊階段有錯誤),雖然和數據庫事務不同,但在純 Redis 操作的場景下,能保證 Redis 數據的一致性。
(四)緩存數據版本號,解決可重復讀問題
針對在事務中多次讀取 Redis 數據不一致的問題,可以給緩存的數據添加版本號,每次讀取數據時同時讀取版本號,在更新數據時檢查版本號是否一致,保證數據的一致性。
比如:
@Transactional(isolation = Isolation.REPEATABLE_READ, rollbackFor = Exception.class)
publicvoid checkAndDeduct積分(Long userId, Integer deductScore) {
// 第一次讀取 Redis 積分和版本號
String scoreKey = "user:score:" + userId;
String versionKey = "user:score:version:" + userId;
Integer currentScore = Integer.parseInt(redisTemplate.opsForValue().get(scoreKey));
Long version = Long.parseLong(redisTemplate.opsForValue().get(versionKey));
// 業務處理
if (currentScore < deductScore) {
thrownew RuntimeException("積分不足");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 第二次讀取版本號,檢查是否有變化
Long newVersion = Long.parseLong(redisTemplate.opsForValue().get(versionKey));
if (!version.equals(newVersion)) {
thrownew RuntimeException("積分數據已被修改,操作失敗");
}
// 扣減數據庫積分記錄
IntegralLog integralLog = new IntegralLog();
integralLog.setUserId(userId);
integralLog.setDeductScore(deductScore);
integralLog.setCreateTime(new Date());
integralLogMapper.insert(integralLog);
// 扣減 Redis 積分,并更新版本號
redisTemplate.opsForValue().increment(scoreKey, -deductScore);
redisTemplate.opsForValue().increment(versionKey);
}
通過版本號的方式,在事務中檢查數據是否被修改過,如果被修改過,就拋出異常,終止操作,保證了業務邏輯的正確性。
(五)使用分布式事務框架,解決跨資源事務問題
如果項目中涉及到多個數據庫和 Redis 等多個資源的事務協調,就需要使用分布式事務框架了,比如 Seata、TCC-Transaction 等。這些框架可以幫助我們管理跨資源的事務,保證最終一致性。
以 Seata 的 AT 模式為例,大致步驟如下:
- 定義事務的入口,開啟全局事務。
- 在操作數據庫和 Redis 的服務中,注冊分支事務。
- 數據庫操作通過 Seata 的代理數據源來實現自動生成回滾日志,保證可回滾性。
- Redis 操作需要手動實現補償邏輯,比如在分支事務回滾時,執行相反的操作(如增加積分來補償之前的扣減)。
不過分布式事務框架比較復雜,會增加系統的復雜度和性能開銷,所以在使用時需要根據項目的實際情況來選擇,不要盲目引入。
六、總結:踩坑不可怕,怕的是不總結
咱今天嘮了這么多 Redis 和 @Transactional 一起使用時的坑,總結起來就是一句話:Redis 操作和數據庫事務是兩個不同的世界,不能想當然地認為它們會自動協同工作。在使用時,一定要明確它們的邊界,根據業務場景選擇合適的方案。
記住這幾個關鍵點:
- 優先保證數據庫事務的原子性,Redis 操作放在數據庫操作之后,且做好異常處理和補償機制。
- 對實時性要求不高的場景,異步處理 Redis 操作,解耦事務依賴。
- 復雜的分布式事務場景,使用專業的分布式事務框架,不要自己硬剛。
- 了解 Redis 數據結構的特性,避免在事務中使用可能導致數據不一致的操作。
咱踩過的坑,希望你們別再踩了。要是覺得這篇文章有用,趕緊收藏轉發,讓更多的兄弟避避坑。