spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的。而每一个 step 又是由 read-process-write task或者 单个 task 组成。
1. "read-process-write" 处理,根据字面意思理解就可以:
- read 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据.
- process 就是处理读取的数据
- write 就是将处理过的数据写入到其他资源文件中去,可以是xml,csv,或者数据库.
比如:从csv文件中 读取数据,经过处理之后,保存到数据库. spring batch 提供了很多类去处理这方面的东西。
2.单个task, 也就是处理单个任务。比如在一个step 开始之前或者完成之后清除资源文件等.
3.许多个step 组成在一起,就组成了一个job. 所以他们之间的关系,就如同下面的描述:
一个 job = 很多steps
一个step = 一个read-process-write 或者 一个task.
同样一个job = step1 -->step2--step3 这样链表形式的组成.
spring batch 例子
考虑如下一个批处理的例子,看起来有点啰嗦,只是为了说明用途:
1. step1 : 从 a 文件夹中读取csv 文件,处理之后,写入到b文件夹中(read-process-write)
2. step2 : 从 b 文件夹中读取csv文件 ,处理之后, 存储到数据库中(read-process-write).
3. step3 : 删除b文件夹下的csv文件。(用到单个task)
4. step4 : 从数据库读取数据,处理之后,生成xml报表文件(read-process-write).
5. step5 : 读取xml报表,并发送email给管理员(用到单个task)
用spring batch 我们可以如下定义这个job:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
<job id= "abcjob" xmlns= "http://www.springframework.org/schema/batch" >
<step id= "step1" next= "step2" >
<tasklet>
<chunk reader= "cvsitemreader" writer= "cvsitemwriter"
processor= "itemprocesser" commit-interval= "1" />
</tasklet>
</step>
<step id= "step2" next= "step3" >
<tasklet>
<chunk reader= "cvsitemreader" writer= "databaseitemwriter"
processor= "itemprocesser" commit-interval= "1" />
</tasklet>
</step>
<step id= "step3" next= "step4" >
<tasklet ref= "filedeletingtasklet" />
</step>
<step id= "step4" next= "step5" >
<tasklet>
<chunk reader= "databaseitemreader" writer= "xmlitemwriter"
processor= "itemprocesser" commit-interval= "1" />
</tasklet>
</step>
<step id= "step5" >
<tasklet ref= "sendingemailtasklet" />
</step>
</job>
|
整个 job 的执行是存储在数据库中的,所以即使是某一个step出错失败,也不需要全部从头开始执行这个job.下面是一个真正的入门教程例子.
采用 jar包如下:
spring-batch-2.2.3 以上版本,但是我在2.2.3版本中发现 org/springframework/batch/core/schema-mysql.sql 里面的的mysql 创建表的语句是有问题的,也就是少了“," 号导致的问题( not null, 后面几个创建表的语句not null 后面少了逗号),当然你可以自己修改后再执行,执行完毕后有如下几个表:
xstream-1.3.jar 必须的。
jettison-1.3.3.jar也是必须的, 否则会出现
java.lang.noclassdeffounderror: org/codehaus/jettison/mapped/mappedxmloutputfactory错误。
另外我用的spring 是 3.1 版本的,可以下载相关jar包,还有apache common 相关jar包就可以了。
mysql-connect-java-5.1.jar 连接mysql 数据库用的。
假设要将如下 csv 文件读取出来处理之后,写入到一个xml文件之中.
1
2
3
|
, "213,100" , 980 , "mkyong" , 29 / 7 / 2013
, "320,200" , 1080 , "staff 1" , 30 / 7 / 2013
, "342,197" , 1200 , "staff 2" , 31 / 7 / 2013
|
用 flatfileitemreader 去读取csv 文件, 用 itemprocessor 去处理数据,用 staxeventitemwriter 去写数据
job 的定义如下(job-hello-world.xml):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
<beans xmlns= "http://www.springframework.org/schema/beans"
xmlns:batch= "http://www.springframework.org/schema/batch" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="http: //www.springframework.org/schema/batch
http: //www.springframework.org/schema/batch/spring-batch-2.2.xsd
http: //www.springframework.org/schema/beans
http: //www.springframework.org/schema/beans/spring-beans-3.1.xsd
">
< import resource= "../config/context.xml" />
< import resource= "../config/database.xml" />
<bean id= "report" class = "yihaomen.model.report" scope= "prototype" />
<bean id= "itemprocessor" class = "yihaomen.customitemprocessor" />
<batch:job id= "helloworldjob" >
<batch:step id= "step1" >
<batch:tasklet>
<batch:chunk reader= "cvsfileitemreader" writer= "xmlitemwriter" processor= "itemprocessor"
commit-interval= "10" >
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id= "cvsfileitemreader" class = "org.springframework.batch.item.file.flatfileitemreader" >
<property name= "resource" value= "classpath:cvs/input/report.csv" />
<property name= "linemapper" >
<bean class = "org.springframework.batch.item.file.mapping.defaultlinemapper" >
<property name= "linetokenizer" >
<bean
class = "org.springframework.batch.item.file.transform.delimitedlinetokenizer" >
<property name= "names" value= "id,sales,qty,staffname,date" />
</bean>
</property>
<property name= "fieldsetmapper" >
<bean class = "yihaomen.reportfieldsetmapper" />
<!-- if no data type conversion, use beanwrapperfieldsetmapper to map by name
<bean
class = "org.springframework.batch.item.file.mapping.beanwrapperfieldsetmapper" >
<property name= "prototypebeanname" value= "report" />
</bean>
-->
</property>
</bean>
</property>
</bean>
<bean id= "xmlitemwriter" class = "org.springframework.batch.item.xml.staxeventitemwriter" >
<property name= "resource" value= "file:xml/outputs/report.xml" />
<property name= "marshaller" ref= "reportmarshaller" />
<property name= "roottagname" value= "report" />
</bean>
<bean id= "reportmarshaller" class = "org.springframework.oxm.jaxb.jaxb2marshaller" >
<property name= "classestobebound" >
<list>
<value>yihaomen.model.report</value>
</list>
</property>
</bean>
</beans>
|
映射csv文件到 report 对象并写xml文件 (通过 jaxb annotations).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package yihaomen.model;
import java.math.bigdecimal;
import java.util.date;
import javax.xml.bind.annotation.xmlattribute;
import javax.xml.bind.annotation.xmlelement;
import javax.xml.bind.annotation.xmlrootelement;
@xmlrootelement (name = "record" )
public class report {
private int id;
private bigdecimal sales;
private int qty;
private string staffname;
private date date;
@xmlattribute (name = "id" )
public int getid() {
return id;
}
public void setid( int id) {
this .id = id;
}
@xmlelement (name = "sales" )
public bigdecimal getsales() {
return sales;
}
public void setsales(bigdecimal sales) {
this .sales = sales;
}
@xmlelement (name = "qty" )
public int getqty() {
return qty;
}
public void setqty( int qty) {
this .qty = qty;
}
@xmlelement (name = "staffname" )
public string getstaffname() {
return staffname;
}
public void setstaffname(string staffname) {
this .staffname = staffname;
}
public date getdate() {
return date;
}
public void setdate(date date) {
this .date = date;
}
@override
public string tostring() {
return "report [id=" + id + ", sales=" + sales + ", qty=" + qty + ", staffname=" + staffname + "]" ;
}
}
|
为了转换日期,用了自定义的 fieldsetmapper. 如果没有数据需要转换, beanwrapperfieldsetmapper 通过名称name 去自动映射值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package yihaomen;
import java.text.parseexception;
import java.text.simpledateformat;
import org.springframework.batch.item.file.mapping.fieldsetmapper;
import org.springframework.batch.item.file.transform.fieldset;
import org.springframework.validation.bindexception;
import yihaomen.model.report;
public class reportfieldsetmapper implements fieldsetmapper<report> {
private simpledateformat dateformat = new simpledateformat( "yyyy-mm-dd" );
@override
public report mapfieldset(fieldset fieldset) throws bindexception {
report report = new report();
report.setid(fieldset.readint( 0 ));
report.setsales(fieldset.readbigdecimal( 1 ));
report.setqty(fieldset.readint( 2 ));
report.setstaffname(fieldset.readstring( 3 ));
//default format yyyy-mm-dd
//fieldset.readdate(4);
string date = fieldset.readstring( 4 );
try {
report.setdate(dateformat.parse(date));
} catch (parseexception e) {
e.printstacktrace();
}
return report;
}
}
|
在写入数据之前调用itemprocessor 处理数据
1
2
3
4
5
6
7
8
9
10
11
|
package yihaomen;
import org.springframework.batch.item.itemprocessor;
import yihaomen.model.report;
public class customitemprocessor implements itemprocessor<report, report> {
@override
public report process(report item) throws exception {
system.out.println( "processing..." + item);
return item;
}
}
|
spring 配置文件和数据库配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
<beans xmlns= "http://www.springframework.org/schema/beans"
xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="
http: //www.springframework.org/schema/beans
http: //www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<!-- stored job-meta in memory -->
<!--
<bean id= "jobrepository"
class = "org.springframework.batch.core.repository.support.mapjobrepositoryfactorybean" >
<property name= "transactionmanager" ref= "transactionmanager" />
</bean>
-->
<!-- stored job-meta in database -->
<bean id= "jobrepository"
class = "org.springframework.batch.core.repository.support.jobrepositoryfactorybean" >
<property name= "datasource" ref= "datasource" />
<property name= "transactionmanager" ref= "transactionmanager" />
<property name= "databasetype" value= "mysql" />
</bean>
<bean id= "transactionmanager"
class = "org.springframework.batch.support.transaction.resourcelesstransactionmanager" />
<bean id= "joblauncher"
class = "org.springframework.batch.core.launch.support.simplejoblauncher" >
<property name= "jobrepository" ref= "jobrepository" />
</bean>
</beans>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<beans xmlns= "http://www.springframework.org/schema/beans"
xmlns:jdbc= "http://www.springframework.org/schema/jdbc"
xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="http: //www.springframework.org/schema/beans
http: //www.springframework.org/schema/beans/spring-beans-3.2.xsd
http: //www.springframework.org/schema/jdbc
http: //www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd">
<!-- connect to mysql database -->
<bean id= "datasource"
class = "org.springframework.jdbc.datasource.drivermanagerdatasource" >
<property name= "driverclassname" value= "com.mysql.jdbc.driver" />
<property name= "url" value= "jdbc:mysql://localhost:3306/test" />
<property name= "username" value= "root" />
<property name= "password" value= "" />
</bean>
<bean id= "transactionmanager"
class = "org.springframework.batch.support.transaction.resourcelesstransactionmanager" />
<!-- create job-meta tables automatically -->
<jdbc:initialize-database data-source= "datasource" >
<jdbc:script location= "org/springframework/batch/core/schema-drop-mysql.sql" />
<jdbc:script location= "org/springframework/batch/core/schema-mysql.sql" />
</jdbc:initialize-database>
</beans>
|
运行程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package yihaomen;
import org.springframework.batch.core.job;
import org.springframework.batch.core.jobexecution;
import org.springframework.batch.core.jobparameters;
import org.springframework.batch.core.launch.joblauncher;
import org.springframework.context.applicationcontext;
import org.springframework.context.support.classpathxmlapplicationcontext;
public class app {
public static void main(string[] args) {
string[] springconfig =
{
"spring/batch/jobs/job-hello-world.xml"
};
applicationcontext context =
new classpathxmlapplicationcontext(springconfig);
joblauncher joblauncher = (joblauncher) context.getbean( "joblauncher" );
job job = (job) context.getbean( "helloworldjob" );
try {
jobexecution execution = joblauncher.run(job, new jobparameters());
system.out.println( "exit status : " + execution.getstatus());
} catch (exception e) {
e.printstacktrace();
}
system.out.println( "done" );
}
}
|
运行结果 :
1
2
3
4
5
6
7
8
9
10
11
|
十二月 03 , 2013 8 : 56 : 24 下午 org.springframework.batch.core.launch.support.simplejoblauncher$ 1 run
info: job: [flowjob: [name=helloworldjob]] launched with the following parameters: [{}]
十二月 03 , 2013 8 : 56 : 24 下午 org.springframework.batch.core.job.simplestephandler handlestep
info: executing step: [step1]
processing...report [id= 1001 , sales= 213100 , qty= 980 , staffname=yihaomen]
processing...report [id= 1002 , sales= 320200 , qty= 1080 , staffname=staff 1 ]
processing...report [id= 1003 , sales= 342197 , qty= 1200 , staffname=staff 2 ]
十二月 03 , 2013 8 : 56 : 25 下午 org.springframework.batch.core.launch.support.simplejoblauncher$ 1 run
info: job: [flowjob: [name=helloworldjob]] completed with the following parameters: [{}] and the following status: [completed]
exit status : completed
done
|
结果生成了output.xml 在你工程目录的 xml 目录下。
整个源代码,除去jar包之后下载:spring batch 入门教程下载
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/achuo/article/details/50982298