文章目录
- 1. 概要
- 2. @EnableScheduling 注解
- 3. @Scheduled 注解
- 4. postProcessAfterInitialization 解析
- 4.1 createRunnable
- 5. 任务 Task 和子类
- 6. ScheduledTaskRegistrar
- 6.1 添加任务的逻辑
- 6.2 调度器初始化
- 6.3 调用时机
- 7. taskScheduler 类型
- 7.1 ConcurrentTaskScheduler
- 7.2 ThreadPoolTaskScheduler
- 8. 小结
1. 概要
上一篇文章:定时/延时任务-Spring定时任务的两种实现方式。这篇文章就来看下 Spring 中 @Scheduled 和接口方式的定时任务是如何实现的。
2. @EnableScheduling 注解
Spring 中如果需要使用定时任务,就需要引入 @EnableScheduling,我们看下这个注解是怎么定义的。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
在这个注解中关键就是:@Import(SchedulingConfiguration.class)
,这里面通过 Import 引入了 SchedulingConfiguration 配置类。Import
是 Spring 中定义的一种引入配置类的方式,通过 Import 注解可以把对应的类交给 Spring 管理,达到动态开关配置的目的。然后我们再来看下引入的这个注解配置类。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
可以看到这个配置类代码比较简单,就是注册了一个 ScheduledAnnotationBeanPostProcessor
后置处理器。后置处理器是 Spring 中用于在 bean 初始化之后调用来处理 bean 的方法。对于 @Scheduled 和 @Schedules 注解解析的核心逻辑就在 postProcessAfterInitialization
中。但是在这之前,我们看下 @Scheduled 注解的属性。
3. @Scheduled 注解
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
/**
* 是否禁用 cron 表达式,如果设置成 '-' 就表示不使用 cron 表达式
*/
String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
/**
* cron 表达式
* @return
*/
String cron() default "";
/**
* 时区
* @return
*/
String zone() default "";
/**
* 固定延时任务的延时时间
* @return
*/
long fixedDelay() default -1;
/**
* 固定延时任务的延时时间字符串,可动态配置
* @return
*/
String fixedDelayString() default "";
/**
* 固定速率任务的延时时间
* @return
*/
long fixedRate() default -1;
/**
* 固定速率任务的延时时间字符串,可动态配置
* @return
*/
String fixedRateString() default "";
/**
* 第一次执行任务之前延迟多少秒
* @return
*/
long initialDelay() default -1;
/**
* 第一次执行任务之前延迟多少秒,字符串,可动态配置
* @return
*/
String initialDelayString() default "";
/**
* 时间单位,默认是毫秒
* @return
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
@Scheduled 定义了几种定时任务的实现方式
- cron 表达式任务
- 固定延时任务 fixedDelay
- 固定速率任务 fixedRate
然后我们再看下 postProcessAfterInitialization
是如何处理上面这几种方法的。
4. postProcessAfterInitialization 解析
/**
* 初始化之后回回调后置处理器处理定时任务
* @param bean the new bean instance
* @param beanName the name of the bean
* @return
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// 注册的 bean 是上面类型的就不处理
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 获取实际要处理的 bean 的类型
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
// 1.如果这个目标类还没有被处理过
// 2.这个类能不能被 Scheduled 或者 Schedules 注解处理(如果这个类是以 java. 开头或者是 Ordered 类,就不可以)
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
// 从目标类中获取有 Scheduled 或者 Schedules 注解的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});
// 如果找不到方法
if (annotatedMethods.isEmpty()) {
// 加入 nonAnnotatedClasses 集合中
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// 找到了包含特定注解的方法
annotatedMethods.forEach((method, scheduledAnnotations) ->
// 遍历来处理所有的方法
scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
当 ScheduledAnnotationBeanPostProcessor 初始化完成之后,调用 postProcessAfterInitialization
来处理相关的注解,下面来看下具体逻辑。
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// 注册的 bean 是上面类型的就不处理
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
如果 bean 的类型是 上面三个类型的,就不处理。
// 获取实际要处理的 bean 的类型
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
// 1.如果这个目标类还没有被处理过
// 2.这个类能不能被 Scheduled 或者 Schedules 注解处理(如果这个类是以 java. 开头或者是 Ordered 类,就不可以)
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
// 从目标类中获取有 Scheduled 或者 Schedules 注解的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});
// 如果找不到方法
if (annotatedMethods.isEmpty()) {
// 加入 nonAnnotatedClasses 集合中
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// 找到了包含特定注解的方法
annotatedMethods.forEach((method, scheduledAnnotations) ->
// 遍历来处理所有的方法
scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
然后获取下 bean 的类型,并且判断下这个类型能不能被 Scheduled 或者 Schedules 注解处理,如果这个类是以 java. 开头或者是 Ordered 类,就不可以。最后遍历这些标记了上面两个注解的方法,一个一个处理,处理的逻辑是 processScheduled
,看下 processScheduled
的逻辑
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// 根据调用的对象和调用的方法创建一个任务
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
...
}
这个方法中首先会创建一个任务,然后设置几个属性。
// 把 @Scheduled 注解的 initialDelay 属性转化成毫秒,initialDelay 是指延时多少秒进行第一次执行
long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
// @Scheduled 注解的 initialDelayString 属性,作用和上面的 initialDelay 作用一样
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
// 如果指定了 initialDelayString,那么就不能指定 initialDelay 了
// 同时指定 initialDelay 会报错
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
// 因为 @Scheduled 注解的几个 String 类型的值都可以通过配置文件引入,也就是 ${} 的方式
// 这个方法就是去解析动态配置值的
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
// 如果配置了
if (StringUtils.hasLength(initialDelayString)) {
try {
// 转换成毫秒单位
initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
上面就是解析 @Scheduled 中 initialDelay
和 initialDelayString
的逻辑。
- 首先把 initialDelay 属性转化成毫秒
- 然后再解析 initialDelayString,需要注意的是
initialDelay
和initialDelayString
只能选一个,如果两个都填就会报错 -
this.embeddedValueResolver.resolveStringValue
是解析动态配置的逻辑,因为 initialDelayString 可以使用 ${} 动态配置 - 最后都转化成
initialDelay
毫秒
上面解析的 initialDelay
和 initialDelayString
表示延时多少 ms 再执行第一次任务。下面再来看下 cron
表达式的解析,这是第一种定时任务的配置方式。
// 获取 cron 表达式
String cron = scheduled.cron();
// 如果设置了 cron 表达式
if (StringUtils.hasText(cron)) {
// 获取指定的时区
String zone = scheduled.zone();
// 同时也是去解析 cron 表达式和时区的动态值
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
// 如果设置了 cron 表达式
if (StringUtils.hasLength(cron)) {
// 如果设置了 cron 表达式,那么就不能设置 initialDelay 了
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
// 设置下标记,表示已经设置 cron 表达式了
processedSchedule = true;
// 如果 cron 表达式没有被设置成 '-' 了,就代表用 cron 去触发
if (!Scheduled.CRON_DISABLED.equals(cron)) {
// 创建 cron 触发器
CronTrigger trigger;
if (StringUtils.hasText(zone)) {
// 设置时区和 cron 表达式
trigger = new CronTrigger(cron, StringUtils.parseTimeZoneString(zone));
}
else {
// 不需要设置时区,设置 cron 表达式
trigger = new CronTrigger(cron);
}
// 将创建的 ScheduledTask 加入任务集合中
// 使用 ScheduledTaskRegistrar 创建一个 ScheduledTask,同时需要传入触发器,这个触发器里面需要传入 cron 表达式和时区
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, trigger)));
}
}
}
- 解析 cron 表达式,先获取下时区
- 然后解析 cron 表达式和时区的动态值
- 如果设置了 cron 表达式,那么就不能设置 initialDelay 了,需要依靠 cron 表达式来执行定时任务
-
processedSchedule
标记位是代表有没有设置了定时任务的调度方式,这里如果使用 cron 表达式来调度任务,processedSchedule
就会设置为 true - 检查下 cron 有没有被设置成
-
,这个标记代表禁用 cron 表达式,如果没有设置为-
,就通过 cron 和时区创建一个CronTrigger
,这是 cron 触发器,在这个触发器里面可以获取表达式和时区等信息,同时也可以获取 cron 下一次执行的时间 - 新建一个
ScheduledTask
,把这个任务添加到任务集合中,这个是任务的统一包装,里面可以对CronTask
、FixedDelayTask
、FixedRateTask
进行包装
上面就是解析 cron 表达式的逻辑,下面继续看解析@Scheduled 注解的 fixedDelay 字段逻辑,这个字段就是固定延时任务。
// 下面检查 @Scheduled 注解的 fixedDelay 字段,这个字段就是固定延时任务
long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
if (fixedDelay >= 0) {
// 如果前面 cron 已经设置了,那么这里就不能设置 fixedDelay 了
Assert.isTrue(!processedSchedule, errorMessage);
// 设置标记
processedSchedule = true;
// 同样添加任务,不过这里是创建一个 FixedDelayTask,表示固定延时的任务
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
// 看下有没有设置 fixedDelayString
String fixedDelayString = scheduled.fixedDelayString();
// 如果设置 fixedDelayString 了
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
// 解析动态字符串
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
// 如果前面没有设置 cron 和 fixDelay
Assert.isTrue(!processedSchedule, errorMessage);
// 设置标记,表示已解析
processedSchedule = true;
try {
// 转化成毫秒
fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
// 同样添加任务,不过这里是创建一个 FixedDelayTask,表示固定延时的任务
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
- 首先检测 @Scheduled 注解的 fixedDelay,转化成毫秒
- 判断下
processedSchedule
字段,如果前面已经设置 cron了,那么这里就会抛出异常,也就是说如果设置了 cron 表达式的调度方式,这里就不能设置 fixedDelay 了 - 创建一个 ScheduledTask 添加到集合里面,这个任务包装类包装的是 FixedDelayTask,里面设置了初始延时时间和固定速率
- 判断下
- 然后继续检测 @Scheduled 注解的 fixedDelayString
- 解析动态字符串
- 判断下
processedSchedule
字段,如果前面已经设置 cron 或者设置了 fixedDelay 了,那么这里就会抛出异常,也就是说如果设置了 cron 表达式的调度方式或者设置了 fixedDelay 字段,这里就不能设置 fixedDelayString 了 - 创建一个 ScheduledTask 添加到集合里面,这个任务包装类包装的是 FixedDelayTask,里面设置了初始延时时间和固定速率
上面就是解析固定速率的逻辑,下面继续看解析@Scheduled 注解的 fixedRate 字段逻辑,这个字段就是固定速率任务。
// 下面检查 @Scheduled 注解的 fixedRate 字段,这个字段就是固定速率任务
long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
if (fixedRate >= 0) {
// 如果设置了 fixedRate,同时前面几种方式都没有设置
Assert.isTrue(!processedSchedule, errorMessage);
// 设置标记
processedSchedule = true;
// 同样添加任务,不过这里是创建一个 FixedRateTask,表示固定延时的任务
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
// 继续检测 fixedRateString
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
// 解析动态表达式
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
// 如果前面几种方式已经设置过了,这里就抛出异常
Assert.isTrue(!processedSchedule, errorMessage);
// 设置标记位
processedSchedule = true;
try {
// 把 fixedRateString 转化成毫秒
fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - ca