基于协程的异步爬虫

时间:2021-07-16 23:34:24

基于tornado框架的异步爬虫小例子:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# created by fhqplzj on 2017/07/19 下午5:48
import logging
import time
from datetime import timedelta
from urlparse import urljoin, urldefrag

from scrapy import Selector
from tornado.gen import coroutine, Return
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
from tornado.queues import Queue

# Configure the root logger with the default handler so that log records
# emitted by the libraries used below are not dropped.
logging.basicConfig()
# Seed URL of the crawl; main() also uses it as the prefix a discovered link
# must start with in order to be queued.
base_url = 'http://www.tornadoweb.org/en/stable/'
# Number of worker coroutines main() starts.
concurrency = 10


@coroutine
def get_links_from_url(url):
    """Download ``url`` and resolve to the links found in the page.

    Every ``href`` returned by ``get_links`` has its fragment removed and is
    then joined against ``url``.

    Args:
        url: address of the page to download.

    Returns (via ``Return``):
        A list of link URLs, or an empty list when fetching or parsing raised
        any ``Exception`` (the error is printed, not propagated).
    """
    links = []
    try:
        resp = yield AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)
        page = resp.body
        if not isinstance(page, str):
            page = page.decode()
        for href in get_links(page):
            without_fragment = urldefrag(href)[0]
            links.append(urljoin(url, without_fragment))
    except Exception as exc:
        print('Exception: %s %s' % (exc, url))
        # Best-effort crawl: a failing page simply contributes no links.
        raise Return([])
    # NOTE(review): kept outside the ``try`` on purpose -- tornado documents
    # ``Return`` as an exception type, so raising it inside the ``try`` would
    # presumably be caught by the ``except Exception`` above.
    raise Return(links)


def get_links(html):
    """Return the ``href`` attribute values of all ``<a>`` elements in ``html``.

    Args:
        html: markup text of the page to scan.

    Returns:
        Whatever ``extract()`` yields for the ``//a/@href`` query -- the raw
        attribute values, not resolved against any base URL.
    """
    selector = Selector(text=html)
    href_nodes = selector.xpath('//a/@href')
    return href_nodes.extract()


@coroutine
def main():
    """Crawl every page reachable under ``base_url`` and print a summary.

    The work queue is seeded with ``base_url`` and ``concurrency`` worker
    coroutines are started. Each worker repeatedly takes a URL from the
    queue, fetches it, and enqueues the links that start with ``base_url``.
    Once the queue has drained (waiting at most 300 seconds), a summary line
    is printed and the workers are shut down with one ``None`` sentinel each.

    Raises:
        AssertionError: if a URL that was started was never finished.
        Whatever ``q.join`` raises when the 300 second timeout expires, and
        any exception that escaped a worker (surfaced when the workers are
        awaited at the end instead of being silently dropped).
    """
    q = Queue()
    start = time.time()
    # fetching: URLs that have been fetched or are being fetched right now.
    # fetched: URLs whose fetch has completed.
    fetching, fetched = set(), set()

    @coroutine
    def fetch_url(current_url):
        # Fetch one URL (at most once) and queue the in-site links it holds.
        if current_url in fetching:
            return
        print('fetching %s' % current_url)
        fetching.add(current_url)
        urls = yield get_links_from_url(current_url)
        fetched.add(current_url)
        for new_url in urls:
            # Only follow links that stay under the crawl root.
            if new_url.startswith(base_url):
                yield q.put(new_url)

    @coroutine
    def worker():
        # Process queue items until the ``None`` shutdown sentinel arrives.
        while True:
            current_url = yield q.get()
            try:
                if current_url is None:
                    return
                yield fetch_url(current_url)
            finally:
                # Always balance the get() so q.join() can complete.
                q.task_done()

    yield q.put(base_url)
    # Keep the worker futures so they can be stopped and awaited below.
    workers = [worker() for _ in range(concurrency)]
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (time.time() - start, len(fetched)))

    # Signal every worker to exit, then wait for all of them to finish so no
    # coroutine is left parked on q.get() when main() returns.
    for _ in range(concurrency):
        yield q.put(None)
    yield workers


# Script entry point: drive main() on the current IOLoop until it completes.
if __name__ == '__main__':
    io_loop = IOLoop.current()
    io_loop.run_sync(main)