注解方式:
核心类摘要:
1.ScheduledAnnotationBeanPostProcessor
2.ScheduledTaskRegistrar
3.TaskScheduler
4.ReschedulingRunnable
具体说明:
1.ScheduledAnnotationBeanPostProcessor
(1)核心方法:Object postProcessAfterInitialization(final Object bean, String beanName)
功能:负责@Schedule注解的扫描,构建ScheduleTask
(2)核心方法:onApplicationEvent(ContextRefreshedEvent event)
功能:spring容器加载完毕之后调用,ScheduleTask向ScheduledTaskRegistrar中注册, 调用ScheduledTaskRegistrar.afterPropertiesSet()
2.ScheduledTaskRegistrar
(1)核心方法:void afterPropertiesSet()
功能:初始化所有定时器,启动定时器
3.TaskScheduler
主要的实现类有三个 ThreadPoolTaskScheduler, ConcurrentTaskScheduler,TimerManagerTaskScheduler
作用:这些类的作用主要是将task和executor用ReschedulingRunnable包装起来进行生命周期管理。
(1)核心方法:ScheduledFuture schedule(Runnable task, Trigger trigger)
4.ReschedulingRunnable
(1)核心方法:public ScheduledFuture schedule()
(2)核心方法:public void run()
public ScheduledFuture schedule() {
synchronized (this.triggerContextMonitor) {
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
@Override
public void run() {
Date actualExecutionTime = new Date();
super.run();
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
}
if (!this.currentFuture.isCancelled()) {
schedule();
}
}
通过schedule方法及run方法互相调用,再利用ScheduledExecutorService接口的schedule(Runnable command,long delay,TimeUnit unit)单次执行效果,从而实现一定时间重复触发的效果。
配置文件的方式基本相似只是通过ScheduledTasksBeanDefinitionParser类读取节点组装对应定时任务bean
Spring定时器的实现
ScheduledThreadPoolExecutor类
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
delayedExecute(t);//延迟执行
return t;
}
//执行到delaydExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);//把任务加入到执行队列中
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();//任务未取消则调用
}
}
任务未取消则调用ensurePrestart(),ensurePrestart方法中调用了addWorker()方法,addWorker()方法中创建执行任务的Woker并且调用woker的run方法,调用runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法中的任务通过调用了ThreadPoolExecutor类中的getTask方法获得
getTask()方法中调用了ScheduledThreadPoolExecutor内部类DelayedWorkQueue重写的take方法或poll方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//实现延迟执行,根据延迟时间等待直到执行时间返回RunnableScheduledFuture
for (; ; ) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}