成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

雙異步系列完結撒花,如何解決異步事務問題?

開發 后端
想要保證事務,肯定是使用@Transactional來實現。現在的場景是導入若干個大的Excel文件數據,因為每個Excel導入的表不同,所以只要保證單Excel的事務即可。

一、前情提要

在上一篇文章中,我們通過雙異步的方式導入了10萬行的Excel,有個小伙伴在評論區問我,如果保證事務呢,如果分批的話。

原始需求:讀取一個10萬行的Excel

通過串行讀取Excel,單個Excel耗時191s。

優化1:使用雙異步后,從 191s 優化到 2s

  • 分別通過POI和EasyExcel的方式讀取Excel并插入數據庫。
  • 探討了“線程池中的核心線程數設置問題”。
  • 經過數十次的測試,總結了通過線程池的方式,爭取一次性并行入庫,效率最佳。

優化2:使用雙異步后,如何保證數據一致性?

通過Future獲取異步返回值,再和Excel文件數據行進行比較,實現對數據準確性的判斷!

  • 逐行分析了FutureTask源碼,繪制了FutureTask執行流程圖。
  • 分析get()源碼,繪制get()方法執行流程圖。
  • 但是,發現了一個問題,Future.get()會造成主線程的阻塞。

優化3:獲取雙異步返回值時,如何保證主線程不阻塞?

Java8中引入了CompletableFuture,它實現了對Future的全面升級,可以通過回調的方式,獲取異步線程返回值。

CompletableFuture的異步執行通過ForkJoinPool實現, 它使用守護線程去執行任務。

ForkJoinPool在于可以充分利用多核CPU的優勢,把一個任務拆分成多個小任務,把多個小任務放到多個CPU上并行執行,當多個小任務執行完畢后,再將其執行結果合并起來。

  • 通過CompletableFuture優化 “通過Future獲取異步返回值”;
  • CompletableFuture和Future的效率對比;
  • 自定義ForkJoinPool線程池;
  • 核心線程數相同的情況下,CompletableFuture的入庫效率要優于Future的入庫效率,10萬條數據大概要快4秒鐘;
  • 通過CompletableFuture.allOf解決阻塞主線程問題;
  • 總結了CompletableFuture中花俏的語法糖。

二、異步某線程失敗時,主線程回滾所有異步線程的事務!

想要保證事務,肯定是使用@Transactional來實現。

現在的場景是導入若干個大的Excel文件數據,因為每個Excel導入的表不同,所以只要保證單Excel的事務即可。

上文中,是使用異步批量讀取并插入的方式實現的Excel文件入庫。

也就是說,1個主線程事務 + 若干個子線程事務,我們想要保證單Excel的插入事務,所有異步子線程有任何一個報錯,都要進行事務回滾,如果全部都沒報錯,則進行事務提交。

這個時候,有的小伙伴可能會想到,主線程加個@Transactional注解,所有子線程分別加@Transactional注解,就可以了吧?

但是,這樣是不行的,子線程的異常只會回滾其自身的事務。

如果Excel中有10萬條數據,一次插入4200條數據,最后一次插入3400條。如果其它線程都插入成功了,最后一個報錯了,此時,數據庫中還是會有96600條數據插入成功,與單Excel的事務需求不符。

通過代碼模擬這種情況:

if(end == sheet.getLastRowNum()){
    logger.info("插入最后一批數據,模擬異常");
    int a = 1/0;
}

三、@Transactional注解

聲明式事務管理建立在AOP之上的。其本質是對方法前后進行攔截,然后在目標方法開始之前創建或者加入一個事務,在執行完目標方法之后根據執行情況提交或者回滾事務。

簡而言之,@Transactional注解在代碼執行出錯的時候能夠進行事務的回滾。

  • 在啟動類上添加@EnableTransactionManagement注解。
  • 用于類上時,該類的所有 public 方法將都具有該類型的事務屬性,同時,我們也可以在方法級別使用該標注來覆蓋類級別的定義。
  • 在項目中,@Transactional(rollbackFor=Exception.class),如果類加了這個注解,那么這個類里面的方法拋出異常,就會回滾,數據庫里面的數據也會回滾。
  • 在@Transactional注解中如果不配置rollbackFor屬性,那么事物只會在遇到RuntimeException的時候才會回滾,加上rollbackFor=Exception.class,可以讓事物在遇到非運行時異常時也回滾。

1、@Transactional

使用@Transactional后,當程序發生RuntimeException運行時異常在沒有使用try,catch進行捕獲的時候,程序都會中止,當程序發生中止,則會觸發數據庫的回滾。

當使用了trycatch進行捕獲到這個異常,假如在catch中加入了throw e拋出異常,則程序中止,數據庫回滾。

加入在try catch中沒有throw e 拋出異常,只是簡單的打印異常,則異常被捕獲未拋出異常去終止程序,在trycatch中的操作數據庫語句插入失敗,在trycatch上面和下面的數據庫相關插入語句成功,也就是程序成功跑完,數據庫不會發生回滾。

2、@Transactional(rollbackFor = Exception.class)

在@Transactional注解中如果不配置rollbackFor屬性,那么事物只會在遇到RuntimeException的時候才會回滾,加上rollbackFor=Exception.class,可以讓事物在遇到非運行時異常時也回滾。

四、注解失效問題

1、@Transactional 應用在非 public 修飾的方法上

事務攔截器在目標方法執行前后進行攔截,內部會調用方法來獲取Transactional 注解的事務配置信息,調用前會檢查目標方法的修飾符是否為 public,不是 public則不會獲取@Transactional 的屬性配置信息。

2、@Transactional 注解屬性 rollbackFor 設置錯誤

rollbackFor 可以指定能夠觸發事務回滾的異常類型。

Spring默認拋出了未檢查unchecked異常(繼承自 RuntimeException 的異常)或者 Error才回滾事務;其他異常不會觸發回滾事務。

如果在事務中拋出其他類型的異常,但卻期望 Spring 能夠回滾事務,就需要指定rollbackFor屬性。

3、同一個類中方法調用,導致@Transactional失效

開發中避免不了會對同一個類里面的方法調用,比如有一個類Test,它的一個方法A,A再調用本類的方法B(不論方法B是用public還是private修飾),但方法A沒有聲明注解事務,而B方法有。則外部調用方法A之后,方法B的事務是不會起作用的。這也是經常犯錯誤的一個地方。

那為啥會出現這種情況?其實這還是由于使用Spring AOP代理造成的,因為只有當事務方法被當前類以外的代碼調用時,才會由Spring生成的代理對象來管理。

在同一個類中調用異步方法,等于調用this本類的方法,沒有走Spring生成的代理類,也就不會讓他異步執行,@Transactional的原理也類似。

4、捕獲異常

如果你手動的catch捕獲這個異常并進行處理,事務管理器會認為當前事務應該正常commit,就會導致注解失效,如果非要捕獲且不失效,就必須在代碼塊內throw new Exception拋出異常。

五、通過Future獲取異步返回值,添加事務

1、添加事務

@Transactional(rollbackFor = Exception.class)
public void readXls(String filePath, String filename) throws Exception{
 try {
  // 省略一些復雜操作...
   
   List<Future<Integer>> futureList = new ArrayList<>();
  for (int time = 0; time < times; time++) {
   Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis();
            futureList.add(sumFuture);
  }
  
  // 主線程獲取Future返回值
        boolean futureFlag = getFutureResult(futureList, excelRow);
        if (futureFlag) {
            logger.info("readXlsCacheAsync---插入數據成功,提交事務");
        } else {
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            logger.info("readXlsCacheAsync---插入數據失敗,回滾事務");
        }
 } catch (Exception e) {
  TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
  logger.error("readXlsCacheAsync---插入數據異常,回滾事務:", e);
    }
}

@Async("async-executor")//是否開啟異步
@Override
public Integer readXlsCacheAsyncMybatis() {
    try {
     // 省略一些復雜操作...
    }catch (Exception e){
        throw new RuntimeException("插入數據庫異常", e);
    }
}

(1)添加事務 + 不開啟異步

如果入庫異常,事務回滾成功。

(2)添加事務 + 開啟異步

回滾失敗!

2、手動添加事務

public void readXls(String filePath, String filename) throws Exception{
  // 手動開啟事務,不自動提交
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
 try {
  // 省略一些復雜操作...
   
   List<Future<Integer>> futureList = new ArrayList<>();
  for (int time = 0; time < times; time++) {
   Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis();
            futureList.add(sumFuture);
  }
  
  // 主線程獲取Future返回值
        boolean futureFlag = getFutureResult(futureList, excelRow);
        if (futureFlag) {
         dataSourceTransactionManager.commit(transactionStatus); // 提交
            logger.info("readXlsCacheAsync---插入數據成功,提交事務");
        } else {
            dataSourceTransactionManager.rollback(transactionStatus);// 回滾
            logger.info("readXlsCacheAsync---插入數據失敗,回滾事務");
        }
 } catch (Exception e) {
  dataSourceTransactionManager.rollback(transactionStatus);// 回滾
  logger.error("readXlsCacheAsync---插入數據異常,回滾事務:", e);
    }
}

@Async("async-executor")//是否開啟異步
@Override
public Integer readXlsCacheAsyncMybatis() {
    try {
     // 省略一些復雜操作...
    }catch (Exception e){
        throw new RuntimeException("插入數據庫異常", e);
    }
}

(1)添加事務 + 不開啟異步

如果入庫異常,事務回滾成功。

(2)Future獲取異步返回值,添加手動事務,異常回滾失敗!

六、@async + @Transactional 事務失效問題

回顧一下需求:異步某線程失敗時,主線程回滾所有異步線程的事務!

是代碼有問題,還是就是實現不了呢?

@Async和@Transactional注解都是通過Spring aop實現的,核心都是靠著關鍵的MethodInterceptor實現,@Async會給對應bean代理對象中放入一個AnnotationAsyncExecutionInterceptor攔截器,而@Transactional會給對應bean的代理對象中放入一個TransactionInterceptor攔截器。

Spring事務管理的傳播機制是使用 ThreadLocal 實現的。因為 ThreadLocal 是線程私有的,所以 Spring 的事務傳播機制是不能夠跨線程的。

七、Spring 的事務傳播機制是不能夠跨線程的

1、一個異步線程一個事務,然后根據結果統一提交/回滾?

2、核心代碼

/**
 * 數據源事務管理器
 */
private DataSourceTransactionManager dataSourceTransactionManager;

@Autowired
public void setUserService(DataSourceTransactionManager dataSourceTransactionManager) {
 this.dataSourceTransactionManager = dataSourceTransactionManager;
}

@Override
public void readXls(String filePath, String filename) {

 List<TransactionStatus> transactionStatusList = Collections.synchronizedList(new ArrayList<>());
 List<TransactionResource> transactionResourceList = Collections.synchronizedList(new ArrayList<>());

 try {
  List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
   Future<Integer> sumFuture = readAsyncFutureTransactionDBService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder,transactionStatusList,transactionResourceList);
            futureList.add(sumFuture);
  }
  
  // 主線程獲取Future返回值
        boolean futureFlag = getFutureResult(futureList, excelRow);
  if (futureFlag) {
   for (int i = 0; i < transactionStatusList.size(); i++) {
    TransactionStatus transactionStatus = transactionStatusList.get(i);
    dataSourceTransactionManager.commit(transactionStatus); // 提交
   }

   logger.info("readXlsCacheAsync---插入數據成功,提交事務");
  } else {
   for (int i = 0; i < transactionStatusList.size(); i++) {
    TransactionStatus transactionStatus = transactionStatusList.get(i);
    dataSourceTransactionManager.rollback(transactionStatus);// 回滾
   }

   logger.info("readXlsCacheAsync---插入數據失敗,事務回滾");
   throw new RuntimeException("readXlsCacheAsync---插入數據異常,異常事務回滾");
  }
 } catch (Exception e) {
  logger.error("readXlsCacheAsync---插入數據異常,事務回滾:", e);

  for (int i = 0; i < transactionStatusList.size(); i++) {
   TransactionStatus transactionStatus = transactionStatusList.get(i);
   dataSourceTransactionManager.rollback(transactionStatus);// 回滾
  }
  //connection.rollback();
  throw new RuntimeException("readXlsCacheAsync---插入數據異常,異常事務回滾");
 }
}

3、異步線程類

@Async("async-executor")
@Override
public Future<Integer> readXlsCacheAsyncMybatis(XSSFSheet sheet, 
            XSSFRow row, 
            int start, 
            int end, 
            StringBuilder insertBuilder,
            List<TransactionStatus> transactionStatusList,
  List<ReadAsyncFutureTransactionServiceImpl.TransactionResource> transactionResourceList) throws Exception {
  
 DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
 // 開啟新事務
 transactionStatusList.add(transactionStatus);
 // copy事務資源
 transactionResourceList.add(ReadAsyncFutureTransactionServiceImpl.TransactionResource.copyTransactionResource());

 try {
  // 入庫操作
 }catch (Exception e){
  throw new RuntimeException("readXlsCacheAsyncMybatis分批異步讀取Excel,通過Mybatis插入數據庫異常");
 }
}

4、事務復制類

/**
 * 保存當前事務資源,用于線程間的事務資源COPY操作
 * <p>
 * `@Builder`注解是Lombok庫提供的一個注解,它可以用于自動生成Builder模式的代碼,使用@Builder注解可以簡化創建對象實例的過程,并且可以使代碼更加清晰和易于維護
 */
static class TransactionResource {
    // TransactionSynchronizationManager類內部默認提供了下面六個ThreadLocal屬性,分別保存當前線程對應的不同事務資源
    // 保存當前事務關聯的資源,默認只會在新建事務的時候保存當前獲取到的DataSource和當前事務對應Connection的映射關系
    // 當然這里Connection被包裝為了ConnectionHolder
    // 事務結束后默認會移除集合中的DataSource作為key關聯的資源記錄
    private Map<Object, Object> resources;
    //下面五個屬性會在事務結束后被自動清理,無需我們手動清理
    // 事務監聽者,在事務執行到某個階段的過程中,會去回調監聽者對應的回調接口(典型觀察者模式的應用),默認為空集合
    private Set<TransactionSynchronization> synchronizations;
    // 存放當前事務名字
    private String currentTransactionName;
    // 存放當前事務是否是只讀事務
    private Boolean currentTransactionReadOnly;
    // 存放當前事務的隔離級別
    private Integer currentTransactionIsolationLevel;
    // 存放當前事務是否處于激活狀態
    private Boolean actualTransactionActive;

    /**
     * 對事務資源進行復制
     *
     * @return TransactionResource
     */
    public static TransactionResource copyTransactionResource() {
        return TransactionResource.builder()
                //返回的是不可變集合
                .resources(TransactionSynchronizationManager.getResourceMap())
                //如果需要注冊事務監聽者,這里記得修改,我們這里不需要,就采用默認負責,spring事務內部默認也是這個值
                .synchronizations(new LinkedHashSet<>()).currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName()).currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly()).currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()).actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive()).build();
    }

    /**
     * 使用
     */
    public void autoWiredTransactionResource() {
        resources.forEach(TransactionSynchronizationManager::bindResource);
        //如果需要注冊事務監聽者,這里記得修改,我們這里不需要,就采用默認負責,spring事務內部默認也是這個值
        TransactionSynchronizationManager.initSynchronization();
        TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
        TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
    }

    /**
     * 移除
     */
    public void removeTransactionResource() {
        // 事務結束后默認會移除集合中的DataSource作為key關聯的資源記錄
        // DataSource如果重復移除,unbindResource時會因為不存在此key關聯的事務資源而報錯
        resources.keySet().forEach(key -> {
            if (!(key instanceof DataSource)) {
                TransactionSynchronizationManager.unbindResource(key);
            }
        });
    }
}

5、為何要用事務復制類?而最后提交和回滾的時候也沒用它?

如何不加會怎么樣?

在提交和回滾的時候,會出現異常:

八、總結

經過不懈的努力,終于解決了“異步某線程失敗時,主線程回滾所有異步線程的事務!”這個看起來很簡單的問題。

也是對雙異步入庫系列的一個完結。

通過添加事務,可以有效的控制Excel異步插入數據的準確性。

讀取一個10萬行的Excel的最佳解決方案是:

  • 通過EasyExcel異步讀取Excel;
  • 通過Future獲取異步返回值,比較Excel行數和入庫數,保證數據入庫一致性;
  • 通過CompletableFuture + 自定義ForkJoinPool線程池的方式執行,解決主線程阻塞問題;
  • 根據核心線程數,設置每個線程讀取的Excel數據行數,以達到效率最佳;
  • 通過手動添加事務 + 一個線程一個事務 + 復制事務的方式實現異步事務的有效控制。

責任編輯:姜華 來源: 哪吒編程
相關推薦

2021-05-31 07:30:47

Connectsocket函數

2023-09-18 07:46:28

2013-04-01 15:38:54

異步編程異步編程模型

2024-03-13 14:35:33

Spring事件異步

2024-12-10 00:00:30

ServletTomcat異步

2024-01-25 08:40:12

線程雙異步Java8

2024-01-22 08:52:00

AQS雙異步數據一致性

2012-05-10 09:14:28

Windows 7硬盤

2021-11-01 22:36:04

JavaScript

2009-07-01 13:58:00

JavaScript異

2021-02-04 09:13:03

Session異步線程

2024-01-10 09:44:11

MySQL死鎖

2021-02-02 12:46:36

Spring異步循環

2015-09-16 15:11:58

C#異步編程

2009-11-09 10:50:30

WCF異步調用

2020-11-30 14:40:52

事務系統項目

2022-04-28 08:46:26

異步任務系統高并發

2013-04-01 15:25:41

異步編程異步EMP

2020-10-15 13:29:57

javascript

2014-04-24 09:49:57

Android測試異步任務
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产1区2区在线观看 | 精品成人一区二区 | 国产免费看 | 91精品成人久久 | 一级黄片一级毛片 | 国产激情三区 | 色视频在线播放 | 四虎在线观看 | 亚洲欧美一区二区三区国产精品 | 亚洲一区二区成人 | 国产成人精品免高潮在线观看 | 午夜免费看 | 国产成人精品一区二区三区网站观看 | 久久99久久99久久 | 国产精品久久免费观看 | av免费网站在线观看 | 国产99久久精品一区二区永久免费 | 亚洲一区综合 | 狠狠色网 | 免费视频一区二区 | 国产高清免费在线 | 成人精品免费 | 高清一区二区三区 | 亚洲成人一区二区三区 | 久久在线视频 | 九九九久久国产免费 | 国产日产精品一区二区三区四区 | www.色综合| 国产精品久久久亚洲 | 成人在线精品视频 | 中文字幕亚洲视频 | 九九精品在线 | 国产欧美精品一区二区色综合朱莉 | www.亚洲国产精品 | 成人在线免费看 | 91看片网 | 免费亚洲成人 | 在线观看精品视频网站 | 我我色综合 | 亚洲精品久久久一区二区三区 | 狠狠的日 |