Seata如何實現兩階段提交(2PC)分布式事務
介紹
2PC,全稱為兩階段提交(Two-Phase Commit),是一種在分布式系統中用來保證事務原子性和一致性的協議。它主要用于協調分布式數據庫或分布式事務環境中的多個參與者,確保所有參與者要么一起成功提交事務,要么一起回滾事務,以保持數據的一致性。
圖片
在2PC協議中有兩個主要階段:
- 準備階段(Prepare Phase):
事務協調器接收到發起事務的客戶端請求后,向所有參與該事務的資源管理器(例如數據庫、服務節點等)發送“準備提交”請求。
每個資源管理器執行事務操作,并將事務相關的更改鎖定但不提交,然后回復事務協調器它們是否準備好提交事務(根據各自是否能夠成功完成事務而定)。
- 提交階段(Commit Phase):
如果事務協調器收到了所有資源管理器的肯定答復,即所有參與者都準備好提交事務,則向所有參與者發出“正式提交”指令。
若協調器收到任何一個參與者的否定響應,或者在等待超時后仍有參與者未響應,則向所有參與者發出“回滾事務”的指令。
通過這種方式,2PC確保了所有節點要么全部完成事務,要么全部撤銷事務,從而維護了分布式環境下的事務原子性。然而,2PC也存在一些缺點,比如單點故障問題(即事務協調器宕機可能導致事務長期阻塞)、網絡分區情況下的不確定性以及性能上的潛在瓶頸。
Seata把一個分布式事務理解成一個包含了若干分支事務的全局事務。全局事務的職責是協調其下管轄的分支事務 達成一致,要么一起成功提交,要么一起失敗回滾。此外,通常分支事務本身就是一個關系數據庫的本地事務,下圖是全局事務與分支事務的關系圖:
圖片
與 傳統2PC 的模型類似,Seata定義了3個組件來協議分布式事務的處理過程
圖片
- Transaction Coordinator (TC):事務協調器,它是獨立的中間件,需要獨立部署運行,它維護全局事務的運行狀態,接收TM指令發起全局事務的提交與回滾,負責與RM通信協調各各分支事務的提交或回滾。
- Transaction Manager (TM):事務管理器,TM需要嵌入應用程序中工作,它負責開啟一個全局事務,并最終向TC發起全局提交或全局回滾的指令。
- Resource Manager (RM):控制分支事務,負責分支注冊、狀態匯報,并接收事務協調器TC的指令,驅動分支(本地)事務的提交和回滾。
具體實現
案例分析:兩個賬戶在不同的銀行(張三在bank1、李四在bank2),bank1和bank2是兩個微服務。交易過程是,張三給李四轉賬指定金額。
上述交易步驟,要么一起成功,要么一起失敗,必須是一個整體性的事務。
圖片
為了簡化環境搭建,小編這里采用file啟動seata,項目搭建也只是兩個普通的SpringBoot項目,未使用微服務。
下載seata服務器
官方下載地址:https://github.com/seata/seata/releases
- registry.type=file:
registry.type=file 其類型設置為 file 時,意味著 Seata 的服務注冊中心不依賴于外部的如 Nacos、Eureka、Zookeeper 等第三方注冊中心,而是使用本地文件的方式來存儲和管理服務節點信息。這種模式主要用于快速測試或簡單的單機部署場景,因為在這種模式下無法自動發現和管理集群環境中的其他 Seata Server 節點,不具備高可用性。
- config.type=file:
config.type=file 表示 Seata 使用本地文件作為配置源。這意味著 Seata 會從指定的本地文件中讀取全局事務協調器(TC)、事務管理器(TM)和資源管理器(RM)等組件所需的配置信息,而不是通過Nacos、Apollo或其他遠程配置中心獲取配置。這種方式同樣適用于快速驗證和簡單部署情況,實際生產環境中可能需要結合分布式配置中心來動態更新和管理配置。
- seata安裝初始化參考《SpringCloud Alibaba微服務實戰之環境準備》,注意本次啟動是采用file方式啟動
- seata啟動:/bin/seata-server.bat -m file
圖片
- bank-1 和 bank-2啟動:
圖片
bank-1 和 bank-2服務搭建
庫表建立
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE )
ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE)
ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的賬戶', '2', NULL, 0);
備注:分別在bank1、bank2庫中創建undo_log表,此表為seata框架使用
依賴引入
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--bank-2 不需要-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>
定義配置
server:
port: 8081
#port: 8082
spring:
application:
name: bank-1
#name: bank-2
datasource:
url: jdbc:mysql://localhost:3306/bank1?characterEncoding=utf8&useSSL=false
#url: jdbc:mysql://localhost:3306/bank2?characterEncoding=utf8&useSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: root
password: root
seata:
tx-service-group: order_tx_group #自定義事務組名稱需要與seata-server中的對應
service:
vgroup-mapping:
order_tx_group: default # TC 集群(必須與seata-server保持一致)
定義mapper
# bank-1
@Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
# bank-2
@Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
服務調用
bank-1:
@GlobalTransactional
@Override
public void updateAccountBalance(String accountNo, Double amount) {
log.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
//張三扣減金額
baseMapper.updateAccountBalance(accountNo,amount * -1);
//向李四轉賬
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpget = new HttpGet("http://localhost:8082/bank2/transfer?amount="+amount);
httpget.addHeader(RootContext.KEY_XID,RootContext.getXID());
try{
CloseableHttpResponse response = httpclient.execute(httpget);
HttpEntity entity = response.getEntity();
String result = EntityUtils.toString(entity);
log.info("bank2 服務返回結果:"+result);
}catch (Exception e){
throw new RuntimeException("bank2 服務異常");
}
//人為制造錯誤
if(amount > 100){
throw new RuntimeException("bank1 make exception amount > 100");
}
}
當業務方法開啟全局異常處理器后,TM注冊到TC獲取到一個XID,此時在業務中,服務遠程訪問時,此XID會被下面分支業務方法RM接收到,當各個方法處理完成后RM會向TC直接交互把結果通過XID通知給TC,最后業務方法結束后,TM會通知TC業務已經完成,TC會根據RM通知的結果來通知各個RM提交或者回滾。但是在分布式事務中,入口TM傳出時不會將XID放入請求頭中向其他服務傳遞,這樣就導致全局異常捕獲失效,因此需要手動將XID設置到請求頭中,攜帶給各分支業務來避免事務失效問題。
bank-2:
@Transactional
@Override
public void updateAccountBalance(String accountNo, Double amount) {
log.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
//李四增加金額
baseMapper.updateAccountBalance(accountNo,amount);
//制造異常
if(amount < 100){
throw new RuntimeException("bank1 make exception amount < 100");
}
}
服務配置seata
file.conf:
圖片
registry.conf:
圖片
執行流程
正常流程:
圖片
回滾流程:
圖片