Quartz集群原理以及配置应用的方法详解

时间:2022-01-29 07:53:01

1、quartz任务调度的基本实现原理

  quartz是opensymphony开源组织在任务调度领域的一个开源项目,完全基于java实现。作为一个优秀的开源调度框架,quartz具有以下特点:

    (1)强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;

    (2)灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;

    (3)分布式和集群能力,terracotta收购后在原来功能基础上作了进一步提升。本文将对该部分相加阐述。

1.1 quartz 核心元素

  quartz任务调度的核心元素为:scheduler——任务调度器、trigger——触发器、job——任务。其中trigger和job是任务调度的元数据,scheduler是实际执行调度的控制器。

  trigger是用于定义调度时间的元素,即按照什么时间规则去执行任务。quartz中主要提供了四种类型的trigger:simpletrigger,crontirgger,dateintervaltrigger,和nthincludeddaytrigger。这四种trigger可以满足企业应用中的绝大部分需求。

  job用于表示被调度的任务。主要有两种类型的job:无状态的(stateless)和有状态的(stateful)。对于同一个trigger来说,有状态的job不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。job主要有两种属性:volatility和durability,其中volatility表示任务是否被持久化到数据库存储,而durability表示在没有trigger关联的时候任务是否被保留。两者都是在值为true的时候任务被持久化或保留。一个job可以被多个trigger关联,但是一个trigger只能关联一个job。

  scheduler由scheduler工厂创建:directschedulerfactory或者stdschedulerfactory。第二种工厂stdschedulerfactory使用较多,因为directschedulerfactory使用起来不够方便,需要作许多详细的手工编码设置。scheduler主要有三种:remotembeanscheduler,remotescheduler和stdscheduler。

  quartz核心元素之间的关系如图1.1所示:

Quartz集群原理以及配置应用的方法详解

图1.1 核心元素关系图

1.2 quartz 线程视图

  在quartz中,有两类线程,scheduler调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。

Quartz集群原理以及配置应用的方法详解

图1.2 quartz线程视图

  scheduler调度线程主要有两个:执行常规调度的线程,和执行misfiredtrigger的线程。常规调度线程轮询存储的所有trigger,如果有需要触发的trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该trigger关联的任务。misfire线程是扫描所有的trigger,查看是否有misfiredtrigger,如果有的话根据misfire的策略分别处理(fire now or wait for the next fire)。

1.3 quartz job数据存储

  quartz中的trigger和job需要存储下来才能被使用。quartz中有两种存储方式:ramjobstore,jobstoresupport,其中ramjobstore是将trigger和job存储在内存中,而jobstoresupport是基于jdbc将trigger和job存储到数据库中。ramjobstore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用jobstoresupport。

2、quartz集群原理2.1 quartz 集群架构

  一个quartz集群中的每个节点是一个独立的quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。quartz集群中,独立的quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一quartz应用的,如图2.1所示。

Quartz集群原理以及配置应用的方法详解

图2.1 quartz集群架构

2.2 quartz集群相关数据库表

  因为quartz集群依赖于数据库,所以必须首先创建quartz数据库表,quartz发布包中包括了所有被支持的数据库平台的sql脚本。这些sql脚本存放于<quartz_home>/docs/dbtables 目录下。这里采用的quartz 1.8.4版本,总共12张表,不同版本,表个数可能不同。数据库为mysql,用tables_mysql.sql创建数据库表。全部表如图2.2所示,对这些表的简要介绍如图2.3所示。

Quartz集群原理以及配置应用的方法详解

图2.2 quartz 1.8.4在mysql数据库中生成的表

Quartz集群原理以及配置应用的方法详解

图2.3 quartz数据表简介

2.2.1 调度器状态表(qrtz_scheduler_state)

  说明:集群中节点实例信息,quartz定时读取该表的信息判断集群中每个实例的当前状态。

  instance_name:配置文件中org.quartz.scheduler.instanceid配置的名字,如果设置为auto,quartz会根据物理机名和当前时间产生一个名字。

  last_checkin_time:上次检入时间

  checkin_interval:检入间隔时间

2.2.2 触发器与任务关联表(qrtz_fired_triggers)

  存储与已触发的trigger相关的状态信息,以及相联job的执行信息。

2.2.3 触发器信息表(qrtz_triggers)

  trigger_name:trigger的名字,该名字用户自己可以随意定制,无强行要求

  trigger_group:trigger所属组的名字,该名字用户自己随意定制,无强行要求

  job_name:qrtz_job_details表job_name的外键

  job_group:qrtz_job_details表job_group的外键

  trigger_state:当前trigger状态设置为acquired,如果设为waiting,则job不会触发

  trigger_cron:触发器类型,使用cron表达式

2.2.4 任务详细信息表(qrtz_job_details)

  说明:保存job详细信息,该表需要用户根据实际情况初始化

  job_name:集群中job的名字,该名字用户自己可以随意定制,无强行要求。

  job_group:集群中job的所属组的名字,该名字用户自己随意定制,无强行要求。

  job_class_name:集群中job实现类的完全包名,quartz就是根据这个路径到classpath找到该job类的。

  is_durable:是否持久化,把该属性设置为1,quartz会把job持久化到数据库中

  job_data:一个blob字段,存放持久化job对象。

2.2.5权限信息表(qrtz_locks)

  说明:tables_oracle.sql里有相应的dml初始化,如图2.4所示。

Quartz集群原理以及配置应用的方法详解

图2.4 quartz权限信息表中的初始化信息

2.3 quartz scheduler在集群中的启动流程

  quartz scheduler自身是察觉不到被集群的,只有配置给scheduler的jdbc jobstore才知道。当quartz scheduler启动时,它调用jobstore的schedulerstarted()方法,它告诉jobstore scheduler已经启动了。schedulerstarted() 方法是在jobstoresupport类中实现的。jobstoresupport类会根据quartz.properties文件中的设置来确定scheduler实例是否参与到集群中。假如配置了集群,一个新的clustermanager类的实例就被创建、初始化并启动。clustermanager是在jobstoresupport类中的一个内嵌类,继承了java.lang.thread,它会定期运行,并对scheduler实例执行检入的功能。scheduler也要查看是否有任何一个别的集群节点失败了。检入操作执行周期在quartz.properties中配置。

2.4 侦测失败的scheduler节点

  当一个scheduler实例执行检入时,它会查看是否有其他的scheduler实例在到达他们所预期的时间还未检入。这是通过检查scheduler_state表中scheduler记录在last_chedk_time列的值是否早于org.quartz.jobstore.clustercheckininterval来确定的。如果一个或多个节点到了预定时间还没有检入,那么运行中的scheduler就假定它(们) 失败了。

2.5 从故障实例中恢复job

  当一个sheduler实例在执行某个job时失败了,有可能由另一正常工作的scheduler实例接过这个job重新运行。要实现这种行为,配置给jobdetail对象的job可恢复属性必须设置为true(job.setrequestsrecovery(true))。如果可恢复属性被设置为false(默认为false),当某个scheduler在运行该job失败时,它将不会重新运行;而是由另一个scheduler实例在下一次触发时间触发。scheduler实例出现故障后多快能被侦测到取决于每个scheduler的检入间隔(即2.3中提到的org.quartz.jobstore.clustercheckininterval)。

3、quartz集群实例(quartz+spring)

3.1 spring不兼容quartz问题

  spring从2.0.2开始便不再支持quartz。具体表现在quartz+spring把quartz的task实例化进入数据库时,会产生:serializable的错误:

?
1
2
3
4
5
6
7
8
<bean id="jobtask" class="org.springframework.scheduling.quartz. methodinvokingjobdetailfactorybean ">
  <property name="targetobject">
    <ref bean="quartzjob"/>
  </property>
  <property name="targetmethod">
    <value>execute</value>
  </property>
</bean>

  这个methodinvokingjobdetailfactorybean类中的methodinvoking方法,是不支持序列化的,因此在把quartz的task序列化进入数据库时就会抛错。

  首先解决methodinvokingjobdetailfactorybean的问题,在不修改spring源码的情况下,可以避免使用这个类,直接调用jobdetail。但是使用jobdetail实现,需要自己实现mothodinvoking的逻辑,可以使用jobdetail的jobclass和jobdataasmap属性来自定义一个factory(manager)来实现同样的目的。例如,本示例中新建了一个mydetailquartzjobbean来实现这个功能。

3.2 mydetailquartzjobbean.java文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package org.lxh.mvc.jobbean;
import java.lang.reflect.method;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
import org.springframework.context.applicationcontext;
import org.springframework.scheduling.quartz.quartzjobbean;
public class mydetailquartzjobbean extends quartzjobbean {
 protected final log logger = logfactory.getlog(getclass());
 private string targetobject;
 private string targetmethod;
 private applicationcontext ctx;
 protected void executeinternal(jobexecutioncontext context) throws jobexecutionexception {
  try {
   logger.info("execute [" + targetobject + "] at once>>>>>>");
   object otargetobject = ctx.getbean(targetobject);
   method m = null;
   try {
    m = otargetobject.getclass().getmethod(targetmethod, new class[] {});
    m.invoke(otargetobject, new object[] {});
   } catch (securityexception e) {
    logger.error(e);
   } catch (nosuchmethodexception e) {
    logger.error(e);
   }
  } catch (exception e) {
   throw new jobexecutionexception(e);
  }
 }
 
 public void setapplicationcontext(applicationcontext applicationcontext){
  this.ctx=applicationcontext;
 }
 
 public void settargetobject(string targetobject) {
  this.targetobject = targetobject;
 }
 
 public void settargetmethod(string targetmethod) {
  this.targetmethod = targetmethod;
 }
}

3.3真正的job实现类

  在test类中,只是简单实现了打印系统当前时间的功能。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
package org.lxh.mvc.job;
import java.io.serializable;
import java.util.date;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
public class test implements serializable{
 private log logger = logfactory.getlog(test.class);
 private static final long serialversionuid = -2073310586499744415l;
 public void execute () {
  date date=new date();
  system.out.println(date.tolocalestring());
 }
}

3.4 配置quartz.xml文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<bean id="test" class="org.lxh.mvc.job.test" scope="prototype">
 </bean>
 
 <bean id="testjobtask" class="org.springframework.scheduling.quartz.jobdetailbean">
  <property name="jobclass">
<value>org.lxh.mvc.jobbean.mydetailquartzjobbean</value>
  </property>
  <property name="jobdataasmap">
   <map>
    <entry key="targetobject" value="test" />
    <entry key="targetmethod" value="execute" />
    </map>
   </property>
  </bean>
 
 <bean name="testtrigger" class="org.springframework.scheduling.quartz.crontriggerbean">
  <property name="jobdetail" ref="testjobtask" />
  <property name="cronexpression" value="0/1 * * * * ?" />
 </bean>
<bean id="quartzscheduler"
 class="org.springframework.scheduling.quartz.schedulerfactorybean">
  <property name="configlocation" value="classpath:quartz.properties"/>
  <property name="triggers">
   <list>
    <ref bean="testtrigger" />
   </list>
  </property>
  <property name="applicationcontextschedulercontextkey" value="applicationcontext" />
 </bean>

3.5 测试

  servera、serverb的代码、配置完全一样,先启动servera,后启动serverb,当server关断之后,serverb会监测到其关闭,并将servera上正在执行的job接管,继续执行。

4、quartz集群实例(单独quartz)

  尽管我们已经实现了spring+quartz的集群配置,但是因为spring与quartz之间的兼容问题还是不建议使用该方式。在本小节中,我们实现了单独用quartz配置的集群,相对spring+quartz的方式来说,简单、稳定。

4.1 工程结构

  我们采用单独使用quartz来实现其集群功能,代码结构及所需的第三方jar包如图3.1所示。其中,mysql版本:5.1.52,mysql驱动版本:mysql-connector-java-5.1.5-bin.jar(针对于5.1.52,建议采用该版本驱动,因为quartz存在bug使得其与某些mysql驱动结合时不能正常运行)。

Quartz集群原理以及配置应用的方法详解

图4.1 quartz集群工程结构及所需第三方jar包

  其中quartz.properties为quartz配置文件,放在src目录下,若无该文件,quartz将自动加载jar包中的quartz.properties文件;simplerecoveryjob.java、simplerecoverystatefuljob.java为两个job;clusterexample.java中编写了调度信息、触发机制及相应的测试main函数。

4.2 配置文件quartz.properties

  默认文件名称quartz.properties,通过设置"org.quartz.jobstore.isclustered"属性为"true"来激活集群特性。在集群中的每一个实例都必须有一个唯一的"instance id" ("org.quartz.scheduler.instanceid" 属性), 但是应该有相同的"scheduler instance name" ("org.quartz.scheduler.instancename"),也就是说集群中的每一个实例都必须使用相同的quartz.properties 配置文件。除了以下几种例外,配置文件的内容其他都必须相同:

  a.线程池大小。

  b.不同的"org.quartz.scheduler.instanceid"属性值(通过设定为"auto"即可)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#==============================================================
#configure main scheduler properties
#==============================================================
org.quartz.scheduler.instancename = quartzscheduler
org.quartz.scheduler.instanceid = auto
 
#==============================================================
#configure jobstore
#==============================================================
org.quartz.jobstore.class = org.quartz.impl.jdbcjobstore.jobstoretx
org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.stdjdbcdelegate
org.quartz.jobstore.tableprefix = qrtz_
org.quartz.jobstore.isclustered = true
org.quartz.jobstore.clustercheckininterval = 10000
org.quartz.jobstore.datasource = myds
 
#==============================================================
#configure datasource
#==============================================================
org.quartz.datasource.myds.driver = com.mysql.jdbc.driver
org.quartz.datasource.myds.url = jdbc:mysql://192.168.31.18:3306/test?useunicode=true&characterencoding=utf-8
org.quartz.datasource.myds.user = root
org.quartz.datasource.myds.password = 123456
org.quartz.datasource.myds.maxconnections = 30
 
#==============================================================
#configure threadpool
#==============================================================
org.quartz.threadpool.class = org.quartz.simpl.simplethreadpool
org.quartz.threadpool.threadcount = 5
org.quartz.threadpool.threadpriority = 5
org.quartz.threadpool.threadsinheritcontextclassloaderofinitializingthread = true

4.3 clusterexample.java文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package cluster;
import java.util.date;
import org.quartz.jobdetail;
import org.quartz.scheduler;
import org.quartz.schedulerfactory;
import org.quartz.simpletrigger;
import org.quartz.impl.stdschedulerfactory;
public class clusterexample {
 
 public void cleanup(scheduler inscheduler) throws exception {
  system.out.println("***** deleting existing jobs/triggers *****");
  // unschedule jobs
  string[] groups = inscheduler.gettriggergroupnames();
  for (int i = 0; i < groups.length; i++) {
   string[] names = inscheduler.gettriggernames(groups[i]);
   for (int j = 0; j < names.length; j++) {
    inscheduler.unschedulejob(names[j], groups[i]);
   }
  }
  // delete jobs
  groups = inscheduler.getjobgroupnames();
  for (int i = 0; i < groups.length; i++) {
   string[] names = inscheduler.getjobnames(groups[i]);
   for (int j = 0; j < names.length; j++) {
    inscheduler.deletejob(names[j], groups[i]);
   }
  }
 }
  
 public void run(boolean inclearjobs, boolean inschedulejobs)
  throws exception {
  // first we must get a reference to a scheduler
  schedulerfactory sf = new stdschedulerfactory();
  scheduler sched = sf.getscheduler();
  
  if (inclearjobs) {
   cleanup(sched);
  }
  system.out.println("------- initialization complete -----------");
  if (inschedulejobs) {
   system.out.println("------- scheduling jobs ------------------");
   string schedid = sched.getschedulerinstanceid();
   int count = 1;
   jobdetail job = new jobdetail("job_" + count, schedid, simplerecoveryjob.class);
   // ask scheduler to re-execute this job if it was in progress when
   // the scheduler went down...
   job.setrequestsrecovery(true);
   simpletrigger trigger =
    new simpletrigger("triger_" + count, schedid, 200, 1000l);
   trigger.setstarttime(new date(system.currenttimemillis() + 1000l));
   system.out.println(job.getfullname() +
     " will run at: " + trigger.getnextfiretime() +
     " and repeat: " + trigger.getrepeatcount() +
     " times, every " + trigger.getrepeatinterval() / 1000 + " seconds");
   sched.schedulejob(job, trigger);
   count++;
   job = new jobdetail("job_" + count, schedid,
     simplerecoverystatefuljob.class);
   // ask scheduler to re-execute this job if it was in progress when
   // the scheduler went down...
   job.setrequestsrecovery(false);
   trigger = new simpletrigger("trig_" + count, schedid, 100, 2000l);
   trigger.setstarttime(new date(system.currenttimemillis() + 2000l));
   system.out.println(job.getfullname() +
     " will run at: " + trigger.getnextfiretime() +
     " and repeat: " + trigger.getrepeatcount() +
     " times, every " + trigger.getrepeatinterval() / 1000 + " seconds");
   sched.schedulejob(job, trigger);
  }
  // jobs don't start firing until start() has been called...
  system.out.println("------- starting scheduler ---------------");
  sched.start();
  system.out.println("------- started scheduler ----------------");
  system.out.println("------- waiting for one hour... ----------");
  try {
   thread.sleep(3600l * 1000l);
  } catch (exception e) {
  }
  system.out.println("------- shutting down --------------------");
  sched.shutdown();
  system.out.println("------- shutdown complete ----------------");
  }
 
 public static void main(string[] args) throws exception {
  boolean clearjobs = true;
  boolean schedulejobs = true;
  for (int i = 0; i < args.length; i++) {
   if (args[i].equalsignorecase("clearjobs")) {
    clearjobs = true;   
   } else if (args[i].equalsignorecase("dontschedulejobs")) {
    schedulejobs = false;
   }
  }
  clusterexample example = new clusterexample();
  example.run(clearjobs, schedulejobs);
 }
}

4.4 simplerecoveryjob.java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package cluster;
 
import java.io.serializable;
import java.util.date;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.quartz.job;
import org.quartz.jobexecutioncontext;
import org.quartz.jobexecutionexception;
//如果有想反复执行的动作,作业,任务就把相关的代码写在execute这个方法里,前提:实现job这个接口
//至于simplejob这个类什么时候实例化,execute这个方法何时被调用,我们不用关注,交给quartz
public class simplerecoveryjob implements job, serializable {
 private static log _log = logfactory.getlog(simplerecoveryjob.class);
 public simplerecoveryjob() {
 }
 public void execute(jobexecutioncontext context)
  throws jobexecutionexception {
  //这个作业只是简单的打印出作业名字和此作业运行的时间
  string jobname = context.getjobdetail().getfullname();
  system.out.println("job 1111111111111111111 simplerecoveryjob says: " + jobname + " executing at " + new date());
 }
}

4.5 运行结果

  server a与server b中的配置和代码完全一样。运行方法:运行任意主机上的clusterexample.java,将任务加入调度,观察运行结果:

  运行servera,结果如图4.2所示。

Quartz集群原理以及配置应用的方法详解

图4.2 servera运行结果1  

  开启serverb后,servera与serverb的输出如图4.3、4.4所示。

Quartz集群原理以及配置应用的方法详解

图4.3 servera运行结果2

Quartz集群原理以及配置应用的方法详解

图4.4 serverb运行结果1

  从图4.3、4.4可以看出,serverb开启后,系统自动实现了负责均衡,serverb接手job1。关断servera后,serverb的运行结果如图4.5所示。

Quartz集群原理以及配置应用的方法详解

图4.5 serverb运行结果2

  从图4.5中可以看出,serverb可以检测出servera丢失,将其负责的任务job2接手,并将servera丢失到server检测出这段异常时间中需要执行的job2重新执行了。

5、注意事项

5.1 时间同步问题

  quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。

  节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的job重运行。最简单的同步计算机时钟的方式是使用某一个internet时间服务器(internet time server its)。

5.2 节点争抢job问题

  因为quartz使用了一个随机的负载均衡算法, job以随机的方式由不同的实例执行。quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 job 到集群中特定的节点。

5.3 从集群获取job列表问题

  当前,如果不直接进到数据库查询的话,还没有一个简单的方式来得到集群中所有正在执行的job列表。请求一个scheduler实例,将只能得到在那个实例上正运行job的列表。quartz官网建议可以通过写一些访问数据库jdbc代码来从相应的表中获取全部的job信息。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。

原文链接:http://www.cnblogs.com/zhenyuyaodidiao/p/4755649.html