首先说一下原因:项目需要引入定时任务,初步选型之后决定使用quartz,因为需要支持修改执行时间。但是后台是分布式部署的,这就引入了任务重复执行的问题。quartz本身支持的分布式模式需要在数据库中建11张表,提交审批之后被拒绝,那么只能基于redis自己去加锁。最后仍然没有做到完美:首先是没有完全解耦,任务类仍然需要实现Job接口,可以考虑采用注解替换,通过实现ApplicationContextAware并且基于spring的扫描来找到所有带注解的类;其次是没有去解析cron表达式,解锁方式只是固定时间后自动解锁,那么如果任务的执行间隔短于锁的固定时长就不适合了,因为第二次触发的时候锁还没有释放,会导致本次执行被否决。
但是暂时所有的业务都是每天跑批。基于现状,完成了基于redis和内存的分布式quartz
QuartzManager.java,实现基本的quartz API
package cn.com.yitong.modules.quartz.demo;

import java.util.Map;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.JobListener;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.TriggerListener;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.com.yitong.modules.quartz.listener.MyJobListener;
import cn.com.yitong.modules.quartz.listener.MyTriggerListener;

/**
 * Static facade over the Quartz scheduler API: add, reschedule, remove and
 * manually trigger cron jobs, and start or shut down the scheduler.
 *
 * <p>All methods operate on the scheduler returned by a single shared
 * {@link StdSchedulerFactory}. Any failure is rethrown as an unchecked
 * {@link RuntimeException} wrapping the original cause.
 */
public class QuartzManager {

    private static final Logger logger = LoggerFactory.getLogger(QuartzManager.class);

    private static final JobFactory jobFactory = new JobFactory();

    private static final SchedulerFactory schedulerFactory = new StdSchedulerFactory();

    static {
        // Register the job and trigger listeners once, when the class is loaded.
        JobListener myJobListener = new MyJobListener();
        TriggerListener myTriggerListener = new MyTriggerListener();
        try {
            Scheduler scheduler = schedulerFactory.getScheduler();
            scheduler.getListenerManager().addTriggerListener(myTriggerListener);
            scheduler.getListenerManager().addJobListener(myJobListener);
        } catch (SchedulerException e) {
            // Deliberately not rethrown: a listener failure must not prevent class loading.
            logger.error("quartz manager listener init error", e);
        }
    }

    /**
     * @return the job factory instance installed on the scheduler by this class
     */
    public static JobFactory getJobFactory() {
        return jobFactory;
    }

    /**
     * @return the shared scheduler factory
     */
    public static SchedulerFactory getSchedulerFactory() {
        return schedulerFactory;
    }

    /**
     * Triggers an already registered job immediately.
     *
     * <p>The job data map always carries the entry
     * {@code quartz_passFlagForQuartzJob}; {@code MyTriggerListener} lets an
     * execution through without taking the lock when that entry is present.
     *
     * @param jobName      job name
     * @param jobGroupName job group name
     * @param params       extra entries copied into the job data map; may be
     *                     {@code null}, in which case only the pass flag is sent
     * @throws RuntimeException wrapping any failure raised while triggering
     */
    public static void runJob(String jobName, String jobGroupName, Map<String, Object> params) {
        try {
            Scheduler sched = schedulerFactory.getScheduler();
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("quartz_passFlagForQuartzJob", "true");
            // putAll(null) would throw a NullPointerException, so a missing map is skipped.
            if (params != null) {
                jobDataMap.putAll(params);
            }
            sched.triggerJob(JobKey.jobKey(jobName, jobGroupName), jobDataMap);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Registers a cron job and starts the scheduler if it is not shut down.
     *
     * @param jobName          job name
     * @param jobGroupName     job group name
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @param className        fully qualified name of the job class, loaded
     *                         with {@link Class#forName(String)}
     * @param cron             cron expression, see the Quartz documentation
     * @throws RuntimeException wrapping any failure (class not found, invalid
     *                          cron expression, scheduler error, ...)
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void addJob(String jobName, String jobGroupName, String triggerName,
            String triggerGroupName, String className, String cron) {
        try {
            Class jobClass = Class.forName(className);
            Scheduler sched = schedulerWithJobFactory();

            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(jobName, jobGroupName)
                    .build();
            CronTrigger trigger = buildCronTrigger(triggerName, triggerGroupName, cron);

            sched.scheduleJob(jobDetail, trigger);
            if (!sched.isShutdown()) {
                sched.start();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Changes the cron expression of an existing trigger.
     *
     * <p>Does nothing when the trigger does not exist or when its current
     * expression already equals {@code cron} (compared ignoring case).
     * The trigger is replaced through {@code rescheduleJob}; the alternative
     * would be to remove the job and add it again.
     *
     * @param jobName          job name (not used by the reschedule path)
     * @param jobGroupName     job group name (not used by the reschedule path)
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @param cron             new cron expression, see the Quartz documentation
     * @throws RuntimeException wrapping any failure raised while rescheduling
     */
    public static void modifyJobTime(String jobName, String jobGroupName, String triggerName,
            String triggerGroupName, String cron) {
        try {
            Scheduler sched = schedulerWithJobFactory();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            CronTrigger trigger = (CronTrigger) sched.getTrigger(triggerKey);
            if (trigger == null) {
                return;
            }
            String oldTime = trigger.getCronExpression();
            if (!oldTime.equalsIgnoreCase(cron)) {
                trigger = buildCronTrigger(triggerName, triggerGroupName, cron);
                sched.rescheduleJob(triggerKey, trigger);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes a job: pauses its trigger, unschedules it, then deletes the job.
     *
     * @param jobName          job name
     * @param jobGroupName     job group name
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @throws RuntimeException wrapping any failure raised while removing
     */
    public static void removeJob(String jobName, String jobGroupName, String triggerName,
            String triggerGroupName) {
        try {
            Scheduler sched = schedulerWithJobFactory();
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            sched.pauseTrigger(triggerKey);
            sched.unscheduleJob(triggerKey);
            sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the scheduler, and with it all registered jobs.
     *
     * @throws RuntimeException wrapping any failure raised while starting
     */
    public static void startJobs() {
        try {
            schedulerWithJobFactory().start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Shuts the scheduler down unless it is already shut down.
     *
     * @throws RuntimeException wrapping any failure raised while shutting down
     */
    public static void shutdownJobs() {
        try {
            Scheduler sched = schedulerWithJobFactory();
            if (!sched.isShutdown()) {
                sched.shutdown();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the shared scheduler after installing this class's job factory on it. */
    private static Scheduler schedulerWithJobFactory() throws SchedulerException {
        Scheduler sched = schedulerFactory.getScheduler();
        sched.setJobFactory(jobFactory);
        return sched;
    }

    /** Builds a start-now cron trigger with the given identity and expression. */
    private static CronTrigger buildCronTrigger(String triggerName, String triggerGroupName,
            String cron) {
        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
        triggerBuilder.withIdentity(triggerName, triggerGroupName);
        triggerBuilder.startNow();
        triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
        return (CronTrigger) triggerBuilder.build();
    }
}
JobFactory.java:为了让job成为同时受spring和quartz两个容器管理的javabean,JobFactory需要同时在spring和quartz中注册;这个类需要在spring的xml中配置,并且不能延迟加载。
package cn.com.yitong.modules.quartz.demo; import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.quartz.AdaptableJobFactory; public class JobFactory extends AdaptableJobFactory implements ApplicationContextAware { private static ApplicationContext ctx = null; protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { AutowireCapableBeanFactory capableBeanFactory = ctx.getAutowireCapableBeanFactory(); Object jobInstance = super.createJobInstance(bundle); capableBeanFactory.autowireBean(jobInstance); return jobInstance; } @Override public void setApplicationContext(ApplicationContext arg0) throws BeansException { ctx = arg0; } }
与前端页面交互的Controller,这个大同小异,就是提供了一些配置项去在页面设置,满足业务需求
package cn.com.yitong.modules.quartz.demo;

import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.util.WebUtils;

import com.thinkgem.jeesite.common.config.Global;

import cn.com.yitong.common.persistence.Page;
import cn.com.yitong.common.service.mybatis.MybatisBaseService;
import cn.com.yitong.common.web.mybatis.MybatisBaseController;
import cn.com.yitong.modules.quartz.model.SysQuartz;
import cn.com.yitong.modules.quartz.model.Test4quartz;
import cn.com.yitong.modules.quartz.service.SysQuartzService;
import cn.com.yitong.modules.quartz.service.Test4quartzService;

/**
 * Web controller for managing Quartz jobs from the admin pages: list and edit
 * the stored job definitions, and add, remove, refresh or manually run the
 * jobs through {@link QuartzManager}.
 *
 * <p>The {@code @ResponseBody} endpoints return {@code "1"} on success and
 * {@code "2"} when an exception was caught and logged.
 */
@Controller
@RequestMapping("${adminPath}" + QuartzController.BASE_URL)
public class QuartzController implements ApplicationListener<ContextRefreshedEvent> {

    public static final String BASE_URL = "/quartz/demo/";

    private static final String BASE_PATH = "modules/sys/quartzList";

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Resource
    Test4quartzService test4quartzService;

    @Resource
    SysQuartzService sysQuartzService;

    /**
     * Renders the paged list of job definitions.
     *
     * @return the list view name
     */
    @RequestMapping({"/list", ""})
    public String quartzList(SysQuartz sysQuartz, HttpServletRequest req, HttpServletResponse resp,
            Model model) {
        Map<String, Object> params = WebUtils.getParametersStartingWith(req, null);
        Page<SysQuartz> page = new Page<SysQuartz>(req, resp);
        params.put("page", page);
        List<SysQuartz> quartzList = sysQuartzService.quartzList(params);
        // Goes through the logger instead of stdout so the output follows the logging configuration.
        log.debug("quartzList: {}", quartzList);
        page.setList(quartzList);
        model.addAttribute("page", page);
        model.addAttribute("entry", sysQuartz);
        return BASE_PATH;
    }

    /**
     * Renders the edit form; an id other than 0 loads the stored definition first.
     *
     * @return the form view name
     */
    @RequestMapping("addForm")
    public String addForm(SysQuartz sysQuartz, HttpServletRequest req, HttpServletResponse resp,
            Model model) {
        if (sysQuartz.getId() != 0) {
            sysQuartz = sysQuartzService.queryOne(String.valueOf(sysQuartz.getId()));
        }
        model.addAttribute("sysQuartz", sysQuartz);
        return "modules/sys/quartzForm";
    }

    /**
     * Saves a new definition (id 0) or updates an existing one, then
     * re-registers every job and redirects to the list page.
     *
     * @return redirect to the list page
     */
    @RequestMapping("quartzSave")
    public String quartzSave(SysQuartz sysQuartz) {
        if (sysQuartz.getId() == 0) {
            sysQuartzService.save(sysQuartz);
        } else {
            sysQuartzService.updateById(sysQuartz);
        }
        refreshAll();
        return "redirect:" + Global.getAdminPath() + BASE_URL;
    }

    /**
     * Triggers the job with the given id immediately, passing all request
     * parameters on as job data.
     *
     * @param id id of the stored job definition
     * @return {@code "1"} on success, {@code "2"} on failure
     */
    @ResponseBody
    @RequestMapping("executeByHand")
    public String executeByHand(String id, HttpServletRequest req, HttpServletResponse resp,
            Model model) {
        Map<String, Object> params = WebUtils.getParametersStartingWith(req, null);
        try {
            SysQuartz sysQuartz = sysQuartzService.queryOne(id);
            QuartzManager.runJob(sysQuartz.getJobName(), sysQuartz.getJobGroupName(), params);
            return "1";
        } catch (Exception e) {
            log.error("quartz:executeByHand", e);
            return "2";
        }
    }

    /**
     * Renders the page for entering manual-run parameters.
     *
     * @param className job class name exposed to the view
     * @return the parameter view name
     */
    @RequestMapping("handParams")
    public String hdfsParams(String className, Model model) {
        model.addAttribute("className", className);
        return "modules/sys/quartzHandParams";
    }

    /**
     * Registers the test job stored under the given id with the scheduler.
     *
     * @param id id of the stored test job
     */
    @RequestMapping("/execute")
    public void excuteJob(String id) {
        Test4quartz test4quartz = test4quartzService.queryById(id);
        QuartzManager.addJob(test4quartz.getJobName(), test4quartz.getJobGroupName(),
                test4quartz.getTriggerName(), test4quartz.getTriggerGroupName(),
                test4quartz.getClassName(), test4quartz.getCornString());
    }

    /**
     * Registers every stored job definition with the scheduler.
     *
     * @return {@code "1"} on success, {@code "2"} on failure
     */
    @ResponseBody
    @RequestMapping("/addAll")
    public String addAll() {
        try {
            List<SysQuartz> list = sysQuartzService.getAll();
            for (SysQuartz entity : list) {
                schedule(entity);
            }
            return "1";
        } catch (Exception e) {
            log.error("quartzAddAllError", e);
            return "2";
        }
    }

    /** Registers a hard-coded sample job and stores its definition. */
    @RequestMapping("/add")
    public void addJob() {
        String id = "" + new Date();
        String jobName = "job4";
        String jobGroupName = "group4";
        String triggerName = "trigger4";
        String triggerGroupName = "group4";
        String className = "cn.com.yitong.modules.quartz.job.Job2";
        String cornString = "0/5 * * * * ?";

        Test4quartz test4quartz = new Test4quartz();
        test4quartz.setId(id);
        test4quartz.setJobName(jobName);
        test4quartz.setJobGroupName(jobGroupName);
        test4quartz.setTriggerName(triggerName);
        test4quartz.setTriggerGroupName(triggerGroupName);
        test4quartz.setClassName(className);
        test4quartz.setCornString(cornString);

        QuartzManager.addJob(jobName, jobGroupName, triggerName, triggerGroupName, className,
                cornString);
        test4quartzService.save(test4quartz);
    }

    /**
     * Switches the test job with the given id to the hard-coded expression
     * {@code "* * * * * ?"} and stores the change.
     *
     * @param id id of the stored test job
     */
    @RequestMapping("/modify")
    public void modify(String id) {
        String cornString = "* * * * * ?";
        Test4quartz test4quartz = test4quartzService.queryById(id);
        test4quartz.setCornString(cornString);
        QuartzManager.modifyJobTime(test4quartz.getJobName(), test4quartz.getJobGroupName(),
                test4quartz.getTriggerName(), test4quartz.getTriggerGroupName(),
                test4quartz.getCornString());
        test4quartzService.save(test4quartz);
    }

    /**
     * Removes every stored job definition from the scheduler.
     *
     * @return {@code "1"} on success, {@code "2"} on failure
     */
    @RequestMapping("deleteAll")
    @ResponseBody
    public String deleteAll() {
        try {
            List<SysQuartz> list = sysQuartzService.getAll();
            for (SysQuartz entity : list) {
                unschedule(entity);
            }
            return "1";
        } catch (Exception e) {
            log.error("quartzDeleteAllError", e);
            return "2";
        }
    }

    /**
     * Removes and re-registers every stored job definition so the scheduler
     * picks up changed settings.
     *
     * @return {@code "1"} on success, {@code "2"} on failure
     */
    @RequestMapping("refreshAll")
    @ResponseBody
    public String refreshAll() {
        try {
            List<SysQuartz> list = sysQuartzService.getAll();
            for (SysQuartz entity : list) {
                unschedule(entity);
                schedule(entity);
            }
            return "1";
        } catch (Exception e) {
            log.error("quartzRefreshAllError", e);
            return "2";
        }
    }

    /**
     * Removes the test job with the given id from the scheduler.
     *
     * @param id id of the stored test job
     */
    @RequestMapping("/deleteOne")
    public void delete(String id) {
        Test4quartz test4quartz = test4quartzService.queryById(id);
        QuartzManager.removeJob(test4quartz.getJobName(), test4quartz.getJobGroupName(),
                test4quartz.getTriggerName(), test4quartz.getTriggerGroupName());
    }

    /**
     * Registers all stored jobs once the application context is refreshed.
     * Only a context without a parent triggers the registration.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null) {
            addAll();
        }
    }

    /** Registers one stored job definition with the scheduler. */
    private static void schedule(SysQuartz entity) {
        QuartzManager.addJob(entity.getJobName(), entity.getJobGroupName(),
                entity.getTriggerName(), entity.getTriggerGroupName(),
                entity.getClassName(), entity.getCornString());
    }

    /** Removes one stored job definition from the scheduler. */
    private static void unschedule(SysQuartz entity) {
        QuartzManager.removeJob(entity.getJobName(), entity.getJobGroupName(),
                entity.getTriggerName(), entity.getTriggerGroupName());
    }
}
MyJobListener.java:负责监听job的执行,功能暂时只有打印日志。
package cn.com.yitong.modules.quartz.listener; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyJobListener implements JobListener { Logger log=LoggerFactory.getLogger(MyJobListener.class); @Override // 相当于为我们的监听器命名 public String getName() { return "myJobListener"; } @Override public void jobToBeExecuted(JobExecutionContext context) { log.debug("{}:{}:开始执行",getName(),context.getJobDetail().getJobClass()); } @Override // “否决JobDetail”是在Triiger被其相应的监听器监听时才具备的能力 public void jobExecutionVetoed(JobExecutionContext context) { log.debug("{}:{}:被否决",getName(),context.getJobDetail().getJobClass()); } @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { log.debug("{}:{}:执行结束",getName(),context.getJobDetail().getJobClass()); } }
MyTriggerListener.java:基于redis实现了分布式锁,保证同一时间只有一台机器执行任务。
注:setWithCas方法是自己封装的一个redis脚本,实际上就是保证只有key为空时才能上锁,否则上锁失败。
package cn.com.yitong.modules.quartz.listener;

import java.util.Map;

import org.quartz.JobExecutionContext;
import org.quartz.Trigger;
import org.quartz.Trigger.CompletedExecutionInstruction;
import org.quartz.TriggerListener;
import org.springframework.util.StringUtils;

import cn.onebank.dmb.client.util.StringUtil;

import com.thinkgem.jeesite.common.utils.BallonUtil;

/**
 * Trigger listener that decides, per firing, whether the job may run on this
 * node by trying to take a per-job lock through {@code BallonUtil}.
 *
 * <p>The lock is never released here; it is only given an expiry.
 */
public class MyTriggerListener implements TriggerListener {

    /** Job-data key whose presence lets an execution bypass the lock. */
    private static final String PASS_FLAG_KEY = "quartz_passFlagForQuartzJob";

    /** Expiry handed to {@code BallonUtil.expire}; presumably seconds — TODO confirm. */
    private static final int LOCK_EXPIRE = 1800;

    /** Name under which this listener is registered. */
    @Override
    public String getName() {
        return "myTriggerListener";
    }

    /** No action when a trigger fires. */
    @Override
    public void triggerFired(Trigger trigger, JobExecutionContext context) {
    }

    /**
     * Vetoes the execution unless the pass flag is present in the merged job
     * data or the lock for this job could be taken.
     *
     * @param trigger the trigger that fired
     * @param context the execution context of the job about to run
     * @return {@code true} to veto the execution, {@code false} to let it run
     */
    @Override
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        Map<String, Object> mergedData = context.getMergedJobDataMap();
        String lockKey = String.format("%s:%s:lock",
                trigger.getJobKey().getGroup(), trigger.getJobKey().getName());

        // Executions carrying the pass flag are never vetoed and take no lock.
        if (mergedData.get(PASS_FLAG_KEY) != null) {
            return false;
        }

        // The lock could not be taken, so this firing is vetoed.
        if (!BallonUtil.getInstance().setWithCas(lockKey, null, "locked")) {
            return true;
        }

        // NOTE(review): locking and setting the expiry are two separate calls;
        // if the second one never happens the key may stay locked — confirm.
        BallonUtil.getInstance().expire(lockKey, LOCK_EXPIRE);
        return false;
    }

    /** No action on misfire. */
    @Override
    public void triggerMisfired(Trigger trigger) {
    }

    /** No action on completion; the lock is left to expire. */
    @Override
    public void triggerComplete(Trigger trigger, JobExecutionContext context,
            CompletedExecutionInstruction triggerInstructionCode) {
    }
}