python爬虫-多进程

时间:2021-07-29 03:49:46

python当中因为一个进程同一时刻只能执行一个线程,所以多线程效率并不高,要提高效率需要使用多进程。


Process([group [, target [, name [, args [, kwargs]]]]])

  • target表示调用对象,你可以传入方法的名字
  • args表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
  • kwargs表示调用对象的字典
  • name是别名,相当于给这个进程取一个名字
  • group分组,实际上不使用
import multiprocessing

def process(num):
print 'Process:', num

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
p.start()

可以通过cpu_count()方法还有active_children()方法获取当前机器的CPU核心数量以及得到目前所有的运行的进程。

mport multiprocessing
import time

def process(num):
time.sleep(num)
print 'Process:', num

if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
p.start()

print('CPU number:' + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print('Child process name: ' + p.name + ' id: ' + str(p.pid))

print('Process Ended')

通过设置deamon属性,如果设置为true,那么当父进程结束后,子进程会自动被终止,从而有效防止无控制地生成子进程。

from multiprocessing import Process
import time


class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop

def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))


if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
p.start()


print 'Main process Ended!'

可以通过join来让所有子进程执行完了然后在结束。

rom multiprocessing import Process
import time


class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop

def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))


if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
p.start()
p.join()


print 'Main process Ended!'

Lock可以避免进程同时占用资源而导致的一些问题。在一个进程输出时,尚未输出完,其他进程就输出了结果,导致排版问题。

from multiprocessing import Process, Lock
import time


class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock

def run(self):
for count in range(self.loop):
time.sleep(0.1)
self.lock.acquire()
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
self.lock.release()

if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
p.start()

Semaphore可以控制临界资源的数量,保证各个进程之间的互斥和同步。

from multiprocessing import Process, Semaphore, Lock, Queue
import time

buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()

class Consumer(Process):

def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
buffer.get()
print('Consumer pop an element')
time.sleep(1)
lock.release()
empty.release()


class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
buffer.put(1)
print('Producer append an element')
time.sleep(1)
lock.release()
full.release()


if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print 'Ended!'

Queue作为进程通信的共享队列使用,将其换成普通的list完全不起效果,因为即使一个进程中改变了list,另一个进程也不能获取它的状态。

Queue.empty() 如果队列为空,返回True, 反之False

Queue.full() 如果队列满了,返回True,反之False

Queue.get([block[, timeout]]) 获取队列,timeout等待时间

Queue.get_nowait() 相当Queue.get(False)

Queue.put(item) 阻塞式写入队列,timeout等待时间

Queue.put_nowait(item) 相当Queue.put(item, False)


当进程很多,直接通过process生成大量进程并且又手动去限制进程数量又太过繁琐,这个时候就可以发挥进程池的功效。

进程池就是提供指定数量的进程供用户调用,当有新的请求到达时,如果池还没有满,就创建一个新的进程来执行该请求,如果池中进程已经满了,那么该请求就会等待,直到池中有进程结束才创建新的进程来执行它。


阻塞和没阻塞的差别:阻塞就是要等到回调结果出来,否则当前进程就挂起:非阻塞就是立马执行,不用等待某个进程结束。

非阻塞:

from multiprocessing import Lock, Pool
import time


def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index


if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply_async(function, (i,))

print "Started processes"
pool.close()
pool.join()
print "Subprocess done."


阻塞:

from multiprocessing import Lock, Pool
import time


def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index


if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply(function, (i,))

print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
terminiate() 结束工作进程,不在处理未完成的任务

join()主进程阻塞,等待子进程的退出。join方法要在close或terminiate后使用。


可以根据apply或apply_async返回的结果进行处理:

from multiprocessing import Lock, Pool
import time


def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
return index

if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
result = pool.apply_async(function, (i,))
print result.get()
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."

也可以使用map方法,适用于你有一堆数据要处理,每一项都需要经过一个方法来处理。

from multiprocessing import Pool
import requests
from requests.exceptions import ConnectionError


def scrape(url):
try:
print requests.get(url)
except ConnectionError:
print 'Error Occured ', url
finally:
print 'URL ', url, ' Scraped'


if __name__ == '__main__':
pool = Pool(processes=3)
urls = [
'https://www.baidu.com',
'http://www.meituan.com/',
'http://blog.csdn.net/',
'http://xxxyxxx.net'
]
pool.map(scrape, urls)