震撼!通過雙重異步,Excel 10萬行數據導入從191秒優化到2秒!
在現代的企業級應用開發中,海量數據的處理效率和并發性能優化是一個非常重要的課題。無論是大規模數據導入、文件解析,還是在分布式系統中處理高并發任務,如何提升系統的處理速度、合理利用計算資源、減少線程上下文切換的開銷,這些都是開發者必須面對的問題。在這一背景下,線程池技術以及異步編程逐漸成為提升系統性能的利器。
本文將深入探討如何通過合理設計線程池和利用異步編程模型,有效優化大規模數據的處理性能。我們將結合 Spring Boot 框架中的 @Async 注解、自定義線程池、以及通過使用 EasyExcel 進行大數據量的 Excel 解析和異步寫入數據庫的場景,詳細說明如何通過分而治之的策略,減少系統的響應時間、提高并發處理能力。同時,還將分析如何基于 CPU 和 IO 密集型任務的特性,來合理設置線程池的核心線程數、最大線程數等參數,以便在實際項目中能夠充分發揮硬件資源的性能。
通常我是這樣做的:
- 使用POI讀取需要導入的Excel文件;
- 將文件名作為表名,列標題作為列名,并將數據拼接成SQL語句;
- 通過JDBC或Mybatis插入到數據庫。
圖片
在操作中,如果文件數量多且數據量大,處理過程可能會非常緩慢。
訪問后,感覺程序沒有響應,但實際上,它正在讀取并插入數據,只是速度很慢。
讀取包含10萬行的Excel文件竟然耗時191秒!
我以為程序卡住了!
private void readXls(String filePath, String filename) throws Exception {
@SuppressWarnings("resource")
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
// 讀取第一個工作表
XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
// 獲取總行數
int maxRow = sheet.getLastRowNum();
StringBuilder insertBuilder = new StringBuilder();
insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
XSSFRow row = sheet.getRow(0);
for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
insertBuilder.append(row.getCell(i)).append(",");
}
insertBuilder.deleteCharAt(insertBuilder.length() - 1);
insertBuilder.append(" ) values ( ");
StringBuilder stringBuilder = new StringBuilder();
for (int i = 1; i <= maxRow; i++) {
XSSFRow xssfRow = sheet.getRow(i);
String id = "";
String name = "";
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
if (j == 0) {
id = xssfRow.getCell(j) + "";
} else if (j == 1) {
name = xssfRow.getCell(j) + "";
}
}
boolean flag = isExisted(id, name);
if (!flag) {
stringBuilder.append(insertBuilder);
stringBuilder.append('\'').append(uuid()).append('\'').append(",");
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
stringBuilder.append('\'').append(value).append('\'').append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(" )").append("\n");
}
}
List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
int sum = JdbcUtil.executeDML(collect);
}
private static boolean isExisted(String id, String name) {
String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";
String num = JdbcUtil.executeSelect(sql, "num");
return Integer.valueOf(num) > 0;
}
private static String uuid() {
return UUID.randomUUID().toString().replace("-", "");
}
如何優化?
優化1:首先,查詢所有數據,將其緩存到map中,然后在插入前做決策。這樣可以大大提高速度。
優化2:如果單個Excel文件太大,可以考慮使用異步和多線程,分批讀取多行并插入數據庫。
圖片
優化3:如果文件太多,可以為每個Excel文件使用一個異步進程,實現雙重異步讀取和插入。
圖片
使用雙重異步處理后,從191秒優化到了2秒,你能相信嗎?
以下是異步讀取Excel文件和批量讀取大Excel文件的關鍵代碼。
異步讀取緩存的Excel Controller類
@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {
String path = "G:\\Test\\data\\";
try {
// 讀取Excel之前,緩存所有數據
USER_INFO_SET = getUserInfo();
File file = new File(path);
String[] xlsxArr = file.list();
for (int i = 0; i < xlsxArr.length; i++) {
File fileTemp = new File(path + "\\" + xlsxArr[i]);
String filename = fileTemp.getName().replace(".xlsx", "");
readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);
}
} catch (Exception e) {
logger.error("|#ReadDBCsv|#Exception: ", e);
return "error";
}
return "success";
}
批量讀取超大Excel文件
@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {
@SuppressWarnings("resource")
XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
// 讀取第一個工作表
XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
// 總行數
int maxRow = sheet.getLastRowNum();
logger.info(filename + ".xlsx,共 " + maxRow + " 行數據!");
StringBuilder insertBuilder = new StringBuilder();
insertBuilder.append("insert into ").append(filename).append(" ( UUID,");
XSSFRow row = sheet.getRow(0);
for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
insertBuilder.append(row.getCell(i)).append(",");
}
insertBuilder.deleteCharAt(insertBuilder.length() - 1);
insertBuilder.append(" ) values ( ");
int times = maxRow / STEP + 1;
for (int time = 0; time < times; time++) {
int start = STEP * time + 1;
int end = STEP * time + STEP;
if (time == times - 1) {
end = maxRow;
}
if (end + 1 - start > 0) {
readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);
}
}
}
異步批量插入數據庫
@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = start; i <= end; i++) {
XSSFRow xssfRow = sheet.getRow(i);
String id = "";
String name = "";
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
if (j == 0) {
id = xssfRow.getCell(j) + "";
} else if (j == 1) {
name = xssfRow.getCell(j) + "";
}
}
// 在讀取Excel之前,先緩存所有數據,然后做決策
boolean flag = isExisted(id, name);
if (!flag) {
stringBuilder.append(insertBuilder);
stringBuilder.append('\'').append(uuid()).append('\'').append(",");
for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
stringBuilder.append('\'').append(value).append('\'').append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
stringBuilder.append(" )").append("\n");
}
}
List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
if (collect != null && collect.size() > 0) {
int sum = JdbcUtil.executeDML(collect);
}
}
private boolean isExisted(String id, String name) {
return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}
異步線程池工具類
@Async 的目的是異步處理任務。
- 在方法上添加 @Async 表明該方法是異步的。
- 在類上添加 @Async 表示該類中的所有方法都是異步的。
- 使用此注解的類必須由 Spring 管理。
- 必須在啟動類或配置類中添加 @EnableAsync 注解,@Async 才能生效。
在使用 @Async 時,如果不指定線程池的名稱,即不自定義線程池,默認會使用一個線程池。這個默認線程池是 Spring 的 SimpleAsyncTaskExecutor。
默認線程池的默認配置如下:
- 默認核心線程數:8。
- 最大線程數:Integer.MAX_VALUE。
- 隊列類型:LinkedBlockingQueue。
- 容量:Integer.MAX_VALUE。
- 空閑線程保留時間:60秒。
- 線程池拒絕策略:AbortPolicy。
從最大線程數可以看出,在并發情況下,線程會無限制地創建。
你也可以通過 yml 文件重新配置:
spring:
task:
execution:
pool:
max-size: 10
core-size: 5
keep-alive: 3s
queue-capacity: 1000
thread-name-prefix: my-executor
你也可以自定義線程池。以下是使用 @Async 自定義線程池的簡單代碼實現:
@EnableAsync // 支持異步操作
@Configuration
public class AsyncTaskConfig {
/**
* 來自 com.google.guava 的線程池
* @return
*/
@Bean("my-executor")
public Executor firstExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
// 獲取 CPU 處理器數量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
200, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
threadPool.allowsCoreThreadTimeOut();
return threadPool;
}
/**
* Spring 的線程池
* @return
*/
@Bean("async-executor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心線程數
taskExecutor.setCorePoolSize(24);
// 線程池維護的最大線程數,超出核心線程數的線程僅當緩沖隊列滿時才會創建
taskExecutor.setMaxPoolSize(200);
// 緩沖隊列
taskExecutor.setQueueCapacity(50);
// 超出核心線程數的線程空閑時間,超時后將被銷毀
taskExecutor.setKeepAliveSeconds(200);
// 異步方法內部線程名
taskExecutor.setThreadNamePrefix("async-executor-");
/**
* 當線程池的任務緩存隊列已滿,且線程池中的線程數量已達到最大值時,如果還有任務到來,將采用任務拒絕策略。
* 通常有以下四種策略:
* ThreadPoolExecutor.AbortPolicy:拋棄任務并拋出 RejectedExecutionException 異常。
* ThreadPoolExecutor.DiscardPolicy:拋棄任務,但不拋出異常。
* ThreadPoolExecutor.DiscardOldestPolicy:拋棄隊列最前面的任務,然后嘗試執行當前任務(重復此過程)。
* ThreadPoolExecutor.CallerRunsPolicy:重試添加當前任務,自動調用執行方法,直到成功。
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
異步失效的原因
- 被 @Async 注解的方法不是 public 的;
- 被 @Async 注解的方法的返回值類型只能是 void 或 Future;
- 被 @Async 注解的方法如果是靜態的也會失效;
- 未添加 @EnableAsync 注解;
- 調用者和被 @Async 注解的方法不能在同一個類中;
- 對異步方法使用 @Transactional 是無效的,但對異步方法內調用的方法加上 @Transactional 是有效的。
線程池中設置核心線程數的問題
我尚未有時間詳細探討:在線程池中設置 CorePoolSize 和 MaxPoolSize 的最適宜和最高效的數量是多少。
借此機會進行了一些測試。
我記得有個關于 CPU 處理器數量的說法
將 CorePoolSize 設置為 CPU 處理器的數量時,效率最高嗎?
// 獲取 CPU 處理器數量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
Runtime.getRuntime().availableProcessors() 會獲取 CPU 核心線程數,代表計算資源。
- 對于 CPU 密集型任務,線程池的大小設置為 N,與 CPU 線程數一致,這可以最大限度地減少線程間的上下文切換。但在實際開發中,一般設置為 N+1,以防止線程由于不可預見的情況而阻塞。如果發生阻塞,多出來的線程可以繼續執行任務,保證 CPU 的高效利用。
- 對于 IO 密集型任務,線程池的大小設置為 2N。這個數值是根據業務壓力測試得出的,或者在不涉及業務時使用推薦值。
實際中,線程池的具體大小需要根據壓力測試以及機器的當前狀態進行調整。
如果線程池過大,會導致 CPU 持續切換,系統整體性能并不會有顯著提高,反而可能會變慢。
我電腦的 CPU 處理器數量為 24。
那么一次讀取多少行效率最高呢?
測試中,Excel 文件包含 10 萬行數據。10 萬 / 24 = 4166,因此我設置為 4200。這是最有效的設置嗎?
測試過程中似乎的確如此。
我記得大家習慣性地將核心線程數(CorePoolSize)和最大線程數(MaxPoolSize)設置為相同的數值,通常是 200。
這只是隨機選擇,還是基于經驗的?
測試發現,當 CorePoolSize 和 MaxPoolSize 都設置為 200 時,最初同時開啟了 150 個線程工作。
為什么會這樣呢?
經過數十次測試后
- 發現核心線程數并沒有太大區別;
- 關鍵是每次讀取和存儲的行數,不能太多,存儲速度會逐漸減慢;
- 也不能太少,如果少于 150 個線程,會導致線程阻塞,反而減慢進程。
IV.使用 EasyExcel 讀取并插入數據庫
我不會寫 EasyExcel 的雙異步優化。大家要記住避免掉進低級勤奮的陷阱。
ReadEasyExcelController
@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {
try {
String path = "G:\\Test\\data\\";
String[] xlsxArr = new File(path).list();
for (int i = 0; i < xlsxArr.length; i++) {
String filePath = path + xlsxArr[i];
File fileTemp = new File(path + xlsxArr[i]);
String fileName = fileTemp.getName().replace(".xlsx", "");
List<UserInfo> list = new ArrayList<>();
EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();
}
}catch (Exception e){
logger.error("readEasyExcel Exception:",e);
return "error";
}
return "success";
}
ReadEasyExeclAsyncListener
public ReadEasyExeclService readEasyExeclService;
// 表名
public String TABLE_NAME;
// 批量插入閾值
private int BATCH_COUNT;
// 數據收集
private List<UserInfo> LIST;
public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {
this.readEasyExeclService = readEasyExeclService;
this.TABLE_NAME = tableName;
this.BATCH_COUNT = batchCount;
this.LIST = list;
}
@Override
public void invoke(UserInfo data, AnalysisContext analysisContext) {
data.setUuid(uuid());
data.setTableName(TABLE_NAME);
LIST.add(data);
if (LIST.size() >= BATCH_COUNT) {
// 批量入庫
readEasyExeclService.saveDataBatch(LIST);
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
if (LIST.size() > 0) {
// 最后一批入庫
readEasyExeclService.saveDataBatch(LIST);
}
}
public static String uuid() {
return UUID.randomUUID().toString().replace("-", "");
}
ReadEasyExeclServiceImpl
@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {
@Resource
private ReadEasyExeclMapper readEasyExeclMapper;
@Override
public void saveDataBatch(List<UserInfo> list) {
// Insert into the database via mybatis
readEasyExeclMapper.saveDataBatch(list);
// Insert into the database via JDBC
// insertByJdbc(list);
list.clear();
}
private void insertByJdbc(List<UserInfo> list){
List<String> sqlList = new ArrayList<>();
for (UserInfo u : list){
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");
sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',")
.append("'").append(u.getId()).append("',")
.append("'").append(u.getName()).append("',")
.append("'").append(u.getAge()).append("',")
.append("'").append(u.getAddress()).append("',")
.append("'").append(u.getPhone()).append("',")
.append("sysdate )");
sqlList.add(sqlBuilder.toString());
}
JdbcUtil.executeDML(sqlList);
}
}
UserInfo
@Data
public class UserInfo {
private String tableName;
private String uuid;
@ExcelProperty(value = "ID")
private String id;
@ExcelProperty(value = "NAME")
private String name;
@ExcelProperty(value = "AGE")
private String age;
@ExcelProperty(value = "ADDRESS")
private String address;
@ExcelProperty(value = "PHONE")
private String phone;
}
結語
在處理高并發、大數據導入等場景時,異步編程和線程池技術提供了一種極具效率的解決方案。通過合理配置線程池的核心線程數、最大線程數、隊列長度等參數,能夠在確保系統穩定性的前提下,大幅提升并發處理能力。而通過異步編程,我們可以有效避免線程阻塞、減少資源浪費,并讓系統在面對大量請求時依然能夠保持較高的響應速度。
本文的示例通過 Spring Boot 的 @Async 注解和自定義線程池,在實際的 EasyExcel 大數據導入場景下,驗證了這種技術組合的高效性和實用性。此外,通過對 CPU 密集型任務和 IO 密集型任務的深入分析,開發者能夠根據自身項目的特點,選擇合適的線程池配置策略,最大化資源利用率和性能表現。
在實際應用中,線程池和異步編程不僅適用于大數據導入,還可以推廣到包括文件處理、網絡請求、日志處理等各類需要并發處理的場景中。因此,掌握并靈活運用這些技術,將為我們的系統性能優化提供堅實的基礎,使我們能夠應對更復雜、更苛刻的業務需求。