前言:本篇文章并没有深刻介绍nio相关的npi或者详细的使用说明,适合刚刚了解nio并且没写过例子的同学查看。如有错误,还请多多指正。
想入门学习nio技术的话请戳这里 并发编程网nio
最近看了下nio相关的知识,但还是有很多不明白的地方。写了一个小小的demo,可以实现nio通信。
SocketChannel通信代码
注: 以下代码改编自网络
service
public class MyService {
public static Selector selector = null;
public static void main(String[] args) {
MyService.init();
}
public static void init() {
initSelector();// 初始化selector
initServerSocketChannel(); // 初始化serverSocketChannel
run();
}
// first
public static void initSelector() {
try {
selector = Selector.open();// 打开selector
} catch (IOException e) {
// 初始化selector失败
e.printStackTrace();
}
}
public static void initServerSocketChannel() {
try {
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(7777), 1024);
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
// 初始化serverSocket失败
e.printStackTrace();
}
}
public static void run() {
while (true) {
try {
selector.select(1000); // 阻塞selector
// ================如果有新连接
Set<SelectionKey> selectedKeys = selector.selectedKeys();// 获得事件集合;
// ================遍历selectedKeys
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();// 获得到当前的事件
// ===============处理事件
handle(key);
// ===============
iterator.remove(); // 移除事件
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 初始化seclector和serverSocket
// 当一个selector上有新的事件有反应后 select();
// 获得获得事件集合
// 遍历集合事件
// 处理事件
public static void handle(SelectionKey key) {
try {
// 连接就绪
if (key.isAcceptable()) {
handleAcceptable(key);
}
// 读就绪
if (key.isReadable()) {
handelReadable(key);
}
} catch (IOException e) {
key.cancel();
if (key.channel() != null) {
try {
key.channel().close();
} catch (IOException e1) {
}
}
}
}
// 处理读事件
public static void handelReadable(SelectionKey key) throws IOException {
// ==================我们要将数据从通道读到buffer里
SocketChannel ssc = (SocketChannel) key.channel(); // TODO:
// 为什么这里是socketChannel
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String content = "服务端返回了: ";
int readBytes = ssc.read(byteBuffer);// channel ==> buffer
if (readBytes > 0) {// 代表读完毕了,准备写(即打印出来)
byteBuffer.flip(); // 为write()准备
// =====取出buffer里的数据
byte[] bytes = new byte[byteBuffer.remaining()]; // 创建字节数组
byteBuffer.get(bytes);// 将数据取出放到字节数组里
content += new String(bytes);
content += "__________";
doWrite(ssc, content);
}
}
// 处理连接事件
public static void handleAcceptable(SelectionKey key) throws IOException {
// 获得对应的ServerSocketChannel TODO: 这里为什么是socketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 得到对应的SocketChannel TODO:accpet是什么意思
SocketChannel channel = ssc.accept();// 在非阻塞模式下,accept()可能为null
// 处理socketChannel
channel.configureBlocking(false); // TODO: 为什么设置非阻塞
channel.register(selector, SelectionKey.OP_READ); // TODO: 将准备状态转化为读状态
// 将key对应Channel设置为准备接受其他请求
key.interestOps(SelectionKey.OP_ACCEPT);// TODO:
}
// ============= 发送消息
private static void doWrite(SocketChannel sc, String data) throws IOException {
byte[] req = data.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
byteBuffer.put(req);
byteBuffer.flip();
sc.write(byteBuffer);
if (!byteBuffer.hasRemaining()) {
System.out.println(data + " Send 2 Service successed");
}
}
}
client
public class MyClient {
public static void main(String[] args) {
MyClient.init();
}
// public ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8);
// static Charset charset = Charset.forName("UTF-8");
private static Selector selector = null;
private volatile static boolean stop = false;
private static SocketChannel channel = null;
public static void init() {
initSelector();// 初始化selector
initSocketChannel(); // 初始化serverSocketChannel
run();
}
// 初始化selector
public static void initSelector() {
try {
selector = Selector.open();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// 初始化SocketChannel
public static void initSocketChannel() {
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress("127.0.0.1", 7777));
channel.register(selector, SelectionKey.OP_CONNECT);
} catch (ClosedChannelException e) {
System.out.println("client: 失去主机连接");
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void run() {
try {
while (!stop) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
handle(key);
iterator.remove();
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void handle(SelectionKey key) throws IOException {
// 连接就绪
try {
if (key.isConnectable()) {
handleConnectable(key);
}
// 读就绪
if (key.isReadable()) {
handelReadable(key);
}
} catch (Exception e) {
key.cancel();
if (key.channel() != null) {
try {
key.channel().close();
} catch (IOException e1) {
}
}
}
}
private static void handelReadable(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int temp = sc.read(buffer); // 从channel读到buffer
String content = "来自服务端的: ";
if (temp > 0) {// 代表读完毕了,准备写(即打印出来)
buffer.flip(); // 为write()准备
// =====取出buffer里的数据
byte[] bytes = new byte[buffer.remaining()]; // 创建字节数组
buffer.get(bytes);// 将数据取出放到字节数组里
content += new String(bytes);
content += "============";
System.out.println(content);
// doWrite(sc, content);
}
// key.interestOps(SelectionKey.OP_READ);// TODO:
}
private static void handleConnectable(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
if (sc.finishConnect()) {
// 将关注的事件变成read
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc, "dddddd");
}
}
private static void doWrite(SocketChannel sc, String data) throws IOException {
byte[] req = data.getBytes();
ByteBuffer byteBuffer = ByteBuffer.wrap(req);
byteBuffer.put(req);
byteBuffer.flip();
sc.write(byteBuffer);
// if (!byteBuffer.hasRemaining()) {
// System.out.println("Send successed : " + data);
// }
}
}
总结:
1. byteBuffer 用于数据的读取。从channel中获得是读,放到channel里是写。
大概是这种感觉 buffer —->channel——>buffer
2. socketChannel通信里有很多代码的格式是重复的。 将相同部分抽象之后,还是很有规律的。
- 初始化函数: 初始化 selector 和 ServerSocketChannel(或SocketChannel)
- 运行函数:死循环,用途在于等待新的连接,并且收到新的连接之后,调用总体处理函数。处理后将事件移除
- 总体处理函数:判断事件类型,并交给相应的处理函数处理,并且处理异常,在异常里取消key,并把channel关闭
- 工具函数:即将String变成byte构建buffer,再写入到channel里。