[笔记]python multiprocessing模块

时间:2022-07-24 18:16:49

现在越来越多的计算机程序采用多进程,多线程。C++, Java都提供了多进程多线程模块,python也不例外。python在多进程方面提供了multiprocessing模块。

建立子进程

import os
import multiprocessing as mp
def run_proc(name):
    print("run child process %s (%s)"%(name,os.getpid()))
if __name__ == '__main__':
    print("Parent process %s." % os.getpid())
    p = mp.Process(target = run_proc,args=('test',))
    print("Child process will start.")
    p.start()

    p.join()
    print("Child process end.")

运行上面代码,发现如下结果:

Parent process 51908.
Child process will start.
run child process test (51908)
Child process end.

子进程的创建需要一个函数,和一个参数。
子进程的start()方法可以将子进程启动。
join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

pool

子进程比较多的话可以使用进程池

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

进程池的实例由Pool()创建,其中参数是进程池容纳子进程的个数,其默认值为电脑的cpu核心数,与cpu_count()函数的返回值相同。
创建完Pool实例后就可以直接向进程池里面加入子进程,这个由进程池的函数apply_async()负责,其参数与上面创建子进程相同,包括函数名,与对应的参数。
向进程池扔完子进程后,一定不要忘记把进程池关闭,就是利用close()函数,关闭进程池,这样就不能有其他子进程添加进来了。
对进程池实例使用join()函数,目的是等待所以进程池内的进程执行完毕。如果在join前面没有close进程池,一定会报错。
观察结果,任务0,1,2,3先执行,因为进程池只有4个位置,所以几乎同时执行了任务0,1,2,3。这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果把任务改成4个,就不会看到等待着的第5个任务了。

子进程subprocess

子进程是由一个父进程创建的进程。
创建子进程后要给子进程输入输出的参数。
下面的例子,展示了利用子进程得到python.org网站信息的过程。

import subprocess
print("nslookup www.python.org")
r = subprocess.call(['nslookup','www.python.org'])

运行结果如下:
服务器: dns.xxx.edu.cn
Address: 10.0.0.10

非权威应答:
名称: python.map.fastly.net
Address: 103.245.222.223
Aliases: www.python.org

如果子进程还要更多参数,还可以使用communicate()函数进行操作。
在进行communicate之前,应先创建subprocess.Popen()实例。
communicate的作用是:
Interact with process: Send data to stdin. Read data from
stdout and stderr, until end-of-file is reached. Wait for
process to terminate. The optional input argument should be
bytes to be sent to the child process, or None, if no data
should be sent to the child.

import subprocess
print('nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('gbk'))
print('Exit code:', p.returncode)

运行结果如下:
默认服务器: dns.xxx.edu.cn
Address: 10.0.0.10

> > 服务器: dns.xxx.edu.cn
Address: 10.0.0.10

python.org MX preference = 50, mail exchanger = mail.python.org

python.org nameserver = ns2.p11.dynect.net
python.org nameserver = ns4.p11.dynect.net
python.org nameserver = ns1.p11.dynect.net
python.org nameserver = ns3.p11.dynect.net
mail.python.org internet address = 82.94.164.166
mail.python.org AAAA IPv6 address = 2001:888:2000:d::a6
ns3.p11.dynect.net internet address = 208.78.71.11
ns1.p11.dynect.net internet address = 208.78.70.11
ns4.p11.dynect.net internet address = 204.13.251.11
ns2.p11.dynect.net internet address = 204.13.250.11
>
Exit code: 0

进程间通信

进程间处理任务时需要共享信息,也就是通信。python的multiprocess模块提供了Queue,Pipes等方式进行通信操作。
下面的例子使用Queue传递数据。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time, random, os


def write(que):
    """ @parameter: que, a Queue Objct """
    print('Process %s to write' % (os.getpid(),))
    for val in ['a','b','c']:
        print('putting %s in que'%(val,))
        que.put(val)
        time.sleep(1)

def read(que):
    """ @parameter: que, a Queue Objct """
    print('Process %s to read' % (os.getpid(),))
    while True:
        value = que.get(block=True) # reading and writing cannot
                                    # happen simutaneously
        print('Get %s from queue.' % value)


if __name__ == "__main__":
    que = mp.Queue()
    pw = mp.Process(target=write, args=(que,))
    pr = mp.Process(target=read, args=(que,))

    pw.start()
    pr.start()

    pw.join() # wairing for the end of pw
    pr.terminate()

运行结果如下:
Process 32520 to read
Process 34836 to write
putting a in que
Get a from queue.
putting b in que
Get b from queue.
putting c in que
Get c from queue.