Elasticsearch的CRUD:REST与Java API

时间:2022-06-29 13:08:25

CRUD(Create, Retrieve, Update, Delete)是数据库系统的四种基本操作,分别表示创建、查询、更改、删除,俗称“增删改查”。Elasticsearch作为NoSQL数据库(虽然ES是为搜索引擎而生的,但我更愿意将其看作带有强大文本搜索功能的NoSQL)。

以下示例基于Elasticsearch 2.4版本。

Create

在默认情况下,ES的REST接口的端口号为9200,对接Java client的端口号为9300。

Create操作为向index中索引文档,若index不存在则ES会自动创建;

$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{<json data>}'

Java API("org.elasticsearch" % "elasticsearch" % "2.4.1")通过TransportClient与ES集群连接,CRUD操作便是基于此而实现的。

final Settings settings = Settings.settingsBuilder()
.put("client.transport.sniff", true)
.put("client.transport.ping_timeout", 20, TimeUnit.SECONDS)
.put("client", true)
.put("data", false)
.put("cluster.name", "<cluster name>")
.build(); Client client = TransportClient.builder()
.settings(settings).build()
.addTransportAddresses(
new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

Index Java API创建index或索引document:

import org.elasticsearch.action.index.IndexResponse;

IndexResponse response = client.prepareIndex("twitter", "tweet")
.setSource(documentJson)
.get();

Retrieve

ES的查询DSL大致可以分为两种:

  • Query DSL,主要配合bool、match等使用,相当于SQL中的where子句;
  • Aggregations,相当于SQL中的group by部分,细分为如下三类
  1. Bucketing,聚合函数只能是count(*),表示的是doc命中数,可以嵌套子aggs;
  2. Metric,相比于Bucketing其非常灵活,可配合avgmaxsum等聚合函数,但是不能嵌套子aggs;
  3. Pipeline,以其他aggs的结果作为输入,而不是直接在文档集合上进行操作。

ES的Query DSL功能实在是强大,在本文短短的篇幅中很难阐述完全,故只列举了两个简单实例。在以前的项目中,我使用过1.7版本ES,后来发现2.0.0-beta1版本及之后DSL语法发生很大的变化,比如filteredandor等被废弃掉了,而被bool取而代之;对应的Java API支持链式操作,与Java 8配合写起来非常舒服。

REST通过_search接口进行DSL查询:

$ curl -XGET 'localhost:9200/<index>/_search?pretty' -d'{<dsl>}'

实战List<List<String>> idsList作为过滤条件,其中内一层为and关系、内二层为or关系;然后多字段(为bucketSizeMap的key)aggs,Java 8实现:

BoolQueryBuilder mustQueryBuilder = boolQuery();
if (!(idsList.size() == 1 && idsList.get(0).isEmpty())) {
mustQueryBuilder = idsList.stream().reduce(
boolQuery(),
(mustQB, ids) -> {
BoolQueryBuilder shouldQB = ids.stream().reduce(boolQuery(),
(qb, id) -> qb.should(termQuery(SearchSystem.getEsType(id, idMap), id)),
BoolQueryBuilder::should);
return mustQB.must(shouldQB);
},
BoolQueryBuilder::must);
}
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(mustQueryBuilder); for (Map.Entry<String, Integer> entry : bucketSizeMap.entrySet()) {
AggregationBuilder aggregationBuilder = AggregationBuilders
.terms(entry.getKey())
.field(entry.getKey()).size(entry.getValue());
searchRequestBuilder.addAggregation(aggregationBuilder);
}
SearchResponse response = searchRequestBuilder.execute().actionGet();

Bucket Aggregations支持filter aggs,即满足过滤条件后做aggs,

aggs:
<aggs_name>:
filter:
aggs:

其与filter query + aggs在功能上是等价的,

query:
bool:
filter:
aggs:

但是,经测试发现filter query + aggs是比filter aggs查询要快。

Update

update为document级别的操作,即仅支持对某个具体document进行更新;REST通过_update接口:

$ curl -XPOST 'localhost:9200/<_index>/<_type>/<_id>/_update' -d '{<data>}'

Java API则有两种实现方式:UpdateRequest + updateprepareUpdate

// case 1
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get(); // case 2
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.get();

Delete

delete操作通常都伴随着检查index是否存在(exist),exist的RESTful接口与Java API分别如下:

$ curl -XHEAD -i 'http://localhost:9200/twitter'
client.admin().indices()
.prepareExists(indexName)
.execute().actionGet().isExists();

ES提供了三种粗细粒度的删除操作:

  • 删除整个index;
  • 删除index中某一type;
  • 删除特定的document.

RESTful接口:

-- delete complete index
$ curl -XDELETE 'http://localhost:9200/<indexname>'
-- delete a type in index
$ curl -XDELETE 'http://localhost:9200/<indexname>/<typename>'
-- delete a particular document
$ curl -XDELETE 'http://localhost:9200/<indexname>/<typename>/<documentId>

Java API实现:

// delete complete index
client.admin().indices().delete(new DeleteIndexRequest("<indexname>")).actionGet(); // delete a type in index
client.prepareDelete().setIndex("<indexname>").setType("<typename>").setId("*").execute().actionGet(); // delete a particular document
client.prepareDelete().setIndex("<indexname>").setType("<typename>").setId("<documentId>").execute().actionGet(); // or
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.execute()
.actionGet();