netty源码解解析(4.0)-10 ChannelPipleline的默认实现--事件传递及处理

时间:2022-04-12 08:19:11

  事件触发、传递、处理是DefaultChannelPipleline实现的另一个核心能力。在前面在章节中粗略地讲过了事件的处理流程,本章将会详细地分析其中的所有关键细节。这些关键点包括:

  • 事件触发接口和对应的ChannelHandler处理方法。
  • inbound事件的传递。 
  • outbound事件的传递。
  • ChannelHandler的eventExecutor的分配。

  

  事件的触发方法和处理方法

  netty提供了三种触发事件的方式:通过Channel触发,通过ChannelPipleline触发,通过ChannelHandlerContext触发。

  

  Channel触发

  在netty源码解解析(4.0)-2 Chanel的接口设计这一章中,列出了Channel触发事件的所有方法。Channel定义的所有事件触发方法中,都是用来触发outbound事件的,只有read方法比较特殊,它直接触发outbound方法,如果能读到数据则会触发inbound方法。下面是Channel的事件触发方法,和ChannelHandler事件处理方法的对应关系。

  outbound事件

Channel方法 ChannelOutboundHandler方法
bind bind(ChannelHandlerContext, SocketAddress, ChannelPromise)  
connect connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
disconnect disconnect(ChannelHandlerContext, ChannelPromise)
close close(ChannelHandlerContext, ChannelPromise)
deregister deregister(ChannelHandlerContext, ChannelPromise)
write write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
flush flush(ChannelHandlerContext ctx)
writeAndFlush 先调用write然后调用flush
read read(ChannelHandlerContext)

  inbound事件

Channel方法 ChannelInboundHandler方法
read channelRead(ChannelHandlerContext, Object)
channelReadComplete(ChannelHandlerContext)

  Channel通过调用ChannelPipleline的同名方方法触发事件,以下是AbstractChannel实现的bind的方法

 @Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
3  return pipeline.bind(localAddress, promise);
}

  其他方法的实现和bind类似。

  

  ChannelPipleline触发

  在netty源码解解析(4.0)-8 ChannelPipeline的设计这一章中,列出了所有触发事件的方法。 ChannelPipleline的outbound事件的触发方法和处理方法的对应关系和Channel一样,这里就不再重复罗列。下面是inbound事件的触发方法和ChannelHandler事件处理方法的对应关系:

  inbound事件

ChannelPipleline方法 ChannelInboundHandler方法
fireChannelRegistered channelRegistered(ChannelHandlerContext) 
fireChannelUnregistered channelUnregistered(ChannelHandlerContext)
fireChannelActive channelActive(ChannelHandlerContext)
fireChannelInactive channelInactive(ChannelHandlerContext)
fireChannelRead channelRead(ChannelHandlerContext, Object)
fireChannelReadComplete channelReadComplete(ChannelHandlerContext)
fireExceptionCaught exceptionCaught(ChannelHandlerContext, Throwable)
fireUserEventTriggered userEventTriggered(ChannelHandlerContext, Object)
fireChannelWritabilityChanged channelWritabilityChanged(ChannelHandlerContext)

  在DefaultChannelPipleline实现中,事件触发是通过调用AbstractChannelHandlerContext的方法实现的。inbound事件的触发方式是调用对应的invokeXXX静态方法。例如: fireChannelRegistered方法会调用invokeChannelRegistered静态方法:

 @Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}

  这里会把链表的头作为输入参数,表明inbound事件是从链表头开始处理。其他inbound事件触发方法的实现和这个类似。

  outbound事件的触发方式是调用AbstractChannelHandlerContext的同名方法,例如bind的方法的实现:

     @Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}

  这调用链表尾的方法,表明outbind事件从链表尾开始处理。其他outbound事件的触发方法和这个类似。

  ChannelHandlerContext触发

  Channel的事件触发方法会调用DefaultChannelPipleline的触发方法,而DefaultChannelPipleline的触发方法调用AbstractChannelHandlerContext的触发方法。所以,不论是Channel还是ChannelPipleline,他们的事件触发能力都是AbstractChannelHandlerContext提供的,因此ChannelHandlerContext事件触发方法和ChannelHandler事件处理方法的对应关系和Channel,ChannelPipleline是一样的。

  

  三种触发方法的差异

  Channel只能触发outbound事件,事件从链表的tail开始,传递到head。ChannelPipleline既可以触发outbound事件,又能触发inbound事件,outbound事件的处理和Channel触发一样,inbound事件的从链表的head开始,传递到tail。ChannelHandlerContext触发方式最为灵活,如果调用ChannelHandlerContext的实例触发事件,outbound事件从这个实例的节点开始向head方向传递,inbound事件从这个实例的节点开始向tail传递,此外还可以调用AbstractChannelHandlerContext提供的静态方法从链表中的任意一个节点开始触发可处理事件。总结起来就是,Channel和ChannelPipleline触发的事件只能从链表的head或tail节点开始触发和传递事件,而ChannelHanderContext可以从链表中任何一个节点触发和传递事件。

  事件的传递

  事件传递的功能在AbstractChannelHandlerContext,这个类的定义如下:

    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

  inbound事件的触发和传递

  每个inbound事件的触发传递实现包含3个方法,一个普通方法fireXXX,一个静态方法invokeXXX, 和一个普通方法invokeXXX。每一次inbound事件传递就是一轮fire-invoke-invoe的调用。下面是channelRegisterd事件的相关的代码。

     @Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
} static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
} private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}

  这三个方法各有不同的职责:

  • fireChannelRegistered调用findContextInbound找到链表上下一个ChannelInboundHandler类型的节点,并把这个节点作为参数传给静态方法invokeChannelRegistered。
  • 静态invokeChannelRegistered负责调用普通invokeChannelRegistered方法,并确保这个方法在eventExecutor中调用。
  • 普通invokeChannelRegistered负责调用handler对应的事件处理方法,处理异常。如果这个handler对应的handlerAdded方法没有完成调用,这handler还不能处理事件,跳过这节点,继续下一轮fire-invoke-invoke循环。

  在普遍invoveChannelRegistered中,正常情况下会调用handler的事件处理方法,这里是handler的channelRegistered方法。如果事件处理方法没有调用对应的fire方法,那么这个事件的传递就算终止了。所以事件传递还需要handler的配合。

  inbound事件传递的关键实现在findContextInbound中,这个方法是实现如下:

     private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

  这里使用next向后遍历节点,使用inbound属性判断节点持有的handler是否ChannelInboundHandler类型,直到找到一个合适的节点为止。如果没找到,则返回最后一个节点。这样就对链表中最后一个节点提出了一些特殊的要求:必须是持有ChannelInboundHandler的handler并且;并且要负责终止事件传递。DefaultPipleline.TailContext类的实现就满足了这两点要求。

  

  outbound事件的触发和传递

  每个outbound事件的触发和传递包含两点方法: XXX, invokeXXX。 下面以bind事件为例看看outbound事件的触发和传递:

     @Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
} final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
} private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

  bind方法调用findContextOutbound找到链表上下一个持有ChannelOutboundHandler类型handler的节点,并且确保invokeBind方法在eventExecutor中执行。invokeBind方法负责调用handler对应的事件处理方法,这里是调用handler的bind方法。handler的bind方法中需要调用节点bind方法,这个事件才能继续传递下去。

  outbound事件传递的关键实现在findContextOutbound中,这个方法的实现如下:

     private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}

  这里使用链表的prev向前遍历,使用outbound属性判断节点持有的handler是否ChannelOutboundHandler类型,直到找到一个为止。如果没找到,将会返回链表头的节点。这样对链表头的节点也提出了特殊的要求:它持有的handler必须是ChannelOutboundHandler类型。

  

  链表节点持有的handler类型

  在事件的传递和处理过程中,必须把inbound事件交给ChannelInboundChandler类型的handler处理,把outbound事件交给ChannelOutboundChandler类型的handler处理。为了判断handler类型,定义了两个boolean类型的属性: inbound, outbound。inbound==true表示handler是ChannelInboundHandler类型, outbound==true表示handler是ChannelOutboundHandler类型。这两个值在AbstractChannelHandlerContext构造方法中初始化,初始化值来自输入的参数。DefaultChannelHandlerContext在构造方法中把这两个参数的值传入。

     DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}

  使用isInbound的的值设置inbound,isOutbound的值设置outbound。这两方法只是简单的使用了instanceof运算符。

     private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
} private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}

  为ChannelHandler分配eventExecutor

  把一个channleHandler添加到ChannelPipleline中时,ChannelPipleline会给它分配一个eventExecutor, 它的所有的事件处理方法都会在这个executor中执行。如果使用带group参数的add方法,executor会从group中取,否则会把channel的eventLoop当成这个handler的executor使用。 从group中分配execuor的操作在创建持有handler的链表节点时完成:

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

  childExecutor方法负责从group中取出一个executor分配给handler:

     private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>();
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}

  实际的分配操作要稍微复杂一些。取决于channel的ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP设置,如果没有设置这个选项或设置成true,  表示每个channelPipleline只能从一个group中分配一个executor,  这是默认行为,实现代码是地9行-19行,这种情况下每一个使用了同一个group的handler,都会被分配到同一个executor中。如果把这个选择设置成false,这是简单地从group中取出一个executor,实现代码是地7行,这种情况下,每一个使用了同一个group的handler被均匀地分配到group中的每一个executor中。

  如果没有指定group,会在地3行退出,这里没有分配executor。这种情况会在AbstractChannelHandlerContext的executor方法中得到妥善处理:

     @Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}

  第4行,处理了没分配executor的情况,调用channel的eventLoop方法得到channel的eventLoop。