【Python多任务--进程池Pool】

时间:2022-12-15 12:01:39

进程池Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

一:使用进程池

例1:非阻塞

from multiprocessing import Pool
import os, time, random


def worker(name):
t_start = time.time()
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():
po = Pool(5) # 定义一个进程池,最大进程数5

# 往进程池中添加任务
for i in range(10):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply_async(worker, (f'liang{i}',))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")

if __name__ == '__main__':
main()

执行结果

----start----
liang0开始执行,进程号为10404
liang1开始执行,进程号为9920
liang2开始执行,进程号为13136
liang3开始执行,进程号为10180
liang4开始执行,进程号为7708
liang4 执行完毕,耗时0.57
liang5开始执行,进程号为7708
liang2 执行完毕,耗时1.20
liang6开始执行,进程号为13136
liang1 执行完毕,耗时1.33
liang7开始执行,进程号为9920
liang0 执行完毕,耗时1.34
liang8开始执行,进程号为10404
liang3 执行完毕,耗时1.96
liang9开始执行,进程号为10180
liang5 执行完毕,耗时1.73
liang9 执行完毕,耗时0.54
liang8 执行完毕,耗时1.28
liang7 执行完毕,耗时1.37
liang6 执行完毕,耗时1.88
----all_done----

函数解释:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表, kwds为传递给func的关键字参数列表;
  • apply(func[, args[, kwds]]):使用阻塞方式调用func
  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

执行说明:创建一个进程池pool,并设定进程的数量为5,range(10)会相继产生10个对象,10个对象被提交到pool中,因pool指定进程数为5,所以0、1、2、3、4会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象,继续去执行新的对象,所以会出现输出“liang5开始执行,进程号为7708”出现在"liang4 执行完毕,耗时0.57"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“----start----”,主程序在pool.join()处等待各个进程的结束。

例2:阻塞

from multiprocessing import Pool
import os, time, random


def worker(name):
t_start = time.time()
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():
po = Pool(3) # 定义一个进程池,最大进程数3

# 往进程池中添加任务
for i in range(0, 5):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
po.apply(worker, (f'liang{i}',))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("----all_done----")

if __name__ == '__main__':
main()

输出


liang0开始执行,进程号为1976
liang0 执行完毕,耗时1.75
liang1开始执行,进程号为12624
liang1 执行完毕,耗时0.57
liang2开始执行,进程号为12444
liang2 执行完毕,耗时0.52
liang3开始执行,进程号为1976
liang3 执行完毕,耗时1.23
liang4开始执行,进程号为12624
liang4 执行完毕,耗时0.85
----start----
----all_done----

因为是阻塞,主函数会等待进程的执行,执行完之后才会继续往下,所以运行完所有进程后才输出“----start----”


例3、使用进程池,并返回结果

from multiprocessing import Pool
import os, time, random


def worker(name):
print("%s开始执行,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)
return name,os.getpid()

def main():
po = Pool(3) # 定义一个进程池,最大进程数3

res=[]
# 往进程池中添加任务
for i in range(0, 5):
# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
res.append(po.apply_async(worker, (f'liang{i}',)))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
for result in res:
print(result.get()) #get()函数得出每个返回结果的值
print("----all_done----")

if __name__ == '__main__':
main()

输出结果:

----start----
liang0开始执行,进程号为14012
liang1开始执行,进程号为13000
liang2开始执行,进程号为14120
liang3开始执行,进程号为14012
liang4开始执行,进程号为14012
('liang0', 14012)
('liang1', 13000)
('liang2', 14120)
('liang3', 14012)
('liang4', 14012)
----all_done----

例4、多进程执行多个任务

from multiprocessing import Pool
import os, time, random


def worker1(name):
print("%s开始执行work1,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)

def worker2(name):
print("%s开始执行work2,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)

def worker3(name):
print("%s开始执行work3,进程号为%d" % (name, os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random() * 2)

def main():
po = Pool(4) # 定义一个进程池,最大进程数3

work_list=[worker1,worker2,worker3]
# 往进程池中添加任务
for work in work_list:
for i in range(3):
po.apply_async(work, (f'liang{i}',))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

print("----all_done----")

if __name__ == '__main__':
main()

线程池4个线程执行3个任务,每个任务执行3次。
输出:

----start----
liang0开始执行work1,进程号为13088
liang1开始执行work1,进程号为4908
liang2开始执行work1,进程号为4200
liang0开始执行work2,进程号为8124
liang1开始执行work2,进程号为4908
liang2开始执行work2,进程号为13088
liang0开始执行work3,进程号为8124
liang1开始执行work3,进程号为4200
liang2开始执行work3,进程号为4908
----all_done----

二、进程池进程之间的通讯

进程池中进程的通讯队列
from multiprocessing import Pool, Manager
q = Manager().Queue()

import os
import time
from multiprocessing import Pool, Manager


def work(name, q):
time.sleep(1)
print(f"{name}:---{os.getpid()}---{q.get()}")

def main():
# 创建一个用于进程池通信的队列
q = Manager().Queue()

for i in range(1000):
q.put(f'data-{i}')

# 创建一个拥有五个进程的进程池
po = Pool(5)
# 往进程池中添加20个任务
for i in range(20):
po.apply_async(work, (f'liang{i}', q))

# close:关闭进程池(进程池停止接收任务)
po.close()
# 主进程等待进程池中的任务结束再往下执行
po.join()

if __name__ == '__main__':
main()


三、多进程+协程实现并发

小练习:

假设一个队列中有100000个URL地址,每个请求需要1秒钟,尝试用4个进程,每个进程中开启1000个协程去请求!统计运行时间


from gevent import monkey
monkey.patch_all(thread=False)
import gevent
import time
from multiprocessing import Process, Queue
import os


def time_count(func):
def wrapper(*args, **kwargs):
start_time = time.time()
func(*args, **kwargs)
end_time = time.time()
print('总耗时:', end_time - start_time)

return wrapper


class Myprocess(Process):
def __init__(self, que):
super().__init__()
self.que = que

#重写进程子类的run函数
def run(self):
cos = []
#开启多协程
for i in range(1000):
#调用工作函数
cor = gevent.spawn(self.work)
cos.append(cor)
gevent.joinall(cos)

#定义工作函数
def work(self):
while self.que.qsize() > 0:
try:
url = self.que.get(timeout=1)
time.sleep(1)
print(f"{os.getpid()}正在请求url:{url}")
except Exception as e:
print(e.__repr__())
break


@time_count
def main():
q = Queue()
for i in range(100000):
q.put(f'https://www.baidu.com--{i}')

process_list = []
for i in range(4):
p = Myprocess(q)
process_list.append(p)
p.start()
for p in process_list:
p.join()

print("任务结束")


if __name__ == '__main__':
main()

运行时间27秒