Demystifying the Quartz Job Scheduling Framework in One Article

Date: 2021-08-24 14:41:44

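I have written or reposted several articles about quartz before, and many readers have messaged me with quartz-related questions:

quartz error: java.lang.ClassNotFoundException

quartz source code analysis: understanding the relationships among job, scheduler, calendar, trigger, and listener

Analysis of missed executions when the Quartz framework runs multiple trigger tasks (repost)

Investigation and source code analysis of the quartz cluster scheduling mechanism (repost)

Technology selection for distributed scheduled-task systems (repost)

With things quiet at the end of the year, I collected those quartz questions, read through the source code along the way, and wrote up some conclusions. I hope this helps some readers, or at least shortens the time they spend exploring.

Note: the versions used are quartz 2.2.3 and spring boot 2.1.3.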

1. Core components of quartz

1.1 Job component

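1.1.1 Job

A Job holds the logic of the task to run. All of that logic lives in the execute() method, and the data needed for execution is available through the JobExecutionContext.

Job example: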

import java.util.Date;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ColorJob implements Job {

    private static Logger _log = LoggerFactory.getLogger(ColorJob.class);

    // parameter names specific to this job
    public static final String FAVORITE_COLOR = "favorite color";
    public static final String EXECUTION_COUNT = "count";

    // Since Quartz will re-instantiate a class every time it
    // gets executed, non-static member variables can
    // not be used to maintain state!
    private int _counter = 1;

    /**
     * Empty constructor for job initialization.
     * Quartz requires a public empty constructor so that the
     * scheduler can instantiate the class whenever it needs.
     */
    public ColorJob() {
    }

    /**
     * Called by the {@link org.quartz.Scheduler} when a
     * {@link org.quartz.Trigger} fires that is associated with the Job.
     *
     * @throws JobExecutionException
     *             if there is an exception while executing the job.
     */
    public void execute(JobExecutionContext context)
            throws JobExecutionException {

        // This job simply prints out its job name and the
        // date and time that it is running
        JobKey jobKey = context.getJobDetail().getKey();

        // Grab and print passed parameters
        JobDataMap data = context.getJobDetail().getJobDataMap();
        String favoriteColor = data.getString(FAVORITE_COLOR);
        int count = data.getInt(EXECUTION_COUNT);
        _log.info("ColorJob: " + jobKey + " executing at " + new Date() + "\n" +
                " favorite color is " + favoriteColor + "\n" +
                " execution count (from job map) is " + count + "\n" +
                " execution count (from job member variable) is " + _counter);

        // increment the count and store it back into the
        // job map so that job state can be properly maintained
        count++;
        data.put(EXECUTION_COUNT, count);

        // Increment the local member variable
        // This serves no real purpose since job state can not
        // be maintained via member variables!
        _counter++;
    }
}
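To see why the comments in ColorJob insist that member variables cannot hold state, here is a minimal sketch in plain Java. It does not use quartz at all: CountingTask and fire are hypothetical names that only imitate the behaviour described above, with a fresh instance created for every execution and a shared map playing the role of the JobDataMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch only: imitates "new job instance per execution" without quartz. */
final class PerExecutionInstanceDemo {

    /** Hypothetical stand-in for a job class; not a quartz type. */
    static final class CountingTask {
        private int fieldCounter = 1; // starts at 1 again on every new instance

        /** Returns {count read from the map, count read from the field}. */
        int[] run(Map<String, Object> dataMap) {
            int count = (Integer) dataMap.get("count");
            int[] seen = {count, fieldCounter};
            dataMap.put("count", count + 1); // survives: the map outlives the instance
            fieldCounter++;                   // lost: the instance is thrown away
            return seen;
        }
    }

    /** Fires the task the given number of times, one fresh instance per firing. */
    static int[] fire(Supplier<CountingTask> factory, Map<String, Object> dataMap, int times) {
        int[] last = null;
        for (int i = 0; i < times; i++) {
            last = factory.get().run(dataMap);
        }
        return last;
    }

    public static void main(String[] args) {
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("count", 1);
        int[] last = fire(CountingTask::new, dataMap, 3);
        // prints "map count=3, field count=1"
        System.out.println("map count=" + last[0] + ", field count=" + last[1]);
    }
}
```

On the third firing the map-based count has advanced to 3 while the field still reads 1, which is the same pattern ColorJob logs.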

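1.1.2 JobDetail stores the Job's information

It is mainly responsible for:

1. Specifying the Job class to execute and its unique identity (job name and group name)

2. Storing the JobDataMap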

    // job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds
    JobDetail job1 = newJob(ColorJob.class).withIdentity("job1", "group1").build();

    // pass initialization parameters into the job
    job1.getJobDataMap().put(ColorJob.FAVORITE_COLOR, "Green");
    job1.getJobDataMap().put(ColorJob.EXECUTION_COUNT, 1);

In the database it is stored as follows:

[Figure: JobDetail record as stored in the database]

1.1.3 Quartz JobBuilder provides a fluent API for creating a JobDetail

@Bean
public JobDetail jobDetail() {
    return JobBuilder.newJob().ofType(SampleJob.class)
            .storeDurably()
            .withIdentity("Qrtz_Job_Detail")
            .withDescription("Invoke Sample Job service...")
            .build();
}

1.1.4 Spring JobDetailFactoryBean

A factory bean provided by spring for creating a JobDetail

@Bean
public JobDetailFactoryBean jobDetail() {
    JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
    jobDetailFactory.setJobClass(SampleJob.class);
    jobDetailFactory.setDescription("Invoke Sample Job service...");
    jobDetailFactory.setDurability(true);
    return jobDetailFactory;
}

1.2 Trigger component

A trigger can be in different states.

Trigger states:

    // STATES
    String STATE_WAITING = "WAITING";
    String STATE_ACQUIRED = "ACQUIRED";
    String STATE_EXECUTING = "EXECUTING";
    String STATE_COMPLETE = "COMPLETE";
    String STATE_BLOCKED = "BLOCKED";
    String STATE_ERROR = "ERROR";
    String STATE_PAUSED = "PAUSED";
    String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";
    String STATE_DELETED = "DELETED";

Table structure that holds the state:

[Figure: table structure holding the trigger state]

Trigger types:

    // TRIGGER TYPES
    /** Simple Trigger type. */
    String TTYPE_SIMPLE = "SIMPLE";
    /** Cron Trigger type. */
    String TTYPE_CRON = "CRON";
    /** Calendar Interval Trigger type. */
    String TTYPE_CAL_INT = "CAL_INT";
    /** Daily Time Interval Trigger type. */
    String TTYPE_DAILY_TIME_INT = "DAILY_I";
    /** A general blob Trigger type. */
    String TTYPE_BLOB = "BLOB";

Corresponding table structure:

[Figure: table structure for the trigger types]

1.2.1 Trigger example

    SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime)
.withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();

The trigger as stored in mysql:

[Figure: trigger record stored in mysql]

1.2.2 Quartz TriggerBuilder

It provides a fluent API for creating a Trigger

@Bean
public Trigger trigger(JobDetail job) {
    return TriggerBuilder.newTrigger().forJob(job)
            .withIdentity("Qrtz_Trigger")
            .withDescription("Sample trigger")
            .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
            .build();
}

1.2.3 Spring SimpleTriggerFactoryBean

A factory class provided by spring for creating a SimpleTrigger

@Bean
public SimpleTriggerFactoryBean trigger(JobDetail job) {
    SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean();
    trigger.setJobDetail(job);
    trigger.setRepeatInterval(3600000);
    trigger.setRepeatCount(SimpleTrigger.REPEAT_INDEFINITELY);
    return trigger;
}

1.3 Scheduler component

1.3.1 Factory class provided by quartz

@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) throws SchedulerException, IOException {
    StdSchedulerFactory factory = new StdSchedulerFactory();
    factory.initialize(new ClassPathResource("quartz.properties").getInputStream());
 
    Scheduler scheduler = factory.getScheduler();
    scheduler.setJobFactory(springBeanJobFactory());
    scheduler.scheduleJob(job, trigger);
 
    scheduler.start();
    return scheduler;
}

1.3.2 Factory bean provided by spring

@Bean
public SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) {
    SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
    schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
    schedulerFactory.setJobFactory(springBeanJobFactory());
    schedulerFactory.setJobDetails(job);
    schedulerFactory.setTriggers(trigger);
    return schedulerFactory;
}

2. How it works

2.1 Core class QuartzScheduler

StdScheduler, the Scheduler implementation class, wraps the core working class QuartzScheduler

    /**
* <p>
* Construct a <code>StdScheduler</code> instance to proxy the given
* <code>QuartzScheduler</code> instance, and with the given <code>SchedulingContext</code>.
* </p>
*/
public StdScheduler(QuartzScheduler sched) {
this.sched = sched;
}

2.2 Storing and retrieving a JobDetail

    public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException {
        validateState();

        if (!storeNonDurableWhileAwaitingScheduling && !jobDetail.isDurable()) {
            throw new SchedulerException(
                    "Jobs added with no trigger must be durable.");
        }

        resources.getJobStore().storeJob(jobDetail, replace);
        notifySchedulerThread(0L);
        notifySchedulerListenersJobAdded(jobDetail);
    }

2.2.1 Storing JobDetail information (using mysql over JDBC as an example)

    /**
* <p>
* Insert or update a job.
* </p>
*/
protected void storeJob(Connection conn,
JobDetail newJob, boolean replaceExisting)
throws JobPersistenceException {
boolean existingJob = jobExists(conn, newJob.getKey());
try {
if (existingJob) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
getDelegate().updateJobDetail(conn, newJob);
} else {
getDelegate().insertJobDetail(conn, newJob);
}
} catch (IOException e) {
throw new JobPersistenceException("Couldn't store job: "
+ e.getMessage(), e);
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't store job: "
+ e.getMessage(), e);
}
}

This calls the StdJDBCDelegate implementation:

    /**
* <p>
* Insert the job detail record.
* </p>
*
* @param conn
* the DB Connection
* @param job
* the job to insert
* @return number of rows inserted
* @throws IOException
* if there were problems serializing the JobDataMap
*/
public int insertJobDetail(Connection conn, JobDetail job)
throws IOException, SQLException {
ByteArrayOutputStream baos = serializeJobData(job.getJobDataMap());
PreparedStatement ps = null;
int insertResult = 0;
try {
ps = conn.prepareStatement(rtp(INSERT_JOB_DETAIL));
ps.setString(1, job.getKey().getName());
ps.setString(2, job.getKey().getGroup());
ps.setString(3, job.getDescription());
ps.setString(4, job.getJobClass().getName());
setBoolean(ps, 5, job.isDurable());
setBoolean(ps, 6, job.isConcurrentExectionDisallowed());
setBoolean(ps, 7, job.isPersistJobDataAfterExecution());
setBoolean(ps, 8, job.requestsRecovery());
setBytes(ps, 9, baos);
insertResult = ps.executeUpdate();
} finally {
closeStatement(ps);
}
return insertResult;
}

Note: the JobDataMap is serialized and stored in the database as a Blob.

The SQL, defined in StdJDBCConstants, is as follows:

    String INSERT_JOB_DETAIL = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " ("
+ COL_SCHEDULER_NAME + ", " + COL_JOB_NAME
+ ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", "
+ COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", "
+ COL_IS_NONCONCURRENT + ", " + COL_IS_UPDATE_DATA + ", "
+ COL_REQUESTS_RECOVERY + ", "
+ COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";
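To make the Blob note concrete, here is a minimal sketch of the round trip using plain Java serialization. It is not the quartz code itself: toBlob and fromBlob are hypothetical helpers, and a HashMap stands in for the JobDataMap. It also hints at why reading the data back can raise ClassNotFoundException, since every class stored in the map must be loadable when the Blob is deserialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: a map serialized to bytes and read back, as a Blob column would hold it. */
final class BlobRoundTripDemo {

    /** Serializes the map into a byte array (the value that would go into the Blob column). */
    static byte[] toBlob(Map<String, Object> data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(new HashMap<>(data));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    /** Reads the map back; fails if a stored class is missing from the classpath. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> fromBlob(byte[] blob) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class stored in the blob is not loadable", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("favorite color", "Green");
        data.put("count", 1);
        byte[] blob = toBlob(data);
        System.out.println(blob.length + " bytes, equal after round trip: "
                + fromBlob(blob).equals(data));
    }
}
```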

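2.2.2 Querying a JobDetail

To emphasize: because the JobDataMap in a JobDetail is stored in the database as a Blob (the useProperties property can switch this to string storage; it defaults to false, meaning Blob storage), queries need special handling, as in StdJDBCDelegate.java: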

/**
* <p>
* Select the JobDetail object for a given job name / group name.
* </p>
*
* @param conn
* the DB Connection
* @return the populated JobDetail object
* @throws ClassNotFoundException
* if a class found during deserialization cannot be found or if
* the job class could not be found
* @throws IOException
* if deserialization causes an error
*/
public JobDetail selectJobDetail(Connection conn, JobKey jobKey,
ClassLoadHelper loadHelper)
throws ClassNotFoundException, IOException, SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(rtp(SELECT_JOB_DETAIL));
ps.setString(1, jobKey.getName());
ps.setString(2, jobKey.getGroup());
rs = ps.executeQuery();
JobDetailImpl job = null;
if (rs.next()) {
job = new JobDetailImpl();
job.setName(rs.getString(COL_JOB_NAME));
job.setGroup(rs.getString(COL_JOB_GROUP));
job.setDescription(rs.getString(COL_DESCRIPTION));
job.setJobClass( loadHelper.loadClass(rs.getString(COL_JOB_CLASS), Job.class));
job.setDurability(getBoolean(rs, COL_IS_DURABLE));
job.setRequestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY));
Map<?, ?> map = null;
if (canUseProperties()) {
map = getMapFromProperties(rs);
} else {
map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
}
if (null != map) {
job.setJobDataMap(new JobDataMap(map));
}
}
return job;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}
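The canUseProperties() branch above is the string-storage alternative to the Blob. As a rough illustration of what "string storage" means, the sketch below writes a map as name/value text with java.util.Properties and reads it back. This is an assumption-laden stand-in, not the quartz implementation: the helper names are hypothetical, and rejecting non-String values is a choice made for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Sketch only: name/value text storage instead of Java serialization. */
final class PropertiesStorageDemo {

    /** Encodes the map as properties text; only String values are accepted here. */
    static byte[] toBytes(Map<String, ?> data) {
        Properties props = new Properties();
        for (Map.Entry<String, ?> e : data.entrySet()) {
            if (!(e.getValue() instanceof String)) {
                throw new IllegalArgumentException(
                        "value for '" + e.getKey() + "' must be a String");
            }
            props.setProperty(e.getKey(), (String) e.getValue());
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            props.store(baos, null);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return baos.toByteArray();
    }

    /** Decodes properties text back into a String-to-String map. */
    static Map<String, String> fromBytes(byte[] bytes) {
        Properties props = new Properties();
        try {
            props.load(new ByteArrayInputStream(bytes));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            result.put(name, props.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("favorite color", "Green");
        data.put("count", "1"); // numbers must be passed as text in this mode
        System.out.println(fromBytes(toBytes(data)).equals(data));
    }
}
```

The trade-off is readability and independence from class versions on one side, and the loss of typed values on the other.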

2.3 Querying a trigger

    /**
* <p>
* Retrieve the given <code>{@link org.quartz.Trigger}</code>.
* </p>
*
* @return The desired <code>Trigger</code>, or null if there is no
* match.
*/
public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return retrieveTrigger(conn, triggerKey);
}
});
}

protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)
throws JobPersistenceException {
try {
return getDelegate().selectTrigger(conn, key);
} catch (Exception e) {
throw new JobPersistenceException("Couldn't retrieve trigger: "
+ e.getMessage(), e);
}
}

StdJDBCDelegate.java

   /**
* <p>
* Select a trigger.
* </p>
*
* @param conn
* the DB Connection
* @return the <code>{@link org.quartz.Trigger}</code> object
* @throws JobPersistenceException
*/
public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException,
IOException, JobPersistenceException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
OperableTrigger trigger = null;
ps = conn.prepareStatement(rtp(SELECT_TRIGGER));
ps.setString(1, triggerKey.getName());
ps.setString(2, triggerKey.getGroup());
rs = ps.executeQuery();
if (rs.next()) {
String jobName = rs.getString(COL_JOB_NAME);
String jobGroup = rs.getString(COL_JOB_GROUP);
String description = rs.getString(COL_DESCRIPTION);
long nextFireTime = rs.getLong(COL_NEXT_FIRE_TIME);
long prevFireTime = rs.getLong(COL_PREV_FIRE_TIME);
String triggerType = rs.getString(COL_TRIGGER_TYPE);
long startTime = rs.getLong(COL_START_TIME);
long endTime = rs.getLong(COL_END_TIME);
String calendarName = rs.getString(COL_CALENDAR_NAME);
int misFireInstr = rs.getInt(COL_MISFIRE_INSTRUCTION);
int priority = rs.getInt(COL_PRIORITY);
Map<?, ?> map = null;
if (canUseProperties()) {
map = getMapFromProperties(rs);
} else {
map = (Map<?, ?>) getObjectFromBlob(rs, COL_JOB_DATAMAP);
}
Date nft = null;
if (nextFireTime > 0) {
nft = new Date(nextFireTime);
}
Date pft = null;
if (prevFireTime > 0) {
pft = new Date(prevFireTime);
}
Date startTimeD = new Date(startTime);
Date endTimeD = null;
if (endTime > 0) {
endTimeD = new Date(endTime);
}
if (triggerType.equals(TTYPE_BLOB)) {
rs.close();
rs = null;
ps.close();
ps = null;
ps = conn.prepareStatement(rtp(SELECT_BLOB_TRIGGER));
ps.setString(1, triggerKey.getName());
ps.setString(2, triggerKey.getGroup());
rs = ps.executeQuery();
if (rs.next()) {
trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB);
}
}
else {
TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);
if(tDel == null)
throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType);
TriggerPropertyBundle triggerProps = null;
try {
triggerProps = tDel.loadExtendedTriggerProperties(conn, triggerKey);
} catch (IllegalStateException isex) {
if (isTriggerStillPresent(ps)) {
throw isex;
} else {
// QTZ-386 Trigger has been deleted
return null;
}
}
TriggerBuilder<?> tb = newTrigger()
.withDescription(description)
.withPriority(priority)
.startAt(startTimeD)
.endAt(endTimeD)
.withIdentity(triggerKey)
.modifiedByCalendar(calendarName)
.withSchedule(triggerProps.getScheduleBuilder())
.forJob(jobKey(jobName, jobGroup));
if (null != map) {
tb.usingJobData(new JobDataMap(map));
}
trigger = (OperableTrigger) tb.build();
trigger.setMisfireInstruction(misFireInstr);
trigger.setNextFireTime(nft);
trigger.setPreviousFireTime(pft);
setTriggerStateProperties(trigger, triggerProps);
}
}
return trigger;
} finally {
closeResultSet(rs);
closeStatement(ps);
}
}

The SQL executed:

    String SELECT_TRIGGER = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";

As with JobDetail, the Blob issue also applies here, so I will not repeat it.
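The SQL constants shown so far are templates: rtp(...) fills in the table prefix and the scheduler name before the statement is prepared. Below is a minimal sketch of that expansion, assuming the two placeholders are MessageFormat-style {0} and {1}. The template text, the column and table names, the QRTZ_ prefix, and the scheduler name are illustrative values chosen for this example rather than values read from a real configuration.

```java
import java.text.MessageFormat;

/** Sketch only: expanding a two-placeholder SQL template before preparing it. */
final class SqlTemplateDemo {

    // Assumed shape of the template: {0} = table prefix, {1} = quoted scheduler name.
    static final String SELECT_TRIGGER_TEMPLATE =
            "SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1}"
            + " AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?";

    /** Substitutes the prefix and the scheduler name as a quoted SQL literal. */
    static String expand(String template, String tablePrefix, String schedulerName) {
        String literal = "'" + schedulerName.replace("'", "''") + "'";
        return MessageFormat.format(template, tablePrefix, literal);
    }

    public static void main(String[] args) {
        System.out.println(expand(SELECT_TRIGGER_TEMPLATE, "QRTZ_", "MyScheduler"));
    }
}
```

The remaining ? markers are ordinary JDBC bind parameters, which is why the delegate code only calls ps.setString for the trigger name and group.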

2.4 The scheduling thread QuartzSchedulerThread

 /**
* <p>
* The main processing loop of the <code>QuartzSchedulerThread</code>.
* </p>
*/
@Override
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); //1.
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); //2
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
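The loop above is long, so here is a heavily simplified sketch of its rhythm: acquire the triggers due within the idle window, wait until the first one is due, fire the batch, and otherwise idle. It runs on a simulated clock with no threads, database, or quartz classes. Trig, acquire, and run are hypothetical names, and pause handling, misfires, and error paths are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Sketch only: acquire, wait, fire on a simulated clock (milliseconds). */
final class SchedulerLoopSketch {

    /** Hypothetical trigger: a name plus its next fire time. */
    static final class Trig {
        final String name;
        final long nextFireTime;
        Trig(String name, long nextFireTime) {
            this.name = name;
            this.nextFireTime = nextFireTime;
        }
    }

    /** Picks up to maxCount triggers, earliest first, that fall inside the batch window. */
    static List<Trig> acquire(List<Trig> waiting, long now, long idleWaitTime,
                              int maxCount, long timeWindow) {
        List<Trig> sorted = new ArrayList<>(waiting);
        sorted.sort(Comparator.comparingLong((Trig t) -> t.nextFireTime));
        List<Trig> batch = new ArrayList<>();
        long batchEnd = now + idleWaitTime;
        for (Trig t : sorted) {
            if (batch.size() >= maxCount || t.nextFireTime > batchEnd) {
                break;
            }
            if (batch.isEmpty()) {
                // the first acquired trigger narrows the window for the rest of the batch
                batchEnd = Math.max(t.nextFireTime, now) + timeWindow;
            }
            batch.add(t);
        }
        return batch;
    }

    /** Runs until every trigger has fired once; returns "name@fireTime" in firing order. */
    static List<String> run(List<Trig> triggers, long idleWaitTime, int threads, long timeWindow) {
        if (threads < 1 || idleWaitTime <= 0) {
            throw new IllegalArgumentException("need at least one thread and a positive idle wait");
        }
        List<Trig> waiting = new ArrayList<>(triggers);
        List<String> fired = new ArrayList<>();
        long now = 0L;
        while (!waiting.isEmpty()) {
            List<Trig> batch = acquire(waiting, now, idleWaitTime, threads, timeWindow);
            if (batch.isEmpty()) {
                now += idleWaitTime; // nothing due soon: idle, then look again
                continue;
            }
            now = Math.max(now, batch.get(0).nextFireTime); // wait for the first fire time
            for (Trig t : batch) {
                fired.add(t.name + "@" + now); // stands in for handing the job to a worker
                waiting.remove(t);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Trig> triggers = Arrays.asList(new Trig("d", 70000L), new Trig("a", 1000L),
                new Trig("b", 1000L), new Trig("c", 5000L));
        // prints [a@1000, b@1000, c@5000, d@70000]
        System.out.println(run(triggers, 30000L, 10, 0L));
    }
}
```

With a time window of 0, only triggers sharing the first trigger's fire time end up in the same batch, which is why a and b fire together while c waits for the next pass.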

2.4.1 Acquiring triggers (marker //1. in the loop above)

protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException {
if (timeWindow < 0) {
throw new IllegalArgumentException();
}
List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
return acquiredTriggers;
long batchEnd = noLaterThan;
for(TriggerKey triggerKey: keys) {
// If our trigger is no longer available, try a new one.
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
if(nextTrigger == null) {
continue; // next trigger
}
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = nextTrigger.getJobKey();
JobDetail job;
try {
job = retrieveJob(conn, jobKey);
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
} catch (SQLException sqle) {
getLog().error("Unable to set trigger state to ERROR.", sqle);
}
continue;
}
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
continue; // next trigger
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
break;
}
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
if(acquiredTriggers.isEmpty()) {
batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
}
acquiredTriggers.add(nextTrigger);
}
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
// We are done with the while loop.
break;
} catch (Exception e) {
throw new JobPersistenceException(
"Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
// Return the acquired trigger list
return acquiredTriggers;
}
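The key guard in this method is updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING): the trigger is only added to the batch when rowsUpdated is greater than 0. The sketch below imitates that conditional update with an in-memory map in place of the triggers table. It assumes the real statement behaves like "update the state only if it still has the expected old value"; the class and the key format are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Sketch only: a conditional state change, with a map standing in for the table. */
final class TriggerStateCasSketch {

    static final String WAITING = "WAITING";
    static final String ACQUIRED = "ACQUIRED";

    private final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();

    void store(String triggerKey, String state) {
        states.put(triggerKey, state);
    }

    String stateOf(String triggerKey) {
        return states.get(triggerKey);
    }

    /** Returns 1 if the state was changed, 0 if the trigger was not in oldState. */
    int updateTriggerStateFromOtherState(String triggerKey, String newState, String oldState) {
        return states.replace(triggerKey, oldState, newState) ? 1 : 0;
    }

    public static void main(String[] args) {
        TriggerStateCasSketch table = new TriggerStateCasSketch();
        table.store("group1.trigger1", WAITING);
        // two scheduler instances try to acquire the same trigger
        int nodeA = table.updateTriggerStateFromOtherState("group1.trigger1", ACQUIRED, WAITING);
        int nodeB = table.updateTriggerStateFromOtherState("group1.trigger1", ACQUIRED, WAITING);
        System.out.println("node A rows updated: " + nodeA); // 1
        System.out.println("node B rows updated: " + nodeB); // 0
    }
}
```

Whoever loses the race sees 0 rows updated and moves on to the next trigger, which matches the "continue; // next trigger" branch above.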

2.4.2 Firing triggers (marker //2 in the loop above)

 protected TriggerFiredBundle triggerFired(Connection conn,
OperableTrigger trigger)
throws JobPersistenceException {
JobDetail job;
Calendar cal = null;
// Make sure trigger wasn't deleted, paused, or completed...
try {
// if trigger was deleted, state will be STATE_DELETED
String state = getDelegate().selectTriggerState(conn,
trigger.getKey());
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't select trigger state: "
+ e.getMessage(), e);
}
try {
job = retrieveJob(conn, trigger.getJobKey());
if (job == null) { return null; }
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
getDelegate().updateTriggerState(conn, trigger.getKey(),
STATE_ERROR);
} catch (SQLException sqle) {
getLog().error("Unable to set trigger state to ERROR.", sqle);
}
throw jpe;
}
if (trigger.getCalendarName() != null) {
cal = retrieveCalendar(conn, trigger.getCalendarName());
if (cal == null) { return null; }
}
try {
getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't insert fired trigger: "
+ e.getMessage(), e);
}
Date prevFireTime = trigger.getPreviousFireTime();
// call triggered - to update the trigger's next-fire-time state...
trigger.triggered(cal);
String state = STATE_WAITING;
boolean force = true;
if (job.isConcurrentExectionDisallowed()) {
state = STATE_BLOCKED;
force = false;
try {
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_WAITING);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_BLOCKED, STATE_ACQUIRED);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
STATE_PAUSED_BLOCKED, STATE_PAUSED);
} catch (SQLException e) {
throw new JobPersistenceException(
"Couldn't update states of blocked triggers: "
+ e.getMessage(), e);
}
}
if (trigger.getNextFireTime() == null) {
state = STATE_COMPLETE;
force = true;
}
storeTrigger(conn, trigger, job, true, state, force, false);
job.getJobDataMap().clearDirtyFlag();
return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
.equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}
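The state that triggerFired writes back depends on just two facts: whether the job disallows concurrent execution, and whether the trigger has another fire time. The following sketch condenses that decision into one function. The function name is hypothetical, and the state names are plain strings mirroring the constants listed in section 1.2.

```java
/** Sketch only: the state a trigger is stored with after it has fired. */
final class StateAfterFiring {

    static String stateAfterFiring(boolean concurrentExecutionDisallowed, boolean hasNextFireTime) {
        String state = "WAITING";
        if (concurrentExecutionDisallowed) {
            state = "BLOCKED"; // held back until the running execution finishes
        }
        if (!hasNextFireTime) {
            state = "COMPLETE"; // checked last, so it overrides BLOCKED
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFiring(false, true));  // WAITING
        System.out.println(stateAfterFiring(true, true));   // BLOCKED
        System.out.println(stateAfterFiring(true, false));  // COMPLETE
        System.out.println(stateAfterFiring(false, false)); // COMPLETE
    }
}
```

For the trigger1 example from section 1.2.1 (start time plus 4 repeats), the fifth firing leaves no next fire time, so the trigger ends up COMPLETE.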

2.4.3 Database locks

StdRowLockSemaphore is for databases that support select for update, such as mysql

UpdateLockRowSemaphore is for databases that do not support select for update, such as mssqlserver

StdRowLockSemaphore is implemented as follows:

    public static final String SELECT_FOR_LOCK = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
+ SCHED_NAME_SUBST + ", ?)";
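To show how a SELECT ... FOR UPDATE row acts as a lock, here is a minimal JDBC sketch. It is an illustration under assumptions, not the StdRowLockSemaphore code: the helper names are hypothetical, the table and column names (LOCKS, SCHED_NAME, LOCK_NAME) are assumed, the scheduler name is passed as a bind parameter rather than substituted into the SQL, and a missing lock row is treated as an error instead of being inserted.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Sketch only: hold a row lock for the duration of one transaction. */
final class RowLockSketch {

    /** Builds the locking query for the given table prefix (assumed naming). */
    static String selectForLockSql(String tablePrefix) {
        return "SELECT * FROM " + tablePrefix
                + "LOCKS WHERE SCHED_NAME = ? AND LOCK_NAME = ? FOR UPDATE";
    }

    /** Locks the row, runs the work, then commits, which releases the lock. */
    static void runWithRowLock(Connection conn, String tablePrefix, String schedName,
                               String lockName, Runnable work) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // the lock only lives inside a transaction
        try (PreparedStatement ps = conn.prepareStatement(selectForLockSql(tablePrefix))) {
            ps.setString(1, schedName);
            ps.setString(2, lockName);
            try (ResultSet rs = ps.executeQuery()) { // blocks while another holder has the row
                if (!rs.next()) {
                    throw new SQLException("no lock row named " + lockName);
                }
            }
            work.run();
            conn.commit();
        } catch (SQLException | RuntimeException e) {
            conn.rollback(); // releases the lock without applying the work
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }
}
```

Because the lock is released by commit or rollback, every scheduler instance that shares the database serializes on the same row, which is what makes this approach usable in a cluster.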

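Summary:

1. quartz has three core components: Job, trigger, and scheduler. The job holds the business logic, the trigger decides when to run, and the scheduler coordinates jobs and triggers to execute them.

2. With mysql as the store, StdJDBCDelegate handles the interaction with the database, and the SQL it uses is defined in StdJDBCConstants.

3. QuartzScheduler is the core class and Scheduler acts as its proxy; the actual scheduling work is carried out by QuartzSchedulerThread.

4. JobStore controls storage. JobStoreSupport has two implementations: JobStoreCMT relies on container-managed transactions and does not need to call commit or rollback itself; JobStoreTX is used in standalone environments and has to handle commit and rollback itself.

5. Database locking uses a pessimistic lock, select for update, which quartz models as a Semaphore.

6. The qrtz_scheduler_state table holds the cluster scan (check-in) interval.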

References:

[1] https://www.baeldung.com/spring-quartz-schedule

[2] https://blog.csdn.net/xiaojin21cen/article/details/79298883