Python多进程(multiprocessing)

时间:2023-05-10 14:41:50

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

在Windows下通过multiprocessing模块实现

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。

例子

直接调用

 from multiprocessing import Process
import time
def f(name):
time.sleep(1)
print('hello', name,time.ctime()) if __name__ == '__main__':
p_list=[]
for i in range(3):
p = Process(target=f, args=('alvin',))
p_list.append(p)
p.start()
for i in p_list:
p.join()
print('end')

继承(类)式调用

 from multiprocessing import Process
import time class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
#self.name = name def run(self):
time.sleep(1)
print ('hello', self.name,time.ctime()) if __name__ == '__main__':
p_list=[]
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p) for p in p_list:
p.join() print('end')

获取进程pid

 from multiprocessing import Process
import time
import os def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid()) # 获取父进程pid
print('process id:', os.getpid()) # 获取子进程pid def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name) if __name__ == '__main__': # windows下多进程必须有这句
info('\033[32;1mmain process line\033[0m')
time.sleep(5)
p = Process(target=info, args=('bob',))
p.start()
p.join()
print('end', __name__)

输出为

 main process line
module name: __main__
parent process: 10704
process id: 216
end __mp_main__
bob
module name: __mp_main__
parent process: 216
process id: 7612
end __main__

Process类

Process类的__init__

class Process(object):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self.name = ''
self.daemon = False
self.authkey = None
self.exitcode = None
self.ident = 0
self.pid = 0
self.sentinel = None

  target:要执行的方法

  name:进程名

  args/kwargs:要传入的方法

实例方法

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性

  daemon:和线程的setDeamon功能一样

  exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

进程间通信

不同进程间内存是不共享的,要实现两个进程间的数据交换

Queues

使用方法与threading里的queue类似

 from multiprocessing import Process, Queue

 def f(q,n):
q.put([42, n, 'hello']) if __name__ == '__main__':
q = Queue()
p_list=[]
for i in range(3):
p = Process(target=f, args=(q,i))
p_list.append(p)
p.start()
print(q.get())
print(q.get())
print(q.get())
for i in p_list:
i.join()

Pipe(管道)

multiprocessing.Pipe([duplex])

返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.默认duplex = True

可用于多个对象通信,即建立多个子进程

 import os

 from multiprocessing import Process, Pipe

 def f(conn):
conn.send('约吗')
print(conn.recv(),'in the %s' % os.getpid())
conn.close() if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p2 = Process(target=f, args=(child_conn,))
p.start()
p2.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
print(parent_conn.recv()) # prints "[42, None, 'hello']"
parent_conn.send('hello')
parent_conn.send('hello2') p.join()

Manager

用于数据共享

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

 from multiprocessing import Process, Manager

 def f(d,l,n):
d[n] = ''
d[''] = 2
d[0.25] = None
l.append(n)
#print(l)
print('sub',id(d)) if __name__ == '__main__':
with Manager() as manager:# with open() as f== f=open() manager=Manager()
d = manager.dict() l = manager.list(range(5))
p_list = [] print('main',id(d))
for i in range(10):
p = Process(target=f, args=(d,l,i))
p.start()
p_list.append(p) for res in p_list:
res.join() print(d)
print(l)

进程池

进程池可用于一下子启动多个子进程

pool = Pool()

Pool()里面可以添加数字,默认是cpu核心数

 from  multiprocessing import Process, Pool
import time
import os def Foo(i):
time.sleep(2)
print('sub %s'%os.getpid()) return i + 100 def Bar(arg):
print('Bar:',os.getpid())
print('-->exec done:', arg) if __name__=='__main__':
pool = Pool()
print('main:',os.getpid())
for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
#pool.apply(func=Foo, args=(i,))
print('end')
 from multiprocessing import Pool
import os, time, random def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')