成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

高效讀取大文件,再也不用擔心 OOM 了!

開發 前端
本篇文章我們只是簡單介紹了下,數據從文件讀取幾種方式。數據讀取之后,我們肯定還需要處理,然后最后會存儲到數據庫中或者輸出到另一個文件中。

最近阿粉接到一個需求,需要從文件讀取數據,然后經過業務處理之后存儲到數據庫中。這個需求,說實話不是很難,阿粉很快完成了第一個版本。

[[330058]]

內存讀取

第一個版本,阿粉采用內存讀取的方式,所有的數據首先讀讀取到內存中,程序代碼如下:

  1. Stopwatch stopwatch = Stopwatch.createStarted(); 
  2. // 將全部行數讀取的內存中 
  3. List<String> lines = FileUtils.readLines(new File("temp/test.txt"), Charset.defaultCharset()); 
  4. for (String line : lines) { 
  5.     // pass 
  6. stopwatch.stop(); 
  7. System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s"); 
  8. // 計算內存占用 
  9. logMemory(); 

logMemory方法如下:

  1. MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); 
  2. //堆內存使用情況 
  3. MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage(); 
  4. //初始的總內存 
  5. long totalMemorySize = memoryUsage.getInit(); 
  6. //已使用的內存 
  7. long usedMemorySize = memoryUsage.getUsed(); 
  8.  
  9. System.out.println("Total Memory: " + totalMemorySize / (1024 * 1024) + " Mb"); 
  10. System.out.println("Free Memory: " + usedMemorySize / (1024 * 1024) + " Mb"); 

上述程序中,阿粉使用 Apache Common-Io 開源第三方庫,FileUtils#readLines將會把文件中所有內容,全部讀取到內存中。

這個程序簡單測試并沒有什么問題,但是等拿到真正的數據文件,運行程序,很快程序發生了 OOM。

之所以會發生 OOM,主要原因是因為這個數據文件太大。假設上面測試文件 test.txt總共有 200W 行數據,文件大小為:740MB。

通過上述程序讀取到內存之后,在我的電腦上內存占用情況如下:

 

可以看到一個實際大小為 700 多 M 的文件,讀到內存中占用內存量為 1.5G 之多。而我之前的程序,虛擬機設置內存大小只有 1G,所以程序發生了 OOM。

當然這里最簡單的辦法就是加內存唄,將虛擬機內存設置到 2G,甚至更多。不過機器內存始終有限,如果文件更大,還是沒有辦法全部都加載到內存。

不過仔細一想真的需要將全部數據一次性加載到內存中?

很顯然,不需要!

在上述的場景中,我們將數據到加載內存中,最后不還是一條條處理數據。

所以下面我們將讀取方式修改成逐行讀取。

逐行讀取

逐行讀取的方式比較多,這里阿粉主要介紹兩種方式:

  • BufferReader
  • Apache Commons IO
  • Java8 stream

BufferReader

我們可以使用 BufferReader#readLine 逐行讀取數據。

  1. try (BufferedReader fileBufferReader = new BufferedReader(new FileReader("temp/test.txt"))) { 
  2.     String fileLineContent; 
  3.     while ((fileLineContent = fileBufferReader.readLine()) != null) { 
  4.         // process the line. 
  5.     } 
  6. } catch (FileNotFoundException e) { 
  7.     e.printStackTrace(); 
  8. } catch (IOException e) { 
  9.     e.printStackTrace(); 

Apache Commons IOCommon-IO

中有一個方法 FileUtils#lineIterator可以實現逐行讀取方式,使用代碼如下:

  1. Stopwatch stopwatch = Stopwatch.createStarted(); 
  2. LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name()); 
  3. while (fileContents.hasNext()) { 
  4.     fileContents.nextLine(); 
  5.     //  pass 
  6. logMemory(); 
  7. fileContents.close(); 
  8. stopwatch.stop(); 
  9. System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s"); 

這個方法返回一個迭代器,每次我們都可以獲取的一行數據。

其實我們查看代碼,其實可以發現 FileUtils#lineIterator,其實用的就是 BufferReader,感興趣的同學可以自己查看一下源碼。

由于公號內無法插入外鏈,關注『Java極客技術』,回復『20200610』 獲取源碼

Java8 stream

Java8 Files 類新增了一個 lines,可以返回 Stream我們可以逐行處理數據。

  1. Stopwatch stopwatch = Stopwatch.createStarted(); 
  2. // lines(Path path, Charset cs) 
  3. try (Stream<String> inputStream = Files.lines(Paths.get("temp/test.txt"), StandardCharsets.UTF_8)) { 
  4.     inputStream 
  5.             .filter(str -> str.length() > 5)// 過濾數據 
  6.             .forEach(o -> { 
  7.                 // pass do sample logic 
  8.             }); 
  9. logMemory(); 
  10. stopwatch.stop(); 
  11. System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s"); 

使用這個方法有個好處在于,我們可以方便使用 Stream 鏈式操作,做一些過濾操作。

注意:這里我們使用 try-with-resources 方式,可以安全的確保讀取結束,流可以被安全的關閉。

并發讀取

逐行的讀取的方式,解決我們 OOM 的問題。不過如果數據很多,我們這樣一行行處理,需要花費很多時間。

上述的方式,只有一個線程在處理數據,那其實我們可以多來幾個線程,增加并行度。

下面在上面的基礎上,阿粉就拋磚引玉,介紹下阿粉自己比較常用兩種并行處理方式。

逐行批次打包

第一種方式,先逐行讀取數據,加載到內存中,等到積累一定數據之后,然后再交給線程池異步處理。

  1. @SneakyThrows 
  2. public static void readInApacheIOWithThreadPool() { 
  3.     // 創建一個 最大線程數為 10,隊列最大數為 100 的線程池 
  4.     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); 
  5.     // 使用 Apache 的方式逐行讀取數據 
  6.     LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name()); 
  7.     List<String> lines = Lists.newArrayList(); 
  8.     while (fileContents.hasNext()) { 
  9.         String nextLine = fileContents.nextLine(); 
  10.         lines.add(nextLine); 
  11.         // 讀取到十萬的時候 
  12.         if (lines.size() == 100000) { 
  13.             // 拆分成兩個 50000 ,交給異步線程處理 
  14.             List<List<String>> partition = Lists.partition(lines, 50000); 
  15.             List<Future> futureList = Lists.newArrayList(); 
  16.             for (List<String> strings : partition) { 
  17.                 Future<?> future = threadPoolExecutor.submit(() -> { 
  18.                     processTask(strings); 
  19.                 }); 
  20.                 futureList.add(future); 
  21.             } 
  22.             // 等待兩個線程將任務執行結束之后,再次讀取數據。這樣的目的防止,任務過多,加載的數據過多,導致 OOM 
  23.             for (Future future : futureList) { 
  24.                 // 等待執行結束 
  25.                 future.get(); 
  26.             } 
  27.             // 清除內容 
  28.             lines.clear(); 
  29.         } 
  30.  
  31.     } 
  32.     // lines 若還有剩余,繼續執行結束 
  33.     if (!lines.isEmpty()) { 
  34.         // 繼續執行 
  35.         processTask(lines); 
  36.     } 
  37.   threadPoolExecutor.shutdown(); 
  38.     private static void processTask(List<String> strings) { 
  39.         for (String line : strings) { 
  40.             // 模擬業務執行 
  41.             try { 
  42.                 TimeUnit.MILLISECONDS.sleep(10L); 
  43.             } catch (InterruptedException e) { 
  44.                 e.printStackTrace(); 
  45.             } 
  46.         } 
  47.     } 

上述方法,等到內存的數據到達 10000 的時候,拆封兩個任務交給異步線程執行,每個任務分別處理 50000 行數據。

后續使用 future#get(),等待異步線程執行完成之后,主線程才能繼續讀取數據。

之所以這么做,主要原因是因為,線程池的任務過多,再次導致 OOM 的問題。

大文件拆分成小文件第二種方式,首先我們將一個大文件拆分成幾個小文件,然后使用多個異步線程分別逐行處理數據。

  1. public static void splitFileAndRead() throws Exception { 
  2.     // 先將大文件拆分成小文件 
  3.     List<File> fileList = splitLargeFile("temp/test.txt"); 
  4.     // 創建一個 最大線程數為 10,隊列最大數為 100 的線程池 
  5.     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); 
  6.     List<Future> futureList = Lists.newArrayList(); 
  7.     for (File file : fileList) { 
  8.         Future<?> future = threadPoolExecutor.submit(() -> { 
  9.             try (Stream inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8)) { 
  10.                 inputStream.forEach(o -> { 
  11.                     // 模擬執行業務 
  12.                     try { 
  13.                         TimeUnit.MILLISECONDS.sleep(10L); 
  14.                     } catch (InterruptedException e) { 
  15.                         e.printStackTrace(); 
  16.                     } 
  17.                 }); 
  18.             } catch (IOException e) { 
  19.                 e.printStackTrace(); 
  20.             } 
  21.         }); 
  22.         futureList.add(future); 
  23.     } 
  24.     for (Future future : futureList) { 
  25.         // 等待所有任務執行結束 
  26.         future.get(); 
  27.     } 
  28.     threadPoolExecutor.shutdown(); 
  29.  
  30.  
  31.  
  32. private static List<File> splitLargeFile(String largeFileName) throws IOException { 
  33.     LineIterator fileContents = FileUtils.lineIterator(new File(largeFileName), StandardCharsets.UTF_8.name()); 
  34.     List<String> lines = Lists.newArrayList(); 
  35.     // 文件序號 
  36.     int num = 1; 
  37.     List<File> files = Lists.newArrayList(); 
  38.     while (fileContents.hasNext()) { 
  39.         String nextLine = fileContents.nextLine(); 
  40.         lines.add(nextLine); 
  41.         // 每個文件 10w 行數據 
  42.         if (lines.size() == 100000) { 
  43.             createSmallFile(lines, num, files); 
  44.             num++; 
  45.         } 
  46.     } 
  47.     // lines 若還有剩余,繼續執行結束 
  48.     if (!lines.isEmpty()) { 
  49.         // 繼續執行 
  50.         createSmallFile(lines, num, files); 
  51.     } 
  52.     return files; 

上述方法,首先將一個大文件拆分成多個保存 10W 行的數據的小文件,然后再將小文件交給線程池異步處理。

由于這里的異步線程每次都是逐行從小文件的讀取數據,所以這種方式不用像上面方法一樣擔心 OOM 的問題。

另外,上述我們使用 Java 代碼,將大文件拆分成小文件。這里阿粉還有一個簡單的辦法,我們可以直接使用下述命令,直接將大文件拆分成小文件:

  1. # 將大文件拆分成 100000 的小文件 
  2.  split -l 100000 test.txt 

后續 Java 代碼只需要直接讀取小文件即可。

總結當我們從文件讀取數據時,如果文件不是很大,我們可以考慮一次性讀取到內存中,然后快速處理。

如果文件過大,我們就沒辦法一次性加載到內存中,所以我們需要考慮逐行讀取,然后處理數據。但是單線程處理數據畢竟有限,所以我們考慮使用多線程,加快處理數據。

本篇文章我們只是簡單介紹了下,數據從文件讀取幾種方式。數據讀取之后,我們肯定還需要處理,然后最后會存儲到數據庫中或者輸出到另一個文件中。

這個過程,說實話比較麻煩,因為我們的數據源文件,可能是 txt,也可能是 excel,這樣我們就需要增加多種讀取方法。同樣的,當數據處理完成之后,也有同樣的問題。

 

不過好在,上述的問題我們可以使用 Spring Batch 完美解決。

 

責任編輯:武曉燕 來源: Java極客技術
相關推薦

2021-12-21 09:05:46

命令Linux敲錯

2015-05-29 09:01:48

2021-08-13 22:38:36

大數據互聯網技術

2019-09-04 10:00:07

手機人臉識別

2015-10-22 10:38:43

Wi-Fi燃氣報警器

2021-06-08 07:48:26

數據 Python開發

2021-06-11 07:14:04

QQ音樂微信翻譯

2022-09-14 08:02:25

加密算法Bcryp

2016-08-09 16:17:37

高德地圖TFBOYS大數據

2023-11-27 17:11:02

數據庫oracle

2020-04-30 09:01:27

路由器安全網絡安全路由器

2024-04-15 00:08:00

MySQLInnoDB數據庫

2020-04-10 09:55:28

Git 工具黑魔法

2023-07-29 22:02:06

MyBatis數據庫配置

2025-04-10 08:03:31

Spring系統

2018-10-11 15:51:32

ChromeGoogle瀏覽器

2020-04-30 09:19:56

Docker容器虛擬機

2022-06-01 10:09:39

Linux網絡延遲

2018-09-19 05:01:01

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品国产第一综合99久久 | 久久99久久99久久 | 亚洲精品一 | 精品视频一区二区 | 久久精品亚洲精品国产欧美 | 欧美一级欧美三级在线观看 | 亚洲三区在线 | 日韩免费网站 | 国产精品一区二区三 | 天天干,夜夜操 | 久久久久久国产免费视网址 | 99精品欧美 | 国产亚洲精品精品国产亚洲综合 | 爱爱视频在线观看 | 午夜精品福利视频 | 日本不卡一区 | 国产日韩久久久久69影院 | 免费一区二区三区 | 99在线精品视频 | 成人精品一区 | 亚洲精品视频免费观看 | 成人在线播放网站 | 91久久国产 | 美女视频一区二区 | 国产精品亚洲片在线播放 | 欧美精品一区久久 | 亚洲欧美中文字幕 | 日韩国产欧美一区 | 国产精品免费一区二区三区四区 | 一区二区av | 一区二区三区欧美在线 | 久久久久久国产精品免费免费 | 日韩一区精品 | 久久久av| 精品伊人久久 | 精品自拍视频在线观看 | 五月天婷婷综合 | 国产成人精品999在线观看 | 欧美日韩三级在线观看 | 国产免费又色又爽又黄在线观看 | 日日摸夜夜爽人人添av |