项目中使用Quartz集群分享--转载

时间:2024-07-26 12:34:38

项目中使用Quartz集群分享--转载

在公司分享了Quartz,发布出来,希望大家讨论补充. 
CRM使用Quartz集群分享  一:CRM对定时任务的依赖与问题  二:什么是quartz,如何使用,集群,优化  三:CRM中quartz与Spring结合使用 
1:CRM对定时任务的依赖与问题  1)依赖  (1)每天晚上的定时任务,通过sql脚本 + crontab方式执行

  1. #crm
  2. 0 2 * * * /opt/***/javafiles/***/shell/***_daily_stat.sql
  3. 30 7 * * * /opt/***/javafiles/***/shell/***_data_fix
  4. 30 0 * * * /opt/***/javafiles/***/shell/***_sync_log
  5. 0 1 * * * /opt/***/javafiles/***/shell/***_clear_log
  6. 20 8 * * * /opt/***/javafiles/***/shell/***_daily >> /var/***/logs/***_daily.log 2>&1
  7. 40 1 * * * /opt/***/javafiles/***/shell/***_sync_account2
  8. 0 2 * * 1 /opt/***/javafiles/***/shell/***_weekly >> /var/***/logs/***_weekly.log 2>&1

存在的问题:当需要跨库或许数据的,sql无能为力,引入许多中间表,完成复杂统计需求。大范围对线上热表扫描,造成锁表,延迟严重  (2)使用python(多数据源) + SQL的方式

  1. def connectCRM():
  2. return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")
  3. def connectTemp():
  4. return MySQLdb.Connection("localhost", "***", "***",  "***", 3306, charset="utf8")
  5. def connectOA():
  6. return MySQLdb.Connection("localhost", "***", "***",  "***", 3306, charset="utf8")
  7. def connectCore():
  8. return MySQLdb.Connection("localhost", "***", "***",  "***", 3306, charset="utf8")
  9. def connectCT():
  10. return MySQLdb.Connection("localhost", "***", "***", "***", 3306, charset="utf8")

存在的问题:直接访问数据,需要理解各系统的数据结构,无法满足动态任务问题,各系统业务接口没有重用 
(3)使用spring + JDK timer方式调用接口完成定时任务

  1. <bean id="accountStatusTaskScanner"  class="***.impl.AccountStatusTaskScanner" />
  2. <task:scheduler id="taskScheduler" pool-size="5" />
  3. <task:scheduled-tasks scheduler="taskScheduler">
  4. <task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" />
  5. </task:scheduled-tasks>

使用写死服务器Host(srv23)的方式,控制只在一台服务器上执行task

  1. public abstract class SingletonServerTaskScanner implements TaskScanner {
  2. private final Logger logger = LoggerFactory.getLogger(SingletonServerTaskScanner.class);
  3. @Override
  4. public void execute() {
  5. String hostname = "";
  6. try {
  7. hostname = InetAddress.getLocalHost().getHostName();
  8. } catch (UnknownHostException e) {
  9. logger.error(e.getMessage(), e);
  10. }
  11. //判断是否为当前可执行服务器
  12. if (ConfigUtil.getValueByKey("core.scan.server").equals(hostname)) {
  13. doScan();
  14. }
  15. }
  16. public abstract void doScan();
  17. }

//对于srv23的重启,保存在内存中的任务将丢失,每次重启srv23重新生成定时任务

  1. public class CrmInitializer implements InitializingBean {
  2. private Logger logger = LoggerFactory.getLogger(CrmInitializer.class);
  3. @Override
  4. public void afterPropertiesSet() throws Exception {
  5. // 扫描商家状态,创建定时任务
  6. logger.info("扫描商家状态,创建定时任务");
  7. accountStatusTaskScanner.execute();
  8. // 扫描N天未拜访商家,创建定时任务
  9. logger.info("扫描N天未拜访商家,创建定时任务");
  10. nDaysActivityScanner.execute();
  11. }
  12. }
  1. //通过调用srv23的特定URL的方式,动态指定任务(如取消N天未拜访,私海进保护期,保护期进公海等)
  2. public class SingletonServerTaskController {
  3. @Resource
  4. private AccountService accountService;
  5. @RequestMapping(value = "/reschedule")
  6. public @ResponseBody
  7. String checkAndRescheduleAccount(Integer accountId) {
  8. logger.debug("reschedule task for accountId:" + accountId);
  9. if (isCurrentServer()) {
  10. accountService.checkAndRescheduleAccount(Arrays.asList(accountId));
  11. }
  12. return "ok";
  13. }
  14. private boolean isCurrentServer() {
  15. String hostname = "";
  16. try {
  17. hostname = InetAddress.getLocalHost().getHostName();
  18. } catch (UnknownHostException e) {
  19. logger.error(e.getMessage(), e);
  20. }
  21. if (ConfigUtil.getValueByKey("core.scan.server").equals(hostname)) {
  22. return true;
  23. } else {
  24. return false;
  25. }
  26. }
  27. }

存在的问题:实现步骤复杂,分散,任务调度不能恢复,严重依赖于srv23,回调URL时可能失败等情况。  CRM定时任务走过了很多弯路:  定时任务多种实现方式,使配置和代码分散在多处,难以维护和监控  任务执行过程没有保证,没有错误恢复  任务执行异常没有反馈(邮件)  没有集群支持  CRM需要分布式的任务调度框架,统一解决问题.  JAVA可以使用的任务调度框架:Quartz , Jcrontab , cron4j , taobao-pamirs-schedule  为什么选择Quartz:  1)资历够老,创立于1998年,比struts1还早,但是一直在更新(27 April 2012: Quartz 2.1.5 Released),文档齐全.  2)完全由Java写成,设计用于J2SE和J2EE应用.方便集成:JVM,RMI.  3)设计清晰简单:核心概念scheduler,trigger,job,jobDetail,listener,calendar  4)支持集群:org.quartz.jobStore.isClustered  5)支持任务恢复:requestsRecovery 
从http://www.quartz-scheduler.org 获取最新Quartz  1)学习Quartz 
项目中使用Quartz集群分享--转载  图1 介绍了quartz关键的组件和简单流程 
(1)Quartz 的目录结构和内容 
docs/api                                      Quartz 框架的JavaDoc Api 说明文档  docs/dbTables                            创建 Quartz 的数据库对象的脚本  docs/wikidocs                             Quartz 的帮助文件,点击 index.html 开始查看  Examples                                    多方面使用 Quartz 的例子Lib Quartz 使用到的第三方包  src/java/org/quartz                      使用 Quartz 的客户端程序源代码,公有 API  src/java/org/quartz/core              使用 Quartz 的服务端程序源代码,私有 API  src/java/org/quartz/simpl            Quartz 提供的不衣赖于第三方产品的简单实现  src/java/org/quartz/impl              依赖于第三方产品的支持模块的实现  src/java/org/quartz/utils              整个框架要用到的辅助类和工具组件  src/jboss                                     提供了特定于 JBoss 特性的源代码  src/oracle                                   提供了特定于 Oracle 特性的源代码  src/weblogic                              提供了特定于 WebLogic 特性的源代码 
Quartz 框架包含许多的类和接口,它们分布在大概 11 个包中。多数所要使用到的类或接口放置在 org.quartz 包中。这个包含盖了 Quartz 框架的公有 API. 
(2)Quartz核心接口 Scheduler 
项目中使用Quartz集群分享--转载 图2  Scheduler 是 Quartz 的主要 API。与Quartz大部分交互是发生于 Scheduler 之上的。客服端与Scheduler 交互是通过org.quartz.Scheduler接口。  Scheduler的实现:对方法调用会传递到 QuartzScheduler 实例上。QuartzScheduler 对于客户端是不可见的,并且也不存在与此实例的直接交互。 
项目中使用Quartz集群分享--转载 
图3 
创建Scheduler  Quartz 框架提供了 org.quartz.SchedulerFactory 接口。  SchedulerFactory 实例就是用来产生 Scheduler 实例的。当 Scheduler 实例被创建之后,就会存到一个仓库中(org.quartz.impl.SchedulerRepository).  Scheduler 工厂分别是 org.quartz.impl.DirectSchedulerFactory 和 org.quartz.impl.StdSchedulerFactory  DirectSchedulerFactory 是为精细化控制 Scheduler 实例产生的工厂类,一般不用,不过有利于理解quartz内部组件。

  1. -- 最简单
  2. public void createScheduler(ThreadPool threadPool, JobStore jobStore);
  3. -- 最复杂
  4. public void createScheduler(String schedulerName, String schedulerInstanceId,ThreadPool threadPool, JobStore jobStore, String rmiRegistryHost, int rmiRegistryPort);
  1. public scheduler createScheduler(){
  2. DirectSchedulerFactory factory=DirectSchedulerFactory.getInstance();
  3. try {
  4. //创建线程池
  5. SimpleThreadPool threadPool = new SimpleThreadPool(10, Thread.NORM_PRIORITY);
  6. threadPool.initialize();
  7. //创建job存储类
  8. JobStoreTX jdbcJobStore = new JobStoreTX();
  9. jdbcJobStore.setDataSource("someDatasource");
  10. jdbcJobStore.setPostgresStyleBlobs(true);
  11. jdbcJobStore.setTablePrefix("QRTZ_");
  12. jdbcJobStore.setInstanceId("My Instance");
  13. logger.info("Scheduler starting up...");
  14. factory.createScheduler(threadPool,jdbcJobStore);
  15. // Get a scheduler from the factory
  16. Scheduler scheduler = factory.getScheduler();
  17. // 必须启动scheduler
  18. scheduler.start();
  19. return scheduler;
  20. }
  21. return null;
  22. }

org.quartz.impl.StdSchedulerFactory 依赖于属性类(Properties)决定如何生产 Scheduler 实例 
通过加载属性文件,Properties 提供启动参数:

  1. public scheduler createScheduler(){
  2. // Create an instance of the factory
  3. StdSchedulerFactory factory = new StdSchedulerFactory();
  4. // Create the properties to configure the factory
  5. Properties props = new Properties();
  6. // required to supply threadpool class and num of threads
  7. props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,"org.quartz.simpl.SimpleThreadPool");
  8. props.put("org.quartz.threadPool.threadCount", "10");
  9. try {
  10. // Initialize the factory with properties
  11. factory.initialize(props);
  12. Scheduler scheduler = factory.getScheduler();
  13. logger.info("Scheduler starting up...");
  14. scheduler.start();
  15. } catch (SchedulerException ex) {
  16. logger.error(ex);
  17. }
  18. }

调用静态方法 getDefaultScheduler() 方法中调用了空的构造方法。如果之前未调用过任何一个 initialize() 方法,那么无参的initialize() 方法会被调用。这会开始去按照下面说的顺序加载文件。  默认情况下,quartz.properties 会被定位到,并从中加载属性。 
properties加载顺序:  1. 检查 System.getProperty("org.quartz.properties") 中是否设置了别的文件名  2. 否则,使用 quartz.properties 作为要加载的文件名  3. 试图从当前工作目录中加载这个文件  4. 试图从系统 classpath 下加载这个文件  在 Quartz Jar 包中有一个默认的 quartz.properties 文件 
默认配置如下  # Default Properties file for use by StdSchedulerFactory  # to create a Quartz Scheduler Instance, if a different  # properties file is not explicitly specified. 
org.quartz.scheduler.instanceName = DefaultQuartzScheduler  org.quartz.scheduler.rmi.export = false  org.quartz.scheduler.rmi.proxy = false  org.quartz.scheduler.wrapJobExecutionInUserTransaction = false  org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool  org.quartz.threadPool.threadCount = 10  org.quartz.threadPool.threadPriority = 5  org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true  org.quartz.jobStore.misfireThreshold = 60000  org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore  到此创建Scheduler完成 
通过Scheduler理解Quartz  Scheduler 的 API 可以分组成以下三个类别:  ·管理 Scheduler 
(1)启动 Scheduler

  1. //Start the scheduler
  2. scheduler.start();

start() 方法被调用,Scheduler 就开始搜寻需要执行的 Job。在你刚得到一个 Scheduler 新的实例时,或者 Scheduler  被设置为 standby 模式后,你才可以调用 start() 方法。

  1. public void standby() throws SchedulerException;

只要调用了 shutdown() 方法之后,你就不能再调用 Scheduler 实例的 start() 方法了。  这是因为 shutdown() 方法销毁了为 Scheduler 创建的所有的资源(线程,数据库连接等)。  你可能需要Standby 模式:设置 Scheduler 为 standby 模式会导致 Scheduler搜寻要执行的 Job 的线程被暂停下来 
停止 Scheduler

  1. //waitForJobsToComplete 是否让当前正在进行的Job正常执行完成才停止Scheduler
  2. public void shutdown(boolean waitForJobsToComplete) throws SchedulerException;
  3. public void shutdown() throws SchedulerException;

其它管理Scheduler 方法见API...  管理 Job  什么是 Quartz Job?  一个Quart Job就是一个任何一个继承job或job子接口的Java类,你可以用这个类做任何事情! 
org.quartz.Job 接口

  1. public void execute(JobExecutionContext context)throws JobExecutionException;
  2. JobExecutionContext

当 Scheduler 调用一个 Job,一个 JobexecutionContext 传递给 execute() 方法。JobExecutionContext 对象让 Job 能  访问 Quartz 运行时候环境和 Job 本身的数据。类似于在 Java Web 应用中的 servlet 访问 ServletContext 。  通过 JobExecutionContext,Job 可访问到所处环境的所有信息,包括注册到 Scheduler 上与该 Job 相关联的 JobDetail 和 Trigger。  JobDetail  部署在 Scheduler 上的每一个 Job 只创建了一个 JobDetail实例。JobDetail 是作为 Job 实例进行定义的  // Create the JobDetail  JobDetail jobDetail = new JobDetail("PrintInfoJob",Scheduler.DEFAULT_GROUP, PrintInfoJob.class);  // Create a trigger that fires now and repeats forever  Trigger trigger = TriggerUtils.makeImmediateTrigger(  SimpleTrigger.REPEAT_INDEFINITELY, 10000);  trigger.setName("PrintInfoJobTrigger");// register with the Scheduler  scheduler.scheduleJob(jobDetail, trigger);  JobDetail 被加到 Scheduler 中了,而不是 job。Job 类是作为 JobDetail 的一部份,job直到Scheduler准备要执行它的时候才会被实例化的,因此job不存在线成安全性问题. 
使用 JobDataMap 对象设定 Job 状态

  1. public void executeScheduler() throws SchedulerException{
  2. scheduler = StdSchedulerFactory.getDefaultScheduler();
  3. scheduler.start();
  4. logger.info("Scheduler was started at " + new Date());
  5. // Create the JobDetail
  6. JobDetail jobDetail = new JobDetail("PrintJobDataMapJob",Scheduler.DEFAULT_GROUP,PrintJobDataMapJob.class);
  7. // Store some state for the Job
  8. jobDetail.getJobDataMap().put("name", "John Doe");
  9. jobDetail.getJobDataMap().put("age", 23);
  10. jobDetail.getJobDataMap().put("balance",new BigDecimal(1200.37));
  11. // Create a trigger that fires once
  12. Trigger trigger = TriggerUtils.makeImmediateTrigger(0, 10000);
  13. trigger.setName("PrintJobDataMapJobTrigger");
  14. scheduler.scheduleJob(jobDetail, trigger);
  15. }
  16. //Job 能通过 JobExecutionContext 对象访问 JobDataMap
  17. public class PrintJobDataMapJob implements Job {
  18. public void execute(JobExecutionContext context)throws JobExecutionException {
  19. logger.info("in PrintJobDataMapJob");
  20. // Every job has its own job detail
  21. JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
  22. // Iterate through the key/value pairs
  23. Iterator iter = jobDataMap.keySet().iterator();
  24. while (iter.hasNext()) {
  25. Object key = iter.next();
  26. Object value = jobDataMap.get(key);
  27. logger.info("Key: " + key + " - Value: " + value);
  28. }
  29. }
  30. }

在Quartz 1.5之后,JobDataMap在 Trigger 级也是可用的。它的用途类似于Job级的JobDataMap,支持在同一个JobDetail上的多个Trigger。  伴随着加入到 Quartz 1.5 中的这一增强特性,可以使用 JobExecutionContext 的一个新的更方便的方法获取到 Job 和 Trigger 级的并集的 map 中的值。 
这个方法就是getMergedJobDataMap() 取job 和 Trigger级的并集map,它能够在 Job 中使用。管法推荐使用这个方法. 
* 实际使用中trigger级别有时取不到map中的值, 使用getMergedJobDataMap 可以获取到(官方推荐此方法). 
有状态的Job: org.quartz.StatefulJob 接口  当需要在两次 Job 执行间维护状态,使用StatefulJob 接口. 
Job 和 StatefulJob 在框架中使用中存在两个关键差异。  (一) JobDataMap 在每次执行之后重新持久化到 JobStore 中。这样就确保你对 Job 数据的改变直到下次执行仍然保持着。  (二) 两个或多个有状态的 JobDetail 实例不能并发执行。保证JobDataMap线程安全 
注意:实际使用时使用jobStoreTX/jobStoreCMT ,StatefulJob,大量的trigger对应一个JobDetail的情况下Mysql会产生锁超时问题. 
中断 Job  Quartz 包括一个接口叫做 org.quartz.InterruptableJob,它扩展了普通的 Job 接口并提供了一个 interrupt() 方法: 没有深入研究,只知道 Scheduler会调用自定义的Job的 interrupt()方法。由用户决定 Job 决定如何中断.没有测试!!!
job的特性  易失性 volatility  一个易失性的 Job 是在程序关闭之后不会被持久化。一个 Job 是通过调用 JobDetail 的 setVolatility(true)被设置为易失.  Job易失性的默认值是 false.  注意:只有采用持久性JobStore时才有效 
Job 持久性 durability  设置JobDetail 的 setDurability(false),在所有的触发器触发之后JobDetail将从 JobStore 中移出。  Job持久性默认值是false.  Scheduler将移除没有trigger关联的jobDetail 
Job 可恢复性 shuldRecover  当一个Job在执行中,Scheduler非正常的关闭,设置JobDetail 的setRequestsRecovery(true) 在 Scheduler 重启之后可恢复的Job还会再次被执行。这个  Job 会重新开始执行。注意job代码事务特性.  Job可恢复性默认为false,Scheduler不会试着去恢复job操作。 
项目中使用Quartz集群分享--转载 
图为表述没有执行完成的job数据库记录 
Scheduler 中移除 Job  移除所有与这个 Job 相关联的 Trigger;如果这个 Job 是非持久性的,它将会从 Scheduler 中移出。  更直接的方式是使用 deleteJob() 方法,它还会删除所有与当前job关联的trigger 
public boolean deleteJob(String jobName, String groupName) throws SchedulerException;  quartz 本身提供的 Job  org.quartz.jobs.FileScanJob 检查某个指定文件是否变化,并在文件被改变时通知到相应监听器的 Job  org.quartz.jobs.FileScanListener 在文件被修改后通知 FileScanJob 的监听器  org.quartz.jobs.NativeJob 用来执行本地程序(如 windows 下 .exe 文件) 的 Job  org.quartz.jobs.NoOpJob 什么也不做,但用来测试监听器不是很有用的。一些用户甚至仅仅用它来导致一个监听器的运行  org.quartz.jobs.ee.mail.SendMailJob 使用 JavaMail API 发送 e-mail 的 Job  org.quartz.jobs.ee.jmx.JMXInvokerJob 调用 JMX bean 上的方法的 Job  org.quartz.jobs.ee.ejb.EJBInvokerJob 用来调用 EJB 上方法的 Job 
job的理解到此结束 
理解quartz Trigger  Job 包含了要执行任务的逻辑,但是Job不负责何时执行。这个事情由触发器(Trigger)负责。  Quartz Trigger继承了抽象的org.quartz.Trigger 类。  目前,Quartz 有三个可用的实现 
org.quartz.SimpleTrigger  org.quartz.CronTrigger  org.quartz.NthIncludeDayTrigger  使用org.quartz.SimpleTrigger  SimpleTrigger 是设置和使用是最为简单的一种 Quartz Trigger。它是为那种需要在特定的日期/时间启动,且以一个可能的间隔时间重复执行 n 次的 Job 所设计的。 
SimpleTrigger 存在几个变种的构造方法。他们是从无参的版本一直到带全部参数的版本。 
下面代码版断显示了一个仅带有trigger 的名字和组的简单构造方法 
SimpleTrigger sTrigger = new SimpleTrigger("myTrigger", Scheduler.DEFAULT_GROUP);  这个 Trigger 会立即执行,而不重复。还有一个构造方法带有多个参数,配置 Triiger 在某一特定时刻触发,重复执行多次,和两  次触发间的延迟时间。

  1. public SimpleTrigger(String name, String group,String jobName, String jobGroup,
  2. Date startTime,Date endTime, int repeatCount, long repeatInterval);

使用org.quartz.CronTrigger  CronTrigger 是基于 Unix 类似于 cron 的表达式触发,也是功能最强大和最常用的Trigger  Cron表达式:

  1. "0 0 12 * * ?"                     Fire at 12pm (noon) every day
  2. "0 15 10 ? * *"                   Fire at 10:15am every day
  3. "0 15 10 * * ?"                   Fire at 10:15am every day
  4. "0 15 10 * * ? *"                 Fire at 10:15am every day
  5. "0 15 10 * * ? 2005"           Fire at 10:15am every day during the year 2005
  6. "0 * 14 * * ?"                     Fire every minute starting at 2pm and ending at 2:59pm, every day
  7. "0 0/5 14 * * ?"                  Fire every 5 minutes starting at 2pm and ending at 2:55pm, every day
  8. "0 0/5 14,18 * * ?"              Fire every 5 minutes starting at 2pm and ending at 2:55pm, AND fire every 5 minutes starting at 6pm and ending at 6:55pm, every day
  9. "0 0-5 14 * * ?"                   Fire every minute starting at 2pm and ending at 2:05pm, every day
  10. "0 10,44 14 ? 3 WED"         Fire at 2:10pm and at 2:44pm every Wednesday in the month of March.
  11. "0 15 10 ? * MON-FRI"        Fire at 10:15am every Monday, Tuesday, Wednesday, Thursday and Friday
  12. "0 15 10 15 * ?"                  Fire at 10:15am on the 15th day of every month
  13. "0 15 10 L * ?"                    Fire at 10:15am on the last day of every month
  14. "0 15 10 ? * 6L"                   Fire at 10:15am on the last Friday of every month
  15. "0 15 10 ? * 6L"                   Fire at 10:15am on the last Friday of every month
  16. "0 15 10 ? * 6L 2002-2005"   Fire at 10:15am on every last Friday of every month during the years 2002, 2003, 2004 and 2005
  17. "0 15 10 ? * 6#3"                 Fire at 10:15am on the third Friday of every month

使用 org.quartz.NthIncludedDayTrigger  org.quartz.NthIncludedDayTrigger是设计用于在每一间隔类型的第几天执行 Job。  例如,你要在每个月的 12 号执行发工资提醒的Job。接下来的代码片断描绘了如何创建一个 NthIncludedDayTrigger.

  1. //创建每个月的12号的NthIncludedDayTrigger
  2. NthIncludedDayTrigger trigger = new NthIncludedDayTrigger("MyTrigger", Scheduler.DEFAULT_GROUP);
  3. trigger.setN(12);
  4. trigger.setIntervalType(NthIncludedDayTrigger.INTERVAL_TYPE_MONTHLY);

jobDetail + trigger组成最基本的定时任务:  特别注意:一个job可以对应多个Trgger , 一个Trigger只能对应一个job . 
如:CRM中N天未拜访的job对应所有的N天未拜访商家(一个商家一个trigger) 大约1:1000的比例      job和trigger都是通过name 和 group 属性确定唯一性的. 
Quartz Calendar  Quartz 的 Calendar 对象与 Java API 的 java.util.Calendar不同。  Java 的 Calender 对象是通用的日期和时间工具;  Quartz 的 Calender 专门用于屏闭一个时间区间,使 Trigger 在这个区间中不被触发。  例如,让我们假如取消节假日执行job。 
Quartz包括许多的 Calender 实现足以满足大部分的需求. 
org.quartz.impl.calendar.BaseCalender 为高级的 Calender 实现了基本的功能,实现了 org.quartz.Calender 接口  org.quartz.impl.calendar.WeeklyCalendar 排除星期中的一天或多天,例如,可用于排除周末  org.quartz.impl.calendar.MonthlyCalendar 排除月份中的数天,例如,可用于排除每月的最后一天  org.quartz.impl.calendar.AnnualCalendar 排除年中一天或多天  org.quartz.impl.calendar.HolidayCalendar 特别的用于从 Trigger 中排除节假日 
使用Calendar,只需实例化后并加入你要排除的日期,然后用 Scheduler 注册,最后必须让Calender依附于Trigger实例。 
排除国庆节实例

  1. private void scheduleJob(Scheduler scheduler, Class jobClass) {
  2. try {
  3. // Create an instance of the Quartz AnnualCalendar
  4. AnnualCalendar cal = new AnnualCalendar();
  5. // exclude 国庆节
  6. Calendar gCal = GregorianCalendar.getInstance();
  7. gCal.set(Calendar.MONTH, Calendar.OCTOBER);
  8. List<Calendar> mayHolidays = new ArraysList<Calendar>();
  9. for(int i=1; i<=7; i++){
  10. gCal.set(Calendar.DATE, i);
  11. mayHolidays.add(gCal);
  12. }
  13. cal.setDaysExcluded(mayHolidays);
  14. // Add to scheduler, replace existing, update triggers
  15. scheduler.addCalendar("crmHolidays", cal, true, true);
  16. /*
  17. * Set up a trigger to start firing now, repeat forever
  18. * and have (60000 ms) between each firing.
  19. */
  20. Trigger trigger = TriggerUtils.makeImmediateTrigger("myTrigger",-1,60000);
  21. // Trigger will use Calendar to exclude firing times
  22. trigger.setCalendarName("crmHolidays");
  23. JobDetail jobDetail = new JobDetail(jobClass.getName(), Scheduler.DEFAULT_GROUP, jobClass);
  24. // Associate the trigger with the job in the scheduler
  25. scheduler.scheduleJob(jobDetail, trigger);
  26. } catch (SchedulerException ex) {
  27. logger.error(ex);
  28. }
  29. }

Quartz 监听器  Quartz 提供了三种类型的监听器:监听Job,监听Trigger,和监听Scheduler. 
监听器是作为扩展点存在的.  Quartz 监听器是扩展点,可以扩展框架并定制来做特定的事情。跟Spring,Hibernate,Servlet监听器类似.  实现监听  1. 创建一个 Java 类,实现监听器接口  2. 用你的应用中特定的逻辑实现监听器接口的所有方法  3. 注册监听器 
全局和非全局监听器  JobListener 和 TriggerListener 可被注册为全局或非全局监听器。一个全局监听器能接收到所有的 Job/Trigger 的事件通知。  而一个非全局监听器只能接收到那些在其上已注册了监听器的 Job 或 Triiger 的事件。 
作者:James House描述全局和非全局监听器  全局监听器是主动意识的,它们为了执行它们的任务而热切的去寻找每一个可能的事件。通常,全局监听器要做的工作不用指定到特定的 Job 或 Trigger。  非全局监听器一般是被动意识的,它们在所关注的 Trigger 激发之前或是 Job 执行之前什么事也不做。因此,非全局的监听器比起全局监听器而言更适合于修改或增加 Job 执行的工作。  类似装饰设计模式  监听 Job 事件  org.quartz.JobListener 接口包含一系列的方法,它们会由 Job 在其生命周期中产生的某些关键事件时被调用

  1. public interface JobListener {
  2. //命名jobListener 只对非全局监听器有效
  3. public String getName();
  4. //Scheduler 在 JobDetail 将要被执行时调用这个方法。
  5. public void jobToBeExecuted(JobExecutionContext context);
  6. //Scheduler 在 JobDetail 即将被执行,但又被否决时调用这个方法。
  7. public void jobExecutionVetoed(JobExecutionContext context);
  8. //Scheduler 在 JobDetail 被执行之后调用这个方法。
  9. public void jobWasExecuted(JobExecutionContext context,JobExecutionException jobException);

项目中使用Quartz集群分享--转载  图7 job listener参与job的执行生命周期 
注册全局监听器

  1. scheduler.addGlobalJobListener(jobListener);

注册非全局监听器(依次完成,顺序不能颠倒)

  1. scheduler.addJobListener(jobListener);
  2. jobDetail.addJobListener(jobListener.getName());
  3. //如果已经存在jobDetail则覆盖.
  4. scheduler.addjob(jobDetail,true);

监听 Trigger 事件  org.quartz.TriggerListener 接口定义Trigger监听器

  1. public interface TriggerListener {
  2. //命名triggerListener 只对非全局监听器有效
  3. public String getName();
  4. //当与监听器相关联的 Trigger 被触发,Job 上的 execute() 方法将要被执行时,调用这个方法。
  5. //在全局TriggerListener 情况下,这个方法为所有 Trigger 被调用。(不要做耗时操作)
  6. public void triggerFired(Trigger trigger, JobExecutionContext context);
  7. //在 Trigger 触发后,Job 将要被执行时由调用这个方法。
  8. //TriggerListener给了一个选择去否决 Job 的执行。假如这个方法返回 true,这个 Job 将不会为此次 Trigger 触发而得到执行。
  9. public boolean vetoJobExecution(Trigger trigger, JobExecutidonContext context);
  10. // Scheduler 调用这个方法是在 Trigger 错过触发时。
  11. // JavaDoc 指出:你应该关注此方法中持续时间长的逻辑:在出现许多错过触发的 Trigger 时,长逻辑会导致骨牌效应。你应当保持这上方法尽量的小
  12. public void triggerMisfired(Trigger trigger);
  13. //Trigger 被触发并且完成了Job的执行时调用这个方法。
  14. public void triggerComplete(Trigger trigger, JobExecutionContext context, int triggerInstructionCode);
  15. }

triggerListener的注册与jobListener相同 
监听 Scheduler 事件  org.quartz.SchedulerListener 接口定义Trigger监听器

  1. public interface SchedulerListener {
  2. //有新的JobDetail部署调用这个方法。
  3. public void jobScheduled(Trigger trigger);
  4. //卸载时调用这个方法。
  5. public void jobUnscheduled(String triggerName, String triggerGroup);
  6. //当一个Trigger到达再也不会触发时调用这个方法。
  7. public void triggerFinalized(Trigger trigger);
  8. //Scheduler 调用这个方法是发生在一个Trigger或多个Trigger被暂停时。假如是多个Trigger的话,triggerName 参数将为null。
  9. public void triggersPaused(String triggerName, String triggerGroup);
  10. //Scheduler 调用这个方法是发生成一个 Trigger 或 Trigger 组从暂停中恢复时。假如是多个Trigger的话,triggerName 参数将为 null。
  11. public void triggersResumed(String triggerName,String triggerGroup);
  12. //当一个或一组 JobDetail 暂停时调用这个方法。
  13. public void jobsPaused(String jobName, String jobGroup);
  14. //当一个或一组 Job 从暂停上恢复时调用这个方法。假如是多个Job,jobName参数将为 null。
  15. public void jobsResumed(String jobName, String jobGroup);
  16. // 在Scheduler 的正常运行期间产生一个严重错误时调用这个方法。错误的类型会各式的,但是下面列举了一些错误例子:
  17. // 可以使用 SchedulerException 的 getErrorCode() 或者 getUnderlyingException() 方法或获取到特定错误的更详尽的信息
  18. public void schedulerError(String msg, SchedulerException cause);
  19. //Scheduler 调用这个方法用来通知 SchedulerListener Scheduler 将要被关闭。
  20. public void schedulerShutdown();
  21. }

注册SchedulerListener(SchedulerListener不存在全局非全局性)  scheduler.addSchedulerListener(schedulerListener);  由于scheduler异常存在不打印问题,CRM使用监听器代码打印.

  1. public class QuartzExceptionSchedulerListener extends SchedulerListenerSupport{
  2. private Logger logger = LoggerFactory.getLogger(QuartzExceptionSchedulerListener.class);
  3. @Override
  4. public void schedulerError(String message, SchedulerException e) {
  5. super.schedulerError(message, e);
  6. logger.error(message, e.getUnderlyingException());
  7. }
  8. }
  1. <bean  id="quartzExceptionSchedulerListener"  class="com.***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>
  2. <!-- 配置监听器 -->
  3. <property name="schedulerListeners">
  4. <list>
  5. <ref bean="quartzExceptionSchedulerListener"/>
  6. </list>
  7. </property>

quartz与线程  主处理线程:QuartzSchedulerThread  启动Scheduler时。QuartzScheduler被创建并创建一个org.quartz.core.QuartzSchedulerThread 类的实例。  QuartzSchedulerThread 包含有决定何时下一个Job将被触发的处理循环。QuartzSchedulerThread 是一个 Java 线程。它作为一个非守护线程运行在正常优先级下。 
QuartzSchedulerThread 的主处理轮循步骤:  1. 当 Scheduler 正在运行时:  A. 检查是否有转换为 standby 模式的请求。  1. 假如 standby 方法被调用,等待继续的信号  B. 询问 JobStore 下次要被触发的 Trigger.  1. 如果没有 Trigger 待触发,等候一小段时间后再次检查  2. 假如有一个可用的 Trigger,等待触发它的确切时间的到来  D. 时间到了,为 Trigger 获取到 triggerFiredBundle.  E. 使用Scheduler和triggerFiredBundle 为 Job 创建一个JobRunShell实例  F. 在ThreadPool 申请一个线程运行 JobRunShell 实例. 
代码逻辑在QuartzSchedulerThread 的 run() 中,如下:

  1. /**
  2. * QuartzSchedulerThread.run
  3. * <p>
  4. * The main processing loop of the <code>QuartzSchedulerThread</code>.
  5. * </p>
  6. */
  7. public void run() {
  8. boolean lastAcquireFailed = false;
  9. while (!halted.get()) {
  10. try {
  11. // check if we're supposed to pause...
  12. synchronized (sigLock) {
  13. while (paused && !halted.get()) {
  14. try {
  15. // wait until togglePause(false) is called...
  16. sigLock.wait(1000L);
  17. } catch (InterruptedException ignore) {
  18. }
  19. }
  20. if (halted.get()) {
  21. break;
  22. }
  23. }
  24. int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  25. if(availTreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
  26. Trigger trigger = null;
  27. long now = System.currentTimeMillis();
  28. clearSignaledSchedulingChange();
  29. try {
  30. trigger = qsRsrcs.getJobStore().acquireNextTrigger(
  31. ctxt, now + idleWaitTime);
  32. lastAcquireFailed = false;
  33. } catch (JobPersistenceException jpe) {
  34. if(!lastAcquireFailed) {
  35. qs.notifySchedulerListenersError(
  36. "An error occured while scanning for the next trigger to fire.",
  37. jpe);
  38. }
  39. lastAcquireFailed = true;
  40. } catch (RuntimeException e) {
  41. if(!lastAcquireFailed) {
  42. getLog().error("quartzSchedulerThreadLoop: RuntimeException "
  43. +e.getMessage(), e);
  44. }
  45. lastAcquireFailed = true;
  46. }
  47. if (trigger != null) {
  48. now = System.currentTimeMillis();
  49. long triggerTime = trigger.getNextFireTime().getTime();
  50. long timeUntilTrigger = triggerTime - now;
  51. while(timeUntilTrigger > 2) {
  52. synchronized(sigLock) {
  53. if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
  54. try {
  55. // we could have blocked a long while
  56. // on 'synchronize', so we must recompute
  57. now = System.currentTimeMillis();
  58. timeUntilTrigger = triggerTime - now;
  59. if(timeUntilTrigger >= 1)
  60. sigLock.wait(timeUntilTrigger);
  61. } catch (InterruptedException ignore) {
  62. }
  63. }
  64. }
  65. if(releaseIfScheduleChangedSignificantly(trigger, triggerTime)) {
  66. trigger = null;
  67. break;
  68. }
  69. now = System.currentTimeMillis();
  70. timeUntilTrigger = triggerTime - now;
  71. }
  72. if(trigger == null)
  73. continue;
  74. // set trigger to 'executing'
  75. TriggerFiredBundle bndle = null;
  76. boolean goAhead = true;
  77. synchronized(sigLock) {
  78. goAhead = !halted.get();
  79. }
  80. if(goAhead) {
  81. try {
  82. bndle = qsRsrcs.getJobStore().triggerFired(ctxt,
  83. trigger);
  84. } catch (SchedulerException se) {
  85. qs.notifySchedulerListenersError(
  86. "An error occured while firing trigger '"
  87. + trigger.getFullName() + "'", se);
  88. } catch (RuntimeException e) {
  89. getLog().error(
  90. "RuntimeException while firing trigger " +
  91. trigger.getFullName(), e);
  92. // db connection must have failed... keep
  93. // retrying until it's up...
  94. releaseTriggerRetryLoop(trigger);
  95. }
  96. }
  97. // it's possible to get 'null' if the trigger was paused,
  98. // blocked, or other similar occurrences that prevent it being
  99. // fired at this time...  or if the scheduler was shutdown (halted)
  100. if (bndle == null) {
  101. try {
  102. qsRsrcs.getJobStore().releaseAcquiredTrigger(ctxt,
  103. trigger);
  104. } catch (SchedulerException se) {
  105. qs.notifySchedulerListenersError(
  106. "An error occured while releasing trigger '"
  107. + trigger.getFullName() + "'", se);
  108. // db connection must have failed... keep retrying
  109. // until it's up...
  110. releaseTriggerRetryLoop(trigger);
  111. }
  112. continue;
  113. }
  114. // TODO: improvements:
  115. //
  116. // 2- make sure we can get a job runshell before firing trigger, or
  117. //   don't let that throw an exception (right now it never does,
  118. //   but the signature says it can).
  119. // 3- acquire more triggers at a time (based on num threads available?)
  120. JobRunShell shell = null;
  121. try {
  122. shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
  123. shell.initialize(qs, bndle);
  124. } catch (SchedulerException se) {
  125. try {
  126. qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  127. trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  128. } catch (SchedulerException se2) {
  129. qs.notifySchedulerListenersError(
  130. "An error occured while placing job's triggers in error state '"
  131. + trigger.getFullName() + "'", se2);
  132. // db connection must have failed... keep retrying
  133. // until it's up...
  134. errorTriggerRetryLoop(bndle);
  135. }
  136. continue;
  137. }
  138. if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
  139. try {
  140. // this case should never happen, as it is indicative of the
  141. // scheduler being shutdown or a bug in the thread pool or
  142. // a thread pool being used concurrently - which the docs
  143. // say not to do...
  144. getLog().error("ThreadPool.runInThread() return false!");
  145. qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  146. trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  147. } catch (SchedulerException se2) {
  148. qs.notifySchedulerListenersError(
  149. "An error occured while placing job's triggers in error state '"
  150. + trigger.getFullName() + "'", se2);
  151. // db connection must have failed... keep retrying
  152. // until it's up...
  153. releaseTriggerRetryLoop(trigger);
  154. }
  155. }
  156. continue;
  157. }
  158. } else { // if(availTreadCount > 0)
  159. continue; // should never happen, if threadPool.blockForAvailableThreads() follows contract
  160. }
  161. long now = System.currentTimeMillis();
  162. long waitTime = now + getRandomizedIdleWaitTime();
  163. long timeUntilContinue = waitTime - now;
  164. synchronized(sigLock) {
  165. try {
  166. sigLock.wait(timeUntilContinue);
  167. } catch (InterruptedException ignore) {
  168. }
  169. }
  170. } catch(RuntimeException re) {
  171. getLog().error("Runtime error occured in main trigger firing loop.", re);
  172. }
  173. } // loop...
  174. // drop references to scheduler stuff to aid garbage collection...
  175. qs = null;
  176. qsRsrcs = null;
  177. }

quartz工作者线程  Quartz 不会在主线程(QuartzSchedulerThread)中处理用户的Job。Quartz 把线程管理的职责委托给ThreadPool。 一般的设置使用org.quartz.simpl.SimpleThreadPool。SimpleThreadPool 创建了一定数量的 WorkerThread 实例来使得Job能够在线程中进行处理。  WorkerThread 是定义在 SimpleThreadPool 类中的内部类,它实质上就是一个线程。  要创建 WorkerThread 的数量以及配置他们的优先级是在文件quartz.properties中并传入工厂。 
spring properties

  1. <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
  2. <prop key="org.quartz.threadPool.threadCount">20</prop>
  3. <prop key="org.quartz.threadPool.threadPriority">5</prop>

主线程(QuartzSchedulerThread)请求ThreadPool去运行 JobRunShell 实例,ThreadPool 就检查看是否有一个可用的工作者线  程。假如所以已配置的工作者线程都是忙的,ThreadPool 就等待直到有一个变为可用。当一个工作者线程是可用的,  并且有一个JobRunShell 等待执行,工作者线程就会调用 JobRunShell 类的 run() 方法。 
Quartz 框架允许替换线程池,但必须实现org.quartz.spi.ThreadPool 接口. 
项目中使用Quartz集群分享--转载  图4 quartz内部的主线程和工作者线程 
Quartz的存储和持久化  Quartz 用 JobStores 对 Job、Trigger、calendar 和 Schduler 数据提供一种存储机制。Scheduler 应用已配置的JobStore 来存储和获取到部署信息,并决定正被触发执行的 Job 的职责。  所有的关于哪个 Job 要执行和以什么时间表来执行他们的信息都来存储在 JobStore。 
在 Quartz 中两种可用的 Job 存储类型是:  内存(非持久化) 存储  持久化存储 
JobStore 接口  Quartz 为所有类型的Job存储提供了一个接口。叫 JobStore。所有的Job存储机制,不管是在哪里或是如何存储他们的信息的,都必须实现这个接口。  JobStore 接口的 API 可归纳为下面几类:  Job 相关的 API  Trigger 相关的 API  Calendar 相关的 API  Scheduler 相关的 API 
使用内存来存储 Scheduler 信息  Quartz 的内存Job存储类叫做 org.quartz.simple.RAMJobStore,它实现了JobStore 接口的。  RAMJobStore 是 Quartz 的默认的解决方案。  使用这种内存JobStore的好处。 
RAMJobStore是配置最简单的 JobStore:默认已经配置好了。见quartz.jar:org.quartz.quartz.properties  RAMJobStore的速度非常快。所有的 quartz存储操作都在计算机内存中 
使用持久性的 JobStore  持久性 JobStore = JDBC + 关系型数据库 
Quartz 所有的持久化的 JobStore 都扩展自 org.quartz.impl.jdbcjobstore.JobStoreSupport 类。 
项目中使用Quartz集群分享--转载 
图5  JobStoreSupport 实现了 JobStore 接口,是作为 Quartz 提供的两个具体的持久性 JobStore 类的基类。  Quartz 提供了两种不同类型的JobStoreSupport实现类,每一个设计为针对特定的数据库环境和配置:  ·org.quartz.impl.jdbcjobstore.JobStoreTX  ·org.quartz.impl.jdbcjobstore.JobStoreCMT 
独立环境中的持久性存储  JobStoreTX 类设计为用于独立环境中。这里的 "独立",我们是指这样一个环境,在其中不存在与应用容器的事务集成。 
#properties配置  org.quartz.jobStore.class = org.quartz.ompl.jdbcjobstore.JobStoreTX 
依赖容器相关的持久性存储  JobStoreCMT 类设计为与程序容器事务集成,容器管理的事物(Container Managed Transactions (CMT)) 
crm使用JobStoreTX 因为quart有长时间锁等待情况,不参与系统本身事务(crm任务内事务与quartz本身事务分离).
Quartz 数据库结构 
表名描述  QRTZ_CALENDARS 以 Blob 类型存储 Quartz 的 Calendar 信息  QRTZ_CRON_TRIGGERS 存储 Cron Trigger,包括 Cron 表达式和时区信息  QRTZ_FIRED_TRIGGERS 存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息  QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger 组的信息  QRTZ_SCHEDULER_STATE 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中)  QRTZ_LOCKS 存储程序的非观锁的信息(假如使用了悲观锁)  QRTZ_JOB_DETAILS 存储每一个已配置的 Job 的详细信息  QRTZ_JOB_LISTENERS 存储有关已配置的 JobListener 的信息  QRTZ_SIMPLE_TRIGGERS 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数  QRTZ_BLOG_TRIGGERS Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)  QRTZ_TRIGGER_LISTENERS 存储已配置的 TriggerListener 的信息  QRTZ_TRIGGERS 存储已配置的 Trigger 的信息  所有的表默认以前缀QRTZ_开始。可以通过在 quartz.properties配置修改(org.quartz.jobStore.tablePrefix = QRTZ_)。  可以对不同的Scheduler实例使用多套的表,通过改变前缀来实现。 
优化 quartz数据表结构  -- 1:对关键查询路径字段建立索引

  1. create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(NEXT_FIRE_TIME);
  2. create index idx_qrtz_t_state on QRTZ_TRIGGERS(TRIGGER_STATE);
  3. create index idx_qrtz_t_nf_st on QRTZ_TRIGGERS(TRIGGER_STATE,NEXT_FIRE_TIME);
  4. create index idx_qrtz_ft_trig_group on QRTZ_FIRED_TRIGGERS(TRIGGER_GROUP);
  5. create index idx_qrtz_ft_trig_name on QRTZ_FIRED_TRIGGERS(TRIGGER_NAME);
  6. create index idx_qrtz_ft_trig_n_g on QRTZ_FIRED_TRIGGERS(TRIGGER_NAME,TRIGGER_GROUP);
  7. create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(INSTANCE_NAME);
  8. create index idx_qrtz_ft_job_name on QRTZ_FIRED_TRIGGERS(JOB_NAME);
  9. create index idx_qrtz_ft_job_group on QRTZ_FIRED_TRIGGERS(JOB_GROUP);

-- 2:根据Mysql innodb表结构特性,调整主键,降低二级索引的大小

  1. ALTER TABLE QRTZ_TRIGGERS
  2. ADD UNIQUE KEY IDX_NAME_GROUP(TRIGGER_NAME,TRIGGER_GROUP),
  3. DROP PRIMARY KEY,
  4. ADD ID INT UNSIGNED NOT NULL AUTO_INCREMENT FIRST,
  5. ADD PRIMARY KEY (ID);
  6. ALTER TABLE QRTZ_JOB_DETAILS
  7. ADD UNIQUE KEY IDX_NAME_GROUP(JOB_NAME,JOB_GROUP),
  8. DROP PRIMARY KEY,
  9. ADD ID INT UNSIGNED NOT NULL AUTO_INCREMENT FIRST,
  10. ADD PRIMARY KEY (ID);

Quartz集群  只有使用持久的JobStore才能完成Quqrtz集群 
项目中使用Quartz集群分享--转载  图6  一个 Quartz 集群中的每个节点是一个独立的 Quartz 应用,它又管理着其他的节点。  需要分别对每个节点分别启动或停止。不像应用服务器的集群,独立的 Quartz 节点并不与另一个节点或是管理节点通信。  Quartz 应用是通过数据库表来感知到另一应用。 
配置集群

  1. <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
  2. <!-- 集群配置 -->
  3. <prop key="org.quartz.jobStore.isClustered">true</prop>
  4. <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>
  5. <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>
  6. <!-- 数据源配置 使用DBCP连接池 数据源与dataSource一致 -->
  7. <prop key="org.quartz.jobStore.dataSource">myDS</prop>
  8. <prop key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>
  9. <prop key="org.quartz.dataSource.myDS.URL">${database.url}</prop>
  10. <prop key="org.quartz.dataSource.myDS.user">${database.username}</prop>
  11. <prop key="org.quartz.dataSource.myDS.password">${database.password}</prop>
  12. <prop key="org.quartz.dataSource.myDS.maxConnections">5</prop>

org.quartz.jobStore.class 属性为 JobStoreTX,  将任务持久化到数据中。因为集群中节点依赖于数据库来传播Scheduler实例的状态,你只能在使用 JDBC JobStore 时应用 Quartz 集群。 
org.quartz.jobStore.isClustered 属性为 true,通知Scheduler实例要它参与到一个集群当中。 
org.quartz.jobStore.clusterCheckinInterval 
属性定义了Scheduler 实例检入到数据库中的频率(单位:毫秒)。  Scheduler 检查是否其他的实例到了它们应当检入的时候未检入;  这能指出一个失败的 Scheduler 实例,且当前 Scheduler 会以此来接管任何执行失败并可恢复的 Job。  通过检入操作,Scheduler 也会更新自身的状态记录。clusterChedkinInterval 越小,Scheduler 节点检查失败的 Scheduler 实例就越频繁。默认值是 15000 (即15 秒) 
集群实现分析  Quartz原来码分析:  基于数据库表锁实现多Quartz_Node 对Job,Trigger,Calendar等同步机制

  1. -- 数据库锁定表
  2. CREATE TABLE `QRTZ_LOCKS` (
  3. `LOCK_NAME` varchar(40) NOT NULL,
  4. PRIMARY KEY (`LOCK_NAME`)
  5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  6. -- 记录
  7. +-----------------+
  8. | LOCK_NAME       |
  9. +-----------------+
  10. | CALENDAR_ACCESS |
  11. | JOB_ACCESS      |
  12. | MISFIRE_ACCESS  |
  13. | STATE_ACCESS    |
  14. | TRIGGER_ACCESS  |
  15. +-----------------+

通过行级别锁实现多节点处理

  1. /**
  2. * Internal database based lock handler for providing thread/resource locking
  3. * in order to protect resources from being altered by multiple threads at the
  4. * same time.
  5. *
  6. * @author jhouse
  7. */
  8. public class StdRowLockSemaphore extends DBSemaphore {
  9. /*
  10. * Constants.
  11. * 锁定SQL语句
  12. *
  13. */
  14. public static final String SELECT_FOR_LOCK = "SELECT * FROM "
  15. + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_LOCK_NAME
  16. + " = ? FOR UPDATE";
  17. /**
  18. * This constructor is for using the <code>StdRowLockSemaphore</code> as
  19. * a bean.
  20. */
  21. public StdRowLockSemaphore() {
  22. super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK);
  23. }
  24. public StdRowLockSemaphore(String tablePrefix, String seletWithLockSQL) {
  25. super(tablePrefix, selectWithLockSQL, SELECT_FOR_LOCK);
  26. }
  27. /**
  28. * Execute the SQL select for update that will lock the proper database row.
  29. * 指定锁定SQL
  30. */
  31. protected void executeSQL(Connection conn, String lockName, String expandedSQL) throws LockException {
  32. PreparedStatement ps = null;
  33. ResultSet rs = null;
  34. try {
  35. ps = conn.prepareStatement(expandedSQL);
  36. ps.setString(1, lockName);
  37. if (getLog().isDebugEnabled()) {
  38. getLog().debug(
  39. "Lock '" + lockName + "' is being obtained: " +
  40. Thread.currentThread().getName());
  41. }
  42. rs = ps.executeQuery();
  43. if (!rs.next()) {
  44. throw new SQLException(Util.rtp(
  45. "No row exists in table " + TABLE_PREFIX_SUBST +
  46. TABLE_LOCKS + " for lock named: " + lockName, getTablePrefix()));
  47. }
  48. } catch (SQLException sqle) {
  49. if (getLog().isDebugEnabled()) {
  50. getLog().debug(
  51. "Lock '" + lockName + "' was not obtained by: " +
  52. Thread.currentThread().getName());
  53. }
  54. throw new LockException("Failure obtaining db row lock: "
  55. + sqle.getMessage(), sqle);
  56. } finally {
  57. if (rs != null) {
  58. try {
  59. rs.close();
  60. } catch (Exception ignore) {
  61. }
  62. }
  63. if (ps != null) {
  64. try {
  65. ps.close();
  66. } catch (Exception ignore) {
  67. }
  68. }
  69. }
  70. }
  71. protected String getSelectWithLockSQL() {
  72. return getSQL();
  73. }
  74. public void setSelectWithLockSQL(String selectWithLockSQL) {
  75. setSQL(selectWithLockSQL);
  76. }
  77. }
  78. /**
  79. * Grants a lock on the identified resource to the calling thread (blocking
  80. * until it is available).
  81. * 获取QRTZ_LOCKS行级锁
  82. * @return true if the lock was obtained.
  83. */
  84. public boolean obtainLock(Connection conn, String lockName) throws LockException {
  85. lockName = lockName.intern();
  86. Logger log = getLog();
  87. if(log.isDebugEnabled()) {
  88. log.debug(
  89. "Lock '" + lockName + "' is desired by: "
  90. + Thread.currentThread().getName());
  91. }
  92. if (!isLockOwner(conn, lockName)) {
  93. executeSQL(conn, lockName, expandedSQL);
  94. if(log.isDebugEnabled()) {
  95. log.debug(
  96. "Lock '" + lockName + "' given to: "
  97. + Thread.currentThread().getName());
  98. }
  99. getThreadLocks().add(lockName);
  100. //getThreadLocksObtainer().put(lockName, new
  101. // Exception("Obtainer..."));
  102. } else if(log.isDebugEnabled()) {
  103. log.debug(
  104. "Lock '" + lockName + "' Is already owned by: "
  105. + Thread.currentThread().getName());
  106. }
  107. return true;
  108. }
  109. /**
  110. * Release the lock on the identified resource if it is held by the calling thread.
  111. * 释放QRTZ_LOCKS行级锁
  112. */
  113. public void releaseLock(Connection conn, String lockName) {
  114. lockName = lockName.intern();
  115. if (isLockOwner(conn, lockName)) {
  116. if(getLog().isDebugEnabled()) {
  117. getLog().debug(
  118. "Lock '" + lockName + "' returned by: "
  119. + Thread.currentThread().getName());
  120. }
  121. getThreadLocks().remove(lockName);
  122. //getThreadLocksObtainer().remove(lockName);
  123. } else if (getLog().isDebugEnabled()) {
  124. getLog().warn(
  125. "Lock '" + lockName + "' attempt to return by: "
  126. + Thread.currentThread().getName()
  127. + " -- but not owner!",
  128. new Exception("stack-trace of wrongful returner"));
  129. }
  130. }

JobStoreTX 控制并发代码

  1. /**
  2. * Execute the given callback having optionally aquired the given lock.
  3. * For <code>JobStoreTX</code>, because it manages its own transactions
  4. * and only has the one datasource, this is the same behavior as
  5. * executeInNonManagedTXLock().
  6. * @param lockName The name of the lock to aquire, for example
  7. * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the
  8. * lockCallback is still executed in a transaction.
  9. *
  10. * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
  11. * @see JobStoreCMT#executeInLock(String, TransactionCallback)
  12. * @see JobStoreSupport#getNonManagedTXConnection()
  13. * @see JobStoreSupport#getConnection()
  14. */
  15. protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
  16. return executeInNonManagedTXLock(lockName, txCallback);
  17. }
  18. 使用JobStoreSupport.executeInNonManagedTXLock 实现:
  19. /**
  20. * Execute the given callback having optionally aquired the given lock.
  21. * This uses the non-managed transaction connection.
  22. *
  23. * @param lockName The name of the lock to aquire, for example
  24. * "TRIGGER_ACCESS".  If null, then no lock is aquired, but the
  25. * lockCallback is still executed in a non-managed transaction.
  26. */
  27. protected Object executeInNonManagedTXLock(
  28. String lockName,
  29. TransactionCallback txCallback) throws JobPersistenceException {
  30. boolean transOwner = false;
  31. Connection conn = null;
  32. try {
  33. if (lockName != null) {
  34. // If we aren't using db locks, then delay getting DB connection
  35. // until after acquiring the lock since it isn't needed.
  36. if (getLockHandler().requiresConnection()) {
  37. conn = getNonManagedTXConnection();
  38. }
  39. //获取锁
  40. transOwner = getLockHandler().obtainLock(conn, lockName);
  41. }
  42. if (conn == null) {
  43. conn = getNonManagedTXConnection();
  44. }
  45. //回调需要执行的sql语句如:(更新Trigger为运行中(ACQUIRED),删除执行过的Trigger等)
  46. Object result = txCallback.execute(conn);
  47. //JobStoreTX自身维护事务
  48. commitConnection(conn);
  49. Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
  50. if(sigTime != null && sigTime >= 0) {
  51. signalSchedulingChangeImmediately(sigTime);
  52. }
  53. return result;
  54. } catch (JobPersistenceException e) {
  55. rollbackConnection(conn);
  56. throw e;
  57. } catch (RuntimeException e) {
  58. rollbackConnection(conn);
  59. throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e);
  60. } finally {
  61. try {
  62. //释放锁
  63. releaseLock(conn, lockName, transOwner);
  64. } finally {
  65. cleanupConnection(conn);
  66. }
  67. }
  68. }

JobStoreCMT 控制并发代码

  1. /**
  2. * Execute the given callback having optionally acquired the given lock.
  3. * Because CMT assumes that the connection is already part of a managed
  4. * transaction, it does not attempt to commit or rollback the
  5. * enclosing transaction.
  6. *
  7. * @param lockName The name of the lock to acquire, for example
  8. * "TRIGGER_ACCESS".  If null, then no lock is acquired, but the
  9. * txCallback is still executed in a transaction.
  10. *
  11. * @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
  12. * @see JobStoreTX#executeInLock(String, TransactionCallback)
  13. * @see JobStoreSupport#getNonManagedTXConnection()
  14. * @see JobStoreSupport#getConnection()
  15. */
  16. protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
  17. boolean transOwner = false;
  18. Connection conn = null;
  19. try {
  20. if (lockName != null) {
  21. // If we aren't using db locks, then delay getting DB connection
  22. // until after acquiring the lock since it isn't needed.
  23. if (getLockHandler().requiresConnection()) {
  24. conn = getConnection();
  25. }
  26. transOwner = getLockHandler().obtainLock(conn, lockName);
  27. }
  28. if (conn == null) {
  29. conn = getConnection();
  30. }
  31. //没有事务提交操作,与任务共享一个事务
  32. return txCallback.execute(conn);
  33. } finally {
  34. try {
  35. releaseLock(conn, LOCK_TRIGGER_ACCESS, transOwner);
  36. } finally {
  37. cleanupConnection(conn);
  38. }
  39. }
  40. }

CRM中quartz与Spring结合使用  Spring 通过提供org.springframework.scheduling.quartz下的封装类对quartz支持  但是目前存在问题  1:Spring3.0目前不支持Quartz2.x以上版本 
Caused by: java.lang.IncompatibleClassChangeError: class org.springframework.scheduling.quartz.CronTriggerBean  has interface org.quartz.CronTrigger as super class  原因是 org.quartz.CronTrigger在2.0从class变成了一个interface造成IncompatibleClassChangeError错误。 
解决:无解,要想使用spring和quartz结合的方式 只能使用Quartz1.x版本。 
2:org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean报  java.io.NotSerializableException异常,需要自己实现QuartzJobBean。 
解决:spring bug己经在http://jira.springframework.org/browse/SPR-3797找到解决方案,  作者重写了MethodInvokingJobDetailFactoryBean. 
3:Spring内bean必须要实现序列化接口,否则不能通过Sprng 属性注入的方式为job提供业务对象 
解决:

  1. //使用可序列化工具类获取Spring容器对象
  2. @Service("springBeanService")
  3. public class SpringBeanService implements Serializable{private static final long serialVersionUID = -2228376078979553838L;
  4. public <T> T getBean(Class<T> clazz,String beanName){
  5. ApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
  6. return (T)context.getBean(beanName);
  7. }
  8. }

CRM中quartz模块部分代码  1:定义所有job的父类,并负责异常发送邮件任务和日志任务

  1. public abstract class BaseQuartzJob implements Job, Serializable {
  2. private static final long serialVersionUID = 3347549365534415931L;
  3. private Logger logger = LoggerFactory.getLogger(this.getClass());
  4. //定义抽象方法,供子类实现
  5. public abstract void action(JobExecutionContext context);
  6. @Override
  7. public void execute(JobExecutionContext context) throws JobExecutionException {
  8. try {
  9. long start = System.currentTimeMillis();
  10. this.action(context);
  11. long end = System.currentTimeMillis();
  12. JobDetail jobDetail = context.getJobDetail();
  13. Trigger trigger = context.getTrigger();
  14. StringBuilder buffer = new StringBuilder();
  15. buffer.append("jobName = ").append(jobDetail.getName()).append(" triggerName = ")
  16. .append(trigger.getName()).append(" 执行完成 , 耗时: ").append((end - start)).append(" ms");
  17. logger.info(buffer.toString());
  18. } catch (Exception e) {
  19. doResolveException(context != null ? context.getMergedJobDataMap() : null, e);
  20. }
  21. }
  22. @SuppressWarnings("unchecked")
  23. private void doResolveException(JobDataMap dataMap, Exception ex) {
  24. //发送邮件实现此处省略
  25. //...
  26. }
  27. }

2:抽象Quartz操作接口(实现类 toSee: QuartzServiceImpl)

  1. /**
  2. *
  3. * @author zhangyijun
  4. * @created 2012-10-22
  5. *
  6. * @version 1.0
  7. */
  8. @Service
  9. public interface QuartzService {
  10. /**
  11. * 获取所有trigger
  12. * @param page
  13. * @param orderName
  14. * @param sortType
  15. * @return
  16. */
  17. List<Map<String, Object>> getQrtzTriggers(Page page, String orderName, String sortType);
  18. /**
  19. * 获取所有jobDetail
  20. *
  21. * @return
  22. */
  23. List<Map<String, Object>> getQrtzJobDetails();
  24. /**
  25. * 执行Trigger操作
  26. *
  27. * @param name
  28. * @param group
  29. * @param action
  30. * <br/>
  31. */
  32. void executeTriggerAction(String name, String group, Integer action);
  33. /**
  34. * 执行JobDetail操作
  35. *
  36. * @param name
  37. * @param group
  38. * @param action
  39. * <br/>
  40. */
  41. void executeJobAction(String name, String group, Integer action);
  42. /**
  43. * 动态添加trigger
  44. *
  45. * @param jobName
  46. * @param jobGroup
  47. * @param triggerBean
  48. */
  49. void addTrigger(String jobName, String jobGroup, TriggerViewBean triggerBean);
  50. /**
  51. * 定时执行任务
  52. *
  53. * @param jobDetail
  54. * @param data
  55. */
  56. void addTriggerForDate(JobDetail jobDetail, String triggerName , String
  57. triggerGroup , Date date, Map<String, Object> triggerDataMap) ;
  58. /**
  59. * 获取分布式Scheduler列表
  60. *
  61. * @return
  62. */
  63. List<Map<String, Object>> getSchedulers();
  64. /**
  65. * 获取触发器
  66. * @param name
  67. * @param group
  68. * @return
  69. */
  70. public Trigger getTrigger(String name, String group);
  71. /**
  72. * 获取JobDetail
  73. * @param name
  74. * @param group
  75. * @return
  76. */
  77. public JobDetail getJobDetail(String name, String group);
  78. }

3:在Spring配置job,trigger,Scheduler,Listener组件

  1. <!-- 扫描商家状态创建定时任务 -->
  2. <bean id="accountStatusTaskScannerJobDetail"
  3. class="org.springframework.scheduling.quartz.JobDetailBean">
  4. <property name="name" value="accountStatusTaskScannerJobDetail"></property>
  5. <property name="group" value="CrmAccountGroup"></property>
  6. <property name="jobClass" value="***.crm.quartz.job.AccountStatusTaskScannerJob"></property>
  7. <!-- requestsRecovery属性为true,则当Quartz服务被中止后,再次启动任务时会尝试恢复执行之前未完成的所有任务-->
  8. <property name="requestsRecovery" value="true"/>
  9. <!-- 标识job是持久的,删除所有触发器的时候不被删除 -->
  10. <property name="durability" value="true"/>
  11. <property name="volatility" value="false"></property>
  12. </bean>
  13. <bean id="accountStatusTaskScannerTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
  14. <property name="group" value="CrmDealGroup"></property>
  15. <property name="name" value="accountStatusTaskScannerTrigger"></property>
  16. <property name="jobDetail" ref="accountStatusTaskScannerJobDetail"></property>
  17. <property name="cronExpression" value="0 0 1 * * ?"></property>
  18. </bean>
  19. <!-- 定义Quartz 监听器 -->
  20. <bean id="quartzExceptionSchedulerListener"
  21. class="***.crm.quartz.listener.QuartzExceptionSchedulerListener"></bean>
  22. <!-- Quartz调度工厂 -->
  23. <bean id="quartzScheduler"
  24. class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
  25. <property name="quartzProperties">
  26. <props>
  27. <prop key="org.quartz.scheduler.instanceName">CRMscheduler</prop>
  28. <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
  29. <!-- 线程池配置 -->
  30. <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
  31. <prop key="org.quartz.threadPool.threadCount">20</prop>
  32. <prop key="org.quartz.threadPool.threadPriority">5</prop>
  33. <!-- JobStore 配置 -->
  34. <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
  35. <!-- 集群配置 -->
  36. <prop key="org.quartz.jobStore.isClustered">false</prop>
  37. <prop key="org.quartz.jobStore.clusterCheckinInterval">15000</prop>
  38. <prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">1</prop>
  39. <!-- 数据源配置 使用DBCP连接池 数据源与dataSource一致 -->
  40. <prop key="org.quartz.jobStore.dataSource">myDS</prop>
  41. <prop key="org.quartz.dataSource.myDS.driver">${database.driverClassName}</prop>
  42. <prop key="org.quartz.dataSource.myDS.URL">${database.url}</prop>
  43. <prop key="org.quartz.dataSource.myDS.user">${database.username}</prop>
  44. <prop key="org.quartz.dataSource.myDS.password">${database.password}</prop>
  45. <prop key="org.quartz.dataSource.myDS.maxConnections">5</prop>
  46. <prop key="org.quartz.jobStore.misfireThreshold">120000</prop>
  47. </props>
  48. </property>
  49. <property name="schedulerName" value="CRMscheduler" />
  50. <!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动-->
  51. <property name="startupDelay" value="30"/>
  52. <property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
  53. <!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
  54. <property name="overwriteExistingJobs" value="true" />
  55. <!-- 设置自动启动 -->
  56. <property name="autoStartup" value="true" />
  57. <!-- 注册触发器 -->
  58. <property name="triggers">
  59. <list>
  60. <ref bean="dailyStatisticsTrigger" />
  61. <ref bean="accountGrabedScannerTrigger" />
  62. <ref bean="syncAccountFromPOITrigger" />
  63. <ref bean="userSyncScannerTrigger" />
  64. <ref bean="syncParentBranchFromPOITrigger"/>
  65. <ref bean="privateReminderTrigger" />
  66. <ref bean="onlineBranchesScannerTrigger" />
  67. <ref bean="syncCtContactServiceTrigger" />
  68. <ref bean="dealLinkDianpingScannerTrigger" />
  69. <ref bean="accountStatusTaskScannerTrigger"/>
  70. <ref bean="nDaysActivityScannerTrigger"/>
  71. </list>
  72. </property>
  73. <!-- 注册jobDetail -->
  74. <property name="jobDetails">
  75. <list>
  76. <ref bean="myTestQuartzJobDetail"/>
  77. <ref bean="accountPrivateToProtectedJobDetail"/>
  78. <ref bean="accountProtectedToPublicJobDetail"/>
  79. <ref bean="nDaysActivityToProtectedJobDetail"/>
  80. </list>
  81. </property>
  82. <property name="schedulerListeners">
  83. <list>
  84. <ref bean="quartzExceptionSchedulerListener"/>
  85. </list>
  86. </property>
  87. </bean>

Crm目前可以做到对Quartz实例的监控,操作.动态部署Trigger  项目中使用Quartz集群分享--转载项目中使用Quartz集群分享--转载项目中使用Quartz集群分享--转载  项目中使用Quartz集群分享--转载 
后续待开发功能和问题 
1:目前实现对job,Trigger操作,动态部署Trigger,后续需要加入Calendar(排除特定日期),Listener(动态加载监控),Job的动态部署(只要bean的名称和方法名,就可完成对job生成,部署) 
2:由于Quartz集群中的job目前是在任意一台server中执行,Quartz日志生成各自的系统目录中, quartz日志无法统一. 
3:Quartz2.x已经支持可选节点执行job(期待Spring升级后对新Quartz支持) 
4:Quartz内部的DB操作大量Trigger存在严重竞争问题,瞬间大量trigger执行,目前只能通过(org.quartz.jobStore.tablePrefix = QRTZ)分表操作,存在长时间lock_wait(新版本据说有提高); 
5:如果有需要,可以抽取出Quartz,变成单独的服务,供其它系统调度使用使用

本文转自:http://www.cnblogs.com/davidwang456/p/4205572.html