Oracle和Elasticsearch数据同步

时间:2021-12-21 08:54:11

一、版本

Python版本 x64 2.7.12

Oracle(x64 12.1.0.2.0)和Elasticsearch(2.2.0)

python编辑器 PyCharm
 
下载安装请选择适合自己机器的版本
 
二、下载模块
通过官网下载和安装cx_Oracle和pyes模块,分别用于操作Oracle数据库和ES。安装fcntl模块用于解决python脚本单例执行问题。

如果是远程连接数据库和ES,请一定注意安装的模块或包版本。务必选择相应的版本,不然会遇到问题。

三、安装过程中会遇到的问题

cx_Oracle在本地安装过程中出现的一些问题:
1、安装c++for python的环境
2、安装Oracle数据库(或者安装API接口中需要的文件而不必下载配置整个oracle环境)
3、打开数据库工具 oracle SQL developor 按要求创建连接,并新建用户(创建数据库用户名时以c##开头,不然会提示)
4、oracle连接不上远程的服务器,检查版本是否匹配
 
fcntl在windows上安装时出现的问题:
1、用pip install fcntl 报错:indentationerror: unexpected indent(模块版本有问题)
 

四、源码

  1. # -*- coding: utf-8 -*-
  2. """
  3. 作者:陈龙
  4. 日期:2016-7-22
  5. 功能:oracle数据库到ES的数据同步
  6. """
  7. import os
  8. import sys
  9. import datetime, time
  10. # import fcntl
  11. import threading
  12. import pyes  # 引入pyes模块,ES接口
  13. import cx_Oracle  # 引入cx_Oracle模块,Oracle接口
  14. os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # 中文编码
  15. reload(sys)  # 默认编码设置为utf-8
  16. sys.setdefaultencoding('utf-8')
  17. # 创建ES连接 并返回连接参数
  18. def connect_ES(addr):
  19. try:
  20. global conn
  21. conn = pyes.ES(addr)  # 链接ES '127.0.0.1:9200'
  22. print 'ES连接成功'
  23. return conn
  24. except:
  25. print 'ES连接错误'
  26. pass
  27. # 创建ES映射mapping 注意各各个字段的类型
  28. def create_ESmapping():
  29. global spiderInfo_mapping, involveVideo_mapping, involveCeefax_mapping,keyWord_mapping,sensitiveWord_mapping
  30. spiderInfo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},
  31. 'tableId': {'index': 'not_analyzed', 'type': 'integer'},
  32. 'title': {'index': 'analyzed', 'type': 'string'},
  33. 'author': {'index': 'not_analyzed', 'type': 'string'},
  34. 'content': {'index': 'analyzed', 'type': 'string'},
  35. 'publishTime': {'index': 'not_analyzed', 'type': 'string'},
  36. 'browseNum': {'index': 'not_analyzed', 'type': 'integer'},
  37. 'commentNum': {'index': 'not_analyzed', 'type': 'integer'},
  38. 'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 除去涉我部分内容的ES映射结构
  39. involveVideo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},
  40. 'tableId': {'index': 'not_analyzed', 'type': 'integer'},
  41. 'title': {'index': 'analyzed', 'type': 'string'},
  42. 'author': {'index': 'not_analyzed', 'type': 'string'},
  43. 'summary': {'index': 'analyzed', 'type': 'string'},
  44. 'publishTime': {'index': 'not_analyzed', 'type': 'string'},
  45. 'url': {'index': 'not_analyzed', 'type': 'string'},
  46. 'imgUrl': {'index': 'not_analyzed', 'type': 'string'},
  47. 'ranking': {'index': 'not_analyzed', 'type': 'integer'},
  48. 'playNum': {'index': 'not_analyzed', 'type': 'integer'},
  49. 'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我视音频内容的ES映射结构
  50. involveCeefax_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},
  51. 'tableId': {'index': 'not_analyzed', 'type': 'integer'},
  52. 'title': {'index': 'analyzed', 'type': 'string'},
  53. 'author': {'index': 'not_analyzed', 'type': 'string'},
  54. 'content': {'index': 'analyzed', 'type': 'string'},
  55. 'publishTime': {'index': 'not_analyzed', 'type': 'string'},
  56. 'keyWords': {'index': 'not_analyzed', 'type': 'string'},
  57. 'popularity': {'index': 'not_analyzed', 'type': 'integer'},
  58. 'url': {'index': 'not_analyzed', 'type': 'string'},
  59. 'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我图文资讯内容的ES映射结构
  60. keyWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},
  61. 'keywords':{'index': 'not_analyzed', 'type': 'string'}}
  62. sensitiveWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},
  63. 'sensitiveType':{'index': 'not_analyzed', 'type': 'string'},
  64. 'sensitiveTopic': {'index': 'not_analyzed', 'type': 'string'},
  65. 'sensitiveWords': {'index': 'not_analyzed', 'type': 'string'}}
  66. # 创建ES相关索引和索引下的type
  67. def create_ESindex(ES_index, index_type1,index_type2,index_type3,index_type4,index_type5):
  68. if conn.indices.exists_index(ES_index):
  69. pass
  70. else:
  71. conn.indices.create_index(ES_index)  # 如果所有Str不存在,则创建Str索引
  72. create_ESmapping()
  73. conn.indices.put_mapping(index_type1, {'properties': spiderInfo_mapping},[ES_index])  # 在索引pom下创建spiderInfo的_type  "spiderInfo"
  74. conn.indices.put_mapping(index_type2, {'properties': involveVideo_mapping},[ES_index])  # 在索引pom下创建involveVideo的_type  "involveVideo"
  75. conn.indices.put_mapping(index_type3, {'properties': involveCeefax_mapping},[ES_index])  # 在索引pom下创建involveCeefax的_type  "involveCeefax"
  76. conn.indices.put_mapping(index_type4, {'properties': keyWord_mapping}, [ES_index])
  77. conn.indices.put_mapping(index_type5, {'properties': sensitiveWord_mapping}, [ES_index])
  78. # conn.ensure_index
  79. # 创建数据库连接 并返回连接参数
  80. def connect_Oracle(name, password, address):
  81. try:
  82. global conn1
  83. # conn1 = cx_Oracle.connect('c##chenlong','1234567890','localhost:1521/ORCL') #链接本地数据库
  84. conn1 = cx_Oracle.connect(name, password, address)  # 链接远程数据库 "pom","Bohui@123","172.17.7.118:1521/ORCL"
  85. print 'Oracle连接成功'
  86. return conn1
  87. except:
  88. print 'ES数据同步脚本连接不上数据库,请检查connect参数是否正确,或者模块版本是否匹配'
  89. pass
  90. def fetch_account(accountcode):  # 取两个‘_’之间的账号名称
  91. end = accountcode.find('_')
  92. return accountcode[0:end].strip()
  93. # 根据表的个数创建不同的对象
  94. # 从记录文档中读取各个表的记录ID,判断各个表的ID是否有变化
  95. # 分别读取各个表中的相关数据
  96. # 读取各个表的ID与记录的ID(记录在文本或者数据库中)并判断
  97. """def read_compare_ID():
  98. global tuple_tableName_IdNum
  99. global cur
  100. tuple_tableName_IdNum = {}
  101. tablename = []
  102. cur = conn1.cursor()
  103. result1 = cur.execute("select * from tabs")  ##执行数据库操作 读取各个表名
  104. row = result1.fetchall()
  105. for x in row:
  106. tablename.append(x[0])  # 将表名取出并赋值给tablename数组
  107. result2 = cur.execute('select {}_ID  from {}'.format(x[0], x[0]))
  108. ID_num = result2.fetchall()
  109. tuple_tableName_IdNum[x[0]] = ID_num"""
  110. def readOracle_writeES(tableName, ES_index, index_type):
  111. global cc
  112. cur = conn1.cursor()
  113. #result_AlltableNames = cur.execute("select * from tabs")
  114. result_latestId = cur.execute("select max({}_Id) from {} ".format(tableName,tableName))
  115. num1 = result_latestId.fetchone() #当前表中的最大ID
  116. print '当前表中的最大ID{}'.format(num1[0])
  117. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName.upper())) #通过数据库表拿到更新的ID tablename 都转化成大写
  118. num2 = result_rememberId.fetchone() #上次记录的更新ID
  119. print '上次记录的更新ID{}'.format(num2[0])
  120. if tableName.upper() == 'T_SOCIAL':
  121. while num2[0] < num1[0]:
  122. result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,likeNum,forwardNum,commentNum,accountCode from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
  123. result_tuple1 = result_readOracle.fetchall()  #之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  124. for i in result_tuple1:  #一条一条写入ES,这个速度太慢,改进 通过bulk接口导入
  125. aa= (i[5]+i[6])
  126. bb=  (i[7]+i[8])
  127. if conn.index(
  128. {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
  129. 'content': unicode(i[3]), 'publishTime': str(i[4]), 'browseNum': aa,
  130. 'commentNum':bb, 'dataType':fetch_account(i[9])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo
  131. cc += 1
  132. print 'bulk导入后的ID:{}'.format(i[0])
  133. rememberId = i[0] #如果写入成功才赋值
  134. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))
  135. conn1.commit()
  136. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  137. num2 = result_rememberId.fetchone()
  138. print "{}读{}写成功".format(tableName,index_type)
  139. if tableName.upper() == 'T_HOTSEARCH':
  140. while num2[0] < num1[0]:
  141. result_readOracle = cur.execute("select {}_ID,accountCode,title,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
  142. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  143. for i in result_tuple1:  #一条一条写入ES,这个速度太慢,改进 通过bulk接口导入
  144. if conn.index(
  145. {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': '','content': '', 'publishTime': str(i[3]), 'browseNum': 0,
  146. 'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo
  147. cc += 1
  148. print 'bulk导入后的ID:{}'.format(i[0])
  149. rememberId = i[0]
  150. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  151. conn1.commit()
  152. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  153. num2 = result_rememberId.fetchone()
  154. print "{}读{}写成功".format(tableName, index_type)
  155. if tableName.upper() == 'T_VIDEO_HOT':
  156. while num2[0] < num1[0]:
  157. result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName,tableName,tableName,num2[0]))
  158. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  159. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  160. if conn.index(
  161. {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),
  162. 'content': '', 'publishTime': str(i[4]), 'browseNum': 0,
  163. 'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo
  164. cc += 1
  165. print 'bulk导入后的ID:{}'.format(i[0])
  166. rememberId = i[0]
  167. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  168. conn1.commit()
  169. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  170. num2 = result_rememberId.fetchone()
  171. print "{}读写成功".format(tableName)
  172. if tableName.upper() == 'T_PRESS':
  173. while num2[0] < num1[0]:
  174. result_readOracle = cur.execute(
  175. "select {}_ID,accountCode,title,Author,PublishDate,Content from {} where {}_ID > {} and rownum<=40 ".format(
  176. tableName, tableName, tableName, num2[0]))
  177. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  178. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  179. if conn.index(
  180. {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),
  181. 'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': 0,
  182. 'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo
  183. cc += 1
  184. print 'bulk导入后的ID:{}'.format(i[0])
  185. rememberId = i[0]
  186. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  187. conn1.commit()
  188. result_rememberId = cur.execute(
  189. "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  190. num2 = result_rememberId.fetchone()
  191. print "{}读写成功".format(tableName)
  192. if tableName.upper() == 'T_INDUSTRY':
  193. while num2[0] < num1[0]:
  194. result_readOracle = cur.execute(
  195. "select {}_ID,accountCode,title,Author,PublishTime,Content,BrowseNum from {} where {}_ID > {} and rownum<=40 ".format(
  196. tableName, tableName, tableName, num2[0]))
  197. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  198. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  199. if conn.index(
  200. {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),
  201. 'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': i[6],
  202. 'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True) : # 将数据写入索引pom的spiderInfo
  203. cc += 1
  204. print 'bulk导入后的ID:{}'.format(i[0])
  205. rememberId = i[0]
  206. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  207. conn1.commit()
  208. result_rememberId = cur.execute(
  209. "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  210. num2 = result_rememberId.fetchone()
  211. print "{}读写成功".format(tableName)
  212. if tableName.upper() == 'T_SOCIAL_SITESEARCH':
  213. while num2[0] < num1[0]:
  214. result_readOracle = cur.execute('select {}_ID,title,author,content,publishTime,keyWords,browseNum,likeNum,forwardNum,commentNum,url,accountCode from {} where ({}_ID > {})'.format(tableName, tableName, tableName, num2[0]))
  215. result_tuple1 = result_readOracle.fetchmany(50)  #因为数据量太大,超过了变量的内存空间,所以一次性取40条
  216. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  217. popularity = (i[6] + i[7] + i[8] * 2 + i[9] * 2)
  218. if conn.index(
  219. {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),
  220. 'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(i[5]),
  221. 'popularity':popularity,'url': i[10],
  222. 'dataType':fetch_account(i[11])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo
  223. cc += 1
  224. print 'bulk导入后的ID:{}'.format(i[0])
  225. rememberId = i[0]
  226. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))
  227. conn1.commit()
  228. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  229. num2 = result_rememberId.fetchone()
  230. print "{}读写成功".format(tableName)
  231. if tableName.upper() == 'T_REALTIME_NEWS':
  232. while num2[0] < num1[0]:
  233. result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
  234. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  235. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  236. popularity = (i[5] + i[6] * 2)
  237. if conn.index(
  238. {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),
  239. 'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),
  240. 'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo
  241. cc += 1
  242. print 'bulk导入后的ID:{}'.format(i[0])
  243. rememberId = i[0]
  244. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  245. conn1.commit()
  246. result_rememberId = cur.execute(
  247. "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  248. num2 = result_rememberId.fetchone()
  249. print "{}读{}写成功".format(tableName, index_type)
  250. if tableName.upper() == 'T_KEY_NEWS':
  251. while num2[0] < num1[0]:
  252. result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
  253. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  254. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  255. popularity = (i[5] + i[6] * 2)
  256. if conn.index(
  257. {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),
  258. 'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),
  259. 'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo
  260. cc += 1
  261. print 'bulk导入后的ID:{}'.format(i[0])
  262. rememberId = i[0]
  263. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  264. conn1.commit()
  265. result_rememberId = cur.execute(
  266. "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  267. num2 = result_rememberId.fetchone()
  268. print "{}读{}写成功".format(tableName, index_type)
  269. if tableName.upper() == 'T_LOCAL_NEWS':
  270. while num2[0] < num1[0]:
  271. result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
  272. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  273. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  274. popularity = (i[5] + i[6] * 2)
  275. if conn.index(
  276. {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
  277. 'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),
  278. 'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo
  279. cc += 1
  280. print 'bulk导入后的ID:{}'.format(i[0])
  281. rememberId = i[0]
  282. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  283. conn1.commit()
  284. result_rememberId = cur.execute(
  285. "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  286. num2 = result_rememberId.fetchone()
  287. print "{}读{}写成功".format(tableName, index_type)
  288. if tableName.upper() == 'T_VIDEO_SITESEARCH':
  289. while num2[0] < num1[0]:
  290. result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime,url,imgUrl,playNum,keyWords from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
  291. result_tuple1 = result_readOracle.fetchall()  # 之前是因为数据量太大,超过了变量的内存空间,所以用fetchmany取40条  后来大神建议数据库中限制查询数 然后fetchall,这样查询更有效率
  292. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  293. if conn.index(
  294. {
  295. 'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),
  296. 'summary': unicode('0'), 'publishTime': str(i[4]), 'browseNum': i[7],'url':i[5],'imgUrl':i[6],'ranking':0,
  297. 'playNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo
  298. cc += 1
  299. print 'bulk导入后的ID:{}'.format(i[0])
  300. rememberId = i[0]
  301. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  302. conn1.commit()
  303. result_rememberId = cur.execute(
  304. "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  305. num2 = result_rememberId.fetchone()
  306. print "{}读{}写成功".format(tableName,index_type)
  307. if tableName.upper() == 'T_BASE_KEYWORDS':
  308. while num2[0] < num1[0]:
  309. result_readOracle = cur.execute('select {}_ID,keywords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName, num2[0]))
  310. result_tuple1 = result_readOracle.fetchall()  #因为数据量太大,超过了变量的内存空间,所以一次性取40条
  311. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  312. if conn.index({'id': i[0], 'keywords': i[1]}, ES_index, index_type,bulk=True):  # 将数据写入索引pom的spiderInfo
  313. cc += 1
  314. print 'bulk导入后的ID:{}'.format(i[0])
  315. rememberId = i[0]
  316. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))
  317. conn1.commit()
  318. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  319. num2 = result_rememberId.fetchone()
  320. print "{}读写成功".format(tableName)
  321. if tableName.upper() == 'T_BASE_SENSITIVEWORDS':
  322. while num2[0] < num1[0]:
  323. result_readOracle = cur.execute('select {}_ID,SensitiveType,SensitiveTopic,SensitiveWords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName,num2[0]))
  324. result_tuple1 = result_readOracle.fetchall()  # 因为数据量太大,超过了变量的内存空间,所以一次性取40条
  325. for i in result_tuple1:  # 一条一条写入ES,这个速度太慢,强烈需要改进 通过bulk接口导入?
  326. if conn.index({'id':i[0],
  327. 'sensitiveType':unicode(i[1]),
  328. 'sensitiveTopic': unicode(i[2]),
  329. 'sensitiveWords':unicode(i[3])}, ES_index, index_type, bulk=True):  # 将数据写入索引pom的spiderInfo
  330. cc +=1
  331. print 'bulk导入后的ID:{}'.format(i[0])
  332. rememberId = i[0]
  333. cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
  334. conn1.commit()
  335. result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通过数据库表拿到更新的ID
  336. num2 = result_rememberId.fetchone()
  337. print "{}读写成功".format(tableName)
  338. else:
  339. pass
  340. def ww(a):
  341. while True:
  342. print a
  343. time.sleep(0.5)  #用于多线程的一个实验函数
  344. if __name__ == "__main__":
  345. cc = 0
  346. connect_ES('172.17.5.66:9200')
  347. # conn.indices.delete_index('_all')  # 清除所有索引
  348. create_ESindex("pom", "spiderInfo", "involveVideo", "involveCeefax","keyWord","sensitiveWord")
  349. connect_Oracle("pom", "Bohui@123", "172.17.7.118:1521/ORCL")
  350. # thread.start_new_thread(readOracle_writeES,("T_SOCIAL","pom","spiderInfo"),)#创建一个多线程
  351. # thread.start_new_thread(readOracle_writeES,("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"),)#创建一个多线程
  352. mm = time.clock()
  353. readOracle_writeES("T_SOCIAL", "pom", "spiderInfo") #表名虽然在程序中设置了转化为大写,但是还是全大写比较好
  354. readOracle_writeES("T_HOTSEARCH", "pom", "spiderInfo")
  355. readOracle_writeES("T_VIDEO_HOT", "pom", "spiderInfo")
  356. readOracle_writeES("T_PRESS", "pom", "spiderInfo")
  357. readOracle_writeES("T_INDUSTRY", "pom", "spiderInfo")
  358. readOracle_writeES("T_VIDEO_SITESEARCH", "pom", "involveVideo")
  359. readOracle_writeES("T_REALTIME_NEWS", "pom", "involveCeefax")
  360. readOracle_writeES("T_KEY_NEWS", "pom", "involveCeefax")
  361. readOracle_writeES("T_LOCAL_NEWS", "pom", "involveCeefax")
  362. readOracle_writeES("T_SOCIAL_SITESEARCH", "pom", "involveCeefax")
  363. readOracle_writeES("T_BASE_KEYWORDS", "pom", "keyWord")
  364. readOracle_writeES("T_BASE_SENSITIVEWORDS", "pom", "sensitiveWord")
  365. nn = time.clock()
  366. # conn.indices.close_index('pom')
  367. conn1.close()
  368. print '数据写入耗时:{}  成功写入数据{}条'.format(nn-mm,cc)
  369. #实验多线程
  370. """
  371. while a < 100:
  372. conn.index(
  373. {'tableName': 'T_base_account', 'type': '1', 'tableId': '123', 'title': unicode('陈龙'), 'author': 'ABC',
  374. 'content': 'ABC', 'publishTime': '12:00:00', 'browseNum': '12', 'commentNum': '12', 'dataType': '1'},
  375. "pom", "spiderInfo", )  # 将数据写入索引pom的spiderInfo
  376. a += 1
  377. print time.ctime()
  378. """
  379. """
  380. threads = []
  381. t1 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL","pom","spiderInfo"))
  382. threads.append(t1)
  383. #t3 = threading.Thread(target=ww,args=(10,))
  384. #threads.append(t3)
  385. #t2 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"))
  386. #threads.append(t2)
  387. print time.ctime()
  388. for t in threads:
  389. t.setDaemon(True)
  390. t.start()
  391. t.join()
  392. """

五、编译过程的问题

 
1、直接print游标cur.execute ( ) 将不能得到我们想要的结果
 
result2 = cur.execute('select T_SOCIAL_ID from T_SOCIAL')
print result2
返回:<__builtin__.OracleCursor on <cx_Oracle.Connection to pom@172.17.7.118:1521/ORCL>>
 
 
result2 = cur.execute('select T_SOCIAL_ID  from T_SOCIAL')
print result2
num = result2.fetchall()
print num
for i in num:
    print i[0]
 
返回:[(55,), (56,), (57,), (58,), (59,), (60,), (61,), (62,), (63,), (64,), (65,), (66,), (67,), (68,), (69,), (70,)]
     55
注意:用fetchall()得到的数据为:[(55,), (56,), (57,), (58,), (59,)] 元组而不是数字。
用 变量[num] 的方式取出具体的数值
 
2、cx_Oracle中文编码乱码问题
 
显示中文乱码:������DZ��� �����������
或者显示未知的编码:('\xce\xd2\xd5\xe6\xb5\xc4\xca\xc7\xb1\xea\xcc\xe2',)
需要注意一下几个地方,将数据库中的中文编码转化成utf-8编码,并将中文写入elasticsearch
 
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' #中文编码
 
 
reload(sys) #默认编码设置为utf-8 一定需要reload(sys)
sys.setdefaultencoding('utf-8')
 
'title':unicode('中文')
 
python传递给js的列表中文乱码怎么解决?  
json.dumps(dictionary,ensure_ascii=False)
 
 
3、远程连接不上Oracle数据库的问题
 
第一:确保connect()中各个参数的值都正确。例如
 
conn1 = cx_Oracle.connect("username","password","172.17.7.118:1521/ORCL")  #连接远程数据库
conn1 = cx_Oracle.connect('username','password','localhost:1521/ORCL') #连接本地数据库
conn2 = pyes.ES('127.0.0.1:9200')  #连接ES
 
第二:确保安装的版本都符合要求,包括模块的版本。
 
4、提示TypeError: 'NoneType' object is not callable
 
确保mapping中的各个字段类型都设置正确
检查索引和映射是否都书写正确
 
5、脚本同时读取多个数据库表
涉及到Python中多线程的问题,给每一个表起一个线程,同时给每一个线程加锁
编译时碰到问题:AssertionError: group argument must be None for now(检查函数是否书写正确,读写冲突)
AttributeError: 'builtin_function_or_method' object has no attribute 'setDaemon'
cx_Oracle.ProgrammingError: LOB variable no longer valid after subsequent fetch(fetchall数据量过大,溢出 设置一次取数据库中 rownum数)
TypeError: 'NoneType' object has no attribute '__getitem__'  (注意数据库查询对应的大小写)
No handlers could be found for logger "pyes"  可能是连接超时
AttributeError: 'tuple' object has no attribute 'append'   tuple不能直接用append
TypeError: 'tuple' object does not support item assignment  tuple不能赋值
数据库批量读取
就多线程问题咨询了大神,大神建议用多进程来实现会比较简单
 
6、脚本定时触发问题
Linux crontab定时执行任务,crontab防止脚本周期内未执行完重复执行
 
 
7、单实例的问题。防止脚本没有执行完再次触发
刚开始设想在脚本中完成,后来知道这个可以在系统中设定
 
8、数据同步插件
网上有大量的关于同步关系型数据库的有关插件 logstash-input-jdbc  不太好安装,不知道如何使用。
MySQL和ES同步插件的介绍,例如elasticsearch-river-jdbc
在这儿启用的是bulk接口,批量导入。数据同步的速度大大提高
 
9、判断数据是否同步成功
这个是之前一直没有注意的问题,但其实在数据传输的时候是非常重要的。
目前的判断方法是看ES中的数据量到底有多少,然后对照统计量进行判断分析,,这也是在后期发现有部分数据没有同步过去的方法。
 
10、统计写入了多少数据
UnboundLocalError: local variable 'cc' referenced before assignment 
定义了全局变量cc,但是在局部进行了修改,所以报错 修改同名的全局变量,则认为是一个局部变量
 
五、源码改进
因为数据写入的速度太慢(40条数据 800Kb大小 写入花费2S左右),所有在原来的基础上,修改了读取数据库中未写入内容的策略和ES写入的策略。
 
插入完成的源码
 
调试问题:
1、pip install elasticsearch  引入helpers函数模块,使用bulk函数批量导入。
2、AttributeError: 'ES' object has no attribute 'transport'  因为原来使用的是pyes模块 现在换成了elasticsearch,所以改成对应模块
conn2 = Elasticsearch("127.0.0.1:9200")
其他常见错误
    SerializationError:JSON数据序列化出错,通常是因为不支持某个节点值的数据类型
    RequestError:提交数据格式不正确
    ConflictError:索引ID冲突
    TransportError:连接无法建立
 
最后通过了解其实是找到了数据同步的插件 logstash-input-jdbc 能够实现数据的同步增删改查,按照网上的教程能够很轻松的实现,遇到的问题就是插件同步过去的字段都必须是小写。
 
------------
Python中cx_Oracle的一些函数:
commit() 提交
rollback() 回滚
cursor用来执行命令的方法:
callproc(self, procname, args):用来执行存储过程,接收的参数为存储过程名和参数列表,返回值为受影响的行数
execute(self, query, args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数
executemany(self, query, args):执行单挑sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数
nextset(self):移动到下一个结果集

cursor用来接收返回值的方法:
fetchall(self):接收全部的返回结果行.
fetchmany(self, size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回cursor.arraysize条数据.
fetchone(self):返回一条结果行.

scroll(self, value, mode='relative'):移动指针到某一行.如果mode='relative',则表示从当前所在行移动value条,如果 mode='absolute',则表示从结果集的第一行移动value条.
MySQL中关于中文编码的问题
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python') 中加一个属性:
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python',charset='utf8') 
charset是要跟你数据库的编码一样,如果是数据库是gb2312 ,则写charset='gb2312'。