RestAPI
Elasticsearch(ES)官方提供了多种语言的客户端库,用于与Elasticsearch进行交互。这些客户端库的主要功能是帮助开发者更方便地构建和发送DSL(Domain Specific Language)查询语句,并通过HTTP请求与Elasticsearch集群进行通信。
官方文档地址:
https://www.elastic.co/guide/en/elasticsearch/client/index.html
初始化RestClient
Maven 配置
以下是使用 maven 作为依赖项管理器配置依赖项的方法。将以下内容添加到pom.xml文件中:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
application.properties
# 定义应用程序的名称,用于服务发现、健康检查等
spring.application.name=demo
# 配置Elasticsearch服务器的地址
spring.elasticsearch.uris=http://localhost:9201
# 配置Elasticsearch的用户名,用于身份验证
spring.elasticsearch.username=elastic
# 配置Elasticsearch的密码,用于身份验证
spring.elasticsearch.password=changeme
Elasticsearch配置类
package com.example.demo.Config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Elasticsearch配置类,用于配置和创建Elasticsearch客户端
*/
@Configuration
public class ElasticsearchConfig {
/**
* 创建并配置RestHighLevelClient客户端实例
* 该方法将在Spring容器中注册一个RestHighLevelClient类型的Bean,供应用程序使用
*
* @return 返回配置好的RestHighLevelClient实例
*/
@Bean
public RestHighLevelClient client() {
// 使用RestClient.builder方法构建一个HttpHost对象,指定Elasticsearch服务器的地址和端口
// 这里连接的是本地的Elasticsearch服务器,端口为9201,使用HTTP协议
return new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9201, "http"))
);
}
}
为了单元测试方便,我们创建一个测试类
package com.example.demo;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@SpringBootTest
public class ElasticsearchConnectionTest {
@Autowired
private RestHighLevelClient client;
@Test
public void testElasticsearchConnection() throws IOException {
MainResponse response = client.info(RequestOptions.DEFAULT);
assertNotNull(response);
System.out.println("Elasticsearch集群名称: " + response.getClusterName());
System.out.println("Elasticsearch节点名称: " + response.getNodeName());
}
}
创建、删除索引库
先创建ElasticsearchIndexService类
package com.example.demo.Service;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* Elasticsearch索引服务类
* 提供创建和删除索引的功能
*/
@Service
public class ElasticsearchIndexService {
@Autowired
private RestHighLevelClient client;
/**
* 创建索引
* 如果索引不存在,则创建一个新的索引
*
* @param indexName 索引名称
*/
public void createIndex(String indexName) {
try {
// 检查索引是否存在
if (!indexExists(indexName)) {
// 创建索引请求
CreateIndexRequest request = new CreateIndexRequest(indexName);
// 发送创建索引请求
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
// 判断索引是否成功创建
if (createIndexResponse.isAcknowledged()) {
System.out.println("索引库创建成功: " + indexName);
} else {
System.out.println("索引库创建失败: " + indexName);
}
} else {
// 如果索引已存在,打印提示信息
System.out.println("索引库已存在: " + indexName);
}
} catch (IOException e) {
// 捕获并打印异常信息
e.printStackTrace();
}
}
/**
* 删除索引
* 如果索引存在,则删除该索引
*
* @param indexName 索引名称
*/
public void deleteIndex(String indexName) {
try {
// 检查索引是否存在
if (indexExists(indexName)) {
// 创建删除索引请求
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
// 发送删除索引请求
AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
// 判断索引是否成功删除
if (deleteIndexResponse.isAcknowledged()) {
System.out.println("索引库删除成功: " + indexName);
} else {
System.out.println("索引库删除失败: " + indexName);
}
} else {
// 如果索引不存在,打印提示信息
System.out.println("索引库不存在: " + indexName);
}
} catch (IOException e) {
// 捕获并打印异常信息
e.printStackTrace();
}
}
/**
* 检查索引是否存在
*
* @param indexName 索引名称
* @return true如果索引存在,否则返回false
*/
public boolean indexExists(String indexName) {
// 创建一个获取索引请求对象,并传入索引名称
GetIndexRequest request = new GetIndexRequest(indexName);
try {
// 调用Elasticsearch客户端的indices().exists()方法,传入请求对象和请求选项,返回索引是否存在的布尔值
return client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
// 捕获并打印异常信息
e.printStackTrace();
// 如果发生异常,则认为索引不存在,返回false
return false;
}
}
}
在Spring Boot应用程序中,可以通过调用这些服务方法来操作Elasticsearch索引库。
package com.example.demo.Component;
import com.example.demo.Service.ElasticsearchIndexService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class ElasticsearchRunner implements CommandLineRunner {
@Autowired
private ElasticsearchIndexService elasticsearchIndexService;
@Override
public void run(String... args) throws Exception {
String indexName = "my_index";
// 创建索引
elasticsearchIndexService.createIndex(indexName);
// 判断索引是否存在
boolean exists = elasticsearchIndexService.indexExists(indexName);
System.out.println("索引库是否存在: " + exists);
// 删除索引
elasticsearchIndexService.deleteIndex(indexName);
}
}
RestClient操作文档
单一文档 API
1. 索引 API (Index API)
用于将单一文档添加或更新到索引中。
- HTTP 方法:PUT 或 POST
- 请求 URL:PUT /<index>/_doc/<id> 或 POST /<index>/_doc/
- 功能:如果提供了 ID,则更新文档;如果没有 ID,则创建一个新文档。
示例:
PUT /my-index/_doc/1
{
"user": "kimchy",
"message": "trying out Elasticsearch"
}
@Service // 注解表明该类是一个 Spring 服务类,用于业务逻辑处理
public class ElasticsearchService {
@Autowired // 自动装配 RestHighLevelClient 实例
private RestHighLevelClient client;
// 索引文档方法
public String indexDocument(String index, String id, Map<String, Object> document) throws IOException {
// 创建 IndexRequest,指定索引名称和文档 ID,并设置要存储的文档内容
IndexRequest request = new IndexRequest(index)
.id(id) // 设置文档 ID
.source(document); // 设置文档源数据(一个 Map 对象)
// 使用 client 对象调用 index 方法,将请求发送到 Elasticsearch
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
// 返回创建后的文档 ID
return response.getId();
}
}
2. 获取 API (Get API)
用于通过文档 ID 获取文档。
- HTTP 方法:GET
- 请求 URL:GET /<index>/_doc/<id>
- 功能:返回包含文档的 JSON 数据。
示例:
GET /my-index/_doc/1
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入客户端
private RestHighLevelClient client;
// 获取文档方法
public Map<String, Object> getDocument(String index, String id) throws IOException {
// 创建 GetRequest 请求,指定索引和文档 ID
GetRequest request = new GetRequest(index, id);
// 执行请求,返回 GetResponse 对象
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 返回文档内容 (source),即存储在文档中的数据
return response.getSource();
}
}
3. 获取源 API (Get Source API)
仅返回文档的 _source 字段,而不包含元数据。
- HTTP 方法:GET
- 请求 URL:GET /<index>/_source/<id>
- 功能:返回文档的 _source 部分,省略其他信息。
示例:
GET /my-index/_source/1
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动装配客户端
private RestHighLevelClient client;
// 获取文档源数据
public Map<String, Object> getDocumentSource(String index, String id) throws IOException {
// 创建 GetSourceRequest,指定索引和文档 ID
GetSourceRequest request = new GetSourceRequest(index, id);
// 执行请求,返回源数据 (即文档内容)
return client.getSource(request, RequestOptions.DEFAULT);
}
}
4. 存在 API (Exists API)
检查文档是否存在。
- HTTP 方法:HEAD
- 请求 URL:HEAD /<index>/_doc/<id>
- 功能:如果文档存在返回 200 状态码,否则返回 404。
示例:
HEAD /my-index/_doc/1
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入客户端
private RestHighLevelClient client;
// 检查文档是否存在
public boolean documentExists(String index, String id) throws IOException {
// 创建 GetRequest 请求,但不获取文档源数据
GetRequest request = new GetRequest(index, id);
request.fetchSourceContext(new FetchSourceContext(false)); // 禁止返回文档源
request.storedFields("_none_"); // 不返回任何存储字段
// 调用 exists 方法检查文档是否存在
return client.exists(request, RequestOptions.DEFAULT);
}
}
5. 删除 API (Delete API)
用于删除指定 ID 的文档。
- HTTP 方法:DELETE
- 请求 URL:DELETE /<index>/_doc/<id>
- 功能:删除指定文档,如果不存在则返回 404。
示例:
DELETE /my-index/_doc/1
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动装配客户端
private RestHighLevelClient client;
// 删除文档方法
public String deleteDocument(String index, String id) throws IOException {
// 创建 DeleteRequest 请求,指定索引和文档 ID
DeleteRequest request = new DeleteRequest(index, id);
// 执行请求,返回 DeleteResponse 响应
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
// 返回删除结果(比如 "DELETED")
return response.getResult().name();
}
}
6. 更新 API (Update API)
部分更新文档,允许你只更新部分字段而不是替换整个文档。
- HTTP 方法:POST
- 请求 URL:POST /<index>/_update/<id>
- 功能:可以通过脚本或部分字段更新文档。
示例:
POST /my-index/_update/1
{
"doc": {
"message": "updated message"
}
}
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动装配客户端
private RestHighLevelClient client;
// 更新文档
public String updateDocument(String index, String id, Map<String, Object> updates) throws IOException {
// 创建 UpdateRequest 请求,指定索引和文档 ID,并设置更新的内容
UpdateRequest request = new UpdateRequest(index, id)
.doc(updates); // 使用 doc 方法更新指定字段
// 执行更新请求,返回 UpdateResponse 响应
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
// 返回更新结果
return response.getResult().name();
}
}
7. 词向量 API (Term Vectors API)
用于返回文档的词项向量,包括词项频率、词频逆文档频率等。
- HTTP 方法:GET
- 请求 URL:GET /<index>/_termvectors/<id>
- 功能:分析文档并返回词项的详细统计数据。
示例:
GET /my-index/_termvectors/1
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入客户端
private RestHighLevelClient client;
// 获取词项向量
public TermVectorsResponse getTermVectors(String index, String id) throws IOException {
// 创建 TermVectorsRequest 请求,指定索引和文档 ID
TermVectorsRequest request = new TermVectorsRequest(index, id);
// 执行请求,返回 TermVectorsResponse 响应
return client.termvectors(request, RequestOptions.DEFAULT);
}
}
多文档 API
1. 批量 API (Bulk API)
允许你在一次请求中执行多次操作(如创建、更新、删除)。
- HTTP 方法:POST
- 请求 URL:POST /_bulk
- 功能:在一个请求中进行批量的索引、删除、更新操作。
示例:
POST /_bulk
{ "index" : { "_index" : "my-index", "_id" : "1" } }
{ "user" : "kimchy", "message" : "trying out Elasticsearch" }
{ "delete" : { "_index" : "my-index", "_id" : "2" } }
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入客户端
private RestHighLevelClient client;
// 批量操作
public BulkResponse bulkOperation(List<BulkOperation> operations) throws IOException {
// 创建 BulkRequest 对象用于批量操作
BulkRequest request = new BulkRequest();
// 循环处理每个操作,将其加入 BulkRequest 请求
operations.forEach(op -> {
if (op.getAction().equals("index")) {
request.add(new IndexRequest(op.getIndex())
.id(op.getId()) // 为索引操作设置 ID
.source(op.getDocument())); // 设置文档内容
} else if (op.getAction().equals("delete")) {
request.add(new DeleteRequest(op.getIndex(), op.getId())); // 删除文档操作
}
});
// 执行批量请求,返回响应
return client.bulk(request, RequestOptions.DEFAULT);
}
}
2. 多重获取 API (Multi Get API)
允许在一个请求中获取多个文档。
- HTTP 方法:POST
- 请求 URL:POST /_mget
- 功能:在一个请求中获取多个文档,可以来自不同的索引。
示例:
POST /_mget
{
"docs" : [
{ "_index" : "my-index", "_id" : "1" },
{ "_index" : "my-index", "_id" : "2" }
]
}
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入客户端
private RestHighLevelClient client;
// 多重获取文档
public List<Map<String, Object>> multiGetDocuments(String index, List<String> ids) throws IOException {
// 创建 MultiGetRequest 对象
MultiGetRequest request = new MultiGetRequest();
// 将每个文档 ID 添加到请求中
ids.forEach(id -> request.add(new MultiGetRequest.Item(index, id)));
// 执行请求,返回 MultiGetResponse 响应
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
// 处理每个文档的响应,返回其源数据
List<Map<String, Object>> results = new ArrayList<>();
for (MultiGetItemResponse item : response.getResponses()) {
results.add(item.getResponse().getSource());
}
return results;
}
}
3. 重新索引 API (Reindex API)
用于从一个索引复制文档到另一个索引。
- HTTP 方法:POST
- 请求 URL:POST /_reindex
- 功能:将数据从一个索引迁移到另一个索引。
示例:
POST /_reindex
{
"source": {
"index": "old-index"
},
"dest": {
"index": "new-index"
}
}
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入客户端
private RestHighLevelClient client;
// 重新索引方法
public BulkByScrollResponse reindex(String sourceIndex, String destIndex) throws IOException {
// 创建 ReindexRequest 对象,指定源索引和目标索引
ReindexRequest request = new ReindexRequest();
request.setSourceIndices(sourceIndex);
request.setDestIndex(destIndex);
// 执行重新索引操作,返回响应
return client.reindex(request, RequestOptions.DEFAULT);
}
}
4. 通过查询 API 进行更新 (Update By Query API)
根据查询条件更新多个文档。
- HTTP 方法:POST
- 请求 URL:POST /<index>/_update_by_query
- 功能:批量更新符合条件的文档。
示例:
POST /my-index/_update_by_query
{
"script": {
"source": "ctx._source.field += 1"
},
"query": {
"term": {
"field": "value"
}
}
}
@Service // 声明服务类,用于包含业务逻辑
public class ElasticsearchService {
@Autowired // 自动注入 RestHighLevelClient 实例
private RestHighLevelClient client;
// 通过查询更新文档的方法
public BulkByScrollResponse updateByQuery(String index, String field, String oldValue, String newValue) throws IOException {
// 创建 UpdateByQueryRequest 对象,指定索引
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
// 设置查询条件,使用 QueryBuilders 来查找匹配指定字段和值的文档
request.setQuery(QueryBuilders.matchQuery(field, oldValue));
// 使用 Script 进行更新,编写脚本来修改查询结果中的字段值
request.setScript(new Script(
ScriptType.INLINE, // 脚本类型为内联脚本
"painless", // 脚本语言为 Painless
"ctx._source['" + field + "'] = '" + newValue + "'", // 脚本内容:更新指定字段的值
Collections.emptyMap())); // 无额外的参数传递
// 执行更新操作并返回响应
return client.updateByQuery(request, RequestOptions.DEFAULT);
}
}
5. 按查询删除 API (Delete By Query API)
根据查询条件删除文档。
- HTTP 方法:POST
- 请求 URL:POST /<index>/_delete_by_query
- 功能:批量删除符合查询条件的文档。
示例:
POST /my-index/_delete_by_query
{
"query": {
"term": {
"user": "kimchy"
}
}
}
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入 RestHighLevelClient 实例
private RestHighLevelClient client;
// 按查询删除文档的方法
public BulkByScrollResponse deleteByQuery(String index, String field, String value) throws IOException {
// 创建 DeleteByQueryRequest 对象,指定索引
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
// 设置查询条件,查找要删除的文档
request.setQuery(QueryBuilders.matchQuery(field, value));
// 执行删除操作,返回 BulkByScrollResponse 响应
return client.deleteByQuery(request, RequestOptions.DEFAULT);
}
}
6. 重新节流 API (Rethrottle API)
动态调整某些任务的处理速率,例如 reindex、update_by_query 等。
- HTTP 方法:POST
- 请求 URL:POST /_tasks/<task_id>/_rethrottle
- 功能:调整指定任务的吞吐量。
示例:
POST /_tasks/TASK_ID/_rethrottle?requests_per_second=1000
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入 RestHighLevelClient 实例
private RestHighLevelClient client;
// 重新节流方法,用于动态调整操作的节流速率
public TaskSubmissionResponse rethrottleTask(String taskId, float requestsPerSecond) throws IOException {
// 调用 Rethrottle API 来修改指定任务的节流速率
// 通过 ReindexRethrottleRequest 对象进行节流
ReindexRethrottleRequest request = new ReindexRethrottleRequest(taskId, requestsPerSecond);
// 执行请求,并返回响应
return client.reindexRethrottle(request, RequestOptions.DEFAULT);
}
}
7. 多项向量 API (Multi Term Vectors API)
允许在一个请求中为多个文档返回词项向量。
- HTTP 方法:POST
- 请求 URL:POST /<index>/_mtermvectors
- 功能:获取多个文档的词向量信息。
示例:
POST /my-index/_mtermvectors
{
"docs": [
{ "_id": "1" },
{ "_id": "2" }
]
}
@Service // 声明服务类
public class ElasticsearchService {
@Autowired // 自动注入 RestHighLevelClient 实例
private RestHighLevelClient client;
// 获取多个文档的词向量方法
public List<TermVectorsResponse> multiTermVectors(String index, List<String> ids) throws IOException {
// 创建 MultiTermVectorsRequest 对象
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
// 循环将每个文档 ID 加入请求中
ids.forEach(id -> request.add(new TermVectorsRequest(index, id)));
// 执行请求,返回 MultiTermVectorsResponse 响应
MultiTermVectorsResponse response = client.mtermvectors(request, RequestOptions.DEFAULT);
// 将每个 TermVectorsResponse 提取出来
List<TermVectorsResponse> termVectorsResponses = new ArrayList<>();
response.forEach(termVectorsResponses::add);
// 返回所有文档的词向量
return termVectorsResponses;
}
}