ElasticJob 使用场景

时间:2024-04-09 08:36:01

ElasticJob 使用场景


使用场景介绍

Quartz 定时任务使用普遍多,其中最常见Spring-Quartz,Spring框架针对Quartz进行整合,其中核心组件包括(Scheduler、Trigger、JobDetail),可配置属性文件、应用注解
开启执行任务,使用简便、易上手,适用于单点、小型企业架构,不支持负载均衡

ElasticJob 当当网使用的任务调度框架,支持并行调度,支持分布式调度协调,支持弹性扩容,是一种分布式调度解决方案

优势如下:

  • 分布式调度
    Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能
  • 作业高可用
    Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行
  • 最大限度利用资源
    Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项
    例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量
  • 作业类型分类
  • SimpleJob:简单作业,只执行一次,适合小数据量的简单单次执行。
    DataflowJob:流式作业,适合大数据量的批次执行。分为fetchData和processData,当fetchData不为空,会进行下一次取数,直到fetchData为空则本次作业结束,下个触发时间点重新开始

架构图如下
ElasticJob 使用场景

使用过程

Maven
//引入elastic-job-lite核心模块
< dependency>
< groupId>io.elasticjob</ groupId>
< artifactId>elastic-job-lite-core</ artifactId>
< version>2.0.5</ version>
</ dependency>

//使用springframework自定义命名空间时引入
< dependency>
< groupId>io.elasticjob</ groupId>
< artifactId>elastic-job-lite-spring</ artifactId>
< version>2.0.5</ version>
</ dependency>

SimpleJob Demo 如下

public class MyElasticJob implements SimpleJob {

@Override
public void execute(ShardingContext context) {
    switch (context.getShardingItem()) {
        case 0: 
            // do something by sharding item 0
            break;
        case 1: 
            // do something by sharding item 1
            break;
        case 2: 
            // do something by sharding item 2
            break;
        // case n: ...
    }
}

}

属性配置

<?xml version="1.0" encoding="UTF-8"?>

< beans xmlns=“http://www.springframework.org/schema/beans
xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance
xmlns:reg=“http://www.dangdang.com/schema/ddframe/reg
xmlns:job=“http://www.dangdang.com/schema/ddframe/job
xsi:schemaLocation=“http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd”>

< reg:zookeeper id=“regCenter” server-lists=“地址:2181” namespace=“dd-job” base-sleep-time-milliseconds=“1000” max-sleep-time-milliseconds=“3000” max-retries=“3” />

<!-- 配置作业-->
<job:simple id="demoJob" class="Demo.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

</ beans>

DataflowJob Demo如下

public class DataflowJobTest implements DataflowJob< DemoVo>{

/**
 * (非 Javadoc)
* <p>Title: fetchData</p>
* <p>Description: fetchData</p>
* @param shardingContext
* @return
* @see 
 */
@Override
public List<DemoVo> fetchData(ShardingContext shardingContext) {
     
    logger.info("@[email protected]分片项: "+shardingContext.getShardingItem()+"线程ID: "+Thread.currentThread().getId());
    System.out.println("第"+shardingContext.getShardingItem()+"片执行!");
    TestVo jobVo = new TestVo();
     
    //sql取模
    jobVo.setShardingTotalCount(shardingContext.getShardingTotalCount());
    jobVo.setShardingItem(shardingContext.getShardingItem());
    jobVo.setBatchCount(8000);
    List<TestVo> jobVoList = jobBiz.findJobByCondition(jobVo);
    //SELECT * FROM t_job_detail_new WHERE status = 1 AND MOD(id, #{shardingTotalCount}) = #{shardingItem} Limit 0, #{batchCount}
    return jobVoList;
     
    //逻辑取模
    /*Page page = new Page();
    page.setCurrentPage(1);
    page.setPageSize(160000);
    List<DemoVo> jobVoList = jobBiz.findTestJobByPage(demoVo, page);
    List<DemoVo> jobResult = new ArrayList<DemoVo>();
    if(jobVoList != null){
        for(DemoVojobVoItem : jobVoList){
            if(Integer.parseInt(jobVoItem.getId()) % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
                jobResult.add(jobVoItem);
            }
        }
    }
    return jobResult;*/
}
 
/**
 * (非 Javadoc)
* <p>Title: processData</p>
* <p>Description: processData</p>
* @param shardingContext
* @param data
* @see com.dangdang.ddframe.job.api.dataflow.DataflowJob#processData(com.dangdang.ddframe.job.api.ShardingContext, java.util.List)
 */
@Override
public void processData(ShardingContext shardingContext, List<DemoVo> data) {
    logger.debug("@[email protected]"+shardingContext.getShardingItem()+"processDataStart ===============================");
    logger.info("@[email protected]分片项: "+shardingContext.getShardingItem()+"线程ID: "+Thread.currentThread().getId());
    for(DemoVo jobVo : data){
        logger.debug("@[email protected]"+shardingContext.getShardingItem()+"processedData"+jobVo.getName());
        jobVo.setStatus(3);
        jobBiz.updateJob(jobVo);
    }
    logger.debug("@[email protected]"+shardingContext.getShardingItem()+"processDataEnd ===============================");
}

}

参数详解:
id:在Spring容器中的主键
class:作业实现类,需实现ElasticJob接口,脚本类型作业不需要配置
registry-center-ref:注册中心Bean的引用,需引用reg:zookeeper的声明
cron:cron表达式,用于配置作业触发时间
sharding-total-count:分片***和参数用等号分隔,多个键值对用逗号分隔,分片***从0开始,不可大于或者等于作业分片总数,如:0=A,1=B,2=C
overwrite:本地配置是否可覆盖注册中心配置,如果可覆盖,每次启动作业以本地配置为准,默认:false
job-parameter:作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业,例如:每次获取的数据量、作业实例从数据库读取的主键等
streaming-process:是否流式处理数据,如果流式处理数据,则fetchData不返回空结果将持续执行作业,如果非流式处理数据, 则处理数据完成后作业结束
description:作业描述信息
monitor-execution:监控作业运行时状态,默认:true
monitor-port:作业监控端口,默认:-1(不监控)
max-time-diff-seconds:最大允许的本机与注册中心的时间误差秒数,默认:-1(不校验时间误差)
failover:是否开启失效转移,仅monitorExecution开启,失效转移才有效,默认:false
misfire:是否开启错过任务重新执行,默认:true
job-sharding-strategy-class:作业分片策略实现类全路径,默认使用平均分配策略,默认:true
disabled:作业是否禁止启动,可用于部署作业时,先禁止启动,部署结束后统一启动,默认:false
jobProperties:作业订制化属性,目前支持job_exception_handler和executor_service_handler,用于扩展异常处理和自定义作业处理线程池
event-trace-rdb-data-source:作业事件追踪的数据源Bean引用
0 */1 * * * ?

作者简介:张程 技术研究

更多文章请关注微信公众号:zachary分解狮 (frankly0423)
ElasticJob 使用场景