数据库的连接池建议放在类似 settings.py 的配置模块中,因为基本都是配置项,方便统一管理。
#settings.py
import os

from DBUtils.PooledDB import PooledDB
from elasticsearch import Elasticsearch
import pymysql


class Config(object):
    """Shared connection objects and their settings.

    Every attribute is a class attribute, so it is created once, when this
    module is first imported, and is then shared by all importers.
    """

    # MySQL connection pool that uses pymysql as the DB-API driver.
    # NOTE(review): per the DBUtils documentation, ``maxconnections`` caps the
    # total number of connections, ``mincached`` is the number of idle
    # connections opened at start-up and ``maxcached`` the maximum number of
    # idle connections kept -- confirm against the installed DBUtils version.
    POOL = PooledDB(
        creator=pymysql,
        maxconnections=20,
        mincached=3,
        maxcached=3,
        host='10.208.116.41',
        port=3306,
        user='hadoop',
        password='XXXXX',
        database='hadoop_tools',
    )

    # elasticsearch connect
    # NOTE(review): ``sniff_on_start=True`` presumably makes the client
    # contact the cluster while this module is being imported -- confirm.
    es_conn = Elasticsearch(
        ['10.208.116.33', '10.208.116.34', '10.208.116.35'],
        sniff_timeout=60,
        sniff_on_start=True,
    )

    # Mapping body for the ``title`` field: a text field analysed with the
    # ``ik_max_word`` analyzer both at index time and at search time.
    es_mapping = {
        'properties': {
            'title': {
                'type': 'text',
                'analyzer': 'ik_max_word',
                'search_analyzer': 'ik_max_word',
            },
        },
    }


# POOL test
if __name__ == '__main__':
    conn = Config.POOL.connection()
    try:
        # Use conn.cursor(pymysql.cursors.DictCursor) instead to get each
        # row as a dict.
        cur = conn.cursor()
        try:
            cur.execute("select * from hs2_status")
            ret = cur.fetchall()
            for i in ret:
                print(i)
        finally:
            cur.close()
    finally:
        # Closing a pooled connection hands it back to the pool.
        conn.close()
2)封装MySQL的操作
#utils/helper.py
import pymysql

from ops.settings import Config


def connect():
    """Take a connection from ``Config.POOL`` and open a dict cursor on it.

    Returns:
        A ``(conn, cur)`` tuple. The caller must release both with
        ``connect_close``.
    """
    conn = Config.POOL.connection()
    try:
        cur = conn.cursor(pymysql.cursors.DictCursor)
    except Exception:
        # Do not leak the pooled connection when the cursor cannot be opened.
        conn.close()
        raise
    return conn, cur


def connect_close(conn, cur):
    """Close the cursor, then give the connection back to the pool.

    The cursor is closed first because it belongs to the connection; the
    connection is released even if closing the cursor fails.
    """
    try:
        cur.close()
    finally:
        conn.close()


def _run_write(sql, args):
    """Execute one modifying statement, commit it, and return execute()'s result.

    On any error the transaction is rolled back before the exception is
    re-raised, and the connection is always returned to the pool.
    """
    conn, cur = connect()
    try:
        row = cur.execute(sql, args)
        conn.commit()
        return row
    except Exception:
        conn.rollback()
        raise
    finally:
        connect_close(conn, cur)


def fetch_all(sql, args=None):
    """Run a query and return every row of its result.

    Args:
        sql: Statement text, using ``%s`` placeholders for parameters.
        args: Parameters bound to the placeholders (optional).

    Returns:
        Whatever ``cursor.fetchall()`` yields for the dict cursor.
    """
    conn, cur = connect()
    try:
        cur.execute(sql, args)
        return cur.fetchall()
    finally:
        connect_close(conn, cur)


def fetch_one(sql, args=None):
    """Run a query and return only the first row of its result.

    Args:
        sql: Statement text, using ``%s`` placeholders for parameters.
        args: Parameters bound to the placeholders (optional).

    Returns:
        Whatever ``cursor.fetchone()`` yields for the dict cursor.
    """
    conn, cur = connect()
    try:
        cur.execute(sql, args)
        return cur.fetchone()
    finally:
        connect_close(conn, cur)


def insert(sql, args=None):
    """Execute an INSERT statement and commit it.

    Returns:
        The value returned by ``cursor.execute`` (for pymysql, the number of
        affected rows).
    """
    return _run_write(sql, args)


def update(sql, args=None):
    """Execute an UPDATE statement and commit it.

    Returns:
        The value returned by ``cursor.execute`` (for pymysql, the number of
        affected rows).
    """
    return _run_write(sql, args)


def delete(sql, args=None):
    """Execute a DELETE statement and commit it.

    Returns:
        The value returned by ``cursor.execute`` (for pymysql, the number of
        affected rows).
    """
    return _run_write(sql, args)
# The block below is for debugging only and can be removed.
if __name__ == '__main__':
    # Disabled example -- update one row and print execute()'s result:
    #   row = update("update hs2_status set in_zk=%s where host_ip = %s",
    #                ('YES', '10.208.106.159'))
    #   print("row is %s" % (row))
    #
    # Disabled example -- read the table header:
    #   tb_h = fetch_one("SHOW FIELDS FROM hs2_status", ())
    #   print("this is %s" % (tb_h))

    # Fetch the table content and print it row by row.
    sql = "select * from hs2_status"
    hs2_content = fetch_all(sql, ())
    for i in hs2_content:
        print(i)
ES连接引用示例:
#!/usr/bin/env python
# encoding: UTF-8
"""Example: create an index, set its mapping, insert documents and search."""
from ops.settings import Config

# create index, change mappings
# NOTE(review): ignore=400 presumably suppresses the "index already exists"
# error so the script can be re-run -- confirm against the client version.
Config.es_conn.indices.create(index='news', ignore=400)
Config.es_conn.indices.put_mapping(
    index='news',
    doc_type='politics',
    body=Config.es_mapping,
)

datas = [
    {
        'title': '美国留给伊拉克的是个烂摊子吗',
        'url': 'http://view.news.qq.com/zt2011/usa_iraq/index.htm',
        'date': '2011-12-16',
    },
    {
        'title': '*部:各地校车将享最高路权',
        'url': 'http://www.chinanews.com/gn/2011/12-16/3536077.shtml',
        'date': '2011-12-16',
    },
    {
        'title': '中韩渔警冲突调查:韩警平均每天扣1艘中国渔船',
        'url': 'https://news.qq.com/a/20111216/001044.htm',
        'date': '2011-12-17',
    },
    {
        'title': '中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首',
        'url': 'http://news.ifeng.com/world/detail_2011_12/16/11372558_0.shtml',
        'date': '2011-12-18',
    },
]

# insert data
for data in datas:
    Config.es_conn.index(index='news', doc_type='politics', body=data)

# Newly indexed documents only become searchable after a refresh; force one
# so the search below reliably includes the documents inserted above.
Config.es_conn.indices.refresh(index='news')

# query data
result = Config.es_conn.search(index='news')
print(result)