JAVA开发与JAVA(一文学会使用ElasticSearch)

时间:2021-09-14 01:01:28

        在web网站的架设中,特别是数据量大的网站、APP或小程序,凡是需要搜索或全文检索的场景,几乎都要借助ElasticSearch作为全文检索引擎,以提高网站的搜索效率和性能。

这一节,我们通过一篇文章,带大家学会使用ElasticSearch。

一、ElasticSearch介绍:

JAVA开发与JAVA(一文学会使用ElasticSearch)

 

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

ElasticSearch相关概念:

a)、索引index,相当于数据库中的database。
b)、类型type相当于数据库中的table(注意:ES 6.x起每个索引只允许一个type,7.x起type已被废弃,本文基于7.6.2客户端的示例代码中不再指定type)。
c)、主键id相当于数据库中记录的主键,是唯一的。
d)、文档 document (相当于一条数据)
文档是ElasticSearch的基本单位。在Es中文档以JSON格式来表示
向es中的index下面的type中存储json类型的数据。
e) 、字段是文档中的field 属性,需要对每一个属性定义索引和被搜索的方式

二、ElasticSearch的安装:

1、先安装jdk

2、安装ElasticSearch

直接进入elasticsearch的官网,下载安装包:https://www.elastic.co/downloads/elasticsearch,此教程安装步骤以5.1.1版本为例(注意:下文Java客户端依赖使用的是7.6.2,客户端与服务端的主版本应保持一致,实际安装时请选择与客户端匹配的版本)。

将下载的安装包上传到centos,或者直接在centos使用wget命令下载。

解压:

unzip elasticsearch-5.1.1.zip

运行:

cd elasticsearch-5.1.1/bin
./elasticsearch

三、java语言操作ElasticSearch:

1、maven依赖

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.6.2</version>
        </dependency>
 
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>
 

2、连接ElasticSearch

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
 
import java.io.IOException;
 
/**
 * Minimal example that builds a high-level REST client for a single
 * Elasticsearch node and releases it again.
 */
public class EsClientTest {

    /**
     * Creates the client, prints {@code success}, and closes the client.
     *
     * <p>No request is issued here, so the printed message only shows that the
     * client object could be constructed.
     *
     * @param args unused
     * @throws IOException if closing the client fails
     */
    public static void main(String[] args) throws IOException {
        // "IP" is a placeholder for the address of the Elasticsearch node.
        // try-with-resources closes the client on every exit path, including
        // when the body throws.
        try (RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("IP", 9200, "http")))) {
            System.out.println("success");
        }
    }

}

 3、连接的相关api

    public static RestHighLevelClient esClient;
 
    static {
        esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("IP", 9200, "http"))
        );
    }

4、创建索引操作:

    /**
     * Creates the {@code user} index and prints whether the request was acknowledged.
     *
     * @throws IOException if the create-index call fails with an I/O error
     */
    public static void createIndex() throws IOException {
        CreateIndexResponse response = esClient.indices()
                .create(new CreateIndexRequest("user"), RequestOptions.DEFAULT);
        System.out.println("索引创建状态:" + response.isAcknowledged());
    }

5、获取索引:

/**
     * Looks up the {@code user} index and prints its aliases, mappings and settings.
     *
     * @throws IOException if the get-index call fails with an I/O error
     */
    public static void getIndex() throws IOException {
        GetIndexResponse indexInfo = esClient.indices()
                .get(new GetIndexRequest("user"), RequestOptions.DEFAULT);
        System.out.println(indexInfo.getAliases());
        System.out.println(indexInfo.getMappings());
        System.out.println(indexInfo.getSettings());
    }

6、删除索引:

    /**
     * Deletes the {@code user} index and prints whether the request was acknowledged.
     *
     * @throws IOException if the delete-index call fails with an I/O error
     */
    public static void deleteIndex() throws IOException {
        AcknowledgedResponse response = esClient.indices()
                .delete(new DeleteIndexRequest("user"), RequestOptions.DEFAULT);
        System.out.println("索引删除状态:" + response.isAcknowledged());
    }

7、添加数据:

    /**
     * Indexes one {@code User} document into the {@code user} index under id
     * {@code 1008} and prints the response status and result.
     *
     * @throws Exception if JSON serialization or the index call fails
     */
    public static void add() throws Exception{
        // Build the document to store.
        User user = new User();
        user.setName("茅河野人");
        user.setAge(28);
        user.setSex("男");
        user.setSalary(50000);

        // The document body is sent as a JSON string.
        IndexRequest indexRequest = new IndexRequest()
                .index("user")
                .id("1008")
                .source(objectMapper.writeValueAsString(user), XContentType.JSON);

        // Send the index request.
        IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(response.status());
        System.out.println(response.getResult());
    }

8、修改数据:

    /**
     * Partially updates document {@code 1008} in the {@code user} index by
     * setting its {@code name} field, then prints the update result.
     *
     * @throws Exception if the update call fails
     */
    public static void update() throws Exception{
        // Only the listed field/value pairs are sent as the partial document.
        UpdateRequest request = new UpdateRequest()
                .index("user")
                .id("1008")
                .doc(XContentType.JSON, "name", "茅河野人");

        // Send the update request.
        UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
        System.out.println(response.getResult());
    }

9、删除数据:

    /**
     * Deletes document {@code 1008} from the {@code user} index and prints the result.
     *
     * @throws Exception if the delete call fails
     */
    public static void delete() throws Exception{
        DeleteRequest request = new DeleteRequest().index("user").id("1008");

        // Send the delete request.
        DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
        System.out.println(response.getResult());
    }

10、批量添加数据:

    /**
     * Indexes six {@code User} documents (ids 1002-1007) into the {@code user}
     * index with a single bulk request, then prints the overall status and the
     * outcome of every item.
     *
     * @throws Exception if JSON serialization or the bulk call fails
     */
    public static void batchInsert() throws Exception{

        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(userIndexRequest("1002", new User("关羽","男",33,5500)));
        bulkRequest.add(userIndexRequest("1003", new User("黄忠","男",50,8000)));
        bulkRequest.add(userIndexRequest("1004", new User("黄忠2","男",49,10000)));
        bulkRequest.add(userIndexRequest("1005", new User("赵云","男",33,12000)));
        bulkRequest.add(userIndexRequest("1006", new User("马超","男",38,20000)));
        bulkRequest.add(userIndexRequest("1007", new User("关羽","男",41,27000)));

        BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println(bulkResponse.status());
        // Printing the items array directly would only show its identity hash,
        // and individual items can fail independently of the overall status,
        // so report each item on its own line.
        for (org.elasticsearch.action.bulk.BulkItemResponse item : bulkResponse.getItems()) {
            System.out.println(item.getId() + " -> "
                    + (item.isFailed() ? item.getFailureMessage() : item.status()));
        }
    }

    /**
     * Builds an index request that stores {@code user} as JSON in the
     * {@code user} index under the given document id.
     */
    private static IndexRequest userIndexRequest(String id, User user) throws Exception {
        return new IndexRequest()
                .index("user")
                .id(id)
                .source(objectMapper.writeValueAsString(user), XContentType.JSON);
    }

11、批量删除数据:

    /**
     * Deletes documents 1002-1007 from the {@code user} index with a single
     * bulk request, then prints the overall status and the outcome of every item.
     *
     * @throws Exception if the bulk call fails
     */
    public static void batchDelete() throws Exception{
        BulkRequest bulkRequest = new BulkRequest();
        for (String id : new String[] {"1002", "1003", "1004", "1005", "1006", "1007"}) {
            bulkRequest.add(new DeleteRequest().index("user").id(id));
        }

        BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println(bulkResponse.status());
        // Printing the items array directly would only show its identity hash,
        // and individual items can fail independently of the overall status,
        // so report each item on its own line.
        for (org.elasticsearch.action.bulk.BulkItemResponse item : bulkResponse.getItems()) {
            System.out.println(item.getId() + " -> "
                    + (item.isFailed() ? item.getFailureMessage() : item.status()));
        }
    }

13、查询某个索引下所有数据:

    /**
     * Runs a match-all query against the {@code user} index and prints the
     * JSON source of every hit in the response.
     *
     * <p>NOTE(review): no size is set on the source builder, so the server's
     * default page size applies — confirm if more hits are expected.
     *
     * @throws Exception if the search call fails
     */
    public static void searchIndexAll() throws Exception{
        // Match every document in the index.
        SearchSourceBuilder matchAll = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
        SearchRequest request = new SearchRequest().indices("user").source(matchAll);

        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

 14、根据条件查询:

        // Term query: match documents whose "sex" field contains the exact term "女".
        TermQueryBuilder sexQueryBuilder = QueryBuilders.termQuery("sex", "女");
        SearchSourceBuilder query = new SearchSourceBuilder().query(sexQueryBuilder);
        // `request` is the SearchRequest for the target index; it is declared outside this snippet.
        request.source(query);
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        // Print the number of hits returned in this response; printing the hit
        // array itself would only show its identity hash.
        System.out.println(hits.getHits().length);
        System.out.println(hits.getTotalHits());
        for (SearchHit searchHit : hits){
            System.out.println(searchHit.getSourceAsString());
        }

15、分页查询:

        // Match-all query restricted to one page: start at offset 0, return at most 3 hits.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
        sourceBuilder.from(0).size(3);
        // `request` is the SearchRequest for the target index; it is declared outside this snippet.
        request.source(sourceBuilder);
        SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        // Print the number of hits returned in this page; printing the hit
        // array itself would only show its identity hash.
        System.out.println(hits.getHits().length);
        System.out.println(hits.getTotalHits());
        for (SearchHit searchHit : hits){
            System.out.println(searchHit.getSourceAsString());
        }

四、在springboot中的运用

1、maven依赖

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
      </dependency>

 

2、配置文件(以下为 application.properties 格式;若使用 application.yml 需改写为对应的YAML层级写法):

# es 服务地址
elasticsearch.host=IP
# es 服务端口
elasticsearch.port=9200
# 配置日志级别,开启 debug 日志
logging.level.com.congge=debug

3、实际例子:

创建一个实体类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
 
/**
 * Product document mapped to the {@code shopping} index
 * (declared with 3 shards and 1 replica).
 */
@Document(indexName = "shopping", shards = 3, replicas = 1)
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Product {

    /**
     * Unique product identifier. An id is mandatory; it corresponds to the
     * document's {@code _id} in Elasticsearch.
     */
    @Id
    private Long id;

    /*
     * @Field attributes used below:
     *   type     - data type of the field
     *   analyzer - analyzer (tokenizer) applied to the field
     *   index    - whether the field is indexed (default: true)
     * FieldType.Keyword marks a value that is stored as a whole and not tokenized.
     */

    /**
     * Product name, full-text field.
     * NOTE(review): {@code ik_max_word} is not a built-in analyzer name; it
     * presumably requires the IK analysis plugin on the cluster — confirm.
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String title;

    /** Category name, stored as a keyword. */
    @Field(type = FieldType.Keyword)
    private String category;

    /** Product price. */
    @Field(type = FieldType.Double)
    private Double price;

    /** Image URL; kept in the document but not indexed. */
    @Field(type = FieldType.Keyword, index = false)
    private String images;
}

提供接口:

import com.congge.entity.Product;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
 
/**
 * Spring Data repository for {@link Product} documents keyed by {@code Long} id.
 * Declares no methods of its own; everything used by callers is inherited from
 * {@link ElasticsearchRepository}.
 */
@Repository
public interface ProductDao extends ElasticsearchRepository<Product, Long>{
 
}

配置类:

import lombok.Data;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
//import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
 
/**
 * Elasticsearch client configuration. The {@code host} and {@code port} fields
 * are bound from properties under the {@code elasticsearch} prefix.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class EsConfig extends com.congge.config.AbstractElasticsearchConfiguration {

    /** Host of the Elasticsearch node (property {@code elasticsearch.host}). */
    private String host ;

    /** Port of the Elasticsearch node (property {@code elasticsearch.port}). */
    private Integer port ;

    /**
     * Builds the high-level REST client for the configured host and port.
     *
     * @return a new client instance pointing at {@code host:port}
     */
    @Override
    public RestHighLevelClient elasticsearchClient() {
        HttpHost node = new HttpHost(host, port);
        return new RestHighLevelClient(RestClient.builder(node));
    }
}
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
 
/**
 * Base configuration that exposes an {@link ElasticsearchOperations} bean built
 * on top of the {@link RestHighLevelClient} supplied by the subclass.
 */
public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {

    /**
     * Supplies the client used to talk to Elasticsearch; subclasses must implement this.
     *
     * @return the client to wrap in the operations bean
     */
    public abstract RestHighLevelClient elasticsearchClient();

    /**
     * Creates the operations bean, registered under both
     * {@code elasticsearchOperations} and {@code elasticsearchTemplate}.
     *
     * @param elasticsearchConverter converter passed through to the template
     * @return an {@link ElasticsearchRestTemplate} backed by {@link #elasticsearchClient()}
     */
    @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
    public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
        RestHighLevelClient client = elasticsearchClient();
        return new ElasticsearchRestTemplate(client, elasticsearchConverter);
    }
}

测试1:

import com.congge.entity.Product;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
 
/**
 * Index-level tests for the {@link Product} mapping.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class EsIndexTest {

    /** Template injected by Spring and used for index administration. */
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * Makes no Elasticsearch call itself. Per the original author's note, the
     * index (with its mapping) is created automatically when the application
     * starts, so this test only prints a message.
     */
    @Test
    public void createIndex(){
        System.out.println("创建索引");
    }

    /**
     * Deletes the index mapped by {@link Product} and prints whether the
     * deletion succeeded.
     */
    @Test
    public void deleteIndex(){
        boolean deleted = elasticsearchRestTemplate.deleteIndex(Product.class);
        System.out.println("删除索引 = " + deleted);
    }

}

测试2:

import com.congge.dao.ProductDao;
import com.congge.entity.Product;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
 
import java.util.ArrayList;
import java.util.List;
 
/**
 * Document-level tests that exercise {@link ProductDao}: single and batch
 * writes, lookups, deletes, paging and term queries.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class EsDocTest {

    @Autowired
    private ProductDao productDao;

    /**
     * Saves a new product with id 2.
     */
    @Test
    public void save() {
        Product product = new Product();
        product.setId(2L);
        product.setTitle("ipad mini");
        product.setCategory("ipad");
        product.setPrice(1998.0);
        product.setImages("http://ipad.jpg");
        productDao.save(product);
    }

    /**
     * Updates product 2 by saving a new document under the same id.
     */
    @Test
    public void update(){
        Product product = new Product();
        product.setId(2L);
        product.setTitle("iphone");
        product.setCategory("mobile");
        product.setPrice(6999.0);
        product.setImages("http://www.phone.jpg");
        productDao.save(product);
    }

    /**
     * Looks up product 2 by id and prints it.
     */
    @Test
    public void findById(){
        // Fail with a descriptive message instead of a bare NoSuchElementException
        // from Optional.get() when the document is missing.
        Product product = productDao.findById(2L)
                .orElseThrow(() -> new IllegalStateException("No product found with id 2"));
        System.out.println(product);
    }

    /**
     * Prints every product returned by {@code findAll()}.
     */
    @Test
    public void findAll(){
        printAll(productDao.findAll());
    }

    /**
     * Deletes product 2; only the id is set on the entity passed to the repository.
     */
    @Test
    public void delete(){
        Product product = new Product();
        product.setId(2L);
        productDao.delete(product);
    }

    /**
     * Saves ten products (ids 0-9) with a single {@code saveAll} call.
     */
    @Test
    public void saveAll(){
        List<Product> productList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Product product = new Product();
            product.setId(Long.valueOf(i));
            product.setTitle("iphone" + i);
            product.setCategory("mobile");
            product.setPrice(5999.0 + i);
            product.setImages("http://www.phone.jpg");
            productList.add(product);
        }
        productDao.saveAll(productList);
    }

    /**
     * Fetches the first page (5 items) of all products, sorted by id descending.
     */
    @Test
    public void findByPageable(){
        // Sort direction and the property to sort on.
        Sort sort = Sort.by(Sort.Direction.DESC,"id");
        int currentPage = 0; // zero-based: 0 is the first page, 1 the second
        int pageSize = 5;    // number of items per page
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
        Page<Product> productPage = productDao.findAll(pageRequest);
        printAll(productPage.getContent());
    }

    /**
     * Term query on {@code title}; the query builder is passed to the
     * repository's {@code search} method.
     *
     * <p>NOTE(review): {@code search(QueryBuilder)} is deprecated in newer
     * Spring Data Elasticsearch releases — confirm against the version in use.
     */
    @Test
    public void termQuery(){
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "iphone");
        printAll(productDao.search(termQueryBuilder));
    }

    /**
     * Term query on {@code title} combined with paging (first page, 5 items).
     *
     * <p>NOTE(review): the term here is "phone" while the documents saved by
     * this class have titles starting with "iphone"; whether anything matches
     * depends on how the analyzer tokenizes the titles — verify.
     */
    @Test
    public void termQueryByPage(){
        int currentPage = 0;
        int pageSize = 5;
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "phone");
        printAll(productDao.search(termQueryBuilder,pageRequest));
    }

    /** Prints each product on its own line. */
    private static void printAll(Iterable<Product> products) {
        for (Product product : products) {
            System.out.println(product);
        }
    }

}

五、将mysql数据写入Elasticsearch例子

package com.example.esdemo.service.impl;

import com.example.esdemo.config.DBHelper;
import com.example.esdemo.imports.ImportDb2Es;
import com.example.esdemo.service.ImportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;


/**
 * {@link ImportService} implementation that copies every row of a database
 * table into an Elasticsearch index through a {@link BulkProcessor}.
 */
@Component
public class ImportServiceImpl implements ImportService {

    private static final Logger logger = LogManager.getLogger(ImportServiceImpl.class);

    @Autowired
    private RestHighLevelClient client;


    /**
     * Imports the table named by {@code importDb2Es} into Elasticsearch.
     *
     * @param importDb2Es import request; only its DB table name is read here
     */
    @Override
    public void importDb2Es(ImportDb2Es importDb2Es) {
        // NOTE(review): the DB table name is passed as the index name too. If
        // ImportDb2Es carries a separate index name, it should probably be used
        // for the second argument — verify against that class.
        writeMySQLDataToES(importDb2Es.getDbTableName(),importDb2Es.getDbTableName());
    }


    /**
     * Reads all rows of {@code tableName} and indexes each row as one document
     * in {@code esIndeName}, using column names as field names and the string
     * form of each column as the value.
     *
     * <p>SQL errors are logged and not rethrown. The bulk processor is always
     * flushed and closed before this method returns.
     *
     * @param tableName  table to read from
     * @param esIndeName index to write to
     */
    private void writeMySQLDataToES(String tableName,String esIndeName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        logger.info("start handle data :" + tableName);
        // NOTE(review): the table name is concatenated into the SQL text because
        // identifiers cannot be bound as parameters. If it can come from
        // untrusted input it must be validated against a whitelist by the caller.
        String sql = "select * from " + tableName;
        // try-with-resources closes the result set, statement and connection in
        // reverse order on every exit path, so a failure while opening one of
        // them can no longer skip the remaining cleanup.
        try (Connection connection = DBHelper.getConn();
             PreparedStatement ps = connection.prepareStatement(
                     sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            // Tune the fetch size to your needs.
            ps.setFetchSize(20);
            try (ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData colData = rs.getMetaData();
                int columnCount = colData.getColumnCount();
                int count = 0;
                while (rs.next()) {
                    count++;
                    HashMap<String, String> row = new HashMap<>(128);
                    // JDBC column indexes are 1-based and inclusive of
                    // columnCount; the previous `i < columnCount` bound
                    // silently dropped the last column of every row.
                    for (int i = 1; i <= columnCount; i++) {
                        row.put(colData.getColumnName(i), rs.getString(i));
                    }
                    // The processor is configured with its own action, size and
                    // time thresholds, so rows are handed over one at a time
                    // instead of being buffered in a second list first.
                    bulkProcessor.add(new IndexRequest(esIndeName).source(row));
                    if (count % 10000 == 0) {
                        logger.info("mysql handle data  number:" + count);
                    }
                }
            }
            // Push out whatever is still queued below the batch thresholds.
            bulkProcessor.flush();
        } catch (SQLException e) {
            logger.error("Failed to import table " + tableName + " into index " + esIndeName, e);
        } finally {
            closeBulkProcessor(bulkProcessor);
        }

    }

    /**
     * Waits up to 150 seconds for in-flight bulk requests to finish, closes the
     * processor, and logs whether everything completed in time.
     */
    private void closeBulkProcessor(BulkProcessor bulkProcessor) {
        try {
            boolean terminated = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
            logger.info(terminated);
            if (!terminated) {
                logger.warn("Bulk processor did not finish all pending requests within 150 seconds");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for pending bulk requests to finish", e);
        }
    }

    /**
     * Builds a {@link BulkProcessor} that sends batches asynchronously through
     * {@code client} and logs the outcome of every batch.
     *
     * <p>Configuration exceptions from the builder propagate to the caller
     * instead of being swallowed and turned into a {@code null} return.
     *
     * @param client client used to execute the bulk requests
     * @return the configured processor, never {@code null}
     */
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Try to insert data number : "
                        + request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                // A bulk call can complete while individual items failed, so
                // check before reporting success.
                if (response.hasFailures()) {
                    logger.error("Bulk has item failures, executionId: " + executionId
                            + ", details: " + response.buildFailureMessage());
                } else {
                    logger.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " + executionId);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Pass the throwable as well so the stack trace is logged.
                logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId, failure);
            }
        };

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        // A batch is sent at 5000 actions, 100 MB or every 100 seconds,
        // whichever comes first. Up to 10 batches may be in flight, and failed
        // batches are retried up to 3 times, 1 second apart.
        return BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(5000)
                .setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB))
                .setConcurrentRequests(10)
                .setFlushInterval(TimeValue.timeValueSeconds(100L))
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                .build();
    }
}