Netty 教程 – 初窥Netty编程

时间:2022-10-07 01:13:38

什么是Netty

Netty是业界有名且最流行的NIO框架之一,健壮,稳定,高性能,可定制,可扩展在同类框架都是首屈一指,而且成功的运用在各大商业项目中,比如HadoopRPC框架avro,当当接盘的DubboX都在使用…

Netty 的优点

  • API使用简单,开发门槛低
  • 功能强大,多种解码与编码器
  • 支持多种主流的通讯协议
  • 定制能力强大,可以通过ChannelHandler对通讯框架灵活的扩展
  • 相比业界主流NIO框架,Netty综合评价更高
  • 成熟稳定,社区活跃

Netty 缺点

  • 5.x 模型存在问题,已被废弃

编译

GIT:GitHub - netty/netty: Netty project - an event-driven asynchronous network application framework

如果需要编译Netty,需要最低JDK1.7,在运行时Netty3.x只需要JDK1.5,同时博客参考 李林峰 大神的 《Netty权威指南第二版》

因为主要是学习Netty,而不是实战,同时为了更好的适配即将推出的Netty6,用Netty5的API也许会更好点,就当为Netty6做技术储备吧…

如果使用Maven,在项目中需要添加

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
    </dependency>
</dependencies>

Hello Netty

继续用TimeServer 与 TimeClient 为例,改造代码使用Netty实现服务端与客户端的通讯,以及上一章博客遗留的数据丢失问题,在使用Netty后都是不在的…

TimeServer

public static void bind(int port) {
    EventLoopGroup masterGroup = new NioEventLoopGroup();//创建线程组
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();//创建NIO服务端启动辅助类
        bootstrap.group(masterGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)//连接数
                    .option(ChannelOption.TCP_NODELAY, true)//不延迟,立即推送
                    .childHandler(new ChildChannelHandler());
        //绑定端口,同步等待成功,
        System.out.println("绑定端口,同步等待成功......");
        ChannelFuture future = bootstrap.bind(port).sync();
        //等待服务端监听端口关闭
        future.channel().closeFuture().sync();
        System.out.println("等待服务端监听端口关闭......");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //优雅退出释放线程池
        masterGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        System.out.println("优雅退出释放线程池......");
    }
}

1.实例化个含NIO线程的线程组,专门用来处理网络事件,管理线程
2.使用ServerBootstrap类来初始化Netty服务器,并且开始监听端口的socket请求
3.根据ServerBootstrap內封装好的方法设置服务器基础信息
4.设置服务端的连接数 ChannelOption.SO_BACKLOG 为 1024
5.设置服务端消息不延迟,立即推送 ChannelOption.TCP_NODELAY
6.配置好服务器,在服务器启动时绑定闯入的port端口,等待同步
7.优雅退出,释放资源

数据操作类ServerHandler:在该类中对客户端发来的数据进行操作,发送给客户端数据

private static class ChildChannelHandler extends ChannelInitializer {

    @Override
    protected void initChannel(Channel channel) throws Exception {
        //创建Channel通道
        channel.pipeline().addLast(new TimeServerHandler());//往通道中添加i/o事件处理类
    }

    private static class TimeServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //channelRead方法中的msg参数(服务器接收到的客户端发送的消息)强制转换成ByteBuf类型
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");//返回String,指定编码(有可能出现不支持的编码类型异常,所以要trycatch
            System.out.println("TimeServer 接收到的消息 :" + body);
            ByteBuf resp = Unpooled.copiedBuffer("你在说什么呢...".getBytes());
            ctx.write(resp);// 将消息放入缓冲数组
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //将消息队列中信息写入到SocketChannel中去,解决了频繁唤醒Selector所带来不必要的性能开销
            //Netty的 write 只是将消息放入缓冲数组,再通过调用 flush 才会把缓冲区的数据写入到 SocketChannel
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();//发生异常时候,执行重写后的 exceptionCaught 进行资源关闭
        }
    }
}

public static void main(String[] args) {
    TimeServer.bind(4040);
}

ChildChannelHandler可以看到,只需重写对应的方法,即可简单的实现一个Netty通讯的服务端,这里的ByteBuf可以看成是NettyByteBuffer的增强实现…

接下来编写TimeClient程序

TimeClient

public static void connect(String host, int port) {
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = null;
    try {
        bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)//不延迟,消息立即推送
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new TimeClientHandler());
                    }
                });
        //发起异步请求
        future = bootstrap.connect(host, port).sync();
        //等待客户端链路关闭
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}

一个很简单的connect连接操作,在服务端我们使用的是ServerBootstrap 客户端使用 Bootstrap就可以了,是不是很像NIO编程中的ServerSocketChannel 与 SocketChannel,这里与服务端不同的是它的Channel需要设置为NioSocketChannel,然后为其添加一个handler…..

private static class TimeClientHandler extends ChannelHandlerAdapter {
    private final ByteBuf firstMessage;
    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER ".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);//推送消息
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("TimeClient 接收到的消息 :" + body);
        //ctx.close();//接受完消息关闭连接,注释掉可以看到释放资源,否则请求完后就关闭连接是看不到异常情况的
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("释放资源:" + cause.getMessage());//不重写将会看到堆栈信息以及资源无法关闭
        ctx.close();
    }
}

public static void main(String[] args) {
    TimeClient.connect("127.0.0.1", 4040);
}

1.首先跟服务器操作类一样继承ChannelHandlerAdapter类,重写channelReadchannelActiveexceptionCaught三个方法
2.其中channelActive方法是用来发送客户端信息的,channelRead方法客户端是接收服务器数据的
3.先声明一个全局变量firstMessage,用来接收客户端发出去的信息的值
4.在channelActive方法中,把要传输的数据转化为字节数组
5.在channelRead方法中,通过ByteBuf接收服务端回复的内容,然后关闭当前连接(当然具体是否关闭取决于业务了,如果请求完就关闭连接是看不到释放资源的日志)
6.在exceptionCaught方法中,可以处理一些异常情况,比如服务器关闭,该方法会监听到…

测试一下

启动 TimeServer

绑定端口,同步等待成功......
TimeServer 接收到的消息 :QUERY TIME ORDER

启动 TimeClient 然后关闭 TimeServer

TimeClient 接收到的消息 :你在说什么呢...
释放资源:远程主机强迫关闭了一个现有的连接。