并发编程之线程与锁

时间:2021-07-18 18:00:53

  线程是操作系统调度的最小单位,随着多核处理器的流行多线程编程成为能有效发挥cpu潜能的利器(python除外)。线程之间的内存是共享,所以创建一个线程所需要的开销比创建一个进程的开销要小得多。抛开硬件层面,多线程也给我们提供同时执行多个任务的手段(并发编程),使得我们能有另一种编程思维。python的线程都封装在threading模块中。

 

threading

  • threading.active_count(): 返回当前活跃的线程数,返回数等于由模块中另一个方法 enumerate()返回的列表的长度。
  • threading.current_thread(): 返回当前的线程对象,当调用者的控制线程还没有通过threading模块创建时,一个有限制功能的dummy thread(我也不大明白)将被返回。
  • threading.get_ident(): 返回当前的识别号,一个非零整数。它的值没有直接的意义。它的值可能被循环利用,当一个线程被销毁另一个线程开启时
  • threading.enumerate(): 返回一个有所有活跃线程的列表,包括后台线程,dummy thread 和主线程。它不包含结束的和还位开始的线程。
  • threading.main_thread(): 返回主线程对象,通常情况下,这是有解释器一开始就创建的。
  • threading.settrace(func): 对所有线程设置一个跟踪函数(可用于调试)。对每个线程而言,这个函数将被传递给sys.settrace()。
  • threading.setprofile(func): 对所有线程设置一个轮廓函数(可用于调试)。对每个线程而言,这个函数将被传递给sys.setprofile()。
  • threading.TIMEOUT_MAX: 设置线程挂起(Lock.acquire(), RLock.acquire(), Condition.wait(), etc.)的超时时间。

Thread 对象

  线程对象的创建可通过两种方式:一种是传递一个可调用对象(函数);一种是通过继承Thread类,重写run方法(注意,除了构造函数被覆盖,其它的函数最好不要重写)

 

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数:

  • group: 线程组,暂时没用,留作以后做扩展。
  • target: 需要执行的函数。
  • name: 线程名,默认是"Thread-N"的格式。
  • args,kwargs: 调用函数执行所需要的参数。
  • daemon: 设置为后台线程,应该在调用start()方法前设置,否则报错。

方法和属性:

  • start():启动线程,重复启动相同的线程会引发RuntimeError异常。
  • run():线程执行的方法,子类应该覆盖这个方法。
  • join(timeout=None): 主线程等待,直到这个线程结束,可以加timeout超时时间,应该在线程开始后调用此方法。
  • name: 线程名。
  • getname: 得到线程名。
  • setname: 设置线程名。
  • ident:线程的id,如果线程还没开始返回一个None.
  • is_alive():返回线程是否处于活跃状态。
  • daemon:标志此线程是否为后台线程,True(后台线程)。
  • isDaemon(),setDaemon():获得、配置daemon的方法
import threading
import time

class A(threading.Thread):     # 用继承Thread类的方法实现线程
    def __init__(self, *args, **kwargs):
        super().__init__()
        print('线程 %s 创建了 状态(alive):%s id:%s'%(self.name, self.is_alive(), self.ident))
        
        
    def run(self):
        print("线程 %s 开始执行"%self.name)
        print('线程 %s 执行中 状态(alive):%s id:%s'%(self.name, self.is_alive(), self.ident))
        time.sleep(5)
        print("线程%s结束了"%self.name)

执行:

a = A()
a.start()    # 启动线程

print(a.isDaemon())   # 线程结束后
print(a.is_alive())

输出:

线程 Thread-4 创建了 状态(alive):False id:None

线程 Thread-4 开始执行
线程 Thread-4 执行中 状态(alive):True id:139913777370880

False
True
线程Thread-4结束了

执行:

t = []        # 所有线程都放在这个列表中
for _ in range(3):
    t.append(A())
for i in t:     # 开启所有线程
    i.start()
for i in t:     # 等待所有线程结束
    i.join()
print("所有任务完成")

输出为:

线程 Thread-6 开始执行线程 Thread-5 开始执行线程 Thread-7 开始执行线程 Thread-5 创建了 状态(alive):False id:None
线程 Thread-6 执行中 状态(alive):True id:139913768978176线程 Thread-5 执行中 状态(alive):True id:139913777370880线程 Thread-7 执行中 状态(alive):True id:139913760585472

线程 Thread-6 创建了 状态(alive):False id:None
线程 Thread-7 创建了 状态(alive):False id:None
线程Thread-6结束了线程Thread-7结束了

线程Thread-5结束了
所有任务完成

 

线程的锁

  线程之间执行的程序是互不干涉的(异步的),但他们的数据(内存)是共享的。当一个线程读取了一个变量的值并开始做一系列的运算最后把结果重新赋给此变量,在此期间另一个线程刚好也要对这个做处理。这样就会造成数据混乱。换句话说,我们现在需要在操作某个数据时,不允许其它线程对这个数据做操作,也就意味着这是一个原子操作。所以我们需要一种工具来使线程同步,能使线程在操作某个数据时能串行化,这就是锁。多线程编程最棘手的问题在于数据混乱。不幸的是,解决现实问题逻辑是复杂的,当维护的锁多了,难免会出现问题。所以,是否能用好锁能体现一个程序员的并发编程能力,就如同C语言的指针一样。

 

class threading.Lock

方法:

  • acquire(blocking=True, timeout=-1): 获得此锁。当参数acquire设置为True时,会阻塞直到此锁被其它线程释放。这时就又把此锁锁住,并返回True;当参数acquire设置为False时,不会阻塞,并直接返回一个False。参数timeout接收个浮点类型的数字,表示最多可以阻塞等待的时间,超过这个时间的话就会返回一个False.当它为-1时,表示无限期等待阻塞。
  • release(): 释放这个锁。释放一个没有锁住的锁会报错。无返回值。

注意: release()这个方法不是仅仅只有执行过acquire方法的线程才能调用,其它线程都可以调用。在实际使用锁时,推荐使用with语句,一是代码看上去优雅,有层次感;二是避免忘记调用release()方法,造成死锁。

继续上面的代码执行:

t = 0
threads_list = []
def fun():
    global t
    a = t
    time.sleep(0.5)
    t = a + 1
for _ in range(100):
    threads_list.append(threading.Thread(target=fun))
for i in threads_list:
    i.start()
    
for i in threads_list:
    i.join()

print("结果为:%s"%t)

输出为:

结果为:1

执行:

t = 0           # 一个全局变量,多个线程可对其更改
threads_list = []
lock = threading.Lock()

def fun():
    global t
    with lock:
        a = t
        time.sleep(0.5)
        t = a + 1

for _ in range(100):
    threads_list.append(threading.Thread(target=fun))
    
for i in threads_list:
    i.start()

for i in threads_list:
    i.join()

print("结果为:%s"%t)

输出为:

结果为:100

 

class threading.RLock

  可重入锁,它唯一的特点就是当一个Rlock被一个线程acquire后,还可以继续被这个线程使用acquire而无需等待。

方法:

  • acquire(blocking=True, timeout=-1): 方法和Lock一样使用。不同的是,每当线程(同一个线程)调用acquire后,里面的一个递归等级会自加1。如果要释放该锁,那么必须调用相同次数的release()方法,里面的递归等级自减1,不然其它线程aquire该锁时会被阻塞。
  • release(): 使用如上所述。

执行:

NUM = 0                     # 维护一个全局变量
rlock = threading.RLock()
class A(threading.Thread):
    
    def fun_1(self):
        global NUM
        NUM += 1
        time.sleep(0.5)
        print("线程%s执行了方法fun_1, NUM为:%s"%(self.name, NUM))
        
    def fun_2(self):
        global NUM
        NUM += 1
        time.sleep(0.5)
        print("线程%s执行了方法fun_2, NUM为:%s"%(self.name, NUM))
        
    def run(self):
        with rlock:      # 可重入锁可以在相同的线程里嵌套
            self.fun_1()
            with rlock:
                self.fun_2()
                
t_list = []
for i in range(3):
    t_list.append(A())
for i in t_list:
    i.start()

输出为:

线程Thread-7执行了方法fun_1, NUM为:1
线程Thread-7执行了方法fun_2, NUM为:2
线程Thread-8执行了方法fun_1, NUM为:3
线程Thread-8执行了方法fun_2, NUM为:4
线程Thread-9执行了方法fun_1, NUM为:5
线程Thread-9执行了方法fun_2, NUM为:6
 

为什么要有RLock?

  当初我看到可重入锁时,我就傻眼了。当一个线程获得该锁后,在锁还还没释放前,它可以对数据做任何操作,其它线程根本不影响。完全没有必要在获得该锁之后再获得该锁。即使这种做法可以,但这符合实际使用的需求吗?这个问题我想了很久,终于想到了一种使用场景。如下所示:

credit = 1000

class AcountHandle(threading.Thread):
    rlock = threading.RLock()          # 所有实例都用同一把锁
    
    def withdraw(self, amount):        # 取钱
        global credit
        with self.rlock:
            a = credit
            time.sleep(0.5)
            credit = a - amount
            print("取款%s,余额:%s"%(amount, credit))
            
    def save(self, amount):            # 存钱
        global credit
        with self.rlock:
            a = credit
            time.sleep(0.5)
            credit = a + amount
            print("存款%s,余额:%s"%(amount, credit))
            
    def withdraw_fee(self, amount):     # 跨行取款,多收2块钱手续费
        global credit                   # 把取钱和收手续费做成了一个原子操作
        with self.rlock:
            time.sleep(0.5)
            self.withdraw(amount)
            credit = credit - 2
            print("收取手续费%s,余额:%s"%(2, credit))



thread_1 = threading.Thread(target=AcountHandle().withdraw, args=(100,))
thread_2 = threading.Thread(target=AcountHandle().save, args=(200,))
thread_3 = threading.Thread(target=AcountHandle().withdraw_fee, args=(50,))

for i in [thread_3, thread_2, thread_1]:    # 3个不同的实例分别在3个线程里同时
    i.start()                               # 执行3种不同的操作

输出为:

取款50,余额:950
收取手续费2,余额:948
存款200,余额:1148
取款100,余额:1048

  在上面的代码中,类AcountHandle有两种基本的方法:存钱和取钱。在执行它们时,肯定会用到锁,这没问题。但这时,我的业务开始拓展了,增加了跨行取钱同时收取手续费业务。在这项业务中我调用了它的取钱方法,同时也对账户余额进行了操作。毫无疑问,这两个动作都必须用到锁,取钱方法里面已经自带使用了锁,所以只需在扣除手续费时加上锁。请注意上面的代码,如果我把RLock换成Lock,withdraw_fee方法中在对rlock执行with语句后,再执行withdraw方法,在其中aquire该锁时,就会出现等待。而这个线程先前已经对该锁aquire了,自己把自己给锁住了,就出现了死锁。

  当然可能出有这么一个想法,我可以在withdraw_fee方法中,不使用锁的嵌套,取钱和扣手续费这两个动作分别加锁而且处在同一级。当然,这么做可以实现我要实现的动作。但会出现个问题,在取钱动作完成后,我马上有另一个人也在操作取钱,而且他马上争到了锁并完成了动作。对于先前取钱的人取钱时又可以扣除手续费了,当这个人看到扣完手续费的余额时就傻眼了(无缘无故被多扣除了一笔钱)。所以在收手续费和扣钱两个动作必须做成一个原子操作。请注意,上述我提到的锁都是同一把锁,多个实例都通用同一把锁。

 

未完,待续~~~~~~~