ElasticJob 使用场景
使用场景介绍
Quartz 定时任务使用普遍多,其中最常见Spring-Quartz,Spring框架针对Quartz进行整合,其中核心组件包括(Scheduler、Trigger、JobDetail),可配置属性文件、应用注解
开启执行任务,使用简便、易上手,适用于单点、小型企业架构,不支持负载均衡
ElasticJob 当当网使用的任务调度框架,支持并行调度,支持分布式调度协调,支持弹性扩容,是一种分布式调度解决方案
优势如下:
- 分布式调度
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能 - 作业高可用
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行 - 最大限度利用资源
Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量 - 作业类型分类
-
SimpleJob:简单作业,只执行一次,适合小数据量的简单单次执行。
DataflowJob:流式作业,适合大数据量的批次执行。分为fetchData和processData,当fetchData不为空,会进行下一次取数,直到fetchData为空则本次作业结束,下个触发时间点重新开始
架构图如下
使用过程
Maven
//引入elastic-job-lite核心模块
< dependency>
< groupId>io.elasticjob</ groupId>
< artifactId>elastic-job-lite-core</ artifactId>
< version>2.0.5</ version>
</ dependency>
//使用springframework自定义命名空间时引入
< dependency>
< groupId>io.elasticjob</ groupId>
< artifactId>elastic-job-lite-spring</ artifactId>
< version>2.0.5</ version>
</ dependency>
SimpleJob Demo 如下
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
属性配置
<?xml version="1.0" encoding="UTF-8"?>< beans xmlns=“http://www.springframework.org/schema/beans”
xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance”
xmlns:reg=“http://www.dangdang.com/schema/ddframe/reg”
xmlns:job=“http://www.dangdang.com/schema/ddframe/job”
xsi:schemaLocation=“http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd”>
< reg:zookeeper id=“regCenter” server-lists=“地址:2181” namespace=“dd-job” base-sleep-time-milliseconds=“1000” max-sleep-time-milliseconds=“3000” max-retries=“3” />
<!-- 配置作业-->
<job:simple id="demoJob" class="Demo.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</ beans>
DataflowJob Demo如下
public class DataflowJobTest implements DataflowJob< DemoVo>{
/**
* (非 Javadoc)
* <p>Title: fetchData</p>
* <p>Description: fetchData</p>
* @param shardingContext
* @return
* @see
*/
@Override
public List<DemoVo> fetchData(ShardingContext shardingContext) {
logger.info("@[email protected]分片项: "+shardingContext.getShardingItem()+"线程ID: "+Thread.currentThread().getId());
System.out.println("第"+shardingContext.getShardingItem()+"片执行!");
TestVo jobVo = new TestVo();
//sql取模
jobVo.setShardingTotalCount(shardingContext.getShardingTotalCount());
jobVo.setShardingItem(shardingContext.getShardingItem());
jobVo.setBatchCount(8000);
List<TestVo> jobVoList = jobBiz.findJobByCondition(jobVo);
//SELECT * FROM t_job_detail_new WHERE status = 1 AND MOD(id, #{shardingTotalCount}) = #{shardingItem} Limit 0, #{batchCount}
return jobVoList;
//逻辑取模
/*Page page = new Page();
page.setCurrentPage(1);
page.setPageSize(160000);
List<DemoVo> jobVoList = jobBiz.findTestJobByPage(demoVo, page);
List<DemoVo> jobResult = new ArrayList<DemoVo>();
if(jobVoList != null){
for(DemoVojobVoItem : jobVoList){
if(Integer.parseInt(jobVoItem.getId()) % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
jobResult.add(jobVoItem);
}
}
}
return jobResult;*/
}
/**
* (非 Javadoc)
* <p>Title: processData</p>
* <p>Description: processData</p>
* @param shardingContext
* @param data
* @see com.dangdang.ddframe.job.api.dataflow.DataflowJob#processData(com.dangdang.ddframe.job.api.ShardingContext, java.util.List)
*/
@Override
public void processData(ShardingContext shardingContext, List<DemoVo> data) {
logger.debug("@[email protected]"+shardingContext.getShardingItem()+"processDataStart ===============================");
logger.info("@[email protected]分片项: "+shardingContext.getShardingItem()+"线程ID: "+Thread.currentThread().getId());
for(DemoVo jobVo : data){
logger.debug("@[email protected]"+shardingContext.getShardingItem()+"processedData"+jobVo.getName());
jobVo.setStatus(3);
jobBiz.updateJob(jobVo);
}
logger.debug("@[email protected]"+shardingContext.getShardingItem()+"processDataEnd ===============================");
}
}
参数详解:
id:在Spring容器中的主键
class:作业实现类,需实现ElasticJob接口,脚本类型作业不需要配置
registry-center-ref:注册中心Bean的引用,需引用reg:zookeeper的声明
cron:cron表达式,用于配置作业触发时间
sharding-total-count:分片***和参数用等号分隔,多个键值对用逗号分隔,分片***从0开始,不可大于或者等于作业分片总数,如:0=A,1=B,2=C
overwrite:本地配置是否可覆盖注册中心配置,如果可覆盖,每次启动作业以本地配置为准,默认:false
job-parameter:作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业,例如:每次获取的数据量、作业实例从数据库读取的主键等
streaming-process:是否流式处理数据,如果流式处理数据,则fetchData不返回空结果将持续执行作业,如果非流式处理数据, 则处理数据完成后作业结束
description:作业描述信息
monitor-execution:监控作业运行时状态,默认:true
monitor-port:作业监控端口,默认:-1(不监控)
max-time-diff-seconds:最大允许的本机与注册中心的时间误差秒数,默认:-1(不校验时间误差)
failover:是否开启失效转移,仅monitorExecution开启,失效转移才有效,默认:false
misfire:是否开启错过任务重新执行,默认:true
job-sharding-strategy-class:作业分片策略实现类全路径,默认使用平均分配策略,默认:true
disabled:作业是否禁止启动,可用于部署作业时,先禁止启动,部署结束后统一启动,默认:false
jobProperties:作业订制化属性,目前支持job_exception_handler和executor_service_handler,用于扩展异常处理和自定义作业处理线程池
event-trace-rdb-data-source:作业事件追踪的数据源Bean引用
0 */1 * * * ?
作者简介:张程 技术研究
更多文章请关注微信公众号:zachary分解狮 (frankly0423)