前面已经说过:
TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用如下4种方式。
(1)消息长度固定:累计读取到固定长度为LENGTH之后就认为读取到了一个完整的消息。然后将计数器复位,重新开始读下一个数据报文。(2)回车换行符作为消息结束符:在文本协议中应用比较广泛。
(3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符。
(4)通过在消息头中定义长度字段来标示消息的总长度。
netty中针对这四种场景均有对应的解码器作为解决方案,比如:
(1)通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包问题;
(2)通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包的问题;
(3)通过DelimiterBasedFrameDecoder 特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包问题;
(4)最后一种,也是本文的重点,通过LengthFieldBasedFrameDecoder 自定义长度解码器解决TCP黏包问题。
大多数的协议在协议头中都会携带长度字段,用于标识消息体或则整包消息的长度。LengthFieldBasedFrameDecoder通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息,只要传入正确的参数,就可以轻松解决“读半包”的问题。
1. LengthFieldBasedFrameDecoder功能说明
下面我们通过实例来看如何通过配置不同的参数组合来实现不同的半包读取策略。
(1)场景一:消息的第一个字段是长度字段,后面是消息体,消息头中只包含一个长度字段,消息结构定义如下:
在解码前字节缓冲区占了14个字节,其中前两个字节是标识长度的字节,后面12个字节是消息体。
使用的组合参数如下:
1) lengthFieldOffset = 0;//长度字段的偏差
2) lengthFieldLength = 2;//长度字段占的字节数
3) lengthAdjustment = 0;//添加到长度字段的补偿值
4) initialBytesToStrip = 0。//从解码帧中第一次去除的字节数
解码后的字节缓冲区的内容是:
解码后还是14个字节。
(2)场景二:通过ByteBuf.readableBytes()方法我们可以获取当前消息的长度,所以解码后的字节缓冲区可以不携带长度字段,由于长度字段在起始位置并且长度为2,所以将initialBytesToStrip设置为2。参数组合修改为:
1) lengthFieldOffset = 0;
2) lengthFieldLength = 2;
3) lengthAdjustment = 0;
4) initialBytesToStrip = 2。
这时候解码后字节缓冲区的数据就是:
很明显跳过了长度字段后的字节缓冲区就只有12个字节了。
解码后的字节缓冲区丢弃了长度字段,仅仅包含消息体,对于大多数的协议,解码之后消息长度没有用处,因此可以丢弃。
(3)场景三:在大多数的应用场景中,长度字段仅用来标识消息体的长度,这类协议通常由消息长度字段+消息体组成,如上图所示的几个例子。但是,对于某些协议,长度字段还包含了消息头的长度。在这种应用场景中,往往需要使用lengthAdjustment进行修正。由于整个消息(包含消息头)的长度往往大于消息体的长度,所以,lengthAdjustment为负数。下图展示了通过指定lengthAdjustment字段来包含消息头的长度:
1) lengthFieldOffset = 0;
2) lengthFieldLength = 2;
3) lengthAdjustment = -2;
4) initialBytesToStrip = 0。
解码之前的码流:
包含字段长度的码流:
解码只有的码流是:
(4)场景四:但是由于协议的种类繁多,并不是所有的协议都将长度字段放在消息头的首位,当标识消息长度的字段位于消息头的中间或者尾部时,需要使用lengthFieldOffset字段进行标识,下面的参数组合给出了如何解决消息长度字段不在首位的问题:
1) lengthFieldOffset = 2;
2) lengthFieldLength = 3;
3) lengthAdjustment = 0;
4) initialBytesToStrip = 0。
其中lengthFieldOffset表示长度字段在消息头中偏移的字节数,lengthFieldLength 表示长度字段自身的长度,解码效果如下:
解码之前:
解码之后:
由于消息头1的长度为2,所以长度字段的偏移量为2;消息长度字段Length为3,所以lengthFieldLength值为3。由于长度字段仅仅标识消息体的长度,所以lengthAdjustment和initialBytesToStrip都为0。
(5)场景五:最后一种场景是长度字段夹在两个消息头之间或者长度字段位于消息头的中间,前后都有其它消息头字段,在这种场景下如果想忽略长度字段以及其前面的其它消息头字段,则可以通过initialBytesToStrip参数来跳过要忽略的字节长度,它的组合配置示意如下:
1) lengthFieldOffset = 1;
2) lengthFieldLength = 2;
3) lengthAdjustment = 1;
4) initialBytesToStrip = 3。
解码之前16字节:
解码之后13字节:
由于HDR1的长度为1,所以长度字段的偏移量lengthFieldOffset为1;长度字段为2个字节,所以lengthFieldLength为2。由于长度字段是消息体的长度,解码后如果携带消息头中的字段,则需要使用lengthAdjustment进行调整,此处它的值为1,代表的是HDR2的长度,最后由于解码后的缓冲区要忽略长度字段和HDR1部分,所以lengthAdjustment为3。解码后的结果为13个字节,HDR1和Length字段被忽略。
事实上,通过4个参数的不同组合,可以达到不同的解码效果,用户在使用过程中可以根据业务的实际情况进行灵活调整。
2. LengthFieldBasedFrameDecoder源码解析
下面我们就来看看基于消息长度的半包解码器,首先看看入口方法:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
内部调用decode(ChannelHandlerContext ctx, ByteBuf in)
如果解码成功,就将其加入到输出的List out列表中。该函数较长我们还是分几部分来分析:
(1)判断discardingTooLongFrame标识,看是否需要丢弃当前可读的字节缓冲区,如果为真,则执行求其操作。
if (discardingTooLongFrame) {
//获取需要丢弃的长度
long bytesToDiscard = this.bytesToDiscard;
//丢弃的长度不能超过当前缓冲区可读的字节数
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
//跳过需要忽略的字节长度
in.skipBytes(localBytesToDiscard);
//bytesToDiscard减去已经忽略的字节长度
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
(2)对当前缓冲区中可读字节数和长度偏移量进行对比,如果小于偏移量,谁明缓冲区数据报不够,直接返回null.
//数据报内数据不够,返回null,由IO线程继续读取数据。
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"negative pre-adjustment length field: " + frameLength);
}
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
in.skipBytes(lengthFieldEndOffset);
throw new CorruptedFrameException(
"Adjusted frame length (" + frameLength + ") is less " +
"than lengthFieldEndOffset: " + lengthFieldEndOffset);
}
再后面的具体代码就不分析了,其实核心就是:对消息进行解码,解码之后将解码后的字节数据放到一个新的ByteBuf中返回,并更新原来的消息msg对象的读写索引值。
3. LengthFieldPrepender功能说明
如果协议中的第一个字段为长度字段,netty提供了LengthFieldPrepender编码器,它可以计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中,如图所示:
通过LengthFieldPrepender可以将待发送消息的长度写入到ByteBuf的前2个字节,编码后的消息组成为长度字段+原消息的方式。
通过设置LengthFieldPrepender为true,消息长度将包含长度字段占用的字节数,打开LengthFieldPrepender后,图3-3示例中的编码结果如下图所示:
4. LengthFieldPrepender源码解析
LengthFieldPrepender工作原理分析如下:首先对长度字段进行设置,如果需要包含消息长度自身,则在原来长度的基础之上再加上lengthFieldLength的长度。
如果调整后的消息长度小于0,则抛出参数非法异常。对消息长度自身所占的字节数进行判断,以便采用正确的方法将长度字段写入到ByteBuf中,共有以下6种可能:
1) 长度字段所占字节为1:如果使用1个Byte字节代表消息长度,则最大长度需要小于256个字节。对长度进行校验,如果校验失败,则抛出参数非法异常;若校验通过,则创建新的ByteBuf并通过writeByte将长度值写入到ByteBuf中;
2) 长度字段所占字节为2:如果使用2个Byte字节代表消息长度,则最大长度需要小于65536个字节,对长度进行校验,如果校验失败,则抛出参数非法异常;若校验通过,则创建新的ByteBuf并通过writeShort将长度值写入到ByteBuf中;
3) 长度字段所占字节为3:如果使用3个Byte字节代表消息长度,则最大长度需要小于16777216个字节,对长度进行校验,如果校验失败,则抛出参数非法异常;若校验通过,则创建新的ByteBuf并通过writeMedium将长度值写入到ByteBuf中;
4) 长度字段所占字节为4:创建新的ByteBuf,并通过writeInt将长度值写入到ByteBuf中;
5) 长度字段所占字节为8:创建新的ByteBuf,并通过writeLong将长度值写入到ByteBuf中;
6) 其它长度值:直接抛出Error。
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
int length = msg.readableBytes() + lengthAdjustment;
if (lengthIncludesLengthFieldLength) {
length += lengthFieldLength;
}
if (length < 0) {
throw new IllegalArgumentException(
"Adjusted frame length (" + length + ") is less than zero");
}
switch (lengthFieldLength) {
case 1:
if (length >= 256) {
throw new IllegalArgumentException(
"length does not fit into a byte: " + length);
}
out.add(ctx.alloc().buffer(1).order(byteOrder).writeByte((byte) length));
break;
case 2:
if (length >= 65536) {
throw new IllegalArgumentException(
"length does not fit into a short integer: " + length);
}
out.add(ctx.alloc().buffer(2).order(byteOrder).writeShort((short) length));
break;
case 3:
if (length >= 16777216) {
throw new IllegalArgumentException(
"length does not fit into a medium integer: " + length);
}
out.add(ctx.alloc().buffer(3).order(byteOrder).writeMedium(length));
break;
case 4:
out.add(ctx.alloc().buffer(4).order(byteOrder).writeInt(length));
break;
case 8:
out.add(ctx.alloc().buffer(8).order(byteOrder).writeLong(length));
break;
default:
throw new Error("should not reach here");
}
out.add(msg.retain());
}