Netty实战六之ChannelHandler和ChannelPipeline

时间:2021-12-31 00:40:22

1、Channel的生命周期

Interface Channel定义了一组和ChannelInboundHandler API密切相关的简单但功能强大的状态模型,以下列出Channel的4个状态。

ChannelUnregistered:Channel已经被创建,但还未注册到EventLoop

ChannelRegistered:Channel已经被注册到了EventLoop

ChannelActive:Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了

ChannelInactive:Channel没有连接到远程节点

Channel的正常生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。 Netty实战六之ChannelHandler和ChannelPipeline

2、ChannelHandler的生命周期

下表列出了interface ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些操作,这些方法中的每一个都接受一个ChannelHandlerContext参数。

handlerAdded:当把ChannelHandler添加到ChannelPipeline中时被调用

handlerRemoved:当从ChannelPipeline中移除ChannelHandler时被调用

exceptionCaught:当处理过程中在ChannelPipeline中有错误产生时被调用

Netty定义了下面两个重要的ChannelHandler子接口:

·ChannelInboundHandler——处理入站数据以及各种状态变化

·ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作

3、ChannelInboundHandler接口

当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化ByteBuf实例相关的内存,Netty为此提供了一个实用方法ReferenceCountUtil.release()。

@ChannelHandler.Sharable
//扩展了ChannelInboundHandlerAdapter
public class DiscardHandler extends ChannelInboundHandlerAdapter{ @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//丢弃已接收的消息
ReferenceCountUtil.release(msg);
}
}

Netty将使用WARN级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例,但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用SimpleChannelInboundHandler。

@ChannelHandler.Sharable
//扩展了SimpleChannelInboundHandler
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object>{ @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
//不需要任何显式的资源释放
//No need to do anything special
}
}

由于SimpleChannelInboundHandler会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

4、ChannelOutboundHandler接口

出站操作和数据将由ChannelOutboundHandler处理,它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。

ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷并在稍后继续。

ChannelPromise与ChannelFuture : ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变。

5、ChannelHandler适配器

你可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter类作为自己的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现,通过扩展抽象类ChannelHandlerAdapter,他们获得了他们共同的超接口ChannelHandler的方法。生成的类的层次结构如下图。 Netty实战六之ChannelHandler和ChannelPipelineChannelHandlerAdapter还提供了使用方法isSharable(),如果其对应的实现被标注为Sharable,那么这个方法都将返回true,表示它可以被添加到多个ChannelPipeline中。

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

6、资源管理

每当通过调用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法来处理数据时,你都需要确保没有任何的资源泄露。你可能还记得前面的章节中所提到的,Netty使用引用技术来处理池化的ByteBuf。所以在完全使用完某个ByteBuf后,调整其引用计数是很重要的。

为了帮助你诊断潜在的(资源泄露)问题,Netty提供了class ResourceLeakDetector,它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄露。相关的开销是非常小的。

Netty定义了4种泄露检测级别。

DISABLED——禁用泄露检测

SIMPLE——使用1%的默认采样率检测并报告任何发现的泄露

ADVANCED——使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

PARANOID——类似于ADVANCED,但是其将会对每次访问都进行采样,这对性能将会有很大的影响,应该只在调试阶段使用

泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel = ADVANCED

如果带着该JVM选项重新启动你的应用程序,你将看到自己的应用程序最近被泄露的缓冲区被访问的位置。

实现ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的channelRead()操作直接消费入站消息的情况,也就是说,他不会通过调用ChannelHandlerContext.fireChannelRead()方法将入站消息转发给下一个ChannelInboundHandler。

消费入站消息的简单方式: 由于消费入站数据是一项常规任务,所以Netty提供了一个特殊的被称为SimpleChannelInboundHandler的ChannelInboundHandler实现,这个实现会在消息被channelRead0()方法消费之后自动释放消息。

在出站方向这边,如果你处理了write()操作并丢弃了一个消息,那么你也应该负责释放它。以下代码展示了一个丢弃所有的写入数据的实现。

@ChannelHandler.Sharable
public class DiscardoutBoundHandler extends ChannelOutboundHandlerAdapter{ @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//释放资源
ReferenceCountUtil.release(msg);
//通知ChannelPromise数据已经被处理了
promise.setSuccess();
}
}

重要的是,不仅要释放资源,还要通知ChannelPromise。否则可能会出现ChannelFutureListener收不到某个消息已经被处理了的通知的消息。

总之,如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

7、ChannelPipeline接口

如果你认为ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler实例链,那么就很容易看出这些ChannelHandler之间的交互式如何组成一个应用程序数据和时间处理逻辑的核心的。

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联时永久的,Channel即不能附加另外一个ChannelPipeline,也不能分离其当前的,在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理,随后,通过调用ChannelHandlerContext实现,它将被转发给同一个超类型的下一个ChannelHandler。

ChannelHandlerContext:ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互,ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline。ChannelHandlerContext具有丰富的用于处理事件和执行I/O操作的API。

下图展示了一个典型的同时具有入站和出站ChannelHandler的ChannelPipeline的布局,并且印证了我们之前的关于ChannelPipeline主要由一系列的ChannelHandler所组成的说法,ChannelPipeline还提供了通过ChannelPipeline本身传播事件的方法。如果一个入站事件被触发,它将被从ChannelPipeline的头部开始一直被传播到ChannelPipeline的尾端。如图所示,一个出站I/O事件将从ChannelPipeline的最右边开始,然后向左传播。 Netty实战六之ChannelHandler和ChannelPipeline在ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。

8、修改ChannelPipeline

通过调用ChannelPipeline上的相关方法,ChannelHandler可以添加、删除或者替换其他的ChannelHandler,从而实时地修改ChannelPipeline的布局。

ChannelPipeline pipeline = ...;
FirstHandler firstHandler = new FirstHandler();
//将该实例作为“handler1”添加到ChannelPipeline中
pipeline.addLast("handler1",firstHandler);
//将一个SecondHandler的实例作为“handler2”添加到ChannelPipeline的第一个槽中,这意味着它将被放置在已有的“handler1”之前
pipeline.addLast("handler2",new SecondHandler());
//将一个ThirdHandler的实例作为“handler3”添加到ChannelPipeline的最后一个槽中
pipeline.addLast("handler3",new ThirdHandler());
...
//通过名称移除“handler3”
pipeline.remove("handler3");
//通过引用移除FirstHandler
pipeline.remove(firstHandler);
//将SecondHandler(“handler2”)替换为FourthHandler:"handler4"
pipeline.replace("handler2","handler4",new ForthHandler());

ChannelHandler的执行和阻塞:通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响。但有时可能需要与那些使用阻塞API的遗留代码进行交互,对于这个情况,ChannelPipeline有一些接受一个EventExecutorGroup的add()方法,如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本身的EventLoop中移除,对于这种用例,Netty提供了一种叫DefaultEventExecutorGroup的默认实现。

——ChannelPipeline保存了与Channel相关联的ChannelHandler

——ChannelPipeline可以根据需要、通过添加或者删除ChannelHandler来动态修改

——ChannelPipeline有着丰富的API用以被调用、以响应入站和出站事件

——ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的

9、使用ChannelHandlerContext

以下代码,将通过ChannelHandlerContext获取到Channel的引用,调用Channel上的write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline。 Netty实战六之ChannelHandler和ChannelPipeline以下代码展示了一个类似的例子,但是这一次是写入ChannelPipeline。我们再次看到,(到ChannelPipeline的)引用是通过ChannelHandlerContext获取的。

ChannelHandlerContext ctx = ..;
//获取到与ChannelHandlerContext相关联的Channel的引用
Channel channel = ctx.channel();
//通过Channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

为什么会想要从ChannelPipeline中的某个特定点开始传播事件呢?

——为了减少将事件传经对它不感兴趣的ChannelHandler所带来的开销

——为了避免将事件传经那些可能会对它感兴趣的ChannelHandler。

10、ChannelHandler和ChannelHandlerContext的高级用法

可以通过将ChannelHandler添加到ChannelPipeline中来实现动态的协议切换,缓存到ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的ChannelHandler方法之外,甚至来自于不同的线程。

以下代码,缓存到ChannelHandlerContext的引用

public class WriteHandler extends ChannelHandlerAdapter{

    private ChannelHandlerContext ctx;

    @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//存储到ChannelHandlerContext的引用以供稍后使用
this.ctx = ctx;
} public void send(String msg){
//使用之前存储的到ChannelHandlerContext的引用来发送消息
ctx.writeAndFlush(msg);
}
}

因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例,对于这种用法(指在多个ChannelPipeline*享同一个ChannelHandler),对应的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常,显而易见,为了安全地被用于多个并发的Channel(连接),这样的ChannelHandler必须是线程安全的。

以下代码,展示这种模式。

@ChannelHandler.Sharable
//使用注解@Sharable标注
public class SharableHandler extends ChannelInboundHandlerAdapter{ @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Channel read message: " + msg);
//记录方法调用,并转发给下一个ChannelHandler
ctx.fireChannelRead(msg);
} }

前面的ChannelHandler实现了符合所有的将其加入到多个ChannelPipeline的需求,即它使用了注解@Sharable标注,并且也不持有任何的状态。

以下代码,演示@Sharable的错误用法

@ChannelHandler.Sharable
public class UnSharableHandler extends ChannelInboundHandlerAdapter{ private int count; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将count字段值加1
count++;
System.out.println("channelRead(...) called the " + count + " time");
//记录方法调用,并转发给下一个ChannelHandler
ctx.fireChannelRead(msg);
}
}

这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到ChannelPipeline将极有可能在它被多个并发Channel访问时导致问题。(可以将ChannelRead()方法变为同步方法)

总之,只应该在确定了你的ChannelHandler是线程安全的时才使用@Sharable注解。

为何要共享同一个ChannelHandler:在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个Channel的统计信息。

11、处理入站异常

异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现,因此,Netty提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。

如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler实现exceptionCaught方法。

以下代码,展示了其关闭Channel并打印了异常的栈跟踪信息

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter{

    @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

因为异常将会继续按照入站方向流动(就像所有入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler通常位于ChannelPipeline的最后,这确保了所有的入站异常都总是会被处理,无论他们可能会发生在ChannelPipeline中的什么位置。

你应该如何响应异常,可能很大程序上取决于你的应用程序,你可能想要关闭Channel(和连接),也可能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑,那么Netty将会记录该异常没有被处理的事实。

——ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler

——如果异常到达了ChannelPipeline的尾端,它将会被记录为未处理

——要想定义自定义的处理逻辑,你需要重写exceptionCaught方法,然后你需要决定是否需要将该异常传播出去

12、处理出站异常

——每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了

——几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例,作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器,但是,ChannelPromise还具有提供立即通知的可写方法。

以下代码,添加channelFutureListener,它将打印栈跟踪信息,并且随后关闭Channel

ChannelFuture future = channel.wirte(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()){
channelFuture.cause().printStackTrace();
channelFuture.channel().close();
}
}
});

第二种方式是将ChannelFutrueListener添加到即将作为参数传递给ChannelOutboundHandler的方法的ChannelPromise。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter{

    @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()){
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}

ChannelPromise的可写方法:通过调用ChannelPromise上的setSuccess()和setFailure()方法,可以使一个操作的状态在ChannelHandler的方法返回给其调用者时便即刻被感知到。

Netty实战六之ChannelHandler和ChannelPipeline

Netty实战六之ChannelHandler和ChannelPipeline的更多相关文章

  1. Netty 系列四(ChannelHandler 和 ChannelPipeline)&period;

    一.概念 先来整体的介绍一下这篇博文要介绍的几个概念(Channel.ChannelHandler.ChannelPipeline.ChannelHandlerContext.ChannelPromi ...

  2. 【Netty】ChannelHandler和ChannelPipeline

    一.前言 前面学习了Netty的ByteBuf,接着学习ChannelHandler和ChannelPipeline. 二.ChannelHandler和ChannelPipeline 2.1 Cha ...

  3. netty中的ChannelHandler和ChannelPipeline

    netty中的ChannelHandler和ChannelPipeline ChannelHandler 家族 https://www.w3cschool.cn/essential_netty_in_ ...

  4. Netty的ChannelHandler&comma;ChannelHandlerContext&comma;ChannelPipeline

    本小节一起学习一下ChannelHandler,ChannelHandlerContext,ChannelPipeline这三个Netty常用的组件,不探究它们的底层源码,我们就简单的分析一下用法 首 ...

  5. Netty学习笔记(番外篇) - ChannelHandler、ChannelPipeline和ChannelHandlerContext的联系

    这一篇是 ChannelHandler 和 ChannelPipeline 的番外篇,主要从源码的角度来学习 ChannelHandler.ChannelHandler 和 ChannelPipeli ...

  6. Netty学习笔记(二) - ChannelPipeline和ChannelHandler

    ChannelPipeline 和 ChannelHandler 是 Netty 重要的组件之一,通过这篇文章,重点了解这些组件是如何驱动数据流动和处理的. 一.ChannelHandler 在上一篇 ...

  7. Netty 框架学习 —— ChannelHandler 与 ChannelPipeline

    ChannelHandler 1. Channel 生命周期 Channel 的生命周期状态如下: 状态 描述 ChannelUnregistered Channel 已经被创建,但还未注册到 Eve ...

  8. 1、Netty 实战入门详解

    一.Netty 简介 Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程 ...

  9. Netty实战入门详解——让你彻底记住什么是Netty(看不懂你来找我)

    一.Netty 简介 Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程 ...

随机推荐

  1. H-The Cow Lineup(POJ 1989)

    The Cow Lineup Time Limit: 1000MS   Memory Limit: 30000K Total Submissions: 5367   Accepted: 3196 De ...

  2. MySQL海量数据查询优化策略

    1.对查询进行优化,应尽量避免全表扫描,首先应考虑在 where 及 order by 涉及的列上建立索引. 2.应尽量避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索 ...

  3. LeetCode&lpar;3&rpar; &vert;&vert; Median of Two Sorted Arrays

    LeetCode(3) || Median of Two Sorted Arrays 题记 之前做了3题,感觉难度一般,没想到突然来了这道比较难的,星期六花了一天的时间才做完,可见以前基础太差了. 题 ...

  4. 【CF1132F】Clear the String(动态规划)

    [CF1132F]Clear the String(动态规划) 题面 CF 题解 考虑区间\(dp\). 增量考虑,每次考虑最后一个字符和谁一起删去,然后直接转移就行了. #include<io ...

  5. Plickers——教师拿手机、学生拿卡片,就可以完成即时全员互动!

    全员互动.立刻反馈.无设备添加.无能耗增加,风靡全球教育界,杭州师范大学硕士生导师杨俊锋教授推荐!老师拿手机,学生拿卡片就可以完成!      操作方法:      1.注册:登录www.plicke ...

  6. Caused by&colon; java&period;io&period;FileNotFoundException&colon; class path resource &lbrack;spring&sol;springmvc&period;xml&rsqb; cannot be opene

                        Caused by: java.io.FileNotFoundException: class path resource [spring/springmvc. ...

  7. 已知一个函数rand7&lpar;&rpar;能够生成1-7的随机数,请给出一个函数rand10&lpar;&rpar;,该函数能够生成1-10的随机数。

    题目: 已知一个函数rand7()能够生成1-7的随机数,请给出一个函数,该函数能够生成1-10的随机数. 思路: 假如已知一个函数能够生成1-49的随机数,那么如何以此生成1-10的随机数呢? 解法 ...

  8. poj 3083 Children of th

    #include <iostream> #include<stdio.h> #include<string.h> using namespace std; int ...

  9. HDU 1711 Number Sequence &lpar;字符串匹配,KMP算法&rpar;

    HDU 1711 Number Sequence (字符串匹配,KMP算法) Description Given two sequences of numbers : a1, a2, ...... , ...

  10. SecureCRT 7&period;0破解

    激活步骤如下: 1)准备工作:安装好SecureCRT软件,下载并得到该注册机. 2)保持SecureCRT软件关闭(运行的话会提示你正在运行的,关闭就好). 3)将注册机拷贝到你的CRT软件的安装的 ...