Android Netty开发示例
本文涉及到其它的基础知识有:
- 对数据的字节处理
- 对数据进行CRC32校验
- 对数据的内容进行Blowfish加/解密
这部分内容不会在本文深入探究,在“代码片段讲解”部分会提到一二。
Netty是什么
Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。扩展阅读
代码片段讲解
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* Netty客户端
* Created by LiuSaibao on 11/23/2016.
*/
public class NettyClient {
private static NettyClient nettyClient = new NettyClient();
private EventLoopGroup group;
private NettyListener listener;
private Channel channel;
private boolean isConnect = false;
private int reconnectNum = Integer.MAX_VALUE;
private long reconnectIntervalTime = 5000;
public static NettyClient getInstance(){
return nettyClient;
}
public synchronized NettyClient connect() {
if (!isConnect) {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap().group(group)
.option(ChannelOption.SO_KEEPALIVE,true)
.channel()
.handler(new NettyClientInitializer(listener));
try {
(, Const.TCP_PORT).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (()) {
isConnect = true;
channel = ();
} else {
isConnect = false;
}
}
}).sync();
} catch (Exception e) {
(e, ());
(NettyListener.STATUS_CONNECT_ERROR);
reconnect();
}
}
return this;
}
public void disconnect() {
();
}
public void reconnect() {
if(reconnectNum >0 && !isConnect){
reconnectNum--;
try {
(reconnectIntervalTime);
} catch (InterruptedException e) {}
("重新连接");
disconnect();
connect();
}else{
disconnect();
}
}
public boolean sendMsgToServer(byte[] data, ChannelFutureListener listener) {
boolean flag = channel != null && isConnect;
if (flag) {
ByteBuf buf = (data);
(buf).addListener(listener);
}
return flag;
}
public void setReconnectNum(int reconnectNum) {
this.reconnectNum = reconnectNum;
}
public void setReconnectIntervalTime(long reconnectIntervalTime) {
this.reconnectIntervalTime = reconnectIntervalTime;
}
public boolean getConnectStatus(){
return isConnect;
}
public void setConnectStatus(boolean status){
this.isConnect = status;
}
public void setListener(NettyListener listener) {
this.listener = listener;
}
}
NettyClient类是对Netty进行封装,用于建立连接、断开连接、异常断开重连以及向服务器发送消息。
package ;
import ;
/**
*
* Created by LiuSaibao on 11/23/2016.
*/
public interface NettyListener {
public final static byte STATUS_CONNECT_SUCCESS = 1;
public final static byte STATUS_CONNECT_CLOSED = 0;
public final static byte STATUS_CONNECT_ERROR = 0;
/**
* 对消息的处理
*/
void onMessageResponse(ByteBuf byteBuf);
/**
* 当服务状态发生变化时触发
*/
public void onServiceStatusConnectChanged(int statusCode);
}
NettyListener类用于监听连接的状态和消息响应。
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
*
* Created by LiuSaibao on 11/23/2016.
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
private NettyListener listener;
private int WRITE_WAIT_SECONDS = 10;
private int READ_WAIT_SECONDS = 13;
public NettyClientInitializer(NettyListener listener) {
this.listener = listener;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
SslContext sslCtx = ()
.trustManager().build();
ChannelPipeline pipeline = ();
((())); // 开启SSL
(new LoggingHandler()); // 开启日志,可以设置日志等级
// (new IdleStateHandler(30, 60, 100));
(new NettyClientHandler(listener));
}
}
在Netty中,每个Channel被创建的时候都需要被关联一个对应的pipeline(通道),这种关联关系是永久的(整个程序运行的生命周期中)。ChannelPipeline可以理解成一个消息( 或消息事件,ChanelEvent)流转的通道,在这个通道中可以被附上许多用来处理消息的handler,当消息在这个通道中流转的时候,如果有与这个消息类型相对应的handler,就会触发这个handler去执行相应的动作。扩展阅读
package ;
import ;
import ;
import ;
/**
*
* Created by LiuSaibao on 11/23/2016.
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private NettyListener listener;
public NettyClientHandler(NettyListener listener){
this.listener = listener;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
().setConnectStatus(true);
(NettyListener.STATUS_CONNECT_SUCCESS);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
().setConnectStatus(false);
(NettyListener.STATUS_CONNECT_CLOSED);
().reconnect();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
(byteBuf);
}
/*@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (() == IdleState.READER_IDLE){
();
}else if (() == IdleState.WRITER_IDLE){
try{
().writeAndFlush("Chilent-Ping\r\n");
} catch (Exception e){
(());
}
}
}
(ctx, evt);
}*/
}
NettyClientHandler类注释掉了Netty提供心跳机制,稍后会在NettyService类中看到实现自定义的心跳。该类用于获取状态发生改变和消息响应。
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
*
* Created by LiuSaibao on 11/17/2016.
*/
public class NettyService extends Service implements NettyListener {
private NetworkReceiver receiver;
private static String sessionId = null;
private ScheduledExecutorService mScheduledExecutorService;
private void shutdown() {
if (mScheduledExecutorService != null) {
();
mScheduledExecutorService = null;
}
}
@Override
public void onCreate() {
super.onCreate();
receiver = new NetworkReceiver();
IntentFilter filter=new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
(this).registerReceiver(receiver, filter);
// 自定义心跳,每隔20秒向服务器发送心跳包
mScheduledExecutorService = ();
(new Runnable() {
@Override
public void run() {
byte[] requestBody = {(byte) 0xFE, (byte)0xED, (byte)0xFE, 5};
().sendMsgToServer(requestBody, new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) {
if (()) { //4
("Write heartbeat successful");
} else {
("Write heartbeat error");
("heartbeat error");
}
}
});
}
}, 20, 20, );
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
().setListener(this);
connect();
return START_NOT_STICKY;
}
@Override
public void onServiceStatusConnectChanged(int statusCode) { //连接状态监听
("connect status:%d", statusCode);
if (statusCode == NettyListener.STATUS_CONNECT_SUCCESS) {
authenticData();
} else {
("tcp connect error");
}
}
/**
* 认证数据请求
*/
private void authenticData() {
AuthModel auth = new AuthModel();
(102);
("YAMSUCFS8G");
("G4-一单元");
("51");
((int)(() / 1000));
byte[] content = (auth);
byte[] requestHeader = (content, 1, 1001);
byte[] requestBody = (requestHeader, content);
().sendMsgToServer(requestBody, new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) {
if (()) { //4
("Write auth successful");
} else {
("Write auth error");
("tcp auth error");
}
}
});
}
@Override
public void onMessageResponse(ByteBuf byteBuf) {
byte[] bytes = ();
("tcp receive data:%s", (bytes));
// 接收
if (0xED == (bytes[0])
&& 0xFE == (bytes[1])) {
if (1 == bytes[2]) {
int cardinal = (int)ByteUtil.unsigned4BytesToInt(bytes, 5);
int realLen = cardinal + 9;
int len = ();
// 接收到的数据有可能会粘包,只需要判断数据的长度大于或者等于真实的长度即可
if (len >= realLen) {
int word = ((bytes, 3, 2));
if (word == 1001) {
byte[] data = new byte[cardinal];
(bytes, 9, data, 0, );
Blowfish blowfish = new Blowfish();
String result = new String((data));
try {
JSONObject jsonObject = new JSONObject(result);
sessionId = ("s");
} catch (JSONException e) {
(e, ());
}
} else if (word == 2002) {
byte[] data = new byte[cardinal];
(bytes, 9, data, 0, );
Blowfish blowfish = new Blowfish();
String result = new String((data));
(result);
try {
JSONObject jsonObject = new JSONObject(result);
handle(word, ("i"), ("r"));
} catch (JSONException e) {
(e, ());
}
} else {
String log = "undefined request type";
(log);
(log);
}
} else {
String log = ("request byte array content length inequality, realLen=%d, len=%d", realLen, len);
(log);
(log);
}
} else if (5 == bytes[2]) {
("heartbeat");
}
// 响应
} else if (0xFE == (bytes[0])
&& 0xED == (bytes[1])
&& 0xFE == (bytes[2])) {
if (1 == bytes[3]) {
// 忽略bytes[4],bytes[5]。作用是接口升级
int cardinal = (int)ByteUtil.unsigned4BytesToInt(bytes, 8);
int len = ();
// 前12个字节是请求头,后4个字节是校验值
int realLen = cardinal + 12 + 4;
// 返回的数据有可能会粘包,只需要判断数据的长度大于或者等于真实的长度即可
if (len >= realLen) {
int word = ((bytes, 6, 2));
if (word == 2001) {
byte[] data = new byte[cardinal];
(bytes, 12, data, 0, );
byte[] crc32 = new byte[4];
(bytes, realLen - 4, crc32, 0, );
// 对内容进行CRC校验
if (CRC32Util.getCRC32Long(data) == ByteUtil.unsigned4BytesToInt(crc32, 0)) {
Blowfish blowfish = new Blowfish();
String result = new String((data));
try {
JSONObject jsonObject = new JSONObject(result);
int i = ("i");
if (sessionId == null) {
("sessionId is null");
authenticData();
handle(word, i, 0);
return;
}
byte[] session = ();
byte[] sign = "WiseUC@2016".getBytes();
byte[] content = new byte[ + ];
(session, 0, content, 0, );
(sign, 0, content, , );
// 对Session ID进行CRC校验
if (("c") == CRC32Util.getCRC32(content)) {
handle(word, i, 1);
} else {
String log = "open the door session id crc32 verification failure";
(log);
(log);
}
} catch (JSONException e) {
(e, ());
}
} else {
String log = "open the door crc32 data verification failure";
(log);
(log);
}
} else {
String log = "undefined response type";
(log);
(log);
}
} else {
String log = ("response byte array content length inequality, realLen=%d, len=%d", realLen, len);
(log);
(log);
}
} else if (5 == bytes[3]) {
("heartbeat");
}
} else {
("unknown");
("unknown");
}
}
private void handle(int t, int i, int f) {
// TODO 实现自己的业务逻辑
}
private void connect(){
if (!().getConnectStatus()) {
new Thread(new Runnable() {
@Override
public void run() {
().connect();//连接服务器
}
}).start();
}
}
@Override
public void onDestroy() {
super.onDestroy();
(this).unregisterReceiver(receiver);
shutdown();
().setReconnectNum(0);
().disconnect();
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
throw new UnsupportedOperationException("Not yet implemented");
}
public class NetworkReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
ConnectivityManager cm = (ConnectivityManager) (Context.CONNECTIVITY_SERVICE);
NetworkInfo activeNetwork = ();
if (activeNetwork != null) { // connected to the internet
if (() == ConnectivityManager.TYPE_WIFI
|| () == ConnectivityManager.TYPE_MOBILE) {
connect();
}
}
}
}
}
NettyService类实现了业务的处理逻辑。
基本数据类型对应二进制的位数
基本数据类型 | 二进制位数 |
---|---|
boolean | 1 |
byte | 8 |
char | 16 |
short | 16 |
int | 32 |
long | 64 |
float | 32 |
double | 64 |
这张表大家都不会陌生,对于网络编程这块,都是对字节数组操作。例如:截取字节数组;将多个字节数组拼接到一个新的字节数组中传输。Java中的基本数据类型是没有无符号的,需要自己去封装,基本数据类型之间的转换就需要看这张表,将byte[]转换成short,int,long等其他类型是有区别,看对应二进制位数。
网络编程会出现粘包的情况,可以和服务端约定数据长度来解决粘包问题,示例就是使用这种方式解决粘包问题的。
示例源代码