python当中因为一个进程同一时刻只能执行一个线程,所以多线程效率并不高,要提高效率需要使用多进程。
Process([group [, target [, name [, args [, kwargs]]]]])
- target表示调用对象,你可以传入方法的名字
- args表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
- kwargs表示调用对象的字典
- name是别名,相当于给这个进程取一个名字
- group分组,实际上不使用
import multiprocessing
def process(num):
print 'Process:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
p.start()
可以通过cpu_count()方法还有active_children()方法获取当前机器的CPU核心数量以及得到目前所有的运行的进程。
mport multiprocessing
import time
def process(num):
time.sleep(num)
print 'Process:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
p.start()
print('CPU number:' + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print('Child process name: ' + p.name + ' id: ' + str(p.pid))
print('Process Ended')
通过设置deamon属性,如果设置为true,那么当父进程结束后,子进程会自动被终止,从而有效防止无控制地生成子进程。
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
p.start()
print 'Main process Ended!'
可以通过join来让所有子进程执行完了然后在结束。
rom multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
p.start()
p.join()
print 'Main process Ended!'
Lock可以避免进程同时占用资源而导致的一些问题。在一个进程输出时,尚未输出完,其他进程就输出了结果,导致排版问题。
from multiprocessing import Process, Lock
import time
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
self.lock.acquire()
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
self.lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
p.start()
Semaphore可以控制临界资源的数量,保证各个进程之间的互斥和同步。
from multiprocessing import Process, Semaphore, Lock, Queue
import time
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
buffer.get()
print('Consumer pop an element')
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
buffer.put(1)
print('Producer append an element')
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print 'Ended!'
Queue作为进程通信的共享队列使用,将其换成普通的list完全不起效果,因为即使一个进程中改变了list,另一个进程也不能获取它的状态。
Queue.empty() 如果队列为空,返回True, 反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 阻塞式写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
当进程很多,直接通过process生成大量进程并且又手动去限制进程数量又太过繁琐,这个时候就可以发挥进程池的功效。
进程池就是提供指定数量的进程供用户调用,当有新的请求到达时,如果池还没有满,就创建一个新的进程来执行该请求,如果池中进程已经满了,那么该请求就会等待,直到池中有进程结束才创建新的进程来执行它。
阻塞和没阻塞的差别:阻塞就是要等到回调结果出来,否则当前进程就挂起:非阻塞就是立马执行,不用等待某个进程结束。
非阻塞:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply_async(function, (i,))
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
阻塞:
from multiprocessing import Lock, Poolterminiate() 结束工作进程,不在处理未完成的任务
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply(function, (i,))
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
join()主进程阻塞,等待子进程的退出。join方法要在close或terminiate后使用。
可以根据apply或apply_async返回的结果进行处理:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
return index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
result = pool.apply_async(function, (i,))
print result.get()
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
也可以使用map方法,适用于你有一堆数据要处理,每一项都需要经过一个方法来处理。
from multiprocessing import Pool
import requests
from requests.exceptions import ConnectionError
def scrape(url):
try:
print requests.get(url)
except ConnectionError:
print 'Error Occured ', url
finally:
print 'URL ', url, ' Scraped'
if __name__ == '__main__':
pool = Pool(processes=3)
urls = [
'https://www.baidu.com',
'http://www.meituan.com/',
'http://blog.csdn.net/',
'http://xxxyxxx.net'
]
pool.map(scrape, urls)