1. Bulk operation types
1.1 Bulk create
Syntax 1: the index action. It either creates a document or fully replaces an existing one (similar to a regular PUT).
POST /_bulk
{"index":{"_index":"test_index","_type":"test_type","_id":"12"}}
{"score_num":86,"tags":"my love"}
Result
{
"took": 79,
"errors": false,
"items": [
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 10,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 14,
"_primary_term": 6,
"status": 200
}
}
]
}
Query to verify that the document was written
GET test_index/test_type/12
Result:
{
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 10,
"found": true,
"_source": {
"score_num": 86,
"tags": "my love"
}
}
Now change the content of the tags field
POST /_bulk
{"index":{"_index":"test_index","_type":"test_type","_id":"12"}}
{"score_num":86,"tags":"you love"}
Result
{
"took": 73,
"errors": false,
"items": [
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 11,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 15,
"_primary_term": 6,
"status": 200
}
}
]
}
Verify that the change took effect
GET test_index/test_type/12
Result
{
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 11,
"found": true,
"_source": {
"score_num": 86,
"tags": "you love"
}
}
The update succeeded!
The examples above write a single document. Now let's bulk-add two documents as a test.
POST /_bulk
{"index":{"_index":"test_index","_type":"test_type","_id":"12"}}
{"score_num":86,"tags":"you love"}
{"index":{"_index":"test_index","_type":"test_type","_id":"13"}}
{"score_num":89,"tags":"my love"}
Result
{
"took": 183,
"errors": false,
"items": [
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 14,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 18,
"_primary_term": 6,
"status": 201
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "13",
"_version": 3,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 9,
"_primary_term": 6,
"status": 201
}
}
]
}
Verify with a query
GET test_index/test_type/_mget
{
"ids":[12,13]
}
Result:
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 14,
"found": true,
"_source": {
"score_num": 86,
"tags": "you love"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "13",
"_version": 3,
"found": true,
"_source": {
"score_num": 89,
"tags": "my love"
}
}
]
}
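Syntax 2: the create action. It forces creation and fails if the id already exists, without affecting the other operations in the same request (similar to PUT /index/type/id/_create).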
POST /_bulk
{ "create": { "_index":"test_index", "_type": "test_type", "_id": "12" }}
{ "score_num": 68, "tags":"my love" }
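To demonstrate, bulk-create two documents with id=13 and id=14. Because id=13 already exists, that operation should fail, while id=14 should be created. Let's check whether that is what happens.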
POST /_bulk
{ "create": { "_index":"test_index", "_type": "test_type", "_id": "13" }}
{ "score_num": 68, "tags":"my love" }
{ "create": { "_index":"test_index", "_type": "test_type", "_id": "14" }}
{ "score_num": 60, "tags":"dog love" }
Result:
{
"took": 101,
"errors": true,
"items": [
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "13",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[test_type][13]: version conflict, document already exists (current version [3])",
"index_uuid": "6lY0aliBRTitpKfC5N4vdQ",
"shard": "3",
"index": "test_index"
}
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "14",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 3,
"_primary_term": 6,
"status": 201
}
}
]
}
As the response shows, id=13 failed to create while id=14 was created successfully.
Use mget to check whether id=13 still holds score_num=89
GET test_index/test_type/_mget
{
"ids":[13,14]
}
Result
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "13",
"_version": 3,
"found": true,
"_source": {
"score_num": 89,
"tags": "my love"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "14",
"_version": 1,
"found": true,
"_source": {
"score_num": 60,
"tags": "dog love"
}
}
]
}
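The response above can also be checked programmatically rather than by eye. The following is a minimal Python sketch, assuming the bulk response has already been parsed into a dict; the helper name failed_items is illustrative and not part of any Elasticsearch client. It relies only on the fields visible in the response above: the top-level errors flag, and the per-item status and error entries.

```python
def failed_items(bulk_response):
    """Return (action, _id, status, error type) for every failed item."""
    failures = []
    for item in bulk_response.get("items", []):
        # Each item is a single-key dict such as {"create": {...}}.
        for action, result in item.items():
            if "error" in result:
                failures.append(
                    (action, result["_id"], result["status"], result["error"]["type"])
                )
    return failures


# Trimmed copy of the response shown above (only the fields used here).
response = {
    "took": 101,
    "errors": True,
    "items": [
        {"create": {"_index": "test_index", "_type": "test_type", "_id": "13",
                    "status": 409,
                    "error": {"type": "version_conflict_engine_exception"}}},
        {"create": {"_index": "test_index", "_type": "test_type", "_id": "14",
                    "_version": 1, "result": "created", "status": 201}},
    ],
}

# The top-level flag is a cheap first check; the items say which operation failed.
if response["errors"]:
    for failure in failed_items(response):
        print(failure)
```

Checking the errors flag first avoids walking the items list when every operation succeeded.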
1.2 Bulk delete
Syntax:
POST /_bulk
{"delete":{"_index":"test_index","_type":"test_type","_id":"12"}}
{"delete":{"_index":"test_index","_type":"test_type","_id":"13"}}
Result
{
"took": 212,
"errors": false,
"items": [
{
"delete": {
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"_version": 15,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 19,
"_primary_term": 6,
"status": 200
}
},
{
"delete": {
"_index": "test_index",
"_type": "test_type",
"_id": "13",
"_version": 4,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 10,
"_primary_term": 6,
"status": 200
}
}
]
}
The response reports both documents as deleted; verify:
GET test_index/test_type/_mget
{
"ids":[12,13]
}
Result
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "12",
"found": false
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "13",
"found": false
}
]
}
Both documents are indeed gone!
1.3 Bulk update
An update is either a full replacement or a partial update.
Full replacement syntax (uses the index action):
POST /_bulk
{"index":{"_index":"test_index","_type":"test_type","_id":"12"}}
{"score_num":86,"tags":"you love"}
{"index":{"_index":"test_index","_type":"test_type","_id":"13"}}
{"score_num":89,"tags":"my love"}
This was demonstrated above, so it is not repeated here.
Partial update syntax (uses the update action):
POST /_bulk
{"update":{"_index":"test_index","_type":"test_type","_id":"14"}}
{"doc":{"score_num":100,"tags":"my love"}}
Note: doc is a keyword. The fields to change must be wrapped in it.
Result
{
"took": 104,
"errors": false,
"items": [
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "14",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 4,
"_primary_term": 6,
"status": 200
}
}
]
}
Verify
GET test_index/test_type/14
Result
{
"_index": "test_index",
"_type": "test_type",
"_id": "14",
"_version": 2,
"found": true,
"_source": {
"score_num": 100,
"tags": "my love"
}
}
The request above updated every field of id=14. Now partially update only the tags field.
POST /_bulk
{"update":{"_index":"test_index","_type":"test_type","_id":"14"}}
{"doc":{"tags":"you love"}}
Result
{
"took": 106,
"errors": false,
"items": [
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "14",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 5,
"_primary_term": 6,
"status": 200
}
}
]
}
Verify that only the tags field changed
GET test_index/test_type/14
Result:
{
"_index": "test_index",
"_type": "test_type",
"_id": "14",
"_version": 3,
"found": true,
"_source": {
"score_num": 100,
"tags": "you love"
}
}
The partial update succeeded!
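The difference between full replacement and partial update can be modelled locally. The sketch below is a plain-Python illustration of the semantics only, not Elasticsearch's implementation: the names apply_index, apply_update and the in-memory store are hypothetical. It assumes that index replaces the stored _source wholesale, while update with doc merges the given fields into the existing document (nested objects merged recursively).

```python
import copy


def apply_index(store, doc_id, source):
    """Model of the index action: the stored _source is replaced wholesale."""
    store[doc_id] = copy.deepcopy(source)


def merge(target, partial):
    """Merge partial into target in place; nested dicts are merged recursively."""
    for key, value in partial.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            merge(target[key], value)
        else:
            target[key] = copy.deepcopy(value)


def apply_update(store, doc_id, doc):
    """Model of the update action with "doc": only the given fields change."""
    if doc_id not in store:
        raise KeyError("document %s does not exist" % doc_id)
    merge(store[doc_id], doc)


store = {}
apply_index(store, "14", {"score_num": 100, "tags": "my love"})

# Partial update: score_num is untouched, only tags changes.
apply_update(store, "14", {"tags": "you love"})
print(store["14"])  # {'score_num': 100, 'tags': 'you love'}

# Full replacement: any field not supplied again is lost.
apply_index(store, "14", {"tags": "my love"})
print(store["14"])  # {'tags': 'my love'}
```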
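Note that bulk create and bulk update each take two JSON lines per operation (an action line followed by a data line), while bulk delete takes only the action line.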
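2. Notes
The bulk API is strict about JSON syntax: each JSON object must sit on a single line with no line breaks inside it, and consecutive JSON objects must be separated by a newline. If any one operation in a bulk request fails, the other operations are not affected; the failure is reported in the corresponding item of the response.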
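These formatting rules are easy to get wrong when a request body is assembled by hand, so it can help to generate it. Below is a minimal Python sketch using only the standard json module; the helper name build_bulk_body and the (action, metadata, payload) tuple layout are assumptions made for illustration. It writes one compact JSON object per line, skips the data line for delete, and ends the body with a newline, which the bulk endpoint expects after the last line.

```python
import json


def build_bulk_body(operations):
    """Serialize (action, metadata, payload) tuples into a bulk request body.

    payload is the second JSON line: the document for index/create,
    {"doc": {...}} for update, and None for delete (which has no second line).
    """
    lines = []
    for action, meta, payload in operations:
        # json.dumps without indent never emits a raw line break, so every
        # object stays on exactly one line.
        lines.append(json.dumps({action: meta}, separators=(",", ":")))
        if action == "delete":
            continue
        if payload is None:
            raise ValueError("%s needs a second JSON line" % action)
        lines.append(json.dumps(payload, separators=(",", ":")))
    # Objects are separated by a newline and the body ends with one.
    return "\n".join(lines) + "\n"


target = {"_index": "test_index", "_type": "test_type"}
body = build_bulk_body([
    ("index", dict(target, _id="12"), {"score_num": 86, "tags": "you love"}),
    ("update", dict(target, _id="14"), {"doc": {"tags": "you love"}}),
    ("delete", dict(target, _id="13"), None),
])
print(body, end="")
```

The resulting string can be sent as the body of POST /_bulk with whatever HTTP client is in use.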
3. Specifying the index
First delete the existing data, then run a search to confirm that nothing is left
GET _search
{
"query": {
"match_all": {}
}
}
Result
{
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
OK, now let's demonstrate bulk create, update and delete with the index specified in the URL.
3.1 Bulk create with a specified index
create syntax:
POST test_index/_bulk
{"create":{"_type":"test_type","_id":"1"}}
{"score_num":90,"tags":"my love"}
{"create":{"_type":"test_type","_id":"2"}}
{"score_num":80,"tags":"you love"}
Result
{
"took": 177,
"errors": false,
"items": [
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 12,
"_primary_term": 6,
"status": 201
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 6,
"status": 201
}
}
]
}
Check that the documents were added:
GET test_index/test_type/_mget
{
"ids":[1,2]
}
Result:
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"score_num": 90,
"tags": "my love"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 1,
"found": true,
"_source": {
"score_num": 80,
"tags": "you love"
}
}
]
}
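index syntax (creates a document, or fully replaces it if it already exists). Note that the request recorded below uses the create action; for index semantics, write index in place of create in each action line: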
POST test_index/_bulk
{"create":{"_type":"test_type","_id":"3"}}
{"score_num":99,"tags":"xiaoming love"}
{"create":{"_type":"test_type","_id":"4"}}
{"score_num":89,"tags":"xiaohong love"}
Result
{
"took": 171,
"errors": false,
"items": [
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 4,
"_primary_term": 6,
"status": 201
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 6,
"status": 201
}
}
]
}
Check that the documents were added
GET test_index/test_type/_mget
{
"ids":[3,4]
}
Result:
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"found": true,
"_source": {
"score_num": 99,
"tags": "xiaoming love"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"found": true,
"_source": {
"score_num": 89,
"tags": "xiaohong love"
}
}
]
}
3.2 Bulk update with a specified index
partial update syntax:
POST test_index/_bulk
{"update":{"_type":"test_type","_id":"1"}}
{"doc":{"score_num":60}}
{"update":{"_type":"test_type","_id":"2"}}
{"doc":{"score_num":59}}
Result
{
"took": 196,
"errors": false,
"items": [
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 13,
"_primary_term": 6,
"status": 200
}
},
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 6,
"status": 200
}
}
]
}
Query to check that the scores were updated
GET test_index/test_type/_mget
{
"ids":[1,2]
}
Result:
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"score_num": 60,
"tags": "my love"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 2,
"found": true,
"_source": {
"score_num": 59,
"tags": "you love"
}
}
]
}
The partial update succeeded!
Full replacement is not demonstrated again here; see the earlier example.
3.3 Bulk delete with a specified index
Syntax:
POST test_index/_bulk
{"delete":{"_type":"test_type","_id":"3"}}
{"delete":{"_type":"test_type","_id":"4"}}
Result
{
"took": 143,
"errors": false,
"items": [
{
"delete": {
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 5,
"_primary_term": 6,
"status": 200
}
},
{
"delete": {
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 3,
"_primary_term": 6,
"status": 200
}
}
]
}
Verify that the deletion succeeded
GET test_index/test_type/_mget
{
"ids":[3,4]
}
Result:
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"found": false
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"found": false
}
]
}
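Bulk delete with a specified index succeeded!
4. Specifying the index and type
This is not demonstrated here; the syntax is much the same as when only the index is specified.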
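For this Elasticsearch version the request would take the form POST test_index/test_type/_bulk, with action lines carrying only _id. The Python sketch below models how the target of each operation is resolved, on the assumption that values in the URL act as defaults and that an action line may still override them explicitly. The function name resolve_target is hypothetical and exists only to illustrate the rule.

```python
def resolve_target(action_meta, url_index=None, url_type=None):
    """Return the (index, type) a bulk item is applied to.

    Values given in the action line win; otherwise the ones from the URL
    (/{index}/_bulk or /{index}/{type}/_bulk) are used as defaults.
    """
    index = action_meta.get("_index", url_index)
    doc_type = action_meta.get("_type", url_type)
    if index is None:
        raise ValueError("no index in the action line and none in the URL")
    return index, doc_type


# POST test_index/test_type/_bulk with {"delete": {"_id": "3"}}
print(resolve_target({"_id": "3"}, "test_index", "test_type"))

# POST test_index/_bulk with {"delete": {"_type": "test_type", "_id": "3"}}
print(resolve_target({"_type": "test_type", "_id": "3"}, "test_index"))
```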
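5. Optimal bulk size
A bulk request is loaded into memory in full, so an oversized request actually degrades performance. The best bulk size therefore has to be found by repeated testing. A common starting point is 1,000 to 5,000 documents per request, increased gradually. Measured by size, a request body of 5 to 15 MB is usually a good range.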
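One way to apply this advice is to split a large stream of operations into batches capped by both document count and approximate body size. The following Python sketch assumes each operation is a tuple of its JSON lines (an action dict plus an optional data dict); the function name chunk_operations and the default limits of 1,000 documents and 5 MB are illustrative starting values to tune, not recommendations from Elasticsearch itself.

```python
import json


def chunk_operations(operations, max_docs=1000, max_bytes=5 * 1024 * 1024):
    """Yield lists of operations, each within the count and size limits.

    The size is an estimate: the UTF-8 length of each serialized line plus
    one byte for its trailing newline.
    """
    batch, batch_bytes = [], 0
    for op in operations:
        op_bytes = sum(len(json.dumps(line).encode("utf-8")) + 1 for line in op)
        # Close the current batch before it would exceed either limit.
        # A single oversized operation still gets a batch of its own.
        if batch and (len(batch) >= max_docs or batch_bytes + op_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(op)
        batch_bytes += op_bytes
    if batch:
        yield batch


ops = [({"index": {"_id": str(i)}}, {"score_num": i}) for i in range(2500)]
print([len(b) for b in chunk_operations(ops)])  # [1000, 1000, 500]
```

Starting from these limits, the batch size can be raised step by step while watching the took value and memory use until throughput stops improving.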