接上一篇《NIO的原理以及与传统IO的对比》
想要使用Java实现前面提到的高性能网络服务端,需要使用JDK1.4之后提供java.nio包下的ByteBuffer、SelectionKey、Selector、ServerSocketChannel以及SocketChannel;下面就是一个使用JDK提供的API编写的,一个完整的客户端以及服务端实现NIO交互的代码:
一、创建工程
打开MyEclipse,新建一个名为“NIO_Test”的Java工程:
然后在src下面创建一个“cn.com.nio.test.execute”包,在下面创建“MultiplexerTimeServer”、“TimeClient”、“TimeClientHandle”、“TimeServer”四个Java文件:
其中“TimeServer”和“MultiplexerTimeServer”分别为服务端的启动类以及业务处理类;“TimeClient”和“TimeClientHandle”分别为客户端的启动类和业务处理类。
二、编写业务逻辑
我们下面分别来编写上面创建的类的具体逻辑。
(1)服务端启动类
首先是服务端的启动类“TimeServer”:
package cn.com.nio.test.execute;
public class TimeServer {
/**
* @param args
* @author Administrator
* */
public static void main(String[] args) {
int port = 8080;//服务端启动端口
if(args!=null && args.length>0){
//如果主函数的args参数不为空的话,则取参数中的数据作为启动端口
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
//如果数据获取异常,就采用之前的默认值8080
}
}
//创建服务端启动实例对象(一个实现Runnable接口的线程执行类)
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
//启动Runnable线程执行类“MultiplexerTimeServer”
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
这里我们定义了服务端启动的端口“port”,默认是8080,如果主函数main的args参数不为空的话,则取参数中的数据作为启动端口。之后创建一个服务端启动实例对象,该对象是实现了Runnable接口的线程执行类,将“port”端口传入进去,然后开启一个线程来执行该实例对象,已开启服务端的服务。
(2)服务端执行类
上面的MultiplexerTimeServer是具体的服务端执行类,也是一个实现了Runnable接口的线程执行类,用来开启服务端的网络请求监听以及接收、转发功能。具体逻辑如下:
package cn.com.nio.test.execute;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class MultiplexerTimeServer implements Runnable{
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;//volatile关键字保证stop字段在多线程的一致性
/**
* 初始化多路复用器,绑定监听端口
* @param port
* */
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();//创建一个selector监听器
//创建一个Channel通道
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);//支持非阻塞
//指定socket连接绑定的端口(1024为套接字上请求的最大挂起连接数)
serverChannel.socket().bind(new InetSocketAddress(port),1024);
//将该Channel通道注册到selector监听器上,注册事件为“OP_ACCEPT”接收请求事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The Time server is start in port:"+port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
//停止服务端,将停止参数置为true
this.stop = true;
}
/**
* 线程执行方法
* */
@Override
public void run() {
while(!stop){//停止参数不为true的情况下,服务一直启动
try {
selector.select(1000);//执行监听,设置超时时间为1000毫秒
/*获取已经注册的Channel通道上哪些有消息。每一个Channel注册后,都有分配一个
独一无二的key,selector可以获取这些活跃的Channel的key,进行遍历*/
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){//遍历活跃的Channel的key
key = it.next();
it.remove();
try {
handleInput(key);//处理该key的请求信息
} catch (Exception e) {
if(key != null){
key.cancel();//出现异常后,将该key撤销
if(key.channel()!=null){
//出现异常后,将该key绑定的通道关闭
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){//判断Key是否还有用
//处理新接入的请求信息
if(key.isAcceptable()){//判断key是否是可接收的
//接收新的连接
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);//非阻塞
//添加新的连接至selector监听器
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){//判断key是否是可读取的
//读取请求数据
SocketChannel sc = (SocketChannel) key.channel();//获取channel通道
ByteBuffer readBuffer = ByteBuffer.allocate(1024);//分配一个缓存(与操作系统交互的)
int readBytes = sc.read(readBuffer);//从缓存中通过通道读取到buffer中
if(readBytes>0){//如果接受的信息不为空
readBuffer.flip();//识别是不是一个完整的包
byte[] bytes = new byte[readBuffer.remaining()];//创建一个存储信息的byte数组
readBuffer.get(bytes);//将buffer中的数据读到byte数组中
String body = new String(bytes,"UTF-8");//将byte数组转换为String(并转码)
System.out.println("The Time server receive order:"+body);
//返回当前的时间给发送方,如果对方发送的请求信息内容为“QUERY TIME ORDER”,则
//返回当前时间,如果请求内容不是“QUERY TIME ORDER”,则返回“BAD ORDER”
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?
new java.util.Date(System.currentTimeMillis()).toString()
:"BAD ORDER";
doWrite(sc,currentTime);//返回消息
}else if(readBytes<0){
//对链路关闭
key.cancel();
sc.close();
}else{
//读到0字节,忽略
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if(response!=null && response.trim().length()>0){
byte[] bytes = response.getBytes();//将response字符串序列化
//创建一个bytes长度的数据缓存
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);//将bytes放入缓存
writeBuffer.flip();
channel.write(writeBuffer);//写入通道,反馈给发送端
}
}
}
在该执行类中,首先通过启动类传入的port端口,创建了selector监听器,并将一个通注册到selector监听器上,注册事件为“OP_ACCEPT”接收请求事件,以此来接收客户端的请求信息。
在run方法中,为该执行类真正执行时的逻辑,此时selector监听器会对通道进行持续的监听,遍历所有注册的、活跃的通道的key,并处理拥有可连接、可读取属性的key对应的通道。如果是可读取的,接受请求信息并给予相关的回应信息。
值得注意的是,上面的所有操作都是非阻塞的,各自通道的读取和写入操作互不影响。
(3)客户端启动类
客户端的启动类和服务端类似,这里会在相关端口启动一个客户端业务处理类:
package cn.com.nio.test.execute;
public class TimeClient {
/**
* @param args
* @author Administrator
* */
public static void main(String[] args) {
int port = 8080;//指定需要交互的服务端的端口
if(args!=null && args.length>0){
//如果主函数的args参数不为空的话,则取参数中的数据作为交互端口
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
//如果数据获取异常,就采用之前的默认值8080
}
}
//创建客户端启动实例对象,启动Runnable线程
new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
}
}
然后是客户端的业务处理类,进行与服务端的连接,与发送请求信息:
package cn.com.nio.test.execute;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;//volatile关键字保证stop字段在多线程的一致性
public TimeClientHandle(String string, int port) {
this.host = host == null? "127.0.0.1":host;//指定服务端的主机ip
this.port = port;//指定服务端的主机端口
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
}
while(!stop){
try {
selector.select(1000);//向服务端发送请求
//获取活跃的Channel通道的key,进行遍历
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){//遍历活跃的Channel的key
key = it.next();
it.remove();
try {
handleInput(key);//处理key
} catch (Exception e) {
if(key != null){
key.cancel();//出现异常后,将该key撤销
if(key.channel()!=null){
//出现异常后,将该key绑定的通道关闭
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){//判断Key是否还有用
//判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){//判断key是否是可连接的
if(sc.finishConnect()){//判断通道是否完成连接(三次握手)
sc.register(selector, SelectionKey.OP_READ);//注册一个读请求的通道
doWrite(sc);
}else{
System.exit(1);//连接失败,进程退出
}
}
if(key.isReadable()){//判断key是否是可读取的(服务端返回的数据)
//读取请求数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);//分配一个缓存(与操作系统交互的)
int readBytes = sc.read(readBuffer);//从缓存中通过通道读取到buffer中
if(readBytes>0){//如果接受的信息不为空
readBuffer.flip();//识别是不是一个完整的包
byte[] bytes = new byte[readBuffer.remaining()];//创建一个存储信息的byte数组
readBuffer.get(bytes);//将buffer中的数据读到byte数组中
String body = new String(bytes,"UTF-8");//将byte数组转换为String(并转码)
System.out.println("Now is:"+body);//打印服务端反馈的信息
this.stop = true;
}else if(readBytes<0){
//对链路关闭
key.cancel();
sc.close();
}else{
//读到0字节,忽略
}
}
}
}
private void doWrite(SocketChannel channel) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
//创建一个bytes长度的数据缓存
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);//将请求信息的bytes数组放入缓存
writeBuffer.flip();
channel.write(writeBuffer);//写入通道,发送给服务端
if(!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
}
private void doConnect() throws IOException{
//如果直接连接成功,则注册到多路复用器上,发送请求信息,读取应答
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
//如果连接不成功,则注册一个请求连接类型的通道
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
}
在客户端的业务处理类中,首先也是创建了一个selector监听器,然后创建了一个channel通道。当开启线程后,首先会获取服务端的连接,如果连接成功,则注册一个可读的通道,并向该通道发送信息;如果连接不成功则注册一个可连接的通道,向服务端请求连接。
当接收到可读类型的通道时,说明服务端反馈了信息,此时获取信息并转换为可读类型。
运行试验,首先打开服务端TimeServer:
此时服务端等待通道连接。
然后打开客户端,此时会发送“QUERY TIME ORDER”信息向服务端请求当前时间,服务端打印了收到的请求信息:
收到请求信息后,服务端会判断合法性,如果合法,则并返回当前时间给客户端,客户端打印服务端反馈的信息:
以上就是使用NIO来完成一个客户端与服务端的非阻塞的网络请求数据交互,可以实现高性能的网络服务。但是直接这样写的话,代码的复杂度很高。如果使用第三方封装好的框架来编写,会大大提高开发效率。目前业内大部分开发人员会使用Netty这个第三方框架,来完成NIO的连接交互逻辑,会大大减少开发时间,提高开发效率。
下一篇文章会对Netty进行学习,来看一下使用Netty的优点。
之前我的另一篇博文:
《【Netty入门和实践】2.NIO的样例代码分析》
javascript:void(0) 对NIO样例代码进行了更加详细的剖析,可以继续进行延伸阅读。
参考:
传智播客《2017零基础大数据》教学视频