public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
这次我们重点来看ChannelFuture f = b.bind().sync(); bind()方法使netty开始对本地端口的绑定与监听。
public ChannelFuture bind() { //检查group和channelFactory属性是否为null validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); }
private ChannelFuture doBind(final SocketAddress localAddress) { //1.初始化channel并将初始化好的channel注册到事件循环 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; /** * 2.判断初始化且注册任务是否已经完成,如果已经完成,则调用bind端口方法 * 注意这里regFuture.isDone()方法,返回ture,只是标示任务是否已经完成,完成的情况可能有: * 1.正常终止、2.异常、3.取消 */ if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { //初始化Channel实例:对channel实例进行相关参数设置。这里调用的是对应子类的实现 init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelFuture regFuture = group().register(channel); //如果注册失败,regFuture.cause会返回失败的异常对象 //如果注册出现异常,并channel已经注册,那么关闭。如果没有注册,那么强行关闭 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
@Override void init(Channel channel) throws Exception { //将传入Bootstrap final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //将AbstractBootstrap的Hanlder添加到NioServerketChannel //就是把handler(new SimpleServerHandler())这个handler添加到NioServerketChannel的pipline ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } /** * ChannelInitializer是一个特殊的ChannelInboundHandler,当channelRegistered事件触发后, * 会调用initChannel方法,调完后,这个handler会从piplne中删除 */ p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
@Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { //调用initChannel方法进行ChannelPipline初始化 initChannel((C) ctx.channel()); //从pipline删除这个handler pipeline.remove(this); //链式触发fireChannelRegistered事件 ctx.fireChannelRegistered(); success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } }
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } });
我们再回到ServerBootstrapAcceptor,ServerBootstrapAcceptor重写了channelRead(ctx,msg)方法,主要的作用是当有client连接上Server的时,pipline触发channelRead 事件,然后会给pipline添加用户自己的handler。其中第二个形参msg 在Nio中,是Server accpet客户端的connect连接后,产生的NioSocketChannel对象。
init()方法完成通道的配置之后,我们继续initAndRegister的之后步骤ChannelFuture regFuture = group().register(channel);通道完成了后,无非把通道绑定到EventLoopGroup上,(这里是group,而非childgroup,也就是最上文的bossGroup)。
@Override public EventLoop next() { return (EventLoop) super.next(); } @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); }
public EventExecutor next() { return this.children[Math.abs(this.childIndex.getAndIncrement() % this.children.length)]; }
由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法。
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }具体调用了usafe类的register()方法,跟踪代码,具体逻辑的主要实现 在AbstractNioUnsafe的doRegister()方法。
@Override protected void doRegister() throws Exception { boolean selected = false; for (; ; ) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { /** * CancelledKeyException抛出的场景: * 当前Channel已经注册了给定的selector,但是相应个key却已经被取消了 */ if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel注册到Selector中。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. //判断注册事件是否成功,如果注册成功,则把bind任务丢到任务队列 channel.eventLoop().execute(new Runnable() { @Override public void run() { //channel注册到eventloop成功后,才开始调用Channel的bind方法 if (regFuture.isSuccess()) { //调用Channel的bind方法 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
@Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } validatePromise(promise, false); //从tail ---> head找Outbound执行 final DefaultChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
private DefaultChannelHandlerContext findContextOutbound() { DefaultChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }这里链式调用每一个outboundhandler的bind函数,最后到head的headhandler中bind实现
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() && Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { //调用子类的doBind()方法,如果使用的NIO那么这里调用的就是NioServerSocketChannel实现的doBind()方法 doBind(localAddress); } catch (Throwable t) { promise.setFailure(t); closeIfClosed(); return; } promise.setSuccess(); //判断端口是否已经成功,成功则触发fireChannelActive事件 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } }
//调用ServerSocketChannel的bind方法 @Override protected void doBind(SocketAddress localAddress) throws Exception { // javaChannel().socket().bind(localAddress, config.getBacklog()); }最后无非调用底层ServerSocketChannel的bind方法~已经跑到jdk层面了