其实scrapy想要玩得好,还是需要大量全栈知识的。scrapy 被比喻为爬虫里的django,框架和django类似。
安装:
Linux/mac
- pip3 install scrapy
Windows:
- 安装twsited
a. pip3 install wheel
b. 下载twisted http://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted
c. 进入下载目录,执行 pip3 install Twisted-xxxxx.whl
- 安装scrapy
d. pip3 install scrapy -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
- 安装pywin32
e. pip3 install pywin32 -i http://pypi.douban.com/simple --trusted-host pypi.douban.com
快速使用 Scrapy:
创建project:
scrapy startproject test
cd test
scrapy genspider chouti chouti.com
# 开爬命令
scrapy crawl chouti --nolog
爬虫文件 chouti.py
# -*- coding: utf-8 -*-
import scrapy
# import sys,os
# sys.stdout=io.TextIOWrapper(sys.stdout.buffer,encoding='gb18030') # 如果windows cmd有乱码,加此行 from bs4 import BeautifulSoup
from scrapy.selector import HtmlXPathSelector
from scrapy.http import Request
from ..items import KillerItem class ChoutiSpider(scrapy.Spider):
name = 'chouti'
allowed_domains = ['chouti.com']
start_urls = ['https://dig.chouti.com/all/discovery/sh/1'] def parse(self, response):
# print(response.text)
"""
当起始URL下载完毕后,自动执行parse函数:response封装了响应相关的所有内容。
:param response:
:return:
""" hxs = HtmlXPathSelector(response=response) # 去下载的页面中:找新闻
items = hxs.xpath("//div[@id='content-list']/div[@class='item']")
for item in items:
# 从当前位置开始找子孙.// //从根开始找子孙 /只找儿子 @ 后面是属性 a[1] 第一个A标签
href = item.xpath('.//div[@class="part1"]//a[1]/@href').extract_first() # a[1]/text() 拿A标签下的文本 .extract_first() 解析第一个文本
text = item.xpath('.//div[@class="part1"]//a[1]/text()').extract_first()
item = KillerItem(title=text.strip(), href=href)
yield item # 固定写法 交给pipelines # 抓取页码
pages = hxs.xpath('//div[@id="page-area"]//a[@class="ct_pagepa"]/@href').extract() # 解析全部
for page_url in pages:
page_url = "https://dig.chouti.com" + page_url
yield Request(url=page_url, callback=self.parse) # 回调函数 调用解析器 '''
# 通过自定义的 start_requests 函数来指定不同的回调函数
def start_requests(self):
for url in self.start_urls:
yield Request(url=url, callback=self.parse2) # 另一种方式:返回一个列表
def start_requests(self):
req_list = []
for url in self.start_urls:
req_list.append(Request(url=url, callback=self.parse2))
return req_list def parse2(self):
pass '''
items.py
# -*- coding: utf-8 -*- # Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html import scrapy class KillerItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
title = scrapy.Field()
href = scrapy.Field()
pipelines.py
# -*- coding: utf-8 -*- # Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html # 需要先开启 settings 中注册
class KillerPipeline(object):
def process_item(self, item, spider):
self.f.write(item['title'] + '\n')
self.f.write(item['href'] + '\n')
self.f.flush() return item def open_spider(self, spider):
"""
爬虫开始执行时,调用
:param spider:
:return:
"""
self.f = open('url.log', 'a', encoding='utf-8') def close_spider(self, spider):
"""
爬虫关闭时,被调用
:param spider:
:return:
"""
self.f.close() class DBPipeline(object): def process_item(self, item, spider):
print('数据库', item) return item def open_spider(self, spider):
"""
爬虫开始执行时,调用
:param spider:
:return:
"""
print('打开数据') def close_spider(self, spider):
"""
爬虫关闭时,被调用
:param spider:
:return:
"""
print('关闭数据库')
settings.py 中加入:
# 优先级0~1000 数字越小,越先执行
ITEM_PIPELINES = {
'killer.pipelines.KillerPipeline': 300,
}
最后开爬,得到结果。
更多可参考:https://www.cnblogs.com/wupeiqi/articles/6229292.html
后面会用到 scrapy-redis组件,结合redis来实现
先补充个基本的知识: 队列和栈 queue & stack
#!/usr/bin/env python
# coding:utf-8 # 定义序列
lst = [] def enpush(i):
lst.append(i)
print(i) def enpop():
if (len(lst) == 0):
print("队列为空,无法出队")
else:
print("出队元素为:", lst.pop(0)) # 右入左出
enpush(10)
enpush(20)
enpush(2)
print("当前列表为:", lst)
enpop()
enpop()
enpop()
enpop() # 定义序列
lis = [] def pop():
if (len(lis) == 0):
print("栈为空", "无法出栈")
else:
print("此次出栈元素:", lis.pop()) def push(i):
lis.append(i) # 右入右出
push(1)
push(2)
push(3)
print("当前栈内:",lis)
pop()
pop()
pop()
pop()
redis的简单操作:
import redis conn = redis.Redis(host='127.0.0.1',port=6379) # conn.set('k1','v1')
# conn.get('k1') # 1. 集合的操作
'''
v =conn.sadd('test1','aaa') # 集合名称,数据 返回1成功 0失败
print(v) conn.sadd('test1','bbb','ccc','ddd') a = conn.scard('test1') # 集合中的数量
print(a) b = conn.smembers('test1')
print(b)
''' # 2. 列表的操作
# 左插入
"""
conn.lpush('users','alex','bob')
conn.lpush('users','carry','david') res = conn.lrange('users',0,10)
print(res)
# 右插入
conn.rpush('users','Emilly')
# 左弹出
t = conn.lpop('users')
print(t)
""" w = conn.keys()
print(w)
有个redis管理工具:redis-desktop-manager 网上可以下载到。
在settings中启用 scrapy-redis 具体可参考武神的链接:https://www.cnblogs.com/wupeiqi/articles/6229292.html
只用redis的去重规则:
################连接redis的信息########################
REDIS_HOST = 'localhost' # 主机名
REDIS_PORT = 6379 # 端口
# REDIS_URL = 'redis://user:pass@hostname:9001' # 连接URL(优先于以上配置)
REDIS_PARAMS = {} # Redis连接参数
# REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定连接Redis的Python模块 不写则默认:redis.StrictRedis
REDIS_ENCODING = "utf-8" # 自定义去重规则 利用redis的内部实现,只需加下面一行即可
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_PARAMS = {} 默认:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
如果还想用 redis的调度器,则再加上以下配置
######## 以下参数会将任务和去重规则都放入redis中
# 引擎来执行:自定义调度器 既使用调度器又使用去重规则
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 默认使用优先级队列(默认 广度优先),其他:PriorityQueue(有序集合 广度优先),FifoQueue(列表 广度优先)、LifoQueue(列表 深度优先)
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"
SCHEDULER_PERSIST = True
SCHEDULER_FLUSH_ON_START = False
# SCHEDULER_IDLE_BEFORE_CLOSE = 10
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
DUPEFILTER_DEBUG = False
当然,也有人只用redis调度器,而使用scrapy的去重规则,那么上面的某行配置则改成:
SCHEDULER_DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'
使用redis pipeline做持久化,则在settings中加入:
ITEM_PIPELINES = {
#启用 redis pipeline 持久化
'scrapy_redis.pipelines.RedisPipeline':301,
}
此时,爬虫中的 yield item 将进入redis保存
import scrapy
import sys,io
from scrapy.selector import Selector,HtmlXPathSelector
from scrapy.http import Request
from ..items import S3ScrapyTestItem class ChoutiSpider(scrapy.Spider):
name = 'chouti_redis'
allowed_domains = ['chouti.com']
start_urls = ['https://dig.chouti.com/'] def parse(self, response):
hxs1 = Selector(response=response).xpath('//div[@id="content-list"]/div[@class="item"]') for i in hxs1:
# 取 A 标签内容与链接 #
href = i.xpath('.//a[@class="show-content color-chag"]/@href').extract_first()
print(href)
title = i.xpath('.//a[@class="show-content color-chag"]/text()').extract_first().strip()
if not title:
# 下面方法虽然取出了包含<span>的内容,但是有太多空格
title = i.xpath('.//a[@class="show-content color-chag"]')
title = title.xpath('string(.)').extract_first().strip()
# 去掉空白
title = title.replace("\n", '').replace("\t", '').replace(" ", '')
# 所有的子节点文本
# title = "".join(i.xpath('.//a[@class="show-content color-chag"]//text()').extract().strip())
print(title, '\r\n')
# items 职责是格式化,包装成对象
yield S3ScrapyTestItem(title=title, href=href)
如果想要使用redis来设置起始 urls ,除了设置redis连接外,还要在配置文件中增加:
REDIS_START_URLS_BATCH_SIZE = 1
REDIS_START_URLS_AS_SET = False # True是使用集合 False是使用列表
爬虫中主类继承 RedisSpider 不再写 start_urls
import scrapy
import sys,io
from scrapy.selector import Selector,HtmlXPathSelector
from scrapy.http import Request
from ..items import S3ScrapyTestItem
from scrapy_redis.spiders import RedisSpider # 继承RedisSpider 将会去redis中取出 start_ursl
class ChoutiSpider(RedisSpider):
name = 'chouti_redis'
allowed_domains = ['chouti.com'] def parse(self, response):
hxs1 = Selector(response=response).xpath('//div[@id="content-list"]/div[@class="item"]') for i in hxs1:
# 取 A 标签内容与链接 #
href = i.xpath('.//a[@class="show-content color-chag"]/@href').extract_first()
print(href)
title = i.xpath('.//a[@class="show-content color-chag"]/text()').extract_first().strip()
if not title:
# 下面方法虽然取出了包含<span>的内容,但是有太多空格
title = i.xpath('.//a[@class="show-content color-chag"]')
title = title.xpath('string(.)').extract_first().strip()
# 去掉空白
title = title.replace("\n", '').replace("\t", '').replace(" ", '')
# 所有的子节点文本
# title = "".join(i.xpath('.//a[@class="show-content color-chag"]//text()').extract().strip())
print(title, '\r\n')
# 配置文件中'scrapy_redis.pipelines.RedisPipeline':301, 将决定下面的结果存入 redis
yield S3ScrapyTestItem(title=title, href=href)
此时运行scrapy crawl chouti_redis 后,爬虫将一直处于待命状态
一旦redis中对应的键中出现数据,爬虫将自动开始动作。或者也可以先设置好redis中的start_urls 再运行爬虫。
简单地在 redis 中插入一个url, 运行py
import redis conn = redis.Redis(host='localhost',port=6379) # 起始URL的key:
conn.lpush("chouti_redis:start_urls","https://dig.chouti.com/all/hot/recent/1")
再回去观看命令行,会发现爬虫已经开工。
使用大文件方式爬取图片
在练习的过程中,一直没能下载到图片,直到发现,我把图片的地址从https 改成http立刻好了。
pipelines 中增加两个类,详见下面代码:
from twisted.internet import defer, reactor, protocol
from twisted.web.client import Agent, getPage, ResponseDone, PotentialDataLoss # 增加下面两个类:
class _ResponseReader(protocol.Protocol): def __init__(self, finished, txresponse, file_name):
self._finished = finished
self._txresponse = txresponse
self._bytes_received = 0
self.filename = "img/%s" % file_name self.f = open(self.filename, mode='wb') def dataReceived(self, bodyBytes):
self._bytes_received += len(bodyBytes) # 一点一点地下载
self.f.write(bodyBytes) self.f.flush() def connectionLost(self, reason):
if self._finished.called:
return
if reason.check(ResponseDone):
# 下载完成
self._finished.callback((self._txresponse, 'success'))
elif reason.check(PotentialDataLoss):
# 下载部分
self._finished.callback((self._txresponse,'partial'))
else:
# 下载异常
self._finished.errback(reason) self.f.close() # 大文件下载
class BigfilePipeline(object):
def process_item(self, item, spider):
# 创建一个下载文件任务
if item['type'] == 'file':
# print('文件名是:',item['img_src'])
agent = Agent(reactor) # 实例化Agent
d = agent.request(
method=b'GET',
uri=bytes(item['img_src'], encoding='ascii')
)
# print('图片地址',bytes(item['img_src'], encoding='ascii'))
# 当文件开始下载,自动执行self._cb_bodyready
d.addCallback(self._cb_bodyready, file_name=item['file_name'])
return d
else:
return item def _cb_bodyready(self, txresponse, file_name):
# 创建 Deferred 对象,控制直到下载完成后,再关闭连接, 如果没有defer,则下载不成功。
d = defer.Deferred() # 目的是等待下载完成
d.addBoth(self.download_result)
# 没有执行到这里
txresponse.deliverBody(_ResponseReader(d, txresponse, file_name))
return d def download_result(self, response):
pass
settings中注册 BigfilePipeline
ITEM_PIPELINES = {
's3_scrapy_test.pipelines.S3ScrapyTestPipeline': 306, 's3_scrapy_test.pipelines.BigfilePipeline': 304, 'scrapy_redis.pipelines.RedisPipeline':307,
}
爬虫文件:注意爬图片时的 item_obj的内容
import scrapy
from scrapy.selector import Selector, HtmlXPathSelector
from scrapy.http import Request
from ..items import S3ScrapyTestItem class Chouti2Spider(scrapy.Spider):
name = 'chouti2'
allowed_domains = ['chouti.com']
start_urls = ['https://dig.chouti.com/'] # 设置数组用来存储 urls 并且手动去重
# 其实 Request 已经自带去重选项
visited_urls = set() def parse(self, response):
"""
response.meta = {'depth':''} # 深度
"""
# 取包含每条新闻的 div
hxs1 = Selector(response=response).xpath('//div[@id="content-list"]/div[@class="item"]') for i in hxs1:
# 取 A 标签内容与链接 #
href = i.xpath('.//a[@class="show-content color-chag"]/@href').extract_first()
print(href)
title = i.xpath('.//a[@class="show-content color-chag"]/text()').extract_first().strip()
if not title:
# 下面方法虽然取出了包含<span>的内容,但是有太多空格
title = i.xpath('.//a[@class="show-content color-chag"]')
title = title.xpath('string(.)').extract_first().strip()
# 去掉空白
title = title.replace("\n",'').replace("\t",'').replace(" ",'') # 所有的子节点文本
# title = "".join(i.xpath('.//a[@class="show-content color-chag"]//text()').extract().strip())
print(title, '\r\n') # 爬图片
img_src = i.xpath('.//div[@class="news-pic"]/img[@alt="抽屉新热榜"]/@original').extract_first()
img_name = i.xpath('.//div[@class="news-pic"]/img[@alt="抽屉新热榜"]/@lang').extract_first()
img_src = "http:%s" %(img_src)
img_name = "%s.jpg" %(img_name) # items 职责是格式化,包装成对象
item_obj = S3ScrapyTestItem(title=title,href=href,file_name=img_name,img_src=img_src,type='file') # pipelines 是用来持久化 别忘了在settings里注册
# 将item对象传递给pipelines处理
yield item_obj '''取页码链接'''
# hxs = Selector(response=response).xpath('//div[@id="dig_lcpage"]//a/@href').extract()
# 使用 starts-with 查找
# hxs = Selector(response=response).xpath('//a[starts-with(@href,"/all/hot/recent/")]/@href').extract()
# 使用正则
hxs2 = Selector(response=response).xpath('//a[re:test(@href,"/all/hot/recent/\d+")]/@href').extract() for i in hxs2:
# 使用md5转变成定长
# md5_url = self.md5(i) # if md5_url in self.visited_urls: if i in self.visited_urls:
# print("已经存在", i)
pass
else:
self.visited_urls.add(i)
# print(i)
i = "https://dig.chouti.com%s" %i # 将要访问的新 url 添加到调度器
yield Request(url=i, callback=self.parse) # 如果重写 start_requests 则可以指定最开始处理请求的方法
# settings 中指定 DEPTH_LIMIT 表示深度 def md5(self,url):
import hashlib
obj = hashlib.md5()
obj.update(bytes(url,encoding='utf-8'))
return obj.hexdigest()
items中:
import scrapy class S3ScrapyTestItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
title = scrapy.Field()
href = scrapy.Field()
img_src = scrapy.Field()
file_name = scrapy.Field()
type = scrapy.Field()
最后执行
scrapy crawl chouti2 --nolog
就会发现图片已经进入了 img目录。