#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
minid=0
maxid=0
que=Queue.Queue()
def serial(id):
global minid,maxid
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
class runsql(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global minid,maxid,mutex
threadname = threading.currentThread().getName()
mutex.acquire()
#对数据加锁
if maxid >= maxiddata:
time.sleep(0.1)
(minid,maxid)=serial(maxiddata)
que.put((minid,maxid))
if maxid < maxiddata :
minid=maxid+1
mutex.release()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
if not que.empty():
(mid,aid)=que.get()
#conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
data=cur.fetchall();
for num in data:
tablename="t_roomfamily_"+str(num[0]%200)
prefixname=str(num[0]%200)
writelog(tablename,num[1]+"\n")
conn.close()
def main():
global minid,maxid,mutex
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
mutex = threading.Lock()
for t in range(0,thread_lines):
t=runsql()
threads.append(t)
start_line+=1
for t in threads:
t.start()
while True:
for num_line in xrange(0,thread_lines):
#### 初始化当前线程的状态
thread_status = False
#### 初始化检查循环线程的开始值
loop_line = 0
#### 开始循环检查线程池中的线程状态
while thread_status == False :
#### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
#### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
#### 如果线程池中线程全部在运行,则开始从头检查
if threads[loop_line].isAlive() == False:
threads[loop_line] = runsql()
threads[loop_line].start()
thread_status = True
else:
if loop_line >= thread_lines-1 :
loop_line=0
else:
loop_line+=1
if maxid >= maxiddata:
break
for number_line in xrange(start_line,thread_lines):
thread[number_line].exit()
if __name__ == '__main__':
main()
执行结果:
.........以上线程省略
Thread-18486 affect rows:12635:184850000~184859999
Thread-18487 affect rows:12630:184860000~184869999
Thread-18488 affect rows:12865:184870000~184879999
Thread-18489 affect rows:782:184880000~184880596
Thread-18490 affect rows:782:184880000~184880596
4个疑问:
1.最后2个线程读取的数据重复了,不知道BUG在哪里?
2.我原意是想开10个线程,然后复用老线程,从结果看出是在不断地新增线程,没有达到我想要的目的,该如何优化?
3.多线程里读取数据库的数据然后写文件,如果多个线程同一时刻在写同一个文件,会引起阻塞吗?现在发现此脚本的执行效率不尽如人意。
4.以上代码还有其他该优化的地方,请大家提出来?
本人初学PYTHON,有很多疑问,望各位大侠指点迷津!小女子跪拜了,谢谢!
12 个解决方案
#1
Python线程共用一把全局解释器锁(GIL),同一时刻只有一个线程执行Python字节码,CPU密集的代码基本不能靠多线程实现并行执行
#2
1:que里多了个重复的任务,因为if判断在后。
2:que.put是线程安全的,不用加锁,而且跟线程任务无关,应该把代码搬到外头;serial函数应该按线程数的若干倍数进行id分割。你的例子预定线程数比任务数量还多,跑一次就完了,哪还能继续循环复用?
3:并发写入同一文件要上锁...
2:que.put是线程安全的,不用加锁,而且跟线程任务无关,应该把代码搬到外头;serial函数应该按线程数的若干倍数进行id分割。你的例子预定线程数比任务数量还多,跑一次就完了,哪还能继续循环复用?
3:并发写入同一文件要上锁...
#3
虽然有GIL限制,不过在有长时间IO等待(如数据库查询、写文件)的场景下,线程等待IO时会释放GIL,用多线程编程还是有点用处。
#4
python多线程是假滴, 别用python多线程喽.
#5
那有啥提高效率的方法呢?
#6
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
que=Queue.Queue()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queuein(id):
global minid,maxid
while True:
(minid,maxid)=serial(id)
que.put((minid,maxid))
print "queue:%d,%d"%(minid,maxid)
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global mutex
threadname = threading.currentThread().getName()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
(mid,aid)=que.get()
print "qget:%d,%d"%(mid,aid)
#cur=conn.cursor()
#cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
def main():
global mutex
#新增加一个线程来产生数字队列
threading.Thread(target=queuein(maxiddata),args=()).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql()
threads.append(t)
start_line+=1
for t in threads:
t.start()
while True:
for num_line in xrange(0,thread_lines):
#### 初始化当前线程的状态
thread_status = False
#### 初始化检查循环线程的开始值
loop_line = 0
#### 开始循环检查线程池中的线程状态
while thread_status == False :
#### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
#### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
#### 如果线程池中线程全部在运行,则开始从头检查
if threads[loop_line].isAlive() == False:
threads[loop_line] = runsql()
threads[loop_line].start()
thread_status = True
else:
if loop_line >= thread_lines-1 :
loop_line=0
else:
loop_line+=1
if maxid >= maxiddata:
break
for number_line in xrange(start_line,thread_lines):
thread[number_line].exit()
if __name__ == '__main__':
main()
现在改了一下,改成用一个线程来产生数据队列,执行结果如下:
.......
上面省略了很多行
queue:184830000,184839999
queue:184840000,184849999
queue:184850000,184859999
queue:184860000,184869999
queue:184870000,184879999
queue:184880000,184880596
qget:0,9999
qget:10000,19999
qget:20000,29999
qget:30000,39999
qget:40000,49999
qget:50000,59999
qget:60000,69999
qget:70000,79999
qget:80000,89999
qget:90000,99999
qget:100000,109999
qget:110000,119999
qget:120000,129999
qget:130000,139999
qget:140000,149999
qget:150000,159999
qget:160000,169999
qget:170000,179999
qget:180000,189999
qget:190000,199999
现在问题是get出来的数据比put进去的数据少了很多,请问哪里出了问题?
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
que=Queue.Queue()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queuein(id):
global minid,maxid
while True:
(minid,maxid)=serial(id)
que.put((minid,maxid))
print "queue:%d,%d"%(minid,maxid)
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global mutex
threadname = threading.currentThread().getName()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
(mid,aid)=que.get()
print "qget:%d,%d"%(mid,aid)
#cur=conn.cursor()
#cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
def main():
global mutex
#新增加一个线程来产生数字队列
threading.Thread(target=queuein(maxiddata),args=()).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql()
threads.append(t)
start_line+=1
for t in threads:
t.start()
while True:
for num_line in xrange(0,thread_lines):
#### 初始化当前线程的状态
thread_status = False
#### 初始化检查循环线程的开始值
loop_line = 0
#### 开始循环检查线程池中的线程状态
while thread_status == False :
#### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
#### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
#### 如果线程池中线程全部在运行,则开始从头检查
if threads[loop_line].isAlive() == False:
threads[loop_line] = runsql()
threads[loop_line].start()
thread_status = True
else:
if loop_line >= thread_lines-1 :
loop_line=0
else:
loop_line+=1
if maxid >= maxiddata:
break
for number_line in xrange(start_line,thread_lines):
thread[number_line].exit()
if __name__ == '__main__':
main()
现在改了一下,改成用一个线程来产生数据队列,执行结果如下:
.......
上面省略了很多行
queue:184830000,184839999
queue:184840000,184849999
queue:184850000,184859999
queue:184860000,184869999
queue:184870000,184879999
queue:184880000,184880596
qget:0,9999
qget:10000,19999
qget:20000,29999
qget:30000,39999
qget:40000,49999
qget:50000,59999
qget:60000,69999
qget:70000,79999
qget:80000,89999
qget:90000,99999
qget:100000,109999
qget:110000,119999
qget:120000,129999
qget:130000,139999
qget:140000,149999
qget:150000,159999
qget:160000,169999
qget:170000,179999
qget:180000,189999
qget:190000,199999
现在问题是get出来的数据比put进去的数据少了很多,请问哪里出了问题?
#7
监测部分的while-for-while代码块貌似很复杂呀,逻辑容易搞错吧。试试像之前那样让线程自己不断循环,在循环底部调用Queue.task_done()自行计数,主线程简单地用Queue.join()阻塞等待即可...
#8
如何让线程不断循环呢?能不能帮忙写个demo出来?谢谢!
#9
文档里有关queue模块的范例看一看...
#10
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queueuein(id,queue):
global minid,maxid
while True:
(minid,maxid)=serial(id)
queue.put((minid,maxid))
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue=queue
def run(self):
while 1:
if self.queue.empty():
break
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threadname = threading.currentThread().getName()
(mid,aid)=self.queue.get()
cur=conn.cursor()
cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
#data=cur.fetchall();
#for num in data:
# tablename="t_roomfamily_"+str(num[0]%200)
# prefixname=str(num[0]%200)
# writelog(tablename,num[1]+"\n")
self.queue.task_done()
conn.close()
def main():
#新增加一个线程来产生数字队列
queue=Queue.Queue()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threading.Thread(target=queueuein,args=(maxiddata,queue,)).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql(queue)
threads.append(t)
for t in threads:
t.start()
for j in threads:
j.join()
queue.join()
if __name__ == '__main__':
main()
执行结果:
....以上省略几千行
Thread-6 affect rows:782:184880000~184880596
Thread-2 affect rows:12630:184860000~184869999
Thread-10 affect rows:12635:184850000~184859999
Thread-3 affect rows:15161:184820000~184829999
Thread-11 affect rows:12865:184870000~184879999
程序卡这里不动了
最后改成这样了,基本实现了需求,不过最后退不出来,卡在最后了。
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queueuein(id,queue):
global minid,maxid
while True:
(minid,maxid)=serial(id)
queue.put((minid,maxid))
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue=queue
def run(self):
while 1:
if self.queue.empty():
break
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threadname = threading.currentThread().getName()
(mid,aid)=self.queue.get()
cur=conn.cursor()
cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
#data=cur.fetchall();
#for num in data:
# tablename="t_roomfamily_"+str(num[0]%200)
# prefixname=str(num[0]%200)
# writelog(tablename,num[1]+"\n")
self.queue.task_done()
conn.close()
def main():
#新增加一个线程来产生数字队列
queue=Queue.Queue()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threading.Thread(target=queueuein,args=(maxiddata,queue,)).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql(queue)
threads.append(t)
for t in threads:
t.start()
for j in threads:
j.join()
queue.join()
if __name__ == '__main__':
main()
执行结果:
....以上省略几千行
Thread-6 affect rows:782:184880000~184880596
Thread-2 affect rows:12630:184860000~184869999
Thread-10 affect rows:12635:184850000~184859999
Thread-3 affect rows:15161:184820000~184829999
Thread-11 affect rows:12865:184870000~184879999
程序卡这里不动了
最后改成这样了,基本实现了需求,不过最后退不出来,卡在最后了。
#11
# Answer #11's suggestion for the end of main(): do not join the worker
# threads (a worker blocked in queue.get() never returns, so join() hangs);
# wait on queue.join() instead, which returns once task_done() has been
# called for every queued item. Per the same answer, start each worker with
# t.daemon = True so the process can exit after main() returns.
#for j in threads:
#j.join()
queue.join()
按上面注释掉,不然线程是死循环,不会自己退出,就会阻塞在那里;只用queue.join()即可。不过这样主线程不再阻塞、会结束,但子线程还在跑,程序仍不会退出,所以前面起线程时要加一行t.daemon = True,这样主线程结束且没有其他非daemon线程时,整个程序就能退出...
# Answer #11's suggestion: the per-thread join stays commented out and the
# main thread waits only on queue.join(); workers need t.daemon = True so
# the process can exit while they are still blocked in queue.get().
#j.join()
queue.join()
按上面注释掉,不然线程是死循环,不会自己退出,就会阻塞在那里;只用queue.join()即可。不过这样主线程不再阻塞、会结束,但子线程还在跑,程序仍不会退出,所以前面起线程时要加一行t.daemon = True,这样主线程结束且没有其他非daemon线程时,整个程序就能退出...
#12
谢谢!!
#1
Python线程共用一把全局解释器锁(GIL),同一时刻只有一个线程执行Python字节码,CPU密集的代码基本不能靠多线程实现并行执行
#2
1:que里多了个重复的任务,因为if判断在后。
2:que.put是线程安全的,不用加锁,而且跟线程任务无关,应该把代码搬到外头;serial函数应该按线程数的若干倍数进行id分割。你的例子预定线程数比任务数量还多,跑一次就完了,哪还能继续循环复用?
3:并发写入同一文件要上锁...
2:que.put是线程安全的,不用加锁,而且跟线程任务无关,应该把代码搬到外头;serial函数应该按线程数的若干倍数进行id分割。你的例子预定线程数比任务数量还多,跑一次就完了,哪还能继续循环复用?
3:并发写入同一文件要上锁...
#3
虽然有GIL限制,不过在有长时间IO等待(如数据库查询、写文件)的场景下,线程等待IO时会释放GIL,用多线程编程还是有点用处。
#4
python多线程是假滴, 别用python多线程喽.
#5
那有啥提高效率的方法呢?
#6
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
que=Queue.Queue()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queuein(id):
global minid,maxid
while True:
(minid,maxid)=serial(id)
que.put((minid,maxid))
print "queue:%d,%d"%(minid,maxid)
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global mutex
threadname = threading.currentThread().getName()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
(mid,aid)=que.get()
print "qget:%d,%d"%(mid,aid)
#cur=conn.cursor()
#cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
def main():
global mutex
#新增加一个线程来产生数字队列
threading.Thread(target=queuein(maxiddata),args=()).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql()
threads.append(t)
start_line+=1
for t in threads:
t.start()
while True:
for num_line in xrange(0,thread_lines):
#### 初始化当前线程的状态
thread_status = False
#### 初始化检查循环线程的开始值
loop_line = 0
#### 开始循环检查线程池中的线程状态
while thread_status == False :
#### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
#### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
#### 如果线程池中线程全部在运行,则开始从头检查
if threads[loop_line].isAlive() == False:
threads[loop_line] = runsql()
threads[loop_line].start()
thread_status = True
else:
if loop_line >= thread_lines-1 :
loop_line=0
else:
loop_line+=1
if maxid >= maxiddata:
break
for number_line in xrange(start_line,thread_lines):
thread[number_line].exit()
if __name__ == '__main__':
main()
现在改了一下,改成用一个线程来产生数据队列,执行结果如下:
.......
上面省略了很多行
queue:184830000,184839999
queue:184840000,184849999
queue:184850000,184859999
queue:184860000,184869999
queue:184870000,184879999
queue:184880000,184880596
qget:0,9999
qget:10000,19999
qget:20000,29999
qget:30000,39999
qget:40000,49999
qget:50000,59999
qget:60000,69999
qget:70000,79999
qget:80000,89999
qget:90000,99999
qget:100000,109999
qget:110000,119999
qget:120000,129999
qget:130000,139999
qget:140000,149999
qget:150000,159999
qget:160000,169999
qget:170000,179999
qget:180000,189999
qget:190000,199999
现在问题是get出来的数据比put进去的数据少了很多,请问哪里出了问题?
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
que=Queue.Queue()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queuein(id):
global minid,maxid
while True:
(minid,maxid)=serial(id)
que.put((minid,maxid))
print "queue:%d,%d"%(minid,maxid)
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global mutex
threadname = threading.currentThread().getName()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
(mid,aid)=que.get()
print "qget:%d,%d"%(mid,aid)
#cur=conn.cursor()
#cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
def main():
global mutex
#新增加一个线程来产生数字队列
threading.Thread(target=queuein(maxiddata),args=()).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql()
threads.append(t)
start_line+=1
for t in threads:
t.start()
while True:
for num_line in xrange(0,thread_lines):
#### 初始化当前线程的状态
thread_status = False
#### 初始化检查循环线程的开始值
loop_line = 0
#### 开始循环检查线程池中的线程状态
while thread_status == False :
#### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
#### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
#### 如果线程池中线程全部在运行,则开始从头检查
if threads[loop_line].isAlive() == False:
threads[loop_line] = runsql()
threads[loop_line].start()
thread_status = True
else:
if loop_line >= thread_lines-1 :
loop_line=0
else:
loop_line+=1
if maxid >= maxiddata:
break
for number_line in xrange(start_line,thread_lines):
thread[number_line].exit()
if __name__ == '__main__':
main()
现在改了一下,改成用一个线程来产生数据队列,执行结果如下:
.......
上面省略了很多行
queue:184830000,184839999
queue:184840000,184849999
queue:184850000,184859999
queue:184860000,184869999
queue:184870000,184879999
queue:184880000,184880596
qget:0,9999
qget:10000,19999
qget:20000,29999
qget:30000,39999
qget:40000,49999
qget:50000,59999
qget:60000,69999
qget:70000,79999
qget:80000,89999
qget:90000,99999
qget:100000,109999
qget:110000,119999
qget:120000,129999
qget:130000,139999
qget:140000,149999
qget:150000,159999
qget:160000,169999
qget:170000,179999
qget:180000,189999
qget:190000,199999
现在问题是get出来的数据比put进去的数据少了很多,请问哪里出了问题?
#7
监测部分的while-for-while代码块貌似很复杂呀,逻辑容易搞错吧。试试像之前那样让线程自己不断循环,在循环底部调用Queue.task_done()自行计数,主线程简单地用Queue.join()阻塞等待即可...
#8
如何让线程不断循环呢?能不能帮忙写个demo出来?谢谢!
#9
文档里有关queue模块的范例看一看...
#10
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queueuein(id,queue):
global minid,maxid
while True:
(minid,maxid)=serial(id)
queue.put((minid,maxid))
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue=queue
def run(self):
while 1:
if self.queue.empty():
break
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threadname = threading.currentThread().getName()
(mid,aid)=self.queue.get()
cur=conn.cursor()
cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
#data=cur.fetchall();
#for num in data:
# tablename="t_roomfamily_"+str(num[0]%200)
# prefixname=str(num[0]%200)
# writelog(tablename,num[1]+"\n")
self.queue.task_done()
conn.close()
def main():
#新增加一个线程来产生数字队列
queue=Queue.Queue()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threading.Thread(target=queueuein,args=(maxiddata,queue,)).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql(queue)
threads.append(t)
for t in threads:
t.start()
for j in threads:
j.join()
queue.join()
if __name__ == '__main__':
main()
执行结果:
....以上省略几千行
Thread-6 affect rows:782:184880000~184880596
Thread-2 affect rows:12630:184860000~184869999
Thread-10 affect rows:12635:184850000~184859999
Thread-3 affect rows:15161:184820000~184829999
Thread-11 affect rows:12865:184870000~184879999
程序卡这里不动了
最后改成这样了,基本实现了需求,不过最后退不出来,卡在最后了。
# -*- coding: UTF-8 -*-
import threading
import MySQLdb as mdb
import os,sys,math,time
import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
appdir=os.getcwd()
logdir=appdir+'/log'
if not os.path.exists(logdir):
os.mkdir(logdir)
def writelog(filename,message):
file=logdir+'/'+filename
f=open(file,"a")
f.write(message)
f.close
def maxdata():
try:
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
cur=conn.cursor()
cur.execute("select max(userid) as maxid from roomfamily")
data=cur.fetchone()
finally:
conn.close()
return data[0]
maxiddata=maxdata()
maxid=0
minid=0
def serial(id):
if minid < id:
maxid=minid+9999
if maxid > id:
maxid=id
return minid,maxid
def queueuein(id,queue):
global minid,maxid
while True:
(minid,maxid)=serial(id)
queue.put((minid,maxid))
if maxid >= maxiddata:
break
if maxid < maxiddata :
minid=maxid+1
class runsql(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue=queue
def run(self):
while 1:
if self.queue.empty():
break
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threadname = threading.currentThread().getName()
(mid,aid)=self.queue.get()
cur=conn.cursor()
cur.execute("select UserID,concat_ws('|',UserID,FamilyUserID,UserName,FamilyUserName,RelationTypeID,SecrecyType,CreateDate,UserType,UserCode,BatchNum,ClassID) as UserID2 from roomfamily where UserID between %d and %d"%(mid,aid))
print threadname+" affect rows:%d:%s~%s"%(cur.rowcount,mid,aid)
#data=cur.fetchall();
#for num in data:
# tablename="t_roomfamily_"+str(num[0]%200)
# prefixname=str(num[0]%200)
# writelog(tablename,num[1]+"\n")
self.queue.task_done()
conn.close()
def main():
#新增加一个线程来产生数字队列
queue=Queue.Queue()
conn=mdb.connect(host='localhost',user='root',passwd='',db='roomfamily',charset='utf8',port=3306)
threading.Thread(target=queueuein,args=(maxiddata,queue,)).start()
#### 定义循环序列,就是一个线程池
threads = []
#### 定义运行所使用的线程数
thread_lines =10
start_line=0
for t in range(0,thread_lines):
t=runsql(queue)
threads.append(t)
for t in threads:
t.start()
for j in threads:
j.join()
queue.join()
if __name__ == '__main__':
main()
执行结果:
....以上省略几千行
Thread-6 affect rows:782:184880000~184880596
Thread-2 affect rows:12630:184860000~184869999
Thread-10 affect rows:12635:184850000~184859999
Thread-3 affect rows:15161:184820000~184829999
Thread-11 affect rows:12865:184870000~184879999
程序卡这里不动了
最后改成这样了,基本实现了需求,不过最后退不出来,卡在最后了。
#11
# Answer #11's suggestion for the end of main(): do not join the worker
# threads (a worker blocked in queue.get() never returns, so join() hangs);
# wait on queue.join() instead, which returns once task_done() has been
# called for every queued item. Per the same answer, start each worker with
# t.daemon = True so the process can exit after main() returns.
#for j in threads:
#j.join()
queue.join()
按上面注释掉,不然线程是死循环,不会自己退出,就会阻塞在那里;只用queue.join()即可。不过这样主线程不再阻塞、会结束,但子线程还在跑,程序仍不会退出,所以前面起线程时要加一行t.daemon = True,这样主线程结束且没有其他非daemon线程时,整个程序就能退出...
# Answer #11's suggestion: the per-thread join stays commented out and the
# main thread waits only on queue.join(); workers need t.daemon = True so
# the process can exit while they are still blocked in queue.get().
#j.join()
queue.join()
按上面注释掉,不然线程是死循环,不会自己退出,就会阻塞在那里;只用queue.join()即可。不过这样主线程不再阻塞、会结束,但子线程还在跑,程序仍不会退出,所以前面起线程时要加一行t.daemon = True,这样主线程结束且没有其他非daemon线程时,整个程序就能退出...
#12
谢谢!!