引言
之前也写过多线程的博客,用的是 threading ,今天来讲下 python 的另外一个自带库 concurrent 。concurrent 是在 Python3.2 中引入的,只用几行代码就可以编写出线程池/进程池,并且计算型任务效率和 mutiprocessing.pool 提供的 poll 和 ThreadPoll 相比不分伯仲,而且在 IO 型任务由于引入了 Future 的概念效率要高数倍。而 threading 的话还要自己维护相关的队列防止死锁,代码的可读性也会下降,相反 concurrent 提供的线程池却非常的便捷,不用自己操心死锁以及编写线程池代码,由于异步的概念 IO 型任务也更有优势。
concurrent 的确很好用,主要提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 。一个多线程,一个多进程。但 concurrent 本质上都是对 threading 和 mutiprocessing 的封装。看它的源码可以知道,所以最底层并没有异步。
ThreadPoolExecutor 自己提供了任务队列,不需要自己写了。而所谓的线程池,它只是简单的比较当前的 threads 数量和定义的 max_workers 的大小,小于 max_workers 就允许任务创建线程执行任务。
操作多线程/多进程
1、创建线程池
通过 ThreadPoolExecutor 类创建线程池对象,max_workers 设置最大运行线程数数。使用 ThreadPoolExecutor 的好处是不用担心线程死锁问题,让多线程编程更简洁。
1
2
3
|
from concurrent import futures
pool = futures.ThreadPoolExecutor(max_workers = 2 )
|
2、submit
submit(self, fn, *args, **kwargs):
- fn:需要异步执行的函数
- *args,**kwargs:fn 接受的参数
该方法的作用就是提交一个可执行的回调task,它返回一个Future对象。可以看出此方法不会阻塞主线程的执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print ( '{}:{} {}' . format (datetime.datetime.now(),url,r.status_code))
urls = [ 'https://www.baidu.com' , 'https://www.tmall.com' , 'https://www.jd.com' ]
pool = futures.ThreadPoolExecutor(max_workers = 2 )
for url in urls:
task = pool.submit(get_request,url)
print ( '{}主线程' . format (datetime.datetime.now()))
time.sleep( 2 )
# 输出结果
2021 - 03 - 12 15 : 29 : 10.780141 :主线程
2021 - 03 - 12 15 : 29 : 10.865425 :https: / / www.baidu.com 200
2021 - 03 - 12 15 : 29 : 10.923062 :https: / / www.tmall.com 200
2021 - 03 - 12 15 : 29 : 10.940930 :https: / / www.jd.com 200
|
3、map
map(self, fn, *iterables, timeout=None, chunksize=1):
- fn:需要异步执行的函数
- *iterables:可迭代对象
map 第二个参数是可迭代对象,比如 list、tuple 等,写法相对简单。map 方法也不会阻塞主线程的执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print ( '{}:{} {}' . format (datetime.datetime.now(),url,r.status_code))
urls = [ 'https://www.baidu.com' , 'https://www.tmall.com' , 'https://www.jd.com' ]
pool = futures.ThreadPoolExecutor(max_workers = 2 )
tasks = pool. map (get_request,urls)
print ( '{}:主线程' . format (datetime.datetime.now()))
time.sleep( 2 )
# 输出结果
2021 - 03 - 12 16 : 14 : 04.854452 :主线程
2021 - 03 - 12 16 : 14 : 04.938870 :https: / / www.baidu.com 200
2021 - 03 - 12 16 : 14 : 05.033849 :https: / / www.jd.com 200
2021 - 03 - 12 16 : 14 : 05.048952 :https: / / www.tmall.com 200
|
4、wait
如果要等待子线程执行完之后再执行主线程要怎么办呢,可以通过 wait 。
wait(fs, timeout=None, return_when=ALL_COMPLETED):
- fs:所有任务 tasks
- return_when:有三个参数 FIRST_COMPLETED:只要有一个子线程完成则返回结果。 FIRST_EXCEPTION:只要有一个子线程抛异常则返回结果,若没有异常则等同于ALL_COMPLETED。 ALL_COMPLETED:默认参数,等待所有子线程完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print ( '{}:{} {}' . format (datetime.datetime.now(),url,r.status_code))
urls = [ 'https://www.baidu.com' , 'https://www.tmall.com' , 'https://www.jd.com' ]
pool = futures.ThreadPoolExecutor(max_workers = 2 )
tasks = []
for url in urls:
task = pool.submit(get_request,url)
tasks.append(task)
futures.wait(tasks)
print ( '{}:主线程' . format (datetime.datetime.now()))
time.sleep( 2 )
# 输出结果
2021 - 03 - 12 16 : 30 : 13.437042 :https: / / www.baidu.com 200
2021 - 03 - 12 16 : 30 : 13.552700 :https: / / www.jd.com 200
2021 - 03 - 12 16 : 30 : 14.117325 :https: / / www.tmall.com 200
2021 - 03 - 12 16 : 30 : 14.118284 :主线程
|
5、异常处理
as_completed(fs, timeout=None)
- 所有任务 tasks
使用 concurrent.futures 操作 多线程/多进程 过程中,很多函数报错并不会直接终止程序,而是什么都没发生。使用 as_completed 可以捕获异常,代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print ( '{}:{} {}' . format (datetime.datetime.now(),url,r.status_code))
urls = [ 'www.baidu.com' , 'https://www.tmall.com' , 'https://www.jd.com' ]
# 创建线程池
pool = futures.ThreadPoolExecutor(max_workers = 2 )
tasks = []
for url in urls:
task = pool.submit(get_request,url)
tasks.append(task)
# 异常捕获
errors = futures.as_completed(tasks)
for error in errors:
# error.result() 等待子线程都完成,并抛出异常,中断主线程
# 捕获子线程异常,不会终止主线程继续运行
print (error.exception())
futures.wait(tasks)
print ( '{}:主线程' . format (datetime.datetime.now()))
time.sleep( 2 )
# 输出结果
Invalid URL 'www.baidu.com' : No schema supplied. Perhaps you meant http: / / www.baidu.com?
2021 - 03 - 12 17 : 24 : 26.984933 :https: / / www.tmall.com 200
None
2021 - 03 - 12 17 : 24 : 26.993939 :https: / / www.jd.com 200
None
2021 - 03 - 12 17 : 24 : 26.994937 :主线程
|
多进程编程也类似,将 ThreadPoolExecutor 替换成 ProcessPoolExecutor 。
以上就是python基于concurrent模块实现多线程的详细内容,更多关于python concurrent实现多线程的资料请关注服务器之家其它相关文章!
原文链接:https://www.cnblogs.com/shenh/p/14338173.html