Java NIO案例

时间:2023-03-09 02:27:15
Java NIO案例

Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)   http://blog.****.net/anxpp/article/details/51512200

Java NIO框架Netty简单使用   http://blog.****.net/anxpp/article/details/52139155

使用最新Netty实现一个简单的聊天程序    http://blog.****.net/anxpp/article/details/52139155

服务端、客户端

package com.dsp.nio;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set; /**
*
* 监控是否可连接、可读、可写
*
* 代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler
*
*/
public class Reactor implements Runnable { private static Logger log = LoggerFactory.getLogger(Reactor.class); final Selector selector; final ServerSocketChannel serverSocket; /**
* 服务端配置初始化,监听端口
* @param port
* @throws IOException
*/
public Reactor(int port) throws IOException {
this.selector = Selector.open();
this.serverSocket = ServerSocketChannel.open();
this.serverSocket.socket().bind(new InetSocketAddress(port));
this.serverSocket.configureBlocking(false);
SelectionKey selectionKey = this.serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
selectionKey.attach(new Acceptor());
log.info("===>>> attach(new Acceptor())");
}
/*
* SPI
*/
// Alternatively, use explicit SPI provider
// SelectorProvider selectorProvider = SelectorProvider.provider();
// selector = selectorProvider.openSelector();
// serverSocket = selectorProvider.openServerSocketChannel(); /**
* 分发请求
*
* @param selectionKey
*/
void dispatch(SelectionKey selectionKey) {
Runnable run = (Runnable) (selectionKey.attachment());
if (run != null) {
run.run();
}
} /**
* 监听连接和channel是否就绪
*/
public void run() {
try {
/**
* 线程未被中断
*/
while (!Thread.interrupted()) {
int readySize = this.selector.select();
log.info("I/O ready size = {}", readySize);
Set<?> selectedKeys = this.selector.selectedKeys();
Iterator<?> iterator = selectedKeys.iterator();
// Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while (iterator.hasNext()) {
/*
* 一个新的连接,第一次出发Accepter线程任务,之后触发Handler线程任务
*/
SelectionKey selectionKey = (SelectionKey) iterator.next();
log.info("===>>> acceptable = {}, connectable = {}, readable = {}, writable = {}.",
              selectionKey.isAcceptable(), selectionKey.isConnectable(),
              selectionKey.isReadable(), selectionKey.isWritable());
dispatch(selectionKey);
}
selectedKeys.clear();
}
} catch (IOException ex) {
log.info("reactor stop!" + ex);
}
} /**
* 处理新连接
*
* @author dsp
*
*/
class Acceptor implements Runnable { @Override
public void run() {
try {
log.debug("===>>> ready for accept!");
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) {
new Handler(selector, socketChannel);
}
} catch (IOException ex) {
/* . . . */
}
} } }
package com.dsp.nio;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; /**
*
* 处理读写
*
*/
final class Handler implements Runnable { private static Logger log = LoggerFactory.getLogger(Reactor.class); static final int MAX_IN = 1024; static final int MAX_OUT = 1024; ByteBuffer inputBuffer = ByteBuffer.allocate(MAX_IN); ByteBuffer output = ByteBuffer.allocate(MAX_OUT); final SocketChannel socketChannel; final SelectionKey selectionKey; static final int READING = 0, SENDING = 1; int state = READING; /**
* 注意在Handler里面又执行了一次attach,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler,从而开始了数据的
* “读 -> 处理 -> 写 -> 发出” 等流程处理。
*
* @param selector
* @param socketChannel
* @throws IOException
*/
Handler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
this.socketChannel.configureBlocking(false);
this.selectionKey = this.socketChannel.register(selector, 0);
this.selectionKey.attach(this);
this.selectionKey.interestOps(SelectionKey.OP_READ);
// selector.wakeup();
} /**
* 只是返回true,具体的判断没有实现
*
* @return
*/
boolean inputIsComplete() {
return true;
} /**
* 只是返回true,具体的判断没有实现
*
* @return
*/
boolean outputIsComplete() {
return true;
} /**
* 处理数据(无具体实现)
*/
void process(String msg) {
// output.put("hello world, hello dsp!".getBytes());
String outMsg = "out + " + msg;
output.put(outMsg.getBytes());
output.flip();
} /**
* 读取请求数据并处理
*
* @throws IOException
*/
void read() throws IOException {
log.info("===>>> read into bytebuffer from socketchannel inputs.");
if (inputIsComplete()) {
socketChannel.read(inputBuffer);
inputBuffer.flip();
byte[] inputBytes = new byte[inputBuffer.limit()];
inputBuffer.get(inputBytes);
String inputString = new String(inputBytes);
log.info("===>>> 从客户端读取请求信息 = {}", inputString);
log.info("===>>> read complete."); process(inputString); state = SENDING;
// 读完了数据之后,注册OP_WRITE事件
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
} /**
* 返回响应信息
*
* @throws IOException
*/
void send() throws IOException {
log.info("===>>> write into socketchannel from bytebuffer outputs");
socketChannel.write(output);
if (outputIsComplete()) {
// The key will be removed from all of the selector's key sets during the next
// selection operation.
selectionKey.cancel();
// 关闭通过,也就关闭了连接
socketChannel.close();
log.info("===>>> close socketchannel after write complete");
}
} @Override
public void run() {
try {
if (state == READING)
read();
else if (state == SENDING)
send();
} catch (IOException ex) {
/* . . . */
}
} }
package com.dsp.nio;

import java.io.IOException;

/**
*
* Model: Reactor in SingleThread
*
* 利用NIO多路复用机制,多路IO复用一个线程
*
* @author dsp
*
*/
public class ReactorInSingleThreadServer { public static void main(String args[]) throws IOException {
Reactor reactor = new Reactor(9999);
reactor.run(); // 不会开启线程,相当于普通方法调用
} }
package com.dsp.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; /**
*
* 访问NIO服务器的客户端
*
* @author dsp
*
*/
public class ReactorInSingleThreadClient extends Thread { private static Logger log = LoggerFactory.getLogger(ReactorInSingleThreadClient.class); private static LinkedBlockingQueue<Thread> failureQueue = new LinkedBlockingQueue<Thread>(); @Override
public void run() {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = SocketChannel.open();
boolean connected = socketChannel.connect(new InetSocketAddress(9999));
if (connected) {
log.info("===>>> 和服务器 {} 已连接...", socketChannel.getRemoteAddress());
/*
* 请求
*/
String msg = "in + 你好,dsp!" + Thread.currentThread().getName();
buffer.put(msg.getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear(); /*
* 响应
*/
buffer.clear();
socketChannel.read(buffer);
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
String string = new String(data);
log.info("===>>> " + string);
buffer.clear(); socketChannel.close();
} else {
log.error("连不上服务器...");
}
} catch (java.net.ConnectException e) {
failureQueue.offer(this);
} catch (Exception e) {
e.printStackTrace();
}
} public static void main(String[] args) throws IOException, InterruptedException {
int maxThreads = 3000;
while (maxThreads-- > 0) {
new Thread(new ReactorInSingleThreadClient()).start();
} Thread.sleep(Integer.MAX_VALUE);
} }

^_^