成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

深度剖析 Seata 源碼

開發(fā)
本文將針對 seata 分布式事務(wù)注冊到提交回滾的全流程進行深入分析和講解,希望對你有幫助。

一、如何使用源碼

需要了解的是,這篇文章是基于筆者相對早期的項目作為樣例進行講解,所以對應(yīng)的seata版本為1.4.2(核心部分實現(xiàn)大體是一樣的),建議讀者閱讀本文在調(diào)試源碼時可以選擇和筆者相同的版本進行理解學(xué)習(xí),對應(yīng)的下載地址為:https://github.com/apache/incubator-seata/tree/v1.4.2

完成下載后,為保證編譯可以通過我們還需要將seata-serializer-protobuf模塊移除掉,該模塊的位置如下圖所示:

同時seata的啟動類位于seata-server模塊,所以我們需要將該模塊的registry.conf的配置改為自己的配置:

以筆者為例,seata配置都是通過nacos進行統(tǒng)一管理的,所以對應(yīng)的配置類型也都是針對nacos維度去協(xié)調(diào)適配,大體配置如下所示:

registry {
  # 將seata注冊到nacos上
  type = "nacos"
  nacos {
  # nacos地址
    serverAddr = "ip:8848"
    # 命名空間id
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    # 組名
    group = "DEFAULT_GROUP"
    # 集群節(jié)點名稱
    cluster = "default"
  }
}
config {
  # 通過nacos獲取配置
  type = "nacos"
  nacos {
    serverAddr = "ip:8848"
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    group = "DEFAULT_GROUP"
  }
}

經(jīng)過這幾個步驟后seata就可以像我們?nèi)粘R粯拥姆绞竭M行使用了。

二、基于AT模式詳解Seata全鏈路流程

1. seata服務(wù)端啟動

我們先從seata的服務(wù)端啟動開始,seata服務(wù)端啟動時會進行如下幾個核心步驟:

  • 創(chuàng)建工作線程池workingThreads。
  • 基于工作線程池創(chuàng)建一個Netty服務(wù)端對外提供服務(wù)。
  • 基于該服務(wù)端創(chuàng)建的一個默認的協(xié)調(diào)者DefaultCoordinator管理全局事務(wù)。
  • 默認協(xié)調(diào)者初始化幾個定時任務(wù)處理一些異步的全局事務(wù)提交、回滾、超時監(jiān)測的任務(wù)。

對應(yīng)的我們給出這塊邏輯的核心入口代碼,即位于Server的主函數(shù)入口的main方法,可以看到seata服務(wù)端的創(chuàng)建是基于netty完成的,完成創(chuàng)建和初始化之后就與協(xié)調(diào)者coordinator進行綁定:

public static void main(String[] args) throws IOException {
        //......
        //創(chuàng)建工作線程池處理業(yè)務(wù)請求
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
        //基于該線程池初始化 seata 服務(wù)端
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
 //......
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getStoreMode());
        //初始化協(xié)調(diào)者,處理seata服務(wù)端收到的各種事務(wù)讀寫請求
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        //初始化各種異步定時任務(wù):全局事務(wù)提交、全局事務(wù)回滾、超時監(jiān)測等
        coordinator.init();
        //將協(xié)調(diào)者作為seata服務(wù)端的處理器
        nettyRemotingServer.setHandler(coordinator);
       //......
    }

對應(yīng)的我們也給出默認協(xié)調(diào)者的初始化源碼,即DefaultCoordinator的init方法,可以看到這段代碼本質(zhì)上就是提交一些定時任務(wù)處理全局事務(wù)提交、回滾、超時監(jiān)測、undo log刪除等:

public void init() {
        //每秒執(zhí)行,處理需要回滾的分布式事務(wù)
        retryRollbacking.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryRollbackingLock();
            if (lock) {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                } finally {
                    SessionHolder.unRetryRollbackingLock();
                }
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
        //異步定時提交全局事務(wù)的定時任務(wù),每秒執(zhí)行一次
        asyncCommitting.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.asyncCommittingLock();
            if (lock) {
                try {
                    //掃描獲取各種異步提交的全局事務(wù)
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                } finally {
                    SessionHolder.unAsyncCommittingLock();
                }
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
    }

2. 本地服務(wù)如何基于GlobalTransaction注解開啟事務(wù)

我們都知道seata也是基于spring boot實現(xiàn)的,所以我們可以大膽的認為應(yīng)用端使用GlobalTransaction開啟分布式事務(wù)本質(zhì)上也是和spring boot自動裝配有著一定的聯(lián)系。

所以我們從seata-spring-boot-starter這個腳手架的源碼包的spring.factories文件入手,可以看到一個SeataAutoConfiguration的注入:

于是我們就可以看到一個GlobalTransactionScanner即一個關(guān)于GlobalTransaction注解掃描的類:

@Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
      //......
        //掃描我們的配置文件中配置的applicationId、txServiceGroup對應(yīng)的事務(wù)
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

查看GlobalTransactionScanner源碼我們可以看到該類型繼承了spring的初始化bean并設(shè)置屬性后的拓展點InitializingBean的afterPropertiesSet方法,該方法內(nèi)部會初始化當前seata客戶端,分別初始化TM客戶端(使用GlobalTransaction注解的方法的服務(wù)即做為TM)和RM客戶端處理其他TM或者RM服務(wù)端發(fā)送的消息,它們初始化的工作分別是:

  • TM客戶端會注冊各種TC消息響應(yīng)的處理器,處理各種seata server對應(yīng)的TC響應(yīng)的消息,例如:全局事務(wù)開啟結(jié)果處理器、全局事務(wù)提交處理器、全局事務(wù)回滾處理器等。
  • RM客戶端則是注冊一些各種seata server對應(yīng)TC請求消息的處理器,例如:分支事務(wù)提交、分支事務(wù)回滾、分支事務(wù)undo.log刪除等。

對應(yīng)我們給出GlobalTransactionScanner的afterPropertiesSet源碼可以看到客戶端初始化這段調(diào)用的入口,可以看到啟動時某個線程完成CAS上鎖初始化標識之后,即通過initClient初始化客戶端:

@Override
    public void afterPropertiesSet() {
        //......
        //基于擴展點進行客戶端初始化
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

步入后即可看到對于TM和RM客戶端的初始化調(diào)用:

private void initClient() {
        //......
        // 初始化TM客戶端
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
       //......
        // 初始化RM客戶端
        RMClient.init(applicationId, txServiceGroup);
      //......
    }

此時我們先看看TM客戶端內(nèi)部的處理函數(shù)即位于TmNettyRemotingClient的registerProcessor即可看到上述所說的TC響應(yīng)消息處理器的綁定步驟,即:

  • 注冊TC響應(yīng)消息處理器
  • 注冊全局事務(wù)開啟響應(yīng)處理器
  • 注冊全局事務(wù)提交響應(yīng)處理器
  • 注冊心跳消息處理器
private void registerProcessor() {
        // 1.registry TC response processor 注冊一些TC響應(yīng)消息的處理器
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        //全局事務(wù)開啟結(jié)果響應(yīng)處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        //全局事務(wù)提交響應(yīng)處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2. 注冊心跳消息
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同理我們也給出RM客戶端內(nèi)部初始化的調(diào)用RmNettyRemotingClient的registerProcessor方法:

  • 注冊分支事務(wù)提交消息處理器
  • 注冊rm客戶端對應(yīng)的分支事務(wù)提及和回滾處理器
  • 注冊undo Log刪除處理器
  • 注冊TC響應(yīng)消息處理器
  • 注冊心跳處理器
private void registerProcessor() {
        // 1. 注冊分支事務(wù)提交消息處理器
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.注冊rm客戶端對應(yīng)的分支事務(wù)回滾處理器
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3. 注冊undo log刪除處理器
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4. 注冊TC響應(yīng)消息處理器
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        // 5.注冊心跳消息處理器
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同時GlobalTransactionScanner繼承了AbstractAutoProxyCreator的wrapIfNecessary,該代理類會在spring容器中的bean進行檢查并決定是否進行動態(tài)代理。以我們的GlobalTransactionScanner邏輯它本質(zhì)上就是:

  • 檢查當前bean是否有GlobalTransactional這個注解
  • 如果有則基于全局事務(wù)攔截器對其進行增強

對應(yīng)核心邏輯如下所示,可以看到這段代碼會通過existsAnnotation檢查當前bean是否存在GlobalTransactional注解,如果有則基于globalTransactionalInterceptor 對其進行增強:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
              //......
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                   //......
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //判斷是否有GlobalTransaction注解,如果有則為其生成分布式事務(wù)的動態(tài)代理
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    //如果攔截器為空則初始化攔截器
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

              //......
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //基于上一步的interceptor為其生成動態(tài)代理
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
             //......
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

這也就意味著我們調(diào)用帶有GlobalTransactional注解方法時,就會走到GlobalTransactionalInterceptor的增強邏輯上,它會走到GlobalTransactionalInterceptor的invoke方法上,最終會走到事務(wù)模板類transactionalTemplate的execute方法,該方法會執(zhí)行如下三個核心步驟:

  • 開啟全局事務(wù)。
  • 執(zhí)行原始業(yè)務(wù)邏輯。
  • 根據(jù)各個分支事務(wù)結(jié)果提交或者回滾事務(wù)。

對應(yīng)的我們給出GlobalTransactionalInterceptor的invoke方法,可以看到當該方法認為注解存在的情況下會直接調(diào)用handleGlobalTransaction開啟并處理全局事務(wù):

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
      //......
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {

            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //獲取GlobalTransactional注解信息
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //......
            if (!localDisable) {
                //若全局事務(wù)注解不為空則調(diào)用handleGlobalTransaction處理全局事務(wù)
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                     //......
                }
            }
        }
         //......
    }

步入其內(nèi)部就會走到transactionalTemplate的execute方法,即可看到對于:

  • 分支事務(wù)的創(chuàng)建
  • 告知TC請求開啟全局事務(wù)
  • 執(zhí)行本地事務(wù)
  • 全局提交或者回滾

對應(yīng)邏輯的源碼如下所示,讀者可結(jié)合說明了解:

public Object execute(TransactionalExecutor business) throws Throwable {
       //......

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            //如果tx為空則以全局事務(wù)啟動者的身份創(chuàng)建一個全新的事務(wù)
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //向TC發(fā)送請求開啟全局事務(wù)
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    //執(zhí)行業(yè)務(wù)邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    //全局事務(wù)回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                //分支事務(wù)執(zhí)行成功,提交全局事務(wù)
                commitTransaction(tx);

                return rs;
            } finally {
             //......
            }
        } finally {
         //......
        }
    }

3. 客戶端如何開啟分布式事務(wù)

上文調(diào)用分布式事務(wù)的方法時內(nèi)部會走到的代理的transactionalTemplate的execute方法,其內(nèi)部有個beginTransaction就是開啟分布式事務(wù)的關(guān)鍵,由上文可知作為GlobalTransactional注解的方法對對應(yīng)的服務(wù)就是作為TM即transaction manager,所以在調(diào)用beginTransaction時,這個方法的代理就會以TM的身份發(fā)送一個請求告知TC自己要開啟一個全局事務(wù),TC經(jīng)過自己的協(xié)調(diào)處理后(后文會介紹流程)返回一份xid告知TM開啟成功:

對應(yīng)的我們查看seata客戶端對應(yīng)TransactionalTemplate的beginTransaction方法即可看到begin方法的調(diào)用,該方法回告知seata服務(wù)端自己要開啟一個全局事務(wù):

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //......
            //開始分布式事務(wù)
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
         //......
        } catch (TransactionException txe) {
            //......

        }
    }

查看begin內(nèi)部就是通過TM發(fā)起請求,得到xid并緩存到當前線程內(nèi)部,開始后續(xù)的執(zhí)行流程分布式事務(wù)處理流程:

@Override
    public void begin(int timeout, String name) throws TransactionException {
        //......
        //通過TM告知TC開啟全局事務(wù),從而得到xid
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //將xid緩存到當前線程的緩存中
        RootContext.bind(xid);
        //......
    }

4. seata服務(wù)端如何注冊全局事務(wù)

基于上述請求,對應(yīng)seata server端的TC收到請求后會基于傳參中的消息標信息,定位到對應(yīng)的執(zhí)行器即TM消息處理器,然后驅(qū)動TM處理器將這個請求生成一份全局session信息從而構(gòu)成本次請求的全局事務(wù)信息,再將請求寫入數(shù)據(jù)表中:

我們給出TC處理消息的代碼入口AbstractNettyRemotingServer的channelRead方法,從名字不難看出TC服務(wù)端也是基于netty實現(xiàn),其內(nèi)部通過processMessage處理各種消息:

@Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  
            //基于netty編寫的服務(wù)端,channelRead通過processMessage處理客戶端各種請求
            processMessage(ctx, (RpcMessage) msg);
        }

步入processMessage即可看到基于處理表定位消息并交由處理器處理消息邏輯pair.getFirst().process(ctx, rpcMessage);:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
         //......
         //獲取網(wǎng)絡(luò)消息
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //通過處理表定位到對應(yīng)的處理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                //基于第一個處理器處理當前消息
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                //......
                            } finally {
                                //......
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        //......
                        
                    }
                } else {
                //......
               }
        }
    }

因為我們的消息是TM發(fā)來的,所以上一步的處理器是ServerOnRequestProcessor的:

@Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
         //處理TM客戶端發(fā)送來的消息
            onRequestMessage(ctx, rpcMessage);
        } else {
           //......
        }
    }

最終走到GlobalBeginRequest這個工具的handle基于協(xié)調(diào)者將事務(wù)信息寫入global_table從而得到xid返回給TM客戶端:

@Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
  //生成全局事務(wù)信息并得到xid將數(shù)據(jù)寫入響應(yīng)返回給TM
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
       //.......
    }

5. RM和TC如何協(xié)調(diào)處理分支事務(wù)

完成全局事務(wù)的注冊管理之后,我們再來聊聊各個分支事務(wù)的執(zhí)行和提交回滾,上文提及,seata原生我們本地的jdbc數(shù)據(jù)庫連接通過代理加以封裝,所以在我們seata客戶端執(zhí)行本地事務(wù)完成后提交的commit方法是經(jīng)過了seata的代理這一層,該連接代理在調(diào)用commit方法時,其內(nèi)部就會通過RM向TC注冊一個分支事務(wù)的請求,TC收到請求后會執(zhí)行如下工作:

  • 基于lock_table嘗試為事務(wù)生成全局鎖。
  • 分支事務(wù)信息寫入到branch_table表上并返回branch_id給RM:

我們給出ConnectionProxy的commit方法入口,其內(nèi)部調(diào)用了一個doCommit方法,它就是事務(wù)提交的核心邏輯:

@Override
    public void commit() throws SQLException {
        try {
         //excute會調(diào)用doCommit生成undoLog緩存和執(zhí)行分支事務(wù)
            LOCK_RETRY_POLICY.execute(() -> {
                //excuete執(zhí)行成功后這一步會注冊分支事務(wù)并提交本地事務(wù)和undoLog鏡像以保證原子性
                doCommit();
                return null;
            });
        } catch (SQLException e) {
           //......
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

其內(nèi)部調(diào)用ConnectionProxy的doCommit會調(diào)用processGlobalTransactionCommit執(zhí)行分支事務(wù):

private void doCommit() throws SQLException {
        //如果處于全局事務(wù)中則調(diào)用processGlobalTransactionCommit
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
         //......
        } else {
           //......
        }
    }

最終就可以在processGlobalTransactionCommit看到如下邏輯:

  • register這個注冊分支事務(wù)的邏輯,TC基于RM給定的resourceId信息,生成操作數(shù)據(jù)的全局鎖,并插入分支事務(wù)信息到brach_table中。
  • undo日志刷盤到本地undo日志中。
  • 本地業(yè)務(wù)的事務(wù)提交。
private void processGlobalTransactionCommit() throws SQLException {
        try {
            //向TC發(fā)起請求注冊分支事務(wù),TC基于RM給定的resourceId生成全局鎖并插入分支事務(wù)信息到brach_table后就不會拋異常
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //undo日志刷盤
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //本地事務(wù)提交
            targetConnection.commit();
        } catch (Throwable ex) {
          //......
        }
          //......
    }

這里我們著重看一下register函數(shù),其內(nèi)部本質(zhì)上就是通過RM客戶端告知TC自己準備執(zhí)行分支事務(wù)提交,幫我上一把全局鎖并注冊分支事務(wù):

private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        //向tc發(fā)起請求并獲得register
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        //緩存到當前線程中
        context.setBranchId(branchId);
    }

最后這個注冊的邏輯就會來到AbstractResourceManager的branchRegister上,可以看到它會攜帶著全局事務(wù)id和主鍵等數(shù)據(jù)發(fā)送請求給TC:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            //傳入全局事務(wù)id即xid
            request.setXid(xid);
            //基于當前數(shù)據(jù)主鍵生成lockkeys
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);
            //基于RM的netty客戶端將其異步發(fā)送
            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
         //......
            return response.getBranchId();
        } catch (TimeoutException toe) {
           //......
        }
    }

6. seata服務(wù)端處理分支事務(wù)請求

TC處理流程與上述文章同理,收到消息后基于request中的消息表定位到對應(yīng)的處理器,我們這里最終會走到BranchRegisterRequest的處理器上,通過AbstractTCInboundHandler注冊分支事務(wù):

@Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    //tc注冊分支事務(wù)入口
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                 //......
                }
            }
        }, request, response);
        return response;
    }

最終這段邏輯就會走到AbstractCore的branchRegister,大體執(zhí)行的步驟是:

  • 生成分支事務(wù)session
  • 嘗試獲得數(shù)據(jù)全局鎖lock_table
  • 取鎖成功將分支事務(wù)信息寫入branch_table
  • 返回branch_id給RM

對應(yīng)源碼邏輯如下,大體邏輯就說基于分支事務(wù)session生成全局鎖存到lock_table后,將分支事務(wù)信息存到branch_table中:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
      //......
        return SessionHolder.lockAndExecute(globalSession, () -> {
             //......
            //獲取分支事務(wù)的表信息并將其寫入到lock_table中意味獲得全局鎖,上鎖失敗會拋異常
            branchSessionLock(globalSession, branchSession);

            try {
                //添加分支事務(wù)信息到branch_table中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                 //......
            }
             //......
             //返回分支事務(wù)id
            return branchSession.getBranchId();
        });
    }

TC返回成功后,RM就會執(zhí)行undo日志刷盤和本地事務(wù)提交,詳情參考我們本節(jié)代碼processGlobalTransactionCommit方法,這里不貼出了。

7. RM生成回滾日志

對于java程序而言大部分SQL操作底層都是基于Executor執(zhí)行器操作的,在上述代理執(zhí)行commit方法前,seata底層將代理的連接即上文的connectionProxy通過AbstractDMLBaseExecutor執(zhí)行SQL操作,該執(zhí)會針對我們的連接代理進行如下邏輯處理:

  • 判斷連接代理connectionProxy是否是自動提交,若非自動提交則調(diào)用executeAutoCommitFalse方法,該方法會生成undoLog數(shù)據(jù)寫入緩存,然后將RM當執(zhí)行分支事務(wù)SQL,基于該執(zhí)行結(jié)果生成后置鏡像,最后將undo日志寫入undo_log表中。
  • 若開啟自動提交則關(guān)閉自動提交后,復(fù)用executeAutoCommitFalse方法執(zhí)行系統(tǒng)的undoLog和分支事務(wù)SQL的執(zhí)行操作。

對應(yīng)源碼的整體工作鏈路圖如下所示:

這里我們直接給出AbstractDMLBaseExecutor的doExecute方法作為入口,可以看到若開啟自動提交則調(diào)用executeAutoCommitTrue,反之調(diào)用executeAutoCommitFalse:

@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //若自動提交則關(guān)閉自動提交,并生成undo信息存入緩沖區(qū)
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //直接生成undo log鏡像寫入緩存
            return executeAutoCommitFalse(args);
        }
    }

因為都會復(fù)用executeAutoCommitFalse這段邏輯,所以我們直接查看這個方法的邏輯,可以看到該邏輯內(nèi)部會基于分支事務(wù)前后的數(shù)據(jù)生成前置和后置鏡像:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //基于分支事務(wù)的SQL定位操作前的SQL生成前置鏡像
        TableRecords beforeImage = beforeImage();
        //執(zhí)行分支事務(wù)的SQL 
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //生成分支事務(wù)操作后置鏡像
        TableRecords afterImage = afterImage(beforeImage);
        //將undoLog寫入緩沖區(qū)
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

8. 事務(wù)全局提交與回滾

TransactionalTemplate(即TM)驅(qū)動各種分支事務(wù)準備成功后,就會執(zhí)行commitTransaction提交全局事務(wù),對應(yīng)的代碼位于TransactionalTemplate的execute方法,該方法會通知TC驅(qū)動全局事務(wù)提交,而TC收到該請求之后,就會驅(qū)動各個分支事務(wù)提交事務(wù),每個分支事務(wù)收到該請求后就會刪除undoLog并提交各自未提交的事務(wù):

public Object execute(TransactionalExecutor business) throws Throwable {
          //......

            try {
             
                //向TC發(fā)送請求開啟全局事務(wù)
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    
                    //執(zhí)行業(yè)務(wù)邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {            
                    //全局事務(wù)回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

              
                //分支事務(wù)執(zhí)行成功,提交全局事務(wù)
                commitTransaction(tx);

                return rs;
            } finally {
          //......
            }
        } finally {
          //......
        }
    }

步入其內(nèi)部可以看到DefaultGlobalTransaction調(diào)用transactionManager即TM提交全局事務(wù):

@Override
    public void commit() throws TransactionException {
       //......
        try {
            while (retry > 0) {
                try {
                    //執(zhí)行全局事務(wù)提交
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                   //......
            }
        } finally {
          //......
        }
        //......
    }

這個commit的邏輯也很簡單,即告知TC要提交全局事務(wù)了:

@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //通知TC提交全局事務(wù)
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

對應(yīng)的TC收到該請求后,對應(yīng)的AbstractTCInboundHandler就會調(diào)用doGlobalCommit通知各個RM提交全局事務(wù):

@Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                //遍歷RM提交各個分支事務(wù)
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                  //......
                }
            }
     //......

          //......


        }, request, response);
        return response;
    }

對應(yīng)的我們可以來道該源碼內(nèi)部的DefaultCore的doGlobalCommit方法印證這一點,可以看到該方法會遍歷各個分支事務(wù)調(diào)用branchCommit通知其提交或者回滾事務(wù):

@Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
      //......
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //遍歷全局事務(wù)中的分支事務(wù)
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //......
                }
                try {
                    //告知RM提交事務(wù)
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    //......
                } catch (Exception ex) {
                    //......
                }
                return CONTINUE;
            });
             //......
        }
        //......
        return success;
    }

最后請求達到RM上的DefaultRMHandler按照TC要求提交或者回滾事務(wù):

//RM提交分支事務(wù)
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
    //RM回滾分支事務(wù)
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }

提交事務(wù)本質(zhì)上就是提交后刪除undoLog即可,這里我們以分支事務(wù)回滾為例,可以看到上述代碼BranchRollbackResponse 會調(diào)用handle方法執(zhí)行分支事務(wù)回滾,該方法最終會走到AbstractRMHandler的doBranchRollback,該方法會調(diào)動RM管理器將分支事務(wù)回滾:

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        //......
        //回滾分支事務(wù)
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        //將xid和處理結(jié)果狀態(tài)響應(yīng)給TC
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
       //......
    }

最終該方法內(nèi)部就會調(diào)用AbstractUndoLogManager的undo解析當前分支事務(wù)的前置鏡像數(shù)據(jù),該方法內(nèi)部執(zhí)行邏輯為:

  • 定位分支事務(wù)的undo日志數(shù)據(jù)
  • 反序列化為undo對象
  • 基于該undo對象信息解析出表名、列以及數(shù)據(jù)等信息。
  • 通過undoExecutor 執(zhí)行器將分支事務(wù)還原。

對應(yīng)源碼如下:

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
      //......

        for (; ; ) {
            try {
               //......

                // Find UNDO LOG
                //獲取當前分支事務(wù)的undo鏡像
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                   //......
                    //獲取undo數(shù)據(jù)
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    //反序列化生成undo對象 branchUndoLog
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        //遍歷undo對象生成SQL還原分支事務(wù)值
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //獲取表的表名、列的元信息
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            //獲取對應(yīng)的執(zhí)行執(zhí)行器 將對應(yīng)分支事務(wù)的表數(shù)據(jù)回滾
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                //......
            } catch (SQLIntegrityConstraintViolationException e) {
                //......
            } catch (Throwable e) {
                //......

            } finally {
               //......
            }
        }
    }

三、小結(jié)

讓我們來做個小結(jié),總的來說seata實現(xiàn)數(shù)據(jù)庫的AT模式分布式事務(wù)的流程為:

(1) 調(diào)用帶有g(shù)lobalTransactional注解的方法執(zhí)行業(yè)務(wù)邏輯。

(2) 該方法以TM的身份通知TC開啟全局事務(wù)。

(3) TC收到通知后到global_table創(chuàng)建該方法的全局事務(wù)信息,這里以筆者某個下單業(yè)務(wù)的分布式事務(wù)場景為例,對應(yīng)的數(shù)據(jù)如下所示:

(4) RM開始工作,各自找TC注冊分支事務(wù),基于當前數(shù)據(jù)生成全局鎖存入lock_table,保證當前數(shù)據(jù)操作時沒有其他事務(wù)干擾:

全局鎖成功后TC將數(shù)據(jù)存入branch_table表,對應(yīng)數(shù)據(jù)如下所示:

(5) RM完成分支事務(wù)注冊后,持有本地鎖的事務(wù)執(zhí)行本地分支事務(wù),成功后將生成分支數(shù)據(jù)的前后鏡像undo表,如下所示:

這里我們以后置鏡像為例子查看賬戶表修改后的字段值為例,可以看到該鏡像將每一個字段的類型、值等信息都序列化為JSON生成undo鏡像:

(6) TM感知到所有分支事務(wù)準備成功,通知TC將這些RM(分支事務(wù))提交,即將undoLog刪除,反之基于undoLog將數(shù)據(jù)回滾。

對應(yīng)我們給出下面這段圖,讀者可以結(jié)合上面源碼梳理一下全流程:

我是 SharkChili ,Java 開發(fā)者,Java Guide 開源項目維護者。歡迎關(guān)注我的公眾號:寫代碼的SharkChili,也歡迎您了解我的開源項目 mini-redis:https://github.com/shark-ctrl/mini-redis。

責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2022-09-27 18:56:28

ArrayList數(shù)組源代碼

2024-02-05 19:06:04

DartVMGC流程

2010-03-01 14:50:06

Python 工具

2009-09-15 14:52:15

linq級聯(lián)刪除

2011-05-23 14:20:59

WordPress

2010-03-01 18:33:30

2023-01-10 13:48:50

ContainerdCRI源碼

2010-02-01 13:34:59

Python 腳本

2010-02-02 15:25:35

Python語法

2010-02-03 16:56:24

Python包

2010-03-05 16:38:30

2014-10-17 09:30:38

2020-04-01 10:28:12

Apache HBas數(shù)據(jù)結(jié)構(gòu)算法

2010-02-04 15:38:39

Android 手機

2022-03-24 14:40:31

開發(fā)Harmony鴻蒙

2022-04-29 14:56:40

通話應(yīng)用源碼剖析

2009-12-07 18:43:29

WCF框架

2010-02-05 18:00:18

Android源代碼

2010-02-06 15:32:30

Android架構(gòu)

2010-02-26 17:44:40

Python測試框架
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 在线视频一区二区 | 呦呦在线视频 | 国产视频亚洲视频 | 国产精品久久久久久久久久三级 | 久久久激情 | 日韩视频观看 | 国产区在线视频 | 欧美一区二区三区四区视频 | 黄视频国产 | 欧美久久一级特黄毛片 | 婷婷久久一区 | 日本一区二区高清不卡 | 国产一区二区在线播放 | a a毛片| 国产日韩91 | 极品销魂美女一区二区 | 高清18麻豆 | 91不卡在线| 天天干天天想 | 国产99久久精品一区二区永久免费 | 中国美女一级黄色片 | 国产精品久久久久久久久图文区 | 玖玖玖在线 | 韩国av一区二区 | 精品中文字幕一区二区三区 | 欧美一级观看 | 日日操夜夜操视频 | 日韩国产在线观看 | 欧美性一区二区三区 | 久久成人18免费网站 | 国产在线一区二区 | 久在线 | 久久久成人精品 | 欧美日韩综合 | av福利网站 | 91精品欧美久久久久久久 | 日韩av免费在线观看 | 中文字幕精品一区二区三区精品 | 欧美精品一区三区 | 一级a性色生活片久久毛片波多野 | 91精品国产91久久久久久最新 |