Day037--Python--线程的其他方法,GIL, 线程事件,队列,线程池,协程

时间:2023-03-09 17:36:53
Day037--Python--线程的其他方法,GIL, 线程事件,队列,线程池,协程

1. 线程的一些其他方法

  threading.current_thread()  # 线程对象

  threading.current_thread().getName()  # 线程名称

  threading.current_thread().ident   # 当前线程ID

  threading.get_ident()  #  当前线程ID

  threading.enumerate()  # 连同主线程在内的正在运行的线程名称

  threading.active_count()  # 活跃的线程数

from threading import Thread
import threading
import time def work():
time.sleep(1)
print('子线程', threading.get_ident()) # 当前线程 ID
print('子线程', threading.current_thread().ident) # 当前线程ID
print('子线程', threading.current_thread().getName()) # 当前线程名称 if __name__ == '__main__':
t = Thread(target=work)
t.start()
# time.sleep(2)
print('主线程', threading.current_thread()) # 主线程对象
print('主线程', threading.current_thread().getName()) # 主线程名称
print('主线程', threading.current_thread().ident) # 当前线程ID
print('主线程', threading.get_ident()) # 当前线程ID print(threading.enumerate()) # 连同主线程在内正在运行的线程
print(threading.active_count()) # 目前正在活跃的线程数量

2. 线程事件

from threading import Thread, Event

e = Event()  # e的两种状态: False, True, 当事件对象的状态为False的时候, wait的地方会阻塞 

# e.set()  # 将事件对象的状态改为True
# e.clear() # 将事件对象的状态改为False
print('在这里等待')
e.wait()
print('好了')

事件

3. 线程队列

  双向队列

import queue
# 先进先出 FIFO q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')
q.put_nowait(4) # 没有数据会报错
print(q.get())
print(q.get())
print(q.get())
print(q.get_nowait())

先进先出

import queue
# 后进先出 LIFO Last in First out q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
q.put_nowait(4) # 没有数据会报错
print(q.get())
print(q.get())
print(q.get())
print(q.get_nowait())

后进先出  Lifo

  优先级队列

# 优先级队列

import queue

q = queue.PriorityQueue()
q.put((20, 'b')) # 元组第一个对象表示优先级, 第二个对象是放入队列的数据
q.put((20, 'c'))
q.put((20, 'a')) # 先比较优先级, 优先级相同的按照数据ASCII码顺序排列
q.put((0, 'b'))
q.put((30, 'c')) print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

优先级队列

import queue
q = queue.PriorityQueue()
q.put((20, 'abc')) # 元组第一个对象表示优先级, 第二个对象是放入队列的数据
q.put((20, 'acc')) # 先比较优先级, 优先级相同的按照数据ASCII码顺序排列
q.put((20, 'ac')) print(q.get())
print(q.get())
print(q.get())

4.线程池 (重点)

  Python 标准模块: concurrent.futures

早期的时候我们没有线程池,现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,
他俩用法上是一样的。为什么要将进程池和线程池放到一起呢,是为了统一使用方式,使用threadPoolExecutor和ProcessPoolExecutor的方式一样,而且只要通过这个concurrent.futures导入就可以
直接用他们两个了。

  multiprocessing VS. concurrent.futures

  线程池中的基本方法

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务 #map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作 #shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前 #result(timeout=None)
取得结果 #add_done_callback(fn)
回调函数
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread def func(n):
time.sleep(1)
print(n, current_thread().ident)
return n**2 if __name__ == '__main__':
t_p = ThreadPoolExecutor(max_workers=4) # 最大线程数量
map_res = t_p.map(func, range(10))
# print(map_res)
# for i in map_res:
# print(i)
print([i for i in map_res]) # 异步执行, map自带join功能

map

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread
import time def func(n):
time.sleep(1)
print(n, current_thread().ident)
return n**2 if __name__ == '__main__':
t_p = ThreadPoolExecutor(max_workers=4) # 此处切换成ProcessPoolExecutor就可以直接转换成进程, 下面都一样
t_res_list = []
for i in range(10):
res_obj = t_p.submit(func, i) # 整合了创建线程和启动线程 异步提交了10个任务
# res_obj.result() # 和get一样会阻塞等待
t_res_list.append(res_obj)

   t_p.shutdown()  # 等同于close() + join() 如果不加, res.result会四个四个地打印,加了就一次性打印
for res in t_res_list: # 和进程池一样, 不用等到get阻塞,异步进行
print(res.result()) print('主线程结束')

  回调函数

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread
import time def func(n):
# time.sleep(1)
# print(n, current_thread().ident)
return n**2 def func2(n):
# print('>>>>>>>', n)
print('current_thread>>>', current_thread().getName())
print('>>>>>>>', n.result()) if __name__ == '__main__':
t_p = ThreadPoolExecutor(max_workers=4) # 此处切换成ProcessPoolExecutor就可以直接转换成进程, 下面都一样
for i in range(10):
res_obj = t_p.submit(func, i).add_done_callback(func2) t_p.shutdown() # 等同于close() + join()
print('主线程结束')

 

5.GIL 全局解释器锁 (重点)

Day037--Python--线程的其他方法,GIL, 线程事件,队列,线程池,协程

'''
执行进程要调用python解释器, 开启一个python解释器进程. 代码写在硬盘上, 需要加载到内存中交给cpu运行. 要想运行,系统先开辟
一块内存, 里面跑进程. 先把python解释器代码加载到内存,也是在这个进程中,此时这个进程中包含解释器和py文件, 将py文件的代码
交给解释器解释(一堆字符串,相当于传参), 解释器解释,遇到函数解释器自动调用. 目前默认的解释器是CPython解释器. 此时解释器
有两项事情要做:1. 编译器 2. 虚拟机 . 先经过编译器编译, 先编译成C语言的字节码, 生成.pyc文件, 然后虚拟机拿到c的字节码,
输出机器码, 再配合操作系统,把这个进程扔给CPU去执行. 所有py文件里的线程都要去cpython解释器中去编译解释, CPython解释器
中有一个GIL全局解释器锁, 导致同一时间同一进程内只有一个线程能够执行解释, 无法应用多核.
'''

  在C语言写的python解释器中存在全局解释器锁, 由于全局解释器锁的存在, 在同一时间内, python解释器只能运行一个线程的代码, 这大大影响了python多线程的性能. 而这个解释器锁由于历史原因, 现在几乎无法消除.

  python GIL之所以会影响多线程等性能, 是因为再多线程的情况下, 只有当线程获得了一个全局锁的时候, 该线程代码才能运行, 而全局锁只有一个, 所以使用python多线程, 在同一时刻也只有一个线程在运行, 因此即使是在多核的情况下,也只能发挥出单核的性能.

  python线程切换:

  对于有IO操作的线程, 当一个线程执行IO操作时,python会强制收回GIL, 其他线程可以抢夺.

  对于计算密集型线程, python中会有一个执行指令的计数器, 当一个线程执行了一定数量的指令时, 该线程就会停止执行并让出当前的锁, 其他线程就可以抢夺GIL.

  参见: GIL锁简介、以及解决方案

  GIL锁与互斥锁的区别

# 线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来

# join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

    #1.多个线程去抢GIL锁,即抢执行权限
#2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
#3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,*交出执行权限,即释放GIL
#4.直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

  GIL锁保证只有一个线程可以进入cpu运行, 如果想用多核, 可以用多进程.

  多线程与多进程进行纯计算的效率对比

from multiprocessing import Process
from threading import Thread
import time def func():
num = 0
for i in range(1, 10000000):
num += i if __name__ == '__main__':
p_s_t = time.time()
p_list = []
for i in range(10):
p = Process(target=func)
p.start()
p_list.append(p)
[p.join() for p in p_list]
p_e_t = time.time()
p_dif_t = p_e_t - p_s_t t_s_t = time.time()
t_list = []
for i in range(10):
t = Thread(target=func,)
t.start()
t_list.append(t)
[t.join() for t in t_list]
t_e_t = time.time()
t_dif_t = t_e_t - t_s_t
print('多进程执行时间', p_dif_t)
print('多线程执行时间', t_dif_t)

  I/o密集型效率对比   当计算少, IO操作多的时候, 多线程有优势

from multiprocessing import Process
from threading import Thread
import time def func():
time.sleep(1)
for i in range(10):
print('\r%s' % i, end='') if __name__ == '__main__':
p_s_t = time.time()
p_list = []
for i in range(10):
p = Process(target=func)
p.start()
p_list.append(p)
[p.join() for p in p_list]
p_e_t = time.time()
p_dif_t = p_e_t - p_s_t t_s_t = time.time()
t_list = []
for i in range(10):
t = Thread(target=func,)
t.start()
t_list.append(t)
[t.join() for t in t_list]
t_e_t = time.time()
t_dif_t = t_e_t - t_s_t
print('多进程执行时间', p_dif_t)
print('多线程执行时间', t_dif_t)

6.协程的认识

  协程: 单线程下实现并发

  并发: 伪并行, 遇到IO就切换, 单核下多个任务之间切换执行, 给你的效果就是貌似你的几个程序在同时执行, 提高效率.

     (任务切换+保存状态)

  并行: 多核cpu, 真正的同时执行

  串行: 一个任务执行完再执行下一个任务

  多线程多进程下的任务切换+保存状态是操作系统做的, 也要耗时. 为了提高效率, 出现协程.

并发与串行效率对比

# 并发执行的效率(在没有IO的情况下,两个纯计算的任务)
import time def func1():
num2 = 0
for i in range(1000001):
num2 += i
yield
# print('执行到下一个yield',current_thread().name) def func2():
g = func1()
next(g)
sum = 0
for i in range(1000000):
# g.send(i)
sum += i
next(g) s_t = time.time()
func2()
print('协程的时间', time.time() - s_t) # 串行执行的效率
def func3():
num2 = 0
for i in range(1000001):
num2 += i
# print('执行到下一个yield',current_thread().name) def func4():
sum = 0
for i in range(1000000):
sum = sum + i s_t = time.time()
func3()
func4()
print('串行的时间', time.time() - s_t)

  此时串行比yield切换省时间

# 串行
import time
def consumer(i):
time.sleep(1)
print('处理了数据:', i) def producer():
for i in range(3):
consumer(i)
print('发送了数据:', i) start2 = time.time()
producer()
stop2 = time.time()
print(stop2 - start2)

串行

  通过生成器实现单线程下的并发, 没有实现IO切换, 比串行慢

import time
# 协程, 单线程下实现并发
def consumer():
while 1:
x = yield
time.sleep(1)
print('处理了数据:', x) def producer():
g = consumer()
next(g)
for i in range(3):
g.send(i)
print('发送了数据:', i) start = time.time()
producer()
stop = time.time()
print(stop - start)

协程

7.Greenlet

import time
from greenlet import greenlet def eat(name):
print('%s eat 1' % name)
time.sleep(1)
g2.switch('taibai')
print('%s eat 2' % name)
g2.switch() def play(name):
print('%s play 1' % name)
time.sleep(1)
g1.switch()
print('%s play 2' % name) g1 = greenlet(eat)
g2 = greenlet(play) g1.switch('taibai') # 也没有实现IO切换

8.Gevent

9. condition 条件

  使得线程等待,只有满足某条件时,才释放n个线程

import threading

def run(n):
con.acquire()
con.wait()
print("run the thread: %s" %n)
con.release() if __name__ == '__main__': con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start() while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()
def condition_func():

    ret = False
inp = input('>>>')
if inp == '':
ret = True return ret def run(n):
con.acquire()
con.wait_for(condition_func)
print("run the thread: %s" %n)
con.release() if __name__ == '__main__': con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()

10. 定时器 Timer

import time
from threading import Timer,current_thread #这里就不需要再引入Timer
import threading
def hello():
print(current_thread().getName())
print("hello, world")
# time.sleep(3) #如果你的子线程的程序执行时间比较长,那么这个定时任务也会乱,当然了,主要还是看业务需求
# t = Timer(10, hello) #创建一个子线程去执行后面的函数
# t.start()
# after 10 seconds, "hello, world" will be printed
for i in range(5):
t = Timer(2, hello)
t.start()
time.sleep(3) # 这个是创建一个t用的时间是2秒,等待3秒后创建出来第二个. print(threading.active_count())
print('主进程',current_thread().getName())

参见: https://www.cnblogs.com/clschao/articles/9684694.html#part_9