netty的线程模型对一个channel来说是单线程的,也就是说这个channel的所有读写事件都是同一个线程执行的,避免了多线程产生的并发问题.而一个eventloop是可以被多个channel绑定的,那么每次服务器连接一个channel之时,netty时如何知道使用哪个线程的呢?
本文描述背景假设对netty的使用比较熟悉,例如 ChannelContext,ChannelPipeline,EventLoopGroup的概念,netty 的网络模型 NIO , netty 的reactor线程模型等.
netty中eventloop线程和客户端管道的绑定是怎么实现的?
netty通过java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)
方法将自身实现的NioSocketChannel和SelectionKey,原生的socket channel绑定在了一起.
NioSocketChannel(AbstractNioChannel.NioUnsafe负责创建)在初始化过程中(由ServerBootstrapAcceptor
负责添加pipeline等)时会设置eventloop变量的,
这样每次selectionkey事件触发就知道是哪个niosocketchannel,然后就知道了eventloop(执行线程).具体代码实现位置见下文
NioEventLoopGroup初始化时做了什么?
NioEventLoopGroup 是一个多线程池,初始化时根据设置的线程大小依次创建EventLoop对象,EventLoop也是一个线程池抽象,只不过它是单线程的
代码摘要:
NioEventLoopGroup()
MultithreadEventExecutorGroup(...)
for (int i = 0; i < nThreads; i ++) {
EventExecutor[i] = newChild(executor,args)
return (NioEventLoop) NioEventLoopGroup#newChild(...)//注意这里返回的NioEventLoop是整个服务端的boss线程执行入口
}
netty的 ServerChannel 的 register 过程?
通过反射实例化 NioServerSocketChannel , 然后向 nio select 中注册一个 ops=0的selectkey .具体过程见下面代码
代码摘要:
//代码中 this 指的是 NioServerSocketChannel 服务端channel
ServerBootstrap.bind(port)
AbstractBootstrap.doBind(port)
initAndRegister()
//通过反射 NioServerSocketChannel.class生成channel实例
channel = channelFactory.newChannel()
init(channel)
setChannelOptions//设置options
channel.attr().set()//设置属性
ChannelPipeline p = channel.pipeline();//获取pipeline
p.addLast(ServerBootstrapAcceptor);//注意这个 ServerBootstrapAcceptor 很重要,目的见下文
config().group().register(channel);
//最终会调用
AbstractUnsafe#register(this, promise)
//设置变量
AbstractChannel.this.eventLoop = eventLoop;//将boss线程和netty的 nio server channel绑定
//然后调用nio的doRegister 设置
AbstractNioChannel#doRegister()
selectionKey =javaChannel().register(eventLoop().unwrappedSelector(), 0, this);//将netty的nio server channel绑定到java原生的selectkey的att参数上.
doBind0(...)
io.netty.channel.socket.nio.NioServerSocketChannel#doBind()//端口绑定
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
的逻辑?
这里的 register方法,服务端管道NioServerSocketChannel
初始化时会调用 , 每个客户端NioSocketChannel
初始化时也会被ServerBootstrapAcceptor
触发调用
group().register(channel);
//最终会调用
AbstractUnsafe#register(this, promise)
//设置变量
AbstractChannel.this.eventLoop = eventLoop;//n1.将线程和netty的niochannel绑定
//然后调用nio的doRegister 设置
AbstractNioChannel#doRegister()
selectionKey =javaChannel().register(eventLoop().unwrappedSelector(), 0, this);//n2.将netty的niochannel绑定到java原生的selectkey的att参数上.
//通过N1和N2两步就将selector中的事件和netty自己实现的niochannel(内部持有一个线程应用)绑定到了一起.这样如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了.
下列地方会调用SelectionKey.attachment()
方法:
- 解决jdk原生nio selector 空转的BUG问题
io.netty.channel.nio.NioEventLoop#rebuildSelector()
//解决BUG的方式就是依次取出selectKey里面的att对象(也就是channeel),然后重新创建selector
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
...
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
}
...
selector = newSelectorTuple.selector;
...
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
- selector 事件处理
io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized()
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
...
}
- 连接关闭
io.netty.channel.nio.NioEventLoop#closeAll
NioServerSocketChannel 初始化做了什么?
NioServerSocketChannel的初始化过程会打开 nio 的 selector
NioServerSocketChannel()
//打开 nio selector
SelectorProvider.openServerSocketChannel();
super(...)
AbstractNioChannel.super(...)
AbstractChannel()
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
的作用
ServerBootstrapAcceptor 负责配置NioSocketChannel的pipeline并且绑定一个eventloop[MultithreadEventLoopGroup.next().register(channel)]
ServerBootstrapAcceptor 本身也是一个 ChannelInboundHandlerAdapter ,它是在 ServerChannel 启动时被添加到 ChannelPipeline 中的,用来初始化每个客户端连接的 NioSocketChannel .
//看这个读方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//注意这里的 register方法就是会执行之前所说的 绑定 NioScoketChannel 到 SelectionKey
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
总结netty服务端接入客户端的处理流程
- Netty 服务端启动会创建一个EventLoop线程池(如果是NIO的,则具体的线程执行器是
NioEventLoop
),并且初始化一个 NioServerSocketChannel - NioServerSocketChannel 的 ChannelPipeline 中会被添加一个 ServerBootstrapAcceptor 的 ChannelHandler
- 当 selector 轮询出客户端读写事件(NioEventLoop 中的 run 方法中的逻辑)
- 调用 NioEventLoop.processSelectedKey方法处理
- 继续调用 AbstractNioChannel.NioUnsafe.read() 构建一个 NioSocketChannel (客户端管道)
- 然后调用 NioServerSocketChannel 的 pipeline.fireChannelRead(NioSocketChannel)处理
- 也就是调用 ServerBootstrapAcceptor.channelRead()处理,然后就会调用上文提到的
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
逻辑 - 然后就是触发业务层实现的 ChannelHandler 处理
你可以通过设置这个方法的断点io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
来分析整个流程的执行情况
1. 在方法的第一行设置断点
2. 启动一个netty写的nio server程序
3. 启动一个netty写的nio client程序连接2步的server端,然后 IDEA 的编辑器就会显示整个调用栈.