Java连接ElasticSearch集群操作

时间:2022-08-23 13:43:53

我就废话不多说了,大家还是直接看代码吧~

ES配置类(创建TransportClient):

/*
 *es配置类
 *
 */
 
@Configuration
public class ElasticSearchDataSourceConfigurer {
 
  private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);
  @Bean
  public TransportClient getESClient() {
    //设置集群名称
    Settings settings = Settings.builder().put("cluster.name", "bigData-cluster").put("client.transport.sniff", true).build();
    //创建client
    TransportClient client = null;
    try {
      client = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(""), 9300));//集群ip
      LOG.info("ESClient连接建立成功");
    } catch (UnknownHostException e) {
      LOG.info("ESClient连接建立失败");
      e.printStackTrace();
    }
    return client;
  }
}
DAO实现类(逐条写入文档):

/**
 * Simple to Introduction
 *
 * @Description: [添加类]
 */
@Repository
public class UserDaoImpl implements userDao {
 
    private static final String INDEXNAME = "user";//小写
    private static final String TYPENAME = "info";
 
    @Resource
    TransportClient transportClient;
 
    @Override
    public int addUser(User[] user) {
        IndexResponse indexResponse = null;
        int successNum = 0;
        for (int i = 0; i < user.length; i++) {
            UUID uuid = UUID.randomUUID();
            String str = uuid.toString();
            String jsonValue = null;
            try {
                jsonValue = JsonUtil.object2JsonString(user[i]);
                if (jsonValue != null) {
                    indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue)
                            .execute().actionGet();
                    successNum++;
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
 
        }
        return successNum;
    }
}
批量插入方法:

/**
 *批量插入
 */
public static void bathAddUser(TransportClient client, List<User> users) {
 
        BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
        for (int i = 0; i < users.size(); i++) {
            UUID uuid = UUID.randomUUID();
            String str = uuid.toString();
 
            String jsonValue = null;
            try {
                jsonValue = JsonUtil.object2JsonString(users.get(i));
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            bulkRequest.add(client.prepareIndex("user", "info", str).setSource(jsonValue));
            // 一万条插入一次
            if (i % 10000 == 0) {
                bulkRequest.execute().actionGet();
            }
            System.out.println("已经插入第" + i + "多少条");
        }
 
    }

补充知识:使用Java创建ES(ElasticSearch)连接池

1.首先要有一个创建连接的工厂类

工厂类代码如下:

package com.aly.util;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
 
/**
 * EliasticSearch连接池工厂对象
 * @author 00000
 *
 */
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{
 
    @Override
    public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
        System.out.println("activateObject");
        
    }
    
    /**
     * 销毁对象
     */
    @Override
    public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        RestHighLevelClient highLevelClient = pooledObject.getObject();
        highLevelClient.close();
    }
    
    /**
     * 生产对象
     */
//  @SuppressWarnings({ "resource" })
    @Override
    public PooledObject<RestHighLevelClient> makeObject() throws Exception {
//      Settings settings = Settings.builder().put("cluster.name","elasticsearch").build();
        RestHighLevelClient client = null;
        try {
            /*client = new PreBuiltTransportClient(settings)
          .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));*/
            client = new RestHighLevelClient(RestClient.builder(
                    new HttpHost("192.168.1.121", 9200, "http"), new HttpHost("192.168.1.122", 9200, "http"),
                    new HttpHost("192.168.1.123", 9200, "http"), new HttpHost("192.168.1.125", 9200, "http"),
                    new HttpHost("192.168.1.126", 9200, "http"), new HttpHost("192.168.1.127", 9200, "http")));
 
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new DefaultPooledObject<RestHighLevelClient>(client);
    }
 
    @Override
    public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
        System.out.println("passivateObject");
    }
 
    @Override
    public boolean validateObject(PooledObject<RestHighLevelClient> arg0) {
        return true;
    }  
}

2.然后再写我们的连接池工具类

连接池工具类代码如下:

package com.aly.util;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.RestHighLevelClient;
 
/**
 * ElasticSearch 连接池工具类
 *
 * @author 00000
 *
 */
public class ElasticSearchPoolUtil {
    // 对象池配置类,不写也可以,采用默认配置
    private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    // 采用默认配置maxTotal是8,池中有8个client
    static {
        poolConfig.setMaxTotal(8);
    }
    // 要池化的对象的工厂类,这个是我们要实现的类
    private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();
    // 利用对象工厂类和配置类生成对象池
    private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory,
            poolConfig);
 
    /**
     * 获得对象
     *
     * @return
     * @throws Exception
     */
    public static RestHighLevelClient getClient() throws Exception {
        // 从池中取一个对象
        RestHighLevelClient client = clientPool.borrowObject();
        return client;
    }
 
    /**
     * 归还对象
     *
     * @param client
     */
    public static void returnClient(RestHighLevelClient client) {
        // 使用完毕之后,归还对象
        clientPool.returnObject(client);
    }
}

以上这篇java连接ElasticSearch集群操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/baidu_16217779/article/details/71633284