
在使用多线程的应用下,如何保证线程安全,以及线程之间的同步,或者访问共享变量等问题是十分棘手的问题,也是使用多线程下面临的问题,如果处理不好,会带来较严重的后果,使用python多线程中提供Lock Rlock Semaphore Event Condition 用来保证线程之间的同步,后者保证访问共享变量的互斥问题
Lock & RLock:互斥锁 用来保证多线程访问共享变量的问题
Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时拥有。
Event对象: 它是线程间通信的方式,相当于信号,一个线程可以给另外一个线程发送信号后让其执行操作。
Condition对象:其可以在某些事件触发或者达到特定的条件后才处理数据
一、线程同步锁(互斥锁)
概念
线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完 成对数据的操作。
锁(Lock):一旦线程获得锁,其他试图获取锁的线程将被阻塞等待。
锁:凡是存在共享支援争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。
锁的应用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
使用锁的注意事项:
1、少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道 上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须 加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
2、加锁时间越短越好,不需要就立即释放锁。
3、一定要避免死锁。
线程锁与GIL(全局解释器锁)
需要注意的点:
1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来
2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?
首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据,
然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。
过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限
线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果
既然是串行,那我们执行
t1.start()
t1.join
t2.start()
t2.join()
这也是串行执行啊,为何还要加Lock呢,需知join是等待t1所有的代码执行完,相当于锁住了t1的所有代码,而Lock只是锁住一部分操作共享数据的代码。
深层原因
因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。
例子
加锁前
import time
from threading import Lock,Thread
# Lock 互斥锁加锁前
def func(lock):
global n
temp = n
time.sleep(0.2)
n = temp - 1
if __name__ == '__main__':
n = 10
t_lst = []
lock = Lock()
for i in range(10):
t = Thread(target=func, args=(lock,))
t.start()
t_lst.append(t)
for t in t_lst: t.join()
print(n) #结果很大可能为9
分析:多个线程同时得到n的值为10,然后同时执行n-1即10-1得到的最后的值为9.
加锁后
import time
from threading import Lock,Thread
# Lock 互斥锁
def func(lock):
global n
lock.acquire()
temp = n
time.sleep(0.2)
n = temp - 1
lock.release()
if __name__ == '__main__':
n = 10
t_lst = []
lock = Lock()
for i in range(10):
t = Thread(target=func, args=(lock,))
t.start()
t_lst.append(t)
for t in t_lst: t.join()
print(n) #结果为0
# 当多个线程都修改某一个共享数据的时候,需要进行同步控制。
#
# 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
# 互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,
# 此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,
# 其他的线程才能再次锁定该资源。
# 互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
二、线程死锁和递归锁
死锁:所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。在一个线程中,我们在部分代码中使用了锁lock.acquire()还没有释放锁时,调用了另一个方法,方法中又使用了锁,这样如果使用同一把普通的锁很容易出现死锁。
例子
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A锁\033[0m' %self.name)
mutexB.acquire()
print('\033[42m%s 拿到B锁\033[0m' %self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print('\033[43m%s 拿到B锁\033[0m' %self.name)
time.sleep(2)
mutexA.acquire()
print('\033[44m%s 拿到A锁\033[0m' %self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t=MyThread()
t.start()
'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''
解决办法:
递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了“可重入锁(递归锁)”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
面的例子如果使用RLock代替Lock,则不会发生死锁:
mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
例子
from threading import Thread,RLock,Lock
import time
# mutexA=Lock()
# mutexB=Lock()
mutexA = mutexB=RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire() #锁的计数 count = 1
print('\033[41m%s 拿到A锁\033[0m' %self.name)
mutexB.acquire() #锁的计数 count = 1+1 =2
print('\033[42m%s 拿到B锁\033[0m' %self.name)
mutexB.release() #锁的计数 count = 2-1= 1
mutexA.release() #锁的计数 count = 1-1= 0
def func2(self):
mutexB.acquire() #锁的计数 count = 1
print('\033[43m%s 拿到B锁\033[0m' %self.name)
time.sleep(2)
mutexA.acquire() #锁的计数 count = 1+1 =2
print('\033[44m%s 拿到A锁\033[0m' %self.name)
mutexA.release() #锁的计数 count = 2-1= 1
mutexB.release() #锁的计数 count = 1-1= 0
if __name__ == '__main__':
for i in range(10):
t=MyThread()
t.start()
三、信号量Semaphore
和Lock很像,信号量对象内部维护一个倒计数器,每当调用acquire()时内置计数器-1,调用release() 时内置计数器+1。当acquire方法发现计数为0就阻塞请求 的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。
例子
from threading import Thread,Semaphore
import time
def func(sem,i):
sem.acquire()
print("被执行了一次,获取一个信号量 _value={}".format(sem._value))
print(i)
time.sleep(1)
print("释放一个信号量")
sem.release()
sem = Semaphore(5)
for i in range(20):
Thread(target=func,args=(sem,i)).start()
例子2
import threading, time
class myThread(threading.Thread):
def run(self): # 启动后,执行run方法
if semaphore.acquire(): # 加把锁,可以放进去多个(相当于5把锁,5个钥匙,同时有5个线程)
print(self.name)
time.sleep(5)
semaphore.release()
if __name__ == "__main__":
semaphore = threading.Semaphore(5) # 同时能有几个线程进去(设置为5就是一次5个线程进去),类似于停车厂一次能停几辆车
thrs = [] # 空列表
for i in range(100): # 100个线程
thrs.append(myThread()) # 加线程对象
for t in thrs:
t.start() # 分别启动
输出结果
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5 #5个线程同时出来
Thread-8
Thread-6
Thread-9
Thread-7
Thread-10 #每隔3秒再打印5个出来
部分省略.......
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
参考资料
[1]https://blog.csdn.net/u013008795/article/details/91357383