本篇文章主要详细分析Netty中的核心组件。
启动器Bootstrap和ServerBootstrap作为Netty构建客户端和服务端的路口,是编写Netty网络程序的第一步。它可以让我们把Netty的核心组件像搭积木一样组装在一起。在Netty Server端构建的过程中,我们需要关注三个重要的步骤
- 配置线程池
- Channel初始化
- Handler处理器构建
调度器详解
前面我们讲过NIO多路复用的设计模式之Reactor模型,Reactor模型的主要思想就是把网络连接、事件分发、任务处理的职责进行分离,并且通过引入多线程来提高Reactor模型中的吞吐量。其中包括三种Reactor模型
- 单线程单Reactor模型
- 多线程单Reactor模型
- 多线程多Reactor模型
在Netty中,可以非常轻松的实现上述三种线程模型,并且Netty推荐使用主从多线程模型,这样就可以轻松的实现成千上万的客户端连接的处理。在海量的客户端并发请求中,主从多线程模型可以通过增加SubReactor线程数量,充分利用多核能力提升系统吞吐量。
Reactor模型的运行机制分为四个步骤,如图2-10所示。
- 连接注册,Channel建立后,注册到Reactor线程中的Selector选择器
- 事件轮询,轮询Selector选择器中已经注册的所有Channel的I/O事件
- 事件分发,为准备就绪的I/O事件分配相应的处理线程
- 任务处理,Reactor线程还负责任务队列中的非I/O任务,每个Worker线程从各自维护的任务队列中取出任务异步执行。
图2-10 Reactor工作流程
EventLoop事件循环
在Netty中,Reactor模型的事件处理器是使用EventLoop来实现的,一个EventLoop对应一个线程,EventLoop内部维护了一个Selector和taskQueue,分别用来处理网络IO事件以及内部任务,它的工作原理如图2-11所示。
图2-11 NioEventLoop原理
EventLoop基本应用
下面这段代码表示EventLoop,分别实现Selector注册以及普通任务提交功能。
public class EventLoopExample {
public static void main(String[] args) {
EventLoopGroup group=new NioEventLoopGroup(2);
System.out.println(group.next()); //输出第一个NioEventLoop
System.out.println(group.next()); //输出第二个NioEventLoop
System.out.println(group.next()); //由于只有两个,所以又会从第一个开始
//获取一个事件循环对象NioEventLoop
group.next().register(); //注册到selector上
group.next().submit(()->{
System.out.println(Thread.currentThread().getName()+"-----");
});
}
}
EventLoop的核心流程
基于上述的讲解,理解了EventLoop的工作机制后,我们再通过一个整体的流程图来说明,如图2-12所示。
EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理IO事件和内部任务。IO事件和内部任务执行时间百分比通过ioRatio来调节,ioRatio表示执行IO时间所占百分比。任务包括普通任务和已经到时的延迟任务,延迟任务存放到一个优先级队列PriorityQueue中,执行任务前从PriorityQueue读取所有到时的task,然后添加到taskQueue中,最后统一执行task。
图2-12 EventLoop工作机制
EventLoop如何实现多种Reactor模型
-
单线程模式
EventLoopGroup group=new NioEventLoopGroup(1);
ServerBootstrap b=new ServerBootstrap();
b.group(group); -
多线程模式
EventLoopGroup group =new NioEventLoopGroup(); //默认会设置cpu核心数的2倍
ServerBootstrap b=new ServerBootstrap();
b.group(group); -
多线程主从模式
EventLoopGroup boss=new NioEventLoopGroup(1);
EventLoopGroup work=new NioEventLoopGroup();
ServerBootstrap b=new ServerBootstrap();
b.group(boss,work);
EventLoop实现原理
-
EventLoopGroup初始化方法,在MultithreadEventExecutorGroup.java中,根据配置的nThreads数量,构建一个EventExecutor数组
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads"); if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
} children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
}
}
} -
注册channel到多路复用器的实现,MultithreadEventLoopGroup.register方法()
SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()
可以看到会把channel注册到某一个eventLoop中的unwrappedSelector复路器中。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
} -
事件处理过程,通过NioEventLoop中的run方法不断遍历
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//计算策略,根据阻塞队列中是否含有任务来决定当前的处理方式
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) { //如果队列中数据为空,则调用select查询就绪事件
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
/* ioRatio调节连接事件和内部任务执行事件百分比
* ioRatio越大,连接事件处理占用百分比越大 */
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) { //处理IO时间
processSelectedKeys();
}
} finally {
//确保每次都要执行队列中的任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
}
}
服务编排层Pipeline的协调处理
通过EventLoop可以实现任务的调度,负责监听I/O事件、信号事件等,当收到相关事件后,需要有人来响应这些事件和数据,而这些事件是通过ChannelPipeline中所定义的ChannelHandler完成的,他们是Netty中服务编排层的核心组件。
在下面这段代码中,我们增加了h1和h2两个InboundHandler,用来处理客户端数据的读取操作,代码如下。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
//配置Server的通道,相当于NIO中的ServerSocketChannel
.channel(NioServerSocketChannel.class)
//childHandler表示给worker那些线程配置了一个处理器,
// 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// socketChannel.pipeline().addLast(new NormalMessageHandler());
socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("handler-01");
super.channelRead(ctx, msg);
}
}).addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("handler-02");
super.channelRead(ctx, msg);
}
});
}
});
上述代码构建了一个ChannelPipeline,得到如图2-13所示的结构,每个Channel都会绑定一个ChannelPipeline,一个ChannelPipeline包含多个ChannelHandler,这些Handler会被包装成ChannelHandlerContext加入到Pipeline构建的双向链表中。
ChannelHandlerContext用来保存ChannelHandler的上下文,它包含了ChannelHandler生命周期中的所有事件,比如connect/bind/read/write等,这样设计的好处是,各个ChannelHandler进行数据传递时,前置和后置的通用逻辑就可以直接保存到ChannelHandlerContext中进行传递。
图2-13
出站和入站操作
根据网络数据的流向,ChannelPipeline分为入站ChannelInBoundHandler和出站ChannelOutboundHandler两个处理器,如图2-14所示,客户端与服务端通信过程中,数据从客户端发向服务端的过程叫出站,对于服务端来说,数据从客户端流入到服务端,这个时候是入站。
图2-14 InBound和OutBound的关系
ChannelHandler事件触发机制
当某个Channel触发了IO事件后,会通过Handler进行处理,而ChannelHandler是围绕I/O事件的生命周期来设计的,比如建立连接、读数据、写数据、连接销毁等。
ChannelHandler有两个重要的子接口实现,分别拦截数据流入和数据流出的I/O事件
- ChannelInboundHandler
- ChannelOutboundHandler
图2-15中显示的Adapter类,提供很多默认操作,比如ChannelHandler中有很多很多方法,我们用户自定义的方法有时候不需要重载全部,只需要重载一两个方法,那么可以使用Adapter类,它里面有很多默认的方法。其它框架中结尾是Adapter的类的作用也大都是如此。所以我们在使用netty的时候,往往很少直接实现ChannelHandler的接口,经常是继承Adapter类。
图2-15 ChannelHandler类关系图
ChannelInboundHandler事件回调和触发时机如下
事件回调方法 | 触发时机 |
---|---|
channelRegistered | Channel 被注册到 EventLoop |
channelUnregistered | Channel 从 EventLoop 中取消注册 |
channelActive | Channel 处于就绪状态,可以被读写 |
channelInactive | Channel 处于非就绪状态 |
channelRead | Channel 可以从远端读取到数据 |
channelReadComplete | Channel 读取数据完成 |
userEventTriggered | 用户事件触发时 |
channelWritabilityChanged | Channel 的写状态发生变化 |
ChannelOutboundHandler时间回调触发时机
事件回调方法 | 触发时机 |
---|---|
bind | 当请求将channel绑定到本地地址时被调用 |
connect | 当请求将channel连接到远程节点时被调用 |
disconnect | 当请求将channel从远程节点断开时被调用 |
close | 当请求关闭channel时被调用 |
deregister | 当请求将channel从它的EventLoop注销时被调用 |
read | 当请求通过channel读取数据时被调用 |
flush | 当请求通过channel将入队数据刷新到远程节点时调用 |
write | 当请求通过channel将数据写到远程节点时被调用 |
事件传播机制演示
public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
private final String name;
public NormalOutBoundHandler(String name) {
this.name = name;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandler:"+name);
super.write(ctx, msg, promise);
}
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
private final String name;
private final boolean flush;
public NormalInBoundHandler(String name, boolean flush) {
this.name = name;
this.flush = flush;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandler:"+name);
if(flush){
ctx.channel().writeAndFlush(msg);
}else {
super.channelRead(ctx, msg);
}
}
}
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
//配置Server的通道,相当于NIO中的ServerSocketChannel
.channel(NioServerSocketChannel.class)
//childHandler表示给worker那些线程配置了一个处理器,
// 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new NormalInBoundHandler("NormalInBoundA",false))
.addLast(new NormalInBoundHandler("NormalInBoundB",false))
.addLast(new NormalInBoundHandler("NormalInBoundC",true));
socketChannel.pipeline()
.addLast(new NormalOutBoundHandler("NormalOutBoundA"))
.addLast(new NormalOutBoundHandler("NormalOutBoundB"))
.addLast(new NormalOutBoundHandler("NormalOutBoundC"));
}
});
上述代码运行后会得到如下执行结果
InboundHandler:NormalInBoundA
InboundHandler:NormalInBoundB
InboundHandler:NormalInBoundC
OutBoundHandler:NormalOutBoundC
OutBoundHandler:NormalOutBoundB
OutBoundHandler:NormalOutBoundA
当客户端向服务端发送请求时,会触发服务端的NormalInBound调用链,按照排列顺序逐个调用Handler,当InBound处理完成后调用WriteAndFlush方法向客户端写回数据,此时会触发NormalOutBoundHandler调用链的write事件。
从执行结果来看,Inbound和Outbound的事件传播方向是不同的,Inbound传播方向是head->tail,Outbound传播方向是Tail-Head。
异常传播机制
ChannelPipeline时间传播机制是典型的责任链模式,那么有同学肯定会有疑问,如果这条链路中某个handler出现异常,那会导致什么问题呢?我们对前面的例子修改NormalInBoundHandler
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
private final String name;
private final boolean flush;
public NormalInBoundHandler(String name, boolean flush) {
this.name = name;
this.flush = flush;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandler:"+name);
if(flush){
ctx.channel().writeAndFlush(msg);
}else {
//增加异常处理
throw new RuntimeException("InBoundHandler:"+name);
}
}
}
这个时候一旦抛出异常,会导致整个请求链被中断,在ChannelHandler中提供了一个异常捕获方法,这个方法可以避免ChannelHandler链中某个Handler异常导致请求链路中断。它会把异常按照Handler链路的顺序从head节点传播到Tail节点。如果用户最终没有对异常进行处理,则最后由Tail节点进行统一处理
修改NormalInboundHandler,重写下面这个方法。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InboundHandlerException:"+name);
super.exceptionCaught(ctx, cause);
}
在Netty应用开发中,好的异常处理非常重要能够让问题排查变得很轻松,所以我们可以通过一种统一拦截的方式来解决异常处理问题。
添加一个复合处理器实现类
public class ExceptionHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(cause instanceof RuntimeException){
System.out.println("处理业务异常");
}
super.exceptionCaught(ctx, cause);
}
}
把新增的ExceptionHandler添加到ChannelPipeline中
bootstrap.group(bossGroup, workerGroup)
//配置Server的通道,相当于NIO中的ServerSocketChannel
.channel(NioServerSocketChannel.class)
//childHandler表示给worker那些线程配置了一个处理器,
// 这个就是上面NIO中说的,把处理业务的具体逻辑抽象出来,放到Handler里面
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new NormalInBoundHandler("NormalInBoundA",false))
.addLast(new NormalInBoundHandler("NormalInBoundB",false))
.addLast(new NormalInBoundHandler("NormalInBoundC",true));
socketChannel.pipeline()
.addLast(new NormalOutBoundHandler("NormalOutBoundA"))
.addLast(new NormalOutBoundHandler("NormalOutBoundB"))
.addLast(new NormalOutBoundHandler("NormalOutBoundC"))
.addLast(new ExceptionHandler());
}
});
最终,我们就能够实现异常的统一处理。
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
Mic带你学架构
!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!
基于大量图片与实例深度解析Netty中的核心组件的更多相关文章
-
深度揭秘Netty中的FastThreadLocal为什么比ThreadLocal效率更高?
阅读这篇文章之前,建议先阅读和这篇文章关联的内容. 1. 详细剖析分布式微服务架构下网络通信的底层实现原理(图解) 2. (年薪60W的技巧)工作了5年,你真的理解Netty以及为什么要用吗?(深度干 ...
-
深度解析VC中的消息(转发)
http://blog.csdn.net/chenlycly/article/details/7586067 这篇转发的文章总结的比较好,但是没有告诉我为什么ON_MESSAGE的返回值必须是LRES ...
-
深度解析javascript中的浅复制和深复制
原文:深度解析javascript中的浅复制和深复制 在谈javascript的浅复制和深复制之前,我们有必要在来讨论下js的数据类型.我们都知道有Number,Boolean,String,Null ...
-
基于synchronized锁的深度解析
1. 问题引入 小伙伴们都接触过线程,也都会使用线程,今天我们要讲的是线程安全相关的内容,在这之前我们先来看一个简单的代码案例. 代码案例: /** * @url: i-code.online * @ ...
-
深度解析VC中的消息传递机制
摘要:Windows编程和Dos编程,一个很大的区别就是,Windows编程是事件驱动,消息传递的.所以,要学好Windows编程,必须 对消息机制有一个清楚的认识,本文希望能够对消息的传递做一个全面 ...
-
深度解析Struts2中ValueStack
1.什么是ValueStack 对于每一个action的调用,Struts在执行相应的动作方法之前会先创建一个名为ValueStack的对象.Value Stack用来保存该动作对象或者对象.由于最终 ...
-
深度解析VC中的消息
消息是指什么? 消息系统对于一个win32程序来说十分重要,它是一个程序运行的动力源泉.一个消息,是系统定义的一个32位的值,他唯一的定义了一个事件,向Windows发出一个通知,告诉应用程序某个事情 ...
-
深度解析Java中的5个“黑魔法”
现在的编程语言越来越复杂,尽管有大量的文档和书籍,这些学习资料仍然只能描述编程语言的冰山一角.而这些编程语言中的很多功能,可能被永远隐藏在黑暗角落.本文将为你解释其中5个Java中隐藏的秘密,可以称其 ...
-
【Python Deap库】遗传算法/遗传编程 进化算法基于python DEAP库深度解析讲解
目录 前言 概述 启发式的理解(重点) 优化问题的定义 个体编码 初始族群的创建 评价 配种选择 锦标赛 轮盘赌选择 随机普遍抽样选择 变异 单点交叉 两点交叉 均匀交叉 部分匹配交叉 突变 高斯突变 ...
随机推荐
-
DSP using MATLAB示例Example3.18
代码: % Analog Signal Dt = 0.00005; t = -0.005:Dt:0.005; xa = exp(-1000*abs(t)); % Continuous-time Fou ...
-
PHP+jQuery 列表分页类 ( 支持 url 分页 / ajax 分页 )
/* ******* 环境:Apache2.2.8 ( 2.2.17 ) + PHP5.2.6 ( 5.3.3 ) + MySQL5.0.51b ( 5.5.8 ) + jQuery-1.8.3.mi ...
-
lt&;gt&;eq
lt:less than,小于 gt:greater than,大于 eq:equal,等于 le:less equal,小于等于 ge:greater than,大于等于
-
selenium + robotframework的运行原理
1.点击ride界面启动用例执行时,首先会调用脚本 2.打开pybot脚本查看内容. 3.打开robot包下面的run文件,我们可以看到信息 run文件内容 程序启动的入口, sys.agv所表达的含 ...
-
【Data Structure】-NO.117.DS.1 -【Tree-23树】
[Data Structure]-NO.117.DS.1 -[Tree-23树] Style:Mac Series:Java Since:2018-09-10 End:2018-09-10 Total ...
-
java中四舍五入——double转BigDecimal的精度损失问题
代码: double d = -123456789012345.3426;//5898895455898954895989; NumberFormat nf = new DecimalFormat(& ...
-
Xib给特定view添加手势
步骤1.拖拽手势注意:拖拽到First Responder下方,成功后会出现一个分类Objects(如图 拖拽成功后会多出一个分类Objects ) 步骤2.给需要的view绑定手势控件 拖拽gest ...
-
PL/SQL Developer的安装以及与64位Oracle Database进行连接
本文转载自budongs 一.下载 官网安装包(1106版本)下载链接: plsqldev1106.exe 官网中文语言包(110版本) 下载链接: chinese.exe [中文语言包的使用方法为: ...
-
HDU 4507 吉哥系列故事――恨7不成妻(数位DP+结构体)
题目链接:http://acm.hdu.edu.cn/showproblem.php?pid=4507 题目大意:如果一个整数符合下面3个条件之一,那么我们就说这个整数和7有关 1.整数中某一位是7: ...
-
webdriver元素定位
#1 通过id定位 driver.find_element_by_id("pop_setting_save").click() #2 通过name定位 driver.find_el ...