cpython中的GIL和pool
-
GIL锁(全局解释器锁)
1.what?
GIL是全局解释器锁,和普通锁加在数据上不同的是:GIL加在加在解释器上,是为了防止多个线程在同一时间执行python字节码,也就是这个锁是用来防止同一时间有多个线程被执行。
2.why?
由于cpython的内存管理是非线程安全,于是cpython就给解释器加个锁,解决安全问题,但是降低了效率。
3.GIL带来的问题。
给线程加锁让线程无法并行,即使在多核处理器下也无法并行。
-
线程和进程的效率对比
#计算密集型任务
from multiprocessing import Process
import time
from threading import Thread
def task():
a=0
for i in range(10000000):
a+=1
a*10/2-3
if __name__ == '__main__':
s = time.time()
# t1 = Thread(target=task)
# t2 = Thread(target=task)
# t3 = Thread(target=task)
t1 = Process(target=task)
t2 = Process(target=task)
t3 = Process(target=task)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(time.time()-s)
# 4.394615173339844 #多个线程运行结果
# 1.6428406238555908 #多进程运行结果
#IO任务型
from multiprocessing import Process
from threading import Thread
import time
def task():
with open('1.py', mode='rb')as f:
while True:
data = f.read(1024)
if not data:
break
if __name__ == '__main__':
s = time.time()
t1 = Thread(target=task)
t2 = Thread(target=task)
t3 = Thread(target=task) #t1 = Process(target=task)
#t2 = Process(target=task)
#t3 = Process(target=task)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(time.time()-s) #0.001976490020751953 # 多个线程运行结果
#0.2104020118713379 # 多个进程运行结果总结:
1、单核下,IO密集型和计算密集型任务,GIL不会带来影响
2、在多核下,IO密集型任务会受到GIL影响,但是影响可以忽略不记
3、在多核下,如果是计算密集型,应该使用cpython中的多进程,IO密集型用多线程。
-
GIL与线程锁的区别
#GIL是用于保护解释器相关的数据,解释器也是一段程序,肯定有其定义各种数据
#对于程序中自己定义的数据则没有任何的保护效果,这一点在没有介绍GIL前我们就已经知道了,所以当程序中出现了共享自定义的数据时就要自己加锁,如下例:
from threading import Thread,Lock
import time
a = 0
def task():
global a
temp = a
time.sleep(0.01)
a = temp + 1
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
2.线程2获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL
4.线程2睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL,最后a的值也就是1
之所以出现问题是因为两个线程在并发的执行同一段代码,解决方案就是加锁!
from threading import Thread,Lock
import time
lock = Lock()
a = 0
def task():
global a
lock.acquire()
temp = a
time.sleep(0.01)
a = temp + 1
lock.release()
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL,不释放lock
2.线程2获得CPU执行权,并获取GIL锁,尝试获取lock失败,无法执行,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL继续执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为1
4.线程2获得CPU执行权,获取GIL锁,尝试获取lock成功,执行代码,得到a的值为1后进入睡眠,释放CPU并释放GIL,不释放lock
5.线程2睡醒后获得CPU执行权,获取GIL继续执行代码 ,将temp的值1+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为2 -
进程池/线程池
1、what?
池表示一个容器,本质就是一个存储进程或线程的列表。
2、why?
在很多情况下需要控制进程或线程的数量在一个合理的范围,例如TCP程序中,一个客户端对应一个线程。虽然线程开销小,但也不能无限开,否则系统资源迟早被耗尽,解决办法就是控制线程的数量。
3、池有什么作用?
线程/进程池不仅帮我们控制线程/进程的数量,还帮我们完成了线程/进程的创建,销毁,以及任务的分配
4、how?
#进程池的使用
from concurrent.futures import ProcessPoolExecutor
import os
#指定进程池最大容量是3,如果不指定默认为CPU核心数
pool = ProcessPoolExecutor(3)
def task():
print('%s running...' % os.getpid())
if __name__ == '__main__':
for i in range(10):
# 提交任务到进程池
pool.submit(task)
#线程池的使用
from concurrent.futures import ThreadPoolExecutor
from threading import active_count,current_thread
import time
#指定线程池最大容量为3,如果不指定则默认为CPU核心数*5
pool = ThreadPoolExecutor(3)
print(active_count())
def task():
print('%s running...' % current_thread().name)
time.sleep(1)
if __name__ == '__main__':
#提交任务到线程池
for i in range(10):
pool.submit(task) # 第一次提交任务时会创建进程 ,后续再提交任务,直接交给以及存在的进程来完成,如果没有空闲进程就等待 # 与信号量的区别 ,信号量也是一种锁 适用于保证同一时间能有多少个进程或线程访问
# 而线程/进程池,没有对数据访问进行限制仅仅是控制数量
-
同步和异步
-
程序运行的状态:
就绪、运行、阻塞
程序遇到IO操作时,无法继续执行代码,叫做阻塞。
程序没有遇到IO操作,正常执行,叫做非阻塞。
-
同步与异步:
同步:发起任务后必须等待任务结束,拿到任务结果才能继续执行
异步:发起任务后,不需要关心任务的执行过程,可以继续往下运行。
异步的效率高于同步。但是不是所有的任务都可以异步执行,判断一个任务是否可以异步的条件是任务执行方是否立即需要执行结果,需要结果必须用同步。
-
同步和异步与阻塞和非阻塞的区别
同步不等于阻塞,异步不等于非阻塞
当用异步方式发起任务时,任务中可能也包含IO操作,异步也可能阻塞。
同步执行任务的时候,也会卡主程序,但是等同于阻塞,因为任务可能时纯计算任务,CPU没走。
-
使用线程池发起异步任务。
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor()
def task(i):
time.sleep(1)
print("sub thread run..")
i += 100
return i
fs = []
for i in range(10):
f = pool.submit(task, i) # submit就是一异步的方式提交任务
# print(f)
# print(f.result()) # result是阻塞的 会等到这任务执行完成才继续执行 ,会异步变成同步
fs.append(f)
# 是一个阻塞函数,会等到池子中所有任务完成后继续执行
pool.shutdown(wait=True)
# pool.submit(task,1) # 注意 在shutdown之后 就不能提交新任务了
for i in fs:
print(i.result())
print("over")
-
异步回调
1.什么是异步回调?
a有任务给b执行,b在执行完后回来调用了a的一个函数,称之为回调。
异步任务使用的场景:爬虫
1.从目标站点下载网页数据,本质就是HTML格式字符串
2.用re从字符串中提取出需要的数据
2.为什么需要回调函数?
需要获取异步任务的结果,但是又不应该阻塞。高效获取任务。
3.怎么去用回调函数?
通常异步任务都会和回调函数一起使用
使用 方式:
使用add_done_callback函数给Future对象绑定一个函数
需要注意:
在多进程中,回调函数是交给主进程执行,在多线程中,回调函数是那个线程空闲就去执行,但不会是主线程去执行。
#示例
#在多进程中异步回调
import requests
import re
from multiprocessing import Process
import os
from concurrent.futures import ProcessPoolExecutor
# response = requests.get('http://www.baidu.com')
# htm = response.content.decode('utf-8')
# print(re.findall('href=.*?com', htm))
def get_data(url):
print('%s正在爬取%s' % (os.getpid(), url))
response = requests.get(url)
print('%s爬取%s完成' % (os.getpid(), url))
return response
def parser(obj):
res = obj.result()
htm = res.content.decode('utf-8')
ls = re.findall('href=.*?com', htm)
print(ls)
print('%s解析完成!共%s个连接' % (os.getpid(), len(ls)))
if __name__ == '__main__':
urls = ['http://www.baidu.com', 'http://www.taobao.com',
'http://www.tmall.com', 'http://www.baidu.com',
'http://www.taobao.com', 'http://www.tmall.com']
pool = ProcessPoolExecutor(3) # 创建进程池
for i in urls:
obj = pool.submit(get_data, i)
obj.add_done_callback(parser)
#在多线程中使用异步回调
import requests
import re
import os
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
def get_data(url):
print('%s %s正在爬取%s' % (os.getpid(), current_thread().name, url))
response = requests.get(url)
print('%s %s 爬取%s完成' % (os.getpid(), current_thread().name, url))
return response
def parser(obj):
res = obj.result()
htm = res.content.decode('utf-8')
ls = re.findall('href=.*?com', htm)
# print(ls)
print('%s %s解析完成!共%s个连接' % (os.getpid(), current_thread().name, len(ls)))
if __name__ == '__main__':
urls = ['http://www.baidu.com', 'http://www.taobao.com',
'http://www.tmall.com', 'http://www.baidu.com',
'http://www.taobao.com', 'http://www.tmall.com']
pool = ThreadPoolExecutor(3) # 创建线程池,最大容量3
for i in urls:
obj = pool.submit(get_data, i)
# res = obj.result() # 会把任务变成串行
# parser(res)
obj.add_done_callback(parser)线程队列、事件、协程
-
线程队列
与进程队列的区别:
进程队列是进程数据共享的容器,可以被多个进程共享。
线程队列就是一个普通的容器,不能被进程间实现共享。
-
第一种:普通队列Queue (先进先出)
from queue import Queue
q = Queue()
q.put(123)
q.put('a')
q.put('b')
print(q.get()) #
print(q.get()) # a
print(q.get()) # b -
第二种:后进先出队列(堆栈)
from queue import LifoQueue
q = LifoQueue()
q.put(123)
q.put('a')
q.put('b')
print(q.get()) # b
print(q.get()) # a
print(q.get()) # -
第三种:优先级队列(优先值小的先出)PriorityQueue
from queue import PriorityQueue
q = PriorityQueue()
q.put(3)
q.put(1)
q.put(2)
print(q.get()) #
print(q.get()) #
print(q.get()) #
pq = PriorityQueue()
pq.put((2,"b"))
pq.put((3,"c"))
pq.put((1,"a"))
print(pq.get()) # (1,"a")
print(pq.get()) # (2,"b")
print(pq.get()) # (3,"c")
###注意:放入的数据必须是可以比大小的同类数据
如果数据取完继续取不会报错,而是阻塞
-
-
线程事件
事件是用于协调多个线程工作的,当一个线程要执行某个操作,需要获取两一个线程状态。
#用法:
import time
from threading import Thread,Event
e = Event() #创建一个事件 (有点类似状态为False)
def start():
print('server starting...') #模拟创建一个服务器
time.sleep(5)
print('server starting...')
e.set()
def connect(): #模拟连接
# 重试三次,如果三次都没成功就会走for 的else
for i in range(3):
print('connect waiting')
e.wait(1) # 可以加上参数时间timeout,wait会阻塞,阻塞的参数timeout
if e.isSet():
print('connect successful')
else:
print('connect lose')
else:
print('server starting lose')
Thread(target=start).start()
Thread(target=connect).start() -
单线程实现并发的效率
在单线程下实现并发效果
import time
def task1():
a = 0
for i in range(10000000):
a += i
yield
def task2():
b = 0
g = task1()
for i in range(10000000):
b += i
next(g)
s = time.time()
task2()
print(time.time()-s)
#所谓的单线程并发是为了提高线程效率
#对于计算密集型任务,单线程并发反而降低效率
#对于IO密集型的任务,如果在执行IO操作时可以切换到其他的任务,能提高线程对CPU的占有率。也就是充分利用CPU时间片。 -
greenlet模块
"""
greenlet 主要封装了生成器 是的我们在使用生成器实现并发时 简化了代码
"""
import greenlet
import time
def task1():
print("task1 run")
time.sleep(10)
g2.switch()
print("task1 run")
def task2():
print("task2 run")
g1.switch()
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch() -
协程
1.什么是协程?
协助线程更高效的执行任务。
本质就是单线程充分利用CPU的时间片,实现并发,也称为微线程。
2.为什么要用协程?
在CPython解释器中,在未达到时间片之前一旦遇到IO操作,CPU就会被切换至其他的线程,又无法线程并行执行任务。所以利用协程去充分利用CPU有限的时间片。
3.怎么用协程?
使用gevent模块实现协程,其能在多任务间进行切换,而且能自己检测IO,充分有效利用CPU。
import gevent
from gevent import monkey # 补丁
import time
monkey.patch_all() # 加上补丁遇到IO阻塞就会切换
def test1():
print('test1 run')
time.sleep(10)
print('test1 run')
def test2():
print('test2 run')
print('test2 run')
g1 = gevent.spawn(test1)
g2 = gevent.spawn(test2)
print('over')
# g1.join()
# g2.join()
# g1.join(),g2.join()等效于 gevent.joinall([g1, g2])
gevent.joinall([g1, g2])
-
-