Elasticsearch学习笔记(3)

时间:2024-10-02 07:58:20

RestAPI

Elasticsearch(ES)官方提供了多种语言的客户端库,用于与Elasticsearch进行交互。这些客户端库的主要功能是帮助开发者更方便地构建和发送DSL(Domain Specific Language)查询语句,并通过HTTP请求与Elasticsearch集群进行通信。

官方文档地址:

https://www.elastic.co/guide/en/elasticsearch/client/index.html

初始化RestClient 

Maven 配置

以下是使用 maven 作为依赖项管理器配置依赖项的方法。将以下内容添加到pom.xml文件中:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.12.1</version>
</dependency>

application.properties

# 定义应用程序的名称,用于服务发现、健康检查等
spring.application.name=demo

# 配置Elasticsearch服务器的地址
spring.elasticsearch.uris=http://localhost:9201
# 配置Elasticsearch的用户名,用于身份验证
spring.elasticsearch.username=elastic
# 配置Elasticsearch的密码,用于身份验证
spring.elasticsearch.password=changeme

Elasticsearch配置类

package com.example.demo.Config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Elasticsearch配置类,用于配置和创建Elasticsearch客户端
 */
@Configuration
public class ElasticsearchConfig {

    /**
     * 创建并配置RestHighLevelClient客户端实例
     * 该方法将在Spring容器中注册一个RestHighLevelClient类型的Bean,供应用程序使用
     *
     * @return 返回配置好的RestHighLevelClient实例
     */
    @Bean
    public RestHighLevelClient client() {
        // 使用RestClient.builder方法构建一个HttpHost对象,指定Elasticsearch服务器的地址和端口
        // 这里连接的是本地的Elasticsearch服务器,端口为9201,使用HTTP协议
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9201, "http"))
        );
    }
}

为了单元测试方便,我们创建一个测试类

package com.example.demo;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@SpringBootTest
public class ElasticsearchConnectionTest {

    @Autowired
    private RestHighLevelClient client;

    @Test
    public void testElasticsearchConnection() throws IOException {
        MainResponse response = client.info(RequestOptions.DEFAULT);
        assertNotNull(response);
        System.out.println("Elasticsearch集群名称: " + response.getClusterName());
        System.out.println("Elasticsearch节点名称: " + response.getNodeName());
    }
}

创建、删除索引库

先创建ElasticsearchIndexService类

package com.example.demo.Service;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * Elasticsearch索引服务类
 * 提供创建和删除索引的功能
 */
@Service
public class ElasticsearchIndexService {

    @Autowired
    private RestHighLevelClient client;

    /**
     * 创建索引
     * 如果索引不存在,则创建一个新的索引
     *
     * @param indexName 索引名称
     */
    public void createIndex(String indexName) {
        try {
            // 检查索引是否存在
            if (!indexExists(indexName)) {
                // 创建索引请求
                CreateIndexRequest request = new CreateIndexRequest(indexName);
                // 发送创建索引请求
                CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
                // 判断索引是否成功创建
                if (createIndexResponse.isAcknowledged()) {
                    System.out.println("索引库创建成功: " + indexName);
                } else {
                    System.out.println("索引库创建失败: " + indexName);
                }
            } else {
                // 如果索引已存在,打印提示信息
                System.out.println("索引库已存在: " + indexName);
            }
        } catch (IOException e) {
            // 捕获并打印异常信息
            e.printStackTrace();
        }
    }
    /**
     * 删除索引
     * 如果索引存在,则删除该索引
     *
     * @param indexName 索引名称
     */
    public void deleteIndex(String indexName) {
        try {
            // 检查索引是否存在
            if (indexExists(indexName)) {
                // 创建删除索引请求
                DeleteIndexRequest request = new DeleteIndexRequest(indexName);
                // 发送删除索引请求
                AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
                // 判断索引是否成功删除
                if (deleteIndexResponse.isAcknowledged()) {
                    System.out.println("索引库删除成功: " + indexName);
                } else {
                    System.out.println("索引库删除失败: " + indexName);
                }
            } else {
                // 如果索引不存在,打印提示信息
                System.out.println("索引库不存在: " + indexName);
            }
        } catch (IOException e) {
            // 捕获并打印异常信息
            e.printStackTrace();
        }
    }
    /**
     * 检查索引是否存在
     *
     * @param indexName 索引名称
     * @return true如果索引存在,否则返回false
     */
    public boolean indexExists(String indexName) {
        // 创建一个获取索引请求对象,并传入索引名称
        GetIndexRequest request = new GetIndexRequest(indexName);
        try {
            // 调用Elasticsearch客户端的indices().exists()方法,传入请求对象和请求选项,返回索引是否存在的布尔值
            return client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // 捕获并打印异常信息
            e.printStackTrace();
            // 如果发生异常,则认为索引不存在,返回false
            return false;
        }
    }

}

在Spring Boot应用程序中,可以通过调用这些服务方法来操作Elasticsearch索引库。

package com.example.demo.Component;

import com.example.demo.Service.ElasticsearchIndexService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class ElasticsearchRunner implements CommandLineRunner {

    @Autowired
    private ElasticsearchIndexService elasticsearchIndexService;

    @Override
    public void run(String... args) throws Exception {
        String indexName = "my_index";

        // 创建索引
        elasticsearchIndexService.createIndex(indexName);

        // 判断索引是否存在
        boolean exists = elasticsearchIndexService.indexExists(indexName);
        System.out.println("索引库是否存在: " + exists);

        // 删除索引
        elasticsearchIndexService.deleteIndex(indexName);
    }
}

RestClient操作文档

单一文档 API

1. 索引 API (Index API)

用于将单一文档添加或更新到索引中。

  • HTTP 方法:PUT 或 POST
  • 请求 URL:PUT /<index>/_doc/<id> 或 POST /<index>/_doc/
  • 功能:如果提供了 ID,则更新文档;如果没有 ID,则创建一个新文档。

示例:

PUT /my-index/_doc/1
{
  "user": "kimchy",
  "message": "trying out Elasticsearch"
}
@Service  // 注解表明该类是一个 Spring 服务类,用于业务逻辑处理
public class ElasticsearchService {

    @Autowired  // 自动装配 RestHighLevelClient 实例
    private RestHighLevelClient client;

    // 索引文档方法
    public String indexDocument(String index, String id, Map<String, Object> document) throws IOException {
        // 创建 IndexRequest,指定索引名称和文档 ID,并设置要存储的文档内容
        IndexRequest request = new IndexRequest(index)
                .id(id)  // 设置文档 ID
                .source(document);  // 设置文档源数据(一个 Map 对象)
        
        // 使用 client 对象调用 index 方法,将请求发送到 Elasticsearch
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        
        // 返回创建后的文档 ID
        return response.getId();
    }
}

 2. 获取 API (Get API)

用于通过文档 ID 获取文档。

  • HTTP 方法:GET
  • 请求 URL:GET /<index>/_doc/<id>
  • 功能:返回包含文档的 JSON 数据。

示例:

GET /my-index/_doc/1
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入客户端
    private RestHighLevelClient client;

    // 获取文档方法
    public Map<String, Object> getDocument(String index, String id) throws IOException {
        // 创建 GetRequest 请求,指定索引和文档 ID
        GetRequest request = new GetRequest(index, id);
        
        // 执行请求,返回 GetResponse 对象
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        
        // 返回文档内容 (source),即存储在文档中的数据
        return response.getSource();
    }
}

 3. 获取源 API (Get Source API)

仅返回文档的 _source 字段,而不包含元数据。

  • HTTP 方法:GET
  • 请求 URL:GET /<index>/_source/<id>
  • 功能:返回文档的 _source 部分,省略其他信息。

示例:

GET /my-index/_source/1
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动装配客户端
    private RestHighLevelClient client;

    // 获取文档源数据
    public Map<String, Object> getDocumentSource(String index, String id) throws IOException {
        // 创建 GetSourceRequest,指定索引和文档 ID
        GetSourceRequest request = new GetSourceRequest(index, id);
        
        // 执行请求,返回源数据 (即文档内容)
        return client.getSource(request, RequestOptions.DEFAULT);
    }
}

 4. 存在 API (Exists API)

检查文档是否存在。

  • HTTP 方法:HEAD
  • 请求 URL:HEAD /<index>/_doc/<id>
  • 功能:如果文档存在返回 200 状态码,否则返回 404。

示例:

HEAD /my-index/_doc/1
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入客户端
    private RestHighLevelClient client;

    // 检查文档是否存在
    public boolean documentExists(String index, String id) throws IOException {
        // 创建 GetRequest 请求,但不获取文档源数据
        GetRequest request = new GetRequest(index, id);
        request.fetchSourceContext(new FetchSourceContext(false));  // 禁止返回文档源
        request.storedFields("_none_");  // 不返回任何存储字段
        
        // 调用 exists 方法检查文档是否存在
        return client.exists(request, RequestOptions.DEFAULT);
    }
}

 5. 删除 API (Delete API)

用于删除指定 ID 的文档。

  • HTTP 方法:DELETE
  • 请求 URL:DELETE /<index>/_doc/<id>
  • 功能:删除指定文档,如果不存在则返回 404。

示例:

DELETE /my-index/_doc/1
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动装配客户端
    private RestHighLevelClient client;

    // 删除文档方法
    public String deleteDocument(String index, String id) throws IOException {
        // 创建 DeleteRequest 请求,指定索引和文档 ID
        DeleteRequest request = new DeleteRequest(index, id);
        
        // 执行请求,返回 DeleteResponse 响应
        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
        
        // 返回删除结果(比如 "DELETED")
        return response.getResult().name();
    }
}

 6. 更新 API (Update API)

部分更新文档,允许你只更新部分字段而不是替换整个文档。

  • HTTP 方法:POST
  • 请求 URL:POST /<index>/_update/<id>
  • 功能:可以通过脚本或部分字段更新文档。

示例:

POST /my-index/_update/1
{
  "doc": {
    "message": "updated message"
  }
}
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动装配客户端
    private RestHighLevelClient client;

    // 更新文档
    public String updateDocument(String index, String id, Map<String, Object> updates) throws IOException {
        // 创建 UpdateRequest 请求,指定索引和文档 ID,并设置更新的内容
        UpdateRequest request = new UpdateRequest(index, id)
                .doc(updates);  // 使用 doc 方法更新指定字段
        
        // 执行更新请求,返回 UpdateResponse 响应
        UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
        
        // 返回更新结果
        return response.getResult().name();
    }
}

7. 词向量 API (Term Vectors API)

用于返回文档的词项向量,包括词项频率、词频逆文档频率等。

  • HTTP 方法:GET
  • 请求 URL:GET /<index>/_termvectors/<id>
  • 功能:分析文档并返回词项的详细统计数据。

示例:

GET /my-index/_termvectors/1
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入客户端
    private RestHighLevelClient client;

    // 获取词项向量
    public TermVectorsResponse getTermVectors(String index, String id) throws IOException {
        // 创建 TermVectorsRequest 请求,指定索引和文档 ID
        TermVectorsRequest request = new TermVectorsRequest(index, id);
        
        // 执行请求,返回 TermVectorsResponse 响应
        return client.termvectors(request, RequestOptions.DEFAULT);
    }
}

多文档 API

1. 批量 API (Bulk API)

允许你在一次请求中执行多次操作(如创建、更新、删除)。

  • HTTP 方法:POST
  • 请求 URL:POST /_bulk
  • 功能:在一个请求中进行批量的索引、删除、更新操作。

示例:

POST /_bulk
{ "index" : { "_index" : "my-index", "_id" : "1" } }
{ "user" : "kimchy", "message" : "trying out Elasticsearch" }
{ "delete" : { "_index" : "my-index", "_id" : "2" } }
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入客户端
    private RestHighLevelClient client;

    // 批量操作
    public BulkResponse bulkOperation(List<BulkOperation> operations) throws IOException {
        // 创建 BulkRequest 对象用于批量操作
        BulkRequest request = new BulkRequest();
        
        // 循环处理每个操作,将其加入 BulkRequest 请求
        operations.forEach(op -> {
            if (op.getAction().equals("index")) {
                request.add(new IndexRequest(op.getIndex())
                        .id(op.getId())  // 为索引操作设置 ID
                        .source(op.getDocument()));  // 设置文档内容
            } else if (op.getAction().equals("delete")) {
                request.add(new DeleteRequest(op.getIndex(), op.getId()));  // 删除文档操作
            }
        });
        
        // 执行批量请求,返回响应
        return client.bulk(request, RequestOptions.DEFAULT);
    }
}

 2. 多重获取 API (Multi Get API)

允许在一个请求中获取多个文档。

  • HTTP 方法:POST
  • 请求 URL:POST /_mget
  • 功能:在一个请求中获取多个文档,可以来自不同的索引。

示例:

POST /_mget
{
  "docs" : [
    { "_index" : "my-index", "_id" : "1" },
    { "_index" : "my-index", "_id" : "2" }
  ]
}
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入客户端
    private RestHighLevelClient client;

    // 多重获取文档
    public List<Map<String, Object>> multiGetDocuments(String index, List<String> ids) throws IOException {
        // 创建 MultiGetRequest 对象
        MultiGetRequest request = new MultiGetRequest();
        
        // 将每个文档 ID 添加到请求中
        ids.forEach(id -> request.add(new MultiGetRequest.Item(index, id)));
        
        // 执行请求,返回 MultiGetResponse 响应
        MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

        // 处理每个文档的响应,返回其源数据
        List<Map<String, Object>> results = new ArrayList<>();
        for (MultiGetItemResponse item : response.getResponses()) {
            results.add(item.getResponse().getSource());
        }
        return results;
    }
}

 3. 重新索引 API (Reindex API)

用于从一个索引复制文档到另一个索引。

  • HTTP 方法:POST
  • 请求 URL:POST /_reindex
  • 功能:将数据从一个索引迁移到另一个索引。

示例:

POST /_reindex
{
  "source": {
    "index": "old-index"
  },
  "dest": {
    "index": "new-index"
  }
}
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入客户端
    private RestHighLevelClient client;

    // 重新索引方法
    public BulkByScrollResponse reindex(String sourceIndex, String destIndex) throws IOException {
        // 创建 ReindexRequest 对象,指定源索引和目标索引
        ReindexRequest request = new ReindexRequest();
        request.setSourceIndices(sourceIndex);
        request.setDestIndex(destIndex);
        
        // 执行重新索引操作,返回响应
        return client.reindex(request, RequestOptions.DEFAULT);
    }
}

 4. 通过查询 API 进行更新 (Update By Query API)

根据查询条件更新多个文档。

  • HTTP 方法:POST
  • 请求 URL:POST /<index>/_update_by_query
  • 功能:批量更新符合条件的文档。

示例:

POST /my-index/_update_by_query
{
  "script": {
    "source": "ctx._source.field += 1"
  },
  "query": {
    "term": {
      "field": "value"
    }
  }
}
@Service  // 声明服务类,用于包含业务逻辑
public class ElasticsearchService {

    @Autowired  // 自动注入 RestHighLevelClient 实例
    private RestHighLevelClient client;

    // 通过查询更新文档的方法
    public BulkByScrollResponse updateByQuery(String index, String field, String oldValue, String newValue) throws IOException {
        // 创建 UpdateByQueryRequest 对象,指定索引
        UpdateByQueryRequest request = new UpdateByQueryRequest(index);
        
        // 设置查询条件,使用 QueryBuilders 来查找匹配指定字段和值的文档
        request.setQuery(QueryBuilders.matchQuery(field, oldValue));
        
        // 使用 Script 进行更新,编写脚本来修改查询结果中的字段值
        request.setScript(new Script(
                ScriptType.INLINE,  // 脚本类型为内联脚本
                "painless",  // 脚本语言为 Painless
                "ctx._source['" + field + "'] = '" + newValue + "'",  // 脚本内容:更新指定字段的值
                Collections.emptyMap()));  // 无额外的参数传递
        
        // 执行更新操作并返回响应
        return client.updateByQuery(request, RequestOptions.DEFAULT);
    }
}

 5. 按查询删除 API (Delete By Query API)

根据查询条件删除文档。

  • HTTP 方法:POST
  • 请求 URL:POST /<index>/_delete_by_query
  • 功能:批量删除符合查询条件的文档。

示例:

POST /my-index/_delete_by_query
{
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入 RestHighLevelClient 实例
    private RestHighLevelClient client;

    // 按查询删除文档的方法
    public BulkByScrollResponse deleteByQuery(String index, String field, String value) throws IOException {
        // 创建 DeleteByQueryRequest 对象,指定索引
        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
        
        // 设置查询条件,查找要删除的文档
        request.setQuery(QueryBuilders.matchQuery(field, value));
        
        // 执行删除操作,返回 BulkByScrollResponse 响应
        return client.deleteByQuery(request, RequestOptions.DEFAULT);
    }
}

 6. 重新节流 API (Rethrottle API)

动态调整某些任务的处理速率,例如 reindex、update_by_query 等。

  • HTTP 方法:POST
  • 请求 URL:POST /_tasks/<task_id>/_rethrottle
  • 功能:调整指定任务的吞吐量。

示例:

POST /_tasks/TASK_ID/_rethrottle?requests_per_second=1000
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入 RestHighLevelClient 实例
    private RestHighLevelClient client;

    // 重新节流方法,用于动态调整操作的节流速率
    public TaskSubmissionResponse rethrottleTask(String taskId, float requestsPerSecond) throws IOException {
        // 调用 Rethrottle API 来修改指定任务的节流速率
        // 通过 ReindexRethrottleRequest 对象进行节流
        ReindexRethrottleRequest request = new ReindexRethrottleRequest(taskId, requestsPerSecond);
        
        // 执行请求,并返回响应
        return client.reindexRethrottle(request, RequestOptions.DEFAULT);
    }
}

 7. 多项向量 API (Multi Term Vectors API)

允许在一个请求中为多个文档返回词项向量。

  • HTTP 方法:POST
  • 请求 URL:POST /<index>/_mtermvectors
  • 功能:获取多个文档的词向量信息。

示例:

POST /my-index/_mtermvectors
{
  "docs": [
    { "_id": "1" },
    { "_id": "2" }
  ]
}
@Service  // 声明服务类
public class ElasticsearchService {

    @Autowired  // 自动注入 RestHighLevelClient 实例
    private RestHighLevelClient client;

    // 获取多个文档的词向量方法
    public List<TermVectorsResponse> multiTermVectors(String index, List<String> ids) throws IOException {
        // 创建 MultiTermVectorsRequest 对象
        MultiTermVectorsRequest request = new MultiTermVectorsRequest();
        
        // 循环将每个文档 ID 加入请求中
        ids.forEach(id -> request.add(new TermVectorsRequest(index, id)));
        
        // 执行请求,返回 MultiTermVectorsResponse 响应
        MultiTermVectorsResponse response = client.mtermvectors(request, RequestOptions.DEFAULT);
        
        // 将每个 TermVectorsResponse 提取出来
        List<TermVectorsResponse> termVectorsResponses = new ArrayList<>();
        response.forEach(termVectorsResponses::add);
        
        // 返回所有文档的词向量
        return termVectorsResponses;
    }
}