Elasticsearch 不支持修改已有字段的 mapping(比如修改字段类型等),如需修改,只能由用户重新创建索引并迁移数据。
思路:
1.创建新的索引库
2.创建类型和相对应mapping信息
3.使用scroll搜索方式读取原索引库的数据,并结合bulk批量写入新的索引库
4.移除原索引库上的别名,并将该别名添加到新创建的索引库上
具体实现如下:
//创建索引库
curl -XPUT '192.168.1.234:9200/zf_hg_201701?pretty'
//创建type和mapping
curl -XPUT 'http://192.168.1.234:9200/zf_hg_201701/zf_hg_e/_mapping' -d'{"zf_hg_e": {
"properties": {
"spbh":{
"type":"string",
"index":"not_analyzed"
},
"gb":{
"type":"string",
"index":"not_analyzed"
},
"myfs":{
"type":"string",
"index":"not_analyzed"
},
"jydw":{
"type":"string",
"index":"not_analyzed"
},
"jldw":{
"type":"string",
"index":"not_analyzed"
},
"sl":{
"type":"string",
"index":"not_analyzed"
},
"je":{
"type":"integer",
"index":"not_analyzed"
},
"ljsl":{
"type":"string",
"analyzer":"ik_max_word",
"search_analyzer":"ik_max_word",
"index":"not_analyzed"
},
"ljje":{
"type":"string",
"index":"not_analyzed"
},
"qnljsl":{
"type":"string",
"index":"not_analyzed"
},
"qnljje":{
"type":"string",
"index":"not_analyzed"
},
"jydwmc":{
"type":"string",
"index":"not_analyzed"
},
"es_datetime":{
"type":"string",
"index":"not_analyzed"
}
}
}
}'
java方式入库
import java.util.List; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.ClearScrollRequestBuilder; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject; /** * mapping数据类型修改不支持 * 方案,重建索引库,重建type和mapping * 数据导入将采用以下方式scoll导出导入 * 重建别名关联,删除原有数据库以及别名 * @author Administrator * */ public class DataTransferByScroll { public static final Logger LOG = LoggerFactory.getLogger(EsSearch.class); /** * 使用scroll进行搜索 * @param client */ public static String searchByScroll(Client client,String index,String type,int size,long millis,BulkProcessor bulkProcessor,String toIndex,String toType) { // 搜索条件 SearchRequestBuilder searchRequestBuilder = client.prepareSearch(); searchRequestBuilder.setIndices(index); searchRequestBuilder.setTypes(type); // searchRequestBuilder.setQuery(QueryBuilders.matchQuery("", "")); // searchRequestBuilder.setSearchType(SearchType.SCAN); searchRequestBuilder.setSize(size); searchRequestBuilder.setScroll(new TimeValue(millis)); // 执行 SearchResponse searchResponse = searchRequestBuilder.get(); System.out.println(searchResponse.getHits().totalHits()); String scrollId = searchResponse.getScrollId(); SearchHit[] searchHits = searchResponse.getHits().getHits(); for (SearchHit searchHit : searchHits) { bulkProcessor.add(new IndexRequest(toIndex, toType).source(JSONObject.toJSONString(searchHit.getSource()))); } // for System.out.println(scrollId); return scrollId; } /** * 通过滚动ID获取文档 * @param client * @param 
scrollId */ public static void searchByScrollId(Client client, String scrollId,String toIndex,String toType,long millis,BulkProcessor bulkProcessor){ TimeValue timeValue = new TimeValue(millis); SearchScrollRequestBuilder searchScrollRequestBuilder; SearchResponse response; // 结果 while (true) { searchScrollRequestBuilder = client.prepareSearchScroll(scrollId); // 重新设定滚动时间 searchScrollRequestBuilder.setScroll(timeValue); // 请求 response = searchScrollRequestBuilder.get(); System.out.println(response.getHits().getHits().length+"数量!"); // 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时 if (response.getHits().getHits().length == 0) { break; } // if // 这一批次结果 SearchHit[] searchHits = response.getHits().getHits(); for (SearchHit searchHit : searchHits) { bulkProcessor.add(new IndexRequest(toIndex, toType).source(JSONObject.toJSONString(searchHit.getSource()))); } // for // 只有最近的滚动ID才能被使用 scrollId = response.getScrollId(); } // while } /** * 清除滚动ID * @param client * @param scrollIdList * @return */ public static boolean clearScroll(Client client, List scrollIdList){ ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll(); clearScrollRequestBuilder.setScrollIds(scrollIdList); ClearScrollResponse response = clearScrollRequestBuilder.get(); return response.isSucceeded(); } /** * 清除滚动ID * @param client * @param scrollId * @return */ public static boolean clearScroll(Client client, String scrollId){ ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll(); clearScrollRequestBuilder.addScrollId(scrollId); ClearScrollResponse response = clearScrollRequestBuilder.get(); return response.isSucceeded(); } public static void main(String[] args) { //9689430 long millis =20000; String index = "zf_hg"; String type = "zf_hg_e"; String toIndex = "zf_hg_201701"; String toType = "zf_hg_e"; int size =10000; EsSearch esSearch = EsSearchFactory.getInstance(); TransportClient client =esSearch.clientElasticSearch(); BulkProcessor bulkProcessor=null; try { 
bulkProcessor = esSearch.initBulkProcessor(client); } catch (Exception e) { LOG.error(e.toString(), e); } DataTransferByScroll dataTransferByScroll =new DataTransferByScroll(); String scrollId =dataTransferByScroll.searchByScroll( client, index, type, size, millis, bulkProcessor,toIndex, toType); //scrollId不为空 if(!"".equals(scrollId)){ dataTransferByScroll.searchByScrollId( client, scrollId, toIndex, toType, millis, bulkProcessor); } } }
//修改别名
curl -XPOST localhost:9200/_aliases -d '
{
  "actions": [
    { "remove": { "alias": "zf_hg", "index": "zf_hg_2017" } },
    { "add": { "alias": "zf_hg", "index": "zf_hg_201701" } }
  ]
}