python多线程读取MySQL执行效率优化问题

时间:2021-04-27 21:11:14
[root@yetest01 app]# vi db.py 
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)   
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
    os.mkdir(logdir)


def writelog(filename,message):
    file=logdir+'/'+filename
    f=open(file,"a")
    f.write(message)
    f.close

def maxdata():
    try:
        conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
        cur=conn.cursor()
        cur.execute("select max(userid) as maxid from roomfamily")
        data=cur.fetchone()
    finally:
        conn.close()
        return data[0]

maxiddata=maxdata()
minid=0
maxid=0
que=Queue.Queue()


def serial(id):
    global minid,maxid
    if minid < id:
       maxid=minid+9999
    if maxid > id:
       maxid=id
    return minid,maxid 

class runsql(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
            global minid,maxid,mutex
            threadname = threading.currentThread().getName()
            mutex.acquire()
            #对数据加锁
            if maxid >= maxiddata:
               time.sleep(0.1)
            (minid,maxid)=serial(maxiddata)
            que.put((minid,maxid))
            if maxid < maxiddata :
                minid=maxid+1
            mutex.release()
            conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
            if not que.empty():
                (mid,aid)=que.get()
                #conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
                cur=conn.cursor()
                cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
                print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
                data=cur.fetchall();
                for num in data:
                   tablename="t_roomfamily_"+str(num[0]%200)
                   prefixname=str(num[0]%200)
                   writelog(tablename,num[1]+"\n")
                conn.close()
def main():
    global minid,maxid,mutex
    #### 定义循环序列,就是一个线程池     
    threads = []
    #### 定义运行所使用的线程数 
    thread_lines =10 
    start_line=0
    mutex = threading.Lock()
    for t in range(0,thread_lines):
        t=runsql()
        threads.append(t)
        start_line+=1
    for t in threads:
        t.start() 
    while True:
      for num_line in xrange(0,thread_lines): 
    #### 初始化当前线程的状态
         thread_status = False
    #### 初始化检查循环线程的开始值 
         loop_line = 0
    #### 开始循环检查线程池中的线程状态
         while thread_status == False :
    #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
    #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。 
    #### 如果线程池中线程全部在运行,则开始从头检查                
            if threads[loop_line].isAlive() == False:
                   threads[loop_line] = runsql() 
                   threads[loop_line].start() 
                   thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                   loop_line=0
                else:
                   loop_line+=1
      if maxid >= maxiddata:
          break
    for number_line in xrange(start_line,thread_lines): 
        thread[number_line].exit() 

if __name__ == '__main__':
   main()

执行结果:
.........以上线程省略
Thread-18486 affect rows:12635:184850000~184859999
Thread-18487 affect rows:12630:184860000~184869999
Thread-18488 affect rows:12865:184870000~184879999
Thread-18489 affect rows:782:184880000~184880596
Thread-18490 affect rows:782:184880000~184880596

4个疑问:
1.最后2个线程读取的数据重复了,不知道BUG在哪里?
2.我愿意是想开10个线程,然后复用老线程的,从结果看出是不段的在新增加线程,没有达到我想到的目的,该如何优化?
3.多线程里读取数据库的数据然后写文件,如果多个线程同时刻在写同一个文件,会引起阻塞现象吗?现在发现此脚本执行起来效率不如任意。
4.以上代码还有其他该优化的地方,请大家提出来?

本人初学PYTHON,有很多疑问,望各位大侠指点迷津!小女子跪拜了,谢谢!

12 个解决方案

#1


python线程共有全局锁,基本不能实现多线程并行/并发执行

#2


1:que里多了个重复的任务,因为if判断在后。
2:que.put是线程安全不用加锁,而且跟线程任务无关,应该把代码搬到外头,serial函数应该按线程数的若干倍数进行id分割,你的例子预定线程数比任务数量还多,跑一次就完了哪还能继续循环复?
3:并发写入同一文件要上锁...

#3


虽然有gil限制,不过有可能长时间IO等待,用多线程编程还是有点用处..。

#4


python多线程是假滴, 别用python多线程喽.

#5


引用 4 楼 qq120848369 的回复:
python多线程是假滴, 别用python多线程喽.

那有撒提高效率的方法呢?

#6


#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)   
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
    os.mkdir(logdir)


def writelog(filename,message):
    file=logdir+'/'+filename
    f=open(file,"a")
    f.write(message)
    f.close

def maxdata():
    try:
        conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
        cur=conn.cursor()
        cur.execute("select max(userid) as maxid from roomfamily")
        data=cur.fetchone()
    finally:
        conn.close()
        return data[0]

maxiddata=maxdata()
que=Queue.Queue()
maxid=0
minid=0

def serial(id):
    if minid < id:
       maxid=minid+9999
    if maxid > id:
       maxid=id
    return minid,maxid 

def queuein(id):
    global minid,maxid
    while True:
       (minid,maxid)=serial(id)
       que.put((minid,maxid))
       print "queue:%d,%d"%(minid,maxid)
       if maxid >= maxiddata:
           break
       if maxid < maxiddata :
           minid=maxid+1

class runsql(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
            global mutex
            threadname = threading.currentThread().getName()
            conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
            (mid,aid)=que.get()
            print "qget:%d,%d"%(mid,aid)
            #cur=conn.cursor()
            #cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
def main():
    global mutex
    #新增加一个线程来产生数字队列
    threading.Thread(target=queuein(maxiddata),args=()).start() 
    #### 定义循环序列,就是一个线程池     
    threads = []
    #### 定义运行所使用的线程数 
    thread_lines =10 
    start_line=0
    for t in range(0,thread_lines):
        t=runsql()
        threads.append(t)
        start_line+=1
    for t in threads:
        t.start() 
    while True:
      for num_line in xrange(0,thread_lines): 
    #### 初始化当前线程的状态
         thread_status = False
    #### 初始化检查循环线程的开始值 
         loop_line = 0
    #### 开始循环检查线程池中的线程状态
         while thread_status == False :
    #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
    #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。 
    #### 如果线程池中线程全部在运行,则开始从头检查                
            if threads[loop_line].isAlive() == False:
                   threads[loop_line] = runsql() 
                   threads[loop_line].start() 
                   thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                   loop_line=0
                else:
                   loop_line+=1
      if maxid >= maxiddata:
          break
    for number_line in xrange(start_line,thread_lines): 
        thread[number_line].exit() 

if __name__ == '__main__':
   main()
现在改了一下,改成用一个线程来产生数据队列,执行结果如下:
.......
上面省略了很多行
queue:184830000,184839999
queue:184840000,184849999
queue:184850000,184859999
queue:184860000,184869999
queue:184870000,184879999
queue:184880000,184880596
qget:0,9999
qget:10000,19999
qget:20000,29999
qget:30000,39999
qget:40000,49999
qget:50000,59999
qget:60000,69999
qget:70000,79999
qget:80000,89999
qget:90000,99999
qget:100000,109999
qget:110000,119999
qget:120000,129999
qget:130000,139999
qget:140000,149999
qget:150000,159999
qget:160000,169999
qget:170000,179999
qget:180000,189999
qget:190000,199999

现在问题是get出来的数据比put进去的数据少了很多,请问哪里出了问题?

#7


监测部分while-for-while代码块貌似很复杂呀,逻辑容易搞错吧,试试照之前让线程自己不断循环,循环底部调用Queue.task_done()自行计数,主线程简单Queue.join() 阻塞等待即可...

#8


引用 7 楼 angel_su 的回复:
监测部分while-for-while代码块貌似很复杂呀,逻辑容易搞错吧,试试照之前让线程自己不断循环,循环底部调用Queue.task_done()自行计数,主线程简单Queue.join() 阻塞等待即可...

如何让线程不断循环呢?能不能帮忙写个demo出来?谢谢!

#9


文档里有关queue模块的范例看一看...

#10


#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)   
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
    os.mkdir(logdir)

def writelog(filename,message):
    file=logdir+'/'+filename
    f=open(file,"a")
    f.write(message)
    f.close

def maxdata():
    try:
        conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
        cur=conn.cursor()
        cur.execute("select max(userid) as maxid from roomfamily")
        data=cur.fetchone()
    finally:
        conn.close()
        return data[0]

maxiddata=maxdata()
maxid=0
minid=0

def serial(id):
    if minid < id:
       maxid=minid+9999
    if maxid > id:
       maxid=id
    return minid,maxid 

def queueuein(id,queue):
    global minid,maxid
    while True:
       (minid,maxid)=serial(id)
       queue.put((minid,maxid))
       if maxid >= maxiddata:
           break
       if maxid < maxiddata :
           minid=maxid+1

class runsql(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue=queue
    def run(self):
        while 1:
            if self.queue.empty():
                break
            conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
            threadname = threading.currentThread().getName()
            (mid,aid)=self.queue.get()
            cur=conn.cursor()
            cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
            print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
                #data=cur.fetchall();
                #for num in data:
                #   tablename="t_roomfamily_"+str(num[0]%200)
                #   prefixname=str(num[0]%200)
                #   writelog(tablename,num[1]+"\n")
            self.queue.task_done()
            conn.close()
def main():
    #新增加一个线程来产生数字队列
    queue=Queue.Queue()
    conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
    threading.Thread(target=queueuein,args=(maxiddata,queue,)).start() 
    #### 定义循环序列,就是一个线程池     
    threads = []
    #### 定义运行所使用的线程数 
    thread_lines =10 
    start_line=0
    for t in range(0,thread_lines):
        t=runsql(queue)
        threads.append(t)
    for t in threads:
        t.start() 
    for j in threads: 
        j.join()
    queue.join()
if __name__ == '__main__':
   main()

执行结果:
....以上省略几千行
Thread-6 affect rows:782:184880000~184880596
 Thread-2 affect rows:12630:184860000~184869999
Thread-10 affect rows:12635:184850000~184859999
Thread-3 affect rows:15161:184820000~184829999
Thread-11 affect rows:12865:184870000~184879999
程序卡这里不动了
最后改成这样了,基本实现了需求,不过最后退不出来,卡在最后了。

#11


#for j in threads: 
    #j.join()
queue.join()

按上面注释掉,不然线程是死循环不会自己退出,就会阻塞在那里,queue.join()即可,不过主线程是不阻塞了会结束,但是子线程还在跑,程序不会退出,所以前面起线程时加行t.daemon = True,这样主线程结束时,没有其他非deamon线程时,整个程序就能退出...

#12


引用 11 楼 angel_su 的回复:
#for j in threads: 
    #j.join()
queue.join()

按上面注释掉,不然线程是死循环不会自己退出,就会阻塞在那里,queue.join()即可,不过主线程是不阻塞了会结束,但是子线程还在跑,程序不会退出,所以前面起线程时加行t.daemon = True,这样主线程结束时,没有其他非deamon线程时,整个程序就能退出..……

谢谢!!

#1


python线程共有全局锁,基本不能实现多线程并行/并发执行

#2


1:que里多了个重复的任务,因为if判断在后。
2:que.put是线程安全不用加锁,而且跟线程任务无关,应该把代码搬到外头,serial函数应该按线程数的若干倍数进行id分割,你的例子预定线程数比任务数量还多,跑一次就完了哪还能继续循环复?
3:并发写入同一文件要上锁...

#3


虽然有gil限制,不过有可能长时间IO等待,用多线程编程还是有点用处..。

#4


python多线程是假滴, 别用python多线程喽.

#5


引用 4 楼 qq120848369 的回复:
python多线程是假滴, 别用python多线程喽.

那有撒提高效率的方法呢?

#6


#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)   
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
    os.mkdir(logdir)


def writelog(filename,message):
    file=logdir+'/'+filename
    f=open(file,"a")
    f.write(message)
    f.close

def maxdata():
    try:
        conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
        cur=conn.cursor()
        cur.execute("select max(userid) as maxid from roomfamily")
        data=cur.fetchone()
    finally:
        conn.close()
        return data[0]

maxiddata=maxdata()
que=Queue.Queue()
maxid=0
minid=0

def serial(id):
    if minid < id:
       maxid=minid+9999
    if maxid > id:
       maxid=id
    return minid,maxid 

def queuein(id):
    global minid,maxid
    while True:
       (minid,maxid)=serial(id)
       que.put((minid,maxid))
       print "queue:%d,%d"%(minid,maxid)
       if maxid >= maxiddata:
           break
       if maxid < maxiddata :
           minid=maxid+1

class runsql(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
            global mutex
            threadname = threading.currentThread().getName()
            conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
            (mid,aid)=que.get()
            print "qget:%d,%d"%(mid,aid)
            #cur=conn.cursor()
            #cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
def main():
    global mutex
    #新增加一个线程来产生数字队列
    threading.Thread(target=queuein(maxiddata),args=()).start() 
    #### 定义循环序列,就是一个线程池     
    threads = []
    #### 定义运行所使用的线程数 
    thread_lines =10 
    start_line=0
    for t in range(0,thread_lines):
        t=runsql()
        threads.append(t)
        start_line+=1
    for t in threads:
        t.start() 
    while True:
      for num_line in xrange(0,thread_lines): 
    #### 初始化当前线程的状态
         thread_status = False
    #### 初始化检查循环线程的开始值 
         loop_line = 0
    #### 开始循环检查线程池中的线程状态
         while thread_status == False :
    #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
    #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。 
    #### 如果线程池中线程全部在运行,则开始从头检查                
            if threads[loop_line].isAlive() == False:
                   threads[loop_line] = runsql() 
                   threads[loop_line].start() 
                   thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                   loop_line=0
                else:
                   loop_line+=1
      if maxid >= maxiddata:
          break
    for number_line in xrange(start_line,thread_lines): 
        thread[number_line].exit() 

if __name__ == '__main__':
   main()
现在改了一下,改成用一个线程来产生数据队列,执行结果如下:
.......
上面省略了很多行
queue:184830000,184839999
queue:184840000,184849999
queue:184850000,184859999
queue:184860000,184869999
queue:184870000,184879999
queue:184880000,184880596
qget:0,9999
qget:10000,19999
qget:20000,29999
qget:30000,39999
qget:40000,49999
qget:50000,59999
qget:60000,69999
qget:70000,79999
qget:80000,89999
qget:90000,99999
qget:100000,109999
qget:110000,119999
qget:120000,129999
qget:130000,139999
qget:140000,149999
qget:150000,159999
qget:160000,169999
qget:170000,179999
qget:180000,189999
qget:190000,199999

现在问题是get出来的数据比put进去的数据少了很多,请问哪里出了问题?

#7


监测部分while-for-while代码块貌似很复杂呀,逻辑容易搞错吧,试试照之前让线程自己不断循环,循环底部调用Queue.task_done()自行计数,主线程简单Queue.join() 阻塞等待即可...

#8


引用 7 楼 angel_su 的回复:
监测部分while-for-while代码块貌似很复杂呀,逻辑容易搞错吧,试试照之前让线程自己不断循环,循环底部调用Queue.task_done()自行计数,主线程简单Queue.join() 阻塞等待即可...

如何让线程不断循环呢?能不能帮忙写个demo出来?谢谢!

#9


文档里有关queue模块的范例看一看...

#10


#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)   
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
    os.mkdir(logdir)

def writelog(filename,message):
    file=logdir+'/'+filename
    f=open(file,"a")
    f.write(message)
    f.close

def maxdata():
    try:
        conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
        cur=conn.cursor()
        cur.execute("select max(userid) as maxid from roomfamily")
        data=cur.fetchone()
    finally:
        conn.close()
        return data[0]

maxiddata=maxdata()
maxid=0
minid=0

def serial(id):
    if minid < id:
       maxid=minid+9999
    if maxid > id:
       maxid=id
    return minid,maxid 

def queueuein(id,queue):
    global minid,maxid
    while True:
       (minid,maxid)=serial(id)
       queue.put((minid,maxid))
       if maxid >= maxiddata:
           break
       if maxid < maxiddata :
           minid=maxid+1

class runsql(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue=queue
    def run(self):
        while 1:
            if self.queue.empty():
                break
            conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
            threadname = threading.currentThread().getName()
            (mid,aid)=self.queue.get()
            cur=conn.cursor()
            cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
            print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
                #data=cur.fetchall();
                #for num in data:
                #   tablename="t_roomfamily_"+str(num[0]%200)
                #   prefixname=str(num[0]%200)
                #   writelog(tablename,num[1]+"\n")
            self.queue.task_done()
            conn.close()
def main():
    #新增加一个线程来产生数字队列
    queue=Queue.Queue()
    conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
    threading.Thread(target=queueuein,args=(maxiddata,queue,)).start() 
    #### 定义循环序列,就是一个线程池     
    threads = []
    #### 定义运行所使用的线程数 
    thread_lines =10 
    start_line=0
    for t in range(0,thread_lines):
        t=runsql(queue)
        threads.append(t)
    for t in threads:
        t.start() 
    for j in threads: 
        j.join()
    queue.join()
if __name__ == '__main__':
   main()

执行结果:
....以上省略几千行
Thread-6 affect rows:782:184880000~184880596
 Thread-2 affect rows:12630:184860000~184869999
Thread-10 affect rows:12635:184850000~184859999
Thread-3 affect rows:15161:184820000~184829999
Thread-11 affect rows:12865:184870000~184879999
程序卡这里不动了
最后改成这样了,基本实现了需求,不过最后退不出来,卡在最后了。

#11


#for j in threads: 
    #j.join()
queue.join()

按上面注释掉,不然线程是死循环不会自己退出,就会阻塞在那里,queue.join()即可,不过主线程是不阻塞了会结束,但是子线程还在跑,程序不会退出,所以前面起线程时加行t.daemon = True,这样主线程结束时,没有其他非deamon线程时,整个程序就能退出...

#12


引用 11 楼 angel_su 的回复:
#for j in threads: 
    #j.join()
queue.join()

按上面注释掉,不然线程是死循环不会自己退出,就会阻塞在那里,queue.join()即可,不过主线程是不阻塞了会结束,但是子线程还在跑,程序不会退出,所以前面起线程时加行t.daemon = True,这样主线程结束时,没有其他非deamon线程时,整个程序就能退出..……

谢谢!!