Python之路(第四十六篇)多种方法实现python线程池(threadpool模块\multiprocessing.dummy模块\concurrent.futures模块)

时间:2024-01-07 14:38:08

一、线程池

很久(python2.6)之前python没有官方的线程池模块,只有第三方的threadpool模块,

之后再python2.6加入了multiprocessing.dummy 作为可以使用线程池的方式,

在python3.2(2012年)之后加入了concurrent.futures模块(python3.1.5也有,但是python3.1.5发布时间晚于python3.2一年多),这个模块是python3中自带的模块,但是python2.7以上版本也可以安装使用。

下面分别介绍下各个线程池模块的使用

使用环境python3.6.4,win7

threadpool模块(上古时期--python2.6之前)

由于threadpool模块是第三方模块需要进行安装,

安装

pip install threadpool 或者在pycharm--settings--Project interpreter中安装

使用介绍

(1)引入threadpool模块

(2)定义线程函数

(3)创建线程 池threadpool.ThreadPool()

(4)创建需要线程池处理的任务即threadpool.makeRequests()

(5)将创建的多个任务put到线程池中,threadpool.putRequest

(6)等到所有任务处理完毕theadpool.pool()

代码实例

import threadpool, time

name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
'Amuro Namie',' Sarah Brightman']


def Say_hello(str):
print("Hello ", str)
time.sleep(2)


def main():
start_time = time.time()
#第一步 创建线程池,线程数为4:
pool = threadpool.ThreadPool(4)
# 第二步创建线程请求,包涵调用的函数、参数和回调函数:
# requests = threadpool.makeRequests(func, args_list, call_back) requests = threadpool.makeRequests(Say_hello, name_list)
# 第三步将所有要运行多线程的请求扔进线程池:
[pool.putRequest(req) for req in requests]
# 第四步等待所有的线程完成工作后退出:
pool.wait()
print('用时共: %s second' % (time.time() - start_time))



if __name__ == '__main__':
main()

  

输出

Hello  Satomi Ishihara
Hello Aragaki Yui
Hello Nainaiwei Hashimoto
Hello HIKARU UTADA
Hello Mai Kuraki
Hello Nozomi Sasaki
Hello Amuro Namie
Hello Sarah Brightman
用时共: 4.0012288093566895 second

  

说明:makeRequests存放的是要开启多线程的函数,以及函数相关参数和回调函数,其中回调函数可以不写(默认是无),也就是说makeRequests只需要2个参数就可以运行。

multiprocessing.dummy(中古时期,python3.2起)

从python3.2起,可以使用multiprocessing.dummy 创建线程池,注意

from  multiprocessing  import  Pool  #这里引入的是进程池
from multiprocessing.dummy import Pool as ThreadPool #这里引入的才是线程池

  

multiprocessing.dummy模块与multiprocessing模块的区别:dummy模块是多线程,而multiprocessing是多进程,api都是通用的。简单地说,multiprocessing.dummy是multiprocessing多进程模块复制的一个多线程模块,API都是通用的。

这里的多线程也是受到它受到全局解释器锁(GIL)的限制,并且一次只有一个线程可以执行附加到CPU的操作。

使用介绍

使用有四种方式:apply_async、apply、map_async、map。

其中apply_async和map_async是异步的,也就是启动进程函数之后会继续执行后续的代码不用等待进程函数返回。apply_async和map_async方式提供了一写获取进程函数状态的函数:ready()successful()get()。

PS:join()语句要放在close()语句后面。

具体可以参考

Python之路(第四十篇)进程池

代码实例

import  time
from multiprocessing.dummy import Pool as ThreadPool

name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
'Amuro Namie',' Sarah Brightman']


def Say_hello(str):
print("Hello ", str)
time.sleep(2)


def main():
start_time = time.time()
#第一步 创建线程池,线程数为4:
pool = ThreadPool(4)
# 第二步用map方法执行
# pool.map(func, args) ,注意这里的map方法自带pool.close()和pool.join()方法,等待所有子线程执行完
pool.map(Say_hello,name_list)
print('用时共: %s second' % (time.time() - start_time))


if __name__ == '__main__':
main()

  

输出结果

Hello  Satomi Ishihara
Hello Aragaki Yui
Hello Nainaiwei Hashimoto
Hello HIKARU UTADA
Hello Mai Kuraki
Hello Nozomi Sasaki
Hello Amuro Namie
Hello Sarah Brightman
用时共: 4.004228830337524 second

  

concurrent.futures模块(从python3.2开始自带)

concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。

核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。

使用介绍

模块主要包含下面两个类:

  1. ThreadPoolExecutor

  2. ProcessPoolExecutor

也就是对 threading 和 multiprocessing 进行了高级别的抽象, 暴露出统一的接口, 方便开发者使用。

可以使用 ThreadPoolExecutor 来进行多线程编程,ProcessPoolExecutor 进行多进程编程,两者实现了同样的接口,这些接口由抽象类 Executor 定义。 这个模块提供了两大类型,一个是执行器类 Executor,另一个是 Future 类。

ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

其实 concurrent.futures 底层还是用的 threading 和 multiprocessing 这两个模块, 相当于在这上面又封装了一层, 所以速度上会慢一点, 这个是架构和接口实现上的取舍造成的。

基类Executor

#submit(fn, *args, **kwargs)
异步提交任务

#map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作,map方法接收两个参数,第一个为要执行的函数,第二个为一个序列,会对序列中的每个元素都执行这个函数,返回值为执行结果组成的生成器

#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。


#result(timeout=None)
取得结果
submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future能够使用done()方法判断该任务是否结束,done()方法是不阻塞的,使用result()方法可以获取任务的返回值,这个方法是阻塞的。


#add_done_callback(fn)
回调函数

# done()
判断某一个线程是否完成

# cancle()
取消某个任务

  

代码示例

进程池:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time
def task(n):
print('%s is runing进程号' %os.getpid())
time.sleep(2)
return n**2


def main():
start_time = time.time()
executor=ProcessPoolExecutor(max_workers=3)

futures=[]
for i in range(10):
future=executor.submit(task,i) #这里使用submit提交进程
futures.append(future)
executor.shutdown(True)
print('*'*20)
for future in futures:
print(future.result())
print('用时共: %s second' % (time.time() - start_time))

if __name__ == '__main__':
main()

  

输出结果

10292 is runing进程号
12516 is runing进程号
9664 is runing进程号
10292 is runing进程号
12516 is runing进程号
9664 is runing进程号
10292 is runing进程号
12516 is runing进程号
9664 is runing进程号
10292 is runing进程号
********************
0
1
4
9
16
25
36
49
64
81
用时共: 8.176467418670654 second

  

也可以使用map方法提交进程

示例

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time
def task(n):
print('%s is runing进程号' %os.getpid())
time.sleep(2)
return n**2


def main():
start_time = time.time()
with ProcessPoolExecutor(max_workers=3) as executor:
futures = executor.map(task, [i for i in range(10)])
print('*'*20)
for future in futures:
print(future) #无需再次使用result()方法获取结果
print('用时共: %s second' % (time.time() - start_time))

if __name__ == '__main__':
main()

  

输出结果

1504 is runing进程号
12644 is runing进程号
7732 is runing进程号
1504 is runing进程号
12644 is runing进程号
7732 is runing进程号
1504 is runing进程号
7732 is runing进程号
12644 is runing进程号
1504 is runing进程号
********************
0
1
4
9
16
25
36
49
64
81
用时共: 8.171467304229736 second

  

分析:map方法返回的results列表是有序的,顺序和*iterables迭代器的顺序一致,这里也无需再次使用result()方法获取结果。

这里使用with操作符,使得当任务执行完成之后,自动执行shutdown函数,而无需编写相关释放代码。

map()与submit()使用场景

常用的方法是 submit(), 如果要提交任务的函数是一样的, 就可以简化成 map(), 但是如果提交的函数是不一样的, 或者执行的过程中可能出现异常, 就要使用到 submit(), 因为使用 map() 在执行过程中如果出现异常会直接抛出错误, 而 submit() 则会分开处理。

线程池

用法与ProcessPoolExecutor相同

示例

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time
def task(n):
print('%s is runing进程号' %os.getpid())
time.sleep(2)
return n**2


def main():
start_time = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
futures = executor.map(task, [i for i in range(10)])
print('*'*20)
for future in futures:
print(future)
print('用时共: %s second' % (time.time() - start_time))

if __name__ == '__main__':
main()

  

输出结果

7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
7976 is runing进程号
********************
0
1
4
9
16
25
36
49
64
81
用时共: 8.001457929611206 second

  

分析:注意所有的进程号都是一样的,这里是开启的多线程,所以进程号是一样的

示例二

import  time
from concurrent.futures import ThreadPoolExecutor

name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
'Amuro Namie',' Sarah Brightman']


def say_hello(str):
print("Hello ", str)
time.sleep(2)


def main():
start_time = time.time()
# 用map方法执行
with ThreadPoolExecutor(max_workers=4) as executor:
futures = executor.map(say_hello,name_list)
print('用时共: %s second' % (time.time() - start_time))


if __name__ == '__main__':
main()

  

 

输出结果

Hello  Satomi Ishihara
Hello Aragaki Yui
Hello Nainaiwei Hashimoto
Hello HIKARU UTADA
Hello Mai Kuraki
Hello Nozomi Sasaki
Hello Amuro Namie
Hello Sarah Brightman
用时共: 4.0022289752960205 second

  回调函数的使用

import  time
from concurrent.futures import ThreadPoolExecutor name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
'Amuro Namie',' Sarah Brightman'] def say_hello(str):
print("Hello ", str)
time.sleep(2)
return str def call_back(res):
res = res.result() #获取结果
print(res,"长度是%s"%len(res)) def main():
start_time = time.time()
# 用submit方法执行
executor=ThreadPoolExecutor(max_workers=4) for i in name_list:
executor.submit(say_hello,i).add_done_callback(call_back)
executor.shutdown(True)
#这里使用submit提交线程,使用add_done_callback()添加回调函数
print('用时共: %s second' % (time.time() - start_time)) if __name__ == '__main__':
main()

  输出结果

Hello  Satomi Ishihara
Hello Aragaki Yui
Hello Nainaiwei Hashimoto
Hello HIKARU UTADA
HIKARU UTADA 长度是12
Aragaki Yui 长度是11
Hello Mai Kuraki
Hello Nozomi Sasaki
Satomi Ishihara 长度是15
Nainaiwei Hashimoto 长度是19
Hello Amuro Namie
Hello Sarah Brightman
Nozomi Sasaki 长度是13
Amuro Namie 长度是11
Sarah Brightman 长度是16
Mai Kuraki 长度是10
用时共: 4.0022289752960205 second