我就废话不多说了,大家还是直接看代码吧~
首先是 ES 配置类,用于创建 TransportClient:
/**
 * Elasticsearch configuration class: exposes a {@link TransportClient} as a bean.
 */
@Configuration
public class ElasticSearchDataSourceConfigurer {
    private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);

    /**
     * Builds a transport client for the cluster named "bigData-cluster" with
     * transport sniffing enabled, pointed at port 9300 of the configured host.
     *
     * @return the client, or {@code null} when the host name cannot be resolved
     */
    @Bean
    public TransportClient getESClient() {
        // Cluster name and sniffing settings.
        Settings settings = Settings.builder()
                .put("cluster.name", "bigData-cluster")
                .put("client.transport.sniff", true)
                .build();

        // Resolve the address BEFORE constructing the client: a failed lookup must not
        // leave an already-constructed client behind that nobody closes.
        InetAddress address;
        try {
            address = InetAddress.getByName(""); // cluster IP goes here (left blank in the original)
        } catch (UnknownHostException e) {
            // Logged as an error together with the exception; callers still receive null.
            LOG.error("ESClient连接建立失败", e);
            return null;
        }

        // Create the client.
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(address, 9300));
        LOG.info("ESClient连接建立成功");
        return client;
    }
}
然后是单条插入的 DAO 实现:
/**
 * DAO implementation that writes {@code User} objects to Elasticsearch
 * (index "user", type "info") through the injected {@link TransportClient}.
 */
@Repository
public class UserDaoImpl implements userDao {
    private static final String INDEXNAME = "user"; // lowercase
    private static final String TYPENAME = "info";

    @Resource
    TransportClient transportClient;

    /**
     * Indexes every user as a separate document whose id is a random UUID.
     *
     * @param user the users to index; {@code null} is treated as an empty array
     * @return the number of users for which the index request returned
     */
    @Override
    public int addUser(User[] user) {
        if (user == null) {
            return 0;
        }
        int successNum = 0;
        for (User current : user) {
            try {
                String jsonValue = JsonUtil.object2JsonString(current);
                if (jsonValue != null) {
                    String id = UUID.randomUUID().toString();
                    // actionGet() blocks until the request has finished.
                    transportClient.prepareIndex(INDEXNAME, TYPENAME, id).setSource(jsonValue)
                            .execute().actionGet();
                    successNum++;
                }
            } catch (JsonProcessingException e) {
                // A user that cannot be serialized is skipped and not counted.
                e.printStackTrace();
            }
        }
        return successNum;
    }
}
下面是批量插入的方法:
/**
 * Bulk insert: indexes the given users into index "user", type "info",
 * sending them to Elasticsearch in batches of at most 10000 documents.
 *
 * @param client the transport client used to build and execute the bulk requests
 * @param users  the users to index; {@code null} or empty means nothing is sent
 */
public static void bathAddUser(TransportClient client, List<User> users) {
    if (users == null || users.isEmpty()) {
        return;
    }
    // Use the client passed in by the caller; a static method has no instance field to fall back on.
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    for (int i = 0; i < users.size(); i++) {
        String jsonValue;
        try {
            jsonValue = JsonUtil.object2JsonString(users.get(i));
        } catch (JsonProcessingException e) {
            // Skip users that cannot be serialized instead of adding a request without a source.
            e.printStackTrace();
            continue;
        }
        if (jsonValue == null) {
            continue;
        }
        String id = UUID.randomUUID().toString();
        bulkRequest.add(client.prepareIndex("user", "info", id).setSource(jsonValue));
        // Send once 10000 documents are queued, then start a fresh builder so that
        // already-sent documents are not submitted again with the next batch.
        if (bulkRequest.numberOfActions() >= 10000) {
            bulkRequest.execute().actionGet();
            bulkRequest = client.prepareBulk();
        }
        System.out.println("已经插入第" + i + "多少条");
    }
    // Send whatever is left over from the last, partially filled batch.
    if (bulkRequest.numberOfActions() > 0) {
        bulkRequest.execute().actionGet();
    }
}
补充知识:使用 Java 创建 ES(ElasticSearch)连接池
1. 首先要有一个创建连接的工厂类:
package com.aly.util;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
/**
 * ElasticSearch connection pool factory: creates and destroys the
 * {@link RestHighLevelClient} instances managed by a commons-pool2 pool.
 *
 * @author 00000
 */
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> {

    /** Activation hook; only prints a trace line. */
    @Override
    public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
        System.out.println("activateObject");
    }

    /**
     * Destroys a pooled object by closing the wrapped client.
     *
     * @param pooledObject the pool entry whose client should be closed
     * @throws Exception if closing the client fails
     */
    @Override
    public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        RestHighLevelClient highLevelClient = pooledObject.getObject();
        if (highLevelClient != null) {
            highLevelClient.close();
        }
    }

    /**
     * Produces a new pooled object wrapping a client configured with the six
     * HTTP hosts listed below.
     *
     * @return a pool entry wrapping a newly built, never-null client
     * @throws Exception if the client cannot be built; the failure is propagated
     *                   rather than swallowed so a {@code null} client is never pooled
     */
    @Override
    public PooledObject<RestHighLevelClient> makeObject() throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("192.168.1.121", 9200, "http"), new HttpHost("192.168.1.122", 9200, "http"),
                new HttpHost("192.168.1.123", 9200, "http"), new HttpHost("192.168.1.125", 9200, "http"),
                new HttpHost("192.168.1.126", 9200, "http"), new HttpHost("192.168.1.127", 9200, "http")));
        return new DefaultPooledObject<RestHighLevelClient>(client);
    }

    /** Passivation hook; only prints a trace line. */
    @Override
    public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
        System.out.println("passivateObject");
    }

    /** Performs no real check: every object is reported as valid. */
    @Override
    public boolean validateObject(PooledObject<RestHighLevelClient> arg0) {
        return true;
    }
}
2. 然后再写我们的连接池工具类:
package com.aly.util;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.RestHighLevelClient;
/**
 * ElasticSearch connection pool utility: static access to one shared pool of
 * {@link RestHighLevelClient} objects.
 *
 * @author 00000
 */
public class ElasticSearchPoolUtil {
    // Pool configuration object.
    private static GenericObjectPoolConfig config;
    // Factory for the pooled clients.
    private static EsClientPoolFactory factory;
    // The pool itself, built from the factory and the configuration.
    private static GenericObjectPool<RestHighLevelClient> pool;

    static {
        // The configuration is completed first, then handed to the pool constructor.
        config = new GenericObjectPoolConfig();
        config.setMaxTotal(8); // at most 8 clients in the pool
        factory = new EsClientPoolFactory();
        pool = new GenericObjectPool<>(factory, config);
    }

    /**
     * Borrows a client from the pool.
     *
     * @return a client taken from the pool
     * @throws Exception if the pool cannot provide a client
     */
    public static RestHighLevelClient getClient() throws Exception {
        return pool.borrowObject();
    }

    /**
     * Returns a client to the pool once the caller is finished with it.
     *
     * @param client the client previously obtained from {@link #getClient()}
     */
    public static void returnClient(RestHighLevelClient client) {
        pool.returnObject(client);
    }
}
以上就是 Java 连接 ElasticSearch 集群操作的全部内容,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/baidu_16217779/article/details/71633284