Spring Batch是一个轻量级的框架,完全面向Spring的批处理框架,用于企业级大量的数据读写处理系统。以POJO和Spring 框架为基础,包括日志记录/跟踪,事务管理、 作业处理统计工作重新启动、跳过、资源管理等功能。
业务方案:
1、批处理定期提交。
2、并行批处理:并行处理工作。
3、企业消息驱动处理
4、大规模的并行处理
5、手动或是有计划的重启
6、局部处理:跳过记录(如:回滚)
技术目标:
1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。
2、明确分离批处理的执行环境和应用。
3、提供核心的,共通的接口。
4、提供开箱即用(out of the box)的简单的默认的核心执行接口。
5、提供Spring框架中配置、自定义、和扩展服务。
6、所有存在的核心服务可以很容的被替换和扩展,不影响基础层。
7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。
实现步骤:
1.定义处理对象
2.创建中间转换器 ***ItemProcessor 实现 ItemProcessor<I,O>接口
3.创建工作Job BatchConfiguration 主要处理读数据、处理数据、写数据等操作
4.创建listener job执行监听器
一般的批处理系统需要处理大量的数据, 内部消化单条记录失败的情况, 还要管理中断,在重启后也不去重复执行已经处理过的部分
Spring Batch单/多处理单元(processors), 以及多个微线程(tasklets)
具体实现过程:
1.引入jar:
<!-- Spring-boot启动项目 -->
<parent>
<groupId></groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.</version>
</parent>
<properties>
<>1.8</>
</properties>
<!--Spring batch核心包->
<dependencies>
<dependency>
<groupId></groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
2.配置 :配置数据源
######mysql\u6570\u636E\u6E90#########
-class-name=
= wbw123456
= jdbc:mysql://localhost/mydatabase
=root
<a target=_blank href="/spring-batch/">点击打开链接</a>
3.新建实体类
package ;
public class Person {
//ID
private Integer personId;
//姓名
private String personName;
//年龄
private String personAge;
//性别
private String personSex;
public Person(){};
public Person( String personName, String personAge,
String personSex) {
= personName;
= personAge;
= personSex;
}
public Integer getPersonId() {
return personId;
}
public void setPersonId(Integer personId) {
= personId;
}
public String getPersonName() {
return personName;
}
public void setPersonName(String personName) {
= personName;
}
public String getPersonAge() {
return personAge;
}
public void setPersonAge(String personAge) {
= personAge;
}
public String getPersonSex() {
return personSex;
}
public void setPersonSex(String personSex) {
= personSex;
}
}
4.中间转换器:
package ;
import ;
import ;
import ;
import org.;
import org.;
import ;
import ;
import ;
import ;
import ;
/**
* 中间转换器
* @author wbw
*
*/
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
//查询
private static final String GET_PRODUCT = "select * from Person where personName = ?";
private static final Logger log = ();
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public Person process(final Person person) throws Exception {
List<Person> personList = (GET_PRODUCT, new Object[] {()}, new RowMapper<Person>() {
@Override
public Person mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
Person p = new Person();
((1));
((2));
((3));
return p;
}
});
if(() >0){
("该数据已录入!!!");
}
String sex = null;
if(().equals("0")){
sex ="男";
}else{
sex ="女";
}
("转换 (性别:"+()+") 为 (" + sex + ")");
final Person transformedPerson = new Person((), (),sex);
("转换 (" + person + ") 为 (" + transformedPerson + ")");
return transformedPerson;
}
}
5.处理具体工作业务 主要包含三个部分:读数据、处理数据、写数据
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* 处理具体工作业务 主要包含三个部分:读数据、处理数据、写数据
* @author wbw
*
*/
@Configuration
@EnableBatchProcessing
public class PersonBatchConfiguration {
//插入语句
private static final String PERSON_INSERT = "INSERT INTO Person (personName, personAge,personSex) VALUES (:personName, :personAge,:personSex)";
public static final String Person_INSERT = "INSERT INTO Person (id, name,description,quantity) VALUES (:id, :name,:description,:quantity)";
// tag::readerwriterprocessor[] 1.读数据
@Bean
public ItemReader<Person> reader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
//加载外部文件数据 文件类型:CSV
(new ClassPathResource(""));
(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "personName","personAge","personSex" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType();
}});
}});
return reader;
}
//2.处理数据
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
//3.写数据
@Bean
public ItemWriter<Person> writer(DataSource dataSource) {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
(new BeanPropertyItemSqlParameterSourceProvider<Person>());
(PERSON_INSERT);
(dataSource);
return writer;
}
// end::readerwriterprocessor[]
// tag::jobstep[]
@Bean
public Job importUserJob(JobBuilderFactory jobs, @Qualifier("step1")Step s1, JobExecutionListener listener) {
return ("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(s1)
.end()
.build();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
return ("step1")
.<Person, Person> chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// end::jobstep[]
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
6.监听器:用于处理任务执行之后和之前
<pre name="code" class="java">package ;
import ;
import ;
import ;
import org.;
import org.;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* Job执行监听器
* @author wbw
*
*/
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final String PERSON_SQL = "SELECT personName, personAge,personSex FROM Person";
private static final Logger log = ();
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
if(==null){
= jdbcTemplate;
}
}
@Override
public void afterJob(JobExecution jobExecution) {
if(() == ) {
("!!! JOB 执行完成!");
List<Person> results = (PERSON_SQL, new RowMapper<Person>() {
@Override
public Person mapRow(ResultSet rs, int row) throws SQLException {
return new Person((1), (2),(3));
}
});
("入库条数---------"+());
for (Person person : results) {
("新增 <" + () + "> 成功!!!!!");
}
}
}
/* (non-Javadoc)
* @see #beforeJob()
*/
@Override
public void beforeJob(JobExecution jobExecution) {
// TODO Auto-generated method stub
(jobExecution);
}
}
7.新建csv文件
<img src="" alt="" />
</pre><pre code_snippet_ snippet_file_name="blog_20160124_10_8470765" name="code" class="java">8.启动执行
<pre name="code" class="java">package ;
import ;
import ;
@SpringBootApplication
public class Application {
public static void main(String[] args) throws Exception {
(, args);
}
}
更多相关资源:/spring-batch/