1. What is a process?
A process is a single run of a program over some data set. It is the basic unit of resource allocation and scheduling in the operating system, and the foundation of the operating system's structure. In early process-oriented computer architectures, the process was the basic unit of execution; in modern thread-oriented architectures, the process is the container for threads.
A process contains at least one thread.
2. Why does Python need multiprocess programming when it already has multithreading?
Because of the GIL (Global Interpreter Lock), only one thread in a Python process can execute bytecode at any given moment, no matter how many CPU cores you have, so true thread-level parallelism is impossible. How, then, can a Python program actually run in parallel? The answer is to use multiple processes instead of multiple threads. The standard library provides the multiprocessing module (a package with an API similar to threading's for spawning processes; it offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads). Its API mirrors the threading module's almost exactly, and it also provides a number of APIs that threading does not have.
Example 1 (basic use of the multiprocessing module):
import multiprocessing
import time

class Task(multiprocessing.Process):
    def __init__(self):
        super(Task, self).__init__()

    def run(self):
        print("Process---%s" % self.name)
        time.sleep(2)

if __name__ == "__main__":
    for i in range(1, 8 + 1):
        t = Task()
        t.start()
Note: since the basic API of the multiprocessing module matches that of the threading module, those methods are not demonstrated one by one here; this article focuses on the APIs that differ from threading. For the APIs shared with the threading module, see: http://www.cnblogs.com/God-Li/p/7732407.html
multiprocessing.Process source (simplified):
class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.name = ''
        self.daemon = False    # daemon flag; must be set before start() is called
        self.authkey = None    # the process's authentication key (a byte string)
        self.exitcode = None   # the child's exit code; None if the process has not yet
                               # terminated; a negative value -N indicates that the
                               # child was terminated by signal N
        self.ident = 0
        self.pid = 0           # the process ID; None before the process is spawned
        self.sentinel = None   # a numeric handle of a system object which will become
                               # "ready" when the process ends

    def run(self):
        pass

    def start(self):
        pass

    def terminate(self):
        """
        Terminate the process. On Unix this is done using the SIGTERM signal;
        on Windows TerminateProcess() is used.
        Note that exit handlers and finally clauses, etc., will not be executed.
        Note that descendant processes of the process will not be terminated --
        they will simply become orphaned.
        """
        pass

    def join(self, timeout=None):
        pass

    def is_alive(self):
        return False
Queues in the multiprocessing module:
class multiprocessing.Queue([maxsize]) implements all of the methods of queue.Queue except task_done() and join(). The methods it adds beyond queue.Queue are listed below:
class multiprocessing.Queue([maxsize])

close()
    """
    Indicate that no more data will be put on this queue by the current process.
    The background thread will quit once it has flushed all buffered data to the pipe.
    This is called automatically when the queue is garbage collected.
    """

join_thread()
    """
    Join the background thread. This can only be used after close() has been called.
    It blocks until the background thread exits, ensuring that all data in the buffer
    has been flushed to the pipe.
    By default, if a process is not the creator of the queue then on exit it will
    attempt to join the queue's background thread. The process can call
    cancel_join_thread() to make join_thread() do nothing.
    """

cancel_join_thread()
    """
    Prevent join_thread() from doing anything.
    """
class multiprocessing.SimpleQueue is a simplified version of class multiprocessing.Queue([maxsize]) with only three methods: empty(), get(), and put().
class multiprocessing.JoinableQueue([maxsize]) is a subclass of class multiprocessing.Queue([maxsize]) that adds the task_done() and join() methods.
Note: since processes do not share memory, an instantiated queue object must be passed as an argument to the other processes before they can use it. Each time it is passed, the receiving process effectively gets a clone that is independent of the original; Python synchronizes the data between these queues, rather than keeping a single copy as the thread-safe queue does under multithreading.
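A minimal sketch of the point above (the worker function name is illustrative, not from the original): the queue must be handed to the child process explicitly as an argument.

```python
import multiprocessing

def worker(q):
    # the child process uses the queue object passed in from the parent
    q.put("message from child")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    # the queue must be passed as an argument -- child processes
    # cannot see objects that live only in the parent's memory
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # blocks until the child has put a message
    p.join()
```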
Communication between processes:
multiprocessing.Pipe([duplex]) --------------- returns a pair (conn1, conn2) of Connection objects representing the two ends of the pipe (similar to a socket connection: they can be used to send and receive data).
If duplex is True (the default), the pipe is bidirectional. If duplex is False, the pipe is unidirectional: conn1 can only be used to receive messages and conn2 only to send them.
Example 2 (multiprocessing.Pipe demo):
import multiprocessing
import time

class Processing_1(multiprocessing.Process):
    def __init__(self, conn):
        super(Processing_1, self).__init__()
        self.conn = conn

    def run(self):
        send_data = "this message is from p1"
        self.conn.send(send_data)     # send data through the connection
        time.sleep(0.8)
        recv_data = self.conn.recv()  # receive data through the connection
        print("p1 recv: " + recv_data)
        self.conn.close()

class Processing_2(multiprocessing.Process):
    def __init__(self, conn):
        super(Processing_2, self).__init__()
        self.conn = conn

    def run(self):
        send_data = "this message is from p2"
        self.conn.send(send_data)
        time.sleep(0.8)
        recv_data = self.conn.recv()
        print("p2 recv: " + recv_data)
        self.conn.close()

if __name__ == "__main__":
    conn1, conn2 = multiprocessing.Pipe()  # conn1 and conn2 are the two ends of the pipe
    p1 = Processing_1(conn1)               # pass one end of the pipe to each child process
    p2 = Processing_2(conn2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
Sharing data between processes:
multiprocessing.Manager() ----------- returns a started SyncManager object, which can be used to share objects between processes. The returned manager object corresponds to a spawned child process and has methods that create shared objects and return corresponding proxies. The manager process is shut down as soon as it is garbage collected or its parent process exits.
Example 3 (basic use of Manager):
import multiprocessing
import os

class Processing(multiprocessing.Process):
    def __init__(self, d, l):
        super(Processing, self).__init__()
        self.d = d
        self.l = l

    def run(self):
        self.d[os.getpid()] = os.getpid()  # use it like a normal dict
        self.l.append(1)
        print(self.l)

if __name__ == "__main__":
    manager = multiprocessing.Manager()  # create the Manager object
    d = manager.dict()   # create a shared dict
    l = manager.list()   # create a shared list
    p_s = []
    for i in range(10):
        p = Processing(d, l)
        p.start()
        p_s.append(p)
    for p in p_s:
        p.join()
    print(d)
    print(l)
A manager can create the following (commonly used) shared data objects:
- Event() - Create a shared threading.Event object and return a proxy for it.
- Lock() - Create a shared threading.Lock object and return a proxy for it.
- Namespace() - Create a shared Namespace object and return a proxy for it.
- Queue([maxsize]) - Create a shared queue.Queue object and return a proxy for it.
- RLock() - Create a shared threading.RLock object and return a proxy for it.
- Semaphore([value]) - Create a shared threading.Semaphore object and return a proxy for it.
- Array(typecode, sequence) - Create an array and return a proxy for it.
- Value(typecode, value) - Create an object with a writable value attribute and return a proxy for it.
- dict() / dict(mapping) / dict(sequence) - Create a shared dict object and return a proxy for it.
- list() / list(sequence) - Create a shared list object and return a proxy for it.
Process locks:
There are two kinds of process lock: multiprocessing.Lock (non-recursive) and multiprocessing.RLock (recursive).
multiprocessing.Lock (non-recursive): once a process or thread has acquired the lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it.
multiprocessing.RLock (recursive): a recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.
Both locks have only two methods: acquire(block=True, timeout=None) and release(). Their use is essentially the same as thread locks (except that the lock instance must be passed as an argument to the other processes): http://www.cnblogs.com/God-Li/p/7732407.html
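A minimal sketch of passing a Lock into child processes (the printer function is an illustrative name, not from the original): without the lock, the characters printed by the two processes would interleave.

```python
import multiprocessing
import time

def printer(lock, text):
    # serialize access to stdout: only one process prints at a time
    lock.acquire()
    try:
        for ch in text:
            print(ch, end="", flush=True)
            time.sleep(0.01)
        print()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()  # the same lock instance is passed to every process
    ps = [multiprocessing.Process(target=printer, args=(lock, w))
          for w in ("aaaa", "bbbb")]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
```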
Process pools:
To make multiple processes easier to manage, a process pool is normally used for multiprocess programming (rather than multiprocessing.Process directly).
Example:
import multiprocessing
import os
import time

def run():
    print(str(os.getpid()) + "-----running")
    time.sleep(2)
    print(str(os.getpid()) + "-----done")

def done(result):
    # called in the parent process when a worker finishes successfully
    print("done")

def error(e):
    # called in the parent process with the exception if a worker fails
    print("error")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)  # instantiate the pool object
    for i in range(8):
        # pool.apply(func=run)  # apply() runs the pooled tasks serially
        pool.apply_async(func=run, callback=done, error_callback=error)
    pool.close()
    pool.join()
    print("finish....")
Common methods of the Pool object:
- apply(func[, args[, kwds]]) - Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool. In other words, apply() submits the task to the pool but only one process works at a time while the others stay blocked (equivalent to serial execution).
- apply_async(func[, args[, kwds[, callback[, error_callback]]]]) - A variant of the apply() method which returns a result object. If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready, callback is applied to it, unless the call failed, in which case the error_callback is applied instead. If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance. Callbacks should complete immediately since otherwise the thread which handles the results will get blocked. apply_async() submits the task to the pool, and multiple processes (the number set by the processes argument when the pool was instantiated) run simultaneously; callback is invoked (by the parent process) when a worker finishes, and error_callback is invoked (by the parent process) when a worker fails.
- close() - Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed, the worker processes will exit. After this call, no new tasks can be submitted to the pool.
- terminate() - Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected, terminate() is called immediately.
- join() - Wait for the worker processes to exit. One must call close() or terminate() before using join().
Note: the worker processes inside a Pool are started (and block waiting for work) as soon as the Pool object is created; apply() and apply_async() merely submit tasks to the pool and do not spawn new processes.
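The result object mentioned under apply_async() above can also be used directly: its get() method blocks until the worker's return value is ready (square here is an illustrative task, not from the original):

```python
import multiprocessing

def square(x):
    # a trivial task whose return value travels back to the parent process
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        results = [pool.apply_async(square, (i,)) for i in range(8)]
        print([r.get() for r in results])  # [0, 1, 4, 9, 16, 25, 36, 49]
```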