分布式调度框架TBSchedule源码解析

时间:2021-12-05 17:19:06

基于周大神《详解应对平台高并发的分布式调度框架TBSchedule》的阅读和对源码的解析,整体上对tbschedule设计思路有了更深刻的认识,希望与TBSchedule热爱者一起交流、讨论。

要充分理解tbschedule设计思路,需重点理解zookeeper在此调度框架中的作用,zookeeper主要存储调度任务策略、调度任务类型和调度管理器配置信息,各分布式任务调度运行信息统一存储在zookeeper相应节点中,其目录结构梳理如下:

分布式调度框架TBSchedule源码解析

Tbschedule启动初始化过程可参照《详解应对平台高并发的分布式调度框架TBSchedule》,周大神已将启动初始化过程描述的非常清楚,这里不作过多介绍,这里重点介绍三个重要的类:

类名

执行间隔

作用

ManagerFactoryTimerTask

2秒

定时分配线程组,创建TBScheduleManagerStatic

初始化运行信息,将所有taskitem清除,重新创建任务项,生成taskitem目录,不负责分配任务项

HeartBeatTimerTask

调度任务心跳周期

定时分配任务项

TBScheduleManagerStatic

生成调度处理器TBScheduleProcessor,根据开始时间、结束时间定时执行。

1、ManagerFactoryTimerTask(2秒定时执行)

该定时任务每隔两秒定时执行,首先重新注册managerFactory,并根据strategyName下的TBScheduleManagerFactory数量和assignNum(调度策略所规定的最大线程组)数量,重新分配每个managerFactory可获取多少个线程组,其中managerMap变量以strategyName为键值记录实例化的TBScheduleManagerStatic实例,重新分配之后,如果TBScheduleManagerStatic实例不足,则创建新的TBScheduleManagerStatic实例,每个TBScheduleManagerStatic实例会创建调度服务器scheduleServer和调度处理器TBScheduleProcessor, 每个scheduleServer根据调度任务的心跳周期创建HeartBeatTimrTask定时任务。其时序图如下:

分布式调度框架TBSchedule源码解析

2、HeartBeatTimerTask(以调度任务心跳周期为执行频率)

HeartBeatTimerTask类首先定时发送心跳周期,对于server不同的调度服务器,zookeeper删除其目录,如果当前server为leader,则对任务项重新分配,计算现调度服务器server数量,再根据taskitem任务项数据量,重新分配各个taskitem任务项所执行的cur_server,其分配原则如下:

(1)如果taskitem中cur_server为空,则去serverlsit中取值,赋值给taskitem的cur_server,req_server赋值为null。

(2)如果taskitem中cur_server不为空、且为当前调度服务器server名称,并且reqServerValue=null时,不做任何修改。

(3)其他情况下,只给req_server赋值为serverlsit所取值。

HeartBeatTimerTask其时序图如下:

分布式调度框架TBSchedule源码解析

3、TBScheduleManagerStatic 

该类实例化调度服务器scheduleServer,并创建HeartBeatTimerTask定时发送心跳信息,并根据scheduleServer数量、taskitem数量重新分配;通过getCurrentScheduleTaskItemListNow获取当前scheduleServer的任务项,解析cron表达式,计算下次开始时间、下次结束时间,执行PauseOrResumeScheduleTask实现定时调度,到达执行开始时间,则调用resume()方法,根据调度任务线程数threadNum,生成多线程调度处理器BScheduleProcessorSleep,其中taskList存储dealBean中的selectTask()方法返回值,其中必须保持一个线程处于未休眠状态,用于调用dealBean的select()方法加载数据,如果taskList不为空,唤醒其他休眠线程,调用execute()方法处理数据。PauseOrResumeScheduleTask利用java类库中的Timer实现定时调度,其过程主要如下:

分布式调度框架TBSchedule源码解析

其中:当treadNum>1时,selectTask()方法为单线程,execute()方法为treadNum-1个线程运行。

定时调度时根据执行开始时间和执行结束时间综合判断,具体如下:

到达开始执行时间后,生成TBScheduleProcessor处理器,根据线程数据,启动线程,当一直存在executeTask时,循环调用,当当前线程获取不到executeTask时,线程休眠,当时剩下最后一个线程时禁止休眠,调用selectTask()方法加载task,如果selectTask()返回值不为空则唤醒其他线程进行处理;如果selectTask()返回值为空,判断是否退出调度,退出调度判断准则如下:
1、在前调度服务scheduleServer所获取taskitem列表大于零且执行开始时间不为空的情况,
(1)如果执行结束时间为空(-1)则退出调度;
(2)如果执行结束时间不为空,则不退出调度;
2、其他情况下(当前ScheduleServer未获取到任务项或者执行开始时间为空),不退出调度;

总结TBSchedule执行开始时间和结束时间执行方案如下:

执行开始时间

执行结束时间

加载数据(selectTasks返回值)

是否退出调度

调度区间

非空

非空

执行开始时间至执行结束时间

非空

非空

 

 

执行开始时间至加载数据时间

非空

从执行开始时间一直运行

非空

立即运行至执行结束时间

非空

立即运行至执行结束时间

立即运行至加载数据时间

非空

一直运行


参考博文:

  • 周立伟-详解应对平台高并发的分布式调度框架TBSchedule: http://geek.csdn.net/news/detail/65738