Python多进程multiprocessing使用示例

时间:2022-05-01 04:05:59

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing

def worker(num):
"""thread worker function"""
print 'Worker:', num
return

if __name__ == '__main__':
jobs
= []
for i in range(5):
p
= multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
简单的创建进程

确定当前的进程,即是给进程命名,方便标识区分,跟踪

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing
import time

def worker():
name
= multiprocessing.current_process().name
print name, 'Starting'
time.sleep(
2)
print name, 'Exiting'

def my_service():
name
= multiprocessing.current_process().name
print name, 'Starting'
time.sleep(
3)
print name, 'Exiting'

if __name__ == '__main__':
service
= multiprocessing.Process(name='my_service',
target
=my_service)
worker_1
= multiprocessing.Process(name='worker 1',
target
=worker)
worker_2
= multiprocessing.Process(target=worker) # default name

worker_1.start()
worker_2.start()
service.start()
View Code

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing
import time
import sys

def daemon():
name
= multiprocessing.current_process().name
print 'Starting:', name
time.sleep(
2)
print 'Exiting :', name

def non_daemon():
name
= multiprocessing.current_process().name
print 'Starting:', name
print 'Exiting :', name

if __name__ == '__main__':
d
= multiprocessing.Process(name='daemon',
target
=daemon)
d.daemon
= True

n
= multiprocessing.Process(name='non-daemon',
target
=non_daemon)
n.daemon
= False

d.start()
n.start()

d.join(
1)
print 'd.is_alive()', d.is_alive()
n.join()
守护进程

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing
import time

def slow_worker():
print 'Starting worker'
time.sleep(
0.1)
print 'Finished worker'

if __name__ == '__main__':
p
= multiprocessing.Process(target=slow_worker)
print 'BEFORE:', p, p.is_alive()

p.start()
print 'DURING:', p, p.is_alive()

p.terminate()
print 'TERMINATED:', p, p.is_alive()

p.join()
print 'JOINED:', p, p.is_alive()
终止进程
  1. == 0 未生成任何错误  
  2. 0 进程有一个错误,并以该错误码退出
  3. < 0 进程由一个-1 * exitcode信号结束
Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing
import sys
import time

def exit_error():
sys.exit(
1)

def exit_ok():
return

def return_value():
return 1

def raises():
raise RuntimeError('There was an error!')

def terminated():
time.sleep(
3)

if __name__ == '__main__':
jobs
= []
for f in [exit_error, exit_ok, return_value, raises, terminated]:
print 'Starting process for', f.func_name
j
= multiprocessing.Process(target=f, name=f.func_name)
jobs.append(j)
j.start()

jobs[
-1].terminate()

for j in jobs:
j.join()
print '%15s.exitcode = %s' % (j.name, j.exitcode)
进程的退出状态

方便的调试,可以用logging

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing
import logging
import sys

def worker():
print 'Doing some work'
sys.stdout.flush()

if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger
= multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p
= multiprocessing.Process(target=worker)
p.start()
p.join()
日志

利用class来创建进程,定制子类

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing

class Worker(multiprocessing.Process):

def run(self):
print 'In %s' % self.name
return

if __name__ == '__main__':
jobs
= []
for i in range(5):
p
= Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
派生进程
Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing

class MyFancyClass(object):

def __init__(self, name):
self.name
= name

def do_something(self):
proc_name
= multiprocessing.current_process().name
print 'Doing something fancy in %s for %s!' % \
(proc_name, self.name)

def worker(q):
obj
= q.get()
obj.do_something()

if __name__ == '__main__':
queue
= multiprocessing.Queue()

p
= multiprocessing.Process(target=worker, args=(queue,))
p.start()

queue.put(MyFancyClass(
'Fancy Dan'))

# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()

import multiprocessing
import time

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
multiprocessing.Process.
__init__(self)
self.task_queue
= task_queue
self.result_queue
= result_queue

def run(self):
proc_name
= self.name
while True:
next_task
= self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
print '%s: %s' % (proc_name, next_task)
answer
= next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return

class Task(object):
def __init__(self, a, b):
self.a
= a
self.b
= b
def __call__(self):
time.sleep(
0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results
= multiprocessing.Queue()

# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
consumers
= [ Consumer(tasks, results)
for i in xrange(num_consumers) ]
for w in consumers:
w.start()

# Enqueue jobs
num_jobs = 10
for i in xrange(num_jobs):
tasks.put(Task(i, i))

# Add a poison pill for each consumer
for i in xrange(num_consumers):
tasks.put(None)

# Wait for all of the tasks to finish
tasks.join()

# Start printing results
while num_jobs:
result
= results.get()
print 'Result:', result
num_jobs
-= 1
python进程间传递消息

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import multiprocessing
import time

def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print 'wait_for_event: starting'
e.wait()
print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print 'wait_for_event_timeout: starting'
e.wait(t)
print 'wait_for_event_timeout: e.is_set()->', e.is_set()

if __name__ == '__main__':
e
= multiprocessing.Event()
w1
= multiprocessing.Process(name='block',
target
=wait_for_event,
args
=(e,))
w1.start()

w2
= multiprocessing.Process(name='nonblock',
target
=wait_for_event_timeout,
args
=(e, 2))
w2.start()

print 'main: waiting before calling Event.set()'
time.sleep(
3)
e.set()
print 'main: event is set'
进程间信号传递

Python多进程,一般的情况是Queue来传递。

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
from multiprocessing import Process, Queue

def f(q):
q.put([
42, None, 'hello'])

if __name__ == '__main__':
q
= Queue()
p
= Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
Queue
Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import Queue
import threading
import time

exitFlag
= 0

class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.
__init__(self)
self.threadID
= threadID
self.name
= name
self.q
= q
def run(self):
print "Starting " + self.name
process_data(self.name, self.q)
print "Exiting " + self.name

def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data
= q.get()
queueLock.release()
print "%s processing %s" % (threadName, data)
else:
queueLock.release()
time.sleep(
1)

threadList
= ["Thread-1", "Thread-2", "Thread-3"]
nameList
= ["One", "Two", "Three", "Four", "Five"]
queueLock
= threading.Lock()
workQueue
= Queue.Queue(10)
threads
= []
threadID
= 1

# Create new threads
for tName in threadList:
thread
= myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID
+= 1

# Fill the queue
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
t.join()
print "Exiting Main Thread"
多线程优先队列Queue

多进程使用Queue通信的例子

Python多进程multiprocessing使用示例Python多进程multiprocessing使用示例
import time
from multiprocessing import Process,Queue

MSG_QUEUE
= Queue(5)

def startA(msgQueue):
while True:
if msgQueue.empty() > 0:
print ('queue is empty %d' % (msgQueue.qsize()))
else:
msg
= msgQueue.get()
print( 'get msg %s' % (msg,))
time.sleep(
1)

def startB(msgQueue):
while True:
msgQueue.put(
'hello world')
print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
time.sleep(
3)

if __name__ == '__main__':
processA
= Process(target=startA,args=(MSG_QUEUE,))
processB
= Process(target=startB,args=(MSG_QUEUE,))

processA.start()
print( 'processA start..')
View Code

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。