参考: Java NIO 系列教程
NIO 与 IO 的区别
NIO :面向缓冲区 非阻塞
IO: 面向流 阻塞
以网络编程为例,在服务端如果使用传统的IO,我们一般对每一个客户端的连接都会先得到Socket
然后new 一个新的线程来处理,而对于NIO的处理,一个线程就可以管理多个连接 .
那么在NIO,怎么做到了? ------ 事件驱动模型!!
1. 通过Channel , 通道来建立客户端与服务端的连接!
信息的传递就在这上面,可以写,也可以读!
2. 当你打开通道后,接下来就是注册,因为服务线程只有一个,那么它又怎么管理这么多的通道了?
Selector 选择器!selector会通过它的select()监听所有准备就绪的IO操作!如果没有,就会阻塞!
所以,通过注册,表示我们已经进入准备就绪.....
3. 但是,又怎么区分我们是要读, 写 ,还是建立连接?
这个就是SelectionKey的作用了,它又四个取值!分别代表连接,接收,读,写!
当注册的时候,就会要求指定感兴趣的事件 : selectionKey!
通过SelectionKey你可以获得相应的channal,然后进行额外的操作!
4. 怎么读写了?
NIO中,得首先把数据读入或写入缓冲区!在NIO包中,八种基本数据类型都有相对应的
缓冲区!
当客户端多个请求同时到达服务端时,服务端怎么处理的?
下面是服务端的监听代码:
同一时刻,有多少个通道被selector监测已经准备就绪了,就会被
抓住,然后通过selectedKeys()返回他们的key-set
在遍历这个set ,对每一个与key相关的channel进行相应的处理
private static void Listen() throws IOException {
System.out.println("服务端线程" + Thread.currentThread().getName() + " 启动成功...." );
// 阻塞式的方法
while (selector.select() >= 0) {
Set<SelectionKey> sKeys = selector.selectedKeys();
System.out.println(sKeys.size() + " " + sKeys);
Iterator<SelectionKey> it = sKeys.iterator();
// 遍历
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// key的四种可能取值 , 安排四种处理事件
if (key.isAcceptable()) {
acceptEvent(key);
} else if (key.isConnectable()) {
connectEvent(key);
} else if (key.isReadable()) {
readEvent(key);
} else if (key.isWritable()) {
writeEvent(key);
}
}
}
}
三个核心元素
Channel:数据通道,有点IO中的流,传递数据!
- FileChannel:从文件中读写数据。
- DatagramChannel:能通过UDP读写网络中的数据。
- SocketChannel:能通过TCP读写网络中的数据。
- ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
Selector:选择器
select()
会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。
Buffer:
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
三个属性 // 可以把缓冲区看成一个数组!
capacity :容量,可以通过allocat()方法分配 --- 数组最大长度
position
写模式,从0开始,写一个,position+1
读模式,重新置为0,然后一个一个读,直到等于limit!!
limit
写模式表示你能写的最大值 , 等于capacity , 读模式表示之前写入数据的最大位置,也就是写模式中position
Buffer 的一般用法
调用flip()方法 //
从Buffer中读取数据
调用clear()方法或者compact()方法
一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
NIO的非阻塞体现在哪里?
NIO的缺点?
不知道缓冲区中是否有足够的数据可以进行处理,所以必须向下面一样,多次访问缓冲区,看是否拥有
足够的数据来处理!
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
while(! bufferFull(bytesRead) ) {
bytesRead = inChannel.read(buffer);
如果需要管理同时打开的成千上万个连接,这些连接每次只是发送少量的数据,例如聊天服务器,实现NIO的服务器可能是一个优势。同样,如果你需要维持许多打开的连接到其他计算机上,如P2P网络中,使用一个单独的线程来管理你所有出站连接,可能是一个优势。但对于有少量的连接使用非常高的带宽,一次发送大量的数据,也许典型的IO服务器实现可能非常契合。
总结
用IO进行socket编程的时候,服务端应对n个客户端的连接请求的解决办法:
一个就是一个服务线程在那运行,接收一个处理一个,这样明显太慢
另一个就是通过给每一个每一个连接对象new一个新的线程来进行处理,可是这种的方法
的坏处就在于浪费服务器资源,使服务器一直运行在高负荷的状态下,而且很多线程可能阻塞很久才会
工作。
而NIO则能很好的解决高负荷问题,因为它根本就不需要为每一个连接new一个新的线程,(NIO是基于Reactor模式);
我之前看的一篇博文将Reactor模式的,他里面的饭店服务员问题,我觉得很好的说明了这个问题,如果饭店为每个客人
都安排一个服务员的话,那么100个客人就有一百个服务员,可是当客人点菜的时候,服务员又在等着,这不就浪费资源了吗?
为什么就不让客人点菜的时候,服务员去招呼别人,当他点好菜的时候,再叫这个服务员,这不就很好的利用了资源了吗?
这里,不就像NIO吗?不必为每个线程启动一个线程,而是向selector注册感兴趣的事,那样当下次这件事情发生的时候,这个线程就会立马行动
进行处理。
实验代码:
客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/*
* 在客户端 调用服务端car中的方法
* Movable接口对服务端和客户端都是可见的
* client 和 server 的包或子包 彼此 都是不可见的!!
*/
public class Client implements Runnable {
public static void main(String[] args) {
for(int i = 0 ; i < 100 ; i++) {
new Thread(new Client()).start();
}
}
/*
* 选择器 , 每一个 Channel必须注册得到相应的selector
* 这样线程才能对channel进行管理 ,建立联系!
* 那是不是每个线程就只有一个selector?? 还是可以有多个
*/
Selector selector;
void initClient (String ip , int port) throws IOException {
// 首先得打开 通道
SocketChannel socketChannel = SocketChannel.open();
// 非阻塞模式
socketChannel.configureBlocking(false);
// 连接 非阻塞模式返回false表示连接中!!并没有建立连接
boolean connect = socketChannel.connect(new InetSocketAddress(ip , port));
// 代开选择器
selector = Selector.open();
// 通道与选择器关联!
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
/*
* 进行监听
*/
void Listen() throws IOException {
/*
* This method performs a blocking selection operation.
* It returns only after at least one channel is selected,
* this selector's wakeup method is invoked, or the current thread is interrupted, whichever comes first.
select()方法时阻塞式的,只有至少一个channel被选中时!
*/
/*
* 关于select方法 :
* Selector通过select操作,监视所有在该Selector注册过的Channel的对应的动作,
* 如果监测到某一对应的动作,则返回selectedKeys,自己手动取到各个SelectionKey进行相应的处理。
*/
while(true) {
/*
* 为什么能够这样无线循环下去了?不是select是阻塞的吗?
* 下面做了解释....
*/
// Selects a set of keys whose corresponding channels are ready for I/O operations
// 核心 直接的一句话
// 这样理解的话,当我们注册某一个感兴趣的事情后,那么该channel对该事件就准备就绪了..
// 开始的时候 , 注册的时候connect事件,然后注册写事件,那么在下一次外层while循环的时候
// 就会监测到写事件已经准备就绪了!进行相应的写事件处理.... 如此....
selector.select();
Set<SelectionKey> sKeys = selector.selectedKeys();
Iterator<SelectionKey> it = sKeys.iterator();
//遍历
while(it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// key的四种可能取值 , 安排四种处理事件
if(key.isAcceptable()) {
acceptEvent(key);
} else if(key.isConnectable()) {
connectEvent(key);
} else if(key.isReadable()) {
readEvent(key);
} else if(key.isWritable()) {
writeEvent(key);
}
}
}
}
private void writeEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
// 缓冲区 读写
ByteBuffer buffer = ByteBuffer.allocate(500);
String input = Thread.currentThread().getName() + " 客户端写入到服务端";
buffer = ByteBuffer.wrap(input.getBytes());
socketChannel.write(buffer);
//在此注册感兴趣的事件
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void readEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(20);
socketChannel.read(buffer);
String readInfo = new String(buffer.array());
System.out.println(Thread.currentThread().getName() + " " + readInfo);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
private void connectEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 正在连接中!
if(socketChannel.isConnectionPending()) {
/*
* A non-blocking connection operation is initiated(开始) by placing a socket
* channel in non-blocking mode and then invoking its connect method. Once
* the connection is established, or the attempt has failed, the socket channel
* will become connectable and this method may be invoked to complete the connection sequence.
*/
socketChannel.finishConnect();
}
if(socketChannel.isBlocking()) {
socketChannel.configureBlocking(false);
}
if(socketChannel.isConnected()) {
System.out.println(Thread.currentThread().getName() + " 成功建立了连接");
}
// 将感兴趣的事情注册为写
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
private void acceptEvent(SelectionKey key) {
}
@Override
public void run() {
try {
initClient("localhost" , 65534);
Listen();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
服务端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static void main(String[] args) {
try {
initServer(65534);
Listen();
} catch (IOException e) {
e.printStackTrace();
}
}
static Selector selector;
static void initServer(int port) throws IOException {
// 首先得打开 通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 非阻塞模式
ssChannel.configureBlocking(false);
// 连接
ssChannel.bind(new InetSocketAddress(port));
// 打开选择器
selector = Selector.open();
// 通道与选择器关联!
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/*
* 服务端是怎么处理同时来自多个客户端的请求?
* 同一时刻,有多少个通道被selector监测已经准备就绪了,就会被
* 抓住,然后通过selectedKeys()返回他们的key-set
* 在遍历这个set ,对每一个与key相关的channel进行相应的处理
*/
private static void Listen() throws IOException {
System.out.println("服务端线程" + Thread.currentThread().getName() + " 启动成功...." );
// 阻塞式的方法
while (selector.select() >= 0) {
Set<SelectionKey> sKeys = selector.selectedKeys();
Iterator<SelectionKey> it = sKeys.iterator();
// 遍历
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// key的四种可能取值 , 安排四种处理事件
if (key.isAcceptable()) {
acceptEvent(key);
} else if (key.isConnectable()) {
connectEvent(key);
} else if (key.isReadable()) {
readEvent(key);
} else if (key.isWritable()) {
writeEvent(key);
}
}
}
}
private static void acceptEvent(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sChannel = ssChannel.accept();
if(sChannel.isBlocking()) {
sChannel.configureBlocking(false);
}
System.out.println(Thread.currentThread().getName() + " 服务端接收连接成功.....");
sChannel.register(selector, SelectionKey.OP_WRITE);
}
private static void connectEvent(SelectionKey key) {
// TODO Auto-generated method stub
}
private static void readEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(500);
socketChannel.read(buffer);
String readInfo = new String(buffer.array());
System.out.println(readInfo);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
private static void writeEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(500);
String input = "服务端写入到客户端";
buffer = ByteBuffer.wrap(input.getBytes());
socketChannel.write(buffer);
socketChannel.register(selector, SelectionKey.OP_READ);
}
}