Spring+quartz 实现动态管理任务

时间:2021-10-24 08:07:46
在实际项目应用中经常会用到定时任务,可以通过quartz和spring的简单配置即可完成,但如果要改变任务的执行时间、频率,废弃任务等就需要改变配置甚至代码需要重启服务器,这里介绍一下如何通过quartz与spring的组合实现动态的改变定时任务的状态的一个实现。

本文章适合对quartz和spring有一定了解的读者。
spring版本为3.2 quartz版本为2.2.1 如果使用了quartz2.2.1 则spring版本需3.1以上
1、spring配置文件中引入注册bean

<bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" />
<bean id="loadTask" class="*.quartz.LoadTask" init-method="initTask" />

2、LoadTask.java(初始化加载数据库任务)

public class LoadTask {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
private TaskService taskService;

public void initTask() throws SchedulerException {
// 可执行的任务列表
List<QuartzJobBean> taskList = taskService.getTaskList();
logger.info("初始化加载定时任务......");
for (QuartzJobBean job : taskList) {
taskService.addJob(job);
}
}
}

3、TaskService.java

Service(value = "taskService")
public class TaskService {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Reference(registry = "real-registry")
private DataExchangeModuleService moduleService;

/**
* 获取单个任务
* @param jobName
* @param jobGroup
* @return
* @throws SchedulerException
*/
public QuartzJobBean getJob(String jobName,String jobGroup) throws SchedulerException{
QuartzJobBean job = null;
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (null != trigger) {
job = new QuartzJobBean();
job.setJobName(jobName);
job.setJobGroup(jobGroup);
job.setDescription("触发器:" + trigger.getKey());
job.setNextTime(trigger.getNextFireTime()); //下次触发时间
job.setPreviousTime(trigger.getPreviousFireTime());//上次触发时间
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
}
return job;
}
/**
* 获取所有任务
* @return
* @throws SchedulerException
*/
public List<QuartzJobBean> getAllJobs() throws SchedulerException{
Scheduler scheduler = schedulerFactoryBean.getScheduler();
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
List<QuartzJobBean> jobList = new ArrayList<QuartzJobBean>();
for (JobKey jobKey : jobKeys) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
QuartzJobBean job = new QuartzJobBean();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDescription("触发器:" + trigger.getKey());

job.setNextTime(trigger.getNextFireTime()); //下次触发时间
// trigger.getFinalFireTime();//最后一次执行时间
job.setPreviousTime(trigger.getPreviousFireTime());//上次触发时间
// trigger.getStartTime();//开始时间
// trigger.getEndTime();//结束时间
//触发器当前状态
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
//
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
}
return jobList;
}

/**
* 所有正在运行的job
*
* @return
* @throws SchedulerException
*/
public List<QuartzJobBean> getRunningJob() throws SchedulerException {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
List<QuartzJobBean> jobList = new ArrayList<QuartzJobBean>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
QuartzJobBean job = new QuartzJobBean();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
job.setJobName(jobKey.getName());
job.setJobGroup(jobKey.getGroup());
job.setDescription("触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
job.setJobStatus(triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCronExpression(cronExpression);
}
jobList.add(job);
}
return jobList;
}

/**
* 查询任务列表
* @return
*/
public List<QuartzJobBean> getTaskList(){
List<QuartzJobBean> jobs = new ArrayList<QuartzJobBean>();
List<DataExchangeModuleBO> taskList = moduleService.findAll();
QuartzJobBean job = null;
for(DataExchangeModuleBO bo:taskList){
job = getTask(bo);
if(job!=null){
jobs.add(job);
}
}
return jobs;
}
/**
* 查询任务列表
* @return
*/
public QuartzJobBean getTask(DataExchangeModuleBO bo){
QuartzJobBean job = null;
if(bo!=null){
job = new QuartzJobBean();
job.setJobId(String.valueOf(bo.getId()));
job.setJobName(bo.getModuleName());
job.setJobGroup(bo.getSystemName());
job.setJobStatus(bo.getStatus());//初始状态
job.setCronExpression(bo.getCron());
job.setSpringId(bo.getSpringId());
job.setConcurrent(bo.getConcurrent());
job.setJobClass(bo.getClazzName());
job.setMethodName(bo.getMethodName());
job.setDescription(bo.getSystemName()+"->"+bo.getModuleName()+"->"+bo.getInterfaceInfo());
}
return job;
}

/**
* 添加任务
*
* @param scheduleJob
* @throws SchedulerException
*/
public boolean addJob(QuartzJobBean job) throws SchedulerException {
if (job == null || !QuartzJobBean.STATUS_RUNNING.equals(job.getJobStatus())) {
return false;
}
if(!TaskUtils.isValidExpression(job.getCronExpression())){
logger.error("时间表达式错误("+job.getJobName()+","+job.getJobGroup()+"),"+job.getCronExpression());
return false;
}else{
Scheduler scheduler = schedulerFactoryBean.getScheduler();
// 任务名称和任务组设置规则: // 名称:task_1 .. // 组 :group_1 ..
TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 不存在,创建一个
if (null == trigger) {
//是否允许并发执行
Class<? extends Job> clazz = QuartzJobBean.CONCURRENT_IS.equals(job.isConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build();
jobDetail.getJobDataMap().put("scheduleJob", job);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
// 按新的表达式构建一个新的trigger
trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
} else { // trigger已存在,则更新相应的定时设置
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
}
return true;
}

/**
* 暂停任务
* @param scheduleJob
* @return
*/
public boolean pauseJob(QuartzJobBean scheduleJob){
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
try {
scheduler.pauseJob(jobKey);
return true;
} catch (SchedulerException e) {
}
return false;
}

/**
* 恢复任务
* @param scheduleJob
* @return
*/
public boolean resumeJob(QuartzJobBean scheduleJob){
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
try {
scheduler.resumeJob(jobKey);
return true;
} catch (SchedulerException e) {
}
return false;
}

/**
* 删除任务
*/
public boolean deleteJob(QuartzJobBean scheduleJob){
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
try{
scheduler.deleteJob(jobKey);
return true;
} catch (SchedulerException e) {
}
return false;
}

/**
* 立即执行一个任务
* @param scheduleJob
* @throws SchedulerException
*/
public void testJob(QuartzJobBean scheduleJob) throws SchedulerException{
Scheduler scheduler = schedulerFactoryBean.getScheduler();
JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
scheduler.triggerJob(jobKey);
}

/**
* 更新任务时间表达式
* @param scheduleJob
* @throws SchedulerException
*/
public void updateCronExpression(QuartzJobBean scheduleJob) throws SchedulerException{
Scheduler scheduler = schedulerFactoryBean.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
//获取trigger,即在spring配置文件中定义的 bean id="myTrigger"
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression());
//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}

4、TaskUtils.java

public class TaskUtils {

private final static Logger logger = LoggerFactory.getLogger(TaskUtils.class);

/**
* 通过反射调用scheduleJob中定义的方法
*
* @param scheduleJob
*/
public static void invokMethod(QuartzJobBean scheduleJob) {
Object object = null;
Class<?> clazz = null;
//springId不为空先按springId查找bean
if (StringUtils.isNotBlank(scheduleJob.getSpringId())) {
object = SpringUtils.getBean(scheduleJob.getSpringId());
} else if (StringUtils.isNotBlank(scheduleJob.getJobClass())) {//按jobClass查找
try {
clazz = Class.forName(scheduleJob.getJobClass());
object = clazz.newInstance();
} catch (Exception e) {
e.printStackTrace();
}
}
if (object == null) {
logger.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,请检查执行类是否配置正确!!!");
return;
}
clazz = object.getClass();
Method method = null;
try {
method = clazz.getDeclaredMethod(scheduleJob.getMethodName());
} catch (NoSuchMethodException e) {
logger.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,请检查执行类的方法名是否设置错误!!!");
} catch (SecurityException e) {
e.printStackTrace();
}
if (method != null) {
try {
method.invoke(object);
} catch (IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalArgumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

/**
* 判断cron时间表达式正确性
* @param cronExpression
* @return
*/
public static boolean isValidExpression(final String cronExpression){
//CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTriggerImpl trigger = new CronTriggerImpl();
try {
trigger.setCronExpression(cronExpression);
Date date = trigger.computeFirstFireTime(null);
return date != null && date.after(new Date());
} catch (ParseException e) {
}
return false;
}

public static void main(String[] args){
System.out.println(isValidExpression("0 0/1 * * * ?"));
}

/*
* 任务运行状态
*/
public enum TASK_STATE{
NONE("NONE","未知"),
NORMAL("NORMAL", "正常运行"),
PAUSED("PAUSED", "暂停状态"),
COMPLETE("COMPLETE",""),
ERROR("ERROR","错误状态"),
BLOCKED("BLOCKED","锁定状态");

private TASK_STATE(String index,String name) {
this.name = name;
this.index = index;
}
private String index;
private String name;
public String getName() {
return name;
}
public String getIndex() {
return index;
}
}
}

5、QuartzJobFactory.java

/**
* Job实现类 无状态
* 若此方法上一次还未执行完,而下一次执行时间轮到时则该方法也可并发执行
* @author root
*/
public class QuartzJobFactory implements Job {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
QuartzJobBean scheduleJob = (QuartzJobBean)context.getMergedJobDataMap().get("scheduleJob");
logger.info("运行任务名称 = [" + scheduleJob.getJobName() + "]");
TaskUtils.invokMethod(scheduleJob);
}

}

6、QuartzJobFactoryDisallowConcurrentExecution.java

/**
* Job有状态实现类,不允许并发执行
* 若一个方法一次执行不完下次轮转时则等待该方法执行完后才执行下一次操作
* 主要是通过注解:@DisallowConcurrentExecution
* @author root
*
*/
@DisallowConcurrentExecution
public class QuartzJobFactoryDisallowConcurrentExecution implements Job {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
QuartzJobBean scheduleJob = (QuartzJobBean)context.getMergedJobDataMap().get("scheduleJob");
logger.info("运行任务名称 = [" + scheduleJob.getJobName() + "]");
TaskUtils.invokMethod(scheduleJob);
}

}



7、QuartzJobBean.java

public static final String STATUS_RUNNING = "1";
public static final String STATUS_NOT_RUNNING = "0";
public static final String CONCURRENT_IS = "1";
public static final String CONCURRENT_NOT = "0";

/** 任务id */
private String jobId;

/** 任务名称 */
private String jobName;

/** 任务分组,任务名称+组名称应该是唯一的 */
private String jobGroup;

/** 任务初始状态 0禁用 1启用 2删除*/
private String jobStatus;

/** 任务是否有状态(并发) */
private String isConcurrent = "1";

/** 任务运行时间表达式 */
private String cronExpression;

/** 任务描述 */
private String description;

/** 任务调用类在spring中注册的bean id,如果spingId不为空,则按springId查找 */
private String springId;

/** 任务调用类名,包名+类名,通过类反射调用 ,如果spingId为空,则按jobClass查找 */
private String jobClass;

/** 任务调用的方法名 */
private String methodName;

/** 启动时间 */
private Date startTime;

/** 前一次运行时间 */
private Date previousTime;

/** 下次运行时间 */
private Date nextTime;

8、DECMService.java
此类就是所有具体要调用的任务方法都写在此服务类中。


转自:http://windows9834.blog.163.com