python 多进程间交换信息与共享信息

时间:2021-03-18 06:49:28

多线程调用函数,获取其返回值,个人总结了三种方法:

一、Queue(进程队列)

构造方法:multiprocessing.Queue([maxsize])

  Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

常用方法:

  1. q.size()    返回队列中信息大概数量,有时候可能不太准确。
  2. empty()   检测队列是否为空,空返回True,否则返回false。
  3. full()        检测队列是否存满,满返回Ture,否则返回false。
  4. put(obj[, block[, timeout]])    put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为True。如果队列当前为空且block为False,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为Flase,put方法将引发Full异常。
  5. put_nowait()  等同于put(obj,False)
  6. get([block[, timeout]])    移除并返回队列中一个信息。

    block为True(default)、timeout为None(default),有信息立刻返回,否则进程将进入拥塞状态,直到获取到一个信息。

    block为True(default)、设置timeout时间,有信息立刻返回,否则进程将进入拥塞状态,timeout时间过后,无信息产生Queue.Empty异常。

block为false,有信息立刻返回,无信息立刻产生Queue.Empty异常。

7. get_nowait()  等同于get(False)

Python Queue模块有三种队列及构造函数:

  • Python Queue模块的FIFO队列先进先出。 class Queue.Queue(maxsize)
  • LIFO类似于堆,即先进后出。 class Queue.LifoQueue(maxsize)
  • 还有一种是优先级队列级别越低越先出来。 class Queue.PriorityQueue(maxsize)

队列可存储多个数据,数据按照存取顺序依次获取。如果队列已为空,再次使用get()函数,主进程将进入等待状态。

from multiprocessing import Process,Queue

def multiply(a,b,que):                          #add a argument to function for assigning a queue
que.put(a * b) #putting return value into queue
que.put(a * (b - 1)) #putting return value into queue if __name__ == '__main__':
queue1 = Queue() #create a queue object
p = Process(target = multiply,args = (4,5,queue1)) #setting 3rd argument to queue1
p.start()
p.join()
print(queue1.get()) #getting return value: 20
print(queue1.get()) #getting return value: 16
print('OK')

  

运行结果:

20
16
OK

  

二、manager对象

Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

import multiprocessing

def worker(procnum, return_dict):
'''worker function'''
print str(procnum) + ' represent!'
return_dict[procnum] = procnum if __name__ == '__main__':
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,return_dict))
jobs.append(p)
p.start() for proc in jobs:
proc.join()
print return_dict.values()

运行结果:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

三、map

import multiprocessing
from os import getpid def worker(procnum):
print 'I am number %d in process %d' % (procnum, getpid())
return getpid() if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print pool.map(worker, range(5))

运行结果:

I am number 0 in process 8108
I am number 1 in process 8108
I am number 2 in process 8108
I am number 3 in process 8108
I am number 4 in process 8108
[8108, 8108, 8108, 8108, 8108]

四、Pipes(通道)

构造方法:multiprocessing.Pipe([duplex])

  duplex默认情况下为True,表示通道是双向的;为False,表示通道是单向的。

返回值:(conn1, conn2)

  返回一对连接对象,代表一个通道的两端。conn1只能用于接收信息,conn2只能用于发送信息。

from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close() if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()

运行结果:

[42, None, 'hello']

Pipe成对出现,一个发送信息,一个接受信息。

注意:两个以上进程或者线程同一时间读、写信息,将破坏管道中的信息。