1.理解netty的decode处理过程,的Bytebuf在decode时的处理过程,两个指针
netty是tcp包–>decode整理报文–>业务handler的过程。decode如果()但是没有读取报文,报DecoderException说你decode但是没有读取任何报文,这是netty为了防止自己开发decode出现bug
(0)会返回一个空bytebuf导致
list.add((0))
报错decode() did not read anything but decoded a message
要理解netty的Bytebuf,有两个指针readerIndex,writerIndex来记录你的decode的读与netty的报文写入,可以看看这个
一起学Netty(五)之 初识ByteBuf和ByteBuf的常用API
decode中(msg)把你decode整理的一个完整报文放入list给业务handler处理。由于异步,会先调用多次decode,然后是多次的业务handler(MyHandler), MyHandler中的channelRead的Object msg就是你()加入的一个报文
如果报文中有其他冗余的信息,可以在decode中设置ByteBuf的readerIndex去略过这个信息,或者可以设置一个waste的 byte数组,让…(waste),但是waste不加入list,就是空读,只读不用,来清除冗余信息,也可以在()前,加入一些信息到报文中
提供模拟tcp接口数据的源,可以自己定义byte[]或者保存的tcp接口的报文写入的文件
import ;
import ;
import ;
import ;
import ;
import ;
import .*;
public class Test
{
public static void main(String[] args) throws Exception
{
ServerSocket serverSocket = new ServerSocket(10086);
while (true){
Socket socket = ();
("accept");
// byte[] resp = ("a@$@bbbbb@$@cccccccccc@$@ddddddddddddddddddddddddddddddddddddddd@$@33333333333@$@4 v@$@").getBytes();
// InputStream inputStream = new FileInputStream(new File("D:/proj/netty/"));
InputStream inputStream = new FileInputStream(new File(""));
OutputStream outputStream = ();
byte[] readBuf = new byte[10240];
int readLen;
readLen= (readBuf,0,10240);
(readLen);
(readBuf,0,readLen);
();
();
}
}
}
netty的启动类
package com.tgram.api2Mq;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class TcpInterface {
public void connect(int port,String host) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyDecodeHandler());
ch.pipeline().addLast(new MyHandler());
}
});
ChannelFuture f = b.connect(host,port).sync();
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
TcpInterface tcpInterface = new TcpInterface();
tcpInterface.connect(10086,"127.0.0.1");
}
}
自定义解码拼装成一个完整报文(就是我们定义的一个完整的信息报文,不是tcp的底层一个报文,tcp的报文是分片,是一串字节流)
import ;
import ;
import ;
import ;
import ;
public class MyDecodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//取到tag与报文长度后再处理,各2字节
if(()<4){
return;
}
//记录当前ByteBuf的读指针位置,以便下面取报文长度字节
//pos是一个完整报文的开始位置,报文整体会在ByteBuf中移动,类似内存管理,所以基于字节的判断报文长度等等,都是基于pos,否则可以在()之后加,();整理ByteBuf,使pos回到0开始位置
int pos = ();
int msgLen = (((pos +3) &0xFF)<<8) | ((pos+2) &0xFF);
//收到的报文长度不足一个完整的报文,继续接收
if(()<msgLen){
return;
}
//提出完整报文(readBytes读到msg中),放到list给下一个Handler处理
if(msgLen>0){
(msg);
}
}
}
正式处理解码后的报文
import ;
import ;
import ;
import ;
public class MyHandler extends ChannelInboundHandlerAdapter
{
private int count = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ByteBuf byteBuf = (ByteBuf) msg;
// 判断报文长度
int bufLen = ();
//当前处理的是第几个字节开始的报文
("pos:" + count);
//统计已处理的所有报文字节长度
count += bufLen;
("msg:" + bufLen);
//业务的报文处理
//必须释放,如果继承simplechannelinboundhandler会自动释放,但是报文处理写在channelRead0
(msg)
}
}