NIO的理解和使用
一、概述
NIO是 non-blocking-io的简称,非阻塞IO,由于它是后续出来的IO模型,有时也叫做 new-IO。NIO是后续比如React等的多路复用的基础模型。它是UNIX的五种IO模型中的一种。
NIO有三大组件:buffer、channel和selector,这三大组件共同作用,提供了多路复用的非阻塞解决方案。
二、组件一-buffer
buffer是NIO中的顶层缓冲抽象类,在NIO中的buffer存储的是基本类型,但是除了Boolean类型,如下
初始化一个buffer,它的position值为0,mark为-1,capacity为buffer的容量大小,limit的值是不固定的,且数据都是0.
buffer也是NIO中的一个重要组件,充当缓冲区的角色,可以理解为是一个存储基本数据类型的容器,并且它的存储结构是一个线性的有限的集合。它有三个重要的属性:position、limit和capacity。
position:下一个元素被读或写的位置
limit:读或写截至的位置的后一个位置。不会大于容量。理解就是limit是限制的意思,是读或写的截至位置,当读或写超过这个limit,那么会抛出异常。
capacity:就是缓冲区的容量大小,是固定不变的。
下面讲一下buffer的重要方法
标记和重置(mark方法和reset方法)
public final Buffer mark() { mark = position; return this; }public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
当方法调用了reset方法,position会重置到mark的标记处。且必须先使用mark方法才能使用reset方法,否则抛异常。且当position或limit的值小于mark得值时,mark会失效,所以,后续调用reset也会报错!
清除、翻转和重绕(clear方法,flip方法和 rewind方法)
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }public final Buffer rewind() { position = 0; mark = -1; return this; }
clear:它将limit置为capacity位置,将position置为0位置。目的就是为了新的序列输入做准备,可以理解为就是重置了buffer。
flip:它将limit置为当前position的位置,将position置为0 的位置。目的就是从buffer中读取数据,结合get方法使用。
rewind:它将position置为0位置,limit位置不变。
注意:buffer是线程不安全的。当一个buffer被多个线程使用,要使用合适的同步代码块来控制。
且属性的值:mark <= position <= limit <= capacity
buffer中还有一个不显眼的属性:address。这个属性仅使用于direct buffer。(这里先不做进一步研究)
下面结合图解来进一步理解buffer的读写过程
首先,创建一个ByteBuffer并插入三个字节1,2,3
使用flip方法切换为读
读取两个字节1,2,然后使用compact方法压缩buffer(结合的Bytebuffer)
使用clear方法
下面再来看看buffer的实现类ByteBuffer
ByteBuffer有直接缓存,也有非直接缓存。如果是直接字节缓存,Java虚拟机将尽最大努力直接对其执行本地I/O操作。也就是说,调用操作系统不在需要将buffer的内容复制到中间缓冲区或从中间缓冲区复制。
零拷贝和直接缓存
通过 DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
- 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
- java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
进一步优化(linux 2.4)
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输
直接缓冲区的内容可能在正常垃圾回收堆之外,也就是堆外内存。所以,他们对应用程序内存占用的影响可能不会太大。
建议直接buffer可以用于那些大的,长时间存在的对象。直接buffer直接受底层操作系统的本机I\O操作。
Java平台通常支持通过JNI从本地代码中创建直接字节缓存。
ByteBuffer的一些常用的方法有slice方法,compact方法和duplicate方法
slice方法是切割的意思,就是从现在的position位置开始切割为一个新的Bytebuffer。
duplicate方法就是复制现在的buffer,包括属性的值。
compact方法是将未读完的数据压缩。
三、组件二-channel
Channel接口是NIO中的顶层channel接口,所有的channel都要实现这个接口。channel代表了开启与实体的连接。例如硬件设备、一个文件,一个网络socket等的读写操作
一个channel能开启也能关闭,channel在创建时是开启的,一但关闭就关闭了。
channel是线程安全的。
只有两个方法:isOpen()和close方法
我们着重看一下ServerSocketChannel,它是用于Socket的channel抽象类。
它里面比较重要的方法就是accept方法,这个方法用于接收此channel通道的socket的连接。
在阻塞模式下调用它,如果没有连接,方法会阻塞,直到有连接或I/O异常。
在非阻塞模式下调用它,如果没有连接,则立即返回null,线程不会阻塞。
四、组件三-selector
selector的官方解释是:SelectableChannel对象的多路复用器。这里的多路指的是多个channel通道,复用的意思是一起用,结合起来就是多个channel通道注册到这个selector,可以实现共同使用,注意,这里的共同使用不是并发的意思。
selectionKey是其实就是将selector、channel、attach(附件)和ops(监听事件)封装为一个key保存在selectedKey集合中。
selector中存放了一个selectedKey的集合。集合中的每一个元素都是一个selectedKey,这每一个key就是所注册进selector中的channel,以及它所携带的附件(就是buffer)和监听事件。
五、实操
下面演示一个非阻塞的,监听服务器写事件,客户端读事件的非阻塞案例
服务端
public class TestWriteEvent { public static void main(String[] args) throws Exception{ ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT,null); ssc.bind(new InetSocketAddress("localhost",8081)); while(true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { //因为本例子中ServerSocketChannel只有一个,故直接用Ssc SocketChannel sc = ssc.accept(); sc.configureBlocking(false); ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1,2,3,4,5}); sc.register(selector,SelectionKey.OP_WRITE,buffer); int write = sc.write(buffer); System.out.println("本次写入了:" + write + "个字节"); }else if(key.isWritable()){ System.out.println("可读事件"); Thread.sleep(2000l); ByteBuffer buffer = (ByteBuffer) key.attachment(); if(!buffer.hasRemaining()){ key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); } } } } }}
客户端
public class TestWriteEventClilent { public static void main(String[] args) throws Exception{ SocketChannel sc = SocketChannel.open(); //注意,客户端需要用connect连接,服务端是bind sc.connect(new InetSocketAddress("localhost",8081)); Selector selector = Selector.open(); ByteBuffer buffer = ByteBuffer.allocate(5); sc.configureBlocking(false); SelectionKey register = sc.register(selector, SelectionKey.OP_READ, buffer); while(true){ selector.select(); if(register.isReadable()){ System.out.println("察觉到channel中可读......"); sc.read((ByteBuffer) register.attachment()); buffer.flip(); while(buffer.hasRemaining()){ System.out.println(buffer.get()); } } } }}
案例二:演示selector的多路复用,boss线程负责建立连接,worker线程负责读写。
/** * Author: cheng * boss这个线程只负责建立连接,读写由work其他线程执行。 */public class TestMultiThread { public static void main(String[] args) throws Exception{ Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT, null); ssc.bind(new InetSocketAddress(8080)); //创建worker并初始化 Worker worker1 = new Worker("work1"); while(true){ boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()){ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //将读写事件交给worker,关联worker System.out.println("before register..."); //注意:如果select方法阻塞了,则无法进行register! worker1.register(sc); System.out.println("after register..."); } } } } /** * work类负责读写 **/ static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>(); //用于两线程之间的通信 private volatile boolean start = false; //初始化开关 public Worker(String name) throws Exception { this.name = name; } //初始化线程和selector且只执行一次 public void register(SocketChannel sc) throws Exception{ if(!start){ selector = Selector.open(); thread = new Thread(this,name); thread.start(); start=true; } //只是将任务加入到了队列中 queue.add(()->{ try { sc.register(selector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } }); //唤醒selector selector.wakeup(); } @Override public void run() { while(true){ try{ /*** 很哈的解决了select和register的执行顺序,保证了先执行select,然后才会执行register。**/ selector.select(); Runnable task = queue.poll(); if(task!= null){ //将channel注册到了selector中并执行 task.run(); } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); //注意移除! iterator.remove(); if(key.isReadable()){ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();channel.read(buffer);buffer.flip();while(buffer.hasRemaining()){ System.out.println("读到的数据为:"+buffer.get());} } } }catch(Exception e ){ e.printStackTrace();; } } } }}
六、总结
1、NIO的三大组件:buffer、channel和selector
2、buffer充当缓冲区的角色,可读可写,可以通过flip方法和clear切换读写,主要涉及的是position和limit属性。
3、channel是数据通道,常见的数据通道有UDP的DatagramChannel,TCP的SocketChannel和ServerSocketChannel等等
4、selector是一个多路复用器,其实就是可以让一个线程去监听多个channel通道中的事件,比较适合于连接数多,事件发生概率少的场景。
5、NIO结合buffer、channel和selector,实现了非阻塞的多路复用模型,为后续的Reacter等提供了很好的解决方案。
6、NIO是Non-blocking-io的简称,也就是非阻塞IO,所谓阻塞就是发起读取数据请求的时,当数据还没准备就绪的时候,这时请求是即刻返回,还是在这里等待数据的就绪,如果需要等待的话就是阻塞,反之如果即刻返回就是非阻塞。
7、NIO中使用缓存也分为两种:直接缓存和非直接缓存。直接缓存使用的堆外内存,好处就是I/O快捷,不需要中间buffer,可直接读取,这其实就是NIO的零拷贝了,缺点就是由于不在JVM垃圾回收的管理范畴,容易造成内存泄露。官方建议就是,如果是大对象,长生命周期的,适用于堆外内存。
8、NIO是非阻塞IO,也属于同步IO,在IO模型里面如果请求方从发起请求到数据最后完成的这一段过程中都需要自己参与,那么这种我们称为同步请求;反之,如果应用发送完指令后就不再参与过程了,只需要等待最终完成结果的通知,那么这就属于异步。