本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:
multiprocessing简介
像管理线程一样管理进程,这是multiprocessing的核心。它的API与threading非常相似,但每个进程拥有独立的解释器、不受GIL限制,因此对多核CPU的利用率比threading好得多。
简单的创建进程:
1
2
3
4
5
6
7
8
9
10
11
|
import multiprocessing
def worker(num):
    """Print this worker's number; used as a process target (not a thread).

    Args:
        num: Identifier handed over by the parent through ``args``.

    Returns:
        None.
    """
    # A single-argument print(...) produces the same text on Python 2 and 3.
    print('Worker: %s' % (num,))
if __name__ == '__main__':
    # The guard keeps child processes that re-import this module from
    # launching workers of their own.
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
|
确定当前的进程,即是给进程命名,方便标识区分,跟踪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import multiprocessing
import time
def worker():
    """Announce start and exit under the current process name, pausing 2 s between."""
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(2)
    print('%s Exiting' % name)
def my_service():
    """Announce start and exit under the current process name, pausing 3 s between."""
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(3)
    print('%s Exiting' % name)
if __name__ == '__main__':
    # Two processes get explicit names; the third keeps the default name
    # that multiprocessing assigns.
    service = multiprocessing.Process(name='my_service',
                                      target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1',
                                       target=worker)
    worker_2 = multiprocessing.Process(target=worker)  # default name

    worker_1.start()
    worker_2.start()
    service.start()
|
守护进程不会阻挡主程序退出,主程序退出时它会被自动终止。启用方法是在 start() 之前把 Process 对象的 daemon 属性设为 True(multiprocessing.Process 没有 setDaemon() 方法)。如果要等待守护进程退出,需要调用 join();join 可以传入一个浮点数作为超时秒数,超过这个时间就不再等待。
守护进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import multiprocessing
import time
import sys
def daemon():
    """Print a start line, sleep 2 s, then print an exit line."""
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    time.sleep(2)
    print('Exiting : %s' % name)
def non_daemon():
    """Print a start line and an exit line back to back (no delay)."""
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    print('Exiting : %s' % name)
if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon',
                                target=daemon)
    d.daemon = True  # must be set before start()

    n = multiprocessing.Process(name='non-daemon',
                                target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    # Wait at most 1 s; the daemon sleeps for 2 s, so it should still be
    # alive when the timeout expires.
    d.join(1)
    print('d.is_alive() %s' % d.is_alive())
    n.join()
|
终止进程最好使用 poison pill(毒丸)方式让它自行退出;必要时才用 terminate() 强制终止。注意 terminate() 之后要调用 join(),这样进程对象才能更新自己的状态。
终止进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import multiprocessing
import time
def slow_worker():
    """Print a start line, sleep 0.1 s, then print a finish line."""
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')
if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE: %s %s' % (p, p.is_alive()))

    p.start()
    print('DURING: %s %s' % (p, p.is_alive()))

    p.terminate()
    print('TERMINATED: %s %s' % (p, p.is_alive()))

    # join() after terminate() lets the Process object pick up the
    # child's final state.
    p.join()
    print('JOINED: %s %s' % (p, p.is_alive()))
|
①. == 0 未生成任何错误
②. > 0 进程有一个错误,并以该错误码退出
③. < 0 进程被信号终止,信号编号为 -1 * exitcode
进程的退出状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import multiprocessing
import sys
import time
def exit_error():
    """Leave the process with exit status 1.

    Raises:
        SystemExit: always, carrying code 1.
    """
    sys.exit(1)
def exit_ok():
    """Do nothing and return ``None``."""
    return
def return_value():
    """Return the integer 1."""
    return 1
def raises():
    """Fail unconditionally.

    Raises:
        RuntimeError: always.
    """
    raise RuntimeError('There was an error!')
def terminated():
    """Sleep for 3 s so the parent has time to terminate this process."""
    time.sleep(3)
if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        # __name__ is available on both Python 2 and 3 (func_name is 2-only).
        print('Starting process for %s' % f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    # Kill the last job (terminated) while it is still sleeping.
    jobs[-1].terminate()

    for j in jobs:
        j.join()  # exitcode is only meaningful once the process has ended
        print('%15s.exitcode = %s' % (j.name, j.exitcode))
|
方便的调试,可以用logging
日志:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import multiprocessing
import logging
import sys
def worker():
    """Print one line and flush stdout right away."""
    print('Doing some work')
    sys.stdout.flush()
if __name__ == '__main__':
    # Route multiprocessing's internal logger to stderr at INFO level.
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
|
利用class来创建进程,定制子类
派生进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import multiprocessing
class Worker(multiprocessing.Process):
    """Process subclass whose work is defined by overriding ``run``."""

    def run(self):
        """Print this process's name."""
        print('In %s' % self.name)
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()

    # Wait for every worker before the parent exits.
    for j in jobs:
        j.join()
|
python进程间传递消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import multiprocessing
class MyFancyClass(object):
    """Small named object that is sent to another process through a queue.

    Args:
        name: Label included in the output of ``do_something``.
    """

    def __init__(self, name):
        self.name = name

    def do_something(self):
        """Print which process is handling this object."""
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in %s for %s!' % (proc_name, self.name))
def worker(q):
    """Take one object off ``q`` and call its ``do_something`` method.

    Args:
        q: Queue-like object; ``get()`` blocks until an item is available.
    """
    obj = q.get()
    obj.do_something()
if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
    """Worker process that runs callables taken from a joinable task queue.

    Args:
        task_queue: Queue of zero-argument callables; a ``None`` item is the
            poison pill that tells this consumer to stop.
        result_queue: Queue that receives each task's return value.
    """

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Execute tasks until a poison pill (``None``) arrives."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            try:
                answer = next_task()
            finally:
                # Acknowledge the item even when the task raises, so a
                # join() on the task queue cannot wait forever.
                self.task_queue.task_done()
            self.result_queue.put(answer)
class Task(object):
    """Callable unit of work that multiplies two values.

    Args:
        a: Left operand.
        b: Right operand.
    """

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        """Return the product formatted as ``'a * b = result'``."""
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        """Return the expression without its result, e.g. ``'2 * 3'``."""
        return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    for _ in range(num_jobs):
        print('Result: %s' % results.get())

    # All results are drained, so the consumers can be joined safely.
    for w in consumers:
        w.join()
|
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以在"已设置"和"未设置"两种状态之间切换。事件对象的使用者可以等待其状态从未设置变为已设置,并可传入一个可选的超时值。
进程间信号传递:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import multiprocessing
import time
def wait_for_event(e):
    """Wait for the event to be set before doing anything.

    Args:
        e: Event-like object providing ``wait()`` and ``is_set()``.
    """
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()-> %s' % e.is_set())
def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout.

    Args:
        e: Event-like object providing ``wait(timeout)`` and ``is_set()``.
        t: Maximum number of seconds to wait.
    """
    print('wait_for_event_timeout: starting')
    e.wait(t)
    # is_set() shows whether the wait ended because of set() or the timeout.
    print('wait_for_event_timeout: e.is_set()-> %s' % e.is_set())
if __name__ == '__main__':
    e = multiprocessing.Event()

    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    # 3 s is longer than w2's 2 s timeout, so w2 gives up before the event
    # is set while w1 keeps blocking until set() below.
    time.sleep(3)
    e.set()
    print('main: event is set')
|
Python多进程,一般的情况是Queue来传递。
Queue:
1
2
3
4
5
6
7
8
9
|
from multiprocessing import Process, Queue
def f(q):
    """Put one sample list on the queue ``q``."""
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # Read the item before joining the child that produced it.
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
|
多线程队列Queue(本例使用先进先出的 Queue.Queue;如需按优先级出队,可改用 Queue.PriorityQueue):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import Queue
import threading
import time
# Shared stop flag: the worker loop keeps running while this is falsy.
exitFlag = 0
class myThread(threading.Thread):
    """Thread that consumes items from a shared queue via ``process_data``.

    Args:
        threadID: Numeric identifier stored on the instance.
        name: Thread name, used in the output lines.
        q: Queue passed on to ``process_data``.
    """

    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        """Print start/exit markers around the queue-processing loop."""
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)
def process_data(threadName, q):
    """Poll ``q`` about once per second, printing each item, until ``exitFlag`` is set.

    Args:
        threadName: Label used in each output line.
        q: Queue to consume from; access is serialized through ``queueLock``.
    """
    while not exitFlag:
        # The empty-check and the get must happen under one lock hold so
        # another thread cannot take the item in between.
        with queueLock:
            # Inspect the queue that was passed in, not a module global.
            has_item = not q.empty()
            if has_item:
                data = q.get()
        if has_item:
            print("%s processing %s" % (threadName, data))
        time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []

# Create new threads
for threadID, tName in enumerate(threadList, 1):
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue
with queueLock:
    for word in nameList:
        workQueue.put(word)

# Wait for queue to empty
while not workQueue.empty():
    time.sleep(0.1)  # yield the CPU instead of spinning

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")
|
多进程使用Queue通信的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time
from multiprocessing import Process,Queue
# Queue created with a size argument of 5; it is passed to the child processes as their argument.
MSG_QUEUE = Queue( 5 )
def startA(msgQueue):
    """Reader loop: once per second, report an empty queue or take one message.

    Never returns.

    Args:
        msgQueue: Queue to read messages from.
    """
    while True:
        if msgQueue.empty():
            # NOTE: multiprocessing's qsize() can raise NotImplementedError
            # on platforms without sem_getvalue() (e.g. macOS).
            print('queue is empty %d' % (msgQueue.qsize()))
        else:
            msg = msgQueue.get()
            print('get msg %s' % (msg,))
        # Sleep on both branches so an empty queue does not cause a busy loop.
        time.sleep(1)
def startB(msgQueue):
    """Writer loop: put ``'hello world'`` on the queue every 3 seconds.

    Never returns.

    Args:
        msgQueue: Queue to write messages to; ``put`` blocks while it is full.
    """
    while True:
        msgQueue.put('hello world')
        print('put hello world queue size is %d' % (msgQueue.qsize(),))
        time.sleep(3)
if __name__ == '__main__':
    processA = Process(target=startA, args=(MSG_QUEUE,))
    processB = Process(target=startB, args=(MSG_QUEUE,))

    processA.start()
    print('processA start..')
    # The writer must be started too, otherwise the reader only ever sees
    # an empty queue.
    processB.start()
    print('processB start..')
|
主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。
希望本文所述对大家Python程序设计有所帮助。
原文链接:http://www.cnblogs.com/IPYQ/p/5573628.html