netty Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
更确切的讲是一个组件,没有那么复杂。
例子 一 Discard服务器端
我们先写一个简单的服务端和客户端作为入门,接下来我们在深入介绍里面的内容 :(基于netty4 )
package io.netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; /**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
这是服务端的处理引擎,继承于ChannelInboundHandlerAdapter,但也可以实现接口
ChannelInboundHandler,不过需要实现很多方法。
channelRead()
是服务器接收到客户端的数据并处理的,读取的方式可以是bytebuf二进制缓存,也可以是pojo实体,这部分在下一章介绍。
ByteBuf是一个引用计数对象,必须通过release()方法显式释放。 请记住,处理程序有责任释放传递给处理程序的引用计数对象。 通常,channelRead()处理方法的实现方式 如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
exceptionCaught()
是异常处理方法,通常是日志记录,异常具体处理,如发送错误信息并关闭连接等操作。
接下来,我们编写服务端的主程序:
package io.netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**
* Discards any incoming data.
*/
public class DiscardServer { private int port; public DiscardServer(int port) {
this.port = port;
} public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
1,NioEventLoopGroup是一个处理I / O操作的多线程事件循环。 Netty为不同类型的传输提供了各种EventLoopGroup实现。 在这个例子中我们正在实现一个服务器端的应用程序,因此将使用两个NioEventLoopGroup。 第一个,通常称为“老板”,接受传入的连接。 第二个,通常称为“工人”,一旦老板接受连接并将接受的连接注册到工作人员,就处理接受的连接的流量。 使用多少个线程以及它们如何映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
2,ServerBootstrap是一个帮助类,用于设置服务器。 您可以直接使用Channel
设置服务器。 但是,请注意,这是一个繁琐的过程,在大多数情况下您不需要这样做。
3,在这里,我们指定使用NioServerSocketChannel类来实例化一个新的通道来接受传入的连接。
4,这里指定的处理程序将始终由新接受的Channel
进行评估。 ChannelInitializer是一个特殊的处理程序,旨在帮助用户配置新的Channel
。 很可能您想通过添加一些处理程序(如DiscardServerHandler)来配置新通道的ChannelPipeline来实现您的网络应用程序。 随着应用程序变得复杂,您可能会在ChannelPipeline
中添加更多的处理程序,并将这个匿名类最终提取到*类中。
5,您还可以设置特定于Channel实现的参数。 我们正在编写一个TCP / IP服务器,因此我们可以设置套接字选项,如tcpNoDelay和keepAlive。 请参阅ChannelOption的apidocs和特定的ChannelConfig实现,以获得有关支持的ChannelOptions的概述。
6,你有没有注意到option()和childOption()? option()用于接受传入连接的NioServerSocketChannel。 childOption()用于在这种情况下由主服务器通道(即NioServerSocketChannel)接受的通道。
7,我们现在准备好了。 剩下的是绑定到端口并启动服务器。 这里,我们绑定到机器中所有NIC(网络接口卡)的端口8080。 您现在可以根据需要调用bind()方法多次(使用不同的绑定地址)。
恭喜! 你刚刚完成了Netty上的第一台服务器。
========================================================
如何查看接收到的数据
现在我们已经写了我们的第一个服务器,我们需要测试它是否真的有效。 测试它的最简单的方法是使用telnet命令。 例如,您可以在命令行中输入telnet localhost 8080,然后输入发送内容。
但是如何证明服务器收到了信息了呢?我们已经知道,在收到数据时,会调用channelRead()方法。 让我们把一些代码放到DiscardServerHandler的channelRead()方法中:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
1,这个无效循环实际上可以简化为:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
2,可选的操作:in.release()
释放资源
接下来,你就可以看到打印的信息了。
========================================================
写一个回显的服务器
到目前为止,我们一直在消费数据,没有任何反应。 然而,服务器通常应该响应请求。 让我们学习如何通过实现ECHO协议向客户端写入响应消息,其中任何接收到的数据都将被发回。与前面部分实现的丢弃服务器的唯一区别在于它将接收到的数据发回,而不是将接收到的数据输出到控制台。 因此,再次修改channelRead()方法是足够的:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
1,ChannelHandlerContext对象提供了各种操作,可以触发各种I / O事件和操作。 在这里,我们调用write(Object)来逐字写入接收到的消息。 请注意,我们没有像DISCARD示例中那样发布接收的消息。 这是因为,当Netty发布给电子邮件时,Netty会为您发布。
2,ctx.write(Object)不会将消息写入管道。 它在内部缓冲,然后通过ctx.flush()刷出数据。 或者,为简洁起见,您可以调用ctx.writeAndFlush(msg)。
===========================================================
写一个时间服务器
本节中实现的协议是TIME协议。 与前面的示例不同的是,它发送一个包含32位整数的消息,而不接收任何请求,并在发送消息后关闭连接。 在此示例中,您将学习如何构建和发送消息,并在完成时关闭连接。
因为我们将忽略任何接收到的数据,而是在建立连接后立即发送消息,这次我们不能使用channelRead()方法。 相反,我们应该覆盖channelActive()方法。 以下是实现:
服务引擎:
package io.netty.example.time; public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1,如上所述,当建立连接并准备好生成连接时,将调用channelActive()方法。 我们来写一个32位整数,表示这个方法当前的时间。
2,要发送新消息,我们需要分配一个包含消息的新缓冲区。 我们要写一个32位整数,因此我们需要一个容量至少为4个字节的ByteBuf。 通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator,并分配一个新的缓冲区。
3,接下来,我构造消息:
但等等,消息切割在哪里?在NIO发送消息之前,我们是否曾经调用过java.nio.ByteBuffer.flip()? ByteBuf没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。当读写器索引没有改变时,写入索引会增加。读者索引和写索引分别表示消息的开始和结束位置。
相比之下,NIO缓冲区不能提供一个干净的方式来确定消息内容的起始和结束位置,而不需要调用flip方法。当您忘记翻转缓冲区时,您将会遇到麻烦,因为没有发送任何数据或不正确的数据。在Netty中不会发生这样的错误,因为我们针对不同的操作类型有不同的指针。你会发现它使你的操作更简单。
另外要注意的是,ChannelHandlerContext.write()(和writeAndFlush())方法返回一个ChannelFuture。 ChannelFuture表示尚未发生的I / O操作。这意味着任何请求的操作可能尚未执行,因为所有操作在Netty中都是异步的。例如,即使在发送消息之前,以下代码也可能会关闭连接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,您需要在ChannelFuture完成之后调用close()方法,该方法由write()方法返回,并且在写入操作完成后通知其监听器。 请注意,close()也可能不会立即关闭连接,并返回ChannelFuture。
4,当写请求完成后,我们如何得到通知? 这就像将ChannelFutureListener添加到返回的ChannelFuture一样简单。 在这里,我们创建了一个新的匿名ChannelFutureListener,当操作完成时关闭通道。
f.addListener(ChannelFutureListener.CLOSE);
要测试我们的时间服务器是否按预期工作,可以使用UNIX rdate命令:
$ rdate -o <port> -p <host>
其中<port>是在main()方法中指定的端口号,<host>通常是localhost。
=======================================================================
编写一个时间的客户端
与DISCARD服务器和回显服务器不同,我们需要TIME协议的客户端,因为人类无法将32位二进制数据转换为日历上的日期。 在本节中,我们将讨论如何确保服务器正常工作,并学习如何使用Netty编写客户端。
Netty中服务器和客户端之间最大和唯一的区别是使用不同的Bootstrap和Channel实现。 请看下面的代码:
package io.netty.example.time; public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
}); // Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5) // Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
1,Bootstrap类似于ServerBootstrap,除了它用于非服务器通道,如客户端或无连接通道。
2,如果只指定一个EventLoopGroup,它将同时用作一个老板组和一个工作组。 虽然老板的工作人员不用于客户端。
3,NioSocketChannel代替NioServerSocketChannel,用于创建客户端channel。
4,请注意,我们不使用childOption(),这与ServerBootstrap不同,因为客户端SocketChannel没有父级。
5,我们应该调用connect()方法而不是bind()方法。
可以看到,它与服务器端代码没有什么不同。 ChannelHandler的实现如何? 它应该从服务器收到一个32位整数,将其翻译成可读的格式,打印翻译时间,并关闭连接:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1,在TCP / IP中,Netty将从对等体发送到ByteBuf的数据。
它看起来很简单,与服务器端的示例看起来没有什么不同。 但是,这个处理程序有时会拒绝提高IndexOutOfBoundsException。 我们讨论为什么会在下一节中发生这种情况。
===================================================
总结:这主要翻译官方文献,内容还是很清晰的。
地址:http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
也可以官方下载相应的例子进行研究