Spring Batch 初始化数据,清空数据的做法

时间:2021-08-12 21:07:34

这两天用了一下Spring Batch来导数据,感觉是个很强大的工具。网上的资料很多,没看到连续运行的示例。从mkong和spring官网上搞了几段代码,在本地跑了一下发现几个问题:


1. initialize-database执行官方的drop table失败

报权限问题,检查了一下,无法就是清理spring batch任务表中的数据

DROP TABLE  BATCH_STEP_EXECUTION_CONTEXT ;
DROP TABLE BATCH_JOB_EXECUTION_CONTEXT ;
DROP TABLE BATCH_STEP_EXECUTION ;
DROP TABLE BATCH_JOB_EXECUTION_PARAMS ;
DROP TABLE BATCH_JOB_EXECUTION ;
DROP TABLE BATCH_JOB_INSTANCE ;

DROP TABLE BATCH_STEP_EXECUTION_SEQ ;
DROP TABLE BATCH_JOB_EXECUTION_SEQ ;
DROP TABLE BATCH_JOB_SEQ ;
其实多次执行的时候不用drop这个表,所以将这块去掉了,改成了由

<jdbc:script location="classpath:init.sql" />
调用一段本地的脚本


2. 连续运行job失败

这个是传参的问题,每次给JobParameters设置不同的参数才能顺利的连续运行。

HashMap<String, JobParameter> parameters = new HashMap<String, JobParameter>();
parameters.put("currentTime", new JobParameter(System.currentTimeMillis()));
JobExecution execution = jobLauncher.run(job, new JobParameters(parameters));
加了个当前时间,搞定


3.第二次执行导入的时候报主键错误

因为每次都是完全拷贝,没做检查,所以在插入前有必要清理所有的内容。可是程序在多次运行(注意,是连续运行)的情况下,只会运行一次initialize-database的内容。

翻遍了例子所有的step都要有reader和writer。于是手动写了一个Reader和Writer来跑清理的过程。注意还要写一个Listener,不然会一直执行下去,这个是用来让计数器归位的。

job的配置

<batch:tasklet transaction-manager="transactionManagerJob">
<batch:chunk reader="deleteReader" writer="deleteWriter"
commit-interval="1">
</batch:chunk>
<batch:listeners>
<batch:listener ref="resettingListener"/>
</batch:listeners>
</batch:tasklet>
reader的配置

<beans:bean id="deleteReader" class="xxx.InitReader">
<property name="limit" value="1"/>
</beans:bean>
writer的配置

<beans:bean id="deleteWriter" class="xxxx.InitWriter">
<property name="dataSource" ref="targetDataSource" />
<property name="sql"
value="
DELETE FROM XXX;
DELETE FROM YYY;
" />
</beans:bean>
listener的配置

<beans:bean id="resettingListener" class="xxx.InitResettingListener">
<beans:property name="reader" ref="deleteReader" />
</beans:bean>

reader的代码

public class InitReader extends ItemReaderAdapter<Object> {
private int limit = 1;
private int counter = 0;

public Object read() throws Exception, UnexpectedInputException, ParseException {
if(counter<limit){
counter++;
return new Object();
}
return null;
}

/**
* @param limit number of items that will be generated
* (null returned on consecutive calls).
*/
public void setLimit(int limit) {
this.limit = limit;
}

public int getCounter() {
return counter;
}

public int getLimit() {
return limit;
}

public void resetCounter()
{
this.counter = 0;
}

public void afterPropertiesSet() throws Exception {
Assert.notNull(limit, "limit must be set");
}
}
writer的代码

public class InitWriter extends ItemWriterAdapter<Object>{
private DriverManagerDataSource dataSource;
private String sql;


public DriverManagerDataSource getDataSource() {
return dataSource;
}

public void setDataSource(DriverManagerDataSource dataSource) {
this.dataSource = dataSource;
}

public String getSql() {
return sql;
}

public void setSql(String sql) {
this.sql = sql;
}

public void write(List<? extends Object> items) throws Exception {
//System.out.println("init write delete");
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement=connection.prepareStatement(sql);
preparedStatement.executeUpdate();
System.out.println("init delete data complete.");
}

public void afterPropertiesSet() throws Exception {
Assert.notNull(dataSource, "dataSource limit must be set");
Assert.notNull(sql, "sql limit must be set");
}
}

listener的代码

public class InitResettingListener extends StepExecutionListenerSupport implements InitializingBean {

private InitReader reader;

public ExitStatus afterStep(StepExecution stepExecution) {
this.reader.resetCounter();
return null;
}

public void setReader(InitReader reader) {
this.reader = reader;
}

public void afterPropertiesSet() throws Exception {
Assert.notNull(this.reader, "The 'reader' must be set.");
}
}


搞定!!!