Android Netty开发示例

时间:2025-01-25 07:20:38

Android Netty开发示例

本文涉及到其它的基础知识有:

  • 对数据的字节处理
  • 对数据进行CRC32校验
  • 对数据的内容进行Blowfish加/解密

这部分内容不会在本文深入探究,在“代码片段讲解”部分会提到一二。


Netty是什么

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。扩展阅读

代码片段讲解

package ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

/**
 * Netty客户端
 * Created by LiuSaibao on 11/23/2016.
 */
public class NettyClient {

    private static NettyClient nettyClient = new NettyClient();

    private EventLoopGroup group;

    private NettyListener listener;

    private Channel channel;

    private boolean isConnect = false;

    private int reconnectNum = Integer.MAX_VALUE;

    private long reconnectIntervalTime = 5000;

    public static NettyClient getInstance(){
        return nettyClient;
    }

    public synchronized NettyClient connect() {
        if (!isConnect) {
            group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap().group(group)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .channel()
                    .handler(new NettyClientInitializer(listener));

            try {
                (, Const.TCP_PORT).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (()) {
                            isConnect = true;
                            channel = ();
                        } else {
                            isConnect = false;
                        }
                    }
                }).sync();

            } catch (Exception e) {
                (e, ());
                (NettyListener.STATUS_CONNECT_ERROR);
                reconnect();
            }
        }
        return this;
    }

    public void disconnect() {
        ();
    }

    public void reconnect() {
        if(reconnectNum >0 && !isConnect){
            reconnectNum--;
            try {
                (reconnectIntervalTime);
            } catch (InterruptedException e) {}
            ("重新连接");
            disconnect();
            connect();
        }else{
            disconnect();
        }
    }

    public boolean sendMsgToServer(byte[] data, ChannelFutureListener listener) {
        boolean flag = channel != null && isConnect;
        if (flag) {
            ByteBuf buf = (data);
            (buf).addListener(listener);
        }
        return flag;
    }

    public void setReconnectNum(int reconnectNum) {
        this.reconnectNum = reconnectNum;
    }

    public void setReconnectIntervalTime(long reconnectIntervalTime) {
        this.reconnectIntervalTime = reconnectIntervalTime;
    }

    public boolean getConnectStatus(){
        return isConnect;
    }

    public void setConnectStatus(boolean status){
        this.isConnect = status;
    }

    public void setListener(NettyListener listener) {
        this.listener = listener;
    }
}

NettyClient类是对Netty进行封装,用于建立连接、断开连接、异常断开重连以及向服务器发送消息。

package ;

import ;

/**
 *
 * Created by LiuSaibao on 11/23/2016.
 */
public interface NettyListener {

    public final static byte STATUS_CONNECT_SUCCESS = 1;

    public final static byte STATUS_CONNECT_CLOSED = 0;

    public final static byte STATUS_CONNECT_ERROR = 0;


    /**
     * 对消息的处理
     */
    void onMessageResponse(ByteBuf byteBuf);

    /**
     * 当服务状态发生变化时触发
     */
    public void onServiceStatusConnectChanged(int statusCode);
}

NettyListener类用于监听连接的状态和消息响应。

package ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

/**
 *
 * Created by LiuSaibao on 11/23/2016.
 */
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    private NettyListener listener;

    private int WRITE_WAIT_SECONDS = 10;

    private int READ_WAIT_SECONDS = 13;

    public NettyClientInitializer(NettyListener listener) {
        this.listener = listener;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        SslContext sslCtx = ()
                .trustManager().build();

        ChannelPipeline pipeline = ();
        ((()));    // 开启SSL
        (new LoggingHandler());    // 开启日志,可以设置日志等级
//        (new IdleStateHandler(30, 60, 100));
        (new NettyClientHandler(listener));
    }
}

在Netty中,每个Channel被创建的时候都需要被关联一个对应的pipeline(通道),这种关联关系是永久的(整个程序运行的生命周期中)。ChannelPipeline可以理解成一个消息( 或消息事件,ChanelEvent)流转的通道,在这个通道中可以被附上许多用来处理消息的handler,当消息在这个通道中流转的时候,如果有与这个消息类型相对应的handler,就会触发这个handler去执行相应的动作。扩展阅读

package ;

import ;
import ;
import ;

/**
 *
 * Created by LiuSaibao on 11/23/2016.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private NettyListener listener;
    public NettyClientHandler(NettyListener listener){
        this.listener = listener;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ().setConnectStatus(true);
        (NettyListener.STATUS_CONNECT_SUCCESS);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ().setConnectStatus(false);
        (NettyListener.STATUS_CONNECT_CLOSED);
        ().reconnect();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        (byteBuf);
    }

    /*@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (() == IdleState.READER_IDLE){
                ();
            }else if (() == IdleState.WRITER_IDLE){
                try{
                    ().writeAndFlush("Chilent-Ping\r\n");
                } catch (Exception e){
                    (());
                }
            }
        }
        (ctx, evt);
    }*/
}

NettyClientHandler类注释掉了Netty提供心跳机制,稍后会在NettyService类中看到实现自定义的心跳。该类用于获取状态发生改变和消息响应。

package ;

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .;


import ;
import ;

import ;
import ;
import ;

import ;
import ;
import ;
import ;

/**
 *
 * Created by LiuSaibao on 11/17/2016.
 */
public class NettyService extends Service implements NettyListener {

    private NetworkReceiver receiver;
    private static String sessionId = null;

    private ScheduledExecutorService mScheduledExecutorService;
    private void shutdown() {
        if (mScheduledExecutorService != null) {
            ();
            mScheduledExecutorService = null;
        }
    }

    @Override
    public void onCreate() {
        super.onCreate();

        receiver = new NetworkReceiver();
        IntentFilter filter=new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
        (this).registerReceiver(receiver, filter);

        // 自定义心跳,每隔20秒向服务器发送心跳包
        mScheduledExecutorService = ();
        (new Runnable() {
            @Override
            public void run() {
                byte[] requestBody = {(byte) 0xFE, (byte)0xED, (byte)0xFE, 5};
                ().sendMsgToServer(requestBody, new ChannelFutureListener() {    //3
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        if (()) {                //4
                            ("Write heartbeat successful");

                        } else {
                            ("Write heartbeat error");
                            ("heartbeat error");
                        }
                    }
                });
            }
        }, 20, 20, );
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        ().setListener(this);
        connect();
        return START_NOT_STICKY;
    }

    @Override
    public void onServiceStatusConnectChanged(int statusCode) {     //连接状态监听
        ("connect status:%d", statusCode);
        if (statusCode == NettyListener.STATUS_CONNECT_SUCCESS) {
            authenticData();
        } else {
            ("tcp connect error");
        }
    }

    /**
     * 认证数据请求
     */
    private void authenticData() {
        AuthModel auth = new AuthModel();
        (102);
        ("YAMSUCFS8G");
        ("G4-一单元");
        ("51");
        ((int)(() / 1000));
        byte[] content = (auth);
        byte[] requestHeader = (content, 1, 1001);
        byte[] requestBody = (requestHeader, content);
        ().sendMsgToServer(requestBody, new ChannelFutureListener() {    //3
            @Override
            public void operationComplete(ChannelFuture future) {
                if (()) {                //4
                    ("Write auth successful");
                } else {
                    ("Write auth error");
                    ("tcp auth error");
                }
            }
        });
    }

    @Override
    public void onMessageResponse(ByteBuf byteBuf) {
        byte[] bytes = ();
        ("tcp receive data:%s", (bytes));
        // 接收
        if (0xED == (bytes[0])
                && 0xFE == (bytes[1])) {
            if (1 == bytes[2]) {
                int cardinal = (int)ByteUtil.unsigned4BytesToInt(bytes, 5);
                int realLen = cardinal + 9;
                int len = ();
                // 接收到的数据有可能会粘包,只需要判断数据的长度大于或者等于真实的长度即可
                if (len >= realLen) {

                    int word = ((bytes, 3, 2));
                    if (word == 1001) {
                        byte[] data = new byte[cardinal];
                        (bytes, 9, data, 0, );
                        Blowfish blowfish = new Blowfish();
                        String result = new String((data));
                        try {
                            JSONObject jsonObject = new JSONObject(result);
                            sessionId = ("s");
                        } catch (JSONException e) {
                            (e, ());
                        }
                    } else if (word == 2002) {
                        byte[] data = new byte[cardinal];
                        (bytes, 9, data, 0, );
                        Blowfish blowfish = new Blowfish();
                        String result = new String((data));
                        (result);
                        try {
                            JSONObject jsonObject = new JSONObject(result);
                            handle(word, ("i"), ("r"));
                        } catch (JSONException e) {
                            (e, ());
                        }
                    } else {
                        String log = "undefined request type";
                        (log);
                        (log);
                    }
                } else {
                    String log = ("request byte array content length inequality, realLen=%d, len=%d", realLen, len);
                    (log);
                    (log);
                }
            } else if (5 == bytes[2]) {
                ("heartbeat");
            }

            // 响应
        } else if (0xFE == (bytes[0])
                && 0xED == (bytes[1])
                && 0xFE == (bytes[2])) {
            if (1 == bytes[3]) {
                // 忽略bytes[4],bytes[5]。作用是接口升级
                int cardinal = (int)ByteUtil.unsigned4BytesToInt(bytes, 8);
                int len = ();
                // 前12个字节是请求头,后4个字节是校验值
                int realLen = cardinal + 12 + 4;
                // 返回的数据有可能会粘包,只需要判断数据的长度大于或者等于真实的长度即可
                if (len >= realLen) {
                    int word = ((bytes, 6, 2));
                    if (word == 2001) {
                        byte[] data = new byte[cardinal];
                        (bytes, 12, data, 0, );
                        byte[] crc32 = new byte[4];
                        (bytes, realLen - 4, crc32, 0, );

                        // 对内容进行CRC校验
                        if (CRC32Util.getCRC32Long(data) == ByteUtil.unsigned4BytesToInt(crc32, 0)) {
                            Blowfish blowfish = new Blowfish();
                            String result = new String((data));
                            try {
                                JSONObject jsonObject = new JSONObject(result);
                                int i = ("i");
                                if (sessionId == null) {
                                    ("sessionId is null");
                                    authenticData();
                                    handle(word, i, 0);
                                    return;
                                }
                                byte[] session = ();
                                byte[] sign = "WiseUC@2016".getBytes();
                                byte[] content = new byte[ + ];
                                (session, 0, content, 0, );
                                (sign, 0, content, , );

                                // 对Session ID进行CRC校验
                                if (("c") == CRC32Util.getCRC32(content)) {
                                    handle(word, i, 1);
                                } else {
                                    String log = "open the door session id crc32 verification failure";
                                    (log);
                                    (log);
                                }
                            } catch (JSONException e) {
                                (e, ());
                            }
                        } else {
                            String log = "open the door crc32 data verification failure";
                            (log);
                            (log);
                        }
                    } else {
                        String log = "undefined response type";
                        (log);
                        (log);
                    }
                } else {
                    String log = ("response byte array content length inequality, realLen=%d, len=%d", realLen, len);
                    (log);
                    (log);
                }
            } else if (5 == bytes[3]) {
                ("heartbeat");
            }
        } else {
            ("unknown");
            ("unknown");
        }
    }

    private void handle(int t, int i, int f) {
        // TODO 实现自己的业务逻辑
    }

    private void connect(){
        if (!().getConnectStatus()) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    ().connect();//连接服务器
                }
            }).start();
        }
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        (this).unregisterReceiver(receiver);
        shutdown();
        ().setReconnectNum(0);
        ().disconnect();
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public class NetworkReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {
            ConnectivityManager cm = (ConnectivityManager) (Context.CONNECTIVITY_SERVICE);
            NetworkInfo activeNetwork = ();
            if (activeNetwork != null) { // connected to the internet
                if (() == ConnectivityManager.TYPE_WIFI
                        || () == ConnectivityManager.TYPE_MOBILE) {
                    connect();
                }
            }
        }
    }
}

NettyService类实现了业务的处理逻辑。

基本数据类型对应二进制的位数

基本数据类型 二进制位数
boolean 1
byte 8
char 16
short 16
int 32
long 64
float 32
double 64

这张表大家都不会陌生,对于网络编程这块,都是对字节数组操作。例如:截取字节数组;将多个字节数组拼接到一个新的字节数组中传输。Java中的基本数据类型是没有无符号的,需要自己去封装,基本数据类型之间的转换就需要看这张表,将byte[]转换成short,int,long等其他类型是有区别,看对应二进制位数。

网络编程会出现粘包的情况,可以和服务端约定数据长度来解决粘包问题,示例就是使用这种方式解决粘包问题的。

示例源代码