本地消息表:Spring Boot 實現分布式事務的優雅方案
前言
在微服務架構中,分布式事務一直是一個棘手的問題,常見的分布式事務解決方案包括:
2PC(兩階段提交)
2PC即兩階段提交協議,是將整個事務流程分為兩個階段,準備階段(Prepare phase)、提交階段(commit phase):
- 準備階段(Prepare phase):事務管理器給每個參與者發送Prepare消息,每個數據庫參與者在本地執行事務,并寫本地的Undo/Redo日志,此時事務沒有提交。 (Undo日志是記錄修改前的數據,用于數據庫回滾,Redo日志是記錄修改后的數據,用于提交事務后寫入數據文件)
- 提交階段(commit phase):如果事務管理器收到了參與者的執行失敗或者超時消息時,直接給每個參與者發送回滾(Rollback)消息;否則,發送提交(Commit)消息;參與者根據事務管理器的指令執行提交或者回滾操作,并釋放事務處理過程中使用的鎖資源。注意:必須在最后階段釋放鎖資源。
當所有參與者均反饋yes,提交事務
圖片
當任何階段1一個參與者反饋no,中斷事務
圖片
TCC(Try-Confirm-Cancel)
TCC要求每個分支事務實現三個操作:預處理Try、確認Confirm、撤銷Cancel。Try操作做業務檢查及資源預留,Confirm做業務確認操作,Cancel實現一個與Try相反的操作即回滾操作。TM首先發起所有的分支事務的try操作,任何一個分支事務的try操作執行失敗,TM將會發起所有分支事務的Cancel操作,若try操作全部成功,TM將會發起所有分支事務的Confirm操作,其中Confirm/Cancel操作若執行失敗,TM會進行重試。
- Try階段是做業務檢查(一致性)及資源預留(隔離),此階段僅是一個初步操作,它和后續的Confirm一起才能真正構成一個完整的業務邏輯。
- Confirm階段是做確認提交,Try階段所有分支事務執行成功后開始執行Confirm。通常情況下,采用TCC則認為Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。若Confirm階段真的出錯了,需引入重試機制或人工處理。
- Cancel階段是在業務執行錯誤需要回滾的狀態下執行分支事務的業務取消,預留資源釋放。通常情況下,采用TCC則認為Cancel階段也是一定成功的。若Cancel階段真的出錯了,需引入重試機制或人工處理。
- TM事務管理器可以實現為獨立的服務,也可以讓全局事務發起方充當TM的角色,TM獨立出來是為了成為公用組件,是為了考慮系統結構和軟件復用。
當Try階段服務全部正常執行, 執行確認業務邏輯操作
圖片
當Try階段存在服務執行失敗, 進入Cancel階段
圖片
可靠消息最終一致性
可靠消息最終一致性方案是指當事務發起方執行完成本地事務后并發出一條消息,事務參與方(消息消費者)一定能夠接收消息并處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。
正常情況——事務主動方發消息
圖片
異常情況——事務主動方消息恢復
圖片
本地消息表
本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然后通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
本地消息表的核心思想是:將分布式事務拆分為多個本地事務,并通過消息表記錄事務狀態。具體流程如下:
- 業務操作與消息記錄:在同一個本地事務中,執行業務操作并記錄消息到本地數據庫的消息表中
- 消息發送:事務提交后,通過定時任務或事件監聽機制,將消息發送到消息隊列
- 消息消費:下游服務從消息隊列消費消息并執行業務操作
- 消息確認:下游服務處理完成后,通過某種機制確認消息已處理
圖片
這種方案的最大優勢是將分布式事務轉換為本地事務,利用數據庫的ACID特性保證業務操作和消息記錄的原子性。
實現步驟
本地消息表因其實現簡單、可靠性高、性能良好,成為中小型項目的首選方案,本文將詳細介紹本地消息表的原理,并結合Spring Boot提供完整的實現方案。
數據庫設計
-- 消息表結構
CREATE TABLE message (
id VARCHAR(32) PRIMARY KEY,
content TEXT NOT NULL,
topic VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL COMMENT 'INIT, SENDING, SENT, FAILED',
retry_count INT DEFAULT 0,
next_retry_time DATETIME,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_status (status),
INDEX idx_next_retry_time (next_retry_time)
);
-- 業務表示例(如訂單表)
CREATE TABLE orders (
id VARCHAR(32) PRIMARY KEY,
user_id VARCHAR(32) NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status VARCHAR(20) NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL
);
消息結構
public enum MessageStatus {
INIT("初始化"),
SENDING("發送中"),
SENT("已發送"),
FAILED("發送失敗");
private final String description;
MessageStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
public interface MessageRepository extends JpaRepository<Message, String> {
List<Message> findByStatusAndNextRetryTimeLessThanEqual(String status, LocalDateTime now);
@Transactional
@Modifying
@Query("UPDATE Message m SET m.status = ?2, m.updateTime = ?3 WHERE m.id = ?1")
int updateStatus(String id, String status, LocalDateTime updateTime);
@Transactional
@Modifying
@Query("UPDATE Message m SET m.status = ?2, m.retryCount = m.retryCount + 1, " +
"m.nextRetryTime = ?3, m.updateTime = ?3 WHERE m.id = ?1")
int updateForRetry(String id, String status, LocalDateTime nextRetryTime);
@Transactional
@Modifying
@Query("DELETE FROM Message m WHERE m.createTime < ?1")
int deleteOlderThan(LocalDateTime threshold);
}
- MessageStatus枚舉:定義消息的四種狀態
- MessageRepository接口:繼承JPA的JpaRepository,提供基本CRUD操作,并定義自定義查詢方法
消息服務實現
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${message.retry.interval:60000}")
private long retryInterval;
@Value("${message.max.retry:10}")
private int maxRetry;
@Value("${message.retention.days:30}")
private int retentionDays;
@Override
@Transactional
public Message saveMessage(String content, String topic) {
Message message = new Message();
message.setId(UUID.randomUUID().toString().replace("-", ""));
message.setContent(content);
message.setTopic(topic);
message.setStatus(MessageStatus.INIT.name());
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
return messageRepository.save(message);
}
@Override
public void sendMessage(Message message) {
try {
// 更新消息狀態為發送中
messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
// 發送消息到RabbitMQ
rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
// 更新消息狀態為已發送
messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
} catch (Exception e) {
// 發送失敗,更新重試信息
handleSendFailure(message, e);
}
}
@Override
public void processPendingMessages() {
List<Message> pendingMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
MessageStatus.INIT, LocalDateTime.now());
for (Message message : pendingMessages) {
sendMessage(message);
}
}
@Override
public void retryFailedMessages() {
List<Message> failedMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
MessageStatus.FAILED, LocalDateTime.now());
for (Message message : failedMessages) {
if (message.getRetryCount() < maxRetry) {
try {
// 更新消息狀態為發送中
messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
// 重試發送消息
rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
// 更新消息狀態為已發送
messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
} catch (Exception e) {
// 重試失敗,更新重試信息
handleSendFailure(message, e);
}
} else {
// 超過最大重試次數,記錄日志并標記為永久失敗
// 可以添加告警機制
System.err.println("Message exceeded max retry count: " + message.getId());
}
}
}
@Override
public void cleanOldMessages() {
LocalDateTime threshold = LocalDateTime.now().minusDays(retentionDays);
int deletedCount = messageRepository.deleteOlderThan(threshold);
System.out.println("Deleted " + deletedCount + " old messages");
}
@Override
public void updateMessageStatus(String messageId, MessageStatus status) {
messageRepository.updateStatus(messageId, status.name(), LocalDateTime.now());
}
private void handleSendFailure(Message message, Exception e) {
// 計算下一次重試時間
LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds(retryInterval);
// 更新消息狀態為失敗并增加重試次數
messageRepository.updateForRetry(
message.getId(),
MessageStatus.FAILED.name(),
nextRetryTime
);
// 記錄錯誤日志
System.err.println("Failed to send message: " + message.getId() + ", error: " + e.getMessage());
}
}
- saveMessage:創建并保存新消息,設置初始狀態為INIT
- sendMessage:發送消息到消息隊列,處理發送成功和失敗的情況
- processPendingMessages:處理待發送的消息
- retryFailedMessages:重試發送失敗的消息,實現最大重試次數控制
- cleanOldMessages:清理過期消息,防止消息表過大
- handleSendFailure:處理發送失敗的情況,更新重試信息
定時任務配置
@Component
public class MessageScheduler {
@Autowired
private MessageService messageService;
/**
* 定時處理待發送的消息
*/
@Scheduled(fixedRate = 10000) // 每10秒執行一次
public void processPendingMessages() {
messageService.processPendingMessages();
}
/**
* 定時重試失敗的消息
*/
@Scheduled(fixedRate = 30000) // 每30秒執行一次
public void retryFailedMessages() {
messageService.retryFailedMessages();
}
/**
* 定時清理過期消息
*/
@Scheduled(fixedRateString = "${message.clean.interval}")
public void cleanOldMessages() {
messageService.cleanOldMessages();
}
}
- processPendingMessages:每 10 秒檢查一次待發送的消息
- retryFailedMessages:每 30 秒檢查一次需要重試的消息
- cleanOldMessages:按配置的間隔清理過期消息
業務邏輯
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageService messageService;
@Override
@Transactional
public Order createOrder(String userId, Double amount) {
// 創建訂單
Order order = new Order();
order.setId(UUID.randomUUID().toString().replace("-", ""));
order.setUserId(userId);
order.setAmount(amount);
order.setStatus("CREATED");
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
// 保存訂單
orderRepository.save(order);
// 在同一事務中保存消息(關鍵:確保訂單創建和消息記錄在同一個本地事務中)
String messageContent = String.format("{\"orderId\":\"%s\",\"userId\":\"%s\",\"amount\":%.2f}",
order.getId(), userId, amount);
messageService.saveMessage(messageContent, "order.created");
return order;
}
}
消息消費處理
@Component
public class OrderCreatedConsumer {
@Autowired
private PaymentService paymentService;
@Autowired
private MessageService messageService;
@Autowired
private ObjectMapper objectMapper;
@RabbitListener(queues = "order.created")
public void handleOrderCreated(String message) {
try {
// 解析消息內容
Map<String, Object> messageData = objectMapper.readValue(message, Map.class);
String orderId = (String) messageData.get("orderId");
String userId = (String) messageData.get("userId");
Double amount = Double.parseDouble(messageData.get("amount").toString());
// 處理訂單創建事件(例如:創建支付記錄)
paymentService.createPayment(orderId, userId, amount);
// 可以在這里添加其他業務邏輯,如扣減庫存、發送通知等
} catch (Exception e) {
// 消費失敗,記錄日志
System.err.println("Failed to process order created message: " + e.getMessage());
// 注意:RabbitMQ默認會重試,可能導致消息重復消費,需要在業務層處理冪等性
}
}
}
@Service
public class PaymentServiceImpl implements PaymentService {
@Override
@Transactional
public void createPayment(String orderId, String userId, Double amount) {
// 實現支付邏輯
// 例如:創建支付記錄、調用支付網關、更新訂單狀態等
System.out.println("Creating payment for order: " + orderId + ", amount: " + amount);
// 這里只是示例,實際應用中需要根據業務需求實現具體邏輯
}
}
- @RabbitListener注解:監聽指定隊列的消息
- 業務處理:根據消息內容執行業務邏輯
?
冪等性設計:消費端需要確保業務操作的冪等性,防止重復消費導致的數據不一致。
解決接口冪等問題,只需要記住一句口令"一鎖、二判、三更新",只要嚴格遵守這個過程,那么就可以解決并發問題。
//一鎖:先加一個分布式鎖
@DistributeLock(scene = "OEDER", keyExpression = "#request.identifier", expire = 3000)
public OrderResponse apply(OrderRequest request) {
OrderResponse response = new OrderResponse();
//二判:判斷請求是否執行成功過
OrderDTO orderDTO = orderService.queryOrder(request.getProduct(), request.getIdentifier());
if (orderDTO != null) {
response.setSuccess(true);
response.setResponseCode("DUPLICATED");
return response;
}
//三更新:執行更新的業務邏輯
return orderService.order(request);
}
總結
本地消息表方案的核心在于:
- 將業務操作和消息記錄放在同一個本地事務中,確保原子性
- 通過定時任務異步發送消息,避免阻塞業務流程
- 實現消息重試機制,提高消息發送成功率
- 消費端實現冪等性,保證數據一致性