spring batch学习笔记

时间:2021-11-23 09:53:32

Spring Batch是什么?
      Spring
Batch是一个基于Spring的企业级批处理框架,按照我师父的说法,所有基于Spring的框架都是使用了spring的IoC特性,然后加上自己
的一些处理规则。因此,要理解Spring Batch的设计和使用,首先需要理解批处理的机制和特点。

      所谓企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再进行归档的这类操作。从上面的描述中可以看出,批处理的整个流程可以明显的分为3个阶段:

         1、读数据

         2、业务处理

         3、归档结果数据

另外,从定义中可以发现批处理的一个重要特色就是无需人工干预、定期执行,因此一个批处理框架,需要关注事务的粒度,日志监控,执行方式,资源管理,读数据,处理数据,写数据的解耦等方面。

      SpringBatch为我们提供了什么呢?

       1、统一的读写接口

       2、丰富的任务处理方式、

       3、灵活的事务管理及并发处理

       4、日志、监控、任务重启与跳过等特性

注意,Spring Batch未提供关于批处理任务调度的功能,因此如何周期性的调用批处理任务需要自己想办法解决,就Java来说,Quartz是一个不错的解决方案,或者写脚本处理之。

Spring Batch First Demo

      前面讲了很多Spring Batch的特性,接下来就通过一个小例子来看看Spring Batch是如何实现 批处理的读数据-》处理数据-》归档结果这一过程的。

       首先,搭建项目框架,推荐大家使用Maven或者Gradle结构的项目,不会的,赶紧学学,对于学习新技术省很多时间。一个Spring项目需要依赖的lib(可能有多,大家可以试探性的删掉一些不必要的包)如下:

  1. <dependency>
  2. <groupId>org.springframework</groupId>
  3. <artifactId>spring-beans</artifactId>
  4. <version>${springframework.core.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework</groupId>
  8. <artifactId>spring-aop</artifactId>
  9. <version>${springframework.core.version}</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework</groupId>
  13. <artifactId>spring-context</artifactId>
  14. <version>${springframework.core.version}</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework</groupId>
  18. <artifactId>spring-core</artifactId>
  19. <version>${springframework.core.version}</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework</groupId>
  23. <artifactId>spring-jdbc</artifactId>
  24. <version>${springframework.core.version}</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework</groupId>
  28. <artifactId>spring-test</artifactId>
  29. <version>${springframework.core.version}</version>
  30. <scope>test</scope>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework</groupId>
  34. <artifactId>spring-tx</artifactId>
  35. <version>${springframework.core.version}</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.springframework.batch</groupId>
  39. <artifactId>spring-batch-core</artifactId>
  40. <version>${spring.batch.version}</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.springframework.batch</groupId>
  44. <artifactId>spring-batch-infrastructure</artifactId>
  45. <version>${spring.batch.version}</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.springframework.batch</groupId>
  49. <artifactId>spring-batch-test</artifactId>
  50. <version>${spring.batch.version}</version>
  51. <scope>test</scope>
  52. </dependency>
  53. <dependency>
  54. <groupId>junit</groupId>
  55. <artifactId>junit</artifactId>
  56. <version>4.10</version>
  57. <scope>test</scope>
  58. </dependency>

项目构建好以后,首先开始写读取数据的逻辑,Spring
Batch针对读、写操作提供了很多实现方式,包括文件,数据库,对于数据库的操作还提供了很多ORM框架(Hibernate,iBatis,JPA)
的支持,这儿为了简单,以读文件作为例子,假设我们需要读取一个文件中所有人的信息,大于16岁的需要发信息需要发信息通知它去*局办理身份证。简化文
件如下:

  1. TWer1,15
  2. TWer2,21
  3. TWer3,13
  4. TWer4,16
  5. TWer5,25
  6. TWer6,45
  7. TWer7,16

,这儿需要的Spring Batch的读文件功能就是把文件中的每一行都能转化为一个内存对象,其对应的类就是User.java

  1. public class User {
  2. String name;
  3. int age;
  4. public String getName() {
  5. return name;
  6. }
  7. public void setName(String name) {
  8. this.name = name;
  9. }
  10. public int getAge() {
  11. return age;
  12. }
  13. public void setAge(int age) {
  14. this.age = age;
  15. }
  16. }

另外,需要在message_job.xml中配置如下内容

  1. <bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader">
  2. <property name="lineMapper" ref="lineMapper"/>
  3. <property name="resource" value="/message/user.txt"/>
  4. </bean>
  5. <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
  6. <property name="lineTokenizer" ref="lineTokenizer"/>
  7. <property name="fieldSetMapper" ref="fieldSetMapper"/>
  8. </bean>
  9. <bean id="fieldSetMapper" class="com.ning.demo.UserMapper"/>
  10. <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>

,该配置文件中除了UserMapper外,都是SpringBatch默认提供的。UserMapper.java代码如下:

  1. public class UserMapper implements FieldSetMapper<User> {
  2. @Override
  3. public User mapFieldSet(FieldSet fieldSet) throws BindException {
  4. User user = new User();
  5. user.setName(fieldSet.readString(0));
  6. user.setAge(fieldSet.readInt(1));
  7. return user;
  8. }
  9. }

这样,文件中的每一行数据都会变成一个User类的instance。

      接下来,是处理数据的过程,判断每个user的年龄,如果大于16,就生成一条Message。

  1. public class MessageProcessor implements ItemProcessor<User, Message> {
  2. @Override
  3. public Message process(User item) throws Exception {
  4. Message message = null;
  5. if (item.getAge() > 16) {
  6. message = new Message();
  7. message.setContent(item.getName() + ",Please come to police station!");
  8. }
  9. return message;
  10. }
  11. }

该类实现了SpringBatch的ItemProcessor接口,

最后,把处理后得到的所有Message打印到Console上,

  1. public class MessageWriter implements ItemWriter<Message> {
  2. @Override
  3. public void write(List<? extends Message> items) throws Exception {
  4. System.out.println("Results:");
  5. for (Message item : items) {
  6. System.out.println(item.getContent());
  7. }
  8. }
  9. }

该类实现了SpringBatch的ItemWriter接口。SpringBatch本身提供了多种Writer实现。

通过上面的几个步骤,把读数据,处理数据,写数据都构造出来了,那么那么是如何串联起来的呢?答案是配置文件,

  1. <batch:job id="messageJob">
  2. <batch:step id="messageStep">
  3. <batch:tasklet>
  4. <batch:chunk reader="messageReader" processor="messageProcessor" writer="messageWriter"
  5. commit-interval="10"
  6. chunk-completion-policy="">
  7. </batch:chunk>
  8. </batch:tasklet>
  9. </batch:step>
  10. </batch:job>
  11. <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
  12. <property name="transactionManager" ref="transactionManager"/>
  13. </bean>
  14. <bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader">
  15. <property name="lineMapper" ref="lineMapper"/>
  16. <property name="resource" value="/message/user.txt"/>
  17. </bean>
  18. <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
  19. <property name="lineTokenizer" ref="lineTokenizer"/>
  20. <property name="fieldSetMapper" ref="fieldSetMapper"/>
  21. </bean>
  22. <bean id="fieldSetMapper" class="com.ning.demo.UserMapper"/>
  23. <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>
  24. <bean id="messageProcessor" class="com.ning.demo.MessageProcessor"/>
  25. <bean id="messageWriter" class="com.ning.demo.MessageWriter"/>
  26. <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

Spring
Batch 将批处理任务称为一个 Job,同时,Job 下分为多个 Step。Step
是一个独立的、顺序的处理步骤,包含该步骤批处理中需要的所有信息。多个批处理 Step 按照一定的流程组成一个
Job。通过这样的设计方式,我们可以灵活配置 Job 的处理过程。

接下来的问题是如何运行构建好的BatchJob呢?SpringBatch提供了JobLauncher接口用于运行Job,并提供了一个默认的SimpleJobLauncher实现。

  1. public class Main {
  2. public static void main(String[] args) {
  3. ClassPathXmlApplicationContext c =
  4. new ClassPathXmlApplicationContext("message_job.xml");
  5. SimpleJobLauncher launcher = new SimpleJobLauncher();
  6. launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
  7. launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
  8. try {
  9. launcher.run((Job) c.getBean("messageJob"), new JobParameters());
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }


行BatchJob时需要为 JobLauncher 指定一个 JobRepository,该类负责创建一个 JobExecution 对象来执行
Job,其次,需要指定一个任务执行器,我们使用 Spring Batch 提供的 SimpleAsyncTaskExecutor。最后,通过
run 方法来执行指定的 Job。运行结果如下:

Results:

TWer2,Please come to police station!

TWer5,Please come to police station!

TWer6,Please come to police station!

转:http://ningandjiao.iteye.com/blog/1616584