1.介绍
当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据。由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入。
2.示例
2.1 pom.xml
本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
<? xml version = "1.0" encoding = "UTF-8" ?>
< project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
< modelVersion >4.0.0</ modelVersion >
< groupId >com.hfcsbc.estl</ groupId >
< artifactId >es-etl</ artifactId >
< version >0.0.1-SNAPSHOT</ version >
< packaging >jar</ packaging >
< name >es-etl</ name >
< description >Demo project for Spring Boot</ description >
< parent >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-starter-parent</ artifactId >
< version >2.0.0.M7</ version >
< relativePath /> <!-- lookup parent from repository -->
</ parent >
< properties >
< project.build.sourceEncoding >UTF-8</ project.build.sourceEncoding >
< project.reporting.outputEncoding >UTF-8</ project.reporting.outputEncoding >
< java.version >1.8</ java.version >
</ properties >
< dependencies >
< dependency >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-starter</ artifactId >
</ dependency >
< dependency >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-starter-data-jpa</ artifactId >
</ dependency >
< dependency >
< groupId >org.postgresql</ groupId >
< artifactId >postgresql</ artifactId >
</ dependency >
< dependency >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-starter-batch</ artifactId >
</ dependency >
< dependency >
< groupId >com.github.vanroy</ groupId >
< artifactId >spring-boot-starter-data-jest</ artifactId >
< version >3.0.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.searchbox</ groupId >
< artifactId >jest</ artifactId >
< version >5.3.2</ version >
</ dependency >
< dependency >
< groupId >org.projectlombok</ groupId >
< artifactId >lombok</ artifactId >
</ dependency >
< dependency >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-starter-test</ artifactId >
< scope >test</ scope >
</ dependency >
</ dependencies >
< build >
< plugins >
< plugin >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-maven-plugin</ artifactId >
</ plugin >
</ plugins >
</ build >
< repositories >
< repository >
< id >spring-snapshots</ id >
< name >Spring Snapshots</ name >
< url >https://repo.spring.io/snapshot</ url >
< snapshots >
< enabled >true</ enabled >
</ snapshots >
</ repository >
< repository >
< id >spring-milestones</ id >
< name >Spring Milestones</ name >
< url >https://repo.spring.io/milestone</ url >
< snapshots >
< enabled >false</ enabled >
</ snapshots >
</ repository >
</ repositories >
< pluginRepositories >
< pluginRepository >
< id >spring-snapshots</ id >
< name >Spring Snapshots</ name >
< url >https://repo.spring.io/snapshot</ url >
< snapshots >
< enabled >true</ enabled >
</ snapshots >
</ pluginRepository >
< pluginRepository >
< id >spring-milestones</ id >
< name >Spring Milestones</ name >
< url >https://repo.spring.io/milestone</ url >
< snapshots >
< enabled >false</ enabled >
</ snapshots >
</ pluginRepository >
</ pluginRepositories >
</ project >
|
2.2 实体类及repository
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;
/**
* Create by pengchao on 2018/2/23
*/
@Document (indexName = "person" , type = "person" , shards = 1 , replicas = 0 , refreshInterval = "-1" )
@Entity
@Data
public class Person {
@Id
private Long id;
private String name;
@OneToOne
@Field (type = FieldType.Nested)
private Address address;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
* Create by pengchao on 2018/2/23
*/
@Entity
@Data
public class Address {
@Id
private Long id;
private String name;
}
|
1
2
3
4
5
6
7
8
|
package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* Create by pengchao on 2018/2/23
*/
public interface PersonRepository extends JpaRepository<Person, Long> {
}
|
1
2
3
4
5
6
7
8
|
package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
* Create by pengchao on 2018/2/23
*/
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
|
2.3 配置elasticsearchItemWriter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.hfcsbc.esetl.itemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
* Create by pengchao on 2018/2/23
*/
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {
private EsPersonRepository personRepository;
public ElasticsearchItemWriter(EsPersonRepository personRepository) {
this .personRepository = personRepository;
}
@Override
public void beforeWrite(List<? extends Person> items) {
}
@Override
public void afterWrite(List<? extends Person> items) {
}
@Override
public void onWriteError(Exception exception, List<? extends Person> items) {
}
@Override
public void beforeStep(StepExecution stepExecution) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null ;
}
@Override
public void write(List<? extends Person> items) throws Exception {
//实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引
personRepository.saveAll(items);
}
}
|
2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package com.hfcsbc.esetl.itemReader;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import java.util.Iterator;
/**
* Create by pengchao on 2018/2/24
*/
public class ElasticsearchItemReader<Person> extends AbstractPaginatedDataItemReader<Person> implements InitializingBean {
private final ElasticsearchOperations elasticsearchOperations;
private final SearchQuery query;
private final Class<? extends Person> targetType;
public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends Person> targetType) {
this .elasticsearchOperations = elasticsearchOperations;
this .query = query;
this .targetType = targetType;
}
@Override
protected Iterator<Person> doPageRead() {
return (Iterator<Person>)elasticsearchOperations.queryForList(query, targetType).iterator();
}
@Override
public void afterPropertiesSet() throws Exception {
}
}
|
2.5 配置spring batch需要的配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
* Create by pengchao on 2018/2/23
*/
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private EsPersonRepository personRepository;
@Bean
public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
String sqlQuery = "select * from person" ;
try {
JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
queryProvider.setSqlQuery(sqlQuery);
queryProvider.setEntityClass(Person. class );
queryProvider.afterPropertiesSet();
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize( 10000 );
reader.setQueryProvider(queryProvider);
reader.afterPropertiesSet();
reader.setSaveState( true );
} catch (Exception e) {
e.printStackTrace();
}
return reader;
}
@Bean
public ElasticsearchItemWriter itemWriter(){
return new ElasticsearchItemWriter(personRepository);
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory,
ItemReader itemReader,
ItemWriter itemWriter){
return stepBuilderFactory
.get( "step1" )
.chunk( 10000 )
.reader(itemReader)
.writer(itemWriter)
.build();
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, Step step){
return jobBuilderFactory
.get( "importJob" )
.incrementer( new RunIdIncrementer())
.flow(step)
.end()
.build();
}
/**
* spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
* @param dataSource
* @param manager
* @return
*/
@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(manager);
jobRepositoryFactoryBean.setDatabaseType( "postgres" );
try {
return jobRepositoryFactoryBean.getObject();
} catch (Exception e) {
e.printStackTrace();
}
return null ;
}
}
|
2.6配置数据库及es的连接地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
spring:
redis:
host: 192.168.1.222
data:
jest:
uri: http://192.168.1.222:9200
username: elastic
password: changeme
jpa:
database: POSTGRESQL
show-sql: true
hibernate:
ddl-auto: update
datasource:
platform: postgres
url: jdbc:postgresql://192.168.1.222:5433/person
username: hfcb
password: hfcb
driver-class-name: org.postgresql.Driver
max-active: 2
spring.batch.initialize-schema: always
|
2.7 配置入口类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package com.hfcsbc.esetl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
@SpringBootApplication (exclude = {ElasticsearchAutoConfiguration. class , ElasticsearchDataAutoConfiguration. class })
@EnableElasticsearchRepositories (basePackages = "com.hfcsbc.esetl.repository" )
@EnableJpaRepositories (basePackages = "com.hfcsbc.esetl.repository.jpa" )
public class EsEtlApplication {
public static void main(String[] args) {
SpringApplication.run(EsEtlApplication. class , args);
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.wisely.top/2018/02/24/spring-batch-elasticsearch