高并發編程/消息傳遞機制避免鎖提高并發效率,不懂的趕緊進來(設計篇)
在現代軟件開發中,隨著多核處理器的普及和分布式系統的擴展,傳統的基于共享內存的并發模型正面臨越來越多的挑戰。消息傳遞機制作為一種替代方案,以其獨特的異步通信和無共享狀態的特性,為構建高效、可擴展和健壯的系統提供了新的思路。它通過將數據操作封裝在消息中,允許系統組件以松耦合的方式進行交互,從而減少了鎖的需求和競態條件的風險。本文將深入探討消息傳遞機制的原理、優勢以及如何在實際應用中實現這一模式,幫助讀者理解其在解決并發問題中的重要作用。
1、并發問題
1.1 問題描述
在并發環境中,兩個線程同時對計數器進行操作,線程1減少2,線程2減少9。由于缺乏同步,兩個線程都認為計數器值大于需要減少的值,最終導致計數器變為-1,這違反了業務規則,因為庫存不能為負數,表示過度分配。
1.2 解決方案
- 使用原子操作鎖定檢查和遞減步驟,確保操作的原子性。
因為傳統并發模式中,共享內存是傾向于強一致性弱隔離性的,例如悲觀鎖同步的方式就是使用強一致性的方式控制并發,
- 采用消息傳遞機制代替共享內存,減少鎖的使用。
使用共享數據的并發編程面臨的最大問題是數據條件競爭 data race,消息傳遞機制最大的優勢在于不會產生數據競爭狀態。而實現消息傳遞有兩種常見類型:基于 channel的消息傳遞、基于 Actor的消息傳遞。
1.3 為什么消息傳遞機制能減少鎖
消息傳遞機制能夠減少或消除對鎖的需求,主要是因為它改變了并發編程的范式,從直接操作共享狀態轉變為通過消息傳遞來協調操作。以下是消息傳遞機制如何實現這一點的幾個關鍵點:
- 分解任務:
在消息傳遞模型中,復雜的任務被分解成一系列更小的、可以獨立處理的任務單元(消息)。這些任務單元被發送到消息隊列中,而不是直接操作共享狀態。
- 無共享狀態:
每個線程或進程處理自己的任務單元,而不直接訪問或修改共享狀態。這樣,就避免了多個線程同時修改同一共享變量的情況,從而減少了鎖的需求。
消費者處理:
消費者線程從消息隊列中取出任務單元進行處理。由于每個任務單元是獨立的,消費者之間不需要同步,因為它們不會同時處理同一個任務單元。
線程安全:
消息隊列本身是線程安全的,它保證了消息的順序性和原子性,確保了消息的正確傳遞和處理。
并發性:
由于任務單元是獨立的,多個消費者可以并發地從消息隊列中取出任務單元進行處理,提高了系統的并發性和吞吐量。
解耦合:
消息傳遞機制使得生產者和消費者之間的耦合度降低,它們不需要知道對方的具體實現,只需要知道如何發送和接收消息。
容錯性:
如果某個消費者處理任務單元失敗,這不會影響其他消費者處理其他任務單元。這種機制提高了系統的容錯性。
1.4 消息傳遞機制的類型
基于Channel的消息傳遞:在Go語言中廣泛使用,通過channel實現goroutine之間的通信。
基于Actor的消息傳遞:在Akka框架中實現,每個Actor是一個并發執行的實體,通過消息傳遞進行通信。
1.5 消息傳遞機制避免鎖模型圖
圖片
說明:
- 生產者(Producer) :在業務邏輯中,當需要減少庫存時,生產者將減少庫存的請求封裝成一條消息,并發送到消息隊列中,而不是直接操作共享庫存狀態。
- 消息隊列(Message Queue) :消息隊列是生產者和消費者之間的中介,它負責存儲和傳遞消息。在這個例子中,消息隊列確保了消息的順序性和獨立性,使得每個減少庫存的請求都是獨立的。
- 消費者(Consumer) :消費者從消息隊列中取出消息,并根據消息內容執行相應的操作(在這個例子中是減少庫存)。由于每個消息都是獨立的,消費者不需要與生產者或其他消費者同步,因此避免了鎖的使用。
優勢:
- 無共享狀態:庫存狀態不再被多個線程共享,每個減少庫存的操作都是通過消息傳遞來協調的。
- 線程安全:由于消費者處理的是消息隊列中的消息,而不是直接操作共享狀態,因此不需要使用鎖來保證線程安全。
- 并發性:多個生產者可以并發地發送消息,多個消費者也可以并發地從消息隊列中取出和處理消息,提高了系統的并發處理能力。
1.6 消息傳遞機制避免鎖設計案例
業務:庫存管理
假設我們有一個在線商店,需要管理商品的庫存。在高并發環境下,多個客戶可能同時嘗試購買同一件商品,這就要求我們確保庫存的減少是線程安全的,以避免庫存變為負數。
傳統解決方案(使用鎖)
在傳統的解決方案中,我們可能會使用一個共享的庫存計數器,并在減少庫存的方法上加上同步鎖:
public class Inventory {
private int stock = 100;
public synchronized void reduceStock(int amount) {
if (stock >= amount) {
stock -= amount;
} else {
throw new IllegalArgumentException("庫存不足");
}
}
public synchronized int getStock() {
return stock;
}
}
在這個例子中, reduceStock 和 getStock 方法都被聲明為 synchronized,確保了在同一時間只有一個線程可以修改或讀取庫存。
使用消息傳遞機制的解決方案
現在,讓我們使用消息傳遞機制來重構這個庫存管理的業務邏輯,避免使用鎖:
import java.util.concurrent.ConcurrentLinkedQueue;
public class InventoryManager {
private final ConcurrentLinkedQueue<InventoryCommand> commandQueue = new ConcurrentLinkedQueue<>();
public void processCommands() {
while (!Thread.currentThread().isInterrupted()) {
InventoryCommand command = commandQueue.poll();
if (command != null) {
command.execute();
}
}
}
public void reduceStock(int amount) {
commandQueue.offer(new InventoryCommand(amount));
}
private static class InventoryCommand {
private final int amount;
private int stock = 100; // 每個命令有自己的庫存副本
public InventoryCommand(int amount) {
this.amount = amount;
}
public void execute() {
if (stock >= amount) {
stock -= amount;
System.out.println("庫存減少 " + amount + ",當前庫存 " + stock);
} else {
System.out.println("庫存不足,無法減少 " + amount);
}
}
}
}
public class Main {
public static void main(String[] args) {
InventoryManager manager = new InventoryManager();
Thread commandProcessor = new Thread(manager::processCommands);
commandProcessor.start();
// 模擬多個線程減少庫存
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> manager.reduceStock(20)).start();
}
// 等待命令處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
commandProcessor.interrupt();
}
}
解釋
在這個改進的例子中:
- InventoryCommand 是一個包含庫存減少邏輯的類,每個命令都有自己的庫存副本。這意味著每個命令處理自己的庫存狀態,而不是共享一個全局的庫存狀態。
- reduceStock 方法將減少庫存的操作封裝為一個 InventoryCommand 對象,并將其添加到命令隊列中。
- processCommands 方法從隊列中取出命令并執行,由于每個命令處理自己的庫存副本,因此不需要使用鎖。
- 這里 privateintstock=100;定義在 InventoryCommand類中,使得每個 InventoryCommand對象都有自己的庫存副本,這樣做的主要目的是為了避免鎖的使用,并實現以下幾個關鍵點:
- 在消息傳遞模型中,每個消息(命令)的處理是獨立的,一個命令的失敗不會影響到其他命令的執行,從而提高了系統的容錯性。
- 避免使用鎖可以減少線程間的協調開銷,提高系統的吞吐量和響應性。在多核處理器上,無鎖的設計可以更好地利用硬件資源,提高并行處理能力。
- 在傳統的并發編程中,通常需要使用鎖(如 synchronized塊或 ReentrantLock)來保護對共享資源的訪問。通過為每個任務提供獨立的數據副本,可以避免這些復雜的并發控制機制,簡化編程模型。
- 由于每個命令操作的是自己的庫存副本,不存在多個線程同時修改同一共享變量的情況,從而避免了并發修改導致的數據不一致問題,也就不需要使用鎖來保證線程安全。
- 每個 InventoryCommand對象管理自己的庫存狀態,不依賴于全局共享的庫存狀態。這意味著不同的消息(命令)之間不會直接競爭或沖突,因為它們各自操作自己的數據副本。
- 無共享狀態:
- 線程安全:
- 簡化并發控制:
- 提高性能和可擴展性:
- 容錯性:
替代方案:使用不可變對象
另一種避免鎖的方法是使用不可變對象。不可變對象一旦創建,其狀態就不能被改變,因此天生是線程安全的,不需要使用鎖。例如,我們可以定義一個不可變的庫存命令對象:
public final class InventoryCommand {
private final int amount;
private final int newStock;
public InventoryCommand(int amount, int currentStock) {
this.amount = amount;
this.newStock = currentStock - amount;
}
public int getNewStock() {
return newStock;
}
public int getAmount() {
return amount;
}
}
在這個版本中, InventoryCommand對象在創建時就計算了新的庫存值,并且這個值是不可變的。處理命令時,我們只需讀取命令的屬性,而不需要修改它:
public void processCommands() {
while (!Thread.currentThread().isInterrupted()) {
InventoryCommand command = commandQueue.poll();
if (command != null) {
int newStock = command.getNewStock();
System.out.println("庫存減少 " + command.getAmount() + ",當前庫存 " + newStock);
}
}
}
這種方法進一步簡化了設計,因為命令對象本身不包含任何可變狀態,從而完全避免了鎖的需求。
1.7. 結論
消息傳遞機制通過改變并發編程的范式,從直接操作共享狀態轉變為通過消息傳遞來協調操作,從而減少了鎖的使用,提高了系統的并發性和容錯性。這種機制特別適用于需要高吞吐量和高可靠性的分布式系統。