通常我们用ServerBootstrap引导服务器,
public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
其中group()无非传入两个EventLoopGroup,一个负责接收连接,一个负责处理连接上的响应。这之后的逻辑基本上在EventLoop跟ChannelPipeLine中理得差不多了。
这次我们重点来看ChannelFuture f = b.bind().sync(); bind()方法使netty开始对本地端口的绑定与监听。
bind逻辑的实现在其超类AbstractBootstrap中
public ChannelFuture bind() { //检查group和channelFactory属性是否为null validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); }
其中validate()方法是对netty中的group跟channelFactory进行非null验证,保证都是已经配置完成的(gorup是在.goup()中绑定的,channelFactory是在第一次.channel时候,生成静态BootstrapChannelFactory配置进去)。然后判断本地地址是否已经绑定,最后调用doBind()继续后面的绑定。
private ChannelFuture doBind(final SocketAddress localAddress) { //1.初始化channel并将初始化好的channel注册到事件循环 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; /** * 2.判断初始化且注册任务是否已经完成,如果已经完成,则调用bind端口方法 * 注意这里regFuture.isDone()方法,返回ture,只是标示任务是否已经完成,完成的情况可能有: * 1.正常终止、2.异常、3.取消 */ if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
其中有涉及到channelFutrue的内容,会在后面的博文中详细介绍。
首先调用initAndRegister(),将初始化好的Channel注册到EventLoopGroup中。
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { //初始化Channel实例:对channel实例进行相关参数设置。这里调用的是对应子类的实现 init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelFuture regFuture = group().register(channel); //如果注册失败,regFuture.cause会返回失败的异常对象 //如果注册出现异常,并channel已经注册,那么关闭。如果没有注册,那么强行关闭 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
先通过反射得到channel实例,再调用具体子类的init()方法,对channel实例进行相关参数设置。
@Override void init(Channel channel) throws Exception { //将传入Bootstrap final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //将AbstractBootstrap的Hanlder添加到NioServerketChannel //就是把handler(new SimpleServerHandler())这个handler添加到NioServerketChannel的pipline ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } /** * ChannelInitializer是一个特殊的ChannelInboundHandler,当channelRegistered事件触发后, * 会调用initChannel方法,调完后,这个handler会从piplne中删除 */ p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
首先在服务端通过加锁的方式配置options跟atts属性,取出NioServerChannel对应的pipline,将之前配置的handler添加到pipline中。
最后通过添加ChannelInitializer(类似于占位符)的方式,向pipline中添加ServerBootstrapAcceptor实例。
我们先看看ChannelInitializer的巧妙实现方式,以下是它的channelRegistered方法,也就是在之后通道注册之后,会链式调用pipline中的相应方法,也就会调用以下方法。
@Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { //调用initChannel方法进行ChannelPipline初始化 initChannel((C) ctx.channel()); //从pipline删除这个handler pipeline.remove(this); //链式触发fireChannelRegistered事件 ctx.fireChannelRegistered(); success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } }
无非先initChannel()这是我们刚刚在ServerBootstrap.init()中实现的方法,再之后将ChannelInitializer这个handler从链中删除,继续调用后面的相应的事件。(其实通过这个类似于占位符的方式我们可以在引导过程中添加多个channelhandler,无非在initChannel()的实现中完成)。
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } });
我们再回到ServerBootstrapAcceptor,ServerBootstrapAcceptor重写了channelRead(ctx,msg)方法,主要的作用是当有client连接上Server的时,pipline触发channelRead 事件,然后会给pipline添加用户自己的handler。其中第二个形参msg 在Nio中,是Server accpet客户端的connect连接后,产生的NioSocketChannel对象。
init()方法完成通道的配置之后,我们继续initAndRegister的之后步骤ChannelFuture regFuture = group().register(channel);通道完成了后,无非把通道绑定到EventLoopGroup上,(这里是group,而非childgroup,也就是最上文的bossGroup)。
调用了NioEventGroup的register(),具体的逻辑在其超类MultithreadEventLoopGroup中。
@Override public EventLoop next() { return (EventLoop) super.next(); } @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); }
无非调用next().register(),我们先来看super.next()的实现
public EventExecutor next() { return this.children[Math.abs(this.childIndex.getAndIncrement() % this.children.length)]; }
由于EventLoopGroup在初始化时候创建了singleThreadEventLoop数组,其实就是NioEventLoop(详细见之前的博客EventLoop解析),所以通过如上方式选择相应的NioEventLoop返回,并调用其register(),childIndex是一个AtomicInteger类。
由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法。
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }具体调用了usafe类的register()方法,跟踪代码,具体逻辑的主要实现 在AbstractNioUnsafe的doRegister()方法。
@Override protected void doRegister() throws Exception { boolean selected = false; for (; ; ) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { /** * CancelledKeyException抛出的场景: * 当前Channel已经注册了给定的selector,但是相应个key却已经被取消了 */ if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel注册到Selector中。
到这里基本上把initAndRegister介绍完了。
确保注册与初始化都已经完成,调用doBind0();(AbstractBootstrap.doBind0())
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. //判断注册事件是否成功,如果注册成功,则把bind任务丢到任务队列 channel.eventLoop().execute(new Runnable() { @Override public void run() { //channel注册到eventloop成功后,才开始调用Channel的bind方法 if (regFuture.isSuccess()) { //调用Channel的bind方法 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
因为之前initAndRegister已经完成,所以eventloop线程模型已经建立(channel已经注册至eventLoop中),这里只需将bind跟添加监听器任务扔至Task队列中,异步地完成。
channel的bind函数调用了pipline的bind调用了tail的bind,最终调用了跟tailhandler绑定DefaultChannelHandlerContext的bind函数
@Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } validatePromise(promise, false); //从tail ---> head找Outbound执行 final DefaultChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
其中findContextOutBound()函数无非找到下一个类型为outbound的handler
private DefaultChannelHandlerContext findContextOutbound() { DefaultChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }这里链式调用每一个outboundhandler的bind函数,最后到head的headhandler中bind实现
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
具体逻辑在AbstractUnsafe.bind()
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() && Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { //调用子类的doBind()方法,如果使用的NIO那么这里调用的就是NioServerSocketChannel实现的doBind()方法 doBind(localAddress); } catch (Throwable t) { promise.setFailure(t); closeIfClosed(); return; } promise.setSuccess(); //判断端口是否已经成功,成功则触发fireChannelActive事件 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } }
最后调用了子类的dobind(),在nioServerSocketChannel中实现
//调用ServerSocketChannel的bind方法 @Override protected void doBind(SocketAddress localAddress) throws Exception { // javaChannel().socket().bind(localAddress, config.getBacklog()); }最后无非调用底层ServerSocketChannel的bind方法~已经跑到jdk层面了