spring+quartz定时任务调度实现集群环境下的整合

时间:2021-08-02 07:58:00

由于在项目中用到了定时任务调度,所以就是用了spring+quartz进行整合,但是开发完之后,发现对于集群的需求没有考虑到。由于是第一次配合,所以网上找了一些资料,但是在实际配置中却发现了不少问题,于是自己边改边试,最终通过。下面附上部分代码。

1.配置applicationContext_job.xml文件
applicationContext_job.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans default-lazy-init="true"
xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd"
>

<!-- 此处忽略其他代码 -->
<!-- quartz 集群配置 系统中涉及了多个任务调度-->
<bean id="Job" class="com.core.quartz.Task"></bean>
<bean id="Job1" class="com.core.quartz.Task1"></bean>
<bean id="Job2" class="com.core.quartz.Task2"></bean>

<bean id="JobTask" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="durability" value="true" />
<property name="requestsRecovery" value="true" />
<property name="jobClass" value="com.core.quartz.MyDetailQuartzJobBean" />
<property name="jobDataAsMap">
<map>
<entry key="targetObject" value="Job" />
<entry key="targetMethod" value="execute" />
</map>
</property>
</bean>
<bean id="JobTask1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="durability" value="true" />
<property name="requestsRecovery" value="true" />
<property name="jobClass" value="com.core.quartz.MyDetailQuartzJobBean" />
<property name="jobDataAsMap">
<map>
<entry key="targetObject" value="Job1" />
<entry key="targetMethod" value="execute" />
</map>
</property>
</bean>
<bean id="JobTask2" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="durability" value="true" />
<property name="requestsRecovery" value="true" />
<property name="jobClass" value="com.core.quartz.MyDetailQuartzJobBean" />
<property name="jobDataAsMap">
<map>
<entry key="targetObject" value="Job2" />
<entry key="targetMethod" value="execute" />
</map>
</property>
</bean>

<bean id="doTime" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="JobTask" />
<property name="cronExpression" value="0 0/10 * * * ?"/>
</bean>
<bean id="doTime1" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="JobTask1" />
<property name="cronExpression" value="0 0/5 * * * ?"/>
</bean>
<bean id="doTime2" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="JobTask2" />
<property name="cronExpression" value="0 0/3 * * * ?"/>
</bean>

<!-- 总管理类 如果将lazy-init='false'那么容器启动就会执行调度程序 -->
<bean id="startQuertz" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="configLocation" value="classpath:config/quartz.properties" />
<property name="overwriteExistingJobs" value="true" />
<property name="dataSource" ref="dataSource" />
<property name="triggers">
<list>
<ref bean="doTime"/>
<ref bean="doTime1"/>
<ref bean="doTime2"/>
</list>
</property>
<!-- 就是下面这句,因为该 bean 只能使用类反射来重构 -->
<property name="applicationContextSchedulerContextKey" value="applicationContext" />
</bean>
</beans>

2.这个文件名称不可更改,将文件路径放置在src/config目录下,内容如下:
quartz.properties

#============================================================== 
#Configure Main Scheduler Properties
#==============================================================
#配置集群时,quartz调度器的id,由于配置集群时,只有一个调度器,必须保证每个服务器该值都相同
org.quartz.scheduler.instanceName=quartzScheduler
#集群中每台服务器自己的id,AUTO表示自动生成,无需修改
org.quartz.scheduler.instanceId=AUTO

#==============================================================
#Configure ThreadPool
#==============================================================
#quartz线程池的实现类,无需修改
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#quartz线程池中线程数,可根据任务数量和负责度来调整
org.quartz.threadPool.threadCount=5
#quartz线程优先级
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

#==============================================================
#Configure JobStore
#==============================================================
#实现集群时,任务的存储实现方式,org.quartz.impl.jdbcjobstore.JobStoreTX表示数据库存储,无需修改
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#quartz存储任务相关数据的表的前缀,无需修改
org.quartz.jobStore.tablePrefix=QRTZ_
#是否启用集群,启用,改为true,注意:启用集群后,必须配置下面的数据源,否则quartz调度器会初始化失败
org.quartz.jobStore.isClustered=true
#集群中服务器相互检测间隔,每台服务器都会按照下面配置的时间间隔往服务器中更新自己的状态,如果某台服务器超过以下时间没有checkin,调度器就会认为该台服务器已经down掉,不会再分配任务给该台服务器
org.quartz.jobStore.clusterCheckinInterval = 20000
#连接数据库数据源名称,与下面配置中org.quartz.dataSource.myDS的myDS一致即可,可以无需修改
org.quartz.jobStore.dataSource=demo

#==============================================================
#Configure DataSource (此处填你自己的数据库连接信息)
#==============================================================
org.quartz.dataSource.myDS.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL=jdbc\:mysql\://127.0.0.1\:3306/demo?useUnicode\=true&autoReconnect\=true&characterEncoding\=UTF-8&zeroDateTimeBehavior\=convertToNull&allowMultiQueries\=true
org.quartz.dataSource.myDS.user=root
org.quartz.dataSource.myDS.password=1234
org.quartz.dataSource.myDS.maxConnections=10

3.重写 quartz 的 QuartzJobBean 类
原因是在使用 quartz+spring 把 quartz 的 task 实例化进入数据库时,会产生: serializable 的错误,原因在于:
这个 MethodInvokingJobDetailFactoryBean 类中的 methodInvoking 方法,是不支持序列化的,因此在把 QUARTZ 的 TASK 序列化进入数据库时就会抛错。网上有说把 SPRING 源码拿来,修改一下这个方案,然后再打包成 SPRING.jar 发布,这些都是不好的方法,是不安全的。
必须根据 QuartzJobBean 来重写一个自己的类,然后使用 SPRING 把这个重写的类(我们就名命它为: MyDetailQuartzJobBean )注入 appContext 中后,再使用 AOP 技术反射出原有的 quartzJobx( 就是开发人员原来已经做好的用于执行 QUARTZ 的 JOB 的执行类 ) 。我们这个主要是通过反射机制调用实际要执行的操作方法,所以,调用的时候要符合反射机制的规范。

MyDetailQuartzJobBean.java

package com.core.quartz;

import java.lang.reflect.Method;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class MyDetailQuartzJobBean extends QuartzJobBean {
private static Logger logger = LoggerFactory
.getLogger(MyDetailQuartzJobBean.class);

private String targetObject;
private String targetMethod;
private ApplicationContext ctx;

@Override
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException {
try {
Object otargetObject = ctx.getBean(targetObject);
Method m = null;
try {
m = otargetObject.getClass().getMethod(targetMethod);
m.invoke(otargetObject);
} catch (SecurityException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
} catch (Exception e) {
throw new JobExecutionException(e);
}
}

public void setApplicationContext(ApplicationContext applicationContext) {
this.ctx = applicationContext;
}

public void setTargetObject(String targetObject) {
this.targetObject = targetObject;
}

public void setTargetMethod(String targetMethod) {
this.targetMethod = targetMethod;
}
}

4.执行类

Task.java

package com.core.quartz;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.core.service.task.TaskService;

public class Task{
private final static Logger log = LoggerFactory.getLogger(Task.class);
@Autowired
private TaskService TaskService;

public void execute(){
......
}

}

5.创建存储quartz相关数据表
下载quartz-2.2.3包,解压,在quartz-2.2.3-distribution/quartz-2.2.3/docs/dbTables中有各个数据库的SQL运行脚本文件,我们只需要打开相应的数据库运行相应的脚本文件就可以在数据库中创建存储quartz相关的数据表了。

至此大功告成!

quartz集群与不集群之间就是配置的不同。

在部署到生产环境时出现问题,查看日志,发现报了一个异常

org.quartz.JobPersistenceException: Couldn't acquire next trigger: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'OPTION SQL_SELECT_LIMIT=1' at line 1
at org.quartz.impl.jdbcjobstore.JobStoreSupport.acquireNextTrigger(JobStoreSupport.java:2848)
at org.quartz.impl.jdbcjobstore.JobStoreSupport$40.execute(JobStoreSupport.java:2759)
at org.quartz.impl.jdbcjobstore.JobStoreSupport$40.execute(JobStoreSupport.java:2757)
at org.quartz.impl.jdbcjobstore.JobStoreSupport.executeInNonManagedTXLock(JobStoreSupport.java:3787)
at org.quartz.impl.jdbcjobstore.JobStoreSupport.acquireNextTriggers(JobStoreSupport.java:2756)
at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:272)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'OPTION SQL_SELECT_LIMIT=1' at line 1
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.Util.getInstance(Util.java:386)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3609)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3541)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2002)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2618)
at com.mysql.jdbc.StatementImpl.executeSimpleNonQuery(StatementImpl.java:1644)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2280)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:2714)
at com.alibaba.druid.filter.FilterEventAdapter.preparedStatement_executeQuery(FilterEventAdapter.java:465)
at com.alibaba.druid.filter.FilterChainImpl.preparedStatement_executeQuery(FilterChainImpl.java:2711)
at com.alibaba.druid.proxy.jdbc.PreparedStatementProxyImpl.executeQuery(PreparedStatementProxyImpl.java:132)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeQuery(DruidPooledPreparedStatement.java:227)
at org.quartz.impl.jdbcjobstore.StdJDBCDelegate.selectTriggerToAcquire(StdJDBCDelegate.java:2613)
at org.quartz.impl.jdbcjobstore.JobStoreSupport.acquireNextTrigger(JobStoreSupport.java:2800)
... 5 common frames omitted

查找原因,发现是由于mysql的驱动版本问题。由于测试环境是5.6的版本,而生产环境是5.7的版本。更新mysql驱动jar包后,重新部署,问题解决。

参考文章:http://blog.csdn.net/l1028386804/article/details/49150267