Spring聲明式事務源碼詳解
維護公司之前的后臺管理系統,在開發自測時發現mock接口失敗,數據庫仍插入成功。經排查發現代碼中沒有指定具體事務,在添加注解@Transactional后事務按預期生效回滾。為此,本文通過分析源碼來介紹下事務失效的根源。
1.事務的幾個小小例子
1.1 未添加事務,異常未回滾
代碼未執行前數據庫信息:
圖片
@Component
public class TransactionalTest {
@Resource
BasicPriceUploadRecordMapper basicPriceUploadRecordMapper ;
public void onAddTransactionToException() {
BasicPriceUploadRecord base = new BasicPriceUploadRecord();
base.setErrorMsg("未添加注解事務拋異常");
base.setId(1824040965380245002L);
basicPriceUploadRecordMapper.updateByPrimaryKeySelective(base);
throw new RuntimeException();
}
}
代碼執行后數據庫信息:
圖片
雖然拋出異常,但數據庫仍修改成功了。
1.2 添加注解事務,異常回滾
代碼未執行前數據庫信息:
圖片
@Component
public class TransactionalTest {
@Resource
BasicPriceUploadRecordMapper basicPriceUploadRecordMapper ;
/**
* 添加聲明式注解,出現異常情況
*/
@Transactional(transactionManager = "valuationTransactionManager", rollbackFor = Exception.class)
public void addTransactionToException() {
BasicPriceUploadRecord base = new BasicPriceUploadRecord();
base.setErrorMsg("添加注解事務拋異常");
base.setId(1824040965380245002L);
basicPriceUploadRecordMapper.updateByPrimaryKeySelective(base);
throw new RuntimeException();
}
}
代碼執行后數據庫信息:
圖片
沒有修改成功,證明添加注解事務生效,進行了事務回滾操作。
通過上述簡單例子,我們了解下聲明式事務@Transactional的一些相關信息。
2. Transactional
2.1 注解定義
是Spring框架中用于聲明式事務管理的一個注解,主要通過 AOP(面向切面編程)來實現事務管理。可以被應用于接口定義、接口方法、類定義或類的public方法上。
2.2 常用屬性
value|transactionManager:指定事務管理器名稱
propagation :事務的傳播行為
isolation: 事務的隔離級別
timeout: 事務的超時時間
readOnly: 事務是否為只讀
rollbackFor:一個異常類數組,遇到這些異常時事務回滾
noRollbackFor:一個異常類數組,遇到這些異常時事務不回滾
事務的傳播行為:
required:當前存在事務,則加入該事務;如果當前沒有事務,則創建一個新的事務
supports:當前存在事務,則加入該事務;如果當前沒有事務,則以非事務方式執行
mandatory:當前存在事務,則加入該事務;如果當前沒有事務,則拋出異常
requires_new:創建一個新的事務,并且掛起當前事務
not_supported:以非事務方式執行,并且掛起當前事務
never:以非事務方式執行,如果存在事務,則拋出異常
nested:當前存在事務,則在嵌套事務內執行;如果當前沒有事務,則其效果與required相同
事務的隔離級別:在spring中,用于控制事務之間的并發訪問,以減少或避免事務之間的數據不一致問題。開發中基本都是 default 級別
default:使用底層數據庫默認的隔離級別
read_uncommitted:最低級別的隔離,事務可以看到其他未提交事務的數據
read_committed:事務只能看到其他已提交事務的數據,可以避免臟讀
repeatable_read:事務在整個過程中多次讀取同一數據時,結果是一致的,可以避免不可重復讀
serializable:最高級別的隔離,事務之間完全隔離,可以避免幻讀
3. 源碼分析
以下源碼均基于Spring4.3.12版本。主要從 創建事務、開啟事務、提交事務、事務回滾 的維度來詳細分析聲明式事務。
3.1 事務簡易流程圖
圖片
3.2 代理類生成
在Spring框架中,當你配置了事務管理器并聲明了@Transactional注解時,Spring會在實例化bean時生成事務增強的代理類。創建代理類參考源碼路徑如下:
AbstractAutowireCapableBeanFactory.createBean=>
doCreateBean()=>
initializeBean()=>
applyBeanPostProcessorsAfterInitialization()=>
postProcessAfterInitialization()(BeanPostProcessor內接口)=>
AbstractAutoProxyCreator.postProcessAfterInitialization()=>
wrapIfNecessary()=>
createProxy() 中 proxyFactory.setProxyTargetClass(true); //是否對類進行代理的設置,true為cglib代理
3.3 代理類中方法執行入口
從TransactionInterceptor.invoke()方法開始分析 (獲取代理類,調用父類TransactionAspectSupport.invokeWithinTransaction()方法,該方法會將代理類的方法納入事務中)。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 返回代理類的目標類
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
//事務中執行被代理的方法
return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
}
3.4 主要核心邏輯
TransactionAspectSupport.invokeWithinTransaction()方法負責獲取事務屬性和事務管理器,然后針對聲明式事務和編程式事務區分處理流程(此處源碼忽略編程式事務)。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
// 獲取事務屬性 TransactionDefinition對象(回顧規則,隔離級別,只讀等)
final TransactionAttribute txAttr = this.getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// 根據事務屬性和方法,獲取對應的事務管理器,(后續用于做事務的提交,回滾等操作),數據庫的一些信息,
final PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
// 獲取事務方法全路徑,
final String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
//響應式編程事務,大多數情況下都會執行到 else中的語句;
// CallbackPreferringPlatformTransactionManager 可以通過回掉函數來處理事務的提交和回滾操作, 此處不考慮,此處源碼可以忽略
if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
// 此處省略,此處為編程式事務 處理邏輯
} else {
//創建事務,事務屬性等信息會被保存進 TransactionInfo,便于后續流程中的提交和回滾操作,詳情見下文
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 執行目標的方法 (執行具體的業務邏輯)
retVal = invocation.proceedWithInvocation();
} catch (Throwable var15) {
//異常處理
this.completeTransactionAfterThrowing(txInfo, var15);
throw var15;
} finally {
//清除當前節點的事務消息,將舊事務節點消息通過ThreadLoacl更新到當前線程(事務的掛起操作就是在這執行)
this.cleanupTransactionInfo(txInfo);
}
//提交事務
this.commitTransactionAfterReturning(txInfo);
return retVal;
}
}
3.4.1 開啟事務
TransactionAspectSupport.createTransactionIfNecessary() 方法作用是檢查當前是否存在事務,如果存在,則根據一定的規則創建一個新的事務。
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
//如果事務名稱不為空,則使用方法唯一標識。并使用 DelegatingTransactionAttribute 封裝 txAttr
if (txAttr != null && ((TransactionAttributerollbackOn)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 獲取事務狀態。內部判斷是否開啟事務綁定線程與數據庫連接。詳情見下文
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
//構建事務消息,根據指定的屬性與狀態status 構建一個 TransactionInfo。將已經建立連接的事務所有信息,都記錄在ThreadLocal下的TransactionInfo 實例中,包括目標方法的所有狀態信息,如果事務執行失敗,spring會根據TransactionInfo中的信息來進行回滾等后續操作
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}
1)AbstractPlatformTransactionManager.getTransaction() 獲取當前事務對象。通過這個方法,可以獲取到關于事務的詳細信息,如事務的狀態、相關屬性等。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
//definition 中存儲的事務的注解信息,超時時間和隔離級別等
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 獲取當前事務
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 判斷當前線程是否存在事務
if (isExistingTransaction(transaction)) {
// 處理已經存在的事務
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 事務超時設置驗證,超時時間小于-1 拋異常
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 如果當前線程不存在事務且 事務傳播行為是 MANDATORY(用當前事務,如果當前沒有事務,則拋出異常) 拋異常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//以下三種事務傳播行為 需要開啟新事務
else if (def.getPropagationBehavior() == TransactionDefinition.propagation_required ||
def.getPropagationBehavior() == TransactionDefinition.propagation_requires_new ||
def.getPropagationBehavior() == TransactionDefinition.propagation_nested) {
//掛起原事務,因為這里不存在原事務 故設置為null。
//當一個事務方法內部調用了另一個事務方法時,如果第二個事務方法需要獨立于第一個事務方法,那么可以使用 suspend 方法來掛起當前事務,然后再開始一個新的事務
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
try {
boolean newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//開啟事務
this.doBegin(transaction, (TransactionDefinition)definition);
//同步事務狀態及書屋屬性
this.prepareSynchronization(status, (TransactionDefinition)definition);
return status;
} catch (RuntimeException var7) {
this.resume((Object)null, suspendedResources);
throw var7;
}
}
else {
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);//0
//創建默認狀態 詳情見 下文
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
1.1AbstractPlatformTransactionManager.doGetTransaction() 方法可能用于執行獲取事務的具體操作。它可能會根據一些條件或規則,去查找和獲取當前的事務對象,并進行相應的處理。
protected Object doGetTransaction() {
DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
//是否允許在一個事務內部開啟另一個事務。
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
// this.dataSource數據源 配置
//判斷當前線程如果已經記錄數據庫連接則使用原連接
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.dataSource);
//false 表示不是新創建連接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
1.1.1this.dataSource() 是我們配置DataSourceTransactionManager時傳入的。
<bean id="valuationTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="valuationDataSource"/>
</bean>
1.1.2TransactionSynchronizationManager.getResource() 方法的作用主要是獲取與當前事務相關聯的資源。TransactionSynchronizationManager 持有一個ThreadLocal的實例,存在一個key為dataSource ,value為ConnectionHolder 的Map信息。
//ThreadLocal 存放 ConnectionHolder 信息,ConnectionHolder 可以理解為Connection(數據庫連接)的包裝類,其中最主要屬性為 Connection
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");
// 獲取ConnectionHolder
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
//獲取連接信息
Object value = doGetResource(actualKey);
return value;
}
//具體執行獲取連接信息操作
private static Object doGetResource(Object actualKey) {
//從 ThreadLoacl中獲取
Map<Object, Object> map = (Map)resources.get();
if (map == null) {
return null;
} else {
Object value = map.get(actualKey);
if (value instanceof ResourceHolder && ((ResourceHolder)value).isVoid()) {
map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
}
1.2AbstractPlatformTransactionManager.isExistingTransaction() 方法用于判斷是否存在正在進行的事務。它可以幫助我們確定當前的執行環境是否處于事務中,以便進行相應的處理。
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
return txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive();
}
1.3AbstractPlatformTransactionManager.suspend() 掛起事務,對有無同步的事務采取不同方案,doSuspend()執行掛起具體操作。
protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
//如果有同步的事務,則優先掛起同步事務
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List suspendedSynchronizations = this.doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
//執行掛起操作
suspendedResources = this.doSuspend(transaction);
}
//重置事務名稱
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName((String)null);
//重置只讀狀態
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
//重置隔離級別
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);
//重置事務激活狀態
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
//返回掛起的事務
return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException var8) {
this.doResumeSynchronization(suspendedSynchronizations);
throw var8;
}
} else if (transaction != null) {
Object suspendedResources = this.doSuspend(transaction);
return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);
} else {
return null;
}
}
1.3.1 AbstractPlatformTransactionManager.doSuspend()執行掛起操作只是將當前ConnectionHolder設置為null,返回原有事務消息,方便后續恢復原有事務消息,并將當前正在進行的事務信息進行重置。
protected Object doSuspend(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
txObject.setConnectionHolder((ConnectionHolder)null);
//接觸綁定
return TransactionSynchronizationManager.unbindResource(this.dataSource);
}
1.3.1.1TransactionSynchronizationManager.unbindResource()解除綁定操作,將現有的事務消息remove并返回上一級
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
//解綁操作,移除資源
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
} else {
return value;
}
}
1.4 AbstractPlatformTransactionManager.doBegin()數據庫連接獲取,當新事務時,則獲取新的數據庫連接,并為其設置隔離級別,是否只讀等屬性。
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
//新事務開啟時將 ConnectionHolder 設置為null
if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//獲取新的數據庫連接
Connection newCon = this.dataSource.getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//設置事務隔離級別 和readOnly屬性
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
// 交給Spring控制事務提交
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
//設置當前線程的事務激活狀態
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
// 設置超時時間
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
1.5AbstractPlatformTransactionManager.prepareTransactionStatus()創建默認Status,如果不需要開始事務 (比如SUPPORTS),則返回一個默認的狀態。
protected final DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
this.prepareSynchronization(status, definition);
return status;
}
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
//創建 DefaultTransactionStatus 對象
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
1.6AbstractPlatformTransactionManager.handleExistingTransaction()針對不同的傳播行為做不同的處理方法,比如掛起原事務開啟新事務等等。
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
//當傳播行為是 NEVER 時拋出異常
if (definition.getPropagationBehavior() == 5) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
} else {
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
boolean newSynchronization;
//當傳播方式為NOT_SUPPORTED 時掛起當前事務,然后在無事務的狀態下運行
if (definition.getPropagationBehavior() == 4) {
//掛起事務
suspendedResources = this.suspend(transaction);
newSynchronization = this.getTransactionSynchronization() == 0;
//返回默認status
return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
//當傳播方式為REQUIRES_NEW時,掛起當前事務,然后啟動新事務
} else if (definition.getPropagationBehavior() == 3) {
//掛起原事務
suspendedResources = this.suspend(transaction);
try {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//啟動新的事務
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
} catch (Error|RuntimeException var7) {
this.resumeAfterBeginException(transaction, suspendedResources, var7);
throw var7;
}
} else {
boolean newSynchronization;
//當傳播方式為NESTED時,設置事務的保存點
//存在事務,將該事務標注保存點,形成嵌套事務
//嵌套事務中的子事務出現異常不會影響到父事務保存點之前的操作
if (definition.getPropagationBehavior() == 6) {
if (!this.isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
} else {
if (this.useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
//創建保存點,回滾時,只回滾到該保存點
status.createAndHoldSavepoint();
return status;
} else {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
//如果不支持保存點,就啟動新的事務
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
}
}
} else {
newSynchronization = this.getTransactionSynchronization() != 2;
return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
}
}
}
}
3.4.2 回滾事務
TransactionAspectSupport.completeTransactionAfterThrowing() 判斷事務是否存在,如不存在就不需要回滾,如果存在則在判斷是否滿足回滾條件。
protected void completeTransactionAfterThrowing(TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
//判斷是否存在事務
if (txInfo != null && txInfo.hasTransaction()) {
// 判斷是否滿足回滾條件。拋出的異常類型,和定義的回滾規則進行匹配
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滾處理
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
//省略代碼
} else {
try {
//不滿足回滾條件 出現異常
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
//省略代碼
}
}
}
1 AbstractPlatformTransactionManager.rollback()當在事務執行過程中出現異常或其他需要回滾的情況時,就會調用這個方法,將事務進行回滾操作,撤銷之前所做的數據庫操作,以保證數據的一致性。
public final void rollback(TransactionStatus status) throws TransactionException {
//判斷事務是否已經完成,回滾時拋出異常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
// 執行回滾操作。
this.processRollback(defStatus);
}
}
1.1AbstractPlatformTransactionManager.processRollback()方法主要用于處理事務的回滾操作。通過這個方法,可以確保事務在需要回滾時能夠正確地執行回滾操作,保持數據的完整性。
private void processRollback(DefaultTransactionStatus status) {
try {
try {
//解綁線程和會話綁定關系
this.triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
//如果有保存點(當前事務為單獨的線程則會退到保存點)
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
//如果是新事務直接回滾。調用數據庫連接并調用rollback方法進行回滾。使用底層數據庫連接提供的API
this.doRollback(status);
} else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || !this.isGlobalRollbackOnParticipationFailure()) {
//如果當前事務不是獨立的事務,則只能等待事務鏈執行完成后再做回滾操作
this.doSetRollbackOnly(status);
}
}
}
//catch 等代碼
// 關閉會話,重置屬性
this.triggerAfterCompletion(status, 1);
} finally {
//清理并恢復掛起的事務
this.cleanupAfterCompletion(status);
}
}
3.4.3 提交事務
TransactionAspectSupport.commitTransactionAfterReturning() 基本上和回滾一樣,都是先判斷是否有事務,在操作提交。
protected void commitTransactionAfterReturning(TransactionAspectSupport.TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
//提交事務
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
1) AbstractPlatformTransactionManager.commit() 創建默認Status prepareTransactionStatu,發現是否有回滾標記,然后進行回滾。如果判斷無需回滾就可以直接提交。
public final void commit(TransactionStatus status) throws TransactionException {
// 事務狀態已完成則拋異常
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
} else {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
//發現回滾標記
if (defStatus.isLocalRollbackOnly()) {
//回滾
this.processRollback(defStatus);
} else if (!this.shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
//回滾
this.processRollback(defStatus);
if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} else {
// 提交操作
this.processCommit(defStatus);
}
}
}
1.1 AbstractPlatformTransactionManager.processCommit()處理事務的提交操作
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
this.prepareForCommit(status);
this.triggerBeforeCommit(status);
this.triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || this.isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
//釋放保存點信息
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 是一個新的事務 則提交。 獲取數據庫連接后使用數據庫API進行提交事務
this.doCommit(status);
}
if (globalRollbackOnly) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (TransactionException var20) {
if (this.isRollbackOnCommitFailure()) {
//提交異常回滾
this.doRollbackOnCommitException(status, var20);
} else {
this.triggerAfterCompletion(status, 2);
}
throw var20;
}
//省略其它異常攔截
try {
this.triggerAfterCommit(status);
} finally {
this.triggerAfterCompletion(status, 0);
}
} finally {
// 清理事務消息
this.cleanupAfterCompletion(status);
}
}
3.4.4 清除事務信息
AbstractPlatformTransactionManager.cleanupAfterCompletion() 這個方法主要用于在事務完成后進行清理工作。它可能會負責釋放資源、清理臨時數據等,以確保系統處于良好的狀態。
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
//將當前事務設置為完成狀態
status.setCompleted();
if (status.isNewSynchronization()) {
// 清空當前事務消息
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
//如果是新事務 則在事務完成之后做清理操作
this.doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
// 將原事務從掛起狀態恢復
this.resume(status.getTransaction(), (AbstractPlatformTransactionManager.SuspendedResourcesHolder)status.getSuspendedResources());
}
}
1AbstractPlatformTransactionManager.doCleanupAfterCompletion()在新事務完成后會調用resetConnectionAfterTransaction方法重置數據庫連接信息,并判單如果是新的數據庫連接則將其放回連接池。
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
if (txObject.isNewConnectionHolder()) {
// 將數據庫連接從當前線程中解除綁定
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
Connection con = txObject.getConnectionHolder().getConnection();
try {
// 恢復數據庫連接的autoCommit狀態
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
// 負責重置數據庫連接信息,包括隔離級別、readOnly屬性等
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
// 如果是新的數據庫連接則將數據庫連接放回連接池
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
2) AbstractPlatformTransactionManager.resume() 如果事務執行前有事務掛起,那么當前事務執行結束后需要將掛起的事務恢復,掛起事務時保存了原事務信息,重置了當前事務信息,所以恢復操作就是將當前的事務信息設置為之前保存的原事務信息。
protected final void resume(Object transaction, AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder) throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
// 執行 恢復掛起事務 ,綁定資源bindResource
this.doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
this.doResumeSynchronization(suspendedSynchronizations);
}
}
}
3) TransactionAspectSupport.cleanupTransactionInfo()清除當前節點的事務消息,將舊事務節點信息通過thradLoacl更新到當前線程。
protected void cleanupTransactionInfo(TransactionAspectSupport.TransactionInfo txInfo) {
if (txInfo != null) {
//從當前線程的 ThreadLocal 獲取上層的事務信息,將當前事務出棧,繼續執行上層事務
txInfo.restoreThreadLocalStatus();
}
}
private void restoreThreadLocalStatus() {
//當前事務處理完之后,恢復上層事務上下文
TransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
}
總結
如果方法正常執行完成且沒有異常,調用commitTransactionAfterReturning()方法。如果執行中出現異常,調用completeTransactionAfterThrowing()方法。
兩個方法內部都會判斷是否存在事務以及是否滿足回滾條件來決定最終執行提交操作還是回滾操作。
上述例子1.1中,未添加事務,不受事務控制,因此修改操作生效。
上述例子1.2中,因加入事務注解,指定回滾異常類型,在completeTransactionAfterThrowing()中邏輯判斷需要回滾,調用txInfo.getTransactionManager().rollback()方法執行回滾操作,因此修改操作不生效。
常見事務失效的原因及解決方案
場景一:同類中非事務方法調用事務方法
@Service
public class MyService {
@Transactional
public void transactionalMethod() {
// 事務邏輯
}
public void nonTransactionalCall() {
transactionalMethod(); // 非事務方法中調用事務方法
}
}
失效原因:事務是基于動態代理實現的,但本類中調用另一個方法默認是this調用關系,并非動態代理,故失效
解決方案:要么將操作移動到事務中,要么調用另一個Service中的事務方法
@Service
public class MyService {
@Resource
MyService2 myService2;
public void nonTransactionalCall() {
myService2.transactionalMethod(); // 非事務方法中調用事務方法
}
}
@Service
public class MyService2 {
@Transactional
public void transactionalMethod() {
// 事務邏輯
}
}
場景二:事務方法中拋出檢查型異常且未被正確處理
//失效場景
@Service
public class MyService {
@Transactional
public void transactionalMethod() {
throw new Exception(); // 拋出檢查型異常
}
}
失效原因:@Transactional 注解默認處理RuntimeException,即只有拋出運行時異常,才會觸發事務回滾
解決方案:@Transactional 設置為 @Transactional(rollbackFor =Exception.class) 或者直接拋出運行時異常
解決方案1
@Service
public class MyService {
@Transactional
public void transactionalMethod() {
throw new RuntimeException(); // 拋出非檢查型異常
}
}
解決方案2
@Service
public class MyService {
@Transactional(rollbackFor =Exception.class)
public void transactionalMethod() {
throw new Exception();
}
}
場景三:多線程問題
public class MyService {
@Resource
private ValuationMapper valuationMapper;
@Transactional
public void transactionalMethod() {
new Thread(() ->{
Valuation v = new Valuation();
v.setUserName("張三");
valuationMapper.insert(v);
}).start();
}
}
原因:Spring的事務是通過數據庫連接來實現不同線程使用不同的數據庫連接,且放在ThreadLocal中,基于同一個數據庫連接的事務才能同時提交或回滾,多線程場景下,拿到的數據庫連接不是同一個
解決方案:
1.采用分布式事務保證
2.自己實現事務回滾
場景四:錯誤使用事務傳播特性
public class MyService {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void methodA() {
// 一些事務操作
methodB();
}
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void methodB() {
// 這里不會執行事務操作,因為 methodA 中傳播特性配置錯誤
}
public static void main(String[] args) {
MyService example = new MyService();
example.methodA();
}
}
原因:methodA 配置為 REQUIRES_NEW,而 methodB 配置為 NOT_SUPPORTED 不支持事務傳播特性,導致事務傳播特性失效
解決方案:修改 methodB 傳播特性
關于作者
張云剛 采貨俠JAVA開發工程師
大道至簡。再復雜的技術,歸根結底也是在基礎技術之上實現的。