EndPoint提供基础的网络IO服务,用来实现网络连接和控制,它是服务器对外I/O操作的接入点。主要任务是管理对外的socket连接,同时将建立好的socket连接交到合适的工作线程中去。
里面两个主要的属性类是Acceptor和Poller、SocketProcessor
Acceptor
Acceptor类实现了Runnable接口,主要用于接收网络请求,建立连接,连接建立之后,将一个SocketChannel对象包装成一个NioChannel,并注册到Poller中。由Poller来负责执行数据的读取和业务执行。
我们看一下Acceptor的run方法:
public void run() {
SocketChannel socket = serverSock.accept();//从监听的serversocket中获取新的连接
setSocketOptions(socket);//设置通道的属性
……
}
protected boolean setSocketOptions(SocketChannel socket) {
NioChannel = channel = new NioChannel(socket, bufhandler);//将通道包装成NioChannel
getPoller0().register(channel);//从poller数组中选择一个poller,将channel注册到poller中
……
}
Poller
Poller实现了Runnable接口,在NioEndpoint的时候,会初始化pollers数组,同时启动pollers数组中的线程,让pollers开始工作。
封装后socketchannel的放入Poller线程内部维护的一个PollerEvent队列中,然后在Poller线程运行时处理队列,将socketchannel注册到这个Poller的Selector上。
当事件到来的时候,Selector发现要处理的事件,通过selector.select系列方法来获取数据,然后经由processKey到processSocket方法,封装成一个SocketProcessor对象后,放在EndPoint的线程池中执行。
SocketChannel是如何注册到Poller中的?
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();//内部维护的事件队列
public void register(final NioChannel socket)
{
socket.setPoller(this);
KeyAttachment key = keyCache.poll();
final KeyAttachment ka = key!=null?key:new KeyAttachment();
ka.reset(this,socket,getSocketProperties().getSoTimeout());
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);//socketchannel、key一同加入到了events队列
}
SocketChannel是如何注册到每个Poller的selector中的?答案在event()方法中,在该方法中遍历events队列,依次执行run方法
public boolean events() {
while ( (r = (Runnable)events.poll()) != null ) {
result = true;
r.run();
}
……
}
public void run() {
if ( interestOps == OP_REGISTER ) {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);//注册到selector中
}
else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
key.interestOps(ops);
att.setCometOps(ops);//另外一种注册方法?
}
……
}
Poller的执行在其run方法中,主要是将请求封装成SocketProcessor对象,交给线程池处理。
public void run() {
if ( keyCount == 0 ) hasEvents = (hasEvents | events());//通过事件机制监控感兴趣的网络事件,见上面的events()分析
//遍历渠道上到来的key,交给processor去处理
Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
for()
{
SelectionKey sk = (SelectionKey) iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
processKey(sk, attachment);
}
}
//processKey(sk, attachment)调用processSocket(channel, SocketStatus.OPEN),最终使用Endpoint的线程池执行请求
protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
attachment.setCometNotify(false); //will get reset upon next reg
sc = new SocketProcessor(socket,status);
if ( dispatch ) executor.execute(sc);=====>任务被封装成SocketProcessor对象,在成功获取线程池后,则通过线程池来进行socket数据数据的读写操作。
else sc.run();
……
}
SocketProcessor
请求到达socketProcess之后,首先执行其run方法,请求被转移到handler.process,根据上下文,我们知道这了的hander指的是Http11ConnectionHandler
public void run() {
(status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) :
(handler.event(socket,status)==Handler.SocketState.CLOSED);
}
参考文献:
http://blog.****.net/yanlinwang/