Spring Boot 多數據源如何處理事務?教你一招!
首先我先聲明一點,本文單純就是技術探討,要從實際應用中來說的話,我并不建議這樣去玩分布式事務、也不建議這樣去玩多數據源,畢竟分布式事務主要還是用在微服務場景下。
好啦,那就不廢話了,開整。
1. 思路梳理
首先我們來梳理一下思路。
在上篇文章中,我們是一個微服務,在 A 中分別去調用 B 和 C,當 B 或者 C 有一個執行失敗的時候,就去回滾。B 和 C 都是調用遠程的服務,所謂的回滾也不是傳統意義上的數據庫回滾,而是一種“反向補償”,即利用一條更新 SQL,將已經更新的數據復原。在這個例子中,B 和 C 都是遠程服務,操作的也都是不同的數據庫,這不就是我們多數據源中的情況么!
在微服務中,一個服務實際上就代表了一個數據源,而在我們多數據源的案例中,一個注解就能標記出來一個數據源,這樣一類比,你就會發現利用分布式事務來解決多數據源中的事務問題其實是非常 Easy 的。而且這里還不是微服務項目,只是一個單體項目,更簡單!
不過也有一些需要注意的細節。
2. 代碼實踐
接下來我們就結合代碼來講講。
2.1 案例準備
首先多數據源的案例我就不重復寫了,我們之前已經寫過一個,這里就不再贅述,文章一開頭也有相關的鏈接,還沒看過的小伙伴可以先看看。
也可以直接在公眾號后臺回復 dynamic_datasource 獲取相關的案例。
2.2 開始整活
因為上篇文章我主要是和大家分享的 seata 的 AT 模式,所以本文也是一樣,就先采用 AT 模式。
小伙伴們知道,在我們的多數據源案例中,我們用到了兩個庫,test08 和 test09,現在也還是這兩個庫,但是現在由于我們使用的是 AT 模式,我們需要在這兩個庫中分別創建 undo log 表,用來記錄我們對表的更新操作,當事務提交之后,undo log 表中的數據就會被清除,undo log,undo log 表的腳本如下:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
數據庫準備好之后,接下來就是準備依賴了,seata 有兩個依賴,一個是 seata-all,還有一個微服務版的,咱們這里就直接使用上篇文章中所用到的微服務版的,依賴如下:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
配好之后,接下來提供兩個配置文件 file.conf 和 regsigry.conf,這兩個配置文件和上篇文章中介紹到的一模一樣,這里不再贅述。
接下來配置 application.yaml,如下:
spring:
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group
main:
allow-circular-references: true
seata:
enable-auto-data-source-proxy: false
application-id: dd
大家看下這里的幾個配置:
- tx-service-group:這個是事務群組的名稱,相關名字是在 file.conf 中配置的。
- allow-circular-references:這個是允許循環依賴,可能有的小伙伴已經知道,現在最新版的 Spring Boot 中已經禁掉了循環依賴,但是這個 seata 中似乎還是用到了循環依賴,所以要開啟。
- enable-auto-data-source-proxy:由于 seata 會自動代理數據源,但是我們現在的數據源是自己加載的,所以關閉掉這個數據源的自動代理,將來用自己的。
- application-id:給我們的應用取一個名字。
好啦,這個文件就配置好了。
接下來就是數據源問題了,剛剛說了,seata 中會自動代理數據源,用到的代理對象是 DataSourceProxy,而我們在之前自定義的數據源加載中,并沒有用到這個 DataSourceProxy 對象所以這里要稍作修改,一共改兩個地方,如下:
LoadDataSource.java
@Component
@EnableConfigurationProperties(DruidProperties.class)
public class LoadDataSource {
@Autowired
DruidProperties druidProperties;
public Map<String, DataSourceProxy> loadAllDataSource() {
Map<String, DataSourceProxy> map = new HashMap<>();
Map<String, Map<String, String>> ds = druidProperties.getDs();
try {
Set<String> keySet = ds.keySet();
for (String key : keySet) {
DataSource dataSource = druidProperties.dataSource((DruidDataSource) DruidDataSourceFactory.createDataSource(ds.get(key)));
DataSourceProxy proxyDs = new DataSourceProxy(dataSource);
map.put(key, proxyDs);
}
} catch (Exception e) {
e.printStackTrace();
}
return map;
}
}
其實這里的改動就是把之前的 DataSource 用 DataSourceProxy 重新包裹一下,然后將獲取到的 DataSourceProxy 存起來。最后再修改一下動態數據源的地方:
@Component
public class DynamicDataSource extends AbstractRoutingDataSource {
public DynamicDataSource(LoadDataSource loadDataSource) {
//1.設置所有的數據源
Map<String, DataSourceProxy> allDs = loadDataSource.loadAllDataSource();
super.setTargetDataSources(new HashMap<>(allDs));
//2.設置默認的數據源
//將來,并不是所有的方法上都有 @DataSource 注解,對于那些沒有 @DataSource 注解的方法,該使用哪個數據源?
super.setDefaultTargetDataSource(allDs.get(DataSourceType.DEFAULT_DS_NAME));
//3
super.afterPropertiesSet();
}
/**
* 這個方法用來返回數據源名稱,當系統需要獲取數據源的時候,會自動調用該方法獲取數據源的名稱
* @return
*/
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceContextHolder.getDataSourceType();
}
}
Map 中的 value 類型變為 DataSourceProxy,其他都不變。
另外還有一個地方要改造下,就是解析 @DataSource 注解的切面,在之前的解析中,我們是將異常捕獲了,現在我們要將之拋出來,如下:
@Around("pc()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
//獲取方法上面的有效注解
DataSource dataSource = getDataSource(pjp);
if (dataSource != null) {
//獲取注解中數據源的名稱
String value = dataSource.value();
DynamicDataSourceContextHolder.setDataSourceType(value);
}
try {
return pjp.proceed();
} finally {
DynamicDataSourceContextHolder.clearDataSourceType();
}
}
將之拋出來的原因也很簡單,因為這是切面方法,所有的 service 層方法都在這里執行,如果將異常捕獲了,將來 service 層方法不拋出異常,事務就沒法生效了。
好了,現在準備工作就算是到位了。
接下來我們寫一個簡單的多數據源事務的案例,首先我們來創建一個 MasterService,專門用來操作 master 數據源:
@Service
public class MasterService {
@Autowired
MasterMapper masterMapper;
@DataSource("master")
public void addUser(String username, Integer age) {
masterMapper.addUser(username, age);
}
}
mapper 就不用看了吧,就是普通的添加,大家可以在文末下載本文案例案例。
再來一個 SlaveService,用來操作 slave 數據源:
@Service
public class SlaveService {
@Autowired
SlaveMapper slaveMapper;
@DataSource("slave")
public void addAccount(String name, Double balance) {
int i = 1 / 0;
slaveMapper.addAccount(name, balance);
}
}
slave 數據源的方法中有一個異常。
最后,我們在 UserService 中分別調用這兩個方法:
@Service
public class UserService {
@Autowired
MasterService masterService;
@Autowired
SlaveService slaveService;
@GlobalTransactional(rollbackFor = Exception.class)
public void test() {
masterService.addUser("javaboy.org", 99);
slaveService.addAccount("javaboy.org", 99.0);
}
}
注意,test 方法上有一個全局事務注解。
好啦,齊活!現在我們去執行這個 test 方法,由于 slaveService#addAccount 中的方法會拋出異常,所以會導致整個事務回滾,最終的結果就是 master 中也沒有添加進數據。
3. 總結
好啦,結合上一篇文章,相信大家應該能夠熟練的使用 seata 分布式事務中的 at 模式了吧!