XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

时间:2023-01-06 01:01:17


XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

文章目录

  • ​​1.引言​​
  • ​​2.调度关系​​
  • ​​3.执行器注册​​
  • ​​3.1.调度中心处理注册请求​​
  • ​​3.2.执行器发起注册请求​​
  • ​​4.执行器注销​​
  • ​​4.1.主动注销​​
  • ​​4.2.被动注销​​
  • ​​5.流程图​​
  • ​​6. 总结​​

1.引言

在前面三篇文章内容中,我们已经获取到了一个XXL-JOB的集群,以及一个可以执行任务的调度器,同时,在实际的项目中可以参照这个流程,引入定时任务。

接下来,我们就可以探索一下调度中心对执行器的上下线感知实现原理,主要包括以下几点:

  • 执行器注册流程
  • 执行器的注销流程
  • 调度中心探活流程

在运行过程中,调度中心要对执行器进行调度,得先获取到执行器的信息,才能根据信息发起调度请求,同时,我们又不希望因调度中心调用到已宕机的执行器而导致程序异常。

于是,​​XXL-JOB​​​在调度中心中,维护了一个注册中心,通过​​xxl_job_registry​​这张表来实现的,调度中心每次发起调度请求时,都会通过这张表中的数据来做负载均衡。那么,只需要做到将活跃的执行器信息注册上去,并在执行器停机或宕机后,将其从注册中心中移除,这样,调度中心就获得了对执行器的上线下感知。

2.调度关系

定时任务是如何被调用的呢?

我们先看一个分层结构图,XXL-JOB的调度关系分为了3层,每层向下进行调度,最上层是调度中心,最下层是定时任务需要执行的方法,调度中心可以调度不同的执行器,执行器再调用归属于自己的定时任务,如下图所示:

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

调度中心在调度执行器时,需要知道执行器的ip和端口号,以此来找到对应的执行器节点来进行调度。而调度中心获取到执行器ip的方式有两种,分别是:自动注册手动录入

一般不会使用手动录入的方式,为什么呢?可以想象一下,在新增、减少了执行器实例,执行器宕机时,都需要手动修改机器地址,意味着需要有人24小时盯着,这是一件很可怕的事。

所以,正常情况下我们都会选择使用自动注册的方式来创建,选择这种方式的话,就需要调度中心与执行器之间建立通信机制,通过网络请求传输注册信息。

注:下面是在后台管理系统中的配置。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

3.执行器注册

当前版本(2.3.1)的​​XXL-JOB​​​采用的是​​Http​​​通信,而调度中心是通过​​SpringBoot​​​来实现的。实际上,调度中心就是启动了一个​​Tomcat​​​,并提供了执行器注册接口。执行器在启动的时候就会调用这个接口,将自己的​​ip​​,端口等信息传输到调度中心,再由调度中心存入数据库中,这样就完成了执行器注册。

3.1.调度中心处理注册请求

首先,需要调度中心向外暴露的注册接口位置。

​XXL-JOB​​​项目中的的命名还是比较规范的,我们可以在​​xxl-job-admin​​​的​​contoller​​​包中去搜索,很容易找到一个​​api​​​相关的Controller接口​​JobApiController​​​,进入到这个类中。
果然,在这个类里面有一个​​​api​​相关的方法,如下:

/**
* api
*
* @param uri
* @param data
* @return
*/
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

// valid
if (!"POST".equalsIgnoreCase(request.getMethod())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}

// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
//服务注册
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}

}

这是一个​​RESTFUL​​​接口,包含了三个策略路径,分别是:​​callback​​​,​​registry​​​,​​registryRemove​​​,通过语义,可以大胆的猜测​​registry​​​这条路径就是注册操作,​​registryRemove​​是注销操作,而callback是执行器的执行结果回调(本篇暂不关注回调接口)。

我们通过​​registry​​的路径一路顺藤摸瓜,就找到了实际做注册动作的方法

JobRegistryHelper

// ---------------------- helper ----------------------

public ReturnT<String> registry(RegistryParam registryParam) {

// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}

// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

// fresh
freshGroupRegistryInfo(registryParam);
}
}
});

return ReturnT.SUCCESS;
}

上图中的就是一个简单的​​saveOrUpdate​​方法,只是这里用异步来做了,很好理解:

  • ​getXxlJobRegistryDao​​​():表示获取​​xxl_job_registry​​​这张表对应的​​dao​​对象。
  • ​registrySave​​:创建一条执行器注册数据。
  • ​registryUpdate​​:更新执行器信息,这个操作是用来维持心跳连接的。

简单的说,就是调度中心会接收执行器的​​registry​​​请求,然后将请求中传入的参数保存到​​xxl_job_registy​​​表中。这就是调度中心运行的执行器注册主流程,一个非常简单的​​CRUD​​。

看完了主流程之后,我们再来看一下细节,可以发现这里的注册代码并不是同步执行的,而是通过一个线程池​​registryOrRemoveThreadPool​​​来进行的异步操作。这里也体现了​​XXL-JOB​​的一个设计思想,即全异步化调用,我们在研究后续原理的时候,还会经常看到这样的用法。

registryOrRemoveThreadPool的创建

​registryOrRemoveThreadPool​​​这个线程池是项目启动时提前创建好的,通过​​Idea​​​的​​usages​​​可以找到,选中代码中的​​registryOrRemoveThreadPool​​​使用快捷键​​alt+F7​​​,可以打开下图所示的界面,找到一个​​new ThreadPoolExecutor()​​的地方,这就创建线程池位置。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理


这里还可以进一步查看​​XXL-JOB​​​的配置初始化过程,使用​​alt+鼠标左键​​​查看​​start()​​方法的使用位置。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理


可以看到这里做了各种各样的初始化操作,后续想了解​​XXL-JOB​​中的某个流程的话,就可以以这里的初始化操作为线索,找到对应的代码流程,在后续的源码探索中,还会多次进入这个位置。

3.2.执行器发起注册请求

所谓的执行器,实际上就是一个引入了​​xxl-job-core​​​包的Spring-Web项目,在上一篇的内容中,我们在代码中只写了一个​​@XxlJob​​​注解就完成了一个定时任务方法,就是因为大部分工作都是由​​xxl-job-core​​这个包来完成的,现在我们可以去探索一下,执行器是如何注册到调度中心的。

在上面​​xxl-job-admin​​​中的注册接口吗,在这个接口中使用了一个​​AdminBiz​​接口,进入到这个接口中,找到registry方法,它有两个实现:

  • 一个在​​xxl-job-admin​​中,这是上面已经讲到的调度中心实现注册的方法。
  • 另一个在​​xxl-job-core​​中,这就是执行器发起注册请求的位置。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

我们可以通过注册请求倒推回去,可以找到一个​​ExecutorRegistryThread​​类

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理


上图红框中的内容展示了是通过​​appName​​​与​​address​​​组成了一个请求参数,然后将这个参数传输到了​​xxl-job-admin​​​中,这就是执行器注册的入口。这里可以注意一下​​while(!toStop)​​​,说明当前的​​registryThread​​​线程会循环调用注册方法,还记得上面的​​registryUpdate​​吗?我们说这个是用来维持心跳连接的,那么心跳请求是多长时间发送一次呢?可以把代码往下拉:

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理


通过​​XXL-JOB​​​的架构,我们已经知道在执行器启动之后,需要调度中心的来做任务调度,而调度中心需要知道执行器的标识以及​​IP​​​地址、端口,才能对指定的执行器发送调度请求。这也是为什么上图中的请求参数中会有​​appName​​​和​​address​​。

既然执行器把地址交给了调度中心,很自然的可以想到,在交出地址之前,执行器会按照这个地址启动一个供调度中心调用的​​web​​服务。

继续往外层跳,可以找到​​web​​​服务的启动代码,这里使用的是​​netty​​。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

综上,执行器这边的主要流程可以通过一张简图来表示:

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

4.执行器注销

执行器的注销分为主动注销和被动注销两种。

  • 主动注销:顾名思义,就是执行器向调度中心发送注销请求,调度中心接收后把这个执行器的注册信息删除掉。
  • 被动注销:就是执行器以外宕机后,无法正常的向调度中心发送注销请求,由调度中心的探活线程发现了某个执行器已下线,此时将该执行器的注册信息删除掉。

4.1.主动注销

主动注销的发起时机是在Spring容器正常关闭时,​​XXL-JOB​​​的执行器类​​XxlJobSpringExecutor​​​实现了​​DisposableBean​​​接口,这个接口提供了一个​​destory​​方法。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

在后续的流程中,会停止​​Netty​​​服务,中断探活线程,并向调度中心发送​​removeRegistry​​请求。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理


stop的状态修改后,这里的探活循环就会停止,进而会调用到下面的registryRemove方法。

调度中心收到请求后,也会通过​​registryOrRemoveThreadPool​​​线程池进行异步处理,最终将​​xxl_job_registry​​中对应的执行器信息删除掉。

4.2.被动注销

调度中心初始化时,会启动一个监控线程​​registryMonitorThread​​​,这个线程每30秒会触发一次探活操作(即每循环一次​​sleep 30​​​秒),探活操作触发时会查询​​xxl_job_registry​​​表中的数据,将​​update_time​​​与当前时间的差值大于​​90s​​的数据查询出来,将这部分数据删掉掉。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

把sleep的时间差也考虑进去的话,就是执行器在最多120秒内都没有发送新的注册请求来维持心跳的话,这个执行器就会被调度中心注销掉。

心跳是怎么维持的呢?

看了上面执行器发起注册的流程,大概也能猜到了,执行器里面的​​registryThread​​​每30秒会调用一次调度中心的注册接口,调度中心收到请求后,更新​​update-time​​的值。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

5.流程图

经过上面的探索,我们已经了解了执行器的注册与注销的流程,下面是这整个流程的流程图。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

6. 总结

本篇内容主要是在探索执行器注册到调度中心的流程以及代码实现,流程如下:

  1. 调度中心启动了一个​​Tomcat​​​作为​​Web​​容器,暴露出注册与注销的接口,可以供执行器调用。
  2. 执行器在启动Netty服务暴露出调度接口后,将自己的​​name​​​、​​ip​​​、端口信息通过调度中心的注册接口传输到调度中心,同时每​​30​​秒会调用一次注册接口,用于更新注册信息。
  3. 同理,在执行器停止的时候,也会请求调度中心的注销接口,进行注销。
  4. 调度中心在接收到注册或注销请求后,会操作xxl_job_registry表,新增或删除执行器的注册信息。
  5. 调度中心会启动一个探活线程,将90秒都没有更新注册信息的执行器删除掉。

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

XXL-JOB分布式任务调度框架(四)-源码分析-调度中心对执行器的上下线感知实现原理

由于本篇只是在探索注册与发现的流程,所以忽略在这个流程中还涉及到的任务调度与回调相关的逻辑,这部分逻辑将在下一篇调度流程原理分析中讲到。