导语
NIO的出现是为服务器端编程而设计的。它的作用就是能够让一个线程为多个连接服务。NIO中的API都是非阻塞模式的,这样可以在服务器端采用异步的方式来处理多个请求。NIO中有两个重要的东西就是通道和缓冲器,这两个必须掌握。
普通IO与NIO
使用普通IO进行服务器编程时,每当有一个连接来临时,我们需要为该连接服务。如果服务器只输出数据的话,那么直接向该连接中写入数据即可,一般情况下调用write()之类的方法即可写入数据。但是我们知道该方法会阻塞,这是什么原因呢?其实很简单,我们知道网络协议栈是进行分层了的。只有等下层做好了接受数据的准备时,上层才能把数据传给下层。我们知道最底层是物理层,对应的是我们的网卡和驱动。网卡中是有数据寄存器的,只有当数据寄存器未满时,它才接受来自上层的数据。还有就是数据传输时会希望拿到对方的ACK,如果没拿到会进行重传,这也导致了数据在某一层的累计。最终的效果就是调用write方法时,发现写了一些数据写不了,然后write就被阻塞,等待下层有接受数据的空间时继续写,直到所有的数据写完时write才返回。这样就会使得的我们干着急---正在服务的连接阻塞在write上但又无法接受新的连接。那么NIO有什么好处呢?首先,IO的操作可被配置为非阻塞模式,在这种模式下,它会尽力而为的去写,比如当前只能写3字节,它写完这三字节马上返回。这样的话就为我们节省了时间。那么问题又来了,我是如何知道写了多少字节呢?不用担心,在NIO中引入了缓冲器的概念,它会记录写了多少,还需要多少去写的。
多线程IO与NIO
多线程IO好,还是NIO好,这个不能一概而论。首先从设计的复杂性上来说NIO比较复杂。但是使用多线程IO的话又很消耗内存,而且还有存在线程切换的问题。其次NIO是不是就一定比多线程IO快,这个也不一定。有人做过测试说在Linux上使用Java 6完成的实际测试中,多线程经典IO设计要胜出于NIO 30%左右。那什么情况下使用NIO呢?有种情况就是服务器需要同时支持超大量的长期连接,比如10000个连接以上,而客户端不会很频繁的发送太多的数据。
通道
通道的作用就是将缓冲器中的数据移入或移出到各种I/O源,如文件,socket等。通道类的层次结构相当复杂,有多个接口和许多可选操作。但是对于TCP连接来说,只需要关注SocketChannle,ServerSocketChannel这两个类即可。
连接
SocketChannel类没有任何公共构造函数,只能通过open()方法来创建SocketChannel对象:
public static SocketChannel open(SocketAddress remote) throws IOException
public static SocketChannle open() throws IOExecption
第一个方法会建立连接。这个方法将阻塞(也就说在连接建立或抛出异常之前,这个方法不会返回)。如果需要在连接之前对Socket进行配置,则需要调用第二种方法,例如:
SocketChannel channel = SocketChannel.open()
SocketAddress address = new InetSocketAddress("www.xdysite.cn", 80);
address.configureBlocking(false)
channel.connect(address);
读取
为了读取SocketChannel,首先要创建一个ByteBuffer,通道可以在其中存储数据。然后将这个ByteBuffer传给read()方法:
public abstract int read(ByteBuffer dst) throws IOException
通道会用尽可能多的数据填充缓冲区,然后返回放入的字节数。如果遇到流末尾。通道会用所有剩余的字节填充缓冲区,而且在下一次调用read()时会返回-1。因为数据将存储在缓冲区的当前位置,而这个位置会随着增加更多数据而自动更新,所以可以一直向read()方法中传入同一个缓冲区,直到缓冲区填满。例如,下面的循环会一直读取数据,直到缓冲区填满或者检测到流末尾为止:
while (buffer.hasRemaining() && channel.read(buffer) != -1)
有时如果能从一个源填充多个缓冲区将会很有用,这被称为散布。下面两个方法接受一个ByteBuffer对象数组作为参数,按顺序填充数组中的各个ByteBuffer:
public final long read(ByteBuffer[] dsts) throws IOExecption
public final long read(ByteBuffer[] dsts, int offset, int length) throws IOException
下面是其使用示例:
ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(1000);
buffers[1] = ByteBuffer.allocate(1000);
while(buffers[1].hasRemaining() && channel.read(buffers) != -1);
写入
Socket通道提供了读写方法,一般情况下它们是全双工的。要想写入,只需要一个ByteBuffer,将其回绕,然后传给某个写入方法,这个方法在把数据复制到输出并将缓存排空,这与读取过程正好相反。基本的write()会接受一个缓冲区作为参数:
public abstract int write(ByteBuffer src) throws IOException
与读取一样,如果通道是非阻塞的,这个方法不能保证会写入缓冲区的全部内容。当然由于缓冲区基于游标的特性,我们可以反复来调用该方法,直到缓冲区排空:
while (buffer.hasRemaining() && channel.write(buffer) != -1)
将多个缓冲区的数据写入到一个SocketChannel被称作聚集。例如:你在一个缓冲区中存储了HTTP首部,而在另一个缓冲区中存储了HTTP主体。可以使用下面的两个方法一次写完这两个缓冲区:
public final long write(ByteBuffer[] dsts) throw IOException
public final long write(ByteBuffer[] dsts, int offset, int length) throws IOException
第一个方法是排空所有的缓冲区,第二个方法是从位于offset的缓冲区开始,排空length个缓冲区
关闭
和正常的Socket一样,在用完通道后应当将其关闭,释放它可能使用的端口和其他任何资源:
public void close() throws IOException
如果通道已经关闭,再次进行关闭将没有任何效果。如果试图读写已关闭的通道。将抛出一个异常。
ServerSocketChannel类
ServerSocketChannel只有一个目的:接受入站连接。我们是无法读取,写入或者连接ServerSocketChannel的。我们可以通过其accept()来得到一个连接(这里的accept方法和 ServerSocket中的accept方法是一样的,都是从连接队列中取出一个连接并将该连接返回给我们)。下面我们来看一个通过ServerSocketChannel来创建一个服务器Socket的例子:
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(80));
接受连接
一旦打开并绑定ServerSocketChannel对象,accept()就可以调用了:
public abstract SocketChannel accept() throws IOException
accept()可以在阻塞或者非阻塞模式下工作,这取决于我们对Socket的配置(默认情况是阻塞的)。在阻塞情况下,如果请求连接的队列中有连接,那么它会从中取出一个并返回一个SocketChannel对象。我们可以通过该对象与客户端进行数据传输了。但是需要注意的该对象不是Socket的子类,我们不能从它上面直接调用类似getInputStream()之类的方法来使用。它只能通过缓冲器来使用。
ServerSocketChannel还可以工作在非阻塞模式(在调用bind方法之前先调用configureBlocking方法)。在这种情况下,如果没有入站连接,accept()方法会返回null。非阻塞模式更适合于需要为每个连接完成大量工作的服务器,这样可以并行处理多个请求了。非阻塞模式下一般都与Selector结合使用。
Channels类
ChannelS是一个简单的工具,可以将传统的基于I/O流,阅读器(Reader)和(Writer)包装在通道中,也可以从通道转化为基于I/O流,阅读器和书写器。处于性能的考虑,有可能在程序中的一部分中使用新I/O模型,但是同时仍要与处理流的传统API交互,这个类将很有用。下面是其的一些转换方法:
public static InputStream newInputStream(ReadableByteChannel ch)
public static OutputStream newOutPutStream(WritableByteChannel ch)
public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)
public static Reader newReader (ReadableByteChannel channel, CharsetDecoder decoder, int minimumBufferCapacity)
public static Reader newReader (ReadableByteChannel ch, String encoding)
public static Writer newWriter (WritableByteChannel ch, String encoding)
SocketChannel类实现了这些方法签名中出现的ReadableByteChannel和WritableByteChannel接口。ServerSocketChannel则都没实现,所以无法对 ServerSocketChannel进行读写。
Selector类
Selector类的作用就是对通道进行检查。我们向其注册多个通道,只要有一个通道准备就绪(可接受,可读,可写)时,Selector就会通知我们。
Selector唯一的构造函数是一个保护类型方法,一般情况下,要调用静态工程方法Selector.open()来创建新的选择器:
public static Selector open() throws IOException
下一步是为具有通道特性的server注册该选择器,这里需要调用ServerSocketChannel对象的 register()方法。
public fianl SelectionKey register(Selector sel, int pos)
public final SelectionKey register(Selector sel, int pos, Object att)
第一个参数是选择器,第二个参数是SelectionKey类中的一个常量,标识选择器所要观察的操作。SelectionKey中定义的4个常量:
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_CONNECT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
不同的通道注册到选择器后,就可以随时查看选择器来找出哪些通道已经准备好可以进行处理了。下面的两个方法就是用来查看是否有通道准备好:
public abstract int select() throws IOException
public abstract int select(long timeout) throws IOException
第一个方法会阻塞,直到至少有一个注册的通道准备好可以进行处理了。第二个在返回前会等到不超过timeout毫秒。当知道有通道准备好处理时(select方法返回了),我们就可以使用SelctionKyes()方法获取就绪的通道了。
public abstract Set<SelectionKey> selectedKeys()
迭代处理返回的集合时,要依次处理每个SelectionKey。注意:在我们没处理一个key时,我们需要将其从集合中删除,因为Selector只会往集合中添加已就绪的key,但是不会清理里面已被处理过的key。
SelectionKey类
一个selector对象可以被多个通道注册,当注册完毕后会返回SelectionKey对象,该对象是与具体的通道绑定的,是通道的风向标。返回值是不需要保存的,我们可以调用Selector中的selectedKeys()方法来获取所有的SelectionKey对象(返回的是一个SelectionKey集合)。当从所选择的键集合中获取一个SelectionKey时,通常首先要测试这些键能进行哪些操作。有以下4种可能:
public final boolean isAcceptable()
public final booolean isConnectable()
public final boolean isReadable()
public final boolean isWritable()
通过根据键的测试可以获得通道的状态。比如我们调用isWritable()返回true则表示这个键对应的通道处于可写状态,这样我们就可以在通道上执行写操作了。一旦了解到与键关联的通道准备好完成何种操作,就可以用channel()方法来获取这个通道了:
public abstract SelectableChannel channel()
如果在保存状态信息的SelectionKey存储了一个对象,就可以用attachment()方法来获取该对象:
public final Object attachment()
最后,如果结束使用连接,就要撤销其SelectionKey对象的注册,这样选择器就不会浪费资源去查询它是否准备就绪。可以调用这个键的cannel()方法来撤销注册:
public abstract void cancel()
一个示例的客户端
虽然新的I/O API并非专门为客户端而设计,但的确可以用于客户端。我们可以从一个简答的客户端开始来了解NIO。我们使用RFC864中定义的字符生成器协议来实现一个客户端。服务器在端口19监听连接。当客户端连接时,客户端的所有输入都被忽略。而服务器将发送连续的字符序列,直到客户端断开连接为止。
package com.dy.xidian;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
public class ChargenClient {
public static int DEFAULT_PORT = 19;
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("Usage: java ChargenClient host [port]");
return;
}
int port;
try {
port = Integer.parseInt(args[1]);
} catch (RuntimeException e) {
port = DEFAULT_PORT;
}
try {
SocketAddress address = new InetSocketAddress(args[0], port);
SocketChannel client = SocketChannel.open(address);
ByteBuffer buffer = ByteBuffer.allocate(74);
WritableByteChannel out = Channels.newChannel(System.out);
while (client.read(buffer) != -1) {
buffer.flip();
out.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
在实现这个新I/O API的客户端时,首先要调用静态工厂方法SocketChannel.open()来创建一个新的 java.nio.channel.SocketChannel对象。这个方法的参数是一个是java.net.SocketAddress对象,指示要连接的主机和端口。例如,下面的代码连接指向 www.xdysite.cn端口8000的通道:
SocketAddress addr = new InetSocketAddress("www.xdysite.cn", 8000);
SocketChannel client = SocketChannel.open(addr);
通道会以阻塞的模式打开,但是不会立马去连接服务器。如果这是传统的客户端,你可能会获取该Socket的输入和输出流,但这不是传统的客户端。利用通道可以将内容写入到缓冲区中。下面使用静态方法allocate()来创建一个容量为74字节的缓冲区ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(74);
这个ByteBuffer对象传递给通道read()方法。通道会将从Socket读取的数据填充这个缓冲区。它的返回成功读取并存储到缓冲区的字节数:
int byteRead = client.read(buffer);
默认情况下,会至少读取一个字节,或者返回-1指示数据结束。假定缓冲区中有一些数据,我们可以将这些数据复制到System.out中去。有几个方法可以中ByteBuffer中提取一个字节数组,然后再写到传统的OutputStream中去。不过,采用一种完全基于通道的解决方案更为合适。这需要利用到Channels工具类,将System.out封装到一个通道中:
WritableByteChannel output = Channels.newChannel(System.out)
然后将读取的数据写入到与System.out连接的这个输出管道中。不过,在写入数据之前需要回绕(flip)缓冲区,这个后面有讲。
buffer.flip()
output.write(buffer)
buffer.clear()
一个示例服务器
客户端使用通道和缓冲区就可以了,但是实际上通道和缓冲区主要用于处理多并发连接的服务器系统。下面是通过NIO设计的一个服务器。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ChargenServer {
public static int DEFAULT_PORT=19;
public static void main(String[] args) {
int port;
try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException e) {
port = DEFAULT_PORT;
}
System.out.println("Listening for connections on port " + port);
byte[] rotation = new byte[95*2];
for(byte i=' '; i<= '~'; i++) {
rotation[i-' '] = i;
rotation[i+95-' '] = i;
}
ServerSocketChannel serverChannel;
Selector selector;
try {
//获取通道
serverChannel = ServerSocketChannel.open();
//绑定端口
serverChannel.bind(new InetSocketAddress(19));
//配置为非阻塞模式,这样accept()方法将不会阻塞
serverChannel.configureBlocking(false);
//创建Selector,迭代处理所有准备好的连接
selector = Selector.open();
//向通道注册选择器,第二个参数指明选择器所要关注的操作
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
return;
}
while (true) {
try {
//当有可以进行操作的通道时,该方法将会返回,否则阻塞
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
//获取已经就绪了的通道
Set<SelectionKey> readyKeys = selector.selectedKeys();
//通过迭代的方式来处理就绪通道
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//从集合中删除这个键
iterator.remove();
try {
if(key.isAcceptable()) { //发现这个key关联到ServerSocket并且有可接受的连接
//通过key获取serverSocket
ServerSocketChannel server = (ServerSocketChannel)key.channel();
//一定能够取出一个连接
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
//将来自客户端的连接设为非阻塞模式
client.configureBlocking(false);
//将该连接注册到Selector
SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
//为每个客户端(channel)创建一个缓冲器并填充数据
ByteBuffer buffer = ByteBuffer.allocate(74);
buffer.put(rotation, 0, 72);
buffer.put((byte)'\r');
buffer.put((byte)'\n');
buffer.flip();
//将该缓冲器绑定到key上以方便将来使用
key2.attach(buffer);
} else if (key.isWritable()) { //发现客户端连接可以写了
//通过KEY获取客户端连接
SocketChannel client = (SocketChannel) key.channel();
//获取对应的缓冲器
ByteBuffer buffer = (ByteBuffer) key.attachment();
//缓冲器调整
if (!buffer.hasRemaining()) {
//用下一行重新填充缓冲区
buffer.rewind();
//得到上一次的首字符
int first = buffer.get();
//准备改变缓冲区中的数据
buffer.rewind();
//寻找rotation中新的首字符位置
int position = first - ' '+ 1;
//将数据从rotation中复制到缓冲区
buffer.put(rotation, position, 72);
//在缓冲区末尾存储一个行分隔符
buffer.put((byte)'\r');
buffer.put((byte)'\n');
//准备缓冲区写入
buffer.flip();
}
//将缓冲器中的数据写入到Socket中,并不保证全部写入
client.write(buffer);
}
} catch (IOException e) {
key.cancel();
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
}