0、前言
大多公司都会有自己的一套私有协议,可能只在自己公司内部使用的协议。
使用Netty实现的私有协议可以用于内部各模块之间的通信,基于TCP/IP协议栈,异步NIO框架,提供高性能、异步化的通信能力。
一、Netty私有协议栈
1、功能概述
使用Netty实现的私有协议栈主要功能有:
- 握手请求:双方发送握手请求和握手应答信息;
- 接入认证机制:设置提供IP地址的白名单接入认证机制,不在白名单内拒绝接入;
- 消息合法性校验:对接收到的消息进行合法性校验:是否充分登录、IP地址是否合法;
- 消息编解码:提供消息编解码框架,实现对象序列化和凡序列化;
- 可靠性设计:提供心跳机制、重连机制等;
2、通信模型
(1)客户端发送握手请求消息,携带节点id信息;
(2)服务端对接收到的握手请求消息进行验证,包括节点id有效性、重复性登录、IP地址合法性校验等,验证通过后返回登录成功握手应答消息;
(3)链路建立后,客户端发送业务消息;
(4)服务端发送心跳消息;
(5)客户端发送心跳消息;
(6)服务端发送业务消息;
(7)服务端退出关闭连接,发送关闭连接消息给客户端,客户端关闭。
注:
Netty协议栈服务端和客户端通信链路建立后,
(1)全双工通信:服务端和客户端之间的通信方式为全双工通信,客户端可以发送消息给服务端,服务端也可以主动发送消息给客户端;
(2)心跳机制Ping-Pong:当链路处于空闲状态时,即客户端隔了一段时间未发送消息,则客户端每隔固定时间(一般为一分钟)发送Ping消息给服务端,服务端接收到Ping消息后立马返回Pong响应给客户端;
(3)重连机制:如果客户端在N个周期内未收到Pong消息,则主动关闭。间隔周期T后发起重连操作,直至连上服务器。
3、消息定义
Netty协议栈消息分为消息头和消息体两部分。
(1)消息头字段定义-Header
- validateCode: int,长度32,Netty消息验证码,由固定值(0xABEF,表明是Netty协议消息,2字节)+主版本号(1255,1个字节)+次版本号(1255,1个字节)三部分组成;
- length:int,长度32,代表整个消息长度,包括消息头和消息体;
- sessionId:long,长度64,节点id,全局唯一;
- type: Byte,长度8,代表业务类型。0-业务请求消息;1-业务响应消息;2-业务ONE WAY消息,既是请求也是响应;3-握手请求;4-握手应答;5-心跳请求;6-心跳应答;
- priority:Byte, 长度8,消息优先级;0~255;
- attachmentInfo:Map<String,Object>,可选字段,可以用来在消息头中存放扩展信息。
(2)消息体:Object
存放消息实际内容。
4、消息编解码
Netty协议栈编码
(1)Header编码:对于可选字段,使用ByteBuffer.putXXX方法进行编码,写入入ByteBuffer缓冲区;对于 attachment,首先获取key,对key进行编码;再对value序列化成二进制数组后放入入ByteBuffer缓冲区。
(2)body编码:将body内容序列化成byte数组后,写入ByteBuffer缓冲区。
(3)全部字段编码完成之后,需要更新消息头中的length字段对于的长度(只有所有字段都编码完成之后才知道整个消息长度),并重新写入ByteBuffer缓冲区。
Netty协议栈解码
(1)Header解码:对于可选字段,使用ByteBuffer.getXXX方法获取所需字段;
对于attachment字段,首先创建一个attachment对象,调用用ByteBuffer.getXXX获取扩展信息长度,如果为0无需解码;否则遍历循环进行解码。
(2)body解码:通过第三方框架进行解码;
5、消息链路建立
使用Netty协议栈的应用程序无需区分客户端或服务 端,任意一个节点,既可以是客户端,也可以是服务端提供服务。
链路建立由客户端主动发握手请求请求,需要通过一些认证机制,比如IP地址的验证、号段的黑白名单安全认证机制等。
握手请求消息定义:
- 消息头type字段值为3,代表握手请求消息;
- 扩展信息attachment为空;
- 消息体body为空;
- 握手消息长度为22个字节;
服务端接收到客户端发送的握手请求消息后,进行接入验证,验证通过后发送握手应答消息。
握手应答消息定义:
- 消息头type字段值为4,代表握手应答消息;
- 扩展信息attachment为空;
- 消息体body返回握手请求的结果,0-认证成功,1-认证失败。
6、消息链路关闭
客户端和服务端采用长链接进行通信,通过心跳机制和业务消息维持链路的通信,正常情况下无需关闭。
在以下情况下,客户端和服务端需要关闭链接:
- 服务发生宕机或者重启,主动关闭链路;
- 业务消息、心跳消息读写过程发生I/O异常,主动关闭链路;
- 心跳超时,主动关闭链路;
- 编码异常,主动关闭链路。
7、可靠性设计
在实际场景下,由于网络超时、闪断、对方进程僵死、处理缓慢等原因,需要Netty协议栈提供可靠性设计来保证程序的正常运行。
(1)心跳机制
正常情况下,客户端服务端通过发送业务消息保持链路建立,一旦服务端有异常也能立即发现问题;但是在网络空闲状态下(如凌晨或者业务低峰时期,可能几个小时内都没有业务请求),如果在这段时间内服务端宕机,在白天业务高峰期到来时,大量请求发送到服务端,服务端由于宕机无法处理消息,会发生大量的请求超时,严重会使客户端也进程资源耗尽。
为了在空闲状态下检测链路的互通性,可以使用心跳机制来检测,一旦发现网络故障,立即关闭链路,周期T后再主动重连。
设计思路如下:
- 客户端在空闲状态下,持续几个周期T内未未发送业务消息或者接收到服务端消息,就发送一条Ping消息给服务端;
- 在下一个周期T到来时未收到服务端发送的Pong消息获取业务消息,心跳失败次数计数器+1;
- 如果任意时刻接收到了服务端发送的Pong消息或者业务消息,就将心跳失败计数器清0;当心跳失败计数器次数达到阈值后,主动关闭链路;
- 服务端在周期T内未接收到任何消息,心跳失败计数器次数+1;任意时刻接收到客户端消息,则将心跳失败计数器次数清零;
- 服务端心跳失败计数器次数达到阈值后,关闭链路,释放资源,等待客户端重连。
通过Ping-Pong双向心跳机制,可以保证无论通信哪一方出现网络故障,都能被及时发现。注意,必须在心跳失败计数器次数达到阈值才关闭链路,防止因为对方时间内繁忙没有及时返回应答造成的误判。
(2)重连机制
如果链路中断,需要等待INTERVAL时间后,客户端再发起重连操作,如果重连失败,间隔周期INTERVAL后再发起重连,保证服务端有充足时间释放句柄资源。
重连失败后,需要打印异常堆栈信息,方便后续问题定位。
(3)重复登录保护
当客户端握手成功后,在链路处于正常状态下,不允许客户端重复登录,防止客户端异常状态下反复重连导致句柄资源被耗尽。
(1)服务端接收到客户端的握手请求消息后,首先检验IP地址合法性,然后在缓存的地址表中查看客户端是否已经登录,如果已经登录则拒绝重复登录,返回错误码-1,同时关闭TCP链路,并在服务端的日志中打印握手失败的原因。
(2)客户端接收到握手失败的应答消息之后,关闭客户端的TCP链接,等待INTERVAL时间之后再发起TCP连接,直到认证成功。
(3)当服务端由于N次心跳超时主动关闭链路后,清空客户端的地址缓存信息,保证后续客户端可以重连成功,防止被重复登录保护机制拒绝掉。
(4)消息缓存重发
在客户端或者服务端发生链路中断后,在链路恢复之前,消息需要缓存在消息队列中,且不能丢失;等待链路恢复之后,重新发送这些消息,保证链路中断期间消息不丢失。
二、Netty私有协议栈实战
与《Netty权威指南》书上有区别的地方:
(1)需要引入>jboss-marshalling和jboss-marshalling-serial两个jar包;
(2)MarshallingDecoder、MarshallingEncoder中的方法变成protected,所以需要我们用两个子类NettyMarshallingDecoder和NettyMarshallingEncoder去继承并实现decode和encode方法,该子类通过工厂类MarshallingCodeCFactory生成。
(3)自定义解码器继承基于长度的消息帧解码器LengthFieldBasedFrameDecoder, 支持粘包、拆包处理。
之前定义的消息结构由消息头(4字节validateCode + 4字节 length + …) + 消息体(body)组成,所以在解码时:
lengthFieldLength(消息长度length字段)的值为4;
lengthFieldOffset的值为4,表示需要跳过前4字节validateCode后读取的值即为length长度;
ch.pipeline().addLast("decoder", new NettyMessageDecoder(1024 * 1024, 4, 4))
实际运行时报错:
An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(4) + length(8) exceeds writerIndex(8): PooledSlicedByteBuf(ridx: 4, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 8, widx: 22, cap: 1024))
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:442)
从.IndexOutOfBoundsException: readerIndex(4) + length(8) exceeds writerIndex(8):
这行报错信息可以看出读取时会将lengthFieldOffset和lengthFieldLength之后相加,
而lengthFieldEndOffset = 4 + 4 = 8,所以要将lengthAdjustment设置为 -lengthFieldEndOffset。
lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
所以解码器初始化参数需要设置为:
ch.pipeline().addLast("decoder", new NettyMessageDecoder(1024 * 1024, 4, 4, -8, 0))
0、jar包
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>2.0.3.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.3.Final</version>
</dependency>
1、数据结构定义:消息体
package com.wgs.netty.demo6_netty4private.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Created by wanggenshen
* Date: on 2019/8/18 10:49.
* Description: Netty协议栈消息数据结构定义
*/
Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public final class NettyMessage {
/**
* 消息头
*/
private NettyMessageHeader header;
/**
* 消息体
*/
private Object body;
}
package com.wgs.netty.demo6_netty4private.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wanggenshen
* Date: on 2019/8/18 11:39.
* Description: 消息头
*/
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public final class NettyMessageHeader {
/**
* 验证码, 由固定值0xABEF + 主版本号 + 次版本号组成
*/
private Integer validateCode = 0xabef0101;
/**
* 节点id, 全局唯一
*/
private Long sessionId;
/**
* 消息类型
*
* @see com.wgs.netty.demo6_netty4private.enums.NettyMessageTypeEnum
*/
private Byte type;
/**
* 消息优先级
*/
private Byte priority;
/**
* 消息长度, 包括消息头和消息体
*/
private Integer length;
/**
* 扩展信息
*/
private Map<String, Object> attachmentInfo = new HashMap<>();
}
2、编解码
编码类
package com.wgs.netty.demo6_netty4private.coder;
import com.wgs.netty.demo6_netty4private.entity.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import java.util.Map;
/**
* Created by wanggenshen
* Date: on 2019/8/18 13:56.
* Description: Netty消息编码类
*/
public final class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {
/**
* jBoss提供的编码类
*/
private NettyMarshallingEncoder nettyMarshallingEncoder;
/**
* 初始化marshallingEncoder
*/
public NettyMessageEncoder() {
this.nettyMarshallingEncoder = MarshallingCodeCFactory.buildMarshallingEncoder();
System.out.println("NettyMessageEncoder init success");
}
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage msg, List<Object> out) throws Exception {
if (msg == null || msg.getHeader() == null) {
throw new Exception("The encode message is nul");
}
// 分配缓冲区
ByteBuf outBuf = Unpooled.buffer();
// 读取消息头字段, 写入缓冲区
outBuf.writeInt((int) getBufDefaultVal(msg.getHeader().getValidateCode(), 0));
outBuf.writeInt((int) getBufDefaultVal(msg.getHeader().getLength(), 0));
outBuf.writeLong(msg.getHeader().getSessionId() == null ? 0L : msg.getHeader().getSessionId());
outBuf.writeByte((byte)getBufDefaultVal(msg.getHeader().getType(), -1));
outBuf.writeByte(msg.getHeader().getPriority() == null ? (byte)-1 : msg.getHeader().getPriority());
// 扩展信息, 依次写入大小和内容
int size = msg.getHeader().getAttachmentInfo() == null ? 0 : msg.getHeader().getAttachmentInfo().size();
outBuf.writeInt(size);
String key;
byte[] keyArray;
Object value;
if (size > 0) {
for (Map.Entry<String, Object> entry : msg.getHeader().getAttachmentInfo().entrySet()) {
// key serialize
key = entry.getKey();
keyArray = key.getBytes("UTF-8");
outBuf.writeInt(keyArray.length);
outBuf.writeBytes(keyArray);
// value encode
value = entry.getValue();
nettyMarshallingEncoder.encode(ctx, value, outBuf);
}
}
// body使用MarshallingEncoder进行编码
if (msg.getBody() != null) {
nettyMarshallingEncoder.encode(ctx, msg.getBody(), outBuf);
} else {
outBuf.writeInt(0);
}
outBuf.setInt(4, outBuf.readableBytes());
out.add(outBuf);
}
private Object getBufDefaultVal(Object content, Object defaultVal) {
if (content == null) {
return defaultVal;
}
return content;
}
}
解码类
package com.wgs.netty.demo6_netty4private.coder;
import com.wgs.netty.demo6_netty4private.entity.NettyMessage;
import com.wgs.netty.demo6_netty4private.entity.NettyMessageHeader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.HashMap;
import java.util.Map;
/**
* Created by wanggenshen
* Date: on 2019/8/18 14:35.
* Description: Netty协议栈解码类
* 基于长度的消息帧解码器LengthFieldBasedFrameDecoder, 支持粘包、拆包处理
*/
public final class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
/**
* 使用MarshallingDecoder
*/
NettyMarshallingDecoder nettyMarshallingDecoder;
public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
nettyMarshallingDecoder = MarshallingCodeCFactory.buildMarshallingDecoder();
System.out.println("NettyMessageDecoder init constructor success");
}
/**
* 构造器, 初始化解码器: NettyMarshallingDecoder
*
* @param maxFrameLength 消息帧最大长度
* @param lengthFieldOffset 消息偏移量
* @param lengthFieldLength 消息帧实际长度
*/
public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
nettyMarshallingDecoder = MarshallingCodeCFactory.buildMarshallingDecoder();
System.out.println("NettyMessageDecoder init constructor success");
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
NettyMessage message = new NettyMessage();
// read header
int validateCode = frame.readInt();
int length = frame.readInt();
long sessionId = frame.readLong();
Byte type = frame.readByte();
Byte priority = frame.readByte();
NettyMessageHeader header = NettyMessageHeader.builder()
.validateCode(validateCode)
.sessionId(sessionId)
.length(length)
.type(type)
.priority(priority)
.build();
int attachmentSize = frame.readInt();
if (attachmentSize > 0) {
Map<String, Object> attachmentInfo = new HashMap<>();
int keySize = 0;
byte[] keyArray = null;
String key = null;
for (int i = 0; i < attachmentSize; i++) {
keySize = in.readInt();
keyArray = new byte[keySize];
in.readBytes(keyArray);
key = new String(keyArray, "UTF-8");
attachmentInfo.put(key, nettyMarshallingDecoder.decode(ctx, in));
header.setAttachmentInfo(attachmentInfo);
}
}
message.setHeader(header);
// read body
if (frame.readableBytes() > 4) {
message.setBody(nettyMarshallingDecoder.decode(ctx, frame));
}
return message;
}
}
使用jBoss Marshalling进行编解码
package com.wgs.netty.demo6_netty4private.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
/**
* Created by wanggenshen
* Date: on 2019/8/18 14:19.
* Description: MarshallingDecoder中decode方法变成protected, 需要拓展才能使用
*/
public class NettyMarshallingDecoder extends MarshallingDecoder {
public NettyMarshallingDecoder(UnmarshallerProvider provider) {
super(provider);
}
public NettyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {
super(provider, maxObjectSize);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return super.decode(ctx, in);
}
}
package com.wgs.netty.demo6_netty4private.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
/**
* Created by wanggenshen
* Date: on 2019/8/18 14:21.
* Description: XXX
*/
public class NettyMarshallingEncoder extends MarshallingEncoder {
/**
* Creates a new encoder.
*
* @param provider the {@link MarshallerProvider} to use
*/
public NettyMarshallingEncoder(MarshallerProvider provider) {
super(provider);
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
super.encode(ctx, msg, out);
}
}
工厂类
package com.wgs.netty.demo6_netty4private.coder;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
* Created by wanggenshen
* Date: on 2019/8/18 13:54.
* Description: Marshalling 编码和解码的工厂类
*/
public class MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
*/
public static NettyMarshallingDecoder buildMarshallingDecoder() {
// 首先通过Marshalling工具类的getProvidedMarshallerFactory静态方法获取MarshallerFactory实例
// 参数“serial”表示创建的是Java序列化工厂对象,它由jboss-marshalling-serial-1.3.0.CR9.jar提供。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// 创建了MarshallingConfiguration对象
final MarshallingConfiguration configuration = new MarshallingConfiguration();
// 将它的版本号设置为5
configuration.setVersion(5);
// 然后根据MarshallerFactory和MarshallingConfiguration创建UnmarshallerProvider实例
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// 最后通过构造函数创建Netty的MarshallingDecoder对象
// 它有两个参数,分别是UnmarshallerProvider和单个消息序列化后的最大长度。
NettyMarshallingDecoder decoder = new NettyMarshallingDecoder(provider, 1024);
return decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
*/
public static NettyMarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// 创建MarshallerProvider对象,它用于创建Netty提供的MarshallingEncoder实例
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// MarshallingEncoder用于将实现序列化接口的POJO对象序列化为二进制数组。
NettyMarshallingEncoder encoder = new NettyMarshallingEncoder(provider);
return encoder;
}
}
3、握手请求和登录
握手请求
package com.wgs.netty.demo6_netty4private.handler.auth;
import com.wgs.netty.demo6_netty4private.constant.NettyConstant;
import com.wgs.netty.demo6_netty4private.entity.NettyMessage;
import com.wgs.netty.demo6_netty4private.entity.NettyMessageHeader;
import com.wgs.netty.demo6_netty4private.enums.NettyMessageTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Created by wanggenshen
* Date: on 2019/8/18 21:24.
* Description: 握手认证客户端
*
* (1)channelActive: 首先TCP连接成功, 客户端构建握手请求消息, 发送至服务端
* (2)channelRead0: 读取来自服务端的握手响应消息, 判断是否连接成功
*/
public class LoginAuthReqHandler extends SimpleChannelInboundHandler{
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg;
// 如果是来自服务端握手应答消息, 判断是否认证成功
if (nettyMessage.getHeader() != null
&& NettyMessageTypeEnum.HandShakeResp.getType().equals(nettyMessage.getHeader().getType())) {
doLoginAuthReqMessage(nettyMessage, ctx);
} else {
// 传给后面的ChannelHandler处理
ctx.fireChannelRead(msg);
}
}
private void doLoginAuthReqMessage(NettyMessage nettyMessage, ChannelHandlerContext ctx) {
int loginResult = (int) nettyMessage.getBody();
// 认证成功
if (loginResult == NettyConstant.LOGIN_SUCCESS) {
System.out.println("Login is ok: " + nettyMessage);
ctx.fireChannelRead(nettyMessage);
} else {
// 认证失败, 关闭连接
System.out.println("Login is failed : " + nettyMessage);
ctx.close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyMessage nettyMessage = buildNettyMessage();
ctx.writeAndFlush(buildNettyMessage());
System.out.println("客户端首次登陆连接, Client send heart beat message: " + nettyMessage);
}
/**
* 构建握手请求消息
*
* @return
*/
private NettyMessage buildNettyMessage() {
NettyMessage nettyMessage = new NettyMessage();
NettyMessageHeader header = NettyMessageHeader.builder()
.type(NettyMessageTypeEnum.HandShakeReq.getType())
.build();
nettyMessage.setHeader(header);
return nettyMessage;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
握手登录验证和响应
package com.wgs.netty.demo6_netty4private.handler.auth;
import com.wgs.netty.demo6_netty4private.constant.NettyConstant;
import com.wgs.netty.demo6_netty4private.entity.NettyMessage;
import com.wgs.netty.demo6_netty4private.entity.NettyMessageHeader;
import com.wgs.netty.demo6_netty4private.enums.NettyMessageTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by wanggenshen
* Date: on 2019/8/18 21:41.
* Description: 握手认证服务端, 包括握手接入和安全认证
*
* (1)握手接入: 判断是否在注册表中, 在则为-重复登录
* (2)安全认证: 是否在白名单中, 不在则拒绝登录;
* (3)发生异常, 需要清除缓存的客户端IP信息
*/
public class LoginAuthRespHandler extends SimpleChannelInboundHandler {
/**
* 客户端登录注册表
*/
private Map<String, Boolean> nettyNodeCheckMap = new ConcurrentHashMap<>();
/**
* 客户端白名单, 不在白名单中的IP拒绝接入
*/
private List<String> whiteList = Arrays.asList("127.0.0.1", "192.168.1.104");
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg;
// 如果是握手请求消息, 重复登录、安全认证
if (nettyMessage.getHeader() != null
&& NettyMessageTypeEnum.HandShakeReq.getType().equals(nettyMessage.getHeader().getType())) {
System.out.println("===收到客户端握手请求消息, 开始处理===");
// 处理握手请求消息
NettyMessage loginResp = doLoginAuthRespMessage(ctx);
ctx.writeAndFlush(loginResp);
} else {
// 传给下个handler处理
ctx.fireChannelRead(msg);
}
}
private NettyMessage doLoginAuthRespMessage(ChannelHandlerContext ctx) {
NettyMessage loginResp = null;
// 节点
String nodeIndex = ctx.channel().remoteAddress().toString();
System.out.println("current nodeIndex is : " + nodeIndex);
// 重复登录
if (nettyNodeCheckMap.containsKey(nodeIndex) && nettyNodeCheckMap.get(nodeIndex)) {
loginResp = buildResponse((byte) NettyConstant.LOGIN_AGAIN);
System.out.println("客户端登录验证失败, 重新登录");
} else {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = address.getAddress().getHostAddress();
if (whiteList.contains(ip)) {
loginResp = buildResponse(NettyConstant.LOGIN_SUCCESS);
nettyNodeCheckMap.put(nodeIndex, true);
System.out.println("客户端登录验证通过");
}
}
return loginResp;
}
/**
* 构建握手应答响应
*
* @return
*/
private NettyMessage buildResponse(int result) {
NettyMessage nettyMessage = new NettyMessage();
NettyMessageHeader header = NettyMessageHeader.builder()
.type(NettyMessageTypeEnum.HandShakeResp.getType())
.build();
nettyMessage.setHeader(header);
nettyMessage.setBody(result);
return nettyMessage;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常, 将客户端信息从登录注册表去除
nettyNodeCheckMap.remove(ctx.channel().remoteAddress().toString());
ctx.close();
super.exceptionCaught(ctx, cause);
}
}
4、心跳机制
心跳请求
package com.wgs.netty.demo6_netty4private.handler.heartbeat;
import com.wgs.netty.demo6_netty4private.entity.NettyMessage;
import com.wgs.netty.demo6_netty4private.entity.NettyMessageHeader;
import com.wgs.netty.demo6_netty4private.enums.NettyMessageTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* Created by wanggenshen
* Date: on 2019/8/18 22:49.
* Description: 心跳客户端
*/
public class HeartBeatReqHandler extends SimpleChannelInboundHandler {
private volatile ScheduledFuture<?> heartBeatScheduleFuture;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg;
Byte type = nettyMessage.getHeader() != null ? nettyMessage.getHeader().getType() : null;
// 心跳响应
if (type != null && NettyMessageTypeEnum.HeartBeatResp.getType().equals(type)) {
System.out.println(formatTime(new Date()) + "【Client receive Server】 heart beat message: ----> " + nettyMessage);
} else if (type != null && NettyMessageTypeEnum.HandShakeResp.getType().equals(type)) {
// 如果登录上后, 开启线程, 定时发送心跳
heartBeatScheduleFuture = ctx.executor()
.scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 10, TimeUnit.SECONDS);
} else {
ctx.fireChannelRead(msg);
}
}
/**
* 定时发送心跳消息
*/
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
NettyMessage heartBeatReq = buildHeartBeatReq();
System.out.println(formatTime(new Date()) + "【Client -—--> Server】send heart beat message: " + heartBeatReq);
ctx.writeAndFlush(heartBeatReq);
}
}
private String formatTime(Date date) {
String formatTime = DateFormatUtils.format(date, "yyyy-MM-dd HH:mm:ss");
return "【" + formatTime + "】";
}
private NettyMessage buildHeartBeatReq() {
NettyMessage nettyMessage = new NettyMessage();
NettyMessageHeader header = NettyMessageHeader.builder()
.type(NettyMessageTypeEnum.HeartBeatReq.getType())
.build();
nettyMessage.setHeader(header);
return nettyMessage;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException) {
System.out.println("error cause" + cause);
cause.printStackTrace();
} else {
super.exceptionCaught(ctx, cause);
}
// 取消任务并清空
if (heartBeatScheduleFuture != null) {
heartBeatScheduleFuture.cancel(true);
heartBeatScheduleFuture = null;
}
ctx.fireExceptionCaught(cause);
}
}
心跳响应
package com.wgs.netty.demo6_netty4private.handler.heartbeat;
import com.wgs.netty.demo6_netty4private.entity.NettyMessage;
import com.wgs.netty.demo6_netty4private.entity.NettyMessageHeader;
import com.wgs.netty.demo6_netty4private.enums.NettyMessageTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import java.util.Date;
/**
* Created by wanggenshen
* Date: on 2019/8/18 22:50.
* Description: 心跳机制服务端, 接收到客户端的心跳请求消息后, 不做处理直接返回响应
*/
public class HeartBeatRespHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg;
if (nettyMessage.getHeader() != null
&& NettyMessageTypeEnum.HeartBeatReq.getType().equals(nettyMessage.getHeader().getType())) {
System.out.println(formatTime(new Date()) + "【Server receive Client】 heart beat request message : " + nettyMessage);
NettyMessage heartBeatResp = buildHeartBeatResponse();
ctx.writeAndFlush(heartBeatResp);
System.out.println(formatTime(new Date()) + "【Server -----> Client】Send heart beat response to client : " + heartBeatResp);
} else {
// 传递给下一个handler处理
ctx.fireChannelRead(msg);
}
}
private String formatTime(Date date) {
String formatTime = DateFormatUtils.format(date, "yyyy-MM-dd HH:mm:ss");
return "【" + formatTime + "】";
}
private NettyMessage buildHeartBeatResponse() {
NettyMessage nettyMessage = new NettyMessage();
NettyMessageHeader header = NettyMessageHeader.builder()
.type(NettyMessageTypeEnum.HeartBeatResp.getType())
.build();
nettyMessage.setHeader(header);
return nettyMessage;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException) {
System.out.println("error cause " + cause);
cause.printStackTrace();
} else {
super.exceptionCaught(ctx, cause);
}
}
}
5、服务端和客户端
服务端
package com.wgs.netty.demo6_netty4private;
import com.wgs.netty.demo6_netty4private.coder.NettyMessageDecoder;
import com.wgs.netty.demo6_netty4private.coder.NettyMessageEncoder;
import com.wgs.netty.demo6_netty4private.constant.NettyConstant;
import com.wgs.netty.demo6_netty4private.handler.auth.LoginAuthRespHandler;
import com.wgs.netty.demo6_netty4private.handler.heartbeat.HeartBeatRespHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
/**
* Created by wanggenshen
* Date: on 2019/8/19 00:24.
* Description: Netty协议栈服务端
*/
public class NettyServer {
public static void bind() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new NettyMessageDecoder(1024 * 1024, 4, 4, -8, 0))
.addLast(new NettyMessageEncoder())
.addLast("readTimeoutHandler", new ReadTimeoutHandler(50))
.addLast("loginAuthRespHandler", new LoginAuthRespHandler())
.addLast("heartBeatRespHandler", new HeartBeatRespHandler());
}
});
bootstrap.bind(NettyConstant.REMOTE_IP, NettyConstant.PORT).sync();
System.out.println("netty server start ok:" + (NettyConstant.REMOTE_IP + NettyConstant.PORT));
}
public static void main(String[] args) throws InterruptedException {
NettyServer.bind();
}
}
客户端
package com.wgs.netty.demo6_netty4private;
import com.wgs.netty.demo6_netty4private.coder.NettyMessageDecoder;
import com.wgs.netty.demo6_netty4private.coder.NettyMessageEncoder;
import com.wgs.netty.demo6_netty4private.constant.NettyConstant;
import com.wgs.netty.demo6_netty4private.handler.auth.LoginAuthReqHandler;
import com.wgs.netty.demo6_netty4private.handler.heartbeat.HeartBeatReqHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.omg.CORBA.TIMEOUT;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by wanggenshen
* Date: on 2019/8/19 00:34.
* Description: Netty协议栈客户端
*/
public class NettyClient {
// 创建线程池
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
public static void connect(int port, String host) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", new NettyMessageDecoder(1024 * 1024, 4, 4, -8, 0))
.addLast("encoder", new NettyMessageEncoder())
.addLast("readTimeoutHandler", new ReadTimeoutHandler(50))
.addLast("loginAuthReqHandler", new LoginAuthReqHandler())
.addLast("heartBeatReqHandler", new HeartBeatReqHandler());
}
});
// L:/127.0.0.1:8080 - R:/127.0.0.1:12088
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port),
new InetSocketAddress(NettyConstant.LOCAL_IP, NettyConstant.LOCAL_PORT)).sync();
future.channel().closeFuture().sync();
} finally {
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("finally|客户端重新发起连接, port: " + NettyConstant.PORT + ", ip: " + NettyConstant.REMOTE_IP);
connect(NettyConstant.PORT, NettyConstant.REMOTE_IP);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
public static void main(String[] args) throws InterruptedException {
NettyClient.connect(NettyConstant.PORT, NettyConstant.REMOTE_IP);
}
}
6、常量类和枚举类
package com.wgs.netty.demo6_netty4private.constant;
/**
* Created by wanggenshen
* Date: on 2019/8/18 22:34.
* Description: 常量类
*/
public class NettyConstant {
/**
* 客户端登录成功
*/
public static final int LOGIN_SUCCESS = 1001;
/**
* 重复登录
*/
public static final int LOGIN_AGAIN = 4001;
public static final String REMOTE_IP = "127.0.0.1";
public static final String LOCAL_IP = "127.0.0.1";
public static final int PORT = 8080;
//public static final int LOCAL_PORT = 8889;
public static final int LOCAL_PORT = 12088;
}
package com.wgs.netty.demo6_netty4private.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* Created by wanggenshen
* Date: on 2019/8/18 11:43.
* Description: Netty协议栈消息类型
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum NettyMessageTypeEnum {
BusinessReq((byte)0, "业务请求"),
BusinessResp((byte)1, "业务响应"),
BusinessOnwWay((byte)2, "业务ONE WAY"),
HandShakeReq((byte)3, "握手请求"),
HandShakeResp((byte)4, "握手响应"),
HeartBeatReq((byte)5, "心跳请求"),
HeartBeatResp((byte)6, "心跳响应")
;
private Byte type;
private String desc;
}
7、测试
(1)正常情况 - 分别运行服务端和客户端,运行结果如下:
(2)异常场景 - 服务端宕机
服务端宕机后再重启,期间客户端需要做到:
- 客户端是否正常发起重连;
- 重连成功后不再重连;
- 重连期间心跳不再发生;
- 服务端重启后客户端能够正常接入并且发生连接和发生心跳;
关闭服务端后,可以看到客户端发起重连:
重启服务端后,可以看到客户端能够正常连到服务端:
(3))异常场景 - 客户端宕机
客户端宕机重启后,服务端需要清除缓存信息并且运行客户端重新登录。