Quartz学习--二 Hello Quartz! 和源码分析
三. Hello Quartz!
我会跟着 第一章 6.2 的图来 进行同步代码编写
-
简单入门示例:
-
创建一个新的java普通工程 引入对应版本jar包:
jar包 maven地址为:
<!-- Quartz jar 包 2.2.1 版本 -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>
<!-- 若你使用的框架是SpringBoot 需要引入:
这里只是个示例简单程序 所以只需要导入上面两个的依赖就可
-->
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-context-support</artifactId>-->
<!--<version>4.1.6.RELEASE</version>-->
<!--</dependency>--> -
自创建任务 (Job)
仅仅需要对 org.quartz.Job 接口进行实现 将来调度器会执行我们重写的execute()方法
package com.ws.quartzdemo1001.job01_HelloWorld;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 实现 quartz 对使用人员开放的 Job接口
*/
public class HelloJob implements Job {
private static Logger log = LoggerFactory.getLogger(HelloJob.class);
@Override
public void execute(JobExecutionContext jobExecutionContext)
throws JobExecutionException {
log.info("Hello Quartz - Job");
}
} -
编写使用Quartz的代码
package com.ws.quartzdemo1001.job01_HelloWorld;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
public class HelloQuartz {
private static Logger logger = LoggerFactory.getLogger(HelloQuartz.class);
public static void main(String[] args) throws SchedulerException {
// 1 创建 Scheduler 的工厂
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
// 2 从工厂中获取调度器 的实例
Scheduler scheduler = schedulerFactory.getScheduler();
// 3 创建JobDetail
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
.withDescription("this is my first job01_HelloWorld ") // 设置job相关描述
.withIdentity("hello job01_HelloWorld" ,"normal job01_HelloWorld") // 设置任务 名称和组名
.build(); //创建 JobDetail
// 4 创建 trigger
CronTrigger trigger = TriggerBuilder.newTrigger()
.withDescription("this is my first trigger") //设置 trigger 相关描述
.withIdentity("say hello trigger", "cron trigger") //设置 当前触发其 名字 和归属组名
.startAt(new Date()) // 设置任务启动时间
.withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
.build();
// 5 将 job01_HelloWorld 和 trigger 绑定 并注册到 调度器
scheduler.scheduleJob(jobDetail,trigger);
// 6 启动 调度器
scheduler.start();
logger.info(new Date() +" <<<<<< 启动");
}
}主要编写了:
创建了Scheduler 工厂
从工厂中获取调度器的实例
使用自己实现了Job接口的类 来创建 JobDetail
创建触发器 并指定触发规则
将JobDetail 和触发器进行绑定放入 调度器中 (或者说注册到scheduler)
启动调度器
执行结果:
-
入门示例 (追根溯源)
-
作为一个简单的应用程序 我们使用起来给我最大的感触就是 每个主要实例都有一个特定的创建方法
比如 调度器实例就是依靠 Scheduler工厂而创建出来为我们使用
JobDetail 和Trigger 亦是如此
-
HelloJob代码分析:
接口: org.quartz.Job
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
package org.quartz;
public interface Job {
void execute(JobExecutionContext context) throws JobExecutionException;
}
一个任务是一个实现Job接口的类, 且任务类必须含有空构造器
当关联这个任务实例的触发器声明的执行时间到了的时候,调度程序Scheduler 会调用这个execute()方法来执行任务,我们的任务内容就可以在这个方法中执行
JobExecutionContext: 工作执行的上下文 自动传入
实际上在该方法退出之前会设置一个结果对象到 上下文中 ,
来让JobListeners 或者TriggerListeners 获得当前任务执行的状态
-
HelloQuartz 代码分析:
// 1 创建 Scheduler 的工厂
SchedulerFactory schedulerFactory = new StdSchedulerFactory();创建一个生产调度器的工厂
查看Scheduler 接口源码:
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
package org.quartz;
import java.util.Collection;
public interface SchedulerFactory {
Scheduler getScheduler() throws SchedulerException;
Scheduler getScheduler(String var1) throws SchedulerException;
Collection<Scheduler> getAllSchedulers() throws SchedulerException;
}
接口规范了 三个必须实现的方法:
三个方法的作用实际上都是 来获取 调度器实例
getScheduler()
getScheduler(String schedName) // 返回指定了名字的调度器实例
getAllSchedulers()
Scheduler 的实现有 两种:
代码中创建的是 new StdSchedulerFactory() ;
查看 DirectSchedulerFactory 中实现代码:
public class DirectSchedulerFactory implements SchedulerFactory {
public static final String DEFAULT_INSTANCE_ID = "SIMPLE_NON_CLUSTERED";
public static final String DEFAULT_SCHEDULER_NAME = "SimpleQuartzScheduler";
private static final boolean DEFAULT_JMX_EXPORT = false;
private static final String DEFAULT_JMX_OBJECTNAME = null;
private static final DefaultThreadExecutor DEFAULT_THREAD_EXECUTOR = new DefaultThreadExecutor();
private static final int DEFAULT_BATCH_MAX_SIZE = 1;
private static final long DEFAULT_BATCH_TIME_WINDOW = 0L;
private boolean initialized = false;
private static DirectSchedulerFactory instance = new DirectSchedulerFactory();
private final Logger log = LoggerFactory.getLogger(this.getClass());
protected Logger getLog() {
return this.log;
}
protected DirectSchedulerFactory() {
} ....仅从字面意思上理解 DirectSchedulerFactory 就是一个直接的 调度器创建工厂
public static final String DEFAULT_INSTANCE_ID = "SIMPLE_NON_CLUSTERED";
public static final String DEFAULT_SCHEDULER_NAME = "SimpleQuartzScheduler";开始两行中明确指明 这个直接的工厂 是没有实现分布式集群 而且预期返回 简单的调度器 实例
再对比 StdSchedulerFactory的 源码:
public class StdSchedulerFactory implements SchedulerFactory {
public static final String PROPERTIES_FILE = "org.quartz.properties";
public static final String PROP_SCHED_INSTANCE_NAME = "org.quartz.scheduler.instanceName";
public static final String PROP_SCHED_INSTANCE_ID = "org.quartz.scheduler.instanceId";
public static final String PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX = "org.quartz.scheduler.instanceIdGenerator";
public static final String PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS = "org.quartz.scheduler.instanceIdGenerator.class";
public static final String PROP_SCHED_THREAD_NAME = "org.quartz.scheduler.threadName";
public static final String PROP_SCHED_SKIP_UPDATE_CHECK = "org.quartz.scheduler.skipUpdateCheck";
public static final String PROP_SCHED_BATCH_TIME_WINDOW = "org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow";
public static final String PROP_SCHED_MAX_BATCH_SIZE = "org.quartz.scheduler.batchTriggerAcquisitionMaxCount";
public static final String PROP_SCHED_JMX_EXPORT = "org.quartz.scheduler.jmx.export";
public static final String PROP_SCHED_JMX_OBJECT_NAME = "org.quartz.scheduler.jmx.objectName";
public static final String PROP_SCHED_JMX_PROXY = "org.quartz.scheduler.jmx.proxy";
public static final String PROP_SCHED_JMX_PROXY_CLASS = "org.quartz.scheduler.jmx.proxy.class";
public static final String PROP_SCHED_RMI_EXPORT = "org.quartz.scheduler.rmi.export";
public static final String PROP_SCHED_RMI_PROXY = "org.quartz.scheduler.rmi.proxy";
public static final String PROP_SCHED_RMI_HOST = "org.quartz.scheduler.rmi.registryHost";
public static final String PROP_SCHED_RMI_PORT = "org.quartz.scheduler.rmi.registryPort";
public static final String PROP_SCHED_RMI_SERVER_PORT = "org.quartz.scheduler.rmi.serverPort";
public static final String PROP_SCHED_RMI_CREATE_REGISTRY = "org.quartz.scheduler.rmi.createRegistry";
public static final String PROP_SCHED_RMI_BIND_NAME = "org.quartz.scheduler.rmi.bindName";
public static final String PROP_SCHED_WRAP_JOB_IN_USER_TX = "org.quartz.scheduler.wrapJobExecutionInUserTransaction";
public static final String PROP_SCHED_USER_TX_URL = "org.quartz.scheduler.userTransactionURL";
public static final String PROP_SCHED_IDLE_WAIT_TIME = "org.quartz.scheduler.idleWaitTime";
public static final String PROP_SCHED_DB_FAILURE_RETRY_INTERVAL = "org.quartz.scheduler.dbFailureRetryInterval";
public static final String PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON = "org.quartz.scheduler.makeSchedulerThreadDaemon";
public static final String PROP_SCHED_SCHEDULER_THREADS_INHERIT_CONTEXT_CLASS_LOADER_OF_INITIALIZING_THREAD = "org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer";
public static final String PROP_SCHED_CLASS_LOAD_HELPER_CLASS = "org.quartz.scheduler.classLoadHelper.class";
public static final String PROP_SCHED_JOB_FACTORY_CLASS = "org.quartz.scheduler.jobFactory.class";
public static final String PROP_SCHED_JOB_FACTORY_PREFIX = "org.quartz.scheduler.jobFactory";
public static final String PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN = "org.quartz.scheduler.interruptJobsOnShutdown";
public static final String PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT = "org.quartz.scheduler.interruptJobsOnShutdownWithWait";
public static final String PROP_SCHED_CONTEXT_PREFIX = "org.quartz.context.key";
public static final String PROP_THREAD_POOL_PREFIX = "org.quartz.threadPool";
public static final String PROP_THREAD_POOL_CLASS = "org.quartz.threadPool.class";
public static final String PROP_JOB_STORE_PREFIX = "org.quartz.jobStore";
public static final String PROP_JOB_STORE_LOCK_HANDLER_PREFIX = "org.quartz.jobStore.lockHandler";
public static final String PROP_JOB_STORE_LOCK_HANDLER_CLASS = "org.quartz.jobStore.lockHandler.class";
public static final String PROP_TABLE_PREFIX = "tablePrefix";
public static final String PROP_SCHED_NAME = "schedName";
public static final String PROP_JOB_STORE_CLASS = "org.quartz.jobStore.class";
public static final String PROP_JOB_STORE_USE_PROP = "org.quartz.jobStore.useProperties";
public static final String PROP_DATASOURCE_PREFIX = "org.quartz.dataSource";
public static final String PROP_CONNECTION_PROVIDER_CLASS = "connectionProvider.class";
.....看起来明显区别是相对于 简单的调度器实例来说 它多出了好多常量
从常量: PROPERTIES_FILE 中可以看出 它实际上是 对应的一个配置
我们再返回去查看 第一章 .7 的properties 配置文件中的 可以配置的属性
不难发现 其实我们可以设置的配置 再quartz的声明处 就在 这里
它为我们创建了的调度器实现了各种复杂模式
作为初始demo我现在不再深入它的具体实现
继续进行代码分析:
这里我们使用了 JobBulider 返回的是 JobDetail
使用JobBulider 的newJob 方法 将 我们自实现的工作类当做参数传入
追溯源码:
public class JobBuilder {
private JobKey key;
private String description;
private Class<? extends Job> jobClass;
private boolean durability;
private boolean shouldRecover;
private JobDataMap jobDataMap = new JobDataMap();
...
protected JobBuilder() {
}
public static JobBuilder newJob() {
return new JobBuilder();
}
public static JobBuilder newJob(Class<? extends Job> jobClass) {
JobBuilder b = new JobBuilder();
b.ofType(jobClass);
return b;
}
...
public JobBuilder ofType(Class<? extends Job> jobClazz) {
this.jobClass = jobClazz;
return this;
}
...发现 JobBuilder 的静态方法中 实例了一个 JobBuilder的对象
并调用了 ofType 将我们传入的自实现的工作类设置成这个对象的 成员属性
再看 构造器 权限是 protected
看上去感觉像是 单例模式 不能直接创建 对象~!!! 而是提供一个静态方法 返回一个当前类的一个实例
区别就是 这个方法是个有参数的调用
ok 我们现在就深入了解到这里 jobClass 具体什么时候使用 我们慢慢探索
总体看起来 JobBuilder只是一个 充满 附加参数的 而且封装了Job的Pojo一样
继续代码分析 发现:
.withDescription("this is my first job01_HelloWorld ") // 设置job相关描述
没有什么可以赘述
.withIdentity("hello job01_HelloWorld" ,"normal job01_HelloWorld") // 设置任务 名称和组名
看一下 设置身份 它传入了两个 参数
源码查看:
public JobBuilder withIdentity(String name, String group) {
this.key = new JobKey(name, group);
return this;
}翻看 JobKey 和Key<JobKey>的源码 :
public class Key<T> implements Serializable, Comparable<Key<T>> {
private static final long serialVersionUID = -7141167957642391350L;
public static final String DEFAULT_GROUP = "DEFAULT";
private final String name;
private final String group;
...
public static String createUniqueName(String group) {
if (group == null) {
group = "DEFAULT";
}
String n1 = UUID.randomUUID().toString();
String n2 = UUID.nameUUIDFromBytes(group.getBytes()).toString();
return String.format("%s-%s", n2.substring(24), n1);
}实际上他只是为了达到 唯一的生成名称而创建的一个数据结构
继续 再看:
.build(); //创建 JobDetail
源码:
public JobDetail build() {
JobDetailImpl job = new JobDetailImpl();
job.setJobClass(this.jobClass);
job.setDescription(this.description);
if (this.key == null) {
this.key = new JobKey(Key.createUniqueName((String)null), (String)null);
}
job.setKey(this.key);
job.setDurability(this.durability);
job.setRequestsRecovery(this.shouldRecover);
if (!this.jobDataMap.isEmpty()) {
job.setJobDataMap(this.jobDataMap);
}
return job;
}创建了 JobDetail的一个子类 它的名字 (JobDetailImpl) 说明的是 JobDetail的一个实现
他这样做的目的: 实际上 Quartz 只提供给我们 JobDetail里的一些可以用户自定义的属性设置接口 其他的Quartz负责了自动组装
继续 分析代码:
CronTrigger trigger = TriggerBuilder.newTrigger()
.withDescription("this is my first trigger") //设置 trigger 相关描述
.withIdentity("say hello trigger", "cron trigger") //设置 当前触发其 名字 和归属组名
.startAt(new Date()) // 设置任务启动时间
.withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
.build();翻看后发现与Job创建大同小异 不再赘述
继续 分析代码 scheduleJob():
scheduler.scheduleJob(jobDetail,trigger);
我们一直翻看 StdScheduler 的实现 跟踪主要方法:
跟踪到了QuartzScheduler 类中
查看QuartzScheduler所属包名: package org.quartz.core;
ok~ 到了核心类@!@
package org.quartz.core;
...
public class QuartzScheduler implements RemotableQuartzScheduler {
...
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException {
this.validateState();
if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null");
} else if (trigger == null) {
throw new SchedulerException("Trigger cannot be null");
} else if (jobDetail.getKey() == null) {
throw new SchedulerException("Job's key cannot be null");
} else if (jobDetail.getJobClass() == null) {
throw new SchedulerException("Job's class cannot be null");
} else {
OperableTrigger trig = (OperableTrigger)trigger;
if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException("Trigger does not reference given job!");
}
trig.validate();//验证trigger
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = this.resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException("Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
} else {
this.resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
this.notifySchedulerListenersJobAdded(jobDetail);
this.notifySchedulerThread(trigger.getNextFireTime().getTime());
this.notifySchedulerListenersSchduled(trigger);
return ft;
}
}
}一系列非空判断之后 有如下几行 :
OperableTrigger trig = (OperableTrigger)trigger;
trig.setJobKey(jobDetail.getKey());
...
if (trigger.getCalendarName() != null) {
cal = this.resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
Date ft = trig.computeFirstFireTime(cal);
this.resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
this.notifySchedulerListenersJobAdded(jobDetail);
this.notifySchedulerThread(trigger.getNextFireTime().getTime());
this.notifySchedulerListenersSchduled(trigger);
return ft;第一步 对一个触发器 设置了 JobKey 实际上就绑定了 trigger和job之间的关系
第二步 获取jobStore 翻看源码时 翻看 RAMJobStore 的retrieveCalendar()方法实现
其实这里还没涉及到多个calendar 它的作用就是获取指定名称的日历
-
第三步 获取第一次触发的时间
在触发器首次添加到调度程序时由调度程序调用,以便让触发器基于任何关联的日历计算其第一次触发时间。调用此方法后,getNextFireTime() 应返回有效的答案。
-
第四步 "存储" 注册到JobStore
通知调度器ListenersJob 添加 Job
通知调度器线程 下次调用时间
通知调度器工作安排
ok ~ 其实 我们可以简单理解 主要作用就是将 trgger和job绑定 然后 quartz又通知了它的各个核心组件
源码中:
this.resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
追溯到JobStore 发现他有几个实现:
-
"存储" 注册到JobStore的 方式区别也就在这里托盘而出:
因为我们是RAMJobStore 我们查看源代码:
public class RAMJobStore implements JobStore {
...
public void storeJob(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException {
JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
boolean repl = false;
Object var5 = this.lock;
synchronized(this.lock) {
if (this.jobsByKey.get(jw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
repl = true;
}
if (!repl) {
HashMap<JobKey, JobWrapper> grpMap = (HashMap)this.jobsByGroup.get(newJob.getKey().getGroup());
if (grpMap == null) {
grpMap = new HashMap(100);
this.jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
}
grpMap.put(newJob.getKey(), jw);
this.jobsByKey.put(jw.key, jw);
} else {
JobWrapper orig = (JobWrapper)this.jobsByKey.get(jw.key);
orig.jobDetail = jw.jobDetail;
}
}
}它的实现是把内容放在了 一个Map中维护了 所以说再重启之后 这个map 并没有持久化到硬盘中 它的生命周期就在jvm关闭时丢失了
再看一个不完全实现:
public abstract class JobStoreSupport implements JobStore, Constants {
...
public void storeJobAndTrigger(final JobDetail newJob, final OperableTrigger newTrigger) throws JobPersistenceException {
this.executeInLock(this.isLockOnInsert() ? "TRIGGER_ACCESS" : null, new JobStoreSupport.VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
JobStoreSupport.this.storeJob(conn, newJob, false);
JobStoreSupport.this.storeTrigger(conn, newTrigger, newJob, false, "WAITING", false, false);
}
});
}
...
protected void storeJob(Connection conn, JobDetail newJob, boolean replaceExisting) throws JobPersistenceException {
boolean existingJob = this.jobExists(conn, newJob.getKey());
try {
if (existingJob) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
this.getDelegate().updateJobDetail(conn, newJob);
} else {
this.getDelegate().insertJobDetail(conn, newJob);
}
} catch (IOException var6) {
throw new JobPersistenceException("Couldn't store job: " + var6.getMessage(), var6);
} catch (SQLException var7) {
throw new JobPersistenceException("Couldn't store job: " + var7.getMessage(), var7);
}
}
...
protected void storeTrigger(Connection conn, OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state, boolean forceState, boolean recovering) throws JobPersistenceException {
boolean existingTrigger = this.triggerExists(conn, newTrigger.getKey());
if (existingTrigger && !replaceExisting) {
throw new ObjectAlreadyExistsException(newTrigger);
} else {
try {
if (!forceState) {
boolean shouldBepaused = this.getDelegate().isTriggerGroupPaused(conn, newTrigger.getKey().getGroup());
if (!shouldBepaused) {
shouldBepaused = this.getDelegate().isTriggerGroupPaused(conn, "_$_ALL_GROUPS_PAUSED_$_");
if (shouldBepaused) {
this.getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
}
}
if (shouldBepaused && (state.equals("WAITING") || state.equals("ACQUIRED"))) {
state = "PAUSED";
}
}
if (job == null) {
job = this.getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), this.getClassLoadHelper());
}
if (job == null) {
throw new JobPersistenceException("The job (" + newTrigger.getJobKey() + ") referenced by the trigger does not exist.");
} else {
if (job.isConcurrentExectionDisallowed() && !recovering) {
state = this.checkBlockedState(conn, job.getKey(), state);
}
if (existingTrigger) {
this.getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
this.getDelegate().insertTrigger(conn, newTrigger, state, job);
}
}
} catch (Exception var10) {
throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" + newTrigger.getJobKey() + "' job:" + var10.getMessage(), var10);
}
}
}
其中的updateJobDetail 翻看源码实际上是将内容存储到了 数据库中
这其实就是jdbc 方式的原理
现在话题拉回来
继续 分析代码:
我们完成了所有的创建注册绑定操作 万事俱备只欠东风了
启动:
// 6 启动 调度器
scheduler.start();源码:
public void start() throws SchedulerException {
if (!this.shuttingDown && !this.closed) {
this.notifySchedulerListenersStarting();
if (this.initialStart == null) {
this.initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
this.startPlugins();
} else {
this.resources.getJobStore().schedulerResumed();
}
this.schedThread.togglePause(false);//设置 不暂停
this.getLog().info("Scheduler " + this.resources.getUniqueIdentifier() + " started.");
this.notifySchedulerListenersStarted();// 提醒 调度器的监听 启动
} else {
throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");
}
}初始化JobStore源码分析:
// 判断初始化标识 保证jobStore
if (this.initialStart == null) { // 没有初始化过: 进行初始化
this.initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
this.startPlugins();
} else { // 已经初始化过 进行 恢复
this.resources.getJobStore().schedulerResumed();
}public abstract class JobStoreSupport implements JobStore, Constants {
...
public void schedulerStarted() throws SchedulerException {
if (this.isClustered()) {//判断是否是是集群
this.clusterManagementThread = new JobStoreSupport.ClusterManager();
if (this.initializersLoader != null) {
this.clusterManagementThread.setContextClassLoader(this.initializersLoader);
}
this.clusterManagementThread.initialize();
} else { // 不是集群的话
try {
this.recoverJobs();// 恢复 工作
} catch (SchedulerException var2) {
throw new SchedulerConfigException("Failure occured during job recovery.", var2);
}
}
this.misfireHandler = new JobStoreSupport.MisfireHandler();
if (this.initializersLoader != null) {
this.misfireHandler.setContextClassLoader(this.initializersLoader);
}
this.misfireHandler.initialize();
this.schedulerRunning = true;
this.getLog().debug("JobStore background threads started (as scheduler was started).");
}
...
// 恢复 工作:
// 将恢复任何失败的工作和丢失了触发的工作,并根据需要清理数据存储
protected void recoverJobs() throws JobPersistenceException {
this.executeInNonManagedTXLock("TRIGGER_ACCESS", new JobStoreSupport.VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
JobStoreSupport.this....(conn);//恢复job
}
}, (JobStoreSupport.TransactionValidator)null);
}
... protected void recoverJobs(Connection conn) throws JobPersistenceException {
try {
int rows = this.getDelegate().updateTriggerStatesFromOtherStates(conn, "WAITING", "ACQUIRED", "BLOCKED");
rows += this.getDelegate().updateTriggerStatesFromOtherStates(conn, "PAUSED", "PAUSED_BLOCKED", "PAUSED_BLOCKED");
this.getLog().info("Freed " + rows + " triggers from 'acquired' / 'blocked' state.");
this.recoverMisfiredJobs(conn, true);
List<OperableTrigger> recoveringJobTriggers = this.getDelegate().selectTriggersForRecoveringJobs(conn);
this.getLog().info("Recovering " + recoveringJobTriggers.size() + " jobs that were in-progress at the time of the last shut-down.");
Iterator i$ = recoveringJobTriggers.iterator();
while(i$.hasNext()) {
OperableTrigger recoveringJobTrigger = (OperableTrigger)i$.next();
if (this.jobExists(conn, recoveringJobTrigger.getJobKey())) {
recoveringJobTrigger.computeFirstFireTime((Calendar)null);
this.storeTrigger(conn, recoveringJobTrigger, (JobDetail)null, false, "WAITING", false, true);
}
}
this.getLog().info("Recovery complete.");
List<TriggerKey> cts = this.getDelegate().selectTriggersInState(conn, "COMPLETE");
Iterator i$ = cts.iterator();
while(i$.hasNext()) {
TriggerKey ct = (TriggerKey)i$.next();
this.removeTrigger(conn, ct);
}
this.getLog().info("Removed " + cts.size() + " 'complete' triggers.");
int n = this.getDelegate().deleteFiredTriggers(conn);
this.getLog().info("Removed " + n + " stale fired job entries.");
} catch (JobPersistenceException var7) {
throw var7;
} catch (Exception var8) {
throw new JobPersistenceException("Couldn't recover jobs: " + var8.getMessage(), var8);
}
}初始JobStore时 恢复相关: 将 失败标识的 或者 丢失触发的 工作 进行一个找回 并设置到 调度器触发工作日程中
分析:
this.schedThread.togglePause(false);
探索:
// 设置主处理循环在下一个可能的点暂停。
void togglePause(boolean pause) {
Object var2 = this.sigLock;
synchronized(this.sigLock) {
this.paused = pause;
if (this.paused) {
this.signalSchedulingChange(0L);
} else {
this.sigLock.notifyAll(); // 唤醒所有等待线程
}
}
}
...
//通知主要处理循环,已经进行了调度的改变
public void signalSchedulingChange(long candidateNewNextFireTime) {
Object var3 = this.sigLock;
synchronized(this.sigLock) {
this.signaled = true;
this.signaledNextFireTime = candidateNewNextFireTime;
this.sigLock.notifyAll();//中断在等待时间到达时可能发生的任何睡眠线程
}
}
-
Quartz学习--二 Hello Quartz! 和源码分析的更多相关文章
-
Quartz任务调度:MisFire策略和源码分析
Quartz是为大家熟知的任务调度框架,先看看官网的介绍: ---------------------------------------------------------------------- ...
-
Java并发编程(七)ConcurrentLinkedQueue的实现原理和源码分析
相关文章 Java并发编程(一)线程定义.状态和属性 Java并发编程(二)同步 Java并发编程(三)volatile域 Java并发编程(四)Java内存模型 Java并发编程(五)Concurr ...
-
Kubernetes Job Controller 原理和源码分析(一)
概述什么是 JobJob 入门示例Job 的 specPod Template并发问题其他属性 概述 Job 是主要的 Kubernetes 原生 Workload 资源之一,是在 Kubernete ...
-
memcached学习笔记——存储命令源码分析下篇
上一篇回顾:<memcached学习笔记——存储命令源码分析上篇>通过分析memcached的存储命令源码的过程,了解了memcached如何解析文本命令和mencached的内存管理机制 ...
-
memcached学习笔记——存储命令源码分析上篇
原创文章,转载请标明,谢谢. 上一篇分析过memcached的连接模型,了解memcached是如何高效处理客户端连接,这一篇分析memcached源码中的process_update_command ...
-
LinearLayout属性用法和源码分析
转载自:http://www.jianshu.com/p/650c3fd7e6ab 一. LinearLayout的属性和用法 LinearLayout对于开发来说,是使用最常用的布局控件之一,但 ...
-
Android Debuggerd 简要介绍和源码分析(转载)
转载: http://dylangao.com/2014/05/16/android-debuggerd-%E7%AE%80%E8%A6%81%E4%BB%8B%E7%BB%8D%E5%92%8C%E ...
-
Java 序列化和反序列化(二)Serializable 源码分析 - 1
目录 Java 序列化和反序列化(二)Serializable 源码分析 - 1 1. Java 序列化接口 2. ObjectOutputStream 源码分析 2.1 ObjectOutputSt ...
-
定时组件quartz系列<;三>;quartz调度机制调研及源码分析
quartz2.2.1集群调度机制调研及源码分析引言quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执行过程:总结:附: 引言 quratz是目前最为成熟,使用最广泛的j ...
随机推荐
-
JSP+JavaBean+Servlet+Oracle新增功能中对Date类型的字段的处理
Oracle库中userinfo表borth字段是Date类型,age年纪字段是int类型.age字段要根据borth来自动计算 先说一下我遇到的问题: insert into的时候遇到日期转换类型错 ...
-
【JavaScript学习笔记】画图
<!DOCTYPE HTML> <html> <head> <script type="text/javascript"> var ...
-
Java抓取网页数据
http://ayang1588.github.io/blog/2013/04/08/catchdata/ 最近处于离职状态,正赶清闲,开始着手自己的毕业设计,课题定的是JavaWeb购物平台,打算用 ...
-
[Server Running] [Node.js, PM2] Using PM2 To Keep Your Node Apps Alive
PM2 is a production process manager for Node.js applications with a built-in load balancer. It allow ...
-
thymeleaf的初次使用(带参请求以及调用带参js方法)
之前对于前端框架接触较少,第一次接触thymeleaf,虽说看起来并不复杂但我还是花费了好一会儿才弄懂. 话不多少下面就简单说一下我在项目中的应用. 首先是java代码 controller层 将需要 ...
-
最近国外很拉风的,,基于.net 的一个手表
site:http://agentwatches.com/ 这个项目是一个国外工作室,筹集资金 创立的. 直接用c# 代码编译显示在手机上.能和智能手机通信等. 并且是开源的. 很酷 其次.它提供了. ...
-
1002 Fire Net
用递归实现各种情况的枚举,可以看做是考察DPS的简单实现. #include <stdio.h> ][]; int place(int x,int y){ int i; ;i--){ ) ...
-
[Angular Tutorial] 6-Two-way Data Binding
在这一步中,您将会添加一个新特性来使得您的用户可以控制电话列表中电话的顺序,动态改变顺序是由创建一个新的数据模型的特性实现的,将它和迭代器绑定在一起,并且让数据绑定神奇地处理下面的工作. ·除了搜索框 ...
-
Android Task 任务
关于Android中的组件和应用,之前涉及,大都是静态的概念.而当一个应用运行起来,就难免会需要关心进程.线程这样的概念.在Android中,组件的动态运行,有一个最与众不同的概念,就是Task,翻译 ...
-
Ajax异步交互 [异步对象连接服务器]
<!DOCTYPE html><html> <head> <meta charset="utf-8"> <title>X ...