什么是spring batch
spring batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。spring batch是spring的一个子项目,使用java语言并基于spring框架为基础开发,使的已经使用 spring 框架的开发者或者企业更容易访问和利用企业服务。
spring batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,spring batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 spring batch 能够支持简单的、复杂的和大数据量的批处理作业。
spring batch 使用
我们首先配置spring batch 在spring boot 中的使用,数据库用的是mysql,pom文件如下,因为spring boot 中的spring batch 包含 hsqsldb 所以我们将其去除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-batch</artifactid>
<exclusions> <!-- 注意这里-->
<exclusion>
<groupid>org.hsqldb</groupid>
<artifactid>hsqldb</artifactid>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-jdbc</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
<groupid>org.hibernate</groupid>
<artifactid>hibernate-validator</artifactid>
</dependency>
<dependency>
<groupid>mysql</groupid>
<artifactid>mysql-connector-java</artifactid>
<version> 5.1 . 21 </version>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-test</artifactid>
<scope>test</scope>
</dependency>
|
配置好我们需要的实体类。页面就不展示了。
如果有数据校验添加的话那么我们需要配置自定义的检验器。若果没有课略过该步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class csvbeanvalidator<t> implements validator<t>,initializingbean {
private javax.validation.validator validator;
@override
public void validate(t value) throws validationexception {
set<constraintviolation<t >> constraintviolations=validator.validate(value);
if (constraintviolations.size()> 0 ){
stringbuilder message= new stringbuilder();
for (constraintviolation<t> constraintviolation:constraintviolations){
message.append(constraintviolation.getmessage() + "\n" );
}
throw new validationexception(message.tostring());
}
}
//在这里我们使用的是jsr-303校验数据,在此进行初始化
@override
public void afterpropertiesset() throws exception {
validatorfactory validatorfactory= validation.builddefaultvalidatorfactory();
validator=validatorfactory.usingcontext().getvalidator();
}
}
public class csvitemprocessor extends validatingitemprocessor<person> {
@override
public person process(person item) throws validationexception {
super .process(item); // 在这里启动 然后才会调用我们自定义的校验器,否则不能通过 。
if (item.getnation().equals( "汉族" )){
item.setname( "01" );
} else {
item.setnation( "02" );
}
return item;
}
}
|
进行job任务监听 自定义类实现jobexecutionlistener 即可
1
2
3
4
5
6
7
8
9
10
11
12
13
|
long starttime;
long endtime;
@override
public void beforejob(jobexecution jobexecution) {
starttime = system.currenttimemillis();
system.out.println( "任务处理开始" );
}
@override
public void afterjob(jobexecution jobexecution) {
endtime = system.currenttimemillis();
system.out.println( "耗时多长时间:" + (endtime - starttime) + "ms" );
system.out.println( "任务处理结束" );
}
|
进行spring batch 的注入 方法有xml文件注入bean ,在这里选择java注入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
@configuration
@enablebatchprocessing //开启批处理
public class csvbatchconfig {
/**1 首先我们通过 flatfileitemreader 读取我们需要的文件 通过setresource来实现
* 2 设置map 在这里通过先设置解析器 setlinetokenizer 来解析我们csv文件中的数 据
* 3 setfieldsetmapper 将我们需要的数据转化为我们的实体对象 存储
* 4 如果想 跳过前面的几行 需要使用setlinestoskip就可以实现
*/
@bean
public itemreader<person> reader() throws exception {
flatfileitemreader<person> reader = new flatfileitemreader<person>(); //1
reader.setresource( new classpathresource( "people.csv" )); //2
reader.setlinemapper( new defaultlinemapper<person>() {{ //3
setlinetokenizer( new delimitedlinetokenizer() {{
setnames( new string[] { "name" , "age" , "nation" , "address" });
}});
setfieldsetmapper( new beanwrapperfieldsetmapper<person>() {{
settargettype(person. class );
}});
}});
reader.setlinestoskip( 3 );
return reader;
}
@bean
public itemprocessor<person, person> processor() {
csvitemprocessor processor = new csvitemprocessor(); //1
processor.setvalidator(csvbeanvalidator()); //2
return processor;
}
/**
*写入数据到数据库中
* 1执行的sql 语句 2 设置数据源
*/
@bean
public itemwriter<person> writer(datasource datasource) { //1
jdbcbatchitemwriter<person> writer = new jdbcbatchitemwriter<person>(); //2
writer.setitemsqlparametersourceprovider( new beanpropertyitemsqlparametersourceprovider<person>());
string sql = "insert into person " + "(id,name,age,nation,address) "
+ "values(hibernate_sequence.nextval, :name, :age, :nation,:address)" ;
writer.setsql(sql); //3
writer.setdatasource(datasource);
return writer;
}
// 作业的仓库 就是设置数据源
@bean
public jobrepository jobrepository(datasource datasource, platformtransactionmanager transactionmanager)
throws exception {
jobrepositoryfactorybean jobrepositoryfactorybean = new jobrepositoryfactorybean();
jobrepositoryfactorybean.setdatasource(datasource);
jobrepositoryfactorybean.settransactionmanager(transactionmanager);
jobrepositoryfactorybean.setdatabasetype( "mysql" );
return jobrepositoryfactorybean.getobject();
}
//调度器 使用它来执行 我们的批处理
@bean
public simplejoblauncher joblauncher(datasource datasource, platformtransactionmanager transactionmanager)
throws exception {
simplejoblauncher joblauncher = new simplejoblauncher();
joblauncher.setjobrepository(jobrepository(datasource, transactionmanager));
return joblauncher;
}
//将监听器加入到job中
@bean
public job importjob(jobbuilderfactory jobs, step s1) {
return jobs.get( "importjob" )
.incrementer( new runidincrementer())
.flow(s1) //1
.end()
.listener(csvjoblistener()) //2
.build();
}
//步骤绑定 reader 与writer 一次性处理65000条记录
@bean
public step step1(stepbuilderfactory stepbuilderfactory, itemreader<person> reader, itemwriter<person> writer,
itemprocessor<person,person> processor) {
return stepbuilderfactory
.get( "step1" )
.<person, person>chunk( 65000 ) //1
.reader(reader) //2
.processor(processor) //3
.writer(writer) //4
.build();
}
@bean
public csvjoblistener csvjoblistener() {
return new csvjoblistener();
}
@bean
public validator<person> csvbeanvalidator() {
return new csvbeanvalidator<person>();
}
}
|
在配置文件中 启动自动执行批处理
spring.batch.job.names = job1,job2 #启动时要执行的job,默认执行全部job
spring.batch.job.enabled=true #是否自动执行定义的job,默认是
spring.batch.initializer.enabled=true #是否初始化spring batch的数据库,默认为是
spring.batch.schema=
spring.batch.table-prefix= #设置springbatch的数据库表的前缀
项目汇总
从 项目中我们可以看到 总的步骤就是 首先读取我们需要实现的文件进行解析,然后转换成需要的实体类并且绑定到reader中,二 实现我们需要的writer 并且帮到到数据库上,三实现job监听器将其绑定到步骤中 。最后开启批处理 自动执行入库即可 。这个简单步骤主要是配置中用到的 理解流程 自己也可以方便实现 批处理的流程。
总结
以上所述是小编给大家介绍的springboot和springbatch 使用,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:https://yq.aliyun.com/articles/619189