导入netty依赖
implementation 'io.netty:netty-all:4.1.107.Final'
使用netty
关闭netty
/**
 * Tears down the current connection, if any.
 *
 * <p>Closes and forgets the active {@code NettyManager}, then stops the
 * executor that was used to run its initial {@code connect()} call. Safe to
 * call when nothing has been created yet.
 */
private void closeSocket() {
    LogUtils.i(TAG, "closeSocket");
    if (nettyManager != null) {
        nettyManager.close();
        nettyManager = null;
    }
    if (nettyExecutor != null) {
        try {
            // shutdownNow() already rejects new tasks and interrupts the running one,
            // so a preceding shutdown() adds nothing.
            nettyExecutor.shutdownNow();
        } catch (Exception e) {
            // Best effort: a failure to stop the old executor must not block re-creation,
            // but it should at least be visible in the log.
            LogUtils.i(TAG, "closeSocket shutdown executor failed>>" + e);
        }
        // Drop the reference so a terminated executor is never reused.
        nettyExecutor = null;
    }
}
创建netty
/**
 * (Re)creates the Netty connection.
 *
 * <p>Any previous connection is closed first. The listener is registered
 * <em>before</em> the connect task is submitted so that no callback
 * (success, failure, re-create request) can fire while the listener is still
 * unset. The connect task captures the manager in a local variable, so a
 * concurrent {@code closeSocket()} that nulls the field cannot cause an NPE
 * inside the task.
 *
 * <p>NOTE(review): {@code onNeedReCreate()} is raised by NettyManager when
 * ip/port/timeouts are empty or zero. This method rebuilds the manager from the
 * same fields, so if those values are still invalid the re-create request will
 * repeat — confirm that the fields are refreshed elsewhere.
 */
private void initSocket() {
    closeSocket(); // close the previous connection

    final NettyManager manager = new NettyManager(ip, port, dataTimeOut, readTimeOut, whiteTimeOut);
    nettyManager = manager;

    manager.setOnNettyListener(new NettyManager.OnNettyListener() {
        @Override
        public void onConnectSuccess() {
            LogUtils.i(TAG, "onConnectSuccess");
        }

        @Override
        public void onConnectFail(int connectFailNum) {
            LogUtils.i(TAG, "onConnectFail >>" + connectFailNum);
            if (connectFailNum >= 5) { // retry threshold reached: reload ip/port from storage
                resetNeetyIpAndConnect();
            }
        }

        @Override
        public void onReceiveData(byte[] bytes) {
            LogUtils.i(TAG, "onReceiveData");
            parseReceiveData(bytes);
        }

        @Override
        public void onNeedReCreate() {
            initSocket();
        }
    });

    nettyExecutor = Executors.newSingleThreadExecutor();
    nettyExecutor.execute(new Runnable() {
        @Override
        public void run() {
            manager.connect(); // connect off the calling thread
        }
    });
}
/**
 * Reloads ip and port from MMKV and pushes them into the current manager.
 *
 * <p>Intended for the "disconnected and retrying" state: there is no need to
 * call {@code nettyManager.reConnect()} here because the manager retries on its
 * own and will pick up the new address on the next attempt.
 *
 * <p>Does nothing beyond updating the fields when no manager exists (for
 * example when a late failure callback arrives after {@code closeSocket()}).
 */
private void resetNeetyIpAndConnect() {
    ip = (String) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_IP, "");
    port = (int) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_PORT, 0);
    LogUtils.i(TAG, "resetNeetyIpAndConnect>>ip:" + ip + " port:" + port);

    // Read the field once: closeSocket() may null it from another thread.
    NettyManager manager = nettyManager;
    if (manager == null) {
        return;
    }
    manager.setIp(ip);
    manager.setPort(port);
}
/**
 * Reloads ip and port from MMKV while a connection is up and forces a
 * reconnect so the new address takes effect.
 *
 * <p>Unlike the retry path, an established connection will not switch address
 * by itself, so {@code nettyManager.reConnect()} must be called explicitly.
 * Does nothing when no manager exists.
 *
 * <p>NOTE(review): the values are kept in locals here, so the enclosing
 * class's {@code ip}/{@code port} fields are not updated — confirm this is
 * intended.
 */
private void resetIp() {
    String ip = (String) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_IP, "");
    int port = (int) MmkvUtils.getInstance().decode(Content.SP_KEY_SOCKET_TALK_PORT, 0);

    // Read the field once: closeSocket() may null it from another thread.
    NettyManager manager = nettyManager;
    if (manager == null) {
        return;
    }
    manager.setIp(ip);
    manager.setPort(port);
    manager.reConnect(); // reconnect using the new address
}
如果不需要 Netty 做粘包/拆包处理、希望直接拿到原始响应数据,可以使用下面这个构造方法(后四个参数全部传 0,各参数的具体含义见完整代码):
// The last four arguments are lengthFieldOffset, lengthFieldLength, lengthAdjustment and
// initialBytesToStrip. NettyManager only installs its LengthFieldBasedFrameDecoder when
// lengthFieldOffset > 0, so passing 0 here delivers the received bytes without framing.
nettyManager = new NettyManager(ip, port, dataTimeOut, readTimeOut, whiteTimeOut, 0, 0, 0, 0);
完整代码
package com.baolan.netty;
import android.util.Log;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.StringUtil;
/**
 * Thin wrapper around a Netty TCP client that keeps one connection alive.
 *
 * <p>Lifecycle: construct, {@link #setOnNettyListener register a listener},
 * call {@link #connect()}, and finally {@link #close()}. After {@code close()}
 * the instance is terminal: its event-loop threads are shut down and every
 * further connect/reconnect request is ignored. Create a new instance to
 * connect again.
 *
 * <p>Connecting is fully asynchronous. Results are reported through
 * {@link OnNettyListener}; callbacks are invoked on Netty event-loop threads
 * (or on the caller's thread for {@link OnNettyListener#onNeedReCreate()}).
 */
public class NettyManager {
    private static final String TAG = "bl_netty";

    /** Delay before retrying after a failed connection attempt, in seconds. */
    private static final int RECONNECT_DELAY_SECONDS = 5;
    /** Maximum time allowed for a single connection attempt, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MS = 10000;
    /** Largest frame the length-field decoder accepts, in bytes. */
    private static final int MAX_FRAME_LENGTH = 65530;

    /** Guards the "check current attempt, then start a new one" sequence. */
    private final Object connectLock = new Object();

    private volatile ChannelFuture channelFuture;
    private volatile String ip;
    private volatile int port;
    private final int dataTimeOut;  // send/receive timeout
    private final int readTimeOut;  // reader-idle timeout
    private final int whiteTimeOut; // writer-idle timeout
    private final NioEventLoopGroup nioEventLoopGroup;
    private final Bootstrap bootstrap;
    private int connectFailNum = 0; // consecutive failed connection attempts

    // Length-field framing parameters (see LengthFieldBasedFrameDecoder):
    private final int lengthFieldOffset;   // bytes to skip before the length field starts
    private final int lengthFieldLength;   // number of bytes that encode the length
    private final int lengthAdjustment;    // correction when the length covers more/less than the body
    private final int initialBytesToStrip; // leading bytes dropped from each decoded frame (0 = keep header)

    /** Set once by {@link #close()}; suppresses every later connect or retry. */
    private volatile boolean closed;

    private volatile OnNettyListener onNettyListener;

    /** One shared listener instance so {@link #close()} can always remove it. */
    private final ChannelFutureListener channelFutureListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            handleConnectResult(future);
        }
    };

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Creates a manager with the default framing parameters
     * (lengthFieldOffset 3, lengthFieldLength 2, lengthAdjustment 1, initialBytesToStrip 0).
     */
    public NettyManager(String ip, int port, int dataTimeOut, int readTimeOut, int whiteTimeOut) {
        this(ip, port, dataTimeOut, readTimeOut, whiteTimeOut, 3, 2, 1, 0);
    }

    /**
     * @param ip                  address to connect to
     * @param port                port to connect to
     * @param dataTimeOut         send/receive timeout
     * @param readTimeOut         reader-idle timeout passed to IdleStateHandler (seconds)
     * @param whiteTimeOut        writer-idle timeout passed to IdleStateHandler (seconds)
     * @param lengthFieldOffset   offset of the length field; when this is 0 no frame decoder
     *                            is installed and received bytes are delivered without framing
     * @param lengthFieldLength   size of the length field in bytes
     * @param lengthAdjustment    value added to the decoded length
     * @param initialBytesToStrip leading bytes stripped from each frame
     */
    public NettyManager(String ip, int port, int dataTimeOut, int readTimeOut, int whiteTimeOut, int lengthFieldOffset, int lengthFieldLength,
                        int lengthAdjustment, int initialBytesToStrip) {
        this.ip = ip;
        this.port = port;
        this.dataTimeOut = dataTimeOut;
        this.readTimeOut = readTimeOut;
        this.whiteTimeOut = whiteTimeOut;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        this.initialBytesToStrip = initialBytesToStrip;
        Log.i(TAG, "create ip>>" + ip);
        Log.i(TAG, "create port>>" + port);
        Log.i(TAG, "create dataTimeOut>>" + dataTimeOut);
        Log.i(TAG, "create readTimeOut>>" + readTimeOut);
        Log.i(TAG, "create whiteTimeOut>>" + whiteTimeOut);

        // Event-loop threads and bootstrap are created once and reused for every attempt.
        nioEventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class).group(nioEventLoopGroup);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);  // disable Nagle's algorithm
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // TCP keep-alive
        // NOTE(review): SO_TIMEOUT appears to be honoured only by blocking (OIO) transports;
        // kept for compatibility — confirm whether it has any effect on NioSocketChannel.
        bootstrap.option(ChannelOption.SO_TIMEOUT, dataTimeOut);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS); // connect timeout
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // IdleStateHandler must sit BEFORE ChannelHandle: it fires IdleStateEvent to the
                // handlers that follow it, so a handler placed earlier would never see the event.
                // Arg 1: reader idle, arg 2: writer idle, arg 3: all idle (0 = disabled).
                // Keep reader idle larger than writer idle: several writer-idle heartbeats
                // without any response then end in a reader-idle close and reconnect.
                pipeline.addLast(new IdleStateHandler(readTimeOut, whiteTimeOut, 0));
                if (lengthFieldOffset > 0) { // 0 means: do not frame the response data
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip));
                }
                pipeline.addLast("decoder", new ByteArrayDecoder())     // inbound: ByteBuf -> byte[]
                        .addLast("encoder", new ByteArrayEncoder())     // outbound: byte[] -> ByteBuf
                        .addLast(new ChannelHandle(NettyManager.this)); // receives data and lifecycle events
            }
        });
    }

    /** Returns true when any parameter required for connecting is empty or zero. */
    private boolean isConfigInvalid() {
        return StringUtil.isNullOrEmpty(ip) || port == 0 || dataTimeOut == 0 || readTimeOut == 0 || whiteTimeOut == 0;
    }

    /** Tells the owner that this instance cannot connect and must be rebuilt. */
    private void notifyNeedReCreate() {
        OnNettyListener listener = onNettyListener;
        if (listener != null) {
            listener.onNeedReCreate();
        }
    }

    /**
     * Starts one asynchronous connection attempt unless the manager is closed,
     * already connected, or an attempt is still in flight.
     */
    private void create() {
        if (closed) {
            return;
        }
        if (isConfigInvalid()) {
            notifyNeedReCreate();
            return;
        }
        ChannelFuture attempt;
        synchronized (connectLock) {
            if (closed) {
                return;
            }
            ChannelFuture current = channelFuture;
            if (current != null && (!current.isDone() || current.channel().isActive())) {
                return; // connected, or another attempt is still running
            }
            try {
                attempt = bootstrap.connect(new InetSocketAddress(ip, port));
                channelFuture = attempt;
            } catch (Exception e) {
                Log.e(TAG, "e>>" + e, e);
                return;
            }
        }
        // Deliberately no sync()/await(): this method is also reached from event-loop threads
        // (scheduled retry, channelInactive), where blocking on the future is not allowed.
        // The outcome is handled entirely by the listener.
        attempt.addListener(channelFutureListener);
    }

    /** Handles the outcome of a connection attempt; runs on an event-loop thread. */
    private void handleConnectResult(ChannelFuture future) {
        boolean success = future.isSuccess();
        Log.i(TAG, "connect success>>" + success);
        if (closed) {
            // close() raced with this attempt: make sure nothing stays open and stay silent.
            future.channel().close();
            return;
        }
        OnNettyListener listener = onNettyListener;
        if (success) {
            connectFailNum = 0;
            Log.i(TAG, "connect success !");
            if (listener != null) {
                listener.onConnectSuccess();
            }
            return;
        }
        connectFailNum++;
        try {
            nioEventLoopGroup.schedule(new Runnable() {
                @Override
                public void run() {
                    reConnect();
                }
            }, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The group is shutting down (close() in progress); no retry is wanted.
            Log.i(TAG, "retry not scheduled>>" + e);
        }
        if (listener != null) {
            listener.onConnectFail(connectFailNum);
        }
    }

    /**
     * Sends data over the current connection.
     *
     * <p>Silently does nothing when the payload is null/empty or when there is
     * no active connection.
     *
     * @param sendBytes bytes to send
     */
    public void sendData(byte[] sendBytes) {
        ChannelFuture current = channelFuture;
        if (current == null) {
            return;
        }
        Log.i(TAG, "sendDataToServer");
        if (sendBytes == null || sendBytes.length == 0) {
            return;
        }
        if (current.channel().isActive()) {
            Log.i(TAG, "writeAndFlush");
            current.channel().writeAndFlush(sendBytes);
        }
    }

    /**
     * Forwards received bytes to the listener. Null or empty arrays are ignored.
     *
     * @param byteBuf bytes received from the server
     */
    public void receiveData(byte[] byteBuf) {
        if (byteBuf == null) {
            return;
        }
        // Log the size: byte[].toString() only prints an identity hash.
        Log.i(TAG, "receiveData>>" + byteBuf.length);
        OnNettyListener listener = onNettyListener;
        if (listener != null && byteBuf.length > 0) {
            listener.onReceiveData(byteBuf);
        }
    }

    /**
     * Connects, or forces a reconnect when already connected.
     *
     * <p>When a connection is active it is closed; the handler's
     * {@code channelInactive} callback then triggers the reconnect. Otherwise a
     * new attempt is started directly.
     */
    public void connect() {
        Log.i(TAG, "connect ");
        if (closed) {
            return;
        }
        ChannelFuture current = channelFuture;
        if (current != null && current.channel() != null && current.channel().isActive()) {
            current.channel().close();
        } else {
            create();
        }
    }

    /**
     * Reconnects using the current ip/port. Raises
     * {@link OnNettyListener#onNeedReCreate()} instead when a required parameter
     * is missing. Ignored after {@link #close()}.
     */
    public void reConnect() {
        Log.i(TAG, "reConnect");
        if (closed) {
            return;
        }
        if (isConfigInvalid()) {
            notifyNeedReCreate();
            return;
        }
        connect();
    }

    /**
     * Permanently shuts this manager down: cancels a pending attempt, closes the
     * channel and releases the event-loop threads. Idempotent.
     */
    public void close() {
        closed = true;
        ChannelFuture current;
        synchronized (connectLock) {
            current = channelFuture;
        }
        if (current != null) {
            current.removeListener(channelFutureListener);
            current.cancel(true);
            current.channel().close();
        }
        // Without this every re-created manager would leave its I/O threads running.
        nioEventLoopGroup.shutdownGracefully();
    }

    public void setOnNettyListener(OnNettyListener onNettyListener) {
        this.onNettyListener = onNettyListener;
    }

    /** Callbacks reporting connection state and received data. */
    public interface OnNettyListener {
        /**
         * Connection established.
         */
        void onConnectSuccess();

        /**
         * A connection attempt failed.
         *
         * @param connectFailNum number of consecutive failures so far
         */
        void onConnectFail(int connectFailNum);

        /**
         * Data was received.
         *
         * @param bytes received bytes
         */
        void onReceiveData(byte[] bytes);

        /**
         * A required parameter is missing; the manager must be created again.
         */
        void onNeedReCreate();
    }
}
package com.baolan.netty;
import android.util.Log;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
 * Last inbound handler of the client pipeline: forwards received byte arrays to
 * {@link NettyManager} and turns channel lifecycle events into reconnects.
 *
 * <p>Reconnecting is driven from exactly one place, {@link #channelInactive}.
 * Every other path (read timeout, exception) only closes the channel, which in
 * turn triggers {@code channelInactive}. This avoids starting two connection
 * attempts for a single failure.
 */
public class ChannelHandle extends SimpleChannelInboundHandler<ByteBuf> {
    private static final String TAG = "bl_netty";
    private NettyManager nettyManager;

    public ChannelHandle(NettyManager nettyManager) {
        this.nettyManager = nettyManager;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        // connection is up
        Log.i(TAG,"channelActive");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        // connection lost
        Log.i(TAG,"channelInactive 连接失败");
        if (nettyManager != null) {
            nettyManager.reConnect(); // the single reconnect trigger
        }
    }

    /**
     * Heartbeat hook: invoked when an {@link IdleStateEvent} reaches this handler.
     *
     * @param ctx handler context
     * @param evt the user event
     * @throws Exception propagated from the superclass
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.WRITER_IDLE) {
            // Writer idle: this is the place to send a heartbeat packet to the server.
            // No heartbeat is sent yet.
            Log.i(TAG, "userEventTriggered write idle");
        } else if (state == IdleState.READER_IDLE) {
            // Reader idle: no heartbeat response arrived; close so channelInactive reconnects.
            Log.i(TAG, "userEventTriggered read idle");
            ctx.channel().close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Handled here, so it is not forwarded further down the pipeline.
        Log.e(TAG, "exceptionCaught", cause);
        // Closing an active channel fires channelInactive, which performs the reconnect;
        // calling reConnect() here as well would start a second, parallel attempt.
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // Only reached for ByteBuf messages; NettyManager's pipeline places a
        // ByteArrayDecoder before this handler, so payloads arrive as byte[] in channelRead.
        Log.i(TAG, "channelRead0");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Log.i(TAG, "channelRead >>"+msg.getClass());
        if (msg instanceof byte[]) {
            // Consumed here: a handled message is not passed on to later handlers.
            if (nettyManager != null) {
                nettyManager.receiveData((byte[]) msg);
            }
            return;
        }
        // Anything else keeps the default SimpleChannelInboundHandler treatment
        // (ByteBuf -> channelRead0 + release, other types forwarded).
        super.channelRead(ctx, msg);
    }
}