编解码技术主要应用在网络传输中,将对象比如BOJO进行编解码以利于网络中进行传输。平常我们也会将编解码说成是序列化/反序列化
定义:当进行远程跨进程服务调用时,需要把被传输的java对象编码为字节数组或者ByteBuffer对象。而当远程服务读取到ByteBuffer对象或者字节数组时,需要将其解码为发送时的java对象。这被称为java对象编解码技术。比如java的序列化。
但是,java的序列化有一定的弊端;
- java序列化是java私有的协议,其他语言不支持,故而不能实现跨语言;
- 其次,序列化后的码流太大;
- 再次,序列化性能太低,耗时长。
因此,通常不会选择java序列化作为远程跨节点调用的编解码框架。
当前业界主流的编解码框架有:1)MessagePack高效的二进制序列化框架;2)Google 的Protobuf;3)Facebook的Thrift;4)JBoss Marshalliing
下面运行MessagePack的编解码
这个示例在权威指南上,作者并没有给出完整代码,本博主刚开始运行也没有运行出来,经过网络搜索,参考相关文章运行了出来,其中潜在存在着一些坑,运行中本博主也发现一些现象也总结出来。
一、首先我们需要引入相关jar包
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.11</version> </dependency>
<!-- 创建项目时已经存在,这里贴出来,但本博主不重复放包
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.22.0-GA</version>
</dependency>
-->
二、放上编解码代码,
坑一、一定要在继承的类后面加上泛型,这样,方法中的参数才能跟着发生变化
如过没有添加<>,则解码实现接口方法时直接生成的方法的参数如下:
编码代码
package com.messagePack; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack; public class MsgPackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
MessagePack msgPack = new MessagePack();
// 编码,然后转为ButyBuf传递
byte[] bytes = msgPack.write(o);
byteBuf.writeBytes(bytes);
}
}
解码代码
package com.messagePack; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack; import java.util.List; public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 获取要解码的byte数组
final byte[] bytes;
final int length = byteBuf.readableBytes();
bytes = new byte[length];
byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length);
// 调用MessagePack 的read方法将其反序列化为Object对象
MessagePack msgPack = new MessagePack();
list.add(msgPack.read(bytes));
}
}
如果实现的接口后面没有添加泛型<ByteBuf>,则解码实现接口方法时直接生成的方法的参数如下:
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception { }
不过经过实验,将接收到的Object进行转换后仍然可以,要记好netty接收和传递信息都是经过ByteBuf进行的
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
// 获取要解码的byte数组
final byte[] bytes;
final int length = byteBuf.readableBytes();
bytes = new byte[length];
byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length);
// 调用MessagePack 的read方法将其反序列化为Object对象
MessagePack msgPack = new MessagePack();
list.add(msgPack.read(bytes));
}
三、服务端代码
package com.messagePack; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; public class EchoServer {
public void bind(int port) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast("decoder",new MsgPackDecoder())
.addLast("encoder",new MsgPackEncoder())
.addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if(args.length>0&&args!=null){
port = Integer.parseInt(args[0]);
}
new EchoServer().bind(port); }
}
IO处理
package com.messagePack; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.List; public class EchoServerHandler extends ChannelInboundHandlerAdapter {
int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server receive the msgpack message : "+msg+"");
// 原路返回给客户端
ctx.writeAndFlush(msg);
/* 在EchoClientHandler中向服务端发送一个pojo对象,经过MessagePack编解码后,
在EchoServerHandler中的channelRead方法中打印的msg为pojo对象的toString方法内容,
不可以直接将msg转换为User,如果采用如下代码运行不成功*/
/* List<User> users = (List<User>) msg;
System.out.println("到这里面来了,users是否为空:");
System.out.println(users!=null);
for(User u : users){
System.out.println("This is"+ ++count +" times server receive client request."+u);
ctx.write(u);
} ctx.flush();*/
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
坑二、这里的坑比较大,在上述代码注释中已经说明了,下面运行结果是注释掉以上14~16行代码,放开20~28行代码时候运行结果,结果表明收到的消息转化为User数组时候,是空的,但是后台并没有报错,不知道为什么在我的IDEA上运行不下去但是不报错
补充解释,这里是因为采用messagepack解码后,得到的是一个Object list列表,所以不能转化为pojo,应该用List<Object> 来接收解码后的msg,同时这个list中的object也不能转化为pojo,它或许是pojo中的具体属性
四、客户端代码
package com.messagePack; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; public class EchoClient {
public void connection(int port,String host) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("msgpack decoder",new MsgPackDecoder())
.addLast("msgpack encoder",new MsgPackEncoder())
.addLast(new EchoClientHandler());
//
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host,port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if(args.length>0&&args!=null){
System.out.println(args[0]);
port = Integer.parseInt(args[0]);
}
new EchoClient().connection(port,"127.0.0.1");
}
}
IO处理类
package com.messagePack; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private int count; @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
/* User user = getUser();
ctx.writeAndFlush(user);*/
User[] users = getUsers();
for(User u : users){
ctx.write(u);
}
ctx.flush();
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("this is client receive msg【 "+ ++count +" 】times:【"+msg+"】");
if(count<5){ //控制运行次数,因为不加这个控制直接调用下面代码的话,客户端和服务端会形成闭环循环,一直运行
ctx.write(msg);
}
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
private User[] getUsers(){
User[] users = new User[5];
for(int i=0;i<5;i++){
User user = new User();
user.setId(String.valueOf(i));
user.setAge(18+i);
user.setName("张元"+i);
user.setSex("男"+String.valueOf(i*2));
users[i]=user;
}
return users;
} private User getUser(){
User user = new User();
user.setId("11");
user.setAge(18);
user.setName("张元");
user.setSex("男");
return user;
}
}
五、User代码
坑三、要传输的javabean一定要加上注解@message
package com.messagePack; import org.msgpack.annotation.Message; @Message
public class User {
private String name;
private int age;
private String id;
private String sex; public int getAge() {
return age;
} public void setAge(int age) {
this.age = age;
} public String getId() {
return id;
} public void setId(String id) {
this.id = id;
} public String getSex() {
return sex;
} public void setSex(String sex) {
this.sex = sex;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} @Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
", id='" + id + '\'' +
", sex='" + sex + '\'' +
'}';
}
}
六、运行结果
客户端
服务端
七、上面运行的结果发现打印的数据都完全一样,这是因为没有考虑粘包/半包的处理,还不能正常工作,下面我们利用Netty的LengthFieldPrepender和LengthFieldBasedFrameDecoder,来解决上述问题,这里只需要对客户端和服务端添加相关的处理类就可以了,改动代码如下:
客户端
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,
0,4,0,4))
.addLast("msgpack decoder",new MsgPackDecoder())
.addLast("frameEncoder",new LengthFieldPrepender(4))
.addLast("msgpack encoder",new MsgPackEncoder())
.addLast(new EchoClientHandler());
//
}
});
服务端
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast("framDecoder",new LengthFieldBasedFrameDecoder(65535,
0,4,0,4))
.addLast("decoder",new MsgPackDecoder())
.addLast("frameEncoder",new LengthFieldPrepender(4))
.addLast("encoder",new MsgPackEncoder())
.addLast(new EchoServerHandler());
}
});
其实两者改动的地方完全一样,下面看一下运行效果
客户端
服务端
这次运行结果显示的是0~5的数据,但是遗憾的是不知道为什么有重复了一下,而且之重复了部分的运行,暂时不理会。以后应用多了可能就明白了。或者有路过的朋友知道的留个言,谢谢!