Happy Netty5 客户端和服务端 自定义协议通信

时间:2021-09-16 08:57:00


网上好多连接好多demo都不是netty5的,都是以前的版本,况且还有好多运行时老报错。入门级demo就不写了,估计都是那些老套路。好多公司都会有最佳实践,今天就说说如何自定义协议,一般自定义协议都是公司内部各个部门定义的,当然了我写的比较简单。


注意:

本教程是采用netty-all-5.0.0.Alpha2.jar,netty5的版本,不是网上很多的例子都是采用以前老大版本


自定义协议:

协议 {

  协议头(header)

  消息体(body)

}


header格式 {

  固定头,

  命令码,

  版本号,

  长度

}


自定义协议嘛,都是可以自己定义实现的,内部商量好就可以

package com.nio.netty;
import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable{
    //固定头
    private byte startTag;

    //命令码,4位
    private byte[] cmdCode;

    //版本 2位
    private byte[] version;

    private int length;

    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "startTag=" + startTag +
                ", cmdCode=" + Arrays.toString(cmdCode) +
                ", version=" + Arrays.toString(version) +
                ", length=" + length +
                '}';
    }
}


package com.nio.netty;/** * Created by sdc on 2017/8/26. */public class Message {    private MsgHeader header;    private Object body;    //检验和//    private byte crcCode;//    public byte getCrcCode() {//        return crcCode;//    }////    public void setCrcCode(byte crcCode) {//        this.crcCode = crcCode;//    }    public MsgHeader getHeader() {        return header;    }    public void setHeader(MsgHeader header) {        this.header = header;    }    public Object getBody() {        return body;    }    public void setBody(Object body) {        this.body = body;    }    @Override    public String toString() {        return "Message{" +                "header=" + header +                ", body=" + body +//                ", crcCode=" + crcCode +                '}';    }}


消息格式定义完成后,需要编码和解码,这里采用ByteToMessageDecoder MessageToByteEncoder,这两个编解码。

package com.nio.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;/** * Created by sdc on 2017/8/26. */public class MsgEncoder extends MessageToByteEncoder {    public static byte getIndexToByte(int i, int index){        if(index == 0){            return (byte)(i % 10);        }else{            int num = (int)Math.pow(10, index);            return (byte)((i / num) % 10);        }    }    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf out) throws Exception {        if (o instanceof Message) {            try {                Message msg = (Message)o;                if (msg == null || msg.getHeader() == null) {                    throw new Exception("The encode message is null");                }                out.writeByte(msg.getHeader().getStartTag());                out.writeBytes(msg.getHeader().getCmdCode());                //占位                byte[] lengthBytes = new byte[]{0, 0, 0, 0};                out.writeBytes(lengthBytes);                out.writeBytes(msg.getHeader().getVersion());                String body = (String) msg.getBody();                int length = 0;                if (body != null) {                    byte[] bodyBytes = body.getBytes();                    out.writeBytes(bodyBytes);                    length = bodyBytes.length;//                    if (Constants.CRCCODE_DEFAULT != msg.getCrcCode()) {//                        msg.setCrcCode(CRC8.calcCrc8(bodyBytes));//                    }//                    msg.setCrcCode();                }                //长度从int转换为byte[4]                byte l1 = getIndexToByte(length, 3);                byte l2 = getIndexToByte(length, 2);                byte l3 = getIndexToByte(length, 1);                byte l4 = getIndexToByte(length, 0);                lengthBytes = new byte[]{l1, l2, l3, l4};                out.setBytes(5, lengthBytes);                System.out.println("encoder:" + msg.getBody());//                out.writeByte(msg.getCrcCode());            }catch(Exception e){                e.printStackTrace();                throw e;            }        }    }}
package com.nio.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import io.netty.handler.codec.ByteToMessageCodec;import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;/** * Created by sdc on 2017/8/26. */public class MsgDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        try{            //内部协议这块可以约定一下,超过多大的长度就不可以了。//            if(in.readableBytes() < 12){//                return;//            }            System.out.println("开始解码消息,消息长度:" + in.readableBytes());            in.markReaderIndex();            //设置一些消息的属性            Message message = new Message();            MsgHeader header = new MsgHeader();            header.setStartTag(in.readByte());            byte[] cmdCode = new byte[4];            in.readBytes(cmdCode);            header.setCmdCode(cmdCode);            System.out.println(new String(cmdCode, "UTF-8"));            //长度从byte[4]转int            byte[] lengthBytes = new byte[4];            in.readBytes(lengthBytes);            int length = toInt(lengthBytes);            System.out.println("header:" + length);            header.setLength(length);            if(length < 0 || length > 10240){//过长消息或不合法消息                throw new IllegalArgumentException("wrong message length");            }            byte[] version = new byte[2];            in.readBytes(version);            header.setVersion(version);            System.out.println("version:" + new String(version, "UTF-8"));            if(header.getLength() > 0){                System.out.println("bytebuffer可读的范围" + in.readableBytes());                if(in.readableBytes() > length + 1){                    in.resetReaderIndex();                    System.out.println("返回了");                    return;                }                //读取body里的内容                byte[] bodyBytes = new byte[header.getLength()];                in.readBytes(bodyBytes);                message.setBody(new String(bodyBytes, "UTF-8"));            }            //crccode暂时去掉//            message.setCrcCode(in.readByte());            //设置头部            message.setHeader(header);            System.out.println("body:" + message.getBody());            out.add(message);        }catch(Exception e){            e.printStackTrace();            throw e;        }    }    public static int toInt(byte[] bytes){        int value = 0;        for(int i=0; i<bytes.length; i++){            int num = (int)Math.pow(10, bytes.length - 1 - i);            value += num * bytes[i];        }        return value;    }}


消息的类已经处理完了,下面来开始写客户端和服务端的代码。

客户端代码:

package com.nio.netty;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.ByteBufUtil;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * Created by sdc on 2017/8/26. */public class MsgClient {    public void connect(String ip, int port) throws Exception {        EventLoopGroup workerGroup = new NioEventLoopGroup();        Message message = new Message();        String msgStr = "我想发送一条消息";        MsgHeader header = new MsgHeader();        header.setStartTag(new Byte("0"));        header.setCmdCode("1234".getBytes());        header.setLength(msgStr.length());        header.setVersion("11".getBytes());        message.setBody(msgStr);        message.setHeader(header);        try {            Bootstrap bs = new Bootstrap();            bs.group(workerGroup).channel(NioSocketChannel.class).handler(new ChildChannelHandler(message));            ChannelFuture f = bs.connect(ip,port).sync();            //写入消息            f.channel().writeAndFlush(message).sync();            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();        }    }    public static class ChildChannelHandler extends ChannelInitializer {        Message message;        public ChildChannelHandler(Message message) {            this.message = message;        }        @Override        protected void initChannel(Channel channel) throws Exception {            channel.pipeline().addLast(new MsgDecoder())                    .addLast(new MsgEncoder());        }    }    public static void main(String[] args){        try {            new MsgClient().connect("127.0.0.1", 9080);        } catch (Exception e) {            e.printStackTrace();        }    }}


服务端代码

package com.nio.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * Created by sdc on 2017/8/26. */public class MsgServer {    public void bind(int port) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap sb = new ServerBootstrap();            sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 1024)                    .childHandler(new ChildChannelHandler());            ChannelFuture cf = sb.bind(port).sync();            cf.channel().closeFuture().sync();        }finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static class ChildChannelHandler extends ChannelInitializer {        @Override        protected void initChannel(Channel channel) throws Exception {            channel.pipeline()                    .addLast(new MsgDecoder())                    .addLast(new MsgEncoder());        }    }    public static void main(String[] args){        try {            new MsgServer().bind(9080);        } catch (Exception e) {            e.printStackTrace();        }    }}


运行的时候先运行服务端,否则胡报如下的错误,

-all-5.0.0.Alpha2.jar com.nio.netty.MsgClientjava.net.ConnectException: Connection refused: no further information: /127.0.0.1:9080at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:223)at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:276)at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:531)at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)Process finished with exit code 0


成功会打印以下信息:

开始解码消息,消息长度:35开始解码消息,消息长度2222:341234header:24version:11可读的范围24body:我想发送一条消息Process finished with exit code 1


基本流程已经走完,学框架,我还是习惯先写例子,然后再学概念性的东西,会理解的更深刻一些。


参考了这篇文章,但是这篇文章似乎看不了结果,所以还是自己改了改,写了点东西。

http://www.blogs8.cn/posts/Wx8Sd43


本文出自 “不积跬步无以至千里” 博客,请务必保留此出处http://shangdc.blog.51cto.com/10093778/1959680