1. spring batch 批处理配置
import ;
import org.;
import org.;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
 * Spring Batch configuration for the blacklist import job.
 *
 * <p>Declares the step-scoped reader, line mapper, processor and writer beans, plus the
 * {@code step1} step and the {@code importFileJob} job that runs it.
 *
 * <p>NOTE(review): several statements in this class have lost their receiver/method names
 * (for example {@code (1);}), so the class does not compile as shown. Those statements are
 * kept verbatim below; restore the missing calls before building.
 *
 * @author
 */
@Configuration
@EnableBatchProcessing
public class BlackListBatchConfiguration {
    private static final Logger logger = ();

    /**
     * Builds the flat-file reader for the blacklist input file.
     *
     * @param inputFile classpath location of the input file, taken from the
     *                  {@code inputFileBlack} job parameter
     * @return the configured reader, or {@code null} when no input file was supplied
     * @throws IOException if the URL of the classpath resource cannot be resolved
     */
    @Bean
    @StepScope
    public ItemReader<BlackListDO> reader(@Value("#{jobParameters[inputFileBlack]}") String inputFile) throws IOException {
        // Check for a missing parameter before the path is used: the original built a
        // ClassPathResource from inputFile first, so this guard could never be reached.
        if(inputFile == null){
            ("The blacklist reader file is null");
            return null;
        }
        ("inputFile:"+new ClassPathResource(inputFile).getURL().getPath());
        FlatFileItemReader<BlackListDO> reader = new FlatFileItemReader<BlackListDO>();
        // NOTE(review): the four statements below presumably configure `reader`
        // (resource, line mapper, a numeric setting of 1, and one more call) - confirm.
        (new ClassPathResource(inputFile));
        (lineMapper());
        (1);
        (());
        return reader;
    }

    /**
     * Builds the mapper that turns one text line into a {@code BlackListDO}.
     *
     * @return the line mapper
     */
    @Bean
    @StepScope
    public LineMapper<BlackListDO> lineMapper() {
        DefaultLineMapper<BlackListDO> lineMapper = new DefaultLineMapper<BlackListDO>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        // NOTE(review): presumably tokenizer settings - a "," argument, a false flag and the
        // column names type / value / fraudType. Confirm once the calls are restored.
        (",");
        (false);
        (new String[] { "type","value","fraudType"});
        // NOTE(review): fieldSetMapper is created here, but the statement further down passes a
        // new BlackListFieldSetMapper instead - check whether this local is still needed.
        BeanWrapperFieldSetMapper<BlackListDO> fieldSetMapper = new BeanWrapperFieldSetMapper<BlackListDO>();
        ();
        (lineTokenizer);
        (new BlackListFieldSetMapper());
        return lineMapper;
    }

    /**
     * Builds the item processor.
     *
     * @param inputFile value of the {@code inputFileBlack} job parameter, handed to the processor
     * @return a new {@code BlackListDOItemProcessor}
     */
    @Bean
    @StepScope
    public ItemProcessor<BlackListDO, BlackListDO> processor(@Value("#{jobParameters[inputFileBlack]}") String inputFile) {
        return new BlackListDOItemProcessor(inputFile);
    }

    /**
     * Builds the item writer.
     *
     * @return a new {@code BlackListItemWriter}
     */
    @Bean
    @StepScope
    public ItemWriter<BlackListDO> writer() {
        return new BlackListItemWriter();
    }

    /**
     * Builds the import job: a single flow consisting of {@code step1}.
     *
     * @param jobs          factory used to create the job builder
     * @param step1         the step the job executes
     * @param listener      job execution listener registered on the job
     * @param jobRepository repository the job is bound to
     * @return the job
     */
    @Bean
    public Job importFileJob(JobBuilderFactory jobs, Step step1,JobExecutionListener listener,JobRepository jobRepository) {
        return ("importFileJob")
                .incrementer(new RunIdIncrementer())
                .repository(jobRepository)
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    /**
     * Declares {@code step1}: a fault-tolerant, chunk-oriented step (chunk size 500) wired to
     * the reader, processor and writer, executed through a {@code SimpleAsyncTaskExecutor}.
     *
     * <p>NOTE(review): the step runs on a task executor while {@code reader} is a
     * {@code FlatFileItemReader}; verify against the Spring Batch documentation that this reader
     * may be shared between threads, or synchronize access to it.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param reader             item reader
     * @param writer             item writer
     * @param processor          item processor
     * @param transactionManager transaction manager used by the step
     * @return the step
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<BlackListDO> reader,
            ItemWriter<BlackListDO> writer, ItemProcessor<BlackListDO, BlackListDO> processor,PlatformTransactionManager transactionManager) {
        ("step1");
        return ("step1")
                .<BlackListDO, BlackListDO> chunk(500)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .retry() // retry (the exception type argument is missing here)
                .noRetry()
                // Intended as "retry each record once".
                // NOTE(review): confirm in the FaultTolerantStepBuilder docs whether a limit of 1
                // yields any actual retry.
                .retryLimit(1)
                .listener(new RetryFailuireItemListener())
                .skip()
                .skipLimit(500) // at most 500 skips in total (the original comment said 200)
                .taskExecutor(new SimpleAsyncTaskExecutor()) // execute concurrently
                .throttleLimit(10) // 10 concurrent tasks; the default is 4 per the original note
                .transactionManager(transactionManager)
                .build();
    }
}
BlackListDOItemProcessor 处理类
import ;
import ;
import ;
import ;
import ;
/**
* Item processor applied to every {@code BlackListDO} read by the import step.
*
* <p>NOTE(review): some statements have lost their receiver/method names and are kept
* verbatim; the class does not compile as shown.
*
* @author zhengyong
*
*/
public class BlackListDOItemProcessor implements ItemProcessor<BlackListDO, BlackListDO> {
// Input file name supplied through the one-argument constructor.
public String inputFile;
/** Creates a processor without an input file. */
public BlackListDOItemProcessor() {
}
/**
* Creates a processor for the given input file.
* @param inputFile name of the file being imported
*/
public BlackListDOItemProcessor(String inputFile) {
// NOTE(review): presumably assigns the inputFile field; the left-hand side is missing.
= inputFile;
}
/**
* Processes one record and returns the same instance.
* @param blackListDO the record to process
* @return the {@code blackListDO} instance that was passed in
* @throws Exception declared by the {@code ItemProcessor} contract
*/
public BlackListDO process(BlackListDO blackListDO) throws Exception {
// NOTE(review): presumably two setter calls on blackListDO: one with the value 0 and one
// with a string that has every "-" removed. Confirm once the calls are restored.
(0);
(().toString().replaceAll("-", ""));
return blackListDO;
}
}
BlackListItemWriter 写入数据库类
import ;
import ;
import ;
/**
 * Item writer for blacklist records.
 *
 * <p>Placeholder implementation: the database insert has not been written yet, so
 * {@link #write(List)} currently does nothing with the chunk it receives.
 */
public class BlackListItemWriter implements ItemWriter<BlackListDO> {

    /**
     * Receives one chunk of processed records.
     *
     * @param blackList the records of the current chunk
     * @throws Exception declared by the {@code ItemWriter} contract; this stub never throws
     */
    @Override
    public void write(final List<? extends BlackListDO> blackList) throws Exception {
        // Database insert goes here.
    }
}
JobCompletionNotificationListener 监听任务
import ;
import ;
import ;
import ;
import ;
/**
* Job execution listener registered as a Spring component.
*/
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
/**
* Called after the job finishes; intentionally does nothing.
* @param jobExecution the finished job execution
*/
@Override
public void afterJob(JobExecution jobExecution) {
}
/**
* Called before the job starts.
* @param jobExecution the job execution about to start
*/
@Override
public void beforeJob(JobExecution jobExecution) {
// NOTE(review): the callee name is missing; presumably a call that receives jobExecution
// (for example the superclass implementation). Confirm before compiling.
(jobExecution);
}
}
RetryFailuireItemListener 重试监听
import org.;
import org.;
import ;
import ;
import ;
/**
* Retry listener attached to the import step; only the error callback has a body.
*
* <p>NOTE(review): the logger initialiser and the logging call have lost their method names
* and are kept verbatim.
*/
public class RetryFailuireItemListener implements RetryListener{
private static final Logger logger = ();
/**
* Called before the first attempt.
* @return always {@code true}
*/
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
/** Called after the last attempt; intentionally does nothing. */
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
}
/**
* Called on every failed attempt.
* @param throwable the failure raised by the attempt
*/
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
// NOTE(review): presumably logs the failure with the "retry exception" prefix; both the
// logging call and the value appended to the prefix are missing.
("【重试异常】:"+());
}
}
2. spring boot 配置
import ;
import ;
import ;
import ;
import ;
/**
* Spring Boot entry point; extends {@code SpringBootServletInitializer}.
*
* <p>NOTE(review): the component-scan package and the bodies of both methods have lost
* tokens and are kept verbatim.
*/
@ComponentScan("")
@SpringBootApplication
public class SpringBootJspApplication extends SpringBootServletInitializer{
/**
* Throughput notes recorded by the original author, with batches of 500 records:
* Oracle: single-row inserts about 25,000 rows/min (500,000 rows, 19.5 min);
* batch inserts about 100,000 rows/min (500,000 rows, 4.7 min).
* MySQL: single-row inserts about 25,000 rows/min (500,000 rows, 11.4 min);
* batch inserts about 400,000 rows/min (500,000 rows, 1.3 min).
*
* @param application the application builder
* @return the builder to use
*/
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
// NOTE(review): the returned expression is missing its callee - restore before compiling.
return ();
}
/**
* Starts the application with the single argument {@code appStart=true}.
* @param args command-line arguments (not forwarded by the visible code)
* @throws Exception if start-up fails
*/
public static void main(String[] args) throws Exception {
// NOTE(review): the callee and first argument are missing; presumably the Spring Boot
// launcher call with this class - confirm.
(,new String[]{"appStart=true"});
}
}
3. 配置文件
# mysql config
# = mysql
# = jdbc:mysql://127.0.0.1:3306/spring_batch?useUnicode=true&characterEncoding=utf8
# = admin
# = 123456
# =
# = classpath:/org/springframework/batch/core/
# = classpath:/org/springframework/batch/core/
# oracle config
= oracle
= jdbc:oracle:thin:@127.0.0.1:1521:spring_batch
= admin
= 123456
=
= classpath:org/springframework/batch/core/
= classpath:org/springframework/batch/core/
# batch config
= importFileJob
= true
=true
4. mybatis 配置
import ;
import ;
import ;
import ;
import org.;
import org.;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* MyBatis configuration: declares the data source, transaction manager,
* {@code SqlSessionFactory}, {@code SqlSessionTemplate} and the {@code BlackListDao} mapper.
*
* <p>NOTE(review): many statements have lost their receiver/method names and are kept
* verbatim; the class does not compile as shown.
*/
@Configuration
public class MyBatisConfiguration {
private static final Logger logger = ();
// NOTE(review): both injected fields have the same types as beans declared further down in
// this class - check that this self-injection does not create a circular dependency.
@Autowired
SqlSessionFactory sqlSessionFactory;
@Autowired
SqlSessionTemplate sessionTemplate;
/**
* Creates the session template from the injected {@code sqlSessionFactory} field.
* @return a new {@code SqlSessionTemplate}
*/
@Bean
public SqlSessionTemplate sqlSessionTemplate() {
return new SqlSessionTemplate(sqlSessionFactory);
}
/**
* Exposes the blacklist DAO as a bean.
* @return the {@code BlackListDao} mapper
*/
@Bean
public BlackListDao blackListMapper() {
// NOTE(review): the expression is missing; presumably obtains the mapper from a session
// template - confirm.
return ();
}
/**
* Creates the primary data source; its settings are bound through
* {@code @ConfigurationProperties} (the prefix value is missing here).
* @return a new {@code DruidDataSource}
*/
@Bean
@Primary
@ConfigurationProperties(prefix = "")
public DataSource dataSource() {
("初始化dataSource");
return new DruidDataSource();
}
/**
* Creates the transaction manager for the given data source.
* @param dataSource the data source to manage transactions for
* @return a {@code DataSourceTransactionManager}
*/
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
PlatformTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
return transactionManager;
}
/**
* Creates the session factory; the mapper XML location is derived from the database type.
* @return the session factory
* @throws Exception if the factory cannot be built
*/
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
(dataSource());
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
// Resolve the database type; falls back to "mysql" when the lookup yields null.
String dataBaseType = ("") == null ? "mysql"
: ("");
// Mapper files are looked up under classpath:/mapper/<lower-cased value>/*.xml.
// NOTE(review): the lower-cased operand is missing; presumably dataBaseType.
String directory = "classpath:/mapper/" + ().toLowerCase() + "/*.xml";
((directory));
return ();
}
}
5. pom.xml 文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId></groupId>
<artifactId>batch-test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>batch-test</name>
<url></url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.2.</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>3.0.</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>