老师的博客:http://www.cnblogs.com/Eva-J/articles/8253549.html#_label14
Pipe
pipe是管道但是不是很推荐使用,因为有着不安全的危险,queue就相当于pipe加上lock,比较安全,但是的注意他们的close的时间,详见python3中的day38的笔记
下面是老师的代码
from multiprocessing import Lock,Pipe,Process def producer(con,pro,name,food): con.close() for i in range(100): f = '%s生产%s%s'%(name,food,i) print(f) pro.send(f) pro.send(None) pro.send(None) pro.send(None) pro.close() def consumer(con,pro,name,lock): pro.close() while True: lock.acquire() food = con.recv() lock.release() if food is None: con.close() break print('%s吃了%s' % (name, food)) if __name__ == '__main__': con,pro = Pipe() lock= Lock() p = Process(target=producer,args=(con,pro,'egon','泔水')) c1 = Process(target=consumer, args=(con, pro, 'alex',lock)) c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock)) c3 = Process(target=consumer, args=(con, pro, 'wusir',lock)) c1.start() c2.start() c3.start() p.start() con.close() pro.close() # pipe 数据不安全性 # IPC # 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象 # 队列 进程之间数据安全的 # 管道 + 锁
manager
老师的代码
from multiprocessing import Manager,Process,Lock def main(dic,lock): dic['count'] -= 1 if __name__ == '__main__': m = Manager() l = Lock() dic=m.dict({'count':100}) p_lst = [] for i in range(50): p = Process(target=main,args=(dic,l)) p.start() p_lst.append(p) for i in p_lst: i.join() print('主进程',dic)
运行几次后,你会发现得到的结果不一样,所以也存在数据不安全的现象,所以一般推荐使用queue比较安全,queue,pipe都是可是实现数据通信的。
数据池pool
参数介绍:
Pool([numprocess [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
必须从不同线程调用p.apply()函数或者使用p.apply_async()''' 3 4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' 6 7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 8 9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法 2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。 3 obj.ready():如果调用完成,返回True 4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 5 obj.wait([timeout]):等待结果变为可用。 6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
进程池的效率要高许多,详见老师的博客
import time from multiprocessing import Pool,Process def func(n): n=n for i in range(10): n+=i print(n) if __name__ == '__main__': start = time.time() pool = Pool(5) # 5个进程 pool.map(func,range(100)) # 100个任务 t1 = time.time() - start start = time.time() p_lst = [] for i in range(100):#100个任务 p = Process(target=func,args=(i,)) p_lst.append(p) p.start() for p in p_lst :p.join() t2 = time.time() - start print(t1,t2)
进程池的三种调用方式:
def fun(name): for i in range(10): print('my name is %s'%name,getpid()) if __name__=='__main__': pool=Pool(3) pool.map(fun,(1,2,3,4,5,6,7,8,9,10)) print('end') '''你可以发现进程号最多最多只有3个 ,你可以看end的输出始终一直在最后面) map(函数名,加上一个可迭代的对象)'''
import os import time from multiprocessing import Pool def func(n): print('start func%s'%n,os.getpid()) time.sleep(1) print('end func%s' % n,os.getpid()) if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(func,args=(i,)) p.close() # 结束进程池接收任务 p.join() # 感知进程池中的任务执行结束 '''注意此种调用方法时真真的异步,如果你不加入jion的的话你连打印的值就不可能显示出来 这个方法时一个一个的调,只是进程在那几个进程池中而已,而且传参的方式也不一样,前面一个是位置参数,后面一个是 第二个是默认参数须要=,具体自己看源码'''
import os import time from multiprocessing import Pool def func(n): print('start func%s'%n,os.getpid()) time.sleep(1) print('end func%s' % n,os.getpid()) if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply(func,args=(i,)) p.close() # 结束进程池接收任务,及不能再想进池中加入显得代码,就是不能调用p了 p.apply(func,args=(10000,)) '''注意此种调用方法是同步的 这个方法时一个一个的调,只是进程在那几个进程池中而已,而且传参的方式也不一样,前面一个是位置参数,后面一个是 第二个是默认参数须要=,具体自己看源码 其实apply_async的形式是一样的 不用加join'''
import time from multiprocessing import Pool def func(name): print('你传递的参数是%s'%name) return '你的返回值' if __name__=='__main__': p=Pool(2) a=p.apply(func,args=('alex',)) b=p.apply_async(func, args=('jin',)) c = p.map(func, ('alex', 1)) time.sleep(1) print(a) print(b) print(c,type(c)) # print(c) '''输出:你传递的参数是alex 你传递的参数是jin 你传递的参数是alex 你传递的参数是1 你的返回值 <multiprocessing.pool.ApplyResult object at 0x0000000002BE8198> ['你的返回值', '你的返回值'] <class 'list'> 说明了apply是有返回值的 而apply_async这是一种方法 map由于其特殊性,返回值所有函数返回值组成的list'''
总结一下:
1.三种放方法的调用方式不同,map的调用时的参数是位置参数,必须传,第一个是函数名字,而且第二个是可迭代的对象,调用的是一群函数,返回值是list。
2.apply是第一个是位置参数,第二个是默认参数了,调用的是一个函数,而且有返回值
3.apply_asnyc与applyl一样的调用,也只能调用一个函数
4.比较:apply与map是同步的。不需要添加join
而apply_async如果主进程的时间太快则不会打印其内容的
5.p.close就是不能再往里里面添加新的进程了,pool里面的代码运行完毕后就结束了
更正一下:在apply_async是能拿到返回值的,但是需要.get()来调用的
map也是异步的,当时自动的添加了join,阻塞了,所以显示觉得是同步的。
另外,如果想要apply-async到达阻塞的效果,有两种放法,一种是通过get获取返回值,另外一种是先close在join也能达到前面的效果,但是再也不能往池子里加入代码了
回调函数
from multiprocessing import Pool from time import sleep def func(name): return name def func2(age): print('my name is %s'%age) return 'i am this func2’s retrun' if __name__=='__main__': p=Pool(5) a= p.apply_async(func,args=('alex',),callback=func2) # b = p.apply(func, args=('alex',), callback=func2) p.close() p.join() print(a) # print(b) '''输出结果;my name is alex <multiprocessing.pool.ApplyResult object at 0x0000000002BE35F8> 只有apply_aspnc才有回调函数,而且就是一样接受不到返回值,只是方法 原理是前面的函数的返回值带入callback函数的参数'''
from multiprocessing import Pool def func1(n): return n+1 def func2(m): print(m) if __name__ == '__main__': p = Pool(5) for i in range(10,20): a=p.apply_async(func1,args=(i,),callback=func2) a.get() #看一下这个是通过,get来达到依次执行的效果的
下面是老师的总结,可以看一下
# 管道 # 数据的共享 Manager dict list # 进程池 # cpu个数+1 # ret = map(func,iterable) # 异步 自带close和join # 所有结果的[] # apply # 同步的:只有当func执行完之后,才会继续向下执行其他代码 # ret = apply(func,args=()) # 返回值就是func的return # apply_async # 异步的:当func被注册进入一个进程之后,程序就继续向下执行 # apply_async(func,args=()) # 返回值 : apply_async返回的对象obj # 为了用户能从中获取func的返回值obj.get() # get会阻塞直到对应的func执行完毕拿到结果 # 使用apply_async给进程池分配任务, # 需要先close后join来保持多进程和主进程代码的同步性
。