作者: Grey
原文地址:
新连接的接入分为3个过程
-
检测到有新连接。
-
将新连接注册到 worker 线程。
-
注册新连接的读事件。
检测新连接的代码在NioEventLoop
中的processSelectedKey()
方法中
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
......
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
.....
}
启动一个 Netty 服务端和 Netty 客户端,在unsafe.read()
这一行打断点,可以得到这里的unsafe
就是NioMessageUnsafe
,进入NioMessageUnsafe
的read()
方法,
这个方法主要做的事情就是:创建,设置并绑定NioSocketChannel
。
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
......
do {
// 创建`NioSocketChannel`
int localRead = doReadMessages(readBuf);
......
} while (continueReading(allocHandle));
......
// 设置并绑定 NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
......
}
}
创建NioSocketChannel
调用的是doReadMessages()
方法,通过Debug,可以看到doReadMessage()
来自于NioServerSocketChannel
中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
可以看到此时调用的是 Java 底层的accept()
方法,创建了一条 JDK 层面的Channel
, Netty 将其封装成自定义的NioSocketChannel
,并加入一个List
。
继续 Debug,进入 NioSocketChannel 的构造方法中,调用的是AbstractNioByteChannel
的构造方法
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
这个方法类似在 NIO 编程中,注册 OP_READ 事件,表示 Channel 对读事件感兴趣。
接下来是设置并绑定NioSocketChannel
,处理每个NioSocketChannel
,通过 Debug 可以来到AbstractUnsafe
的register0()
方法
private void register0(ChannelPromise promise) {
// 注册Selector
doRegister();
// 执行 handler
pipeline.invokeHandlerAddedIfNeeded();
// 传播 ChannelRegistered事件
pipeline.fireChannelRegistered();
// 注册读事件
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
}
这个方法主要完成的事情就是:
-
将
NioSocketChannel
注册到Selector
上 -
配置自定义的
Handler
。 -
将连接注册事件传播下去,调用了每个
Handler
的channelRegistered
方法。 -
注册读事件。
完整代码见:hello-netty
本文所有图例见:processon: Netty学习笔记
更多内容见:Netty专栏