事件触发、传递、处理是DefaultChannelPipleline实现的另一个核心能力。在前面在章节中粗略地讲过了事件的处理流程,本章将会详细地分析其中的所有关键细节。这些关键点包括:
- 事件触发接口和对应的ChannelHandler处理方法。
- inbound事件的传递。
- outbound事件的传递。
- ChannelHandler的eventExecutor的分配。
事件的触发方法和处理方法
netty提供了三种触发事件的方式:通过Channel触发,通过ChannelPipleline触发,通过ChannelHandlerContext触发。
Channel触发
在netty源码解解析(4.0)-2 Chanel的接口设计这一章中,列出了Channel触发事件的所有方法。Channel定义的所有事件触发方法中,都是用来触发outbound事件的,只有read方法比较特殊,它直接触发outbound方法,如果能读到数据则会触发inbound方法。下面是Channel的事件触发方法,和ChannelHandler事件处理方法的对应关系。
outbound事件
Channel方法 | ChannelOutboundHandler方法 |
bind | bind(ChannelHandlerContext, SocketAddress, ChannelPromise) |
connect | connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) |
disconnect | disconnect(ChannelHandlerContext, ChannelPromise) |
close | close(ChannelHandlerContext, ChannelPromise) |
deregister | deregister(ChannelHandlerContext, ChannelPromise) |
write | write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) |
flush | flush(ChannelHandlerContext ctx) |
writeAndFlush | 先调用write然后调用flush |
read | read(ChannelHandlerContext) |
inbound事件
Channel方法 | ChannelInboundHandler方法 |
read | channelRead(ChannelHandlerContext, Object) |
channelReadComplete(ChannelHandlerContext) |
Channel通过调用ChannelPipleline的同名方方法触发事件,以下是AbstractChannel实现的bind的方法
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
3 return pipeline.bind(localAddress, promise);
}
其他方法的实现和bind类似。
ChannelPipleline触发
在netty源码解解析(4.0)-8 ChannelPipeline的设计这一章中,列出了所有触发事件的方法。 ChannelPipleline的outbound事件的触发方法和处理方法的对应关系和Channel一样,这里就不再重复罗列。下面是inbound事件的触发方法和ChannelHandler事件处理方法的对应关系:
inbound事件
ChannelPipleline方法 | ChannelInboundHandler方法 |
fireChannelRegistered | channelRegistered(ChannelHandlerContext) |
fireChannelUnregistered | channelUnregistered(ChannelHandlerContext) |
fireChannelActive | channelActive(ChannelHandlerContext) |
fireChannelInactive | channelInactive(ChannelHandlerContext) |
fireChannelRead | channelRead(ChannelHandlerContext, Object) |
fireChannelReadComplete | channelReadComplete(ChannelHandlerContext) |
fireExceptionCaught | exceptionCaught(ChannelHandlerContext, Throwable) |
fireUserEventTriggered | userEventTriggered(ChannelHandlerContext, Object) |
fireChannelWritabilityChanged | channelWritabilityChanged(ChannelHandlerContext) |
在DefaultChannelPipleline实现中,事件触发是通过调用AbstractChannelHandlerContext的方法实现的。inbound事件的触发方式是调用对应的invokeXXX静态方法。例如: fireChannelRegistered方法会调用invokeChannelRegistered静态方法:
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
这里会把链表的头作为输入参数,表明inbound事件是从链表头开始处理。其他inbound事件触发方法的实现和这个类似。
outbound事件的触发方式是调用AbstractChannelHandlerContext的同名方法,例如bind的方法的实现:
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
这调用链表尾的方法,表明outbind事件从链表尾开始处理。其他outbound事件的触发方法和这个类似。
ChannelHandlerContext触发
Channel的事件触发方法会调用DefaultChannelPipleline的触发方法,而DefaultChannelPipleline的触发方法调用AbstractChannelHandlerContext的触发方法。所以,不论是Channel还是ChannelPipleline,他们的事件触发能力都是AbstractChannelHandlerContext提供的,因此ChannelHandlerContext事件触发方法和ChannelHandler事件处理方法的对应关系和Channel,ChannelPipleline是一样的。
三种触发方法的差异
Channel只能触发outbound事件,事件从链表的tail开始,传递到head。ChannelPipleline既可以触发outbound事件,又能触发inbound事件,outbound事件的处理和Channel触发一样,inbound事件的从链表的head开始,传递到tail。ChannelHandlerContext触发方式最为灵活,如果调用ChannelHandlerContext的实例触发事件,outbound事件从这个实例的节点开始向head方向传递,inbound事件从这个实例的节点开始向tail传递,此外还可以调用AbstractChannelHandlerContext提供的静态方法从链表中的任意一个节点开始触发可处理事件。总结起来就是,Channel和ChannelPipleline触发的事件只能从链表的head或tail节点开始触发和传递事件,而ChannelHanderContext可以从链表中任何一个节点触发和传递事件。
事件的传递
事件传递的功能在AbstractChannelHandlerContext,这个类的定义如下:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext
inbound事件的触发和传递
每个inbound事件的触发传递实现包含3个方法,一个普通方法fireXXX,一个静态方法invokeXXX, 和一个普通方法invokeXXX。每一次inbound事件传递就是一轮fire-invoke-invoe的调用。下面是channelRegisterd事件的相关的代码。
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
} static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
} private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
这三个方法各有不同的职责:
- fireChannelRegistered调用findContextInbound找到链表上下一个ChannelInboundHandler类型的节点,并把这个节点作为参数传给静态方法invokeChannelRegistered。
- 静态invokeChannelRegistered负责调用普通invokeChannelRegistered方法,并确保这个方法在eventExecutor中调用。
- 普通invokeChannelRegistered负责调用handler对应的事件处理方法,处理异常。如果这个handler对应的handlerAdded方法没有完成调用,这handler还不能处理事件,跳过这节点,继续下一轮fire-invoke-invoke循环。
在普遍invoveChannelRegistered中,正常情况下会调用handler的事件处理方法,这里是handler的channelRegistered方法。如果事件处理方法没有调用对应的fire方法,那么这个事件的传递就算终止了。所以事件传递还需要handler的配合。
inbound事件传递的关键实现在findContextInbound中,这个方法是实现如下:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
这里使用next向后遍历节点,使用inbound属性判断节点持有的handler是否ChannelInboundHandler类型,直到找到一个合适的节点为止。如果没找到,则返回最后一个节点。这样就对链表中最后一个节点提出了一些特殊的要求:必须是持有ChannelInboundHandler的handler并且;并且要负责终止事件传递。DefaultPipleline.TailContext类的实现就满足了这两点要求。
outbound事件的触发和传递
每个outbound事件的触发和传递包含两点方法: XXX, invokeXXX。 下面以bind事件为例看看outbound事件的触发和传递:
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
} final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
} private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
bind方法调用findContextOutbound找到链表上下一个持有ChannelOutboundHandler类型handler的节点,并且确保invokeBind方法在eventExecutor中执行。invokeBind方法负责调用handler对应的事件处理方法,这里是调用handler的bind方法。handler的bind方法中需要调用节点bind方法,这个事件才能继续传递下去。
outbound事件传递的关键实现在findContextOutbound中,这个方法的实现如下:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
这里使用链表的prev向前遍历,使用outbound属性判断节点持有的handler是否ChannelOutboundHandler类型,直到找到一个为止。如果没找到,将会返回链表头的节点。这样对链表头的节点也提出了特殊的要求:它持有的handler必须是ChannelOutboundHandler类型。
链表节点持有的handler类型
在事件的传递和处理过程中,必须把inbound事件交给ChannelInboundChandler类型的handler处理,把outbound事件交给ChannelOutboundChandler类型的handler处理。为了判断handler类型,定义了两个boolean类型的属性: inbound, outbound。inbound==true表示handler是ChannelInboundHandler类型, outbound==true表示handler是ChannelOutboundHandler类型。这两个值在AbstractChannelHandlerContext构造方法中初始化,初始化值来自输入的参数。DefaultChannelHandlerContext在构造方法中把这两个参数的值传入。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
使用isInbound的的值设置inbound,isOutbound的值设置outbound。这两方法只是简单的使用了instanceof运算符。
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
} private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
为ChannelHandler分配eventExecutor
把一个channleHandler添加到ChannelPipleline中时,ChannelPipleline会给它分配一个eventExecutor, 它的所有的事件处理方法都会在这个executor中执行。如果使用带group参数的add方法,executor会从group中取,否则会把channel的eventLoop当成这个handler的executor使用。 从group中分配execuor的操作在创建持有handler的链表节点时完成:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
childExecutor方法负责从group中取出一个executor分配给handler:
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>();
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
实际的分配操作要稍微复杂一些。取决于channel的ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP设置,如果没有设置这个选项或设置成true, 表示每个channelPipleline只能从一个group中分配一个executor, 这是默认行为,实现代码是地9行-19行,这种情况下每一个使用了同一个group的handler,都会被分配到同一个executor中。如果把这个选择设置成false,这是简单地从group中取出一个executor,实现代码是地7行,这种情况下,每一个使用了同一个group的handler被均匀地分配到group中的每一个executor中。
如果没有指定group,会在地3行退出,这里没有分配executor。这种情况会在AbstractChannelHandlerContext的executor方法中得到妥善处理:
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
第4行,处理了没分配executor的情况,调用channel的eventLoop方法得到channel的eventLoop。