---恢复内容开始---
俺工作已经一年又6个月了,想想过的真快,每天写业务,写业务,写业务......。然后就是祈祷着,这次上线不要出现线上bug。继续这每天无聊的增删改查,学习学习一下自己感兴趣的事,就把自己当作小学生。然后学学习,打发打发时间,如果以后自己能用到呢?这又有谁说的清楚。
好了,最近在学习Netty,主要看了这2本书的一些内容,第一本就是《Netty实战》,第二本就是《Netty权威指南》。然后在看到Netty权威指南上有一章比较感兴趣,用了整整一章用来描写如何取自己定义一个协议。接着阅读完后,我就按照书本上的相关内容,去实现了一下。纠正了一下书本上的错误代码。工作都是在开发电商项目,基本上对底层传输这一块接触甚少。如果有机会想去一个游戏公司,这样看看能不能接触更多的网络传输相关内容。哎,不知道这样的去转有木有要,纠结。。。。。。。。。
好了,现在开始看书和事件的经历吧。
现在,我们设计一个传输协议如下
2字节:协议固定值 1字节:主版本号 |
消息长度 :消息头 和消息体 |
回话ID, 全局唯一 |
业务请求消息 |
优先级别 |
附件 |
code |
length |
sessionId |
type |
primary |
attachment |
上面的定义,是来着Netty的权威指南。这个是协议的头。然后接下来是一个协议体。而协议体在编码上就是一个Object.
协议头 | 协议体 |
customHeader |
bodyMessage |
根据上面的定义,直接写出协议定义model.直接上代码:
@Data
@ToString
public class NettyCustomHeader {
/**
* code 2字节:netty协议消息, 1字节:主版本号 1字节:副版本号 4
*/
private int code = 0xABCD0101; /**
* 消息长度 :消息头 和消息题 32
*/
private int length; /**
* 回话ID, 全局唯一 64
*/
private long sessionId; /**
* 业务请求消息 1:业务请求消息 2:业务响应消息 3:握手请求消息 4:握手应答消息 5:心跳请求消息 6:心跳应答消息
*/
private byte type; /**
* 优先级别
*/
private byte primary; /**
* 附件
*/
Map<String, Object> attachment; }
@Data
@ToString
public class NettyCustomMessage { /**
* 消息头
*/
private NettyCustomHeader customHeader; /**
* 消息体
*/
private Object bodyMessage; }
学过Netty的同学或者了解的同学知道,Netty是通过ChannelHandler来处理IO消息的。我编码的Netty版本是4。那么处理消息首先第一步就是解码,LengthFieldBasedFrameDecoder这个解码器是基于长度的解码器,并且能解决TCP/IP包的粘包和拆包问题。代码如下。
public class ByteBuf2NettyMessageDecoder extends LengthFieldBasedFrameDecoder { // private NettyMarshallingDecoder marshallingDecoder = NettyMarshallingFactory.buildNettyMarshallingDecoder(); public ByteBuf2NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
} public ByteBuf2NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
} public ByteBuf2NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
} public ByteBuf2NettyMessageDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(byteOrder, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
} @Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//调用父类decode ,得到整包消息
ByteBuf readBuf = (ByteBuf) super.decode(ctx, in);
if (readBuf == null) {
return null;
}
NettyCustomMessage customMessage = new NettyCustomMessage();
NettyCustomHeader customHeader = new NettyCustomHeader();
customHeader.setCode(readBuf.readInt());
customHeader.setLength(readBuf.readInt());
customHeader.setSessionId(readBuf.readLong());
customHeader.setType(readBuf.readByte());
customHeader.setPrimary(readBuf.readByte()); int attachmentSize = readBuf.readByte();
if (attachmentSize > 0) {
Map<String, Object> attachment = new HashMap<String, Object>();
for (int i = 0; i < attachmentSize; i++) {
int keySize = readBuf.readInt();
byte[] keyByte = new byte[keySize];
readBuf.readBytes(keyByte);
String key = new String(keyByte, CharsetUtil.UTF_8.name()); Object value = JavaByteFactory.decode(readBuf);
//Object value = marshallingDecoder.decode(ctx, readBuf);
attachment.put(key, value);
}
customHeader.setAttachment(attachment);
} customMessage.setCustomHeader(customHeader);
if (readBuf.readableBytes() > 0) {
Object body = JavaByteFactory.decode(readBuf);
//Object body = marshallingDecoder.decode(ctx, readBuf);
customMessage.setBodyMessage(body);
} return customMessage;
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getStackTrace());
cause.getStackTrace();
super.exceptionCaught(ctx, cause);
}
}
上面注释的原因,marshallingDecoder不支持java7,所以我自己写了一个编码/解码帮助类,就是前4个字节代表长度,后面是就是时间内容。从上面的代码我们知道,就是把ByteBuf转化为自己定义的协议对象。从上面的解码上,可能有点模糊,但是从下面的如何编码上,就可以知道为啥是这么解码的。
public class NettyMessage2ByteBufEncoder extends MessageToMessageEncoder<NettyCustomMessage> { private NettyMarshallingEncoder nettyMarshallingEncoder; public NettyMessage2ByteBufEncoder() {
// this.nettyMarshallingEncoder = NettyMarshallingFactory.buildNettyMarshallingEncoder(); } protected void encode(ChannelHandlerContext ctx, NettyCustomMessage msg, List<Object> out) throws Exception { if (msg == null || msg.getCustomHeader() == null) {
throw new Exception("the encode message is null");
} ByteBuf sendBuf = Unpooled.buffer();
sendBuf.writeInt(msg.getCustomHeader().getCode());
sendBuf.writeInt(msg.getCustomHeader().getLength());
sendBuf.writeLong(msg.getCustomHeader().getSessionId());
sendBuf.writeByte(msg.getCustomHeader().getType());
sendBuf.writeByte(msg.getCustomHeader().getPrimary()); //attachment , if (msg.getCustomHeader().getAttachment() != null) {
sendBuf.writeByte(msg.getCustomHeader().getAttachment().size());
String key = null;
byte[] keyArray = null;
for (Map.Entry<String, Object> entryKey : msg.getCustomHeader().getAttachment().entrySet()) {
key = entryKey.getKey();
keyArray = key.getBytes(CharsetUtil.UTF_8.name());
sendBuf.writeInt(keyArray.length);
sendBuf.writeBytes(keyArray);
ByteBuf value = JavaByteFactory.encode(entryKey.getValue());
sendBuf.writeBytes(value);
// nettyMarshallingEncoder.encode(ctx, entryKey.getValue(), sendBuf);
}
} else {
sendBuf.writeByte(0);
} if (msg.getBodyMessage() != null) {
ByteBuf value = JavaByteFactory.encode(msg.getBodyMessage());
sendBuf.writeBytes(value);
//nettyMarshallingEncoder.encode(ctx, msg.getBodyMessage(), sendBuf);
} //在第5个字节开始的int 是长度,重新设置
sendBuf.setInt(4, sendBuf.readableBytes()); out.add(sendBuf);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getStackTrace());
cause.getStackTrace();
super.exceptionCaught(ctx, cause);
}
}
从上面可以知道解码,就是把自定义协议对象 NettyCustomMessage 通过自己的规则放到ByteBuf上。代码比较简单,不解释。JavaByteFactory的代码如下:
public class JavaByteFactory { public static Object decode(ByteBuf byteBuf) {
if (byteBuf == null || byteBuf.readableBytes() <= 0) {
return null;
}
int valueSize = byteBuf.readInt();
byte[] value = new byte[valueSize];
byteBuf.readBytes(value); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(value);
ObjectInputStream inputStream = null;
try {
inputStream = new ObjectInputStream(byteArrayInputStream);
return inputStream.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null; } public static ByteBuf encode(Object object) {
if (object == null) {
return null;
}
ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
try {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteOutput);
objectOutputStream.writeObject(object);
byte[] bytes = byteOutput.toByteArray(); ByteBuf buffer = Unpooled.buffer(bytes.length + 4);
buffer.writeInt(bytes.length);
buffer.writeBytes(bytes);
return buffer; } catch (IOException e) {
e.printStackTrace();
}
return null;
}
编码就是首选把Object 对象转换了byte []数组,然后写入4个字节为byte[]数组的长度,接着是数组的内容到ByteBuf对象上。相应的解码就是先获取4个字节,得到后面字节长度,接着读取指定长度即可。
接着心跳和权限检测都是在解码器之后进行业务的处理。直接上代码。
下面是权限认证的请求handler和响应handler.
public class AuthorityCertificationRequestHanlder extends ChannelInboundHandlerAdapter { @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buildAuthorityCertificationMsg());
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyCustomMessage message = (NettyCustomMessage) msg;
if (message != null && message.getCustomHeader() != null && message.getCustomHeader().getType() == NettyMessageConstant.CUSTOMER_AUTH_CERTI_TYPE) {
byte authResult = (Byte) message.getBodyMessage();
if (authResult != (byte) 0) { //握手失败。关闭链接
ctx.close();
return;
}
System.out.println("authority certification is success .....");
ctx.fireChannelRead(msg);
} else {
ctx.fireChannelRead(msg);
} } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.getStackTrace();
ctx.channel().close();
System.out.println(cause.getStackTrace());
ctx.fireExceptionCaught(cause);
} protected NettyCustomMessage buildAuthorityCertificationMsg() {
NettyCustomMessage message = new NettyCustomMessage();
NettyCustomHeader customHeader = new NettyCustomHeader();
customHeader.setType(NettyMessageConstant.CUSTOMER_AUTH_CERTI_TYPE);
message.setCustomHeader(customHeader);
return message;
} }
public class AuthorityCertificationResponseHanlder extends ChannelInboundHandlerAdapter { private Map<String, Boolean> authority = new ConcurrentHashMap<String, Boolean>(); private String[] ipList = new String[]{"127.0.0.1"}; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyCustomMessage customMessage = (NettyCustomMessage) msg;
NettyCustomMessage response;
if (customMessage.getCustomHeader() != null && customMessage.getCustomHeader().getType() == NettyMessageConstant.CUSTOMER_AUTH_CERTI_TYPE) {
String remoteAddress = ctx.channel().remoteAddress().toString();
if (authority.containsKey(remoteAddress)) { //重复登陆
response = buildAuthorCertiResponseMessage((byte) -1);
} else {
InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
boolean isAuth = false;
for (String ip : ipList) {
if (ip.equals(inetSocketAddress.getAddress().getHostAddress())) {
isAuth = true;
break;
}
}
if (isAuth) {
response = buildAuthorCertiResponseMessage((byte) 0);
authority.put(remoteAddress, true);
} else {
response = buildAuthorCertiResponseMessage((byte) -1);
}
}
System.out.println("the client [" + remoteAddress + "] is connecting ,status:" + response);
ctx.writeAndFlush(response);
return;
}
ctx.fireChannelRead(msg);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getStackTrace());
cause.getStackTrace();
String remoteAddress = ctx.channel().remoteAddress().toString();
authority.remove(remoteAddress);
ctx.channel().close();
ctx.fireExceptionCaught(cause);
} private NettyCustomMessage buildAuthorCertiResponseMessage(byte body) {
NettyCustomMessage message = new NettyCustomMessage();
NettyCustomHeader customHeader = new NettyCustomHeader();
customHeader.setType(NettyMessageConstant.SERVER_AUTH_CERTI_TYPE);
message.setCustomHeader(customHeader);
message.setBodyMessage(body);
return message;
} }
下面是心跳检测handler
public class HeartBeatCheckRequestHandler extends ChannelInboundHandlerAdapter { private volatile ScheduledFuture<?> scheduledFuture; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyCustomMessage customMessage = (NettyCustomMessage) msg;
if (customMessage.getCustomHeader() != null && customMessage.getCustomHeader().getType() == NettyMessageConstant.SERVER_AUTH_CERTI_TYPE) {
scheduledFuture = ctx.executor().scheduleAtFixedRate(new HeartBeatCheckTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
System.out.println("the client [ " + ctx.channel().localAddress().toString() + " ] send heart beat ...........");
} else if (customMessage.getCustomHeader() != null && customMessage.getCustomHeader().getType() == NettyMessageConstant.HEART_BEAT_CHECK_PONG_TYPE) {
System.out.println("the client [ " + ctx.channel().localAddress().toString() + " ] recieve heart beat .............");
} else {
ctx.fireChannelRead(msg);
} } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getStackTrace());
cause.getStackTrace();
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
scheduledFuture = null;
}
ctx.fireExceptionCaught(cause);
} class HeartBeatCheckTask implements Runnable { private ChannelHandlerContext context; public HeartBeatCheckTask(ChannelHandlerContext context) {
this.context = context;
} @Override
public void run() {
NettyCustomMessage customMessage = new NettyCustomMessage();
NettyCustomHeader customHeader = new NettyCustomHeader();
customHeader.setType(NettyMessageConstant.HEART_BEAT_CHECK_PING_TYPE);
customMessage.setCustomHeader(customHeader);
context.writeAndFlush(customMessage);
System.out.println("the client [ " + context.channel().localAddress().toString() + " ] send heart beat to server ...."); }
}
}
public class HeartBeatCheckResponseHandler extends ChannelInboundHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyCustomMessage customMessage = (NettyCustomMessage) msg;
if (customMessage.getCustomHeader() != null && customMessage.getCustomHeader().getType() == NettyMessageConstant.HEART_BEAT_CHECK_PING_TYPE) {
System.out.println("the server recieve the client [ " + ctx.channel().remoteAddress().toString() + " ] heart beat check package,"); NettyCustomMessage sendPongMessage = new NettyCustomMessage();
NettyCustomHeader customHeader = new NettyCustomHeader();
customHeader.setType(NettyMessageConstant.HEART_BEAT_CHECK_PONG_TYPE);
sendPongMessage.setCustomHeader(customHeader);
ctx.writeAndFlush(customMessage);
return;
}
ctx.fireChannelRead(msg);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getStackTrace());
cause.getStackTrace();
super.exceptionCaught(ctx, cause);
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("the client [ " + ctx.channel().remoteAddress().toString() + " ] is close ....,then close channel");
ctx.channel().close();
} }
最后是我们的客户端和服务端代码,如下:
public class NettyProtocalClient {
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); private Bootstrap bootstrap; private EventLoopGroup eventLoopGroup; private String host; private int port; private int localPort; public NettyProtocalClient(String host, int port) {
this(7777, host, port);
} public NettyProtocalClient(int localPort, String host, int port) {
this.host = host;
this.port = port;
this.localPort = localPort;
} public void connect() throws InterruptedException {
try {
bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<io.netty.channel.Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast("log", new LoggingHandler(LogLevel.INFO))
.addLast("decoder", new ByteBuf2NettyMessageDecoder(6 * 1024, 4, 4, -8, 0, true))
.addLast("encoder", new NettyMessage2ByteBufEncoder())
.addLast("timeout", new ReadTimeoutHandler(50))
.addLast("authority", new AuthorityCertificationRequestHanlder())
.addLast("hearbeat", new HeartBeatCheckRequestHandler()); }
});
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress("127.0.0.1", localPort)).sync();
future.channel().closeFuture().sync();
} finally {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().sync();
}
executorService.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
connect();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}); }
}
}
public class NettyProtocalServer {
private ServerBootstrap serverBootstrap; private EventLoopGroup boss; private EventLoopGroup worker; private String host; private int port; public NettyProtocalServer(String host, int port) {
this.host = host;
this.port = port;
} public void start() throws InterruptedException {
try {
serverBootstrap = new ServerBootstrap();
boss = new NioEventLoopGroup(1);
worker = new NioEventLoopGroup(); serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast("log",new LoggingHandler(LogLevel.INFO))
.addLast("decoder", new ByteBuf2NettyMessageDecoder(6 * 1024, 4, 4, -8, 0, true))
.addLast("encoder", new NettyMessage2ByteBufEncoder())
.addLast("timeout", new ReadTimeoutHandler(50))
.addLast("authority", new AuthorityCertificationResponseHanlder())
.addLast("hearbeat", new HeartBeatCheckResponseHandler()); }
});
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(host, port)).sync();
future.channel().closeFuture().sync();
} finally {
if (boss != null) {
boss.shutdownGracefully();
}
if (worker != null) {
worker.shutdownGracefully();
}
}
}
}
最后看一看运行结果吧:
服务端显示内容:
客户端显示内容:
---恢复内容结束---