nio高并发编程

时间:2022-12-17 18:02:59

之前http://blog.csdn.net/sunmenggmail/article/details/8638480

已经整理过,这次是2.0版


参考:

http://daizuan.iteye.com/blog/1112909

http://daizuan.iteye.com/blog/1113471

http://www.cnblogs.com/pingh/archive/2013/07/30/3224990.html

http://www.cnblogs.com/ajian005/archive/2012/09/27/2753662.html(相当好,总结了开源框架)

陷阱1:处理事件忘记移除key
在select返回值大于0的情况下,循环处理
Selector.selectedKeys集合,每处理一个必须从Set中移除

nio高并发编程
Iterator<SelectionKey> it=set.iterator();
While(it.hasNext()){
SelectionKey key
=it.next();
it.remove();
//切记移除
„„处理事件
}
nio高并发编程

 不移除的后果是本次的就绪的key集合下次会再次返回,导致无限循环,CPU消耗100%

 陷阱2:Selector返回的key集合非线程安全

Selector.selectedKeys/keys 返回的集合都是非线程安全的
Selector.selectedKeys返回的可移除
Selector.keys 不可变
对selected keys的处理必须单线程处理或者适当同步

陷阱3:正确注册Channel和更新interest
直接注册不可吗?
channel.register(selector, ops, attachment);
不是不可以,效率问题
至少加两次锁,锁竞争激烈
Channel本身的regLock,竞争几乎没有
Selector内部的key集合,竞争激烈
更好的方式:加入缓冲队列,等待注册,reactor单线程处理

nio高并发编程
If(isReactorThread()){
channel.register(selector,ops,attachment);
}
else{
register.offer(newEvent(channel,ops,attachment));
selector.wakeup();
}
nio高并发编程

同样,SelectionKey.interest(ops)
在linux上会阻塞,需要获取selector内部锁做同步
在win32上不会阻塞
屏蔽平台差异,避免锁的激烈竞争,采用类似注册channel的方式:

nio高并发编程
if (this.isReactorThread()) {
key.interestOps(key.interestOps()
| SelectionKey.OP_READ);
}
else {
this.register.offer(new Event(key,SelectionKey.OP_READ));
selector.wakeup();
}
nio高并发编程

 

陷阱4:正确处理OP_WRITE
OP_WRITE处理不当很容易导致CPU 100%
OP_WRITE触发条件:
   前提:interest了OP_WRITE
   触发条件:
        socket发送缓冲区可写
        远端关闭
        有错误发生
正确的处理方式:
   仅在已经连接的channel上注册
   仅在有数据可写的时候才注册
   触发之后立即取消注册,否则会继续触发导致循环
   处理完成后视情况决定是否继续注册
     没有完全写入,继续注册
     全部写入,无需注册

陷阱5:正确取消注册channel
SelectableChannel一旦注册将一直有效直到明确取消
怎么取消注册?
   channel.close(),内部会调用key.cancel()
   key.cancel();
   中断channel的读写所在线程引起的channel关闭
但是这样还不够!
   key.cancel()仅仅是将key加入cancelledKeys
   直到下一次select才真正处理
   并且channel的socketfd只有在真正取消注册后才会close(fd)

后果是什么?
  服务端,问题不大,select调用频繁
  客户端,通常只有一个连接,关闭channel之后,没有调用select就关闭了selector
  sockfd没有关闭,停留在CLOSE_WAIT状态
正确的处理方式,取消注册也应当作为事件交给reactor处理,及时wakeup做select
适当的时候调用selector.selectNow()
  Netty在超过256连接关闭的时候主动调用一次selectNow

nio高并发编程
static final int CLEANUP_INTERVAL=256;
private boolean cleanUpCancelledKeys()throws IOException{
if(cancelledKeys>=CLEANUP_INTERVAL){
cancelledKeys
=0;
selector.selectNow();
returntrue;
}
returnfalse;
}
//channel关闭的时候
channel.socket.close();
cancelledKeys
++;
nio高并发编程

陷阱6:同时注册OP_ACCPET和OP_READ,同时注册OP_CONNECT和OP_WRITE
在底层来说,只有两种事件:read和write
Java NIO还引入了OP_ACCEPT和OP_CONNECT
  OP_ACCEPT、OP_READ == Read
  OP_CONNECT、OP_WRITE == Write
同时注册OP_ACCEPT和OP_READ ,或者同时注册OP_CONNECT和OP_WRITE在不同平台上产生错误的行为,避免这样做!

陷阱7:正确处理connect
SocketChannel.connect方法在非阻塞模式下可能返回false,切记判断返回值
    如果是loopback连接,可能直接返回true,表示连接成功
    返回false,后续处理
       注册channel到selector,监听OP_CONNECT事件
       在OP_CONNECT触发后,调用SocketChannel.finishConnect成功后,连接才真正建立
陷阱:
    没有判断connect返回值
    没有调用finishConnect
    在OP_CONNECT触发后,没有移除OP_CONNECT,导致SelectionKey一直处于就绪状态,空耗CPU
       OP_CONNECT只能在还没有连接的channel上注册

忠告

尽量不要尝试实现自己的nio框架,除非有经验丰富的工程师
尽量使用经过广泛实践的开源NIO框架Mina、Netty3、xSocket
尽量使用最新稳定版JDK
遇到问题的时候,也许你可以先看下java的bug database



elector自身是线程安全的,而他的key set却不是。在一次选择发生的过程中,对于key的关心事件的修改要等到下一次select的时候才会生效。 另外,key和其代表的channel有可能在任何时候被cancel和close。因此存在于key set中的key并不代表其key是有效的,也不代表其channel是open的。如果key有可能被其他的线程取消或关闭channel,程序必须小 心的同步检查这些条件。 

阻塞了的select可以通过调用selector的wakeup方法来唤醒。nio高并发编程


http://blog.csdn.net/cutesource/article/details/6192016

如何正确使用NIO来构架网络服务器一直是最近思考的一个问题,于是乎分析了一下Jetty、Tomcat和Mina有关NIO的源码,发现大伙都基于类似的方式,我感觉这应该算是NIO构架网络服务器的经典模式,并基于这种模式写了个小小网络服务器,压力测试了一下,效果还不错。废话不多说,先看看三者是如何使用NIO的。

Jetty Connector的实现

先看看有关类图:

nio高并发编程

其中:

SelectChannelConnector负责组装各组件

SelectSet负责侦听客户端请求

SelectChannelEndPoint负责IO的读和写

HttpConnection负责逻辑处理

在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

阶段一:监听并建立连接

nio高并发编程

这一过程主要是启动一个线程负责accept新连接,监听到后分配给相应的SelectSet,分配的策略就是轮询。

阶段二:监听客户端的请求

nio高并发编程

这一过程主要是启动多个线程(线程数一般为服务器CPU的个数),让SelectSet监听所管辖的channel队列,每个SelectSet维护一个Selector,这个Selector监听队列里所有的channel,一旦有读事件,从线程池里拿线程去做处理请求

阶段三:处理请求

nio高并发编程

这一过程就是每次客户端请求的数据处理过程,值得注意的是为了不让后端的业务处理阻碍Selector监听新的请求,就多线程来分隔开监听请求和处理请求两个阶段。

由此可以大致总结出Jetty有关NIO使用的模式,如下图所示:

nio高并发编程

最核心就是把三件不同的事情隔离开,并用不同规模的线程去处理,最大限度地利用NIO的异步和通知特性


下面再来看看Tomcat是如何使用NIO来构架Connector这块

先看看Tomcat Connector这块的类图:

nio高并发编程

其中:

NioEndpoint负责组装各部件

Acceptor负责监听新连接,并把连接交给Poller

Poller负责监听所管辖的channel队列,并把请求交给SocketProcessor处理

SocketProcessor负责数据处理,并把请求传递给后端业务处理模块

在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

阶段一:监听并建立连接

nio高并发编程

这一阶段主要是Acceptor监听新连接,并轮询取一个Poller ,把连接交付给Poller

阶段二:监听客户端的请求

nio高并发编程

这一过程主要是让每个Poller监听所管辖的channel队列,select到新请求后交付给SocketProcessor处理

阶段三:处理请求

nio高并发编程

这一过程就是从多线程执行SocketProcessor,做数据和业务处理

于是乎我们发现抛开具体代码细节,Tomcat和Jetty在NIO的使用方面是非常一致的,采用的模式依然是下图:

nio高并发编程


Mina框架

最后我们再看看NIO方面最著名的框架Mina,抛开Mina有关session和处理链条等方面的设计,单单挑出前端网络层处理来看,也采用的是与Jetty和Tomcat类似的模式,只不过它做了些简化,它没有隔开请求侦听和请求处理两个阶段,因此,宏观上看它只分为两个阶段。

先看看它的类图:

nio高并发编程

其中:

SocketAcceptor起线程调用SocketAcceptor.Work负责新连接侦听,并交给SocketIoProcessor处理

SocketIoProcessor起线程调用SocketIoProcessor.Work负责侦听所管辖的channel队列, select到新请求后交给IoFilterChain处理

IoFilterChain组装了mina的处理链条

在整个服务端处理请求的过程可以分为两个阶段,时序图如下所示:

阶段一:监听并建立连接

nio高并发编程

阶段二:监听并处理客户端的请求

nio高并发编程

 

总结来看Jetty、tomcat和Mina,我们也大概清楚了该如何基于NIO来构架网络服务器,通过这个提炼出来的模式,我写了个很简单的NIO Server,在保持连接的情况下,可以很轻松的保持6万连接(由于有65535连接限制),并能在负载只有3左右的情况下(4核),承担3到4万的TPS请求(当然做的事情很简单,仅仅是把buffer转化为自定义协议的包,然后再把包转为buffer写到客户端)。因此简单地实践一下可以证明这个模式的有效性,不妨再看看这个图,希望对大伙以后写server有用:

nio高并发编程



安装这个架构,写了个粗略的版本,以后有机会一定要看看jetty等是怎么优雅的实现的


//server

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.*;

public class Server {

private ConcurrentLinkedQueue<SelectionKey> m_conn = new ConcurrentLinkedQueue<SelectionKey>();
private ConcurrentLinkedQueue<SelectionKey> m_req = new ConcurrentLinkedQueue<SelectionKey>();
private final int m_processNum = 3;
private final int m_worksNum = 3;
private final int m_port = 3562;
private ServerSocketChannel channel ;
private boolean connQuEpt = true;
private boolean reqQuEpt = true;
private Selector selector;//for connection
private List<Selector> m_reqSelector = new ArrayList<Selector>();


public void listen() throws IOException{
selector = Selector.open();
channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(m_port));
channel.register(selector, SelectionKey.OP_ACCEPT);

new Thread(new ConnectionHander()).start();

//new Thread(new RequestManager()).start();
creatRequestHanders();

new Thread(new ProcessManager()).start();

}

/*class RequestManager implements Runnable {
private ExecutorService m_reqPool;
public RequestManager() {
m_reqPool = Executors.newFixedThreadPool(m_processNum, new RequestThreadFactor());
}
public void run() {
while (true) {

}
}
}*/

void creatRequestHanders() {
try {
for (int i = 0; i < m_processNum; ++i) {
Selector slt = Selector.open();
m_reqSelector.add(slt);
RequestHander req = new RequestHander();
req.setSelector(slt);
new Thread(req).start();
}
}
catch(IOException e) {
e.printStackTrace();
}
}

class ProcessManager implements Runnable {
private ExecutorService m_workPool;
public ProcessManager() {
m_workPool = Executors.newFixedThreadPool(m_worksNum);
}
public void run() {
SelectionKey key;
while(true) {
//太消耗cpu//应该要加一个wait,但是这样就有锁了
while((key = m_req.poll()) !=null) {
ProcessRequest preq = new ProcessRequest();
preq.setKey(key);
m_workPool.execute(preq);
}
}
}
}


/*class RequestThread extends Thread {
private Selector selector;
public RequestThread(Runnable r) {
super(r);
try {
selector = Selector.open();
}
catch(IOException e) {
e.printStackTrace();
//todo
}
}
}

class RequestThreadFactor implements ThreadFactory {
public Thread newThread(Runnable r) {
return new RequestThread(r);
}
}*/

//监视请求连接
class ConnectionHander implements Runnable {

int idx = 0;
@Override
public void run() {

System.out.println("listenning to connection");
while (true) {

try {
selector.select();
Set<SelectionKey> selectKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectKeys.iterator();

while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();

m_conn.add(key);
int num = m_reqSelector.size();
m_reqSelector.get(idx).wakeup();//防止监听request的进程都在堵塞中
idx =(idx + 1)%num;
}


}
catch(IOException e) {
e.printStackTrace();
}

}
}
}

//监视读操作
class RequestHander implements Runnable {
private Selector selector;
public void setSelector(Selector slt) {
selector = slt;
}
public void run() {
try {
SelectionKey key;
System.out.println(Thread.currentThread() + "listenning to request");
while (true) {
selector.select();
while((key = m_conn.poll()) != null) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();//接受一个连接
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println(Thread.currentThread() + "a connected line");
}

Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()) {
SelectionKey keytmp = it.next();
it.remove();
if (keytmp.isReadable()) {
m_req.add(keytmp);
}

}
}
}
catch(IOException e) {
e.printStackTrace();
}
}
}

//读数据并进行处理和发送返回
class ProcessRequest implements Runnable {
SelectionKey key;
public void setKey(SelectionKey key) {
this.key = key;
}
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel sc = (SocketChannel) key.channel();
String msg = null;
try{
int readBytes = 0;
int ret;
try{
while((ret = sc.read(buffer)) > 0) {

}
}
catch(IOException e) {

}
finally {
buffer.flip();
}
if (readBytes > 0) {
msg = Charset.forName("utf-8").decode(buffer).toString();
buffer = null;
}
}
finally {
if(buffer != null)
buffer.clear();
}
try {
System.out.println("server received [ " + msg +"] from client address : " + sc.getRemoteAddress());
Thread.sleep(2000);
sc.write(ByteBuffer.wrap((msg + " server response ").getBytes(Charset.forName("utf-8"))));
}
catch(Exception e) {

}


}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Server server = new Server();
try {
server.listen();
}
catch(IOException e) {

}
}

}


//client

package javatest;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.*;

public class Client implements Runnable {
// 空闲计数器,如果空闲超过10次,将检测server是否中断连接.
private static int idleCounter = 0;
private Selector selector;
private SocketChannel socketChannel;
private ByteBuffer temp = ByteBuffer.allocate(1024);

public static void main(String[] args) throws IOException {
Client client= new Client();
new Thread(client).start();
//client.sendFirstMsg();
}

public Client() throws IOException {
// 同样的,注册闹钟.
this.selector = Selector.open();

// 连接远程server
socketChannel = SocketChannel.open();
// 如果快速的建立了连接,返回true.如果没有建立,则返回false,并在连接后出发Connect事件.
Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 3562));
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);

if (isConnected) {
this.sendFirstMsg();
} else {
// 如果连接还在尝试中,则注册connect事件的监听. connect成功以后会出发connect事件.
key.interestOps(SelectionKey.OP_CONNECT);
}
}

public void sendFirstMsg() throws IOException {
String msg = "Hello NIO.";
socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
}

@Override
public void run() {
while (true) {
try {
// 阻塞,等待事件发生,或者1秒超时. num为发生事件的数量.
int num = this.selector.select(1000);
if (num ==0) {
idleCounter ++;
if(idleCounter >10) {
// 如果server断开了连接,发送消息将失败.
try {
this.sendFirstMsg();
} catch(ClosedChannelException e) {
e.printStackTrace();
this.socketChannel.close();
return;
}
}
continue;
} else {
idleCounter = 0;
}
Set<SelectionKey> keys = this.selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isConnectable()) {
// socket connected
SocketChannel sc = (SocketChannel)key.channel();
if (sc.isConnectionPending()) {
sc.finishConnect();
}
// send first message;
this.sendFirstMsg();
}
if (key.isReadable()) {
// msg received.
SocketChannel sc = (SocketChannel)key.channel();
this.temp = ByteBuffer.allocate(1024);
int count = sc.read(temp);
if (count<0) {
sc.close();
continue;
}
// 切换buffer到读状态,内部指针归位.
temp.flip();
String msg = Charset.forName("UTF-8").decode(temp).toString();
System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress());

Thread.sleep(1000);
// echo back.
sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));

// 清空buffer
temp.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}