一:ByteToMessageDecoder的作用
ByteToMessageDecoder在Netty中主要是用来解决半包积累的问题,是一种解码器,LineBasedFrameDecoder ,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder都是其的一种具体的实现。因为要想netty解决半包拆包问题,需要从认识ByteToMessageDecoder源码开始。
二:具体实现
首先带着一种思路,当从TCP无边界的字节流数据中找出相应的完整的数据,一般的思路是:
首先,不停的读取TCP中的获取数据
1.如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
2.如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
带着这种想法,我们来看一下ByteToMessageDecoder的源码
首先,ByteToMessageDecoder是继承Handler的,所以也继承了handler的非私有属性
将 ByteToMessageDecoder字节流消息解码器分为三个部分讲解。
先看下部分成员变量的含义:
1)字节流消息的累加器
先看ByteToMessageDecoder内部定义的一个接口
主要作用是定义字节流缓冲累加规则。
ByteToMessageDecode定义了两种类型的数据缓冲累加机制:
这是一种内存复制的累加缓冲方式:
这是非内存复制方式实现缓冲累加,使用API 中CompositeByteBuf进行累加
通过:
可以看出默认使用第一种累加机制。
详细看下内存复制的累加方式:
内存复制方式扩充缓冲
即通过内存复制的方式,把待读取的缓冲累加到累加器中,并释放待读取缓冲的内存
2)字节流消息解码后存储到CodecOutputList
CodecOutputList是一个list集合,
ByteToMessageDecoder会使用它作为容器来存放,解码后的对象。
具体CodecOutputList实例化步骤
创建一个线程变量ThreadLocal里面存放CodecOutputLists数组,CodecOutputLists中的每个值存放的是CodecOutputList,所以可以理解为一个二维数据
分配空数组并返回一个数组集合:
CodecOutputList 可以理解为一个对象吃,每次channelRead()方法的时候,都会从对象池中拿出一个对象数组,用来存放从cumulation缓冲累积容器中解码出来的对象, 并把CodecOutputList对象数组的数据,传输到下一个handler处理器处理。
3)channelRead() 方法读取处理IO消息
channelRead() 方法是读取和处理IO消息的
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 1.从对象池中取出一个对象数组,用来存放cumulator缓冲积累器中解码后的对象
CodecOutputList out = CodecOutputList.newInstance();
try {
//2.先积累数据
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 3.解码积累器中的缓冲数据,并把结果添加到out对象数组中
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
//4.释放积累器中的内存
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
//5.将out对象池中的对象,即解码成功的对象,传输到其他channel继续后续业务流程
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
// 发送到下一个handler处理
ctx.fireChannelRead(msg);
}
}
解码缓冲积累器中的数据到out中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 1、只要积累器in中有数据,就读取数据
while (in.isReadable()) {
int outSize = out.size();
//2、如果 对象池中有已经解码的对象数据,则先把解码后的对象发出去
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
//3、解码数据in 到out对象数组中
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
解码缓冲积累器中的缓冲数据:
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 此为抽象方法,为子类自定义实现,由此可以看出,ByteToMessageDecoder采用的是模板方法的设计模式
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
解码方法
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
由此可以看出,采用模板方法的设计模式,有自定义的子类来实现,根据不同情况来对 缓冲积累器中的数据进行解码,最后添加到out数组中
总结:
ByteToMessageDecoder这个类的主要作用的对TCP传输字节流进行解码,以编码TCP传输的粘包/拆包问题
具体步骤是:
1、通过channelRead()方法为入口(进行TCP消息处理的入口),将TCP中获得的消息都存放到cumulation缓冲累积器中
2、通过将缓冲累积器中的数据,进行解码处理,并将解码后的数据存放到CodecOutputList对象数组中
3、CodecOutputList对象数组中中的数据发出去,发送到下个handler进行进一步业务处理
4、解码过程由用户根据不同的编码需求自定义实现decode(ChannelHandlerContext ctx, ByteBuf in, List out)方法进行解码
理解此ByteToMessageDecoder,再来看其实现LineBasedFrameDecoder ,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder发现只要实现decode()方法即可,并按照指定的分割条件进行字节流数据分割,然后将处理好的数据加入到out的集合中即可。