字節(jié)二面:你有沒有用過(guò)分布式鎖?有哪些分布式鎖實(shí)現(xiàn)方案?使用分布式鎖有哪些優(yōu)缺點(diǎn)?
引言
隨著業(yè)務(wù)規(guī)模的不斷擴(kuò)張和技術(shù)架構(gòu)的演進(jìn),分布式系統(tǒng)已經(jīng)成為支撐高并發(fā)、海量數(shù)據(jù)處理的關(guān)鍵基礎(chǔ)設(shè)施。在分布式環(huán)境中,各個(gè)節(jié)點(diǎn)相對(duì)獨(dú)立且可能并發(fā)地執(zhí)行任務(wù),這極大地提升了系統(tǒng)的整體性能和可用性。當(dāng)涉及到對(duì)共享資源的訪問(wèn)和修改時(shí),為了確保數(shù)據(jù)的一致性和正確性,我們需要一種能在多節(jié)點(diǎn)間協(xié)調(diào)并發(fā)操作的技術(shù)手段,也就是分布式鎖。
傳統(tǒng)的單機(jī)環(huán)境下,進(jìn)程內(nèi)可以通過(guò)本地鎖輕松實(shí)現(xiàn)對(duì)臨界區(qū)資源的互斥訪問(wèn)。但是,這一方法在分布式系統(tǒng)中不再適用,因?yàn)閱螜C(jī)鎖無(wú)法跨越網(wǎng)絡(luò)邊界,無(wú)法保證不同節(jié)點(diǎn)間的并發(fā)控制。分布式鎖正是在這種背景下產(chǎn)生,它是一種能夠?qū)崿F(xiàn)在分布式系統(tǒng)中多個(gè)節(jié)點(diǎn)之間協(xié)同工作的鎖機(jī)制,旨在保護(hù)共享資源不受并發(fā)沖突的影響,確保在復(fù)雜的分布式場(chǎng)景下數(shù)據(jù)操作的有序性和一致性。
庫(kù)存扣減
我們以WMS系統(tǒng)中,訂單出入庫(kù)操作庫(kù)存為例。
CREATE TABLE `tb_inventory`
(
`id` BIGINT NOT NULL AUTO_INCREMENT,
`account_id` BIGINT NOT NULL DEFAULT 0 COMMENT '帳套ID',
`sku` VARCHAR(128) NOT NULL DEFAULT '' COMMENT '商品sku編碼',
`warehouse_code` VARCHAR(16) NOT NULL DEFAULT '' COMMENT '庫(kù)存編碼',
`available_inventory` INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '可用庫(kù)存',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時(shí)間',
`deleted` TINYINT UNSIGNED NULL DEFAULT 0 COMMENT '0-未刪除 1/null-已刪除',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY uk_warehouse_code (customer_no, warehouse_code, sku, deleted)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
CHARACTER SET = utf8mb4 COMMENT = '庫(kù)存表';
庫(kù)存表為示例所用,無(wú)實(shí)際業(yè)務(wù)參考意義。
關(guān)于操作庫(kù)存,常見有以下一些錯(cuò)誤做法:
1、內(nèi)存中判斷庫(kù)存是否充足,并完成扣減
直接在內(nèi)存中判斷是否有庫(kù)存,計(jì)算扣減之后的值更新數(shù)據(jù)庫(kù),并發(fā)的情況下會(huì)導(dǎo)致庫(kù)存相互覆蓋發(fā)。
/**
* 確認(rèn)訂單出庫(kù)
*
* @param customerNo
* @param orderNo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void confirmOrder(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 忽略 訂單信息校驗(yàn)等,,,
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 剩余庫(kù)存
Integer remainInventory = availableInventory - qty;
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
updateInventory.setAvailableInventory(remainInventory);
tbInventoryMapper.updateInventory(updateInventory);
}
sql中直接執(zhí)行更新庫(kù)存
<update id="updateInventory">
UPDATE tb_inventory
SET available_inventory = #{availableInventory}
WHERE sku = #{sku}
AND customer_no = #{customerNo}
AND warehouse_code = #{warehouseCode}
AND deleted = 0
</update>
庫(kù)存SKU的庫(kù)存已經(jīng)變成了負(fù)數(shù):
圖片
2、內(nèi)存中判斷庫(kù)存是否充足,Sql中執(zhí)行庫(kù)存扣減
在InnoDB存儲(chǔ)引擎下,UPDATE通常會(huì)應(yīng)用行鎖,所以在SQL中加入運(yùn)算避免值的相互覆蓋,但是庫(kù)存的數(shù)量還是可能變?yōu)樨?fù)數(shù)。因?yàn)樾r?yàn)庫(kù)存是否充足在內(nèi)存中執(zhí)行,并發(fā)情況下都會(huì)讀到有庫(kù)存。
/**
* 確認(rèn)訂單出庫(kù)
*
* @param customerNo
* @param orderNo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void confirmOrder(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 忽略 訂單信息校驗(yàn)等,,,
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
}
庫(kù)存扣減在sql中進(jìn)行
<update id="updateInventory">
UPDATE tb_inventory
SET available_inventory = available_inventory - #{diffInventory}
WHERE sku = #{sku}
AND customer_no = #{customerNo}
AND warehouse_code = #{warehouseCode}
AND deleted = 0
</update>
庫(kù)存SKU的庫(kù)存已經(jīng)變成了負(fù)數(shù):
圖片
在操作庫(kù)存方法上使用synchronized
雖然synchronized可以防止在多并發(fā)環(huán)境下,多個(gè)線程并發(fā)訪問(wèn)這個(gè)庫(kù)存操作方法,但是synchronized的作用在方法結(jié)束之后就失效了,可能此時(shí)事務(wù)并沒有提交,導(dǎo)致可能其他的線程會(huì)在拿到鎖之后讀取到舊庫(kù)存數(shù)據(jù),在執(zhí)行扣除時(shí),依然可能會(huì)造成庫(kù)存扣減不對(duì)。
/**
* 確認(rèn)訂單出庫(kù)
*
* @param customerNo
* @param orderNo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public synchronized void confirmOrder(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 忽略 訂單信息校驗(yàn)等,,,
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
}
庫(kù)存SKU的庫(kù)存已經(jīng)變成了負(fù)數(shù):
圖片
從上面的錯(cuò)誤案例來(lái)看,在操作庫(kù)存時(shí),不是原子性的,導(dǎo)致庫(kù)存操作失敗。以下我們從單體以及分布式系統(tǒng)兩個(gè)方向探討如何保證數(shù)據(jù)的一致性和正確性。
單機(jī)系統(tǒng)
在單機(jī)系統(tǒng)中,數(shù)據(jù)和業(yè)務(wù)邏輯都集中在一個(gè)進(jìn)程中,面對(duì)并發(fā)訪問(wèn)共享資源的情況,需要依靠鎖機(jī)制和數(shù)據(jù)庫(kù)的事務(wù)管理(行鎖)來(lái)維護(hù)數(shù)據(jù)的正確性和一致性。
對(duì)于鎖機(jī)制,我們不管是采用synchronized還是Lock等,我們要保證的一個(gè)條件就是:要讓數(shù)據(jù)庫(kù)的事務(wù)在鎖的控制范圍之內(nèi)。
針對(duì)上述錯(cuò)誤案例,我們可以將鎖作用于事務(wù)之外,即將鎖放在庫(kù)存操作方法的上一層(例如service層)。
@Service
public class OrderServiceImpl implements IOrderService {
private IOrderManager orderManager;
/**
* 確認(rèn)訂單出庫(kù)
*
* @param customerNo
* @param orderNo
*/
@Override
public synchronized void confirmOrder(String customerNo, String orderNo) {
orderManager.confirmOrder(customerNo, orderNo);
}
@Autowired
public void setOrderManager(IOrderManager orderManager) {
this.orderManager = orderManager;
}
}
此時(shí)我們?cè)诓僮鲙?kù)存,會(huì)因?yàn)閹?kù)存不夠,導(dǎo)致庫(kù)存操作失敗:
圖片
這種方式雖然可以實(shí)現(xiàn)數(shù)據(jù)一致性和正確性,但是并不是很推薦,因?yàn)槲覀兊氖聞?wù)要控制的粒度盡可能的小。
推薦的方式,是我們?cè)冁i的控制范圍去提交事務(wù)。即手動(dòng)提交事務(wù)。使用TransactionTemplate或直接在代碼中調(diào)用PlatformTransactionManager的getTransaction和commit方法來(lái)手動(dòng)管理事務(wù)。
@Autowired
private PlatformTransactionManager transactionManager;
/**
* 確認(rèn)訂單出庫(kù)
*
* @param customerNo
* @param orderNo
*/
@Override
public synchronized void confirmOrder(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
// 忽略 訂單信息校驗(yàn)等,,,
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
// 提交事務(wù)
transactionManager.commit(status);
}
此時(shí)我們?cè)偃?zhí)庫(kù)存操作,會(huì)因?yàn)閹?kù)存不夠,導(dǎo)致庫(kù)存操作失敗:
圖片
對(duì)于上述同步鎖的實(shí)現(xiàn),我們最好使用Lock得方式去實(shí)現(xiàn),可以更精細(xì)控制同步邏輯。
@Autowired
private PlatformTransactionManager transactionManager;
private final Lock orderLock = new ReentrantLock();
/**
* 確認(rèn)訂單出庫(kù)
*
* @param customerNo
* @param orderNo
*/
@Override
public void confirmOrder(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
try {
// 嘗試獲取鎖,最多等待timeout時(shí)間
if (orderLock.tryLock(1, TimeUnit.SECONDS)) {
// 成功獲取到鎖,執(zhí)行確認(rèn)訂單的邏輯
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
// 忽略 訂單信息校驗(yàn)等,,,
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
// 提交事務(wù)
transactionManager.commit(status);
}catch (Exception e){
// 回滾事務(wù)
transactionManager.rollback(status);
// 處理異常
e.printStackTrace();
}finally {
// 釋放鎖
orderLock.unlock();
}
} else {
// 獲取鎖超時(shí)
System.out.println("Failed to confirm order within the timeout period: " +orderNo);
// 處理超時(shí)情況,比如記錄日志、通知用戶等
}
} catch (InterruptedException e) {
// 如果在等待鎖的過(guò)程中線程被中斷,處理中斷異常
Thread.currentThread().interrupt();
// ... 處理中斷邏輯 ...
}
}
在單機(jī)系統(tǒng)中,上述方法可以保證數(shù)據(jù)一致性以及正確性,但是實(shí)際業(yè)務(wù)中,我們應(yīng)用通常都部署在多個(gè)服務(wù)器中,此時(shí)上述方案就不能保證了,就需要分布式鎖來(lái)解決了。
分布式鎖的實(shí)現(xiàn)
在單機(jī)系統(tǒng)中,鎖是一種基本的同步機(jī)制,用于控制多個(gè)線程對(duì)共享資源的并發(fā)訪問(wèn)。當(dāng)我們升級(jí)到分布式系統(tǒng)時(shí),由于服務(wù)分散在多個(gè)節(jié)點(diǎn)之上,原本在單機(jī)環(huán)境下使用的鎖機(jī)制無(wú)法直接跨越多個(gè)節(jié)點(diǎn)來(lái)協(xié)調(diào)資源訪問(wèn)。所以此時(shí),分布式鎖作為一種擴(kuò)展的鎖概念應(yīng)運(yùn)而生。分布式鎖是一種跨多個(gè)節(jié)點(diǎn)、進(jìn)程或服務(wù)的同步原語(yǔ),它允許在分布式系統(tǒng)中協(xié)調(diào)對(duì)共享資源的訪問(wèn),確保在任何時(shí)候只有一個(gè)節(jié)點(diǎn)能夠獨(dú)占地執(zhí)行操作,即使這些節(jié)點(diǎn)分布在不同的物理或虛擬機(jī)器上。
分布式鎖的基本要素
1. 互斥性: 這是分布式鎖最基本的要求,意味著在任意時(shí)刻,只有一個(gè)客戶端(無(wú)論是進(jìn)程、線程還是服務(wù)實(shí)例)能夠持有并使用鎖,從而確保共享資源不會(huì)同時(shí)被多個(gè)客戶端修改。
2. 持久性: 分布式鎖必須具備一定的持久化能力,即便服務(wù)重啟或網(wǎng)絡(luò)短暫斷開,鎖的狀態(tài)仍然能夠得到保持。
3. 可重入性: 類似于單機(jī)環(huán)境下的可重入鎖,分布式鎖也應(yīng)該支持同一客戶端在持有鎖的同時(shí)再次請(qǐng)求鎖而不被阻塞,這對(duì)于遞歸調(diào)用或涉及多個(gè)資源訪問(wèn)的操作至關(guān)重要。
4. 公平性(Fairness): 在某些場(chǎng)景下,要求鎖分配遵循一定的公平原則,即等待最久的客戶端在鎖釋放時(shí)優(yōu)先獲得鎖。雖然不是所有分布式鎖實(shí)現(xiàn)都需要考慮公平性,但在某些高性能或高并發(fā)的系統(tǒng)中,公平性是非常重要的。
5. 容錯(cuò)性: 分布式鎖服務(wù)應(yīng)當(dāng)具備一定的容錯(cuò)能力,即即使一部分服務(wù)節(jié)點(diǎn)發(fā)生故障,仍能保證鎖功能的正確運(yùn)行,防止死鎖和數(shù)據(jù)不一致。這通常通過(guò)服務(wù)冗余和復(fù)制機(jī)制來(lái)實(shí)現(xiàn),如使用Raft、Paxos等一致性協(xié)議或基于ZooKeeper、etcd等分布式協(xié)調(diào)服務(wù)。
常見分布式鎖解決方案
基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)
1.數(shù)據(jù)庫(kù)悲觀鎖
悲觀鎖以預(yù)防性策略處理并發(fā)沖突,它假設(shè)并發(fā)訪問(wèn)導(dǎo)致的數(shù)據(jù)沖突是常態(tài)。因此,在訪問(wèn)數(shù)據(jù)之前,它會(huì)積極地獲取并持有鎖,確保在鎖未釋放時(shí),其他事務(wù)無(wú)法對(duì)同一數(shù)據(jù)進(jìn)行訪問(wèn)。通過(guò)運(yùn)用SELECT ... FOR UPDATE SQL語(yǔ)句,能夠在查詢階段即鎖定相關(guān)行,實(shí)現(xiàn)數(shù)據(jù)的獨(dú)占訪問(wèn)。然而,重要的是要注意,此操作應(yīng)僅針對(duì)唯一鍵執(zhí)行,否則可能會(huì)大幅增加鎖定范圍和潛在的鎖表風(fēng)險(xiǎn),從而影響系統(tǒng)的并發(fā)性能與效率。
最常見的做法是直接在業(yè)務(wù)數(shù)據(jù)上使用SELECT ... FOR UPDATE,例如:
<select id="selectSkuInventoryForUpdate" resultType="com.springboot.mybatis.entity.TbInventoryDO">
SELECT *
FROM tb_inventory
WHERE sku = #{sku}
AND customer_no = #{customerNo}
AND warehouse_code = #{warehouseCode}
AND deleted = 0
FOR UPDATE
</select>
在一個(gè)事務(wù)中,先使用SELECT ... FOR UPDATE后,在執(zhí)行更新。
/**
* 使用SELECT... FOR UPDATE 實(shí)現(xiàn)分布式鎖,扣減庫(kù)存
* @param customerNo
* @param orderNo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void confirmOrderWithLock(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
}
但是,這種實(shí)現(xiàn)方式,很容易造成業(yè)務(wù)表的鎖壓力,特別是數(shù)據(jù)量大,并發(fā)量高的時(shí)候。所以,還有一種做法是,專門維護(hù)一張鎖的表,而不是直接在業(yè)務(wù)數(shù)據(jù)表上使用SELECT FOR UPDATE。這種方式在某些場(chǎng)景下可以幫助簡(jiǎn)化鎖的管理,并且可以在一定程度上減輕對(duì)業(yè)務(wù)數(shù)據(jù)表的鎖定壓力。(其實(shí)實(shí)現(xiàn)方式,類似Redis實(shí)現(xiàn)的分布式鎖,只是用數(shù)據(jù)庫(kù)實(shí)現(xiàn)了而已)。其實(shí)現(xiàn)流程,如下:
數(shù)據(jù)庫(kù)實(shí)現(xiàn)悲觀鎖流程
1. 創(chuàng)建鎖表:首先,創(chuàng)建一張鎖表,例如lock_table,包含lock_key(用于標(biāo)識(shí)需要鎖定的業(yè)務(wù)資源)、lock_holder(持有鎖的客戶端標(biāo)識(shí),如用戶ID或事務(wù)ID)、acquire_time(獲取鎖的時(shí)間)等字段。
CREATE TABLE `tb_lock`
(
id BIGINT AUTO_INCREMENT
PRIMARY KEY,
lock_key VARCHAR(255) NOT NULL DEFAULT '' COMMENT '鎖的業(yè)務(wù)編碼。對(duì)應(yīng)業(yè)務(wù)表的唯一鍵',
lock_holder VARCHAR(32) NOT NULL DEFAULT '' COMMENT '持有鎖的客戶端標(biāo)識(shí)',
acquire_time DATETIME NOT NULL COMMENT '獲取鎖的時(shí)間',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL COMMENT '創(chuàng)建時(shí)間',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時(shí)間',
deleted TINYINT UNSIGNED DEFAULT '0' NULL COMMENT '0-未刪除 1/null-已刪除',
UNIQUE KEY uk_lock (lock_key, deleted)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
CHARACTER SET = utf8mb4 COMMENT = 'Lock表';
- 插入鎖記錄:當(dāng)客戶端想要獲取鎖時(shí),嘗試在lock_table中插入一條記錄,其中l(wèi)ock_key對(duì)應(yīng)需要保護(hù)的業(yè)務(wù)資源,例如商品SKU。插入操作通常是通過(guò)INSERT INTO ... ON DUPLICATE KEY UPDATE這樣的語(yǔ)句實(shí)現(xiàn),以確保在存在相同鎖鍵的情況下更新記錄,否則插入新記錄,這一步相當(dāng)于獲取鎖。
<insert id="insertLock">
INSERT INTO tb_lock
(lock_key,lock_holder,acquire_time)
VALUES
(#{lockKey},#{lockHolder},#{acquireTime})
</insert>
- 使用 SELECT FOR UPDATE:在插入鎖記錄時(shí),可以通過(guò)SELECT ... FOR UPDATE鎖定鎖表中的相應(yīng)記錄,確保在當(dāng)前事務(wù)結(jié)束前,其他事務(wù)無(wú)法更新或刪除這條鎖記錄。
<select id="selectLockByLockKey" resultType="com.springboot.mybatis.entity.TbLockDO">
SELECT *
FROM tb_lock
WHERE lock_key = #{lockKey} AND deleted = 0
FOR UPDATE
</select>
4. 檢查鎖狀態(tài):在獲取鎖時(shí),可以檢查鎖是否已被持有,比如檢查lock_holder字段,如果已有其他事務(wù)持有鎖,則獲取鎖失敗,需要等待或重試。
// 嘗試獲取鎖
tryLock(lockKey, lockHolder);
// 使用SELECT FOR UPDATE鎖定鎖表記錄
TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey);
if (!tbLockDO.getLockHolder().equals(lockHolder)) {
// 鎖已被其他客戶端持有,獲取鎖失敗,需要處理此異常情況
throw new IllegalStateException("Lock is held by another client.");
}
- 釋放鎖:當(dāng)業(yè)務(wù)操作完成時(shí),可以通過(guò)刪除或更新鎖表中的對(duì)應(yīng)記錄來(lái)釋放鎖。
<delete id="deleteLockByLockKey" parameterType="java.lang.String">
DELETE FROM tb_lock
WHERE lock_key = #{lockKey}
AND lock_holder = #{lockHolder}
AND deleted = 0
</delete>
基于數(shù)據(jù)庫(kù)悲觀鎖實(shí)現(xiàn),代碼如下:
@Transactional(rollbackFor = Exception.class)
@Override
public void confirmOrderWithLock(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);
String lockHolder = Thread.currentThread().getName();
try {
// 嘗試獲取鎖
tryLock(lockKey, lockHolder);
// 使用SELECT FOR UPDATE鎖定鎖表記錄
TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey);
if (!tbLockDO.getLockHolder().equals(lockHolder)) {
// 鎖已被其他客戶端持有,獲取鎖失敗,需要處理此異常情況
throw new IllegalStateException("Lock is held by another client.");
}
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
}finally {
unlock(lockKey, lockHolder);
}
}
/**
* 嘗試獲取鎖
* @param lockKey 鎖的key 業(yè)務(wù)編碼
* @param lockHolder 鎖的持有者
* @return 是否獲取成功
*/
private void tryLock(String lockKey, String lockHolder) {
TbLockDO tbLockDO = new TbLockDO();
tbLockDO.setLockKey(lockKey);
tbLockDO.setLockHolder(lockHolder);
tbLockDO.setAcquireTime(LocalDateTime.now());
//插入一條數(shù)據(jù) insert into
tbLockMapper.insertLock(tbLockDO);
}
/**
* 鎖釋放
* @param lockKey 鎖的key 業(yè)務(wù)編碼
*/
private void unlock(String lockKey, String lockHolder){
tbLockMapper.deleteLockByLockKey(lockKey, lockHolder);
}
圖片
數(shù)據(jù)庫(kù)悲觀鎖實(shí)現(xiàn)分布式鎖可以防止并發(fā)沖突,確保在事務(wù)結(jié)束前,這些記錄不會(huì)被其他并發(fā)事務(wù)修改。它還可以控制鎖的粒度,提供行級(jí)別的鎖定,減少鎖定范圍,提高并發(fā)性能。這種方式非常適合于處理需要更新的事務(wù)場(chǎng)景,特別是銀行轉(zhuǎn)賬、庫(kù)存扣減等需要保證數(shù)據(jù)完整性和一致性的操作。
但是,需要注意的是,過(guò)度或不當(dāng)使用SELECT FOR UPDATE會(huì)導(dǎo)致更多的行被鎖定,在高并發(fā)場(chǎng)景下,如果大量事務(wù)都在等待獲取鎖,可能會(huì)導(dǎo)致鎖等待和死鎖問(wèn)題,并且當(dāng)事務(wù)持有SELECT FOR UPDATE的鎖時(shí),其他事務(wù)嘗試修改這些鎖定的行會(huì)陷入等待狀態(tài),直至鎖釋放。這可能導(dǎo)致其他事務(wù)的延遲和系統(tǒng)吞吐量下降,長(zhǎng)時(shí)間持有鎖會(huì)導(dǎo)致數(shù)據(jù)庫(kù)資源(如內(nèi)存、連接數(shù)等)消耗增大,特別是長(zhǎng)事務(wù)中持有鎖時(shí)間較長(zhǎng),會(huì)影響系統(tǒng)的總體性能。所以我們?cè)谑褂脮r(shí)要特別注意不要再長(zhǎng)事務(wù)中使用悲觀鎖。
2.數(shù)據(jù)庫(kù)樂(lè)觀鎖
樂(lè)觀鎖假定并發(fā)沖突不太可能發(fā)生,因此在讀取數(shù)據(jù)時(shí)不鎖定資源,而是在更新數(shù)據(jù)時(shí)驗(yàn)證數(shù)據(jù)是否被其他事務(wù)修改過(guò)。
在數(shù)據(jù)庫(kù)表中添加一個(gè)version字段。
ALTER TABLE `tb_inventory` ADD COLUMN `version` INT NOT NULL DEFAULT 0 COMMENT '樂(lè)觀鎖版本' AFTER available_inventory;
每次更新時(shí)將version字段加1。在更新數(shù)據(jù)時(shí),通過(guò)UPDATE語(yǔ)句附帶WHERE version = oldVersion條件,只有當(dāng)version值不變時(shí)更新操作才會(huì)成功。若version已變,則表示數(shù)據(jù)已被其他事務(wù)修改,此次更新失敗。
<update id="updateInventorWithVersion">
UPDATE tb_inventory
SET available_inventory = available_inventory - #{diffInventory},
version = #{version} + 1
WHERE sku = #{sku}
AND customer_no = #{customerNo}
AND warehouse_code = #{warehouseCode}
AND version = #{version}
AND deleted = 0
</update>
基于樂(lè)觀鎖實(shí)現(xiàn)的方案:
@Transactional(rollbackFor = Exception.class)
@Override
public void confirmOrderWithVersion(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
Integer curVersion = inventoryDO.getVersion();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 設(shè)置當(dāng)前數(shù)據(jù)版本號(hào)
updateInventory.setVersion(curVersion);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
updateInventory.setVersion(inventoryDO.getVersion());
int updateRows = tbInventoryMapper.updateInventorWithVersion(updateInventory);
if (updateRows != 1){
System.err.println("更新庫(kù)存時(shí)發(fā)生并發(fā)沖突,請(qǐng)重試");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新庫(kù)存時(shí)發(fā)生并發(fā)沖突,請(qǐng)重試");
}
}
圖片
樂(lè)觀鎖假定大多數(shù)情況下不會(huì)有并發(fā)沖突,所以在讀取數(shù)據(jù)時(shí)不立即加鎖,而是等到更新數(shù)據(jù)時(shí)才去檢查是否有其他事務(wù)進(jìn)行了改動(dòng),這樣可以減少鎖的持有時(shí)間,提高了系統(tǒng)的并發(fā)性能。并且,樂(lè)觀鎖在數(shù)據(jù)更新時(shí)才檢查沖突,而不是在獲取數(shù)據(jù)時(shí)就加鎖,所以大大降低了死鎖的風(fēng)險(xiǎn)。并且因?yàn)椴怀<渔i,所以減少了數(shù)據(jù)庫(kù)級(jí)別的鎖管理開銷,非常適合對(duì)于讀多寫少的場(chǎng)景。
但是,當(dāng)并發(fā)寫入較多時(shí),可能出現(xiàn)大量更新沖突,需要不斷地重試事務(wù)以獲得成功的更新。過(guò)多的重試可能導(dǎo)致性能下降,特別是在并發(fā)度極高時(shí),可能會(huì)形成“ABA”問(wèn)題。并且 在極端并發(fā)條件下,如果沒有正確的重試機(jī)制或超時(shí)機(jī)制,樂(lè)觀鎖可能無(wú)法保證強(qiáng)一致性。尤其是在涉及多個(gè)表的復(fù)雜事務(wù)中,單個(gè)樂(lè)觀鎖可能不足以解決所有并發(fā)問(wèn)題。
基于Redis實(shí)現(xiàn)
1.Redis的setNX實(shí)現(xiàn)
Redis的setNX(set if not exists)命令是原子操作,當(dāng)鍵不存在時(shí)才設(shè)置值,設(shè)置成功則返回true,否則返回false。通過(guò)這個(gè)命令可以快速地在Redis中爭(zhēng)奪一把鎖。
利用Redis,我們可以生成一個(gè)唯一的鎖ID作為key的一部分。然后使用setNX嘗試設(shè)置key-value對(duì),value可以是過(guò)期時(shí)間戳。若設(shè)置成功,則認(rèn)為獲取鎖成功,執(zhí)行業(yè)務(wù)邏輯。在業(yè)務(wù)邏輯完成后,刪除對(duì)應(yīng)key釋放鎖,或設(shè)置過(guò)期時(shí)間自動(dòng)釋放。
@Slf4j
public class RedisDistributedLock implements AutoCloseable{
private final StringRedisTemplate stringRedisTemplate;
private final DefaultRedisScript<Boolean> unlockScript;
/**鎖的key*/
private final String lockKey;
/**鎖過(guò)期時(shí)間*/
private final Integer expireTime;
private static final String UNLOCK_LUA_SCRIPT = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockKey, Integer expireTime) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey;
this.expireTime = expireTime;
// 初始化Lua解鎖腳本
this.unlockScript = new DefaultRedisScript<>();
unlockScript.setScriptText(UNLOCK_LUA_SCRIPT);
unlockScript.setResultType(Boolean.class);
}
/**
* 獲取鎖
* @return 是否獲取成功
*/
public Boolean getLock() {
String value = UUID.randomUUID().toString();
try {
return stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("獲取分布式鎖失敗: {}", e.getMessage());
return false;
}
}
/**
* 釋放鎖
* @return 是否釋放成功
*/
public Boolean unLock() {
// 使用Lua腳本進(jìn)行解鎖操作
List<String> keys = Collections.singletonList(lockKey);
Object result = stringRedisTemplate.execute(unlockScript, keys, stringRedisTemplate.opsForValue().get(lockKey));
boolean unlocked = (Boolean) result;
log.info("釋放鎖的結(jié)果: {}", unlocked);
return unlocked;
}
@Override
public void close() throws Exception {
unLock();
}
}
然后,我們?cè)谔幚韼?kù)存時(shí),先嘗試獲取鎖,如果獲取到鎖,則就可以更新庫(kù)存。
@Transactional(rollbackFor = Exception.class)
@Override
public void confirmOrderWithRedisNx(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);
// 30秒過(guò)期
try (RedisDistributedLock lock = new RedisDistributedLock(stringRedisTemplate, lockKey, 30)) {
if (lock.getLock()) {
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
} else {
log.error("更新庫(kù)存時(shí)發(fā)生并發(fā)沖突,請(qǐng)重試");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新庫(kù)存時(shí)發(fā)生并發(fā)沖突,請(qǐng)重試");
}
} catch (Exception e) {
log.error("處理分布式鎖時(shí)發(fā)生錯(cuò)誤: {}", e.getMessage());
}
}
圖片
Redis作為內(nèi)存數(shù)據(jù)庫(kù),其操作速度快,setNX的執(zhí)行時(shí)間幾乎可以忽略不計(jì),尤其適合高并發(fā)場(chǎng)景下的鎖請(qǐng)求。Redis作為一個(gè)可以獨(dú)立的服務(wù),可以輕松實(shí)現(xiàn)不同進(jìn)程或服務(wù)器之間的互斥鎖。而setNX命令是原子操作,能夠在Redis這一單線程環(huán)境下以原子性的方式實(shí)現(xiàn)鎖的獲取,簡(jiǎn)單一行命令即可實(shí)現(xiàn)鎖的爭(zhēng)搶。同時(shí)可以通過(guò)EX或PX參數(shù),可以在設(shè)置鎖時(shí)一并設(shè)定過(guò)期時(shí)間,避免因意外情況導(dǎo)致的死鎖。
但是單純使用setNX并不能自動(dòng)續(xù)期,一旦鎖過(guò)期而又未主動(dòng)釋放,可能出現(xiàn)鎖被其他客戶端誤獲取的情況,需要額外實(shí)現(xiàn)鎖的自動(dòng)續(xù)期機(jī)制,例如使用WATCH和MULTI命令組合,或者SET命令的新參數(shù)如SET key value PX milliseconds NX XX。而setNX在獲取不到鎖時(shí)會(huì)立即返回失敗,所以我們必須輪詢或使用某種延時(shí)重試策略來(lái)不斷嘗試獲取鎖。并且如果多個(gè)客戶端同時(shí)請(qǐng)求鎖,Redis并不會(huì)保證特定的排隊(duì)順序,可能導(dǎo)致“饑餓”現(xiàn)象(即某些客戶端始終無(wú)法獲取鎖)。
雖然Redis的setNX命令在實(shí)現(xiàn)分布式鎖方面提供了便捷性和高性能,但要構(gòu)建健壯、可靠的分布式鎖解決方案,往往還需要結(jié)合其他命令(如expire、watch、multi/exec等)以及考慮到各種邊緣情況和容錯(cuò)機(jī)制。一些成熟的Redis客戶端庫(kù)(如Redisson、Jedis)提供了封裝好的分布式鎖實(shí)現(xiàn),解決了上述許多問(wèn)題。
基于Redisson實(shí)現(xiàn)
Redisson是一個(gè)高性能、開源的Java駐內(nèi)存數(shù)據(jù)網(wǎng)格,它基于Redis,并提供了眾多分布式數(shù)據(jù)結(jié)構(gòu)和一套分布式服務(wù),例如分布式鎖、信號(hào)量、閉鎖、隊(duì)列、映射等。Redisson使得開發(fā)者能夠更容易地在Java應(yīng)用程序中使用Redis,特別是對(duì)分布式環(huán)境下的同步原語(yǔ)提供了豐富的API支持。
Redisson的分布式鎖核心原理基于Redis命令,但進(jìn)行了增強(qiáng)和封裝,提供了一種更加可靠和易于使用的分布式鎖實(shí)現(xiàn)。他實(shí)現(xiàn)分布式鎖的思路與Redis的setNx實(shí)現(xiàn)類似。但是,相比較與Redis的setNx實(shí)現(xiàn)分布式鎖,Redisson還支持可重入鎖,即同一個(gè)線程在已經(jīng)獲得鎖的情況下可以再次獲取鎖而不被阻塞。內(nèi)部通過(guò)計(jì)數(shù)器記錄持有鎖的次數(shù),每次成功獲取鎖時(shí)計(jì)數(shù)器遞增,釋放鎖時(shí)遞減,只有當(dāng)計(jì)數(shù)器歸零時(shí)才真正釋放鎖。Redisson使用了看門狗(Watchdog)機(jī)制來(lái)監(jiān)控鎖的狀態(tài),定期自動(dòng)延長(zhǎng)鎖的有效期,這樣即使持有鎖的客戶端暫時(shí)凍結(jié)或網(wǎng)絡(luò)抖動(dòng),鎖也不會(huì)因?yàn)槌瑫r(shí)而被提前釋放。并且,對(duì)于Redis集群,Redisson還可以實(shí)現(xiàn)RedLock算法,通過(guò)在多個(gè)Redis節(jié)點(diǎn)上分別獲取鎖,增加分布式鎖的可用性和容錯(cuò)能力。
我們使用Redisson實(shí)現(xiàn)分布式鎖,實(shí)現(xiàn)庫(kù)存扣減。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.7</version>
</dependency>
spring:
redisson:
address: "redis://127.0.0.1:6379"
password:
@Override
public void confirmOrderWithRedisson(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);
// 30秒過(guò)期
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock.tryLock(30, TimeUnit.SECONDS)) {
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
} else {
log.error("更新庫(kù)存時(shí)發(fā)生并發(fā)沖突,請(qǐng)重試");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新庫(kù)存時(shí)發(fā)生并發(fā)沖突,請(qǐng)重試");
}
}catch (Exception e){
throw new ServiceException(StatusEnum.SERVICE_ERROR, "獲取分布式鎖時(shí)被中斷");
}finally {
// 無(wú)論成功與否,都要釋放鎖
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
圖片
Redisson支持多種類型的分布式鎖,包括可重入鎖(RLock)、讀寫鎖(RReadWriteLock)、公平鎖(RFairLock)等,滿足不同業(yè)務(wù)場(chǎng)景的需求。Redisson支持鎖的自動(dòng)續(xù)期功能,可以防止因?yàn)殒i持有者在業(yè)務(wù)處理過(guò)程中長(zhǎng)時(shí)間未完成而導(dǎo)致鎖過(guò)期被其他客戶端獲取。對(duì)于Redisson RedLock算法(多節(jié)點(diǎn)部署時(shí)),即使部分Redis節(jié)點(diǎn)失效,也能在大多數(shù)Redis節(jié)點(diǎn)存活的情況下維持鎖的穩(wěn)定性,增強(qiáng)了系統(tǒng)的容錯(cuò)性和高可用性。
相較于簡(jiǎn)單的數(shù)據(jù)庫(kù)悲觀鎖,Redisson的分布式鎖實(shí)現(xiàn)更為復(fù)雜。雖然Redisson提供了自動(dòng)續(xù)期機(jī)制,但如果客戶端在獲取鎖后突然崩潰且沒有正常釋放鎖,理論上仍然有可能導(dǎo)致鎖泄漏。雖然Redisson也提供了超時(shí)設(shè)置,但極端情況下仍需結(jié)人工清理機(jī)制或者其他的方案來(lái)預(yù)防此類問(wèn)題。
使用Zookeeper
在Zookeeper中實(shí)現(xiàn)分布式鎖的基本原理是利用Zookeeper的臨時(shí)節(jié)點(diǎn)和Watcher監(jiān)聽機(jī)制。
客戶端在Zookeeper中指定的某個(gè)路徑下創(chuàng)建臨時(shí)有序節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)名稱后都會(huì)附加一個(gè)唯一的遞增數(shù)字,表示節(jié)點(diǎn)的順序。當(dāng)多個(gè)客戶端同時(shí)請(qǐng)求鎖時(shí),它們都會(huì)創(chuàng)建各自的臨時(shí)有序節(jié)點(diǎn)。
客戶端按照節(jié)點(diǎn)順序判斷自己是否可以獲得鎖。節(jié)點(diǎn)順序最小的客戶端被認(rèn)為是鎖的持有者,它觀察到的序號(hào)比自己大的所有節(jié)點(diǎn)都是待解鎖的隊(duì)列。鎖的持有者繼續(xù)執(zhí)行業(yè)務(wù)邏輯,其它客戶端則會(huì)注冊(cè)Watcher監(jiān)聽比自己序號(hào)小的那個(gè)節(jié)點(diǎn)。
當(dāng)鎖持有者完成業(yè)務(wù)處理后,會(huì)刪除它創(chuàng)建的臨時(shí)節(jié)點(diǎn),Zookeeper會(huì)觸發(fā)Watcher通知等待隊(duì)列中的下一個(gè)節(jié)點(diǎn)。接收到通知的下一個(gè)節(jié)點(diǎn)發(fā)現(xiàn)其觀察的節(jié)點(diǎn)已刪除,于是重新檢查當(dāng)前路徑下剩余節(jié)點(diǎn)的順序,如果自己是現(xiàn)在最小的節(jié)點(diǎn),則認(rèn)為獲得了鎖。
Watcher機(jī)制允許客戶端監(jiān)聽Zookeeper上的節(jié)點(diǎn)變化事件,當(dāng)節(jié)點(diǎn)被創(chuàng)建、刪除、更新時(shí),Zookeeper會(huì)向注冊(cè)了相應(yīng)事件的客戶端發(fā)送通知。在分布式鎖場(chǎng)景中,客戶端通過(guò)注冊(cè)Watcher來(lái)監(jiān)聽鎖持有者的節(jié)點(diǎn)狀態(tài),以便在鎖釋放時(shí)及時(shí)獲取鎖。
圖片
而我們使用Apache Curator框架作為Zookeeper客戶端實(shí)現(xiàn)分布式鎖。Curator擁有良好的架構(gòu)設(shè)計(jì),提供了豐富的recipes(即預(yù)制模板)來(lái)實(shí)現(xiàn)常見的分布式協(xié)調(diào)任務(wù),包括共享鎖、互斥鎖、屏障、Leader選舉等。Curator的分布式鎖實(shí)現(xiàn)如InterProcessMutex和InterProcessSemaphoreMutex,直接提供了易于使用的API來(lái)獲取和釋放鎖。
Curator在實(shí)現(xiàn)分布式鎖時(shí),充分考慮了ZooKeeper的特性,比如臨時(shí)節(jié)點(diǎn)的生命周期關(guān)聯(lián)會(huì)話、有序節(jié)點(diǎn)的排序機(jī)制以及Watcher事件的通知機(jī)制等,確保在各種異常情況下,鎖的行為符合預(yù)期,例如客戶端斷線后鎖能被正確釋放。
Curator內(nèi)部集成了重試策略和背壓控制,當(dāng)ZooKeeper操作遇到網(wǎng)絡(luò)延遲或短暫的ZooKeeper集群不穩(wěn)定時(shí),Curator能夠自動(dòng)進(jìn)行重試,而不是立即拋出異常。
@Component
public class ZkLock {
private final CuratorFramework client;
private final InterProcessMutex lock;
@Value("${curator.zookeeper.connect-string}")
private String zookeeperConnectString;
public ZkLock() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(zookeeperConnectString, retryPolicy);
client.start();
// 分布式鎖路徑
String lockPath = "/locks/product_stock";
lock = new InterProcessMutex(client, lockPath);
}
public void acquireLock(Runnable task) throws Exception {
// 嘗試獲取鎖,超時(shí)時(shí)間為30秒
if (lock.acquire(30, TimeUnit.SECONDS)) {
try {
task.run(); // 在持有鎖的情況下執(zhí)行任務(wù)
} finally {
lock.release(); // 無(wú)論是否出現(xiàn)異常,都要確保釋放鎖
}
} else {
throw new Exception("獲取分布是鎖失敗");
}
}
}
使用ZkLock:
@Override
public void confirmOrderWithZk(String customerNo, String orderNo) {
// 查詢訂單信息
OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);
String warehouseCode = outboundOrderDO.getWarehouseCode();
// 查詢訂單明細(xì) 假設(shè)我們的出庫(kù)訂單是一單一件
OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());
String sku = detailDO.getSku();
Integer qty = detailDO.getQty();
String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);
// 30秒過(guò)期
zkLock.acquireLock(() -> {
// 查詢庫(kù)存
TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);
Integer availableInventory = inventoryDO.getAvailableInventory();
// 判斷庫(kù)存是否足夠
if (qty > availableInventory){
System.err.println("庫(kù)存不足,不能出庫(kù)");
throw new ServiceException(StatusEnum.SERVICE_ERROR, "庫(kù)存不足,不能出庫(kù)");
}
// 扣減庫(kù)存
TbInventoryDO updateInventory = new TbInventoryDO();
updateInventory.setCustomerNo(customerNo);
updateInventory.setWarehouseCode(warehouseCode);
updateInventory.setSku(sku);
// 庫(kù)存差值
updateInventory.setDiffInventory(qty);
tbInventoryMapper.updateInventory(updateInventory);
});
}
Apache Curator實(shí)現(xiàn)的分布式鎖適用于需要在分布式環(huán)境中實(shí)現(xiàn)強(qiáng)一致性和高可靠性的并發(fā)控制場(chǎng)景,但是它對(duì)ZooKeeper的依賴就涉及到了一些網(wǎng)絡(luò)開銷以及運(yùn)維復(fù)雜性等方面的缺點(diǎn)。
總結(jié)
分布式鎖是一種在分布式系統(tǒng)中實(shí)現(xiàn)互斥控制的機(jī)制,確保在多臺(tái)機(jī)器間,某一資源在同一時(shí)刻只被一個(gè)服務(wù)或者一個(gè)請(qǐng)求所訪問(wèn)或修改。它的核心挑戰(zhàn)在于如何保證在無(wú)中心化環(huán)境下的全局唯一性和一致性。
其實(shí)現(xiàn)主要依賴分布式存儲(chǔ)系統(tǒng)或協(xié)調(diào)服務(wù)。常見的實(shí)現(xiàn)方式有如下幾種方式:
- 基于數(shù)據(jù)庫(kù):利用數(shù)據(jù)庫(kù)事務(wù)的ACID特性,通過(guò)特定行的INSERT/UPDATE操作獲取鎖,DELETE/UPDATE操作釋放鎖。然而,可能存在性能瓶頸及高并發(fā)下數(shù)據(jù)庫(kù)連接數(shù)受限問(wèn)題。
- 基于緩存系統(tǒng)(如Redis):借助SETNX等原子操作或Lua腳本設(shè)置唯一鍵值對(duì)獲取鎖,并支持設(shè)置鎖超時(shí)以防止死鎖。這種方式具有較高的性能和內(nèi)置防死鎖機(jī)制。
- 基于ZooKeeper:利用ZooKeeper的ZNode、觀察者機(jī)制及臨時(shí)有序節(jié)點(diǎn)。服務(wù)通過(guò)創(chuàng)建臨時(shí)節(jié)點(diǎn)競(jìng)爭(zhēng)鎖,最小編號(hào)節(jié)點(diǎn)獲勝。節(jié)點(diǎn)故障時(shí),ZooKeeper自動(dòng)清理相關(guān)臨時(shí)節(jié)點(diǎn),實(shí)現(xiàn)鎖的自動(dòng)轉(zhuǎn)移。
而實(shí)際業(yè)務(wù)開發(fā)中,我們需要根據(jù)具體的業(yè)務(wù)以及系統(tǒng)資源等考慮,選擇合適的分布式鎖實(shí)現(xiàn)方式。