Java NIO --- 网络编程相关

时间:2021-01-26 20:19:28

   参考:      Java NIO 系列教程

 

 NIO 与 IO 的区别

   NIO :面向缓冲区 非阻塞

   Java NIO --- 网络编程相关 

  IO: 面向流 阻塞

      Java NIO --- 网络编程相关

     以网络编程为例,在服务端如果使用传统的IO,我们一般对每一个客户端的连接都会先得到Socket

     然后new 一个新的线程来处理,而对于NIO的处理,一个线程就可以管理多个连接 .   

     那么在NIO,怎么做到了? ------ 事件驱动模型!!

     1. 通过Channel , 通道来建立客户端与服务端的连接!

         信息的传递就在这上面,可以写,也可以读!

     2. 当你打开通道后,接下来就是注册,因为服务线程只有一个,那么它又怎么管理这么多的通道了?

         Selector 选择器!selector会通过它的select()监听所有准备就绪的IO操作!如果没有,就会阻塞!

         所以,通过注册,表示我们已经进入准备就绪.....

    3. 但是,又怎么区分我们是要读, 写 ,还是建立连接?

       这个就是SelectionKey的作用了,它又四个取值!分别代表连接,接收,读,写!

       当注册的时候,就会要求指定感兴趣的事件 : selectionKey!

       通过SelectionKey你可以获得相应的channal,然后进行额外的操作!

    4. 怎么读写了?

        NIO中,得首先把数据读入或写入缓冲区!在NIO包中,八种基本数据类型都有相对应的

        缓冲区!  

 

    当客户端多个请求同时到达服务端时,服务端怎么处理的?

    下面是服务端的监听代码:

     同一时刻,有多少个通道被selector监测已经准备就绪了,就会被
     抓住,然后通过selectedKeys()返回他们的key-set
    在遍历这个set ,对每一个与key相关的channel进行相应的处理

private static void Listen() throws IOException {
System.out.println("服务端线程" + Thread.currentThread().getName() + " 启动成功...." );
// 阻塞式的方法
while (selector.select() >= 0) {
Set<SelectionKey> sKeys = selector.selectedKeys();
System.out.println(sKeys.size() + " " + sKeys);
Iterator<SelectionKey> it = sKeys.iterator();
// 遍历
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// key的四种可能取值 , 安排四种处理事件
if (key.isAcceptable()) {
acceptEvent(key);
} else if (key.isConnectable()) {
connectEvent(key);
} else if (key.isReadable()) {
readEvent(key);
} else if (key.isWritable()) {
writeEvent(key);
}
}
}
}

 

  三个核心元素

   Channel:数据通道,有点IO中的流,传递数据!

  • FileChannel:从文件中读写数据。
  • DatagramChannel:能通过UDP读写网络中的数据。
  • SocketChannel:能通过TCP读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

    Selector:选择器

select()
会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。

 

 Buffer:

    缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。 

    三个属性 // 可以把缓冲区看成一个数组!

     capacity :容量,可以通过allocat()方法分配 --- 数组最大长度 

     position

        写模式,从0开始,写一个,position+1

        读模式,重新置为0,然后一个一个读,直到等于limit!!

      limit

        写模式表示你能写的最大值 , 等于capacity , 读模式表示之前写入数据的最大位置,也就是写模式中position

  Buffer 的一般用法

写入数据到Buffer
调用flip()方法 //
从Buffer中读取数据
调用clear()方法或者compact()方法 
当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。
一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

 

   NIO的非阻塞体现在哪里?

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

 

   NIO的缺点?

    不知道缓冲区中是否有足够的数据可以进行处理,所以必须向下面一样,多次访问缓冲区,看是否拥有

     足够的数据来处理!

 

ByteBuffer buffer = ByteBuffer.allocate(48);  
int bytesRead = inChannel.read(buffer);
while(! bufferFull(bytesRead) ) {
bytesRead = inChannel.read(buffer);

 

所以,NIO可让您只使用一个(或几个)单线程管理多个通道(网络连接或文件),但付出的代价是解析数据可能会比从一个阻塞流中读取数据更复杂。
如果需要管理同时打开的成千上万个连接,这些连接每次只是发送少量的数据,例如聊天服务器,实现NIO的服务器可能是一个优势。同样,如果你需要维持许多打开的连接到其他计算机上,如P2P网络中,使用一个单独的线程来管理你所有出站连接,可能是一个优势。但对于有少量的连接使用非常高的带宽,一次发送大量的数据,也许典型的IO服务器实现可能非常契合。

  

   总结 

    用IO进行socket编程的时候,服务端应对n个客户端的连接请求的解决办法:

      一个就是一个服务线程在那运行,接收一个处理一个,这样明显太慢

     另一个就是通过给每一个每一个连接对象new一个新的线程来进行处理,可是这种的方法

     的坏处就在于浪费服务器资源,使服务器一直运行在高负荷的状态下,而且很多线程可能阻塞很久才会

     工作。

     而NIO则能很好的解决高负荷问题,因为它根本就不需要为每一个连接new一个新的线程,(NIO是基于Reactor模式);

     我之前看的一篇博文将Reactor模式的,他里面的饭店服务员问题,我觉得很好的说明了这个问题,如果饭店为每个客人

     都安排一个服务员的话,那么100个客人就有一百个服务员,可是当客人点菜的时候,服务员又在等着,这不就浪费资源了吗?

     为什么就不让客人点菜的时候,服务员去招呼别人,当他点好菜的时候,再叫这个服务员,这不就很好的利用了资源了吗?

     这里,不就像NIO吗?不必为每个线程启动一个线程,而是向selector注册感兴趣的事,那样当下次这件事情发生的时候,这个线程就会立马行动

     进行处理。

 

  实验代码:

  客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/*
* 在客户端 调用服务端car中的方法
* Movable接口对服务端和客户端都是可见的
* client 和 server 的包或子包 彼此 都是不可见的!!
*/
public class Client implements Runnable {

public static void main(String[] args) {
for(int i = 0 ; i < 100 ; i++) {
new Thread(new Client()).start();
}
}

/*
* 选择器 , 每一个 Channel必须注册得到相应的selector
* 这样线程才能对channel进行管理 ,建立联系!
* 那是不是每个线程就只有一个selector?? 还是可以有多个
*/
Selector selector;


void initClient (String ip , int port) throws IOException {
// 首先得打开 通道
SocketChannel socketChannel = SocketChannel.open();
// 非阻塞模式
socketChannel.configureBlocking(false);
// 连接 非阻塞模式返回false表示连接中!!并没有建立连接
boolean connect = socketChannel.connect(new InetSocketAddress(ip , port));
// 代开选择器
selector = Selector.open();
// 通道与选择器关联!
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}


/*
* 进行监听
*/

void Listen() throws IOException {
/*
* This method performs a blocking selection operation.
* It returns only after at least one channel is selected,
* this selector's wakeup method is invoked, or the current thread is interrupted, whichever comes first.
select()方法时阻塞式的,只有至少一个channel被选中时!
*/
/*
* 关于select方法 :
* Selector通过select操作,监视所有在该Selector注册过的Channel的对应的动作,
* 如果监测到某一对应的动作,则返回selectedKeys,自己手动取到各个SelectionKey进行相应的处理。
*/
while(true) {
/*
* 为什么能够这样无线循环下去了?不是select是阻塞的吗?
* 下面做了解释....
*/
// Selects a set of keys whose corresponding channels are ready for I/O operations
// 核心 直接的一句话
// 这样理解的话,当我们注册某一个感兴趣的事情后,那么该channel对该事件就准备就绪了..
// 开始的时候 , 注册的时候connect事件,然后注册写事件,那么在下一次外层while循环的时候
// 就会监测到写事件已经准备就绪了!进行相应的写事件处理.... 如此....
selector.select();
Set<SelectionKey> sKeys = selector.selectedKeys();
Iterator<SelectionKey> it = sKeys.iterator();
//遍历
while(it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// key的四种可能取值 , 安排四种处理事件
if(key.isAcceptable()) {
acceptEvent(key);
} else if(key.isConnectable()) {
connectEvent(key);
} else if(key.isReadable()) {
readEvent(key);
} else if(key.isWritable()) {
writeEvent(key);
}
}
}
}


private void writeEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
// 缓冲区 读写
ByteBuffer buffer = ByteBuffer.allocate(500);
String input = Thread.currentThread().getName() + " 客户端写入到服务端";
buffer = ByteBuffer.wrap(input.getBytes());
socketChannel.write(buffer);
//在此注册感兴趣的事件
socketChannel.register(selector, SelectionKey.OP_READ);
}


private void readEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(20);
socketChannel.read(buffer);
String readInfo = new String(buffer.array());
System.out.println(Thread.currentThread().getName() + " " + readInfo);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}


private void connectEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 正在连接中!
if(socketChannel.isConnectionPending()) {
/*
* A non-blocking connection operation is initiated(开始) by placing a socket
* channel in non-blocking mode and then invoking its connect method. Once
* the connection is established, or the attempt has failed, the socket channel
* will become connectable and this method may be invoked to complete the connection sequence.
*/
socketChannel.finishConnect();
}

if(socketChannel.isBlocking()) {
socketChannel.configureBlocking(false);
}

if(socketChannel.isConnected()) {
System.out.println(Thread.currentThread().getName() + " 成功建立了连接");
}
// 将感兴趣的事情注册为写
socketChannel.register(selector, SelectionKey.OP_WRITE);
}

private void acceptEvent(SelectionKey key) {

}


@Override
public void run() {
try {
initClient("localhost" , 65534);
Listen();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}




}

 

 服务端:

  

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {

public static void main(String[] args) {
try {
initServer(65534);
Listen();
} catch (IOException e) {
e.printStackTrace();
}
}

static Selector selector;

static void initServer(int port) throws IOException {
// 首先得打开 通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 非阻塞模式
ssChannel.configureBlocking(false);
// 连接
ssChannel.bind(new InetSocketAddress(port));
// 打开选择器
selector = Selector.open();
// 通道与选择器关联!
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
}

/*
* 服务端是怎么处理同时来自多个客户端的请求?
* 同一时刻,有多少个通道被selector监测已经准备就绪了,就会被
* 抓住,然后通过selectedKeys()返回他们的key-set
* 在遍历这个set ,对每一个与key相关的channel进行相应的处理
*/

private static void Listen() throws IOException {
System.out.println("服务端线程" + Thread.currentThread().getName() + " 启动成功...." );
// 阻塞式的方法
while (selector.select() >= 0) {
Set<SelectionKey> sKeys = selector.selectedKeys();
Iterator<SelectionKey> it = sKeys.iterator();
// 遍历
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// key的四种可能取值 , 安排四种处理事件
if (key.isAcceptable()) {
acceptEvent(key);
} else if (key.isConnectable()) {
connectEvent(key);
} else if (key.isReadable()) {
readEvent(key);
} else if (key.isWritable()) {
writeEvent(key);
}
}
}
}


private static void acceptEvent(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sChannel = ssChannel.accept();
if(sChannel.isBlocking()) {
sChannel.configureBlocking(false);
}
System.out.println(Thread.currentThread().getName() + " 服务端接收连接成功.....");
sChannel.register(selector, SelectionKey.OP_WRITE);
}

private static void connectEvent(SelectionKey key) {
// TODO Auto-generated method stub

}

private static void readEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(500);
socketChannel.read(buffer);
String readInfo = new String(buffer.array());
System.out.println(readInfo);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}

private static void writeEvent(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(500);
String input = "服务端写入到客户端";
buffer = ByteBuffer.wrap(input.getBytes());
socketChannel.write(buffer);
socketChannel.register(selector, SelectionKey.OP_READ);
}



}