1、quartz任务调度的基本实现原理
quartz是opensymphony开源组织在任务调度领域的一个开源项目,完全基于java实现。作为一个优秀的开源调度框架,quartz具有以下特点:
(1)强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;
(2)灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
(3)分布式和集群能力,terracotta收购后在原来功能基础上作了进一步提升。本文将对该部分相加阐述。
1.1 quartz 核心元素
quartz任务调度的核心元素为:scheduler——任务调度器、trigger——触发器、job——任务。其中trigger和job是任务调度的元数据,scheduler是实际执行调度的控制器。
trigger是用于定义调度时间的元素,即按照什么时间规则去执行任务。quartz中主要提供了四种类型的trigger:simpletrigger,crontirgger,dateintervaltrigger,和nthincludeddaytrigger。这四种trigger可以满足企业应用中的绝大部分需求。
job用于表示被调度的任务。主要有两种类型的job:无状态的(stateless)和有状态的(stateful)。对于同一个trigger来说,有状态的job不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。job主要有两种属性:volatility和durability,其中volatility表示任务是否被持久化到数据库存储,而durability表示在没有trigger关联的时候任务是否被保留。两者都是在值为true的时候任务被持久化或保留。一个job可以被多个trigger关联,但是一个trigger只能关联一个job。
scheduler由scheduler工厂创建:directschedulerfactory或者stdschedulerfactory。第二种工厂stdschedulerfactory使用较多,因为directschedulerfactory使用起来不够方便,需要作许多详细的手工编码设置。scheduler主要有三种:remotembeanscheduler,remotescheduler和stdscheduler。
quartz核心元素之间的关系如图1.1所示:
图1.1 核心元素关系图
1.2 quartz 线程视图
在quartz中,有两类线程,scheduler调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。
图1.2 quartz线程视图
scheduler调度线程主要有两个:执行常规调度的线程,和执行misfiredtrigger的线程。常规调度线程轮询存储的所有trigger,如果有需要触发的trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务。misfire线程是扫描所有的trigger,查看是否有misfiredtrigger,如果有的话根据misfire的策略分别处理(fire now or wait for the next fire)。
1.3 quartz job数据存储
quartz中的trigger和job需要存储下来才能被使用。quartz中有两种存储方式:ramjobstore,jobstoresupport,其中ramjobstore是将trigger和job存储在内存中,而jobstoresupport是基于jdbc将trigger和job存储到数据库中。ramjobstore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用jobstoresupport。
2、quartz集群原理2.1 quartz 集群架构
一个quartz集群中的每个节点是一个独立的quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。quartz集群中,独立的quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一quartz应用的,如图2.1所示。
图2.1 quartz集群架构
2.2 quartz集群相关数据库表
因为quartz集群依赖于数据库,所以必须首先创建quartz数据库表,quartz发布包中包括了所有被支持的数据库平台的sql脚本。这些sql脚本存放于<quartz_home>/docs/dbtables 目录下。这里采用的quartz 1.8.4版本,总共12张表,不同版本,表个数可能不同。数据库为mysql,用tables_mysql.sql创建数据库表。全部表如图2.2所示,对这些表的简要介绍如图2.3所示。
图2.2 quartz 1.8.4在mysql数据库中生成的表
图2.3 quartz数据表简介
2.2.1 调度器状态表(qrtz_scheduler_state)
说明:集群中节点实例信息,quartz定时读取该表的信息判断集群中每个实例的当前状态。
instance_name:配置文件中org.quartz.scheduler.instanceid配置的名字,如果设置为auto,quartz会根据物理机名和当前时间产生一个名字。
last_checkin_time:上次检入时间
checkin_interval:检入间隔时间
2.2.2 触发器与任务关联表(qrtz_fired_triggers)
存储与已触发的trigger相关的状态信息,以及相联job的执行信息。
2.2.3 触发器信息表(qrtz_triggers)
trigger_name:trigger的名字,该名字用户自己可以随意定制,无强行要求
trigger_group:trigger所属组的名字,该名字用户自己随意定制,无强行要求
job_name:qrtz_job_details表job_name的外键
job_group:qrtz_job_details表job_group的外键
trigger_state:当前trigger状态设置为acquired,如果设为waiting,则job不会触发
trigger_cron:触发器类型,使用cron表达式
2.2.4 任务详细信息表(qrtz_job_details)
说明:保存job详细信息,该表需要用户根据实际情况初始化
job_name:集群中job的名字,该名字用户自己可以随意定制,无强行要求。
job_group:集群中job的所属组的名字,该名字用户自己随意定制,无强行要求。
job_class_name:集群中job实现类的完全包名,quartz就是根据这个路径到classpath找到该job类的。
is_durable:是否持久化,把该属性设置为1,quartz会把job持久化到数据库中
job_data:一个blob字段,存放持久化job对象。
2.2.5权限信息表(qrtz_locks)
说明:tables_oracle.sql里有相应的dml初始化,如图2.4所示。
图2.4 quartz权限信息表中的初始化信息
2.3 quartz scheduler在集群中的启动流程
quartz scheduler自身是察觉不到被集群的,只有配置给scheduler的jdbc jobstore才知道。当quartz scheduler启动时,它调用jobstore的schedulerstarted()方法,它告诉jobstore scheduler已经启动了。schedulerstarted() 方法是在jobstoresupport类中实现的。jobstoresupport类会根据quartz.properties文件中的设置来确定scheduler实例是否参与到集群中。假如配置了集群,一个新的clustermanager类的实例就被创建、初始化并启动。clustermanager是在jobstoresupport类中的一个内嵌类,继承了java.lang.thread,它会定期运行,并对scheduler实例执行检入的功能。scheduler也要查看是否有任何一个别的集群节点失败了。检入操作执行周期在quartz.properties中配置。
2.4 侦测失败的scheduler节点
当一个scheduler实例执行检入时,它会查看是否有其他的scheduler实例在到达他们所预期的时间还未检入。这是通过检查scheduler_state表中scheduler记录在last_chedk_time列的值是否早于org.quartz.jobstore.clustercheckininterval来确定的。如果一个或多个节点到了预定时间还没有检入,那么运行中的scheduler就假定它(们) 失败了。
2.5 从故障实例中恢复job
当一个sheduler实例在执行某个job时失败了,有可能由另一正常工作的scheduler实例接过这个job重新运行。要实现这种行为,配置给jobdetail对象的job可恢复属性必须设置为true(job.setrequestsrecovery(true))。如果可恢复属性被设置为false(默认为false),当某个scheduler在运行该job失败时,它将不会重新运行;而是由另一个scheduler实例在下一次触发时间触发。scheduler实例出现故障后多快能被侦测到取决于每个scheduler的检入间隔(即2.3中提到的org.quartz.jobstore.clustercheckininterval)。
3、quartz集群实例(quartz+spring)
3.1 spring不兼容quartz问题
spring从2.0.2开始便不再支持quartz。具体表现在quartz+spring把quartz的task实例化进入数据库时,会产生:serializable的错误:
1
2
3
4
5
6
7
8
|
<bean id= "jobtask" class = "org.springframework.scheduling.quartz. methodinvokingjobdetailfactorybean " >
<property name= "targetobject" >
<ref bean= "quartzjob" />
</property>
<property name= "targetmethod" >
<value>execute</value>
</property>
</bean>
|
这个methodinvokingjobdetailfactorybean类中的methodinvoking方法,是不支持序列化的,因此在把quartz的task序列化进入数据库时就会抛错。
首先解决methodinvokingjobdetailfactorybean的问题,在不修改spring源码的情况下,可以避免使用这个类,直接调用jobdetail。但是使用jobdetail实现,需要自己实现mothodinvoking的逻辑,可以使用jobdetail的jobclass和jobdataasmap属性来自定义一个factory(manager)来实现同样的目的。例如,本示例中新建了一个mydetailquartzjobbean来实现这个功能。
3.2 mydetailquartzjobbean.java文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package org.lxh.mvc.jobbean;
import java.lang.reflect.method;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
import org.springframework.context.applicationcontext;
import org.springframework.scheduling.quartz.quartzjobbean;
public class mydetailquartzjobbean extends quartzjobbean {
protected final log logger = logfactory.getlog(getclass());
private string targetobject;
private string targetmethod;
private applicationcontext ctx;
protected void executeinternal(jobexecutioncontext context) throws jobexecutionexception {
try {
logger.info( "execute [" + targetobject + "] at once>>>>>>" );
object otargetobject = ctx.getbean(targetobject);
method m = null ;
try {
m = otargetobject.getclass().getmethod(targetmethod, new class [] {});
m.invoke(otargetobject, new object[] {});
} catch (securityexception e) {
logger.error(e);
} catch (nosuchmethodexception e) {
logger.error(e);
}
} catch (exception e) {
throw new jobexecutionexception(e);
}
}
public void setapplicationcontext(applicationcontext applicationcontext){
this .ctx=applicationcontext;
}
public void settargetobject(string targetobject) {
this .targetobject = targetobject;
}
public void settargetmethod(string targetmethod) {
this .targetmethod = targetmethod;
}
}
|
3.3真正的job实现类
在test类中,只是简单实现了打印系统当前时间的功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
package org.lxh.mvc.job;
import java.io.serializable;
import java.util.date;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
public class test implements serializable{
private log logger = logfactory.getlog(test. class );
private static final long serialversionuid = -2073310586499744415l;
public void execute () {
date date= new date();
system.out.println(date.tolocalestring());
}
}
|
3.4 配置quartz.xml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
<bean id= "test" class = "org.lxh.mvc.job.test" scope= "prototype" >
</bean>
<bean id= "testjobtask" class = "org.springframework.scheduling.quartz.jobdetailbean" >
<property name= "jobclass" >
<value>org.lxh.mvc.jobbean.mydetailquartzjobbean</value>
</property>
<property name= "jobdataasmap" >
<map>
<entry key= "targetobject" value= "test" />
<entry key= "targetmethod" value= "execute" />
</map>
</property>
</bean>
<bean name= "testtrigger" class = "org.springframework.scheduling.quartz.crontriggerbean" >
<property name= "jobdetail" ref= "testjobtask" />
<property name= "cronexpression" value= "0/1 * * * * ?" />
</bean>
<bean id= "quartzscheduler"
class = "org.springframework.scheduling.quartz.schedulerfactorybean" >
<property name= "configlocation" value= "classpath:quartz.properties" />
<property name= "triggers" >
<list>
<ref bean= "testtrigger" />
</list>
</property>
<property name= "applicationcontextschedulercontextkey" value= "applicationcontext" />
</bean>
|
3.5 测试
servera、serverb的代码、配置完全一样,先启动servera,后启动serverb,当server关断之后,serverb会监测到其关闭,并将servera上正在执行的job接管,继续执行。
4、quartz集群实例(单独quartz)
尽管我们已经实现了spring+quartz的集群配置,但是因为spring与quartz之间的兼容问题还是不建议使用该方式。在本小节中,我们实现了单独用quartz配置的集群,相对spring+quartz的方式来说,简单、稳定。
4.1 工程结构
我们采用单独使用quartz来实现其集群功能,代码结构及所需的第三方jar包如图3.1所示。其中,mysql版本:5.1.52,mysql驱动版本:mysql-connector-java-5.1.5-bin.jar(针对于5.1.52,建议采用该版本驱动,因为quartz存在bug使得其与某些mysql驱动结合时不能正常运行)。
图4.1 quartz集群工程结构及所需第三方jar包
其中quartz.properties为quartz配置文件,放在src目录下,若无该文件,quartz将自动加载jar包中的quartz.properties文件;simplerecoveryjob.java、simplerecoverystatefuljob.java为两个job;clusterexample.java中编写了调度信息、触发机制及相应的测试main函数。
4.2 配置文件quartz.properties
默认文件名称quartz.properties,通过设置"org.quartz.jobstore.isclustered"属性为"true"来激活集群特性。在集群中的每一个实例都必须有一个唯一的"instance id" ("org.quartz.scheduler.instanceid" 属性), 但是应该有相同的"scheduler instance name" ("org.quartz.scheduler.instancename"),也就是说集群中的每一个实例都必须使用相同的quartz.properties 配置文件。除了以下几种例外,配置文件的内容其他都必须相同:
a.线程池大小。
b.不同的"org.quartz.scheduler.instanceid"属性值(通过设定为"auto"即可)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
#==============================================================
#configure main scheduler properties
#==============================================================
org.quartz.scheduler.instancename = quartzscheduler
org.quartz.scheduler.instanceid = auto
#==============================================================
#configure jobstore
#==============================================================
org.quartz.jobstore. class = org.quartz.impl.jdbcjobstore.jobstoretx
org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.stdjdbcdelegate
org.quartz.jobstore.tableprefix = qrtz_
org.quartz.jobstore.isclustered = true
org.quartz.jobstore.clustercheckininterval = 10000
org.quartz.jobstore.datasource = myds
#==============================================================
#configure datasource
#==============================================================
org.quartz.datasource.myds.driver = com.mysql.jdbc.driver
org.quartz.datasource.myds.url = jdbc:mysql: //192.168.31.18:3306/test?useunicode=true&characterencoding=utf-8
org.quartz.datasource.myds.user = root
org.quartz.datasource.myds.password = 123456
org.quartz.datasource.myds.maxconnections = 30
#==============================================================
#configure threadpool
#==============================================================
org.quartz.threadpool. class = org.quartz.simpl.simplethreadpool
org.quartz.threadpool.threadcount = 5
org.quartz.threadpool.threadpriority = 5
org.quartz.threadpool.threadsinheritcontextclassloaderofinitializingthread = true
|
4.3 clusterexample.java文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package cluster;
import java.util.date;
import org.quartz.jobdetail;
import org.quartz.scheduler;
import org.quartz.schedulerfactory;
import org.quartz.simpletrigger;
import org.quartz.impl.stdschedulerfactory;
public class clusterexample {
public void cleanup(scheduler inscheduler) throws exception {
system.out.println( "***** deleting existing jobs/triggers *****" );
// unschedule jobs
string[] groups = inscheduler.gettriggergroupnames();
for ( int i = 0 ; i < groups.length; i++) {
string[] names = inscheduler.gettriggernames(groups[i]);
for ( int j = 0 ; j < names.length; j++) {
inscheduler.unschedulejob(names[j], groups[i]);
}
}
// delete jobs
groups = inscheduler.getjobgroupnames();
for ( int i = 0 ; i < groups.length; i++) {
string[] names = inscheduler.getjobnames(groups[i]);
for ( int j = 0 ; j < names.length; j++) {
inscheduler.deletejob(names[j], groups[i]);
}
}
}
public void run( boolean inclearjobs, boolean inschedulejobs)
throws exception {
// first we must get a reference to a scheduler
schedulerfactory sf = new stdschedulerfactory();
scheduler sched = sf.getscheduler();
if (inclearjobs) {
cleanup(sched);
}
system.out.println( "------- initialization complete -----------" );
if (inschedulejobs) {
system.out.println( "------- scheduling jobs ------------------" );
string schedid = sched.getschedulerinstanceid();
int count = 1 ;
jobdetail job = new jobdetail( "job_" + count, schedid, simplerecoveryjob. class );
// ask scheduler to re-execute this job if it was in progress when
// the scheduler went down...
job.setrequestsrecovery( true );
simpletrigger trigger =
new simpletrigger( "triger_" + count, schedid, 200 , 1000l);
trigger.setstarttime( new date(system.currenttimemillis() + 1000l));
system.out.println(job.getfullname() +
" will run at: " + trigger.getnextfiretime() +
" and repeat: " + trigger.getrepeatcount() +
" times, every " + trigger.getrepeatinterval() / 1000 + " seconds" );
sched.schedulejob(job, trigger);
count++;
job = new jobdetail( "job_" + count, schedid,
simplerecoverystatefuljob. class );
// ask scheduler to re-execute this job if it was in progress when
// the scheduler went down...
job.setrequestsrecovery( false );
trigger = new simpletrigger( "trig_" + count, schedid, 100 , 2000l);
trigger.setstarttime( new date(system.currenttimemillis() + 2000l));
system.out.println(job.getfullname() +
" will run at: " + trigger.getnextfiretime() +
" and repeat: " + trigger.getrepeatcount() +
" times, every " + trigger.getrepeatinterval() / 1000 + " seconds" );
sched.schedulejob(job, trigger);
}
// jobs don't start firing until start() has been called...
system.out.println( "------- starting scheduler ---------------" );
sched.start();
system.out.println( "------- started scheduler ----------------" );
system.out.println( "------- waiting for one hour... ----------" );
try {
thread.sleep(3600l * 1000l);
} catch (exception e) {
}
system.out.println( "------- shutting down --------------------" );
sched.shutdown();
system.out.println( "------- shutdown complete ----------------" );
}
public static void main(string[] args) throws exception {
boolean clearjobs = true ;
boolean schedulejobs = true ;
for ( int i = 0 ; i < args.length; i++) {
if (args[i].equalsignorecase( "clearjobs" )) {
clearjobs = true ;
} else if (args[i].equalsignorecase( "dontschedulejobs" )) {
schedulejobs = false ;
}
}
clusterexample example = new clusterexample();
example.run(clearjobs, schedulejobs);
}
}
|
4.4 simplerecoveryjob.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package cluster;
import java.io.serializable;
import java.util.date;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.quartz.job;
import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
//如果有想反复执行的动作,作业,任务就把相关的代码写在execute这个方法里,前提:实现job这个接口
//至于simplejob这个类什么时候实例化,execute这个方法何时被调用,我们不用关注,交给quartz
public class simplerecoveryjob implements job, serializable {
private static log _log = logfactory.getlog(simplerecoveryjob. class );
public simplerecoveryjob() {
}
public void execute(jobexecutioncontext context)
throws jobexecutionexception {
//这个作业只是简单的打印出作业名字和此作业运行的时间
string jobname = context.getjobdetail().getfullname();
system.out.println( "job 1111111111111111111 simplerecoveryjob says: " + jobname + " executing at " + new date());
}
}
|
4.5 运行结果
server a与server b中的配置和代码完全一样。运行方法:运行任意主机上的clusterexample.java,将任务加入调度,观察运行结果:
运行servera,结果如图4.2所示。
图4.2 servera运行结果1
开启serverb后,servera与serverb的输出如图4.3、4.4所示。
图4.3 servera运行结果2
图4.4 serverb运行结果1
从图4.3、4.4可以看出,serverb开启后,系统自动实现了负责均衡,serverb接手job1。关断servera后,serverb的运行结果如图4.5所示。
图4.5 serverb运行结果2
从图4.5中可以看出,serverb可以检测出servera丢失,将其负责的任务job2接手,并将servera丢失到server检测出这段异常时间中需要执行的job2重新执行了。
5、注意事项
5.1 时间同步问题
quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。
节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的job重运行。最简单的同步计算机时钟的方式是使用某一个internet时间服务器(internet time server its)。
5.2 节点争抢job问题
因为quartz使用了一个随机的负载均衡算法, job以随机的方式由不同的实例执行。quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 job 到集群中特定的节点。
5.3 从集群获取job列表问题
当前,如果不直接进到数据库查询的话,还没有一个简单的方式来得到集群中所有正在执行的job列表。请求一个scheduler实例,将只能得到在那个实例上正运行job的列表。quartz官网建议可以通过写一些访问数据库jdbc代码来从相应的表中获取全部的job信息。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。
原文链接:http://www.cnblogs.com/zhenyuyaodidiao/p/4755649.html