深度剖析 Seata 源碼
一、如何使用源碼
需要了解的是,這篇文章是基于筆者相對早期的項目作為樣例進行講解,所以對應(yīng)的seata版本為1.4.2(核心部分實現(xiàn)大體是一樣的),建議讀者閱讀本文在調(diào)試源碼時可以選擇和筆者相同的版本進行理解學(xué)習(xí),對應(yīng)的下載地址為:https://github.com/apache/incubator-seata/tree/v1.4.2
完成下載后,為保證編譯可以通過我們還需要將seata-serializer-protobuf模塊移除掉,該模塊的位置如下圖所示:
同時seata的啟動類位于seata-server模塊,所以我們需要將該模塊的registry.conf的配置改為自己的配置:
以筆者為例,seata配置都是通過nacos進行統(tǒng)一管理的,所以對應(yīng)的配置類型也都是針對nacos維度去協(xié)調(diào)適配,大體配置如下所示:
registry {
# 將seata注冊到nacos上
type = "nacos"
nacos {
# nacos地址
serverAddr = "ip:8848"
# 命名空間id
namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
# 組名
group = "DEFAULT_GROUP"
# 集群節(jié)點名稱
cluster = "default"
}
}
config {
# 通過nacos獲取配置
type = "nacos"
nacos {
serverAddr = "ip:8848"
namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
group = "DEFAULT_GROUP"
}
}
經(jīng)過這幾個步驟后seata就可以像我們?nèi)粘R粯拥姆绞竭M行使用了。
二、基于AT模式詳解Seata全鏈路流程
1. seata服務(wù)端啟動
我們先從seata的服務(wù)端啟動開始,seata服務(wù)端啟動時會進行如下幾個核心步驟:
- 創(chuàng)建工作線程池workingThreads。
- 基于工作線程池創(chuàng)建一個Netty服務(wù)端對外提供服務(wù)。
- 基于該服務(wù)端創(chuàng)建的一個默認的協(xié)調(diào)者DefaultCoordinator管理全局事務(wù)。
- 默認協(xié)調(diào)者初始化幾個定時任務(wù)處理一些異步的全局事務(wù)提交、回滾、超時監(jiān)測的任務(wù)。
對應(yīng)的我們給出這塊邏輯的核心入口代碼,即位于Server的主函數(shù)入口的main方法,可以看到seata服務(wù)端的創(chuàng)建是基于netty完成的,完成創(chuàng)建和初始化之后就與協(xié)調(diào)者coordinator進行綁定:
public static void main(String[] args) throws IOException {
//......
//創(chuàng)建工作線程池處理業(yè)務(wù)請求
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
//基于該線程池初始化 seata 服務(wù)端
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
//......
//log store mode : file, db, redis
SessionHolder.init(parameterParser.getStoreMode());
//初始化協(xié)調(diào)者,處理seata服務(wù)端收到的各種事務(wù)讀寫請求
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
//初始化各種異步定時任務(wù):全局事務(wù)提交、全局事務(wù)回滾、超時監(jiān)測等
coordinator.init();
//將協(xié)調(diào)者作為seata服務(wù)端的處理器
nettyRemotingServer.setHandler(coordinator);
//......
}
對應(yīng)的我們也給出默認協(xié)調(diào)者的初始化源碼,即DefaultCoordinator的init方法,可以看到這段代碼本質(zhì)上就是提交一些定時任務(wù)處理全局事務(wù)提交、回滾、超時監(jiān)測、undo log刪除等:
public void init() {
//每秒執(zhí)行,處理需要回滾的分布式事務(wù)
retryRollbacking.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryRollbackingLock();
if (lock) {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
} finally {
SessionHolder.unRetryRollbackingLock();
}
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//......
//異步定時提交全局事務(wù)的定時任務(wù),每秒執(zhí)行一次
asyncCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.asyncCommittingLock();
if (lock) {
try {
//掃描獲取各種異步提交的全局事務(wù)
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
} finally {
SessionHolder.unAsyncCommittingLock();
}
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
//......
}
2. 本地服務(wù)如何基于GlobalTransaction注解開啟事務(wù)
我們都知道seata也是基于spring boot實現(xiàn)的,所以我們可以大膽的認為應(yīng)用端使用GlobalTransaction開啟分布式事務(wù)本質(zhì)上也是和spring boot自動裝配有著一定的聯(lián)系。
所以我們從seata-spring-boot-starter這個腳手架的源碼包的spring.factories文件入手,可以看到一個SeataAutoConfiguration的注入:
于是我們就可以看到一個GlobalTransactionScanner即一個關(guān)于GlobalTransaction注解掃描的類:
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
//......
//掃描我們的配置文件中配置的applicationId、txServiceGroup對應(yīng)的事務(wù)
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
查看GlobalTransactionScanner源碼我們可以看到該類型繼承了spring的初始化bean并設(shè)置屬性后的拓展點InitializingBean的afterPropertiesSet方法,該方法內(nèi)部會初始化當前seata客戶端,分別初始化TM客戶端(使用GlobalTransaction注解的方法的服務(wù)即做為TM)和RM客戶端處理其他TM或者RM服務(wù)端發(fā)送的消息,它們初始化的工作分別是:
- TM客戶端會注冊各種TC消息響應(yīng)的處理器,處理各種seata server對應(yīng)的TC響應(yīng)的消息,例如:全局事務(wù)開啟結(jié)果處理器、全局事務(wù)提交處理器、全局事務(wù)回滾處理器等。
- RM客戶端則是注冊一些各種seata server對應(yīng)TC請求消息的處理器,例如:分支事務(wù)提交、分支事務(wù)回滾、分支事務(wù)undo.log刪除等。
對應(yīng)我們給出GlobalTransactionScanner的afterPropertiesSet源碼可以看到客戶端初始化這段調(diào)用的入口,可以看到啟動時某個線程完成CAS上鎖初始化標識之后,即通過initClient初始化客戶端:
@Override
public void afterPropertiesSet() {
//......
//基于擴展點進行客戶端初始化
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
步入后即可看到對于TM和RM客戶端的初始化調(diào)用:
private void initClient() {
//......
// 初始化TM客戶端
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
//......
// 初始化RM客戶端
RMClient.init(applicationId, txServiceGroup);
//......
}
此時我們先看看TM客戶端內(nèi)部的處理函數(shù)即位于TmNettyRemotingClient的registerProcessor即可看到上述所說的TC響應(yīng)消息處理器的綁定步驟,即:
- 注冊TC響應(yīng)消息處理器
- 注冊全局事務(wù)開啟響應(yīng)處理器
- 注冊全局事務(wù)提交響應(yīng)處理器
- 注冊心跳消息處理器
private void registerProcessor() {
// 1.registry TC response processor 注冊一些TC響應(yīng)消息的處理器
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
//全局事務(wù)開啟結(jié)果響應(yīng)處理器
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
//全局事務(wù)提交響應(yīng)處理器
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
// 2. 注冊心跳消息
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
同理我們也給出RM客戶端內(nèi)部初始化的調(diào)用RmNettyRemotingClient的registerProcessor方法:
- 注冊分支事務(wù)提交消息處理器
- 注冊rm客戶端對應(yīng)的分支事務(wù)提及和回滾處理器
- 注冊undo Log刪除處理器
- 注冊TC響應(yīng)消息處理器
- 注冊心跳處理器
private void registerProcessor() {
// 1. 注冊分支事務(wù)提交消息處理器
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 2.注冊rm客戶端對應(yīng)的分支事務(wù)回滾處理器
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
// 3. 注冊undo log刪除處理器
RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4. 注冊TC響應(yīng)消息處理器
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
// 5.注冊心跳消息處理器
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
同時GlobalTransactionScanner繼承了AbstractAutoProxyCreator的wrapIfNecessary,該代理類會在spring容器中的bean進行檢查并決定是否進行動態(tài)代理。以我們的GlobalTransactionScanner邏輯它本質(zhì)上就是:
- 檢查當前bean是否有GlobalTransactional這個注解
- 如果有則基于全局事務(wù)攔截器對其進行增強
對應(yīng)核心邏輯如下所示,可以看到這段代碼會通過existsAnnotation檢查當前bean是否存在GlobalTransactional注解,如果有則基于globalTransactionalInterceptor 對其進行增強:
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
//......
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//......
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判斷是否有GlobalTransaction注解,如果有則為其生成分布式事務(wù)的動態(tài)代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
//如果攔截器為空則初始化攔截器
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
//......
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//基于上一步的interceptor為其生成動態(tài)代理
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
//......
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
這也就意味著我們調(diào)用帶有GlobalTransactional注解方法時,就會走到GlobalTransactionalInterceptor的增強邏輯上,它會走到GlobalTransactionalInterceptor的invoke方法上,最終會走到事務(wù)模板類transactionalTemplate的execute方法,該方法會執(zhí)行如下三個核心步驟:
- 開啟全局事務(wù)。
- 執(zhí)行原始業(yè)務(wù)邏輯。
- 根據(jù)各個分支事務(wù)結(jié)果提交或者回滾事務(wù)。
對應(yīng)的我們給出GlobalTransactionalInterceptor的invoke方法,可以看到當該方法認為注解存在的情況下會直接調(diào)用handleGlobalTransaction開啟并處理全局事務(wù):
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//......
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//獲取GlobalTransactional注解信息
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
//......
if (!localDisable) {
//若全局事務(wù)注解不為空則調(diào)用handleGlobalTransaction處理全局事務(wù)
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
//......
}
}
}
//......
}
步入其內(nèi)部就會走到transactionalTemplate的execute方法,即可看到對于:
- 分支事務(wù)的創(chuàng)建
- 告知TC請求開啟全局事務(wù)
- 執(zhí)行本地事務(wù)
- 全局提交或者回滾
對應(yīng)邏輯的源碼如下所示,讀者可結(jié)合說明了解:
public Object execute(TransactionalExecutor business) throws Throwable {
//......
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
//如果tx為空則以全局事務(wù)啟動者的身份創(chuàng)建一個全新的事務(wù)
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
//向TC發(fā)送請求開啟全局事務(wù)
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
//執(zhí)行業(yè)務(wù)邏輯(被代理的原始方法)
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
//全局事務(wù)回滾
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
//分支事務(wù)執(zhí)行成功,提交全局事務(wù)
commitTransaction(tx);
return rs;
} finally {
//......
}
} finally {
//......
}
}
3. 客戶端如何開啟分布式事務(wù)
上文調(diào)用分布式事務(wù)的方法時內(nèi)部會走到的代理的transactionalTemplate的execute方法,其內(nèi)部有個beginTransaction就是開啟分布式事務(wù)的關(guān)鍵,由上文可知作為GlobalTransactional注解的方法對對應(yīng)的服務(wù)就是作為TM即transaction manager,所以在調(diào)用beginTransaction時,這個方法的代理就會以TM的身份發(fā)送一個請求告知TC自己要開啟一個全局事務(wù),TC經(jīng)過自己的協(xié)調(diào)處理后(后文會介紹流程)返回一份xid告知TM開啟成功:
對應(yīng)的我們查看seata客戶端對應(yīng)TransactionalTemplate的beginTransaction方法即可看到begin方法的調(diào)用,該方法回告知seata服務(wù)端自己要開啟一個全局事務(wù):
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
//......
//開始分布式事務(wù)
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//......
} catch (TransactionException txe) {
//......
}
}
查看begin內(nèi)部就是通過TM發(fā)起請求,得到xid并緩存到當前線程內(nèi)部,開始后續(xù)的執(zhí)行流程分布式事務(wù)處理流程:
@Override
public void begin(int timeout, String name) throws TransactionException {
//......
//通過TM告知TC開啟全局事務(wù),從而得到xid
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//將xid緩存到當前線程的緩存中
RootContext.bind(xid);
//......
}
4. seata服務(wù)端如何注冊全局事務(wù)
基于上述請求,對應(yīng)seata server端的TC收到請求后會基于傳參中的消息標信息,定位到對應(yīng)的執(zhí)行器即TM消息處理器,然后驅(qū)動TM處理器將這個請求生成一份全局session信息從而構(gòu)成本次請求的全局事務(wù)信息,再將請求寫入數(shù)據(jù)表中:
我們給出TC處理消息的代碼入口AbstractNettyRemotingServer的channelRead方法,從名字不難看出TC服務(wù)端也是基于netty實現(xiàn),其內(nèi)部通過processMessage處理各種消息:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
//基于netty編寫的服務(wù)端,channelRead通過processMessage處理客戶端各種請求
processMessage(ctx, (RpcMessage) msg);
}
步入processMessage即可看到基于處理表定位消息并交由處理器處理消息邏輯pair.getFirst().process(ctx, rpcMessage);:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
//......
//獲取網(wǎng)絡(luò)消息
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
//通過處理表定位到對應(yīng)的處理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
//基于第一個處理器處理當前消息
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
//......
} finally {
//......
}
});
} catch (RejectedExecutionException e) {
//......
}
} else {
//......
}
}
}
因為我們的消息是TM發(fā)來的,所以上一步的處理器是ServerOnRequestProcessor的:
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (ChannelManager.isRegistered(ctx.channel())) {
//處理TM客戶端發(fā)送來的消息
onRequestMessage(ctx, rpcMessage);
} else {
//......
}
}
最終走到GlobalBeginRequest這個工具的handle基于協(xié)調(diào)者將事務(wù)信息寫入global_table從而得到xid返回給TM客戶端:
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
//生成全局事務(wù)信息并得到xid將數(shù)據(jù)寫入響應(yīng)返回給TM
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
//.......
}
5. RM和TC如何協(xié)調(diào)處理分支事務(wù)
完成全局事務(wù)的注冊管理之后,我們再來聊聊各個分支事務(wù)的執(zhí)行和提交回滾,上文提及,seata原生我們本地的jdbc數(shù)據(jù)庫連接通過代理加以封裝,所以在我們seata客戶端執(zhí)行本地事務(wù)完成后提交的commit方法是經(jīng)過了seata的代理這一層,該連接代理在調(diào)用commit方法時,其內(nèi)部就會通過RM向TC注冊一個分支事務(wù)的請求,TC收到請求后會執(zhí)行如下工作:
- 基于lock_table嘗試為事務(wù)生成全局鎖。
- 分支事務(wù)信息寫入到branch_table表上并返回branch_id給RM:
我們給出ConnectionProxy的commit方法入口,其內(nèi)部調(diào)用了一個doCommit方法,它就是事務(wù)提交的核心邏輯:
@Override
public void commit() throws SQLException {
try {
//excute會調(diào)用doCommit生成undoLog緩存和執(zhí)行分支事務(wù)
LOCK_RETRY_POLICY.execute(() -> {
//excuete執(zhí)行成功后這一步會注冊分支事務(wù)并提交本地事務(wù)和undoLog鏡像以保證原子性
doCommit();
return null;
});
} catch (SQLException e) {
//......
} catch (Exception e) {
throw new SQLException(e);
}
}
其內(nèi)部調(diào)用ConnectionProxy的doCommit會調(diào)用processGlobalTransactionCommit執(zhí)行分支事務(wù):
private void doCommit() throws SQLException {
//如果處于全局事務(wù)中則調(diào)用processGlobalTransactionCommit
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
//......
} else {
//......
}
}
最終就可以在processGlobalTransactionCommit看到如下邏輯:
- register這個注冊分支事務(wù)的邏輯,TC基于RM給定的resourceId信息,生成操作數(shù)據(jù)的全局鎖,并插入分支事務(wù)信息到brach_table中。
- undo日志刷盤到本地undo日志中。
- 本地業(yè)務(wù)的事務(wù)提交。
private void processGlobalTransactionCommit() throws SQLException {
try {
//向TC發(fā)起請求注冊分支事務(wù),TC基于RM給定的resourceId生成全局鎖并插入分支事務(wù)信息到brach_table后就不會拋異常
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//undo日志刷盤
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//本地事務(wù)提交
targetConnection.commit();
} catch (Throwable ex) {
//......
}
//......
}
這里我們著重看一下register函數(shù),其內(nèi)部本質(zhì)上就是通過RM客戶端告知TC自己準備執(zhí)行分支事務(wù)提交,幫我上一把全局鎖并注冊分支事務(wù):
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
//向tc發(fā)起請求并獲得register
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
//緩存到當前線程中
context.setBranchId(branchId);
}
最后這個注冊的邏輯就會來到AbstractResourceManager的branchRegister上,可以看到它會攜帶著全局事務(wù)id和主鍵等數(shù)據(jù)發(fā)送請求給TC:
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
try {
BranchRegisterRequest request = new BranchRegisterRequest();
//傳入全局事務(wù)id即xid
request.setXid(xid);
//基于當前數(shù)據(jù)主鍵生成lockkeys
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
request.setApplicationData(applicationData);
//基于RM的netty客戶端將其異步發(fā)送
BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
//......
return response.getBranchId();
} catch (TimeoutException toe) {
//......
}
}
6. seata服務(wù)端處理分支事務(wù)請求
TC處理流程與上述文章同理,收到消息后基于request中的消息表定位到對應(yīng)的處理器,我們這里最終會走到BranchRegisterRequest的處理器上,通過AbstractTCInboundHandler注冊分支事務(wù):
@Override
public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
BranchRegisterResponse response = new BranchRegisterResponse();
exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
@Override
public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
throws TransactionException {
try {
//tc注冊分支事務(wù)入口
doBranchRegister(request, response, rpcContext);
} catch (StoreException e) {
//......
}
}
}, request, response);
return response;
}
最終這段邏輯就會走到AbstractCore的branchRegister,大體執(zhí)行的步驟是:
- 生成分支事務(wù)session
- 嘗試獲得數(shù)據(jù)全局鎖lock_table
- 取鎖成功將分支事務(wù)信息寫入branch_table
- 返回branch_id給RM
對應(yīng)源碼邏輯如下,大體邏輯就說基于分支事務(wù)session生成全局鎖存到lock_table后,將分支事務(wù)信息存到branch_table中:
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
//......
return SessionHolder.lockAndExecute(globalSession, () -> {
//......
//獲取分支事務(wù)的表信息并將其寫入到lock_table中意味獲得全局鎖,上鎖失敗會拋異常
branchSessionLock(globalSession, branchSession);
try {
//添加分支事務(wù)信息到branch_table中
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
//......
}
//......
//返回分支事務(wù)id
return branchSession.getBranchId();
});
}
TC返回成功后,RM就會執(zhí)行undo日志刷盤和本地事務(wù)提交,詳情參考我們本節(jié)代碼processGlobalTransactionCommit方法,這里不貼出了。
7. RM生成回滾日志
對于java程序而言大部分SQL操作底層都是基于Executor執(zhí)行器操作的,在上述代理執(zhí)行commit方法前,seata底層將代理的連接即上文的connectionProxy通過AbstractDMLBaseExecutor執(zhí)行SQL操作,該執(zhí)會針對我們的連接代理進行如下邏輯處理:
- 判斷連接代理connectionProxy是否是自動提交,若非自動提交則調(diào)用executeAutoCommitFalse方法,該方法會生成undoLog數(shù)據(jù)寫入緩存,然后將RM當執(zhí)行分支事務(wù)SQL,基于該執(zhí)行結(jié)果生成后置鏡像,最后將undo日志寫入undo_log表中。
- 若開啟自動提交則關(guān)閉自動提交后,復(fù)用executeAutoCommitFalse方法執(zhí)行系統(tǒng)的undoLog和分支事務(wù)SQL的執(zhí)行操作。
對應(yīng)源碼的整體工作鏈路圖如下所示:
這里我們直接給出AbstractDMLBaseExecutor的doExecute方法作為入口,可以看到若開啟自動提交則調(diào)用executeAutoCommitTrue,反之調(diào)用executeAutoCommitFalse:
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//若自動提交則關(guān)閉自動提交,并生成undo信息存入緩沖區(qū)
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
//直接生成undo log鏡像寫入緩存
return executeAutoCommitFalse(args);
}
}
因為都會復(fù)用executeAutoCommitFalse這段邏輯,所以我們直接查看這個方法的邏輯,可以看到該邏輯內(nèi)部會基于分支事務(wù)前后的數(shù)據(jù)生成前置和后置鏡像:
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
//基于分支事務(wù)的SQL定位操作前的SQL生成前置鏡像
TableRecords beforeImage = beforeImage();
//執(zhí)行分支事務(wù)的SQL
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//生成分支事務(wù)操作后置鏡像
TableRecords afterImage = afterImage(beforeImage);
//將undoLog寫入緩沖區(qū)
prepareUndoLog(beforeImage, afterImage);
return result;
}
8. 事務(wù)全局提交與回滾
TransactionalTemplate(即TM)驅(qū)動各種分支事務(wù)準備成功后,就會執(zhí)行commitTransaction提交全局事務(wù),對應(yīng)的代碼位于TransactionalTemplate的execute方法,該方法會通知TC驅(qū)動全局事務(wù)提交,而TC收到該請求之后,就會驅(qū)動各個分支事務(wù)提交事務(wù),每個分支事務(wù)收到該請求后就會刪除undoLog并提交各自未提交的事務(wù):
public Object execute(TransactionalExecutor business) throws Throwable {
//......
try {
//向TC發(fā)送請求開啟全局事務(wù)
beginTransaction(txInfo, tx);
Object rs;
try {
//執(zhí)行業(yè)務(wù)邏輯(被代理的原始方法)
rs = business.execute();
} catch (Throwable ex) {
//全局事務(wù)回滾
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
//分支事務(wù)執(zhí)行成功,提交全局事務(wù)
commitTransaction(tx);
return rs;
} finally {
//......
}
} finally {
//......
}
}
步入其內(nèi)部可以看到DefaultGlobalTransaction調(diào)用transactionManager即TM提交全局事務(wù):
@Override
public void commit() throws TransactionException {
//......
try {
while (retry > 0) {
try {
//執(zhí)行全局事務(wù)提交
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
//......
}
} finally {
//......
}
//......
}
這個commit的邏輯也很簡單,即告知TC要提交全局事務(wù)了:
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
//通知TC提交全局事務(wù)
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
對應(yīng)的TC收到該請求后,對應(yīng)的AbstractTCInboundHandler就會調(diào)用doGlobalCommit通知各個RM提交全局事務(wù):
@Override
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
GlobalCommitResponse response = new GlobalCommitResponse();
response.setGlobalStatus(GlobalStatus.Committing);
exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
@Override
public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
throws TransactionException {
try {
//遍歷RM提交各個分支事務(wù)
doGlobalCommit(request, response, rpcContext);
} catch (StoreException e) {
//......
}
}
//......
//......
}, request, response);
return response;
}
對應(yīng)的我們可以來道該源碼內(nèi)部的DefaultCore的doGlobalCommit方法印證這一點,可以看到該方法會遍歷各個分支事務(wù)調(diào)用branchCommit通知其提交或者回滾事務(wù):
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
//......
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
//遍歷全局事務(wù)中的分支事務(wù)
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
//......
}
try {
//告知RM提交事務(wù)
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
//......
} catch (Exception ex) {
//......
}
return CONTINUE;
});
//......
}
//......
return success;
}
最后請求達到RM上的DefaultRMHandler按照TC要求提交或者回滾事務(wù):
//RM提交分支事務(wù)
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
return getRMHandler(request.getBranchType()).handle(request);
}
//RM回滾分支事務(wù)
@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
return getRMHandler(request.getBranchType()).handle(request);
}
提交事務(wù)本質(zhì)上就是提交后刪除undoLog即可,這里我們以分支事務(wù)回滾為例,可以看到上述代碼BranchRollbackResponse 會調(diào)用handle方法執(zhí)行分支事務(wù)回滾,該方法最終會走到AbstractRMHandler的doBranchRollback,該方法會調(diào)動RM管理器將分支事務(wù)回滾:
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
//......
//回滾分支事務(wù)
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
//將xid和處理結(jié)果狀態(tài)響應(yīng)給TC
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
//......
}
最終該方法內(nèi)部就會調(diào)用AbstractUndoLogManager的undo解析當前分支事務(wù)的前置鏡像數(shù)據(jù),該方法內(nèi)部執(zhí)行邏輯為:
- 定位分支事務(wù)的undo日志數(shù)據(jù)
- 反序列化為undo對象
- 基于該undo對象信息解析出表名、列以及數(shù)據(jù)等信息。
- 通過undoExecutor 執(zhí)行器將分支事務(wù)還原。
對應(yīng)源碼如下:
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
//......
for (; ; ) {
try {
//......
// Find UNDO LOG
//獲取當前分支事務(wù)的undo鏡像
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
//......
//獲取undo數(shù)據(jù)
byte[] rollbackInfo = getRollbackInfo(rs);
//反序列化生成undo對象 branchUndoLog
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
//遍歷undo對象生成SQL還原分支事務(wù)值
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
//獲取表的表名、列的元信息
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
//獲取對應(yīng)的執(zhí)行執(zhí)行器 將對應(yīng)分支事務(wù)的表數(shù)據(jù)回滾
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
//......
} catch (SQLIntegrityConstraintViolationException e) {
//......
} catch (Throwable e) {
//......
} finally {
//......
}
}
}
三、小結(jié)
讓我們來做個小結(jié),總的來說seata實現(xiàn)數(shù)據(jù)庫的AT模式分布式事務(wù)的流程為:
(1) 調(diào)用帶有g(shù)lobalTransactional注解的方法執(zhí)行業(yè)務(wù)邏輯。
(2) 該方法以TM的身份通知TC開啟全局事務(wù)。
(3) TC收到通知后到global_table創(chuàng)建該方法的全局事務(wù)信息,這里以筆者某個下單業(yè)務(wù)的分布式事務(wù)場景為例,對應(yīng)的數(shù)據(jù)如下所示:
(4) RM開始工作,各自找TC注冊分支事務(wù),基于當前數(shù)據(jù)生成全局鎖存入lock_table,保證當前數(shù)據(jù)操作時沒有其他事務(wù)干擾:
全局鎖成功后TC將數(shù)據(jù)存入branch_table表,對應(yīng)數(shù)據(jù)如下所示:
(5) RM完成分支事務(wù)注冊后,持有本地鎖的事務(wù)執(zhí)行本地分支事務(wù),成功后將生成分支數(shù)據(jù)的前后鏡像undo表,如下所示:
這里我們以后置鏡像為例子查看賬戶表修改后的字段值為例,可以看到該鏡像將每一個字段的類型、值等信息都序列化為JSON生成undo鏡像:
(6) TM感知到所有分支事務(wù)準備成功,通知TC將這些RM(分支事務(wù))提交,即將undoLog刪除,反之基于undoLog將數(shù)據(jù)回滾。
對應(yīng)我們給出下面這段圖,讀者可以結(jié)合上面源碼梳理一下全流程:
我是 SharkChili ,Java 開發(fā)者,Java Guide 開源項目維護者。歡迎關(guān)注我的公眾號:寫代碼的SharkChili,也歡迎您了解我的開源項目 mini-redis:https://github.com/shark-ctrl/mini-redis。