JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

时间:2023-03-09 00:07:08
JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

阻塞IO与非阻塞IO

通常情况下的Socket都是阻塞式的, 程序的输入输出都会让当前线程进入阻塞状态, 因此服务器需要为每一个客户端都创建一个线程。

从JAVA1.4开始引入了NIO API, NIO可以实现非阻塞IO, 这样就可以使用一个线程处理所有的客户请求。

基于NIO的非阻塞Socket通信

服务器将用来监听客户端请求的channel注册到selector上,启动一个线程,使用selector的select()获取求情的客户端的channel数量,

当监听到有客户端请求时,就通过SelectionKey返回对应的客户端channel进行通信。

下面是一个非常简单的例子,

服务器端

 package niochat;

 import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset; public class NServer {
//用于检查所有channel状态的selector
private Selector selector = null;
static final int PORT = 3001;
private Charset charset = Charset.forName("utf-8");
public void init() throws IOException, InterruptedException {
selector = Selector.open();
//通过open方法打开一个未绑定的ServerSocketChannel实例
ServerSocketChannel server = ServerSocketChannel.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
server.bind(isa);
//设置非阻塞
server.configureBlocking(false);
//注册ServerSocketChannel到selector
server.register(selector, SelectionKey.OP_ACCEPT); //服务器端需要轮询selector看是否有channel需要通信
while ( true ) {
//select()将会一直阻塞,直到选择了至少一个channel(进行通信),此时sector就会调用wakeup(),select()方法才能返回
int num = selector.select();
Thread.sleep(1000);
//select()方法返回后,用selectedKeys()返回对应channel的SelectionKey集合,通过key.channel()方法可以返回对应的channel实例
//被选中的key集合selectedKeys表示需要进行IO处理的channel集合,一个key代表一个channel
System.out.println("SelectionKey.size = "+selector.selectedKeys().size());
for (SelectionKey sk : selector.selectedKeys()) {
//从selector上的已选择的key集合中删除正在处理的key
System.out.println("SelectionKey.value = "+sk);
selector.selectedKeys().remove(sk);
//如果sk对应channel包含客户端连接请求
if (sk.isAcceptable()) {
//调用accept方法接受请求,产生一个服务器端的SocketChannel
//在非阻塞模式下,如果没有连接则直接返回null
SocketChannel sc = server.accept();
//设置非阻塞模式
sc.configureBlocking(false);
//将SocketChannel也注册到selector
sc.register(selector, SelectionKey.OP_READ);
//再将sk对应的channel设置为请求准备接受其他请求
sk.interestOps(SelectionKey.OP_ACCEPT);
} //如果sk对应的channel有数据需要读取
if (sk.isReadable()) {
//获取sk对应的channel
SocketChannel sc = (SocketChannel)sk.channel();
//channel中的数据必须先写入buffer中,然后才能写入进content中
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
try {
while(sc.read(buff) > 0) {
//buffer 复位
buff.flip();
content += charset.decode(buff);
}
System.out.println("读取的数据:"+ content);
//sk复位
sk.interestOps(SelectionKey.OP_READ);
} //遇到channel有异常说明客户端有异常,取消注册此sk
catch (IOException ex) {
//从已选择key集合中取消sk,下一次select()时此channel将自动被删除
ex.printStackTrace(); sk.cancel();
if(sk.channel() != null) {
sk.channel().close();
} } if (content.length() > 0) {
//广播
for(SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel) {
SocketChannel dest = (SocketChannel)targetChannel;
dest.write(charset.encode(content));
}
}
}
} }
}
} public static void main(String[] args) throws IOException, InterruptedException {
new NServer().init();
}
}

客户端

 package niochat;

 import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner; public class NClient {
//定义检测SocketChannel的selector
private Selector selector = null;
private static final int PORT = 3001;
private Charset charset = Charset.forName("utf-8");
//客户端的SocketChannel
private SocketChannel sc = null;
public void init() throws IOException {
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
//调用open静态方法创建连接到指定主机的SocketChannel
sc = SocketChannel.open(isa);
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
//创建子线程读取服务器返回的数据
new ClientThread().start();
//键盘输入流
Scanner scan = new Scanner(System.in);
while (scan.hasNextLine()) {
String line = scan.nextLine();
//放进SocketChannel
sc.write(charset.encode(line));
}
} private class ClientThread extends Thread {
public void run() {
try {
while (selector.select() > 0) {
//被选中的key集合selectedKeys表示需要进行IO处理的channel集合
for (SelectionKey sk : selector.selectedKeys()) {
//删除正在处理的key
selector.selectedKeys().remove(sk);
if (sk.isReadable()) {
SocketChannel sc = (SocketChannel)sk.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
while (sc.read(buff) > 0) {
sc.read(buff);
buff.flip();
content += charset.decode(buff);
}
System.out.println("聊天信息:"+content);
sk.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
} public static void main(String[] args) throws IOException {
new NClient().init();
}
}

执行结果,启动了一个服务器端,然后启动了两个客户端,

aaarticlea/png;base64," alt="" />