Java I/O體系從原理到應用,這一篇說清楚了
本文介紹操作系統I/O工作原理,Java I/O設計,基本使用,開源項目中實現高性能I/O常見方法和實現,徹底搞懂高性能I/O之道。
一、基礎概念
在介紹I/O原理之前,先重溫幾個基礎概念:
1. 操作系統與內核
操作系統:管理計算機硬件與軟件資源的系統軟件
內核:操作系統的核心軟件,負責管理系統的進程、內存、設備驅動程序、文件和網絡系統等等,為應用程序提供對計算機硬件的安全訪問服務
2. 內核空間和用戶空間
為了避免用戶進程直接操作內核,保證內核安全,操作系統將內存尋址空間劃分為兩部分:內核空間(Kernel-space),供內核程序使用用戶空間(User-space),供用戶進程使用 為了安全,內核空間和用戶空間是隔離的,即使用戶的程序崩潰了,內核也不受影響。
3. 數據流
計算機中的數據是基于隨著時間變換高低電壓信號傳輸的,這些數據信號連續不斷,有著固定的傳輸方向,類似水管中水的流動,因此抽象數據流(I/O流)的概念:指一組有順序的、有起點和終點的字節集合,抽象出數據流的作用:實現程序邏輯與底層硬件解耦,通過引入數據流作為程序與硬件設備之間的抽象層,面向通用的數據流輸入輸出接口編程,而不是具體硬件特性,程序和底層硬件可以獨立靈活替換和擴展。
二、I/O 工作原理
1. 磁盤I/O
典型I/O讀寫磁盤工作原理如下:
tips: DMA:全稱叫直接內存存取(Direct Memory Access),是一種允許外圍設備(硬件子系統)直接訪問系統主內存的機制。基于 DMA 訪問方式,系統主內存與硬件設備的數據傳輸可以省去CPU 的全程調度。
值得注意的是:
- 讀寫操作基于系統調用實現
- 讀寫操作經過用戶緩沖區,內核緩沖區,應用進程并不能直接操作磁盤
- 應用進程讀操作時需阻塞直到讀取到數據
2. 網絡I/O
這里先以最經典的阻塞式I/O模型介紹:
tips:recvfrom,經socket接收數據的函數
值得注意的是:
- 網絡I/O讀寫操作經過用戶緩沖區,Sokcet緩沖區
- 服務端線程在從調用recvfrom開始到它返回有數據報準備好這段時間是阻塞的,recvfrom返回成功后,線程開始處理數據報
三、Java I/O設計
1. I/O分類
Java中對數據流進行具體化和實現,關于Java數據流一般關注以下幾個點:
- 流的方向從外部到程序,稱為輸入流;從程序到外部,稱為輸出流
- 流的數據單位程序以字節作為最小讀寫數據單元,稱為字節流,以字符作為最小讀寫數據單元,稱為字符流
- 流的功能角色
從/向一個特定的IO設備(如磁盤,網絡)或者存儲對象(如內存數組)讀/寫數據的流,稱為節點流;
對一個已有流進行連接和封裝,通過封裝后的流來實現數據的讀/寫功能,稱為處理流(或稱為過濾流)。
2. I/O操作接口
java.io包下有一堆I/O操作類,初學時看了容易搞不懂,其實仔細觀察其中還是有規律:這些I/O操作類都是在繼承4個基本抽象流的基礎上,要么是節點流,要么是處理流。
(1) 四個基本抽象流
java.io包中包含了流式I/O所需要的所有類,java.io包中有四個基本抽象流,分別處理字節流和字符流:
- InputStream
- OutputStream
- Reader
- Writer
(2) 節點流
節點流I/O類名由節點流類型 + 抽象流類型組成,常見節點類型有:
- File文件
- Piped 進程內線程通信管道
- ByteArray / CharArray (字節數組 / 字符數組)
- StringBuffer / String (字符串緩沖區 / 字符串)
節點流的創建通常是在構造函數傳入數據源,例如:
- FileReader reader = new FileReader(new File("file.txt"));
- FileWriter writer = new FileWriter(new File("file.txt"));
(3) 處理流
處理流I/O類名由對已有流封裝的功能 + 抽象流類型組成,常見功能有:
- 緩沖:對節點流讀寫的數據提供了緩沖的功能,數據可以基于緩沖批量讀寫,提高效率。常見有BufferedInputStream、BufferedOutputStream
- 字節流轉換為字符流:由InputStreamReader、OutputStreamWriter實現
- 字節流與基本類型數據相互轉換:這里基本數據類型數據如int、long、short,由DataInputStream、DataOutputStream實現
- 字節流與對象實例相互轉換:用于實現對象序列化,由ObjectInputStream、ObjectOutputStream實現
處理流的應用了適配器/裝飾模式,轉換/擴展已有流,處理流的創建通常是在構造函數傳入已有的節點流或處理流:
- FileOutputStream fileOutputStream = new FileOutputStream("file.txt");
- // 擴展提供緩沖寫
- BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
- // 擴展提供提供基本數據類型寫
- DataOutputStream out = new DataOutputStream(bufferedOutputStream);
3. Java NIO
(1) 標準I/O存在問題
Java NIO(New I/O)是一個可以替代標準Java I/O API的IO API(從Java 1.4開始),Java NIO提供了與標準I/O不同的I/O工作方式,目的是為了解決標準 I/O存在的以下問題:
A. 數據多次拷貝
標準I/O處理,完成一次完整的數據讀寫,至少需要從底層硬件讀到內核空間,再讀到用戶文件,又從用戶空間寫入內核空間,再寫入底層硬件。
此外,底層通過write、read等函數進行I/O系統調用時,需要傳入數據所在緩沖區起始地址和長度由于JVM GC的存在,導致對象在堆中的位置往往會發生移動,移動后傳入系統函數的地址參數就不是真正的緩沖區地址了。
可能導致讀寫出錯,為了解決上面的問題,使用標準I/O進行系統調用時,還會額外導致一次數據拷貝:把數據從JVM的堆內拷貝到堆外的連續空間內存(堆外內存)。
所以總共經歷6次數據拷貝,執行效率較低。
B. 操作阻塞
傳統的網絡I/O處理中,由于請求建立連接(connect),讀取網絡I/O數據(read),發送數據(send)等操作是線程阻塞的。
- // 等待連接
- Socket socket = serverSocket.accept();
- // 連接已建立,讀取請求消息
- StringBuilder req = new StringBuilder();
- byte[] recvByteBuf = new byte[1024];
- int len;
- while ((len = socket.getInputStream().read(recvByteBuf)) != -1) {
- req.append(new String(recvByteBuf, 0, len, StandardCharsets.UTF_8));
- }
- // 寫入返回消息
- socket.getOutputStream().write(("server response msg".getBytes()));
- socket.shutdownOutput();
以上面服務端程序為例,當請求連接已建立,讀取請求消息,服務端調用read方法時,客戶端數據可能還沒就緒(例如客戶端數據還在寫入中或者傳輸中),線程需要在read方法阻塞等待直到數據就緒。
為了實現服務端并發響應,每個連接需要獨立的線程單獨處理,當并發請求量大時為了維護連接,內存、線程切換開銷過大。
(2) Buffer
Java NIO核心三大核心組件是Buffer(緩沖區)、Channel(通道)、Selector。
Buffer提供了常用于I/O操作的字節緩沖區,常見的緩存區有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分別對應基本數據類型: byte, char, double, float, int, long, short,下面介紹主要以最常用的ByteBuffer為例,Buffer底層支持Java堆外內存和堆內內存。
堆外內存是指與堆內存相對應的,把內存對象分配在JVM堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機,相比堆內內存,I/O操作中使用堆外內存的優勢在于:
- 不用被JVM GC線回收,減少GC線程資源占有
- 在I/O系統調用時,直接操作堆外內存,可以節省一次堆外內存和堆內內存的復制
ByteBuffer底層基于堆外內存的分配和釋放基于malloc和free函數,對外allocateDirect方法可以申請分配堆外內存,并返回繼承ByteBuffer類的DirectByteBuffer對象:
- public static ByteBuffer allocateDirect(int capacity) {
- return new DirectByteBuffer(capacity);
- }
堆外內存的回收基于DirectByteBuffer的成員變量Cleaner類,提供clean方法可以用于主動回收,Netty中大部分堆外內存通過記錄定位Cleaner的存在,主動調用clean方法來回收;另外,當DirectByteBuffer對象被GC時,關聯的堆外內存也會被回收。
tips:JVM參數不建議設置-XX:+DisableExplicitGC,因為部分依賴Java NIO的框架(例如Netty)在內存異常耗盡時,會主動調用System.gc(),觸發Full GC,回收DirectByteBuffer對象,作為回收堆外內存的最后保障機制,設置該參數之后會導致在該情況下堆外內存得不到清理。
堆外內存基于基礎ByteBuffer類的DirectByteBuffer類成員變量:Cleaner對象,這個Cleaner對象會在合適的時候執行unsafe.freeMemory(address),從而回收這塊堆外內存。
Buffer可以見到理解為一組基本數據類型,存儲地址連續的的數組,支持讀寫操作,對應讀模式和寫模式,通過幾個變量來保存這個數據的當前位置狀態:capacity、 position、 limit:
- capacity 緩沖區數組的總長度
- position 下一個要操作的數據元素的位置
- limit 緩沖區數組中不可操作的下一個元素的位置:limit <= capacity
(3) Channel
Channel(通道)的概念可以類比I/O流對象,NIO中I/O操作主要基于Channel:從Channel進行數據讀取 :創建一個緩沖區,然后請求Channel讀取數據 從Channel進行數據寫入 :創建一個緩沖區,填充數據,請求Channel寫入數據。
Channel和流非常相似,主要有以下幾點區別:
- Channel可以讀和寫,而標準I/O流是單向的
- Channel可以異步讀寫,標準I/O流需要線程阻塞等待直到讀寫操作完成
- Channel總是基于緩沖區Buffer讀寫
Java NIO中最重要的幾個Channel的實現:
- FileChannel:用于文件的數據讀寫,基于FileChannel提供的方法能減少讀寫文件數據拷貝次數,后面會介紹
- DatagramChannel:用于UDP的數據讀寫
- SocketChannel:用于TCP的數據讀寫,代表客戶端連接
- ServerSocketChannel:監聽TCP連接請求,每個請求會創建會一個SocketChannel,一般用于服務端
基于標準I/O中,我們第一步可能要像下面這樣獲取輸入流,按字節把磁盤上的數據讀取到程序中,再進行下一步操作,而在NIO編程中,需要先獲取Channel,再進行讀寫。
- FileInputStream fileInputStream = new FileInputStream("test.txt");
- FileChannel channel = fileInputStream.channel();
tips: FileChannel僅能運行在阻塞模式下,文件異步處理的 I/O 是在JDK 1.7 才被加入的 java.nio.channels.AsynchronousFileChannel。
- // server socket channel:
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 9091));
- while (true) {
- SocketChannel socketChannel = serverSocketChannel.accept();
- ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
- int readBytes = socketChannel.read(buffer);
- if (readBytes > 0) {
- // 從寫數據到buffer翻轉為從buffer讀數據
- buffer.flip();
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String body = new String(bytes, StandardCharsets.UTF_8);
- System.out.println("server 收到:" + body);
- }
- }
(4) Selector
Selector(選擇器) ,它是Java NIO核心組件中的一個,用于檢查一個或多個NIO Channel(通道)的狀態是否處于可讀、可寫。實現單線程管理多個Channel,也就是可以管理多個網絡連接。
Selector核心在于基于操作系統提供的I/O復用功能,單個線程可以同時監視多個連接描述符,一旦某個連接就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作,常見有select、poll、epoll等不同實現。
Java NIO Selector基本工作原理如下:
- 初始化Selector對象,服務端ServerSocketChannel對象
- 向Selector注冊ServerSocketChannel的socket-accept事件
- 線程阻塞于selector.select(),當有客戶端請求服務端,線程退出阻塞
- 基于selector獲取所有就緒事件,此時先獲取到socket-accept事件,向Selector注冊客戶端SocketChannel的數據就緒可讀事件事件
- 線程再次阻塞于selector.select(),當有客戶端連接數據就緒,可讀
- 基于ByteBuffer讀取客戶端請求數據,然后寫入響應數據,關閉channel
示例如下,完整可運行代碼已經上傳github(https://github.com/caison/caison-blog-demo):
- Selector selector = Selector.open();
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.bind(new InetSocketAddress(9091));
- // 配置通道為非阻塞模式
- serverSocketChannel.configureBlocking(false);
- // 注冊服務端的socket-accept事件
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- while (true) {
- // selector.select()會一直阻塞,直到有channel相關操作就緒
- selector.select();
- // SelectionKey關聯的channel都有就緒事件
- Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
- while (keyIterator.hasNext()) {
- SelectionKey key = keyIterator.next();
- // 服務端socket-accept
- if (key.isAcceptable()) {
- // 獲取客戶端連接的channel
- SocketChannel clientSocketChannel = serverSocketChannel.accept();
- // 設置為非阻塞模式
- clientSocketChannel.configureBlocking(false);
- // 注冊監聽該客戶端channel可讀事件,并為channel關聯新分配的buffer
- clientSocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024));
- }
- // channel可讀
- if (key.isReadable()) {
- SocketChannel socketChannel = (SocketChannel) key.channel();
- ByteBuffer buf = (ByteBuffer) key.attachment();
- int bytesRead;
- StringBuilder reqMsg = new StringBuilder();
- while ((bytesRead = socketChannel.read(buf)) > 0) {
- // 從buf寫模式切換為讀模式
- buf.flip();
- int bufbufRemain = buf.remaining();
- byte[] bytes = new byte[bufRemain];
- buf.get(bytes, 0, bytesRead);
- // 這里當數據包大于byteBuffer長度,有可能有粘包/拆包問題
- reqMsg.append(new String(bytes, StandardCharsets.UTF_8));
- buf.clear();
- }
- System.out.println("服務端收到報文:" + reqMsg.toString());
- if (bytesRead == -1) {
- byte[] bytes = "[這是服務回的報文的報文]".getBytes(StandardCharsets.UTF_8);
- int length;
- for (int offset = 0; offset < bytes.length; offset += length) {
- length = Math.min(buf.capacity(), bytes.length - offset);
- buf.clear();
- buf.put(bytes, offset, length);
- buf.flip();
- socketChannel.write(buf);
- }
- socketChannel.close();
- }
- }
- // Selector不會自己從已selectedKeys中移除SelectionKey實例
- // 必須在處理完通道時自己移除 下次該channel變成就緒時,Selector會再次將其放入selectedKeys中
- keyIterator.remove();
- }
- }
tips: Java NIO基于Selector實現高性能網絡I/O這塊使用起來比較繁瑣,使用不友好,一般業界使用基于Java NIO進行封裝優化,擴展豐富功能的Netty框架來優雅實現。
四、高性能I/O優化
下面結合業界熱門開源項目介紹高性能I/O的優化。
1. 零拷貝
零拷貝(zero copy)技術,用于在數據讀寫中減少甚至完全避免不必要的CPU拷貝,減少內存帶寬的占用,提高執行效率,零拷貝有幾種不同的實現原理,下面介紹常見開源項目中零拷貝實現。
(1) Kafka零拷貝
Kafka基于Linux 2.1內核提供,并在2.4 內核改進的的sendfile函數 + 硬件提供的DMA Gather Copy實現零拷貝,將文件通過socket傳送。
函數通過一次系統調用完成了文件的傳送,減少了原來read/write方式的模式切換。同時減少了數據的copy, sendfile的詳細過程如下:
基本流程如下:
- 用戶進程發起sendfile系統調用
- 內核基于DMA Copy將文件數據從磁盤拷貝到內核緩沖區
- 內核將內核緩沖區中的文件描述信息(文件描述符,數據長度)拷貝到Socket緩沖區
- 內核基于Socket緩沖區中的文件描述信息和DMA硬件提供的Gather Copy功能將內核緩沖區數據復制到網卡
- 用戶進程sendfile系統調用完成并返回
相比傳統的I/O方式,sendfile + DMA Gather Copy方式實現的零拷貝,數據拷貝次數從4次降為2次,系統調用從2次降為1次,用戶進程上下文切換次數從4次變成2次DMA Copy,大大提高處理效率。
Kafka底層基于java.nio包下的FileChannel的transferTo:
- public abstract long transferTo(long position, long count, WritableByteChannel target)
transferTo將FileChannel關聯的文件發送到指定channel,當Comsumer消費數據,Kafka Server基于FileChannel將文件中的消息數據發送到SocketChannel。
A. RocketMQ零拷貝
RocketMQ基于mmap + write的方式實現零拷貝:mmap() 可以將內核中緩沖區的地址與用戶空間的緩沖區進行映射,實現數據共享,省去了將數據從內核緩沖區拷貝到用戶緩沖區。
- tmp_buf = mmap(file, len);
- write(socket, tmp_buf, len);
mmap + write 實現零拷貝的基本流程如下:
- 用戶進程向內核發起系統mmap調用
- 將用戶進程的內核空間的讀緩沖區與用戶空間的緩存區進行內存地址映射
- 內核基于DMA Copy將文件數據從磁盤復制到內核緩沖區
- 用戶進程mmap系統調用完成并返回
- 用戶進程向內核發起write系統調用
- 內核基于CPU Copy將數據從內核緩沖區拷貝到Socket緩沖區
- 內核基于DMA Copy將數據從Socket緩沖區拷貝到網卡
- 用戶進程write系統調用完成并返回
RocketMQ中消息基于mmap實現存儲和加載的邏輯寫在org.apache.rocketmq.store.MappedFile中,內部實現基于nio提供的java.nio.MappedByteBuffer,基于FileChannel的map方法得到mmap的緩沖區:
- // 初始化
- this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
- thisthis.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
查詢CommitLog的消息時,基于mappedByteBuffer偏移量pos,數據大小size查詢:
- public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
- int readPosition = getReadPosition();
- // ...各種安全校驗
- // 返回mappedByteBuffer視圖
- ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
- byteBuffer.position(pos);
- ByteBuffer byteBufferbyteBufferNew = byteBuffer.slice();
- byteBufferNew.limit(size);
- return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
- }
tips: transientStorePoolEnable機制Java NIO mmap的部分內存并不是常駐內存,可以被置換到交換內存(虛擬內存),RocketMQ為了提高消息發送的性能,引入了內存鎖定機制,即將最近需要操作的CommitLog文件映射到內存,并提供內存鎖定功能,確保這些文件始終存在內存中,該機制的控制參數就是transientStorePoolEnable。
因此,MappedFile數據保存CommitLog刷盤有2種方式:
- 開啟transientStorePoolEnable:寫入內存字節緩沖區(writeBuffer) -> 從內存字節緩沖區(writeBuffer)提交(commit)到文件通道(fileChannel) -> 文件通道(fileChannel) -> flush到磁盤
- 未開啟transientStorePoolEnable:寫入映射文件字節緩沖區(mappedByteBuffer) -> 映射文件字節緩沖區(mappedByteBuffer) -> flush到磁盤
RocketMQ 基于 mmap+write 實現零拷貝,適用于業務級消息這種小塊文件的數據持久化和傳輸 Kafka 基于 sendfile 這種零拷貝方式,適用于系統日志消息這種高吞吐量的大塊文件的數據持久化和傳輸。
tips: Kafka 的索引文件使用的是 mmap+write 方式,數據文件發送網絡使用的是 sendfile 方式。
B. Netty零拷貝
Netty 的零拷貝分為兩種:
- 基于操作系統實現的零拷貝,底層基于FileChannel的transferTo方法
- 基于Java 層操作優化,對數組緩存對象(ByteBuf )進行封裝優化,通過對ByteBuf數據建立數據視圖,支持ByteBuf 對象合并,切分,當底層僅保留一份數據存儲,減少不必要拷貝
2. 多路復用
Netty中對Java NIO功能封裝優化之后,實現I/O多路復用代碼優雅了很多:
- // 創建mainReactor
- NioEventLoopGroup boosGroup = new NioEventLoopGroup();
- // 創建工作線程組
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- final ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap
- // 組裝NioEventLoopGroup
- .group(boosGroup, workerGroup)
- // 設置channel類型為NIO類型
- .channel(NioServerSocketChannel.class)
- // 設置連接配置參數
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.TCP_NODELAY, true)
- // 配置入站、出站事件handler
- .childHandler(new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(NioSocketChannel ch) {
- // 配置入站、出站事件channel
- ch.pipeline().addLast(...);
- ch.pipeline().addLast(...);
- }
- });
- // 綁定端口
- int port = 8080;
- serverBootstrap.bind(port).addListener(future -> {
- if (future.isSuccess()) {
- System.out.println(new Date() + ": 端口[" + port + "]綁定成功!");
- } else {
- System.err.println("端口[" + port + "]綁定失敗!");
- }
- });
3. 頁緩存(PageCache)
頁緩存(PageCache)是操作系統對文件的緩存,用來減少對磁盤的 I/O 操作,以頁為單位的,內容就是磁盤上的物理塊,頁緩存能幫助程序對文件進行順序讀寫的速度幾乎接近于內存的讀寫速度,主要原因就是由于OS使用PageCache機制對讀寫訪問操作進行了性能優化:
頁緩存讀取策略:當進程發起一個讀操作 (比如,進程發起一個 read() 系統調用),它首先會檢查需要的數據是否在頁緩存中:
- 如果在,則放棄訪問磁盤,而直接從頁緩存中讀取
- 如果不在,則內核調度塊 I/O 操作從磁盤去讀取數據,并讀入緊隨其后的少數幾個頁面(不少于一個頁面,通常是三個頁面),然后將數據放入頁緩存中
頁緩存寫策略:當進程發起write系統調用寫數據到文件中,先寫到頁緩存,然后方法返回。此時數據還沒有真正的保存到文件中去,Linux 僅僅將頁緩存中的這一頁數據標記為“臟”,并且被加入到臟頁鏈表中。
然后,由flusher 回寫線程周期性將臟頁鏈表中的頁寫到磁盤,讓磁盤中的數據和內存中保持一致,最后清理“臟”標識。在以下三種情況下,臟頁會被寫回磁盤:
- 空閑內存低于一個特定閾值
- 臟頁在內存中駐留超過一個特定的閾值時
- 當用戶進程調用 sync() 和 fsync() 系統調用時
RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,并且是順序讀取,在page cache機制的預讀取作用下,Consume Queue文件的讀性能幾乎接近讀內存,即使在有消息堆積情況下也不會影響性能,提供了2種消息刷盤策略:
- 同步刷盤:在消息真正持久化至磁盤后RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應
- 異步刷盤,能充分利用操作系統的PageCache的優勢,只要消息寫入PageCache即可將成功的ACK返回給Producer端。消息刷盤采用后臺異步線程提交的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量
Kafka實現消息高性能讀寫也利用了頁緩存,這里不再展開。