0. Business scenario
Export every value of one field of one Elasticsearch index to a file.
1. Overview of ES data export methods
These are the export approaches I found; additions are welcome:
- ES official API: the snapshot and restore module
The snapshot and restore module allows to create snapshots of individual indices or an entire cluster into a remote repository like shared file system, S3, or HDFS. These snapshots are great for backups because they can be restored relatively quickly but they are not archival because they can only be restored to versions of Elasticsearch that can read the index.
In short, it is a tool for taking an image of an ES cluster and restoring it quickly. It cannot export just one field, so it does not meet this requirement and I did not pursue it. Interested readers can see Elasticsearch Reference [5.0] » Modules » Snapshot And Restore.
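For completeness, using snapshot and restore looks roughly like this: register a repository, then snapshot one index into it. This is a minimal sketch; the repository name `my_backup` and the filesystem path are hypothetical, and the path must appear in the node's `path.repo` setting.

```shell
# Register a shared-filesystem snapshot repository (hypothetical name/path)
curl -XPUT 'http://localhost:9200/_snapshot/my_backup' -d '{
  "type": "fs",
  "settings": { "location": "/mount/backups/my_backup" }
}'

# Snapshot a single index into that repository
curl -XPUT 'http://localhost:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true' -d '{
  "indices": "mtnews"
}'
```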
- ES Java API:
Although Java is the language I use most, running Java programs on Linux for a one-off export is cumbersome. Here is a link to a Java-based ES export example for anyone interested: elasticsearch使用Java API批量数据导入和导出
- ES Python API:
Back to the point: the top Google result for "elasticsearch导出数据" is a Python script, at lein-wang/elasticsearch_migrate
2. The script

```python
#!/usr/bin/python
# coding:utf-8
'''
Export and Import ElasticSearch Data.
Simple Example At __main__
@author: wgzh159@163.com
@modifier: lzkhit@163.com
@note: uncheck consistency of data, please do it by self
'''
import json
import os
import sys
import time
import urllib2

reload(sys)
sys.setdefaultencoding('utf-8')


class exportEsData():
    size = 10000

    def __init__(self, url, index, type, target_index):
        self.url = url + "/" + index + "/" + type + "/_search"
        self.index = index
        self.type = type
        self.target_index = target_index  # replaces the original index in the output file name
        self.file_name = self.target_index + "_" + self.type + ".json"

    def exportData(self):
        print("export data begin...\n")
        begin = time.time()
        try:
            os.remove(self.file_name)
        except OSError:
            os.mknod(self.file_name)
        msg = urllib2.urlopen(self.url).read()
        obj = json.loads(msg)
        num = obj["hits"]["total"]
        start = 0
        end = num / self.size + 1  # number of size-document pages to read
        while start < end:
            try:
                msg = urllib2.urlopen(self.url + "?from=" + str(start * self.size)
                                      + "&size=" + str(self.size)).read()
                self.writeFile(msg)
                start = start + 1
            except urllib2.HTTPError, e:
                print 'There was an error with the request'
                print e
                break
            print(start)
        print("export data end!!!\n total consuming time:" + str(time.time() - begin) + "s")

    def writeFile(self, msg):
        obj = json.loads(msg)
        vals = obj["hits"]["hits"]
        cnt = 0
        f = open(self.file_name, "a")  # open before try so finally always has a file to close
        try:
            for val in vals:
                val_json = val["_source"]["content"]
                f.write(str(val_json) + "\n")
                cnt += 1
        finally:
            print(cnt)
            f.flush()
            f.close()


class importEsData():
    def __init__(self, url, index, type):
        self.url = url
        self.index = index
        self.type = type
        self.file_name = self.index + "_" + self.type + ".json"

    def importData(self):
        print("import data begin...\n")
        begin = time.time()
        s = os.path.getsize(self.file_name)
        f = open(self.file_name, "r")
        try:
            data = f.read(s)
            # pitfall: the bulk API expects newline-delimited JSON
            self.post(data)
        finally:
            f.close()
        print("import data end!!!\n total consuming time:" + str(time.time() - begin) + "s")

    def post(self, data):
        print data
        print self.url
        req = urllib2.Request(self.url, data)
        r = urllib2.urlopen(req)
        response = r.read()
        print response
        r.close()


if __name__ == '__main__':
    '''
    Export Data
    e.g.
                 URL                     index      type
    exportEsData("http://10.100.142.60:9200","watchdog","mexception").exportData()

    export file name: watchdog_mexception.json
    '''
    exportEsData("http://88.88.88.88:9200", "mtnews", "articles", "corpus").exportData()

    '''
    Import Data
    *import file name: watchdog_test.json (important)
        the part before "_" is the elasticsearch index
        the part after "_" is the elasticsearch type
    e.g.
                 URL                     index     type
    importEsData("http://10.100.142.60:9200","watchdog","test").importData()
    '''
    #importEsData("http://10.100.142.60:9200","watchdog","test").importData()
    #importEsData("http://127.0.0.1:9200/_bulk","chat","CHAT").importData()
    #importEsData("http://127.0.0.1:9200/_bulk","chat","TOPIC").importData()
```
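The export loop pages through results with `from`/`size`. Its batching arithmetic can be isolated as a small pure function for inspection (a Python 3 sketch; the helper name `page_offsets` is mine, not part of the script, but the formula mirrors `end = num/self.size+1` above):

```python
def page_offsets(total_hits, size):
    """Return the `from` offset of every from/size page the export loop issues."""
    num_pages = total_hits // size + 1  # mirrors `end = num/self.size+1` in the script
    return [page * size for page in range(num_pages)]

# With 25,000 hits and a bulk size of 10,000 the script issues three requests:
print(page_offsets(25000, 10000))  # [0, 10000, 20000]
```

Note that when `total_hits` is an exact multiple of `size`, the `+ 1` faithfully reproduces the script's behavior of requesting one final empty page.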
3. The problem encountered
With everything in place, running the script failed with:
"urllib2.HTTPError: HTTP Error 500: Internal Server Error"
Worse, judging by the script's doc-count output, no matter what bulk size I tried (10/50/100/500/1000/5000/10000), the export always stalled at the 10,000th document, at which point urllib2 threw the exception.
My colleague Huang suggested two possible causes:
- The bulk rate was not throttled: documents were produced faster than they could be consumed, exceeding the TPS of the ES server (from experience he recommends 5–15 MB per bulk).
- A problem on the server side, which requires checking the logs.
First, I added a sleep inside the while loop and reduced the bulk size to lower the TPS on ES, but the export still hit HTTP 500 at the 10,000th document, so that was not the cause.
For the second possibility, I logged in to the ES host and found the following in the log:
```
Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [11000].
See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window]
index level parameter.]
```
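The numbers in the log line up with the pagination: whatever bulk size is used, the first request whose `from + size` exceeds the 10,000-document window is rejected, which is exactly why the export always died at the 10,000th document. A small sketch (the function name is mine; 10000 is the documented default of `index.max_result_window`):

```python
DEFAULT_MAX_RESULT_WINDOW = 10000  # ES default for index.max_result_window

def within_result_window(from_, size, max_window=DEFAULT_MAX_RESULT_WINDOW):
    """ES rejects a from/size search when from + size exceeds the window."""
    return from_ + size <= max_window

# The page starting at document 10,000 is the first to be rejected,
# regardless of bulk size:
print(within_result_window(10000, 1000))  # False: 11000 > 10000 -> HTTP 500
print(within_result_window(9000, 1000))   # True:  10000 <= 10000
```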
As the article urllib2中HTTP状态码含义 explains, a 5XX status code means the server itself encountered an error and could not continue with the request — so it was indeed a server-side problem.
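The log's own suggestion, the scroll API, is the more robust approach for deep exports: instead of paging with from/size, you open a scroll context on the first search and then repeatedly POST the returned `scroll_id` to `/_search/scroll`. A minimal Python 3 sketch of just the request plumbing (the helper names are mine; nothing here contacts a server):

```python
import json

def initial_scroll_url(base_url, index, type_, keep_alive="1m", size=10000):
    """First request: an ordinary search with ?scroll= to open a scroll context."""
    return "%s/%s/%s/_search?scroll=%s&size=%d" % (base_url, index, type_, keep_alive, size)

def next_scroll_body(scroll_id, keep_alive="1m"):
    """Subsequent requests POST the scroll_id back to /_search/scroll."""
    return json.dumps({"scroll": keep_alive, "scroll_id": scroll_id})
```

Each response carries a fresh `scroll_id` and a batch of hits; the loop ends when a batch comes back empty, so no `from` offset ever grows past the result window.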
4. The solution
With the problem located, the fix is straightforward (see ES报错Result window is too large问题处理): raise the `index.max_result_window` setting mentioned in the log (default 10000) on the affected index:

```shell
curl -XPUT http://88.88.88.88:9200/mtnews/_settings -d '{ "index" : { "max_result_window" : 10000000}}'
```
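The same settings update can be issued from Python using only the standard library. This Python 3 sketch builds the PUT request without sending it (the helper name is mine; the URL mirrors the curl command, and nothing reaches the cluster until `urlopen` is called):

```python
import json
import urllib.request

def build_settings_request(base_url, index, max_window):
    """Build a PUT request that raises index.max_result_window for one index."""
    body = json.dumps({"index": {"max_result_window": max_window}}).encode("utf-8")
    return urllib.request.Request(
        url=base_url + "/" + index + "/_settings",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

req = build_settings_request("http://88.88.88.88:9200", "mtnews", 10000000)
# urllib.request.urlopen(req)  # uncomment to actually apply the setting
print(req.get_method(), req.full_url)  # PUT http://88.88.88.88:9200/mtnews/_settings
```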
5. Lessons learned about ES
- Check the logs as soon as a problem appears — it saves a lot of time.