1.Index API: 创建并建立索引
PUT twitter/tweet/
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
官方文档参考:Index API。
2.Get API: 获取文档
curl -XGET 'http://localhost:9200/twitter/tweet/1'
官方文档参考:Get API。
3.DELETE API: 删除文档
$ curl -XDELETE 'http://localhost:9200/twitter/tweet/1'
官方文档参考:Delete API。
4.UPDATE API: 更新文档
PUT test/type1/1{ "counter" : 1, "tags" : ["red"]}
官方文档参考:Update API。
5.Multi Get API: 一次批量获取文档
PUT 'localhost:9200/_mget
{
"docs" :
[
{"_index" : "test",
"_type" : "type",
"_id" : ""
},
{ "_index" : "test",
"_type" : "type",
"_id" : ""
}
]
}
官方文档参考:Multi Get API。
6.Bulk API: 批量操作,增删改查
1.本地文件批量操作
e$ curl -s -XPOST localhost:/blog/user/_bulk --data-binary @requests
requests文件内容如下
{"index":{"_id":""}}
{"name":"黎明","id":}
{"index":{"_id":""}}
{"name":"小明","id":}
{"index":{"_id":""}}
{"name":"雄安","id":}
{"index":{"_id":""}}
{"name":"笑话","id":}
2.resp 方法
curl -H "Content-Type: application/json" -XPOST 'http://47.52.199.51:9200/book/english/_bulk' -d'
{"index":{"_id":""}}
{"name":"cddd","id":}
{"index":{"_id":""}}
{"name":"cddd","id":}
{"index":{"_id":""}}
{"name":"cddd","id":}
{"index":{"_id":""}}
{"name":"cddd","id":}
'
官方文档参考:Bulk API。
7.DELETE By Query API: 查询删除
POST /book/_delete_by_query
{
"query":{
"match":{
"name": "yangxioa"
}
}
}
7.1.删除所有
POST /book/_delete_by_query
{
"query":{
"match_all":{}
}
}
7.2.支持路由查询(routing=XXX,匹配分片数)
POST twitter/_delete_by_query?routing=
{
"query": {
"range" : {
"age" : {
"gte" :
}
}
}
}
{
"took" : , // 整个操作从开始到结束的毫秒数
"timed_out": false, // true如果在通过查询执行删除期间执行的任何请求超时 ,则将此标志设置为。
"total": , // 已成功处理的文档数。
"deleted": , // 已成功删除的文档数。
"batches": , // 通过查询删除拉回的滚动响应数。
"version_conflicts": , // 按查询删除的版本冲突数。
"noops": , // 对于按查询删除,此字段始终等于零。它只存在,以便通过查询删除,按查询更新和reindex API返回具有相同结构的响应。
"retries": { // 通过查询删除尝试的重试次数。bulk是重试的批量操作search的数量,是重试的搜索操作的数量。
"bulk": ,
"search":
},
"throttled_millis": , // 请求睡眠符合的毫秒数requests_per_second。
"requests_per_second": -1.0, // 在通过查询删除期间有效执行的每秒请求数。
"throttled_until_millis": , //在按查询响应删除时,此字段应始终等于零。它只在使用Task API时有意义,它指示下一次(自纪元以来的毫秒数),为了符合,将再次执行受限制的请求
"failures" : [ ]
//如果在此过程中存在任何不可恢复的错误,则会出现故障数组。如果这是非空的,那么请求因为那些失败而中止。逐个查询是使用批处理实现的,
任何故障都会导致整个进程中止,但当前批处理中的所有故障都会被收集到数组中。您可以使用该conflicts选项来防止reindex在版本冲突中中止。
}
官方文档参考:Delete By Query API。
8.update更新api
8.1.脚本更新
POST test/_doc//_update
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",// ES语言类型
"params" : {
"count" :
}
}
}
8.2.新增字段
POST test/_doc//_update
{
"script" : "ctx._source.new_field = 'value_of_new_field'"
}
8.3.删除字段
POST test/_doc//_update
{
"script" : "ctx._source.remove('new_field')"
}
8.4.存在就更新
POST test/_doc//_update
{
"script" : {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx.op = 'delete' } else { ctx.op = 'none' }",
"lang": "painless",
"params" : {
"tag" : "green"
}
}
}
8.5.更新部分字段
POST test/_doc//_update
{
"doc" : {
"name" : "new_name"
}
}
8.6.upsert:存在就更新,不存在插入
POST test/_doc//_update
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" :
}
},
"upsert" : {
"counter" :
}
}
官方文档参考:Update 脚本更新API
9.UPDATE BY QUERY API:查询更新
9.1.更新,重新索引
POST twitter/_update_by_query?conflicts=proceed
{
"took" : ,
"timed_out": false,
"updated": ,
"deleted": ,
"batches": ,
"version_conflicts": ,
"noops": ,
"retries": {
"bulk": ,
"search":
},
"throttled_millis": ,
"requests_per_second": -1.0,
"throttled_until_millis": ,
"total": ,
"failures" : [ ]
}
ES内部自带实现乐观锁控制,先查询出要更新的记录的版本号,更新时匹配版本号时候一致。
所有更新和查询失败都会导致_update_by_query
中止并failures
在响应中返回。已执行的更新仍然存在。换句话说,该过程不会回滚,只会中止。当第一个失败导致中止时,失败的批量请求返回的所有失败都将在failures
元素中返回; 因此,可能存在相当多的失败实体。
如果您只想计算版本冲突,不要导致_update_by_query
中止,您可以conflicts=proceed
在URL或"conflicts": "proceed",改配置当第一个冲突时会会继续执行,
version_conflicts冲突数量。
9.2.查询更新
POST twitter/_update_by_query?conflicts=proceed
{
"query": {
"term": {
"user": "kimchy"
}
}
}
9.3.查询脚本更新
POST twitter/_update_by_query
{
"script": {
"source": "ctx._source.likes++",
"lang": "painless"
},
"query": {
"term": {
"user": "kimchy"
}
}
}
也可以同时在多个索引和多个类型上完成这一切,就像搜索API一样:
POST twitter,blog / _doc,post / _update_by_query
routing
则路由将复制到滚动查询,将进程限制为与该路由值匹配的分片:
POST twitter/_update_by_query?routing=
默认情况下,_update_by_query
使用1000的滚动批次。可以使用scroll_size
URL参数更改批量大小:
POST twitter/_update_by_query?scroll_size=
9.4.使用TASK API获取所有正在运行的逐个查询请求的状态
GET _tasks?detailed=true&actions=*byquery
结果:
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "r1A2WoR",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : ,
"type" : "transport",
"action" : "indices:data/write/update/byquery",
"status" : {
"total" : ,
"updated" : ,
"created" : ,
"deleted" : ,
"batches" : ,
"version_conflicts" : ,
"noops" : ,
"retries": {
"bulk": ,
"search":
}
"throttled_millis":
},
"description" : ""
}
}
}
}
}
使用任务ID,您可以直接查找任务:
GET /_tasks/taskId:
可以使用任务取消API取消任何按查询更新:
POST _tasks/task_id:/_cancel
手动切片:
POST twitter/_update_by_query
{
"slice": {
"id": ,
"max":
},
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
官方文档参考:Update By Query API
10.Reindex API:重新索引
10.1.复制整个索引
最基本的形式_reindex
只是将文档从一个索引复制到另一个索引。这会将twitter
索引中的文档复制到new_twitter
索引中(前提是要有相同的索引类型):
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
10.2.复制匹配的文档
POST _reindex
{
"source": {
"index": "twitter",
"type": "_doc",
"query": {
"term": {
"user": "kimchy"
}
}
},
"dest": {
"index": "new_twitter"
}
}
10.3.复制多个索引文档
POST _reindex
{
"source": {
"index": ["book", "blog"],
"type": ["english", "user"]
},
"dest": {
"index": "book1"
}
}
ES 6.3只支持一个索引一个类型,所以上面这个并没有实验成功!提示:
"reason": "Rejecting mapping update to [book1] as the final mapping would have more than 1 type: [english, user]"
10.4.是否覆盖版本号
POST reindex
{
"source": {
"index": ["book"],
"type": ["english"]
},
"dest": {
"index": "book1",
"version_type":"external"
}
}
“external”:表示使用source的版本号覆盖dest的版本号,当source的版本号<=dest的版本号会提示冲突,“internal”:表示保持dest的版本号自增。
10.5.只复制不存在的记录,已经存在的记录提示冲突
POST _reindex
{
"source": {
"index": ["book"],
"type": ["english"]
},
"dest": {
"index": "book1",
"op_type": "create"
}
}
默认情况下,版本冲突会中止该_reindex
过程,但可以通过"conflicts": "proceed"
请求正文中的设置对它们进行计数
10.6.排序复制指定数量
POST _reindex
{
"size":,
"source": {
"index": ["book"],
"sort": { "name": "desc" }
},
"dest": {
"index": "book1",
"op_type": "create"
}
}
如果报错禁止排序:Fielddata is disabled on text fields by...
聚合这些操作用单独的数据结构(fielddata)缓存到内存里了,需要单独开启:
PUT book/_mapping/english {
"properties": {
"name": {
"type": "text",
"fielddata": true
}
}
}
10.7.复制部分字段
POST _reindex
{"source": {
"index": "book",
"_source": ["age", "name"]
},
"dest": {
"index": "book1"
}
}
10.8.过滤修改元数据再复制
POST _reindex
{
"size":,
"source": {
"index": "book",
"_source": ["age", "name"]
},
"dest": {
"index": "book1",
"routing": "=age" // 根据age进行路由
},
"script": {
"source": "if (ctx._source.age == 12) {ctx._source.age++}",
"lang": "painless"
}
}
就像在_update_by_query
,您可以设置ctx.op
更改在目标索引上执行的操作:
noop
- 设置
ctx.op = "noop"
脚本是否确定不必在目标索引中编制索引。这种无操作将noop
在响应机构的计数器中报告。 delete
-
ctx.op = "delete"
如果脚本确定必须从目标索引中删除文档,请进行 设置 。
10.9.从远程复制
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"username": "user",
"password": "pass"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
10.10.查看重建索引任务
GET _tasks?detailed=true&actions=*reindex
官方文档参考:Reindex API
11.term Vectors:分词api
11.1. term的基本信息
# term_freq:在在该字段中的频率 # position:词在该字段中的位置 # start_offset:从什么偏移量开始的 # end_offset: 到什么偏移量结束
11.2 term的统计信息
如果启用了term的统计信息,即term_statistics设为true,那么有哪些统计信息呢?
# doc_freq: 该词在文档中出现的频率 # ttf:total term frequency的缩写,一个term在所有document中出现的频率
11.3字段的统计信息
如果启用了字段统计信息,即field_statistics设为true,那么有哪些统计信息呢? # sum_doc_freq: 一个字段中所有term的文档频率之和 # doc_count: 有多少个文档包含这个字段 # sum_ttf:sum total term frequency的缩写,一个字段中的每一个term的在所有文档出现之和 term statistics和field statistics并不精准,不会被考虑有的doc可能被删除了
11.5采集term信息的方式
采集term信息的方式有两种:index-time(从已经存储的索引中查看) 和 query-time(及时生成)
11.6 index-time方式
需要在mapping配置一下,然后建立索引的时候,就直接生成这些词条和文档的统计信息
PUT /website { "mappings": { "article":{ "properties":{ "text":{ "type": "text", "term_vector": "with_positions_offsets", "store": "true", "analyzer" : "fulltext" } } } }, "settings": { "analysis": { "analyzer": { "fulltext":{ "type": "custom", "tokenizer": "whitespace", "filter": [ "lowercase", "type_as_payload" ] } } } } }
11.7 query-time方式
即之前没有在mapping里配置过,而是通过查询的方式产生这些统计信息
POST /ecommerce/music//_termvectors { "fields":["desc"], "offsets":true, "payloads":true, "positions":true, "term_statistics":true, "field_statistics" : true }
11.8 手动指定analyzer来生成termvector
我么可以通过指定per_field_analyzer设置一个分词器对该字段文本进行分词。
POST /ecommerce/music//_termvectors { "fields":["desc"], "offsets":true, "payloads":true, "positions":true, "term_statistics":true, "field_statistics" : true, "per_field_analyzer":{ "text":"standard" } }
11.9 在线文档及时生成termvector
POST book/english/_termvectors
{
"doc" : {
"name" : "hellow word",
"text" : "twitter test test test"
},
"fields": ["name"],
"per_field_analyzer" : {
"name":"standard"
}
}
response
{
"_index": "book",
"_type": "english",
"_version": ,
"found": true,
"took": ,
"term_vectors": {
"name": {
"field_statistics": {
"sum_doc_freq": ,
"doc_count": ,
"sum_ttf":
},
"terms": {
"hellow": {
"term_freq": ,
"tokens": [
{
"position": ,
"start_offset": ,
"end_offset":
}
]
},
"word": {
"term_freq": ,
"tokens": [
{
"position": ,
"start_offset": ,
"end_offset":
}
]
}
}
}
}
}
11.10 term的统计信息
我们可以根据term的统计信息,过滤出我么想看的统计结果,比如过滤掉一些出现频率过低的term,比如我要过滤出该字段最多只有10个term,而且那些term在该字段中出现的频率为2,且
POST /ecommerce/music//_termvectors { "fields":["desc"],
"offsets":true,
"payloads":true,
"positions":true,
"term_statistics":true,
"field_statistics" : true,
"filter":{
"max_num_terms":, // 返回的最大分词输
"min_term_freq" : , // 忽略低于源文档中出现的次数
"min_doc_freq" : 1 // 忽略低于所有文档中出现的次数
}
}
11.11 term过滤参数说明
max_num_terms:每个字段必须返回的最大分词数。默认为25。 min_term_freq:忽略源文档中低于此频率的单词。默认为1。
max_term_freq:忽略源文档中超过此频率的单词。默认为无限制。 min_doc_freq:忽略至少在这么多文档中没有出现的分词。默认为1。
max_doc_freq:忽略超过这么多文档中出现的单词。默认为无限制。 min_word_length:最小字长,低于该字长将被忽略。默认为0。
max_word_length:最大字长,高于该字长将被忽略。默认为unbounded()。
官方文档参考:Term Vector Api
12 批量返回分词:Multi termvectors API
采集term信息的方式有两种:index-time(从已经存储的索引中查看) 和 query-time(及时生成)
12.1 index-time
POST /_mtermvectors
{
"docs": [
{
"_index": "twitter",
"_type": "_doc",
"_id": "",
"term_statistics": true
},
{
"_index": "twitter",
"_type": "_doc",
"_id": "",
"fields": [
"message"
]
}
]
}
url中指定索引:
POST /twitter/_mtermvectors
{
"docs": [
{
"_type": "_doc",
"_id": "",
"fields": [
"message"
],
"term_statistics": true
},
{
"_type": "_doc",
"_id": ""
}
]
}
url中指定索引类型:
POST /twitter/_doc/_mtermvectors
{
"docs": [
{
"_id": "",
"fields": [
"message"
],
"term_statistics": true
},
{
"_id": ""
}
]
}
如果索引类型和字段都相同:
POST /twitter/_doc/_mtermvectors
{
"ids" : ["", ""],
"parameters": {
"fields": [
"message"
],
"term_statistics": true
}
}
12.2及时批量生成
POST_mtermvectors
{
"docs": [
{
"_index": "book",
"_type": "english",
"doc" : {
"name" : "John Doe",
"message" : "twitter test test test"
},
"fields": ["name"],
"per_field_analyzer" : {
"name":"standard"
}
},
{
"_index": "book",
"_type": "english",
"doc" : {
"name" : "Jane Doe",
"message" : "Another twitter test ..."
},
"fields": ["name"],
"per_field_analyzer" : {
"name":"standard"
}
}
]
}
response:
{
"docs": [
{
"_index": "book",
"_type": "english",
"_version": ,
"found": true,
"took": ,
"term_vectors": {
"name": {
"field_statistics": {
"sum_doc_freq": ,
"doc_count": ,
"sum_ttf":
},
"terms": {
"doe": {
"term_freq": ,
"tokens": [
{
"position": ,
"start_offset": ,
"end_offset":
}
]
},
"john": {
"term_freq": ,
"tokens": [
{
"position": ,
"start_offset": ,
"end_offset":
}
]
}
}
}
}
},
{
"_index": "book",
"_type": "english",
"_version": ,
"found": true,
"took": ,
"term_vectors": {
"name": {
"field_statistics": {
"sum_doc_freq": ,
"doc_count": ,
"sum_ttf":
},
"terms": {
"doe": {
"term_freq": ,
"tokens": [
{
"position": ,
"start_offset": ,
"end_offset":
}
]
},
"jane": {
"term_freq": ,
"tokens": [
{
"position": ,
"start_offset": ,
"end_offset":
}
]
}
}
}
}
}
]
}
12.2.返回该索引全部文档的分词统计
POST book/_search
{
"size" : 0,
"aggs" : {
"messages" : {
"terms" : {
"size" : 10,
"field" : "name"
}
}
}
}
官方文档参考:Multi termvectors API
13.?refresh
ES的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为IO的操作是比较费时的。
- 先写到内存中,此时不可搜索
- 默认经过 1s 之后会(refresh)被写入 lucene 的底层文件 segment 中 ,此时可以搜索到
- flush之后才会写入磁盘
以上过程由于随时可能被中断导致数据丢失,所以每一个过程都会有 translog 记录,如果中间有任何一步失败了,等服务器重启之后就会重试,保证数据写入。translog也是先存在内存里的,然后默认5秒刷一次写到硬盘里。
在 index ,Update , Delete , Bulk 等操作中,可以设置 refresh 的值。取值如下:
13.1.refresh=true
更新数据之后,立刻对相关的分片(包括副本) 刷新,这个刷新操作保证了数据更新的结果可以立刻被搜索到。
13.2.refresh=wait_for
这个参数表示,刷新后返回。刷新不会立刻进行,而是等待一段时间才刷新 ( index.refresh_interval
),默认时间是 1 秒。刷新时间间隔可以通过index 的配置动态修改。或者直接手动刷新 POST /twitter/_refresh
13.3.refresh=false
refresh 的默认值,立即返回。更新数据之后不立刻刷新,在返回结果之后的某个时间点会自动刷新,也就是随机的,看es服务器的运行情况。
那么选择哪种刷新方式?
wait_for 和 true 对比,前者每次会积累一定的工作量再去刷新
true 是低效的,因为每次实时刷新会产生很小的 segment,随后这些零碎的小段会被合并到效率更高的大 segment 中。也就是说使用 true 的代价在于,在 index 阶段会创建这些小的 segment,在搜索的时候也是搜索这些小的 segment,在合并的时候去将小的 segment 合并到大的 segment 中
不要在多个请求中对每一条数据都设置
refresh=wait_for
, 用bulk 去批量更新,然后在单个的请求中设置refresh=wait_for
会好一些如果
index.refresh_interval: -1
,将会禁用刷新,那带上了refresh=wait_for
参数的请求实际上刷新的时间是未知的。如果index.refresh_interval
的值设置的比默认值( 1s )更小,比如 200 ms,那带上了refresh=wait_for
参数的请求将很快刷新,但是仍然会产生一些低效的segment。refresh=wait_for
只会影响到当前需要强制刷新的请求,refresh=true
却会影响正在处理的其他请求。所以如果想尽可能小的缩小影响范围时,应该用refresh=wait_for
官方文档参考:Refresh api