Java文件的簡單讀寫、隨機讀寫、NIO讀寫與使用MappedByteBuffer讀寫
文件與目錄的創建和刪除較為簡單,因此忽略這部分內容的介紹,我們重點學習文件的讀寫。本篇內容包括:
- 簡單文件讀寫
- 隨機訪問文件讀寫
- NIO文件讀寫-FileChannel
- 使用MappedByteBuffer讀寫文件
簡單文件讀寫
FileOutputStream
由于流是單向的,簡單文件寫可使用FileOutputStream,而讀文件則使用FileInputStream。
任何數據輸出到文件都是以字節為單位輸出,包括圖片、音頻、視頻。以圖片為例,如果沒有圖片格式解析器,那么圖片文件其實存儲的就只是按某種格式存儲的字節數據罷了。
FileOutputStream指文件字節輸出流,用于將字節數據輸出到文件,僅支持順序寫入、支持以追加方式寫入,但不支持在指定位置寫入。
打開一個文件輸出流并寫入數據的示例代碼如下。
- public class FileOutputStreamStu{
- public void testWrite(byte[] data) throws IOException {
- try(FileOutputStream fos = new FileOutputStream("/tmp/test.file",true)) {
- fos.write(data);
- fos.flush();
- }
- }
- }
注意,如果不指定追加方式打開流,new FileOutputStream時會導致文件內容被清空,而FileOutputStream的默認構建函數是以非追加模式打開流的。
FileOutputStream的參數1為文件名,參數2為是否以追加模式打開流,如果為true,則字節將寫入文件的尾部而不是開頭。
調用flush方法目的是在流關閉之前清空緩沖區數據,實際上使用FileOutputStream并不需要調用flush方法,此處的刷盤指的是將緩存在JVM內存中的數據調用系統函數write寫入。如BufferedOutputStream,在調用BufferedOutputStream方法時,如果緩存未滿,實際上是不會調用系統函數write的,如下代碼所示。
- public class BufferedOutputStream extends FilterOutputStream {
- public synchronized void write(byte b[], int off, int len) throws IOException {
- if (len >= buf.length) {
- flushBuffer();
- out.write(b, off, len);
- return;
- }
- if (len > buf.length - count) {
- flushBuffer();
- }
- System.arraycopy(b, off, buf, count, len); // 只寫入緩存
- count += len;
- }
- }
FileInputStream
FileInputStream指文件字節輸入流,用于將文件中的字節數據讀取到內存中,僅支持順序讀取,不可跳躍讀取。
打開一個文件輸入流讀取數據的案例代碼如下。
- public class FileInputStreamStu{
- public void testRead() throws IOException {
- try (FileInputStream fis = new FileInputStream("/tmp/test/test.log")) {
- byte[] buf = new byte[1024];
- int realReadLength = fis.read(buf);
- }
- }
- }
其中buf數組中下標從0到realReadLength的字節數據就是實際讀取的數據,如果realReadLength返回-1,則說明已經讀取到文件尾并且未讀取到任何數據。
當然,我們還可以一個字節一個字節的讀取,如下代碼所示。
- public class FileInputStreamStu{
- public void testRead() throws IOException {
- try (FileInputStream fis = new FileInputStream("/tmp/test/test.log")) {
- int byteData = fis.read(); // 返回值取值范圍:[-1,255]
- if (byteData == -1) {
- return; // 讀取到文件尾了
- }
- byte data = (byte) byteData;
- // data為讀取到的字節數據
- }
- }
- }
至于讀取到的字節數據如何使用就需要看你文件中存儲的是什么數據了。
如果整個文件存儲的是一張圖片,那么需要將整個文件讀取完,再按格式解析成圖片,而如果整個文件是配置文件,則可以一行一行讀取,遇到\n換行符則為一行,代碼如下。
- public class FileInputStreamStu{
- @Test
- public void testRead() throws IOException {
- try (FileInputStream fis = new FileInputStream("/tmp/test/test.log")) {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- int byteData;
- while ((byteData = fis.read()) != -1) {
- if (byteData == '\n') {
- buffer.flip();
- String line = new String(buffer.array(), buffer.position(), buffer.limit());
- System.out.println(line);
- buffer.clear();
- continue;
- }
- buffer.put((byte) byteData);
- }
- }
- }
- }
Java基于InputStream、OutputStream還提供了很多的API方便讀寫文件,如BufferedReader,但如果懶得去記這些API的話,只需要記住FileInputStream與FileOutputStream就夠了。
隨機訪問文件讀寫
RandomAccessFile相當于是FileInputStream與FileOutputStream的封裝結合,即可以讀也可以寫,并且RandomAccessFile支持移動到文件指定位置處開始讀或寫。
RandomAccessFile的使用如下。
- public class RandomAccessFileStu{
- public void testRandomWrite(long index,long offset){
- try (RandomAccessFile randomAccessFile = new RandomAccessFile("/tmp/test.idx", "rw")) {
- randomAccessFile.seek(index * indexLength());
- randomAccessFile.write(toByte(index));
- randomAccessFile.write(toByte(offset));
- }
- }
- }
RandomAccessFile構建方法:參數1為文件路徑,參數2為模式,'r'為讀,'w'為寫;
seek方法:在linux、unix操作系統下就是調用系統的lseek函數。
RandomAccessFile的seek方法通過調用native方法實現,源碼如下。
- JNIEXPORT void JNICALL
- Java_java_io_RandomAccessFile_seek0(JNIEnv *env,
- jobject this, jlong pos) {
- FD fd;
- fd = GET_FD(this, raf_fd);
- if (fd == -1) {
- JNU_ThrowIOException(env, "Stream Closed");
- return;
- }
- if (pos < jlong_zero) {
- JNU_ThrowIOException(env, "Negative seek offset");
- }
- // #define IO_Lseek lseek
- else if (IO_Lseek(fd, pos, SEEK_SET) == -1) {
- JNU_ThrowIOExceptionWithLastError(env, "Seek failed");
- }
- }
Java_java_io_RandomAccessFile_seek0函數的參數1表示RandomAccessFile對象,參數2表示偏移量。函數中調用的IO_Lseek方法實際是操作系統的lseek方法。
RandomAccessFile提供的讀、寫、指定偏移量其實都是通過調用操作系統函數完成的,包括前面介紹的文件輸入流和文件輸出流也不例外。
NIO文件讀寫-FileChannel
Channel(通道)表示IO源與目標打開的連接,Channel類似于傳統的流,但Channel本身不能直接訪問數據,只能與Buffer進行交互。Channel(通道)主要用于傳輸數據,從緩沖區的一側傳到另一側的實體(如File、Socket),支持雙向傳遞。
正如SocketChannel是客戶端與服務端通信的通道,FileChannel就是我們讀寫文件的通道。FileChannel是線程安全的,也就是一個FileChannel可以被多個線程使用。對于多線程操作,同時只會有一個線程能對該通道所在文件進行修改。如果需要確保多線程的寫入順序,就必須要轉為隊列寫入。
FileChannel可通過FileOutputStream、FileInputStream、RandomAccessFile獲取,也可以通過FileChannel#open方法打開一個通道。
以通過FileOutputStream獲取FileChannel為例,通過FileOutputStream或RandomAccessFile獲取FileChannel方法相同,代碼如下。
- public class FileChannelStu{
- public void testGetFileCahnnel(){
- try(FileOutputStream fos = new FileOutputStream("/tmp/test.log");
- FileChannel fileChannel = fos.getChannel()){
- // do....
- }catch (IOException exception){
- }
- }
- }
需要注意,通過FileOutputStream獲取的FileChannel只能執行寫操作,通過FileInputStream獲取的FileChannel只能執行讀操作,原因可查看getChannel方法源碼。
通過FileOutputStream或FileInputStream或RandomAccessFile打開的FileChannel,在流關閉時也會被關閉,可查看這幾個類的close方法源碼。
若想要獲取一個同時支持讀和寫的FileChannel需要通過open方法打開,代碼如下。
- public class FileChannelStu{
- public void testOpenFileCahnnel(){
- FileChannel channel = FileChannel.open(
- Paths.get(URI.create("file:" + rootPath + "/" + postion.fileName)),
- StandardOpenOption.READ,StandardOpenOption.WRITE);
- // do....
- channel.close();
- }
- }
open方法第二個變長參數傳StandardOpenOption.READ和StandardOpenOption.WRITE即可打開一個雙向讀寫的通道。
FileChannel允許對文件加鎖,文件鎖是進程級別的,不是線程級別的,文件鎖可以解決多個進程并發訪問、修改同一個文件的問題。文件鎖會被當前進程持有,一旦獲取到文件鎖就要調用一次release釋放鎖,當關閉對應的FileChannel對象時或當前JVM進程退出時,鎖也會自動被釋鎖。
文件鎖的使用案例代碼如下。
- public class FileChannelStu{
- public void testFileLock(){
- FileChannel channel = this.channel;
- FileLock fileLock = null;
- try {
- fileLock = channel.lock();// 獲取文件鎖
- // 執行寫操作
- channel.write(...);
- channel.write(...);
- } finally {
- if (fileLock != null) {
- fileLock.release(); // 釋放文件鎖
- }
- }
- }
- }
當然,只要我們能確保同時只有一個進程對文件執行寫操作,那么就不需要鎖文件。RocketMQ也并沒有使用文件鎖,因為每個Broker有自己數據目錄,即使一臺機器上部署多個Broker也不會有多個進程對同一個日記文件操作的情況。
上面例子去掉文件鎖后代碼如下。
- public class FileChannelStu{
- public void testWrite(){
- FileChannel channel = this.channel;
- channel.write(...);
- channel.write(...);
- }
- }
這里還存在一個問題,就是并發寫數據問題。雖然FileChannel是線程安全的,但兩次write并不是原子性操作,如果要確保兩次write是連續寫入的,還必須要加鎖。在RocketMQ中,通過引用計數器替代了鎖。
FileChannel提供的force方法用于刷盤,即調用操作系統的fsync函數,使用如下。
- public class FileChannelStu{
- public void closeChannel(){
- this.channel.force(true);
- this.channel.close();
- }
- }
force方法的參數表示除強制寫入內容更改外,文件元數據的更改是否也強制寫入。后面使用MappedByteBuffer時,可直接使用MappedByteBuffer的force方法。
FileChannel的force方法最終調用的C方法源碼如下:
- JNIEXPORT jint JNICALL
- Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
- jobject fdo, jboolean md)
- {
- jint fd = fdval(env, fdo);
- int result = 0;
- if (md == JNI_FALSE) {
- result = fdatasync(fd);
- } else {
- result = fsync(fd);
- }
- return handle(env, result, "Force failed");
- }
參數md對應調用force方法傳遞的metaData參數。
使用FileChannel支持seek(position)到指定位置讀或寫數據,代碼如下。
- public class FileChannelStu{
- public void testSeekWrite(){
- FileChannel channel = this.channel;
- synchronized (channel) {
- channel.position(100);
- channel.write(ByteBuffer.wrap(toByte(index)));
- channel.write(ByteBuffer.wrap(toByte(offset)));
- }
- }
- }
上述例子的作用是將指針移動到物理偏移量100byte位置處,順序寫入index和offset。讀取同理,代碼如下。
- public class FileChannelStu{
- public void testSeekRead(){
- FileChannel channel = this.channel;
- synchronized (channel) {
- channel.position(100);
- ByteBuffer buffer = ByteBuffer.allocate(16);
- int realReadLength = channel.read(buffer);
- if(realReadLength==16){
- long index = buffer.getLong();
- long offset = buffer.getLong();
- }
- }
- }
- }
其中read方法返回的是實際讀取的字節數,如果返回-1則代表已經是文件尾部了,沒有剩余內容可讀取。
使用MappedByteBuffer讀寫文件
MappedByteBuffer是Java提供的基于操作系統虛擬內存映射(MMAP)技術的文件讀寫API,底層不再通過read、write、seek等系統調用實現文件的讀寫。
我們需要通過FileChannel#map方法將文件的一個區域映射到內存中,代碼如下。
- public class MappedByteBufferStu{
- @Test
- public void testMappedByteBuffer() throws IOException {
- FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/tmp/test/test.log")),
- StandardOpenOption.WRITE, StandardOpenOption.READ);
- MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
- fileChannel.close();
- mappedByteBuffer.position(1024);
- mappedByteBuffer.putLong(10000L);
- mappedByteBuffer.force();
- }
- }
上面代碼的功能是通過FileChannel將文件[0~4096)區域映射到內存中,調用FileChannel的map方法返回MappedByteBuffer,在映射之后關閉通道,隨后在指定位置處寫入一個8字節的long類型整數,最后調用force方法將寫入數據從內存寫回磁盤(刷盤)。
映射一旦建立了,就不依賴于用于創建它的文件通道,因此在創建MappedByteBuffer之后我們就可以關閉通道了,對映射的有效性沒有影響。
實際上將文件映射到內存比通過read、write系統調用方法讀取或寫入幾十KB的數據要昂貴,從性能的角度來看,MappedByteBuffer適合用于將大文件映射到內存中,如上百M、上GB的大文件。
FileChannel的map方法有三個參數:
- MapMode:映射模式,可取值有READ_ONLY(只讀映射)、READ_WRITE(讀寫映射)、PRIVATE(私有映射),READ_ONLY只支持讀,READ_WRITE支持讀寫,而PRIVATE只支持在內存中修改,不會寫回磁盤;
- position和size:映射區域,可以是整個文件,也可以是文件的某一部分,單位為字節。
需要注意的是,如果FileChannel是只讀模式,那么map方法的映射模式就不能指定為READ_WRITE。如果文件是剛剛創建的,只要映射成功,文件的大小就會變成(0+position+size)。
通過MappedByteBuffer讀取數據示例如下:
- public class MappedByteBufferStu{
- @Test
- public void testMappedByteBufferOnlyRead() throws IOException {
- FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/tmp/test/test.log")),
- StandardOpenOption.READ);
- MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 4096);
- fileChannel.close();
- mappedByteBuffer.position(1024);
- long value = mappedByteBuffer.getLong();
- System.out.println(value);
- }
- }
mmap繞過了read、write系統函數調用,繞過了一次數據從內核空間到用戶空間的拷貝,即實現零拷貝,MappedByteBuffer使用直接內存而非JVM的堆內存。
mmap只是在虛擬內存分配了地址空間,只有在第一次訪問虛擬內存的時候才分配物理內存。在mmap之后,并沒有將文件內容加載到物理頁上,而是在虛擬內存中分配地址空間,當進程在訪問這段地址時,通過查找頁表,發現虛擬內存對應的頁沒有在物理內存中緩存則產生缺頁中斷,由內核的缺頁異常處理程序處理,將文件對應內容以頁為單位(4096)加載到物理內存中。
由于物理內存是有限的,mmap在寫入數據超過物理內存時,操作系統會進行頁置換,根據淘汰算法,將需要淘汰的頁置換成所需的新頁,所以mmap對應的內存是可以被淘汰的,被淘汰的內存頁如果是臟頁(有過寫操作修改頁內容),則操作系統會先將數據回寫磁盤再淘汰該頁。
數據寫過程如下:
- 1.將需要寫入的數據寫到對應的虛擬內存地址;
- 2.若對應的虛擬內存地址未對應物理內存,則產生缺頁中斷,由內核加載頁數據到物理內存;
- 3.數據被寫入到虛擬內存對應的物理內存;
- 4.在發生頁淘汰或刷盤時由操作系統將臟頁回寫到磁盤。
RocketMQ正是利用MappedByteBuffer實現索引文件的讀寫,實現一個基于文件系統的HashMap。
RocketMQ在創建新的CommitLog文件并通過FileChannel獲取MappedByteBuffer時會做一次預熱操作,即每個虛擬內存頁(Page Cache)都寫入四個字節的0x00,并強制刷盤將數據寫到文件中。這個動作的用處是通過讀寫操作把MMAP映射全部加載到物理內存中。并且在預熱之后還做了一個鎖住內存的操作,這是為了避免磁盤交換,防止操作系統把預熱過的頁臨時保存到swap區,防止程序再次讀取交換出去的數據頁時產生缺頁中斷。
參考文獻
【深入淺出Linux】關于mmap的解析
本文轉載自微信公眾號「Java藝術」,可以通過以下二維碼關注。轉載本文請聯系Java藝術公眾號。