基于redis的分布式quartz定时任务

时间:2023-02-15 07:50:07

首先说一下原因:项目需要引入定时任务,初步选型之后决定使用quartz,因为需要支持动态修改执行时间。但是后台是分布式部署的,这就引入了任务重复执行的问题。quartz本身支持的分布式模式需要在数据库建11张表,提交审批之后被拒绝,那么只能基于redis自己去加锁。最后仍然没有做到完美:首先是没有完全解耦,任务类仍然需要实现Job接口。可以考虑采用注解替换,即实现ApplicationContextAware,并且基于spring的扫描来找到所有带注解的类。其次是没有去解析cron表达式,解锁方式只是固定时间后自动解锁(锁的过期时间固定为1800秒),那么如果任务的执行间隔小于锁的过期时间就不适合了,因为第二次触发的时候上一次的锁还没有释放,会导致本次执行被否决。

但是暂时所有的业务都是每天跑批。基于现状,完成了基于redis和内存的分布式quartz

QuartzManager.java,实现基本的quartz API

package cn.com.yitong.modules.quartz.demo;

import java.util.Map;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.JobListener;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.TriggerListener;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.com.yitong.modules.quartz.listener.MyJobListener;
import cn.com.yitong.modules.quartz.listener.MyTriggerListener;

public class QuartzManager {
  private static Logger logger=LoggerFactory.getLogger(QuartzManager.class);
  private static JobFactory jobFactory = new JobFactory();
  private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();
  static {
    JobListener myJobListener = new MyJobListener();
    TriggerListener myTriggerListener=new MyTriggerListener();
    try {
      schedulerFactory.getScheduler().getListenerManager().addTriggerListener(myTriggerListener);
      schedulerFactory.getScheduler().getListenerManager().addJobListener(myJobListener);
    } catch (SchedulerException e) {
      logger.error("quartz manager listener init error",e);
    }
  }
  public static JobFactory getJobFactory(){
    return jobFactory;
  }
  public static SchedulerFactory getSchedulerFactory(){
    return schedulerFactory;
  }
  public static void runJob(String jobName,String jobGroupName,Map<String,Object> params){
    try{
      Scheduler sched = schedulerFactory.getScheduler();
      JobDataMap jobDataMap=new JobDataMap();
      jobDataMap.put("quartz_passFlagForQuartzJob", "true");
      jobDataMap.putAll(params);
      sched.triggerJob(JobKey.jobKey(jobName, jobGroupName),jobDataMap);
      // 任务名,任务组,任务执行类
    }catch(Exception e){
      throw new RuntimeException(e);
    }
  }
  
  /**
   * @Description: 添加一个定时任务
   * 
   * @param jobName 任务名
   * @param jobGroupName 任务组名
   * @param triggerName 触发器名
   * @param triggerGroupName 触发器组名
   * @param jobClass 任务 s
   * @param cron 时间设置,参考quartz说明文档
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void addJob(String jobName, String jobGroupName, String triggerName,
      String triggerGroupName, String className, String cron) {
    try {
      Class jobClass = Class.forName(className);
      Scheduler sched = schedulerFactory.getScheduler();
      sched.setJobFactory(jobFactory);
      // 任务名,任务组,任务执行类
      JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
      // 触发器
      TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
      // 触发器名,触发器组
      triggerBuilder.withIdentity(triggerName, triggerGroupName);
      triggerBuilder.startNow();
      // 触发器时间设定
      triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
      // 创建Trigger对象
      CronTrigger trigger = (CronTrigger) triggerBuilder.build();

      // 调度容器设置JobDetail和Trigger
      sched.scheduleJob(jobDetail, trigger);

      // 启动
      if (!sched.isShutdown()) {
        sched.start();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * @Description: 修改一个任务的触发时间
   * 
   * @param jobName
   * @param jobGroupName
   * @param triggerName 触发器名
   * @param triggerGroupName 触发器组名
   * @param cron 时间设置,参考quartz说明文档
   */
  public static void modifyJobTime(String jobName, String jobGroupName, String triggerName,
      String triggerGroupName, String cron) {
    try {
      Scheduler sched = schedulerFactory.getScheduler();
      sched.setJobFactory(jobFactory);
      TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
      CronTrigger trigger = (CronTrigger) sched.getTrigger(triggerKey);
      if (trigger == null) {
        return;
      }

      String oldTime = trigger.getCronExpression();
      if (!oldTime.equalsIgnoreCase(cron)) {
        /** 方式一 :调用 rescheduleJob 开始 */
        // 触发器
        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
        // 触发器名,触发器组
        triggerBuilder.withIdentity(triggerName, triggerGroupName);
        triggerBuilder.startNow();
        // 触发器时间设定
        triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
        // 创建Trigger对象
        trigger = (CronTrigger) triggerBuilder.build();
        // 方式一 :修改一个任务的触发时间
        sched.rescheduleJob(triggerKey, trigger);
        /** 方式一 :调用 rescheduleJob 结束 */

        /** 方式二:先删除,然后在创建一个新的Job */
        // JobDetail jobDetail = sched.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
        // Class<? extends Job> jobClass = jobDetail.getJobClass();
        // removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
        // addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass, cron);
        /** 方式二 :先删除,然后在创建一个新的Job */
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * @Description: 移除一个任务
   * 
   * @param jobName
   * @param jobGroupName
   * @param triggerName
   * @param triggerGroupName
   */
  public static void removeJob(String jobName, String jobGroupName, String triggerName,
      String triggerGroupName) {
    try {
      Scheduler sched = schedulerFactory.getScheduler();
      sched.setJobFactory(jobFactory);
      TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);

      sched.pauseTrigger(triggerKey);// 停止触发器
      sched.unscheduleJob(triggerKey);// 移除触发器
      sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * @Description:启动所有定时任务
   */
  public static void startJobs() {
    try {
      Scheduler sched = schedulerFactory.getScheduler();
      sched.setJobFactory(jobFactory);
      sched.start();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * @Description:关闭所有定时任务
   */
  public static void shutdownJobs() {
    try {
      Scheduler sched = schedulerFactory.getScheduler();
      sched.setJobFactory(jobFactory);
      if (!sched.isShutdown()) {
        sched.shutdown();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

JobFactory.java:为了让job同时被spring和quartz两个容器管理,这个类需要同时在spring和quartz中注册。这个类需要在spring的xml中配置,并且设置为非延时加载。

package cn.com.yitong.modules.quartz.demo;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.AdaptableJobFactory;

/**
 * Quartz job factory that passes every newly created job instance through Spring autowiring.
 *
 * <p>The application context is kept in a static field, so an instance created with
 * {@code new} shares the context that Spring injected into the Spring-managed instance.
 */
public class JobFactory extends AdaptableJobFactory implements ApplicationContextAware {
  private static ApplicationContext ctx = null;

  /**
   * Creates the job instance via the superclass and autowires its dependencies.
   *
   * @param bundle the fired-trigger bundle describing the job to instantiate
   * @return the autowired job instance
   * @throws IllegalStateException if no application context has been set yet
   * @throws Exception if the superclass fails to instantiate the job
   */
  @Override
  protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
    ApplicationContext context = ctx;
    if (context == null) {
      // Fail with an explicit message instead of an anonymous NullPointerException.
      throw new IllegalStateException(
          "Spring ApplicationContext has not been injected into JobFactory yet");
    }
    AutowireCapableBeanFactory capableBeanFactory = context.getAutowireCapableBeanFactory();
    Object jobInstance = super.createJobInstance(bundle);
    capableBeanFactory.autowireBean(jobInstance);
    return jobInstance;
  }

  /**
   * Stores the application context in the shared static field.
   *
   * @param arg0 the application context supplied by Spring
   */
  @Override
  public void setApplicationContext(ApplicationContext arg0) throws BeansException {
    ctx = arg0;
  }
}

与前端页面交互的Controller,这个大同小异,就是提供了一些配置项去在页面设置,满足业务需求

package cn.com.yitong.modules.quartz.demo;

import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.util.WebUtils;





import com.thinkgem.jeesite.common.config.Global;

import cn.com.yitong.common.persistence.Page;
import cn.com.yitong.common.service.mybatis.MybatisBaseService;
import cn.com.yitong.common.web.mybatis.MybatisBaseController;
import cn.com.yitong.modules.quartz.model.SysQuartz;
import cn.com.yitong.modules.quartz.model.Test4quartz;
import cn.com.yitong.modules.quartz.service.SysQuartzService;
import cn.com.yitong.modules.quartz.service.Test4quartzService;

/**
 * Admin controller for managing Quartz jobs whose definitions are stored through
 * {@link SysQuartzService} (and {@link Test4quartzService} for the demo endpoints).
 *
 * <p>The controller also listens for {@link ContextRefreshedEvent} and registers all stored
 * jobs when the refreshed context has no parent. The JSON-style endpoints return the string
 * {@code "1"} on success and {@code "2"} on failure.
 *
 * <p>NOTE(review): the bulk endpoints previously carried a disabled "allowStart" switch; it is
 * not active in this version.
 */
@Controller
@RequestMapping("${adminPath}" + QuartzController.BASE_URL)
public class QuartzController implements ApplicationListener<ContextRefreshedEvent> {
  public static final String BASE_URL = "/quartz/demo/";
  private static final String BASE_PATH = "modules/sys/quartzList";
  private final Logger log = LoggerFactory.getLogger(getClass());

  @Resource
  Test4quartzService test4quartzService;
  @Resource
  SysQuartzService sysQuartzService;

  /**
   * Lists the stored job definitions, paged, using all request parameters as query filters.
   *
   * @return the list view name
   */
  @RequestMapping({"/list", ""})
  public String quartzList(SysQuartz sysQuartz, HttpServletRequest req, HttpServletResponse resp,
      Model model) {
    Map<String, Object> params = WebUtils.getParametersStartingWith(req, null);
    Page<SysQuartz> page = new Page<SysQuartz>(req, resp);
    params.put("page", page);
    List<SysQuartz> quartzList = sysQuartzService.quartzList(params);
    // Use the logger rather than stdout so the output follows the logging configuration.
    log.debug("quartzList: {}", quartzList);
    page.setList(quartzList);
    model.addAttribute("page", page);
    model.addAttribute("entry", sysQuartz);
    return BASE_PATH;
  }

  /**
   * Shows the add/edit form. A non-zero id loads the stored definition for editing.
   *
   * @return the form view name
   */
  @RequestMapping("addForm")
  public String addForm(SysQuartz sysQuartz, HttpServletRequest req, HttpServletResponse resp,
      Model model) {
    if (sysQuartz.getId() != 0) {
      sysQuartz = sysQuartzService.queryOne(String.valueOf(sysQuartz.getId()));
    }
    model.addAttribute("sysQuartz", sysQuartz);
    return "modules/sys/quartzForm";
  }

  /**
   * Saves a definition (insert when the id is 0, update otherwise), then re-registers all
   * jobs with the scheduler.
   *
   * @return a redirect to the list page
   */
  @RequestMapping("quartzSave")
  public String quartzSave(SysQuartz sysQuartz) {
    if (sysQuartz.getId() == 0) {
      sysQuartzService.save(sysQuartz);
    } else {
      sysQuartzService.updateById(sysQuartz);
    }
    refreshAll();
    return "redirect:" + Global.getAdminPath() + BASE_URL;
  }

  /**
   * Triggers the job with the given id immediately, passing all request parameters as job data.
   *
   * @param id id of the stored job definition
   * @return {@code "1"} on success, {@code "2"} if anything fails
   */
  @ResponseBody
  @RequestMapping("executeByHand")
  public String executeByHand(String id, HttpServletRequest req, HttpServletResponse resp,
      Model model) {
    Map<String, Object> params = WebUtils.getParametersStartingWith(req, null);

    try {
      SysQuartz sysQuartz = sysQuartzService.queryOne(id);
      QuartzManager.runJob(sysQuartz.getJobName(), sysQuartz.getJobGroupName(), params);
      return "1";
    } catch (Exception e) {
      log.error("quartz:executeByHand", e);
      return "2";
    }
  }

  /**
   * Shows the page for entering parameters of a manual run.
   *
   * @param className job class name passed through to the view
   * @return the parameter view name
   */
  @RequestMapping("handParams")
  public String hdfsParams(String className, Model model) {
    model.addAttribute("className", className);
    return "modules/sys/quartzHandParams";
  }

  /**
   * Registers the demo job definition with the given id with the scheduler.
   *
   * @param id id of the demo job definition
   */
  @RequestMapping("/execute")
  public void excuteJob(String id) {
    Test4quartz test4quartz = test4quartzService.queryById(id);
    QuartzManager.addJob(test4quartz.getJobName(), test4quartz.getJobGroupName(),
        test4quartz.getTriggerName(), test4quartz.getTriggerGroupName(), test4quartz.getClassName(),
        test4quartz.getCornString());
  }

  /**
   * Registers every stored job definition with the scheduler.
   *
   * @return {@code "1"} on success, {@code "2"} if anything fails
   */
  @ResponseBody
  @RequestMapping("/addAll")
  public String addAll() {
    try {
      List<SysQuartz> list = sysQuartzService.getAll();
      for (SysQuartz entity : list) {
        QuartzManager.addJob(entity.getJobName(), entity.getJobGroupName(),
            entity.getTriggerName(), entity.getTriggerGroupName(), entity.getClassName(),
            entity.getCornString());
      }
      return "1";
    } catch (Exception e) {
      log.error("quartzAddAllError", e);
      return "2";
    }
  }

  /** Demo endpoint: registers a hard-coded job that fires every 5 seconds, then stores it. */
  @RequestMapping("/add")
  public void addJob() {
    String id = "" + new Date();
    String jobName = "job4";
    String jobGroupName = "group4";
    String triggerName = "trigger4";
    String triggerGroupName = "group4";
    String className = "cn.com.yitong.modules.quartz.job.Job2";
    String cornString = "0/5 * * * * ?";
    Test4quartz test4quartz = new Test4quartz();
    test4quartz.setId(id);
    test4quartz.setJobName(jobName);
    test4quartz.setJobGroupName(jobGroupName);
    test4quartz.setTriggerName(triggerName);
    test4quartz.setTriggerGroupName(triggerGroupName);
    test4quartz.setClassName(className);
    test4quartz.setCornString(cornString);
    QuartzManager.addJob(jobName, jobGroupName, triggerName, triggerGroupName, className,
        cornString);
    test4quartzService.save(test4quartz);
  }

  /**
   * Demo endpoint: switches the given demo job to a hard-coded every-second schedule and
   * stores the change.
   *
   * @param id id of the demo job definition
   */
  @RequestMapping("/modify")
  public void modify(String id) {
    String cornString = "* * * * * ?";
    Test4quartz test4quartz = test4quartzService.queryById(id);
    test4quartz.setCornString(cornString);
    QuartzManager.modifyJobTime(test4quartz.getJobName(), test4quartz.getJobGroupName(),
        test4quartz.getTriggerName(), test4quartz.getTriggerGroupName(),
        test4quartz.getCornString());
    test4quartzService.save(test4quartz);
  }

  /**
   * Removes every stored job definition from the scheduler.
   *
   * @return {@code "1"} on success, {@code "2"} if anything fails
   */
  @RequestMapping("deleteAll")
  @ResponseBody
  public String deleteAll() {
    try {
      List<SysQuartz> list = sysQuartzService.getAll();
      for (SysQuartz entity : list) {
        QuartzManager.removeJob(entity.getJobName(), entity.getJobGroupName(),
            entity.getTriggerName(), entity.getTriggerGroupName());
      }
      return "1";
    } catch (Exception e) {
      log.error("quartzDeleteAllError", e);
      return "2";
    }
  }

  /**
   * Removes and re-registers every stored job definition so the scheduler matches the store.
   *
   * @return {@code "1"} on success, {@code "2"} if anything fails
   */
  @RequestMapping("refreshAll")
  @ResponseBody
  public String refreshAll() {
    try {
      List<SysQuartz> list = sysQuartzService.getAll();
      for (SysQuartz entity : list) {
        String jobName = entity.getJobName();
        String jobGroupName = entity.getJobGroupName();
        String triggerName = entity.getTriggerName();
        String triggerGroupName = entity.getTriggerGroupName();
        QuartzManager.removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
        QuartzManager.addJob(jobName, jobGroupName, triggerName, triggerGroupName,
            entity.getClassName(), entity.getCornString());
      }
      return "1";
    } catch (Exception e) {
      log.error("quartzRefreshAllError", e);
      return "2";
    }
  }

  /**
   * Removes the demo job with the given id from the scheduler.
   *
   * @param id id of the demo job definition
   */
  @RequestMapping("/deleteOne")
  public void delete(String id) {
    Test4quartz test4quartz = test4quartzService.queryById(id);
    QuartzManager.removeJob(test4quartz.getJobName(), test4quartz.getJobGroupName(),
        test4quartz.getTriggerName(), test4quartz.getTriggerGroupName());
  }

  /**
   * Registers all stored jobs once the context without a parent has been refreshed.
   *
   * @param event the context-refreshed event
   */
  @Override
  public void onApplicationEvent(ContextRefreshedEvent event) {
    // Only react to a context that has no parent, so child-context refreshes are ignored.
    if (event.getApplicationContext().getParent() == null) {
      addAll();
    }
  }

}

MyJobListener.java:负责监听job的执行,功能暂时只有打印日志

package cn.com.yitong.modules.quartz.listener;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Job listener that only writes debug log lines for the job life-cycle callbacks:
 * about to execute, vetoed, and finished.
 */
public class MyJobListener implements JobListener {
  Logger log = LoggerFactory.getLogger(MyJobListener.class);

  /** Returns the name under which this listener is registered. */
  @Override
  public String getName() {
    return "myJobListener";
  }

  /** Logs that the job is about to run. */
  @Override
  public void jobToBeExecuted(JobExecutionContext context) {
    String listenerName = getName();
    Class<?> jobType = context.getJobDetail().getJobClass();
    log.debug("{}:{}:开始执行", listenerName, jobType);
  }

  /** Logs that a trigger listener vetoed this execution. */
  @Override
  public void jobExecutionVetoed(JobExecutionContext context) {
    String listenerName = getName();
    Class<?> jobType = context.getJobDetail().getJobClass();
    log.debug("{}:{}:被否决", listenerName, jobType);
  }

  /** Logs that the job finished; {@code jobException} is not inspected. */
  @Override
  public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
    String listenerName = getName();
    Class<?> jobType = context.getJobDetail().getJobClass();
    log.debug("{}:{}:执行结束", listenerName, jobType);
  }

}

MyTriggerListener.java :实现了分布式的锁。保证同时只有一台机器执行。基于redis

注*:setWithCas方法是自己封装的一个redis脚本,实际上就是保证只有当key为空时才能上锁成功,否则上锁失败。

package cn.com.yitong.modules.quartz.listener;

import java.util.Map;

import org.quartz.JobExecutionContext;
import org.quartz.Trigger;
import org.quartz.Trigger.CompletedExecutionInstruction;
import org.quartz.TriggerListener;
import org.springframework.util.StringUtils;

import cn.onebank.dmb.client.util.StringUtil;

import com.thinkgem.jeesite.common.utils.BallonUtil;

/**
 * Trigger listener that decides, per firing, whether the job may run on this node.
 *
 * <p>A firing is let through when the merged job data carries the manual-run flag, or when
 * this node manages to set the per-job lock key through {@code BallonUtil}. Otherwise the
 * execution is vetoed. The remaining callbacks are intentionally empty.
 */
public class MyTriggerListener implements TriggerListener {

  private static final String LISTENER_NAME = "myTriggerListener";
  private static final String PASS_FLAG_KEY = "quartz_passFlagForQuartzJob";
  private static final String LOCK_VALUE = "locked";
  // Passed to expire(); presumably seconds (30 minutes) - TODO confirm against BallonUtil.
  private static final int LOCK_EXPIRY = 1800;

  /** Returns the name under which this listener is registered. */
  @Override
  public String getName() {
    return LISTENER_NAME;
  }

  /** No action when a trigger fires. */
  @Override
  public void triggerFired(Trigger trigger, JobExecutionContext context) {
    // intentionally empty
  }

  /**
   * Decides whether this firing must be skipped.
   *
   * @return {@code false} to let the job run (manual run, or lock obtained),
   *     {@code true} to veto it (lock could not be set)
   */
  @Override
  public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
    Map<String, Object> mergedData = context.getMergedJobDataMap();
    // Lock key has the form "<jobGroup>:<jobName>:lock".
    String lockKey =
        trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName() + ":lock";

    // Manually triggered runs carry the flag and bypass the lock entirely.
    if (mergedData.get(PASS_FLAG_KEY) != null) {
      return false;
    }

    boolean lockObtained = BallonUtil.getInstance().setWithCas(lockKey, null, LOCK_VALUE);
    if (!lockObtained) {
      return true;
    }

    // NOTE(review): setting the value and its expiry are two separate calls; if the process
    // stops in between, the key may stay without an expiry - confirm with BallonUtil.
    BallonUtil.getInstance().expire(lockKey, LOCK_EXPIRY);
    return false;
  }

  /** No action when a trigger misfires. */
  @Override
  public void triggerMisfired(Trigger trigger) {
    // intentionally empty
  }

  /** No action when a trigger completes; the lock is not released here. */
  @Override
  public void triggerComplete(Trigger trigger, JobExecutionContext context,
      CompletedExecutionInstruction triggerInstructionCode) {
    // intentionally empty
  }

}