一、概念
网络传输的单位是字节,如何将应用程序的数据转换为字节,以及将字节转换为应用程序的数据,就要说到到我们该篇介绍的编码器和解码器。
将应用程序的数据转换为网络格式,以及将网络格式转换为应用程序的数据的组件分别叫作编码器和解码器,同时具有这两种功能的单一组件叫作编解码器。 Netty 提供了一系列用来创建所有这些编码器、解码器以及编解码器的工具,还可以按需定制通用的消息转换编解码器。
Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
对于编码器和解码器来说,其过程也是相当的简单:一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message)调用自动释放。如果你需要保留引用以便稍后使用,那么你可以调用 ReferenceCountUtil.retain(message)方法。这将会增加该引用计数,从而防止该消息被释放。
二、编码器
编码器将应用程序的数据转换为网络格式,出站消息时被调用,主要有两类:
1、将消息编码为字节:MessageToByteEncoder
public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter{}
public class ShortToByteEncoder extends MessageToByteEncoder<Short> { /** * 1、类型为 I 的出站消息被编码为 ByteBuf * 2、该 ByteBuf 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。 */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Short aShort, ByteBuf byteBuf) throws Exception { byteBuf.writeShort(aShort); } }
2、将消息编码为消息:MessageToMessageEncoder
public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> { /** * 1、类型为 I 的出站消息被编码为目标类型 存入List 中 * 2、该 List 随后将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。 */ @Override protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
三、解码器
解码器将网络格式转换为应用程序的数据,入站消息时被调用。
解码器比编码器多了一个 decodeLast 方法,原因是解码器通常需要在 Channel 关闭之后产生最后一个消息。这显然不适用于编码器的场景 —— 在连接被关闭之后仍然产生一个消息是毫无意义的。所以,当 Channel 的状态变为非活动时,这个方法将会被调用一次。可以重写该方法以提供特殊的处理。
解码器主要有两类:
1、将字节解码为消息:ByteToMessageDecoder 和 ReplayingDecoder
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {}
public class MyDecoder extends ByteToMessageDecoder { private static final int MAX_FRAME_SIZE = 1024; /** * 1、该方法被调用时,将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List. * 2、对该方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者Butebuf 没有更多可读取的字节为止。 * 3、List 的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。 */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int readableBytes = byteBuf.readableBytes(); //不能让解码器缓冲大量的数据以致于耗尽可用的内存 if (readableBytes > MAX_FRAME_SIZE){ //跳过所有的可读字节 byteBuf.skipBytes(readableBytes); throw new TooLongFrameException("数据超过可缓存字节..."); } //假设需要解析 int 类型的消息(int 4个字节) if (readableBytes > 4){ list.add(byteBuf.readInt()); } } }
----分割线----
//类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理。 public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder{}
public class MyReplayingDecoder extends ReplayingDecoder<Void> { /** * 1、ReplayingDecoder 扩展了 ByteToMessageDecoder,并且自定义了 ByteBuf 的实现 ReplayingDecoderByteBuf。 * 2、ReplayingDecoderByteBuf 对要转换的消息的字节数进行内部管理,如果没有足够的字节使用,将会抛出一个 Signal,由ReplayingDecoder进行处理。 * * @param byteBuf 传入的 ByteBuf 实际上是 ReplayingDecoderByteBuf*/ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { list.add(byteBuf.readInt()); } }
继承 ByteToMessageDecoder 和继承 ReplayingDecoder 有什么区别呢?ReplayingDecoder 唯一的好处就是解码的时候不用进行字节数的判断,如上 ,因为它交由自定义的 ReplayingDecoderByteBuf 去处理了。但是 ReplayingDecoder 的效率稍慢于ByteToMessageDecoder。一般我们在这两个解码器中进行抉择的准则是:如果使用 ByteToMessageDecoder 不会引入太多的复杂性,那么请使用它;否则,请使用 ReplayingDecoder!
2、将消息解码为消息:MessageToMessageDecoder
public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> { /** * 1、对于每个需要被解码为另一种格式的入站消息来说,该方法将会被调用。 * 2、解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, List<Object> list) throws Exception { list.add(String.valueOf(integer)); } }
四、编解码器
Netty 的抽象编解码器类捆绑一个解码器/编码器对,主要用于在同一个类中管理入站和出站数据和消息的转换。
个人觉得这个编解码器略显鸡肋呀,还是喜欢将编码器和解码器分开来写。因为 Netty 设计的一个基本准则就是:尽可能地将两种功能(编码器、解码器)分开,最大化代码的可重用性和可扩展性。
编解码器也主要有两类:
1、字节消息编解码器:ByteToMessageCodec
public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {}
public class MyBytetoMessageCodec extends ByteToMessageCodec<Short> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { while (in.readableBytes() > 4){ out.add(in.readInt()); } } @Override protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception { out.writeShort(msg); } }
2、消息转换编解码器:MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelHandlerAdapter {}
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> { /** * 1、对于每个 OUTBOUND_IN 类型的消息,这个方法都会被调用。 * 2、这个消息将会被编码为 INBOUND_IN 类型的消息。 * 3、然后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler */ @Override protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception { ByteBuf byteBuf = msg.getByteBuf().duplicate().retain(); switch (msg.getFrameType()) { case BINARY: out.add(new BinaryWebSocketFrame(byteBuf)); break; case TEXT: out.add(new TextWebSocketFrame(byteBuf)); break; case CLOSE: out.add(new CloseWebSocketFrame(true, 0, byteBuf)); break; case CONTINUATION: out.add(new ContinuationWebSocketFrame(byteBuf)); break; case PONG: out.add(new PongWebSocketFrame(byteBuf)); break; case PING: out.add(new PingWebSocketFrame(byteBuf)); default: break; } } /** * 1、传入 INBOUND_IN 类型的消息,该方法会被调用。 * 2、这个消息会被解码为 OUTBOUND_IN 类型的消息。 * 3、然后被转发给 ChannelPipeline 中的下一个 ChannelInboundHandler */ @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { ByteBuf byteBuf = msg.content().duplicate().retain(); if (msg instanceof BinaryWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, byteBuf)); } else if (msg instanceof CloseWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, byteBuf)); } else if (msg instanceof TextWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, byteBuf)); } else if (msg instanceof PingWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, byteBuf)); } else if (msg instanceof PongWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, byteBuf)); } else if (msg instanceof ContinuationWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, byteBuf)); } else { throw new IllegalStateException("Unsupported websocket msg " + msg); } } public static final class MyWebSocketFrame { public enum FrameType { BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION } private final FrameType frameType; private final ByteBuf byteBuf; public MyWebSocketFrame(FrameType frameType, ByteBuf byteBuf) { this.frameType = frameType; this.byteBuf = byteBuf; } public FrameType getFrameType() { return frameType; } public ByteBuf getByteBuf() { return byteBuf; } } }
参考资料:《Netty IN ACTION》