步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道,代码示例如下:
ServerSocketChannel acceptorSvr = ServerSocketChannel.open();
步骤二:绑定监听端口,设置连接为非阻塞模式,示例代码如下:
acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName(“IP”), port));
acceptorSvr.configureBlocking(false);
步骤三:创建Reactor线程,创建多路复用器并启动线程,代码如下:
Selector selector = Selector.open();
New Thread(new ReactorTask()).start();
步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件,代码如下:
SelectionKey key = acceptorSvr.register( selector, SelectionKey.OP_ACCEPT, ioHandler);
步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下:
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... deal with I/O event ...
}
步骤六:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路,代码示例如下:
SocketChannel channel = svrChannel.accept();
步骤七:设置客户端链路为非阻塞模式,示例代码如下:
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息,代码如下:
SelectionKey key = socketChannel.register( selector, SelectionKey.OP_READ, ioHandler);
步骤九:异步读取客户端请求消息到缓冲区,示例代码如下:
int readNumber = channel.read(receivedBuffer);
步骤十:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下:
Object message = null;
while(buffer.hasRemain())
{
byteBuffer.mark();
Object message = decode(byteBuffer);
if (message == null)
{
byteBuffer.reset();
break;
}
messageList.add(message );
}
if (!byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if (messageList != null & !messageList.isEmpty())
{
for(Object messageE : messageList)
handlerTask(messageE);
}
步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下:
socketChannel.write(buffer);
注意:如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,直到整包消息写入TCP缓冲区,此处不赘述,后续Netty源码分析章节会详细分析Netty的处理策略。