本文主要介绍使用 Python MySQLdb 对数据库进行批量插入(insert)和批量更新(update)的方法:
1. Python MySQLdb 的使用:编写一个基类,让其他数据库访问类继承,这样比较方便;数据库的 ip、port 等连接信息保存在 json 配置文件中
2.常见的查找,批量插入更新
下面贴出基类代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# _*_ coding:utf-8 _*_
import MySQLdb
import json
import codecs
# NOTE: adjust this import path to match your own project layout.
from utils.JsonUtil import get_json_from_file
def byteify(input):
    """Recursively convert unicode text in decoded JSON data to native ``str``.

    Adapted from a Stack Overflow answer. On Python 2 every ``unicode``
    key/value is encoded to a UTF-8 byte ``str``. On Python 3 text is already
    ``str``, so strings are returned unchanged while containers are still
    rebuilt recursively.

    :param input: decoded JSON value,
        e.g. {u'first_name': u'Guido', u'last_name': u'jack'}
    :return: the same structure using native ``str``,
        e.g. {'first_name': 'Guido', 'last_name': 'jack'}
    """
    if isinstance(input, dict):
        # .items() exists on both Python 2 and 3; iteritems() is Python 2 only.
        return {byteify(key): byteify(value) for key, value in input.items()}
    if isinstance(input, list):
        return [byteify(element) for element in input]
    # ``str is bytes`` holds only on Python 2, the only place where the
    # ``unicode`` builtin exists; the short-circuit keeps Python 3 from
    # evaluating the undefined name.
    if str is bytes and isinstance(input, unicode):  # noqa: F821
        return input.encode('utf-8')
    return input
def get_json_from_file(filename):
    """Parse the JSON file at *filename* and run the result through ``byteify``.

    :param filename: path of the JSON file to read
    :return: the value returned by ``byteify`` for the parsed JSON data
    """
    with open(filename) as handle:
        parsed = json.load(handle)
    return byteify(parsed)
class DbBase(object):
    """Base class that opens a MySQLdb connection described by a JSON file.

    Subclasses use ``self.conn`` and ``self.cursor`` to run their queries.
    """

    def __init__(self, **kwargs):
        """Remember the config file path and open the connection.

        :param kwargs: must contain ``db_config_file``, the path of the JSON
            configuration file
        :raises KeyError: if ``db_config_file`` is not supplied
        """
        self.db_config_file = kwargs['db_config_file']
        self.config_db(self.db_config_file)

    def config_db(self, db_config_file):
        """(Re)create ``self.conn`` and ``self.cursor`` from the config file.

        The JSON document must provide the keys ``host``, ``user``, ``pwd``,
        ``db``, ``port`` and ``tb_audit_mobile``.

        :param db_config_file: path of the JSON configuration file
        :raises KeyError: if a required key is missing from the file
        """
        data = get_json_from_file(db_config_file)
        host = data['host']
        user = data['user']
        pwd = data['pwd']
        db = data['db']
        # MySQLdb.connect needs an integer port; a JSON config may store it
        # as a string, so normalise it here.
        port = int(data['port'])
        self.tb_audit_mobile = data['tb_audit_mobile']
        self.conn = MySQLdb.connect(
            host=host,
            port=port,
            user=user,
            passwd=pwd,
            db=db,
            charset="utf8",
            use_unicode=True,
        )
        self.cursor = self.conn.cursor()

    def close(self):
        """Release the cursor and the connection; safe to call repeatedly."""
        cursor = getattr(self, 'cursor', None)
        conn = getattr(self, 'conn', None)
        # Drop the references first so a second call is a no-op.
        self.cursor = None
        self.conn = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Close the connection even when closing the cursor failed.
            if conn is not None:
                conn.close()
|
子类的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
class DbAuditTestService(DbBase):
    """Example ``DbBase`` subclass showing a query, a batch insert and a batch update.

    NOTE(review): ``logger`` is used below but is not defined in the visible
    part of the file; it is assumed to be a module-level logger - confirm.
    """

    def __init__(self, **kwargs):
        super(DbAuditTestService, self).__init__(**kwargs)

    def getAdTestURl(self, beg, end):
        """Return the ``(url, source)`` rows whose ``create_date`` is between *beg* and *end*.

        :param beg: lower bound of ``create_date`` (inclusive, SQL BETWEEN)
        :param end: upper bound of ``create_date`` (inclusive, SQL BETWEEN)
        :return: list of result rows
        """
        # Let the driver quote the values instead of formatting them into the
        # SQL text; this avoids SQL injection and quoting mistakes.
        sql = "select url, source from tb_name where create_date BETWEEN %s and %s"
        self.cursor.execute(sql, (beg, end))
        return list(self.cursor)

    def insert(self, lst_row):
        """Batch insert; ``INSERT ignore`` skips rows that hit a unique index.

        On ``MySQLdb.OperationalError`` the error is logged, the connection is
        re-created and the batch is retried once so it is not silently lost.
        A failure of the retry is logged and swallowed, keeping the method
        best-effort.

        :param lst_row: sequence of ``(appid, source)`` rows
        """
        insert_sql = 'INSERT ignore INTO tb_ms_mobile_report_test (appid, source) VALUES (%s, %s)'
        try:
            self._insert_batch(insert_sql, lst_row)
        except MySQLdb.OperationalError as e:
            logger.info('%s' % e)
            self._reconnect()
            try:
                self._insert_batch(insert_sql, lst_row)
            except MySQLdb.OperationalError as retry_error:
                logger.info('%s' % retry_error)

    def _insert_batch(self, insert_sql, lst_row):
        """Run one ``executemany`` and commit it."""
        self.cursor.executemany(insert_sql, lst_row)
        self.conn.commit()

    def _reconnect(self):
        """Drop the current cursor/connection and open a new one from the config file."""
        for handle in (self.cursor, self.conn):
            try:
                handle.close()
            except MySQLdb.Error as e:
                # Closing a connection that is already broken may itself
                # fail; that must not prevent the reconnect below.
                logger.info('%s' % e)
        self.config_db(self.db_config_file)

    def update_ip_info(self, ip_info):
        """Batch update ``ip_right_rate`` in ``tb_ms_audit_ip_info``.

        :param ip_info: sequence of ``[ip_right_rate, submit_ip]`` pairs, in
            the order of the placeholders in the statement
        :return: None
        """
        query = """
        update tb_ms_audit_ip_info set
        ip_right_rate=%s
        where submit_ip=%s """
        self.cursor.executemany(query, ip_info)
        self.conn.commit()
def insert_all(start=100000, batch_size=2000):
    """Example of a batch operation: read rows in pages and insert them elsewhere.

    Rows are fetched from ``DbAuditService`` with ``query_limit``, passed
    through ``convert_rows`` and written with ``DbAuditTestService.insert``.

    NOTE(review): ``DbAuditService``, ``convert_rows`` and ``logger`` are not
    defined in the visible part of the file; they are assumed to exist
    elsewhere - confirm.

    :param start: offset of the first row to copy; the default keeps the
        original hard-coded value of 100000
    :param batch_size: number of rows requested per page; the default keeps
        the original hard-coded value of 2000
    """
    db_audit = DbAuditService(db_config_file='../config/mysql_police_audit.json')
    size = db_audit.count()
    db_audit_test = DbAuditTestService(db_config_file='../config/mysql_local_audit.json')
    for k in xrange(start, size, batch_size):
        logger.info('query limit %s ~ %s' % (k, batch_size))
        lst_row = db_audit.query_limit(k, batch_size)
        logger.info('convert_rows ')
        lst_row = convert_rows(lst_row)
        db_audit_test.insert(lst_row)
|
总结
以上所述是小编给大家介绍的python MySQLdb使用教程详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:http://blog.csdn.net/haluoluo211/article/details/77721138