BulkResponse bulkResponse) {}@Overridepublic void afterBulk

时间:2022-01-20 06:30:28

Elasticsearch的Bulk API允许批量提交index和delete请求,有如下两种用法:

用法1 BulkRequestBuilder requestBuilder = client.prepareBulk(); for(Person person : personList){ String obj = getIndexDataFromHotspotData(person); if(obj != null){ requestBuilder.add(client.prepareIndex("test_index","test",String.valueOf(person.getId())).setRefresh(true).setSource(obj)); } } 用法2 BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long l, BulkRequest bulkRequest) { } @Override public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) { } @Override public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { } }).setBulkActions(10000).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).setFlushInterval(TimeValue.timeValueSeconds(5)).build(); for(Person person : personList){ String obj = getIndexDataFromHotspotData(person); if(obj != null){ bulkProcessor.add(new IndexRequest("test_index","test",String.valueOf(person.getId())).source(obj)); } }

beforeBulk会在批量提交之前执行,可以从BulkRequest中获取请求信息request.requests()或者请求数量request.numberOfActions()

第一个afterBulk会在批量告成后执行,可以跟beforeBulk共同计算批量所需时间

第二个afterBulk会在批量掉败后执行

在例子中,当请求赶过10000个(default=1000)或者总巨细赶过1GB(default=5MB)时,触发批量提交行动