Spring Batch configuration with Java config

Date: 2021-07-08 20:33:24

Spring Batch

  • Spring Batch is a batch-processing framework. Typical use cases include data migration and processing large volumes of data. It supports monitoring of a run, lets you break a complex task into smaller steps, and persists processing state and job parameters to a database.
  • Documentation: https://docs.spring.io/spring-batch/trunk/reference/html/index.html

This article configures Spring Batch using the Java config style.

Basic configuration

@Configuration
@EnableBatchProcessing
public class SpringBatchConfig extends DefaultBatchConfigurer {

    /*
     * Job repository: persists the data produced while a job runs to the database.
     * HikariDataSource is the data source; with multiple data sources, inject the right one by name.
     */
    @Bean
    public JobRepository jobRepository(@Qualifier("backUpDataSource") HikariDataSource dataSource, DataSourceTransactionManager dataSourceTransactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(dataSourceTransactionManager);
        jobRepositoryFactoryBean.setDatabaseType(DatabaseType.POSTGRES.name());
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * JobBuilderFactory
     *
     * @param jobRepository JobRepository
     * @return JobBuilderFactory
     */
    @Bean
    JobBuilderFactory jobBuilderFactory(JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    /**
     * StepBuilderFactory
     *
     * @param jobRepository jobRepository
     * @param dataSourceTransactionManager dataSourceTransactionManager
     * @return stepBuilderFactory
     */
    @Bean
    StepBuilderFactory stepBuilderFactory(JobRepository jobRepository, DataSourceTransactionManager dataSourceTransactionManager) {
        return new StepBuilderFactory(jobRepository, dataSourceTransactionManager);
    }

    /**
     * Job launcher
     */
    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return jobLauncher;
    }

    /**
     * Job registry
     */
    @Bean
    public MapJobRegistry mapJobRegistry() {
        return new MapJobRegistry();
    }


    /**
     * JobRegistryBeanPostProcessor
     *
     * @param mapJobRegistry MapJobRegistry
     * @return JobRegistryBeanPostProcessor
     */
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(MapJobRegistry mapJobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(mapJobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    /**
     * Thread pool used by the jobs
     */
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(500);
        threadPoolTaskExecutor.setKeepAliveSeconds(30000);
        threadPoolTaskExecutor.setMaxPoolSize(1000);
        threadPoolTaskExecutor.setQueueCapacity(1024);
        return threadPoolTaskExecutor;
    }

}
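
The JobRepository stores its metadata in the BATCH_* tables, which must already exist in the target database. If they are not created by other means, here is a sketch of a schema initializer that runs the PostgreSQL script shipped inside spring-batch-core (DataSourceInitializer and ResourceDatabasePopulator come from spring-jdbc; whether to auto-run this in production is up to you):

@Bean
public DataSourceInitializer batchSchemaInitializer(@Qualifier("backUpDataSource") HikariDataSource dataSource) {
    ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
    // script bundled with spring-batch-core; creates the BATCH_* metadata tables
    populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-postgresql.sql"));
    populator.setContinueOnError(true); // tolerate "table already exists" on later startups
    DataSourceInitializer initializer = new DataSourceInitializer();
    initializer.setDataSource(dataSource);
    initializer.setDatabasePopulator(populator);
    return initializer;
}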

For example: every day, on a schedule, read data from one database, process it, and store the result into another database.

@Configuration
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
// package of the MyBatis mappers, plus the sqlSessionFactoryRef to use
@MapperScan(value = "org.batch.mapper", sqlSessionFactoryRef = "backupSqlSessionFactory")
public class LxDeviceDataBatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // task thread pool
    @Autowired
    @Qualifier("taskExecutor")
    private ThreadPoolTaskExecutor taskExecutor;

    // job listener: notified when the job starts and when it finishes
    @Autowired
    private DataJobListener dataJobListener;

    @Resource
    private Environment environment;

    // data source the data is read from
    @Bean("dataFromSource")
    public HikariDataSource dataFromSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setDriverClassName(getBackupDriver());
        dataSource.setJdbcUrl(getBackupUrl());
        dataSource.setUsername(getBackupUserName());
        dataSource.setPassword(getBackupPassword());
        dataSource.setConnectionTimeout(300000);
        dataSource.setAutoCommit(true);
        dataSource.setIdleTimeout(6000000);
        dataSource.setMaxLifetime(1800000);
        dataSource.setLeakDetectionThreshold(1000000);
        dataSource.setMaximumPoolSize(10);
        return dataSource;
    }

    // transaction manager
    @Bean("dataSourceTransactionManager2")
    public DataSourceTransactionManager dataSourceTransactionManager2() throws NamingException {
        return new DataSourceTransactionManager(dataFromSource());
    }

    // MyBatis session factory
    @Bean(name = "backupSqlSessionFactory")
    public SqlSessionFactoryBean sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataFromSource());
        // locations of the mapper XML files
        org.springframework.core.io.Resource[] resources =
                new PathMatchingResourcePatternResolver().getResources(
                        "classpath*:org/back/**/*Mapper.xml");
        sessionFactory.setMapperLocations(resources);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true); // map snake_case columns to camelCase properties
        sessionFactory.setConfiguration(configuration);
        return sessionFactory;
    }

    /**
     * A job runs a flow; the flow is split into steps.
     *
     * @param flow the flow
     * @return job
     */
    @Bean
    public Job job(Flow flow) {
        return jobBuilderFactory.get("customerJob")
                .listener(dataJobListener)
                .incrementer(new RunIdIncrementer())
                .start(flow)
                .end()
                .build();
    }

    /**
     * The import flow.
     *
     * @param step1 step 1
     * @return flow
     */
    @Bean
    public Flow flow(Step step1) {
        return new FlowBuilder<Flow>("flow").start(step1).build();
    }

    /**
     * Step 1: a step reads data, processes it, and writes it.
     *
     * @param itemReader MyBatisPagingItemReader
     * @param step1Processor step1Processor
     * @param myFileWriter myFileWriter
     * @return Step
     */
    @Bean
    protected Step step1(MyBatisPagingItemReader itemReader, Step1Processor step1Processor, MyFileWriter myFileWriter) {
        StepBuilder stepBuilder = stepBuilderFactory.get("step1");
        SimpleStepBuilder simpleStepBuilder = stepBuilder.chunk(1000);
        simpleStepBuilder.allowStartIfComplete(false);
        simpleStepBuilder.throttleLimit(10);
        simpleStepBuilder.reader(itemReader);
        simpleStepBuilder.processor(step1Processor);
        simpleStepBuilder.writer(myFileWriter);
        simpleStepBuilder.taskExecutor(taskExecutor);
        return simpleStepBuilder.build();
    }

    /**
     * Reader: pages through the source data with MyBatis.
     *
     * @param backupSqlSessionFactory the MyBatis session factory
     * @param startDate custom job parameter: start of the range
     * @param endDate custom job parameter: end of the range
     * @return MyBatisPagingItemReader
     */
    @Bean
    @StepScope
    public MyBatisPagingItemReader itemReader(@Qualifier("backupSqlSessionFactory") SqlSessionFactory backupSqlSessionFactory, @Value("#{jobParameters[startDate]}") Date startDate, @Value("#{jobParameters[endDate]}") Date endDate) {
        MyBatisPagingItemReader myBatisPagingItemReader = new MyBatisPagingItemReader();
        myBatisPagingItemReader.setQueryId("org.batch.mapper.CustomerMapper.selectDataPage"); // fully-qualified id of the paged query in the mapper
        myBatisPagingItemReader.setSqlSessionFactory(backupSqlSessionFactory);
        myBatisPagingItemReader.setPageSize(1000); // rows per page
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("startDate", startDate);
        parameterValues.put("endDate", endDate);
        // custom parameters handed to the query
        myBatisPagingItemReader.setParameterValues(parameterValues);
        return myBatisPagingItemReader;
    }

    // the processor
    @Bean
    Step1Processor step1Processor() {
        return new Step1Processor();
    }

    // the writer
    @Bean
    MyFileWriter myFileWriter() {
        return new MyFileWriter();
    }

    public String getBackupDriver() {
        return environment.getRequiredProperty("jdbc.postgres.driver");
    }

    public String getBackupUrl() {
        return environment.getRequiredProperty("jdbc.postgres.url");
    }

    public String getBackupUserName() {
        return environment.getRequiredProperty("jdbc.postgres.username");
    }

    public String getBackupPassword() {
        return environment.getRequiredProperty("jdbc.postgres.password");
    }
}
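
The queryId used above, org.batch.mapper.CustomerMapper.selectDataPage, must point at a paged select. In this setup the statement lives in one of the *Mapper.xml files scanned by the session factory; purely as an illustration, an annotation-based equivalent could look roughly like this (the table customer_data and its columns are assumed names). Note that MyBatisPagingItemReader adds _page, _pagesize and _skiprows to the parameter map on top of the custom startDate/endDate values:

public interface CustomerMapper {

    // startDate/endDate come from setParameterValues();
    // _pagesize and _skiprows are filled in by MyBatisPagingItemReader for each page
    @Select("SELECT * FROM customer_data "
            + "WHERE create_time BETWEEN #{startDate} AND #{endDate} "
            + "ORDER BY id LIMIT #{_pagesize} OFFSET #{_skiprows}")
    List<DataDTO> selectDataPage(Map<String, Object> parameterValues);
}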

The listener

@Component
public class DataJobListener implements JobExecutionListener {

    private static final Logger logger = LoggerFactory.getLogger(DataJobListener.class);

    @Autowired
    private JavaMailSender javaMailSender;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // runs before the job starts
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // runs after the job has finished
    }
}
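
The injected JavaMailSender is an obvious place to send a notification once the run is over. A minimal sketch of what afterJob could do (the sender and recipient addresses are made up):

@Override
public void afterJob(JobExecution jobExecution) {
    SimpleMailMessage message = new SimpleMailMessage();
    message.setFrom("batch@example.com"); // assumed sender
    message.setTo("ops@example.com");     // assumed recipient
    message.setSubject("Job " + jobExecution.getJobInstance().getJobName()
            + " finished with status " + jobExecution.getStatus());
    message.setText("Started: " + jobExecution.getStartTime()
            + ", ended: " + jobExecution.getEndTime());
    javaMailSender.send(message);
}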

The processor

public class Step1Processor implements ItemProcessor<DataDTO, MyFileWriteDTO> {

    @Override
    public MyFileWriteDTO process(DataDTO item) throws Exception {
        // map the fields of item onto a MyFileWriteDTO and return it;
        // returning null would filter the item out of the chunk
        return new MyFileWriteDTO();
    }
}

The writer

public class MyFileWriter implements ItemWriter<MyFileWriteDTO>, InitializingBean {

    @Override
    public void afterPropertiesSet() throws Exception {
        // open/prepare the output resource here
    }

    @Override
    public void write(List<? extends MyFileWriteDTO> items) {
        // when writing to a file from a multi-threaded step,
        // concurrent access has to be controlled by this writer itself
    }
}
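
Since step1 runs on a thread pool, write() can be called from several threads at the same time. One simple way to serialize access to a single output file is to synchronize around the actual write; the output path and the one-line-per-item format below are assumptions:

private final Object fileLock = new Object();
private final Path outputFile = Paths.get("/data/export/device-data.txt"); // hypothetical location

@Override
public void write(List<? extends MyFileWriteDTO> items) throws Exception {
    List<String> lines = new ArrayList<>();
    for (MyFileWriteDTO item : items) {
        lines.add(item.toString()); // one line per item
    }
    synchronized (fileLock) { // only one chunk writes at a time
        Files.write(outputFile, lines, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}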

That covers the basic configuration of a job.

Passing parameters when launching the job
@Autowired
private JobLauncher jobLauncher;

// inject the job defined above
@Autowired
private Job job;

@Scheduled(cron = "0 0 06 * * ?")
public void syncLxDeviceData() {
    logger.info("data start ==[{}]", System.currentTimeMillis());

    Date startDate = new DateTime().minusDays(1).millisOfDay().withMinimumValue().toDate(); // previous day, 00:00:00
    Date endDate = new DateTime().minusDays(1).millisOfDay().withMaximumValue().toDate();   // previous day, 23:59:59.999
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addDate("startDate", startDate);
    jobParametersBuilder.addDate("endDate", endDate);
    JobParameters jobParameters = jobParametersBuilder.toJobParameters();
    try {
        jobLauncher.run(job, jobParameters);
    } catch (JobExecutionAlreadyRunningException | JobRestartException
            | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
        logger.error("failed to launch job", e);
    }
    logger.info("data end ==[{}]", System.currentTimeMillis());
}
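
One caveat: a JobInstance is identified by the job name plus the identifying parameters, so launching again with the same startDate/endDate after a successful run fails with JobInstanceAlreadyCompleteException (the RunIdIncrementer configured on the job is not applied when JobLauncher.run() is called with explicit parameters). If reruns for the same day need to be possible, one option is to add an extra parameter that changes on every launch:

jobParametersBuilder.addLong("launchTime", System.currentTimeMillis()); // makes each launch a new JobInstance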