netty解析自定义长度的tcp报文--java处理tcp接口数据

时间:2024-11-13 18:00:03

1.理解netty的decode处理过程,的Bytebuf在decode时的处理过程,两个指针

netty是tcp包–>decode整理报文–>业务handler的过程。decode如果()但是没有读取报文,报DecoderException说你decode但是没有读取任何报文,这是netty为了防止自己开发decode出现bug

(0)会返回一个空bytebuf导致
list.add((0)) 
报错decode() did not read anything but decoded a message

要理解netty的Bytebuf,有两个指针readerIndex,writerIndex来记录你的decode的读与netty的报文写入,可以看看这个
一起学Netty(五)之 初识ByteBuf和ByteBuf的常用API

decode中(msg)把你decode整理的一个完整报文放入list给业务handler处理。由于异步,会先调用多次decode,然后是多次的业务handler(MyHandler), MyHandler中的channelRead的Object msg就是你()加入的一个报文

如果报文中有其他冗余的信息,可以在decode中设置ByteBuf的readerIndex去略过这个信息,或者可以设置一个waste的 byte数组,让…(waste),但是waste不加入list,就是空读,只读不用,来清除冗余信息,也可以在()前,加入一些信息到报文中

提供模拟tcp接口数据的源,可以自己定义byte[]或者保存的tcp接口的报文写入的文件

import ;
import ;
import ;
import ;
import ;
import ;
import .*;

public class Test
{

    public static void main(String[] args) throws Exception
    {
        ServerSocket serverSocket = new ServerSocket(10086);


        while (true){
            Socket socket = ();
            ("accept");

//                byte[] resp = ("a@$@bbbbb@$@cccccccccc@$@ddddddddddddddddddddddddddddddddddddddd@$@33333333333@$@4              v@$@").getBytes();
//                InputStream inputStream = new FileInputStream(new File("D:/proj/netty/"));
                InputStream inputStream = new FileInputStream(new File(""));
                OutputStream outputStream = ();
                byte[] readBuf = new byte[10240];
                int readLen;
                readLen= (readBuf,0,10240);
                (readLen);
                (readBuf,0,readLen);
                ();
                ();
            }
    }

}


netty的启动类

package com.tgram.api2Mq;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;



public class TcpInterface {

    public void connect(int port,String host) throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new MyDecodeHandler());
                            ch.pipeline().addLast(new MyHandler());
                        }
                    });
            ChannelFuture f = b.connect(host,port).sync();

            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        TcpInterface tcpInterface = new TcpInterface();
        tcpInterface.connect(10086,"127.0.0.1");
    }
}

自定义解码拼装成一个完整报文(就是我们定义的一个完整的信息报文,不是tcp的底层一个报文,tcp的报文是分片,是一串字节流)


import ;
import ;
import ;
import ;

import ;

public class MyDecodeHandler  extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        //取到tag与报文长度后再处理,各2字节
        if(()<4){
            return;
        }
        //记录当前ByteBuf的读指针位置,以便下面取报文长度字节
        //pos是一个完整报文的开始位置,报文整体会在ByteBuf中移动,类似内存管理,所以基于字节的判断报文长度等等,都是基于pos,否则可以在()之后加,();整理ByteBuf,使pos回到0开始位置
        int pos = ();
        int msgLen = (((pos +3) &0xFF)<<8) | ((pos+2) &0xFF);

        //收到的报文长度不足一个完整的报文,继续接收
        if(()<msgLen){
            return;
        }

        //提出完整报文(readBytes读到msg中),放到list给下一个Handler处理
        if(msgLen>0){
            (msg);
        }
     }
}

正式处理解码后的报文


import ;
import ;
import ;
import ;

public class MyHandler extends ChannelInboundHandlerAdapter
{
    private int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        ByteBuf byteBuf = (ByteBuf) msg;
        // 判断报文长度
        int bufLen = ();
        //当前处理的是第几个字节开始的报文
        ("pos:" + count);
        //统计已处理的所有报文字节长度
        count += bufLen;

        ("msg:" + bufLen);

        //业务的报文处理

        //必须释放,如果继承simplechannelinboundhandler会自动释放,但是报文处理写在channelRead0
        (msg)
    }
}