Pythpn并发编程——多线程与协程
1. 进程与线程
1.1 概念上
- 对于操作系统来说,一个任务就是一个进程Process,在一个进程内部,要同时干很多事情,就需要同时运行多个子任务,进程内的这些子任务就称为线程Thread
- 操作系统是让各个任务交替执行实现支持多任务的,真正地同时执行多任务需要多核CPU才能实现
- 线程是最小的执行单元,一个进程至少有一个线程,如何调读进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间
1.2 多进程与多线程——同时执行多个任务
要实现多任务,设计Master-Worker模式,Master负责分配任务,Worker负责执行任务
-
多进程模式
- 启动多个进程,每个进程只有一个线程,多个进程可以一块执行多个任务
- 最大的优点:稳定性高,一个子进程崩溃了,不会影响主进程和其他子进程
- 缺点:创建进程的开销大
-
多线程模式
- 启动一个进程,在一个进程内启动多个线程,多个线程也可以一块执行多个任务
- 致命缺点:任何一个线程挂掉都可能造成整个进程崩溃,因为所有线程共享进程的内存
-
多进程+多线程模式
- 实际很少采用
2. 并发和并行
并发:不是指同一时刻有多个操作同时进行,实际上,在某个特定时刻,只允许有一个操作发生,线程/任务之间互相切换,直到完成,threading和asyncio
- 通常应用于I/O操作频繁的场景,例如从网站上下载多个文件
并行:同一时刻,同时发生,multi-processing,m个处理器,开m个进程
3. Python多线程——futures
3.1 多线程用法
- 1.导入future模块,Python中的future模块,位于concurrent.futures 和 asyncio 中
- 2.创建线程池,函数ThreadPoolExecutor(max_workers=5),max_workers设置线程个数
- 3.调用,map()函数
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
# 并发模式,创建了一个线性池,总共有5个线性可以分配使用
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 对sites中的每个元素,并发地调用函数download_one
executor.map(download_one, sites)
# 并行模式,创建进程池,系统自动返回CPU的数量作为可以调用的进程数
# with concurrent.futures.ProcessPoolExecutor() as executor: #
# 另一种写法
# def download_all(sites):
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# to_do =[] #
# for site in sites:
# future = executor.submit(download_one, site)
# to_do.append(future)
# for future in concurrent.futures.as_completed(to_do):
# future.result()
def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography'
]
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print(f'Download {len(sites)} sites in {end_time - start_time} seconds')
if __name__ == '__main__':
main()
Read 182102 from https://en.wikipedia.org/wiki/Portal:Arts
Read 245181 from https://en.wikipedia.org/wiki/Portal:Society
Read 206928 from https://en.wikipedia.org/wiki/Portal:History
Read 336222 from https://en.wikipedia.org/wiki/Portal:Biography
Download 4 sites in 0.22546189799322747 seconds
3.2. 为什么多线程每次只允许只能有一个线程执行?
全局解释器锁的存在,GIL(Global Interpreter Lock)
3.3 多线程的缺点
- 多线程运行过程容易被打断,因此有可能出现 race condition 的情况
- 线程切换本身存在一定的损耗,线程数不能无限增加
4. python协程——asyncio
4.1 概念
单线程的异步编程模型称为协程。在执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行,注意,这不是函数调用。
Async异步:不同操作间可以相互交替执行,如果其中的某个操作被block了,程序并不会等待,而是找出可执行的操作继续执行
sync同步:指操作一个接一个地执行,下一个操作必须等上一个操作完成后才能执行
4.2 Asyncio原理
event loop对象维护两个任务列表,预备状态和等待状态,选取预备状态的一个任务,使其运行,一直到这个任务把控制权交还给event loop为止,当任务把控制权交还给event loop时,如果任务完成,它则将其放到预备状态的列表,否则,放在等待状态的列表,然后遍历等待状态的列表,查看它们是否完成,而原先在预备状态列表的任务位置仍旧不变,因为它们还未运行。当所有任务被重新放置在合适的列表后,新一轮的循环又开始了。
4.3 如何使用?
导入内置库 asyncio
async 修饰词声明异步函数,调用异步函数,便可得到一个协程对象
-
协程的执行:
- 通过await来调用,执行效果和正常执行一样,会阻塞在这里,进入被调用的协程函数,执行完毕返回后再继续
-
asynio.creat_task(调用异步函数) 创建任务
- asynio.run(main())作为主程序的入口函数,触发运行
import asyncio
async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time) # 从当前任务切出,事件调读器开始调度
print('OK {}'.format(url)) # 任务完成后,从事件循环中退出
async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls] # 列表生成式
for task in tasks: # 多个任务被创建,进入事件循环等待运行
await task # 执行,用户选择从当前主任务中切出,事件调度器开始调度
# await asyncio.gather(*tasks)
asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
########## 输出 ##########
crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
import asyncio
async def worker_1():
await asyncio.sleep(1) # 4.从当前任务切出,事件调读器开始调度任务2
return 1 # 7.1秒后,事件调读器将控制权重新传给任务1,返回1,任务1完成,从事件循环中退出,并把控制器传给主任务
async def worker_2(): # 协程运行时出现错误
await asyncio.sleep(2) #5.从当前任务切出,事件调读器开始调度任务3,
return 2 / 0 # 8.2秒后,事件调读器将控制器重新传给任务2,运行出错,从事件循环中退出,控制器传给主任务
async def worker_3():
await asyncio.sleep(3) # 6.从当前任务中切出,事件调读器暂停调度
return 3 # 9.触发限定运行规则,任务3被取消,退出事件循环
async def main():
task_1 = asyncio.create_task(worker_1()) # 2.任务123被创建,并进入事件循环等待运行
task_2 = asyncio.create_task(worker_2())
task_3 = asyncio.create_task(worker_3())
await asyncio.sleep(2) # 给任务3限定运行时间,一旦超时就取消
task_3.cancel()
res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True) # 3.执行任务,用户选择从当前的主任务中切出,事件调读器开始调度任务1
print(res) # 10.主任务输出res,协程任务结束,事件循环结束
asyncio.run(main()) # 1.程序进入main()函数,事件循环开启
########## 输出 ##########
[1, ZeroDivisionError('division by zero'), CancelledError()]
4.4. 协程的优点
- 协程是单线程的,但其内部 event loop 的机制,可以让它并发地运行多个不同的任务,并且比多线程享有更大的自主控制权。
- Asyncio 中的任务,在运行过程中不会被打断,因此不会出现 race condition 的情况
- 子程序切换不是线程切换,而是由程序自身控制,在哪些地方交出控制权,切换到下一个任务,因此没有线程切换的开销,具有极高的执行效率
- 协程的写法更加清晰简洁,把async/await 语法和 create_task结合来用,对于中小级别的并发需求已经毫无压力
- 很多情况下,使用 Asyncio 需要特定第三方库的支持(缺点),例如不支持requests,但可以用aiohttp
6. 选择多线程还是协程
- 如果是 I/O bound,并且 I/O 操作很慢,需要很多任务 / 线程协同实现,那么使用 Asyncio 更合适。
- 如果是 I/O bound,但是 I/O 操作很快,只需要有限数量的任务 / 线程,那么使用多线程就可以了。
- 如果是 CPU bound,则需要使用多进程来提高程序运行效率