Netty实践

时间:2023-03-09 07:48:00
Netty实践

  Netty是JBOSS针对网络开发的一套应用框架,它也是在NIO的基础上发展起来的。netty基于异步的事件驱动,具有高性能、高扩展性等特性,它提供了统一的底层协议接口,使得开发者从底层的网络协议(比如 TCP/IP、UDP)中解脱出来。

  TCP传输面向的是字节流,存在粘包半包问题。Netty提供了三种基本的解码类(显然只有读数据时才会有该问题)来解决粘包拆包问题:LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。其中,前二者所读码流到达指定长度或遇到分隔符时认为结束,若读到的数据大于指定长度则抛TooLongFrameException并忽略之前读到的码流;最后一者每次读固定长度码流。此外,也针对特定协议提供了一些解决该问题的解码类,如ProtobufVarint32FrameDecoder)

  Netty封装得很好,使得使用起来比较容易,按照“套路”填代码即可。给几个示例:

1、示例

Maven依赖:

         <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency> <dependency><!-- 只有使用到Protobuf时才需要 -->
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>

1.1、服务端/客户端

Server:

 public class SimpleChatServer {

     private static int port = 8080;

     public SimpleChatServer(int port) {
this.port = port;
} public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new SimpleChatServerInitializer()).childOption(ChannelOption.SO_KEEPALIVE, true); System.out.println("server 启动了"); // 绑定端口,开始接收进来的连接
// b.bind(11122);//可以绑定多个端口
ChannelFuture f = b.bind(port).sync(); // 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully(); System.out.println("server 关闭了");
}
} public static void main(String[] args) throws Exception {
new SimpleChatServer(port).run();
}
}

SimpleChatServer

 public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder(); @Override
public void initChannel(SocketChannel ch) throws Exception {// Pipeline里的Handler是从底层开始向上添加的,故流动方向为后添加的输出给先添加的、或先添加的读入给后添加的
ChannelPipeline pipeline = ch.pipeline(); // 添加ChannelHandler,顺序是敏感的;名字任意,不冲突即可,也可以不指定名字 // Netty提供了三种解码器解决粘包拆包问题:LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。
// 其中,前二者所读码流到达指定长度或遇到分隔符时认为结束,若读到的数据大于指定长度则抛TooLongFrameException并忽略之前读到的码流;最后一者每次读固定长度码流。
// 也可以继承ByteToMessageDecoder自己处理
pipeline.addLast("FrameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("StringDecoder", DECODER); // 解码只会应用于读数据时、编码只会应用于写数据时,因此解码器与编码器添加的先后顺序在客户端和服务端中可不同,但编码器添加的顺序须桶,解码器亦然。
pipeline.addLast("StringEncoder", ENCODER); pipeline.addLast("handler", new SimpleChatServerHandler()); System.out.println("client " + ch.remoteAddress() + " 连接上");
}
}

SimpleChatServerInitializer

 public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> {

     public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

     // 一个客户端连上再断开时,六个事件的触发顺序:加入、(连接上(在SimpleChatServerInitializer中))、在线、异常、掉线、离开
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {// 在ctx加入本Handler时触发,一般在此做初始化工作,如创建buf
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
}
System.out.println("client " + incoming.remoteAddress() + " 加入");
channels.add(ctx.channel());
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {// 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive
Channel incoming = ctx.channel();
System.out.println("client " + incoming.remoteAddress() + " 在线");
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel incoming = ctx.channel();
System.err.println("client " + incoming.remoteAddress() + " 异常:" + cause.getMessage());
// 当出现异常就关闭连接
// cause.printStackTrace();
ctx.close();
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("client " + incoming.remoteAddress() + " 掉线");
} @Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// 从ctx移除本Handler时触发
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
}
System.out.println("client " + incoming.remoteAddress() + " 离开");
channels.remove(ctx.channel());
} // 优先级高于messageReceived方法,有了这个方法就会屏蔽messageReceived方法
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("channelRead");
// Channel incoming = ctx.channel();
// for (Channel channel : channels) {
// if (channel != incoming){
// channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + msg + "\n");
// } else {
// channel.writeAndFlush("server: " + msg + "\n");
// }
// }
// } @Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
Channel incoming = ctx.channel();
System.out.println("**" + incoming.remoteAddress() + " send: " + msg);
for (Channel channel : channels) {
if (channel != incoming) {
// System.out.println("[" + incoming.remoteAddress() + "] " + msg);
channel.writeAndFlush("[" + incoming.remoteAddress() + "] " + msg + "\n");
} else {
// System.out.println("server: " + msg);
channel.writeAndFlush("server: " + msg + "\n");
}
}
} }

SimpleChatServerHandler

Client:

 public class SimpleChatClient {

     private String host;
private int port; public SimpleChatClient(String host, int port) {
this.host = host;
this.port = port;
} public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
Channel channel = bootstrap.connect(host, port).sync().channel(); Scanner sc = new Scanner(System.in);
System.out.println("please enter...");
boolean exit = false;
// 输入exit,退出系统
while (!exit) {
String str = sc.next();
channel.writeAndFlush(str + "\r\n");
if (str.equalsIgnoreCase("exit")) {
exit = true;
channel.close();
}
}
sc.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
new SimpleChatClient("localhost", 8080).run();
}
}

SimpleChatClient

 public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder(); @Override
public void initChannel(SocketChannel ch) throws Exception {// Pipeline里的Handler是从底层开始向上添加的,故流动方向为后添加的输出给先添加的、或先添加的读入给后添加的
ChannelPipeline pipeline = ch.pipeline(); // 添加ChannelHandler,顺序是敏感的;名字任意,不冲突即可,也可以不指定名字 // Netty提供了三种解码器解决粘包拆包问题:LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。
// 其中,前二者所读码流到达指定长度或遇到分隔符时认为结束,若读到的数据大于指定长度则抛TooLongFrameException并忽略之前读到的码流;最后一者每次读固定长度码流。
// 也可以继承ByteToMessageDecoder自己处理
pipeline.addLast("FrameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("StringDecoder", DECODER); // 解码只会应用于读数据时、编码只会应用于写数据时,因此解码器与编码器添加的先后顺序在客户端和服务端中可不同,但编码器添加的顺序须桶,解码器亦然。
pipeline.addLast("StringEncoder", ENCODER); pipeline.addLast("handler", new SimpleChatClientHandler());
}
}

SimpleChatClientInitializer

 public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {

     // 优先级高于messageReceived方法,有了这个方法就会屏蔽messageReceived方法
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println(msg.toString());
// } @Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}

SimpleChatClientHandler

以下几个是官方示例(翻译):

1.2、DiscardServer

服务端接收客户端的消息,不返回任何信息:

 class DiscardServerHandler extends ChannelHandlerAdapter { // (1)

     // @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// // 以静默方式丢弃接收的数据
// ((ByteBuf) msg).release(); // (3)ByteBuf属于引用计数的对象,必须通过release()方法显式释放。
// } @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
ByteBuf in = (ByteBuf) msg;
try {
System.out.println(in.toString(Charset.defaultCharset()));
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
} // 这个低效的循环可以简化为 System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII));
} finally {
ReferenceCountUtil.release(msg);// (2) // 或者写为 in.release();
}
// 或者直接打印
System.out.println("Yes, A new client in = " + ctx.name());
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// 出现异常时关闭连接。
cause.printStackTrace();
ctx.close();
}
} public class DiscardServer {// 可以用telnet连接并输入进行测试,客户端没有任何输出
private int port; public DiscardServer(int port ) {
this.port = port;
} public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)NioEventLoopGroup是处理I/O操作的多线程事件循环。第一个通常称为“boss”,接受传入连接。 第二个通常称为“worker”,当“boss”接受连接并且向“worker”注册接受连接,则“worker”处理所接受连接的流量。
// 使用多少个线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)该类用于实例化新的通道以接受传入连接
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)ChannelInitializer是一个特殊的处理程序,用于帮助用户配置新的通道。 很可能要通过添加一些处理程序。随着应用程序变得复杂,可能会向管道中添加更多处理程序
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());// 每次有新连接都会创建一个新的DiscardServerHandler处理之,,而不是只用一个来处理所有的
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)指定Channel实现的参数
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("server 启动了"); // Bind and start to accept incoming connections.
// b.bind(11122);//可以绑定多个端口
ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully(); System.out.println("server 关闭了");
}
} public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();// 可以用telnet连接并输入进行测试,客户端没有任何输出
}
}

DiscardServer

1.3、TimeServer_StremBased

时间服务器:服务端收到客户端连接就往客户端发生时间,发完后就关闭连接;客户端连接上服务端,收到消息后就关闭连接,读的一方需要处理 粘包拆包 的问题

 //http://www.yiibai.com/netty/netty-time-server.htmlclass
/**
* 时间服务器<br>
* 服务端收到客户端连接就往客户端发生时间,发完后就关闭连接;客户端连接上服务端,收到消息后就关闭连接。<br>
* 只有读者会有粘包拆包的问题,所以这里只有客户端可能有该问题。
*/
class TimeServerHandler_StreamBased extends ChannelHandlerAdapter {
// 因为时间服务器将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以我们不能使用channelRead()方法。而是覆盖channelActive()方法。
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2) // 在Java NIO中发送消息之前需调用java.nio.ByteBuffer.flip(),但Netty ByteBuf没有这样的方法,它只有两个指针;一个用于读取操作,另一个用于写入操作。当您向ByteBuf写入内容时,写入索引会增加,而读取器索引不会更改。读取器索引和写入器索引分别表示消息的开始和结束位置。
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); // ChannelHandlerContext.write()(和writeAndFlush())方法返回一个ChannelFuture。
// ChannelFuture表示尚未发生的I/O操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在Netty中是异步的。因此,需要在ChannelFuture完成后调用close()方法。注意,close()也可能不会立即关闭连接,并返回一个ChannelFuture。
final ChannelFuture f = ctx.writeAndFlush(time); // (3) // 当写请求完成时,我们如何得到通知?添加监听器
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();// 发完就关闭连接
}
}); // (4)// 可以使用预定义的监听器来简化代码:f.addListener(ChannelFutureListener.CLOSE);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
} public class TimeServer_StreamBased {
private int port; public TimeServer_StreamBased(int port) {
this.port = port;
} public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)NioEventLoopGroup是处理I/O操作的多线程事件循环。第一个通常称为“boss”,接受传入连接。 第二个通常称为“worker”,当“boss”接受连接并且向“worker”注册接受连接,则“worker”处理所接受连接的流量。
// 使用多少个线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)该类用于实例化新的通道以接受传入连接
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)ChannelInitializer是一个特殊的处理程序,用于帮助用户配置新的通道。 很可能要通过添加一些处理程序。随着应用程序变得复杂,可能会向管道中添加更多处理程序
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler_StreamBased());// 每次有新连接就创建一个新的该handler来处理,而不是只用一个来处理所有的
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)指定Channel实现的参数
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("server 启动了"); // Bind and start to accept incoming connections.
// b.bind(11122);//可以绑定多个端口
ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully(); System.out.println("server 关闭了");
}
} public static void main(String[] args) throws Exception {
int port = 8080; new TimeServer_StreamBased(port).run();// 可以用telnet连接并输入进行测试,客户端输出二进制的32位整数,所以为乱码
}
} class TimeClient_StreamBased {
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap(); // (1)Bootstrap与ServerBootstrap类似,只是它用于非服务器通道,例如客户端或无连接通道。
b.group(workerGroup); // (2)如果只指定一个EventLoopGroup,它将同时用作boss组和worker组。boss组和worker组不是用于客户端。
b.channel(NioSocketChannel.class); // (3)不使用NioServerSocketChannel,而是使用NioSocketChannel来创建客户端通道。
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)这里不像我们使用的ServerBootstrap,所以不使用childOption(),因为客户端SocketChannel没有父类。
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 以下两种方式都可以解决读取粘包拆包的问题
ch.pipeline().addLast(new TimeClientHandler_1_TimeDecoder(), new TimeClientHandler_1_withproblem());
// ch.pipeline().addLast(new TimeClientHandler_2());
}
}); // Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)应该调用connect()方法,而不是bind()方法 // Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
} /**
* 此Handler单独使用的话存在粘包拆包的问题
*/
class TimeClientHandler_1_withproblem extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 粘包拆包问题,在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节。
// 因此,不能保证读的是输入的完整。不过,由于返回的是32位int,数据量小,所以这里很少可能发生此问题。
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;// 每次读取到m的字节数不一定是4,所以存在粘包拆包的问题
Date currentTime = new Date(currentTimeMillis);
System.out.println("Default Date Format:" + currentTime.toString()); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
// 转换一下成中国人的时间格式
System.out.println("Date Format:" + dateString); ctx.close();
} finally {
m.release();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
} /**
* 与TimeClientHandler_1_withproblem搭配使用,解决粘包拆包问题,且易扩展 <br>
* <br>
* 在客户端的ClientHandler之前加上一层,使得字节够了才往ClientHandler传,以解决粘包拆包的问题
*/
class TimeClientHandler_1_TimeDecoder extends ByteToMessageDecoder { // (1)ByteToMessageDecoder是ChannelHandlerAdapter的一个实现,它使得处理碎片问题变得容易
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
} /**
* 此的Handler能解决读取粘包拆包问题,但可扩展性差<br>
* <br>
* 在客户端的ClientHandler里解决粘包拆包问题:字节够了才读
*/
class TimeClientHandler_2 extends ChannelHandlerAdapter {
private ByteBuf buf; @Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)初始化
} @Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)初始化的释放
buf = null;
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release(); if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

TimeServer_StreamBased

1.4、TimeServer_POJOBased

时间服务器:同上,但通过POJO来通信,同样,读的一方需要处理 粘包拆包 的问题

 /**
* 时间服务器<br>
* 服务端收到客户端连接就往客户端发送时间,发完后就关闭连接;客户端连接上服务端,收到消息后就关闭连接。<br>
* 由于只有服务端往客户端发数据,所以服务端只要encode、客户端只要decode
*/ class UnixTime {
private final long value; public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
} public UnixTime(long value) {
this.value = value;
} public long value() {
return value;
} @Override
public String toString() {
Date date = new Date((value() - 2208988800L) * 1000L);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(date);
return dateString;
}
} class TimeEncoder_POJOBased extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int) msg.value());
}
} class TimeServerHandler_POJOBased extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {// 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
} @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {// 在ctx加入本Handler时触发,一般在此做初始化工作,如创建buf
Channel incoming = ctx.channel();
System.out.println("client " + incoming.remoteAddress() + " 加入");
} @Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// 从ctx移除本Handler时触发
Channel incoming = ctx.channel();
System.out.println("client " + incoming.remoteAddress() + " 离开");
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
} public class TimeServer_POJOBased {
private int port; public TimeServer_POJOBased(int port) {
this.port = port;
} public void run() throws Exception {
// NioEventLoopGroup类是个线程组,包含一组NIO线程,用于网络事件的处理(实际上它就是Reactor线程组)。 创建的2个线程组,1个用于服务端接收客户端的连接,另一个用于SocketChannel的 网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap类是启动NIO服务器的辅助启动类
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.option(ChannelOption.SO_BACKLOG, 128).childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Pipeline里的Handler是从底层开始向上叠加的,即后者输出给前者、或前者读出的给后者
// 由于只有服务端往客户端发数据,所以服务端只要encode、客户端只要decode
ch.pipeline().addLast(new TimeEncoder_POJOBased(), new TimeServerHandler_POJOBased());
}
})// (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("server 启动了"); // 绑定端口,开始接收进来的连接
// b.bind(11122);//可以绑定多个端口
ChannelFuture f = b.bind(port).sync(); // (7) // 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("server 关闭了");
}
} public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new TimeServer_POJOBased(port).run();
}
} class TimeClient_POJOBased { public static void main(String[] args) throws Exception { String host = "127.0.0.1";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Pipeline里的Handler是从底层开始向上添加的,故流动方向为后添加的输出给先添加的、或先添加的读入给后添加的
// 由于只有服务端往客户端发数据,所以服务端只要encode、客户端只要decode
ch.pipeline().addLast(new TimeDecoder_POJOBased(), new TimeClientHandler_POJOBased());
}
}); // 启动客户端
ChannelFuture f = b.connect(host, port).sync(); // (5) // 等待连接关闭
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
} class TimeDecoder_POJOBased extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// TODO Auto-generated method stub
if (in.readableBytes() < 4) {// 只有读者存在粘包半包问题,这里不少于4个字节时才处理,以避免该问题
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
} class TimeClientHandler_POJOBased extends ChannelHandlerAdapter {
// @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

TimeServer_POJOBased

1.5、结合Protobuf

  (就是把自动生成的类当成一个JavaBean来用,但是该JavaBean比普通的Bean更强大,如包含了与Protobuf格式间进行序列化/反序列化的方法)

Netty提供了与Protobuf相关的 解决粘包半包问题的编解码器(ByteBuf与ByteBuf间) 以及 ByteBuf与自定义Protobuf类间的编解码器 :

  • ProtobufVarint32FrameDecoder()
  • ProtobufVarint32LengthFieldPrepender()
  • ProtobufDecoder(Custom Protobuf Class)
  • ProtobufEncoder()

示例(需要添加Protobuf依赖,):

 public final class PersonProbuf {
private PersonProbuf() {
} public static void registerAllExtensions(com.google.protobuf.ExtensionRegistry registry) {
} public interface PersonOrBuilder extends com.google.protobuf.MessageOrBuilder { // optional int64 id = 1;
/**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
boolean hasId(); /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
long getId(); // optional string name = 2;
/**
* <code>optional string name = 2;</code>
*/
boolean hasName(); /**
* <code>optional string name = 2;</code>
*/
java.lang.String getName(); /**
* <code>optional string name = 2;</code>
*/
com.google.protobuf.ByteString getNameBytes(); // optional string sex = 3;
/**
* <code>optional string sex = 3;</code>
*/
boolean hasSex(); /**
* <code>optional string sex = 3;</code>
*/
java.lang.String getSex(); /**
* <code>optional string sex = 3;</code>
*/
com.google.protobuf.ByteString getSexBytes(); // optional string tel = 4;
/**
* <code>optional string tel = 4;</code>
*/
boolean hasTel(); /**
* <code>optional string tel = 4;</code>
*/
java.lang.String getTel(); /**
* <code>optional string tel = 4;</code>
*/
com.google.protobuf.ByteString getTelBytes();
} /**
* Protobuf type {@code Person}
*/
public static final class Person extends com.google.protobuf.GeneratedMessage implements PersonOrBuilder {
// Use Person.newBuilder() to construct.
private Person(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
} private Person(boolean noInit) {
this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance();
} private static final Person defaultInstance; public static Person getDefaultInstance() {
return defaultInstance;
} public Person getDefaultInstanceForType() {
return defaultInstance;
} private final com.google.protobuf.UnknownFieldSet unknownFields; @java.lang.Override
public final com.google.protobuf.UnknownFieldSet getUnknownFields() {
return this.unknownFields;
} private Person(com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet
.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) {
done = true;
}
break;
}
case 8: {
bitField0_ |= 0x00000001;
id_ = input.readInt64();
break;
}
case 18: {
bitField0_ |= 0x00000002;
name_ = input.readBytes();
break;
}
case 26: {
bitField0_ |= 0x00000004;
sex_ = input.readBytes();
break;
}
case 34: {
bitField0_ |= 0x00000008;
tel_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
} public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() {
return PersonProbuf.internal_static_Person_descriptor;
} protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() {
return PersonProbuf.internal_static_Person_fieldAccessorTable
.ensureFieldAccessorsInitialized(PersonProbuf.Person.class, PersonProbuf.Person.Builder.class);
} public static com.google.protobuf.Parser<Person> PARSER = new com.google.protobuf.AbstractParser<Person>() {
public Person parsePartialFrom(com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new Person(input, extensionRegistry);
}
}; @java.lang.Override
public com.google.protobuf.Parser<Person> getParserForType() {
return PARSER;
} private int bitField0_;
// optional int64 id = 1;
public static final int ID_FIELD_NUMBER = 1;
private long id_; /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
public boolean hasId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
} /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
public long getId() {
return id_;
} // optional string name = 2;
public static final int NAME_FIELD_NUMBER = 2;
private java.lang.Object name_; /**
* <code>optional string name = 2;</code>
*/
public boolean hasName() {
return ((bitField0_ & 0x00000002) == 0x00000002);
} /**
* <code>optional string name = 2;</code>
*/
public java.lang.String getName() {
java.lang.Object ref = name_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
name_ = s;
}
return s;
}
} /**
* <code>optional string name = 2;</code>
*/
public com.google.protobuf.ByteString getNameBytes() {
java.lang.Object ref = name_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
name_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
} // optional string sex = 3;
public static final int SEX_FIELD_NUMBER = 3;
private java.lang.Object sex_; /**
* <code>optional string sex = 3;</code>
*/
public boolean hasSex() {
return ((bitField0_ & 0x00000004) == 0x00000004);
} /**
* <code>optional string sex = 3;</code>
*/
public java.lang.String getSex() {
java.lang.Object ref = sex_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
sex_ = s;
}
return s;
}
} /**
* <code>optional string sex = 3;</code>
*/
public com.google.protobuf.ByteString getSexBytes() {
java.lang.Object ref = sex_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
sex_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
} // optional string tel = 4;
public static final int TEL_FIELD_NUMBER = 4;
private java.lang.Object tel_; /**
* <code>optional string tel = 4;</code>
*/
public boolean hasTel() {
return ((bitField0_ & 0x00000008) == 0x00000008);
} /**
* <code>optional string tel = 4;</code>
*/
public java.lang.String getTel() {
java.lang.Object ref = tel_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
tel_ = s;
}
return s;
}
} /**
* <code>optional string tel = 4;</code>
*/
public com.google.protobuf.ByteString getTelBytes() {
java.lang.Object ref = tel_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
tel_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
} private void initFields() {
id_ = 0L;
name_ = "";
sex_ = "";
tel_ = "";
} private byte memoizedIsInitialized = -1; public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1)
return isInitialized == 1; memoizedIsInitialized = 1;
return true;
} public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt64(1, id_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, getNameBytes());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, getSexBytes());
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(4, getTelBytes());
}
getUnknownFields().writeTo(output);
} private int memoizedSerializedSize = -1; public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1)
return size; size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream.computeInt64Size(1, id_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream.computeBytesSize(2, getNameBytes());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream.computeBytesSize(3, getSexBytes());
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream.computeBytesSize(4, getTelBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
} private static final long serialVersionUID = 0L; @java.lang.Override
protected java.lang.Object writeReplace() throws java.io.ObjectStreamException {
return super.writeReplace();
} public static PersonProbuf.Person parseFrom(com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
} public static PersonProbuf.Person parseFrom(com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
} public static PersonProbuf.Person parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
} public static PersonProbuf.Person parseFrom(byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
} public static PersonProbuf.Person parseFrom(java.io.InputStream input) throws java.io.IOException {
return PARSER.parseFrom(input);
} public static PersonProbuf.Person parseFrom(java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
} public static PersonProbuf.Person parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
} public static PersonProbuf.Person parseDelimitedFrom(java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
} public static PersonProbuf.Person parseFrom(com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
} public static PersonProbuf.Person parseFrom(com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
} public static Builder newBuilder() {
return Builder.create();
} public Builder newBuilderForType() {
return newBuilder();
} public static Builder newBuilder(PersonProbuf.Person prototype) {
return newBuilder().mergeFrom(prototype);
} public Builder toBuilder() {
return newBuilder(this);
} @java.lang.Override
protected Builder newBuilderForType(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
} /**
* Protobuf type {@code Person}
*/
public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder<Builder>
implements PersonProbuf.PersonOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() {
return PersonProbuf.internal_static_Person_descriptor;
} protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() {
return PersonProbuf.internal_static_Person_fieldAccessorTable
.ensureFieldAccessorsInitialized(PersonProbuf.Person.class, PersonProbuf.Person.Builder.class);
} // Construct using PersonProbuf.Person.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
} private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
} private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
} private static Builder create() {
return new Builder();
} public Builder clear() {
super.clear();
id_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
name_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
sex_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
tel_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
return this;
} public Builder clone() {
return create().mergeFrom(buildPartial());
} public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() {
return PersonProbuf.internal_static_Person_descriptor;
} public PersonProbuf.Person getDefaultInstanceForType() {
return PersonProbuf.Person.getDefaultInstance();
} public PersonProbuf.Person build() {
PersonProbuf.Person result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
} public PersonProbuf.Person buildPartial() {
PersonProbuf.Person result = new PersonProbuf.Person(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.id_ = id_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.name_ = name_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.sex_ = sex_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.tel_ = tel_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
} public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof PersonProbuf.Person) {
return mergeFrom((PersonProbuf.Person) other);
} else {
super.mergeFrom(other);
return this;
}
} public Builder mergeFrom(PersonProbuf.Person other) {
if (other == PersonProbuf.Person.getDefaultInstance())
return this;
if (other.hasId()) {
setId(other.getId());
}
if (other.hasName()) {
bitField0_ |= 0x00000002;
name_ = other.name_;
onChanged();
}
if (other.hasSex()) {
bitField0_ |= 0x00000004;
sex_ = other.sex_;
onChanged();
}
if (other.hasTel()) {
bitField0_ |= 0x00000008;
tel_ = other.tel_;
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
} public final boolean isInitialized() {
return true;
} public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {
PersonProbuf.Person parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (PersonProbuf.Person) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
} private int bitField0_; // optional int64 id = 1;
private long id_; /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
public boolean hasId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
} /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
public long getId() {
return id_;
} /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
public Builder setId(long value) {
bitField0_ |= 0x00000001;
id_ = value;
onChanged();
return this;
} /**
* <code>optional int64 id = 1;</code>
*
* <pre>
*可选的字段,为64位整数..
* </pre>
*/
public Builder clearId() {
bitField0_ = (bitField0_ & ~0x00000001);
id_ = 0L;
onChanged();
return this;
} // optional string name = 2;
private java.lang.Object name_ = ""; /**
* <code>optional string name = 2;</code>
*/
public boolean hasName() {
return ((bitField0_ & 0x00000002) == 0x00000002);
} /**
* <code>optional string name = 2;</code>
*/
public java.lang.String getName() {
java.lang.Object ref = name_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
name_ = s;
return s;
} else {
return (java.lang.String) ref;
}
} /**
* <code>optional string name = 2;</code>
*/
public com.google.protobuf.ByteString getNameBytes() {
java.lang.Object ref = name_;
if (ref instanceof String) {
com.google.protobuf.ByteString b = com.google.protobuf.ByteString
.copyFromUtf8((java.lang.String) ref);
name_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
} /**
* <code>optional string name = 2;</code>
*/
public Builder setName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
name_ = value;
onChanged();
return this;
} /**
* <code>optional string name = 2;</code>
*/
public Builder clearName() {
bitField0_ = (bitField0_ & ~0x00000002);
name_ = getDefaultInstance().getName();
onChanged();
return this;
} /**
* <code>optional string name = 2;</code>
*/
public Builder setNameBytes(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
name_ = value;
onChanged();
return this;
} // optional string sex = 3;
private java.lang.Object sex_ = ""; /**
* <code>optional string sex = 3;</code>
*/
public boolean hasSex() {
return ((bitField0_ & 0x00000004) == 0x00000004);
} /**
* <code>optional string sex = 3;</code>
*/
public java.lang.String getSex() {
java.lang.Object ref = sex_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
sex_ = s;
return s;
} else {
return (java.lang.String) ref;
}
} /**
* <code>optional string sex = 3;</code>
*/
public com.google.protobuf.ByteString getSexBytes() {
java.lang.Object ref = sex_;
if (ref instanceof String) {
com.google.protobuf.ByteString b = com.google.protobuf.ByteString
.copyFromUtf8((java.lang.String) ref);
sex_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
} /**
* <code>optional string sex = 3;</code>
*/
public Builder setSex(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
sex_ = value;
onChanged();
return this;
} /**
* <code>optional string sex = 3;</code>
*/
public Builder clearSex() {
bitField0_ = (bitField0_ & ~0x00000004);
sex_ = getDefaultInstance().getSex();
onChanged();
return this;
} /**
* <code>optional string sex = 3;</code>
*/
public Builder setSexBytes(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
sex_ = value;
onChanged();
return this;
} // optional string tel = 4;
private java.lang.Object tel_ = ""; /**
* <code>optional string tel = 4;</code>
*/
public boolean hasTel() {
return ((bitField0_ & 0x00000008) == 0x00000008);
} /**
* <code>optional string tel = 4;</code>
*/
public java.lang.String getTel() {
java.lang.Object ref = tel_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
tel_ = s;
return s;
} else {
return (java.lang.String) ref;
}
} /**
* <code>optional string tel = 4;</code>
*/
public com.google.protobuf.ByteString getTelBytes() {
java.lang.Object ref = tel_;
if (ref instanceof String) {
com.google.protobuf.ByteString b = com.google.protobuf.ByteString
.copyFromUtf8((java.lang.String) ref);
tel_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
} /**
* <code>optional string tel = 4;</code>
*/
public Builder setTel(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000008;
tel_ = value;
onChanged();
return this;
} /**
* <code>optional string tel = 4;</code>
*/
public Builder clearTel() {
bitField0_ = (bitField0_ & ~0x00000008);
tel_ = getDefaultInstance().getTel();
onChanged();
return this;
} /**
* <code>optional string tel = 4;</code>
*/
public Builder setTelBytes(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000008;
tel_ = value;
onChanged();
return this;
} // @@protoc_insertion_point(builder_scope:Person)
} static {
defaultInstance = new Person(true);
defaultInstance.initFields();
} // @@protoc_insertion_point(class_scope:Person)
} private static com.google.protobuf.Descriptors.Descriptor internal_static_Person_descriptor;
private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_Person_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() {
return descriptor;
} private static com.google.protobuf.Descriptors.FileDescriptor descriptor;
static {
java.lang.String[] descriptorData = {
"\n\006person\"<\n\006Person\022\n\n\002id\030\001 \001(\003\022\014\n\004name\030\002"
+ " \001(\t\022\013\n\003sex\030\003 \001(\t\022\013\n\003tel\030\004 \001(\tB\016B\014Person"
+ "Probuf" };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_Person_descriptor = getDescriptor().getMessageTypes().get(0);
internal_static_Person_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Person_descriptor, new java.lang.String[] { "Id", "Name", "Sex", "Tel", });
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {}, assigner);
} // @@protoc_insertion_point(outer_class_scope)
}

PersonProbuf(自动生成)

 public class ReqClient {

     public void connect(String host, int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup group = new NioEventLoopGroup(); try {
// Bootstrap 类,是启动NIO服务器的辅助启动类
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 解码器在读时才用,编码器在写时才用 // 解码类
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());// frame解码得到ByteBuf
ch.pipeline().addLast(new ProtobufDecoder(PersonProbuf.Person.getDefaultInstance()));// ByteBuf解码得到自定义的Protobuf类
// 编码类
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new ReqClientHandler()); }
}); // 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
System.out.println("客户端启动."); // 等待客服端链路关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException ex) {
}
}
new ReqClient().connect("127.0.0.1", port);
}
}

ReqClient

 public class ReqClientHandler extends ChannelHandlerAdapter {// 由于添加了编码器和解码器,这里输出和读入的都已是自定义的Protobuf类型

     @Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 2; i++) {
ctx.write(request(i));
}
ctx.flush();
} private PersonProbuf.Person request(int id) {
PersonProbuf.Person.Builder builder = PersonProbuf.Person.newBuilder();
builder.setId(id);
builder.setName("orange");
builder.setSex("man");
builder.setTel("999"); return builder.build();
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("receive server response:[" + msg + "]");
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

ReqClientHandler

 public class ReqServer {

     public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup WorkerGroup = new NioEventLoopGroup(); try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, WorkerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器。
// 有三种方式可以选择:
// 使用netty提供ProtobufVarint32FrameDecoder
// 继承netty提供的通用半包处理器 LengthFieldBasedFrameDecoder
// 继承ByteToMessageDecoder类,自己处理半包 // 解码器在读时才用,编码器在写时才用 // 解码类
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());// frame解码得到ByteBuf
ch.pipeline().addLast(new ProtobufDecoder(PersonProbuf.Person.getDefaultInstance()));// ByteBuf解码得到自定义的Protobuf类
// 编码类
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new ReqServerHandler());
}
}); // 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
System.out.println("服务端启动."); // 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 释放线程池资源
bossGroup.shutdownGracefully();
WorkerGroup.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException ex) {
}
}
new ReqServer().bind(port);
} }

ReqServer

 public class ReqServerHandler extends ChannelHandlerAdapter {//由于添加了编码器和解码器,这里输出和读入的都已是自定义的Protobuf类型
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PersonProbuf.Person people = (PersonProbuf.Person) msg;
if ("Orange".equalsIgnoreCase(people.getName())) {
// if("Orange".equals(people.getName())){
System.out.println("accept client people:[" + people.toString() + "]");
ctx.writeAndFlush(response(people.getId()));
}
} private PersonProbuf.Person response(long peopleID) {
PersonProbuf.Person.Builder builder = PersonProbuf.Person.newBuilder();
builder.setId(peopleID);
builder.setName("karl");
builder.setSex("boy");
builder.setTel("110");
return builder.build();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

ReqServerHandler

其中用到了自动生成的 PersonProbuf.java,生成方法如下:

1、从 这里 下载压缩包并解压。(生成各种语言的代码时需要用到相关库).

2、从 这里 下载protoc.exe放在上述包的src目录下(用于根据.proto文件生产对应的java类),不妨把src目录加入到环境变量。

3、定义消息格式,person.proto(消息格式的定义规则可参看 这里),如下:

 //option java_package = "Serialization_ProtoBuf.ProtoBuf"; //写包名的话,会自动创建包名指定的文件夹并把生成的类放在其下,此外,类里属性或方法的调用会加上全限定名,移动到其他地方时难改,所以最好别加此项。
option java_outer_classname = "PersonProbuf"; message Person {
optional int64 id=1; //可选的字段,为64位整数..
optional string name=2;
optional string sex=3;
optional string tel=4;
}

person.proto

4、执行 protoc.exe --java_out=java类输出目录 proto文件路径 ,就自动生成了对应的类文件Personprobuf.java,文件名由person.proto里的 java_outer_classname 指定(很长。。。)

其他:Protobuf生成的类的基本操作(以Personprobuf.java为例)

 class TestProtobuf {

     public static void main(String[] args) {
PersonProbuf.Person.Builder builder = PersonProbuf.Person.newBuilder(); builder.setId(1);
builder.setName("Karl");
builder.setSex("boy");
builder.setTel("110");
PersonProbuf.Person person = builder.build(); System.out.println(person.toString()); System.out.println(person.toByteString());
System.out.println(); byte[] buf = person.toByteArray();
for (byte b : buf) {
System.out.print(b);
}
System.out.println("\n"); try {
PersonProbuf.Person person2 = PersonProbuf.Person.parseFrom(buf);
System.out.println(person2.getName() + ", " + person2.getTel());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
} //结果
id: 1
name: "Karl"
sex: "boy"
tel: "110" <ByteString@31cefde0 size=18> 81184759711410826398111121343494948 Karl, 110

TestProtobuf

2、参考资料

1、http://mangocool.com/1446174360500.html 1.1

2、http://netty.io/wiki/user-guide-for-4.x.html 官方示例

3、http://www.yiibai.com/netty/netty-discard-server.htm 官方示例翻译

4、https://github.com/orange1438/Netty_Course/tree/master/src/main/java/Serialization_ProtoBuf 结合Protobuf

5、https://blog.csdn.net/dc_726/article/details/47912337 Netty总结