es修改mapping类型

时间:2021-10-07 23:45:21

es不支持修改已有字段的mapping(比如修改字段类型等),需要用户重新创建索引并迁移数据。

思路:

1.创建新的索引库

2.创建类型和相对应mapping信息

3.使用scroll搜索方式读取原索引库的数据,并结合bulk批量写入新的索引库

4.移除原索引库上的别名,并将该别名添加到新创建的索引库上

具体实现如下:

# Create the new index (the URL must not contain whitespace before the query string).
curl -XPUT 'http://192.168.1.234:9200/zf_hg_201701?pretty'
# Create the type zf_hg_e and its mapping on the new index.
# NOTE(review): "ljsl" declares both an ik_max_word analyzer and "index": "not_analyzed";
# confirm which of the two is actually intended.
curl -XPUT 'http://192.168.1.234:9200/zf_hg_201701/zf_hg_e/_mapping' -d '{
  "zf_hg_e": {
    "properties": {
      "spbh":        { "type": "string",  "index": "not_analyzed" },
      "gb":          { "type": "string",  "index": "not_analyzed" },
      "myfs":        { "type": "string",  "index": "not_analyzed" },
      "jydw":        { "type": "string",  "index": "not_analyzed" },
      "jldw":        { "type": "string",  "index": "not_analyzed" },
      "sl":          { "type": "string",  "index": "not_analyzed" },
      "je":          { "type": "integer", "index": "not_analyzed" },
      "ljsl": {
        "type": "string",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "index": "not_analyzed"
      },
      "ljje":        { "type": "string",  "index": "not_analyzed" },
      "qnljsl":      { "type": "string",  "index": "not_analyzed" },
      "qnljje":      { "type": "string",  "index": "not_analyzed" },
      "jydwmc":      { "type": "string",  "index": "not_analyzed" },
      "es_datetime": { "type": "string",  "index": "not_analyzed" }
    }
  }
}'
java方式入库

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
/**
 * Copies every document of one index/type into another index/type.
 *
 * <p>Changing the data type of an existing mapping is not supported, so the
 * approach is: rebuild the index, rebuild the type and its mapping, move the
 * data with a scroll search feeding a {@link BulkProcessor}, then re-point the
 * alias and drop the old index.
 *
 * @author Administrator
 */
public class DataTransferByScroll {
    public static final Logger LOG = LoggerFactory.getLogger(DataTransferByScroll.class);

    /** Upper bound, in minutes, on waiting for buffered bulk requests when shutting down. */
    private static final long BULK_CLOSE_TIMEOUT_MINUTES = 10;

    /**
     * Opens a scroll search over {@code index}/{@code type} and queues the first
     * page of hits for indexing into {@code toIndex}/{@code toType}.
     *
     * <p>No query is set on the request, so the search itself applies no filter.
     *
     * @param client        client used to run the search
     * @param index         index (or alias) to read from
     * @param type          type to read from
     * @param size          page size requested for the scroll
     * @param millis        scroll keep-alive in milliseconds
     * @param bulkProcessor processor that receives one index request per hit
     * @param toIndex       index to write to
     * @param toType        type to write to
     * @return the scroll id returned by the initial search
     */
    public static String searchByScroll(Client client, String index, String type, int size, long millis,
            BulkProcessor bulkProcessor, String toIndex, String toType) {
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
        searchRequestBuilder.setIndices(index);
        searchRequestBuilder.setTypes(type);
        searchRequestBuilder.setSize(size);
        searchRequestBuilder.setScroll(new TimeValue(millis));

        SearchResponse searchResponse = searchRequestBuilder.get();
        LOG.info("total hits: {}", searchResponse.getHits().totalHits());

        String scrollId = searchResponse.getScrollId();
        addHitsToBulk(searchResponse.getHits().getHits(), bulkProcessor, toIndex, toType);
        LOG.info("scroll id: {}", scrollId);
        return scrollId;
    }

    /**
     * Pulls the remaining pages of a scroll and queues every hit for indexing
     * into {@code toIndex}/{@code toType}. Stops at the first empty page.
     *
     * <p>When the loop ends (normally or by exception) the most recent scroll id
     * is cleared on a best-effort basis, so callers do not need to clear it.
     *
     * @param client        client used to run the scroll requests
     * @param scrollId      scroll id returned by the initial search
     * @param toIndex       index to write to
     * @param toType        type to write to
     * @param millis        scroll keep-alive in milliseconds, renewed on every request
     * @param bulkProcessor processor that receives one index request per hit
     */
    public static void searchByScrollId(Client client, String scrollId, String toIndex, String toType, long millis,
            BulkProcessor bulkProcessor) {
        TimeValue keepAlive = new TimeValue(millis);
        String currentScrollId = scrollId;
        try {
            while (true) {
                SearchScrollRequestBuilder scrollRequest = client.prepareSearchScroll(currentScrollId);
                // Renew the keep-alive on every round trip.
                scrollRequest.setScroll(keepAlive);
                SearchResponse response = scrollRequest.get();

                // Only the most recent scroll id may be used for the next request.
                if (response.getScrollId() != null) {
                    currentScrollId = response.getScrollId();
                }

                SearchHit[] searchHits = response.getHits().getHits();
                LOG.info("{}数量!", searchHits.length);
                // An empty page means the scroll is exhausted.
                if (searchHits.length == 0) {
                    break;
                }
                addHitsToBulk(searchHits, bulkProcessor, toIndex, toType);
            }
        } finally {
            releaseScrollQuietly(client, currentScrollId);
        }
    }

    /**
     * Clears a list of scroll ids.
     *
     * @param client       client used to send the request
     * @param scrollIdList scroll ids to clear
     * @return the {@code isSucceeded()} flag of the clear-scroll response
     */
    public static boolean clearScroll(Client client, List<String> scrollIdList) {
        ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
        clearScrollRequestBuilder.setScrollIds(scrollIdList);
        ClearScrollResponse response = clearScrollRequestBuilder.get();
        return response.isSucceeded();
    }

    /**
     * Clears a single scroll id.
     *
     * @param client   client used to send the request
     * @param scrollId scroll id to clear
     * @return the {@code isSucceeded()} flag of the clear-scroll response
     */
    public static boolean clearScroll(Client client, String scrollId) {
        ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
        clearScrollRequestBuilder.addScrollId(scrollId);
        ClearScrollResponse response = clearScrollRequestBuilder.get();
        return response.isSucceeded();
    }

    /**
     * Entry point: copies index/type {@code zf_hg/zf_hg_e} into
     * {@code zf_hg_201701/zf_hg_e}, then waits for the bulk processor to drain
     * and closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        long millis = 20000;
        String index = "zf_hg";
        String type = "zf_hg_e";

        String toIndex = "zf_hg_201701";
        String toType = "zf_hg_e";
        int size = 10000;

        EsSearch esSearch = EsSearchFactory.getInstance();
        TransportClient client = esSearch.clientElasticSearch();

        BulkProcessor bulkProcessor;
        try {
            bulkProcessor = esSearch.initBulkProcessor(client);
        } catch (Exception e) {
            // Without a bulk processor nothing can be written; stop instead of failing later with an NPE.
            LOG.error(e.toString(), e);
            client.close();
            return;
        }
        if (bulkProcessor == null) {
            LOG.error("initBulkProcessor returned null; aborting transfer");
            client.close();
            return;
        }

        try {
            String scrollId = searchByScroll(client, index, type, size, millis, bulkProcessor, toIndex, toType);
            // Continue only when the initial search actually returned a scroll id.
            if (scrollId != null && !scrollId.isEmpty()) {
                searchByScrollId(client, scrollId, toIndex, toType, millis, bulkProcessor);
            }
        } finally {
            // Requests still buffered in the processor are only sent on flush/close.
            closeBulkProcessor(bulkProcessor);
            client.close();
        }
    }

    /** Queues one index request per hit, reusing the hit's id so a rerun overwrites instead of duplicating. */
    private static void addHitsToBulk(SearchHit[] searchHits, BulkProcessor bulkProcessor, String toIndex,
            String toType) {
        for (SearchHit searchHit : searchHits) {
            IndexRequest request = new IndexRequest(toIndex, toType, searchHit.getId())
                    .source(JSONObject.toJSONString(searchHit.getSource()));
            bulkProcessor.add(request);
        }
    }

    /** Clears the scroll without letting a cleanup failure mask the outcome of the transfer. */
    private static void releaseScrollQuietly(Client client, String scrollId) {
        if (scrollId == null || scrollId.isEmpty()) {
            return;
        }
        try {
            if (!clearScroll(client, scrollId)) {
                LOG.warn("clear scroll reported failure for scroll id {}", scrollId);
            }
        } catch (RuntimeException e) {
            LOG.warn("failed to clear scroll id " + scrollId, e);
        }
    }

    /** Flushes the bulk processor and waits, up to the configured timeout, for it to finish. */
    private static void closeBulkProcessor(BulkProcessor bulkProcessor) {
        try {
            if (!bulkProcessor.awaitClose(BULK_CLOSE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                LOG.warn("bulk processor did not finish within {} minutes", BULK_CLOSE_TIMEOUT_MINUTES);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs this thread.
            Thread.currentThread().interrupt();
            LOG.warn("interrupted while waiting for the bulk processor to close", e);
        }
    }
}

# Switch the alias zf_hg from the old index to the new one in a single request.
curl -XPOST 'localhost:9200/_aliases' -d '
{
    "actions": [
        { "remove": {
            "alias": "zf_hg",
            "index": "zf_hg_2017"
        }},
        { "add": {
            "alias": "zf_hg",
            "index": "zf_hg_201701"
        }}
    ]
}'