本节主要讨论了 Netty 的数据处理组件 ChannelHandler。
一、Channel 生命周期
Channel 有个简单但强大的状态模型,下面是 Channel 的四个状态:
Channel 的正常生命周期如下图,当这些状态发生变化时,对应的事件将会生成。
二、ChannelHandler 生命周期
ChannelHandler 定义的生命周期如下,当 ChannelHandler 添加到 ChannelPipeline,或者从 ChannelPipeline 移除后,这些将会调用。
三、ChannelInbounderHandler
ChannelInboundHandler 的生命周期方法如下,当接收到数据或者与之关联的 Channel 状态改变时调用。
可以看到,这些方法与 Channel 的生命周期接近。
其中 channelRead 和 channelReadComplete 方法在读操作开始和完成时调用。
另外,channelWritabilityChanged 方法在 channel 写状态发生变化时调用。
四、ChannelOutboundHandler
ChannelOutboundHandler 提供了出站操作时调用的方法。
另外,它具有在请求时演示操作或者事件的能力。比如,当你在写数据到远程的过程中被意外暂停,你可以延时进行刷新操作,然后在迟些时候继续。
下面是提供的方法(继承自 ChannelHandler 未列出来):
五、资源管理
Netty 使用引用计数器来处理池化的 ByteBuf。所以当 ByteBuf 完全处理后,要确保引用计数器被调整。
当你覆盖了 channelRead 操作,在处理完消息之后,需要释放它,如下:
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
/**
* 收到数据时调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 丢弃收到的数据
((ByteBuf) msg).release();
}
}
Netty 提供了一个特殊的称为 SimpleChannelInboundHander 的实现,该实现将在用户通过 channelRead0() 方法处理完数据之后,自动释放该消息。
当你在处理写操作,并丢弃消息时,你需要通知 ChannelPromise 数据已经被处理,如下:
public class DiscardOutbounderHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) throws Exception {
ReferenceCountUtil.release(msg); // 释放资源
promise.setSuccess(); // 通知 ChannelPromise 数据已经被处理
}
}
六、使用 ChannelHandler
下图展示了 ChannelPipeline,Channel,ChannelHandler 和 ChanelHandlerContext 的关系:
- Channel 绑定到 ChannelPipeline
- ChannelPipeline 绑定到包含 ChannelHandler 的 Channel
- ChannelHandler
- 当添加 ChannelHandler 到 ChannelPipeline 时,ChannelHandlerContext 被创建
如果要完成一个写操作,有以下两种方式:
第一种就是从 ChannelHandlerContext 获取到 Channel 的引用,执行 Channel 上的 write() 方法,如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 丢弃收到的数据
((ByteBuf) msg).release(); Channel channel = ctx.channel(); // 获取channel引用
// 通过channel写缓存
channel.write(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8));
}
第二种是从 ChannelHandlerContext 获取到 ChannelPipeline 的引用,执行 ChannelPipeline 上的 write() 方法,如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelPipeline pipeline = ctx.pipeline(); // 获取pipeline引用
// 通过pipeline写缓存
pipeline.write(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8));
}
写操作流程如下:
- 事件传递给 ChannelPipeline 的第一个 ChannelHandler
- ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的下一个 ChannelHandler
- 与 2 类似
但是有些时候不希望总是从 ChannelPipeline 的第一个 ChannelHandler 开始事件,我们希望从一个特定的 ChannelHandler 开始处理。你必须引用于此 ChannelHandler 的前一个 ChannelHandler 关联的 ChannelHandlerContext,利用它调用与自身关联的 ChannelHandler 的下一个 ChannelHandler。如下:
ChannelHandlerContext ctx = context; // 获得 ChannelHandlerContext引用
// write()将会把缓冲区发送到下一个ChannelHandler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
流程如下:
- 直接从特定的 ChannelHandler 开始执行
- 事件发送到下一个 ChannelHandler
- 经过最后一个 ChannelHandler 后,事件从 ChannelPipeline 移除