Google的Protobuf在业界非常流行,很多商业项目选择Protobuf作为编解码框架,Protobuf的优点。
(1)在谷歌内部长期使用,产品成熟度高;
(2)跨语言,支持多种语言,包括C++、Java和Python;
(3)编码后的消息更小,更加有利于存储和传输;
(4)编解码的性能非常高;
(5)支持不同协议版本的前向兼容;
(6)支持定义可选和必选字段。
Protobuf的入门
Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于XML等传统的序列化工具,它更小,更快,更简单。Protobuf支持数据结构化一次可以到处使用,甚至跨语言使用,通过代码生成工具可以自动生成不同语言版本的源代码,甚至可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。
Netty的Protobuf应用开发
服务端代码示例:
import cn.sf.redis.socket.javaser.SubReqServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; public class SubReqServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(Channel ch) {
//首先向ChannelPipeline添加ProtobufVarint32FrameDecoder,它主要用于半包处理
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
//随后继续添加ProtobufDecoder解码器,它的参数是com.google.protobuf.MessageLite,
//实际上就是要告诉ProtobufDecoder需要解码的目标类是什么,
//否则仅仅从字节数组中是无法判断出要解码的目标类型信息的。
ch.pipeline().addLast(
new ProtobufDecoder(
SubscribeReqProto.SubscribeReq
.getDefaultInstance()));
ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
}
}); // 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqServer().bind(port);
}
} import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; @ChannelHandler.Sharable
public class SubReqServerHandler extends ChannelHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//由于ProtobufDecoder已经对消息进行了自动解码,因此接收到的订购请求消息可以直接使用。
SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
//对用户名进行校验,校验通过后构造应答消息返回给客户端
if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
System.out.println("Service accept client subscribe req : ["+ req.toString() + "]");
//由于使用了ProtobufEncoder,所以不需要对SubscribeRespProto.SubscribeResp进行手工编码。
ctx.writeAndFlush(resp(req.getSubReqID()));
}
} private SubscribeRespProto.SubscribeResp resp(int subReqID) {
SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
builder.setSubReqID(subReqID);
builder.setRespCode(0);
builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return builder.build();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();// 发生异常,关闭链路
}
}
客户端代码示例:
import cn.sf.redis.socket.javaser.SubReqClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class SubReqClient { public void connect(int port, String host) throws Exception {
// 配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(Channel ch)
throws Exception {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
//客户端需要解码的对象是订购响应,
//所以使用SubscribeResp Proto.SubscribeResp的实例做入参。
ch.pipeline().addLast(
new ProtobufDecoder(
SubscribeRespProto.SubscribeResp
.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new SubReqClientHandler());
}
}); // 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO 线程组
group.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqClient().connect(port, "127.0.0.1");
}
} import com.google.common.collect.Lists;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import java.util.List; public class SubReqClientHandler extends ChannelHandlerAdapter { public SubReqClientHandler() {
} @Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 10; i++) {
ctx.write(subReq(i));
}
ctx.flush();
} private SubscribeReqProto.SubscribeReq subReq(int i) {
SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
.newBuilder();
builder.setSubReqID(i);
builder.setUserName("Lilinfeng");
builder.setProductName("Netty Book For Protobuf");
List<String> address = Lists.newArrayList();
address.add("NanJing YuHuaTai");
address.add("BeiJing LiuLiChang");
address.add("ShenZhen HongShuLin");
builder.addAllAddress(address);
return builder.build();
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Receive server response : [" + msg + "]");
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
运行结果:
服务端运行结果如下:
Service accept client subscribe req : [subReqID: 0
userName: "Lilinfeng"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChang"
address: "ShenZhen HongShuLin"
]
.....................................................................
Service accept client subscribe req : [subReqID: 9
userName: "Lilinfeng"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChang"
address: "ShenZhen HongShuLin"
]
客户端运行结果如下。
Receive server response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
.....................................................................
Receive server response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
利用Netty提供的Protobuf编解码能力,我们在不需要了解Protobuf实现和使用细节的情况下就能轻松支持Protobuf编解码,可以方便地实现跨语言的远程服务调用和与周边的异构系统进行通信对接。
Protobuf的使用注意事项
ProtobufDecoder仅仅负责解码,它不支持读半包。因此,在ProtobufDecoder前面,一定要有能够处理读半包的解码器,有三种方式可以选择。
- 使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包消息;
- 继承Netty提供的通用半包解码器LengthFieldBasedFrameDecoder;
- 继承ByteToMessageDecoder类,自己处理半包消息。
如果你只使用ProtobufDecoder解码器而忽略对半包消息的处理,程序是不能正常工作的。
服务端注释掉ProtobufVarint32FramepDecoder: