上面一个随笔已经简单介绍了多线程,比如下面在举个简单的例子:
1 #!/usr/bin/env python 2 #-*-coding:utf-8 -*- 3 4 import threading 5 import time 6 7 def add(): 8 sum = 0 9 10 for i in range(1000000): 11 sum += i 12 13 print("sum: ",sum) 14 15 16 def mul(): 17 sum2 = 1 18 for i in range(1,100000): 19 sum2 *= i 20 21 print("sum2: ",sum2) 22 23 24 if __name__ == '__main__': 25 26 start = time.time() 27 t1 = threading.Thread(target=add) 28 t2 = threading.Thread(target=mul) 29 30 l = [] 31 l.append(t1) 32 l.append(t2) 33 34 for t in l: 35 t.start() 36 37 for t in l: 38 t.join() 39 40 print("cost time %s"%(time.time()-start))
但是当多线程处理共享数据等问题时,简单的多线程会遇到一些困难,比如:
1 import threading 2 import time 3 4 def sub(): 5 6 global num 7 8 # num -= 1 9 temp = num # 一下三行与上一行意思一样,只是加了一个时间 10 time.sleep(0.0001) 11 num = temp-1 12 ''' 13 这样结果就不像原来的0了,原因是,原来线程1过来减1后,num少一,没问题; 14 现在加了.sleep(),当线程1到temp = num 时,因为到等待,所以切换给下一个线程,线程2拿到的还是100, 15 等第一线程到num = temp -1 = 99 时,下一个线程的temp还是100。 16 一个线程cpu还没处理结束,交出处理权,但是它们共享一个数据,所以可能出错。 17 解决办法,执行上面三行的过程中,CPU不可以切换,就OK了,因此同步锁诞生。详细见lesson3 18 ''' 19 20 num = 100 21 22 l = [] 23 24 for i in range(100): # 开了一百个线程 25 t = threading.Thread(target=sub) 26 t.start() 27 l.append(t) 28 29 for j in l: 30 j.join()
因此对于这种情况需要用同步锁,lock = threading.Lock() 定义锁, lock.acquire()上锁,lock.release()开锁。锁之间内容等于变成串行。使用如下:
1 import threading 2 import time 3 4 def sub(): 5 6 global num 7 8 # num -= 1 9 lock.acquire() # 加上同步锁,意思是,在释放锁前的内容不可以进行CPU线程切换 10 11 temp = num # 一下三行与上一行意思一样,只是加了一个时间 12 time.sleep(0.001) 13 num = temp-1 14 15 lock.release() 16 17 num = 100 18 19 l = [] 20 lock = threading.Lock() 21 22 for i in range(100): # 开了一百个线程 23 t = threading.Thread(target=sub) 24 t.start() 25 l.append(t) 26 27 for j in l: 28 j.join() 29 30 print(num)
以上说明了为什么需要同步锁,以及它的使用方法,但是在多线程中还有死锁的问题。死锁:在线程见共享拿多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方资源就会造成死锁。比如:下面例子,线程1拿actionB中的B锁后,sleep()时间切换线程,线程2拿了actionA中A锁后,在拿B锁时,发现没有B锁(还在线程1手里),造成死锁。
1 import threading,time 2 3 4 # 用继承类创建多线程 5 6 class MyThread(threading.Thread): 7 8 def actionA(self): 9 A.acquire() 10 print(self.name,"gotA",time.ctime()) # self.name是线程名 11 time.sleep(2) 12 # 注意A锁还没释放,又上了一把锁 13 B.acquire() 14 print(self.name,"gotB",time.ctime()) 15 time.sleep(1) 16 17 B.release() 18 A.release() 19 20 def actionB(self): 21 B.acquire() 22 print(self.name, "gotB", time.ctime()) 23 time.sleep(2) 24 # 注意A锁还没释放,又上了一把锁 25 A.acquire() 26 print(self.name, "gotB", time.ctime()) 27 time.sleep(1) 28 29 A.release() 30 B.release() 31 32 def run(self): # run函数名字必须这样,就跟之前handle一个意思 33 self.actionA() 34 time.sleep(2) 35 self.actionB() 36 37 38 if __name__ == '__main__': 39 40 # 定义两把锁,注意是两把,不是两种,只要两个,你用A的同时,它就不可以 41 A = threading.Lock() 42 B = threading.Lock() 43 44 L = [] 45 46 for i in range(5): # 开了五个线程 47 t = MyThread() 48 t.start() 49 L.append(t) 50 51 for t in L: 52 t.join()
使用递归锁可以解决问题,threading.Rlock(),内部就是多一个计数,acquire一次,内部计数加一个1,release减1,只有计数等于0时,其他线程才可以拿这把锁,不然等待。自己线程可以多次用这个锁。具体如下:
1 import threading,time 2 3 4 # 用继承类创建多线程 5 6 class MyThread(threading.Thread): 7 8 def actionA(self): 9 r_lock.acquire() 10 print(self.name,"gotA",time.ctime()) # self.name是线程名 11 time.sleep(2) 12 # 注意A锁还没释放,又上了一把锁 13 r_lock.acquire() 14 print(self.name,"gotB",time.ctime()) 15 time.sleep(1) 16 17 r_lock.release() 18 r_lock.release() 19 20 def actionB(self): 21 r_lock.acquire() 22 print(self.name, "gotB", time.ctime()) 23 time.sleep(2) 24 # 注意A锁还没释放,又上了一把锁 25 r_lock.acquire() 26 print(self.name, "gotB", time.ctime()) 27 time.sleep(1) 28 29 r_lock.release() 30 r_lock.release() 31 32 def run(self): # run函数名字必须这样,就跟之前handle一个意思 33 self.actionA() 34 self.actionB() 35 36 37 if __name__ == '__main__': 38 39 # 定义两把锁,注意是两把,不是两种,只要两个,你用A的同时,它就不可以 40 # A = threading.Lock() 41 # B = threading.Lock() 42 43 r_lock = threading.RLock() 44 45 L = [] 46 47 for i in range(5): # 开了五个线程 48 t = MyThread() 49 t.start() 50 L.append(t) 51 52 for t in L: 53 t.join() 54 55 print("ending.........")
有了同步锁(threading.Lock),递归锁(threading.Rlock),下面介绍threading.Event。说这个之前,先说一下同步对象的概念:当你同时开五个线程执行任务的时候,到底哪个任务先执行,是由抢占CPU决定的,不可控。但是有时候需求需要其中一个特定的线程任务完成到一定阶段,其他线程才可以开始。这时候就涉及到同步对象的概念。
创建同步对象:event = threading.Event()
event.set() 设定
event.wait() 如果之前没设定,到这步就阻塞等待;如果已经设定,这步等于pass(跳过)
event.clear() 清除设定
1 import threading,time 2 class Boss(threading.Thread): 3 4 def run(self): 5 print("BOSS:今晚大家都要加班到22:00。") 6 print(event.isSet())# False 7 event.set() 8 time.sleep(5) 9 print("BOSS:<22:00>可以下班了。") 10 print(event.isSet()) 11 event.set() 12 13 class Worker(threading.Thread): 14 def run(self): 15 16 event.wait()# 一旦event被设定,等同于pass 17 18 print("Worker:哎……命苦啊!") 19 time.sleep(1) 20 event.clear() 21 event.wait() 22 print("Worker:OhYeah!") 23 24 25 if __name__=="__main__": 26 event=threading.Event() 27 28 29 threads=[] 30 for i in range(5): 31 threads.append(Worker()) 32 threads.append(Boss()) 33 for t in threads: # 本来这6个线程随先执行是随意的(如果没有Event),但是需求肯定是先是老板说加班,员工叫苦,老板说下班,员工欢呼 34 t.start() 35 for t in threads: 36 t.join() 37 38 print("ending.....")
实际应用中可能还有一个信号量的事,也就是允许同时运行的最大线程数目。
1 import threading,time 2 3 4 class myThread(threading.Thread): 5 def run(self): 6 7 if semaphore.acquire(): 8 print(self.name) 9 time.sleep(3) 10 semaphore.release() 11 12 if __name__=="__main__": 13 semaphore=threading.Semaphore(5) 14 # semaphore也是一把锁,里面数字代表对于一个加锁任务允许执行的最大线程数 15 thrs=[] 16 for i in range(100): 17 thrs.append(myThread()) 18 for t in thrs: 19 t.start()