JAVA NIO 概述及例子

时间:2022-09-05 15:25:58

JAVA NIO 简单概述


同步非阻塞(I/O多路复用),以Reactor模式为核心。

#将关注的事件注册到selector,当事件发生时,selector会将事件反馈给注册者#

Java 是如何实现 Selector 的

Selector.open(); //打开多路复用器
// 获取 SelectorProvider:通过源码可见,获取过程是线程安全的,且 provider 全局唯一
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    // 如果在 JVM 配置中定义了 java.nio.channels.spi.SelectorProvider,则加载它(SPI)
                    if (loadProviderFromProperty())
                        return provider;
                    // 加载一组扩展的服务类(SPI)
                    if (loadProviderAsService())
                        return provider;
                    // 根据不同平台调用对应的 native selector 实现
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}



基本流程


1.打开selector

2.ServerSocketChannel在selector中注册感兴趣的事件
   (Accept,connect,read,write)

3.当事件触发且该事件已被注册为感兴趣的事件时,selector将返回对应的channel

4.可以通过interestOps修改兴趣点

5.channel使用完毕后要进行close关闭


代码

Console 
=================
[INFO]2017-11-07 15:49:55 --> com.bone.handle.ServerSocketHandle.execute(ServerSocketHandle.java:41): [Server]多路复用器
[INFO]2017-11-07 15:49:55 --> com.bone.handle.ServerSocketHandle.execute(ServerSocketHandle.java:43): [Server]Server开启
[INFO]2017-11-07 15:49:55 --> com.bone.handle.ServerSocketHandle.execute(ServerSocketHandle.java:45): [Server]端口绑定6666
[INFO]2017-11-07 15:49:55 --> com.bone.handle.ServerSocketHandle.execute(ServerSocketHandle.java:47): [Server]非阻塞设定
[INFO]2017-11-07 15:49:55 --> com.bone.handle.ServerSocketHandle.execute(ServerSocketHandle.java:49): [Server]时间监听
[INFO]2017-11-07 15:49:55 --> com.bone.handle.ServerSocketHandle.execute(ServerSocketHandle.java:51): [Server]默认缓冲区4096KB
[消息][/172.26.106.38:49744]接入
[消息]总人数[1]
[消息][/172.26.106.38:49744][1] :3
[消息][/172.26.106.38:49749]接入
[消息]总人数[2]
[消息][/172.26.106.38:49749][2] :44


目录

JAVA NIO 概述及例子


源码


定义了server启动过程

package com.bone.handle;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bone.module.ServerData;

/**
 * Startup sequence of the server: opens the selector and the listening
 * channel, binds the port, switches the channel to non-blocking mode and
 * registers it for accept events.
 *
 * <p>{@link #execute()} writes the {@code selector}, {@code server} and
 * {@code buff} fields, which are declared outside this class (presumably in
 * {@code ServerData} - confirm there).
 *
 * @author Allen, 2017-11-06
 */
public class ServerSocketHandle extends ServerData implements Handle {

    /** Read-buffer size in bytes used when no positive size is supplied. */
    private static final int DEFAULT_BUFFER_SIZE = 1 << 12;

    final Logger logger = LoggerFactory.getLogger(ServerSocketHandle.class);

    /** TCP port the server channel is bound to. */
    private int port;

    /** Requested read-buffer size in bytes; values &lt;= 0 select the default. */
    private int buf;

    /**
     * @param post port to bind (name kept from the original signature)
     * @param buf  read-buffer size in bytes; values &lt;= 0 select the default
     */
    public ServerSocketHandle(int post, int buf) {
        this.port = post;
        this.buf = buf;
    }

    /**
     * Creates a handle that uses the default read-buffer size.
     *
     * @param post port to bind (name kept from the original signature)
     */
    public ServerSocketHandle(int post) {
        this.port = post;
    }

    @SuppressWarnings("unused")
    private ServerSocketHandle() {
        // Not intended for use: a port must always be supplied.
    }

    /**
     * Opens and configures the selector and the server channel.
     *
     * <p>If any step fails, whatever this call opened is closed again before
     * the exception is rethrown, so a failed start does not leak descriptors.
     *
     * @throws Exception if opening, binding or registering fails
     */
    @Override
    public void execute() throws Exception {
        Selector openedSelector = null;
        ServerSocketChannel openedServer = null;
        try {
            openedSelector = Selector.open();
            selector = openedSelector;
            logger.info("[Server]多路复用器");

            openedServer = ServerSocketChannel.open();
            server = openedServer;
            logger.info("[Server]Server开启");

            openedServer.socket().bind(new InetSocketAddress(port));
            logger.info("[Server]端口绑定{}", port);

            openedServer.configureBlocking(false);
            logger.info("[Server]非阻塞设定");

            openedServer.register(openedSelector, SelectionKey.OP_ACCEPT);
            logger.info("[Server]时间监听");

            // A non-positive size would make ByteBuffer.allocate fail (negative)
            // or never read anything (zero), so fall back to the default.
            buff = buf > 0 ? buf : DEFAULT_BUFFER_SIZE;
            // Report the size actually in effect; the unit is bytes.
            logger.info("[Server]缓冲区{}{}", buff, "B");
        } catch (Exception e) {
            closeQuietly(openedServer);
            closeQuietly(openedSelector);
            throw e;
        }
    }

    /** Closes {@code resource} if it is non-null, logging instead of throwing. */
    private void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (java.io.IOException e) {
            logger.warn("[Server]failed to release resource after startup error", e);
        }
    }
}

定义了启动项的模块控制

package com.bone.channel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bone.code.Code;
import com.bone.handle.ServerSocketHandle;

/**
 * Component wiring: collects the startup handle, the event channel and the
 * codec, then starts the server with them.
 *
 * @author Allen, 2017-11-06
 */
public class Panel {

    final Logger logger = LoggerFactory.getLogger(Panel.class);

    private ServerSocketHandle socketHandle;
    private BoneChannel channel;
    private Code code;

    /** @param socketHandle startup handle run first by {@link #run()} */
    public void setSocketHandle(ServerSocketHandle socketHandle) {
        this.socketHandle = socketHandle;
    }

    /** @param channel event channel whose loop is entered by {@link #run()} */
    public void setChannel(BoneChannel channel) {
        this.channel = channel;
    }

    /** @param code codec handed to the channel when the loop starts */
    public void setCode(Code code) {
        this.code = code;
    }

    /**
     * Runs the startup handle and then enters the channel's event loop.
     *
     * <p>Does nothing except log an error when any of the three components is
     * missing. Exceptions raised during startup or by the loop are logged and
     * not propagated.
     */
    public void run() {
        if (socketHandle == null || channel == null || code == null) {
            // All three are required; name every one of them in the message.
            logger.error("serverSocketHandle, channel or code is null");
            return;
        }
        try {
            socketHandle.execute();
            channel.ini(code);
        } catch (Exception e) {
            logger.error("server stopped because of an unexpected error", e);
        }
    }
}


事件操作流

package com.bone.channel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bone.code.Code;
import com.bone.module.ServerData;
import com.bone.module.SocketClient;

/**
 * Selector event loop: waits for ready keys, accepts new connections, reads
 * incoming data and forwards both to the hooks a subclass implements.
 *
 * <p>The loop is single-threaded on purpose: the selector's selected-key set
 * is not safe for concurrent use, so keys are processed one by one through an
 * iterator on the thread that called {@link #ini(Code)}.
 *
 * <p>{@code selector}, {@code server}, {@code buff} and {@code socketClients}
 * are declared outside this class (presumably in {@code ServerData} - confirm
 * there).
 *
 * @author Allen, 2017-11-06
 */
public abstract class BoneChannel extends ServerData implements Channel {

    final Logger logger = LoggerFactory.getLogger(BoneChannel.class);
    private Code code;

    /**
     * Stores the codec and runs the event loop until the selector is closed or
     * the calling thread is interrupted.
     *
     * @param code codec used to decode received bytes
     * @throws IOException if the selector fails while waiting for events
     */
    public void ini(Code code) throws IOException {
        this.code = code;
        while (selector.isOpen() && !Thread.currentThread().isInterrupted()) {
            int selectCount = selector.select();
            if (selectCount == 0) {
                // Woken up without ready keys (wakeup/interrupt): keep serving
                // instead of leaving the loop.
                continue;
            }
            logger.debug("selectCount:{}", selectCount);
            java.util.Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
            while (ready.hasNext()) {
                SelectionKey sk = ready.next();
                // Keys must be removed by hand or they are reported again.
                ready.remove();
                dispatch(sk);
            }
        }
    }

    /** Routes one ready key to the accept and/or read handling. */
    private void dispatch(SelectionKey sk) {
        try {
            if (sk.isValid() && sk.isAcceptable()) {
                accept(sk);
            }
            if (sk.isValid() && sk.isReadable()) {
                drain(sk);
            }
        } catch (java.nio.channels.CancelledKeyException e) {
            logger.debug("key was cancelled while being dispatched", e);
        }
    }

    /** Reads everything currently available on the key's channel and hands it to {@code read}. */
    private void drain(SelectionKey sk) {
        SocketClient sclient = sl((SocketChannel) sk.channel());
        if (sclient == null) {
            // No client record could be resolved, so the hooks cannot be
            // called; just release the connection.
            discard(sk);
            return;
        }
        ByteBuffer byteBuff = ByteBuffer.allocate(buff);
        try {
            int count;
            while ((count = sclient.getSocketChannel().read(byteBuff)) > 0) {
                byteBuff.flip();
                read(sclient, code.decode(byteBuff));
                // Make the whole buffer writable again for the next chunk.
                byteBuff.clear();
            }
            if (count < 0) {
                // -1 means the peer closed the connection. Without this the key
                // stays readable forever and the loop spins.
                leave(sk, sclient);
            }
        } catch (IOException e) {
            leave(sk, sclient);
        } catch (Exception e) {
            logger.error("failed to process data from client", e);
            leave(sk, sclient);
        }
    }

    /**
     * Accepts a pending connection, registers it for read events and notifies
     * {@link #accept(SocketClient)}.
     *
     * @param sk the key that reported the accept event
     */
    @Override
    public void accept(SelectionKey sk) {
        SocketChannel sc = null;
        try {
            sc = server.accept();
            if (sc == null) {
                // Non-blocking accept: nothing is pending any more.
                return;
            }
            sc.configureBlocking(false);
            // Register the client channel with the selector for READ events.
            sc.register(selector, SelectionKey.OP_READ);
            SocketClient client = sl(sc);
            if (client == null) {
                closeQuietly(sc);
                return;
            }
            this.accept(client);
        } catch (IOException e) {
            logger.error("failed to accept connection", e);
            closeQuietly(sc);
        }
    }

    /**
     * Hook for decoded data; the default implementation ignores it.
     *
     * @param socketClient client the data came from
     * @param value        value produced by the codec
     */
    @Override
    public void read(SocketClient socketClient, Object value) {
    }

    /**
     * Forgets the client and releases its key and channel. The key is always
     * cancelled and the channel always closed, even when the remote address
     * can no longer be read.
     *
     * @param sk      key of the departing connection
     * @param sclient client record of the connection; may be {@code null}
     */
    @Override
    public void leave(SelectionKey sk, SocketClient sclient) {
        try {
            java.net.SocketAddress remote = ((SocketChannel) sk.channel()).getRemoteAddress();
            if (remote != null) {
                socketClients.remove(remote.toString());
            } else if (sclient != null) {
                socketClients.values().remove(sclient);
            }
        } catch (IOException e1) {
            // The address is unavailable (e.g. channel already closed), so
            // drop the record by value instead of by key.
            if (sclient != null) {
                socketClients.values().remove(sclient);
            }
            logger.debug("remote address unavailable while removing client", e1);
        } finally {
            discard(sk);
        }
    }

    /**
     * Hook invoked after a new client has been accepted and registered.
     *
     * @param sc the newly connected client
     */
    public abstract void accept(SocketClient sc);

    /** Cancels the key and closes its channel. */
    private void discard(SelectionKey sk) {
        sk.cancel();
        closeQuietly(sk.channel());
    }

    /** Closes {@code resource} if it is non-null, logging instead of throwing. */
    private void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            logger.warn("failed to close channel", e);
        }
    }

    /**
     * Looks up the client record for a channel by its remote address, creating
     * and storing one on first sight.
     *
     * @return the record, or {@code null} when the remote address is unavailable
     */
    private SocketClient sl(SocketChannel sc) {
        try {
            java.net.SocketAddress remote = sc.getRemoteAddress();
            if (remote == null) {
                return null;
            }
            String key = remote.toString();
            if (!socketClients.containsKey(key)) {
                putSocketClients(key, new SocketClient(sc, code));
            }
            return socketClients.get(key);
        } catch (IOException e2) {
            logger.warn("failed to resolve remote address", e2);
            return null;
        }
    }
}


编码UTF-8

/**
 * UTF-8 codec: decodes received bytes into a {@link String} and writes the
 * string form of outgoing objects as UTF-8.
 *
 * @author Allen, 2017-11-06
 */
public class StringCode implements Code {

    private static final Charset UTF8 = Charset.forName("utf-8");

    /**
     * Decodes the remaining bytes of {@code bb} as UTF-8.
     *
     * <p>Each call decodes independently, so a multi-byte character split
     * across two buffers is not reassembled.
     *
     * @param bb buffer positioned at the first byte to decode
     * @return the decoded text
     */
    @Override
    public String decode(ByteBuffer bb) throws Exception {
        return UTF8.decode(bb).toString();
    }

    /**
     * Writes {@code ob.toString()} to the channel as UTF-8.
     *
     * <p>A single {@code write} on a non-blocking channel may send only part of
     * the buffer, so the call is repeated until every byte has been handed to
     * the socket; otherwise the tail of the message would be silently lost.
     * NOTE(review): this waits on a peer that does not drain its socket; a
     * write queue driven by OP_WRITE would avoid that.
     *
     * @param socketChannel channel to write to
     * @param ob            object whose string form is sent; must not be null
     * @throws Exception if the write fails
     */
    @Override
    public void encode(SocketChannel socketChannel, Object ob) throws Exception {
        ByteBuffer payload = UTF8.encode(ob.toString());
        while (payload.hasRemaining()) {
            if (socketChannel.write(payload) == 0) {
                // Send buffer is full; give other threads a chance before retrying.
                Thread.yield();
            }
        }
    }
}

抽象接口

/**
 * Codec contract: converts between raw bytes and application values.
 *
 * @author Allen, 2017-11-06
 */
public interface Code {

    /**
     * Decode.
     *
     * @param bb buffer holding the bytes to decode
     * @return the decoded value
     * @throws Exception if decoding fails
     */
    Object decode(ByteBuffer bb) throws Exception;

    /**
     * Encode.
     *
     * @param socketChannel channel the encoded form is destined for
     * @param ob            value to encode
     * @throws Exception if encoding fails
     */
    void encode(SocketChannel socketChannel, Object ob) throws Exception;
}
/**
 * A unit of work that can be executed and may fail with an exception.
 *
 * @author Allen, 2017-11-06
 */
public interface Handle {

    /**
     * Performs the work.
     *
     * @throws Exception if the work fails
     */
    void execute() throws Exception;
}
/**
 * Events.
 *
 * @author Allen, 2017-11-06
 */
public interface Channel {

    /**
     * Connection accepted.
     *
     * @param sk the selection key involved in the event
     */
    void accept(SelectionKey sk);

    /**
     * Read.
     *
     * @param socketClient the client involved in the event
     * @param value        the value that was read
     */
    void read(SocketClient socketClient, Object value);

    /**
     * Offline.
     *
     * @param sk      the selection key involved in the event
     * @param sclient the client involved in the event
     */
    void leave(SelectionKey sk, SocketClient sclient);
}


调用

	/**
	 * Entry point: assembles a {@code Panel} with a socket handle on port 6666
	 * (buffer size {@code 1 << 12}), a {@code MyChannel} and a
	 * {@code StringCode}, then runs it.
	 *
	 * @param args command-line arguments (not used)
	 */
	public static void main(String[] args) throws IOException {
		final int port = 6666;
		final int bufferSize = 1 << 12;

		Panel panel = new Panel();
		panel.setSocketHandle(new ServerSocketHandle(port, bufferSize));
		panel.setChannel(new MyChannel());
		panel.setCode(new StringCode());
		panel.run();
	}
/**
 * Example channel: prints every event to standard output and relays it to all
 * entries of {@code socketClients}.
 */
public class MyChannel extends BoneChannel {

    /**
     * Prints the received value and forwards it to every client.
     *
     * @param socketClient client the value came from
     * @param value        decoded value
     */
    @Override
    public void read(SocketClient socketClient, Object value) {
        System.out.println("[消息][" + socketClient.getID() + "][" + socketClient.clientSize() + "] :" + value);
        String broadcast = "[全体] :" + value;
        socketClients.values().forEach(v -> v.send(broadcast));
    }

    /**
     * Releases the connection, then tells the remaining clients who left.
     *
     * @param sk      key of the departing connection
     * @param sclient departing client; may be {@code null} when no client record
     *                could be resolved, in which case nothing is announced
     */
    @Override
    public void leave(SelectionKey sk, SocketClient sclient) {
        super.leave(sk, sclient);
        if (sclient == null) {
            // Nothing identifies the client, so there is nothing to announce.
            return;
        }
        String notice = "[系统] :客户端" + sclient.getID() + " 离线了 ";
        socketClients.values().forEach(v -> v.send(notice));
    }

    /**
     * Prints the new connection and the client count, then announces the
     * newcomer to every client.
     *
     * @param socketClient the newly connected client
     */
    @Override
    public void accept(SocketClient socketClient) {
        System.out.println("[消息][" + socketClient.getID() + "]接入");
        System.out.println("[消息]总人数[" + socketClient.clientSize() + "]");
        String notice = "[系统] :客户端" + socketClient.getID() + " 加入了 ";
        socketClients.values().forEach(v -> v.send(notice));
    }
}

#在此例中,Code、Channel、Handle 均可根据自己的需求自行定义实现#