定时/延时任务-万字解析Spring定时任务原理

时间:2024-12-18 17:32:40

文章目录

  • 1. 概要
  • 2. @EnableScheduling 注解
  • 3. @Scheduled 注解
  • 4. postProcessAfterInitialization 解析
    • 4.1 createRunnable
  • 5. 任务 Task 和子类
  • 6. ScheduledTaskRegistrar
    • 6.1 添加任务的逻辑
    • 6.2 调度器初始化
    • 6.3 调用时机
  • 7. taskScheduler 类型
    • 7.1 ConcurrentTaskScheduler
    • 7.2 ThreadPoolTaskScheduler
  • 8. 小结

1. 概要

上一篇文章:定时/延时任务-Spring定时任务的两种实现方式。这篇文章就来看下 Spring 中 @Scheduled 和接口方式的定时任务是如何实现的。

2. @EnableScheduling 注解

Spring 中如果需要使用定时任务,就需要引入 @EnableScheduling,我们看下这个注解是怎么定义的。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

在这个注解中关键就是:@Import(SchedulingConfiguration.class),这里面通过 Import 引入了 SchedulingConfiguration 配置类。Import 是 Spring 中定义的一种引入配置类的方式,通过 Import 注解可以把对应的类交给 Spring 管理,达到动态开关配置的目的。然后我们再来看下引入的这个注解配置类。

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}

}

可以看到这个配置类代码比较简单,就是注册了一个 ScheduledAnnotationBeanPostProcessor 后置处理器。后置处理器是 Spring 中用于在 bean 初始化之后调用来处理 bean 的方法。对于 @Scheduled 和 @Schedules 注解解析的核心逻辑就在 postProcessAfterInitialization 中。但是在这之前,我们看下 @Scheduled 注解的属性。

3. @Scheduled 注解

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {

	/**
	 * 是否禁用 cron 表达式,如果设置成 '-' 就表示不使用 cron 表达式
	 */
	String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;


	/**
	 * cron 表达式
	 * @return
	 */
	String cron() default "";

	/**
	 * 时区
	 * @return
	 */
	String zone() default "";

	/**
	 * 固定延时任务的延时时间
	 * @return
	 */
	long fixedDelay() default -1;

	/**
	 * 固定延时任务的延时时间字符串,可动态配置
	 * @return
	 */
	String fixedDelayString() default "";

	/**
	 * 固定速率任务的延时时间
	 * @return
	 */
	long fixedRate() default -1;

	/**
	 * 固定速率任务的延时时间字符串,可动态配置
	 * @return
	 */
	String fixedRateString() default "";

	/**
	 * 第一次执行任务之前延迟多少秒
	 * @return
	 */
	long initialDelay() default -1;

	/**
	 * 第一次执行任务之前延迟多少秒,字符串,可动态配置
	 * @return
	 */
	String initialDelayString() default "";

	/**
	 * 时间单位,默认是毫秒
	 * @return
	 */
	TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

}

@Scheduled 定义了几种定时任务的实现方式

  1. cron 表达式任务
  2. 固定延时任务 fixedDelay
  3. 固定速率任务 fixedRate

然后我们再看下 postProcessAfterInitialization 是如何处理上面这几种方法的。


4. postProcessAfterInitialization 解析

/**
 * 初始化之后回回调后置处理器处理定时任务
 * @param bean the new bean instance
 * @param beanName the name of the bean
 * @return
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
	if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
			bean instanceof ScheduledExecutorService) {
		// 注册的 bean 是上面类型的就不处理
		// Ignore AOP infrastructure such as scoped proxies.
		return bean;
	}
	// 获取实际要处理的 bean 的类型
	Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
	// 1.如果这个目标类还没有被处理过
	// 2.这个类能不能被 Scheduled 或者 Schedules 注解处理(如果这个类是以 java. 开头或者是 Ordered 类,就不可以)
	if (!this.nonAnnotatedClasses.contains(targetClass) &&
			AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
		// 从目标类中获取有 Scheduled 或者 Schedules 注解的方法
		Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
				(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
					Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
							method, Scheduled.class, Schedules.class);
					return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
				});
		// 如果找不到方法
		if (annotatedMethods.isEmpty()) {
			// 加入 nonAnnotatedClasses 集合中
			this.nonAnnotatedClasses.add(targetClass);
			if (logger.isTraceEnabled()) {
				logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
			}
		}
		else {
			// 找到了包含特定注解的方法
			annotatedMethods.forEach((method, scheduledAnnotations) ->
					// 遍历来处理所有的方法
					scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
			if (logger.isTraceEnabled()) {
				logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
						"': " + annotatedMethods);
			}
		}
	}
	return bean;
}

当 ScheduledAnnotationBeanPostProcessor 初始化完成之后,调用 postProcessAfterInitialization 来处理相关的注解,下面来看下具体逻辑。

if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
	bean instanceof ScheduledExecutorService) {
	// 注册的 bean 是上面类型的就不处理
	// Ignore AOP infrastructure such as scoped proxies.
	return bean;
}

如果 bean 的类型是 上面三个类型的,就不处理。

// 获取实际要处理的 bean 的类型
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
// 1.如果这个目标类还没有被处理过
// 2.这个类能不能被 Scheduled 或者 Schedules 注解处理(如果这个类是以 java. 开头或者是 Ordered 类,就不可以)
if (!this.nonAnnotatedClasses.contains(targetClass) &&
		AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
	// 从目标类中获取有 Scheduled 或者 Schedules 注解的方法
	Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
			(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
				Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
						method, Scheduled.class, Schedules.class);
				return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
			});
	// 如果找不到方法
	if (annotatedMethods.isEmpty()) {
		// 加入 nonAnnotatedClasses 集合中
		this.nonAnnotatedClasses.add(targetClass);
		if (logger.isTraceEnabled()) {
			logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
		}
	}
	else {
		// 找到了包含特定注解的方法
		annotatedMethods.forEach((method, scheduledAnnotations) ->
				// 遍历来处理所有的方法
				scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
		if (logger.isTraceEnabled()) {
			logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
					"': " + annotatedMethods);
		}
	}
}
return bean;

然后获取下 bean 的类型,并且判断下这个类型能不能被 Scheduled 或者 Schedules 注解处理,如果这个类是以 java. 开头或者是 Ordered 类,就不可以。最后遍历这些标记了上面两个注解的方法,一个一个处理,处理的逻辑是 processScheduled,看下 processScheduled 的逻辑

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
	// 根据调用的对象和调用的方法创建一个任务
	Runnable runnable = createRunnable(bean, method);
	boolean processedSchedule = false;
	String errorMessage =
			"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
	
	Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
	...
}

这个方法中首先会创建一个任务,然后设置几个属性。

// 把 @Scheduled 注解的 initialDelay 属性转化成毫秒,initialDelay 是指延时多少秒进行第一次执行
long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
// @Scheduled 注解的 initialDelayString 属性,作用和上面的 initialDelay 作用一样
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
	// 如果指定了 initialDelayString,那么就不能指定 initialDelay 了
	// 同时指定 initialDelay 会报错
	Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
	if (this.embeddedValueResolver != null) {
		// 因为 @Scheduled 注解的几个 String 类型的值都可以通过配置文件引入,也就是 ${} 的方式
		// 这个方法就是去解析动态配置值的
		initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
	}
	// 如果配置了
	if (StringUtils.hasLength(initialDelayString)) {
		try {
			// 转换成毫秒单位
			initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
		}
		catch (RuntimeException ex) {
			throw new IllegalArgumentException(
					"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
		}
	}
}

上面就是解析 @Scheduled 中 initialDelayinitialDelayString 的逻辑。

  1. 首先把 initialDelay 属性转化成毫秒
  2. 然后再解析 initialDelayString,需要注意的是 initialDelay initialDelayString 只能选一个,如果两个都填就会报错
  3. this.embeddedValueResolver.resolveStringValue 是解析动态配置的逻辑,因为 initialDelayString 可以使用 ${} 动态配置
  4. 最后都转化成 initialDelay 毫秒

上面解析的 initialDelayinitialDelayString 表示延时多少 ms 再执行第一次任务。下面再来看下 cron 表达式的解析,这是第一种定时任务的配置方式。

// 获取 cron 表达式
String cron = scheduled.cron();
// 如果设置了 cron 表达式
if (StringUtils.hasText(cron)) {
	// 获取指定的时区
	String zone = scheduled.zone();
	// 同时也是去解析 cron 表达式和时区的动态值
	if (this.embeddedValueResolver != null) {
		cron = this.embeddedValueResolver.resolveStringValue(cron);
		zone = this.embeddedValueResolver.resolveStringValue(zone);
	}
	// 如果设置了 cron 表达式
	if (StringUtils.hasLength(cron)) {
		// 如果设置了 cron 表达式,那么就不能设置 initialDelay 了
		Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
		// 设置下标记,表示已经设置 cron 表达式了
		processedSchedule = true;
		// 如果 cron 表达式没有被设置成 '-' 了,就代表用 cron 去触发
		if (!Scheduled.CRON_DISABLED.equals(cron)) {
			// 创建 cron 触发器
			CronTrigger trigger;
			if (StringUtils.hasText(zone)) {
				// 设置时区和 cron 表达式
				trigger = new CronTrigger(cron, StringUtils.parseTimeZoneString(zone));
			}
			else {
				// 不需要设置时区,设置 cron 表达式
				trigger = new CronTrigger(cron);
			}
			// 将创建的 ScheduledTask 加入任务集合中
			// 使用 ScheduledTaskRegistrar 创建一个 ScheduledTask,同时需要传入触发器,这个触发器里面需要传入 cron 表达式和时区
			tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, trigger)));
		}
	}
}
  1. 解析 cron 表达式,先获取下时区
  2. 然后解析 cron 表达式和时区的动态值
  3. 如果设置了 cron 表达式,那么就不能设置 initialDelay 了,需要依靠 cron 表达式来执行定时任务
  4. processedSchedule 标记位是代表有没有设置了定时任务的调度方式,这里如果使用 cron 表达式来调度任务,processedSchedule 就会设置为 true
  5. 检查下 cron 有没有被设置成 -,这个标记代表禁用 cron 表达式,如果没有设置为 -,就通过 cron 和时区创建一个 CronTrigger,这是 cron 触发器,在这个触发器里面可以获取表达式和时区等信息,同时也可以获取 cron 下一次执行的时间
  6. 新建一个 ScheduledTask,把这个任务添加到任务集合中,这个是任务的统一包装,里面可以对 CronTaskFixedDelayTaskFixedRateTask 进行包装

上面就是解析 cron 表达式的逻辑,下面继续看解析@Scheduled 注解的 fixedDelay 字段逻辑,这个字段就是固定延时任务。

// 下面检查 @Scheduled 注解的 fixedDelay 字段,这个字段就是固定延时任务
long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
if (fixedDelay >= 0) {
	// 如果前面 cron 已经设置了,那么这里就不能设置 fixedDelay 了
	Assert.isTrue(!processedSchedule, errorMessage);
	// 设置标记
	processedSchedule = true;
	// 同样添加任务,不过这里是创建一个 FixedDelayTask,表示固定延时的任务
	tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
// 看下有没有设置 fixedDelayString
String fixedDelayString = scheduled.fixedDelayString();
// 如果设置 fixedDelayString 了
if (StringUtils.hasText(fixedDelayString)) {
	if (this.embeddedValueResolver != null) {
		// 解析动态字符串
		fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
	}
	if (StringUtils.hasLength(fixedDelayString)) {
		// 如果前面没有设置 cron 和 fixDelay
		Assert.isTrue(!processedSchedule, errorMessage);
		// 设置标记,表示已解析
		processedSchedule = true;
		try {
			// 转化成毫秒
			fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
		}
		catch (RuntimeException ex) {
			throw new IllegalArgumentException(
					"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
		}
		// 同样添加任务,不过这里是创建一个 FixedDelayTask,表示固定延时的任务
		tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
	}
}
  1. 首先检测 @Scheduled 注解的 fixedDelay,转化成毫秒
    • 判断下 processedSchedule 字段,如果前面已经设置 cron了,那么这里就会抛出异常,也就是说如果设置了 cron 表达式的调度方式,这里就不能设置 fixedDelay 了
    • 创建一个 ScheduledTask 添加到集合里面,这个任务包装类包装的是 FixedDelayTask,里面设置了初始延时时间和固定速率
  2. 然后继续检测 @Scheduled 注解的 fixedDelayString
    • 解析动态字符串
    • 判断下 processedSchedule 字段,如果前面已经设置 cron 或者设置了 fixedDelay 了,那么这里就会抛出异常,也就是说如果设置了 cron 表达式的调度方式或者设置了 fixedDelay 字段,这里就不能设置 fixedDelayString 了
    • 创建一个 ScheduledTask 添加到集合里面,这个任务包装类包装的是 FixedDelayTask,里面设置了初始延时时间和固定速率

上面就是解析固定速率的逻辑,下面继续看解析@Scheduled 注解的 fixedRate 字段逻辑,这个字段就是固定速率任务。

// 下面检查 @Scheduled 注解的 fixedRate 字段,这个字段就是固定速率任务
long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
if (fixedRate >= 0) {
	// 如果设置了 fixedRate,同时前面几种方式都没有设置
	Assert.isTrue(!processedSchedule, errorMessage);
	// 设置标记
	processedSchedule = true;
	// 同样添加任务,不过这里是创建一个 FixedRateTask,表示固定延时的任务
	tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
// 继续检测 fixedRateString
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
	if (this.embeddedValueResolver != null) {
		// 解析动态表达式
		fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
	}
	if (StringUtils.hasLength(fixedRateString)) {
		// 如果前面几种方式已经设置过了,这里就抛出异常
		Assert.isTrue(!processedSchedule, errorMessage);
		// 设置标记位
		processedSchedule = true;
		try {
			// 把 fixedRateString 转化成毫秒
			fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
		}
		catch (RuntimeException ex) {
			throw new IllegalArgumentException(
					"Invalid fixedRateString value \"" + fixedRateString + "\" - ca