Python Concurrency and Parallelism [1] -> Threads [2] -> Locks and Semaphores

Time: 2022-12-07 17:57:21

Locks and Semaphores


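Contents

  1. Adding a thread lock
  2. The nature of a lock
  3. Mutex locks and reentrant locks
  4. How deadlocks arise
  5. Context management for locks
  6. Semaphores and bounded semaphores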

 

1 Adding a thread lock

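Because threads contend for resources in an unpredictable order, conflicts can occur. Adding a thread lock controls access to the shared resource.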

 1 import atexit
 2 from random import randrange
 3 from threading import Thread, Lock, current_thread # or currentThread
 4 from time import ctime, sleep
 5 
 6 
 7 # Use a set to record the running threads
 8 # Printing a set directly shows its repr, e.g. set([a, b, c]) in Python 2
 9 # So we override __str__ to get cleaner output (a, b, c)
10 class CleanOutputSet(set):
11     def __str__(self):
12         return ', '.join(x for x in self)
13     
14 # randrange(2, 5) returns a random integer from range(2, 5), i.e. 2, 3 or 4
15 # for x in range(randrange(3, 7)) runs the "for" loop 3-6 times
16 # The code below generates a list of 3-6 numbers, each between 2 and 4
17 # Without list(), this would be a generator, which is exhausted after one iteration
18 loops = list((randrange(2, 5) for x in range(randrange(3, 7))))
19 
20 remaining = CleanOutputSet()
21 lock = Lock()
22 
23 def loop_without_lock(nsec):
24     myname = current_thread().name
25     remaining.add(myname)
26     print('[%s] Start %s' % (ctime(), myname))
27     sleep(nsec)
28     remaining.remove(myname)
29     print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec))
30     # Note: (remaining or 'NONE') evaluates to 'NONE' when remaining is falsy
31     # Falsy values include: None, False, (), [], {}, set(), 0
32     print('    (remaining: %s)' % (remaining or 'NONE'))
33 
34 def loop_with_lock(nsec):
35     myname = current_thread().name
36     # Before modifying the shared resource, acquire the lock to block other threads
37     # The acquire/release pair can be simplified with 'with lock'
38     lock.acquire()
39     remaining.add(myname)
40     print('[%s] Start %s' % (ctime(), myname))
41     # After using the resource, release the lock so other threads can use it
42     lock.release()
43     sleep(nsec)
44 
45     lock.acquire()
46     remaining.remove(myname)
47     print('[%s] Completed %s (%d secs)' % (ctime(), myname, nsec))
48     print('    (remaining: %s)' % (remaining or 'NONE'))
49     lock.release()
50     
51 def _main():
52     print('-----Below threads without lock-----')
53     threads = []
54     for pause in loops:
55         threads.append(Thread(target=loop_without_lock, args=(pause, )))
56     for t in threads:
57         t.start()
58     for t in threads:
59         t.join()
60     print('-----Below threads with lock-----')
61     threads = []
62     for pause in loops:
63         threads.append(Thread(target=loop_with_lock, args=(pause, )))
64     for t in threads:
65         t.start()
66     for t in threads:
67         t.join()
68 
69 # This is an exit handler; it is called when the script exits
70 # atexit.register(_atexit) can be used instead of the @atexit.register decorator
71 # The function name '_atexit' is arbitrary
72 @atexit.register
73 def _atexit():
74     print('All DONE at:', ctime())
75 
76 if __name__ == '__main__':
77     _main()

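Lines 1-4 import the required modules: atexit registers a handler that runs when the script exits, and random produces random numbers to make the thread timing less predictable.

Lines 7-12 define a new set class used to print the set of currently running threads. CleanOutputSet overrides the __str__ method so that the set is displayed as a, b, c rather than in the default form (set([a, b, c]) in Python 2, {'a', 'b', 'c'} in Python 3).

Lines 14-21 use the random module to build a generator expression yielding 3-6 random numbers, each between 2 and 4. Because the values are reused later, the generator is converted to a list; otherwise it would be exhausted after one iteration and could not be reused. The global lock and the output set are also instantiated here.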
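The difference between a generator and a list described above can be checked with a minimal sketch. The names gen, first_pass and second_pass are introduced only for this illustration and are not part of the original script.

```python
from random import randrange

# A generator expression can be iterated only once.
gen = (randrange(2, 5) for _ in range(3))
first_pass = list(gen)    # consumes the generator
second_pass = list(gen)   # the generator is already exhausted, so this is empty

# Materialising the values as a list allows repeated iteration.
loops = [randrange(2, 5) for _ in range(randrange(3, 7))]

print(len(first_pass), len(second_pass))   # 3 0
print(3 <= len(loops) <= 6)                # True
print(all(2 <= n <= 4 for n in loops))     # True
```

This is why the script wraps the generator expression in list() before looping over it twice in _main().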

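Lines 23-32 define a thread function without locking. On entry it adds the thread name to the set, sleeps for the given time, removes the thread name, and then prints the thread names remaining in the set (the shared resource).

Lines 34-49 define a thread function with locking. On entry it acquires the lock, adds the thread name to the set, and releases the lock. After sleeping for the given time it acquires the lock again, removes the thread name, prints the thread names remaining in the set (the shared resource), and finally releases the lock.

Lines 51-67: the main function runs both variants, first without and then with the lock. It calls join() so that the main thread waits for the first batch of threads to finish, which keeps the two runs separate.

Lines 69-77 define the script's exit function with atexit.register, applied here as the @atexit.register decorator, and call _main() when the file is run as a script.

Final output: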

-----Below threads without lock-----  
[Tue Aug  1 11:01:57 2017] Start Thread-1  
[Tue Aug  1 11:01:57 2017] Start Thread-2[Tue Aug  1 11:01:57 2017] Start Thread-3  
[Tue Aug  1 11:01:57 2017] Start Thread-4  
  
[Tue Aug  1 11:01:59 2017] Completed Thread-3 (2 secs)  
    (remaining: Thread-1, Thread-4, Thread-2)  
[Tue Aug  1 11:02:00 2017] Completed Thread-1 (3 secs)  
    (remaining: Thread-4, Thread-2)  
[Tue Aug  1 11:02:01 2017] Completed Thread-2 (4 secs)[Tue Aug  1 11:02:01 2017] Completed Thread-4 (4 secs)  
  
    (remaining: NONE)    (remaining: NONE)  
  
-----Below threads with lock-----  
[Tue Aug  1 11:02:01 2017] Start Thread-5  
[Tue Aug  1 11:02:01 2017] Start Thread-6  
[Tue Aug  1 11:02:01 2017] Start Thread-7  
[Tue Aug  1 11:02:01 2017] Start Thread-8  
[Tue Aug  1 11:02:03 2017] Completed Thread-7 (2 secs)  
    (remaining: Thread-8, Thread-6, Thread-5)  
[Tue Aug  1 11:02:04 2017] Completed Thread-5 (3 secs)  
    (remaining: Thread-8, Thread-6)  
[Tue Aug  1 11:02:05 2017] Completed Thread-6 (4 secs)  
    (remaining: Thread-8)  
[Tue Aug  1 11:02:05 2017] Completed Thread-8 (4 secs)  
    (remaining: NONE) 

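The output shows that without the lock, print output can be interleaved and two threads can both report remaining: NONE, whereas with the lock, threads use the shared resource one at a time.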
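The same idea can be shown on a smaller scale with a shared counter. This is a minimal sketch, not part of the original script: counter, add_many and the thread count are names and values chosen only for illustration.

```python
from threading import Thread, Lock

counter = 0
lock = Lock()

def add_many(n):
    """Increment the shared counter n times, holding the lock for each update."""
    global counter
    for _ in range(n):
        with lock:          # only one thread at a time performs the read-modify-write
            counter += 1

threads = [Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000
```

Because every increment happens while the lock is held, no update can be lost, so the final value is always the number of threads times the number of increments.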

  

2 The nature of a lock

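To verify that a lock does not work by physically locking the resource, the code below creates two threads. The well-behaved thread follows the convention and acquires the lock before modifying the shared resource. The misbehaving thread never tries to acquire the lock and modifies the shared resource directly, without holding it.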

 1 from threading import Thread, Lock
 2 import time
 3 lock = Lock()
 4 COUNT = 0
 5 
 6 def gentleCounter():
 7     name = 'gentleCounter'
 8     lock.acquire()
 9     print('%s acquired the lock' % name)
10     global COUNT
11     COUNT += 1
12     print('%s made a count plus, now the COUNT is %d' % (name, COUNT))
13     print('%s is taking a rest...' % name)
14     time.sleep(3)
15     COUNT += 1
16     print('%s made a count plus again, now the COUNT is %d' % (name, COUNT))
17     lock.release()
18     print('%s released the lock' % name)
19 
20 def wildCounter():
21     time.sleep(1)
22     name = 'wildCounter'
23     print('%s didn\'t acquire the lock' % name)
24     global COUNT
25     COUNT += 1
26     print('%s made a count plus, now the COUNT is %d' % (name, COUNT))
27 
28 Thread(target=gentleCounter).start()
29 Thread(target=wildCounter).start()

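Lines 1-4 import the Thread and Lock classes and the time module, create the lock, and define the global variable COUNT that the lock is meant to protect, with an initial value of 0.

Lines 6-18 define the well-behaved counter. It follows the rules: it acquires the lock, increments COUNT, sleeps for 3 seconds, and increments COUNT again. The lock is held the whole time and is released only after both increments.

Lines 20-26 define the misbehaving counter. It never acquires the lock. It waits 1 second on entry so that the well-behaved thread is already sleeping, then increments COUNT directly without holding the lock.

Lines 28-29 start the two functions in two separate threads.

Resulting output: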

gentleCounter acquired the lock  
gentleCounter made a count plus, now the COUNT is 1  
gentleCounter is taking a rest...  
wildCounter didn't acquire the lock  
wildCounter made a count plus, now the COUNT is 2  
gentleCounter made a count plus again, now the COUNT is 3  
gentleCounter released the lock  

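The output shows that wildCounter successfully modified the shared resource while gentleCounter was sleeping and still holding the lock, which affected gentleCounter's next count. A lock therefore does not physically lock the resource; it is a convention among threads about who may modify it.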
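A shorter, deterministic variant of the same experiment is sketched below. The names shared and wild_writer are hypothetical and chosen only for this illustration.

```python
from threading import Thread, Lock

lock = Lock()
shared = []

def wild_writer():
    # Never touches the lock, so nothing stops it from modifying the data.
    shared.append('wild')

lock.acquire()                    # the main thread holds the lock
t = Thread(target=wild_writer)
t.start()
t.join()                          # the writer finishes even though the lock is held
still_locked = lock.locked()
lock.release()

print(still_locked, shared)       # True ['wild']
```

The lock only protects data when every thread that touches the data agrees to acquire it first.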

 

3 Mutex locks and reentrant locks

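The Python threading module has two kinds of lock: the mutex lock (threading.Lock) and the reentrant lock (threading.RLock). Their usage is largely the same. A mutex lock can be held only once: a second acquire blocks until the lock is released. A reentrant lock can be acquired multiple times by the thread that owns it, but it is fully unlocked only when the number of releases matches the number of acquires, and it must be released by the same thread that acquired it.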

 1 from threading import Lock, RLock
 2 
 3 def call():
 4     print('This is call() function')
 5     with lock:
 6         g()
 7         h()
 8 
 9 def g():
10     if not lock.acquire(True, 1):
11         print('g() acquires lock failed')
12     else:
13         print('This is g() function')
14         lock.release()
15     h()
16 
17 def h():
18     if not lock.acquire(True, 1):
19         print('h() acquires lock failed')
20     else:
21         print('This is h() function')
22         lock.release()
23 
24 print('\n-------Using Lock-------')
25 lock = Lock()
26 call()
27 print('\n-------Using RLock-------')
28 lock = RLock()
29 call()

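Lines 3-22 define call(), which acquires the lock once using with lock and then calls g() and h(). Both g() and h() try to acquire the lock with a 1-second timeout: on failure or timeout they print a failure message, and on success they print that the function was called and release the lock. g() additionally calls h() once more.

Lines 24-29 run the program, first with a mutex lock and then with a reentrant lock.

Output: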

-------Using Lock-------  
This is call() function  
g() acquires lock failed  
h() acquires lock failed  
h() acquires lock failed  
  
-------Using RLock-------  
This is call() function  
This is g() function  
This is h() function  
This is h() function  

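The output shows that with a mutex lock, once call() holds the lock, none of the other functions can acquire it again. With a reentrant lock, the other functions can acquire it again, as long as acquires and releases are paired.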
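The pairing requirement for a reentrant lock can be observed from a second thread. The sketch below is an illustration only: probe, run_probe and observed are hypothetical helper names.

```python
from threading import Thread, RLock

rlock = RLock()
observed = []

def probe():
    # Runs in a separate thread: try to take the lock without blocking.
    got_it = rlock.acquire(blocking=False)
    observed.append(got_it)
    if got_it:
        rlock.release()

def run_probe():
    t = Thread(target=probe)
    t.start()
    t.join()

rlock.acquire()
rlock.acquire()    # a second acquire by the owning thread succeeds
run_probe()        # held twice by the main thread
rlock.release()
run_probe()        # still held once
rlock.release()
run_probe()        # fully released, so the probe can take it

print(observed)    # [False, False, True]
```

Other threads can take the lock only after the owner has released it as many times as it acquired it.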

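Note: with a mutex lock, a lock acquired by one thread can be released from another thread; with a reentrant lock, the lock must be released by the same thread that acquired it.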
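This ownership difference can be checked with a small sketch. The helper release_from_other_thread and the results dictionary are names made up for this illustration.

```python
from threading import Thread, Lock, RLock

results = {}

def release_from_other_thread(name, lock):
    # Called in a thread that did NOT acquire the lock.
    try:
        lock.release()
        results[name] = 'released'
    except RuntimeError:
        # An RLock refuses to be released by a thread that does not own it.
        results[name] = 'RuntimeError'

for name, lock in (('Lock', Lock()), ('RLock', RLock())):
    lock.acquire()                       # acquired in the main thread
    t = Thread(target=release_from_other_thread, args=(name, lock))
    t.start()
    t.join()

print(results)  # {'Lock': 'released', 'RLock': 'RuntimeError'}
```

The plain Lock has no notion of an owner, which is why it can be used for simple cross-thread signalling, while the RLock tracks which thread holds it.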

 

4 How deadlocks arise

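A deadlock occurs when a resource is requested several times and none of the requesters is able to release it. Deadlocks appear in roughly two forms: recursive deadlock and mutual deadlock.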

4.1 Recursive deadlock

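A recursive deadlock usually occurs when the same thread acquires a lock more than once, for example through nested or recursive function calls, so that the lock can never be released. At its core this is still the blocking behaviour of acquire: once a thread holds the lock, any further acquire, whether from another thread or from the same thread, blocks. Another thread's acquire can simply wait for the release. But if the owning thread itself acquires a second time, it blocks and can never reach its own release call, so a deadlock results.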

from threading import Thread, Lock, RLock, current_thread


mutex = Lock()
reentrant = RLock()

class MyThread(Thread):
    def __init__(self, lock):
        Thread.__init__(self)
        self.lock = lock

    def run(self):
        self.name = current_thread().name
        print('-------This is %s-------' % self.name)
        if self.lock.acquire():
            print('%s get lock one' % self.name, '\nTrying to get second lock')
            # With a Lock this second acquire blocks forever; with an RLock it succeeds
            self.lock.acquire()
            print('Got second lock')
            print('Trying to release lock...')
            self.lock.release()
            print('First lock released')
            self.lock.release()
            print('Second lock released')
            print('Lock all released')
        print('--------Exit %s---------' %
              self.name)

# Run with the reentrant lock first; it completes normally
t = MyThread(reentrant)
t.start()
t.join()
# Then run with the mutex lock; this thread never finishes
t = MyThread(mutex)
t.start()
t.join()

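In the run function above, the thread first acquires the lock it was given and then, without releasing it, acquires it again. If the lock is a mutex lock, this causes a deadlock.

Running it gives: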

-------This is Thread-1-------  
Thread-1 get lock one   
Trying to get second lock  
Got second lock  
Trying to release lock...  
First lock released  
Second lock released  
Lock all released  
--------Exit Thread-1---------  
-------This is Thread-2-------  
Thread-2 get lock one   
Trying to get second lock  

The output shows that the reentrant lock completes both acquires and releases, while the mutex lock deadlocks and blocks forever.
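One way to observe the same behaviour without hanging the program is to give the second acquire a timeout. This is a sketch under the assumption of Python 3, where acquire accepts a timeout argument; try_nested_acquire is a hypothetical helper name.

```python
from threading import Lock, RLock

def try_nested_acquire(lock, timeout=0.2):
    """Acquire the lock, then try to acquire it again from the same thread.

    Returns True if the nested acquire succeeded, False if it timed out.
    """
    lock.acquire()
    try:
        got_second = lock.acquire(timeout=timeout)
        if got_second:
            lock.release()      # undo the nested acquire
        return got_second
    finally:
        lock.release()          # undo the first acquire

print(try_nested_acquire(RLock()))  # True
print(try_nested_acquire(Lock()))   # False
```

With a timeout, the nested acquire on a mutex lock gives up and returns False instead of blocking forever, so the caller can report the problem and still release the first lock.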

4.2 Mutual deadlock

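A mutual deadlock occurs when different threads each try to acquire a resource the other already holds: both threads hold part of the resources, each needs the lock held by the other, and both block waiting.

Suppose there are two resource locks, A and B. Thread 1 acquires A, acquires B, releases B, then releases A. Thread 2 acquires B, acquires A, releases A, then releases B. When both threads start, thread 1 obtains lock A and thread 2 obtains lock B. Thread 1 then needs lock B and thread 2 needs lock A, but each lock is held by the other thread and is never released, which results in a mutual deadlock.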

 1 from threading import Thread, Lock, RLock, current_thread
 2 import time
 3 
 4 mutex_1 = Lock()
 5 mutex_2 = Lock()
 6 reentrant_1 = RLock()
 7 reentrant_2 = RLock()
 8 COUNT = 0
 9 class MyThread(Thread):
10     def __init__(self, lock_1, lock_2):
11         Thread.__init__(self)
12         self.lock_1 = lock_1
13         self.lock_2 = lock_2
14 
15     def run(self):
16         self.name = current_thread().name
17         if self.lock_1.acquire():
18             print('%s got its first lock' % self.name)
19             global COUNT
20             COUNT += 1
21             print('%s make COUNT plus one, now COUNT is %d' % (self.name, COUNT))
22             time.sleep(2)
23             print('%s trying to get another one...' % self.name)
24             if self.lock_2.acquire():
25                 print('%s got its second lock' % self.name)
26                 self.lock_2.release()
27                 print('%s release its second lock' % self.name)
28             self.lock_1.release()
29             print('%s release its first lock' % self.name)
30 
31 threads = [MyThread(mutex_1, mutex_2), MyThread(mutex_2, mutex_1)]
32 #threads = [MyThread(reentrant_1, reentrant_2), MyThread(reentrant_2, reentrant_1)]
33 for t in threads:
34     t.start()
35 for t in threads:
36     t.join()

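Lines 8-29: after the required modules are imported, a Thread subclass is derived that takes two lock instances in order. In run, it acquires the first lock, increments COUNT, and sleeps (to give the other thread enough time to acquire its own first lock). It then tries to acquire the second lock and, once successful, releases the second lock and then the first.

Lines 31-36 instantiate two threads that differ only in the order of the locks passed in: thread 1 acquires lock 1 and then lock 2, while thread 2 does the opposite. Either mutex locks or reentrant locks can be passed in.

Running the program gives: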

Thread-1 got its first lock  
Thread-1 make COUNT plus one, now COUNT is 1  
Thread-1 trying to get another one...  
Thread-2 got its first lock  
Thread-2 make COUNT plus one, now COUNT is 2  
Thread-2 trying to get another one...  

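The output shows that both threads block while trying to acquire the lock held by the other, which is a mutual deadlock.

Note: switching to reentrant locks does not solve the mutual deadlock, because a reentrant lock is reentrant only for the thread that owns it; for other threads it is still mutually exclusive.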
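A common way to avoid this kind of deadlock, not covered in the original text, is to make every thread acquire the locks in the same global order. The sketch below orders the locks by id(); the names worker, lock_a, lock_b and finished are chosen only for this illustration.

```python
from threading import Thread, Lock
import time

lock_a, lock_b = Lock(), Lock()
finished = []

def worker(name, first, second):
    # Regardless of the order the locks were passed in, acquire them in one
    # fixed global order (here: by id) so that no circular wait can form.
    ordered = sorted((first, second), key=id)
    with ordered[0]:
        time.sleep(0.1)          # give the other thread time to start
        with ordered[1]:
            finished.append(name)

threads = [Thread(target=worker, args=('t1', lock_a, lock_b)),
           Thread(target=worker, args=('t2', lock_b, lock_a))]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=5)

print(sorted(finished))  # ['t1', 't2']
```

Both threads receive the locks in opposite order, as in the example above, but because they sort them before acquiring, one thread simply waits for the other to finish instead of deadlocking.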

 

5 Context management for locks

Locks also support the context manager protocol, so they can be used in a with statement. The code below manages the lock with with.

import atexit
from random import randrange
import threading
from threading import Thread, Lock
from time import ctime, sleep

class CleanOutputSet(set):
    def __str__(self):
        return ', '.join(x for x in self)

# loops must be a list here: len(loops) is used below and the values are iterated in _main()
# A generator such as (randrange(1, 7) for x in range(2, 8)) has no len() and can be consumed only once
loops = list(randrange(1, 7) for x in range(2, 8))
remaining = CleanOutputSet()
lock = Lock()

def loop(nsec):
    myname = threading.current_thread().name
    # 'with lock' acquires the lock on entry and releases it on exit
    with lock:
        remaining.add(myname)
        print('[{0}] Start {1}'.format(ctime(), myname))
    # Once every thread has registered itself, list the active threads
    if len(remaining) == len(loops):
        func_for_trial()
    sleep(nsec)
    with lock:
        remaining.remove(myname)
        print('[{0}] Completed {1} ({2} secs)'.format(ctime(), myname, nsec))
        print('    (remaining: {0})'.format(remaining or 'NONE'))

def func_for_trial():
    count = threading.active_count()
    active_thread = threading.enumerate()
    print('There are %d active threads, \n%s' % (count, str(active_thread).replace(', ', '\n')))

def _main():
    threads = []
    for pause in loops:
        threads.append(Thread(target=loop, args=(pause, )))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

@atexit.register
def _atexit():
    print('All DONE at:', ctime())

if __name__ == '__main__':
    _main()
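The main benefit of the with statement is that the lock is released even when the protected code raises an exception, just like an explicit try/finally. A minimal sketch follows; the two function names are hypothetical.

```python
from threading import Lock

lock = Lock()
held_after = {}

def update_with_statement():
    with lock:                       # released automatically on exit
        raise ValueError('boom')

def update_manual():
    lock.acquire()
    try:
        raise ValueError('boom')
    finally:
        lock.release()               # the manual equivalent of 'with lock'

for func in (update_with_statement, update_manual):
    try:
        func()
    except ValueError:
        pass
    held_after[func.__name__] = lock.locked()

print(held_after)
# {'update_with_statement': False, 'update_manual': False}
```

In both cases the lock is free again after the exception, so other threads are not left blocked.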

 

6 Semaphores and bounded semaphores

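The semaphore is one of the oldest synchronization primitives. It is a counter that is decremented when a resource is consumed and incremented when a resource is released, so it can be seen as representing how many resources are available. Semaphores are more flexible than locks: they can handle the case where several threads share a limited number of instances of a resource.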
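As a minimal sketch of the counter idea, a semaphore initialised to 2 lets at most two threads into a section at the same time. The names slots, state_lock, active and max_active are assumptions made for this illustration.

```python
from threading import Thread, Semaphore, Lock
import time

slots = Semaphore(2)       # two instances of the resource are available
state_lock = Lock()        # protects the bookkeeping counters below
active = 0
max_active = 0

def worker():
    global active, max_active
    with slots:                        # decrement on entry, increment on exit
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)               # pretend to use the resource
        with state_lock:
            active -= 1

threads = [Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('max concurrent workers:', max_active)
```

Six threads are started, but the number inside the section at any moment never exceeds the initial semaphore value.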

The example below simulates a candy machine with limited stock and uses a semaphore to track the resource.

 1 from atexit import register
 2 from random import randrange
 3 from threading import Semaphore, BoundedSemaphore, Lock, Thread
 4 from time import sleep, ctime
 5 import threading
 6 
 7 """
 8 # This class overrides __len__ to return the current value of the semaphore
 9 class MySemaphore(BoundedSemaphore):
10     def __len__(self):
11         return self._value
12 candy = MySemaphore(5)
13 print(len(candy))
14 """
15 lock = Lock()
16 MAX = 5
17 
18 def refill():
19     lock.acquire()
20     print('Refilling candy...')
21     try:
22         candyTray.release()
23     except ValueError:
24         print('Full, skipping')
25     else:
26         print('OK, current candy num is %d' % candyTray._value)
27     lock.release()
28 
29 def buy():
30     lock.acquire()
31     print('Buying candy...')
32     if candyTray.acquire(False):
33         print('OK, current candy num is %d' % candyTray._value)
34     else:
35         print('Empty, skipping')
36     lock.release()
37 
38 def producer(loops):
39     for i in range(loops):
40         refill()
41         sleep(randrange(3))
42 
43 def consumer(loops):
44     for i in range(loops):
45         buy()
46         sleep(randrange(3))
47 
48 def _main():
49     print('Starting at', ctime())
50     nloops = randrange(2, 6)
51     print('THE CANDY MACHINE (full with %d bars)!' % MAX)
52     # Buyer Thread
53     buyer = Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2), ))
54     buyer.start()
55     # Vendor Thread
56     vendor = Thread(target=producer, args=(nloops, ))
57     vendor.start()
58     for t in [buyer, vendor]:
59         t.join()
60 
61 @register
62 def _atexit():
63     print('All DONE at:', ctime())
64 
65 if __name__ == '__main__':
66     print('-------BoundedSemaphore-------')
67     candyTray = BoundedSemaphore(MAX)
68     _main()
69     print('-------Semaphore------')
70     candyTray = Semaphore(MAX)
71     _main()

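Lines 7-14 (enclosed in a string literal, so they are not executed): after the imports, a derived class defines a __len__ method so that len() returns the number of resources currently available in the semaphore. This makes checking the resource count clearer.

Lines 15-16 create the lock and set MAX, the initial value of the semaphore, which models the initial number of candies. With a bounded semaphore the count may not exceed the initial value, so there are at most 5 candies.

Lines 18-27 define refill(), which acquires the lock and releases the semaphore, incrementing it by 1 (a candy is added to the machine). It catches the ValueError raised when the release would push the count above the initial value.

Lines 29-36 define buy(), which acquires the lock and makes a non-blocking acquire on the semaphore, decrementing it by 1 (a customer buys a candy). When the count is 0, the acquire returns False instead of blocking, and the function reports that the machine is empty.

Lines 38-46 define the producer and consumer functions, which refill and buy candy respectively at random intervals.

Lines 48-63 define the main function, which starts the buyer and vendor threads and calls join to wait for both to finish before returning, and register the exit function.

Output: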

-------BoundedSemaphore-------  
Starting at Tue Aug  1 16:53:32 2017  
THE CANDY MACHINE (full with 5 bars)!  
Buying candy...  
OK, current candy num is 4  
Refilling candy...  
OK, current candy num is 5  
Refilling candy...  
Full, skipping  
Refilling candy...  
Full, skipping  
Buying candy...  
OK, current candy num is 4  
Buying candy...  
OK, current candy num is 3  
Refilling candy...  
OK, current candy num is 4  
Buying candy...  
OK, current candy num is 3  
Refilling candy...  
OK, current candy num is 4  
Buying candy...  
OK, current candy num is 3  
-------Semaphore------  
Starting at Tue Aug  1 16:53:38 2017  
THE CANDY MACHINE (full with 5 bars)!  
Buying candy...  
OK, current candy num is 4  
Refilling candy...  
OK, current candy num is 5  
Buying candy...  
OK, current candy num is 4  
Refilling candy...  
OK, current candy num is 5  
Buying candy...  
OK, current candy num is 4  
Refilling candy...  
OK, current candy num is 5  
Refilling candy...  
OK, current candy num is 6  
Buying candy...  
OK, current candy num is 5  
Buying candy...  
OK, current candy num is 4  
Buying candy...  
OK, current candy num is 3  
Buying candy...  
OK, current candy num is 2  
Buying candy...  
OK, current candy num is 1  

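The output shows that for the candy machine modeled with a bounded semaphore, an attempt to raise the semaphore (candy count) above its initial value raises an exception, which is handled here by printing that the machine is full. With an ordinary semaphore, the count can exceed the initial value, with no upper limit.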
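The difference can be isolated in a few lines without threads. In this sketch the helpers release_when_full and drain are hypothetical; drain counts the available resources through non-blocking acquires rather than reading the private _value attribute.

```python
from threading import Semaphore, BoundedSemaphore

def release_when_full(sem):
    """Try one extra release on a semaphore that is at its initial value."""
    try:
        sem.release()
        return 'released'
    except ValueError:
        return 'ValueError'

def drain(sem):
    """Count how many times the semaphore can be acquired without blocking."""
    count = 0
    while sem.acquire(blocking=False):
        count += 1
    return count

bounded = BoundedSemaphore(5)
plain = Semaphore(5)

bounded_result = release_when_full(bounded)
plain_result = release_when_full(plain)
bounded_count = drain(bounded)
plain_count = drain(plain)

print(bounded_result, bounded_count)  # ValueError 5
print(plain_result, plain_count)      # released 6
```

The bounded semaphore rejects the extra release and stays at 5, while the ordinary semaphore silently grows to 6, matching the candy machine output above.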

 

Related reading


1. Basic concepts

2. The threading module

3. Creating multiple threads

 
