一、实现运行爬虫模块
我们的目标:根据配置文件信息, 加载爬虫,抓取HTTP代理,进行校验,如果可用,写入到数据库中
根据以下思路:
1.在run_spider.py中,创建RunSpider类
2.提供一个运行爬虫的run方法,作为运行爬虫的入口,实现核心的处理逻辑
- 根据配置信息,获取爬虫对象列表
- 遍历爬虫对象列表,获取爬虫对象,遍历爬虫对象的get_proxies方法,获取HTTP代理
- 检测HTTP代理(HTTP代理检测模块)
- 如果可用,写入数据库(数据库模块)
- 处理异常,防止一个爬虫内部出错了,影响其他的爬虫
3.使用异步来执行每一个爬虫任务,以提高抓取HTTP代理效率
- 在init方法中创建协程池对象
- 把处理一个代理爬虫的代码抽到一个方法
- 使用异步执行这个方法
- 调用协程的join方法,让当前线程等待队列任务的完成
-
4.使用schedule模块,实现每隔一定的时间,执行一次爬取任务
- 定义一个start的类方法
- 创建当前类的对象,调用run方法
- 使用schedule模块,每隔一定的时间,执行当前对象的run方法
所以我们可以在run_spider.py中,创建RunSpider类,进而修改settings.py增加HTTP代理爬虫的配置信息。具体代码如下:
from gevent import monkey
monkey.patch_all()
import importlib
from gevent.pool import Pool
from proxy_pool.settings import SPIDER_LIST, RUN_SPIDER_INTERVAL
from proxy_pool.utils.http_validate import check_proxy
from proxy_pool.utils.mongo_pool import MongoPool
from proxy_pool.utils.log import logger
import schedule
import time
class RunSpider(object):
def __init__(self):
self.mongo_pool = MongoPool()
self.coroutine_pool = Pool()
def get_spider_from_settings(self):
for class_full_name in SPIDER_LIST:
module_name, class_name = class_full_name.rsplit('.', 1)
module = importlib.import_module(module_name)
spider_class = getattr(module, class_name)
spider = spider_class()
yield spider
def process_one_spider(self, spider):
try:
for proxy in spider.run():
check_proxy(proxy)
if proxy.speed != -1:
self.mongo_pool.insert_one(proxy)
except Exception as ex:
logger.warning(ex)
def run(self):
spider_list = self.get_spider_from_settings()
for spider in spider_list:
self.coroutine_pool.apply_async(func=self.process_one_spider, args=(spider,))
self.coroutine_pool.join()
@classmethod
def start(cls):
rs = cls()
rs.run()
schedule.every(RUN_SPIDER_INTERVAL).hours.do(rs.run)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
RunSpider().run()
settings.py代码如下:
# 配置run_spider模块
# ---配置spider
SPIDER_LIST = {
"proxy_pool.core.proxy_spider.xila_spider.XiLaSpider",
"proxy_pool.core.proxy_spider.ip3366_spider.Ip3366Spider",
}
# ---配置schedule的周期
RUN_SPIDER_INTERVAL = 4
二、实现HTTP代理检测模块
之后,我们需要检查HTTP代理可用性,保证代理池中HTTP代理基本可用。这部分基本是大家耳熟能详的:
1.在proxy_test.py中,创建ProxyTester类
2.提供一个run方法,用于处理检测HTTP代理核心逻辑
我们可以先从数据哭中获取所有的HTTP代理,然后从HTTP代理列表中,检查HTTP代理的可用性,如果HTTP代理不可用,让代理分数-1,如果代理分数=0,就从数据库中删除该代理,否则更新该代理,如果代理可用,就恢复该代理的分数,更新到数据库中。
3.为了提高检查的速度,使用异步来执行检测任务
首先,我们需要把要检测的HTTP代理,放到队列中;
其次,把检查一个代理可用性的代码,抽取到一个方法中,从队列中获取HTTP代理,进行检查,检查完毕,调度队列的task_done方法;
然后通过异步回调,使用死循环不断执行这个方法;
最后,开启多个一个异步任务,来处理HTTP代理的检测,可以通过配置文件指定异步数量。
4.使用schedule模块,每隔一定的时间,执行一次检测任务
具体代码如下
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
from queue import Queue
from proxy_pool.utils.mongo_pool import MongoPool
from proxy_pool.settings import ASYNC_COUNT, DEFAULT_SCORE, TEST_SPIDER_INTERVAL
from proxy_pool.utils.http_validate import check_proxy
import schedule
import time
class ProxyTest(object):
def __init__(self):
self.mongo_pool = MongoPool()
self.proxy_queue = Queue()
self.coroutine_pool = Pool()
def check_one_proxy(self):
proxy = self.proxy_queue.get()
check_proxy(proxy)
if proxy.speed == -1:
proxy.score -= 1
if proxy.score == 0:
self.mongo_pool.delete_one(proxy)
self.mongo_pool.update_one(proxy)
proxy.score = DEFAULT_SCORE
self.mongo_pool.update_one(proxy)
self.proxy_queue.task_done()
def check_call_back(self, temp):
self.coroutine_pool.apply_async(func=self.check_one_proxy, callback=self.check_call_back)
def run(self):
for proxy in self.mongo_pool.find_all():
self.proxy_queue.put(proxy)
for i in range(ASYNC_COUNT):
self.coroutine_pool.apply_async(func=self.check_one_proxy, callback=self.check_call_back)
self.coroutine_pool.join()
self.proxy_queue.join()
@classmethod
def start(cls):
rs = cls()
rs.run()
schedule.every(TEST_SPIDER_INTERVAL).hours.do(rs.run)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
obj = ProxyTest()
obj.start()
本文转载自https://zhuanlan.zhihu.com/p/156778725