1、NIO简介
NIO(non-blocking IO或new IO),是同步非阻塞IO.
关于BIO、NIO和AIO的原理与区别请看我写的这篇文章:BIO、NIO、AIO三种IO模型的理解 本文只介绍NIO三大组件的原理及使用。
NIO三大组件分别是:Buffer、Channel、Selector,是JDK1.4提供的新API,NIO相关的类都放在java.nio包下,且对java.io包下BIO相关的很多类进行了改写。
NIO是面向缓冲区(或面向块的),而BIO是面向流(字符流或字节流)的。数据读取到一个它稍后处理的缓冲区(本质是一个内存块,基于数组实现),需要时可在缓冲区前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
而BIO是基于事件的,当有连接请求、或读事件时才会处理,否则就阻塞在那里。
NIO的非阻塞模式:
- 非阻塞读:一个线程从某通道读取数据时,仅能读到目前准备好的数据,如果数据还没准备好,就立即返回,该线程可以继续做别的事情,而不是像BIO那样保持线程阻塞;
- 非阻塞写:一个线程写入一些数据到某通道时,先将数据写到缓冲区,等到数据可写时,再将缓冲区的数据写到通道。而BIO调用了write之后就阻塞在这里,即使数据不可写也要一直等待,直到数据完全写出去。
2、三大组件的关系
下面来看看三大组件的关系图:
可以看到,服务端仅仅只有一个线程,而这个线程里面维护着一个selector,一个selector可以同时处理多个客户端连接。channel可以理解为服务端与客户端传输数据的通道。
而buffer是缓冲区。前面说到NIO是面向缓冲区的,什么意思呢?客户端写数据时总是先将数据写到缓冲区,然后再从缓冲区写到channel,而读数据时恰恰是相反的,channel先将数据读到缓冲区,而客户端直接从缓冲区中读取数据。由此可见,buffer是双向的,既可以读,又可以写,而面向流的BIO则是单向的,一般都分为输入流和输出流两种,要么只能读,要么只能写。
上图就是NIO的多路复用模式。selector可以称作选择器,或者多路复用器,它也是多路复用的关键。
下面再复述一遍对应关系:
- 每个channel都会对应一个buffer(其实channel两头都有一个buffer,上图没有画出channel服务端那头的buffer),一个线程有一个selector,一个selector对应多个channel。
多个channel首先会注册到一个selector上面,selector会使用轮询机制,遍历这些channel,当哪个channel有事件(具体的事件类型有可接收accept、已连接connect、可读readable、可写writeable)发生了,就停下来处理这个channel。
3、Buffer入门实例
前面说到,Buffer本质是一个内存块,基于数组实现,Buffer是一个抽象类,七个基本类型(除了boolean)都有Buffer的子类,最常用的是ByteBuffer,因为网络数据都是以字节方式传输的。
上面也说到,buffer既可以读又可以写,但是读写要做切换,也就是说,如果buffer当前模式为写模式,则要显式切换到读模式才可以读数据,反之亦然。
下面以ButeBuffer为例,演示一下Buffer的使用。
// 创建新的buffer,大小是1024个字节
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 创建之后是写模式,因此可以直接写数据
for (int i = 0; i < 10; i++) {
buffer.put((byte) i);
}
// 调用flip方法切换到读模式
buffer.flip();
// 循环读取
while (buffer.hasRemaining()){
System.out.println(buffer.get());
}
4、Buffer核心原理
知道如何简单使用buffer之后,下面就来探究一下buffer的底层原理。
buffer的原理主要在于它的四个属性:
- mark:在缓冲区操作过程当中,可以将当前的 position的值临时存入mark属性中,需要的时候再取出,恢复到之前的position,重新从之前的position位置开始处理。调用mark()方法来设置mark=position,再调用reset()可以让 position恢复到 mark标记的位置,即 position=mark
- position:位置,读或写都会改变位置
- limit:表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作,极限可以修改
- capacity:容量,在缓冲区创建时设定且不可修改,比如前面创建的缓冲区容量就是1024且不可修改
mark属性和capacity已经介绍的很详细了,mark属性一般情况下不需要用它,而capacity就是容量在创建buffer时就已经固定了,因此下面重点介绍其它属性。
一个刚创建的buffer是写模式,比如ByteBuffer.allocate(10),此时position=0,limit=10,如下图所示。
limit表示读或写的极限,由于当前是写模式,因此可以写10个数据,且每写入一次数据,position就会加1。如果写第11个数据,则会报错。
limit是可以修改的,比如改成5,则最多只能放5个数据,写入第6个时就会报错,如下图所示。
当调用flip方法时,会从写模式切换到读模式。我们看看flip方法做了什么。
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
我们假设一下,新创建的buffer容量为10,且设置了limit=8,那么当写了5个数据后(未写满)就调用一下flip方法,调用之前limit=8,position=5,调用之后limit=5,而position重置为0,此时已经进入读模式,且会读取0~5之间的数据,因为你只写了5个啊。
以上的分析就是position和limit属性的含义,它会控制读或写的区间永远在position到limit的之间。
如何从写模式切换到读模式我们已经知道了,那么如何从读模式切换到写模式呢?答案是调用clear方法或compact方法。下面看一下clear方法做了什么。
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
其这么做的含义你们可以自行研究,前面道理已经讲得比较清楚了。
下面是读写模式切换方式图。
5、Channel核心原理与实战
常见的channel有如下几种:
- FileChannel(实现为FileChannelImpl)
- DatagramChannel(用于UDP)
- ServerSocketChannel(用于TCP,实现为ServerSocketChannelImpl)
- SocketChannel(用于TCP,实现为SocketChannelImpl)
下面主要演示一下FileChannel的使用。
FileChannel,顾名思义,是用于处理文件输入输出的。
向本地文件写数据代码示例:
FileOutputStream out = new FileOutputStream("E:/tmp/nio.txt");
FileChannel channel = out.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("测试nio往文件写数据".getBytes());
// 需要切换为读模式,因为下面调用write方法时相当于从buffer里面读数据
buffer.flip();
// 向channel写数据
int len = channel.write(buffer);
System.out.println("字节数:"+len);
out.close();
这里用到了BIO的类FileOutputStream,说明NIO也是基于BIO的。
从本地文件读数据代码示例:
FileInputStream in = new FileInputStream("E:/tmp/nio.txt");
FileChannel channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(8);
int len = -1;
List<Byte> list = new ArrayList<>();
byte[] bytes = new byte[8];
// 循环读取数据
while ((len=channel.read(buffer))!=-1){
// 下面要从buffer中读数据,因此切换为读模式
buffer.flip();
buffer.get(bytes,0,len);
for (int i = 0; i < len; i++) {
list.add(bytes[i]);
}
// 下一个循环需要先向buffer写数据,因此切换为写模式
buffer.clear();
}
in.close();
// 转为byte数组
byte[] resBytes = new byte[list.size()];
for (int i = 0; i < list.size(); i++) {
resBytes[i] = list.get(i);
}
// 以字符串形式打印
System.out.println(new String(resBytes));
文件拷贝代码示例:
FileInputStream in = new FileInputStream("E:/tmp/nio.txt");
FileOutputStream out = new FileOutputStream("E:/tmp/nio2.txt");
FileChannel src = in.getChannel();
FileChannel dst = out.getChannel();
src.transferTo(0,src.size(),dst);
// 或dst.transferFrom(src,0,src.size());
in.close();
out.close();
transferTo和transferFrom方法底层是零拷贝机制,会用到堆外物理内存,谨慎使用。关键零拷贝后面会有文章介绍,请耐心等待。
6、Selector原理与聊天室代码
前面已经介绍过selector了,它是多路复用的核心,主要用在网络编程模式中,服务端采用多路复用IO提升并发处理效率。
下面介绍一下selector的API。
selector几个核心方法:
- select:注册的通道上至少有一个通道有事件就返回,并将产生事件的selectionKey加入到内部集合,否则一直阻塞
- select(timeout):超过timeout就返回
- selectNow:立马返回,不阻塞
- selectedKeys:获取所有有事件发生的selectionKey,每个selectionKey对象聚合了一个selectableChannel对象
- keys :获取注册在selector上面所有的selectionKey
- wakeup:唤醒selector,比如调用了select方法,就会阻塞。如果selector正在阻塞中,则调用wakeup方法会唤醒,否则wakeup就在下次阻塞时生效
通过阅读SelectorImpl类(Selector的实现)的源码发现,keys方法和selectedKeys方法分别返回的是SelectorImpl的publicKeys属性和publicSelectedKeys属性,且这两个属性都是Set<SelectionKey>类型
SelectionKey类内部维护的几个事件常量:
- SelectionKey.OP_ACCEPT:可接收的
- SelectionKey.OP_CONNECT:已连接
- SelectionKey.OP_READ:可读
- SelectionKey.OP_WRITE:可写
如何创建一个Selector呢?答案是调用Selector.open()方法。
服务端多路复用IO核心流程:
- 服务端绑定一个端口(比如8888),开始监听
- 将ServerSocketChannel注册到Selector上,服务端线程开始调用selector的select方法,进入阻塞
- 一旦发生了事件,select方法苏醒,通过调用selector的selectedKeys方法获取有事件发生的selectionKey,并遍历这些selectionKey对象,拿到每个selectionKey对象维护的Channel,根据它们响应的事件做不同处理
- 如果发生的是Accept事件,则调用ServerSocketChannel的accept方法获取SocketChannel,并将SocketChannel注册到Selector上
- 如果发生的是Read事件,则从SocketChannel中读数据 如果发生的是Write事件,则往SocketChannel中写数据
- 如果发生的是Connect事件,则说明连接已建立
下面就来实现上述流程中的关键步骤。
服务端绑定端口:
// 1.构造一个ServerSocketChannel,并将其设为非阻塞模式
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2.绑定端口号
serverSocketChannel.bind(new InetSocketAddress("localhost",6666));
通用事件处理模型:
Selector selector = Selector.open();
// 将通道注册到reactor上,并只关注accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 服务端开始无限循环处理与客户端的通信
for(;;){
// 阻塞等待,也可以使用selectNow非阻塞等待或select(timeout)超时等待
selector.select();
// 获取本次有事件发生的SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
// 处理相应事件。因为这里是服务端代码,所以就没必要判断是否为Connect事件了
if(selectionKey.isAcceptable()){
// 接收新连接逻辑
}else if(selectionKey.isReadable()){
// 读通道数据逻辑
}else if(selectionKey.isWritable()){
// 往通道写数据逻辑
}
// 最后需要移除,防止重复处理
iterator.remove();
}
}
这里说明一下:ServerSocketChannel是专门接收连接的,因此它的有效事件是OP_ACCEPT,而SocketChannel才是用于服务端和客户端之间数据传输的,因此它的有效事件有OP_READ、OP_WRITE和OP_CONNECT。
OP_CONNECT比较特别,它表示客户端与服务端连接已建立,该事件发生于客户端的SocketChannel。为什么这么说呢,因为服务端也有一个SocketChannel对象。
register方法是SelectableChannel类的方法,前面说到的FileChannel不是该类的子类,因为FileChannel无法注册到Selector上面。
接收新连接,并注册到Selector:
// 通过事件,已经确定有新连接进来了,因为这里并不会阻塞
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置非阻塞模式
socketChannel.configureBlocking(false);
// 可以打印一句话,表示有新的客户端已上线
System.out.println(String.format("客户端上线[%s]",socketChannel.getRemoteAddress().toString().substring(1)));
// 新的socketChannel注册到Selector上面,并主要关心Read事件
// 第三个参数为与之绑定的缓冲区,可以将读取的数据放到该缓冲区中
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
读事件发生,通道可读,读取数据:
下面是读数据的核心代码,这个很重要。
SocketChannel channel = (SocketChannel)selectionKey.channel();
// 拿到与之绑定的缓冲区
ByteBuffer buffer = (ByteBuffer)selectionKey.attachment();
// 先清空一下,因为可能有上一次的残留数据
buffer.clear();
// 当客户端主动关闭连接时,read方法会抛出异常,此时需要try catch掉,避免服务端因异常挂掉
try {
// 将channel中的数据读到buffer中,对于buffer是写,因为需要调用clear方法切换到写模式
channel.read(buffer);
} catch (IOException e) {
System.out.println(String.format("客户端下线[%s]",channel.getRemoteAddress().toString().substring(1)));
selectionKey.cancel();
channel.close();
continue;
}
// 将收到的消息转为string
String message = new String(buffer.array(), 0, buffer.position());
服务端收到客户端发送的消息之后该怎么办呢?
如果是群聊,则将消息转发给其它所有的客户端,也就是将message写到除自己之外的其它所有的SocketChannel。如果是私聊,则需要往指定的那个SocketChannel里写数据。这里可以任你发挥。
写事件发生,通道可写:
SocketChannel channel = (SocketChannel)selectionKey.channel();
// 拿到与之绑定的缓冲区,此时缓冲区是有数据的
ByteBuffer att = (ByteBuffer) selectionKey.attachment();
// 需要切换到读模式,因为往channel里写数据对于buffer而言是读
att.flip();
// 写数据
channel.write(att);
// 写完之后记得要取消write事件,否则CPU空转、利用率达到100%
selectionKey.interestOps(selectionKey.OP_READ);
好了,服务端的核心代码就写到这里了,喜欢本文的朋友们请点个关注,谢谢!