Elasticsearch的Bulk API允许批量提交index和delete请求,有如下两种用法:
用法1
BulkRequestBuilder requestBuilder = client.prepareBulk();
for(Person person : personList){
String obj = getIndexDataFromHotspotData(person);
if(obj != null){
requestBuilder.add(client.prepareIndex("test_index","test",String.valueOf(person.getId())).setRefresh(true).setSource(obj));
}
}
用法2
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
}).setBulkActions(10000).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).setFlushInterval(TimeValue.timeValueSeconds(5)).build();
for(Person person : personList){
String obj = getIndexDataFromHotspotData(person);
if(obj != null){
bulkProcessor.add(new IndexRequest("test_index","test",String.valueOf(person.getId())).source(obj));
}
}
- beforeBulk会在批量提交之前执行,可以从BulkRequest中获取请求信息request.requests()或者请求数量request.numberOfActions()
- 第一个afterBulk会在批量成功后执行,可以跟beforeBulk配合计算批量所需时间
- 第二个afterBulk会在批量失败后执行
- 在例子中,当请求超过10000个(default=1000)或者总大小超过1GB(default=5MB)时,触发批量提交动作