threading 线程是操作系统能够进行运算调度的最小单位。若干个线程组成一个进程,一个进程至少有一个线程。
Python的标准库提供了两个模块:_thread和threading,后者是对前者的高级封闭。绝大多数情况下我们只需要使用threading这个高级模块。
threading模块提供的类:
Thread,Event
threading模块的常用方法和属性:
threading.current_thread().name:返回当前线程的线程名。
threading.activeCount():返回活动的线程数量。
Thread类
有两种方法来使用thread类创建线程。
import threading,time #方法一:将要执行的方法作为参数传给Thread的构造方法 def run(arg): print('thread %s is running...'%arg) time.sleep(1) for i in range(5): t = threading.Thread(target=run,args=(i,)) t.start() print('thread %s is end!' %threading.current_thread().name)
import threading,time #方法二:从Thread继承,并重写run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread,self).__init__() self.arg = arg def run(self):#重写run(),定义要运行的函数 print('thread %s is running...'%self.arg) time.sleep(1) for i in range(5): t = MyThread(i) t.start() print('thread %s is end!' %threading.current_thread().name)
构造方法:
Thread(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):
target:要执行的方法,比如方法一中的run。此处不用加(),因为不执行方法只传方法名;
args/kwargs:方法的参数;
name:线程名 就是 threading.current_thread().name 的值
实例方法:
start():启动线程
join(timeout=None):阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数);
setDaemon(bool):设置为守护进程。注意:需要在start()之前设置。
设置为守护进程:主线程结束,守护进程自动结束,无论当时守护进程为何执行状态。
不为守护进程:主线程等待此线程结束后方可结束。
守护进程例子:
例一:默认情况下不设置为守护进程
import threading,time def run(arg): time.sleep(2) print('thread %s is running...%s'%(arg,time.ctime())) for i in range(5): t = threading.Thread(target=run,args=(i,)) # t.setDaemon(True) t.start() print('thread %s is end!' %threading.current_thread().name)
#可看到系统同时启动了5个子线程,等待全部执行结束后,才结束主线程。
例一:设置为守护进程
import threading,time def run(arg): time.sleep(2) print('thread %s is running...%s'%(arg,time.ctime())) for i in range(5): t = threading.Thread(target=run,args=(i,)) t.setDaemon(True) t.start() print('thread %s is end!' %threading.current_thread().name)
#可看到只打印出‘thread MainThread is end!’,子线程未执行完成,主结束已完成结束。
join()使用
import threading,time def run(arg): time.sleep(3) print('thread %s is running...%s'%(arg,time.ctime())) thread_list = [] for i in range(5): t = threading.Thread(target=run,args=(i,)) t.setDaemon(True) t.start() thread_list.append(t) #为了不阻塞后面的线程的启动,不在这里join,先放到一个列表里 for res in thread_list: #循环线程实例列表,等待所有线程执行完毕 res.join() print('thread %s is end!' %threading.current_thread().name)
线程间交换数据
方法一:通过一个全局变量
import threading def run(): global sum #申明使用全局变量sum(并非生成一个全局变量sum) sum += 1 sum = 0 thread_list = [] for i in range(50): t = threading.Thread(target=run,args=()) t.start() for res in thread_list: res.join() print('all threads has done...',threading.current_thread().name,threading.active_count()) print('sum:',sum)
以上代码块中,对全局变量并没有加上线程锁。在Python3中,一般不会出什么问题,但官方也并没有明确表示会默认在后台加上锁,所以还是推荐手动加上锁。但在Python2中,极有可能会出现计算的值比理想中的值小的情况。为什么会出现这种情况呢,详情请见下图:(图来自金角大王)
一个加锁后的代码:
import threading lock = threading.Lock() #创建一个锁实例 def run(): lock.acquire() #加锁 global sum sum += 1 lock.release() #解锁 sum = 0 thread_list = [] for i in range(50): t = threading.Thread(target=run,args=()) t.start() for res in thread_list: res.join() print('all threads has done...',threading.current_thread().name,threading.active_count()) print('sum:',sum)
值得注意的是,在很少的情况下,锁中套锁,多道锁的情况下一定要使用递归锁,可避免程序被自己锁死。方法很简单:
lock = threading.RLock() #生成一个递归锁实例,即可
方法二:队列
队列是一个容器,类似于列表。但与列表有明显的区别!从列表中读取数据后,列表中的数据不会消失。但从队列中读取数据,此数据会从原队列中被取走,不再存在于队列中。
作用:
-
- 解耦,使程序之间实现松耦合(一个模块修改了不影响另一个调用此模块的模块)
- 提高处理效率
队列类型:
先入先出 class queue.Queue(maxsize=0)
后入先出 class queue.LifoQueue(maxsize=0) #last in first out
自定义顺序 class queue.PriorityQueue()
三种队列举例:
import queue #先入先出队列 q = queue.Queue() #默认不限制队列大小,也可指定maxsize= q.put('d1') #向队列中放入数据 q.put('d2') q.put('d3') b = q.qsize() #大小要在这里先得到,如果放在while中,q.qsize会不断变小,取出一个小一个 i = 0 while i < b: print(q.get()) i += 1 #创建一个后进先出队列 q = queue.LifoQueue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) #创建一个优先级队列 q = queue.PriorityQueue() q.put((10,'lyj')) q.put((-1,'mxi')) q.put((3,'ww')) q.put((11,'aa')) print(q.get()) print(q.get()) print(q.get()) print(q.get())
信号量
与锁功能相同,只不过信号量规定了多把锁的存在,规定了最多同时执行的线程数。
作用:像数据库这类应用,规定只有100个客户端可同时连接执行操作,超过此量会引起性能衰减。就可以使用信号量来规定最大放行数量。
import threading,time def run(n): semaphore.acquire() #信号量加锁 time.sleep(1) print('run the thread:%s' %n) semaphore.release() #信号量解锁 if __name__ == '__main__': semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 for i in range(22): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count() != 1: pass else: print('----all threads done---') #注意:程序执行效果可能为5个同时结束,同时开始,实际上并非同时开始,只是总量为5个,先进先出执行。
Event类
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。
构造方法:
Event()
实例方法:
set():将标志位设置为True,阻塞线程转为放行状态(绿灯)
clear():将标志位设置为False,放行状态转为阻塞状态(红灯)
is_set():检测标志位是否为True,返回True,否则返回False
wait([timeout]):如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
代码实例:红绿灯
import time,threading event = threading.Event() event.set() def lighter(): count = 0 while True: if count > 20 and count < 30: #改成红灯 event.clear() #把标志位清除 print('\033[41;1mred light is on...\033[0m') elif count > 30: event.set() #变绿灯 count = 0 else: print('\033[42;1mgreen light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #代表绿灯 print('[%s] running...' % name) time.sleep(1) else: print('waiting...') event.wait() #阻塞线程
lighter = threading.Thread(target=lighter,) lighter.start() car1 = threading.Thread(target=car,args=('TOYOTO',)) car1.start()
生产者消费者模型
生产者负责向容器(如队列)内投放数据,消费者从容器中取数据使用。达到生产者与消费者彼此之间不直接通讯的目的。容器就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
生产者消费者模型实现了程序的解耦(一个模块进行了修改,不影响另一个调用它的模块)
举例:
import threading,time,queue q = queue.Queue(maxsize=10) def producer(name): count = 0 while True: q.put('骨头%s'%count) print('%s生产了第%s块骨头' %(name,count)) count += 1 time.sleep(1) def consumer(name): while True: print('%s吃了%s'%(name,q.get())) time.sleep(1) p = threading.Thread(target=producer,args=('lyj',)) c1 = threading.Thread(target=consumer,args=('dog',)) c2 = threading.Thread(target=consumer,args=('meixi',)) p.start() c1.start() c2.start()
参考文章:https://www.cnblogs.com/tkqasn/p/5700281.html