启动服务(上)服务端:NioServerSocketChannel 是什么时候激活的
Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
本文会从请求处理的角度分析 Netty 源码,包含以下 7 个过程:启动服务、构建连接、接收数据、业务处理、发送数据、断开连接、关闭服务。
Netty 服务端启动,最主要的工作就是绑定端口、构建连接。
1. 主线分析
我们先回顾一下 Server 启动的代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 配置参数Server
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { ... });
ChannelFuture f = b.bind(8888).sync(); // 启动服务,绑定端口
f.channel().closeFuture().sync(); // 关闭服务
1.1 主线
ServerBootstrap 启动的核心方法是 bind。服务器在启动时主要工作如下:
- 初始化:最主要的是初始化并配置 ServerSocketChannel,包括 TCP 参数、Handler 的配置。
- 注册:包括注册到 NioEventLoop 和将 Channel 注册到 Selector 两部分工作。需要注意,ServerSocketChannel 注册完成后,不会立刻注册感兴趣的事件来启动任务。
- 绑定端口:只有注册完成后才会启动端口绑定,并在端口绑定完成后注册 OP_ACCEPT 启动任务。
- 启动任务:在 Selector 上注册 OP_ACCEPT 事件,开始构建连接。
ServerBootstrap#bind 主要的执行过程如下:
ServerBootstrap#bind
-> initAndRegister
-> ServerBootstrap#init # 初始化
-> AbstractUnsafe#register # 注册
-> doBind0
-> AbstractUnsafe#bind # 绑定端口
-> AbstractUnsafe#beginRead # 启动任务,通过pipeline.fireChannelActive()
以上的初始化启动工作,分别是在 main thread 和 boss thread 两个线程上执行。
-
main thread 线程
创建 Selector。
创建 ServerSocketChannel,并初始化。
将 Channel 注册到 EventLoopGroup。
-
boss thread 线程
- 将 Channel 注册到 EventLoopGroup 对应的 Selector。
- 绑定地址启动。
- Channel 将 OP_ACCEPT 事件注册到 Selector。
注意:Netty 为了避免上下文切换,采用了一种局部串行化的执行方式。也就是将任务提交到 Channel 注册的 EventLoopGroup 上执行。对于 ServerSocketChannel 而言就是 EventLoop。
1.2 知识点
(1)启动服务的本质
- Selector:Selector selector = SelectorProviderImpl.openSelector();
- ServerSocketChannel:ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
- 注册 Channel 到 Selector:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- 绑定端口:javaChannel().bind(localAddress, config.getBacklog());
- 注册感兴趣事件到 Selector:javaChannel().bind(localAddress, config.getBacklog());
(2)Selector 创建
每个 NioEventLoop 对应一个自己的 Selector,在创建 NioEventLoop 时会创建 Selector。一旦有 Channel 注册到 NioEventLoop 上,就会通过调用 startThread 方法启动该线程。
(3)感兴趣事件注册时机
- Channel 注册时都是 0:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 。
- 感兴趣事件注册:当 pipeline.fireChannelActive() 时,调用 beginRead 注册感兴趣的事件。对于 NioServerSocketChannel 而言,就是注册 OP_ACCEPT 事件,NioSocketChannel 则是 OP_READ 事件。
2. 源码分析
图1:服务端启动流程
- 初始化:init() 方法,对应第 1~6 步。先通过反射创建 ServerSocketChannel,再通过 init 进行初始化。初始化主要是配置 ServerSocketChannel 的 TCP 参数、附加属性、自定义 Handler、以及构建连接的 ServerBootstrapAcceptor。
- 注册:register() 方法,对应第 7~10 步。包含到 EventLoop 和 Selector 的注册。其中 doRegister 调用 javaChannal.register() 注册到 Selector。
- 绑定端口:doBind0() 方法,对应第 11~15 步。绑定端口后,触发 pipeline#fireChannelActive 开始接收连接。其中 doBind 调用 javaChannal.bind() 绑定端口。
- 启动任务:beginRead() 方法。通过 pipeline#fireChannelActive 方法调用 beginRead,将 OP_ACCEPT 注册到 Selector 上,此时服务端已经启动成功,可以接收客户端的连接。其中 beginRead 调用 selectionKey.interestOps 注册感兴趣的事件。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 初始化和注册。initAndRegister是异步执行
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2. 绑定端口:注意doBind0必须在initAndRegister执行完成后
// doBind0是同步执行,也就是只有javaChannel.bind之后,才会注册OP_ACCEPT事件
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
2.1 初始化
Server 看需要配置的参数有:
- 线程池:服务端通常会构建两个线程池。bossGroup 负责接收连接,childGroup 负责处理网络 IO 以及系统 Task。
- TCP 参数和其它附加配置。
- 自定义 Handler。如日志输出等。
ServerBootstrap#initAndRegister
-> ChannelFactory#newChannel # 通过反射创建连接
-> init
-> setChannelOptions # TCP参数设置
-> setAttributes # Channel附加属性设置
-> ChannelPipeline#addLast(handler) # ServerChannel配置的Handler
-> ChannelPipeline#addLast(ServerBootstrapAcceptor) # 接收连接的Handler
说明: initAndRegister 先通过反射创建 ServerSocketChannel,再通过 init 进行初始化。初始化主要是配置 ServerSocketChannel 的 TCP 参数、附加属性、自定义 Handler、以及接收连接的 Handler。
void init(Channel channel) throws Exception {
// 1. 配置 TCP 参数和附加属性
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
...
// 2. 配置Handler,其中ServerBootstrapAcceptor为接收客户端的Handler
pipeline.addLast(config.handler());
pipeline.addLast(new ServerBootstrapAcceptor(
channel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
说明:ServerBootstrapAcceptor 是专门用于接收客户端请求的 Handler。pipeline.addLast 添加 ChannelInitializer 时,ChannelInitializer 执行 initChannel 方法后,会将 ChannelInitializer 自身从 pipeline 中移除。Hander 执行过程如下:
思考1:ServerSocketChannel 创建为什么要用 provider.openServerSocketChannel()?
private static ServerSocketChannel newSocket(SelectorProvider provider) {
// ServerSocketChannel.open() 每秒创建 5000 个连接时性能会下将 1%
// https://github.com/netty/netty/issues/2308
return provider.openServerSocketChannel();
}
2.2 注册
ServerBootstrap#initAndRegister
-> EventLoopGroup#register
-> SingleThreadEventLoop#register
-> AbstractUnsafe#register √
说明: 其实 Channel 注册包含以下几件事:
绑定线程:将 Channel 注册到 EventLoop。
绑定 Selector:将 Channel 注册到 EventLoop 对应的 Selector。
激活 Channel:如果 Channel 已经是 Active 状态,注册感兴趣的事件到 Selector 上,启动任务。需要注意的是,对于 ServerSocket 而言,此时不可能是 Active 状态,只有 bind 成功后才会注册 OP_ACCEPT 来接收客户端连接。
config().group().register(channel)
注意:这里的 config().group() 是 bossGroup。Channel 是如何注册到 NioEventLoopGroup 是的详见:https://www.cnblogs.com/binarylei/p/10135712.html
2.3 绑定端口
ServerBootstrap#doBind0
-> AbstractChannel#bind
-> AbstractUnsafe#bind √
-> NioServerSocketChannel#doBind
-> DefaultChannelPipeline#fireChannelActive
-> AbstractUnsafe#beginRead √
说明: Channel 绑定端口成功后会通过 pipeline#fireChannelActive 方法调用 beginRead,将 OP_ACCEPT 注册到 Selector 上,此时服务端已经启动成功,可以接收客户端的连接。
// AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
doBind(localAddress); // javaChannel.bind()
pipeline.fireChannelActive(); // beginRead()
}
NioServerSocketChannel#doBind 则会调用最底层的 javaChannel.bind() 绑定端口。
2.4 激活 Channel
pipeline#fireChannelActive[tail...head] -> head#readIfIsAutoRead
-> channel#read -> pipeline#read[head...tail] -> head#read
-> AbstractUnsafe#beginRead
-> AbstractNioChannel#doBeginRead
说明:pipeline#fireChannelActive 最终会调用 doBeginRead 方法,将 OP_ACCEPT 注册到 Selector 上,从而启动服务。
// HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
说明: DefaultChannelConfig 中所有的 Channel 的 autoRead 属性都默认是 1,也就是 true。channel.read 通过 pipeline 从 tail -> head 传播,调用 head.read 方法时最终调用 unsafe.beginRead()。
unsafe.beginRead 直接调用 doBeginRead 方法激活 Channel。事实上,也就是将感兴趣的事件注册到 Selector 上。对于 NioServerSocketChannel 而言,就是注册 OP_ACCEPT 事件,NioSocketChannel 则是 OP_READ 事件。
@Override
protected void doBeginRead() throws Exception {
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
思考1:channel.read()、pipeline.read()、ctx.read()、unsafe.read() 区别?
- channel.read():直接调用 pipeline.read()。
- pipeline.read():调用 tail.read(),从 head 或 tail 经历全部的 Handler。实际上,最后的 head.read() 调用 unsafe.beginRead(),这个方法会注册 OP_ACCEPT 或 OP_READ 事件从而激活 Channel。
- ctx.read():从当前 ctx 开始之后的全部的 Handler。如果发送数据,需要使用 ctx.write 而不是 ctx.channel().write。
- unsafe.read():最底层的 API。和 unsafe.beginRead() 不同,unsafe#read 会真正从 socket revbuf 读取数据。
每天用心记录一点点。内容也许不重要,但习惯很重要!