spring boot下定时任务quartz的集群使用

时间:2021-12-31 02:27:32

单机模式下的定时任务调用很简单,有很多可实现的方案,这里不多说了。

这里说一下集群部署的情况下,定时任务的使用。这种情况下,quartz是一个比较好的选择。简单,稳定。


想象一下,现在有 A , B , C  3 台机器同时作为集群服务器对外统一提供 SERVICE :

A , B , C   3 台机器上各有一个 QUARTZ  Job,它们会按照即定的 SCHEDULE 自动执行各自的任务。

先不说实现什么功能,这样的架构有点像多线程。由于三台SERVER 里都有 QUARTZ ,因此会存在重复处理 TASK 的现象。

一般外面的解决方案是只在一台 服务器上装 QUARTZ ,其它两台不装,这样的话其实就是单机了,quartz存在单点问题,一旦装有quartz的服务器宕了。服务就无法提供了。

当然还有其他一些解决方案,无非就是改 QUARTZ JOB 的代码了,这对程序开发人员来说比较痛苦;


而quartz本身提供了很好的集群方案。下面我们来说一下在spring boot下的集成:


quartz集群需要数据库的支持(JobStore TX或者JobStoreCMT),从本质上来说,是使集群上的每一个节点通过共享同一个数据库来工作的


一、准备工作:

到quartz官网下载最新的包:http://www.quartz-scheduler.org/downloads/


spring boot下定时任务quartz的集群使用


解压后,可以看到结构目录。在\docs\dbTables下选择合适你数据库的SQL执行文件,创建quartz集群需要的表(共11张表)

spring boot下定时任务quartz的集群使用spring boot下定时任务quartz的集群使用

二、与springboot整合,这里假定你已经配好数据库连接等,项目能正常运行:


添加quartz的支持

 <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>


添加quartz配置:

#quartz集群配置
# ===========================================================================
# Configure Main Scheduler Properties 调度器属性
# ===========================================================================
#调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName=DefaultQuartzScheduler
#ID设置为自动获取 每一个必须不同
org.quartz.scheduler.instanceid=AUTO
#============================================================================
# Configure ThreadPool
#============================================================================
#线程池的实现类(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#指定线程数,至少为1(无默认值)(一般设置为1-100直接的整数合适)
org.quartz.threadPool.threadCount = 25
#设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
org.quartz.threadPool.threadPriority = 5
#============================================================================
# Configure JobStore
#============================================================================
# 信息保存时间 默认值60秒
org.quartz.jobStore.misfireThreshold = 60000
#数据保存方式为数据库持久化
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#JobDataMaps是否都为String类型
org.quartz.jobStore.useProperties = false
#数据库别名 随便取
org.quartz.jobStore.dataSource = myDS
#表的前缀,默认QRTZ_
org.quartz.jobStore.tablePrefix = QRTZ_
#是否加入集群
org.quartz.jobStore.isClustered = true
#调度实例失效的检查时间间隔
org.quartz.jobStore.clusterCheckinInterval = 20000
#============================================================================
# Configure Datasources
#============================================================================
#数据库引擎
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
#数据库连接
org.quartz.dataSource.myDS.URL = jdbc:mysql://172.30.12.14:7001/rbl_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true
#数据库用户
org.quartz.dataSource.myDS.user = root
#数据库密码
org.quartz.dataSource.myDS.password = 123456
#允许最大连接
org.quartz.dataSource.myDS.maxConnections = 5
#验证查询sql,可以不设置
org.quartz.dataSource.myDS.validationQuery=select 0 from dual

这里提供两种实现方式(推荐使用第二种,添加了无用任务自动清理)

第一种:

添加任务注入的工厂类,否则任务注入会出错:

/*
* 文件名:InvokingJobDetailDetailFactory.java 版权:Copyright by www.poly.com 描述: 修改人:gogym
* 修改时间:2017年11月9日 跟踪单号: 修改单号: 修改内容:
*/

package com.poly.rbl.schedule;


import java.lang.reflect.Method;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;


public class InvokingJobDetailDetailFactory extends QuartzJobBean
{

// 计划任务所在类
private String targetObject;

// 具体需要执行的计划任务
private String targetMethod;

private ApplicationContext ctx;

@Override
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException
{
try
{
Object otargetObject = ctx.getBean(targetObject);
Method m = null;
try
{
m = otargetObject.getClass().getMethod(targetMethod);
m.invoke(otargetObject);
}
catch (SecurityException e)
{
e.printStackTrace();
}
catch (NoSuchMethodException e)
{
e.printStackTrace();
}
}
catch (Exception e)
{
throw new JobExecutionException(e);
}
}

public void setApplicationContext(ApplicationContext applicationContext)
{
this.ctx = applicationContext;
}

public void setTargetObject(String targetObject)
{
this.targetObject = targetObject;
}

public void setTargetMethod(String targetMethod)
{
this.targetMethod = targetMethod;
}
}


配置quartz:

/*
* 文件名:QuartzConfig.java 版权:Copyright by www.poly.com 描述: 修改人:gogym 修改时间:2017年11月9日 跟踪单号: 修改单号:
* 修改内容:
*/

package com.poly.rbl.configuration;


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import com.poly.rbl.schedule.InvokingJobDetailDetailFactory;


@Configuration
public class QuartzConfig
{

// 配置文件路径
static final String QUARTZ_CONFIG = "properties/quartz.properties";

/**
* Description: 定义调用对象和调用对象的方法
*
* @param redisQuartzJob
* @return
* @see
*/
@Bean(name = "enjoyQuartzJobTask")
public JobDetailFactoryBean enjoyQuartzJobTask()
{
//集群模式下必须使用JobDetailFactoryBean, MethodInvokingJobDetailFactoryBean 类中的 methodInvoking 方法,是不支持序列化的
JobDetailFactoryBean bean = new JobDetailFactoryBean();
bean.setName("enjoyQuartzJob");// 设置任务的名字
bean.setGroup("enjoyQuartzJobGroup");// 设置任务的分组,这些属性都可以存储在数据库中,在多任务的时候使用
bean.setDurability(true);
bean.setRequestsRecovery(true);
bean.setJobClass(InvokingJobDetailDetailFactory.class);
Map<String, String> map = new HashMap<>();
map.put("targetObject", "enjoyQuartzJob");//任务所在的类
map.put("targetMethod", "enjoyWork");//具体执行任务的方法
bean.setJobDataAsMap(map);
return bean;
}

/**
* Description: 定义任务触发规则
*
* @param redisTrigger
* @return
* @see
*/
@Bean(name = "enjoyQuartzJobTrigger")
public CronTriggerFactoryBean enjoyQuartzJobTrigger(@Qualifier("enjoyQuartzJobTask") JobDetail enjoyQuartzJobTask)
{
CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
trigger.setJobDetail(enjoyQuartzJobTask);
trigger.setCronExpression("0/5 * * * * ?");// cron表达式
trigger.setName("enjoyQuartzJobTrigger");// trigger的name
return trigger;
}

@Bean
public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("enjoyQuartzJobTrigger") CronTriggerFactoryBean enjoyQuartzJobTrigger)
throws IOException
{
SchedulerFactoryBean factory = new SchedulerFactoryBean();
// 用于quartz集群,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory.setOverwriteExistingJobs(true);
// QuartzScheduler 延时启动,应用启动完10秒后 QuartzScheduler 再启动
factory.setStartupDelay(10);
// 直接使用配置文件,用于quartz集群,加载quartz数据源配置
factory.setConfigLocation(new ClassPathResource(QUARTZ_CONFIG));
factory.setAutoStartup(true);
//集群需要通过QuartzJobBean注入,需要设置上下文
factory.setApplicationContextSchedulerContextKey("applicationContext");
// 注册触发器
Trigger[] triggers = {enjoyQuartzJobTrigger.getObject()};
factory.setTriggers(triggers);

return factory;
}

}

JOB任务:


/*
* 文件名:EnjoyQuartzJob.java 版权:Copyright by www.poly.com 描述: 修改人:gogym 修改时间:2017年11月9日 跟踪单号: 修改单号:
* 修改内容:
*/

package com.poly.rbl.schedule;


import org.springframework.stereotype.Component;


@Component("enjoyQuartzJob")
public class EnjoyQuartzJob
{

public void enjoyWork()
{
System.out.println("定时任务执行");
}

}


第二种,更加简洁好用:


/*
* 文件名:QuartzConfig.java 版权:Copyright by www.poly.com 描述: 修改人:gogym 修改时间:2017年11月9日 跟踪单号: 修改单号:
* 修改内容:
*/

package com.poly.pay.configuration;


import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import com.poly.pay.schedule.JobRefundWeichartBean;


@Configuration
public class QuartzConfig
{

// 配置文件路径
static final String QUARTZ_CONFIG = "properties/quartz.properties";

// 定时任务组名称
public static final String Quartz_Group_Name = "enjoyQuartzJobGroup";
//定时任务方法后缀
public static final String Quartz_Job_Suffix = "_job";
//定时任务触发器后缀
public static final String Quartz_Trigger_Suffix = "_trigger";


@Bean(name = "triggers")
public CronTriggerImpl[] createTriggers()
throws ParseException
{
List<CronTriggerImpl> l = new ArrayList<CronTriggerImpl>();
l.add(createTrigger(JobRefundWeichartBean.class, "0/20 * * * * ?"));
//l.add(createTrigger(JobRefundWeichartBean.class, "0/20 * * * * ?"));
//按你的需要添加多个任务:任务所在类.class cron表达式

return l.toArray(new CronTriggerImpl[l.size()]);
}

private JobDetail create(Class<?> c)
{
JobDetailFactoryBean d = new JobDetailFactoryBean();
d.setDurability(true);
d.setRequestsRecovery(true);
d.setJobClass(c);
d.setName(c.getSimpleName() + Quartz_Job_Suffix);
d.setGroup(Quartz_Group_Name);
d.afterPropertiesSet();
return d.getObject();
}

private CronTriggerImpl createTrigger(Class<?> t, String cronExpression)
throws ParseException
{
CronTriggerFactoryBean c = new CronTriggerFactoryBean();
c.setJobDetail(create(t));
c.setCronExpression(cronExpression);
c.setName(t.getSimpleName() + Quartz_Trigger_Suffix);
c.setGroup(Quartz_Group_Name);
c.afterPropertiesSet();
return (CronTriggerImpl)c.getObject();
}

@Bean(name = "schedulerFactoryBean")
public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("triggers") CronTriggerImpl[] triggers)
throws IOException, ParseException, SchedulerException
{
SchedulerFactoryBean factory = new SchedulerFactoryBean();
// 用于quartz集群,QuartzScheduler,启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory.setOverwriteExistingJobs(true);
// QuartzScheduler 延时启动,应用启动完10秒后 QuartzScheduler 再启动
factory.setStartupDelay(10);
// 直接使用配置文件,用于quartz集群,加载quartz数据源配置
factory.setConfigLocation(new ClassPathResource(QUARTZ_CONFIG));
factory.setAutoStartup(true);
// 集群需要通过QuartzJobBean注入,需要设置上下文
factory.setApplicationContextSchedulerContextKey("applicationContext");
// 注册触发器
// factory.getScheduler().pauseAll();
factory.setTriggers(createTriggers());// 直接使用配置文件
// factory.setConfigLocation(new
// FileSystemResource(this.getClass().getResource("/quartz.properties").getPath()));
return factory;
}

/**
* 添加该方法的目的在于一个使用场景。如果代码中删除了不需要的定时任务,但是数据库中不会删除掉,会导致之前
* 的定时任务一直在运行,如果把定时任务依赖的类删除了,就会导致报错,找不到目标。所以配置动态删除任务
*/
@Bean
public String fulsh(@Qualifier("schedulerFactoryBean") SchedulerFactoryBean schedulerFactoryBean,
@Qualifier("triggers") CronTriggerImpl[] triggers)
throws SchedulerException
{
try
{
Scheduler s = schedulerFactoryBean.getScheduler();
if (null == s)
{
return "Scheduler is null";
}

// 最新配置的任务
List<String> newTriNames = new ArrayList<String>();
if (null != triggers)
{
for (CronTriggerImpl cronTriggerImpl : triggers)
{
newTriNames.add(cronTriggerImpl.getName());
}
}

// 现有数据库中已有的任务
Set<TriggerKey> myGroupTriggers = s.getTriggerKeys(GroupMatcher.triggerGroupEquals(Quartz_Group_Name));
if (null == myGroupTriggers || myGroupTriggers.size() == 0)
{
return "myGroupTriggers is null";
}

if (newTriNames != null && newTriNames.size() > 0)
{
for (TriggerKey triggerKey : myGroupTriggers)
{
String dbTriggerName = triggerKey.getName();
if (!newTriNames.contains(dbTriggerName))
{
// 暂停 触发器
s.pauseTrigger(triggerKey);
Trigger g = s.getTrigger(triggerKey);
JobKey jk = null;
if (null != g)
{
jk = g.getJobKey();
}
// 停止触发器
s.pauseTrigger(triggerKey);
// 注销 触发器
s.unscheduleJob(triggerKey);
if (null != jk)
{
// 暂停任务
s.pauseJob(jk);
// 删除任务
s.deleteJob(jk);
}
}
}
}
// 重要,如果不恢复所有,会导致无法使用
s.resumeAll();
}
catch (Exception e)
{
e.printStackTrace();
return "Exception:" + e.getMessage();
}
return "success";
}
}


任务类:


/*
* 文件名:JobRefundWeichartBean.java 版权:Copyright by www.poly.com 描述: 修改人:gogym 修改时间:2017年12月28日 跟踪单号:
* 修改单号: 修改内容:
*/

package com.poly.pay.schedule;


import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.scheduling.quartz.QuartzJobBean;


// 1.修改数据,防止并发,2不允许并发执行
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class JobRefundWeichartBean extends QuartzJobBean
{

@Override
protected void executeInternal(JobExecutionContext arg0)
throws JobExecutionException
{

System.out.println("执行方法");
}

}


启动即可看到数据库插入了任务相关的信息。集群完成。


需要注意的是:

当你运行水平集群时,时钟应当要同步,以免出现离奇且不可预知的行为。假如时钟没能够同步,Scheduler 实例将对其他节点的状态产生混乱。有几种简单的方法来保证时钟何持同步,而且也没有理由不这么做。最简单的同步计算机时钟的方式是使用某一个 Internet 时间服务器(Internet Time Server ITS)。


常用解决方案:

服务器中配置时间同步只要一台服务器同步互联网的时钟服务器,其它的服务以这台为时钟服务器!


Linux配置(局域网的客户端)

 

1、安装

   yum install ntp   (centos的安装方法)

2、先运行 # ntpdate 192.168.1.33 同步一次.

3、然后通过crontab计时器配置一个定时同步的任务,例如每月一号零点零分同步一次.代码如下:

# crontab -e  //添下面一行,新建的定时任务文件保存在/var/spool/cron/下,以创建人的用户名命名

0 0 1 * * /etc/ntp/ntprsync.sh  //每小时同步一次。

 

4、创建文件

 

# vi ntprsync.sh    //内容如下
#!/bin/sh
/usr/sbin/ntpdate 192.168.1.33 //时钟服务器的IP
/sbin/hwclock –w

 

5、设置权限 chmod 777 ntprsync.sh

6、注意防火墙的设置.

7、成功。

8、服务启动。

 

/sbin/service crond start //启动服务/sbin/service crond stop //关闭服务/sbin/service crond restart //重启服务/sbin/service crond reload //重新载入配置