spring batch + spring boot 配置

时间:2024-11-17 07:20:50

1. spring batch 批处理配置

import ;

import org.;
import org.;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

/**
 * spring batch 配置
 * @author 
 */
@Configuration
@EnableBatchProcessing
public class BlackListBatchConfiguration {
	
	
	private static final Logger logger = ();
    /**
     * 读取外部文件方法
     * @return
     * @throws IOException 
     */
	@Bean
	@StepScope
    public ItemReader<BlackListDO> reader(@Value("#{jobParameters[inputFileBlack]}") String inputFile) throws IOException { 
		("inputFile:"+new ClassPathResource(inputFile).getURL().getPath());
		if(inputFile == null){
			("The blacklist reader file is null");
			return null;
		}
        FlatFileItemReader<BlackListDO> reader = new FlatFileItemReader<BlackListDO>();
       
        (new ClassPathResource(inputFile));
        
        (lineMapper());
        
        (1);
        
        (());
        
        return reader;
    }
	
	
    /**
     * 读取文本行映射POJO
     * @return
     */
    @Bean
    @StepScope
    public LineMapper<BlackListDO> lineMapper() {
    	DefaultLineMapper<BlackListDO> lineMapper = new DefaultLineMapper<BlackListDO>();
    	DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    	(",");
    	(false);
    	(new String[] { "type","value","fraudType"});
    	
    	BeanWrapperFieldSetMapper<BlackListDO> fieldSetMapper = new BeanWrapperFieldSetMapper<BlackListDO>();
    	();
    	(lineTokenizer);
    	(new BlackListFieldSetMapper());
    	return lineMapper;
    }

    /**
     * 处理过程
     * @return
     */
    @Bean
    @StepScope
    public ItemProcessor<BlackListDO, BlackListDO> processor(@Value("#{jobParameters[inputFileBlack]}") String inputFile) {
        return new BlackListDOItemProcessor(inputFile);
    }

    /**
     * 写出内容
     * @return
     */
    @Bean
    @StepScope
    public ItemWriter<BlackListDO> writer() {
        return new BlackListItemWriter();
    }

    /**
     * 构建job
     * @param jobs
     * @param s1 
     * @param listener
     * @return
     */
    @Bean
    public Job importFileJob(JobBuilderFactory jobs, Step step1,JobExecutionListener listener,JobRepository jobRepository) {
    	return ("importFileJob")
                .incrementer(new RunIdIncrementer())
                .repository(jobRepository)
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }
    
    /**
     * 声明step
     * @param stepBuilderFactory 
     * @param reader 
     * @param writer
     * @param processor
     * @return
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<BlackListDO> reader,
            ItemWriter<BlackListDO> writer, ItemProcessor<BlackListDO, BlackListDO> processor,PlatformTransactionManager transactionManager) {
    	("step1");
    	return ("step1")
    			.<BlackListDO, BlackListDO> chunk(500)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
        		.retry()   // 重试
        		.noRetry()
        		.retryLimit(1)           //每条记录重试一次
        		.listener(new RetryFailuireItemListener())  
        		.skip()
        		.skipLimit(500)         //一共允许跳过200次异常
        		.taskExecutor(new SimpleAsyncTaskExecutor()) //设置并发方式执行
        		.throttleLimit(10)        //并发任务数为 10,默认为4
        		.transactionManager(transactionManager)
        		.build();
    }
    
}

  BlackListDOItemProcessor 处理类

import ;
import ;
import ;

import ;

import ;

/**
 * @author zhengyong
 *
 */

public class BlackListDOItemProcessor implements ItemProcessor<BlackListDO, BlackListDO> {
    
    public String inputFile;

    public BlackListDOItemProcessor() {
    }
    
    public BlackListDOItemProcessor(String inputFile) {
         = inputFile;
    }
    // 数据处理
    public BlackListDO process(BlackListDO blackListDO) throws Exception {
        (0);
        (().toString().replaceAll("-", ""));
        return blackListDO;
    }

}

  BlackListItemWriter  写入数据库类

import ;
import ;

import ;

public class BlackListItemWriter implements ItemWriter<BlackListDO> {

    @Override
    public void write(List<? extends BlackListDO> blackList) throws Exception {
            // 插入数据库操作 
    }
    
}

  JobCompletionNotificationListener 监听任务

import ;
import ;
import ;
import ;
import ;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution jobExecution) {
        
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
        (jobExecution);
    }
}

  RetryFailuireItemListener 重试监听

import org.;
import org.;
import ;
import ;
import ;

public class RetryFailuireItemListener implements RetryListener{
    
    private static final Logger logger = ();

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        return true;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
            Throwable throwable) {
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
            Throwable throwable) {
        ("【重试异常】:"+());
    }

}

2. spring boot 配置

import ;
import ;
import ;
import ;
import ;

@ComponentScan("")
@SpringBootApplication
public class SpringBootJspApplication extends SpringBootServletInitializer{

    /**
     * 500一批
     * oracle : 单条插入基本每分钟2.5W条(50W,19.5min) ,批量插入基本每分钟10W条(50W,4.7mim)
     * mysql  : 单条插入基本每分钟2.5W条(50W,11.4min) ,批量插入基本每分钟40W条(50W,1.3min)
     */
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return ();
    }
    
    public static void main(String[] args) throws Exception {
        (,new String[]{"appStart=true"});
    }
}
3. 配置文件

# mysql config
# = mysql 
# = jdbc:mysql://127.0.0.1:3306/spring_batch?useUnicode=true&characterEncoding=utf8
# = admin
# = 123456
# = 
# = classpath:/org/springframework/batch/core/
# = classpath:/org/springframework/batch/core/

# oracle config
 = oracle 
 = jdbc:oracle:thin:@127.0.0.1:1521:spring_batch
 = admin
 = 123456
 = 
 = classpath:org/springframework/batch/core/
 = classpath:org/springframework/batch/core/

# batch config
 = importFileJob
 = true
=true

4. mybatis 配置

import ;

import ;
import ;
import ;
import org.;
import org.;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

import ;

import ;
import ;

/**
 * mybatis配置
 *
 */
@Configuration
public class MyBatisConfiguration {

    private static final Logger logger = ();

    @Autowired
    SqlSessionFactory sqlSessionFactory;
    @Autowired
    SqlSessionTemplate sessionTemplate;

    @Bean
    public SqlSessionTemplate sqlSessionTemplate() {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    @Bean
    public BlackListDao blackListMapper() {
        return ();
    }

    @Bean
    @Primary
    @ConfigurationProperties(prefix = "")
    public DataSource dataSource() {
        ("初始化dataSource");
        return new DruidDataSource();
    }

    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        PlatformTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
        return transactionManager;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {

        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        (dataSource());

        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();

        // 获取数据库类型
        String dataBaseType = ("") == null ? "mysql"
                : ("");

        String directory = "classpath:/mapper/" + ().toLowerCase() + "/*.xml";
        ((directory));

        return ();
    }

}


5. 文件

<project xmlns="/POM/4.0.0" xmlns:xsi="http:///2001/XMLSchema-instance"
    xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.">
    <modelVersion>4.0.0</modelVersion>

    <groupId></groupId>
    <artifactId>batch-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>batch-test</name>
    <url></url>

    <parent>
        <groupId></groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.2.</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId></groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>3.0.</version>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>jstl</artifactId>
        </dependency>
        <dependency>
            <groupId></groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>     
    </dependencies>

</project>